GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/util/concurrency_impl.h
Date: 2024-04-28 02:33:07
Exec Total Coverage
Lines: 120 265 45.3%
Branches: 54 142 38.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_UTIL_CONCURRENCY_IMPL_H_
6 #define CVMFS_UTIL_CONCURRENCY_IMPL_H_
7
8 #include "util/logging.h"
9
10 #ifdef CVMFS_NAMESPACE_GUARD
11 namespace CVMFS_NAMESPACE_GUARD {
12 #endif
13
14 //
15 // +----------------------------------------------------------------------------
16 // | SynchronizingCounter
17 //
18
19
20 template <typename T>
21 145870490 void SynchronizingCounter<T>::SetValueUnprotected(const T new_value) {
22 // make sure that 0 <= new_value <= maximal_value_ if maximal_value_ != 0
23
5/6
✓ Branch 1 taken 1346497 times.
✓ Branch 2 taken 144523993 times.
✓ Branch 3 taken 1345269 times.
✓ Branch 4 taken 1228 times.
✓ Branch 5 taken 1345269 times.
✗ Branch 6 not taken.
145870490 assert(!HasMaximalValue() ||
24 (new_value >= T(0) && new_value <= maximal_value_));
25
26 145870490 value_ = new_value;
27
28
2/2
✓ Branch 0 taken 65613 times.
✓ Branch 1 taken 145804877 times.
145870490 if (value_ == T(0)) {
29 65613 pthread_cond_broadcast(&became_zero_);
30 }
31
32
6/6
✓ Branch 1 taken 1346497 times.
✓ Branch 2 taken 144523993 times.
✓ Branch 3 taken 835148 times.
✓ Branch 4 taken 511349 times.
✓ Branch 5 taken 835148 times.
✓ Branch 6 taken 145035342 times.
145870490 if (HasMaximalValue() && value_ < maximal_value_) {
33 835148 pthread_cond_broadcast(&free_slot_);
34 }
35 145870490 }
36
37
38 template <typename T>
39 72935238 void SynchronizingCounter<T>::WaitForFreeSlotUnprotected() {
40
6/6
✓ Branch 1 taken 4137080 times.
✓ Branch 2 taken 72261992 times.
✓ Branch 3 taken 3463834 times.
✓ Branch 4 taken 673246 times.
✓ Branch 5 taken 3463834 times.
✓ Branch 6 taken 72935238 times.
76399072 while (HasMaximalValue() && value_ >= maximal_value_) {
41 3463834 pthread_cond_wait(&free_slot_, &mutex_);
42 }
43
3/4
✓ Branch 1 taken 673246 times.
✓ Branch 2 taken 72261992 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 673246 times.
72935238 assert(!HasMaximalValue() || value_ < maximal_value_);
44 72935238 }
45
46
47 template <typename T>
48 139 void SynchronizingCounter<T>::Initialize() {
49 139 const bool init_successful = (
50
1/2
✓ Branch 1 taken 139 times.
✗ Branch 2 not taken.
278 pthread_mutex_init(&mutex_, NULL) == 0 &&
51
2/4
✓ Branch 0 taken 139 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 139 times.
✗ Branch 4 not taken.
278 pthread_cond_init(&became_zero_, NULL) == 0 &&
52 139 pthread_cond_init(&free_slot_, NULL) == 0);
53
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 139 times.
139 assert(init_successful);
54 139 }
55
56
57 template <typename T>
58 139 void SynchronizingCounter<T>::Destroy() {
59 139 pthread_mutex_destroy(&mutex_);
60 139 pthread_cond_destroy(&became_zero_);
61 139 pthread_cond_destroy(&free_slot_);
62 139 }
63
64
65 //
66 // +----------------------------------------------------------------------------
67 // | Observable
68 //
69
70
71 template <typename ParamT>
72 738054 Observable<ParamT>::Observable() {
73 738054 const int ret = pthread_rwlock_init(&listeners_rw_lock_, NULL);
74
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 369064 times.
738054 assert(ret == 0);
75 738054 }
76
77
78 template <typename ParamT>
79 738026 Observable<ParamT>::~Observable() {
80 738026 UnregisterListeners();
81 738024 pthread_rwlock_destroy(&listeners_rw_lock_);
82 }
83
84
85 template <typename ParamT>
86 template <class DelegateT, class ClosureDataT>
87 183697 typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener(
88 typename BoundClosure<ParamT,
89 DelegateT,
90 ClosureDataT>::CallbackMethod method,
91 DelegateT *delegate,
92 ClosureDataT data) {
93 // create a new BoundClosure, register it and return the handle
94 CallbackBase<ParamT> *callback =
95 183697 Observable<ParamT>::MakeClosure(method, delegate, data);
96 183697 RegisterListener(callback);
97 183697 return callback;
98 }
99
100
101 template <typename ParamT>
102 template <class DelegateT>
103 1046 typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener(
104 typename BoundCallback<ParamT, DelegateT>::CallbackMethod method,
105 DelegateT *delegate) {
106 // create a new BoundCallback, register it and return the handle
107 CallbackBase<ParamT> *callback =
108 1046 Observable<ParamT>::MakeCallback(method, delegate);
109 1046 RegisterListener(callback);
110 1046 return callback;
111 }
112
113
114 template <typename ParamT>
115 71 typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener(
116 typename Callback<ParamT>::CallbackFunction fn) {
117 // create a new Callback, register it and return the handle
118 CallbackBase<ParamT> *callback =
119 71 Observable<ParamT>::MakeCallback(fn);
120 71 RegisterListener(callback);
121 71 return callback;
122 }
123
124
125 template <typename ParamT>
126 368537 void Observable<ParamT>::RegisterListener(
127 Observable<ParamT>::CallbackPtr callback_object) {
128 // register a generic CallbackBase callback
129 368537 WriteLockGuard guard(listeners_rw_lock_);
130
1/2
✓ Branch 1 taken 184305 times.
✗ Branch 2 not taken.
368537 listeners_.insert(callback_object);
131 368537 }
132
133
134 template <typename ParamT>
135 87 void Observable<ParamT>::UnregisterListener(
136 typename Observable<ParamT>::CallbackPtr callback_object) {
137 // remove a callback handle from the callbacks list
138 // if it is not registered --> crash
139 87 WriteLockGuard guard(listeners_rw_lock_);
140
1/2
✓ Branch 1 taken 87 times.
✗ Branch 2 not taken.
87 const size_t was_removed = listeners_.erase(callback_object);
141
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 87 times.
87 assert(was_removed > 0);
142
1/2
✓ Branch 0 taken 87 times.
✗ Branch 1 not taken.
87 delete callback_object;
143 87 }
144
145
146 template <typename ParamT>
147 737549 void Observable<ParamT>::UnregisterListeners() {
148 737549 WriteLockGuard guard(listeners_rw_lock_);
149
150 // remove all callbacks from the list
151 737597 typename Callbacks::const_iterator i = listeners_.begin();
152 737487 typename Callbacks::const_iterator iend = listeners_.end();
153
2/2
✓ Branch 2 taken 184156 times.
✓ Branch 3 taken 368869 times.
1105318 for (; i != iend; ++i) {
154
1/2
✓ Branch 1 taken 184148 times.
✗ Branch 2 not taken.
367847 delete *i;
155 }
156 737231 listeners_.clear();
157 737435 }
158
159
160 template <typename ParamT>
161 2139125 void Observable<ParamT>::NotifyListeners(const ParamT &parameter) {
162 2139125 ReadLockGuard guard(listeners_rw_lock_);
163
164 // invoke all callbacks and inform them about new data
165 2139161 typename Callbacks::const_iterator i = listeners_.begin();
166 2139105 typename Callbacks::const_iterator iend = listeners_.end();
167
2/2
✓ Branch 2 taken 1085008 times.
✓ Branch 3 taken 1084968 times.
4277938 for (; i != iend; ++i) {
168
1/2
✓ Branch 2 taken 1084969 times.
✗ Branch 3 not taken.
2138935 (**i)(parameter);
169 }
170 2138855 }
171
172
173 //
174 // +----------------------------------------------------------------------------
175 // | FifoChannel
176 //
177
178
179 template <class T>
180 10 FifoChannel<T>::FifoChannel(const size_t maximal_length,
181 const size_t drainout_threshold) :
182 10 maximal_queue_length_(maximal_length),
183 10 queue_drainout_threshold_(drainout_threshold)
184 {
185
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
10 assert(drainout_threshold <= maximal_length);
186
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
10 assert(drainout_threshold > 0);
187
188 10 const bool successful = (
189
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
20 pthread_mutex_init(&mutex_, NULL) == 0 &&
190
2/4
✓ Branch 0 taken 10 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 10 times.
✗ Branch 4 not taken.
20 pthread_cond_init(&queue_is_not_empty_, NULL) == 0 &&
191 10 pthread_cond_init(&queue_is_not_full_, NULL) == 0);
192
193
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
10 assert(successful);
194 10 }
195
196
197 template <class T>
198 28 FifoChannel<T>::~FifoChannel() {
199 20 pthread_cond_destroy(&queue_is_not_empty_);
200 20 pthread_cond_destroy(&queue_is_not_full_);
201 20 pthread_mutex_destroy(&mutex_);
202 48 }
203
204
205 template <class T>
206 1000216 void FifoChannel<T>::Enqueue(const T &data) {
207 1000216 MutexLockGuard lock(mutex_);
208
209 // wait for space in the queue
210
2/2
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 1000216 times.
1000217 while (this->size() >= maximal_queue_length_) {
211
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 pthread_cond_wait(&queue_is_not_full_, &mutex_);
212 }
213
214 // put something into the queue
215
1/2
✓ Branch 1 taken 1000216 times.
✗ Branch 2 not taken.
1000216 this->push(data);
216
217 // wake all waiting threads
218 1000216 pthread_cond_broadcast(&queue_is_not_empty_);
219 1000216 }
220
221
222 template <class T>
223 985672 const T FifoChannel<T>::Dequeue() {
224 985672 MutexLockGuard lock(mutex_);
225
226 // wait until there is something to do
227
2/2
✓ Branch 1 taken 2171703 times.
✓ Branch 2 taken 1000117 times.
3171820 while (this->empty()) {
228
1/2
✓ Branch 1 taken 2171703 times.
✗ Branch 2 not taken.
2171703 pthread_cond_wait(&queue_is_not_empty_, &mutex_);
229 }
230
231 // get the item from the queue
232 1000117 T data = this->front(); this->pop();
233
234 // signal waiting threads about the free space
235
2/2
✓ Branch 1 taken 999555 times.
✓ Branch 2 taken 562 times.
1000117 if (this->size() < queue_drainout_threshold_) {
236 999555 pthread_cond_broadcast(&queue_is_not_full_);
237 }
238
239 // return the acquired job
240 985660 return data;
241 1000117 }
242
243
244 template <class T>
245 1 unsigned int FifoChannel<T>::Drop() {
246 1 MutexLockGuard lock(mutex_);
247
248 1 unsigned int dropped_items = 0;
249
2/2
✓ Branch 1 taken 99 times.
✓ Branch 2 taken 1 times.
100 while (!this->empty()) {
250 99 this->pop();
251 99 ++dropped_items;
252 }
253
254 1 pthread_cond_broadcast(&queue_is_not_full_);
255
256 2 return dropped_items;
257 1 }
258
259
260 template <class T>
261 2 size_t FifoChannel<T>::GetItemCount() const {
262 2 MutexLockGuard lock(mutex_);
263 2 return this->size();
264 2 }
265
266
267 template <class T>
268 5 bool FifoChannel<T>::IsEmpty() const {
269 5 MutexLockGuard lock(mutex_);
270 5 return this->empty();
271 5 }
272
273
274 template <class T>
275 1 size_t FifoChannel<T>::GetMaximalItemCount() const {
276 1 return maximal_queue_length_;
277 }
278
279
280 //
281 // +----------------------------------------------------------------------------
282 // | ConcurrentWorkers
283 //
284
285
286 template <class WorkerT>
287 ConcurrentWorkers<WorkerT>::ConcurrentWorkers(
288 const size_t number_of_workers,
289 const size_t maximal_queue_length,
290 ConcurrentWorkers<WorkerT>::worker_context_t *worker_context) :
291 number_of_workers_(number_of_workers),
292 worker_context_(worker_context),
293 thread_context_(this, worker_context_),
294 initialized_(false),
295 running_(false),
296 workers_started_(0),
297 jobs_queue_(maximal_queue_length, maximal_queue_length / 4 + 1),
298 results_queue_(maximal_queue_length, 1)
299 {
300 assert(maximal_queue_length >= number_of_workers);
301 assert(number_of_workers > 0);
302
303 atomic_init32(&jobs_pending_);
304 atomic_init32(&jobs_failed_);
305 atomic_init64(&jobs_processed_);
306 }
307
308
309 template <class WorkerT>
310 ConcurrentWorkers<WorkerT>::~ConcurrentWorkers() {
311 if (IsRunning()) {
312 Terminate();
313 }
314
315 // destroy some synchronisation data structures
316 pthread_cond_destroy(&worker_started_);
317 pthread_cond_destroy(&jobs_all_done_);
318 pthread_mutex_destroy(&status_mutex_);
319 pthread_mutex_destroy(&jobs_all_done_mutex_);
320 }
321
322
323 template <class WorkerT>
324 bool ConcurrentWorkers<WorkerT>::Initialize() {
325 LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Initializing ConcurrentWorker "
326 "object with %lu worker threads "
327 "and a queue length of %zu",
328 number_of_workers_, jobs_queue_.GetMaximalItemCount());
329 // LogCvmfs(kLogConcurrency, kLogStdout, "sizeof(expected_data_t): %d\n"
330 // "sizeof(returned_data_t): %d",
331 // sizeof(expected_data_t), sizeof(returned_data_t));
332
333 // initialize synchronisation for job queue (Workers)
334 if (pthread_mutex_init(&status_mutex_, NULL) != 0 ||
335 pthread_mutex_init(&jobs_all_done_mutex_, NULL) != 0 ||
336 pthread_cond_init(&worker_started_, NULL) != 0 ||
337 pthread_cond_init(&jobs_all_done_, NULL) != 0) {
338 return false;
339 }
340
341 // spawn the Worker objects in their own threads
342 if (!SpawnWorkers()) {
343 LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn workers");
344 return false;
345 }
346
347 // all done...
348 initialized_ = true;
349 return true;
350 }
351
352
353 template <class WorkerT>
354 bool ConcurrentWorkers<WorkerT>::SpawnWorkers() {
355 assert(worker_threads_.size() == 0);
356 worker_threads_.resize(number_of_workers_);
357
358 // set the running flag to trap workers in their treadmills
359 StartRunning();
360
361 // spawn the swarm and make them work
362 bool success = true;
363 WorkerThreads::iterator i = worker_threads_.begin();
364 WorkerThreads::const_iterator iend = worker_threads_.end();
365 for (; i != iend; ++i) {
366 pthread_t* thread = &(*i);
367 const int retval = pthread_create(
368 thread,
369 NULL,
370 &ConcurrentWorkers<WorkerT>::RunWorker,
371 reinterpret_cast<void *>(&thread_context_));
372 if (retval != 0) {
373 LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn a Worker");
374 success = false;
375 }
376 }
377
378 // spawn the callback processing thread
379 const int retval =
380 pthread_create(
381 &callback_thread_,
382 NULL,
383 &ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper,
384 reinterpret_cast<void *>(&thread_context_));
385 if (retval != 0) {
386 LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn the callback "
387 "worker thread");
388 success = false;
389 }
390
391 // wait for all workers to report in...
392 {
393 MutexLockGuard guard(status_mutex_);
394 // +1 -> callback thread
395 while (workers_started_ < number_of_workers_ + 1) {
396 pthread_cond_wait(&worker_started_, &status_mutex_);
397 }
398 }
399
400 // all done...
401 return success;
402 }
403
404
405 template <class WorkerT>
406 void* ConcurrentWorkers<WorkerT>::RunWorker(void *run_binding) {
407 // NOTE: This is the actual worker thread code!
408
409 //
410 // INITIALIZATION
411 /////////////////
412
413 // get contextual information
414 const WorkerRunBinding &binding =
415 *(static_cast<WorkerRunBinding*>(run_binding));
416 ConcurrentWorkers<WorkerT> *master = binding.delegate;
417 const worker_context_t *worker_context = binding.worker_context;
418
419 // boot up the worker object and make sure it works
420 WorkerT worker(worker_context);
421 worker.RegisterMaster(master);
422 const bool init_success = worker.Initialize();
423
424 // tell the master that this worker was started
425 master->ReportStartedWorker();
426
427 if (!init_success) {
428 LogCvmfs(kLogConcurrency, kLogWarning, "Worker was not initialized "
429 "properly... it will die now!");
430 return NULL;
431 }
432
433 //
434 // PROCESSING LOOP
435 //////////////////
436
437 // start the processing loop
438 LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Starting Worker...");
439 while (master->IsRunning()) {
440 // acquire a new job
441 WorkerJob job = master->Acquire();
442
443 // check if we need to terminate
444 if (job.is_death_sentence)
445 break;
446
447 // do what you are supposed to do
448 worker(job.data);
449 }
450
451 //
452 // TEAR DOWN
453 ////////////
454
455 // give the worker the chance to tidy up
456 worker.TearDown();
457
458 // good bye thread...
459 LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Worker...");
460 return NULL;
461 }
462
463
464 template <class WorkerT>
465 void* ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper(void *run_binding) {
466 const RunBinding &binding = *(static_cast<RunBinding*>(run_binding));
467 ConcurrentWorkers<WorkerT> *master = binding.delegate;
468
469 master->ReportStartedWorker();
470
471 LogCvmfs(kLogConcurrency, kLogVerboseMsg,
472 "Started dedicated callback worker");
473 master->RunCallbackThread();
474 LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Callback Worker...");
475
476 return NULL;
477 }
478
479
480 template <class WorkerT>
481 void ConcurrentWorkers<WorkerT>::RunCallbackThread() {
482 while (IsRunning()) {
483 const CallbackJob callback_job = results_queue_.Dequeue();
484
485 // stop callback processing if needed
486 if (callback_job.is_death_sentence) {
487 break;
488 }
489
490 // notify all observers about the finished job
491 this->NotifyListeners(callback_job.data);
492
493 // remove the job from the pending 'list' and add it to the ready 'list'
494 atomic_dec32(&jobs_pending_);
495 atomic_inc64(&jobs_processed_);
496
497 // signal the Spooler that all jobs are done...
498 if (atomic_read32(&jobs_pending_) == 0) {
499 pthread_cond_broadcast(&jobs_all_done_);
500 }
501 }
502 }
503
504
505 template <class WorkerT>
506 void ConcurrentWorkers<WorkerT>::ReportStartedWorker() const {
507 MutexLockGuard lock(status_mutex_);
508 ++workers_started_;
509 pthread_cond_signal(&worker_started_);
510 }
511
512
513 template <class WorkerT>
514 void ConcurrentWorkers<WorkerT>::Schedule(WorkerJob job) {
515 // Note: This method can be called from arbitrary threads. Thus we do not
516 // necessarily have just one producer in the system.
517
518 // check if it makes sense to schedule this job
519 if (!IsRunning() && !job.is_death_sentence) {
520 LogCvmfs(kLogConcurrency, kLogWarning, "Tried to schedule a job but "
521 "concurrency was not running...");
522 return;
523 }
524
525 jobs_queue_.Enqueue(job);
526 if (!job.is_death_sentence) {
527 atomic_inc32(&jobs_pending_);
528 }
529 }
530
531
532 template <class WorkerT>
533 void ConcurrentWorkers<WorkerT>::ScheduleDeathSentences() {
534 assert(!IsRunning());
535
536 // make sure that the queue is empty before we schedule a death sentence
537 TruncateJobQueue();
538
539 // schedule a death sentence for each running thread
540 const unsigned int number_of_workers = GetNumberOfWorkers();
541 for (unsigned int i = 0; i < number_of_workers; ++i) {
542 Schedule(WorkerJob());
543 }
544
545 // schedule a death sentence for the callback thread
546 results_queue_.Enqueue(CallbackJob());
547 }
548
549
550 template <class WorkerT>
551 typename ConcurrentWorkers<WorkerT>::WorkerJob
552 ConcurrentWorkers<WorkerT>::Acquire()
553 {
554 // Note: This method is exclusively called inside the worker threads!
555 // Any other usage might produce undefined behavior.
556 return jobs_queue_.Dequeue();
557 }
558
559
560 template <class WorkerT>
561 void ConcurrentWorkers<WorkerT>::TruncateJobQueue(const bool forget_pending) {
562 // Note: This method will throw away all jobs currently waiting in the job
563 // queue. These jobs will not be processed!
564 const unsigned int dropped_jobs = jobs_queue_.Drop();
565
566 // if desired, we remove the jobs from the pending 'list'
567 if (forget_pending) {
568 atomic_xadd32(&jobs_pending_, -dropped_jobs);
569 }
570 }
571
572
573 template <class WorkerT>
574 void ConcurrentWorkers<WorkerT>::Terminate() {
575 // Note: this method causes workers to die immediately after they finished
576 // their last acquired job. To make sure that each worker will check
577 // the running state, we schedule empty jobs or Death Sentences.
578
579 assert(IsRunning());
580
581 // unset the running flag (causing threads to die on the next checkpoint)
582 StopRunning();
583
584 // schedule empty jobs to make sure that each worker will actually reach the
585 // next checkpoint in their processing loop and terminate as expected
586 ScheduleDeathSentences();
587
588 // wait for the worker threads to return
589 WorkerThreads::const_iterator i = worker_threads_.begin();
590 WorkerThreads::const_iterator iend = worker_threads_.end();
591 for (; i != iend; ++i) {
592 pthread_join(*i, NULL);
593 }
594
595 // wait for the callback worker thread
596 pthread_join(callback_thread_, NULL);
597
598 // check if we finished all pending jobs
599 const int pending = atomic_read32(&jobs_pending_);
600 if (pending > 0) {
601 LogCvmfs(kLogConcurrency, kLogWarning, "Job queue was not fully processed. "
602 "Still %d jobs were pending and "
603 "will not be executed anymore.",
604 pending);
605 }
606
607 // check if we had failed jobs
608 const int failed = atomic_read32(&jobs_failed_);
609 if (failed > 0) {
610 LogCvmfs(kLogConcurrency, kLogWarning, "We've had %d failed jobs.",
611 failed);
612 }
613
614 // thanks, and good bye...
615 LogCvmfs(kLogConcurrency, kLogVerboseMsg,
616 "All workers stopped. They processed %ld jobs. Terminating...",
617 atomic_read64(&jobs_processed_));
618 }
619
620
621 template <class WorkerT>
622 void ConcurrentWorkers<WorkerT>::WaitForEmptyQueue() const {
623 LogCvmfs(kLogConcurrency, kLogVerboseMsg,
624 "Waiting for %d jobs to be finished", atomic_read32(&jobs_pending_));
625
626 // wait until all pending jobs are processed
627 {
628 MutexLockGuard lock(jobs_all_done_mutex_);
629 while (atomic_read32(&jobs_pending_) > 0) {
630 pthread_cond_wait(&jobs_all_done_, &jobs_all_done_mutex_);
631 }
632 }
633
634 LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Jobs are done... go on");
635 }
636
637
638 template <class WorkerT>
639 void ConcurrentWorkers<WorkerT>::WaitForTermination() {
640 if (!IsRunning())
641 return;
642
643 WaitForEmptyQueue();
644 Terminate();
645 }
646
647
648 template <class WorkerT>
649 void ConcurrentWorkers<WorkerT>::JobDone(
650 const ConcurrentWorkers<WorkerT>::returned_data_t& data,
651 const bool success) {
652 // BEWARE!
653 // This is a callback method that might be called from a different thread!
654
655 // check if the finished job was successful
656 if (!success) {
657 atomic_inc32(&jobs_failed_);
658 LogCvmfs(kLogConcurrency, kLogWarning, "Job failed");
659 }
660
661 // queue the result in the callback channel
662 results_queue_.Enqueue(CallbackJob(data));
663 }
664
665 #ifdef CVMFS_NAMESPACE_GUARD
666 } // namespace CVMFS_NAMESPACE_GUARD
667 #endif
668
669 #endif // CVMFS_UTIL_CONCURRENCY_IMPL_H_
670