7 #include <netinet/in.h>
10 #include <sys/socket.h>
11 #include <sys/types.h>
29 const std::string &fqrn)
31 , influx_extra_fields_(
"")
32 , influx_extra_tags_(
"")
42 "No value given for CVMFS_INFLUX_HOST");
47 if (options_mgr->
GetValue(
"CVMFS_INFLUX_PORT", &opt)) {
53 "Invalid value for CVMFS_INFLUX_PORT [%s]", opt.c_str());
65 if (!options_mgr->
GetValue(
"CVMFS_INFLUX_EXTRA_FIELDS",
73 "Enabling influx metrics. "
74 "Send to [%s:%d] metric [%s]. Extra tags"
75 "[%s]. Extra fields [%s].",
82 "Not enabling influx metrics. Error while open socket. %d", ret);
87 "Not enabling influx metrics. Not all mandatory variables set: "
88 "CVMFS_INFLUX_METRIC_NAME, CVMFS_INFLUX_HOST, CVMFS_INFLUX_PORT");
126 bool add_token =
false;
127 for (std::map<std::string, int64_t>::iterator it =
counters_.begin(),
130 if (it->second != 0) {
184 bool add_token =
false;
185 for (std::map<std::string, int64_t>::iterator it =
counters_.begin(),
188 int64_t value = it->second;
193 }
catch (
const std::out_of_range &ex) {
199 ret += it->first +
"=" +
StringifyInt(value - old_value);
215 struct addrinfo hints;
218 memset(&hints, 0,
sizeof(hints));
219 hints.ai_family = AF_INET;
220 hints.ai_socktype = SOCK_DGRAM;
223 if (err != 0 ||
res_ == NULL) {
225 "Failed to resolve influx server [%s]. errno=%d", hostname, errno);
240 const std::string &payload) {
241 struct sockaddr_in *dest_addr = NULL;
242 dest_addr =
reinterpret_cast<sockaddr_in *
>(
res_->ai_addr);
245 ssize_t num_bytes_sent = sendto(
250 reinterpret_cast<struct sockaddr *
>(dest_addr),
251 sizeof(
struct sockaddr_in));
253 if (num_bytes_sent < 0) {
255 "Failed to send to influx. errno=%d", errno);
257 }
else if (static_cast<size_t>(num_bytes_sent) != payload.size()) {
259 "Incomplete send. Bytes transferred: %zd. Bytes expected %lu",
260 num_bytes_sent, payload.size());
272 std::string delta_payload =
"";
275 payload = payload +
"\n" + delta_payload;
TelemetryReturn OpenSocket()
std::string StringifyUint(const uint64_t value)
std::string influx_metric_name_
std::string influx_extra_fields_
virtual ~TelemetryAggregatorInflux()
std::string influx_extra_tags_
int64_t String2Int64(const string &value)
TelemetryAggregatorInflux(Statistics *statistics, int send_rate_sec, OptionsManager *options_mgr, MountPoint *mount_point, const std::string &fqrn)
std::string MakePayload()
std::map< std::string, int64_t > old_counters_
std::string MakeDeltaPayload()
std::map< std::string, int64_t > counters_
string StringifyInt(const int64_t value)
bool GetValue(const std::string &key, std::string *value) const
virtual void PushMetrics()
TelemetryReturn SendToInflux(const std::string &payload)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)