GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/telemetry_aggregator_influx.cc
Date: 2025-09-28 02:35:26
Exec Total Coverage
Lines: 97 134 72.4%
Branches: 104 212 49.1%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "telemetry_aggregator_influx.h"
6
7 #include <netinet/in.h>
8 #include <pthread.h>
9 #include <stdint.h>
10 #include <sys/socket.h>
11 #include <sys/types.h>
12
13 #include <cstring>
14 #include <map>
15 #include <string>
16 #include <vector>
17
18 #include "util/logging.h"
19 #include "util/posix.h"
20 #include "util/string.h"
21
22 namespace perf {
23
24 273 TelemetryAggregatorInflux::TelemetryAggregatorInflux(
25 Statistics *statistics,
26 int send_rate_sec,
27 OptionsManager *options_mgr,
28 MountPoint *mount_point,
29 273 const std::string &fqrn)
30 : TelemetryAggregator(statistics, send_rate_sec, mount_point, fqrn)
31
1/2
✓ Branch 2 taken 273 times.
✗ Branch 3 not taken.
273 , influx_extra_fields_("")
32
1/2
✓ Branch 2 taken 273 times.
✗ Branch 3 not taken.
273 , influx_extra_tags_("")
33 273 , send_delta_(true)
34 273 , socket_fd_(-1)
35 273 , res_(NULL) {
36 273 int params = 0;
37
38
4/6
✓ Branch 2 taken 273 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 273 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 234 times.
✓ Branch 10 taken 39 times.
273 if (options_mgr->GetValue("CVMFS_INFLUX_HOST", &influx_host_)) {
39
1/2
✓ Branch 1 taken 234 times.
✗ Branch 2 not taken.
234 if (influx_host_.size() > 1) {
40 234 params++;
41 } else {
42 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
43 "No value given for CVMFS_INFLUX_HOST");
44 }
45 }
46
47 273 std::string opt;
48
3/6
✓ Branch 2 taken 273 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 273 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 273 times.
✗ Branch 10 not taken.
273 if (options_mgr->GetValue("CVMFS_INFLUX_PORT", &opt)) {
49
2/4
✓ Branch 3 taken 273 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 273 times.
✗ Branch 7 not taken.
273 influx_port_ = static_cast<int>(String2Int64(opt.c_str()));
50
2/4
✓ Branch 0 taken 273 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 273 times.
✗ Branch 3 not taken.
273 if (influx_port_ > 0 && influx_port_ < 65536) {
51 273 params++;
52 } else {
53 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
54 "Invalid value for CVMFS_INFLUX_PORT [%s]", opt.c_str());
55 }
56 }
57
58
3/6
✓ Branch 2 taken 273 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 273 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 273 times.
✗ Branch 10 not taken.
273 if (options_mgr->GetValue("CVMFS_INFLUX_METRIC_NAME", &influx_metric_name_)) {
59 273 params++;
60 }
61
62
4/6
✓ Branch 2 taken 273 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 273 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 195 times.
✓ Branch 10 taken 78 times.
273 if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_TAGS", &influx_extra_tags_)) {
63
1/2
✓ Branch 1 taken 195 times.
✗ Branch 2 not taken.
195 influx_extra_tags_ = "";
64 }
65
66
4/6
✓ Branch 2 taken 273 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 273 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 195 times.
✓ Branch 10 taken 78 times.
273 if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_FIELDS",
67 &influx_extra_fields_)) {
68
1/2
✓ Branch 1 taken 195 times.
✗ Branch 2 not taken.
195 influx_extra_fields_ = "";
69 }
70
71 // Read CVMFS_INFLUX_SEND_DELTA option, default to true (ON)
72 273 std::string send_delta_str;
73
4/6
✓ Branch 2 taken 273 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 273 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 78 times.
✓ Branch 10 taken 195 times.
273 if (options_mgr->GetValue("CVMFS_INFLUX_SEND_DELTA", &send_delta_str)) {
74
1/2
✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
78 send_delta_ = !options_mgr->IsOff(send_delta_str);
75 }
76
77
2/2
✓ Branch 0 taken 234 times.
✓ Branch 1 taken 39 times.
273 if (params == 3) {
78 234 is_zombie_ = false;
79
1/2
✓ Branch 5 taken 234 times.
✗ Branch 6 not taken.
234 LogCvmfs(kLogTelemetry, kLogDebug,
80 "Enabling influx metrics. "
81 "Send to [%s:%d] metric [%s]. Extra tags"
82 "[%s]. Extra fields [%s].",
83 influx_host_.c_str(), influx_port_, influx_metric_name_.c_str(),
84 influx_extra_tags_.c_str(), influx_extra_fields_.c_str());
85
1/2
✓ Branch 1 taken 234 times.
✗ Branch 2 not taken.
234 const TelemetryReturn ret = OpenSocket();
86
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 234 times.
234 if (ret != kTelemetrySuccess) {
87 is_zombie_ = true;
88 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
89 "Not enabling influx metrics. Error while open socket. %d", ret);
90 }
91 } else {
92 39 is_zombie_ = true;
93
1/2
✓ Branch 1 taken 39 times.
✗ Branch 2 not taken.
39 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
94 "Not enabling influx metrics. Not all mandatory variables set: "
95 "CVMFS_INFLUX_METRIC_NAME, CVMFS_INFLUX_HOST, CVMFS_INFLUX_PORT");
96 }
97 273 }
98
99 546 TelemetryAggregatorInflux::~TelemetryAggregatorInflux() {
100
2/2
✓ Branch 0 taken 234 times.
✓ Branch 1 taken 39 times.
546 if (socket_fd_ >= 0) {
101 468 close(socket_fd_);
102 }
103
2/2
✓ Branch 0 taken 234 times.
✓ Branch 1 taken 39 times.
546 if (res_) {
104 468 freeaddrinfo(res_);
105 }
106 }
107
108 /**
109 * Creates a string in the influx data format containing the absolute values
110 * of the counters. Counters are only included if their absolute value is > 0.
111 *
112 * Influx dataformat
113 * ( https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/
114 )
115 * Syntax
116 <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]]
117 <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
118 *
119 * Example
120 myMeasurement,tag1=value1,tag2=value2 fieldKey="fieldValue"
121 1556813561098000000
122 */
123 312 std::string TelemetryAggregatorInflux::MakePayload() {
124 // measurement and tags
125
1/2
✓ Branch 1 taken 312 times.
✗ Branch 2 not taken.
312 std::string metric_name = influx_metric_name_;
126
2/2
✓ Branch 0 taken 273 times.
✓ Branch 1 taken 39 times.
312 if (send_delta_) {
127
1/2
✓ Branch 1 taken 273 times.
✗ Branch 2 not taken.
273 metric_name += "_absolute";
128 }
129
2/4
✓ Branch 1 taken 312 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 312 times.
✗ Branch 5 not taken.
312 std::string ret = metric_name + ",repo=" + fqrn_;
130
131
3/4
✓ Branch 1 taken 312 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 156 times.
✓ Branch 4 taken 156 times.
312 if (influx_extra_tags_ != "") {
132
2/4
✓ Branch 1 taken 156 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 156 times.
✗ Branch 5 not taken.
156 ret += "," + influx_extra_tags_;
133 }
134
135 // fields
136
1/2
✓ Branch 1 taken 312 times.
✗ Branch 2 not taken.
312 ret += " ";
137 312 bool add_token = false;
138 624 for (std::map<std::string, int64_t>::iterator it = counters_.begin(),
139 312 iEnd = counters_.end();
140
2/2
✓ Branch 1 taken 936 times.
✓ Branch 2 taken 312 times.
1248 it != iEnd;
141 936 it++) {
142
2/2
✓ Branch 1 taken 429 times.
✓ Branch 2 taken 507 times.
936 if (it->second != 0) {
143
2/2
✓ Branch 0 taken 234 times.
✓ Branch 1 taken 195 times.
429 if (add_token) {
144
1/2
✓ Branch 1 taken 234 times.
✗ Branch 2 not taken.
234 ret += ",";
145 }
146
4/8
✓ Branch 2 taken 429 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 429 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 429 times.
✗ Branch 10 not taken.
✓ Branch 12 taken 429 times.
✗ Branch 13 not taken.
429 ret += it->first + "=" + StringifyInt(it->second);
147 429 add_token = true;
148 }
149 }
150
3/4
✓ Branch 1 taken 312 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 156 times.
✓ Branch 4 taken 156 times.
312 if (influx_extra_fields_ != "") {
151
2/2
✓ Branch 0 taken 78 times.
✓ Branch 1 taken 78 times.
156 if (add_token) {
152
1/2
✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
78 ret += ",";
153 }
154
1/2
✓ Branch 1 taken 156 times.
✗ Branch 2 not taken.
156 ret += influx_extra_fields_;
155 156 add_token = true;
156 }
157
158 // timestamp
159
2/2
✓ Branch 0 taken 273 times.
✓ Branch 1 taken 39 times.
312 if (add_token) {
160
1/2
✓ Branch 1 taken 273 times.
✗ Branch 2 not taken.
273 ret += " ";
161 }
162
2/4
✓ Branch 1 taken 312 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 312 times.
✗ Branch 5 not taken.
312 ret += StringifyUint(timestamp_);
163
164 624 return ret;
165 312 }
166
167 /**
168 * Creates a string in the influx data format containing the delta between
169 * 2 measurements of the same counter. Counters are only included if their
170 * absolute value is > 0 (delta can be 0).
171 *
172 * NOTE: As influx_extra_fields_ are static, they are excluded of this
173 * delta format
174 *
175 * Influx dataformat
176 * ( https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/
177 )
178 * Syntax
179 <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]]
180 <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
181 *
182 * Example
183 myMeasurement,tag1=value1,tag2=value2 fieldKey="fieldValue"
184 1556813561098000000
185 */
186 78 std::string TelemetryAggregatorInflux::MakeDeltaPayload() {
187 // measurement and tags
188
2/4
✓ Branch 2 taken 78 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 78 times.
✗ Branch 6 not taken.
156 std::string ret = "" + influx_metric_name_ + "_delta,repo=" + fqrn_;
189
190
2/4
✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 78 times.
✗ Branch 4 not taken.
78 if (influx_extra_tags_ != "") {
191
2/4
✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 78 times.
✗ Branch 5 not taken.
78 ret += "," + influx_extra_tags_;
192 }
193
194 // fields
195
1/2
✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
78 ret += " ";
196 78 bool add_token = false;
197 156 for (std::map<std::string, int64_t>::iterator it = counters_.begin(),
198 78 iEnd = counters_.end();
199
2/2
✓ Branch 1 taken 234 times.
✓ Branch 2 taken 78 times.
312 it != iEnd;
200 234 it++) {
201 234 const int64_t value = it->second;
202
2/2
✓ Branch 0 taken 195 times.
✓ Branch 1 taken 39 times.
234 if (value != 0) {
203 int64_t old_value;
204 try {
205
1/2
✓ Branch 2 taken 195 times.
✗ Branch 3 not taken.
195 old_value = old_counters_.at(it->first);
206 } catch (const std::out_of_range &ex) {
207 old_value = 0;
208 }
209
2/2
✓ Branch 0 taken 117 times.
✓ Branch 1 taken 78 times.
195 if (add_token) {
210
1/2
✓ Branch 1 taken 117 times.
✗ Branch 2 not taken.
117 ret += ",";
211 }
212
4/8
✓ Branch 1 taken 195 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 195 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 195 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 195 times.
✗ Branch 12 not taken.
195 ret += it->first + "=" + StringifyInt(value - old_value);
213 195 add_token = true;
214 }
215 }
216
217 // timestamp
218
1/2
✓ Branch 0 taken 78 times.
✗ Branch 1 not taken.
78 if (add_token) {
219
1/2
✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
78 ret += " ";
220 }
221
2/4
✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 78 times.
✗ Branch 5 not taken.
78 ret += StringifyUint(timestamp_);
222
223 78 return ret;
224 }
225
226 234 TelemetryReturn TelemetryAggregatorInflux::OpenSocket() {
227 234 const char *hostname = influx_host_.c_str();
228 struct addrinfo hints;
229 int err;
230
231 234 memset(&hints, 0, sizeof(hints));
232 234 hints.ai_family = AF_INET;
233 234 hints.ai_socktype = SOCK_DGRAM;
234
235
1/2
✓ Branch 2 taken 234 times.
✗ Branch 3 not taken.
234 err = getaddrinfo(influx_host_.c_str(), NULL, &hints, &res_);
236
2/4
✓ Branch 0 taken 234 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 234 times.
234 if (err != 0 || res_ == NULL) {
237 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
238 "Failed to resolve influx server [%s]. errno=%d", hostname, errno);
239 return kTelemetryFailHostAddress;
240 }
241
242 234 socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
243
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 234 times.
234 if (socket_fd_ < 0) {
244 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr, "Failed to open socket");
245 freeaddrinfo(res_);
246 return kTelemetryFailSocket;
247 }
248
249 234 return kTelemetrySuccess;
250 }
251
252 TelemetryReturn TelemetryAggregatorInflux::SendToInflux(
253 const std::string &payload) {
254 struct sockaddr_in *dest_addr = NULL;
255 dest_addr = reinterpret_cast<sockaddr_in *>(res_->ai_addr);
256 dest_addr->sin_port = htons(influx_port_);
257
258 const ssize_t num_bytes_sent = sendto(
259 socket_fd_, payload.data(), payload.size(), 0,
260 reinterpret_cast<struct sockaddr *>(dest_addr),
261 sizeof(struct sockaddr_in));
262
263 if (num_bytes_sent < 0) {
264 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
265 "Failed to send to influx. errno=%d", errno);
266 return kTelemetryFailSend;
267 } else if (static_cast<size_t>(num_bytes_sent) != payload.size()) {
268 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
269 "Incomplete send. Bytes transferred: %zd. Bytes expected %lu",
270 num_bytes_sent, payload.size());
271 return kTelemetryFailSend;
272 }
273
274 LogCvmfs(kLogTelemetry, kLogDebug, "INFLUX: POSTING [%s]", payload.c_str());
275 return kTelemetrySuccess;
276 }
277
278
279 void TelemetryAggregatorInflux::PushMetrics() {
280 // create payload
281 std::string payload = MakePayload();
282 std::string delta_payload = "";
283 if (send_delta_ && old_counters_.size() > 0) {
284 delta_payload = MakeDeltaPayload();
285 payload = payload + "\n" + delta_payload;
286 }
287 payload += "\n";
288
289 // send to influx
290 SendToInflux(payload);
291
292 // current counter is now old counter (only if delta sending is enabled)
293 if (send_delta_) {
294 counters_.swap(old_counters_);
295 }
296 }
297
298 } // namespace perf
299