CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
telemetry_aggregator.cc
Go to the documentation of this file.
1 
5 #include "telemetry_aggregator.h"
6 
7 #include <errno.h>
8 #include <poll.h>
9 #include <unistd.h>
10 
11 #include "util/exception.h"
12 #include "util/logging.h"
13 #include "util/platform.h"
14 #include "util/pointer.h"
15 #include "util/posix.h"
16 
18 namespace perf {
19 
21  int send_rate,
22  OptionsManager *options_mgr,
23  const std::string &fqrn,
24  const TelemetrySelector type) {
27 
28  switch (type) {
29  case kTelemetryInflux:
30  telemetryInflux = new TelemetryAggregatorInflux(statistics, send_rate,
31  options_mgr, fqrn);
32  telemetry = reinterpret_cast<UniquePtr<TelemetryAggregator>*>
33  (&telemetryInflux);
34  break;
35  default:
37  "No implementation available for given telemetry class.");
38  return NULL;
39  break;
40  }
41 
42  if (telemetry->weak_ref()->is_zombie_) {
44  "Requested telemetry will NOT be used. "
45  "It was not constructed correctly.");
46  return NULL;
47  }
48 
49  LogCvmfs(kLogTelemetry, kLogDebug, "TelemetryAggregator created.");
50  return telemetry->Release();
51 }
52 
54  if (pipe_terminate_[1] >= 0) {
55  char t = 'T';
56  WritePipe(pipe_terminate_[1], &t, 1);
57  pthread_join(thread_telemetry_, NULL);
59  }
60 }
61 
63  assert(pipe_terminate_[0] == -1);
66  int retval = pthread_create(&thread_telemetry_, NULL, MainTelemetry, this);
67  assert(retval == 0);
68  LogCvmfs(kLogTelemetry, kLogDebug, "Spawning of telemetry thread.");
69 }
70 
72  TelemetryAggregator *telemetry = reinterpret_cast<TelemetryAggregator*>(data);
73  Statistics *statistics = telemetry->statistics_;
74 
75  struct pollfd watch_term;
76  watch_term.fd = telemetry->pipe_terminate_[0];
77  watch_term.events = POLLIN | POLLPRI;
78  int timeout_ms = telemetry->send_rate_sec_ * 1000;
79  uint64_t deadline_sec = platform_monotonic_time()
80  + telemetry->send_rate_sec_;
81  while (true) {
82  // sleep and check if end - blocking wait for "send_rate_sec_" seconds
83  watch_term.revents = 0;
84  int retval = poll(&watch_term, 1, timeout_ms);
85  if (retval < 0) {
86  if (errno == EINTR) { // external interrupt occurred - no error for us
87  if (timeout_ms >= 0) {
88  uint64_t now = platform_monotonic_time();
89  timeout_ms = (now > deadline_sec) ? 0 :
90  static_cast<int>((deadline_sec - now) * 1000);
91  }
92  continue;
93  }
94  PANIC(kLogSyslogErr | kLogDebug, "Error in telemetry thread. "
95  "Poll returned %d", retval);
96  }
97 
98  // reset timeout and deadline of poll
99  timeout_ms = telemetry->send_rate_sec_ * 1000;
100  deadline_sec = platform_monotonic_time() + telemetry->send_rate_sec_;
101 
102  // aggregate + send stuff
103  if (retval == 0) {
104  statistics->SnapshotCounters(&telemetry->counters_,
105  &telemetry->timestamp_);
106  telemetry->PushMetrics();
107  continue;
108  }
109 
110  // stop thread due to poll event
111  assert(watch_term.revents != 0);
112 
113  char c = 0;
114  ReadPipe(telemetry->pipe_terminate_[0], &c, 1);
115  assert(c == 'T');
116  break;
117  }
118  LogCvmfs(kLogTelemetry, kLogDebug, "Stopping telemetry thread");
119  return NULL;
120 }
121 
122 } // namespace perf
#define PANIC(...)
Definition: exception.h:29
virtual void PushMetrics()=0
assert((mem||(size==0))&&"Out Of Memory")
void MakePipe(int pipe_fd[2])
Definition: posix.cc:492
static TelemetryAggregator * Create(Statistics *statistics, int send_rate, OptionsManager *options_mgr, const std::string &fqrn, const TelemetrySelector type)
void SnapshotCounters(std::map< std::string, int64_t > *counters, uint64_t *timestamp_ns)
Definition: statistics.cc:147
std::map< std::string, int64_t > counters_
uint64_t platform_monotonic_time()
static void * MainTelemetry(void *data)
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:501
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:513
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:562
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528