GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/telemetry_aggregator_influx.cc
Date: 2025-02-09 02:34:19
Exec Total Coverage
Lines: 83 119 69.7%
Branches: 94 190 49.5%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "telemetry_aggregator_influx.h"
6
7 #include <netinet/in.h>
8 #include <pthread.h>
9 #include <stdint.h>
10 #include <sys/socket.h>
11 #include <sys/types.h>
12
13 #include <cstring>
14 #include <map>
15 #include <string>
16 #include <vector>
17
18 #include "util/logging.h"
19 #include "util/posix.h"
20 #include "util/string.h"
21
22 namespace perf {
23
24 4 TelemetryAggregatorInflux::TelemetryAggregatorInflux(Statistics* statistics,
25 int send_rate_sec,
26 OptionsManager *options_mgr,
27 MountPoint* mount_point,
28 4 const std::string &fqrn) :
29 TelemetryAggregator(statistics, send_rate_sec,
30 mount_point, fqrn),
31
2/4
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 4 times.
✗ Branch 8 not taken.
4 influx_extra_fields_(""), influx_extra_tags_(""),
32 4 socket_fd_(-1), res_(NULL) {
33 4 int params = 0;
34
35
4/6
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 3 times.
✓ Branch 10 taken 1 times.
4 if (options_mgr->GetValue("CVMFS_INFLUX_HOST", &influx_host_)) {
36
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 if (influx_host_.size() > 1) {
37 3 params++;
38 } else {
39 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
40 "No value given for CVMFS_INFLUX_HOST");
41 }
42 }
43
44 4 std::string opt;
45
3/6
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 4 times.
✗ Branch 10 not taken.
4 if (options_mgr->GetValue("CVMFS_INFLUX_PORT", &opt)) {
46
2/4
✓ Branch 3 taken 4 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 4 times.
✗ Branch 7 not taken.
4 influx_port_ = static_cast<int>(String2Int64(opt.c_str()));
47
2/4
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
4 if (influx_port_ > 0 && influx_port_ < 65536) {
48 4 params++;
49 } else {
50 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
51 "Invalid value for CVMFS_INFLUX_PORT [%s]", opt.c_str());
52 }
53 }
54
55
3/6
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 4 times.
✗ Branch 10 not taken.
4 if (options_mgr->GetValue("CVMFS_INFLUX_METRIC_NAME", &influx_metric_name_))
56 {
57 4 params++;
58 }
59
60
4/6
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 2 times.
✓ Branch 10 taken 2 times.
4 if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_TAGS", &influx_extra_tags_)) {
61
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 influx_extra_tags_ = "";
62 }
63
64
4/6
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 2 times.
✓ Branch 10 taken 2 times.
4 if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_FIELDS",
65 &influx_extra_fields_)) {
66
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 influx_extra_fields_ = "";
67 }
68
69
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 times.
4 if (params == 3) {
70 3 is_zombie_ = false;
71
1/2
✓ Branch 5 taken 3 times.
✗ Branch 6 not taken.
3 LogCvmfs(kLogTelemetry, kLogDebug, "Enabling influx metrics. "
72 "Send to [%s:%d] metric [%s]. Extra tags"
73 "[%s]. Extra fields [%s].",
74 influx_host_.c_str(),
75 influx_port_,
76 influx_metric_name_.c_str(),
77 influx_extra_tags_.c_str(),
78 influx_extra_fields_.c_str());
79
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 TelemetryReturn ret = OpenSocket();
80
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (ret != kTelemetrySuccess) {
81 is_zombie_ = true;
82 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
83 "Not enabling influx metrics. Error while open socket. %d", ret);
84 }
85 } else {
86 1 is_zombie_ = true;
87
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
88 "Not enabling influx metrics. Not all mandatory variables set: "
89 "CVMFS_INFLUX_METRIC_NAME, CVMFS_INFLUX_HOST, CVMFS_INFLUX_PORT");
90 }
91 4 }
92
93 8 TelemetryAggregatorInflux::~TelemetryAggregatorInflux() {
94
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 times.
8 if (socket_fd_ >= 0) {
95 6 close(socket_fd_);
96 }
97
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 times.
8 if (res_) {
98 6 freeaddrinfo(res_);
99 }
100 }
101
102 /**
103 * Creates a string in the influx data format containing the absolute values
104 * of the counters. Counters are only included if their absolute value is > 0.
105 *
106 * Influx dataformat
107 * ( https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/ )
108 * Syntax
109 <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
110 *
111 * Example
112 myMeasurement,tag1=value1,tag2=value2 fieldKey="fieldValue" 1556813561098000000
113 */
114 5 std::string TelemetryAggregatorInflux::MakePayload() {
115 // measurement and tags
116
1/2
✓ Branch 2 taken 5 times.
✗ Branch 3 not taken.
5 std::string ret = influx_metric_name_ + "_absolute,repo=" + fqrn_;
117
118
3/4
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 4 times.
✓ Branch 4 taken 1 times.
5 if (influx_extra_tags_ != "") {
119
2/4
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 4 times.
✗ Branch 5 not taken.
4 ret += "," + influx_extra_tags_;
120 }
121
122 // fields
123
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
5 ret += " ";
124 5 bool add_token = false;
125 5 for (std::map<std::string, int64_t>::iterator it
126
2/2
✓ Branch 4 taken 15 times.
✓ Branch 5 taken 5 times.
25 = counters_.begin(), iEnd = counters_.end(); it != iEnd; it++) {
127
2/2
✓ Branch 1 taken 5 times.
✓ Branch 2 taken 10 times.
15 if (it->second != 0) {
128
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 2 times.
5 if (add_token) {
129
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 ret += ",";
130 }
131
4/8
✓ Branch 2 taken 5 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 5 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 5 times.
✗ Branch 10 not taken.
✓ Branch 12 taken 5 times.
✗ Branch 13 not taken.
5 ret += it->first + "=" + StringifyInt(it->second);
132 5 add_token = true;
133 }
134 }
135
3/4
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 4 times.
✓ Branch 4 taken 1 times.
5 if (influx_extra_fields_ != "") {
136
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2 times.
4 if (add_token) {
137
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 ret += ",";
138 }
139
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 ret += influx_extra_fields_;
140 4 add_token = true;
141 }
142
143 // timestamp
144
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 1 times.
5 if (add_token) {
145
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 ret += " ";
146 }
147
2/4
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 5 times.
✗ Branch 5 not taken.
5 ret += StringifyUint(timestamp_);
148
149 5 return ret;
150 }
151
152 /**
153 * Creates a string in the influx data format containing the delta between
154 * 2 measurements of the same counter. Counters are only included if their
155 * absolute value is > 0 (delta can be 0).
156 *
157 * NOTE: As influx_extra_fields_ are static, they are excluded of this
158 * delta format
159 *
160 * Influx dataformat
161 * ( https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/ )
162 * Syntax
163 <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
164 *
165 * Example
166 myMeasurement,tag1=value1,tag2=value2 fieldKey="fieldValue" 1556813561098000000
167 */
168 2 std::string TelemetryAggregatorInflux::MakeDeltaPayload() {
169 // measurement and tags
170
2/4
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2 times.
✗ Branch 6 not taken.
4 std::string ret = "" + influx_metric_name_ + "_delta,repo=" + fqrn_;
171
172
2/4
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
2 if (influx_extra_tags_ != "") {
173
2/4
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
2 ret += "," + influx_extra_tags_;
174 }
175
176 // fields
177
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 ret += " ";
178 2 bool add_token = false;
179 2 for (std::map<std::string, int64_t>::iterator it
180
2/2
✓ Branch 4 taken 6 times.
✓ Branch 5 taken 2 times.
10 = counters_.begin(), iEnd = counters_.end(); it != iEnd; it++) {
181 6 int64_t value = it->second;
182
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 1 times.
6 if (value != 0) {
183 int64_t old_value;
184 try {
185
1/2
✓ Branch 2 taken 5 times.
✗ Branch 3 not taken.
5 old_value = old_counters_.at(it->first);
186 } catch(const std::out_of_range& ex) {
187 old_value = 0;
188 }
189
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 2 times.
5 if (add_token) {
190
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 ret += ",";
191 }
192
4/8
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 5 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 5 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 5 times.
✗ Branch 12 not taken.
5 ret += it->first + "=" + StringifyInt(value - old_value);
193 5 add_token = true;
194 }
195 }
196
197 // timestamp
198
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if (add_token) {
199
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 ret += " ";
200 }
201
2/4
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
2 ret += StringifyUint(timestamp_);
202
203 2 return ret;
204 }
205
206 3 TelemetryReturn TelemetryAggregatorInflux::OpenSocket() {
207 3 const char *hostname = influx_host_.c_str();
208 struct addrinfo hints;
209 int err;
210
211 3 memset(&hints, 0, sizeof(hints));
212 3 hints.ai_family = AF_INET;
213 3 hints.ai_socktype = SOCK_DGRAM;
214
215
1/2
✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
3 err = getaddrinfo(influx_host_.c_str(), NULL, &hints, &res_);
216
2/4
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 3 times.
3 if (err != 0 || res_ == NULL) {
217 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
218 "Failed to resolve influx server [%s]. errno=%d",
219 hostname, errno);
220 return kTelemetryFailHostAddress;
221 }
222
223 3 socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
224
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (socket_fd_ < 0) {
225 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
226 "Failed to open socket");
227 freeaddrinfo(res_);
228 return kTelemetryFailSocket;
229 }
230
231 3 return kTelemetrySuccess;
232 }
233
234 TelemetryReturn TelemetryAggregatorInflux::SendToInflux(
235 const std::string &payload) {
236 struct sockaddr_in *dest_addr = NULL;
237 dest_addr = reinterpret_cast<sockaddr_in*>(res_->ai_addr);
238 dest_addr->sin_port = htons(influx_port_);
239
240 ssize_t num_bytes_sent = sendto(socket_fd_,
241 payload.data(),
242 payload.size(),
243 0,
244 reinterpret_cast<struct sockaddr*>(dest_addr),
245 sizeof(struct sockaddr_in));
246
247 if (num_bytes_sent < 0) {
248 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
249 "Failed to send to influx. errno=%d", errno);
250 return kTelemetryFailSend;
251 } else if (static_cast<size_t>(num_bytes_sent) != payload.size()) {
252 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
253 "Incomplete send. Bytes transferred: %zd. Bytes expected %lu",
254 num_bytes_sent, payload.size());
255 return kTelemetryFailSend;
256 }
257
258 LogCvmfs(kLogTelemetry, kLogDebug, "INFLUX: POSTING [%s]", payload.c_str());
259 return kTelemetrySuccess;
260 }
261
262
263 void TelemetryAggregatorInflux::PushMetrics() {
264 // create payload
265 std::string payload = MakePayload();
266 std::string delta_payload = "";
267 if (old_counters_.size() > 0) {
268 delta_payload = MakeDeltaPayload();
269 payload = payload + "\n" + delta_payload;
270 }
271 payload += "\n";
272
273 // send to influx
274 SendToInflux(payload);
275
276 // current counter is now old counter
277 counters_.swap(old_counters_);
278 }
279
280 } // namespace perf
281