GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/telemetry_aggregator_influx.cc
Date: 2024-04-21 02:33:16
Exec Total Coverage
Lines: 83 117 70.9%
Branches: 94 188 50.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "telemetry_aggregator_influx.h"
6
7 #include <netinet/in.h>
8 #include <pthread.h>
9 #include <stdint.h>
10 #include <sys/socket.h>
11 #include <sys/types.h>
12
13 #include <cstring>
14 #include <map>
15 #include <string>
16 #include <vector>
17
18 #include "util/logging.h"
19 #include "util/posix.h"
20 #include "util/string.h"
21
22 namespace perf {
23
24 4 TelemetryAggregatorInflux::TelemetryAggregatorInflux(Statistics* statistics,
25 int send_rate_sec,
26 OptionsManager *options_mgr,
27 4 const std::string &fqrn) :
28 TelemetryAggregator(statistics, send_rate_sec, fqrn),
29
2/4
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 4 times.
✗ Branch 8 not taken.
4 influx_extra_fields_(""), influx_extra_tags_(""),
30 4 socket_fd_(-1), res_(NULL) {
31 4 int params = 0;
32
33
4/6
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 3 times.
✓ Branch 10 taken 1 times.
4 if (options_mgr->GetValue("CVMFS_INFLUX_HOST", &influx_host_)) {
34
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 if (influx_host_.size() > 1) {
35 3 params++;
36 } else {
37 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
38 "No value given for CVMFS_INFLUX_HOST");
39 }
40 }
41
42 4 std::string opt;
43
3/6
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 4 times.
✗ Branch 10 not taken.
4 if (options_mgr->GetValue("CVMFS_INFLUX_PORT", &opt)) {
44
2/4
✓ Branch 3 taken 4 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 4 times.
✗ Branch 7 not taken.
4 influx_port_ = static_cast<int>(String2Int64(opt.c_str()));
45
2/4
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
4 if (influx_port_ > 0 && influx_port_ < 65536) {
46 4 params++;
47 } else {
48 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
49 "Invalid value for CVMFS_INFLUX_PORT [%s]", opt.c_str());
50 }
51 }
52
53
3/6
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 4 times.
✗ Branch 10 not taken.
4 if (options_mgr->GetValue("CVMFS_INFLUX_METRIC_NAME", &influx_metric_name_))
54 {
55 4 params++;
56 }
57
58
4/6
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 2 times.
✓ Branch 10 taken 2 times.
4 if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_TAGS", &influx_extra_tags_)) {
59
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 influx_extra_tags_ = "";
60 }
61
62
4/6
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 2 times.
✓ Branch 10 taken 2 times.
4 if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_FIELDS",
63 &influx_extra_fields_)) {
64
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 influx_extra_fields_ = "";
65 }
66
67
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 times.
4 if (params == 3) {
68 3 is_zombie_ = false;
69
1/2
✓ Branch 5 taken 3 times.
✗ Branch 6 not taken.
3 LogCvmfs(kLogTelemetry, kLogDebug, "Enabling influx metrics. "
70 "Send to [%s:%d] metric [%s]. Extra tags"
71 "[%s]. Extra fields [%s].",
72 influx_host_.c_str(),
73 influx_port_,
74 influx_metric_name_.c_str(),
75 influx_extra_tags_.c_str(),
76 influx_extra_fields_.c_str());
77
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 TelemetryReturn ret = OpenSocket();
78
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (ret != kTelemetrySuccess) {
79 is_zombie_ = true;
80 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
81 "Not enabling influx metrics. Error while open socket. %d", ret);
82 }
83 } else {
84 1 is_zombie_ = true;
85
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn,
86 "Not enabling influx metrics. Not all mandatory variables set: "
87 "CVMFS_INFLUX_METRIC_NAME, CVMFS_INFLUX_HOST, CVMFS_INFLUX_PORT");
88 }
89 4 }
90
91 8 TelemetryAggregatorInflux::~TelemetryAggregatorInflux() {
92
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 times.
8 if (socket_fd_ >= 0) {
93 6 close(socket_fd_);
94 }
95
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 times.
8 if (res_) {
96 6 freeaddrinfo(res_);
97 }
98 }
99
100 /**
101 * Creates a string in the influx data format containing the absolute values
102 * of the counters. Counters are only included if their absolute value is > 0.
103 *
104 * Influx dataformat
105 * ( https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/ )
106 * Syntax
107 <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
108 *
109 * Example
110 myMeasurement,tag1=value1,tag2=value2 fieldKey="fieldValue" 1556813561098000000
111 */
112 5 std::string TelemetryAggregatorInflux::MakePayload() {
113 // measurement and tags
114
1/2
✓ Branch 2 taken 5 times.
✗ Branch 3 not taken.
5 std::string ret = influx_metric_name_ + "_absolute,repo=" + fqrn_;
115
116
3/4
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 4 times.
✓ Branch 4 taken 1 times.
5 if (influx_extra_tags_ != "") {
117
2/4
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 4 times.
✗ Branch 5 not taken.
4 ret += "," + influx_extra_tags_;
118 }
119
120 // fields
121
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
5 ret += " ";
122 5 bool add_token = false;
123 5 for (std::map<std::string, int64_t>::iterator it
124
2/2
✓ Branch 4 taken 15 times.
✓ Branch 5 taken 5 times.
25 = counters_.begin(), iEnd = counters_.end(); it != iEnd; it++) {
125
2/2
✓ Branch 1 taken 5 times.
✓ Branch 2 taken 10 times.
15 if (it->second != 0) {
126
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 2 times.
5 if (add_token) {
127
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 ret += ",";
128 }
129
4/8
✓ Branch 2 taken 5 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 5 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 5 times.
✗ Branch 10 not taken.
✓ Branch 12 taken 5 times.
✗ Branch 13 not taken.
5 ret += it->first + "=" + StringifyInt(it->second);
130 5 add_token = true;
131 }
132 }
133
3/4
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 4 times.
✓ Branch 4 taken 1 times.
5 if (influx_extra_fields_ != "") {
134
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2 times.
4 if (add_token) {
135
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 ret += ",";
136 }
137
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 ret += influx_extra_fields_;
138 4 add_token = true;
139 }
140
141 // timestamp
142
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 1 times.
5 if (add_token) {
143
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 ret += " ";
144 }
145
2/4
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 5 times.
✗ Branch 5 not taken.
5 ret += StringifyUint(timestamp_);
146
147 5 return ret;
148 }
149
150 /**
151 * Creates a string in the influx data format containing the delta between
152 * 2 measurements of the same counter. Counters are only included if their
153 * absolute value is > 0 (delta can be 0).
154 *
155 * NOTE: As influx_extra_fields_ are static, they are excluded of this
156 * delta format
157 *
158 * Influx dataformat
159 * ( https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/ )
160 * Syntax
161 <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
162 *
163 * Example
164 myMeasurement,tag1=value1,tag2=value2 fieldKey="fieldValue" 1556813561098000000
165 */
166 2 std::string TelemetryAggregatorInflux::MakeDeltaPayload() {
167 // measurement and tags
168
2/4
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2 times.
✗ Branch 6 not taken.
4 std::string ret = "" + influx_metric_name_ + "_delta,repo=" + fqrn_;
169
170
2/4
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
2 if (influx_extra_tags_ != "") {
171
2/4
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
2 ret += "," + influx_extra_tags_;
172 }
173
174 // fields
175
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 ret += " ";
176 2 bool add_token = false;
177 2 for (std::map<std::string, int64_t>::iterator it
178
2/2
✓ Branch 4 taken 6 times.
✓ Branch 5 taken 2 times.
10 = counters_.begin(), iEnd = counters_.end(); it != iEnd; it++) {
179 6 int64_t value = it->second;
180
1/2
✓ Branch 2 taken 6 times.
✗ Branch 3 not taken.
6 int64_t old_value = old_counters_.at(it->first);
181
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 1 times.
6 if (value != 0) {
182
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 2 times.
5 if (add_token) {
183
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
3 ret += ",";
184 }
185
4/8
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 5 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 5 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 5 times.
✗ Branch 12 not taken.
5 ret += it->first + "=" + StringifyInt(value - old_value);
186 5 add_token = true;
187 }
188 }
189
190 // timestamp
191
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if (add_token) {
192
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 ret += " ";
193 }
194
2/4
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
2 ret += StringifyUint(timestamp_);
195
196 2 return ret;
197 }
198
199 3 TelemetryReturn TelemetryAggregatorInflux::OpenSocket() {
200 3 const char *hostname = influx_host_.c_str();
201 struct addrinfo hints;
202 int err;
203
204 3 memset(&hints, 0, sizeof(hints));
205 3 hints.ai_family = AF_INET;
206 3 hints.ai_socktype = SOCK_DGRAM;
207
208
1/2
✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
3 err = getaddrinfo(influx_host_.c_str(), NULL, &hints, &res_);
209
2/4
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 3 times.
3 if (err != 0 || res_ == NULL) {
210 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
211 "Failed to resolve influx server [%s]. errno=%d",
212 hostname, errno);
213 return kTelemetryFailHostAddress;
214 }
215
216 3 socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
217
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (socket_fd_ < 0) {
218 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
219 "Failed to open socket");
220 freeaddrinfo(res_);
221 return kTelemetryFailSocket;
222 }
223
224 3 return kTelemetrySuccess;
225 }
226
227 TelemetryReturn TelemetryAggregatorInflux::SendToInflux(
228 const std::string &payload) {
229 struct sockaddr_in *dest_addr = NULL;
230 dest_addr = reinterpret_cast<sockaddr_in*>(res_->ai_addr);
231 dest_addr->sin_port = htons(influx_port_);
232
233 ssize_t num_bytes_sent = sendto(socket_fd_,
234 payload.data(),
235 payload.size(),
236 0,
237 reinterpret_cast<struct sockaddr*>(dest_addr),
238 sizeof(struct sockaddr_in));
239
240 if (num_bytes_sent < 0) {
241 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
242 "Failed to send to influx. errno=%d", errno);
243 return kTelemetryFailSend;
244 } else if (static_cast<size_t>(num_bytes_sent) != payload.size()) {
245 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
246 "Incomplete send. Bytes transferred: %zd. Bytes expected %lu",
247 num_bytes_sent, payload.size());
248 return kTelemetryFailSend;
249 }
250
251 LogCvmfs(kLogTelemetry, kLogDebug, "INFLUX: POSTING [%s]", payload.c_str());
252 return kTelemetrySuccess;
253 }
254
255
256 void TelemetryAggregatorInflux::PushMetrics() {
257 // create payload
258 std::string payload = MakePayload();
259 std::string delta_payload = "";
260 if (old_counters_.size() > 0) {
261 delta_payload = MakeDeltaPayload();
262 payload = payload + "\n" + delta_payload;
263 }
264 payload += "\n";
265
266 // send to influx
267 SendToInflux(payload);
268
269 // current counter is now old counter
270 counters_.swap(old_counters_);
271 }
272
273 } // namespace perf
274