CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
telemetry_aggregator.cc
Go to the documentation of this file.
1 
5 #include "telemetry_aggregator.h"
6 
7 #include <errno.h>
8 #include <poll.h>
9 #include <unistd.h>
10 
11 #include "glue_buffer.h"
12 #include "shortstring.h"
13 #include "util/exception.h"
14 #include "util/logging.h"
15 #include "util/platform.h"
16 #include "util/pointer.h"
17 #include "util/posix.h"
18 
20 namespace perf {
21 
23  int send_rate,
24  OptionsManager *options_mgr,
25  MountPoint* mount_point,
26  const std::string &fqrn,
27  const TelemetrySelector type) {
30 
31  switch (type) {
32  case kTelemetryInflux:
33  telemetryInflux = new TelemetryAggregatorInflux(statistics, send_rate,
34  options_mgr, mount_point, fqrn);
35  telemetry = reinterpret_cast<UniquePtr<TelemetryAggregator>*>
36  (&telemetryInflux);
37  break;
38  default:
40  "No implementation available for given telemetry class.");
41  return NULL;
42  break;
43  }
44 
45  if (telemetry->weak_ref()->is_zombie_) {
47  "Requested telemetry will NOT be used. "
48  "It was not constructed correctly.");
49  return NULL;
50  }
51 
52  LogCvmfs(kLogTelemetry, kLogDebug, "TelemetryAggregator created.");
53  return telemetry->Release();
54 }
55 
57  if (pipe_terminate_[1] >= 0) {
58  char t = 'T';
59  WritePipe(pipe_terminate_[1], &t, 1);
60  pthread_join(thread_telemetry_, NULL);
62  }
63 }
64 
66  assert(pipe_terminate_[0] == -1);
69  int retval = pthread_create(&thread_telemetry_, NULL, MainTelemetry, this);
70  assert(retval == 0);
71  LogCvmfs(kLogTelemetry, kLogDebug, "Spawning of telemetry thread.");
72 }
73 
75  if (!mount_point_) {
76  return;
77  }
78 
79  // Manually setting the inode tracker numbers
80  glue::InodeTracker::Statistics inode_stats =
82  glue::DentryTracker::Statistics dentry_stats =
84  glue::PageCacheTracker::Statistics page_cache_stats =
86  mount_point_->statistics()->Lookup("inode_tracker.n_insert")->
87  Set(atomic_read64(&inode_stats.num_inserts));
88  mount_point_->statistics()->Lookup("inode_tracker.n_remove")->
89  Set(atomic_read64(&inode_stats.num_removes));
90  mount_point_->statistics()->Lookup("inode_tracker.no_reference")->
91  Set(atomic_read64(&inode_stats.num_references));
92  mount_point_->statistics()->Lookup("inode_tracker.n_hit_inode")->
93  Set(atomic_read64(&inode_stats.num_hits_inode));
94  mount_point_->statistics()->Lookup("inode_tracker.n_hit_path")->
95  Set(atomic_read64(&inode_stats.num_hits_path));
96  mount_point_->statistics()->Lookup("inode_tracker.n_miss_path")->
97  Set(atomic_read64(&inode_stats.num_misses_path));
98  mount_point_->statistics()->Lookup("dentry_tracker.n_insert")->
99  Set(dentry_stats.num_insert);
100  mount_point_->statistics()->Lookup("dentry_tracker.n_remove")->
101  Set(dentry_stats.num_remove);
102  mount_point_->statistics()->Lookup("dentry_tracker.n_prune")->
103  Set(dentry_stats.num_prune);
104  mount_point_->statistics()->Lookup("page_cache_tracker.n_insert")->
105  Set(page_cache_stats.n_insert);
106  mount_point_->statistics()->Lookup("page_cache_tracker.n_remove")->
107  Set(page_cache_stats.n_remove);
108  mount_point_->statistics()->Lookup("page_cache_tracker.n_open_direct")->
109  Set(page_cache_stats.n_open_direct);
110  mount_point_->statistics()->Lookup("page_cache_tracker.n_open_flush")->
111  Set(page_cache_stats.n_open_flush);
112  mount_point_->statistics()->Lookup("page_cache_tracker.n_open_cached")->
113  Set(page_cache_stats.n_open_cached);
114 }
115 
117  TelemetryAggregator *telemetry = reinterpret_cast<TelemetryAggregator*>(data);
118  Statistics *statistics = telemetry->statistics_;
119 
120  struct pollfd watch_term;
121  watch_term.fd = telemetry->pipe_terminate_[0];
122  watch_term.events = POLLIN | POLLPRI;
123  int timeout_ms = telemetry->send_rate_sec_ * 1000;
124  uint64_t deadline_sec = platform_monotonic_time()
125  + telemetry->send_rate_sec_;
126  while (true) {
127  // sleep and check if end - blocking wait for "send_rate_sec_" seconds
128  watch_term.revents = 0;
129  int retval = poll(&watch_term, 1, timeout_ms);
130  if (retval < 0) {
131  if (errno == EINTR) { // external interrupt occurred - no error for us
132  if (timeout_ms >= 0) {
133  uint64_t now = platform_monotonic_time();
134  timeout_ms = (now > deadline_sec) ? 0 :
135  static_cast<int>((deadline_sec - now) * 1000);
136  }
137  continue;
138  }
139  PANIC(kLogSyslogErr | kLogDebug, "Error in telemetry thread. "
140  "Poll returned %d", retval);
141  }
142 
143  // reset timeout and deadline of poll
144  timeout_ms = telemetry->send_rate_sec_ * 1000;
145  deadline_sec = platform_monotonic_time() + telemetry->send_rate_sec_;
146 
147  // aggregate + send stuff
148  if (retval == 0) {
149  telemetry->ManuallyUpdateSelectedCounters();
150  statistics->SnapshotCounters(&telemetry->counters_,
151  &telemetry->timestamp_);
152  telemetry->PushMetrics();
153  continue;
154  }
155 
156  // stop thread due to poll event
157  assert(watch_term.revents != 0);
158 
159  char c = 0;
160  ReadPipe(telemetry->pipe_terminate_[0], &c, 1);
161  assert(c == 'T');
162  break;
163  }
164  LogCvmfs(kLogTelemetry, kLogDebug, "Stopping telemetry thread");
165  return NULL;
166 }
167 
168 } // namespace perf
Statistics GetStatistics()
Definition: glue_buffer.h:674
perf::Statistics * statistics()
Definition: mountpoint.h:536
#define PANIC(...)
Definition: exception.h:29
virtual void PushMetrics()=0
assert((mem||(size==0))&&"Out Of Memory")
void MakePipe(int pipe_fd[2])
Definition: posix.cc:492
Counter * Lookup(const std::string &name) const
Definition: statistics.cc:62
glue::PageCacheTracker * page_cache_tracker()
Definition: mountpoint.h:532
static TelemetryAggregator * Create(Statistics *statistics, int send_rate, OptionsManager *options_mgr, MountPoint *mount_point, const std::string &fqrn, const TelemetrySelector type)
void SnapshotCounters(std::map< std::string, int64_t > *counters, uint64_t *timestamp_ns)
Definition: statistics.cc:147
Statistics GetStatistics()
Definition: glue_buffer.h:1020
Statistics GetStatistics()
Definition: glue_buffer.h:851
std::map< std::string, int64_t > counters_
uint64_t platform_monotonic_time()
glue::DentryTracker * dentry_tracker()
Definition: mountpoint.h:531
static void * MainTelemetry(void *data)
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:501
glue::InodeTracker * inode_tracker()
Definition: mountpoint.h:526
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:513
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:562
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528