7 #include <netinet/in.h>
10 #include <sys/socket.h>
11 #include <sys/types.h>
27 const std::string &fqrn) :
29 influx_extra_fields_(
""), influx_extra_tags_(
""),
38 "No value given for CVMFS_INFLUX_HOST");
43 if (options_mgr->
GetValue(
"CVMFS_INFLUX_PORT", &opt)) {
49 "Invalid value for CVMFS_INFLUX_PORT [%s]", opt.c_str());
62 if (!options_mgr->
GetValue(
"CVMFS_INFLUX_EXTRA_FIELDS",
70 "Send to [%s:%d] metric [%s]. Extra tags"
71 "[%s]. Extra fields [%s].",
81 "Not enabling influx metrics. Error while open socket. %d", ret);
86 "Not enabling influx metrics. Not all mandatory variables set: "
87 "CVMFS_INFLUX_METRIC_NAME, CVMFS_INFLUX_HOST, CVMFS_INFLUX_PORT");
122 bool add_token =
false;
123 for (std::map<std::string, int64_t>::iterator it
125 if (it->second != 0) {
176 bool add_token =
false;
177 for (std::map<std::string, int64_t>::iterator it
179 int64_t value = it->second;
185 ret += it->first +
"=" +
StringifyInt(value - old_value);
201 struct addrinfo hints;
204 memset(&hints, 0,
sizeof(hints));
205 hints.ai_family = AF_INET;
206 hints.ai_socktype = SOCK_DGRAM;
209 if (err != 0 ||
res_ == NULL) {
211 "Failed to resolve influx server [%s]. errno=%d",
219 "Failed to open socket");
228 const std::string &payload) {
229 struct sockaddr_in *dest_addr = NULL;
230 dest_addr =
reinterpret_cast<sockaddr_in*
>(
res_->ai_addr);
237 reinterpret_cast<struct sockaddr*
>(dest_addr),
238 sizeof(
struct sockaddr_in));
240 if (num_bytes_sent < 0) {
242 "Failed to send to influx. errno=%d", errno);
244 }
else if (static_cast<size_t>(num_bytes_sent) != payload.size()) {
246 "Incomplete send. Bytes transferred: %zd. Bytes expected %lu",
247 num_bytes_sent, payload.size());
259 std::string delta_payload =
"";
262 payload = payload +
"\n" + delta_payload;
TelemetryReturn OpenSocket()
std::string StringifyUint(const uint64_t value)
std::string influx_metric_name_
std::string influx_extra_fields_
virtual ~TelemetryAggregatorInflux()
std::string influx_extra_tags_
int64_t String2Int64(const string &value)
std::string MakePayload()
std::map< std::string, int64_t > old_counters_
std::string MakeDeltaPayload()
std::map< std::string, int64_t > counters_
string StringifyInt(const int64_t value)
bool GetValue(const std::string &key, std::string *value) const
virtual void PushMetrics()
TelemetryReturn SendToInflux(const std::string &payload)
TelemetryAggregatorInflux(Statistics *statistics, int send_rate_sec, OptionsManager *options_mgr, const std::string &fqrn)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)