CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
telemetry_aggregator.cc
Go to the documentation of this file.
1 
5 #include "telemetry_aggregator.h"
6 
7 #include <errno.h>
8 #include <poll.h>
9 #include <unistd.h>
10 
11 #include "glue_buffer.h"
12 #include "shortstring.h"
14 #include "util/exception.h"
15 #include "util/logging.h"
16 #include "util/platform.h"
17 #include "util/pointer.h"
18 #include "util/posix.h"
19 namespace perf {
20 
// NOTE(review): this listing is incomplete. The opening line of this
// definition (listing line 21) and listing lines 27-28, 38 and 45 are missing.
// The symbol index at the end of the page gives the signature as
//   static TelemetryAggregator *Create(Statistics *statistics, int send_rate,
//       OptionsManager *options_mgr, MountPoint *mount_point,
//       const std::string &fqrn, const TelemetrySelector type)
//
// Factory: constructs the aggregator implementation selected by `type`.
// Returns NULL when `type` has no matching case or when the new object has
// is_zombie_ set; otherwise returns the result of Release() on the holder.
22  int send_rate,
23  OptionsManager *options_mgr,
24  MountPoint *mount_point,
25  const std::string &fqrn,
26  const TelemetrySelector type) {
29 
30  switch (type) {
// kTelemetryInflux is the only selector with an explicit case.
31  case kTelemetryInflux:
32  telemetryInflux = new TelemetryAggregatorInflux(
33  statistics, send_rate, options_mgr, mount_point, fqrn);
// The address of telemetryInflux is reinterpreted as a pointer to
// UniquePtr<TelemetryAggregator> so the code below can work on the base type.
// NOTE(review): the declarations of telemetry / telemetryInflux sit on the
// missing lines 27-28; presumably telemetryInflux is a
// UniquePtr<TelemetryAggregatorInflux> - confirm.
34  telemetry = reinterpret_cast<UniquePtr<TelemetryAggregator> *>(
35  &telemetryInflux);
36  break;
37  default:
// Unsupported selector. The string below is an argument of a call whose first
// line (listing line 38) is missing from this listing.
39  "No implementation available for given telemetry class.");
40  return NULL;
// Unreachable: follows the return above.
41  break;
42  }
43 
// Reject an object whose is_zombie_ flag is set. The string below again belongs
// to a call whose first line (listing line 45) is missing.
44  if (telemetry->weak_ref()->is_zombie_) {
46  "Requested telemetry will NOT be used. "
47  "It was not constructed correctly.");
48  return NULL;
49  }
50 
51  LogCvmfs(kLogTelemetry, kLogDebug, "TelemetryAggregator created.");
// Return the raw pointer obtained from Release(); presumably the caller then
// owns the object - confirm against UniquePtr.
52  return telemetry->Release();
53 }
54 
// NOTE(review): the header of this definition (listing line 55) and listing
// line 60 are missing. Presumably this is the TelemetryAggregator destructor
// and the missing line closes the pipe (ClosePipe appears in the index) -
// confirm against the real file.
//
// If the write end of the termination pipe is open (>= 0), send the byte 'T'
// through it and wait for the telemetry thread to finish. MainTelemetry reads
// that byte from the read end and leaves its loop.
56  if (pipe_terminate_[1] >= 0) {
57  char t = 'T';
58  WritePipe(pipe_terminate_[1], &t, 1);
// Block until the thread has exited; its return value is discarded.
59  pthread_join(thread_telemetry_, NULL);
61  }
62 }
63 
// NOTE(review): the header of this definition (listing line 64) and listing
// lines 66-67 are missing. Presumably this is TelemetryAggregator::Spawn() and
// the missing lines create the termination pipe (MakePipe appears in the
// index) - confirm against the real file.
//
// Starts the telemetry thread. The read end of the termination pipe must still
// be -1 on entry.
65  assert(pipe_terminate_[0] == -1);
// Run MainTelemetry on a new thread with default attributes; `this` is passed
// as the thread argument. The result is only checked through assert().
68  const int retval =
69  pthread_create(&thread_telemetry_, NULL, MainTelemetry, this);
70  assert(retval == 0);
71  LogCvmfs(kLogTelemetry, kLogDebug, "Spawning of telemetry thread.");
72 }
73 
// NOTE(review): the header of this definition (listing line 74) is missing, as
// is the first line of almost every statement below (listing lines 80, 83, 85,
// 86, 89, 92, ...). Presumably this is
// TelemetryAggregator::ManuallyUpdateSelectedCounters(), which MainTelemetry
// calls before each snapshot - confirm against the real file.
//
// Copies statistics taken from the inode, dentry and page cache trackers into
// named counters via Lookup(name)->Set(value). Does nothing when mount_point_
// is null.
75  if (!mount_point_) {
76  return;
77  }
78 
79  // Manually setting the inode tracker numbers
// Tail of the statement that obtains the inode statistics; its first line is
// missing from the listing.
81  ->GetStatistics();
// NOTE(review): the right-hand sides of the next two initialisations are on
// missing lines; presumably they call GetStatistics() on the dentry and page
// cache trackers - confirm.
82  const glue::DentryTracker::Statistics dentry_stats =
84  const glue::PageCacheTracker::Statistics page_cache_stats =
// Inode tracker values are read with atomic_read64(). The object on which
// Lookup() is called is on a missing line in every chain below.
87  ->Lookup("inode_tracker.n_insert")
88  ->Set(atomic_read64(&inode_stats.num_inserts));
90  ->Lookup("inode_tracker.n_remove")
91  ->Set(atomic_read64(&inode_stats.num_removes));
// Counter name "no_reference" is fed from the field num_references.
93  ->Lookup("inode_tracker.no_reference")
94  ->Set(atomic_read64(&inode_stats.num_references));
96  ->Lookup("inode_tracker.n_hit_inode")
97  ->Set(atomic_read64(&inode_stats.num_hits_inode));
99  ->Lookup("inode_tracker.n_hit_path")
100  ->Set(atomic_read64(&inode_stats.num_hits_path));
102  ->Lookup("inode_tracker.n_miss_path")
103  ->Set(atomic_read64(&inode_stats.num_misses_path));
// Dentry tracker values are read as plain fields of dentry_stats.
105  ->Lookup("dentry_tracker.n_insert")
106  ->Set(dentry_stats.num_insert);
108  ->Lookup("dentry_tracker.n_remove")
109  ->Set(dentry_stats.num_remove);
111  ->Lookup("dentry_tracker.n_prune")
112  ->Set(dentry_stats.num_prune);
// Page cache tracker values are read as plain fields of page_cache_stats.
114  ->Lookup("page_cache_tracker.n_insert")
115  ->Set(page_cache_stats.n_insert);
117  ->Lookup("page_cache_tracker.n_remove")
118  ->Set(page_cache_stats.n_remove);
120  ->Lookup("page_cache_tracker.n_open_direct")
121  ->Set(page_cache_stats.n_open_direct);
123  ->Lookup("page_cache_tracker.n_open_flush")
124  ->Set(page_cache_stats.n_open_flush);
126  ->Lookup("page_cache_tracker.n_open_cached")
127  ->Set(page_cache_stats.n_open_cached);
128 }
129 
// NOTE(review): the header of this definition (listing line 130) and listing
// line 154 are missing. The index at the end of the page gives the signature
// as: static void *MainTelemetry(void *data).
//
// Thread entry point. `data` is the TelemetryAggregator that was passed to
// pthread_create. The loop waits on the read end of the termination pipe with
// a timeout of send_rate_sec_ seconds: on timeout it snapshots the counters
// and pushes metrics, on pipe activity it reads the 'T' byte and stops.
// Always returns NULL.
131  TelemetryAggregator *telemetry = reinterpret_cast<TelemetryAggregator *>(
132  data);
133  Statistics *statistics = telemetry->statistics_;
134 
135  struct pollfd watch_term;
136  watch_term.fd = telemetry->pipe_terminate_[0];
137  watch_term.events = POLLIN | POLLPRI;
// poll() takes its timeout in milliseconds. The deadline is kept in the unit
// of platform_monotonic_time(), which this code treats as seconds (a
// difference is multiplied by 1000 below to obtain milliseconds).
138  int timeout_ms = telemetry->send_rate_sec_ * 1000;
139  uint64_t deadline_sec = platform_monotonic_time() + telemetry->send_rate_sec_;
140  while (true) {
141  // sleep and check if end - blocking wait for "send_rate_sec_" seconds
142  watch_term.revents = 0;
143  const int retval = poll(&watch_term, 1, timeout_ms);
144  if (retval < 0) {
145  if (errno == EINTR) { // external interrupt occurred - no error for us
// Interrupted by a signal: retry with only the time left until the deadline
// (0 if it already passed), so interruptions do not stretch the interval.
// A negative timeout (poll blocks without limit) is left untouched.
146  if (timeout_ms >= 0) {
147  const uint64_t now = platform_monotonic_time();
148  timeout_ms = (now > deadline_sec)
149  ? 0
150  : static_cast<int>((deadline_sec - now) * 1000);
151  }
152  continue;
153  }
// Any other poll() failure. The strings below are arguments of a call whose
// first line (listing line 154) is missing; the index lists PANIC, so this is
// presumably a PANIC(...) - confirm. NOTE(review): if that call returned,
// execution would continue below with retval < 0.
155  "Error in telemetry thread. "
156  "Poll returned %d",
157  retval);
158  }
159 
160  // reset timeout and deadline of poll
161  timeout_ms = telemetry->send_rate_sec_ * 1000;
162  deadline_sec = platform_monotonic_time() + telemetry->send_rate_sec_;
163 
164  // aggregate + send stuff
// retval == 0 means poll() timed out with no pipe event: refresh the manually
// maintained counters, snapshot all counters together with a timestamp into
// the aggregator, and let the implementation push them.
165  if (retval == 0) {
166  telemetry->ManuallyUpdateSelectedCounters();
167  statistics->SnapshotCounters(&telemetry->counters_,
168  &telemetry->timestamp_);
169  telemetry->PushMetrics();
170  continue;
171  }
172 
173  // stop thread due to poll event
174  assert(watch_term.revents != 0);
175 
// Consume the single termination byte; the only value expected is 'T'.
176  char c = 0;
177  ReadPipe(telemetry->pipe_terminate_[0], &c, 1);
178  assert(c == 'T');
179  break;
180  }
181  LogCvmfs(kLogTelemetry, kLogDebug, "Stopping telemetry thread");
182  return NULL;
183 }
184 
185 } // namespace perf
Statistics GetStatistics()
Definition: glue_buffer.h:648
perf::Statistics * statistics()
Definition: mountpoint.h:538
#define PANIC(...)
Definition: exception.h:29
virtual void PushMetrics()=0
assert((mem||(size==0))&&"Out Of Memory")
void MakePipe(int pipe_fd[2])
Definition: posix.cc:487
Counter * Lookup(const std::string &name) const
Definition: statistics.cc:63
glue::PageCacheTracker * page_cache_tracker()
Definition: mountpoint.h:534
void Set(const int64_t val)
Definition: statistics.h:33
static TelemetryAggregator * Create(Statistics *statistics, int send_rate, OptionsManager *options_mgr, MountPoint *mount_point, const std::string &fqrn, const TelemetrySelector type)
void SnapshotCounters(std::map< std::string, int64_t > *counters, uint64_t *timestamp_ns)
Definition: statistics.cc:150
Statistics GetStatistics()
Definition: glue_buffer.h:990
Statistics GetStatistics()
Definition: glue_buffer.h:820
std::map< std::string, int64_t > counters_
uint64_t platform_monotonic_time()
glue::DentryTracker * dentry_tracker()
Definition: mountpoint.h:533
static void * MainTelemetry(void *data)
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:496
glue::InodeTracker * inode_tracker()
Definition: mountpoint.h:528
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:508
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:559
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545