GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/telemetry_aggregator.cc
Date: 2026-04-26 02:35:59
Exec Total Coverage
Lines: 2 95 2.1%
Branches: 1 110 0.9%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "telemetry_aggregator.h"
6
7 #include <errno.h>
8 #include <poll.h>
9 #include <unistd.h>
10
11 #include "glue_buffer.h"
12 #include "telemetry_aggregator_influx.h"
13 #include "util/exception.h"
14 #include "util/logging.h"
15 #include "util/pointer.h"
16 #include "util/posix.h"
17 namespace perf {
18
19 TelemetryAggregator *TelemetryAggregator::Create(Statistics *statistics,
20 int send_rate,
21 OptionsManager *options_mgr,
22 MountPoint *mount_point,
23 const std::string &fqrn,
24 const TelemetrySelector type) {
25 UniquePtr<TelemetryAggregatorInflux> telemetryInflux;
26 UniquePtr<TelemetryAggregator> *telemetry;
27
28 switch (type) {
29 case kTelemetryInflux:
30 telemetryInflux = new TelemetryAggregatorInflux(
31 statistics, send_rate, options_mgr, mount_point, fqrn);
32 telemetry = reinterpret_cast<UniquePtr<TelemetryAggregator> *>(
33 &telemetryInflux);
34 break;
35 default:
36 LogCvmfs(kLogTelemetry, kLogDebug,
37 "No implementation available for given telemetry class.");
38 return NULL;
39 break;
40 }
41
42 if (telemetry->weak_ref()->is_zombie_) {
43 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
44 "Requested telemetry will NOT be used. "
45 "It was not constructed correctly.");
46 return NULL;
47 }
48
49 LogCvmfs(kLogTelemetry, kLogDebug, "TelemetryAggregator created.");
50 return telemetry->Release();
51 }
52
53 518 TelemetryAggregator::~TelemetryAggregator() {
54
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 259 times.
518 if (pipe_terminate_[1] >= 0) {
55 char t = 'T';
56 WritePipe(pipe_terminate_[1], &t, 1);
57 pthread_join(thread_telemetry_, NULL);
58 ClosePipe(pipe_terminate_);
59 }
60 }
61
62 void TelemetryAggregator::Spawn() {
63 assert(pipe_terminate_[0] == -1);
64 assert(send_rate_sec_ > 0);
65 MakePipe(pipe_terminate_);
66 const int retval = pthread_create(&thread_telemetry_, NULL, MainTelemetry,
67 this);
68 assert(retval == 0);
69 LogCvmfs(kLogTelemetry, kLogDebug, "Spawning of telemetry thread.");
70 }
71
72 void TelemetryAggregator::ManuallyUpdateSelectedCounters() {
73 if (!mount_point_) {
74 return;
75 }
76
77 // Manually setting the inode tracker numbers
78 glue::InodeTracker::Statistics inode_stats = mount_point_->inode_tracker()
79 ->GetStatistics();
80 const glue::DentryTracker::Statistics
81 dentry_stats = mount_point_->dentry_tracker()->GetStatistics();
82 const glue::PageCacheTracker::Statistics
83 page_cache_stats = mount_point_->page_cache_tracker()->GetStatistics();
84 mount_point_->statistics()
85 ->Lookup("inode_tracker.n_insert")
86 ->Set(atomic_read64(&inode_stats.num_inserts));
87 mount_point_->statistics()
88 ->Lookup("inode_tracker.n_remove")
89 ->Set(atomic_read64(&inode_stats.num_removes));
90 mount_point_->statistics()
91 ->Lookup("inode_tracker.no_reference")
92 ->Set(atomic_read64(&inode_stats.num_references));
93 mount_point_->statistics()
94 ->Lookup("inode_tracker.n_hit_inode")
95 ->Set(atomic_read64(&inode_stats.num_hits_inode));
96 mount_point_->statistics()
97 ->Lookup("inode_tracker.n_hit_path")
98 ->Set(atomic_read64(&inode_stats.num_hits_path));
99 mount_point_->statistics()
100 ->Lookup("inode_tracker.n_miss_path")
101 ->Set(atomic_read64(&inode_stats.num_misses_path));
102 mount_point_->statistics()
103 ->Lookup("dentry_tracker.n_insert")
104 ->Set(dentry_stats.num_insert);
105 mount_point_->statistics()
106 ->Lookup("dentry_tracker.n_remove")
107 ->Set(dentry_stats.num_remove);
108 mount_point_->statistics()
109 ->Lookup("dentry_tracker.n_prune")
110 ->Set(dentry_stats.num_prune);
111 mount_point_->statistics()
112 ->Lookup("page_cache_tracker.n_insert")
113 ->Set(page_cache_stats.n_insert);
114 mount_point_->statistics()
115 ->Lookup("page_cache_tracker.n_remove")
116 ->Set(page_cache_stats.n_remove);
117 mount_point_->statistics()
118 ->Lookup("page_cache_tracker.n_open_direct")
119 ->Set(page_cache_stats.n_open_direct);
120 mount_point_->statistics()
121 ->Lookup("page_cache_tracker.n_open_flush")
122 ->Set(page_cache_stats.n_open_flush);
123 mount_point_->statistics()
124 ->Lookup("page_cache_tracker.n_open_cached")
125 ->Set(page_cache_stats.n_open_cached);
126 }
127
128 void *TelemetryAggregator::MainTelemetry(void *data) {
129 TelemetryAggregator *telemetry = reinterpret_cast<TelemetryAggregator *>(
130 data);
131 Statistics *statistics = telemetry->statistics_;
132
133 struct pollfd watch_term;
134 watch_term.fd = telemetry->pipe_terminate_[0];
135 watch_term.events = POLLIN | POLLPRI;
136 int timeout_ms = telemetry->send_rate_sec_ * 1000;
137 uint64_t deadline_sec = platform_monotonic_time() + telemetry->send_rate_sec_;
138 while (true) {
139 // sleep and check if end - blocking wait for "send_rate_sec_" seconds
140 watch_term.revents = 0;
141 const int retval = poll(&watch_term, 1, timeout_ms);
142 if (retval < 0) {
143 if (errno == EINTR) { // external interrupt occurred - no error for us
144 if (timeout_ms >= 0) {
145 const uint64_t now = platform_monotonic_time();
146 timeout_ms = (now > deadline_sec)
147 ? 0
148 : static_cast<int>((deadline_sec - now) * 1000);
149 }
150 continue;
151 }
152 PANIC(kLogSyslogErr | kLogDebug,
153 "Error in telemetry thread. "
154 "Poll returned %d",
155 retval);
156 }
157
158 // reset timeout and deadline of poll
159 timeout_ms = telemetry->send_rate_sec_ * 1000;
160 deadline_sec = platform_monotonic_time() + telemetry->send_rate_sec_;
161
162 // aggregate + send stuff
163 if (retval == 0) {
164 telemetry->ManuallyUpdateSelectedCounters();
165 statistics->SnapshotCounters(&telemetry->counters_,
166 &telemetry->timestamp_);
167 telemetry->PushMetrics();
168 continue;
169 }
170
171 // stop thread due to poll event
172 assert(watch_term.revents != 0);
173
174 char c = 0;
175 ReadPipe(telemetry->pipe_terminate_[0], &c, 1);
176 assert(c == 'T');
177 break;
178 }
179 LogCvmfs(kLogTelemetry, kLogDebug, "Stopping telemetry thread");
180 return NULL;
181 }
182
183 } // namespace perf
184