GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/telemetry_aggregator_influx.cc
Date: 2026-05-19 11:45:12
Exec Total Coverage
Lines: 97 134 72.4%
Branches: 104 212 49.1%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "telemetry_aggregator_influx.h"
6
7 #include <netinet/in.h>
8 #include <pthread.h>
9 #include <stdint.h>
10 #include <sys/socket.h>
11 #include <sys/types.h>
12
13 #include <cstring>
14 #include <map>
15 #include <stdexcept>
16 #include <string>
17 #include <vector>
18
19 #include "util/logging.h"
20 #include "util/posix.h"
21 #include "util/string.h"
22
23 namespace perf {
24
25 182 TelemetryAggregatorInflux::TelemetryAggregatorInflux(
26 Statistics *statistics,
27 int send_rate_sec,
28 OptionsManager *options_mgr,
29 MountPoint *mount_point,
30 182 const std::string &fqrn)
31 : TelemetryAggregator(statistics, send_rate_sec, mount_point, fqrn)
32
1/2
✓ Branch 2 taken 182 times.
✗ Branch 3 not taken.
182 , influx_extra_fields_("")
33
1/2
✓ Branch 2 taken 182 times.
✗ Branch 3 not taken.
182 , influx_extra_tags_("")
34 182 , send_delta_(true)
35 182 , socket_fd_(-1)
36 182 , res_(NULL) {
37 182 int params = 0;
38
39
4/6
✓ Branch 2 taken 182 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 182 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 156 times.
✓ Branch 10 taken 26 times.
182 if (options_mgr->GetValue("CVMFS_INFLUX_HOST", &influx_host_)) {
40
1/2
✓ Branch 1 taken 156 times.
✗ Branch 2 not taken.
156 if (influx_host_.size() > 1) {
41 156 params++;
42 } else {
43 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
44 "No value given for CVMFS_INFLUX_HOST");
45 }
46 }
47
48 182 std::string opt;
49
3/6
✓ Branch 2 taken 182 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 182 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 182 times.
✗ Branch 10 not taken.
182 if (options_mgr->GetValue("CVMFS_INFLUX_PORT", &opt)) {
50
2/4
✓ Branch 3 taken 182 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 182 times.
✗ Branch 7 not taken.
182 influx_port_ = static_cast<int>(String2Int64(opt.c_str()));
51
2/4
✓ Branch 0 taken 182 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 182 times.
✗ Branch 3 not taken.
182 if (influx_port_ > 0 && influx_port_ < 65536) {
52 182 params++;
53 } else {
54 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
55 "Invalid value for CVMFS_INFLUX_PORT [%s]", opt.c_str());
56 }
57 }
58
59
3/6
✓ Branch 2 taken 182 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 182 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 182 times.
✗ Branch 10 not taken.
182 if (options_mgr->GetValue("CVMFS_INFLUX_METRIC_NAME", &influx_metric_name_)) {
60 182 params++;
61 }
62
63
4/6
✓ Branch 2 taken 182 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 182 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 130 times.
✓ Branch 10 taken 52 times.
182 if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_TAGS", &influx_extra_tags_)) {
64
1/2
✓ Branch 1 taken 130 times.
✗ Branch 2 not taken.
130 influx_extra_tags_ = "";
65 }
66
67
4/6
✓ Branch 2 taken 182 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 182 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 130 times.
✓ Branch 10 taken 52 times.
182 if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_FIELDS",
68 &influx_extra_fields_)) {
69
1/2
✓ Branch 1 taken 130 times.
✗ Branch 2 not taken.
130 influx_extra_fields_ = "";
70 }
71
72 // Read CVMFS_INFLUX_SEND_DELTA option, default to true (ON)
73 182 std::string send_delta_str;
74
4/6
✓ Branch 2 taken 182 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 182 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 52 times.
✓ Branch 10 taken 130 times.
182 if (options_mgr->GetValue("CVMFS_INFLUX_SEND_DELTA", &send_delta_str)) {
75
1/2
✓ Branch 1 taken 52 times.
✗ Branch 2 not taken.
52 send_delta_ = !options_mgr->IsOff(send_delta_str);
76 }
77
78
2/2
✓ Branch 0 taken 156 times.
✓ Branch 1 taken 26 times.
182 if (params == 3) {
79 156 is_zombie_ = false;
80
1/2
✓ Branch 5 taken 156 times.
✗ Branch 6 not taken.
156 LogCvmfs(kLogTelemetry, kLogDebug,
81 "Enabling influx metrics. "
82 "Send to [%s:%d] metric [%s]. Extra tags"
83 "[%s]. Extra fields [%s].",
84 influx_host_.c_str(), influx_port_, influx_metric_name_.c_str(),
85 influx_extra_tags_.c_str(), influx_extra_fields_.c_str());
86
1/2
✓ Branch 1 taken 156 times.
✗ Branch 2 not taken.
156 const TelemetryReturn ret = OpenSocket();
87
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 156 times.
156 if (ret != kTelemetrySuccess) {
88 is_zombie_ = true;
89 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
90 "Not enabling influx metrics. Error while open socket. %d", ret);
91 }
92 } else {
93 26 is_zombie_ = true;
94
1/2
✓ Branch 1 taken 26 times.
✗ Branch 2 not taken.
26 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
95 "Not enabling influx metrics. Not all mandatory variables set: "
96 "CVMFS_INFLUX_METRIC_NAME, CVMFS_INFLUX_HOST, CVMFS_INFLUX_PORT");
97 }
98 182 }
99
100 364 TelemetryAggregatorInflux::~TelemetryAggregatorInflux() {
101
2/2
✓ Branch 0 taken 156 times.
✓ Branch 1 taken 26 times.
364 if (socket_fd_ >= 0) {
102 312 close(socket_fd_);
103 }
104
2/2
✓ Branch 0 taken 156 times.
✓ Branch 1 taken 26 times.
364 if (res_) {
105 312 freeaddrinfo(res_);
106 }
107 }
108
109 /**
110 * Creates a string in the influx data format containing the absolute values
111 * of the counters. Counters are only included if their absolute value is > 0.
112 *
113 * Influx dataformat
114 * ( https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/
115 )
116 * Syntax
117 <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]]
118 <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
119 *
120 * Example
121 myMeasurement,tag1=value1,tag2=value2 fieldKey="fieldValue"
122 1556813561098000000
123 */
124 208 std::string TelemetryAggregatorInflux::MakePayload() {
125 // measurement and tags
126
1/2
✓ Branch 1 taken 208 times.
✗ Branch 2 not taken.
208 std::string metric_name = influx_metric_name_;
127
2/2
✓ Branch 0 taken 182 times.
✓ Branch 1 taken 26 times.
208 if (send_delta_) {
128
1/2
✓ Branch 1 taken 182 times.
✗ Branch 2 not taken.
182 metric_name += "_absolute";
129 }
130
2/4
✓ Branch 1 taken 208 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 208 times.
✗ Branch 5 not taken.
208 std::string ret = metric_name + ",repo=" + fqrn_;
131
132
3/4
✓ Branch 1 taken 208 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 104 times.
✓ Branch 4 taken 104 times.
208 if (influx_extra_tags_ != "") {
133
2/4
✓ Branch 1 taken 104 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 104 times.
✗ Branch 5 not taken.
104 ret += "," + influx_extra_tags_;
134 }
135
136 // fields
137
1/2
✓ Branch 1 taken 208 times.
✗ Branch 2 not taken.
208 ret += " ";
138 208 bool add_token = false;
139 416 for (std::map<std::string, int64_t>::iterator it = counters_.begin(),
140 208 iEnd = counters_.end();
141
2/2
✓ Branch 1 taken 624 times.
✓ Branch 2 taken 208 times.
832 it != iEnd;
142 624 it++) {
143
2/2
✓ Branch 1 taken 286 times.
✓ Branch 2 taken 338 times.
624 if (it->second != 0) {
144
2/2
✓ Branch 0 taken 156 times.
✓ Branch 1 taken 130 times.
286 if (add_token) {
145
1/2
✓ Branch 1 taken 156 times.
✗ Branch 2 not taken.
156 ret += ",";
146 }
147
4/8
✓ Branch 2 taken 286 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 286 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 286 times.
✗ Branch 10 not taken.
✓ Branch 12 taken 286 times.
✗ Branch 13 not taken.
286 ret += it->first + "=" + StringifyInt(it->second);
148 286 add_token = true;
149 }
150 }
151
3/4
✓ Branch 1 taken 208 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 104 times.
✓ Branch 4 taken 104 times.
208 if (influx_extra_fields_ != "") {
152
2/2
✓ Branch 0 taken 52 times.
✓ Branch 1 taken 52 times.
104 if (add_token) {
153
1/2
✓ Branch 1 taken 52 times.
✗ Branch 2 not taken.
52 ret += ",";
154 }
155
1/2
✓ Branch 1 taken 104 times.
✗ Branch 2 not taken.
104 ret += influx_extra_fields_;
156 104 add_token = true;
157 }
158
159 // timestamp
160
2/2
✓ Branch 0 taken 182 times.
✓ Branch 1 taken 26 times.
208 if (add_token) {
161
1/2
✓ Branch 1 taken 182 times.
✗ Branch 2 not taken.
182 ret += " ";
162 }
163
2/4
✓ Branch 1 taken 208 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 208 times.
✗ Branch 5 not taken.
208 ret += StringifyUint(timestamp_);
164
165 416 return ret;
166 208 }
167
168 /**
169 * Creates a string in the influx data format containing the delta between
170 * 2 measurements of the same counter. Counters are only included if their
171 * absolute value is > 0 (delta can be 0).
172 *
173 * NOTE: As influx_extra_fields_ are static, they are excluded of this
174 * delta format
175 *
176 * Influx dataformat
177 * ( https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/
178 )
179 * Syntax
180 <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]]
181 <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
182 *
183 * Example
184 myMeasurement,tag1=value1,tag2=value2 fieldKey="fieldValue"
185 1556813561098000000
186 */
187 52 std::string TelemetryAggregatorInflux::MakeDeltaPayload() {
188 // measurement and tags
189
2/4
✓ Branch 2 taken 52 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 52 times.
✗ Branch 6 not taken.
104 std::string ret = "" + influx_metric_name_ + "_delta,repo=" + fqrn_;
190
191
2/4
✓ Branch 1 taken 52 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 52 times.
✗ Branch 4 not taken.
52 if (influx_extra_tags_ != "") {
192
2/4
✓ Branch 1 taken 52 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 52 times.
✗ Branch 5 not taken.
52 ret += "," + influx_extra_tags_;
193 }
194
195 // fields
196
1/2
✓ Branch 1 taken 52 times.
✗ Branch 2 not taken.
52 ret += " ";
197 52 bool add_token = false;
198 104 for (std::map<std::string, int64_t>::iterator it = counters_.begin(),
199 52 iEnd = counters_.end();
200
2/2
✓ Branch 1 taken 156 times.
✓ Branch 2 taken 52 times.
208 it != iEnd;
201 156 it++) {
202 156 const int64_t value = it->second;
203
2/2
✓ Branch 0 taken 130 times.
✓ Branch 1 taken 26 times.
156 if (value != 0) {
204 int64_t old_value;
205 try {
206
1/2
✓ Branch 2 taken 130 times.
✗ Branch 3 not taken.
130 old_value = old_counters_.at(it->first);
207 } catch (const std::out_of_range &ex) {
208 old_value = 0;
209 }
210
2/2
✓ Branch 0 taken 78 times.
✓ Branch 1 taken 52 times.
130 if (add_token) {
211
1/2
✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
78 ret += ",";
212 }
213
4/8
✓ Branch 1 taken 130 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 130 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 130 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 130 times.
✗ Branch 12 not taken.
130 ret += it->first + "=" + StringifyInt(value - old_value);
214 130 add_token = true;
215 }
216 }
217
218 // timestamp
219
1/2
✓ Branch 0 taken 52 times.
✗ Branch 1 not taken.
52 if (add_token) {
220
1/2
✓ Branch 1 taken 52 times.
✗ Branch 2 not taken.
52 ret += " ";
221 }
222
2/4
✓ Branch 1 taken 52 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 52 times.
✗ Branch 5 not taken.
52 ret += StringifyUint(timestamp_);
223
224 52 return ret;
225 }
226
227 156 TelemetryReturn TelemetryAggregatorInflux::OpenSocket() {
228 156 const char *hostname = influx_host_.c_str();
229 struct addrinfo hints;
230 int err;
231
232 156 memset(&hints, 0, sizeof(hints));
233 156 hints.ai_family = AF_INET;
234 156 hints.ai_socktype = SOCK_DGRAM;
235
236
1/2
✓ Branch 2 taken 156 times.
✗ Branch 3 not taken.
156 err = getaddrinfo(influx_host_.c_str(), NULL, &hints, &res_);
237
2/4
✓ Branch 0 taken 156 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 156 times.
156 if (err != 0 || res_ == NULL) {
238 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
239 "Failed to resolve influx server [%s]. errno=%d", hostname, errno);
240 return kTelemetryFailHostAddress;
241 }
242
243 156 socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
244
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 156 times.
156 if (socket_fd_ < 0) {
245 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr, "Failed to open socket");
246 freeaddrinfo(res_);
247 return kTelemetryFailSocket;
248 }
249
250 156 return kTelemetrySuccess;
251 }
252
253 TelemetryReturn TelemetryAggregatorInflux::SendToInflux(
254 const std::string &payload) {
255 struct sockaddr_in *dest_addr = NULL;
256 dest_addr = reinterpret_cast<sockaddr_in *>(res_->ai_addr);
257 dest_addr->sin_port = htons(influx_port_);
258
259 const ssize_t num_bytes_sent = sendto(
260 socket_fd_, payload.data(), payload.size(), 0,
261 reinterpret_cast<struct sockaddr *>(dest_addr),
262 sizeof(struct sockaddr_in));
263
264 if (num_bytes_sent < 0) {
265 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
266 "Failed to send to influx. errno=%d", errno);
267 return kTelemetryFailSend;
268 } else if (static_cast<size_t>(num_bytes_sent) != payload.size()) {
269 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
270 "Incomplete send. Bytes transferred: %zd. Bytes expected %lu",
271 num_bytes_sent, payload.size());
272 return kTelemetryFailSend;
273 }
274
275 LogCvmfs(kLogTelemetry, kLogDebug, "INFLUX: POSTING [%s]", payload.c_str());
276 return kTelemetrySuccess;
277 }
278
279
280 void TelemetryAggregatorInflux::PushMetrics() {
281 // create payload
282 std::string payload = MakePayload();
283 std::string delta_payload = "";
284 if (send_delta_ && old_counters_.size() > 0) {
285 delta_payload = MakeDeltaPayload();
286 payload = payload + "\n" + delta_payload;
287 }
288 payload += "\n";
289
290 // send to influx
291 SendToInflux(payload);
292
293 // current counter is now old counter (only if delta sending is enabled)
294 if (send_delta_) {
295 counters_.swap(old_counters_);
296 }
297 }
298
299 } // namespace perf
300