CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
tracer.cc
Go to the documentation of this file.
1 
6 #include "tracer.h"
7 
8 #include <pthread.h>
9 
10 #include <cassert>
11 #include <cerrno>
12 #include <cstdio>
13 #include <cstdlib>
14 #include <cstring>
15 #include <string>
16 
17 #include "util/atomic.h"
18 #include "util/concurrency.h"
19 #include "util/posix.h"
20 #include "util/string.h"
21 
22 using namespace std; // NOLINT
23 
24 
25 void Tracer::Activate(const int buffer_size,
26  const int flush_threshold,
27  const string &trace_file) {
28  trace_file_ = trace_file;
29  buffer_size_ = buffer_size;
30  flush_threshold_ = flush_threshold;
31  assert(buffer_size_ > 1 && flush_threshold_ >= 0
32  && flush_threshold_ < buffer_size_);
33 
34  ring_buffer_ = new BufferEntry[buffer_size_];
35  commit_buffer_ = new atomic_int32[buffer_size_];
36  for (int i = 0; i < buffer_size_; i++)
37  atomic_init32(&commit_buffer_[i]);
38 
39  int retval;
40  retval = pthread_cond_init(&sig_continue_trace_, NULL);
41  retval |= pthread_mutex_init(&sig_continue_trace_mutex_, NULL);
42  retval |= pthread_cond_init(&sig_flush_, NULL);
43  retval |= pthread_mutex_init(&sig_flush_mutex_, NULL);
44  assert(retval == 0);
45 
46  active_ = true;
47 }
48 
49 
67 int32_t Tracer::DoTrace(const int event,
68  const PathString &path,
69  const string &msg) {
70  int32_t my_seq_no = atomic_xadd32(&seq_no_, 1);
71  timeval now;
72  gettimeofday(&now, NULL);
73  int pos = my_seq_no % buffer_size_;
74 
75  while (my_seq_no - atomic_read32(&flushed_) >= buffer_size_) {
76  timespec timeout;
77  int retval;
78  GetTimespecRel(25, &timeout);
79  retval = pthread_mutex_lock(&sig_continue_trace_mutex_);
80  retval |= pthread_cond_timedwait(&sig_continue_trace_,
81  &sig_continue_trace_mutex_, &timeout);
82  retval |= pthread_mutex_unlock(&sig_continue_trace_mutex_);
83  assert(retval == ETIMEDOUT || retval == 0);
84  }
85 
86  ring_buffer_[pos].time_stamp = now;
87  ring_buffer_[pos].code = event;
88  ring_buffer_[pos].path = path;
89  ring_buffer_[pos].msg = msg;
90  atomic_inc32(&commit_buffer_[pos]);
91 
92  if (my_seq_no - atomic_read32(&flushed_) == flush_threshold_) {
93  MutexLockGuard m(&sig_flush_mutex_);
94  int err_code __attribute__((unused)) = pthread_cond_signal(&sig_flush_);
95  assert(err_code == 0 && "Could not signal flush thread");
96  }
97 
98  return my_seq_no;
99 }
100 
101 
103  if (!active_)
104  return;
105 
106  int32_t save_seq_no = DoTrace(kEventFlush, PathString("Tracer", 6),
107  "flushed ring buffer");
108  while (atomic_read32(&flushed_) <= save_seq_no) {
109  timespec timeout;
110  int retval;
111 
112  atomic_cas32(&flush_immediately_, 0, 1);
113  {
114  MutexLockGuard m(&sig_flush_mutex_);
115  retval = pthread_cond_signal(&sig_flush_);
116  assert(retval == 0);
117  }
118 
119  GetTimespecRel(250, &timeout);
120  retval = pthread_mutex_lock(&sig_continue_trace_mutex_);
121  retval |= pthread_cond_timedwait(
122  &sig_continue_trace_, &sig_continue_trace_mutex_, &timeout);
123  retval |= pthread_mutex_unlock(&sig_continue_trace_mutex_);
124  assert(retval == ETIMEDOUT || retval == 0);
125  }
126 }
127 
128 
129 void Tracer::GetTimespecRel(const int64_t ms, timespec *ts) {
130  timeval now;
131  gettimeofday(&now, NULL);
132  int64_t nsecs = now.tv_usec * 1000 + (ms % 1000) * 1000 * 1000;
133  int carry = 0;
134  if (nsecs >= 1000 * 1000 * 1000) {
135  carry = 1;
136  nsecs -= 1000 * 1000 * 1000;
137  }
138  ts->tv_sec = now.tv_sec + ms / 1000 + carry;
139  ts->tv_nsec = nsecs;
140 }
141 
142 
143 void *Tracer::MainFlush(void *data) {
144  Tracer *tracer = reinterpret_cast<Tracer *>(data);
145  int retval;
146  MutexLockGuard m(&tracer->sig_flush_mutex_);
147  FILE *f = fopen(tracer->trace_file_.c_str(), "a");
148  assert(f != NULL && "Could not open trace file");
149  struct timespec timeout;
150 
151  do {
152  while (
153  (atomic_read32(&tracer->terminate_flush_thread_) == 0)
154  && (atomic_read32(&tracer->flush_immediately_) == 0)
155  && (atomic_read32(&tracer->seq_no_) - atomic_read32(&tracer->flushed_)
156  <= tracer->flush_threshold_)) {
157  tracer->GetTimespecRel(2000, &timeout);
158  retval = pthread_cond_timedwait(
159  &tracer->sig_flush_, &tracer->sig_flush_mutex_, &timeout);
160  assert(retval != EINVAL);
161  }
162 
163  int base = atomic_read32(&tracer->flushed_) % tracer->buffer_size_;
164  int pos, i = 0;
165  while ((i <= tracer->flush_threshold_)
166  && (atomic_read32(
167  &tracer->commit_buffer_[pos = ((base + i)
168  % tracer->buffer_size_)])
169  == 1)) {
170  string tmp;
171  tmp = StringifyTimeval(tracer->ring_buffer_[pos].time_stamp);
172  retval = tracer->WriteCsvFile(f, tmp);
173  retval |= fputc(',', f) - ',';
174  tmp = StringifyInt(tracer->ring_buffer_[pos].code);
175  retval = tracer->WriteCsvFile(f, tmp);
176  retval |= fputc(',', f) - ',';
177  retval |= tracer->WriteCsvFile(f,
178  tracer->ring_buffer_[pos].path.ToString());
179  retval |= fputc(',', f) - ',';
180  retval |= tracer->WriteCsvFile(f, tracer->ring_buffer_[pos].msg);
181  retval |= (fputc(13, f) - 13) | (fputc(10, f) - 10);
182  assert(retval == 0);
183 
184  atomic_dec32(&tracer->commit_buffer_[pos]);
185  ++i;
186  }
187  retval = fflush(f);
188  assert(retval == 0);
189  atomic_xadd32(&tracer->flushed_, i);
190  atomic_cas32(&tracer->flush_immediately_, 1, 0);
191 
192  {
194  retval = pthread_cond_broadcast(&tracer->sig_continue_trace_);
195  assert(retval == 0);
196  }
197  } while (
198  (atomic_read32(&tracer->terminate_flush_thread_) == 0)
199  || (atomic_read32(&tracer->flushed_) < atomic_read32(&tracer->seq_no_)));
200 
201  retval = fclose(f);
202  assert(retval == 0);
203  return NULL;
204 }
205 
206 
208  if (active_) {
209  int retval = pthread_create(&thread_flush_, NULL, MainFlush, this);
210  assert(retval == 0);
211 
212  spawned_ = true;
213  DoTrace(kEventStart, PathString("Tracer", 6), "Trace buffer created");
214  }
215 }
216 
217 
219  : active_(false)
220  , spawned_(false)
221  , buffer_size_(0)
222  , flush_threshold_(0)
223  , ring_buffer_(NULL)
224  , commit_buffer_(NULL) {
225  memset(&thread_flush_, 0, sizeof(thread_flush_));
226  atomic_init32(&seq_no_);
227  atomic_init32(&flushed_);
228  atomic_init32(&terminate_flush_thread_);
229  atomic_init32(&flush_immediately_);
230 }
231 
232 
234  if (!active_)
235  return;
236  int retval;
237 
238  if (spawned_) {
239  DoTrace(kEventStop, PathString("Tracer", 6), "Destroying trace buffer...");
240 
241  // Trigger flushing and wait for it
242  atomic_inc32(&terminate_flush_thread_);
243  {
245  retval = pthread_cond_signal(&sig_flush_);
246  assert(retval == 0);
247  }
248  retval = pthread_join(thread_flush_, NULL);
249  assert(retval == 0);
250  }
251 
252  retval = pthread_cond_destroy(&sig_continue_trace_);
253  retval |= pthread_mutex_destroy(&sig_continue_trace_mutex_);
254  retval |= pthread_cond_destroy(&sig_flush_);
255  retval |= pthread_mutex_destroy(&sig_flush_mutex_);
256  assert(retval == 0);
257 
258  delete[] ring_buffer_;
259  delete[] commit_buffer_;
260 }
261 
262 
263 int Tracer::WriteCsvFile(FILE *fp, const string &field) {
264  if (fp == NULL)
265  return 0;
266 
267  int retval;
268 
269  if ((retval = fputc('"', fp)) != '"')
270  return retval;
271 
272  for (unsigned i = 0, l = field.length(); i < l; ++i) {
273  if (field[i] == '"') {
274  if ((retval = fputc('"', fp)) != '"')
275  return retval;
276  }
277  if ((retval = fputc(field[i], fp)) != field[i])
278  return retval;
279  }
280 
281  if ((retval = fputc('"', fp)) != '"')
282  return retval;
283 
284  return 0;
285 }
timeval time_stamp
Definition: tracer.h:85
Tracer()
Definition: tracer.cc:218
PathString path
Definition: tracer.h:90
void Activate(const int buffer_size, const int flush_threshold, const std::string &trace_file)
Definition: tracer.cc:25
void GetTimespecRel(const int64_t ms, timespec *ts)
Definition: tracer.cc:129
static void * MainFlush(void *data)
Definition: tracer.cc:143
pthread_mutex_t sig_flush_mutex_
Definition: tracer.h:115
atomic_int32 seq_no_
Definition: tracer.h:122
void Spawn()
Definition: tracer.cc:207
std::string msg
Definition: tracer.h:91
assert((mem||(size==0))&&"Out Of Memory")
int code
Definition: tracer.h:89
int WriteCsvFile(FILE *fp, const std::string &field)
Definition: tracer.cc:263
pthread_cond_t sig_flush_
Definition: tracer.h:114
pthread_cond_t sig_continue_trace_
Definition: tracer.h:116
struct cvmcache_object_info __attribute__
Definition: atomic.h:24
int buffer_size_
Definition: tracer.h:104
Definition: tracer.h:35
atomic_int32 flush_immediately_
Definition: tracer.h:129
int32_t atomic_int32
Definition: atomic.h:17
int32_t DoTrace(const int event, const PathString &path, const std::string &msg)
Definition: tracer.cc:67
int flush_threshold_
Definition: tracer.h:105
void Flush()
Definition: tracer.cc:102
pthread_t thread_flush_
Definition: tracer.h:113
~Tracer()
Definition: tracer.cc:233
atomic_int32 terminate_flush_thread_
Definition: tracer.h:128
bool active_
Definition: tracer.h:101
string StringifyInt(const int64_t value)
Definition: string.cc:77
BufferEntry * ring_buffer_
Definition: tracer.h:106
std::string ToString() const
Definition: shortstring.h:139
ShortString< kDefaultMaxPath, 0 > PathString
Definition: shortstring.h:213
Definition: mutex.h:42
static const int kEventStop
Definition: tracer.h:72
Definition: tracer.h:81
pthread_mutex_t sig_continue_trace_mutex_
Definition: tracer.h:117
std::string trace_file_
Definition: tracer.h:103
string StringifyTimeval(const timeval value)
Definition: string.cc:195
atomic_int32 flushed_
Definition: tracer.h:127
bool spawned_
Definition: tracer.h:102
atomic_int32 * commit_buffer_
Definition: tracer.h:112