7 #include <netinet/in.h>
10 #include <sys/socket.h>
11 #include <sys/types.h>
28 const std::string &fqrn) :
31 influx_extra_fields_(
""), influx_extra_tags_(
""),
40 "No value given for CVMFS_INFLUX_HOST");
45 if (options_mgr->
GetValue(
"CVMFS_INFLUX_PORT", &opt)) {
51 "Invalid value for CVMFS_INFLUX_PORT [%s]", opt.c_str());
64 if (!options_mgr->
GetValue(
"CVMFS_INFLUX_EXTRA_FIELDS",
72 "Send to [%s:%d] metric [%s]. Extra tags"
73 "[%s]. Extra fields [%s].",
83 "Not enabling influx metrics. Error while open socket. %d", ret);
88 "Not enabling influx metrics. Not all mandatory variables set: "
89 "CVMFS_INFLUX_METRIC_NAME, CVMFS_INFLUX_HOST, CVMFS_INFLUX_PORT");
124 bool add_token =
false;
125 for (std::map<std::string, int64_t>::iterator it
127 if (it->second != 0) {
178 bool add_token =
false;
179 for (std::map<std::string, int64_t>::iterator it
181 int64_t value = it->second;
186 }
catch(
const std::out_of_range& ex) {
192 ret += it->first +
"=" +
StringifyInt(value - old_value);
208 struct addrinfo hints;
211 memset(&hints, 0,
sizeof(hints));
212 hints.ai_family = AF_INET;
213 hints.ai_socktype = SOCK_DGRAM;
216 if (err != 0 ||
res_ == NULL) {
218 "Failed to resolve influx server [%s]. errno=%d",
226 "Failed to open socket");
235 const std::string &payload) {
236 struct sockaddr_in *dest_addr = NULL;
237 dest_addr =
reinterpret_cast<sockaddr_in*
>(
res_->ai_addr);
244 reinterpret_cast<struct sockaddr*
>(dest_addr),
245 sizeof(
struct sockaddr_in));
247 if (num_bytes_sent < 0) {
249 "Failed to send to influx. errno=%d", errno);
251 }
else if (static_cast<size_t>(num_bytes_sent) != payload.size()) {
253 "Incomplete send. Bytes transferred: %zd. Bytes expected %lu",
254 num_bytes_sent, payload.size());
266 std::string delta_payload =
"";
269 payload = payload +
"\n" + delta_payload;
TelemetryReturn OpenSocket()
std::string StringifyUint(const uint64_t value)
std::string influx_metric_name_
std::string influx_extra_fields_
virtual ~TelemetryAggregatorInflux()
std::string influx_extra_tags_
int64_t String2Int64(const string &value)
TelemetryAggregatorInflux(Statistics *statistics, int send_rate_sec, OptionsManager *options_mgr, MountPoint *mount_point, const std::string &fqrn)
std::string MakePayload()
std::map< std::string, int64_t > old_counters_
std::string MakeDeltaPayload()
std::map< std::string, int64_t > counters_
string StringifyInt(const int64_t value)
bool GetValue(const std::string &key, std::string *value) const
virtual void PushMetrics()
TelemetryReturn SendToInflux(const std::string &payload)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)