GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/telemetry_aggregator_influx.cc
Date: 2025-07-13 02:35:07
Exec Total Coverage
Lines: 89 125 71.2%
Branches: 94 190 49.5%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
#include "telemetry_aggregator_influx.h"

#include <netdb.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdint.h>
#include <sys/socket.h>
#include <sys/types.h>

#include <cstring>
#include <map>
#include <string>
#include <vector>

#include "util/logging.h"
#include "util/posix.h"
#include "util/string.h"
21
22 namespace perf {
23
24 40 TelemetryAggregatorInflux::TelemetryAggregatorInflux(
25 Statistics *statistics,
26 int send_rate_sec,
27 OptionsManager *options_mgr,
28 MountPoint *mount_point,
29 40 const std::string &fqrn)
30 : TelemetryAggregator(statistics, send_rate_sec, mount_point, fqrn)
31
1/2
✓ Branch 2 taken 40 times.
✗ Branch 3 not taken.
40 , influx_extra_fields_("")
32
1/2
✓ Branch 2 taken 40 times.
✗ Branch 3 not taken.
40 , influx_extra_tags_("")
33 40 , socket_fd_(-1)
34 40 , res_(NULL) {
35 40 int params = 0;
36
37
4/6
✓ Branch 2 taken 40 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 40 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 30 times.
✓ Branch 10 taken 10 times.
40 if (options_mgr->GetValue("CVMFS_INFLUX_HOST", &influx_host_)) {
38
1/2
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
30 if (influx_host_.size() > 1) {
39 30 params++;
40 } else {
41 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
42 "No value given for CVMFS_INFLUX_HOST");
43 }
44 }
45
46 40 std::string opt;
47
3/6
✓ Branch 2 taken 40 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 40 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 40 times.
✗ Branch 10 not taken.
40 if (options_mgr->GetValue("CVMFS_INFLUX_PORT", &opt)) {
48
2/4
✓ Branch 3 taken 40 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 40 times.
✗ Branch 7 not taken.
40 influx_port_ = static_cast<int>(String2Int64(opt.c_str()));
49
2/4
✓ Branch 0 taken 40 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 40 times.
✗ Branch 3 not taken.
40 if (influx_port_ > 0 && influx_port_ < 65536) {
50 40 params++;
51 } else {
52 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
53 "Invalid value for CVMFS_INFLUX_PORT [%s]", opt.c_str());
54 }
55 }
56
57
3/6
✓ Branch 2 taken 40 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 40 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 40 times.
✗ Branch 10 not taken.
40 if (options_mgr->GetValue("CVMFS_INFLUX_METRIC_NAME", &influx_metric_name_)) {
58 40 params++;
59 }
60
61
4/6
✓ Branch 2 taken 40 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 40 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 20 times.
✓ Branch 10 taken 20 times.
40 if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_TAGS", &influx_extra_tags_)) {
62
1/2
✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
20 influx_extra_tags_ = "";
63 }
64
65
4/6
✓ Branch 2 taken 40 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 40 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 20 times.
✓ Branch 10 taken 20 times.
40 if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_FIELDS",
66 &influx_extra_fields_)) {
67
1/2
✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
20 influx_extra_fields_ = "";
68 }
69
70
2/2
✓ Branch 0 taken 30 times.
✓ Branch 1 taken 10 times.
40 if (params == 3) {
71 30 is_zombie_ = false;
72
1/2
✓ Branch 5 taken 30 times.
✗ Branch 6 not taken.
30 LogCvmfs(kLogTelemetry, kLogDebug,
73 "Enabling influx metrics. "
74 "Send to [%s:%d] metric [%s]. Extra tags"
75 "[%s]. Extra fields [%s].",
76 influx_host_.c_str(), influx_port_, influx_metric_name_.c_str(),
77 influx_extra_tags_.c_str(), influx_extra_fields_.c_str());
78
1/2
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
30 const TelemetryReturn ret = OpenSocket();
79
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 30 times.
30 if (ret != kTelemetrySuccess) {
80 is_zombie_ = true;
81 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
82 "Not enabling influx metrics. Error while open socket. %d", ret);
83 }
84 } else {
85 10 is_zombie_ = true;
86
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
87 "Not enabling influx metrics. Not all mandatory variables set: "
88 "CVMFS_INFLUX_METRIC_NAME, CVMFS_INFLUX_HOST, CVMFS_INFLUX_PORT");
89 }
90 40 }
91
92 80 TelemetryAggregatorInflux::~TelemetryAggregatorInflux() {
93
2/2
✓ Branch 0 taken 30 times.
✓ Branch 1 taken 10 times.
80 if (socket_fd_ >= 0) {
94 60 close(socket_fd_);
95 }
96
2/2
✓ Branch 0 taken 30 times.
✓ Branch 1 taken 10 times.
80 if (res_) {
97 60 freeaddrinfo(res_);
98 }
99 }
100
101 /**
102 * Creates a string in the influx data format containing the absolute values
103 * of the counters. Counters are only included if their absolute value is > 0.
104 *
105 * Influx dataformat
106 * ( https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/
107 )
108 * Syntax
109 <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]]
110 <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
111 *
112 * Example
113 myMeasurement,tag1=value1,tag2=value2 fieldKey="fieldValue"
114 1556813561098000000
115 */
116 50 std::string TelemetryAggregatorInflux::MakePayload() {
117 // measurement and tags
118
1/2
✓ Branch 2 taken 50 times.
✗ Branch 3 not taken.
50 std::string ret = influx_metric_name_ + "_absolute,repo=" + fqrn_;
119
120
3/4
✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 40 times.
✓ Branch 4 taken 10 times.
50 if (influx_extra_tags_ != "") {
121
2/4
✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 40 times.
✗ Branch 5 not taken.
40 ret += "," + influx_extra_tags_;
122 }
123
124 // fields
125
1/2
✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
50 ret += " ";
126 50 bool add_token = false;
127 100 for (std::map<std::string, int64_t>::iterator it = counters_.begin(),
128 50 iEnd = counters_.end();
129
2/2
✓ Branch 1 taken 150 times.
✓ Branch 2 taken 50 times.
200 it != iEnd;
130 150 it++) {
131
2/2
✓ Branch 1 taken 50 times.
✓ Branch 2 taken 100 times.
150 if (it->second != 0) {
132
2/2
✓ Branch 0 taken 30 times.
✓ Branch 1 taken 20 times.
50 if (add_token) {
133
1/2
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
30 ret += ",";
134 }
135
4/8
✓ Branch 2 taken 50 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 50 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 50 times.
✗ Branch 10 not taken.
✓ Branch 12 taken 50 times.
✗ Branch 13 not taken.
50 ret += it->first + "=" + StringifyInt(it->second);
136 50 add_token = true;
137 }
138 }
139
3/4
✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 40 times.
✓ Branch 4 taken 10 times.
50 if (influx_extra_fields_ != "") {
140
2/2
✓ Branch 0 taken 20 times.
✓ Branch 1 taken 20 times.
40 if (add_token) {
141
1/2
✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
20 ret += ",";
142 }
143
1/2
✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
40 ret += influx_extra_fields_;
144 40 add_token = true;
145 }
146
147 // timestamp
148
2/2
✓ Branch 0 taken 40 times.
✓ Branch 1 taken 10 times.
50 if (add_token) {
149
1/2
✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
40 ret += " ";
150 }
151
2/4
✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 50 times.
✗ Branch 5 not taken.
50 ret += StringifyUint(timestamp_);
152
153 50 return ret;
154 }
155
156 /**
157 * Creates a string in the influx data format containing the delta between
158 * 2 measurements of the same counter. Counters are only included if their
159 * absolute value is > 0 (delta can be 0).
160 *
161 * NOTE: As influx_extra_fields_ are static, they are excluded of this
162 * delta format
163 *
164 * Influx dataformat
165 * ( https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/
166 )
167 * Syntax
168 <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]]
169 <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
170 *
171 * Example
172 myMeasurement,tag1=value1,tag2=value2 fieldKey="fieldValue"
173 1556813561098000000
174 */
175 20 std::string TelemetryAggregatorInflux::MakeDeltaPayload() {
176 // measurement and tags
177
2/4
✓ Branch 2 taken 20 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 20 times.
✗ Branch 6 not taken.
40 std::string ret = "" + influx_metric_name_ + "_delta,repo=" + fqrn_;
178
179
2/4
✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 20 times.
✗ Branch 4 not taken.
20 if (influx_extra_tags_ != "") {
180
2/4
✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 20 times.
✗ Branch 5 not taken.
20 ret += "," + influx_extra_tags_;
181 }
182
183 // fields
184
1/2
✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
20 ret += " ";
185 20 bool add_token = false;
186 40 for (std::map<std::string, int64_t>::iterator it = counters_.begin(),
187 20 iEnd = counters_.end();
188
2/2
✓ Branch 1 taken 60 times.
✓ Branch 2 taken 20 times.
80 it != iEnd;
189 60 it++) {
190 60 const int64_t value = it->second;
191
2/2
✓ Branch 0 taken 50 times.
✓ Branch 1 taken 10 times.
60 if (value != 0) {
192 int64_t old_value;
193 try {
194
1/2
✓ Branch 2 taken 50 times.
✗ Branch 3 not taken.
50 old_value = old_counters_.at(it->first);
195 } catch (const std::out_of_range &ex) {
196 old_value = 0;
197 }
198
2/2
✓ Branch 0 taken 30 times.
✓ Branch 1 taken 20 times.
50 if (add_token) {
199
1/2
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
30 ret += ",";
200 }
201
4/8
✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 50 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 50 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 50 times.
✗ Branch 12 not taken.
50 ret += it->first + "=" + StringifyInt(value - old_value);
202 50 add_token = true;
203 }
204 }
205
206 // timestamp
207
1/2
✓ Branch 0 taken 20 times.
✗ Branch 1 not taken.
20 if (add_token) {
208
1/2
✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
20 ret += " ";
209 }
210
2/4
✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 20 times.
✗ Branch 5 not taken.
20 ret += StringifyUint(timestamp_);
211
212 20 return ret;
213 }
214
215 30 TelemetryReturn TelemetryAggregatorInflux::OpenSocket() {
216 30 const char *hostname = influx_host_.c_str();
217 struct addrinfo hints;
218 int err;
219
220 30 memset(&hints, 0, sizeof(hints));
221 30 hints.ai_family = AF_INET;
222 30 hints.ai_socktype = SOCK_DGRAM;
223
224
1/2
✓ Branch 2 taken 30 times.
✗ Branch 3 not taken.
30 err = getaddrinfo(influx_host_.c_str(), NULL, &hints, &res_);
225
2/4
✓ Branch 0 taken 30 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 30 times.
30 if (err != 0 || res_ == NULL) {
226 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
227 "Failed to resolve influx server [%s]. errno=%d", hostname, errno);
228 return kTelemetryFailHostAddress;
229 }
230
231 30 socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
232
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 30 times.
30 if (socket_fd_ < 0) {
233 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr, "Failed to open socket");
234 freeaddrinfo(res_);
235 return kTelemetryFailSocket;
236 }
237
238 30 return kTelemetrySuccess;
239 }
240
241 TelemetryReturn TelemetryAggregatorInflux::SendToInflux(
242 const std::string &payload) {
243 struct sockaddr_in *dest_addr = NULL;
244 dest_addr = reinterpret_cast<sockaddr_in *>(res_->ai_addr);
245 dest_addr->sin_port = htons(influx_port_);
246
247 const ssize_t num_bytes_sent = sendto(
248 socket_fd_, payload.data(), payload.size(), 0,
249 reinterpret_cast<struct sockaddr *>(dest_addr),
250 sizeof(struct sockaddr_in));
251
252 if (num_bytes_sent < 0) {
253 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
254 "Failed to send to influx. errno=%d", errno);
255 return kTelemetryFailSend;
256 } else if (static_cast<size_t>(num_bytes_sent) != payload.size()) {
257 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
258 "Incomplete send. Bytes transferred: %zd. Bytes expected %lu",
259 num_bytes_sent, payload.size());
260 return kTelemetryFailSend;
261 }
262
263 LogCvmfs(kLogTelemetry, kLogDebug, "INFLUX: POSTING [%s]", payload.c_str());
264 return kTelemetrySuccess;
265 }
266
267
268 void TelemetryAggregatorInflux::PushMetrics() {
269 // create payload
270 std::string payload = MakePayload();
271 std::string delta_payload = "";
272 if (old_counters_.size() > 0) {
273 delta_payload = MakeDeltaPayload();
274 payload = payload + "\n" + delta_payload;
275 }
276 payload += "\n";
277
278 // send to influx
279 SendToInflux(payload);
280
281 // current counter is now old counter
282 counters_.swap(old_counters_);
283 }
284
285 } // namespace perf
286