Line |
Branch |
Exec |
Source |
1 |
|
|
/** |
2 |
|
|
* This file is part of the CernVM File System. |
3 |
|
|
*/ |
4 |
|
|
|
5 |
|
|
#include "telemetry_aggregator.h" |
6 |
|
|
|
7 |
|
|
#include <errno.h> |
8 |
|
|
#include <poll.h> |
9 |
|
|
#include <unistd.h> |
10 |
|
|
|
11 |
|
|
#include "glue_buffer.h" |
12 |
|
|
#include "shortstring.h" |
13 |
|
|
#include "telemetry_aggregator_influx.h" |
14 |
|
|
#include "util/exception.h" |
15 |
|
|
#include "util/logging.h" |
16 |
|
|
#include "util/platform.h" |
17 |
|
|
#include "util/pointer.h" |
18 |
|
|
#include "util/posix.h" |
19 |
|
|
namespace perf { |
20 |
|
|
|
21 |
|
✗ |
/**
 * Factory for telemetry aggregators.
 *
 * Constructs the TelemetryAggregator implementation selected by `type` and
 * hands ownership of it to the caller.
 *
 * @param statistics   forwarded to the implementation's constructor
 * @param send_rate    forwarded to the implementation's constructor
 * @param options_mgr  forwarded to the implementation's constructor
 * @param mount_point  forwarded to the implementation's constructor
 * @param fqrn         forwarded to the implementation's constructor
 * @param type         selects the implementation; only kTelemetryInflux is
 *                     currently supported
 * @return a heap-allocated aggregator owned by the caller, or NULL if `type`
 *         has no implementation or the constructed object reports itself as
 *         a zombie (it is then deleted here).
 */
TelemetryAggregator *TelemetryAggregator::Create(Statistics *statistics,
                                                 int send_rate,
                                                 OptionsManager *options_mgr,
                                                 MountPoint *mount_point,
                                                 const std::string &fqrn,
                                                 const TelemetrySelector type) {
  // Own the new object through a base-class UniquePtr so that it is deleted
  // automatically on the failure path below. (Previously a
  // UniquePtr<TelemetryAggregatorInflux> was reinterpret_cast to a
  // UniquePtr<TelemetryAggregator>*, which is undefined behavior.)
  // NOTE(review): deleting through the base pointer assumes
  // ~TelemetryAggregator is virtual, as callers deleting the returned
  // pointer already require — confirm in telemetry_aggregator.h.
  UniquePtr<TelemetryAggregator> telemetry;

  switch (type) {
    case kTelemetryInflux:
      telemetry = new TelemetryAggregatorInflux(
          statistics, send_rate, options_mgr, mount_point, fqrn);
      break;
    default:
      LogCvmfs(kLogTelemetry, kLogDebug,
               "No implementation available for given telemetry class.");
      return NULL;
  }

  // is_zombie_ flags an implementation whose construction did not succeed
  if (telemetry.weak_ref()->is_zombie_) {
    LogCvmfs(kLogTelemetry, kLogDebug | kLogSyslogErr,
             "Requested telemetry will NOT be used. "
             "It was not constructed correctly.");
    return NULL;
  }

  LogCvmfs(kLogTelemetry, kLogDebug, "TelemetryAggregator created.");
  return telemetry.Release();
}
54 |
|
|
|
55 |
|
40 |
/**
 * Stops the telemetry thread, if one was started, and closes the
 * termination pipe.
 *
 * pipe_terminate_ only holds valid descriptors after Spawn() called
 * MakePipe(). Otherwise (presumably initialized to -1 by the constructor,
 * which is not visible here) there is nothing to tear down.
 *
 * NOTE(review): a base-class destructor runs after the derived-class
 * destructor has finished. Until the join below, the telemetry thread may
 * therefore still be executing PushMetrics() on a partially destroyed
 * object. Consider stopping the thread in the most-derived destructor —
 * verify against TelemetryAggregatorInflux.
 */
TelemetryAggregator::~TelemetryAggregator() {
  if (pipe_terminate_[1] < 0) {
    return;
  }

  // Wake up the poll() in MainTelemetry, which expects the byte 'T'
  char stop_byte = 'T';
  WritePipe(pipe_terminate_[1], &stop_byte, 1);
  pthread_join(thread_telemetry_, NULL);
  ClosePipe(pipe_terminate_);
}
63 |
|
|
|
64 |
|
✗ |
void TelemetryAggregator::Spawn() { |
65 |
|
✗ |
assert(pipe_terminate_[0] == -1); |
66 |
|
✗ |
assert(send_rate_sec_ > 0); |
67 |
|
✗ |
MakePipe(pipe_terminate_); |
68 |
|
|
const int retval = |
69 |
|
✗ |
pthread_create(&thread_telemetry_, NULL, MainTelemetry, this); |
70 |
|
✗ |
assert(retval == 0); |
71 |
|
✗ |
LogCvmfs(kLogTelemetry, kLogDebug, "Spawning of telemetry thread."); |
72 |
|
|
} |
73 |
|
|
|
74 |
|
✗ |
void TelemetryAggregator::ManuallyUpdateSelectedCounters() { |
75 |
|
✗ |
if (!mount_point_) { |
76 |
|
✗ |
return; |
77 |
|
|
} |
78 |
|
|
|
79 |
|
|
// Manually setting the inode tracker numbers |
80 |
|
✗ |
glue::InodeTracker::Statistics inode_stats = mount_point_->inode_tracker() |
81 |
|
✗ |
->GetStatistics(); |
82 |
|
|
const glue::DentryTracker::Statistics dentry_stats = |
83 |
|
✗ |
mount_point_->dentry_tracker()->GetStatistics(); |
84 |
|
|
const glue::PageCacheTracker::Statistics page_cache_stats = |
85 |
|
✗ |
mount_point_->page_cache_tracker()->GetStatistics(); |
86 |
|
✗ |
mount_point_->statistics() |
87 |
|
|
->Lookup("inode_tracker.n_insert") |
88 |
|
✗ |
->Set(atomic_read64(&inode_stats.num_inserts)); |
89 |
|
✗ |
mount_point_->statistics() |
90 |
|
|
->Lookup("inode_tracker.n_remove") |
91 |
|
✗ |
->Set(atomic_read64(&inode_stats.num_removes)); |
92 |
|
✗ |
mount_point_->statistics() |
93 |
|
|
->Lookup("inode_tracker.no_reference") |
94 |
|
✗ |
->Set(atomic_read64(&inode_stats.num_references)); |
95 |
|
✗ |
mount_point_->statistics() |
96 |
|
|
->Lookup("inode_tracker.n_hit_inode") |
97 |
|
✗ |
->Set(atomic_read64(&inode_stats.num_hits_inode)); |
98 |
|
✗ |
mount_point_->statistics() |
99 |
|
|
->Lookup("inode_tracker.n_hit_path") |
100 |
|
✗ |
->Set(atomic_read64(&inode_stats.num_hits_path)); |
101 |
|
✗ |
mount_point_->statistics() |
102 |
|
|
->Lookup("inode_tracker.n_miss_path") |
103 |
|
✗ |
->Set(atomic_read64(&inode_stats.num_misses_path)); |
104 |
|
✗ |
mount_point_->statistics() |
105 |
|
|
->Lookup("dentry_tracker.n_insert") |
106 |
|
✗ |
->Set(dentry_stats.num_insert); |
107 |
|
✗ |
mount_point_->statistics() |
108 |
|
|
->Lookup("dentry_tracker.n_remove") |
109 |
|
✗ |
->Set(dentry_stats.num_remove); |
110 |
|
✗ |
mount_point_->statistics() |
111 |
|
|
->Lookup("dentry_tracker.n_prune") |
112 |
|
✗ |
->Set(dentry_stats.num_prune); |
113 |
|
✗ |
mount_point_->statistics() |
114 |
|
|
->Lookup("page_cache_tracker.n_insert") |
115 |
|
✗ |
->Set(page_cache_stats.n_insert); |
116 |
|
✗ |
mount_point_->statistics() |
117 |
|
|
->Lookup("page_cache_tracker.n_remove") |
118 |
|
✗ |
->Set(page_cache_stats.n_remove); |
119 |
|
✗ |
mount_point_->statistics() |
120 |
|
|
->Lookup("page_cache_tracker.n_open_direct") |
121 |
|
✗ |
->Set(page_cache_stats.n_open_direct); |
122 |
|
✗ |
mount_point_->statistics() |
123 |
|
|
->Lookup("page_cache_tracker.n_open_flush") |
124 |
|
✗ |
->Set(page_cache_stats.n_open_flush); |
125 |
|
✗ |
mount_point_->statistics() |
126 |
|
|
->Lookup("page_cache_tracker.n_open_cached") |
127 |
|
✗ |
->Set(page_cache_stats.n_open_cached); |
128 |
|
|
} |
129 |
|
|
|
130 |
|
✗ |
void *TelemetryAggregator::MainTelemetry(void *data) { |
131 |
|
✗ |
TelemetryAggregator *telemetry = reinterpret_cast<TelemetryAggregator *>( |
132 |
|
|
data); |
133 |
|
✗ |
Statistics *statistics = telemetry->statistics_; |
134 |
|
|
|
135 |
|
|
struct pollfd watch_term; |
136 |
|
✗ |
watch_term.fd = telemetry->pipe_terminate_[0]; |
137 |
|
✗ |
watch_term.events = POLLIN | POLLPRI; |
138 |
|
✗ |
int timeout_ms = telemetry->send_rate_sec_ * 1000; |
139 |
|
✗ |
uint64_t deadline_sec = platform_monotonic_time() + telemetry->send_rate_sec_; |
140 |
|
|
while (true) { |
141 |
|
|
// sleep and check if end - blocking wait for "send_rate_sec_" seconds |
142 |
|
✗ |
watch_term.revents = 0; |
143 |
|
✗ |
const int retval = poll(&watch_term, 1, timeout_ms); |
144 |
|
✗ |
if (retval < 0) { |
145 |
|
✗ |
if (errno == EINTR) { // external interrupt occurred - no error for us |
146 |
|
✗ |
if (timeout_ms >= 0) { |
147 |
|
✗ |
const uint64_t now = platform_monotonic_time(); |
148 |
|
✗ |
timeout_ms = (now > deadline_sec) |
149 |
|
✗ |
? 0 |
150 |
|
✗ |
: static_cast<int>((deadline_sec - now) * 1000); |
151 |
|
|
} |
152 |
|
✗ |
continue; |
153 |
|
|
} |
154 |
|
✗ |
PANIC(kLogSyslogErr | kLogDebug, |
155 |
|
|
"Error in telemetry thread. " |
156 |
|
|
"Poll returned %d", |
157 |
|
|
retval); |
158 |
|
|
} |
159 |
|
|
|
160 |
|
|
// reset timeout and deadline of poll |
161 |
|
✗ |
timeout_ms = telemetry->send_rate_sec_ * 1000; |
162 |
|
✗ |
deadline_sec = platform_monotonic_time() + telemetry->send_rate_sec_; |
163 |
|
|
|
164 |
|
|
// aggregate + send stuff |
165 |
|
✗ |
if (retval == 0) { |
166 |
|
✗ |
telemetry->ManuallyUpdateSelectedCounters(); |
167 |
|
✗ |
statistics->SnapshotCounters(&telemetry->counters_, |
168 |
|
|
&telemetry->timestamp_); |
169 |
|
✗ |
telemetry->PushMetrics(); |
170 |
|
✗ |
continue; |
171 |
|
|
} |
172 |
|
|
|
173 |
|
|
// stop thread due to poll event |
174 |
|
✗ |
assert(watch_term.revents != 0); |
175 |
|
|
|
176 |
|
✗ |
char c = 0; |
177 |
|
✗ |
ReadPipe(telemetry->pipe_terminate_[0], &c, 1); |
178 |
|
✗ |
assert(c == 'T'); |
179 |
|
✗ |
break; |
180 |
|
|
} |
181 |
|
✗ |
LogCvmfs(kLogTelemetry, kLogDebug, "Stopping telemetry thread"); |
182 |
|
✗ |
return NULL; |
183 |
|
|
} |
184 |
|
|
|
185 |
|
|
} // namespace perf |
186 |
|
|
|