GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/telemetry_aggregator.cc
Date: 2025-06-22 02:36:02
Exec Total Coverage
Lines: 2 95 2.1%
Branches: 1 110 0.9%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "telemetry_aggregator.h"
6
7 #include <errno.h>
8 #include <poll.h>
9 #include <unistd.h>
10
11 #include "glue_buffer.h"
12 #include "shortstring.h"
13 #include "telemetry_aggregator_influx.h"
14 #include "util/exception.h"
15 #include "util/logging.h"
16 #include "util/platform.h"
17 #include "util/pointer.h"
18 #include "util/posix.h"
19 namespace perf {
20
21 TelemetryAggregator *TelemetryAggregator::Create(Statistics *statistics,
22 int send_rate,
23 OptionsManager *options_mgr,
24 MountPoint *mount_point,
25 const std::string &fqrn,
26 const TelemetrySelector type) {
27 UniquePtr<TelemetryAggregatorInflux> telemetryInflux;
28 UniquePtr<TelemetryAggregator> *telemetry;
29
30 switch (type) {
31 case kTelemetryInflux:
32 telemetryInflux = new TelemetryAggregatorInflux(
33 statistics, send_rate, options_mgr, mount_point, fqrn);
34 telemetry = reinterpret_cast<UniquePtr<TelemetryAggregator> *>(
35 &telemetryInflux);
36 break;
37 default:
38 LogCvmfs(kLogTelemetry, kLogDebug,
39 "No implementation available for given telemetry class.");
40 return NULL;
41 break;
42 }
43
44 if (telemetry->weak_ref()->is_zombie_) {
45 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
46 "Requested telemetry will NOT be used. "
47 "It was not constructed correctly.");
48 return NULL;
49 }
50
51 LogCvmfs(kLogTelemetry, kLogDebug, "TelemetryAggregator created.");
52 return telemetry->Release();
53 }
54
55 40 TelemetryAggregator::~TelemetryAggregator() {
56
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20 times.
40 if (pipe_terminate_[1] >= 0) {
57 char t = 'T';
58 WritePipe(pipe_terminate_[1], &t, 1);
59 pthread_join(thread_telemetry_, NULL);
60 ClosePipe(pipe_terminate_);
61 }
62 }
63
64 void TelemetryAggregator::Spawn() {
65 assert(pipe_terminate_[0] == -1);
66 assert(send_rate_sec_ > 0);
67 MakePipe(pipe_terminate_);
68 const int retval =
69 pthread_create(&thread_telemetry_, NULL, MainTelemetry, this);
70 assert(retval == 0);
71 LogCvmfs(kLogTelemetry, kLogDebug, "Spawning of telemetry thread.");
72 }
73
74 void TelemetryAggregator::ManuallyUpdateSelectedCounters() {
75 if (!mount_point_) {
76 return;
77 }
78
79 // Manually setting the inode tracker numbers
80 glue::InodeTracker::Statistics inode_stats = mount_point_->inode_tracker()
81 ->GetStatistics();
82 const glue::DentryTracker::Statistics dentry_stats =
83 mount_point_->dentry_tracker()->GetStatistics();
84 const glue::PageCacheTracker::Statistics page_cache_stats =
85 mount_point_->page_cache_tracker()->GetStatistics();
86 mount_point_->statistics()
87 ->Lookup("inode_tracker.n_insert")
88 ->Set(atomic_read64(&inode_stats.num_inserts));
89 mount_point_->statistics()
90 ->Lookup("inode_tracker.n_remove")
91 ->Set(atomic_read64(&inode_stats.num_removes));
92 mount_point_->statistics()
93 ->Lookup("inode_tracker.no_reference")
94 ->Set(atomic_read64(&inode_stats.num_references));
95 mount_point_->statistics()
96 ->Lookup("inode_tracker.n_hit_inode")
97 ->Set(atomic_read64(&inode_stats.num_hits_inode));
98 mount_point_->statistics()
99 ->Lookup("inode_tracker.n_hit_path")
100 ->Set(atomic_read64(&inode_stats.num_hits_path));
101 mount_point_->statistics()
102 ->Lookup("inode_tracker.n_miss_path")
103 ->Set(atomic_read64(&inode_stats.num_misses_path));
104 mount_point_->statistics()
105 ->Lookup("dentry_tracker.n_insert")
106 ->Set(dentry_stats.num_insert);
107 mount_point_->statistics()
108 ->Lookup("dentry_tracker.n_remove")
109 ->Set(dentry_stats.num_remove);
110 mount_point_->statistics()
111 ->Lookup("dentry_tracker.n_prune")
112 ->Set(dentry_stats.num_prune);
113 mount_point_->statistics()
114 ->Lookup("page_cache_tracker.n_insert")
115 ->Set(page_cache_stats.n_insert);
116 mount_point_->statistics()
117 ->Lookup("page_cache_tracker.n_remove")
118 ->Set(page_cache_stats.n_remove);
119 mount_point_->statistics()
120 ->Lookup("page_cache_tracker.n_open_direct")
121 ->Set(page_cache_stats.n_open_direct);
122 mount_point_->statistics()
123 ->Lookup("page_cache_tracker.n_open_flush")
124 ->Set(page_cache_stats.n_open_flush);
125 mount_point_->statistics()
126 ->Lookup("page_cache_tracker.n_open_cached")
127 ->Set(page_cache_stats.n_open_cached);
128 }
129
130 void *TelemetryAggregator::MainTelemetry(void *data) {
131 TelemetryAggregator *telemetry = reinterpret_cast<TelemetryAggregator *>(
132 data);
133 Statistics *statistics = telemetry->statistics_;
134
135 struct pollfd watch_term;
136 watch_term.fd = telemetry->pipe_terminate_[0];
137 watch_term.events = POLLIN | POLLPRI;
138 int timeout_ms = telemetry->send_rate_sec_ * 1000;
139 uint64_t deadline_sec = platform_monotonic_time() + telemetry->send_rate_sec_;
140 while (true) {
141 // sleep and check if end - blocking wait for "send_rate_sec_" seconds
142 watch_term.revents = 0;
143 const int retval = poll(&watch_term, 1, timeout_ms);
144 if (retval < 0) {
145 if (errno == EINTR) { // external interrupt occurred - no error for us
146 if (timeout_ms >= 0) {
147 const uint64_t now = platform_monotonic_time();
148 timeout_ms = (now > deadline_sec)
149 ? 0
150 : static_cast<int>((deadline_sec - now) * 1000);
151 }
152 continue;
153 }
154 PANIC(kLogSyslogErr | kLogDebug,
155 "Error in telemetry thread. "
156 "Poll returned %d",
157 retval);
158 }
159
160 // reset timeout and deadline of poll
161 timeout_ms = telemetry->send_rate_sec_ * 1000;
162 deadline_sec = platform_monotonic_time() + telemetry->send_rate_sec_;
163
164 // aggregate + send stuff
165 if (retval == 0) {
166 telemetry->ManuallyUpdateSelectedCounters();
167 statistics->SnapshotCounters(&telemetry->counters_,
168 &telemetry->timestamp_);
169 telemetry->PushMetrics();
170 continue;
171 }
172
173 // stop thread due to poll event
174 assert(watch_term.revents != 0);
175
176 char c = 0;
177 ReadPipe(telemetry->pipe_terminate_[0], &c, 1);
178 assert(c == 'T');
179 break;
180 }
181 LogCvmfs(kLogTelemetry, kLogDebug, "Stopping telemetry thread");
182 return NULL;
183 }
184
185 } // namespace perf
186