Directory: | cvmfs/ |
---|---|
File: | cvmfs/telemetry_aggregator_influx.cc |
Date: | 2025-08-31 02:39:21 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 97 | 134 | 72.4% |
Branches: | 104 | 212 | 49.1% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #include "telemetry_aggregator_influx.h" | ||
6 | |||
7 | #include <netinet/in.h> | ||
8 | #include <pthread.h> | ||
9 | #include <stdint.h> | ||
10 | #include <sys/socket.h> | ||
11 | #include <sys/types.h> | ||
12 | |||
13 | #include <cstring> | ||
14 | #include <map> | ||
15 | #include <string> | ||
16 | #include <vector> | ||
17 | |||
18 | #include "util/logging.h" | ||
19 | #include "util/posix.h" | ||
20 | #include "util/string.h" | ||
21 | |||
22 | namespace perf { | ||
23 | |||
24 | 301 | TelemetryAggregatorInflux::TelemetryAggregatorInflux( | |
25 | Statistics *statistics, | ||
26 | int send_rate_sec, | ||
27 | OptionsManager *options_mgr, | ||
28 | MountPoint *mount_point, | ||
29 | 301 | const std::string &fqrn) | |
30 | : TelemetryAggregator(statistics, send_rate_sec, mount_point, fqrn) | ||
31 |
1/2✓ Branch 2 taken 301 times.
✗ Branch 3 not taken.
|
301 | , influx_extra_fields_("") |
32 |
1/2✓ Branch 2 taken 301 times.
✗ Branch 3 not taken.
|
301 | , influx_extra_tags_("") |
33 | 301 | , send_delta_(true) | |
34 | 301 | , socket_fd_(-1) | |
35 | 301 | , res_(NULL) { | |
36 | 301 | int params = 0; | |
37 | |||
38 |
4/6✓ Branch 2 taken 301 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 301 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 258 times.
✓ Branch 10 taken 43 times.
|
301 | if (options_mgr->GetValue("CVMFS_INFLUX_HOST", &influx_host_)) { |
39 |
1/2✓ Branch 1 taken 258 times.
✗ Branch 2 not taken.
|
258 | if (influx_host_.size() > 1) { |
40 | 258 | params++; | |
41 | } else { | ||
42 | ✗ | LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn, | |
43 | "No value given for CVMFS_INFLUX_HOST"); | ||
44 | } | ||
45 | } | ||
46 | |||
47 | 301 | std::string opt; | |
48 |
3/6✓ Branch 2 taken 301 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 301 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 301 times.
✗ Branch 10 not taken.
|
301 | if (options_mgr->GetValue("CVMFS_INFLUX_PORT", &opt)) { |
49 |
2/4✓ Branch 3 taken 301 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 301 times.
✗ Branch 7 not taken.
|
301 | influx_port_ = static_cast<int>(String2Int64(opt.c_str())); |
50 |
2/4✓ Branch 0 taken 301 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 301 times.
✗ Branch 3 not taken.
|
301 | if (influx_port_ > 0 && influx_port_ < 65536) { |
51 | 301 | params++; | |
52 | } else { | ||
53 | ✗ | LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn, | |
54 | "Invalid value for CVMFS_INFLUX_PORT [%s]", opt.c_str()); | ||
55 | } | ||
56 | } | ||
57 | |||
58 |
3/6✓ Branch 2 taken 301 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 301 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 301 times.
✗ Branch 10 not taken.
|
301 | if (options_mgr->GetValue("CVMFS_INFLUX_METRIC_NAME", &influx_metric_name_)) { |
59 | 301 | params++; | |
60 | } | ||
61 | |||
62 |
4/6✓ Branch 2 taken 301 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 301 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 215 times.
✓ Branch 10 taken 86 times.
|
301 | if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_TAGS", &influx_extra_tags_)) { |
63 |
1/2✓ Branch 1 taken 215 times.
✗ Branch 2 not taken.
|
215 | influx_extra_tags_ = ""; |
64 | } | ||
65 | |||
66 |
4/6✓ Branch 2 taken 301 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 301 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 215 times.
✓ Branch 10 taken 86 times.
|
301 | if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_FIELDS", |
67 | &influx_extra_fields_)) { | ||
68 |
1/2✓ Branch 1 taken 215 times.
✗ Branch 2 not taken.
|
215 | influx_extra_fields_ = ""; |
69 | } | ||
70 | |||
71 | // Read CVMFS_INFLUX_SEND_DELTA option, default to true (ON) | ||
72 | 301 | std::string send_delta_str; | |
73 |
4/6✓ Branch 2 taken 301 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 301 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 86 times.
✓ Branch 10 taken 215 times.
|
301 | if (options_mgr->GetValue("CVMFS_INFLUX_SEND_DELTA", &send_delta_str)) { |
74 |
1/2✓ Branch 1 taken 86 times.
✗ Branch 2 not taken.
|
86 | send_delta_ = !options_mgr->IsOff(send_delta_str); |
75 | } | ||
76 | |||
77 |
2/2✓ Branch 0 taken 258 times.
✓ Branch 1 taken 43 times.
|
301 | if (params == 3) { |
78 | 258 | is_zombie_ = false; | |
79 |
1/2✓ Branch 5 taken 258 times.
✗ Branch 6 not taken.
|
258 | LogCvmfs(kLogTelemetry, kLogDebug, |
80 | "Enabling influx metrics. " | ||
81 | "Send to [%s:%d] metric [%s]. Extra tags" | ||
82 | "[%s]. Extra fields [%s].", | ||
83 | influx_host_.c_str(), influx_port_, influx_metric_name_.c_str(), | ||
84 | influx_extra_tags_.c_str(), influx_extra_fields_.c_str()); | ||
85 |
1/2✓ Branch 1 taken 258 times.
✗ Branch 2 not taken.
|
258 | const TelemetryReturn ret = OpenSocket(); |
86 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 258 times.
|
258 | if (ret != kTelemetrySuccess) { |
87 | ✗ | is_zombie_ = true; | |
88 | ✗ | LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn, | |
89 | "Not enabling influx metrics. Error while open socket. %d", ret); | ||
90 | } | ||
91 | } else { | ||
92 | 43 | is_zombie_ = true; | |
93 |
1/2✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
|
43 | LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn, |
94 | "Not enabling influx metrics. Not all mandatory variables set: " | ||
95 | "CVMFS_INFLUX_METRIC_NAME, CVMFS_INFLUX_HOST, CVMFS_INFLUX_PORT"); | ||
96 | } | ||
97 | 301 | } | |
98 | |||
99 | 602 | TelemetryAggregatorInflux::~TelemetryAggregatorInflux() { | |
100 |
2/2✓ Branch 0 taken 258 times.
✓ Branch 1 taken 43 times.
|
602 | if (socket_fd_ >= 0) { |
101 | 516 | close(socket_fd_); | |
102 | } | ||
103 |
2/2✓ Branch 0 taken 258 times.
✓ Branch 1 taken 43 times.
|
602 | if (res_) { |
104 | 516 | freeaddrinfo(res_); | |
105 | } | ||
106 | } | ||
107 | |||
108 | /** | ||
109 | * Creates a string in the influx data format containing the absolute values | ||
110 | * of the counters. Counters are only included if their absolute value is > 0. | ||
111 | * | ||
112 | * Influx dataformat | ||
113 | * ( https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/ | ||
114 | ) | ||
115 | * Syntax | ||
116 | <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] | ||
117 | <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>] | ||
118 | * | ||
119 | * Example | ||
120 | myMeasurement,tag1=value1,tag2=value2 fieldKey="fieldValue" | ||
121 | 1556813561098000000 | ||
122 | */ | ||
123 | 344 | std::string TelemetryAggregatorInflux::MakePayload() { | |
124 | // measurement and tags | ||
125 |
1/2✓ Branch 1 taken 344 times.
✗ Branch 2 not taken.
|
344 | std::string metric_name = influx_metric_name_; |
126 |
2/2✓ Branch 0 taken 301 times.
✓ Branch 1 taken 43 times.
|
344 | if (send_delta_) { |
127 |
1/2✓ Branch 1 taken 301 times.
✗ Branch 2 not taken.
|
301 | metric_name += "_absolute"; |
128 | } | ||
129 |
2/4✓ Branch 1 taken 344 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 344 times.
✗ Branch 5 not taken.
|
344 | std::string ret = metric_name + ",repo=" + fqrn_; |
130 | |||
131 |
3/4✓ Branch 1 taken 344 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 172 times.
✓ Branch 4 taken 172 times.
|
344 | if (influx_extra_tags_ != "") { |
132 |
2/4✓ Branch 1 taken 172 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 172 times.
✗ Branch 5 not taken.
|
172 | ret += "," + influx_extra_tags_; |
133 | } | ||
134 | |||
135 | // fields | ||
136 |
1/2✓ Branch 1 taken 344 times.
✗ Branch 2 not taken.
|
344 | ret += " "; |
137 | 344 | bool add_token = false; | |
138 | 688 | for (std::map<std::string, int64_t>::iterator it = counters_.begin(), | |
139 | 344 | iEnd = counters_.end(); | |
140 |
2/2✓ Branch 1 taken 1032 times.
✓ Branch 2 taken 344 times.
|
1376 | it != iEnd; |
141 | 1032 | it++) { | |
142 |
2/2✓ Branch 1 taken 473 times.
✓ Branch 2 taken 559 times.
|
1032 | if (it->second != 0) { |
143 |
2/2✓ Branch 0 taken 258 times.
✓ Branch 1 taken 215 times.
|
473 | if (add_token) { |
144 |
1/2✓ Branch 1 taken 258 times.
✗ Branch 2 not taken.
|
258 | ret += ","; |
145 | } | ||
146 |
4/8✓ Branch 2 taken 473 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 473 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 473 times.
✗ Branch 10 not taken.
✓ Branch 12 taken 473 times.
✗ Branch 13 not taken.
|
473 | ret += it->first + "=" + StringifyInt(it->second); |
147 | 473 | add_token = true; | |
148 | } | ||
149 | } | ||
150 |
3/4✓ Branch 1 taken 344 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 172 times.
✓ Branch 4 taken 172 times.
|
344 | if (influx_extra_fields_ != "") { |
151 |
2/2✓ Branch 0 taken 86 times.
✓ Branch 1 taken 86 times.
|
172 | if (add_token) { |
152 |
1/2✓ Branch 1 taken 86 times.
✗ Branch 2 not taken.
|
86 | ret += ","; |
153 | } | ||
154 |
1/2✓ Branch 1 taken 172 times.
✗ Branch 2 not taken.
|
172 | ret += influx_extra_fields_; |
155 | 172 | add_token = true; | |
156 | } | ||
157 | |||
158 | // timestamp | ||
159 |
2/2✓ Branch 0 taken 301 times.
✓ Branch 1 taken 43 times.
|
344 | if (add_token) { |
160 |
1/2✓ Branch 1 taken 301 times.
✗ Branch 2 not taken.
|
301 | ret += " "; |
161 | } | ||
162 |
2/4✓ Branch 1 taken 344 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 344 times.
✗ Branch 5 not taken.
|
344 | ret += StringifyUint(timestamp_); |
163 | |||
164 | 688 | return ret; | |
165 | 344 | } | |
166 | |||
167 | /** | ||
168 | * Creates a string in the influx data format containing the delta between | ||
169 | * 2 measurements of the same counter. Counters are only included if their | ||
170 | * absolute value is > 0 (delta can be 0). | ||
171 | * | ||
172 | * NOTE: As influx_extra_fields_ are static, they are excluded of this | ||
173 | * delta format | ||
174 | * | ||
175 | * Influx dataformat | ||
176 | * ( https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/ | ||
177 | ) | ||
178 | * Syntax | ||
179 | <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] | ||
180 | <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>] | ||
181 | * | ||
182 | * Example | ||
183 | myMeasurement,tag1=value1,tag2=value2 fieldKey="fieldValue" | ||
184 | 1556813561098000000 | ||
185 | */ | ||
186 | 86 | std::string TelemetryAggregatorInflux::MakeDeltaPayload() { | |
187 | // measurement and tags | ||
188 |
2/4✓ Branch 2 taken 86 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 86 times.
✗ Branch 6 not taken.
|
172 | std::string ret = "" + influx_metric_name_ + "_delta,repo=" + fqrn_; |
189 | |||
190 |
2/4✓ Branch 1 taken 86 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 86 times.
✗ Branch 4 not taken.
|
86 | if (influx_extra_tags_ != "") { |
191 |
2/4✓ Branch 1 taken 86 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 86 times.
✗ Branch 5 not taken.
|
86 | ret += "," + influx_extra_tags_; |
192 | } | ||
193 | |||
194 | // fields | ||
195 |
1/2✓ Branch 1 taken 86 times.
✗ Branch 2 not taken.
|
86 | ret += " "; |
196 | 86 | bool add_token = false; | |
197 | 172 | for (std::map<std::string, int64_t>::iterator it = counters_.begin(), | |
198 | 86 | iEnd = counters_.end(); | |
199 |
2/2✓ Branch 1 taken 258 times.
✓ Branch 2 taken 86 times.
|
344 | it != iEnd; |
200 | 258 | it++) { | |
201 | 258 | const int64_t value = it->second; | |
202 |
2/2✓ Branch 0 taken 215 times.
✓ Branch 1 taken 43 times.
|
258 | if (value != 0) { |
203 | int64_t old_value; | ||
204 | try { | ||
205 |
1/2✓ Branch 2 taken 215 times.
✗ Branch 3 not taken.
|
215 | old_value = old_counters_.at(it->first); |
206 | ✗ | } catch (const std::out_of_range &ex) { | |
207 | ✗ | old_value = 0; | |
208 | } | ||
209 |
2/2✓ Branch 0 taken 129 times.
✓ Branch 1 taken 86 times.
|
215 | if (add_token) { |
210 |
1/2✓ Branch 1 taken 129 times.
✗ Branch 2 not taken.
|
129 | ret += ","; |
211 | } | ||
212 |
4/8✓ Branch 1 taken 215 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 215 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 215 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 215 times.
✗ Branch 12 not taken.
|
215 | ret += it->first + "=" + StringifyInt(value - old_value); |
213 | 215 | add_token = true; | |
214 | } | ||
215 | } | ||
216 | |||
217 | // timestamp | ||
218 |
1/2✓ Branch 0 taken 86 times.
✗ Branch 1 not taken.
|
86 | if (add_token) { |
219 |
1/2✓ Branch 1 taken 86 times.
✗ Branch 2 not taken.
|
86 | ret += " "; |
220 | } | ||
221 |
2/4✓ Branch 1 taken 86 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 86 times.
✗ Branch 5 not taken.
|
86 | ret += StringifyUint(timestamp_); |
222 | |||
223 | 86 | return ret; | |
224 | } | ||
225 | |||
226 | 258 | TelemetryReturn TelemetryAggregatorInflux::OpenSocket() { | |
227 | 258 | const char *hostname = influx_host_.c_str(); | |
228 | struct addrinfo hints; | ||
229 | int err; | ||
230 | |||
231 | 258 | memset(&hints, 0, sizeof(hints)); | |
232 | 258 | hints.ai_family = AF_INET; | |
233 | 258 | hints.ai_socktype = SOCK_DGRAM; | |
234 | |||
235 |
1/2✓ Branch 2 taken 258 times.
✗ Branch 3 not taken.
|
258 | err = getaddrinfo(influx_host_.c_str(), NULL, &hints, &res_); |
236 |
2/4✓ Branch 0 taken 258 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 258 times.
|
258 | if (err != 0 || res_ == NULL) { |
237 | ✗ | LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr, | |
238 | ✗ | "Failed to resolve influx server [%s]. errno=%d", hostname, errno); | |
239 | ✗ | return kTelemetryFailHostAddress; | |
240 | } | ||
241 | |||
242 | 258 | socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0); | |
243 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 258 times.
|
258 | if (socket_fd_ < 0) { |
244 | ✗ | LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr, "Failed to open socket"); | |
245 | ✗ | freeaddrinfo(res_); | |
246 | ✗ | return kTelemetryFailSocket; | |
247 | } | ||
248 | |||
249 | 258 | return kTelemetrySuccess; | |
250 | } | ||
251 | |||
252 | ✗ | TelemetryReturn TelemetryAggregatorInflux::SendToInflux( | |
253 | const std::string &payload) { | ||
254 | ✗ | struct sockaddr_in *dest_addr = NULL; | |
255 | ✗ | dest_addr = reinterpret_cast<sockaddr_in *>(res_->ai_addr); | |
256 | ✗ | dest_addr->sin_port = htons(influx_port_); | |
257 | |||
258 | ✗ | const ssize_t num_bytes_sent = sendto( | |
259 | ✗ | socket_fd_, payload.data(), payload.size(), 0, | |
260 | reinterpret_cast<struct sockaddr *>(dest_addr), | ||
261 | sizeof(struct sockaddr_in)); | ||
262 | |||
263 | ✗ | if (num_bytes_sent < 0) { | |
264 | ✗ | LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr, | |
265 | ✗ | "Failed to send to influx. errno=%d", errno); | |
266 | ✗ | return kTelemetryFailSend; | |
267 | ✗ | } else if (static_cast<size_t>(num_bytes_sent) != payload.size()) { | |
268 | ✗ | LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr, | |
269 | "Incomplete send. Bytes transferred: %zd. Bytes expected %lu", | ||
270 | num_bytes_sent, payload.size()); | ||
271 | ✗ | return kTelemetryFailSend; | |
272 | } | ||
273 | |||
274 | ✗ | LogCvmfs(kLogTelemetry, kLogDebug, "INFLUX: POSTING [%s]", payload.c_str()); | |
275 | ✗ | return kTelemetrySuccess; | |
276 | } | ||
277 | |||
278 | |||
279 | ✗ | void TelemetryAggregatorInflux::PushMetrics() { | |
280 | // create payload | ||
281 | ✗ | std::string payload = MakePayload(); | |
282 | ✗ | std::string delta_payload = ""; | |
283 | ✗ | if (send_delta_ && old_counters_.size() > 0) { | |
284 | ✗ | delta_payload = MakeDeltaPayload(); | |
285 | ✗ | payload = payload + "\n" + delta_payload; | |
286 | } | ||
287 | ✗ | payload += "\n"; | |
288 | |||
289 | // send to influx | ||
290 | ✗ | SendToInflux(payload); | |
291 | |||
292 | // current counter is now old counter (only if delta sending is enabled) | ||
293 | ✗ | if (send_delta_) { | |
294 | ✗ | counters_.swap(old_counters_); | |
295 | } | ||
296 | } | ||
297 | |||
298 | } // namespace perf | ||
299 |