GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/tracer.cc Lines: 142 147 96.6 %
Date: 2019-02-03 02:48:13 Branches: 72 104 69.2 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#include "cvmfs_config.h"
6
#include "tracer.h"
7
8
#include <pthread.h>
9
10
#include <cassert>
11
#include <cerrno>
12
#include <cstdio>
13
#include <cstdlib>
14
#include <cstring>
15
#include <string>
16
17
#include "atomic.h"
18
#include "util/posix.h"
19
#include "util/string.h"
20
21
using namespace std;  // NOLINT
22
23
24
210
void Tracer::Activate(
25
  const int buffer_size,
26
  const int flush_threshold,
27
  const string &trace_file)
28
{
29
210
  trace_file_ = trace_file;
30
210
  buffer_size_ = buffer_size;
31
210
  flush_threshold_ = flush_threshold;
32
  assert(buffer_size_ > 1 && flush_threshold_>= 0
33

210
    && flush_threshold_ < buffer_size_);
34
35

210
  ring_buffer_ = new BufferEntry[buffer_size_];
36
210
  commit_buffer_ = new atomic_int32[buffer_size_];
37
63120
  for (int i = 0; i < buffer_size_; i++)
38
62910
    atomic_init32(&commit_buffer_[i]);
39
40
  int retval;
41
210
  retval = pthread_cond_init(&sig_continue_trace_, NULL);
42
210
  retval |= pthread_mutex_init(&sig_continue_trace_mutex_, NULL);
43
210
  retval |= pthread_cond_init(&sig_flush_, NULL);
44
210
  retval |= pthread_mutex_init(&sig_flush_mutex_, NULL);
45
210
  assert(retval == 0);
46
47
210
  active_ = true;
48
210
}
49
50
51
/**
52
 * Trace a message.  This is usually a lock-free procedure that just
53
 * requires two fetch_and_add operations and a gettimeofday syscall.
54
 * There are two exceptions:
55
 *   -# If the ring buffer is full, the function blocks until the flush
56
 *      thread made some space.  Avoid that by carefully choosing size
57
 *      and threshold.
58
 *   -# If this message reaches the threshold, the flush thread gets
59
 *      signaled.
60
 *
61
 * \param[in] event Arbitrary code, for consistency applications should use one
62
 *            of the TraceEvents constants. Negative codes are reserved
63
 *            for internal use.
64
 * \param[in] id Arbitrary id, for example file name or module name which is
65
 *            doing the trace.
66
 * \return The sequence number which was used to trace the record
67
 */
68
5576175
int32_t Tracer::DoTrace(
69
  const int event,
70
  const PathString &path,
71
  const string &msg)
72
{
73
5576175
  int32_t my_seq_no = atomic_xadd32(&seq_no_, 1);
74
  timeval now;
75
5574450
  gettimeofday(&now, NULL);
76
5580960
  int pos = my_seq_no % buffer_size_;
77
78
32244450
  while (my_seq_no - atomic_read32(&flushed_) >= buffer_size_) {
79
    timespec timeout;
80
    int retval;
81
21085020
    GetTimespecRel(25, &timeout);
82
21086295
    retval = pthread_mutex_lock(&sig_continue_trace_mutex_);
83
    retval |= pthread_cond_timedwait(&sig_continue_trace_,
84
21088050
                                     &sig_continue_trace_mutex_, &timeout);
85
21088050
    retval |= pthread_mutex_unlock(&sig_continue_trace_mutex_);
86

21082530
    assert(retval == ETIMEDOUT || retval == 0);
87
  }
88
89
5578830
  ring_buffer_[pos].time_stamp = now;
90
5578830
  ring_buffer_[pos].code = event;
91
5578830
  ring_buffer_[pos].path = path;
92
5555775
  ring_buffer_[pos].msg = msg;
93
5584785
  atomic_inc32(&commit_buffer_[pos]);
94
95
5586615
  if (my_seq_no - atomic_read32(&flushed_) == flush_threshold_) {
96
651435
    LockMutex(&sig_flush_mutex_);
97
651435
    int err_code __attribute__((unused)) = pthread_cond_signal(&sig_flush_);
98
651435
    assert(err_code == 0 && "Could not signal flush thread");
99
651435
    UnlockMutex(&sig_flush_mutex_);
100
  }
101
102
5584320
  return my_seq_no;
103
}
104
105
106
22515
void Tracer::Flush() {
107
22515
  if (!active_) return;
108
109
  int32_t save_seq_no = DoTrace(kEventFlush, PathString("Tracer", 6),
110
22515
                                "flushed ring buffer");
111
22515
  while (atomic_read32(&flushed_) <= save_seq_no) {
112
    timespec timeout;
113
    int retval;
114
115
24405
    atomic_cas32(&flush_immediately_, 0, 1);
116
24405
    LockMutex(&sig_flush_mutex_);
117
24405
    retval = pthread_cond_signal(&sig_flush_);
118
24405
    assert(retval == 0);
119
24405
    UnlockMutex(&sig_flush_mutex_);
120
121
24405
    GetTimespecRel(250, &timeout);
122
24405
    retval = pthread_mutex_lock(&sig_continue_trace_mutex_);
123
    retval |= pthread_cond_timedwait(&sig_continue_trace_,
124
                                     &sig_continue_trace_mutex_,
125
24405
                                     &timeout);
126
24405
    retval |= pthread_mutex_unlock(&sig_continue_trace_mutex_);
127

24405
    assert(retval == ETIMEDOUT || retval == 0);
128
  }
129
}
130
131
132
21481560
void Tracer::GetTimespecRel(const int64_t ms, timespec *ts) {
133
  timeval now;
134
21481560
  gettimeofday(&now, NULL);
135
21480480
  int64_t nsecs = now.tv_usec * 1000 + (ms % 1000)*1000*1000;
136
21480480
  int carry = 0;
137
21480480
  if (nsecs >= 1000*1000*1000) {
138
551685
    carry = 1;
139
551685
    nsecs -= 1000*1000*1000;
140
  }
141
21480480
  ts->tv_sec = now.tv_sec + ms/1000 + carry;
142
21480480
  ts->tv_nsec = nsecs;
143
21480480
}
144
145
146
210
void *Tracer::MainFlush(void *data) {
147
210
  Tracer *tracer = reinterpret_cast<Tracer *>(data);
148
  int retval;
149
210
  LockMutex(&tracer->sig_flush_mutex_);
150
210
  FILE *f = fopen(tracer->trace_file_.c_str(), "a");
151
210
  assert(f != NULL && "Could not open trace file");
152
  struct timespec timeout;
153
154

9317674470
  do {
155


18635720955
    while ((atomic_read32(&tracer->terminate_flush_thread_) == 0) &&
156
           (atomic_read32(&tracer->flush_immediately_) == 0) &&
157
           (atomic_read32(&tracer->seq_no_) -
158
              atomic_read32(&tracer->flushed_)
159
              <= tracer->flush_threshold_))
160
    {
161
372015
      tracer->GetTimespecRel(2000, &timeout);
162
      retval = pthread_cond_timedwait(&tracer->sig_flush_,
163
                                      &tracer->sig_flush_mutex_,
164
372015
                                      &timeout);
165
372015
      assert(retval != EINVAL);
166
    }
167
168
9317674470
    int base = atomic_read32(&tracer->flushed_) % tracer->buffer_size_;
169
9317674470
    int pos, i = 0;
170

9323263905
    while ((i <= tracer->flush_threshold_) &&
171
           (atomic_read32(&tracer->commit_buffer_[
172
             pos = ((base + i) % tracer->buffer_size_)]) == 1))
173
    {
174
5589435
      string tmp;
175
5589435
      tmp = StringifyTimeval(tracer->ring_buffer_[pos].time_stamp);
176
5589435
      retval = tracer->WriteCsvFile(f, tmp);
177
5589435
      retval |= fputc(',', f) - ',';
178
5589435
      tmp = StringifyInt(tracer->ring_buffer_[pos].code);
179
5589435
      retval = tracer->WriteCsvFile(f, tmp);
180
5589435
      retval |= fputc(',', f) - ',';
181
      retval |= tracer->WriteCsvFile(
182
5589435
        f, tracer->ring_buffer_[pos].path.ToString());
183
5589435
      retval |= fputc(',', f) - ',';
184
5589435
      retval |= tracer->WriteCsvFile(f, tracer->ring_buffer_[pos].msg);
185
5589435
      retval |= (fputc(13, f) - 13) | (fputc(10, f) - 10);
186
5589435
      assert(retval == 0);
187
188
5589435
      atomic_dec32(&tracer->commit_buffer_[pos]);
189
5589435
      ++i;
190
    }
191
9317674470
    retval = fflush(f);
192
9317674470
    assert(retval == 0);
193
9317674470
    atomic_xadd32(&tracer->flushed_, i);
194
9317674470
    atomic_cas32(&tracer->flush_immediately_, 1, 0);
195
196
9317674470
    LockMutex(&tracer->sig_continue_trace_mutex_);
197
9317674470
    retval = pthread_cond_broadcast(&tracer->sig_continue_trace_);
198
9317674470
    assert(retval == 0);
199
9317674470
    UnlockMutex(&tracer->sig_continue_trace_mutex_);
200
  } while ((atomic_read32(&tracer->terminate_flush_thread_) == 0) ||
201
           (atomic_read32(&tracer->flushed_) <
202
             atomic_read32(&tracer->seq_no_)));
203
204
210
  UnlockMutex(&tracer->sig_flush_mutex_);
205
210
  retval = fclose(f);
206
210
  assert(retval == 0);
207
210
  return NULL;
208
}
209
210
211
225
void Tracer::Spawn() {
212
225
  if (active_) {
213
210
    int retval = pthread_create(&thread_flush_, NULL, MainFlush, this);
214
210
    assert(retval == 0);
215
216
210
    spawned_ = true;
217
210
    DoTrace(kEventStart, PathString("Tracer", 6), "Trace buffer created");
218
  }
219
225
}
220
221
222
262
Tracer::Tracer()
223
  : active_(false)
224
  , spawned_(false)
225
  , buffer_size_(0)
226
  , flush_threshold_(0)
227
  , ring_buffer_(NULL)
228
262
  , commit_buffer_(NULL)
229
{
230
262
  memset(&thread_flush_, 0, sizeof(thread_flush_));
231
262
  atomic_init32(&seq_no_);
232
262
  atomic_init32(&flushed_);
233
262
  atomic_init32(&terminate_flush_thread_);
234
262
  atomic_init32(&flush_immediately_);
235
262
}
236
237
238
262
Tracer::~Tracer() {
239
262
  if (!active_)
240
    return;
241
  int retval;
242
243
210
  if (spawned_) {
244
210
    DoTrace(kEventStop, PathString("Tracer", 6), "Destroying trace buffer...");
245
246
    // Trigger flushing and wait for it
247
210
    atomic_inc32(&terminate_flush_thread_);
248
210
    LockMutex(&sig_flush_mutex_);
249
210
    retval = pthread_cond_signal(&sig_flush_);
250
210
    assert(retval == 0);
251
210
    UnlockMutex(&sig_flush_mutex_);
252
210
    retval = pthread_join(thread_flush_, NULL);
253
210
    assert(retval == 0);
254
  }
255
256
210
  retval = pthread_cond_destroy(&sig_continue_trace_);
257
210
  retval |= pthread_mutex_destroy(&sig_continue_trace_mutex_);
258
210
  retval |= pthread_cond_destroy(&sig_flush_);
259
210
  retval |= pthread_mutex_destroy(&sig_flush_mutex_);
260
210
  assert(retval == 0);
261
262

210
  delete[] ring_buffer_;
263
210
  delete[] commit_buffer_;
264
}
265
266
267
22357740
int Tracer::WriteCsvFile(FILE *fp, const string &field) {
268
22357740
  if (fp == NULL)
269
    return 0;
270
271
  int retval;
272
273
22357740
  if ((retval = fputc('"', fp)) != '"')
274
    return retval;
275
276
344700060
  for (unsigned i = 0, l = field.length(); i < l; ++i) {
277
322342320
    if (field[i] == '"') {
278
3915000
      if ((retval = fputc('"', fp)) != '"')
279
        return retval;
280
    }
281
322342320
    if ((retval = fputc(field[i], fp)) != field[i])
282
      return retval;
283
  }
284
285
22357740
  if ((retval = fputc('"', fp)) != '"')
286
    return retval;
287
288
22357740
  return 0;
289
}