CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
telemetry_aggregator_influx.cc
Go to the documentation of this file.
1 
6 
7 #include <netinet/in.h>
8 #include <pthread.h>
9 #include <stdint.h>
10 #include <sys/socket.h>
11 #include <sys/types.h>
12 
13 #include <cstring>
14 #include <map>
15 #include <string>
16 #include <vector>
17 
18 #include "util/logging.h"
19 #include "util/posix.h"
20 #include "util/string.h"
21 
22 namespace perf {
23 
// Constructor of the Influx telemetry aggregator: reads the CVMFS_INFLUX_*
// options from options_mgr and decides whether this aggregator is usable
// (is_zombie_ == false) or disabled (is_zombie_ == true).
//
// NOTE(review): the first line of the signature and the opening lines of
// several calls (those directly preceding the dangling string literals
// below) are not present in this listing; confirm against the original file.
25  int send_rate_sec,
26  OptionsManager *options_mgr,
27  MountPoint* mount_point,
28  const std::string &fqrn) :
29  TelemetryAggregator(statistics, send_rate_sec,
30  mount_point, fqrn),
31  influx_extra_fields_(""), influx_extra_tags_(""),
32  socket_fd_(-1), res_(NULL) {
// Counts how many of the three mandatory options were accepted.
33  int params = 0;
34 
// Mandatory option 1: host. Only values longer than one character count.
// NOTE(review): size() > 1 also rejects one-character host names; a
// non-empty check may have been intended -- confirm.
35  if (options_mgr->GetValue("CVMFS_INFLUX_HOST", &influx_host_)) {
36  if (influx_host_.size() > 1) {
37  params++;
38  } else {
40  "No value given for CVMFS_INFLUX_HOST");
41  }
42  }
43 
// Mandatory option 2: port. Parsed as an integer and accepted only in the
// range 1..65535.
44  std::string opt;
45  if (options_mgr->GetValue("CVMFS_INFLUX_PORT", &opt)) {
46  influx_port_ = static_cast<int>(String2Int64(opt.c_str()));
47  if (influx_port_ > 0 && influx_port_ < 65536) {
48  params++;
49  } else {
51  "Invalid value for CVMFS_INFLUX_PORT [%s]", opt.c_str());
52  }
53  }
54 
// Mandatory option 3: metric name. Counted whenever the option is set.
// NOTE(review): unlike the host, the value itself is not validated here,
// so an empty metric name would still be accepted.
55  if (options_mgr->GetValue("CVMFS_INFLUX_METRIC_NAME", &influx_metric_name_))
56  {
57  params++;
58  }
59 
// Optional: extra tags; reset to the empty string when the option is not set.
60  if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_TAGS", &influx_extra_tags_)) {
61  influx_extra_tags_ = "";
62  }
63 
// Optional: extra fields.
// NOTE(review): the remainder of this statement is missing from the listing;
// presumably it mirrors the extra-tags handling above -- confirm.
64  if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_FIELDS",
67  }
68 
// The aggregator is enabled only when all three mandatory options were
// accepted; otherwise it is marked as a zombie.
69  if (params == 3) {
70  is_zombie_ = false;
71  LogCvmfs(kLogTelemetry, kLogDebug, "Enabling influx metrics. "
72  "Send to [%s:%d] metric [%s]. Extra tags"
73  "[%s]. Extra fields [%s].",
74  influx_host_.c_str(),
76  influx_metric_name_.c_str(),
77  influx_extra_tags_.c_str(),
78  influx_extra_fields_.c_str());
// NOTE(review): the declaration/assignment of `ret` is not visible in this
// listing; judging by the log message below it is the result of opening the
// socket -- confirm. Any non-success result disables the aggregator again.
80  if (ret != kTelemetrySuccess) {
81  is_zombie_ = true;
83  "Not enabling influx metrics. Error while open socket. %d", ret);
84  }
85  } else {
86  is_zombie_ = true;
88  "Not enabling influx metrics. Not all mandatory variables set: "
89  "CVMFS_INFLUX_METRIC_NAME, CVMFS_INFLUX_HOST, CVMFS_INFLUX_PORT");
90  }
91 }
92 
// Cleanup: closes the socket descriptor if one is open (socket_fd_ >= 0) and
// releases the address info held in res_ if it is non-null.
// NOTE(review): the function header is not present in this listing;
// presumably this is the destructor -- confirm.
94  if (socket_fd_ >= 0) {
95  close(socket_fd_);
96  }
// NOTE(review): the socket-failure path elsewhere in this file calls
// freeaddrinfo(res_) without resetting res_, so this branch could release
// the same list a second time -- confirm and consider clearing res_ there.
97  if (res_) {
98  freeaddrinfo(res_);
99  }
100 }
101 
// Builds one text line with the absolute counter values, shaped as
//   <metric>_absolute,repo=<fqrn>[,<extra tags>] <k=v>[,<k=v>...][,<extra fields>] <timestamp>
// and returns it as a std::string. Counters whose value is 0 are left out.
// NOTE(review): the function header is missing from this listing; the name
// MakePayload is inferred from the call site in the push routine -- confirm.
115  // measurement and tags
116  std::string ret = influx_metric_name_ + "_absolute,repo=" + fqrn_;
117 
118  if (influx_extra_tags_ != "") {
119  ret += "," + influx_extra_tags_;
120  }
121 
122  // fields
123  ret += " ";
// add_token records that at least one field was written, so that later
// fields are comma-separated and the timestamp gets its own separator.
124  bool add_token = false;
125  for (std::map<std::string, int64_t>::iterator it
126  = counters_.begin(), iEnd = counters_.end(); it != iEnd; it++) {
127  if (it->second != 0) {
128  if (add_token) {
129  ret += ",";
130  }
131  ret += it->first + "=" + StringifyInt(it->second);
132  add_token = true;
133  }
134  }
// User-supplied extra fields are appended verbatim after the counters.
135  if (influx_extra_fields_ != "") {
136  if (add_token) {
137  ret += ",";
138  }
139  ret += influx_extra_fields_;
140  add_token = true;
141  }
142 
143  // timestamp
// When no field was written, the single space added above is the only
// separator, so the timestamp directly follows the tag section.
// NOTE(review): such a line carries no fields; check whether the receiver
// accepts it.
144  if (add_token) {
145  ret += " ";
146  }
147  ret += StringifyUint(timestamp_);
148 
149  return ret;
150 }
151 
// Builds one text line with the per-counter difference between counters_ and
// old_counters_, shaped as
//   <metric>_delta,repo=<fqrn>[,<extra tags>] <k=delta>[,<k=delta>...] <timestamp>
// and returns it as a std::string. Extra fields are not appended here.
// NOTE(review): the function header is missing from this listing; the name
// MakeDeltaPayload is inferred from the call site in the push routine --
// confirm.
169  // measurement and tags
// The leading "" literal does not change the resulting string.
170  std::string ret = "" + influx_metric_name_ + "_delta,repo=" + fqrn_;
171 
172  if (influx_extra_tags_ != "") {
173  ret += "," + influx_extra_tags_;
174  }
175 
176  // fields
177  ret += " ";
178  bool add_token = false;
179  for (std::map<std::string, int64_t>::iterator it
180  = counters_.begin(), iEnd = counters_.end(); it != iEnd; it++) {
181  int64_t value = it->second;
// Counters that are currently 0 are skipped, even if their previous value
// was non-zero.
182  if (value != 0) {
183  int64_t old_value;
// A counter without a previous sample is treated as having started at 0.
// NOTE(review): std::out_of_range is declared in <stdexcept>, which is not
// among the includes visible in this file -- confirm it is pulled in
// transitively.
184  try {
185  old_value = old_counters_.at(it->first);
186  } catch(const std::out_of_range& ex) {
187  old_value = 0;
188  }
189  if (add_token) {
190  ret += ",";
191  }
192  ret += it->first + "=" + StringifyInt(value - old_value);
193  add_token = true;
194  }
195  }
196 
197  // timestamp
// As in the absolute payload, the separator is only added when at least one
// field was written.
198  if (add_token) {
199  ret += " ";
200  }
201  ret += StringifyUint(timestamp_);
202 
203  return ret;
204 }
205 
// Resolves influx_host_ into res_ (IPv4, datagram) and creates the datagram
// socket stored in socket_fd_. Returns kTelemetrySuccess when both steps
// succeed and kTelemetryFailSocket when socket() fails.
// NOTE(review): the function header, the openings of the log calls and the
// statement that ends the resolve-failure branch are missing from this
// listing; confirm against the original file.
207  const char *hostname = influx_host_.c_str();
208  struct addrinfo hints;
209  int err;
210 
211  memset(&hints, 0, sizeof(hints));
212  hints.ai_family = AF_INET;
213  hints.ai_socktype = SOCK_DGRAM;
214 
// No service/port is passed to the resolver here; the port is written into
// the resolved address later, when sending.
215  err = getaddrinfo(influx_host_.c_str(), NULL, &hints, &res_);
// NOTE(review): getaddrinfo() reports failure through its return value
// (err), yet the message below prints errno; logging err or
// gai_strerror(err) would be more informative -- confirm.
216  if (err != 0 || res_ == NULL) {
218  "Failed to resolve influx server [%s]. errno=%d",
219  hostname, errno);
221  }
222 
223  socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
224  if (socket_fd_ < 0) {
226  "Failed to open socket");
// NOTE(review): res_ is released here but not reset, while the cleanup code
// in this file calls freeaddrinfo(res_) whenever res_ is non-null; that
// looks like a possible double free -- consider setting res_ to NULL here.
227  freeaddrinfo(res_);
228  return kTelemetryFailSocket;
229  }
230 
231  return kTelemetrySuccess;
232 }
233 
// Sends `payload` with a single sendto() call on socket_fd_ to the address
// held in res_, using influx_port_ as destination port.
// Returns kTelemetryFailSend when sendto() fails or transfers fewer bytes
// than payload.size(); returns kTelemetrySuccess otherwise.
// NOTE(review): the first line of the function header and the openings of
// the error log calls are missing from this listing.
235  const std::string &payload) {
236  struct sockaddr_in *dest_addr = NULL;
// The port is written in place into the resolved address on every call.
// NOTE(review): res_ is dereferenced without a null check; presumably
// callers only get here after a successful resolve -- confirm.
237  dest_addr = reinterpret_cast<sockaddr_in*>(res_->ai_addr);
238  dest_addr->sin_port = htons(influx_port_);
239 
240  ssize_t num_bytes_sent = sendto(socket_fd_,
241  payload.data(),
242  payload.size(),
243  0,
244  reinterpret_cast<struct sockaddr*>(dest_addr),
245  sizeof(struct sockaddr_in));
246 
247  if (num_bytes_sent < 0) {
249  "Failed to send to influx. errno=%d", errno);
250  return kTelemetryFailSend;
251  } else if (static_cast<size_t>(num_bytes_sent) != payload.size()) {
// NOTE(review): payload.size() is a size_t but is printed with %lu; %zu
// would match the type on every platform.
253  "Incomplete send. Bytes transferred: %zd. Bytes expected %lu",
254  num_bytes_sent, payload.size());
255  return kTelemetryFailSend;
256  }
257 
258  LogCvmfs(kLogTelemetry, kLogDebug, "INFLUX: POSTING [%s]", payload.c_str());
259  return kTelemetrySuccess;
260 }
261 
262 
// Builds the absolute payload, appends the delta payload on a second line
// when old_counters_ is non-empty, terminates the text with a newline, sends
// it, and finally swaps counters_ with old_counters_.
// NOTE(review): the function header is missing from this listing; presumably
// this is the periodic push routine of the aggregator -- confirm.
264  // create payload
265  std::string payload = MakePayload();
266  std::string delta_payload = "";
// Without previous samples there is nothing to diff against, so the delta
// line is omitted.
267  if (old_counters_.size() > 0) {
268  delta_payload = MakeDeltaPayload();
269  payload = payload + "\n" + delta_payload;
270  }
271  payload += "\n";
272 
273  // send to influx
// The result of the send is not inspected here; failures are only logged
// inside SendToInflux.
274  SendToInflux(payload);
275 
276  // current counter is now old counter
// After the swap, counters_ holds the previous old values.
// NOTE(review): how counters_ is refreshed before the next call is not
// visible in this file.
277  counters_.swap(old_counters_);
278 }
279 
280 } // namespace perf
std::string StringifyUint(const uint64_t value)
Definition: string.cc:84
int64_t String2Int64(const string &value)
Definition: string.cc:240
TelemetryAggregatorInflux(Statistics *statistics, int send_rate_sec, OptionsManager *options_mgr, MountPoint *mount_point, const std::string &fqrn)
std::map< std::string, int64_t > old_counters_
std::map< std::string, int64_t > counters_
string StringifyInt(const int64_t value)
Definition: string.cc:78
bool GetValue(const std::string &key, std::string *value) const
Definition: options.cc:376
TelemetryReturn SendToInflux(const std::string &payload)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528