26 const int flush_threshold,
27 const string &trace_file) {
28 trace_file_ = trace_file;
29 buffer_size_ = buffer_size;
30 flush_threshold_ = flush_threshold;
31 assert(buffer_size_ > 1 && flush_threshold_ >= 0
32 && flush_threshold_ < buffer_size_);
36 for (
int i = 0; i < buffer_size_; i++)
37 atomic_init32(&commit_buffer_[i]);
40 retval = pthread_cond_init(&sig_continue_trace_, NULL);
41 retval |= pthread_mutex_init(&sig_continue_trace_mutex_, NULL);
42 retval |= pthread_cond_init(&sig_flush_, NULL);
43 retval |= pthread_mutex_init(&sig_flush_mutex_, NULL);
70 const int32_t my_seq_no = atomic_xadd32(&seq_no_, 1);
72 gettimeofday(&now, NULL);
73 const int pos = my_seq_no % buffer_size_;
75 while (my_seq_no - atomic_read32(&flushed_) >= buffer_size_) {
78 GetTimespecRel(25, &timeout);
79 retval = pthread_mutex_lock(&sig_continue_trace_mutex_);
80 retval |= pthread_cond_timedwait(&sig_continue_trace_,
81 &sig_continue_trace_mutex_, &timeout);
82 retval |= pthread_mutex_unlock(&sig_continue_trace_mutex_);
83 assert(retval == ETIMEDOUT || retval == 0);
86 ring_buffer_[pos].time_stamp = now;
87 ring_buffer_[pos].code = event;
88 ring_buffer_[pos].path = path;
89 ring_buffer_[pos].msg = msg;
90 atomic_inc32(&commit_buffer_[pos]);
92 if (my_seq_no - atomic_read32(&flushed_) == flush_threshold_) {
95 pthread_cond_signal(&sig_flush_);
96 assert(err_code == 0 &&
"Could not signal flush thread");
107 const int32_t save_seq_no =
108 DoTrace(kEventFlush,
PathString(
"Tracer", 6),
"flushed ring buffer");
109 while (atomic_read32(&flushed_) <= save_seq_no) {
113 atomic_cas32(&flush_immediately_, 0, 1);
116 retval = pthread_cond_signal(&sig_flush_);
120 GetTimespecRel(250, &timeout);
121 retval = pthread_mutex_lock(&sig_continue_trace_mutex_);
122 retval |= pthread_cond_timedwait(
123 &sig_continue_trace_, &sig_continue_trace_mutex_, &timeout);
124 retval |= pthread_mutex_unlock(&sig_continue_trace_mutex_);
125 assert(retval == ETIMEDOUT || retval == 0);
132 gettimeofday(&now, NULL);
133 int64_t nsecs = now.tv_usec * 1000 + (ms % 1000) * 1000 * 1000;
135 if (nsecs >= 1000 * 1000 * 1000) {
137 nsecs -= 1000 * 1000 * 1000;
139 ts->tv_sec = now.tv_sec + ms / 1000 + carry;
149 assert(f != NULL &&
"Could not open trace file");
150 struct timespec timeout;
159 retval = pthread_cond_timedwait(
166 while ((i <= tracer->flush_threshold_)
174 retval |= fputc(
',', f) -
',';
177 retval |= fputc(
',', f) -
',';
180 retval |= fputc(
',', f) -
',';
182 retval |= (fputc(13, f) - 13) | (fputc(10, f) - 10);
190 atomic_xadd32(&tracer->
flushed_, i);
200 || (atomic_read32(&tracer->
flushed_) < atomic_read32(&tracer->
seq_no_)));
210 const int retval = pthread_create(&thread_flush_, NULL, MainFlush,
this);
214 DoTrace(kEventStart,
PathString(
"Tracer", 6),
"Trace buffer created");
223 , flush_threshold_(0)
225 , commit_buffer_(NULL) {
270 if ((retval = fputc(
'"', fp)) !=
'"')
273 for (
unsigned i = 0, l = field.length(); i < l; ++i) {
274 if (field[i] ==
'"') {
275 if ((retval = fputc(
'"', fp)) !=
'"')
278 if ((retval = fputc(field[i], fp)) != field[i])
282 if ((retval = fputc(
'"', fp)) !=
'"')
void Activate(const int buffer_size, const int flush_threshold, const std::string &trace_file)
void GetTimespecRel(const int64_t ms, timespec *ts)
static void * MainFlush(void *data)
pthread_mutex_t sig_flush_mutex_
assert((mem||(size==0))&&"Out Of Memory")
int WriteCsvFile(FILE *fp, const std::string &field)
pthread_cond_t sig_flush_
pthread_cond_t sig_continue_trace_
struct cvmcache_object_info __attribute__
atomic_int32 flush_immediately_
atomic_int32 terminate_flush_thread_
string StringifyInt(const int64_t value)
BufferEntry * ring_buffer_
std::string ToString() const
ShortString< kDefaultMaxPath, 0 > PathString
static const int kEventStop
pthread_mutex_t sig_continue_trace_mutex_
void const PathString const std::string &msg DoTrace(event, path, msg)
string StringifyTimeval(const timeval value)
atomic_int32 * commit_buffer_