GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/util/concurrency_impl.h
Date: 2025-07-06 02:35:01
Exec Total Coverage
Lines: 119 264 45.1%
Branches: 54 142 38.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_UTIL_CONCURRENCY_IMPL_H_
6 #define CVMFS_UTIL_CONCURRENCY_IMPL_H_
7
8 // clang-format off
9 // Only needed to let clang-tidy see the class definitions.
10 #ifndef CVMFS_UTIL_CONCURRENCY_H_
11 #include "util/concurrency.h"
12 #endif
13 // clang-format on
14
15 #include <cstddef>
16
17 #include "util/logging.h"
18
19 #ifdef CVMFS_NAMESPACE_GUARD
20 namespace CVMFS_NAMESPACE_GUARD {
21 #endif
22
23 //
24 // +----------------------------------------------------------------------------
25 // | SynchronizingCounter
26 //
27
28
29 template<typename T>
30 4374394421 void SynchronizingCounter<T>::SetValueUnprotected(const T new_value) {
31 // make sure that 0 <= new_value <= maximal_value_ if maximal_value_ != 0
32
5/6
✓ Branch 1 taken 38674631 times.
✓ Branch 2 taken 4335719790 times.
✓ Branch 3 taken 38637791 times.
✓ Branch 4 taken 36840 times.
✓ Branch 5 taken 38637791 times.
✗ Branch 6 not taken.
4374394421 assert(!HasMaximalValue()
33 || (new_value >= T(0) && new_value <= maximal_value_));
34
35 4374394421 value_ = new_value;
36
37
2/2
✓ Branch 0 taken 1339250 times.
✓ Branch 1 taken 4373055171 times.
4374394421 if (value_ == T(0)) {
38 1339250 pthread_cond_broadcast(&became_zero_);
39 }
40
41
6/6
✓ Branch 1 taken 38674631 times.
✓ Branch 2 taken 4335719790 times.
✓ Branch 3 taken 22560706 times.
✓ Branch 4 taken 16113925 times.
✓ Branch 5 taken 22560706 times.
✓ Branch 6 taken 4351833715 times.
4374394421 if (HasMaximalValue() && value_ < maximal_value_) {
42 22560706 pthread_cond_broadcast(&free_slot_);
43 }
44 4374394421 }
45
46
47 template<typename T>
48 2187196983 void SynchronizingCounter<T>::WaitForFreeSlotUnprotected() {
49
6/6
✓ Branch 1 taken 114664137 times.
✓ Branch 2 taken 2167859760 times.
✓ Branch 3 taken 95326914 times.
✓ Branch 4 taken 19337223 times.
✓ Branch 5 taken 95326914 times.
✓ Branch 6 taken 2187196983 times.
2282523897 while (HasMaximalValue() && value_ >= maximal_value_) {
50 95326914 pthread_cond_wait(&free_slot_, &mutex_);
51 }
52
3/4
✓ Branch 1 taken 19337223 times.
✓ Branch 2 taken 2167859760 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 19337223 times.
2187196983 assert(!HasMaximalValue() || value_ < maximal_value_);
53 2187196983 }
54
55
56 template<typename T>
57 3205 void SynchronizingCounter<T>::Initialize() {
58 3205 const bool init_successful = (pthread_mutex_init(&mutex_, NULL) == 0
59
1/2
✓ Branch 1 taken 3205 times.
✗ Branch 2 not taken.
3205 && pthread_cond_init(&became_zero_, NULL) == 0
60
2/4
✓ Branch 0 taken 3205 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 3205 times.
✗ Branch 4 not taken.
6410 && pthread_cond_init(&free_slot_, NULL) == 0);
61
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3205 times.
3205 assert(init_successful);
62 3205 }
63
64
65 template<typename T>
66 3204 void SynchronizingCounter<T>::Destroy() {
67 3204 pthread_mutex_destroy(&mutex_);
68 3204 pthread_cond_destroy(&became_zero_);
69 3204 pthread_cond_destroy(&free_slot_);
70 3204 }
71
72
73 //
74 // +----------------------------------------------------------------------------
75 // | Observable
76 //
77
78
79 template<typename ParamT>
80 28748222 Observable<ParamT>::Observable() {
81 28748222 const int ret = pthread_rwlock_init(&listeners_rw_lock_, NULL);
82
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 14375221 times.
28748222 assert(ret == 0);
83 28748222 }
84
85
86 template<typename ParamT>
87 28748550 Observable<ParamT>::~Observable() {
88 28748550 UnregisterListeners();
89 28747146 pthread_rwlock_destroy(&listeners_rw_lock_);
90 }
91
92
93 template<typename ParamT>
94 template<class DelegateT, class ClosureDataT>
95 7163059 typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener(
96 typename BoundClosure<ParamT, DelegateT, ClosureDataT>::CallbackMethod
97 method,
98 DelegateT *delegate,
99 ClosureDataT data) {
100 // create a new BoundClosure, register it and return the handle
101 7163059 CallbackBase<ParamT> *callback = Observable<ParamT>::MakeClosure(
102 method, delegate, data);
103 7163059 RegisterListener(callback);
104 7163059 return callback;
105 }
106
107
108 template<typename ParamT>
109 template<class DelegateT>
110 27692 typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener(
111 typename BoundCallback<ParamT, DelegateT>::CallbackMethod method,
112 DelegateT *delegate) {
113 // create a new BoundCallback, register it and return the handle
114 27692 CallbackBase<ParamT> *callback = Observable<ParamT>::MakeCallback(method,
115 delegate);
116 27692 RegisterListener(callback);
117 27692 return callback;
118 }
119
120
121 template<typename ParamT>
122 2610 typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener(
123 typename Callback<ParamT>::CallbackFunction fn) {
124 // create a new Callback, register it and return the handle
125 2610 CallbackBase<ParamT> *callback = Observable<ParamT>::MakeCallback(fn);
126 2610 RegisterListener(callback);
127 2610 return callback;
128 }
129
130
131 template<typename ParamT>
132 14356464 void Observable<ParamT>::RegisterListener(
133 Observable<ParamT>::CallbackPtr callback_object) {
134 // register a generic CallbackBase callback
135 14356464 WriteLockGuard guard(listeners_rw_lock_);
136
1/2
✓ Branch 1 taken 7179327 times.
✗ Branch 2 not taken.
14356464 listeners_.insert(callback_object);
137 14356464 }
138
139
140 template<typename ParamT>
141 1485 void Observable<ParamT>::UnregisterListener(
142 typename Observable<ParamT>::CallbackPtr callback_object) {
143 // remove a callback handle from the callbacks list
144 // if it is not registered --> crash
145 1485 WriteLockGuard guard(listeners_rw_lock_);
146
1/2
✓ Branch 1 taken 1485 times.
✗ Branch 2 not taken.
1485 const size_t was_removed = listeners_.erase(callback_object);
147
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1485 times.
1485 assert(was_removed > 0);
148
1/2
✓ Branch 0 taken 1485 times.
✗ Branch 1 not taken.
1485 delete callback_object;
149 1485 }
150
151
152 template<typename ParamT>
153 28734856 void Observable<ParamT>::UnregisterListeners() {
154 28734856 WriteLockGuard guard(listeners_rw_lock_);
155
156 // remove all callbacks from the list
157 28736104 typename Callbacks::const_iterator i = listeners_.begin();
158 28733842 typename Callbacks::const_iterator iend = listeners_.end();
159
2/2
✓ Branch 2 taken 7176078 times.
✓ Branch 3 taken 14372888 times.
43073412 for (; i != iend; ++i) {
160
1/2
✓ Branch 1 taken 7175844 times.
✗ Branch 2 not taken.
14338790 delete *i;
161 }
162 28731190 listeners_.clear();
163 28732048 }
164
165
166 template<typename ParamT>
167 74146828 void Observable<ParamT>::NotifyListeners(const ParamT &parameter) {
168 74146828 ReadLockGuard guard(listeners_rw_lock_);
169
170 // invoke all callbacks and inform them about new data
171 74145124 typename Callbacks::const_iterator i = listeners_.begin();
172 74139340 typename Callbacks::const_iterator iend = listeners_.end();
173
2/2
✓ Branch 2 taken 37559393 times.
✓ Branch 3 taken 37549559 times.
148272242 for (; i != iend; ++i) {
174
1/2
✓ Branch 2 taken 37559468 times.
✗ Branch 3 not taken.
74134276 (**i)(parameter);
175 }
176 74114608 }
177
178
179 //
180 // +----------------------------------------------------------------------------
181 // | FifoChannel
182 //
183
184
185 template<class T>
186 250 FifoChannel<T>::FifoChannel(const size_t maximal_length,
187 const size_t drainout_threshold)
188 250 : maximal_queue_length_(maximal_length)
189 250 , queue_drainout_threshold_(drainout_threshold) {
190
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 250 times.
250 assert(drainout_threshold <= maximal_length);
191
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 250 times.
250 assert(drainout_threshold > 0);
192
193 250 const bool successful = (pthread_mutex_init(&mutex_, NULL) == 0
194
1/2
✓ Branch 1 taken 250 times.
✗ Branch 2 not taken.
250 && pthread_cond_init(&queue_is_not_empty_, NULL) == 0
195
2/4
✓ Branch 0 taken 250 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 250 times.
✗ Branch 4 not taken.
500 && pthread_cond_init(&queue_is_not_full_, NULL)
196 == 0);
197
198
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 250 times.
250 assert(successful);
199 250 }
200
201
202 template<class T>
203 740 FifoChannel<T>::~FifoChannel() {
204 500 pthread_cond_destroy(&queue_is_not_empty_);
205 500 pthread_cond_destroy(&queue_is_not_full_);
206 500 pthread_mutex_destroy(&mutex_);
207 1240 }
208
209
210 template<class T>
211 5001280 void FifoChannel<T>::Enqueue(const T &data) {
212 5001280 MutexLockGuard lock(mutex_);
213
214 // wait for space in the queue
215
2/2
✓ Branch 1 taken 5 times.
✓ Branch 2 taken 5001280 times.
5001285 while (this->size() >= maximal_queue_length_) {
216
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
5 pthread_cond_wait(&queue_is_not_full_, &mutex_);
217 }
218
219 // put something into the queue
220
1/2
✓ Branch 1 taken 5001280 times.
✗ Branch 2 not taken.
5001280 this->push(data);
221
222 // wake all waiting threads
223 5001280 pthread_cond_broadcast(&queue_is_not_empty_);
224 5001280 }
225
226
227 template<class T>
228 4954255 const T FifoChannel<T>::Dequeue() {
229 4954255 MutexLockGuard lock(mutex_);
230
231 // wait until there is something to do
232
2/2
✓ Branch 1 taken 14385625 times.
✓ Branch 2 taken 5000785 times.
19386410 while (this->empty()) {
233
1/2
✓ Branch 1 taken 14385625 times.
✗ Branch 2 not taken.
14385625 pthread_cond_wait(&queue_is_not_empty_, &mutex_);
234 }
235
236 // get the item from the queue
237 5000785 T data = this->front();
238 5000785 this->pop();
239
240 // signal waiting threads about the free space
241
2/2
✓ Branch 1 taken 4999210 times.
✓ Branch 2 taken 1575 times.
5000785 if (this->size() < queue_drainout_threshold_) {
242 4999210 pthread_cond_broadcast(&queue_is_not_full_);
243 }
244
245 // return the acquired job
246 4958000 return data;
247 5000785 }
248
249
250 template<class T>
251 5 unsigned int FifoChannel<T>::Drop() {
252 5 MutexLockGuard lock(mutex_);
253
254 5 unsigned int dropped_items = 0;
255
2/2
✓ Branch 1 taken 495 times.
✓ Branch 2 taken 5 times.
500 while (!this->empty()) {
256 495 this->pop();
257 495 ++dropped_items;
258 }
259
260 5 pthread_cond_broadcast(&queue_is_not_full_);
261
262 10 return dropped_items;
263 5 }
264
265
266 template<class T>
267 10 size_t FifoChannel<T>::GetItemCount() const {
268 10 MutexLockGuard lock(mutex_);
269 10 return this->size();
270 10 }
271
272
273 template<class T>
274 25 bool FifoChannel<T>::IsEmpty() const {
275 25 MutexLockGuard lock(mutex_);
276 25 return this->empty();
277 25 }
278
279
280 template<class T>
281 5 size_t FifoChannel<T>::GetMaximalItemCount() const {
282 5 return maximal_queue_length_;
283 }
284
285
286 //
287 // +----------------------------------------------------------------------------
288 // | ConcurrentWorkers
289 //
290
291
292 template<class WorkerT>
293 ConcurrentWorkers<WorkerT>::ConcurrentWorkers(
294 const size_t number_of_workers,
295 const size_t maximal_queue_length,
296 ConcurrentWorkers<WorkerT>::worker_context_t *worker_context)
297 : number_of_workers_(number_of_workers)
298 , worker_context_(worker_context)
299 , thread_context_(this, worker_context_)
300 , initialized_(false)
301 , running_(false)
302 , workers_started_(0)
303 , jobs_queue_(maximal_queue_length, maximal_queue_length / 4 + 1)
304 , results_queue_(maximal_queue_length, 1) {
305 assert(maximal_queue_length >= number_of_workers);
306 assert(number_of_workers > 0);
307
308 atomic_init32(&jobs_pending_);
309 atomic_init32(&jobs_failed_);
310 atomic_init64(&jobs_processed_);
311 }
312
313
314 template<class WorkerT>
315 ConcurrentWorkers<WorkerT>::~ConcurrentWorkers() {
316 if (IsRunning()) {
317 Terminate();
318 }
319
320 // destroy some synchronisation data structures
321 pthread_cond_destroy(&worker_started_);
322 pthread_cond_destroy(&jobs_all_done_);
323 pthread_mutex_destroy(&status_mutex_);
324 pthread_mutex_destroy(&jobs_all_done_mutex_);
325 }
326
327
328 template<class WorkerT>
329 bool ConcurrentWorkers<WorkerT>::Initialize() {
330 LogCvmfs(kLogConcurrency, kLogVerboseMsg,
331 "Initializing ConcurrentWorker "
332 "object with %lu worker threads "
333 "and a queue length of %zu",
334 number_of_workers_, jobs_queue_.GetMaximalItemCount());
335 // LogCvmfs(kLogConcurrency, kLogStdout, "sizeof(expected_data_t): %d\n"
336 // "sizeof(returned_data_t): %d",
337 // sizeof(expected_data_t), sizeof(returned_data_t));
338
339 // initialize synchronisation for job queue (Workers)
340 if (pthread_mutex_init(&status_mutex_, NULL) != 0
341 || pthread_mutex_init(&jobs_all_done_mutex_, NULL) != 0
342 || pthread_cond_init(&worker_started_, NULL) != 0
343 || pthread_cond_init(&jobs_all_done_, NULL) != 0) {
344 return false;
345 }
346
347 // spawn the Worker objects in their own threads
348 if (!SpawnWorkers()) {
349 LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn workers");
350 return false;
351 }
352
353 // all done...
354 initialized_ = true;
355 return true;
356 }
357
358
359 template<class WorkerT>
360 bool ConcurrentWorkers<WorkerT>::SpawnWorkers() {
361 assert(worker_threads_.size() == 0);
362 worker_threads_.resize(number_of_workers_);
363
364 // set the running flag to trap workers in their treadmills
365 StartRunning();
366
367 // spawn the swarm and make them work
368 bool success = true;
369 WorkerThreads::iterator i = worker_threads_.begin();
370 WorkerThreads::const_iterator iend = worker_threads_.end();
371 for (; i != iend; ++i) {
372 pthread_t *thread = &(*i);
373 const int retval = pthread_create(
374 thread,
375 NULL,
376 &ConcurrentWorkers<WorkerT>::RunWorker,
377 reinterpret_cast<void *>(&thread_context_));
378 if (retval != 0) {
379 LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn a Worker");
380 success = false;
381 }
382 }
383
384 // spawn the callback processing thread
385 const int retval = pthread_create(
386 &callback_thread_,
387 NULL,
388 &ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper,
389 reinterpret_cast<void *>(&thread_context_));
390 if (retval != 0) {
391 LogCvmfs(kLogConcurrency, kLogWarning,
392 "Failed to spawn the callback "
393 "worker thread");
394 success = false;
395 }
396
397 // wait for all workers to report in...
398 {
399 MutexLockGuard guard(status_mutex_);
400 // +1 -> callback thread
401 while (workers_started_ < number_of_workers_ + 1) {
402 pthread_cond_wait(&worker_started_, &status_mutex_);
403 }
404 }
405
406 // all done...
407 return success;
408 }
409
410
411 template<class WorkerT>
412 void *ConcurrentWorkers<WorkerT>::RunWorker(void *run_binding) {
413 // NOTE: This is the actual worker thread code!
414
415 //
416 // INITIALIZATION
417 /////////////////
418
419 // get contextual information
420 const WorkerRunBinding &binding = *(
421 static_cast<WorkerRunBinding *>(run_binding));
422 ConcurrentWorkers<WorkerT> *master = binding.delegate;
423 const worker_context_t *worker_context = binding.worker_context;
424
425 // boot up the worker object and make sure it works
426 WorkerT worker(worker_context);
427 worker.RegisterMaster(master);
428 const bool init_success = worker.Initialize();
429
430 // tell the master that this worker was started
431 master->ReportStartedWorker();
432
433 if (!init_success) {
434 LogCvmfs(kLogConcurrency, kLogWarning,
435 "Worker was not initialized "
436 "properly... it will die now!");
437 return NULL;
438 }
439
440 //
441 // PROCESSING LOOP
442 //////////////////
443
444 // start the processing loop
445 LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Starting Worker...");
446 while (master->IsRunning()) {
447 // acquire a new job
448 WorkerJob job = master->Acquire();
449
450 // check if we need to terminate
451 if (job.is_death_sentence)
452 break;
453
454 // do what you are supposed to do
455 worker(job.data);
456 }
457
458 //
459 // TEAR DOWN
460 ////////////
461
462 // give the worker the chance to tidy up
463 worker.TearDown();
464
465 // good bye thread...
466 LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Worker...");
467 return NULL;
468 }
469
470
471 template<class WorkerT>
472 void *ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper(void *run_binding) {
473 const RunBinding &binding = *(static_cast<RunBinding *>(run_binding));
474 ConcurrentWorkers<WorkerT> *master = binding.delegate;
475
476 master->ReportStartedWorker();
477
478 LogCvmfs(kLogConcurrency, kLogVerboseMsg,
479 "Started dedicated callback worker");
480 master->RunCallbackThread();
481 LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Callback Worker...");
482
483 return NULL;
484 }
485
486
487 template<class WorkerT>
488 void ConcurrentWorkers<WorkerT>::RunCallbackThread() {
489 while (IsRunning()) {
490 const CallbackJob callback_job = results_queue_.Dequeue();
491
492 // stop callback processing if needed
493 if (callback_job.is_death_sentence) {
494 break;
495 }
496
497 // notify all observers about the finished job
498 this->NotifyListeners(callback_job.data);
499
500 // remove the job from the pending 'list' and add it to the ready 'list'
501 atomic_dec32(&jobs_pending_);
502 atomic_inc64(&jobs_processed_);
503
504 // signal the Spooler that all jobs are done...
505 if (atomic_read32(&jobs_pending_) == 0) {
506 pthread_cond_broadcast(&jobs_all_done_);
507 }
508 }
509 }
510
511
512 template<class WorkerT>
513 void ConcurrentWorkers<WorkerT>::ReportStartedWorker() const {
514 MutexLockGuard lock(status_mutex_);
515 ++workers_started_;
516 pthread_cond_signal(&worker_started_);
517 }
518
519
520 template<class WorkerT>
521 void ConcurrentWorkers<WorkerT>::Schedule(WorkerJob job) {
522 // Note: This method can be called from arbitrary threads. Thus we do not
523 // necessarily have just one producer in the system.
524
525 // check if it makes sense to schedule this job
526 if (!IsRunning() && !job.is_death_sentence) {
527 LogCvmfs(kLogConcurrency, kLogWarning,
528 "Tried to schedule a job but "
529 "concurrency was not running...");
530 return;
531 }
532
533 jobs_queue_.Enqueue(job);
534 if (!job.is_death_sentence) {
535 atomic_inc32(&jobs_pending_);
536 }
537 }
538
539
540 template<class WorkerT>
541 void ConcurrentWorkers<WorkerT>::ScheduleDeathSentences() {
542 assert(!IsRunning());
543
544 // make sure that the queue is empty before we schedule a death sentence
545 TruncateJobQueue();
546
547 // schedule a death sentence for each running thread
548 const unsigned int number_of_workers = GetNumberOfWorkers();
549 for (unsigned int i = 0; i < number_of_workers; ++i) {
550 Schedule(WorkerJob());
551 }
552
553 // schedule a death sentence for the callback thread
554 results_queue_.Enqueue(CallbackJob());
555 }
556
557
558 template<class WorkerT>
559 typename ConcurrentWorkers<WorkerT>::WorkerJob
560 ConcurrentWorkers<WorkerT>::Acquire() {
561 // Note: This method is exclusively called inside the worker threads!
562 // Any other usage might produce undefined behavior.
563 return jobs_queue_.Dequeue();
564 }
565
566
567 template<class WorkerT>
568 void ConcurrentWorkers<WorkerT>::TruncateJobQueue(const bool forget_pending) {
569 // Note: This method will throw away all jobs currently waiting in the job
570 // queue. These jobs will not be processed!
571 const unsigned int dropped_jobs = jobs_queue_.Drop();
572
573 // if desired, we remove the jobs from the pending 'list'
574 if (forget_pending) {
575 atomic_xadd32(&jobs_pending_, -dropped_jobs);
576 }
577 }
578
579
580 template<class WorkerT>
581 void ConcurrentWorkers<WorkerT>::Terminate() {
582 // Note: this method causes workers to die immediately after they finished
583 // their last acquired job. To make sure that each worker will check
584 // the running state, we schedule empty jobs or Death Sentences.
585
586 assert(IsRunning());
587
588 // unset the running flag (causing threads to die on the next checkpoint)
589 StopRunning();
590
591 // schedule empty jobs to make sure that each worker will actually reach the
592 // next checkpoint in their processing loop and terminate as expected
593 ScheduleDeathSentences();
594
595 // wait for the worker threads to return
596 WorkerThreads::const_iterator i = worker_threads_.begin();
597 WorkerThreads::const_iterator iend = worker_threads_.end();
598 for (; i != iend; ++i) {
599 pthread_join(*i, NULL);
600 }
601
602 // wait for the callback worker thread
603 pthread_join(callback_thread_, NULL);
604
605 // check if we finished all pending jobs
606 const int pending = atomic_read32(&jobs_pending_);
607 if (pending > 0) {
608 LogCvmfs(kLogConcurrency, kLogWarning,
609 "Job queue was not fully processed. "
610 "Still %d jobs were pending and "
611 "will not be executed anymore.",
612 pending);
613 }
614
615 // check if we had failed jobs
616 const int failed = atomic_read32(&jobs_failed_);
617 if (failed > 0) {
618 LogCvmfs(kLogConcurrency, kLogWarning, "We've had %d failed jobs.", failed);
619 }
620
621 // thanks, and good bye...
622 LogCvmfs(kLogConcurrency, kLogVerboseMsg,
623 "All workers stopped. They processed %ld jobs. Terminating...",
624 atomic_read64(&jobs_processed_));
625 }
626
627
628 template<class WorkerT>
629 void ConcurrentWorkers<WorkerT>::WaitForEmptyQueue() const {
630 LogCvmfs(kLogConcurrency, kLogVerboseMsg,
631 "Waiting for %d jobs to be finished", atomic_read32(&jobs_pending_));
632
633 // wait until all pending jobs are processed
634 {
635 MutexLockGuard lock(jobs_all_done_mutex_);
636 while (atomic_read32(&jobs_pending_) > 0) {
637 pthread_cond_wait(&jobs_all_done_, &jobs_all_done_mutex_);
638 }
639 }
640
641 LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Jobs are done... go on");
642 }
643
644
645 template<class WorkerT>
646 void ConcurrentWorkers<WorkerT>::WaitForTermination() {
647 if (!IsRunning())
648 return;
649
650 WaitForEmptyQueue();
651 Terminate();
652 }
653
654
655 template<class WorkerT>
656 void ConcurrentWorkers<WorkerT>::JobDone(
657 const ConcurrentWorkers<WorkerT>::returned_data_t &data,
658 const bool success) {
659 // BEWARE!
660 // This is a callback method that might be called from a different thread!
661
662 // check if the finished job was successful
663 if (!success) {
664 atomic_inc32(&jobs_failed_);
665 LogCvmfs(kLogConcurrency, kLogWarning, "Job failed");
666 }
667
668 // queue the result in the callback channel
669 results_queue_.Enqueue(CallbackJob(data));
670 }
671
672 #ifdef CVMFS_NAMESPACE_GUARD
673 } // namespace CVMFS_NAMESPACE_GUARD
674 #endif
675
676 #endif // CVMFS_UTIL_CONCURRENCY_IMPL_H_
677