| Directory: | cvmfs/ |
|---|---|
| File: | cvmfs/telemetry_aggregator_influx.cc |
| Date: | 2026-04-26 02:35:59 |
| Exec | Total | Coverage | |
|---|---|---|---|
| Lines: | 97 | 134 | 72.4% |
| Branches: | 104 | 212 | 49.1% |
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /** | ||
| 2 | * This file is part of the CernVM File System. | ||
| 3 | */ | ||
| 4 | |||
| 5 | #include "telemetry_aggregator_influx.h" | ||
| 6 | |||
| 7 | #include <netinet/in.h> | ||
| 8 | #include <pthread.h> | ||
| 9 | #include <stdint.h> | ||
| 10 | #include <sys/socket.h> | ||
| 11 | #include <sys/types.h> | ||
| 12 | |||
| 13 | #include <cstring> | ||
| 14 | #include <map> | ||
| 15 | #include <stdexcept> | ||
| 16 | #include <string> | ||
| 17 | #include <vector> | ||
| 18 | |||
| 19 | #include "util/logging.h" | ||
| 20 | #include "util/posix.h" | ||
| 21 | #include "util/string.h" | ||
| 22 | |||
| 23 | namespace perf { | ||
| 24 | |||
| 25 | 259 | TelemetryAggregatorInflux::TelemetryAggregatorInflux( | |
| 26 | Statistics *statistics, | ||
| 27 | int send_rate_sec, | ||
| 28 | OptionsManager *options_mgr, | ||
| 29 | MountPoint *mount_point, | ||
| 30 | 259 | const std::string &fqrn) | |
| 31 | : TelemetryAggregator(statistics, send_rate_sec, mount_point, fqrn) | ||
| 32 |
1/2✓ Branch 2 taken 259 times.
✗ Branch 3 not taken.
|
259 | , influx_extra_fields_("") |
| 33 |
1/2✓ Branch 2 taken 259 times.
✗ Branch 3 not taken.
|
259 | , influx_extra_tags_("") |
| 34 | 259 | , send_delta_(true) | |
| 35 | 259 | , socket_fd_(-1) | |
| 36 | 259 | , res_(NULL) { | |
| 37 | 259 | int params = 0; | |
| 38 | |||
| 39 |
4/6✓ Branch 2 taken 259 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 259 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 222 times.
✓ Branch 10 taken 37 times.
|
259 | if (options_mgr->GetValue("CVMFS_INFLUX_HOST", &influx_host_)) { |
| 40 |
1/2✓ Branch 1 taken 222 times.
✗ Branch 2 not taken.
|
222 | if (influx_host_.size() > 1) { |
| 41 | 222 | params++; | |
| 42 | } else { | ||
| 43 | ✗ | LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn, | |
| 44 | "No value given for CVMFS_INFLUX_HOST"); | ||
| 45 | } | ||
| 46 | } | ||
| 47 | |||
| 48 | 259 | std::string opt; | |
| 49 |
3/6✓ Branch 2 taken 259 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 259 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 259 times.
✗ Branch 10 not taken.
|
259 | if (options_mgr->GetValue("CVMFS_INFLUX_PORT", &opt)) { |
| 50 |
2/4✓ Branch 3 taken 259 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 259 times.
✗ Branch 7 not taken.
|
259 | influx_port_ = static_cast<int>(String2Int64(opt.c_str())); |
| 51 |
2/4✓ Branch 0 taken 259 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 259 times.
✗ Branch 3 not taken.
|
259 | if (influx_port_ > 0 && influx_port_ < 65536) { |
| 52 | 259 | params++; | |
| 53 | } else { | ||
| 54 | ✗ | LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn, | |
| 55 | "Invalid value for CVMFS_INFLUX_PORT [%s]", opt.c_str()); | ||
| 56 | } | ||
| 57 | } | ||
| 58 | |||
| 59 |
3/6✓ Branch 2 taken 259 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 259 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 259 times.
✗ Branch 10 not taken.
|
259 | if (options_mgr->GetValue("CVMFS_INFLUX_METRIC_NAME", &influx_metric_name_)) { |
| 60 | 259 | params++; | |
| 61 | } | ||
| 62 | |||
| 63 |
4/6✓ Branch 2 taken 259 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 259 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 185 times.
✓ Branch 10 taken 74 times.
|
259 | if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_TAGS", &influx_extra_tags_)) { |
| 64 |
1/2✓ Branch 1 taken 185 times.
✗ Branch 2 not taken.
|
185 | influx_extra_tags_ = ""; |
| 65 | } | ||
| 66 | |||
| 67 |
4/6✓ Branch 2 taken 259 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 259 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 185 times.
✓ Branch 10 taken 74 times.
|
259 | if (!options_mgr->GetValue("CVMFS_INFLUX_EXTRA_FIELDS", |
| 68 | &influx_extra_fields_)) { | ||
| 69 |
1/2✓ Branch 1 taken 185 times.
✗ Branch 2 not taken.
|
185 | influx_extra_fields_ = ""; |
| 70 | } | ||
| 71 | |||
| 72 | // Read CVMFS_INFLUX_SEND_DELTA option, default to true (ON) | ||
| 73 | 259 | std::string send_delta_str; | |
| 74 |
4/6✓ Branch 2 taken 259 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 259 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 74 times.
✓ Branch 10 taken 185 times.
|
259 | if (options_mgr->GetValue("CVMFS_INFLUX_SEND_DELTA", &send_delta_str)) { |
| 75 |
1/2✓ Branch 1 taken 74 times.
✗ Branch 2 not taken.
|
74 | send_delta_ = !options_mgr->IsOff(send_delta_str); |
| 76 | } | ||
| 77 | |||
| 78 |
2/2✓ Branch 0 taken 222 times.
✓ Branch 1 taken 37 times.
|
259 | if (params == 3) { |
| 79 | 222 | is_zombie_ = false; | |
| 80 |
1/2✓ Branch 5 taken 222 times.
✗ Branch 6 not taken.
|
222 | LogCvmfs(kLogTelemetry, kLogDebug, |
| 81 | "Enabling influx metrics. " | ||
| 82 | "Send to [%s:%d] metric [%s]. Extra tags" | ||
| 83 | "[%s]. Extra fields [%s].", | ||
| 84 | influx_host_.c_str(), influx_port_, influx_metric_name_.c_str(), | ||
| 85 | influx_extra_tags_.c_str(), influx_extra_fields_.c_str()); | ||
| 86 |
1/2✓ Branch 1 taken 222 times.
✗ Branch 2 not taken.
|
222 | const TelemetryReturn ret = OpenSocket(); |
| 87 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 222 times.
|
222 | if (ret != kTelemetrySuccess) { |
| 88 | ✗ | is_zombie_ = true; | |
| 89 | ✗ | LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn, | |
| 90 | "Not enabling influx metrics. Error while open socket. %d", ret); | ||
| 91 | } | ||
| 92 | } else { | ||
| 93 | 37 | is_zombie_ = true; | |
| 94 |
1/2✓ Branch 1 taken 37 times.
✗ Branch 2 not taken.
|
37 | LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogWarn, |
| 95 | "Not enabling influx metrics. Not all mandatory variables set: " | ||
| 96 | "CVMFS_INFLUX_METRIC_NAME, CVMFS_INFLUX_HOST, CVMFS_INFLUX_PORT"); | ||
| 97 | } | ||
| 98 | 259 | } | |
| 99 | |||
| 100 | 518 | TelemetryAggregatorInflux::~TelemetryAggregatorInflux() { | |
| 101 |
2/2✓ Branch 0 taken 222 times.
✓ Branch 1 taken 37 times.
|
518 | if (socket_fd_ >= 0) { |
| 102 | 444 | close(socket_fd_); | |
| 103 | } | ||
| 104 |
2/2✓ Branch 0 taken 222 times.
✓ Branch 1 taken 37 times.
|
518 | if (res_) { |
| 105 | 444 | freeaddrinfo(res_); | |
| 106 | } | ||
| 107 | } | ||
| 108 | |||
| 109 | /** | ||
| 110 | * Creates a string in the influx data format containing the absolute values | ||
| 111 | * of the counters. Counters are only included if their absolute value is > 0. | ||
| 112 | * | ||
| 113 | * Influx dataformat | ||
| 114 | * ( https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/ | ||
| 115 | ) | ||
| 116 | * Syntax | ||
| 117 | <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] | ||
| 118 | <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>] | ||
| 119 | * | ||
| 120 | * Example | ||
| 121 | myMeasurement,tag1=value1,tag2=value2 fieldKey="fieldValue" | ||
| 122 | 1556813561098000000 | ||
| 123 | */ | ||
| 124 | 296 | std::string TelemetryAggregatorInflux::MakePayload() { | |
| 125 | // measurement and tags | ||
| 126 |
1/2✓ Branch 1 taken 296 times.
✗ Branch 2 not taken.
|
296 | std::string metric_name = influx_metric_name_; |
| 127 |
2/2✓ Branch 0 taken 259 times.
✓ Branch 1 taken 37 times.
|
296 | if (send_delta_) { |
| 128 |
1/2✓ Branch 1 taken 259 times.
✗ Branch 2 not taken.
|
259 | metric_name += "_absolute"; |
| 129 | } | ||
| 130 |
2/4✓ Branch 1 taken 296 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 296 times.
✗ Branch 5 not taken.
|
296 | std::string ret = metric_name + ",repo=" + fqrn_; |
| 131 | |||
| 132 |
3/4✓ Branch 1 taken 296 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 148 times.
✓ Branch 4 taken 148 times.
|
296 | if (influx_extra_tags_ != "") { |
| 133 |
2/4✓ Branch 1 taken 148 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 148 times.
✗ Branch 5 not taken.
|
148 | ret += "," + influx_extra_tags_; |
| 134 | } | ||
| 135 | |||
| 136 | // fields | ||
| 137 |
1/2✓ Branch 1 taken 296 times.
✗ Branch 2 not taken.
|
296 | ret += " "; |
| 138 | 296 | bool add_token = false; | |
| 139 | 592 | for (std::map<std::string, int64_t>::iterator it = counters_.begin(), | |
| 140 | 296 | iEnd = counters_.end(); | |
| 141 |
2/2✓ Branch 1 taken 888 times.
✓ Branch 2 taken 296 times.
|
1184 | it != iEnd; |
| 142 | 888 | it++) { | |
| 143 |
2/2✓ Branch 1 taken 407 times.
✓ Branch 2 taken 481 times.
|
888 | if (it->second != 0) { |
| 144 |
2/2✓ Branch 0 taken 222 times.
✓ Branch 1 taken 185 times.
|
407 | if (add_token) { |
| 145 |
1/2✓ Branch 1 taken 222 times.
✗ Branch 2 not taken.
|
222 | ret += ","; |
| 146 | } | ||
| 147 |
4/8✓ Branch 2 taken 407 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 407 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 407 times.
✗ Branch 10 not taken.
✓ Branch 12 taken 407 times.
✗ Branch 13 not taken.
|
407 | ret += it->first + "=" + StringifyInt(it->second); |
| 148 | 407 | add_token = true; | |
| 149 | } | ||
| 150 | } | ||
| 151 |
3/4✓ Branch 1 taken 296 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 148 times.
✓ Branch 4 taken 148 times.
|
296 | if (influx_extra_fields_ != "") { |
| 152 |
2/2✓ Branch 0 taken 74 times.
✓ Branch 1 taken 74 times.
|
148 | if (add_token) { |
| 153 |
1/2✓ Branch 1 taken 74 times.
✗ Branch 2 not taken.
|
74 | ret += ","; |
| 154 | } | ||
| 155 |
1/2✓ Branch 1 taken 148 times.
✗ Branch 2 not taken.
|
148 | ret += influx_extra_fields_; |
| 156 | 148 | add_token = true; | |
| 157 | } | ||
| 158 | |||
| 159 | // timestamp | ||
| 160 |
2/2✓ Branch 0 taken 259 times.
✓ Branch 1 taken 37 times.
|
296 | if (add_token) { |
| 161 |
1/2✓ Branch 1 taken 259 times.
✗ Branch 2 not taken.
|
259 | ret += " "; |
| 162 | } | ||
| 163 |
2/4✓ Branch 1 taken 296 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 296 times.
✗ Branch 5 not taken.
|
296 | ret += StringifyUint(timestamp_); |
| 164 | |||
| 165 | 592 | return ret; | |
| 166 | 296 | } | |
| 167 | |||
| 168 | /** | ||
| 169 | * Creates a string in the influx data format containing the delta between | ||
| 170 | * 2 measurements of the same counter. Counters are only included if their | ||
| 171 | * absolute value is > 0 (delta can be 0). | ||
| 172 | * | ||
| 173 | * NOTE: As influx_extra_fields_ are static, they are excluded of this | ||
| 174 | * delta format | ||
| 175 | * | ||
| 176 | * Influx dataformat | ||
| 177 | * ( https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/ | ||
| 178 | ) | ||
| 179 | * Syntax | ||
| 180 | <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] | ||
| 181 | <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>] | ||
| 182 | * | ||
| 183 | * Example | ||
| 184 | myMeasurement,tag1=value1,tag2=value2 fieldKey="fieldValue" | ||
| 185 | 1556813561098000000 | ||
| 186 | */ | ||
| 187 | 74 | std::string TelemetryAggregatorInflux::MakeDeltaPayload() { | |
| 188 | // measurement and tags | ||
| 189 |
2/4✓ Branch 2 taken 74 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 74 times.
✗ Branch 6 not taken.
|
148 | std::string ret = "" + influx_metric_name_ + "_delta,repo=" + fqrn_; |
| 190 | |||
| 191 |
2/4✓ Branch 1 taken 74 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 74 times.
✗ Branch 4 not taken.
|
74 | if (influx_extra_tags_ != "") { |
| 192 |
2/4✓ Branch 1 taken 74 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 74 times.
✗ Branch 5 not taken.
|
74 | ret += "," + influx_extra_tags_; |
| 193 | } | ||
| 194 | |||
| 195 | // fields | ||
| 196 |
1/2✓ Branch 1 taken 74 times.
✗ Branch 2 not taken.
|
74 | ret += " "; |
| 197 | 74 | bool add_token = false; | |
| 198 | 148 | for (std::map<std::string, int64_t>::iterator it = counters_.begin(), | |
| 199 | 74 | iEnd = counters_.end(); | |
| 200 |
2/2✓ Branch 1 taken 222 times.
✓ Branch 2 taken 74 times.
|
296 | it != iEnd; |
| 201 | 222 | it++) { | |
| 202 | 222 | const int64_t value = it->second; | |
| 203 |
2/2✓ Branch 0 taken 185 times.
✓ Branch 1 taken 37 times.
|
222 | if (value != 0) { |
| 204 | int64_t old_value; | ||
| 205 | try { | ||
| 206 |
1/2✓ Branch 2 taken 185 times.
✗ Branch 3 not taken.
|
185 | old_value = old_counters_.at(it->first); |
| 207 | ✗ | } catch (const std::out_of_range &ex) { | |
| 208 | ✗ | old_value = 0; | |
| 209 | } | ||
| 210 |
2/2✓ Branch 0 taken 111 times.
✓ Branch 1 taken 74 times.
|
185 | if (add_token) { |
| 211 |
1/2✓ Branch 1 taken 111 times.
✗ Branch 2 not taken.
|
111 | ret += ","; |
| 212 | } | ||
| 213 |
4/8✓ Branch 1 taken 185 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 185 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 185 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 185 times.
✗ Branch 12 not taken.
|
185 | ret += it->first + "=" + StringifyInt(value - old_value); |
| 214 | 185 | add_token = true; | |
| 215 | } | ||
| 216 | } | ||
| 217 | |||
| 218 | // timestamp | ||
| 219 |
1/2✓ Branch 0 taken 74 times.
✗ Branch 1 not taken.
|
74 | if (add_token) { |
| 220 |
1/2✓ Branch 1 taken 74 times.
✗ Branch 2 not taken.
|
74 | ret += " "; |
| 221 | } | ||
| 222 |
2/4✓ Branch 1 taken 74 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 74 times.
✗ Branch 5 not taken.
|
74 | ret += StringifyUint(timestamp_); |
| 223 | |||
| 224 | 74 | return ret; | |
| 225 | } | ||
| 226 | |||
| 227 | 222 | TelemetryReturn TelemetryAggregatorInflux::OpenSocket() { | |
| 228 | 222 | const char *hostname = influx_host_.c_str(); | |
| 229 | struct addrinfo hints; | ||
| 230 | int err; | ||
| 231 | |||
| 232 | 222 | memset(&hints, 0, sizeof(hints)); | |
| 233 | 222 | hints.ai_family = AF_INET; | |
| 234 | 222 | hints.ai_socktype = SOCK_DGRAM; | |
| 235 | |||
| 236 |
1/2✓ Branch 2 taken 222 times.
✗ Branch 3 not taken.
|
222 | err = getaddrinfo(influx_host_.c_str(), NULL, &hints, &res_); |
| 237 |
2/4✓ Branch 0 taken 222 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 222 times.
|
222 | if (err != 0 || res_ == NULL) { |
| 238 | ✗ | LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr, | |
| 239 | ✗ | "Failed to resolve influx server [%s]. errno=%d", hostname, errno); | |
| 240 | ✗ | return kTelemetryFailHostAddress; | |
| 241 | } | ||
| 242 | |||
| 243 | 222 | socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0); | |
| 244 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 222 times.
|
222 | if (socket_fd_ < 0) { |
| 245 | ✗ | LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr, "Failed to open socket"); | |
| 246 | ✗ | freeaddrinfo(res_); | |
| 247 | ✗ | return kTelemetryFailSocket; | |
| 248 | } | ||
| 249 | |||
| 250 | 222 | return kTelemetrySuccess; | |
| 251 | } | ||
| 252 | |||
| 253 | ✗ | TelemetryReturn TelemetryAggregatorInflux::SendToInflux( | |
| 254 | const std::string &payload) { | ||
| 255 | ✗ | struct sockaddr_in *dest_addr = NULL; | |
| 256 | ✗ | dest_addr = reinterpret_cast<sockaddr_in *>(res_->ai_addr); | |
| 257 | ✗ | dest_addr->sin_port = htons(influx_port_); | |
| 258 | |||
| 259 | ✗ | const ssize_t num_bytes_sent = sendto( | |
| 260 | ✗ | socket_fd_, payload.data(), payload.size(), 0, | |
| 261 | reinterpret_cast<struct sockaddr *>(dest_addr), | ||
| 262 | sizeof(struct sockaddr_in)); | ||
| 263 | |||
| 264 | ✗ | if (num_bytes_sent < 0) { | |
| 265 | ✗ | LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr, | |
| 266 | ✗ | "Failed to send to influx. errno=%d", errno); | |
| 267 | ✗ | return kTelemetryFailSend; | |
| 268 | ✗ | } else if (static_cast<size_t>(num_bytes_sent) != payload.size()) { | |
| 269 | ✗ | LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr, | |
| 270 | "Incomplete send. Bytes transferred: %zd. Bytes expected %lu", | ||
| 271 | num_bytes_sent, payload.size()); | ||
| 272 | ✗ | return kTelemetryFailSend; | |
| 273 | } | ||
| 274 | |||
| 275 | ✗ | LogCvmfs(kLogTelemetry, kLogDebug, "INFLUX: POSTING [%s]", payload.c_str()); | |
| 276 | ✗ | return kTelemetrySuccess; | |
| 277 | } | ||
| 278 | |||
| 279 | |||
| 280 | ✗ | void TelemetryAggregatorInflux::PushMetrics() { | |
| 281 | // create payload | ||
| 282 | ✗ | std::string payload = MakePayload(); | |
| 283 | ✗ | std::string delta_payload = ""; | |
| 284 | ✗ | if (send_delta_ && old_counters_.size() > 0) { | |
| 285 | ✗ | delta_payload = MakeDeltaPayload(); | |
| 286 | ✗ | payload = payload + "\n" + delta_payload; | |
| 287 | } | ||
| 288 | ✗ | payload += "\n"; | |
| 289 | |||
| 290 | // send to influx | ||
| 291 | ✗ | SendToInflux(payload); | |
| 292 | |||
| 293 | // current counter is now old counter (only if delta sending is enabled) | ||
| 294 | ✗ | if (send_delta_) { | |
| 295 | ✗ | counters_.swap(old_counters_); | |
| 296 | } | ||
| 297 | } | ||
| 298 | |||
| 299 | } // namespace perf | ||
| 300 |