GCC Code Coverage Report

Directory: cvmfs/
File:      cvmfs/util/concurrency_impl.h
Date:      2025-08-03 02:35:45

            Exec   Total   Coverage
Lines:       118     264      44.7%
Branches:     52     142      36.6%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_UTIL_CONCURRENCY_IMPL_H_
6 #define CVMFS_UTIL_CONCURRENCY_IMPL_H_
7
8 // clang-format off
9 // Only needed to let clang-tidy see the class definitions.
10 #ifndef CVMFS_UTIL_CONCURRENCY_H_
11 #include "util/concurrency.h"
12 #endif
13 // clang-format on
14
15 #include <cstddef>
16
17 #include "util/logging.h"
18
19 #ifdef CVMFS_NAMESPACE_GUARD
20 namespace CVMFS_NAMESPACE_GUARD {
21 #endif
22
23 //
24 // +----------------------------------------------------------------------------
25 // | SynchronizingCounter
26 //
27
28
29 template<typename T>
30 5928692494 void SynchronizingCounter<T>::SetValueUnprotected(const T new_value) {
31 // make sure that 0 <= new_value <= maximal_value_ if maximal_value_ != 0
32
5/6
✓ Branch 1 taken 3208781 times.
✓ Branch 2 taken 5925483713 times.
✓ Branch 3 taken 3176853 times.
✓ Branch 4 taken 31928 times.
✓ Branch 5 taken 3176853 times.
✗ Branch 6 not taken.
5928692494 assert(!HasMaximalValue()
33 || (new_value >= T(0) && new_value <= maximal_value_));
34
35 5928692494 value_ = new_value;
36
37
2/2
✓ Branch 0 taken 121136 times.
✓ Branch 1 taken 5928571358 times.
5928692494 if (value_ == T(0)) {
38 121136 pthread_cond_broadcast(&became_zero_);
39 }
40
41
6/6
✓ Branch 1 taken 3208781 times.
✓ Branch 2 taken 5925483713 times.
✓ Branch 3 taken 2613428 times.
✓ Branch 4 taken 595353 times.
✓ Branch 5 taken 2613428 times.
✓ Branch 6 taken 5926079066 times.
5928692494 if (HasMaximalValue() && value_ < maximal_value_) {
42 2613428 pthread_cond_broadcast(&free_slot_);
43 }
44 5928692494 }
45
46
47 template<typename T>
48 2964346000 void SynchronizingCounter<T>::WaitForFreeSlotUnprotected() {
49
6/6
✓ Branch 1 taken 4904804 times.
✓ Branch 2 taken 2962741672 times.
✓ Branch 3 taken 3300476 times.
✓ Branch 4 taken 1604328 times.
✓ Branch 5 taken 3300476 times.
✓ Branch 6 taken 2964346000 times.
2967646476 while (HasMaximalValue() && value_ >= maximal_value_) {
50 3300476 pthread_cond_wait(&free_slot_, &mutex_);
51 }
52
3/4
✓ Branch 1 taken 1604328 times.
✓ Branch 2 taken 2962741672 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 1604328 times.
2964346000 assert(!HasMaximalValue() || value_ < maximal_value_);
53 2964346000 }
54
55
56 template<typename T>
57 3619 void SynchronizingCounter<T>::Initialize() {
58 3619 const bool init_successful = (pthread_mutex_init(&mutex_, NULL) == 0
59
1/2
✓ Branch 1 taken 3619 times.
✗ Branch 2 not taken.
3619 && pthread_cond_init(&became_zero_, NULL) == 0
60
2/4
✓ Branch 0 taken 3619 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 3619 times.
✗ Branch 4 not taken.
7238 && pthread_cond_init(&free_slot_, NULL) == 0);
61
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3619 times.
3619 assert(init_successful);
62 3619 }
63
64
65 template<typename T>
66 3618 void SynchronizingCounter<T>::Destroy() {
67 3618 pthread_mutex_destroy(&mutex_);
68 3618 pthread_cond_destroy(&became_zero_);
69 3618 pthread_cond_destroy(&free_slot_);
70 3618 }
71
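The counter above behaves like a bounded semaphore: SetValueUnprotected() wakes waiters on became_zero_ and free_slot_, and WaitForFreeSlotUnprotected() blocks while the maximum is reached. A minimal usage sketch follows; it assumes the class also exposes a constructor taking the maximal value plus Increment(), Decrement(), and WaitForZero() wrappers that hold mutex_ around the unprotected helpers shown here.

#include <stdint.h>

void SubmitJob();    // hypothetical producer work
void CompleteJob();  // hypothetical consumer work

// Assumption: constructor argument sets maximal_value_.
static SynchronizingCounter<int32_t> g_in_flight(8);

void ProducerThread() {
  // Blocks in WaitForFreeSlotUnprotected() while 8 jobs are in flight;
  // the free_slot_ broadcast in SetValueUnprotected() wakes it up.
  g_in_flight.Increment();  // assumed mutex-taking wrapper
  SubmitJob();
}

void ConsumerThread() {
  CompleteJob();
  g_in_flight.Decrement();  // a drop to zero broadcasts became_zero_
}

void Shutdown() {
  g_in_flight.WaitForZero();  // assumed wrapper waiting on became_zero_
}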
72
73 //
74 // +----------------------------------------------------------------------------
75 // | Observable
76 //
77
78
79 template<typename ParamT>
80 19924926 Observable<ParamT>::Observable() {
81 19924926 const int ret = pthread_rwlock_init(&listeners_rw_lock_, NULL);
82
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9963394 times.
19924926 assert(ret == 0);
83 19924926 }
84
85
86 template<typename ParamT>
87 19924068 Observable<ParamT>::~Observable() {
88 19924068 UnregisterListeners();
89 19924500 pthread_rwlock_destroy(&listeners_rw_lock_);
90 }
91
92
93 template<typename ParamT>
94 template<class DelegateT, class ClosureDataT>
95 4961051 typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener(
96 typename BoundClosure<ParamT, DelegateT, ClosureDataT>::CallbackMethod
97 method,
98 DelegateT *delegate,
99 ClosureDataT data) {
100 // create a new BoundClosure, register it and return the handle
101 4961051 CallbackBase<ParamT> *callback = Observable<ParamT>::MakeClosure(
102 method, delegate, data);
103 4961051 RegisterListener(callback);
104 4961051 return callback;
105 }
106
107
108 template<typename ParamT>
109 template<class DelegateT>
110 24253 typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener(
111 typename BoundCallback<ParamT, DelegateT>::CallbackMethod method,
112 DelegateT *delegate) {
113 // create a new BoundCallback, register it and return the handle
114 24253 CallbackBase<ParamT> *callback = Observable<ParamT>::MakeCallback(method,
115 delegate);
116 24253 RegisterListener(callback);
117 24253 return callback;
118 }
119
120
121 template<typename ParamT>
122 2083 typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener(
123 typename Callback<ParamT>::CallbackFunction fn) {
124 // create a new Callback, register it and return the handle
125 2083 CallbackBase<ParamT> *callback = Observable<ParamT>::MakeCallback(fn);
126 2083 RegisterListener(callback);
127 2083 return callback;
128 }
129
130
131 template<typename ParamT>
132 9947997 void Observable<ParamT>::RegisterListener(
133 Observable<ParamT>::CallbackPtr callback_object) {
134 // register a generic CallbackBase callback
135 9947997 WriteLockGuard guard(listeners_rw_lock_);
136
1/2
✓ Branch 1 taken 4974917 times.
✗ Branch 2 not taken.
9947997 listeners_.insert(callback_object);
137 9947997 }
138
139
140 template<typename ParamT>
141 2043 void Observable<ParamT>::UnregisterListener(
142 typename Observable<ParamT>::CallbackPtr callback_object) {
143 // remove a callback handle from the callbacks list
144 // if it is not registered --> crash
145 2043 WriteLockGuard guard(listeners_rw_lock_);
146
1/2
✓ Branch 1 taken 2043 times.
✗ Branch 2 not taken.
2043 const size_t was_removed = listeners_.erase(callback_object);
147
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2043 times.
2043 assert(was_removed > 0);
148
1/2
✓ Branch 0 taken 2043 times.
✗ Branch 1 not taken.
2043 delete callback_object;
149 2043 }
150
151
152 template<typename ParamT>
153 19913716 void Observable<ParamT>::UnregisterListeners() {
154 19913716 WriteLockGuard guard(listeners_rw_lock_);
155
156 // remove all callbacks from the list
157 19915552 typename Callbacks::const_iterator i = listeners_.begin();
158 19913932 typename Callbacks::const_iterator iend = listeners_.end();
159
2/2
✓ Branch 2 taken 4971272 times.
✓ Branch 3 taken 9962043 times.
29845985 for (; i != iend; ++i) {
160
1/2
✓ Branch 1 taken 4971056 times.
✗ Branch 2 not taken.
9931297 delete *i;
161 }
162 19911826 listeners_.clear();
163 19911934 }
164
165
166 template<typename ParamT>
167 31729821 void Observable<ParamT>::NotifyListeners(const ParamT &parameter) {
168 31729821 ReadLockGuard guard(listeners_rw_lock_);
169
170 // invoke all callbacks and inform them about new data
171 31729813 typename Callbacks::const_iterator i = listeners_.begin();
172 31728887 typename Callbacks::const_iterator iend = listeners_.end();
173
2/2
✓ Branch 2 taken 16259168 times.
✓ Branch 3 taken 16257386 times.
63453874 for (; i != iend; ++i) {
174
1/2
✓ Branch 2 taken 16259361 times.
✗ Branch 3 not taken.
31725149 (**i)(parameter);
175 }
176 31721585 }
177
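A sketch of the listener flow implemented above. IntSource, Consumer, and OnResult are hypothetical names; the example derives from Observable on the assumption that NotifyListeners() is reachable from subclasses.

class IntSource : public Observable<int> {
 public:
  void Produce(const int value) { this->NotifyListeners(value); }
};

class Consumer {
 public:
  void OnResult(const int &value) { /* handle the value */ }
};

void Example() {
  IntSource source;
  Consumer consumer;

  // The BoundCallback overload: member function plus delegate pointer.
  Observable<int>::CallbackPtr handle =
      source.RegisterListener(&Consumer::OnResult, &consumer);

  source.Produce(42);  // calls Consumer::OnResult(42) under the read lock

  // UnregisterListener() erases *and deletes* the handle (see above);
  // it must not be used afterwards.
  source.UnregisterListener(handle);
}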
178
179 //
180 // +----------------------------------------------------------------------------
181 // | FifoChannel
182 //
183
184
185 template<class T>
186 354 FifoChannel<T>::FifoChannel(const size_t maximal_length,
187 const size_t drainout_threshold)
188 354 : maximal_queue_length_(maximal_length)
189 354 , queue_drainout_threshold_(drainout_threshold) {
190
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 354 times.
354 assert(drainout_threshold <= maximal_length);
191
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 354 times.
354 assert(drainout_threshold > 0);
192
193 354 const bool successful = (pthread_mutex_init(&mutex_, NULL) == 0
194
1/2
✓ Branch 1 taken 354 times.
✗ Branch 2 not taken.
354 && pthread_cond_init(&queue_is_not_empty_, NULL) == 0
195
2/4
✓ Branch 0 taken 354 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 354 times.
✗ Branch 4 not taken.
708 && pthread_cond_init(&queue_is_not_full_, NULL)
196 == 0);
197
198
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 354 times.
354 assert(successful);
199 354 }
200
201
202 template<class T>
203 1044 FifoChannel<T>::~FifoChannel() {
204 708 pthread_cond_destroy(&queue_is_not_empty_);
205 708 pthread_cond_destroy(&queue_is_not_full_);
206 708 pthread_mutex_destroy(&mutex_);
207 1752 }
208
209
210 template<class T>
211 37007976 void FifoChannel<T>::Enqueue(const T &data) {
212 37007976 MutexLockGuard lock(mutex_);
213
214 // wait for space in the queue
215
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 37007976 times.
37007976 while (this->size() >= maximal_queue_length_) {
216 pthread_cond_wait(&queue_is_not_full_, &mutex_);
217 }
218
219 // put something into the queue
220
1/2
✓ Branch 1 taken 37007976 times.
✗ Branch 2 not taken.
37007976 this->push(data);
221
222 // wake all waiting threads
223 37007976 pthread_cond_broadcast(&queue_is_not_empty_);
224 37007976 }
225
226
227 template<class T>
228 36517837 const T FifoChannel<T>::Dequeue() {
229 36517837 MutexLockGuard lock(mutex_);
230
231 // wait until there is something to do
232
2/2
✓ Branch 1 taken 115658903 times.
✓ Branch 2 taken 37004313 times.
152663216 while (this->empty()) {
233
1/2
✓ Branch 1 taken 115658903 times.
✗ Branch 2 not taken.
115658903 pthread_cond_wait(&queue_is_not_empty_, &mutex_);
234 }
235
236 // get the item from the queue
237 37004313 T data = this->front();
238 37004313 this->pop();
239
240 // signal waiting threads about the free space
241
2/2
✓ Branch 1 taken 36990660 times.
✓ Branch 2 taken 13653 times.
37004313 if (this->size() < queue_drainout_threshold_) {
242 36990660 pthread_cond_broadcast(&queue_is_not_full_);
243 }
244
245 // return the acquired job
246 36568231 return data;
247 37004313 }
248
249
250 template<class T>
251 37 unsigned int FifoChannel<T>::Drop() {
252 37 MutexLockGuard lock(mutex_);
253
254 37 unsigned int dropped_items = 0;
255
2/2
✓ Branch 1 taken 3663 times.
✓ Branch 2 taken 37 times.
3700 while (!this->empty()) {
256 3663 this->pop();
257 3663 ++dropped_items;
258 }
259
260 37 pthread_cond_broadcast(&queue_is_not_full_);
261
262 74 return dropped_items;
263 37 }
264
265
266 template<class T>
267 74 size_t FifoChannel<T>::GetItemCount() const {
268 74 MutexLockGuard lock(mutex_);
269 74 return this->size();
270 74 }
271
272
273 template<class T>
274 185 bool FifoChannel<T>::IsEmpty() const {
275 185 MutexLockGuard lock(mutex_);
276 185 return this->empty();
277 185 }
278
279
280 template<class T>
281 37 size_t FifoChannel<T>::GetMaximalItemCount() const {
282 37 return maximal_queue_length_;
283 }
284
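A producer/consumer sketch for the channel above, using only the interface shown in this file. With a capacity of 64 and a drainout threshold of 16, Dequeue() broadcasts queue_is_not_full_ only once the queue has drained below 16 items, batching producer wake-ups instead of signalling on every pop. Process() is a hypothetical handler.

#include <pthread.h>

static FifoChannel<int> g_channel(/* maximal_length     = */ 64,
                                  /* drainout_threshold = */ 16);

void Process(const int item);  // hypothetical consumer work

void *ProducerThread(void *unused) {
  for (int i = 0; i < 1000; ++i) {
    g_channel.Enqueue(i);  // blocks while 64 items are queued
  }
  return NULL;
}

void *ConsumerThread(void *unused) {
  for (int i = 0; i < 1000; ++i) {
    Process(g_channel.Dequeue());  // blocks while the queue is empty
  }
  return NULL;
}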
285
286 //
287 // +----------------------------------------------------------------------------
288 // | ConcurrentWorkers
289 //
290
291
292 template<class WorkerT>
293 ConcurrentWorkers<WorkerT>::ConcurrentWorkers(
294 const size_t number_of_workers,
295 const size_t maximal_queue_length,
296 ConcurrentWorkers<WorkerT>::worker_context_t *worker_context)
297 : number_of_workers_(number_of_workers)
298 , worker_context_(worker_context)
299 , thread_context_(this, worker_context_)
300 , initialized_(false)
301 , running_(false)
302 , workers_started_(0)
303 , jobs_queue_(maximal_queue_length, maximal_queue_length / 4 + 1)
304 , results_queue_(maximal_queue_length, 1) {
305 assert(maximal_queue_length >= number_of_workers);
306 assert(number_of_workers > 0);
307
308 atomic_init32(&jobs_pending_);
309 atomic_init32(&jobs_failed_);
310 atomic_init64(&jobs_processed_);
311 }
312
313
314 template<class WorkerT>
315 ConcurrentWorkers<WorkerT>::~ConcurrentWorkers() {
316 if (IsRunning()) {
317 Terminate();
318 }
319
320 // destroy some synchronisation data structures
321 pthread_cond_destroy(&worker_started_);
322 pthread_cond_destroy(&jobs_all_done_);
323 pthread_mutex_destroy(&status_mutex_);
324 pthread_mutex_destroy(&jobs_all_done_mutex_);
325 }
326
327
328 template<class WorkerT>
329 bool ConcurrentWorkers<WorkerT>::Initialize() {
330 LogCvmfs(kLogConcurrency, kLogVerboseMsg,
331 "Initializing ConcurrentWorker "
332 "object with %lu worker threads "
333 "and a queue length of %zu",
334 number_of_workers_, jobs_queue_.GetMaximalItemCount());
335 // LogCvmfs(kLogConcurrency, kLogStdout, "sizeof(expected_data_t): %d\n"
336 // "sizeof(returned_data_t): %d",
337 // sizeof(expected_data_t), sizeof(returned_data_t));
338
339 // initialize synchronisation for job queue (Workers)
340 if (pthread_mutex_init(&status_mutex_, NULL) != 0
341 || pthread_mutex_init(&jobs_all_done_mutex_, NULL) != 0
342 || pthread_cond_init(&worker_started_, NULL) != 0
343 || pthread_cond_init(&jobs_all_done_, NULL) != 0) {
344 return false;
345 }
346
347 // spawn the Worker objects in their own threads
348 if (!SpawnWorkers()) {
349 LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn workers");
350 return false;
351 }
352
353 // all done...
354 initialized_ = true;
355 return true;
356 }
357
358
359 template<class WorkerT>
360 bool ConcurrentWorkers<WorkerT>::SpawnWorkers() {
361 assert(worker_threads_.size() == 0);
362 worker_threads_.resize(number_of_workers_);
363
364 // set the running flag to trap workers in their treadmills
365 StartRunning();
366
367 // spawn the swarm and make them work
368 bool success = true;
369 WorkerThreads::iterator i = worker_threads_.begin();
370 WorkerThreads::const_iterator iend = worker_threads_.end();
371 for (; i != iend; ++i) {
372 pthread_t *thread = &(*i);
373 const int retval = pthread_create(
374 thread,
375 NULL,
376 &ConcurrentWorkers<WorkerT>::RunWorker,
377 reinterpret_cast<void *>(&thread_context_));
378 if (retval != 0) {
379 LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn a Worker");
380 success = false;
381 }
382 }
383
384 // spawn the callback processing thread
385 const int retval = pthread_create(
386 &callback_thread_,
387 NULL,
388 &ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper,
389 reinterpret_cast<void *>(&thread_context_));
390 if (retval != 0) {
391 LogCvmfs(kLogConcurrency, kLogWarning,
392 "Failed to spawn the callback "
393 "worker thread");
394 success = false;
395 }
396
397 // wait for all workers to report in...
398 {
399 MutexLockGuard guard(status_mutex_);
400 // +1 -> callback thread
401 while (workers_started_ < number_of_workers_ + 1) {
402 pthread_cond_wait(&worker_started_, &status_mutex_);
403 }
404 }
405
406 // all done...
407 return success;
408 }
409
410
411 template<class WorkerT>
412 void *ConcurrentWorkers<WorkerT>::RunWorker(void *run_binding) {
413 // NOTE: This is the actual worker thread code!
414
415 //
416 // INITIALIZATION
417 /////////////////
418
419 // get contextual information
420 const WorkerRunBinding &binding = *(
421 static_cast<WorkerRunBinding *>(run_binding));
422 ConcurrentWorkers<WorkerT> *master = binding.delegate;
423 const worker_context_t *worker_context = binding.worker_context;
424
425 // boot up the worker object and make sure it works
426 WorkerT worker(worker_context);
427 worker.RegisterMaster(master);
428 const bool init_success = worker.Initialize();
429
430 // tell the master that this worker was started
431 master->ReportStartedWorker();
432
433 if (!init_success) {
434 LogCvmfs(kLogConcurrency, kLogWarning,
435 "Worker was not initialized "
436 "properly... it will die now!");
437 return NULL;
438 }
439
440 //
441 // PROCESSING LOOP
442 //////////////////
443
444 // start the processing loop
445 LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Starting Worker...");
446 while (master->IsRunning()) {
447 // acquire a new job
448 WorkerJob job = master->Acquire();
449
450 // check if we need to terminate
451 if (job.is_death_sentence)
452 break;
453
454 // do what you are supposed to do
455 worker(job.data);
456 }
457
458 //
459 // TEAR DOWN
460 ////////////
461
462 // give the worker the chance to tidy up
463 worker.TearDown();
464
465 // good bye thread...
466 LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Worker...");
467 return NULL;
468 }
469
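RunWorker() above pins down the interface a WorkerT must provide: construction from a worker_context_t pointer, RegisterMaster(), Initialize(), operator() on the job data, and TearDown(). A hypothetical worker satisfying that contract, reporting results through JobDone() (defined further below), might look as follows; the typedef names mirror the usage in this file, while the master_ bookkeeping is an assumption for illustration.

#include <cstddef>
#include <string>

class HashWorker {  // hypothetical worker type
 public:
  typedef std::string expected_data_t;  // job input
  typedef size_t      returned_data_t;  // job result
  struct worker_context_t {};           // shared, read-only context

  explicit HashWorker(const worker_context_t *context) : master_(NULL) {}

  void RegisterMaster(ConcurrentWorkers<HashWorker> *master) {
    master_ = master;
  }

  bool Initialize() { return true; }    // per-thread setup (see RunWorker)

  void operator()(const expected_data_t &job) {
    const returned_data_t result = job.size();  // stand-in for real work
    master_->JobDone(result, /* success = */ true);
  }

  void TearDown() {}                    // per-thread cleanup

 private:
  ConcurrentWorkers<HashWorker> *master_;
};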
470
471 template<class WorkerT>
472 void *ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper(void *run_binding) {
473 const RunBinding &binding = *(static_cast<RunBinding *>(run_binding));
474 ConcurrentWorkers<WorkerT> *master = binding.delegate;
475
476 master->ReportStartedWorker();
477
478 LogCvmfs(kLogConcurrency, kLogVerboseMsg,
479 "Started dedicated callback worker");
480 master->RunCallbackThread();
481 LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Callback Worker...");
482
483 return NULL;
484 }
485
486
487 template<class WorkerT>
488 void ConcurrentWorkers<WorkerT>::RunCallbackThread() {
489 while (IsRunning()) {
490 const CallbackJob callback_job = results_queue_.Dequeue();
491
492 // stop callback processing if needed
493 if (callback_job.is_death_sentence) {
494 break;
495 }
496
497 // notify all observers about the finished job
498 this->NotifyListeners(callback_job.data);
499
500 // remove the job from the pending 'list' and add it to the ready 'list'
501 atomic_dec32(&jobs_pending_);
502 atomic_inc64(&jobs_processed_);
503
504 // signal the Spooler that all jobs are done...
505 if (atomic_read32(&jobs_pending_) == 0) {
506 pthread_cond_broadcast(&jobs_all_done_);
507 }
508 }
509 }
510
511
512 template<class WorkerT>
513 void ConcurrentWorkers<WorkerT>::ReportStartedWorker() const {
514 MutexLockGuard lock(status_mutex_);
515 ++workers_started_;
516 pthread_cond_signal(&worker_started_);
517 }
518
519
520 template<class WorkerT>
521 void ConcurrentWorkers<WorkerT>::Schedule(WorkerJob job) {
522 // Note: This method can be called from arbitrary threads. Thus we do not
523 // necessarily have just one producer in the system.
524
525 // check if it makes sense to schedule this job
526 if (!IsRunning() && !job.is_death_sentence) {
527 LogCvmfs(kLogConcurrency, kLogWarning,
528 "Tried to schedule a job but "
529 "concurrency was not running...");
530 return;
531 }
532
533 jobs_queue_.Enqueue(job);
534 if (!job.is_death_sentence) {
535 atomic_inc32(&jobs_pending_);
536 }
537 }
538
539
540 template<class WorkerT>
541 void ConcurrentWorkers<WorkerT>::ScheduleDeathSentences() {
542 assert(!IsRunning());
543
544 // make sure that the queue is empty before we schedule a death sentence
545 TruncateJobQueue();
546
547 // schedule a death sentence for each running thread
548 const unsigned int number_of_workers = GetNumberOfWorkers();
549 for (unsigned int i = 0; i < number_of_workers; ++i) {
550 Schedule(WorkerJob());
551 }
552
553 // schedule a death sentence for the callback thread
554 results_queue_.Enqueue(CallbackJob());
555 }
556
557
558 template<class WorkerT>
559 typename ConcurrentWorkers<WorkerT>::WorkerJob
560 ConcurrentWorkers<WorkerT>::Acquire() {
561 // Note: This method is exclusively called inside the worker threads!
562 // Any other usage might produce undefined behavior.
563 return jobs_queue_.Dequeue();
564 }
565
566
567 template<class WorkerT>
568 void ConcurrentWorkers<WorkerT>::TruncateJobQueue(const bool forget_pending) {
569 // Note: This method will throw away all jobs currently waiting in the job
570 // queue. These jobs will not be processed!
571 const unsigned int dropped_jobs = jobs_queue_.Drop();
572
573 // if desired, we remove the jobs from the pending 'list'
574 if (forget_pending) {
575 atomic_xadd32(&jobs_pending_, -dropped_jobs);
576 }
577 }
578
579
580 template<class WorkerT>
581 void ConcurrentWorkers<WorkerT>::Terminate() {
582 // Note: this method causes workers to die immediately after they finish
583 // their last acquired job. To make sure that each worker will check
584 // the running state, we schedule empty jobs or Death Sentences.
585
586 assert(IsRunning());
587
588 // unset the running flag (causing threads to die on the next checkpoint)
589 StopRunning();
590
591 // schedule empty jobs to make sure that each worker will actually reach the
592 // next checkpoint in their processing loop and terminate as expected
593 ScheduleDeathSentences();
594
595 // wait for the worker threads to return
596 WorkerThreads::const_iterator i = worker_threads_.begin();
597 WorkerThreads::const_iterator iend = worker_threads_.end();
598 for (; i != iend; ++i) {
599 pthread_join(*i, NULL);
600 }
601
602 // wait for the callback worker thread
603 pthread_join(callback_thread_, NULL);
604
605 // check if we finished all pending jobs
606 const int pending = atomic_read32(&jobs_pending_);
607 if (pending > 0) {
608 LogCvmfs(kLogConcurrency, kLogWarning,
609 "Job queue was not fully processed. "
610 "Still %d jobs were pending and "
611 "will not be executed anymore.",
612 pending);
613 }
614
615 // check if we had failed jobs
616 const int failed = atomic_read32(&jobs_failed_);
617 if (failed > 0) {
618 LogCvmfs(kLogConcurrency, kLogWarning, "We've had %d failed jobs.", failed);
619 }
620
621 // thanks, and good bye...
622 LogCvmfs(kLogConcurrency, kLogVerboseMsg,
623 "All workers stopped. They processed %ld jobs. Terminating...",
624 atomic_read64(&jobs_processed_));
625 }
626
627
628 template<class WorkerT>
629 void ConcurrentWorkers<WorkerT>::WaitForEmptyQueue() const {
630 LogCvmfs(kLogConcurrency, kLogVerboseMsg,
631 "Waiting for %d jobs to be finished", atomic_read32(&jobs_pending_));
632
633 // wait until all pending jobs are processed
634 {
635 MutexLockGuard lock(jobs_all_done_mutex_);
636 while (atomic_read32(&jobs_pending_) > 0) {
637 pthread_cond_wait(&jobs_all_done_, &jobs_all_done_mutex_);
638 }
639 }
640
641 LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Jobs are done... go on");
642 }
643
644
645 template<class WorkerT>
646 void ConcurrentWorkers<WorkerT>::WaitForTermination() {
647 if (!IsRunning())
648 return;
649
650 WaitForEmptyQueue();
651 Terminate();
652 }
653
654
655 template<class WorkerT>
656 void ConcurrentWorkers<WorkerT>::JobDone(
657 const ConcurrentWorkers<WorkerT>::returned_data_t &data,
658 const bool success) {
659 // BEWARE!
660 // This is a callback method that might be called from a different thread!
661
662 // check if the finished job was successful
663 if (!success) {
664 atomic_inc32(&jobs_failed_);
665 LogCvmfs(kLogConcurrency, kLogWarning, "Job failed");
666 }
667
668 // queue the result in the callback channel
669 results_queue_.Enqueue(CallbackJob(data));
670 }
671
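An end-to-end lifecycle sketch for the machinery above, reusing the hypothetical HashWorker from the earlier example. It assumes WorkerJob can be constructed from the job data, complementing the default-constructed death sentences seen in ScheduleDeathSentences().

#include <cassert>

// Hypothetical result consumer; results arrive via the Observable
// interface (see NotifyListeners() in RunCallbackThread()).
void HandleResult(const HashWorker::returned_data_t &result);

void RunPipeline() {
  HashWorker::worker_context_t context;
  ConcurrentWorkers<HashWorker> workers(/* number_of_workers    = */ 4,
                                        /* maximal_queue_length = */ 64,
                                        &context);
  const bool initialized = workers.Initialize();  // 4 workers + callback thread
  assert(initialized);

  workers.RegisterListener(&HandleResult);

  for (int i = 0; i < 100; ++i) {
    // Assumption: WorkerJob wraps the job data; default construction
    // marks a death sentence (cf. ScheduleDeathSentences()).
    workers.Schedule(
        ConcurrentWorkers<HashWorker>::WorkerJob(std::string("payload")));
  }

  // Blocks until jobs_pending_ drops to zero, then schedules death
  // sentences and joins all threads (see Terminate()).
  workers.WaitForTermination();
}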
672 #ifdef CVMFS_NAMESPACE_GUARD
673 } // namespace CVMFS_NAMESPACE_GUARD
674 #endif
675
676 #endif // CVMFS_UTIL_CONCURRENCY_IMPL_H_
677