GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/util_concurrency_impl.h Lines: 134 270 49.6 %
Date: 2019-02-03 02:48:13 Branches: 75 482 15.6 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#ifndef CVMFS_UTIL_CONCURRENCY_IMPL_H_
6
#define CVMFS_UTIL_CONCURRENCY_IMPL_H_
7
8
#include "logging.h"
9
10
#ifdef CVMFS_NAMESPACE_GUARD
11
namespace CVMFS_NAMESPACE_GUARD {
12
#endif
13
14
//
15
// +----------------------------------------------------------------------------
16
// |  Future
17
//
18
19
20
template <typename T>
21
39
Future<T>::Future() : object_(), object_was_set_(false) {
22
  const bool init_successful = (pthread_mutex_init(&mutex_, NULL)     == 0   &&
23




39
                                pthread_cond_init(&object_set_, NULL) == 0);
24


39
  assert(init_successful);
25
39
}
26
27
28
template <typename T>
29
70
Future<T>::~Future() {
30
39
  pthread_cond_destroy(&object_set_);
31


39
  pthread_mutex_destroy(&mutex_);
32
109
}
33
34
35
template <typename T>
36
39
void Future<T>::Set(const T &object) {
37
39
  MutexLockGuard guard(mutex_);
38


39
  assert(!object_was_set_);
39
39
  object_         = object;
40
39
  object_was_set_ = true;
41
39
  pthread_cond_broadcast(&object_set_);
42
39
}
43
44
45
template <typename T>
46
39
void Future<T>::Wait() const {
47
39
  MutexLockGuard guard(mutex_);
48


39
  if (!object_was_set_) {
49
12
    pthread_cond_wait(&object_set_, &mutex_);
50
  }
51


39
  assert(object_was_set_);
52
39
}
53
54
55
template <typename T>
56
39
T& Future<T>::Get() {
57
39
  Wait();
58
39
  return object_;
59
}
60
61
62
template <typename T>
63
const T& Future<T>::Get() const {
64
  Wait();
65
  return object_;
66
}
67
68
69
//
70
// +----------------------------------------------------------------------------
71
// |  SynchronizingCounter
72
//
73
74
75
template <typename T>
76
145711341
void SynchronizingCounter<T>::SetValueUnprotected(const T new_value) {
77
  // make sure that 0 <= new_value <= maximal_value_ if maximal_value_ != 0
78

145711341
  assert(!HasMaximalValue() ||
79
         (new_value >= T(0) && new_value <= maximal_value_));
80
81
145711341
  value_ = new_value;
82
83
145711341
  if (value_ == T(0)) {
84
3483
    pthread_cond_broadcast(&became_zero_);
85
  }
86
87

145711341
  if (HasMaximalValue() && value_ < maximal_value_) {
88
1052233
    pthread_cond_broadcast(&free_slot_);
89
  }
90
145711341
}
91
92
93
template <typename T>
94
72855615
void SynchronizingCounter<T>::WaitForFreeSlotUnprotected() {
95

145844108
  while (HasMaximalValue() && value_ >= maximal_value_) {
96
132878
    pthread_cond_wait(&free_slot_, &mutex_);
97
  }
98

72855615
  assert(!HasMaximalValue() || value_ < maximal_value_);
99
72855615
}
100
101
102
template <typename T>
103
346
void SynchronizingCounter<T>::Initialize() {
104
  const bool init_successful = (
105
    pthread_mutex_init(&mutex_,       NULL) == 0 &&
106
    pthread_cond_init(&became_zero_, NULL) == 0 &&
107

346
    pthread_cond_init(&free_slot_,   NULL) == 0);
108
346
  assert(init_successful);
109
346
}
110
111
112
template <typename T>
113
346
void SynchronizingCounter<T>::Destroy() {
114
346
  pthread_mutex_destroy(&mutex_);
115
346
  pthread_cond_destroy(&became_zero_);
116
346
  pthread_cond_destroy(&free_slot_);
117
346
}
118
119
120
//
121
// +----------------------------------------------------------------------------
122
// |  Observable
123
//
124
125
126
template <typename ParamT>
127
426
Observable<ParamT>::Observable() {
128
426
  const int ret = pthread_rwlock_init(&listeners_rw_lock_, NULL);
129

426
  assert(ret == 0);
130
426
}
131
132
133
template <typename ParamT>
134
426
Observable<ParamT>::~Observable() {
135
426
  UnregisterListeners();
136

426
  pthread_rwlock_destroy(&listeners_rw_lock_);
137
852
}
138
139
140
template <typename ParamT>
141
template <class DelegateT, class ClosureDataT>
142
10
typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener(
143
        typename BoundClosure<ParamT,
144
                              DelegateT,
145
                              ClosureDataT>::CallbackMethod   method,
146
        DelegateT                                            *delegate,
147
        ClosureDataT                                          data) {
148
  // create a new BoundClosure, register it and return the handle
149
  CallbackBase<ParamT> *callback =
150
10
    Observable<ParamT>::MakeClosure(method, delegate, data);
151
10
  RegisterListener(callback);
152
10
  return callback;
153
}
154
155
156
template <typename ParamT>
157
template <class DelegateT>
158
228
typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener(
159
    typename BoundCallback<ParamT, DelegateT>::CallbackMethod method,
160
    DelegateT *delegate) {
161
  // create a new BoundCallback, register it and return the handle
162
  CallbackBase<ParamT> *callback =
163
228
    Observable<ParamT>::MakeCallback(method, delegate);
164
228
  RegisterListener(callback);
165
228
  return callback;
166
}
167
168
169
template <typename ParamT>
170
138
typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener(
171
    typename Callback<ParamT>::CallbackFunction fn) {
172
  // create a new Callback, register it and return the handle
173
  CallbackBase<ParamT> *callback =
174
138
    Observable<ParamT>::MakeCallback(fn);
175
138
  RegisterListener(callback);
176
138
  return callback;
177
}
178
179
180
template <typename ParamT>
181
376
void Observable<ParamT>::RegisterListener(
182
    Observable<ParamT>::CallbackPtr callback_object) {
183
  // register a generic CallbackBase callback
184
376
  WriteLockGuard guard(listeners_rw_lock_);
185
376
  listeners_.insert(callback_object);
186
376
}
187
188
189
template <typename ParamT>
190
45
void Observable<ParamT>::UnregisterListener(
191
    typename Observable<ParamT>::CallbackPtr callback_object) {
192
  // remove a callback handle from the callbacks list
193
  // if it is not registered --> crash
194
45
  WriteLockGuard guard(listeners_rw_lock_);
195
45
  const size_t was_removed = listeners_.erase(callback_object);
196
45
  assert(was_removed > 0);
197
45
  delete callback_object;
198
45
}
199
200
201
template <typename ParamT>
202
437
void Observable<ParamT>::UnregisterListeners() {
203
437
  WriteLockGuard guard(listeners_rw_lock_);
204
205
  // remove all callbacks from the list
206
437
  typename Callbacks::const_iterator i    = listeners_.begin();
207
437
  typename Callbacks::const_iterator iend = listeners_.end();
208

768
  for (; i != iend; ++i) {
209

331
    delete *i;
210
  }
211
437
  listeners_.clear();
212
437
}
213
214
215
template <typename ParamT>
216
38734
void Observable<ParamT>::NotifyListeners(const ParamT &parameter) {
217
38734
  ReadLockGuard guard(listeners_rw_lock_);
218
219
  // invoke all callbacks and inform them about new data
220
38734
  typename Callbacks::const_iterator i    = listeners_.begin();
221
38734
  typename Callbacks::const_iterator iend = listeners_.end();
222

77327
  for (; i != iend; ++i) {
223
38593
    (**i)(parameter);
224
  }
225
38734
}
226
227
228
//
229
// +----------------------------------------------------------------------------
230
// |  FifoChannel
231
//
232
233
234
template <class T>
235
43
FifoChannel<T>::FifoChannel(const size_t maximal_length,
236
                            const size_t drainout_threshold) :
237
  maximal_queue_length_(maximal_length),
238
43
  queue_drainout_threshold_(drainout_threshold)
239
{
240


43
  assert(drainout_threshold <= maximal_length);
241


43
  assert(drainout_threshold >  0);
242
243
  const bool successful = (
244
    pthread_mutex_init(&mutex_, NULL)              == 0 &&
245
    pthread_cond_init(&queue_is_not_empty_, NULL)  == 0 &&
246






43
    pthread_cond_init(&queue_is_not_full_, NULL)   == 0);
247
248


43
  assert(successful);
249
43
}
250
251
252
template <class T>
253
58
FifoChannel<T>::~FifoChannel() {
254
43
  pthread_cond_destroy(&queue_is_not_empty_);
255
43
  pthread_cond_destroy(&queue_is_not_full_);
256


43
  pthread_mutex_destroy(&mutex_);
257
101
}
258
259
260
template <class T>
261
1000289
void FifoChannel<T>::Enqueue(const T &data) {
262
1000289
  MutexLockGuard lock(mutex_);
263
264
  // wait for space in the queue
265


2003512
  while (this->size() >= maximal_queue_length_) {
266
2934
    pthread_cond_wait(&queue_is_not_full_, &mutex_);
267
  }
268
269
  // put something into the queue
270
1000289
  this->push(data);
271
272
  // wake all waiting threads
273
1000289
  pthread_cond_broadcast(&queue_is_not_empty_);
274
1000289
}
275
276
277
template <class T>
278
999832
const T FifoChannel<T>::Dequeue() {
279
999832
  MutexLockGuard lock(mutex_);
280
281
  // wait until there is something to do
282


2162793
  while (this->empty()) {
283
162435
    pthread_cond_wait(&queue_is_not_empty_, &mutex_);
284
  }
285
286
  // get the item from the queue
287
1000179
  T data = this->front(); this->pop();
288
289
  // signal waiting threads about the free space
290


1000179
  if (this->size() < queue_drainout_threshold_) {
291
393502
    pthread_cond_broadcast(&queue_is_not_full_);
292
  }
293
294
  // return the acquired job
295
1000179
  return data;
296
}
297
298
299
template <class T>
300
34
unsigned int FifoChannel<T>::Drop() {
301
34
  MutexLockGuard lock(mutex_);
302
303
34
  unsigned int dropped_items = 0;
304


167
  while (!this->empty()) {
305
99
    this->pop();
306
99
    ++dropped_items;
307
  }
308
309
34
  pthread_cond_broadcast(&queue_is_not_full_);
310
311
34
  return dropped_items;
312
}
313
314
315
template <class T>
316
2
size_t FifoChannel<T>::GetItemCount() const {
317
2
  MutexLockGuard lock(mutex_);
318
2
  return this->size();
319
}
320
321
322
template <class T>
323
8972
bool FifoChannel<T>::IsEmpty() const {
324
8972
  MutexLockGuard lock(mutex_);
325
8972
  return this->empty();
326
}
327
328
329
template <class T>
330
1
size_t FifoChannel<T>::GetMaximalItemCount() const {
331
1
  return maximal_queue_length_;
332
}
333
334
335
//
336
// +----------------------------------------------------------------------------
337
// |  ConcurrentWorkers
338
//
339
340
341
template <class WorkerT>
342
ConcurrentWorkers<WorkerT>::ConcurrentWorkers(
343
          const size_t                                  number_of_workers,
344
          const size_t                                  maximal_queue_length,
345
          ConcurrentWorkers<WorkerT>::worker_context_t *worker_context) :
346
  number_of_workers_(number_of_workers),
347
  worker_context_(worker_context),
348
  thread_context_(this, worker_context_),
349
  initialized_(false),
350
  running_(false),
351
  workers_started_(0),
352
  jobs_queue_(maximal_queue_length, maximal_queue_length / 4 + 1),
353
  results_queue_(maximal_queue_length, 1)
354
{
355
  assert(maximal_queue_length >= number_of_workers);
356
  assert(number_of_workers >  0);
357
358
  atomic_init32(&jobs_pending_);
359
  atomic_init32(&jobs_failed_);
360
  atomic_init64(&jobs_processed_);
361
}
362
363
364
template <class WorkerT>
365
ConcurrentWorkers<WorkerT>::~ConcurrentWorkers() {
366
  if (IsRunning()) {
367
    Terminate();
368
  }
369
370
  // destroy some synchronisation data structures
371
  pthread_cond_destroy(&worker_started_);
372
  pthread_cond_destroy(&jobs_all_done_);
373
  pthread_mutex_destroy(&status_mutex_);
374
  pthread_mutex_destroy(&jobs_all_done_mutex_);
375
}
376
377
378
template <class WorkerT>
379
bool ConcurrentWorkers<WorkerT>::Initialize() {
380
  LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Initializing ConcurrentWorker "
381
                                            "object with %d worker threads "
382
                                            "and a queue length of %d",
383
           number_of_workers_, jobs_queue_.GetMaximalItemCount());
384
  // LogCvmfs(kLogConcurrency, kLogStdout, "sizeof(expected_data_t): %d\n"
385
  //                                           "sizeof(returned_data_t): %d",
386
  //          sizeof(expected_data_t), sizeof(returned_data_t));
387
388
  // initialize synchronisation for job queue (Workers)
389
  if (pthread_mutex_init(&status_mutex_, NULL)          != 0 ||
390
      pthread_mutex_init(&jobs_all_done_mutex_, NULL)   != 0 ||
391
      pthread_cond_init(&worker_started_, NULL)         != 0 ||
392
      pthread_cond_init(&jobs_all_done_, NULL)          != 0) {
393
    return false;
394
  }
395
396
  // spawn the Worker objects in their own threads
397
  if (!SpawnWorkers()) {
398
    LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn workers");
399
    return false;
400
  }
401
402
  // all done...
403
  initialized_ = true;
404
  return true;
405
}
406
407
408
template <class WorkerT>
409
bool ConcurrentWorkers<WorkerT>::SpawnWorkers() {
410
  assert(worker_threads_.size() == 0);
411
  worker_threads_.resize(number_of_workers_);
412
413
  // set the running flag to trap workers in their treadmills
414
  StartRunning();
415
416
  // spawn the swarm and make them work
417
  bool success = true;
418
  WorkerThreads::iterator i          = worker_threads_.begin();
419
  WorkerThreads::const_iterator iend = worker_threads_.end();
420
  for (; i != iend; ++i) {
421
    pthread_t* thread = &(*i);
422
    const int retval = pthread_create(
423
      thread,
424
      NULL,
425
      &ConcurrentWorkers<WorkerT>::RunWorker,
426
      reinterpret_cast<void *>(&thread_context_));
427
    if (retval != 0) {
428
      LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn a Worker");
429
      success = false;
430
    }
431
  }
432
433
  // spawn the callback processing thread
434
  const int retval =
435
    pthread_create(
436
      &callback_thread_,
437
      NULL,
438
      &ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper,
439
      reinterpret_cast<void *>(&thread_context_));
440
    if (retval != 0) {
441
      LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn the callback "
442
                                             "worker thread");
443
      success = false;
444
    }
445
446
  // wait for all workers to report in...
447
  {
448
    MutexLockGuard guard(status_mutex_);
449
    // +1 -> callback thread
450
    while (workers_started_ < number_of_workers_ + 1) {
451
      pthread_cond_wait(&worker_started_, &status_mutex_);
452
    }
453
  }
454
455
  // all done...
456
  return success;
457
}
458
459
460
template <class WorkerT>
461
void* ConcurrentWorkers<WorkerT>::RunWorker(void *run_binding) {
462
  // NOTE: This is the actual worker thread code!
463
464
  //
465
  // INITIALIZATION
466
  /////////////////
467
468
  // get contextual information
469
  const WorkerRunBinding &binding =
470
    *(static_cast<WorkerRunBinding*>(run_binding));
471
  ConcurrentWorkers<WorkerT> *master         = binding.delegate;
472
  const worker_context_t     *worker_context = binding.worker_context;
473
474
  // boot up the worker object and make sure it works
475
  WorkerT worker(worker_context);
476
  worker.RegisterMaster(master);
477
  const bool init_success = worker.Initialize();
478
479
  // tell the master that this worker was started
480
  master->ReportStartedWorker();
481
482
  if (!init_success) {
483
    LogCvmfs(kLogConcurrency, kLogWarning, "Worker was not initialized "
484
                                           "properly... it will die now!");
485
    return NULL;
486
  }
487
488
  //
489
  // PROCESSING LOOP
490
  //////////////////
491
492
  // start the processing loop
493
  LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Starting Worker...");
494
  while (master->IsRunning()) {
495
    // acquire a new job
496
    WorkerJob job = master->Acquire();
497
498
    // check if we need to terminate
499
    if (job.is_death_sentence)
500
      break;
501
502
    // do what you are supposed to do
503
    worker(job.data);
504
  }
505
506
  //
507
  // TEAR DOWN
508
  ////////////
509
510
  // give the worker the chance to tidy up
511
  worker.TearDown();
512
513
  // good bye thread...
514
  LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Worker...");
515
  return NULL;
516
}
517
518
519
template <class WorkerT>
520
void* ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper(void *run_binding) {
521
  const RunBinding &binding = *(static_cast<RunBinding*>(run_binding));
522
  ConcurrentWorkers<WorkerT> *master = binding.delegate;
523
524
  master->ReportStartedWorker();
525
526
  LogCvmfs(kLogConcurrency, kLogVerboseMsg,
527
           "Started dedicated callback worker");
528
  master->RunCallbackThread();
529
  LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Callback Worker...");
530
531
  return NULL;
532
}
533
534
535
template <class WorkerT>
536
void ConcurrentWorkers<WorkerT>::RunCallbackThread() {
537
  while (IsRunning()) {
538
    const CallbackJob callback_job = results_queue_.Dequeue();
539
540
    // stop callback processing if needed
541
    if (callback_job.is_death_sentence) {
542
      break;
543
    }
544
545
    // notify all observers about the finished job
546
    this->NotifyListeners(callback_job.data);
547
548
    // remove the job from the pending 'list' and add it to the ready 'list'
549
    atomic_dec32(&jobs_pending_);
550
    atomic_inc64(&jobs_processed_);
551
552
    // signal the Spooler that all jobs are done...
553
    if (atomic_read32(&jobs_pending_) == 0) {
554
      pthread_cond_broadcast(&jobs_all_done_);
555
    }
556
  }
557
}
558
559
560
template <class WorkerT>
561
void ConcurrentWorkers<WorkerT>::ReportStartedWorker() const {
562
  MutexLockGuard lock(status_mutex_);
563
  ++workers_started_;
564
  pthread_cond_signal(&worker_started_);
565
}
566
567
568
template <class WorkerT>
569
void ConcurrentWorkers<WorkerT>::Schedule(WorkerJob job) {
570
  // Note: This method can be called from arbitrary threads. Thus we do not
571
  //       necessarily have just one producer in the system.
572
573
  // check if it makes sense to schedule this job
574
  if (!IsRunning() && !job.is_death_sentence) {
575
    LogCvmfs(kLogConcurrency, kLogWarning, "Tried to schedule a job but "
576
                                           "concurrency was not running...");
577
    return;
578
  }
579
580
  jobs_queue_.Enqueue(job);
581
  if (!job.is_death_sentence) {
582
    atomic_inc32(&jobs_pending_);
583
  }
584
}
585
586
587
template <class WorkerT>
588
void ConcurrentWorkers<WorkerT>::ScheduleDeathSentences() {
589
  assert(!IsRunning());
590
591
  // make sure that the queue is empty before we schedule a death sentence
592
  TruncateJobQueue();
593
594
  // schedule a death sentence for each running thread
595
  const unsigned int number_of_workers = GetNumberOfWorkers();
596
  for (unsigned int i = 0; i < number_of_workers; ++i) {
597
    Schedule(WorkerJob());
598
  }
599
600
  // schedule a death sentence for the callback thread
601
  results_queue_.Enqueue(CallbackJob());
602
}
603
604
605
template <class WorkerT>
606
typename ConcurrentWorkers<WorkerT>::WorkerJob
607
  ConcurrentWorkers<WorkerT>::Acquire()
608
{
609
  // Note: This method is exclusively called inside the worker threads!
610
  //       Any other usage might produce undefined behavior.
611
  return jobs_queue_.Dequeue();
612
}
613
614
615
template <class WorkerT>
616
void ConcurrentWorkers<WorkerT>::TruncateJobQueue(const bool forget_pending) {
617
  // Note: This method will throw away all jobs currently waiting in the job
618
  //       queue. These jobs will not be processed!
619
  const unsigned int dropped_jobs = jobs_queue_.Drop();
620
621
  // if desired, we remove the jobs from the pending 'list'
622
  if (forget_pending) {
623
    atomic_xadd32(&jobs_pending_, -dropped_jobs);
624
  }
625
}
626
627
628
template <class WorkerT>
629
void ConcurrentWorkers<WorkerT>::Terminate() {
630
  // Note: this method causes workers to die immediately after they finished
631
  //       their last acquired job. To make sure that each worker will check
632
  //       the running state, we schedule empty jobs or Death Sentences.
633
634
  assert(IsRunning());
635
636
  // unset the running flag (causing threads to die on the next checkpoint)
637
  StopRunning();
638
639
  // schedule empty jobs to make sure that each worker will actually reach the
640
  // next checkpoint in their processing loop and terminate as expected
641
  ScheduleDeathSentences();
642
643
  // wait for the worker threads to return
644
  WorkerThreads::const_iterator i    = worker_threads_.begin();
645
  WorkerThreads::const_iterator iend = worker_threads_.end();
646
  for (; i != iend; ++i) {
647
    pthread_join(*i, NULL);
648
  }
649
650
  // wait for the callback worker thread
651
  pthread_join(callback_thread_, NULL);
652
653
  // check if we finished all pending jobs
654
  const int pending = atomic_read32(&jobs_pending_);
655
  if (pending > 0) {
656
    LogCvmfs(kLogConcurrency, kLogWarning, "Job queue was not fully processed. "
657
                                           "Still %d jobs were pending and "
658
                                           "will not be executed anymore.",
659
             pending);
660
  }
661
662
  // check if we had failed jobs
663
  const int failed = atomic_read32(&jobs_failed_);
664
  if (failed > 0) {
665
    LogCvmfs(kLogConcurrency, kLogWarning, "We've had %d failed jobs.",
666
             failed);
667
  }
668
669
  // thanks, and good bye...
670
  LogCvmfs(kLogConcurrency, kLogVerboseMsg,
671
           "All workers stopped. They processed %d jobs. Terminating...",
672
           atomic_read64(&jobs_processed_));
673
}
674
675
676
template <class WorkerT>
677
void ConcurrentWorkers<WorkerT>::WaitForEmptyQueue() const {
678
  LogCvmfs(kLogConcurrency, kLogVerboseMsg,
679
           "Waiting for %d jobs to be finished", atomic_read32(&jobs_pending_));
680
681
  // wait until all pending jobs are processed
682
  {
683
    MutexLockGuard lock(jobs_all_done_mutex_);
684
    while (atomic_read32(&jobs_pending_) > 0) {
685
      pthread_cond_wait(&jobs_all_done_, &jobs_all_done_mutex_);
686
    }
687
  }
688
689
  LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Jobs are done... go on");
690
}
691
692
693
template <class WorkerT>
694
void ConcurrentWorkers<WorkerT>::WaitForTermination() {
695
  if (!IsRunning())
696
    return;
697
698
  WaitForEmptyQueue();
699
  Terminate();
700
}
701
702
703
template <class WorkerT>
704
void ConcurrentWorkers<WorkerT>::JobDone(
705
              const ConcurrentWorkers<WorkerT>::returned_data_t& data,
706
              const bool                                         success) {
707
  // BEWARE!
708
  // This is a callback method that might be called from a different thread!
709
710
  // check if the finished job was successful
711
  if (!success) {
712
    atomic_inc32(&jobs_failed_);
713
    LogCvmfs(kLogConcurrency, kLogWarning, "Job failed");
714
  }
715
716
  // queue the result in the callback channel
717
  results_queue_.Enqueue(CallbackJob(data));
718
}
719
720
#ifdef CVMFS_NAMESPACE_GUARD
721
}  // namespace CVMFS_NAMESPACE_GUARD
722
#endif
723
724
#endif  // CVMFS_UTIL_CONCURRENCY_IMPL_H_