26 const int buffer_size,
27 const int flush_threshold,
28 const string &trace_file)
30 trace_file_ = trace_file;
31 buffer_size_ = buffer_size;
32 flush_threshold_ = flush_threshold;
33 assert(buffer_size_ > 1 && flush_threshold_>= 0
34 && flush_threshold_ < buffer_size_);
38 for (
int i = 0; i < buffer_size_; i++)
39 atomic_init32(&commit_buffer_[i]);
42 retval = pthread_cond_init(&sig_continue_trace_, NULL);
43 retval |= pthread_mutex_init(&sig_continue_trace_mutex_, NULL);
44 retval |= pthread_cond_init(&sig_flush_, NULL);
45 retval |= pthread_mutex_init(&sig_flush_mutex_, NULL);
74 int32_t my_seq_no = atomic_xadd32(&seq_no_, 1);
76 gettimeofday(&now, NULL);
77 int pos = my_seq_no % buffer_size_;
79 while (my_seq_no - atomic_read32(&flushed_) >= buffer_size_) {
82 GetTimespecRel(25, &timeout);
83 retval = pthread_mutex_lock(&sig_continue_trace_mutex_);
84 retval |= pthread_cond_timedwait(&sig_continue_trace_,
85 &sig_continue_trace_mutex_, &timeout);
86 retval |= pthread_mutex_unlock(&sig_continue_trace_mutex_);
87 assert(retval == ETIMEDOUT || retval == 0);
90 ring_buffer_[pos].time_stamp = now;
91 ring_buffer_[pos].code = event;
92 ring_buffer_[pos].path = path;
93 ring_buffer_[pos].msg = msg;
94 atomic_inc32(&commit_buffer_[pos]);
96 if (my_seq_no - atomic_read32(&flushed_) == flush_threshold_) {
98 int err_code
__attribute__((unused)) = pthread_cond_signal(&sig_flush_);
99 assert(err_code == 0 &&
"Could not signal flush thread");
107 if (!active_)
return;
109 int32_t save_seq_no = DoTrace(kEventFlush,
PathString(
"Tracer", 6),
110 "flushed ring buffer");
111 while (atomic_read32(&flushed_) <= save_seq_no) {
115 atomic_cas32(&flush_immediately_, 0, 1);
118 retval = pthread_cond_signal(&sig_flush_);
122 GetTimespecRel(250, &timeout);
123 retval = pthread_mutex_lock(&sig_continue_trace_mutex_);
124 retval |= pthread_cond_timedwait(&sig_continue_trace_,
125 &sig_continue_trace_mutex_,
127 retval |= pthread_mutex_unlock(&sig_continue_trace_mutex_);
128 assert(retval == ETIMEDOUT || retval == 0);
135 gettimeofday(&now, NULL);
136 int64_t nsecs = now.tv_usec * 1000 + (ms % 1000)*1000*1000;
138 if (nsecs >= 1000*1000*1000) {
140 nsecs -= 1000*1000*1000;
142 ts->tv_sec = now.tv_sec + ms/1000 + carry;
152 assert(f != NULL &&
"Could not open trace file");
153 struct timespec timeout;
158 (atomic_read32(&tracer->
seq_no_) -
163 retval = pthread_cond_timedwait(&tracer->
sig_flush_,
171 while ((i <= tracer->flush_threshold_) &&
178 retval |= fputc(
',', f) -
',';
181 retval |= fputc(
',', f) -
',';
184 retval |= fputc(
',', f) -
',';
186 retval |= (fputc(13, f) - 13) | (fputc(10, f) - 10);
194 atomic_xadd32(&tracer->
flushed_, i);
204 atomic_read32(&tracer->
seq_no_)));
214 int retval = pthread_create(&thread_flush_, NULL, MainFlush,
this);
218 DoTrace(kEventStart,
PathString(
"Tracer", 6),
"Trace buffer created");
227 , flush_threshold_(0)
229 , commit_buffer_(NULL)
275 if ((retval = fputc(
'"', fp)) !=
'"')
278 for (
unsigned i = 0, l = field.length(); i < l; ++i) {
279 if (field[i] ==
'"') {
280 if ((retval = fputc(
'"', fp)) !=
'"')
283 if ((retval = fputc(field[i], fp)) != field[i])
287 if ((retval = fputc(
'"', fp)) !=
'"')
void Activate(const int buffer_size, const int flush_threshold, const std::string &trace_file)
void GetTimespecRel(const int64_t ms, timespec *ts)
static void * MainFlush(void *data)
pthread_mutex_t sig_flush_mutex_
assert((mem||(size==0))&&"Out Of Memory")
int WriteCsvFile(FILE *fp, const std::string &field)
pthread_cond_t sig_flush_
pthread_cond_t sig_continue_trace_
struct cvmcache_object_info __attribute__
atomic_int32 flush_immediately_
int32_t DoTrace(const int event, const PathString &path, const std::string &msg)
atomic_int32 terminate_flush_thread_
string StringifyInt(const int64_t value)
BufferEntry * ring_buffer_
std::string ToString() const
ShortString< kDefaultMaxPath, 0 > PathString
static const int kEventStop
pthread_mutex_t sig_continue_trace_mutex_
string StringifyTimeval(const timeval value)
atomic_int32 * commit_buffer_