GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/telemetry_aggregator_influx.cc
Date: 2026-04-26 02:35:59
Exec Total Coverage
Lines: 97 134 72.4%
Branches: 104 212 49.1%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "telemetry_aggregator_influx.h"
6
7 #include <netinet/in.h>
8 #include <pthread.h>
9 #include <stdint.h>
10 #include <sys/socket.h>
11 #include <sys/types.h>
12
13 #include <cstring>
14 #include <map>
15 #include <stdexcept>
16 #include <string>
17 #include <vector>
18
19 #include "util/logging.h"
20 #include "util/posix.h"
21 #include "util/string.h"
22
23 namespace perf {
24
25 259 TelemetryAggregatorInflux::TelemetryAggregatorInflux(
26 Statistics *statistics,
27 int send_rate_sec,
28 OptionsManager *options_mgr,
29 MountPoint *mount_point,
30 259 const std::string &fqrn)
31 : TelemetryAggregator(statistics, send_rate_sec, mount_point, fqrn)
32
1/2
✓ Branch 2 taken 259 times.
✗ Branch 3 not taken.
259 , influx_extra_fields_("")
33
1/2
✓ Branch 2 taken 259 times.
✗ Branch 3 not taken.
259 , influx_extra_tags_("")
34 259 , send_delta_(true)
35 259 , socket_fd_(-1)
36 259 , res_(NULL) {
37 259 int params = 0;
38
39
4/6
✓ Branch 2 taken 259 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 259 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 222 times.
✓ Branch 10 taken 37 times.
259 if (options_mgr->GetValue("CVMFS_INFLUX_HOST", &influx_host_)) {
40
1/2
✓ Branch 1 taken 222 times.
✗ Branch 2 not taken.
222 if (influx_host_.size() > 1) {
41 222 params++;
42 } else {
43 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
44 "No value given for CVMFS_INFLUX_HOST");
45 }
46 }
47
48 259 std::string opt;
49
3/6
✓ Branch 2 taken 259 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 259 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 259 times.
✗ Branch 10 not taken.
259 if (options_mgr->GetValue("CVMFS_INFLUX_PORT", &opt)) {
50
2/4
✓ Branch 3 taken 259 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 259 times.
✗ Branch 7 not taken.
259 influx_port_ = static_cast<int>(String2Int64(opt.c_str()));
51
2/4
✓ Branch 0 taken 259 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 259 times.
✗ Branch 3 not taken.
259 if (influx_port_ > 0 && influx_port_ < 65536) {
52 259 params++;
53 } else {
54 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
55 "Invalid value for CVMFS_INFLUX_PORT [%s]", opt.c_str());
56 }
57 }
58
59
3/6
✓ Branch 2 taken 259 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 259 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 259 times.
✗ Branch 10 not taken.
259 if (options_mgr->GetValue("CVMFS_INFLUX_METRIC_NAME", &influx_metric_name_)) {
60 259 params++;
61 }
62
63
4/6
✓ Branch 2 taken 259 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 259 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 185 times.
✓ Branch 10 taken 74 times.
259 if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_TAGS", &influx_extra_tags_)) {
64
1/2
✓ Branch 1 taken 185 times.
✗ Branch 2 not taken.
185 influx_extra_tags_ = "";
65 }
66
67
4/6
✓ Branch 2 taken 259 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 259 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 185 times.
✓ Branch 10 taken 74 times.
259 if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_FIELDS",
68 &influx_extra_fields_)) {
69
1/2
✓ Branch 1 taken 185 times.
✗ Branch 2 not taken.
185 influx_extra_fields_ = "";
70 }
71
72 // Read CVMFS_INFLUX_SEND_DELTA option, default to true (ON)
73 259 std::string send_delta_str;
74
4/6
✓ Branch 2 taken 259 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 259 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 74 times.
✓ Branch 10 taken 185 times.
259 if (options_mgr->GetValue("CVMFS_INFLUX_SEND_DELTA", &send_delta_str)) {
75
1/2
✓ Branch 1 taken 74 times.
✗ Branch 2 not taken.
74 send_delta_ = !options_mgr->IsOff(send_delta_str);
76 }
77
78
2/2
✓ Branch 0 taken 222 times.
✓ Branch 1 taken 37 times.
259 if (params == 3) {
79 222 is_zombie_ = false;
80
1/2
✓ Branch 5 taken 222 times.
✗ Branch 6 not taken.
222 LogCvmfs(kLogTelemetry, kLogDebug,
81 "Enabling influx metrics. "
82 "Send to [%s:%d] metric [%s]. Extra tags"
83 "[%s]. Extra fields [%s].",
84 influx_host_.c_str(), influx_port_, influx_metric_name_.c_str(),
85 influx_extra_tags_.c_str(), influx_extra_fields_.c_str());
86
1/2
✓ Branch 1 taken 222 times.
✗ Branch 2 not taken.
222 const TelemetryReturn ret = OpenSocket();
87
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 222 times.
222 if (ret != kTelemetrySuccess) {
88 is_zombie_ = true;
89 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
90 "Not enabling influx metrics. Error while open socket. %d", ret);
91 }
92 } else {
93 37 is_zombie_ = true;
94
1/2
✓ Branch 1 taken 37 times.
✗ Branch 2 not taken.
37 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
95 "Not enabling influx metrics. Not all mandatory variables set: "
96 "CVMFS_INFLUX_METRIC_NAME, CVMFS_INFLUX_HOST, CVMFS_INFLUX_PORT");
97 }
98 259 }
99
100 518 TelemetryAggregatorInflux::~TelemetryAggregatorInflux() {
101
2/2
✓ Branch 0 taken 222 times.
✓ Branch 1 taken 37 times.
518 if (socket_fd_ >= 0) {
102 444 close(socket_fd_);
103 }
104
2/2
✓ Branch 0 taken 222 times.
✓ Branch 1 taken 37 times.
518 if (res_) {
105 444 freeaddrinfo(res_);
106 }
107 }
108
109 /**
110 * Creates a string in the influx data format containing the absolute values
111 * of the counters. Counters are only included if their absolute value is > 0.
112 *
113 * Influx dataformat
114 * ( https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/
115 )
116 * Syntax
117 <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]]
118 <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
119 *
120 * Example
121 myMeasurement,tag1=value1,tag2=value2 fieldKey="fieldValue"
122 1556813561098000000
123 */
124 296 std::string TelemetryAggregatorInflux::MakePayload() {
125 // measurement and tags
126
1/2
✓ Branch 1 taken 296 times.
✗ Branch 2 not taken.
296 std::string metric_name = influx_metric_name_;
127
2/2
✓ Branch 0 taken 259 times.
✓ Branch 1 taken 37 times.
296 if (send_delta_) {
128
1/2
✓ Branch 1 taken 259 times.
✗ Branch 2 not taken.
259 metric_name += "_absolute";
129 }
130
2/4
✓ Branch 1 taken 296 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 296 times.
✗ Branch 5 not taken.
296 std::string ret = metric_name + ",repo=" + fqrn_;
131
132
3/4
✓ Branch 1 taken 296 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 148 times.
✓ Branch 4 taken 148 times.
296 if (influx_extra_tags_ != "") {
133
2/4
✓ Branch 1 taken 148 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 148 times.
✗ Branch 5 not taken.
148 ret += "," + influx_extra_tags_;
134 }
135
136 // fields
137
1/2
✓ Branch 1 taken 296 times.
✗ Branch 2 not taken.
296 ret += " ";
138 296 bool add_token = false;
139 592 for (std::map<std::string, int64_t>::iterator it = counters_.begin(),
140 296 iEnd = counters_.end();
141
2/2
✓ Branch 1 taken 888 times.
✓ Branch 2 taken 296 times.
1184 it != iEnd;
142 888 it++) {
143
2/2
✓ Branch 1 taken 407 times.
✓ Branch 2 taken 481 times.
888 if (it->second != 0) {
144
2/2
✓ Branch 0 taken 222 times.
✓ Branch 1 taken 185 times.
407 if (add_token) {
145
1/2
✓ Branch 1 taken 222 times.
✗ Branch 2 not taken.
222 ret += ",";
146 }
147
4/8
✓ Branch 2 taken 407 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 407 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 407 times.
✗ Branch 10 not taken.
✓ Branch 12 taken 407 times.
✗ Branch 13 not taken.
407 ret += it->first + "=" + StringifyInt(it->second);
148 407 add_token = true;
149 }
150 }
151
3/4
✓ Branch 1 taken 296 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 148 times.
✓ Branch 4 taken 148 times.
296 if (influx_extra_fields_ != "") {
152
2/2
✓ Branch 0 taken 74 times.
✓ Branch 1 taken 74 times.
148 if (add_token) {
153
1/2
✓ Branch 1 taken 74 times.
✗ Branch 2 not taken.
74 ret += ",";
154 }
155
1/2
✓ Branch 1 taken 148 times.
✗ Branch 2 not taken.
148 ret += influx_extra_fields_;
156 148 add_token = true;
157 }
158
159 // timestamp
160
2/2
✓ Branch 0 taken 259 times.
✓ Branch 1 taken 37 times.
296 if (add_token) {
161
1/2
✓ Branch 1 taken 259 times.
✗ Branch 2 not taken.
259 ret += " ";
162 }
163
2/4
✓ Branch 1 taken 296 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 296 times.
✗ Branch 5 not taken.
296 ret += StringifyUint(timestamp_);
164
165 592 return ret;
166 296 }
167
168 /**
169 * Creates a string in the influx data format containing the delta between
170 * 2 measurements of the same counter. Counters are only included if their
171 * absolute value is > 0 (delta can be 0).
172 *
173 * NOTE: As influx_extra_fields_ are static, they are excluded of this
174 * delta format
175 *
176 * Influx dataformat
177 * ( https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/
178 )
179 * Syntax
180 <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]]
181 <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
182 *
183 * Example
184 myMeasurement,tag1=value1,tag2=value2 fieldKey="fieldValue"
185 1556813561098000000
186 */
187 74 std::string TelemetryAggregatorInflux::MakeDeltaPayload() {
188 // measurement and tags
189
2/4
✓ Branch 2 taken 74 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 74 times.
✗ Branch 6 not taken.
148 std::string ret = "" + influx_metric_name_ + "_delta,repo=" + fqrn_;
190
191
2/4
✓ Branch 1 taken 74 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 74 times.
✗ Branch 4 not taken.
74 if (influx_extra_tags_ != "") {
192
2/4
✓ Branch 1 taken 74 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 74 times.
✗ Branch 5 not taken.
74 ret += "," + influx_extra_tags_;
193 }
194
195 // fields
196
1/2
✓ Branch 1 taken 74 times.
✗ Branch 2 not taken.
74 ret += " ";
197 74 bool add_token = false;
198 148 for (std::map<std::string, int64_t>::iterator it = counters_.begin(),
199 74 iEnd = counters_.end();
200
2/2
✓ Branch 1 taken 222 times.
✓ Branch 2 taken 74 times.
296 it != iEnd;
201 222 it++) {
202 222 const int64_t value = it->second;
203
2/2
✓ Branch 0 taken 185 times.
✓ Branch 1 taken 37 times.
222 if (value != 0) {
204 int64_t old_value;
205 try {
206
1/2
✓ Branch 2 taken 185 times.
✗ Branch 3 not taken.
185 old_value = old_counters_.at(it->first);
207 } catch (const std::out_of_range &ex) {
208 old_value = 0;
209 }
210
2/2
✓ Branch 0 taken 111 times.
✓ Branch 1 taken 74 times.
185 if (add_token) {
211
1/2
✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
111 ret += ",";
212 }
213
4/8
✓ Branch 1 taken 185 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 185 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 185 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 185 times.
✗ Branch 12 not taken.
185 ret += it->first + "=" + StringifyInt(value - old_value);
214 185 add_token = true;
215 }
216 }
217
218 // timestamp
219
1/2
✓ Branch 0 taken 74 times.
✗ Branch 1 not taken.
74 if (add_token) {
220
1/2
✓ Branch 1 taken 74 times.
✗ Branch 2 not taken.
74 ret += " ";
221 }
222
2/4
✓ Branch 1 taken 74 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 74 times.
✗ Branch 5 not taken.
74 ret += StringifyUint(timestamp_);
223
224 74 return ret;
225 }
226
227 222 TelemetryReturn TelemetryAggregatorInflux::OpenSocket() {
228 222 const char *hostname = influx_host_.c_str();
229 struct addrinfo hints;
230 int err;
231
232 222 memset(&hints, 0, sizeof(hints));
233 222 hints.ai_family = AF_INET;
234 222 hints.ai_socktype = SOCK_DGRAM;
235
236
1/2
✓ Branch 2 taken 222 times.
✗ Branch 3 not taken.
222 err = getaddrinfo(influx_host_.c_str(), NULL, &hints, &res_);
237
2/4
✓ Branch 0 taken 222 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 222 times.
222 if (err != 0 || res_ == NULL) {
238 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
239 "Failed to resolve influx server [%s]. errno=%d", hostname, errno);
240 return kTelemetryFailHostAddress;
241 }
242
243 222 socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
244
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 222 times.
222 if (socket_fd_ < 0) {
245 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr, "Failed to open socket");
246 freeaddrinfo(res_);
247 return kTelemetryFailSocket;
248 }
249
250 222 return kTelemetrySuccess;
251 }
252
253 TelemetryReturn TelemetryAggregatorInflux::SendToInflux(
254 const std::string &payload) {
255 struct sockaddr_in *dest_addr = NULL;
256 dest_addr = reinterpret_cast<sockaddr_in *>(res_->ai_addr);
257 dest_addr->sin_port = htons(influx_port_);
258
259 const ssize_t num_bytes_sent = sendto(
260 socket_fd_, payload.data(), payload.size(), 0,
261 reinterpret_cast<struct sockaddr *>(dest_addr),
262 sizeof(struct sockaddr_in));
263
264 if (num_bytes_sent < 0) {
265 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
266 "Failed to send to influx. errno=%d", errno);
267 return kTelemetryFailSend;
268 } else if (static_cast<size_t>(num_bytes_sent) != payload.size()) {
269 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
270 "Incomplete send. Bytes transferred: %zd. Bytes expected %lu",
271 num_bytes_sent, payload.size());
272 return kTelemetryFailSend;
273 }
274
275 LogCvmfs(kLogTelemetry, kLogDebug, "INFLUX: POSTING [%s]", payload.c_str());
276 return kTelemetrySuccess;
277 }
278
279
280 void TelemetryAggregatorInflux::PushMetrics() {
281 // create payload
282 std::string payload = MakePayload();
283 std::string delta_payload = "";
284 if (send_delta_ && old_counters_.size() > 0) {
285 delta_payload = MakeDeltaPayload();
286 payload = payload + "\n" + delta_payload;
287 }
288 payload += "\n";
289
290 // send to influx
291 SendToInflux(payload);
292
293 // current counter is now old counter (only if delta sending is enabled)
294 if (send_delta_) {
295 counters_.swap(old_counters_);
296 }
297 }
298
299 } // namespace perf
300