GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/telemetry_aggregator.cc
Date: 2024-04-28 02:33:07
Exec Total Coverage
Lines: 2 59 3.4%
Branches: 1 50 2.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "telemetry_aggregator.h"
6
7 #include <errno.h>
8 #include <poll.h>
9 #include <unistd.h>
10
11 #include "util/exception.h"
12 #include "util/logging.h"
13 #include "util/platform.h"
14 #include "util/pointer.h"
15 #include "util/posix.h"
16
17 #include "telemetry_aggregator_influx.h"
18 namespace perf {
19
20 TelemetryAggregator* TelemetryAggregator::Create(Statistics* statistics,
21 int send_rate,
22 OptionsManager *options_mgr,
23 const std::string &fqrn,
24 const TelemetrySelector type) {
25 UniquePtr<TelemetryAggregatorInflux> telemetryInflux;
26 UniquePtr<TelemetryAggregator> *telemetry;
27
28 switch (type) {
29 case kTelemetryInflux:
30 telemetryInflux = new TelemetryAggregatorInflux(statistics, send_rate,
31 options_mgr, fqrn);
32 telemetry = reinterpret_cast<UniquePtr<TelemetryAggregator>*>
33 (&telemetryInflux);
34 break;
35 default:
36 LogCvmfs(kLogTelemetry, kLogDebug,
37 "No implementation available for given telemetry class.");
38 return NULL;
39 break;
40 }
41
42 if (telemetry->weak_ref()->is_zombie_) {
43 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
44 "Requested telemetry will NOT be used. "
45 "It was not constructed correctly.");
46 return NULL;
47 }
48
49 LogCvmfs(kLogTelemetry, kLogDebug, "TelemetryAggregator created.");
50 return telemetry->Release();
51 }
52
53 8 TelemetryAggregator::~TelemetryAggregator() {
54
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
8 if (pipe_terminate_[1] >= 0) {
55 char t = 'T';
56 WritePipe(pipe_terminate_[1], &t, 1);
57 pthread_join(thread_telemetry_, NULL);
58 ClosePipe(pipe_terminate_);
59 }
60 }
61
62 void TelemetryAggregator::Spawn() {
63 assert(pipe_terminate_[0] == -1);
64 assert(send_rate_sec_ > 0);
65 MakePipe(pipe_terminate_);
66 int retval = pthread_create(&thread_telemetry_, NULL, MainTelemetry, this);
67 assert(retval == 0);
68 LogCvmfs(kLogTelemetry, kLogDebug, "Spawning of telemetry thread.");
69 }
70
71 void *TelemetryAggregator::MainTelemetry(void *data) {
72 TelemetryAggregator *telemetry = reinterpret_cast<TelemetryAggregator*>(data);
73 Statistics *statistics = telemetry->statistics_;
74
75 struct pollfd watch_term;
76 watch_term.fd = telemetry->pipe_terminate_[0];
77 watch_term.events = POLLIN | POLLPRI;
78 int timeout_ms = telemetry->send_rate_sec_ * 1000;
79 uint64_t deadline_sec = platform_monotonic_time()
80 + telemetry->send_rate_sec_;
81 while (true) {
82 // sleep and check if end - blocking wait for "send_rate_sec_" seconds
83 watch_term.revents = 0;
84 int retval = poll(&watch_term, 1, timeout_ms);
85 if (retval < 0) {
86 if (errno == EINTR) { // external interrupt occurred - no error for us
87 if (timeout_ms >= 0) {
88 uint64_t now = platform_monotonic_time();
89 timeout_ms = (now > deadline_sec) ? 0 :
90 static_cast<int>((deadline_sec - now) * 1000);
91 }
92 continue;
93 }
94 PANIC(kLogSyslogErr | kLogDebug, "Error in telemetry thread. "
95 "Poll returned %d", retval);
96 }
97
98 // reset timeout and deadline of poll
99 timeout_ms = telemetry->send_rate_sec_ * 1000;
100 deadline_sec = platform_monotonic_time() + telemetry->send_rate_sec_;
101
102 // aggregate + send stuff
103 if (retval == 0) {
104 statistics->SnapshotCounters(&telemetry->counters_,
105 &telemetry->timestamp_);
106 telemetry->PushMetrics();
107 continue;
108 }
109
110 // stop thread due to poll event
111 assert(watch_term.revents != 0);
112
113 char c = 0;
114 ReadPipe(telemetry->pipe_terminate_[0], &c, 1);
115 assert(c == 'T');
116 break;
117 }
118 LogCvmfs(kLogTelemetry, kLogDebug, "Stopping telemetry thread");
119 return NULL;
120 }
121
122 } // namespace perf
123