CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
telemetry_aggregator_influx.cc
Go to the documentation of this file.
1 
6 
7 #include <netinet/in.h>
8 #include <pthread.h>
9 #include <stdint.h>
10 #include <sys/socket.h>
11 #include <sys/types.h>
12 
13 #include <cstring>
14 #include <map>
15 #include <string>
16 #include <vector>
17 
18 #include "util/logging.h"
19 #include "util/posix.h"
20 #include "util/string.h"
21 
22 namespace perf {
23 
25  Statistics *statistics,
26  int send_rate_sec,
27  OptionsManager *options_mgr,
28  MountPoint *mount_point,
29  const std::string &fqrn)
30  : TelemetryAggregator(statistics, send_rate_sec, mount_point, fqrn)
31  , influx_extra_fields_("")
32  , influx_extra_tags_("")
33  , socket_fd_(-1)
34  , res_(NULL) {
35  int params = 0;
36 
37  if (options_mgr->GetValue("CVMFS_INFLUX_HOST", &influx_host_)) {
38  if (influx_host_.size() > 1) {
39  params++;
40  } else {
42  "No value given for CVMFS_INFLUX_HOST");
43  }
44  }
45 
46  std::string opt;
47  if (options_mgr->GetValue("CVMFS_INFLUX_PORT", &opt)) {
48  influx_port_ = static_cast<int>(String2Int64(opt.c_str()));
49  if (influx_port_ > 0 && influx_port_ < 65536) {
50  params++;
51  } else {
53  "Invalid value for CVMFS_INFLUX_PORT [%s]", opt.c_str());
54  }
55  }
56 
57  if (options_mgr->GetValue("CVMFS_INFLUX_METRIC_NAME", &influx_metric_name_)) {
58  params++;
59  }
60 
61  if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_TAGS", &influx_extra_tags_)) {
62  influx_extra_tags_ = "";
63  }
64 
65  if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_FIELDS",
68  }
69 
70  if (params == 3) {
71  is_zombie_ = false;
73  "Enabling influx metrics. "
74  "Send to [%s:%d] metric [%s]. Extra tags"
75  "[%s]. Extra fields [%s].",
77  influx_extra_tags_.c_str(), influx_extra_fields_.c_str());
79  if (ret != kTelemetrySuccess) {
80  is_zombie_ = true;
82  "Not enabling influx metrics. Error while open socket. %d", ret);
83  }
84  } else {
85  is_zombie_ = true;
87  "Not enabling influx metrics. Not all mandatory variables set: "
88  "CVMFS_INFLUX_METRIC_NAME, CVMFS_INFLUX_HOST, CVMFS_INFLUX_PORT");
89  }
90 }
91 
93  if (socket_fd_ >= 0) {
94  close(socket_fd_);
95  }
96  if (res_) {
97  freeaddrinfo(res_);
98  }
99 }
100 
117  // measurement and tags
118  std::string ret = influx_metric_name_ + "_absolute,repo=" + fqrn_;
119 
120  if (influx_extra_tags_ != "") {
121  ret += "," + influx_extra_tags_;
122  }
123 
124  // fields
125  ret += " ";
126  bool add_token = false;
127  for (std::map<std::string, int64_t>::iterator it = counters_.begin(),
128  iEnd = counters_.end();
129  it != iEnd; it++) {
130  if (it->second != 0) {
131  if (add_token) {
132  ret += ",";
133  }
134  ret += it->first + "=" + StringifyInt(it->second);
135  add_token = true;
136  }
137  }
138  if (influx_extra_fields_ != "") {
139  if (add_token) {
140  ret += ",";
141  }
142  ret += influx_extra_fields_;
143  add_token = true;
144  }
145 
146  // timestamp
147  if (add_token) {
148  ret += " ";
149  }
150  ret += StringifyUint(timestamp_);
151 
152  return ret;
153 }
154 
175  // measurement and tags
176  std::string ret = "" + influx_metric_name_ + "_delta,repo=" + fqrn_;
177 
178  if (influx_extra_tags_ != "") {
179  ret += "," + influx_extra_tags_;
180  }
181 
182  // fields
183  ret += " ";
184  bool add_token = false;
185  for (std::map<std::string, int64_t>::iterator it = counters_.begin(),
186  iEnd = counters_.end();
187  it != iEnd; it++) {
188  int64_t value = it->second;
189  if (value != 0) {
190  int64_t old_value;
191  try {
192  old_value = old_counters_.at(it->first);
193  } catch (const std::out_of_range &ex) {
194  old_value = 0;
195  }
196  if (add_token) {
197  ret += ",";
198  }
199  ret += it->first + "=" + StringifyInt(value - old_value);
200  add_token = true;
201  }
202  }
203 
204  // timestamp
205  if (add_token) {
206  ret += " ";
207  }
208  ret += StringifyUint(timestamp_);
209 
210  return ret;
211 }
212 
214  const char *hostname = influx_host_.c_str();
215  struct addrinfo hints;
216  int err;
217 
218  memset(&hints, 0, sizeof(hints));
219  hints.ai_family = AF_INET;
220  hints.ai_socktype = SOCK_DGRAM;
221 
222  err = getaddrinfo(influx_host_.c_str(), NULL, &hints, &res_);
223  if (err != 0 || res_ == NULL) {
225  "Failed to resolve influx server [%s]. errno=%d", hostname, errno);
227  }
228 
229  socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
230  if (socket_fd_ < 0) {
231  LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr, "Failed to open socket");
232  freeaddrinfo(res_);
233  return kTelemetryFailSocket;
234  }
235 
236  return kTelemetrySuccess;
237 }
238 
240  const std::string &payload) {
241  struct sockaddr_in *dest_addr = NULL;
242  dest_addr = reinterpret_cast<sockaddr_in *>(res_->ai_addr);
243  dest_addr->sin_port = htons(influx_port_);
244 
245  ssize_t num_bytes_sent = sendto(
246  socket_fd_,
247  payload.data(),
248  payload.size(),
249  0,
250  reinterpret_cast<struct sockaddr *>(dest_addr),
251  sizeof(struct sockaddr_in));
252 
253  if (num_bytes_sent < 0) {
255  "Failed to send to influx. errno=%d", errno);
256  return kTelemetryFailSend;
257  } else if (static_cast<size_t>(num_bytes_sent) != payload.size()) {
259  "Incomplete send. Bytes transferred: %zd. Bytes expected %lu",
260  num_bytes_sent, payload.size());
261  return kTelemetryFailSend;
262  }
263 
264  LogCvmfs(kLogTelemetry, kLogDebug, "INFLUX: POSTING [%s]", payload.c_str());
265  return kTelemetrySuccess;
266 }
267 
268 
270  // create payload
271  std::string payload = MakePayload();
272  std::string delta_payload = "";
273  if (old_counters_.size() > 0) {
274  delta_payload = MakeDeltaPayload();
275  payload = payload + "\n" + delta_payload;
276  }
277  payload += "\n";
278 
279  // send to influx
280  SendToInflux(payload);
281 
282  // current counter is now old counter
283  counters_.swap(old_counters_);
284 }
285 
286 } // namespace perf
std::string StringifyUint(const uint64_t value)
Definition: string.cc:83
int64_t String2Int64(const string &value)
Definition: string.cc:234
TelemetryAggregatorInflux(Statistics *statistics, int send_rate_sec, OptionsManager *options_mgr, MountPoint *mount_point, const std::string &fqrn)
std::map< std::string, int64_t > old_counters_
std::map< std::string, int64_t > counters_
string StringifyInt(const int64_t value)
Definition: string.cc:77
bool GetValue(const std::string &key, std::string *value) const
Definition: options.cc:368
TelemetryReturn SendToInflux(const std::string &payload)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545