Directory: | cvmfs/ |
---|---|
File: | cvmfs/telemetry_aggregator_influx.cc |
Date: | 2024-04-21 02:33:16 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 83 | 117 | 70.9% |
Branches: | 94 | 188 | 50.0% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #include "telemetry_aggregator_influx.h" | ||
6 | |||
7 | #include <netinet/in.h> | ||
8 | #include <pthread.h> | ||
9 | #include <stdint.h> | ||
10 | #include <sys/socket.h> | ||
11 | #include <sys/types.h> | ||
12 | |||
13 | #include <cstring> | ||
14 | #include <map> | ||
15 | #include <string> | ||
16 | #include <vector> | ||
17 | |||
18 | #include "util/logging.h" | ||
19 | #include "util/posix.h" | ||
20 | #include "util/string.h" | ||
21 | |||
22 | namespace perf { | ||
23 | |||
24 | 4 | TelemetryAggregatorInflux::TelemetryAggregatorInflux(Statistics* statistics, | |
25 | int send_rate_sec, | ||
26 | OptionsManager *options_mgr, | ||
27 | 4 | const std::string &fqrn) : | |
28 | TelemetryAggregator(statistics, send_rate_sec, fqrn), | ||
29 |
2/4✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 4 times.
✗ Branch 8 not taken.
|
4 | influx_extra_fields_(""), influx_extra_tags_(""), |
30 | 4 | socket_fd_(-1), res_(NULL) { | |
31 | 4 | int params = 0; | |
32 | |||
33 |
4/6✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 3 times.
✓ Branch 10 taken 1 times.
|
4 | if (options_mgr->GetValue("CVMFS_INFLUX_HOST", &influx_host_)) { |
34 |
1/2✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
|
3 | if (influx_host_.size() > 1) { |
35 | 3 | params++; | |
36 | } else { | ||
37 | ✗ | LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn, | |
38 | "No value given for CVMFS_INFLUX_HOST"); | ||
39 | } | ||
40 | } | ||
41 | |||
42 | 4 | std::string opt; | |
43 |
3/6✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 4 times.
✗ Branch 10 not taken.
|
4 | if (options_mgr->GetValue("CVMFS_INFLUX_PORT", &opt)) { |
44 |
2/4✓ Branch 3 taken 4 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 4 times.
✗ Branch 7 not taken.
|
4 | influx_port_ = static_cast<int>(String2Int64(opt.c_str())); |
45 |
2/4✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
|
4 | if (influx_port_ > 0 && influx_port_ < 65536) { |
46 | 4 | params++; | |
47 | } else { | ||
48 | ✗ | LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn, | |
49 | "Invalid value for CVMFS_INFLUX_PORT [%s]", opt.c_str()); | ||
50 | } | ||
51 | } | ||
52 | |||
53 |
3/6✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 4 times.
✗ Branch 10 not taken.
|
4 | if (options_mgr->GetValue("CVMFS_INFLUX_METRIC_NAME", &influx_metric_name_)) |
54 | { | ||
55 | 4 | params++; | |
56 | } | ||
57 | |||
58 |
4/6✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 2 times.
✓ Branch 10 taken 2 times.
|
4 | if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_TAGS", &influx_extra_tags_)) { |
59 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | influx_extra_tags_ = ""; |
60 | } | ||
61 | |||
62 |
4/6✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 2 times.
✓ Branch 10 taken 2 times.
|
4 | if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_FIELDS", |
63 | &influx_extra_fields_)) { | ||
64 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | influx_extra_fields_ = ""; |
65 | } | ||
66 | |||
67 |
2/2✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 times.
|
4 | if (params == 3) { |
68 | 3 | is_zombie_ = false; | |
69 |
1/2✓ Branch 5 taken 3 times.
✗ Branch 6 not taken.
|
3 | LogCvmfs(kLogTelemetry, kLogDebug, "Enabling influx metrics. " |
70 | "Send to [%s:%d] metric [%s]. Extra tags" | ||
71 | "[%s]. Extra fields [%s].", | ||
72 | influx_host_.c_str(), | ||
73 | influx_port_, | ||
74 | influx_metric_name_.c_str(), | ||
75 | influx_extra_tags_.c_str(), | ||
76 | influx_extra_fields_.c_str()); | ||
77 |
1/2✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
|
3 | TelemetryReturn ret = OpenSocket(); |
78 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | if (ret != kTelemetrySuccess) { |
79 | ✗ | is_zombie_ = true; | |
80 | ✗ | LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn, | |
81 | "Not enabling influx metrics. Error while open socket. %d", ret); | ||
82 | } | ||
83 | } else { | ||
84 | 1 | is_zombie_ = true; | |
85 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn, |
86 | "Not enabling influx metrics. Not all mandatory variables set: " | ||
87 | "CVMFS_INFLUX_METRIC_NAME, CVMFS_INFLUX_HOST, CVMFS_INFLUX_PORT"); | ||
88 | } | ||
89 | 4 | } | |
90 | |||
91 | 8 | TelemetryAggregatorInflux::~TelemetryAggregatorInflux() { | |
92 |
2/2✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 times.
|
8 | if (socket_fd_ >= 0) { |
93 | 6 | close(socket_fd_); | |
94 | } | ||
95 |
2/2✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 times.
|
8 | if (res_) { |
96 | 6 | freeaddrinfo(res_); | |
97 | } | ||
98 | } | ||
99 | |||
100 | /** | ||
101 | * Creates a string in the influx data format containing the absolute values | ||
102 | * of the counters. Counters are only included if their absolute value is > 0. | ||
103 | * | ||
104 | * Influx dataformat | ||
105 | * ( https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/ ) | ||
106 | * Syntax | ||
107 | <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>] | ||
108 | * | ||
109 | * Example | ||
110 | myMeasurement,tag1=value1,tag2=value2 fieldKey="fieldValue" 1556813561098000000 | ||
111 | */ | ||
112 | 5 | std::string TelemetryAggregatorInflux::MakePayload() { | |
113 | // measurement and tags | ||
114 |
1/2✓ Branch 2 taken 5 times.
✗ Branch 3 not taken.
|
5 | std::string ret = influx_metric_name_ + "_absolute,repo=" + fqrn_; |
115 | |||
116 |
3/4✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 4 times.
✓ Branch 4 taken 1 times.
|
5 | if (influx_extra_tags_ != "") { |
117 |
2/4✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 4 times.
✗ Branch 5 not taken.
|
4 | ret += "," + influx_extra_tags_; |
118 | } | ||
119 | |||
120 | // fields | ||
121 |
1/2✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
|
5 | ret += " "; |
122 | 5 | bool add_token = false; | |
123 | 5 | for (std::map<std::string, int64_t>::iterator it | |
124 |
2/2✓ Branch 4 taken 15 times.
✓ Branch 5 taken 5 times.
|
25 | = counters_.begin(), iEnd = counters_.end(); it != iEnd; it++) { |
125 |
2/2✓ Branch 1 taken 5 times.
✓ Branch 2 taken 10 times.
|
15 | if (it->second != 0) { |
126 |
2/2✓ Branch 0 taken 3 times.
✓ Branch 1 taken 2 times.
|
5 | if (add_token) { |
127 |
1/2✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
|
3 | ret += ","; |
128 | } | ||
129 |
4/8✓ Branch 2 taken 5 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 5 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 5 times.
✗ Branch 10 not taken.
✓ Branch 12 taken 5 times.
✗ Branch 13 not taken.
|
5 | ret += it->first + "=" + StringifyInt(it->second); |
130 | 5 | add_token = true; | |
131 | } | ||
132 | } | ||
133 |
3/4✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 4 times.
✓ Branch 4 taken 1 times.
|
5 | if (influx_extra_fields_ != "") { |
134 |
2/2✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2 times.
|
4 | if (add_token) { |
135 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | ret += ","; |
136 | } | ||
137 |
1/2✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
|
4 | ret += influx_extra_fields_; |
138 | 4 | add_token = true; | |
139 | } | ||
140 | |||
141 | // timestamp | ||
142 |
2/2✓ Branch 0 taken 4 times.
✓ Branch 1 taken 1 times.
|
5 | if (add_token) { |
143 |
1/2✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
|
4 | ret += " "; |
144 | } | ||
145 |
2/4✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 5 times.
✗ Branch 5 not taken.
|
5 | ret += StringifyUint(timestamp_); |
146 | |||
147 | 5 | return ret; | |
148 | } | ||
149 | |||
150 | /** | ||
151 | * Creates a string in the influx data format containing the delta between | ||
152 | * 2 measurements of the same counter. Counters are only included if their | ||
153 | * absolute value is > 0 (delta can be 0). | ||
154 | * | ||
155 | * NOTE: As influx_extra_fields_ are static, they are excluded of this | ||
156 | * delta format | ||
157 | * | ||
158 | * Influx dataformat | ||
159 | * ( https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/ ) | ||
160 | * Syntax | ||
161 | <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>] | ||
162 | * | ||
163 | * Example | ||
164 | myMeasurement,tag1=value1,tag2=value2 fieldKey="fieldValue" 1556813561098000000 | ||
165 | */ | ||
166 | 2 | std::string TelemetryAggregatorInflux::MakeDeltaPayload() { | |
167 | // measurement and tags | ||
168 |
2/4✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2 times.
✗ Branch 6 not taken.
|
4 | std::string ret = "" + influx_metric_name_ + "_delta,repo=" + fqrn_; |
169 | |||
170 |
2/4✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
|
2 | if (influx_extra_tags_ != "") { |
171 |
2/4✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
|
2 | ret += "," + influx_extra_tags_; |
172 | } | ||
173 | |||
174 | // fields | ||
175 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | ret += " "; |
176 | 2 | bool add_token = false; | |
177 | 2 | for (std::map<std::string, int64_t>::iterator it | |
178 |
2/2✓ Branch 4 taken 6 times.
✓ Branch 5 taken 2 times.
|
10 | = counters_.begin(), iEnd = counters_.end(); it != iEnd; it++) { |
179 | 6 | int64_t value = it->second; | |
180 |
1/2✓ Branch 2 taken 6 times.
✗ Branch 3 not taken.
|
6 | int64_t old_value = old_counters_.at(it->first); |
181 |
2/2✓ Branch 0 taken 5 times.
✓ Branch 1 taken 1 times.
|
6 | if (value != 0) { |
182 |
2/2✓ Branch 0 taken 3 times.
✓ Branch 1 taken 2 times.
|
5 | if (add_token) { |
183 |
1/2✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
|
3 | ret += ","; |
184 | } | ||
185 |
4/8✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 5 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 5 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 5 times.
✗ Branch 12 not taken.
|
5 | ret += it->first + "=" + StringifyInt(value - old_value); |
186 | 5 | add_token = true; | |
187 | } | ||
188 | } | ||
189 | |||
190 | // timestamp | ||
191 |
1/2✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
|
2 | if (add_token) { |
192 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | ret += " "; |
193 | } | ||
194 |
2/4✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
|
2 | ret += StringifyUint(timestamp_); |
195 | |||
196 | 2 | return ret; | |
197 | } | ||
198 | |||
199 | 3 | TelemetryReturn TelemetryAggregatorInflux::OpenSocket() { | |
200 | 3 | const char *hostname = influx_host_.c_str(); | |
201 | struct addrinfo hints; | ||
202 | int err; | ||
203 | |||
204 | 3 | memset(&hints, 0, sizeof(hints)); | |
205 | 3 | hints.ai_family = AF_INET; | |
206 | 3 | hints.ai_socktype = SOCK_DGRAM; | |
207 | |||
208 |
1/2✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
|
3 | err = getaddrinfo(influx_host_.c_str(), NULL, &hints, &res_); |
209 |
2/4✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 3 times.
|
3 | if (err != 0 || res_ == NULL) { |
210 | ✗ | LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr, | |
211 | "Failed to resolve influx server [%s]. errno=%d", | ||
212 | ✗ | hostname, errno); | |
213 | ✗ | return kTelemetryFailHostAddress; | |
214 | } | ||
215 | |||
216 | 3 | socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0); | |
217 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | if (socket_fd_ < 0) { |
218 | ✗ | LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr, | |
219 | "Failed to open socket"); | ||
220 | ✗ | freeaddrinfo(res_); | |
221 | ✗ | return kTelemetryFailSocket; | |
222 | } | ||
223 | |||
224 | 3 | return kTelemetrySuccess; | |
225 | } | ||
226 | |||
227 | ✗ | TelemetryReturn TelemetryAggregatorInflux::SendToInflux( | |
228 | const std::string &payload) { | ||
229 | ✗ | struct sockaddr_in *dest_addr = NULL; | |
230 | ✗ | dest_addr = reinterpret_cast<sockaddr_in*>(res_->ai_addr); | |
231 | ✗ | dest_addr->sin_port = htons(influx_port_); | |
232 | |||
233 | ✗ | ssize_t num_bytes_sent = sendto(socket_fd_, | |
234 | ✗ | payload.data(), | |
235 | payload.size(), | ||
236 | 0, | ||
237 | reinterpret_cast<struct sockaddr*>(dest_addr), | ||
238 | sizeof(struct sockaddr_in)); | ||
239 | |||
240 | ✗ | if (num_bytes_sent < 0) { | |
241 | ✗ | LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr, | |
242 | ✗ | "Failed to send to influx. errno=%d", errno); | |
243 | ✗ | return kTelemetryFailSend; | |
244 | ✗ | } else if (static_cast<size_t>(num_bytes_sent) != payload.size()) { | |
245 | ✗ | LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr, | |
246 | "Incomplete send. Bytes transferred: %zd. Bytes expected %lu", | ||
247 | num_bytes_sent, payload.size()); | ||
248 | ✗ | return kTelemetryFailSend; | |
249 | } | ||
250 | |||
251 | ✗ | LogCvmfs(kLogTelemetry, kLogDebug, "INFLUX: POSTING [%s]", payload.c_str()); | |
252 | ✗ | return kTelemetrySuccess; | |
253 | } | ||
254 | |||
255 | |||
256 | ✗ | void TelemetryAggregatorInflux::PushMetrics() { | |
257 | // create payload | ||
258 | ✗ | std::string payload = MakePayload(); | |
259 | ✗ | std::string delta_payload = ""; | |
260 | ✗ | if (old_counters_.size() > 0) { | |
261 | ✗ | delta_payload = MakeDeltaPayload(); | |
262 | ✗ | payload = payload + "\n" + delta_payload; | |
263 | } | ||
264 | ✗ | payload += "\n"; | |
265 | |||
266 | // send to influx | ||
267 | ✗ | SendToInflux(payload); | |
268 | |||
269 | // current counter is now old counter | ||
270 | ✗ | counters_.swap(old_counters_); | |
271 | } | ||
272 | |||
273 | } // namespace perf | ||
274 |