7 #include <netinet/in.h>
10 #include <sys/socket.h>
11 #include <sys/types.h>
28 const std::string &fqrn) :
31 influx_extra_fields_(
""), influx_extra_tags_(
""),
40 "No value given for CVMFS_INFLUX_HOST");
45 if (options_mgr->
GetValue(
"CVMFS_INFLUX_PORT", &opt)) {
51 "Invalid value for CVMFS_INFLUX_PORT [%s]", opt.c_str());
64 if (!options_mgr->
GetValue(
"CVMFS_INFLUX_EXTRA_FIELDS",
72 "Send to [%s:%d] metric [%s]. Extra tags"
73 "[%s]. Extra fields [%s].",
83 "Not enabling influx metrics. Error while open socket. %d", ret);
88 "Not enabling influx metrics. Not all mandatory variables set: "
89 "CVMFS_INFLUX_METRIC_NAME, CVMFS_INFLUX_HOST, CVMFS_INFLUX_PORT");
124 bool add_token =
false;
125 for (std::map<std::string, int64_t>::iterator it
127 if (it->second != 0) {
178 bool add_token =
false;
179 for (std::map<std::string, int64_t>::iterator it
181 int64_t value = it->second;
187 ret += it->first +
"=" +
StringifyInt(value - old_value);
203 struct addrinfo hints;
206 memset(&hints, 0,
sizeof(hints));
207 hints.ai_family = AF_INET;
208 hints.ai_socktype = SOCK_DGRAM;
211 if (err != 0 ||
res_ == NULL) {
213 "Failed to resolve influx server [%s]. errno=%d",
221 "Failed to open socket");
230 const std::string &payload) {
231 struct sockaddr_in *dest_addr = NULL;
232 dest_addr =
reinterpret_cast<sockaddr_in*
>(
res_->ai_addr);
239 reinterpret_cast<struct sockaddr*
>(dest_addr),
240 sizeof(
struct sockaddr_in));
242 if (num_bytes_sent < 0) {
244 "Failed to send to influx. errno=%d", errno);
246 }
else if (static_cast<size_t>(num_bytes_sent) != payload.size()) {
248 "Incomplete send. Bytes transferred: %zd. Bytes expected %lu",
249 num_bytes_sent, payload.size());
261 std::string delta_payload =
"";
264 payload = payload +
"\n" + delta_payload;
TelemetryReturn OpenSocket()
std::string StringifyUint(const uint64_t value)
std::string influx_metric_name_
std::string influx_extra_fields_
virtual ~TelemetryAggregatorInflux()
std::string influx_extra_tags_
int64_t String2Int64(const string &value)
TelemetryAggregatorInflux(Statistics *statistics, int send_rate_sec, OptionsManager *options_mgr, MountPoint *mount_point, const std::string &fqrn)
std::string MakePayload()
std::map< std::string, int64_t > old_counters_
std::string MakeDeltaPayload()
std::map< std::string, int64_t > counters_
string StringifyInt(const int64_t value)
bool GetValue(const std::string &key, std::string *value) const
virtual void PushMetrics()
TelemetryReturn SendToInflux(const std::string &payload)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)