26 const int flush_threshold,
27 const string &trace_file) {
28 trace_file_ = trace_file;
29 buffer_size_ = buffer_size;
30 flush_threshold_ = flush_threshold;
31 assert(buffer_size_ > 1 && flush_threshold_ >= 0
32 && flush_threshold_ < buffer_size_);
36 for (
int i = 0; i < buffer_size_; i++)
37 atomic_init32(&commit_buffer_[i]);
40 retval = pthread_cond_init(&sig_continue_trace_, NULL);
41 retval |= pthread_mutex_init(&sig_continue_trace_mutex_, NULL);
42 retval |= pthread_cond_init(&sig_flush_, NULL);
43 retval |= pthread_mutex_init(&sig_flush_mutex_, NULL);
70 int32_t my_seq_no = atomic_xadd32(&seq_no_, 1);
72 gettimeofday(&now, NULL);
73 int pos = my_seq_no % buffer_size_;
75 while (my_seq_no - atomic_read32(&flushed_) >= buffer_size_) {
78 GetTimespecRel(25, &timeout);
79 retval = pthread_mutex_lock(&sig_continue_trace_mutex_);
80 retval |= pthread_cond_timedwait(&sig_continue_trace_,
81 &sig_continue_trace_mutex_, &timeout);
82 retval |= pthread_mutex_unlock(&sig_continue_trace_mutex_);
83 assert(retval == ETIMEDOUT || retval == 0);
86 ring_buffer_[pos].time_stamp = now;
87 ring_buffer_[pos].code = event;
88 ring_buffer_[pos].path = path;
89 ring_buffer_[pos].msg = msg;
90 atomic_inc32(&commit_buffer_[pos]);
92 if (my_seq_no - atomic_read32(&flushed_) == flush_threshold_) {
94 int err_code
__attribute__((unused)) = pthread_cond_signal(&sig_flush_);
95 assert(err_code == 0 &&
"Could not signal flush thread");
106 int32_t save_seq_no = DoTrace(kEventFlush,
PathString(
"Tracer", 6),
107 "flushed ring buffer");
108 while (atomic_read32(&flushed_) <= save_seq_no) {
112 atomic_cas32(&flush_immediately_, 0, 1);
115 retval = pthread_cond_signal(&sig_flush_);
119 GetTimespecRel(250, &timeout);
120 retval = pthread_mutex_lock(&sig_continue_trace_mutex_);
121 retval |= pthread_cond_timedwait(
122 &sig_continue_trace_, &sig_continue_trace_mutex_, &timeout);
123 retval |= pthread_mutex_unlock(&sig_continue_trace_mutex_);
124 assert(retval == ETIMEDOUT || retval == 0);
131 gettimeofday(&now, NULL);
132 int64_t nsecs = now.tv_usec * 1000 + (ms % 1000) * 1000 * 1000;
134 if (nsecs >= 1000 * 1000 * 1000) {
136 nsecs -= 1000 * 1000 * 1000;
138 ts->tv_sec = now.tv_sec + ms / 1000 + carry;
148 assert(f != NULL &&
"Could not open trace file");
149 struct timespec timeout;
158 retval = pthread_cond_timedwait(
165 while ((i <= tracer->flush_threshold_)
173 retval |= fputc(
',', f) -
',';
176 retval |= fputc(
',', f) -
',';
179 retval |= fputc(
',', f) -
',';
181 retval |= (fputc(13, f) - 13) | (fputc(10, f) - 10);
189 atomic_xadd32(&tracer->
flushed_, i);
199 || (atomic_read32(&tracer->
flushed_) < atomic_read32(&tracer->
seq_no_)));
209 int retval = pthread_create(&thread_flush_, NULL, MainFlush,
this);
213 DoTrace(kEventStart,
PathString(
"Tracer", 6),
"Trace buffer created");
222 , flush_threshold_(0)
224 , commit_buffer_(NULL) {
269 if ((retval = fputc(
'"', fp)) !=
'"')
272 for (
unsigned i = 0, l = field.length(); i < l; ++i) {
273 if (field[i] ==
'"') {
274 if ((retval = fputc(
'"', fp)) !=
'"')
277 if ((retval = fputc(field[i], fp)) != field[i])
281 if ((retval = fputc(
'"', fp)) !=
'"')
void Activate(const int buffer_size, const int flush_threshold, const std::string &trace_file)
void GetTimespecRel(const int64_t ms, timespec *ts)
static void * MainFlush(void *data)
pthread_mutex_t sig_flush_mutex_
assert((mem||(size==0))&&"Out Of Memory")
int WriteCsvFile(FILE *fp, const std::string &field)
pthread_cond_t sig_flush_
pthread_cond_t sig_continue_trace_
struct cvmcache_object_info __attribute__
atomic_int32 flush_immediately_
int32_t DoTrace(const int event, const PathString &path, const std::string &msg)
atomic_int32 terminate_flush_thread_
string StringifyInt(const int64_t value)
BufferEntry * ring_buffer_
std::string ToString() const
ShortString< kDefaultMaxPath, 0 > PathString
static const int kEventStop
pthread_mutex_t sig_continue_trace_mutex_
string StringifyTimeval(const timeval value)
atomic_int32 * commit_buffer_