CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
telemetry_aggregator.cc
Go to the documentation of this file.
1 
5 #include "telemetry_aggregator.h"
6 
7 #include <errno.h>
8 #include <poll.h>
9 #include <unistd.h>
10 
11 #include "glue_buffer.h"
12 #include "shortstring.h"
14 #include "util/exception.h"
15 #include "util/logging.h"
16 #include "util/platform.h"
17 #include "util/pointer.h"
18 #include "util/posix.h"
19 namespace perf {
20 
22  int send_rate,
23  OptionsManager *options_mgr,
24  MountPoint *mount_point,
25  const std::string &fqrn,
26  const TelemetrySelector type) {
29 
30  switch (type) {
31  case kTelemetryInflux:
32  telemetryInflux = new TelemetryAggregatorInflux(
33  statistics, send_rate, options_mgr, mount_point, fqrn);
34  telemetry = reinterpret_cast<UniquePtr<TelemetryAggregator> *>(
35  &telemetryInflux);
36  break;
37  default:
39  "No implementation available for given telemetry class.");
40  return NULL;
41  break;
42  }
43 
44  if (telemetry->weak_ref()->is_zombie_) {
46  "Requested telemetry will NOT be used. "
47  "It was not constructed correctly.");
48  return NULL;
49  }
50 
51  LogCvmfs(kLogTelemetry, kLogDebug, "TelemetryAggregator created.");
52  return telemetry->Release();
53 }
54 
56  if (pipe_terminate_[1] >= 0) {
57  char t = 'T';
58  WritePipe(pipe_terminate_[1], &t, 1);
59  pthread_join(thread_telemetry_, NULL);
61  }
62 }
63 
65  assert(pipe_terminate_[0] == -1);
68  int retval = pthread_create(&thread_telemetry_, NULL, MainTelemetry, this);
69  assert(retval == 0);
70  LogCvmfs(kLogTelemetry, kLogDebug, "Spawning of telemetry thread.");
71 }
72 
74  if (!mount_point_) {
75  return;
76  }
77 
78  // Manually setting the inode tracker numbers
80  ->GetStatistics();
82  ->GetStatistics();
84  page_cache_stats = mount_point_->page_cache_tracker()->GetStatistics();
86  ->Lookup("inode_tracker.n_insert")
87  ->Set(atomic_read64(&inode_stats.num_inserts));
89  ->Lookup("inode_tracker.n_remove")
90  ->Set(atomic_read64(&inode_stats.num_removes));
92  ->Lookup("inode_tracker.no_reference")
93  ->Set(atomic_read64(&inode_stats.num_references));
95  ->Lookup("inode_tracker.n_hit_inode")
96  ->Set(atomic_read64(&inode_stats.num_hits_inode));
98  ->Lookup("inode_tracker.n_hit_path")
99  ->Set(atomic_read64(&inode_stats.num_hits_path));
101  ->Lookup("inode_tracker.n_miss_path")
102  ->Set(atomic_read64(&inode_stats.num_misses_path));
104  ->Lookup("dentry_tracker.n_insert")
105  ->Set(dentry_stats.num_insert);
107  ->Lookup("dentry_tracker.n_remove")
108  ->Set(dentry_stats.num_remove);
110  ->Lookup("dentry_tracker.n_prune")
111  ->Set(dentry_stats.num_prune);
113  ->Lookup("page_cache_tracker.n_insert")
114  ->Set(page_cache_stats.n_insert);
116  ->Lookup("page_cache_tracker.n_remove")
117  ->Set(page_cache_stats.n_remove);
119  ->Lookup("page_cache_tracker.n_open_direct")
120  ->Set(page_cache_stats.n_open_direct);
122  ->Lookup("page_cache_tracker.n_open_flush")
123  ->Set(page_cache_stats.n_open_flush);
125  ->Lookup("page_cache_tracker.n_open_cached")
126  ->Set(page_cache_stats.n_open_cached);
127 }
128 
130  TelemetryAggregator *telemetry = reinterpret_cast<TelemetryAggregator *>(
131  data);
132  Statistics *statistics = telemetry->statistics_;
133 
134  struct pollfd watch_term;
135  watch_term.fd = telemetry->pipe_terminate_[0];
136  watch_term.events = POLLIN | POLLPRI;
137  int timeout_ms = telemetry->send_rate_sec_ * 1000;
138  uint64_t deadline_sec = platform_monotonic_time() + telemetry->send_rate_sec_;
139  while (true) {
140  // sleep and check if end - blocking wait for "send_rate_sec_" seconds
141  watch_term.revents = 0;
142  int retval = poll(&watch_term, 1, timeout_ms);
143  if (retval < 0) {
144  if (errno == EINTR) { // external interrupt occurred - no error for us
145  if (timeout_ms >= 0) {
146  uint64_t now = platform_monotonic_time();
147  timeout_ms = (now > deadline_sec)
148  ? 0
149  : static_cast<int>((deadline_sec - now) * 1000);
150  }
151  continue;
152  }
154  "Error in telemetry thread. "
155  "Poll returned %d",
156  retval);
157  }
158 
159  // reset timeout and deadline of poll
160  timeout_ms = telemetry->send_rate_sec_ * 1000;
161  deadline_sec = platform_monotonic_time() + telemetry->send_rate_sec_;
162 
163  // aggregate + send stuff
164  if (retval == 0) {
165  telemetry->ManuallyUpdateSelectedCounters();
166  statistics->SnapshotCounters(&telemetry->counters_,
167  &telemetry->timestamp_);
168  telemetry->PushMetrics();
169  continue;
170  }
171 
172  // stop thread due to poll event
173  assert(watch_term.revents != 0);
174 
175  char c = 0;
176  ReadPipe(telemetry->pipe_terminate_[0], &c, 1);
177  assert(c == 'T');
178  break;
179  }
180  LogCvmfs(kLogTelemetry, kLogDebug, "Stopping telemetry thread");
181  return NULL;
182 }
183 
184 } // namespace perf
Statistics GetStatistics()
Definition: glue_buffer.h:647
perf::Statistics * statistics()
Definition: mountpoint.h:538
#define PANIC(...)
Definition: exception.h:29
virtual void PushMetrics()=0
assert((mem||(size==0))&&"Out Of Memory")
void MakePipe(int pipe_fd[2])
Definition: posix.cc:487
Counter * Lookup(const std::string &name) const
Definition: statistics.cc:63
glue::PageCacheTracker * page_cache_tracker()
Definition: mountpoint.h:534
void Set(const int64_t val)
Definition: statistics.h:33
static TelemetryAggregator * Create(Statistics *statistics, int send_rate, OptionsManager *options_mgr, MountPoint *mount_point, const std::string &fqrn, const TelemetrySelector type)
void SnapshotCounters(std::map< std::string, int64_t > *counters, uint64_t *timestamp_ns)
Definition: statistics.cc:150
Statistics GetStatistics()
Definition: glue_buffer.h:989
Statistics GetStatistics()
Definition: glue_buffer.h:819
std::map< std::string, int64_t > counters_
uint64_t platform_monotonic_time()
glue::DentryTracker * dentry_tracker()
Definition: mountpoint.h:533
static void * MainTelemetry(void *data)
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:496
glue::InodeTracker * inode_tracker()
Definition: mountpoint.h:528
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:508
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:559
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545