GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/telemetry_aggregator.cc
Date: 2025-02-09 02:34:19
Exec Total Coverage
Lines: 2 94 2.1%
Branches: 1 110 0.9%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "telemetry_aggregator.h"
6
7 #include <errno.h>
8 #include <poll.h>
9 #include <unistd.h>
10
11 #include "glue_buffer.h"
12 #include "shortstring.h"
13 #include "util/exception.h"
14 #include "util/logging.h"
15 #include "util/platform.h"
16 #include "util/pointer.h"
17 #include "util/posix.h"
18
19 #include "telemetry_aggregator_influx.h"
20 namespace perf {
21
22 TelemetryAggregator* TelemetryAggregator::Create(Statistics* statistics,
23 int send_rate,
24 OptionsManager *options_mgr,
25 MountPoint* mount_point,
26 const std::string &fqrn,
27 const TelemetrySelector type) {
28 UniquePtr<TelemetryAggregatorInflux> telemetryInflux;
29 UniquePtr<TelemetryAggregator> *telemetry;
30
31 switch (type) {
32 case kTelemetryInflux:
33 telemetryInflux = new TelemetryAggregatorInflux(statistics, send_rate,
34 options_mgr, mount_point, fqrn);
35 telemetry = reinterpret_cast<UniquePtr<TelemetryAggregator>*>
36 (&telemetryInflux);
37 break;
38 default:
39 LogCvmfs(kLogTelemetry, kLogDebug,
40 "No implementation available for given telemetry class.");
41 return NULL;
42 break;
43 }
44
45 if (telemetry->weak_ref()->is_zombie_) {
46 LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
47 "Requested telemetry will NOT be used. "
48 "It was not constructed correctly.");
49 return NULL;
50 }
51
52 LogCvmfs(kLogTelemetry, kLogDebug, "TelemetryAggregator created.");
53 return telemetry->Release();
54 }
55
56 8 TelemetryAggregator::~TelemetryAggregator() {
57
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
8 if (pipe_terminate_[1] >= 0) {
58 char t = 'T';
59 WritePipe(pipe_terminate_[1], &t, 1);
60 pthread_join(thread_telemetry_, NULL);
61 ClosePipe(pipe_terminate_);
62 }
63 }
64
65 void TelemetryAggregator::Spawn() {
66 assert(pipe_terminate_[0] == -1);
67 assert(send_rate_sec_ > 0);
68 MakePipe(pipe_terminate_);
69 int retval = pthread_create(&thread_telemetry_, NULL, MainTelemetry, this);
70 assert(retval == 0);
71 LogCvmfs(kLogTelemetry, kLogDebug, "Spawning of telemetry thread.");
72 }
73
74 void TelemetryAggregator::ManuallyUpdateSelectedCounters() {
75 if (!mount_point_) {
76 return;
77 }
78
79 // Manually setting the inode tracker numbers
80 glue::InodeTracker::Statistics inode_stats =
81 mount_point_->inode_tracker()->GetStatistics();
82 glue::DentryTracker::Statistics dentry_stats =
83 mount_point_->dentry_tracker()->GetStatistics();
84 glue::PageCacheTracker::Statistics page_cache_stats =
85 mount_point_->page_cache_tracker()->GetStatistics();
86 mount_point_->statistics()->Lookup("inode_tracker.n_insert")->
87 Set(atomic_read64(&inode_stats.num_inserts));
88 mount_point_->statistics()->Lookup("inode_tracker.n_remove")->
89 Set(atomic_read64(&inode_stats.num_removes));
90 mount_point_->statistics()->Lookup("inode_tracker.no_reference")->
91 Set(atomic_read64(&inode_stats.num_references));
92 mount_point_->statistics()->Lookup("inode_tracker.n_hit_inode")->
93 Set(atomic_read64(&inode_stats.num_hits_inode));
94 mount_point_->statistics()->Lookup("inode_tracker.n_hit_path")->
95 Set(atomic_read64(&inode_stats.num_hits_path));
96 mount_point_->statistics()->Lookup("inode_tracker.n_miss_path")->
97 Set(atomic_read64(&inode_stats.num_misses_path));
98 mount_point_->statistics()->Lookup("dentry_tracker.n_insert")->
99 Set(dentry_stats.num_insert);
100 mount_point_->statistics()->Lookup("dentry_tracker.n_remove")->
101 Set(dentry_stats.num_remove);
102 mount_point_->statistics()->Lookup("dentry_tracker.n_prune")->
103 Set(dentry_stats.num_prune);
104 mount_point_->statistics()->Lookup("page_cache_tracker.n_insert")->
105 Set(page_cache_stats.n_insert);
106 mount_point_->statistics()->Lookup("page_cache_tracker.n_remove")->
107 Set(page_cache_stats.n_remove);
108 mount_point_->statistics()->Lookup("page_cache_tracker.n_open_direct")->
109 Set(page_cache_stats.n_open_direct);
110 mount_point_->statistics()->Lookup("page_cache_tracker.n_open_flush")->
111 Set(page_cache_stats.n_open_flush);
112 mount_point_->statistics()->Lookup("page_cache_tracker.n_open_cached")->
113 Set(page_cache_stats.n_open_cached);
114 }
115
116 void *TelemetryAggregator::MainTelemetry(void *data) {
117 TelemetryAggregator *telemetry = reinterpret_cast<TelemetryAggregator*>(data);
118 Statistics *statistics = telemetry->statistics_;
119
120 struct pollfd watch_term;
121 watch_term.fd = telemetry->pipe_terminate_[0];
122 watch_term.events = POLLIN | POLLPRI;
123 int timeout_ms = telemetry->send_rate_sec_ * 1000;
124 uint64_t deadline_sec = platform_monotonic_time()
125 + telemetry->send_rate_sec_;
126 while (true) {
127 // sleep and check if end - blocking wait for "send_rate_sec_" seconds
128 watch_term.revents = 0;
129 int retval = poll(&watch_term, 1, timeout_ms);
130 if (retval < 0) {
131 if (errno == EINTR) { // external interrupt occurred - no error for us
132 if (timeout_ms >= 0) {
133 uint64_t now = platform_monotonic_time();
134 timeout_ms = (now > deadline_sec) ? 0 :
135 static_cast<int>((deadline_sec - now) * 1000);
136 }
137 continue;
138 }
139 PANIC(kLogSyslogErr | kLogDebug, "Error in telemetry thread. "
140 "Poll returned %d", retval);
141 }
142
143 // reset timeout and deadline of poll
144 timeout_ms = telemetry->send_rate_sec_ * 1000;
145 deadline_sec = platform_monotonic_time() + telemetry->send_rate_sec_;
146
147 // aggregate + send stuff
148 if (retval == 0) {
149 telemetry->ManuallyUpdateSelectedCounters();
150 statistics->SnapshotCounters(&telemetry->counters_,
151 &telemetry->timestamp_);
152 telemetry->PushMetrics();
153 continue;
154 }
155
156 // stop thread due to poll event
157 assert(watch_term.revents != 0);
158
159 char c = 0;
160 ReadPipe(telemetry->pipe_terminate_[0], &c, 1);
161 assert(c == 'T');
162 break;
163 }
164 LogCvmfs(kLogTelemetry, kLogDebug, "Stopping telemetry thread");
165 return NULL;
166 }
167
168 } // namespace perf
169