CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
tracer.cc
Go to the documentation of this file.
1 
6 #include "tracer.h"
7 
8 #include <pthread.h>
9 
10 #include <cassert>
11 #include <cerrno>
12 #include <cstdio>
13 #include <cstdlib>
14 #include <cstring>
15 #include <string>
16 
17 #include "util/atomic.h"
18 #include "util/concurrency.h"
19 #include "util/posix.h"
20 #include "util/string.h"
21 
22 using namespace std; // NOLINT
23 
24 
26  const int buffer_size,
27  const int flush_threshold,
28  const string &trace_file)
29 {
30  trace_file_ = trace_file;
31  buffer_size_ = buffer_size;
32  flush_threshold_ = flush_threshold;
33  assert(buffer_size_ > 1 && flush_threshold_>= 0
34  && flush_threshold_ < buffer_size_);
35 
36  ring_buffer_ = new BufferEntry[buffer_size_];
37  commit_buffer_ = new atomic_int32[buffer_size_];
38  for (int i = 0; i < buffer_size_; i++)
39  atomic_init32(&commit_buffer_[i]);
40 
41  int retval;
42  retval = pthread_cond_init(&sig_continue_trace_, NULL);
43  retval |= pthread_mutex_init(&sig_continue_trace_mutex_, NULL);
44  retval |= pthread_cond_init(&sig_flush_, NULL);
45  retval |= pthread_mutex_init(&sig_flush_mutex_, NULL);
46  assert(retval == 0);
47 
48  active_ = true;
49 }
50 
51 
70  const int event,
71  const PathString &path,
72  const string &msg)
73 {
74  int32_t my_seq_no = atomic_xadd32(&seq_no_, 1);
75  timeval now;
76  gettimeofday(&now, NULL);
77  int pos = my_seq_no % buffer_size_;
78 
79  while (my_seq_no - atomic_read32(&flushed_) >= buffer_size_) {
80  timespec timeout;
81  int retval;
82  GetTimespecRel(25, &timeout);
83  retval = pthread_mutex_lock(&sig_continue_trace_mutex_);
84  retval |= pthread_cond_timedwait(&sig_continue_trace_,
85  &sig_continue_trace_mutex_, &timeout);
86  retval |= pthread_mutex_unlock(&sig_continue_trace_mutex_);
87  assert(retval == ETIMEDOUT || retval == 0);
88  }
89 
90  ring_buffer_[pos].time_stamp = now;
91  ring_buffer_[pos].code = event;
92  ring_buffer_[pos].path = path;
93  ring_buffer_[pos].msg = msg;
94  atomic_inc32(&commit_buffer_[pos]);
95 
96  if (my_seq_no - atomic_read32(&flushed_) == flush_threshold_) {
97  MutexLockGuard m(&sig_flush_mutex_);
98  int err_code __attribute__((unused)) = pthread_cond_signal(&sig_flush_);
99  assert(err_code == 0 && "Could not signal flush thread");
100  }
101 
102  return my_seq_no;
103 }
104 
105 
107  if (!active_) return;
108 
109  int32_t save_seq_no = DoTrace(kEventFlush, PathString("Tracer", 6),
110  "flushed ring buffer");
111  while (atomic_read32(&flushed_) <= save_seq_no) {
112  timespec timeout;
113  int retval;
114 
115  atomic_cas32(&flush_immediately_, 0, 1);
116  {
117  MutexLockGuard m(&sig_flush_mutex_);
118  retval = pthread_cond_signal(&sig_flush_);
119  assert(retval == 0);
120  }
121 
122  GetTimespecRel(250, &timeout);
123  retval = pthread_mutex_lock(&sig_continue_trace_mutex_);
124  retval |= pthread_cond_timedwait(&sig_continue_trace_,
125  &sig_continue_trace_mutex_,
126  &timeout);
127  retval |= pthread_mutex_unlock(&sig_continue_trace_mutex_);
128  assert(retval == ETIMEDOUT || retval == 0);
129  }
130 }
131 
132 
133 void Tracer::GetTimespecRel(const int64_t ms, timespec *ts) {
134  timeval now;
135  gettimeofday(&now, NULL);
136  int64_t nsecs = now.tv_usec * 1000 + (ms % 1000)*1000*1000;
137  int carry = 0;
138  if (nsecs >= 1000*1000*1000) {
139  carry = 1;
140  nsecs -= 1000*1000*1000;
141  }
142  ts->tv_sec = now.tv_sec + ms/1000 + carry;
143  ts->tv_nsec = nsecs;
144 }
145 
146 
147 void *Tracer::MainFlush(void *data) {
148  Tracer *tracer = reinterpret_cast<Tracer *>(data);
149  int retval;
150  MutexLockGuard m(&tracer->sig_flush_mutex_);
151  FILE *f = fopen(tracer->trace_file_.c_str(), "a");
152  assert(f != NULL && "Could not open trace file");
153  struct timespec timeout;
154 
155  do {
156  while ((atomic_read32(&tracer->terminate_flush_thread_) == 0) &&
157  (atomic_read32(&tracer->flush_immediately_) == 0) &&
158  (atomic_read32(&tracer->seq_no_) -
159  atomic_read32(&tracer->flushed_)
160  <= tracer->flush_threshold_))
161  {
162  tracer->GetTimespecRel(2000, &timeout);
163  retval = pthread_cond_timedwait(&tracer->sig_flush_,
164  &tracer->sig_flush_mutex_,
165  &timeout);
166  assert(retval != EINVAL);
167  }
168 
169  int base = atomic_read32(&tracer->flushed_) % tracer->buffer_size_;
170  int pos, i = 0;
171  while ((i <= tracer->flush_threshold_) &&
172  (atomic_read32(&tracer->commit_buffer_[
173  pos = ((base + i) % tracer->buffer_size_)]) == 1))
174  {
175  string tmp;
176  tmp = StringifyTimeval(tracer->ring_buffer_[pos].time_stamp);
177  retval = tracer->WriteCsvFile(f, tmp);
178  retval |= fputc(',', f) - ',';
179  tmp = StringifyInt(tracer->ring_buffer_[pos].code);
180  retval = tracer->WriteCsvFile(f, tmp);
181  retval |= fputc(',', f) - ',';
182  retval |= tracer->WriteCsvFile(
183  f, tracer->ring_buffer_[pos].path.ToString());
184  retval |= fputc(',', f) - ',';
185  retval |= tracer->WriteCsvFile(f, tracer->ring_buffer_[pos].msg);
186  retval |= (fputc(13, f) - 13) | (fputc(10, f) - 10);
187  assert(retval == 0);
188 
189  atomic_dec32(&tracer->commit_buffer_[pos]);
190  ++i;
191  }
192  retval = fflush(f);
193  assert(retval == 0);
194  atomic_xadd32(&tracer->flushed_, i);
195  atomic_cas32(&tracer->flush_immediately_, 1, 0);
196 
197  {
199  retval = pthread_cond_broadcast(&tracer->sig_continue_trace_);
200  assert(retval == 0);
201  }
202  } while ((atomic_read32(&tracer->terminate_flush_thread_) == 0) ||
203  (atomic_read32(&tracer->flushed_) <
204  atomic_read32(&tracer->seq_no_)));
205 
206  retval = fclose(f);
207  assert(retval == 0);
208  return NULL;
209 }
210 
211 
213  if (active_) {
214  int retval = pthread_create(&thread_flush_, NULL, MainFlush, this);
215  assert(retval == 0);
216 
217  spawned_ = true;
218  DoTrace(kEventStart, PathString("Tracer", 6), "Trace buffer created");
219  }
220 }
221 
222 
224  : active_(false)
225  , spawned_(false)
226  , buffer_size_(0)
227  , flush_threshold_(0)
228  , ring_buffer_(NULL)
229  , commit_buffer_(NULL)
230 {
231  memset(&thread_flush_, 0, sizeof(thread_flush_));
232  atomic_init32(&seq_no_);
233  atomic_init32(&flushed_);
234  atomic_init32(&terminate_flush_thread_);
235  atomic_init32(&flush_immediately_);
236 }
237 
238 
240  if (!active_)
241  return;
242  int retval;
243 
244  if (spawned_) {
245  DoTrace(kEventStop, PathString("Tracer", 6), "Destroying trace buffer...");
246 
247  // Trigger flushing and wait for it
248  atomic_inc32(&terminate_flush_thread_);
249  {
251  retval = pthread_cond_signal(&sig_flush_);
252  assert(retval == 0);
253  }
254  retval = pthread_join(thread_flush_, NULL);
255  assert(retval == 0);
256  }
257 
258  retval = pthread_cond_destroy(&sig_continue_trace_);
259  retval |= pthread_mutex_destroy(&sig_continue_trace_mutex_);
260  retval |= pthread_cond_destroy(&sig_flush_);
261  retval |= pthread_mutex_destroy(&sig_flush_mutex_);
262  assert(retval == 0);
263 
264  delete[] ring_buffer_;
265  delete[] commit_buffer_;
266 }
267 
268 
269 int Tracer::WriteCsvFile(FILE *fp, const string &field) {
270  if (fp == NULL)
271  return 0;
272 
273  int retval;
274 
275  if ((retval = fputc('"', fp)) != '"')
276  return retval;
277 
278  for (unsigned i = 0, l = field.length(); i < l; ++i) {
279  if (field[i] == '"') {
280  if ((retval = fputc('"', fp)) != '"')
281  return retval;
282  }
283  if ((retval = fputc(field[i], fp)) != field[i])
284  return retval;
285  }
286 
287  if ((retval = fputc('"', fp)) != '"')
288  return retval;
289 
290  return 0;
291 }
timeval time_stamp
Definition: tracer.h:87
Tracer()
Definition: tracer.cc:223
PathString path
Definition: tracer.h:92
void Activate(const int buffer_size, const int flush_threshold, const std::string &trace_file)
Definition: tracer.cc:25
void GetTimespecRel(const int64_t ms, timespec *ts)
Definition: tracer.cc:133
static void * MainFlush(void *data)
Definition: tracer.cc:147
pthread_mutex_t sig_flush_mutex_
Definition: tracer.h:117
atomic_int32 seq_no_
Definition: tracer.h:124
void Spawn()
Definition: tracer.cc:212
std::string msg
Definition: tracer.h:93
assert((mem||(size==0))&&"Out Of Memory")
int code
Definition: tracer.h:91
int WriteCsvFile(FILE *fp, const std::string &field)
Definition: tracer.cc:269
pthread_cond_t sig_flush_
Definition: tracer.h:116
pthread_cond_t sig_continue_trace_
Definition: tracer.h:118
struct cvmcache_object_info __attribute__
Definition: atomic.h:24
int buffer_size_
Definition: tracer.h:106
Definition: tracer.h:35
atomic_int32 flush_immediately_
Definition: tracer.h:131
int32_t atomic_int32
Definition: atomic.h:17
int32_t DoTrace(const int event, const PathString &path, const std::string &msg)
Definition: tracer.cc:69
int flush_threshold_
Definition: tracer.h:107
void Flush()
Definition: tracer.cc:106
pthread_t thread_flush_
Definition: tracer.h:115
~Tracer()
Definition: tracer.cc:239
atomic_int32 terminate_flush_thread_
Definition: tracer.h:130
bool active_
Definition: tracer.h:103
string StringifyInt(const int64_t value)
Definition: string.cc:78
BufferEntry * ring_buffer_
Definition: tracer.h:108
std::string ToString() const
Definition: shortstring.h:141
ShortString< kDefaultMaxPath, 0 > PathString
Definition: shortstring.h:217
Definition: mutex.h:42
static const int kEventStop
Definition: tracer.h:74
Definition: tracer.h:83
pthread_mutex_t sig_continue_trace_mutex_
Definition: tracer.h:119
std::string trace_file_
Definition: tracer.h:105
string StringifyTimeval(const timeval value)
Definition: string.cc:203
atomic_int32 flushed_
Definition: tracer.h:129
bool spawned_
Definition: tracer.h:104
atomic_int32 * commit_buffer_
Definition: tracer.h:114