GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/telemetry_aggregator_influx.cc
Date: 2025-08-31 02:39:21
Exec Total Coverage
Lines: 97 134 72.4%
Branches: 104 212 49.1%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "telemetry_aggregator_influx.h"
6
7 #include <netinet/in.h>
8 #include <pthread.h>
9 #include <stdint.h>
10 #include <sys/socket.h>
11 #include <sys/types.h>
12
13 #include <cstring>
14 #include <map>
15 #include <string>
16 #include <vector>
17
18 #include "util/logging.h"
19 #include "util/posix.h"
20 #include "util/string.h"
21
22 namespace perf {
23
24 301 TelemetryAggregatorInflux::TelemetryAggregatorInflux(
25 Statistics *statistics,
26 int send_rate_sec,
27 OptionsManager *options_mgr,
28 MountPoint *mount_point,
29 301 const std::string &fqrn)
30 : TelemetryAggregator(statistics, send_rate_sec, mount_point, fqrn)
31
1/2
✓ Branch 2 taken 301 times.
✗ Branch 3 not taken.
301 , influx_extra_fields_("")
32
1/2
✓ Branch 2 taken 301 times.
✗ Branch 3 not taken.
301 , influx_extra_tags_("")
33 301 , send_delta_(true)
34 301 , socket_fd_(-1)
35 301 , res_(NULL) {
36 301 int params = 0;
37
38
4/6
✓ Branch 2 taken 301 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 301 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 258 times.
✓ Branch 10 taken 43 times.
301 if (options_mgr->GetValue("CVMFS_INFLUX_HOST", &influx_host_)) {
39
1/2
✓ Branch 1 taken 258 times.
✗ Branch 2 not taken.
258 if (influx_host_.size() > 1) {
40 258 params++;
41 } else {
42 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
43 "No value given for CVMFS_INFLUX_HOST");
44 }
45 }
46
47 301 std::string opt;
48
3/6
✓ Branch 2 taken 301 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 301 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 301 times.
✗ Branch 10 not taken.
301 if (options_mgr->GetValue("CVMFS_INFLUX_PORT", &opt)) {
49
2/4
✓ Branch 3 taken 301 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 301 times.
✗ Branch 7 not taken.
301 influx_port_ = static_cast<int>(String2Int64(opt.c_str()));
50
2/4
✓ Branch 0 taken 301 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 301 times.
✗ Branch 3 not taken.
301 if (influx_port_ > 0 && influx_port_ < 65536) {
51 301 params++;
52 } else {
53 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
54 "Invalid value for CVMFS_INFLUX_PORT [%s]", opt.c_str());
55 }
56 }
57
58
3/6
✓ Branch 2 taken 301 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 301 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 301 times.
✗ Branch 10 not taken.
301 if (options_mgr->GetValue("CVMFS_INFLUX_METRIC_NAME", &influx_metric_name_)) {
59 301 params++;
60 }
61
62
4/6
✓ Branch 2 taken 301 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 301 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 215 times.
✓ Branch 10 taken 86 times.
301 if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_TAGS", &influx_extra_tags_)) {
63
1/2
✓ Branch 1 taken 215 times.
✗ Branch 2 not taken.
215 influx_extra_tags_ = "";
64 }
65
66
4/6
✓ Branch 2 taken 301 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 301 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 215 times.
✓ Branch 10 taken 86 times.
301 if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_FIELDS",
67 &influx_extra_fields_)) {
68
1/2
✓ Branch 1 taken 215 times.
✗ Branch 2 not taken.
215 influx_extra_fields_ = "";
69 }
70
71 // Read CVMFS_INFLUX_SEND_DELTA option, default to true (ON)
72 301 std::string send_delta_str;
73
4/6
✓ Branch 2 taken 301 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 301 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 86 times.
✓ Branch 10 taken 215 times.
301 if (options_mgr->GetValue("CVMFS_INFLUX_SEND_DELTA", &send_delta_str)) {
74
1/2
✓ Branch 1 taken 86 times.
✗ Branch 2 not taken.
86 send_delta_ = !options_mgr->IsOff(send_delta_str);
75 }
76
77
2/2
✓ Branch 0 taken 258 times.
✓ Branch 1 taken 43 times.
301 if (params == 3) {
78 258 is_zombie_ = false;
79
1/2
✓ Branch 5 taken 258 times.
✗ Branch 6 not taken.
258 LogCvmfs(kLogTelemetry, kLogDebug,
80 "Enabling influx metrics. "
81 "Send to [%s:%d] metric [%s]. Extra tags"
82 "[%s]. Extra fields [%s].",
83 influx_host_.c_str(), influx_port_, influx_metric_name_.c_str(),
84 influx_extra_tags_.c_str(), influx_extra_fields_.c_str());
85
1/2
✓ Branch 1 taken 258 times.
✗ Branch 2 not taken.
258 const TelemetryReturn ret = OpenSocket();
86
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 258 times.
258 if (ret != kTelemetrySuccess) {
87 is_zombie_ = true;
88 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
89 "Not enabling influx metrics. Error while open socket. %d", ret);
90 }
91 } else {
92 43 is_zombie_ = true;
93
1/2
✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
43 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
94 "Not enabling influx metrics. Not all mandatory variables set: "
95 "CVMFS_INFLUX_METRIC_NAME, CVMFS_INFLUX_HOST, CVMFS_INFLUX_PORT");
96 }
97 301 }
98
99 602 TelemetryAggregatorInflux::~TelemetryAggregatorInflux() {
100
2/2
✓ Branch 0 taken 258 times.
✓ Branch 1 taken 43 times.
602 if (socket_fd_ >= 0) {
101 516 close(socket_fd_);
102 }
103
2/2
✓ Branch 0 taken 258 times.
✓ Branch 1 taken 43 times.
602 if (res_) {
104 516 freeaddrinfo(res_);
105 }
106 }
107
108 /**
109 * Creates a string in the influx data format containing the absolute values
110 * of the counters. Counters are only included if their absolute value is > 0.
111 *
112 * Influx dataformat
113 * ( https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/
114 )
115 * Syntax
116 <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]]
117 <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
118 *
119 * Example
120 myMeasurement,tag1=value1,tag2=value2 fieldKey="fieldValue"
121 1556813561098000000
122 */
123 344 std::string TelemetryAggregatorInflux::MakePayload() {
124 // measurement and tags
125
1/2
✓ Branch 1 taken 344 times.
✗ Branch 2 not taken.
344 std::string metric_name = influx_metric_name_;
126
2/2
✓ Branch 0 taken 301 times.
✓ Branch 1 taken 43 times.
344 if (send_delta_) {
127
1/2
✓ Branch 1 taken 301 times.
✗ Branch 2 not taken.
301 metric_name += "_absolute";
128 }
129
2/4
✓ Branch 1 taken 344 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 344 times.
✗ Branch 5 not taken.
344 std::string ret = metric_name + ",repo=" + fqrn_;
130
131
3/4
✓ Branch 1 taken 344 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 172 times.
✓ Branch 4 taken 172 times.
344 if (influx_extra_tags_ != "") {
132
2/4
✓ Branch 1 taken 172 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 172 times.
✗ Branch 5 not taken.
172 ret += "," + influx_extra_tags_;
133 }
134
135 // fields
136
1/2
✓ Branch 1 taken 344 times.
✗ Branch 2 not taken.
344 ret += " ";
137 344 bool add_token = false;
138 688 for (std::map<std::string, int64_t>::iterator it = counters_.begin(),
139 344 iEnd = counters_.end();
140
2/2
✓ Branch 1 taken 1032 times.
✓ Branch 2 taken 344 times.
1376 it != iEnd;
141 1032 it++) {
142
2/2
✓ Branch 1 taken 473 times.
✓ Branch 2 taken 559 times.
1032 if (it->second != 0) {
143
2/2
✓ Branch 0 taken 258 times.
✓ Branch 1 taken 215 times.
473 if (add_token) {
144
1/2
✓ Branch 1 taken 258 times.
✗ Branch 2 not taken.
258 ret += ",";
145 }
146
4/8
✓ Branch 2 taken 473 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 473 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 473 times.
✗ Branch 10 not taken.
✓ Branch 12 taken 473 times.
✗ Branch 13 not taken.
473 ret += it->first + "=" + StringifyInt(it->second);
147 473 add_token = true;
148 }
149 }
150
3/4
✓ Branch 1 taken 344 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 172 times.
✓ Branch 4 taken 172 times.
344 if (influx_extra_fields_ != "") {
151
2/2
✓ Branch 0 taken 86 times.
✓ Branch 1 taken 86 times.
172 if (add_token) {
152
1/2
✓ Branch 1 taken 86 times.
✗ Branch 2 not taken.
86 ret += ",";
153 }
154
1/2
✓ Branch 1 taken 172 times.
✗ Branch 2 not taken.
172 ret += influx_extra_fields_;
155 172 add_token = true;
156 }
157
158 // timestamp
159
2/2
✓ Branch 0 taken 301 times.
✓ Branch 1 taken 43 times.
344 if (add_token) {
160
1/2
✓ Branch 1 taken 301 times.
✗ Branch 2 not taken.
301 ret += " ";
161 }
162
2/4
✓ Branch 1 taken 344 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 344 times.
✗ Branch 5 not taken.
344 ret += StringifyUint(timestamp_);
163
164 688 return ret;
165 344 }
166
167 /**
168 * Creates a string in the influx data format containing the delta between
169 * 2 measurements of the same counter. Counters are only included if their
170 * absolute value is > 0 (delta can be 0).
171 *
172 * NOTE: As influx_extra_fields_ are static, they are excluded of this
173 * delta format
174 *
175 * Influx dataformat
176 * ( https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/
177 )
178 * Syntax
179 <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]]
180 <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
181 *
182 * Example
183 myMeasurement,tag1=value1,tag2=value2 fieldKey="fieldValue"
184 1556813561098000000
185 */
186 86 std::string TelemetryAggregatorInflux::MakeDeltaPayload() {
187 // measurement and tags
188
2/4
✓ Branch 2 taken 86 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 86 times.
✗ Branch 6 not taken.
172 std::string ret = "" + influx_metric_name_ + "_delta,repo=" + fqrn_;
189
190
2/4
✓ Branch 1 taken 86 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 86 times.
✗ Branch 4 not taken.
86 if (influx_extra_tags_ != "") {
191
2/4
✓ Branch 1 taken 86 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 86 times.
✗ Branch 5 not taken.
86 ret += "," + influx_extra_tags_;
192 }
193
194 // fields
195
1/2
✓ Branch 1 taken 86 times.
✗ Branch 2 not taken.
86 ret += " ";
196 86 bool add_token = false;
197 172 for (std::map<std::string, int64_t>::iterator it = counters_.begin(),
198 86 iEnd = counters_.end();
199
2/2
✓ Branch 1 taken 258 times.
✓ Branch 2 taken 86 times.
344 it != iEnd;
200 258 it++) {
201 258 const int64_t value = it->second;
202
2/2
✓ Branch 0 taken 215 times.
✓ Branch 1 taken 43 times.
258 if (value != 0) {
203 int64_t old_value;
204 try {
205
1/2
✓ Branch 2 taken 215 times.
✗ Branch 3 not taken.
215 old_value = old_counters_.at(it->first);
206 } catch (const std::out_of_range &ex) {
207 old_value = 0;
208 }
209
2/2
✓ Branch 0 taken 129 times.
✓ Branch 1 taken 86 times.
215 if (add_token) {
210
1/2
✓ Branch 1 taken 129 times.
✗ Branch 2 not taken.
129 ret += ",";
211 }
212
4/8
✓ Branch 1 taken 215 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 215 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 215 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 215 times.
✗ Branch 12 not taken.
215 ret += it->first + "=" + StringifyInt(value - old_value);
213 215 add_token = true;
214 }
215 }
216
217 // timestamp
218
1/2
✓ Branch 0 taken 86 times.
✗ Branch 1 not taken.
86 if (add_token) {
219
1/2
✓ Branch 1 taken 86 times.
✗ Branch 2 not taken.
86 ret += " ";
220 }
221
2/4
✓ Branch 1 taken 86 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 86 times.
✗ Branch 5 not taken.
86 ret += StringifyUint(timestamp_);
222
223 86 return ret;
224 }
225
226 258 TelemetryReturn TelemetryAggregatorInflux::OpenSocket() {
227 258 const char *hostname = influx_host_.c_str();
228 struct addrinfo hints;
229 int err;
230
231 258 memset(&hints, 0, sizeof(hints));
232 258 hints.ai_family = AF_INET;
233 258 hints.ai_socktype = SOCK_DGRAM;
234
235
1/2
✓ Branch 2 taken 258 times.
✗ Branch 3 not taken.
258 err = getaddrinfo(influx_host_.c_str(), NULL, &hints, &res_);
236
2/4
✓ Branch 0 taken 258 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 258 times.
258 if (err != 0 || res_ == NULL) {
237 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
238 "Failed to resolve influx server [%s]. errno=%d", hostname, errno);
239 return kTelemetryFailHostAddress;
240 }
241
242 258 socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
243
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 258 times.
258 if (socket_fd_ < 0) {
244 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr, "Failed to open socket");
245 freeaddrinfo(res_);
246 return kTelemetryFailSocket;
247 }
248
249 258 return kTelemetrySuccess;
250 }
251
252 TelemetryReturn TelemetryAggregatorInflux::SendToInflux(
253 const std::string &payload) {
254 struct sockaddr_in *dest_addr = NULL;
255 dest_addr = reinterpret_cast<sockaddr_in *>(res_->ai_addr);
256 dest_addr->sin_port = htons(influx_port_);
257
258 const ssize_t num_bytes_sent = sendto(
259 socket_fd_, payload.data(), payload.size(), 0,
260 reinterpret_cast<struct sockaddr *>(dest_addr),
261 sizeof(struct sockaddr_in));
262
263 if (num_bytes_sent < 0) {
264 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
265 "Failed to send to influx. errno=%d", errno);
266 return kTelemetryFailSend;
267 } else if (static_cast<size_t>(num_bytes_sent) != payload.size()) {
268 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
269 "Incomplete send. Bytes transferred: %zd. Bytes expected %lu",
270 num_bytes_sent, payload.size());
271 return kTelemetryFailSend;
272 }
273
274 LogCvmfs(kLogTelemetry, kLogDebug, "INFLUX: POSTING [%s]", payload.c_str());
275 return kTelemetrySuccess;
276 }
277
278
279 void TelemetryAggregatorInflux::PushMetrics() {
280 // create payload
281 std::string payload = MakePayload();
282 std::string delta_payload = "";
283 if (send_delta_ && old_counters_.size() > 0) {
284 delta_payload = MakeDeltaPayload();
285 payload = payload + "\n" + delta_payload;
286 }
287 payload += "\n";
288
289 // send to influx
290 SendToInflux(payload);
291
292 // current counter is now old counter (only if delta sending is enabled)
293 if (send_delta_) {
294 counters_.swap(old_counters_);
295 }
296 }
297
298 } // namespace perf
299