CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
tracer.cc
Go to the documentation of this file.
1 
6 #include "tracer.h"
7 
8 #include <pthread.h>
9 
10 #include <cassert>
11 #include <cerrno>
12 #include <cstdio>
13 #include <cstdlib>
14 #include <cstring>
15 #include <string>
16 
17 #include "util/atomic.h"
18 #include "util/concurrency.h"
19 #include "util/posix.h"
20 #include "util/string.h"
21 
22 using namespace std; // NOLINT
23 
24 
25 void Tracer::Activate(const int buffer_size,
26  const int flush_threshold,
27  const string &trace_file) {
28  trace_file_ = trace_file;
29  buffer_size_ = buffer_size;
30  flush_threshold_ = flush_threshold;
31  assert(buffer_size_ > 1 && flush_threshold_ >= 0
32  && flush_threshold_ < buffer_size_);
33 
34  ring_buffer_ = new BufferEntry[buffer_size_];
35  commit_buffer_ = new atomic_int32[buffer_size_];
36  for (int i = 0; i < buffer_size_; i++)
37  atomic_init32(&commit_buffer_[i]);
38 
39  int retval;
40  retval = pthread_cond_init(&sig_continue_trace_, NULL);
41  retval |= pthread_mutex_init(&sig_continue_trace_mutex_, NULL);
42  retval |= pthread_cond_init(&sig_flush_, NULL);
43  retval |= pthread_mutex_init(&sig_flush_mutex_, NULL);
44  assert(retval == 0);
45 
46  active_ = true;
47 }
48 
49 
67 int32_t Tracer::DoTrace(const int event,
68  const PathString &path,
69  const string &msg) {
70  const int32_t my_seq_no = atomic_xadd32(&seq_no_, 1);
71  timeval now;
72  gettimeofday(&now, NULL);
73  const int pos = my_seq_no % buffer_size_;
74 
75  while (my_seq_no - atomic_read32(&flushed_) >= buffer_size_) {
76  timespec timeout;
77  int retval;
78  GetTimespecRel(25, &timeout);
79  retval = pthread_mutex_lock(&sig_continue_trace_mutex_);
80  retval |= pthread_cond_timedwait(&sig_continue_trace_,
81  &sig_continue_trace_mutex_, &timeout);
82  retval |= pthread_mutex_unlock(&sig_continue_trace_mutex_);
83  assert(retval == ETIMEDOUT || retval == 0);
84  }
85 
86  ring_buffer_[pos].time_stamp = now;
87  ring_buffer_[pos].code = event;
88  ring_buffer_[pos].path = path;
89  ring_buffer_[pos].msg = msg;
90  atomic_inc32(&commit_buffer_[pos]);
91 
92  if (my_seq_no - atomic_read32(&flushed_) == flush_threshold_) {
93  const MutexLockGuard m(&sig_flush_mutex_);
94  const int err_code __attribute__((unused)) =
95  pthread_cond_signal(&sig_flush_);
96  assert(err_code == 0 && "Could not signal flush thread");
97  }
98 
99  return my_seq_no;
100 }
101 
102 
104  if (!active_)
105  return;
106 
107  const int32_t save_seq_no =
108  DoTrace(kEventFlush, PathString("Tracer", 6), "flushed ring buffer");
109  while (atomic_read32(&flushed_) <= save_seq_no) {
110  timespec timeout;
111  int retval;
112 
113  atomic_cas32(&flush_immediately_, 0, 1);
114  {
115  const MutexLockGuard m(&sig_flush_mutex_);
116  retval = pthread_cond_signal(&sig_flush_);
117  assert(retval == 0);
118  }
119 
120  GetTimespecRel(250, &timeout);
121  retval = pthread_mutex_lock(&sig_continue_trace_mutex_);
122  retval |= pthread_cond_timedwait(
123  &sig_continue_trace_, &sig_continue_trace_mutex_, &timeout);
124  retval |= pthread_mutex_unlock(&sig_continue_trace_mutex_);
125  assert(retval == ETIMEDOUT || retval == 0);
126  }
127 }
128 
129 
130 void Tracer::GetTimespecRel(const int64_t ms, timespec *ts) {
131  timeval now;
132  gettimeofday(&now, NULL);
133  int64_t nsecs = now.tv_usec * 1000 + (ms % 1000) * 1000 * 1000;
134  int carry = 0;
135  if (nsecs >= 1000 * 1000 * 1000) {
136  carry = 1;
137  nsecs -= 1000 * 1000 * 1000;
138  }
139  ts->tv_sec = now.tv_sec + ms / 1000 + carry;
140  ts->tv_nsec = nsecs;
141 }
142 
143 
144 void *Tracer::MainFlush(void *data) {
145  Tracer *tracer = reinterpret_cast<Tracer *>(data);
146  int retval;
147  const MutexLockGuard m(&tracer->sig_flush_mutex_);
148  FILE *f = fopen(tracer->trace_file_.c_str(), "a");
149  assert(f != NULL && "Could not open trace file");
150  struct timespec timeout;
151 
152  do {
153  while (
154  (atomic_read32(&tracer->terminate_flush_thread_) == 0)
155  && (atomic_read32(&tracer->flush_immediately_) == 0)
156  && (atomic_read32(&tracer->seq_no_) - atomic_read32(&tracer->flushed_)
157  <= tracer->flush_threshold_)) {
158  tracer->GetTimespecRel(2000, &timeout);
159  retval = pthread_cond_timedwait(
160  &tracer->sig_flush_, &tracer->sig_flush_mutex_, &timeout);
161  assert(retval != EINVAL);
162  }
163 
164  const int base = atomic_read32(&tracer->flushed_) % tracer->buffer_size_;
165  int pos, i = 0;
166  while ((i <= tracer->flush_threshold_)
167  && (atomic_read32(
168  &tracer->commit_buffer_[pos = ((base + i)
169  % tracer->buffer_size_)])
170  == 1)) {
171  string tmp;
172  tmp = StringifyTimeval(tracer->ring_buffer_[pos].time_stamp);
173  retval = tracer->WriteCsvFile(f, tmp);
174  retval |= fputc(',', f) - ',';
175  tmp = StringifyInt(tracer->ring_buffer_[pos].code);
176  retval = tracer->WriteCsvFile(f, tmp);
177  retval |= fputc(',', f) - ',';
178  retval |= tracer->WriteCsvFile(f,
179  tracer->ring_buffer_[pos].path.ToString());
180  retval |= fputc(',', f) - ',';
181  retval |= tracer->WriteCsvFile(f, tracer->ring_buffer_[pos].msg);
182  retval |= (fputc(13, f) - 13) | (fputc(10, f) - 10);
183  assert(retval == 0);
184 
185  atomic_dec32(&tracer->commit_buffer_[pos]);
186  ++i;
187  }
188  retval = fflush(f);
189  assert(retval == 0);
190  atomic_xadd32(&tracer->flushed_, i);
191  atomic_cas32(&tracer->flush_immediately_, 1, 0);
192 
193  {
194  const MutexLockGuard l(&tracer->sig_continue_trace_mutex_);
195  retval = pthread_cond_broadcast(&tracer->sig_continue_trace_);
196  assert(retval == 0);
197  }
198  } while (
199  (atomic_read32(&tracer->terminate_flush_thread_) == 0)
200  || (atomic_read32(&tracer->flushed_) < atomic_read32(&tracer->seq_no_)));
201 
202  retval = fclose(f);
203  assert(retval == 0);
204  return NULL;
205 }
206 
207 
209  if (active_) {
210  const int retval = pthread_create(&thread_flush_, NULL, MainFlush, this);
211  assert(retval == 0);
212 
213  spawned_ = true;
214  DoTrace(kEventStart, PathString("Tracer", 6), "Trace buffer created");
215  }
216 }
217 
218 
220  : active_(false)
221  , spawned_(false)
222  , buffer_size_(0)
223  , flush_threshold_(0)
224  , ring_buffer_(NULL)
225  , commit_buffer_(NULL) {
226  memset(&thread_flush_, 0, sizeof(thread_flush_));
227  atomic_init32(&seq_no_);
228  atomic_init32(&flushed_);
229  atomic_init32(&terminate_flush_thread_);
230  atomic_init32(&flush_immediately_);
231 }
232 
233 
235  if (!active_)
236  return;
237  int retval;
238 
239  if (spawned_) {
240  DoTrace(kEventStop, PathString("Tracer", 6), "Destroying trace buffer...");
241 
242  // Trigger flushing and wait for it
243  atomic_inc32(&terminate_flush_thread_);
244  {
246  retval = pthread_cond_signal(&sig_flush_);
247  assert(retval == 0);
248  }
249  retval = pthread_join(thread_flush_, NULL);
250  assert(retval == 0);
251  }
252 
253  retval = pthread_cond_destroy(&sig_continue_trace_);
254  retval |= pthread_mutex_destroy(&sig_continue_trace_mutex_);
255  retval |= pthread_cond_destroy(&sig_flush_);
256  retval |= pthread_mutex_destroy(&sig_flush_mutex_);
257  assert(retval == 0);
258 
259  delete[] ring_buffer_;
260  delete[] commit_buffer_;
261 }
262 
263 
264 int Tracer::WriteCsvFile(FILE *fp, const string &field) {
265  if (fp == NULL)
266  return 0;
267 
268  int retval;
269 
270  if ((retval = fputc('"', fp)) != '"')
271  return retval;
272 
273  for (unsigned i = 0, l = field.length(); i < l; ++i) {
274  if (field[i] == '"') {
275  if ((retval = fputc('"', fp)) != '"')
276  return retval;
277  }
278  if ((retval = fputc(field[i], fp)) != field[i])
279  return retval;
280  }
281 
282  if ((retval = fputc('"', fp)) != '"')
283  return retval;
284 
285  return 0;
286 }
timeval time_stamp
Definition: tracer.h:85
Tracer()
Definition: tracer.cc:219
PathString path
Definition: tracer.h:90
void Activate(const int buffer_size, const int flush_threshold, const std::string &trace_file)
Definition: tracer.cc:25
void GetTimespecRel(const int64_t ms, timespec *ts)
Definition: tracer.cc:130
static void * MainFlush(void *data)
Definition: tracer.cc:144
pthread_mutex_t sig_flush_mutex_
Definition: tracer.h:115
atomic_int32 seq_no_
Definition: tracer.h:122
void Spawn()
Definition: tracer.cc:208
std::string msg
Definition: tracer.h:91
assert((mem||(size==0))&&"Out Of Memory")
int code
Definition: tracer.h:89
int WriteCsvFile(FILE *fp, const std::string &field)
Definition: tracer.cc:264
pthread_cond_t sig_flush_
Definition: tracer.h:114
pthread_cond_t sig_continue_trace_
Definition: tracer.h:116
struct cvmcache_object_info __attribute__
Definition: atomic.h:24
int buffer_size_
Definition: tracer.h:104
Definition: tracer.h:35
atomic_int32 flush_immediately_
Definition: tracer.h:129
int32_t atomic_int32
Definition: atomic.h:17
int flush_threshold_
Definition: tracer.h:105
void Flush()
Definition: tracer.cc:103
pthread_t thread_flush_
Definition: tracer.h:113
~Tracer()
Definition: tracer.cc:234
atomic_int32 terminate_flush_thread_
Definition: tracer.h:128
bool active_
Definition: tracer.h:101
string StringifyInt(const int64_t value)
Definition: string.cc:77
BufferEntry * ring_buffer_
Definition: tracer.h:106
std::string ToString() const
Definition: shortstring.h:139
ShortString< kDefaultMaxPath, 0 > PathString
Definition: shortstring.h:213
Definition: mutex.h:42
static const int kEventStop
Definition: tracer.h:72
Definition: tracer.h:81
pthread_mutex_t sig_continue_trace_mutex_
Definition: tracer.h:117
std::string trace_file_
Definition: tracer.h:103
void const PathString const std::string &msg DoTrace(event, path, msg)
string StringifyTimeval(const timeval value)
Definition: string.cc:195
atomic_int32 flushed_
Definition: tracer.h:127
bool spawned_
Definition: tracer.h:102
atomic_int32 * commit_buffer_
Definition: tracer.h:112