CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
telemetry_aggregator_influx.cc
Go to the documentation of this file.
1 
6 
7 #include <netinet/in.h>
8 #include <pthread.h>
9 #include <stdint.h>
10 #include <sys/socket.h>
11 #include <sys/types.h>
12 
13 #include <cstring>
14 #include <map>
15 #include <string>
16 #include <vector>
17 
18 #include "util/logging.h"
19 #include "util/posix.h"
20 #include "util/string.h"
21 
22 namespace perf {
23 
25  int send_rate_sec,
26  OptionsManager *options_mgr,
27  const std::string &fqrn) :
28  TelemetryAggregator(statistics, send_rate_sec, fqrn),
29  influx_extra_fields_(""), influx_extra_tags_(""),
30  socket_fd_(-1), res_(NULL) {
31  int params = 0;
32 
33  if (options_mgr->GetValue("CVMFS_INFLUX_HOST", &influx_host_)) {
34  if (influx_host_.size() > 1) {
35  params++;
36  } else {
38  "No value given for CVMFS_INFLUX_HOST");
39  }
40  }
41 
42  std::string opt;
43  if (options_mgr->GetValue("CVMFS_INFLUX_PORT", &opt)) {
44  influx_port_ = static_cast<int>(String2Int64(opt.c_str()));
45  if (influx_port_ > 0 && influx_port_ < 65536) {
46  params++;
47  } else {
49  "Invalid value for CVMFS_INFLUX_PORT [%s]", opt.c_str());
50  }
51  }
52 
53  if (options_mgr->GetValue("CVMFS_INFLUX_METRIC_NAME", &influx_metric_name_))
54  {
55  params++;
56  }
57 
58  if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_TAGS", &influx_extra_tags_)) {
59  influx_extra_tags_ = "";
60  }
61 
62  if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_FIELDS",
65  }
66 
67  if (params == 3) {
68  is_zombie_ = false;
69  LogCvmfs(kLogTelemetry, kLogDebug, "Enabling influx metrics. "
70  "Send to [%s:%d] metric [%s]. Extra tags"
71  "[%s]. Extra fields [%s].",
72  influx_host_.c_str(),
74  influx_metric_name_.c_str(),
75  influx_extra_tags_.c_str(),
76  influx_extra_fields_.c_str());
78  if (ret != kTelemetrySuccess) {
79  is_zombie_ = true;
81  "Not enabling influx metrics. Error while open socket. %d", ret);
82  }
83  } else {
84  is_zombie_ = true;
86  "Not enabling influx metrics. Not all mandatory variables set: "
87  "CVMFS_INFLUX_METRIC_NAME, CVMFS_INFLUX_HOST, CVMFS_INFLUX_PORT");
88  }
89 }
90 
92  if (socket_fd_ >= 0) {
93  close(socket_fd_);
94  }
95  if (res_) {
96  freeaddrinfo(res_);
97  }
98 }
99 
113  // measurement and tags
114  std::string ret = influx_metric_name_ + "_absolute,repo=" + fqrn_;
115 
116  if (influx_extra_tags_ != "") {
117  ret += "," + influx_extra_tags_;
118  }
119 
120  // fields
121  ret += " ";
122  bool add_token = false;
123  for (std::map<std::string, int64_t>::iterator it
124  = counters_.begin(), iEnd = counters_.end(); it != iEnd; it++) {
125  if (it->second != 0) {
126  if (add_token) {
127  ret += ",";
128  }
129  ret += it->first + "=" + StringifyInt(it->second);
130  add_token = true;
131  }
132  }
133  if (influx_extra_fields_ != "") {
134  if (add_token) {
135  ret += ",";
136  }
137  ret += influx_extra_fields_;
138  add_token = true;
139  }
140 
141  // timestamp
142  if (add_token) {
143  ret += " ";
144  }
145  ret += StringifyUint(timestamp_);
146 
147  return ret;
148 }
149 
167  // measurement and tags
168  std::string ret = "" + influx_metric_name_ + "_delta,repo=" + fqrn_;
169 
170  if (influx_extra_tags_ != "") {
171  ret += "," + influx_extra_tags_;
172  }
173 
174  // fields
175  ret += " ";
176  bool add_token = false;
177  for (std::map<std::string, int64_t>::iterator it
178  = counters_.begin(), iEnd = counters_.end(); it != iEnd; it++) {
179  int64_t value = it->second;
180  int64_t old_value = old_counters_.at(it->first);
181  if (value != 0) {
182  if (add_token) {
183  ret += ",";
184  }
185  ret += it->first + "=" + StringifyInt(value - old_value);
186  add_token = true;
187  }
188  }
189 
190  // timestamp
191  if (add_token) {
192  ret += " ";
193  }
194  ret += StringifyUint(timestamp_);
195 
196  return ret;
197 }
198 
200  const char *hostname = influx_host_.c_str();
201  struct addrinfo hints;
202  int err;
203 
204  memset(&hints, 0, sizeof(hints));
205  hints.ai_family = AF_INET;
206  hints.ai_socktype = SOCK_DGRAM;
207 
208  err = getaddrinfo(influx_host_.c_str(), NULL, &hints, &res_);
209  if (err != 0 || res_ == NULL) {
211  "Failed to resolve influx server [%s]. errno=%d",
212  hostname, errno);
214  }
215 
216  socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
217  if (socket_fd_ < 0) {
219  "Failed to open socket");
220  freeaddrinfo(res_);
221  return kTelemetryFailSocket;
222  }
223 
224  return kTelemetrySuccess;
225 }
226 
228  const std::string &payload) {
229  struct sockaddr_in *dest_addr = NULL;
230  dest_addr = reinterpret_cast<sockaddr_in*>(res_->ai_addr);
231  dest_addr->sin_port = htons(influx_port_);
232 
233  ssize_t num_bytes_sent = sendto(socket_fd_,
234  payload.data(),
235  payload.size(),
236  0,
237  reinterpret_cast<struct sockaddr*>(dest_addr),
238  sizeof(struct sockaddr_in));
239 
240  if (num_bytes_sent < 0) {
242  "Failed to send to influx. errno=%d", errno);
243  return kTelemetryFailSend;
244  } else if (static_cast<size_t>(num_bytes_sent) != payload.size()) {
246  "Incomplete send. Bytes transferred: %zd. Bytes expected %lu",
247  num_bytes_sent, payload.size());
248  return kTelemetryFailSend;
249  }
250 
251  LogCvmfs(kLogTelemetry, kLogDebug, "INFLUX: POSTING [%s]", payload.c_str());
252  return kTelemetrySuccess;
253 }
254 
255 
257  // create payload
258  std::string payload = MakePayload();
259  std::string delta_payload = "";
260  if (old_counters_.size() > 0) {
261  delta_payload = MakeDeltaPayload();
262  payload = payload + "\n" + delta_payload;
263  }
264  payload += "\n";
265 
266  // send to influx
267  SendToInflux(payload);
268 
269  // current counter is now old counter
270  counters_.swap(old_counters_);
271 }
272 
273 } // namespace perf
std::string StringifyUint(const uint64_t value)
Definition: string.cc:84
int64_t String2Int64(const string &value)
Definition: string.cc:222
std::map< std::string, int64_t > old_counters_
std::map< std::string, int64_t > counters_
string StringifyInt(const int64_t value)
Definition: string.cc:78
bool GetValue(const std::string &key, std::string *value) const
Definition: options.cc:376
TelemetryReturn SendToInflux(const std::string &payload)
TelemetryAggregatorInflux(Statistics *statistics, int send_rate_sec, OptionsManager *options_mgr, const std::string &fqrn)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528