GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/util_concurrency.h Lines: 65 98 66.3 %
Date: 2019-02-03 02:48:13 Branches: 6 18 33.3 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#ifndef CVMFS_UTIL_CONCURRENCY_H_
6
#define CVMFS_UTIL_CONCURRENCY_H_
7
8
#include <pthread.h>
9
10
#include <cassert>
11
#include <queue>
12
#include <set>
13
#include <vector>
14
15
#include "atomic.h"
16
#include "util/async.h"
17
#include "util/single_copy.h"
18
19
#ifdef CVMFS_NAMESPACE_GUARD
20
namespace CVMFS_NAMESPACE_GUARD {
21
#endif
22
23
/**
24
 * Implements a simple interface to lock objects of derived classes. Classes that
25
 * inherit from Lockable are also usable with the LockGuard template for scoped
26
 * locking semantics.
27
 *
28
 * Note: a Lockable object should not be copied!
29
 */
30
class Lockable : SingleCopy {
31
 public:
32
1
  inline virtual ~Lockable() {        pthread_mutex_destroy(&mutex_); }
33
34
1
  void Lock()    const       {        pthread_mutex_lock(&mutex_);    }
35
2
  int  TryLock() const       { return pthread_mutex_trylock(&mutex_); }
36
1
  void Unlock()  const       {        pthread_mutex_unlock(&mutex_);  }
37
38
 protected:
39
1
  Lockable() {
40
1
    const int retval = pthread_mutex_init(&mutex_, NULL);
41
1
    assert(retval == 0);
42
1
  }
43
44
 private:
45
  mutable pthread_mutex_t mutex_;
46
};
47
48
49
//
50
// -----------------------------------------------------------------------------
51
//
52
53
/**
 * Tag values enabling static polymorphism in the RAII template: they select
 * at compile time which pair of lock functions a guard uses when more than
 * one choice exists (i.e. read locks vs. write locks).
 * Note: Static Polymorphism - Strategy Pattern
 *
 * TODO: eventually replace this by a C++11 typed enum
 */
struct _RAII_Polymorphism {
  enum T { None, ReadLock, WriteLock };
};
68
69
70
/**
71
 * Basic template wrapper class for any kind of RAII-like behavior.
72
 * The user is supposed to provide a template specialization of Enter() and
73
 * Leave(). On creation of the RAII object it will call Enter() respectively
74
 * Leave() on destruction. The gold standard example is a LockGard (see below).
75
 *
76
 * Note: Resource Acquisition Is Initialization (Bjarne Stroustrup)
77
 */
78
template <typename T, _RAII_Polymorphism::T P = _RAII_Polymorphism::None>
79
class RAII : SingleCopy {
80
 public:
81
148841949
  inline explicit RAII(T &object) : ref_(object)  { Enter(); }
82
4854119
  inline explicit RAII(T *object) : ref_(*object) { Enter(); }
83
154762214
  inline ~RAII()                         { Leave(); }
84
85
 protected:
86
2
  inline void Enter() { ref_.Lock();   }
87
2
  inline void Leave() { ref_.Unlock(); }
88
89
 private:
90
  T &ref_;
91
};
92
93
94
/**
95
 * This is a simple scoped lock implementation. Every object that provides the
96
 * methods Lock() and Unlock() should work with it. Classes that will be used
97
 * with this template should therefore simply inherit from Lockable.
98
 *
99
 * Creating a LockGuard object on the stack will lock the provided object. When
100
 * the LockGuard runs out of scope it will automatically release the lock. This
101
 * ensures a clean unlock in a lot of situations!
102
 *
103
 * TODO: C++11 replace this by a type alias to RAII
104
 */
105
template <typename LockableT>
106
2
class LockGuard : public RAII<LockableT> {
107
 public:
108
2
  inline explicit LockGuard(LockableT *object) : RAII<LockableT>(object) {}
109
};
110
111
112
template <>
113
153548895
inline void RAII<pthread_mutex_t>::Enter() { pthread_mutex_lock(&ref_);   }
114
template <>
115
154677108
inline void RAII<pthread_mutex_t>::Leave() { pthread_mutex_unlock(&ref_); }
116
typedef RAII<pthread_mutex_t> MutexLockGuard;
117
118
119
template <>
120
39182
inline void RAII<pthread_rwlock_t,
121
                 _RAII_Polymorphism::ReadLock>::Enter() {
122
39182
  pthread_rwlock_rdlock(&ref_);
123
39182
}
124
template <>
125
39182
inline void RAII<pthread_rwlock_t,
126
                 _RAII_Polymorphism::ReadLock>::Leave() {
127
39182
  pthread_rwlock_unlock(&ref_);
128
39182
}
129
template <>
130
46282
inline void RAII<pthread_rwlock_t,
131
                 _RAII_Polymorphism::WriteLock>::Enter() {
132
46282
  pthread_rwlock_wrlock(&ref_);
133
46282
}
134
template <>
135
46282
inline void RAII<pthread_rwlock_t,
136
                 _RAII_Polymorphism::WriteLock>::Leave() {
137
46282
  pthread_rwlock_unlock(&ref_);
138
46282
}
139
typedef RAII<pthread_rwlock_t, _RAII_Polymorphism::ReadLock>  ReadLockGuard;
140
typedef RAII<pthread_rwlock_t, _RAII_Polymorphism::WriteLock> WriteLockGuard;
141
142
143
//
144
// -----------------------------------------------------------------------------
145
//
146
147
148
/**
149
 * This is a simple implementation of a Future wrapper template.
150
 * It is used as a proxy for results that are computed asynchronously and might
151
 * not be available on the first access.
152
 * Since this is a very simple implementation one needs to use the Future's
153
 * Get() and Set() methods to obtain the containing data resp. to write it.
154
 * Note: More than a single call to Set() is prohibited!
155
 *       If Get() is called before Set() the calling thread will block until
156
 *       the value has been set by a different thread.
157
 *
158
 * @param T  the value type wrapped by this Future template
159
 */
160
template <typename T>
161
class Future : SingleCopy {
162
 public:
163
  Future();
164
  virtual ~Future();
165
166
  /**
167
   * Save an asynchronously computed value into the Future. This potentially
168
   * unblocks threads that already wait for the value.
169
   * @param object  the value object to be set
170
   */
171
  void     Set(const T &object);
172
173
  /**
174
   * Retrieves the wrapped value object. If the value is not yet available it
175
   * will automatically block until a different thread calls Set().
176
   * @return  the containing value object
177
   */
178
  T&       Get();
179
  const T& Get() const;
180
181
 protected:
182
  void Wait() const;
183
184
 private:
185
  T                       object_;
186
  mutable pthread_mutex_t mutex_;
187
  mutable pthread_cond_t  object_set_;
188
  bool                    object_was_set_;
189
};
190
191
192
//
193
// -----------------------------------------------------------------------------
194
//
195
196
197
/**
198
 * This counter can be counted up and down using the usual increment/decrement
199
 * operators. It allows threads to wait for it to become zero as well as to
200
 * block when a specified maximal value would be exceeded by an increment.
201
 *
202
 * Note: If a maximal value is specified on creation, the SynchronizingCounter
203
 *       is assumed to never leave the interval [0, maximal_value]! Otherwise
204
 *       the numerical limits of the specified template parameter define this
205
 *       interval and an increment _never_ blocks.
206
 *
207
 * Caveat: This implementation uses a simple mutex mechanism and therefore might
208
 *         become a scalability bottle neck!
209
 */
210
template <typename T>
211
class SynchronizingCounter : SingleCopy {
212
 public:
213
8
  SynchronizingCounter() :
214
8
    value_(T(0)), maximal_value_(T(0)) { Initialize(); }
215
216
338
  explicit SynchronizingCounter(const T maximal_value)
217
    : value_(T(0))
218
338
    , maximal_value_(maximal_value)
219
  {
220
338
    assert(maximal_value > T(0));
221
338
    Initialize();
222
338
  }
223
224
346
  ~SynchronizingCounter() { Destroy(); }
225
226
72787506
  T Increment() {
227
72787506
    MutexLockGuard l(mutex_);
228
72855615
    WaitForFreeSlotUnprotected();
229
72855615
    SetValueUnprotected(value_ + T(1));
230
72855615
    return value_;
231
  }
232
233
72576889
  T Decrement() {
234
72576889
    MutexLockGuard l(mutex_);
235
72855682
    SetValueUnprotected(value_ - T(1));
236
72855682
    return value_;
237
  }
238
239
188
  void WaitForZero() const {
240
188
    MutexLockGuard l(mutex_);
241
505
    while (value_ != T(0)) {
242
129
      pthread_cond_wait(&became_zero_, &mutex_);
243
    }
244
188
    assert(value_ == T(0));
245
188
  }
246
247
437266806
  bool HasMaximalValue() const { return maximal_value_ != T(0); }
248
9
  T      maximal_value() const { return maximal_value_;         }
249
250
590650
  T operator++()    { return Increment();        }
251
4
  T operator++(int) { return Increment() - T(1); }
252
590674
  T operator--()    { return Decrement();        }
253
32
  T operator--(int) { return Decrement() + T(1); }
254
255
13491
  operator T() const {
256
13491
    MutexLockGuard l(mutex_);
257
13491
    return value_;
258
  }
259
260
44
  SynchronizingCounter<T>& operator=(const T &other) {
261
44
    MutexLockGuard l(mutex_);
262
44
    SetValueUnprotected(other);
263
44
    return *this;
264
  }
265
266
 protected:
267
  void SetValueUnprotected(const T new_value);
268
  void WaitForFreeSlotUnprotected();
269
270
 private:
271
  void Initialize();
272
  void Destroy();
273
274
 private:
275
        T                 value_;
276
  const T                 maximal_value_;
277
278
  mutable pthread_mutex_t mutex_;
279
  mutable pthread_cond_t  became_zero_;
280
          pthread_cond_t  free_slot_;
281
};
282
283
284
//
285
// -----------------------------------------------------------------------------
286
//
287
288
289
template <typename ParamT>
290
class Observable;
291
292
293
/**
294
 * This is a base class for classes that need to expose a callback interface for
295
 * asynchronous callback methods. One can register an arbitrary number of
296
 * observers on an Observable that get notified when the method NotifyListeners()
297
 * is invoked.
298
 *
299
 * Note: the registration and invocation of callbacks in Observable is thread-
300
 *       safe, but be aware that the callbacks of observing classes might run in
301
 *       arbitrary threads. When using these classes, you should take extra care
302
 *       for thread-safety.
303
 *
304
 * Note: The RegisterListener() methods return a pointer to a CallbackBase.
305
 *       You MUST NOT free these objects, they are managed by the Observable
306
 *       class. Use them only as handles to unregister specific callbacks.
307
 *
308
 * @param ParamT   the type of the parameter that is passed to every callback
309
 *                 invocation.
310
 */
311
template <typename ParamT>
312
class Observable : public Callbackable<ParamT>,
313
                   SingleCopy {
314
 public:
315
  typedef typename Callbackable<ParamT>::CallbackTN*  CallbackPtr;
316
 protected:
317
  typedef std::set<CallbackPtr>                       Callbacks;
318
319
 public:
320
  virtual ~Observable();
321
322
  /**
323
   * Registers a method of a specific object as a listener to the Observable
324
   * object. The method is invoked on the given delegate when the callback is
325
   * fired by the observed object using NotifyListeners(). Since this is meant
326
   * to be a closure, it also passes the third argument to the method being in-
327
   * voked by the Observable object.
328
   *
329
   * @param DelegateT  the type of the delegate object
330
   * @param method     a pointer to the method to be invoked by the callback
331
   * @param delegate   a pointer to the object to invoke the callback on
332
   * @param closure    something to be passed to `method`
333
   * @return  a handle to the registered callback
334
   */
335
  template <class DelegateT, class ClosureDataT>
336
  CallbackPtr RegisterListener(
337
          typename BoundClosure<ParamT,
338
                                DelegateT,
339
                                ClosureDataT>::CallbackMethod   method,
340
          DelegateT                                            *delegate,
341
          ClosureDataT                                          data);
342
343
  /**
344
   * Registers a method of a specific object as a listener to the Observable
345
   * object. The method is invoked on the given delegate when the callback is
346
   * fired by the observed object using NotifyListeners().
347
   *
348
   * @param DelegateT  the type of the delegate object
349
   * @param method     a pointer to the method to be invoked by the callback
350
   * @param delegate   a pointer to the object to invoke the callback on
351
   * @return  a handle to the registered callback
352
   */
353
  template <class DelegateT>
354
  CallbackPtr RegisterListener(
355
          typename BoundCallback<ParamT, DelegateT>::CallbackMethod method,
356
          DelegateT *delegate);
357
358
  /**
359
   * Registers a static class member or a C-like function as a callback to the
360
   * Observable object. The function is invoked when the callback is fired by
361
   * the observed object using NotifyListeners().
362
   *
363
   * @param fn  a pointer to the function to be called by the callback
364
   * @return  a handle to the registered callback
365
   */
366
  CallbackPtr RegisterListener(typename Callback<ParamT>::CallbackFunction fn);
367
368
  /**
369
   * Removes the given callback from the listeners group of this Observable.
370
   *
371
   * @param callback_object  a callback handle that was returned by
372
   *                         RegisterListener() before.
373
   */
374
  void UnregisterListener(CallbackPtr callback_object);
375
376
  /**
377
   * Removes all listeners from the Observable
378
   */
379
  void UnregisterListeners();
380
381
 protected:
382
  Observable();  // don't instantiate this as a stand alone object
383
384
  void RegisterListener(CallbackPtr callback_object);
385
386
  /**
387
   * Notifies all registered listeners and passes them the provided argument
388
   * This method should be called by a derived class to send out asynchronous
389
   * messages to registered observers.
390
   *
391
   * @param parameter  the data to be passed to the observers
392
   */
393
  void NotifyListeners(const ParamT &parameter);
394
395
 private:
396
  Callbacks                    listeners_;         //!< the set of registered
397
                                                   //!< callback objects
398
  mutable pthread_rwlock_t     listeners_rw_lock_;
399
};
400
401
402
//
403
// -----------------------------------------------------------------------------
404
//
405
406
407
/**
 * Determines the number of CPU cores present in the system.
 *
 * @return  the number of active CPU cores, or a fallback value (presumably
 *          kFallbackNumberOfCpus — implementation out of line) if the
 *          detection fails
 */
unsigned int GetNumberOfCpuCores();
static const unsigned int kFallbackNumberOfCpus = 1;
415
416
417
/**
418
 * A blocking signal for thread synchronization
419
 */
420
class Signal : SingleCopy {
421
 public:
422
  Signal();
423
  ~Signal();
424
  void Wakeup();
425
  void Wait();
426
  bool IsSleeping();
427
428
 private:
429
  bool fired_;
430
  pthread_mutex_t lock_;
431
  pthread_cond_t signal_;
432
};
433
434
435
//
436
// -----------------------------------------------------------------------------
437
//
438
439
440
/**
 * Asynchronous FIFO channel template: a thread safe FIFO queue that blocks
 * calling threads when the queue is full (Enqueue) or empty (Dequeue).
 *
 * @param T  the data type to be enqueued in the queue
 */
template <class T>
class FifoChannel : protected std::queue<T> {
 public:
  /**
   * Creates a new FIFO channel.
   *
   * @param maximal_length      maximal number of items that can be enqueued
   * @param drainout_threshold  the queue counts as "not full" once fewer
   *                            than this many elements remain in it
   */
  FifoChannel(const size_t maximal_length,
              const size_t drainout_threshold);
  virtual ~FifoChannel();

  /**
   * Appends a new item to the end of the FIFO channel.  If the queue is
   * full, this call blocks until other threads dequeued enough items to
   * allow the desired insertion.
   *
   * @param data  the data to be enqueued into the FIFO channel
   */
  void Enqueue(const T &data);

  /**
   * Removes the next element from the channel.  If the queue is empty, this
   * blocks until another thread enqueues an item into the channel.
   *
   * @return  the first item in the channel queue
   */
  const T Dequeue();

  /**
   * Clears all items in the FIFO channel; the cleared items are lost.
   *
   * @return  the number of dropped items
   */
  unsigned int Drop();

  inline size_t GetItemCount() const;
  inline bool   IsEmpty() const;
  inline size_t GetMaximalItemCount() const;

 private:
  // general configuration
  const   size_t             maximal_queue_length_;
  const   size_t             queue_drainout_threshold_;

  // thread synchronisation structures
  mutable pthread_mutex_t    mutex_;
  mutable pthread_cond_t     queue_is_not_empty_;
  mutable pthread_cond_t     queue_is_not_full_;
};
499
500
501
/**
502
 * This template implements a generic producer/consumer approach to concurrent
503
 * worker tasks. It spawns a given number of Workers derived from the base class
504
 * ConcurrentWorker and uses them to distribute the work load onto concurrent
505
 * threads.
506
 * One can have multiple producers, that use Schedule() to post new work into
507
 * a central job queue, which in turn is processed concurrently by the Worker
508
 * objects in multiple threads. Furthermore the template provides an interface
509
 * to control the worker swarm, i.e. to wait for their completion or cancel them
510
 * before all jobs are processed.
511
 *
512
 * Note: A worker is a class inheriting from ConcurrentWorker that needs to meet
513
 *       a couple of requirements. See the documentation of ConcurrentWorker for
514
 *       additional details.
515
 *
516
 * @param WorkerT   the class to be used as a worker for a concurrent worker
517
 *                  swarm
518
 */
519
template <class WorkerT>
520
class ConcurrentWorkers : public Observable<typename WorkerT::returned_data> {
521
 public:
522
  // these data types must be defined by the worker class
523
  /**
524
   * Input data type
525
   */
526
  typedef typename WorkerT::expected_data  expected_data_t;
527
  /**
528
   * Output data type
529
   */
530
  typedef typename WorkerT::returned_data  returned_data_t;
531
  /**
532
   * Common context type
533
   */
534
  typedef typename WorkerT::worker_context worker_context_t;
535
536
 protected:
537
  typedef std::vector<pthread_t>           WorkerThreads;
538
539
  /**
540
   * This is a simple wrapper structure to piggy-back control information on
541
   * scheduled jobs. Job structures are scheduled into a central FIFO queue and
542
   * are then processed concurrently by the workers.
543
   */
544
  template <class DataT>
545
  struct Job {
546
    explicit Job(const DataT &data) :
547
      data(data),
548
      is_death_sentence(false) {}
549
    Job() :
550
      data(),
551
      is_death_sentence(true) {}
552
    const DataT  data;               //!< job payload
553
    const bool   is_death_sentence;  //!< death sentence flag
554
  };
555
  typedef Job<expected_data_t> WorkerJob;
556
  typedef Job<returned_data_t> CallbackJob;
557
558
  /**
559
   * Provides a wrapper for initialization data passed to newly spawned worker
560
   * threads for initialization.
561
   * It contains a pointer to the spawning ConcurrentWorkers master object as
562
   * well as a pointer to a context object defined by the concrete worker to be
563
   * spawned.
564
   */
565
  struct RunBinding {
566
    explicit RunBinding(ConcurrentWorkers<WorkerT> *delegate) :
567
      delegate(delegate) {}
568
    ConcurrentWorkers<WorkerT> *delegate;       //!< delegate to the Concurrent-
569
                                                //!<  Workers master
570
  };
571
572
  struct WorkerRunBinding : RunBinding {
573
    WorkerRunBinding(ConcurrentWorkers<WorkerT> *delegate,
574
                     const worker_context_t     *worker_context) :
575
      RunBinding(delegate),
576
      worker_context(worker_context) {}
577
    /**
578
     * WorkerT defined context objects for worker init.
579
     */
580
    const worker_context_t     *worker_context;
581
  };
582
583
 public:
584
  /**
585
   * Creates a ConcurrentWorkers master object that encapsulates the actual
586
   * workers.
587
   *
588
   * @param number_of_workers     the number of concurrent workers to be spawned
589
   * @param maximal_queue_length  the maximal length of the job queue
590
   *                              (>= number_of_workers)
591
   * @param worker_context        a pointer to the WorkerT defined context object
592
   */
593
  ConcurrentWorkers(const size_t      number_of_workers,
594
                    const size_t      maximal_queue_length,
595
                    worker_context_t *worker_context = NULL);
596
  virtual ~ConcurrentWorkers();
597
598
  /**
599
   * Initializes the ConcurrentWorkers swarm, spawnes a thread for each new
600
   * worker object and puts everything into working state.
601
   *
602
   * @return  true if all went fine
603
   */
604
  bool Initialize();
605
606
  /**
607
   * Schedules a new job for processing into the internal job queue. This method
608
   * will block in case the job queue is already full and wait for an empty slot.
609
   *
610
   * @param data  the data to be processed
611
   */
612
  inline void Schedule(const expected_data_t &data) {
613
    Schedule(WorkerJob(data));
614
  }
615
616
  /**
617
   * Shuts down the ConcurrentWorkers object as well as the encapsulated workers
618
   * as soon as possible. Workers will finish their current job and will termi-
619
   * nate afterwards. If no jobs are scheduled they will simply stop waiting for
620
   * new ones and terminate afterwards.
621
   * This method MUST not be called more than once per ConcurrentWorkers.
622
   */
623
  void Terminate();
624
625
  /**
626
   * Waits until the job queue is fully processed
627
   *
628
   * Note: this might lead to undefined behaviour or infinite waiting if other
629
   *       producers still schedule jobs into the job queue.
630
   */
631
  void WaitForEmptyQueue() const;
632
633
  /**
634
   * Waits until the ConcurrentWorkers swarm fully processed the current job
635
   * queue and shuts down afterwards.
636
   *
637
   * Note: just as for WaitForEmptyQueue() this assumes that no other producers
638
   *       schedule jobs in the mean time.
639
   */
640
  void WaitForTermination();
641
642
  inline unsigned int GetNumberOfWorkers() const { return number_of_workers_; }
643
  inline unsigned int GetNumberOfFailedJobs() const {
644
    return atomic_read32(&jobs_failed_);
645
  }
646
647
  /**
648
   * Defines a job as successfully finished.
649
   * DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT!
650
   *
651
   * @param data  the data to be returned back to the user
652
   */
653
  inline void JobSuccessful(const returned_data_t& data) {
654
    JobDone(data, true);
655
  }
656
657
  /**
658
   * Defines a job as failed.
659
   * DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT!
660
   *
661
   * Note: Even for failed jobs the user will get a callback with a data object.
662
   *       You might want to make sure, that this data contains a status flag as
663
   *       well, telling the user what went wrong.
664
   *
665
   * @param data  the data to be returned back to the user
666
   */
667
  inline void JobFailed(const returned_data_t& data) { JobDone(data, false); }
668
669
  void RunCallbackThread();
670
671
 protected:
672
  bool SpawnWorkers();
673
674
  /**
675
   * POSIX conform function for thread entry point. Is invoked for every new
676
   * worker thread and contains the initialization, processing loop and tear
677
   * down of the unique worker objects
678
   *
679
   * @param run_binding  void pointer to a RunBinding structure (C interface)
680
   * @return  NULL in any case
681
   */
682
  static void* RunWorker(void *run_binding);
683
684
  static void* RunCallbackThreadWrapper(void *run_binding);
685
686
  /**
687
   * Tells the master that a worker thread did start. This does not mean, that
688
   * it was initialized successfully.
689
   */
690
  void ReportStartedWorker() const;
691
692
  void Schedule(WorkerJob job);
693
  void ScheduleDeathSentences();
694
695
  /**
696
   * Empties the job queue
697
   *
698
   * @param forget_pending  controls if cancelled jobs should be seen as finished
699
   */
700
  void TruncateJobQueue(const bool forget_pending = false);
701
702
  /**
703
   * Retrieves a job from the job queue. If the job queue is empty it will block
704
   * until there is a new job available for processing.
705
   * THIS METHOD MUST ONLY BE CALLED INSIDE THE WORKER OBJECTS
706
   *
707
   * @return  a job to be processed by a worker
708
   */
709
  inline WorkerJob Acquire();
710
711
  /**
712
   * Controls the asynchronous finishing of a job.
713
   * DO NOT CALL THIS, use JobSuccessful() or JobFailed() wrappers instead.
714
   *
715
   * @param data     the data to be returned to the user
716
   * @param success  flag if job was successful
717
   */
718
  void JobDone(const returned_data_t& data, const bool success = true);
719
720
  inline void StartRunning()    {
721
    MutexLockGuard guard(status_mutex_);
722
    running_ = true;
723
  }
724
  inline void StopRunning()     {
725
    MutexLockGuard guard(status_mutex_);
726
    running_ = false;
727
  }
728
  inline bool IsRunning() const {
729
    MutexLockGuard guard(status_mutex_);
730
    return running_;
731
  }
732
733
 private:
734
  // general configuration
735
  const size_t number_of_workers_;  //!< number of concurrent worker threads
736
  const worker_context_t *worker_context_;  //!< the WorkerT defined context
737
  /**
738
   * The thread context passed to newly spawned threads
739
   */
740
  WorkerRunBinding thread_context_;
741
742
  // status information
743
  bool                     initialized_;
744
  bool                     running_;
745
  mutable unsigned int     workers_started_;
746
  mutable pthread_mutex_t  status_mutex_;
747
  mutable pthread_cond_t   worker_started_;
748
  mutable pthread_mutex_t  jobs_all_done_mutex_;
749
  mutable pthread_cond_t   jobs_all_done_;
750
751
  // worker threads
752
  WorkerThreads            worker_threads_;       //!< list of worker threads
753
  pthread_t                callback_thread_;      //!< handles callback invokes
754
755
  // job queue
756
  typedef FifoChannel<WorkerJob > JobQueue;
757
  JobQueue                 jobs_queue_;
758
  mutable atomic_int32     jobs_pending_;
759
  mutable atomic_int32     jobs_failed_;
760
  mutable atomic_int64     jobs_processed_;
761
762
  // callback channel
763
  typedef FifoChannel<CallbackJob > CallbackQueue;
764
  CallbackQueue results_queue_;
765
};
766
767
768
/**
769
 * Base class for worker classes that should be used in a ConcurrentWorkers
770
 * swarm. These classes need to fulfill a number of requirements in order to
771
 * satisfy the needs of the ConcurrentWorkers template.
772
 *
773
 * Requirements:
774
 *  -> needs to define the following types:
775
 *       - expected_data   - input data structure of the worker
776
 *       - returned_data   - output data structure of the worker
777
 *       - worker_context  - context structure for initialization information
778
 *
779
 *  -> implement a constructor that takes a pointer to its worker_context
780
 *     as its only parameter:
781
 *        AwesomeWorker(const AwesomeWorker::worker_context*)
782
 *     Note: do not rely on the context object to be available after the
783
 *           consturctor has returned!
784
 *
785
 *  -> needs to define the calling-operator expecting one parameter of type:
786
 *     const expected_data& and returning void
787
 *     This will be invoked for every new job the worker should process
788
 *
789
 *  -> inside the implementation of the described calling-operator it needs to
790
 *     invoke either:
791
 *        master()->JobSuccessful(const returned_data&)
792
 *     or:
793
 *        master()->JobFailed(const returned_data&)
794
 *     as its LAST operation before returning.
795
 *     This will keep track of finished jobs and inform the user of Concurrent-
796
 *     Workers about finished jobs.
797
 *
798
 *  -> [optional] overwrite Initialize() and/or TearDown() to do environmental
799
 *     setup work, before or respectively after jobs will be processed
800
 *
801
 * General Reminder:
802
 *   You will be running in a multi-threaded environment here! Buckle up and
803
 *   make suitable preparations to shield yourself from serious head-ache.
804
 *
805
 * Note: This implements a Curiously Recurring Template Pattern
806
 *       (http://en.wikipedia.org/wiki/Curiously_recurring_template_pattern)
807
 *
808
 * @param DerivedWorkerT  the class name of the inheriting class
809
 *        (f.e.   class AwesomeWorker : public ConcurrentWorker<AwesomeWorker>)
810
 */
811
template <class DerivedWorkerT>
812
class ConcurrentWorker : SingleCopy {
813
 public:
814
  virtual ~ConcurrentWorker() {}
815
816
  /**
817
   * Does general initialization before any jobs will get scheduled. You do not
818
   * need to up-call this initialize method, since it is seen as a dummy here.
819
   *
820
   * @returns  true one successful initialization
821
   */
822
  virtual bool Initialize() { return true; }
823
824
  /**
825
   * Does general clean-up after the last job was processed in the worker object
826
   * and it is about to vanish. You do not need to up-call this method.
827
   */
828
  virtual void TearDown() {}
829
830
  /**
831
   * The actual job-processing entry point. See the description of the inheriting
832
   * class requirements to learn about the semantics of this methods.
833
   * DO NOT FORGET TO CALL master()->JobSuccessful() OR master()->JobFinished()
834
   * at the end of thismethod!!
835
   *
836
   * Note: There is no way to generally define this operator, it is therefore
837
   *       commented out and placed here just as a reminder.
838
   *
839
   * @param data  the data to be processed.
840
   */
841
  // void operator()(const expected_data &data); // do the actual job of the
842
                                                 // worker
843
844
 protected:
845
  ConcurrentWorker() : master_(NULL) {}
846
847
  /**
848
   * Gets a pointer to the ConcurrentWorkers object that this worker resides in
849
   *
850
   * @returns  a pointer to the ConcurrentWorkers object
851
   */
852
  inline ConcurrentWorkers<DerivedWorkerT>* master() const { return master_; }
853
854
 private:
855
  friend class ConcurrentWorkers<DerivedWorkerT>;
856
  void RegisterMaster(ConcurrentWorkers<DerivedWorkerT> *master) {
857
    master_ = master;
858
  }
859
860
 private:
861
  ConcurrentWorkers<DerivedWorkerT> *master_;
862
};
863
864
#ifdef CVMFS_NAMESPACE_GUARD
865
}  // namespace CVMFS_NAMESPACE_GUARD
866
#endif
867
868
#include "util_concurrency_impl.h"
869
870
#endif  // CVMFS_UTIL_CONCURRENCY_H_