GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/util/concurrency_impl.h
Date: 2025-06-22 02:36:02
Exec Total Coverage
Lines: 119 264 45.1%
Branches: 54 142 38.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_UTIL_CONCURRENCY_IMPL_H_
6 #define CVMFS_UTIL_CONCURRENCY_IMPL_H_
7
8 #include "util/logging.h"
9
10 #ifdef CVMFS_NAMESPACE_GUARD
11 namespace CVMFS_NAMESPACE_GUARD {
12 #endif
13
14 //
15 // +----------------------------------------------------------------------------
16 // | SynchronizingCounter
17 //
18
19
20 template<typename T>
21 1303124860 void SynchronizingCounter<T>::SetValueUnprotected(const T new_value) {
22 // make sure that 0 <= new_value <= maximal_value_ if maximal_value_ != 0
23
5/6
✓ Branch 1 taken 2408923 times.
✓ Branch 2 taken 1300715937 times.
✓ Branch 3 taken 2391731 times.
✓ Branch 4 taken 17192 times.
✓ Branch 5 taken 2391731 times.
✗ Branch 6 not taken.
1303124860 assert(!HasMaximalValue()
24 || (new_value >= T(0) && new_value <= maximal_value_));
25
26 1303124860 value_ = new_value;
27
28
2/2
✓ Branch 0 taken 86795 times.
✓ Branch 1 taken 1303038065 times.
1303124860 if (value_ == T(0)) {
29 86795 pthread_cond_broadcast(&became_zero_);
30 }
31
32
6/6
✓ Branch 1 taken 2408923 times.
✓ Branch 2 taken 1300715937 times.
✓ Branch 3 taken 1813891 times.
✓ Branch 4 taken 595032 times.
✓ Branch 5 taken 1813891 times.
✓ Branch 6 taken 1301310969 times.
1303124860 if (HasMaximalValue() && value_ < maximal_value_) {
33 1813891 pthread_cond_broadcast(&free_slot_);
34 }
35 1303124860 }
36
37
38 template<typename T>
39 651562387 void SynchronizingCounter<T>::WaitForFreeSlotUnprotected() {
40
6/6
✓ Branch 1 taken 4720078 times.
✓ Branch 2 taken 650357928 times.
✓ Branch 3 taken 3515619 times.
✓ Branch 4 taken 1204459 times.
✓ Branch 5 taken 3515619 times.
✓ Branch 6 taken 651562387 times.
655078006 while (HasMaximalValue() && value_ >= maximal_value_) {
41 3515619 pthread_cond_wait(&free_slot_, &mutex_);
42 }
43
3/4
✓ Branch 1 taken 1204459 times.
✓ Branch 2 taken 650357928 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 1204459 times.
651562387 assert(!HasMaximalValue() || value_ < maximal_value_);
44 651562387 }
45
46
47 template<typename T>
48 2962 void SynchronizingCounter<T>::Initialize() {
49 2962 const bool init_successful = (pthread_mutex_init(&mutex_, NULL) == 0
50
1/2
✓ Branch 1 taken 2962 times.
✗ Branch 2 not taken.
2962 && pthread_cond_init(&became_zero_, NULL) == 0
51
2/4
✓ Branch 0 taken 2962 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 2962 times.
✗ Branch 4 not taken.
5924 && pthread_cond_init(&free_slot_, NULL) == 0);
52
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2962 times.
2962 assert(init_successful);
53 2962 }
54
55
56 template<typename T>
57 2961 void SynchronizingCounter<T>::Destroy() {
58 2961 pthread_mutex_destroy(&mutex_);
59 2961 pthread_cond_destroy(&became_zero_);
60 2961 pthread_cond_destroy(&free_slot_);
61 2961 }
62
63
64 //
65 // +----------------------------------------------------------------------------
66 // | Observable
67 //
68
69
70 template<typename ParamT>
71 36106223 Observable<ParamT>::Observable() {
72 36106223 const int ret = pthread_rwlock_init(&listeners_rw_lock_, NULL);
73
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18053912 times.
36106223 assert(ret == 0);
74 36106223 }
75
76
77 template<typename ParamT>
78 36103296 Observable<ParamT>::~Observable() {
79 36103296 UnregisterListeners();
80 36102414 pthread_rwlock_destroy(&listeners_rw_lock_);
81 }
82
83
84 template<typename ParamT>
85 template<class DelegateT, class ClosureDataT>
86 8999867 typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener(
87 typename BoundClosure<ParamT, DelegateT, ClosureDataT>::CallbackMethod
88 method,
89 DelegateT *delegate,
90 ClosureDataT data) {
91 // create a new BoundClosure, register it and return the handle
92 8999867 CallbackBase<ParamT> *callback = Observable<ParamT>::MakeClosure(
93 method, delegate, data);
94 8999867 RegisterListener(callback);
95 8999867 return callback;
96 }
97
98
99 template<typename ParamT>
100 template<class DelegateT>
101 25796 typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener(
102 typename BoundCallback<ParamT, DelegateT>::CallbackMethod method,
103 DelegateT *delegate) {
104 // create a new BoundCallback, register it and return the handle
105 25796 CallbackBase<ParamT> *callback = Observable<ParamT>::MakeCallback(method,
106 delegate);
107 25796 RegisterListener(callback);
108 25796 return callback;
109 }
110
111
112 template<typename ParamT>
113 2349 typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener(
114 typename Callback<ParamT>::CallbackFunction fn) {
115 // create a new Callback, register it and return the handle
116 2349 CallbackBase<ParamT> *callback = Observable<ParamT>::MakeCallback(fn);
117 2349 RegisterListener(callback);
118 2349 return callback;
119 }
120
121
122 template<typename ParamT>
123 18028039 void Observable<ParamT>::RegisterListener(
124 Observable<ParamT>::CallbackPtr callback_object) {
125 // register a generic CallbackBase callback
126 18028039 WriteLockGuard guard(listeners_rw_lock_);
127
1/2
✓ Branch 1 taken 9014809 times.
✗ Branch 2 not taken.
18028039 listeners_.insert(callback_object);
128 18028039 }
129
130
131 template<typename ParamT>
132 1866 void Observable<ParamT>::UnregisterListener(
133 typename Observable<ParamT>::CallbackPtr callback_object) {
134 // remove a callback handle from the callbacks list
135 // if it is not registered --> crash
136 1866 WriteLockGuard guard(listeners_rw_lock_);
137
1/2
✓ Branch 1 taken 1866 times.
✗ Branch 2 not taken.
1866 const size_t was_removed = listeners_.erase(callback_object);
138
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1866 times.
1866 assert(was_removed > 0);
139
1/2
✓ Branch 0 taken 1866 times.
✗ Branch 1 not taken.
1866 delete callback_object;
140 1866 }
141
142
143 template<typename ParamT>
144 36091826 void Observable<ParamT>::UnregisterListeners() {
145 36091826 WriteLockGuard guard(listeners_rw_lock_);
146
147 // remove all callbacks from the list
148 36094864 typename Callbacks::const_iterator i = listeners_.begin();
149 36089964 typename Callbacks::const_iterator iend = listeners_.end();
150
2/2
✓ Branch 2 taken 9010288 times.
✓ Branch 3 taken 18048530 times.
54100551 for (; i != iend; ++i) {
151
1/2
✓ Branch 1 taken 9009455 times.
✗ Branch 2 not taken.
18008235 delete *i;
152 }
153 36083594 listeners_.clear();
154 36087514 }
155
156
157 template<typename ParamT>
158 55855309 void Observable<ParamT>::NotifyListeners(const ParamT &parameter) {
159 55855309 ReadLockGuard guard(listeners_rw_lock_);
160
161 // invoke all callbacks and inform them about new data
162 55855279 typename Callbacks::const_iterator i = listeners_.begin();
163 55852953 typename Callbacks::const_iterator iend = listeners_.end();
164
2/2
✓ Branch 2 taken 28271112 times.
✓ Branch 3 taken 28266850 times.
111701632 for (; i != iend; ++i) {
165
1/2
✓ Branch 2 taken 28272086 times.
✗ Branch 3 not taken.
55848789 (**i)(parameter);
166 }
167 55840265 }
168
169
170 //
171 // +----------------------------------------------------------------------------
172 // | FifoChannel
173 //
174
175
176 template<class T>
177 117 FifoChannel<T>::FifoChannel(const size_t maximal_length,
178 const size_t drainout_threshold)
179 117 : maximal_queue_length_(maximal_length)
180 117 , queue_drainout_threshold_(drainout_threshold) {
181
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 117 times.
117 assert(drainout_threshold <= maximal_length);
182
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 117 times.
117 assert(drainout_threshold > 0);
183
184 117 const bool successful = (pthread_mutex_init(&mutex_, NULL) == 0
185
1/2
✓ Branch 1 taken 117 times.
✗ Branch 2 not taken.
117 && pthread_cond_init(&queue_is_not_empty_, NULL) == 0
186
2/4
✓ Branch 0 taken 117 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 117 times.
✗ Branch 4 not taken.
234 && pthread_cond_init(&queue_is_not_full_, NULL)
187 == 0);
188
189
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 117 times.
117 assert(successful);
190 117 }
191
192
193 template<class T>
194 346 FifoChannel<T>::~FifoChannel() {
195 234 pthread_cond_destroy(&queue_is_not_empty_);
196 234 pthread_cond_destroy(&queue_is_not_full_);
197 234 pthread_mutex_destroy(&mutex_);
198 580 }
199
200
201 template<class T>
202 20004237 void FifoChannel<T>::Enqueue(const T &data) {
203 20004237 MutexLockGuard lock(mutex_);
204
205 // wait for space in the queue
206
2/2
✓ Branch 1 taken 20 times.
✓ Branch 2 taken 20004237 times.
20004257 while (this->size() >= maximal_queue_length_) {
207
1/2
✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
20 pthread_cond_wait(&queue_is_not_full_, &mutex_);
208 }
209
210 // put something into the queue
211
1/2
✓ Branch 1 taken 20004237 times.
✗ Branch 2 not taken.
20004237 this->push(data);
212
213 // wake all waiting threads
214 20004237 pthread_cond_broadcast(&queue_is_not_empty_);
215 20004237 }
216
217
218 template<class T>
219 19723977 const T FifoChannel<T>::Dequeue() {
220 19723977 MutexLockGuard lock(mutex_);
221
222 // wait until there is something to do
223
2/2
✓ Branch 1 taken 58677729 times.
✓ Branch 2 taken 20002257 times.
78679986 while (this->empty()) {
224
1/2
✓ Branch 1 taken 58677729 times.
✗ Branch 2 not taken.
58677729 pthread_cond_wait(&queue_is_not_empty_, &mutex_);
225 }
226
227 // get the item from the queue
228 20002257 T data = this->front();
229 20002257 this->pop();
230
231 // signal waiting threads about the free space
232
2/2
✓ Branch 1 taken 19995937 times.
✓ Branch 2 taken 6320 times.
20002257 if (this->size() < queue_drainout_threshold_) {
233 19995937 pthread_cond_broadcast(&queue_is_not_full_);
234 }
235
236 // return the acquired job
237 19777277 return data;
238 20002257 }
239
240
241 template<class T>
242 20 unsigned int FifoChannel<T>::Drop() {
243 20 MutexLockGuard lock(mutex_);
244
245 20 unsigned int dropped_items = 0;
246
2/2
✓ Branch 1 taken 1980 times.
✓ Branch 2 taken 20 times.
2000 while (!this->empty()) {
247 1980 this->pop();
248 1980 ++dropped_items;
249 }
250
251 20 pthread_cond_broadcast(&queue_is_not_full_);
252
253 40 return dropped_items;
254 20 }
255
256
257 template<class T>
258 40 size_t FifoChannel<T>::GetItemCount() const {
259 40 MutexLockGuard lock(mutex_);
260 40 return this->size();
261 40 }
262
263
264 template<class T>
265 100 bool FifoChannel<T>::IsEmpty() const {
266 100 MutexLockGuard lock(mutex_);
267 100 return this->empty();
268 100 }
269
270
271 template<class T>
272 20 size_t FifoChannel<T>::GetMaximalItemCount() const {
273 20 return maximal_queue_length_;
274 }
275
276
277 //
278 // +----------------------------------------------------------------------------
279 // | ConcurrentWorkers
280 //
281
282
283 template<class WorkerT>
284 ConcurrentWorkers<WorkerT>::ConcurrentWorkers(
285 const size_t number_of_workers,
286 const size_t maximal_queue_length,
287 ConcurrentWorkers<WorkerT>::worker_context_t *worker_context)
288 : number_of_workers_(number_of_workers)
289 , worker_context_(worker_context)
290 , thread_context_(this, worker_context_)
291 , initialized_(false)
292 , running_(false)
293 , workers_started_(0)
294 , jobs_queue_(maximal_queue_length, maximal_queue_length / 4 + 1)
295 , results_queue_(maximal_queue_length, 1) {
296 assert(maximal_queue_length >= number_of_workers);
297 assert(number_of_workers > 0);
298
299 atomic_init32(&jobs_pending_);
300 atomic_init32(&jobs_failed_);
301 atomic_init64(&jobs_processed_);
302 }
303
304
305 template<class WorkerT>
306 ConcurrentWorkers<WorkerT>::~ConcurrentWorkers() {
307 if (IsRunning()) {
308 Terminate();
309 }
310
311 // destroy some synchronisation data structures
312 pthread_cond_destroy(&worker_started_);
313 pthread_cond_destroy(&jobs_all_done_);
314 pthread_mutex_destroy(&status_mutex_);
315 pthread_mutex_destroy(&jobs_all_done_mutex_);
316 }
317
318
319 template<class WorkerT>
320 bool ConcurrentWorkers<WorkerT>::Initialize() {
321 LogCvmfs(kLogConcurrency, kLogVerboseMsg,
322 "Initializing ConcurrentWorker "
323 "object with %lu worker threads "
324 "and a queue length of %zu",
325 number_of_workers_, jobs_queue_.GetMaximalItemCount());
326 // LogCvmfs(kLogConcurrency, kLogStdout, "sizeof(expected_data_t): %d\n"
327 // "sizeof(returned_data_t): %d",
328 // sizeof(expected_data_t), sizeof(returned_data_t));
329
330 // initialize synchronisation for job queue (Workers)
331 if (pthread_mutex_init(&status_mutex_, NULL) != 0
332 || pthread_mutex_init(&jobs_all_done_mutex_, NULL) != 0
333 || pthread_cond_init(&worker_started_, NULL) != 0
334 || pthread_cond_init(&jobs_all_done_, NULL) != 0) {
335 return false;
336 }
337
338 // spawn the Worker objects in their own threads
339 if (!SpawnWorkers()) {
340 LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn workers");
341 return false;
342 }
343
344 // all done...
345 initialized_ = true;
346 return true;
347 }
348
349
350 template<class WorkerT>
351 bool ConcurrentWorkers<WorkerT>::SpawnWorkers() {
352 assert(worker_threads_.size() == 0);
353 worker_threads_.resize(number_of_workers_);
354
355 // set the running flag to trap workers in their treadmills
356 StartRunning();
357
358 // spawn the swarm and make them work
359 bool success = true;
360 WorkerThreads::iterator i = worker_threads_.begin();
361 WorkerThreads::const_iterator iend = worker_threads_.end();
362 for (; i != iend; ++i) {
363 pthread_t *thread = &(*i);
364 const int retval = pthread_create(
365 thread,
366 NULL,
367 &ConcurrentWorkers<WorkerT>::RunWorker,
368 reinterpret_cast<void *>(&thread_context_));
369 if (retval != 0) {
370 LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn a Worker");
371 success = false;
372 }
373 }
374
375 // spawn the callback processing thread
376 const int retval = pthread_create(
377 &callback_thread_,
378 NULL,
379 &ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper,
380 reinterpret_cast<void *>(&thread_context_));
381 if (retval != 0) {
382 LogCvmfs(kLogConcurrency, kLogWarning,
383 "Failed to spawn the callback "
384 "worker thread");
385 success = false;
386 }
387
388 // wait for all workers to report in...
389 {
390 MutexLockGuard guard(status_mutex_);
391 // +1 -> callback thread
392 while (workers_started_ < number_of_workers_ + 1) {
393 pthread_cond_wait(&worker_started_, &status_mutex_);
394 }
395 }
396
397 // all done...
398 return success;
399 }
400
401
402 template<class WorkerT>
403 void *ConcurrentWorkers<WorkerT>::RunWorker(void *run_binding) {
404 // NOTE: This is the actual worker thread code!
405
406 //
407 // INITIALIZATION
408 /////////////////
409
410 // get contextual information
411 const WorkerRunBinding &binding = *(
412 static_cast<WorkerRunBinding *>(run_binding));
413 ConcurrentWorkers<WorkerT> *master = binding.delegate;
414 const worker_context_t *worker_context = binding.worker_context;
415
416 // boot up the worker object and make sure it works
417 WorkerT worker(worker_context);
418 worker.RegisterMaster(master);
419 const bool init_success = worker.Initialize();
420
421 // tell the master that this worker was started
422 master->ReportStartedWorker();
423
424 if (!init_success) {
425 LogCvmfs(kLogConcurrency, kLogWarning,
426 "Worker was not initialized "
427 "properly... it will die now!");
428 return NULL;
429 }
430
431 //
432 // PROCESSING LOOP
433 //////////////////
434
435 // start the processing loop
436 LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Starting Worker...");
437 while (master->IsRunning()) {
438 // acquire a new job
439 WorkerJob job = master->Acquire();
440
441 // check if we need to terminate
442 if (job.is_death_sentence)
443 break;
444
445 // do what you are supposed to do
446 worker(job.data);
447 }
448
449 //
450 // TEAR DOWN
451 ////////////
452
453 // give the worker the chance to tidy up
454 worker.TearDown();
455
456 // good bye thread...
457 LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Worker...");
458 return NULL;
459 }
460
461
462 template<class WorkerT>
463 void *ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper(void *run_binding) {
464 const RunBinding &binding = *(static_cast<RunBinding *>(run_binding));
465 ConcurrentWorkers<WorkerT> *master = binding.delegate;
466
467 master->ReportStartedWorker();
468
469 LogCvmfs(kLogConcurrency, kLogVerboseMsg,
470 "Started dedicated callback worker");
471 master->RunCallbackThread();
472 LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Callback Worker...");
473
474 return NULL;
475 }
476
477
478 template<class WorkerT>
479 void ConcurrentWorkers<WorkerT>::RunCallbackThread() {
480 while (IsRunning()) {
481 const CallbackJob callback_job = results_queue_.Dequeue();
482
483 // stop callback processing if needed
484 if (callback_job.is_death_sentence) {
485 break;
486 }
487
488 // notify all observers about the finished job
489 this->NotifyListeners(callback_job.data);
490
491 // remove the job from the pending 'list' and add it to the ready 'list'
492 atomic_dec32(&jobs_pending_);
493 atomic_inc64(&jobs_processed_);
494
495 // signal the Spooler that all jobs are done...
496 if (atomic_read32(&jobs_pending_) == 0) {
497 pthread_cond_broadcast(&jobs_all_done_);
498 }
499 }
500 }
501
502
503 template<class WorkerT>
504 void ConcurrentWorkers<WorkerT>::ReportStartedWorker() const {
505 MutexLockGuard lock(status_mutex_);
506 ++workers_started_;
507 pthread_cond_signal(&worker_started_);
508 }
509
510
511 template<class WorkerT>
512 void ConcurrentWorkers<WorkerT>::Schedule(WorkerJob job) {
513 // Note: This method can be called from arbitrary threads. Thus we do not
514 // necessarily have just one producer in the system.
515
516 // check if it makes sense to schedule this job
517 if (!IsRunning() && !job.is_death_sentence) {
518 LogCvmfs(kLogConcurrency, kLogWarning,
519 "Tried to schedule a job but "
520 "concurrency was not running...");
521 return;
522 }
523
524 jobs_queue_.Enqueue(job);
525 if (!job.is_death_sentence) {
526 atomic_inc32(&jobs_pending_);
527 }
528 }
529
530
531 template<class WorkerT>
532 void ConcurrentWorkers<WorkerT>::ScheduleDeathSentences() {
533 assert(!IsRunning());
534
535 // make sure that the queue is empty before we schedule a death sentence
536 TruncateJobQueue();
537
538 // schedule a death sentence for each running thread
539 const unsigned int number_of_workers = GetNumberOfWorkers();
540 for (unsigned int i = 0; i < number_of_workers; ++i) {
541 Schedule(WorkerJob());
542 }
543
544 // schedule a death sentence for the callback thread
545 results_queue_.Enqueue(CallbackJob());
546 }
547
548
549 template<class WorkerT>
550 typename ConcurrentWorkers<WorkerT>::WorkerJob
551 ConcurrentWorkers<WorkerT>::Acquire() {
552 // Note: This method is exclusively called inside the worker threads!
553 // Any other usage might produce undefined behavior.
554 return jobs_queue_.Dequeue();
555 }
556
557
558 template<class WorkerT>
559 void ConcurrentWorkers<WorkerT>::TruncateJobQueue(const bool forget_pending) {
560 // Note: This method will throw away all jobs currently waiting in the job
561 // queue. These jobs will not be processed!
562 const unsigned int dropped_jobs = jobs_queue_.Drop();
563
564 // if desired, we remove the jobs from the pending 'list'
565 if (forget_pending) {
566 atomic_xadd32(&jobs_pending_, -dropped_jobs);
567 }
568 }
569
570
571 template<class WorkerT>
572 void ConcurrentWorkers<WorkerT>::Terminate() {
573 // Note: this method causes workers to die immediately after they finished
574 // their last acquired job. To make sure that each worker will check
575 // the running state, we schedule empty jobs or Death Sentences.
576
577 assert(IsRunning());
578
579 // unset the running flag (causing threads to die on the next checkpoint)
580 StopRunning();
581
582 // schedule empty jobs to make sure that each worker will actually reach the
583 // next checkpoint in their processing loop and terminate as expected
584 ScheduleDeathSentences();
585
586 // wait for the worker threads to return
587 WorkerThreads::const_iterator i = worker_threads_.begin();
588 WorkerThreads::const_iterator iend = worker_threads_.end();
589 for (; i != iend; ++i) {
590 pthread_join(*i, NULL);
591 }
592
593 // wait for the callback worker thread
594 pthread_join(callback_thread_, NULL);
595
596 // check if we finished all pending jobs
597 const int pending = atomic_read32(&jobs_pending_);
598 if (pending > 0) {
599 LogCvmfs(kLogConcurrency, kLogWarning,
600 "Job queue was not fully processed. "
601 "Still %d jobs were pending and "
602 "will not be executed anymore.",
603 pending);
604 }
605
606 // check if we had failed jobs
607 const int failed = atomic_read32(&jobs_failed_);
608 if (failed > 0) {
609 LogCvmfs(kLogConcurrency, kLogWarning, "We've had %d failed jobs.", failed);
610 }
611
612 // thanks, and good bye...
613 LogCvmfs(kLogConcurrency, kLogVerboseMsg,
614 "All workers stopped. They processed %ld jobs. Terminating...",
615 atomic_read64(&jobs_processed_));
616 }
617
618
619 template<class WorkerT>
620 void ConcurrentWorkers<WorkerT>::WaitForEmptyQueue() const {
621 LogCvmfs(kLogConcurrency, kLogVerboseMsg,
622 "Waiting for %d jobs to be finished", atomic_read32(&jobs_pending_));
623
624 // wait until all pending jobs are processed
625 {
626 MutexLockGuard lock(jobs_all_done_mutex_);
627 while (atomic_read32(&jobs_pending_) > 0) {
628 pthread_cond_wait(&jobs_all_done_, &jobs_all_done_mutex_);
629 }
630 }
631
632 LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Jobs are done... go on");
633 }
634
635
636 template<class WorkerT>
637 void ConcurrentWorkers<WorkerT>::WaitForTermination() {
638 if (!IsRunning())
639 return;
640
641 WaitForEmptyQueue();
642 Terminate();
643 }
644
645
646 template<class WorkerT>
647 void ConcurrentWorkers<WorkerT>::JobDone(
648 const ConcurrentWorkers<WorkerT>::returned_data_t &data,
649 const bool success) {
650 // BEWARE!
651 // This is a callback method that might be called from a different thread!
652
653 // check if the finished job was successful
654 if (!success) {
655 atomic_inc32(&jobs_failed_);
656 LogCvmfs(kLogConcurrency, kLogWarning, "Job failed");
657 }
658
659 // queue the result in the callback channel
660 results_queue_.Enqueue(CallbackJob(data));
661 }
662
663 #ifdef CVMFS_NAMESPACE_GUARD
664 } // namespace CVMFS_NAMESPACE_GUARD
665 #endif
666
667 #endif // CVMFS_UTIL_CONCURRENCY_IMPL_H_
668