CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
concurrency.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_UTIL_CONCURRENCY_H_
6 #define CVMFS_UTIL_CONCURRENCY_H_
7 
8 #include <pthread.h>
9 
10 #include <cassert>
11 #include <queue>
12 #include <set>
13 #include <vector>
14 
15 #include "util/async.h"
16 #include "util/atomic.h"
17 #include "util/export.h"
18 #include "util/mutex.h"
19 #include "util/single_copy.h"
20 
21 #ifdef CVMFS_NAMESPACE_GUARD
22 namespace CVMFS_NAMESPACE_GUARD {
23 #endif
24 
30 template <class ItemT>
32  public:
33  Channel() {
34  int retval = pthread_mutex_init(&lock_, NULL);
35  assert(retval == 0);
36  retval = pthread_cond_init(&cond_populated_, NULL);
37  assert(retval == 0);
38  }
39 
41  pthread_cond_destroy(&cond_populated_);
42  pthread_mutex_destroy(&lock_);
43  }
44 
48  std::vector<ItemT *> *StartEnqueueing() {
49  int retval = pthread_mutex_lock(&lock_);
50  assert(retval == 0);
51  return &items_;
52  }
53 
57  void AbortEnqueueing() {
58  int retval = pthread_mutex_unlock(&lock_);
59  assert(retval == 0);
60  }
61 
66  int retval = pthread_cond_signal(&cond_populated_);
67  assert(retval == 0);
68  retval = pthread_mutex_unlock(&lock_);
69  assert(retval == 0);
70  }
71 
72  void PushBack(ItemT *item) {
73  MutexLockGuard lock_guard(&lock_);
74  items_.push_back(item);
75  int retval = pthread_cond_signal(&cond_populated_);
76  assert(retval == 0);
77  }
78 
83  ItemT *PopFront() {
84  MutexLockGuard lock_guard(&lock_);
85  while (items_.size() == 0)
86  pthread_cond_wait(&cond_populated_, &lock_);
87  ItemT *item = items_[0];
88  items_.erase(items_.begin());
89  return item;
90  }
91 
92  private:
96  std::vector<ItemT *> items_;
100  pthread_mutex_t lock_;
104  pthread_cond_t cond_populated_;
105 };
106 
113  public:
/**
 * Destroys the wrapped pthread mutex.  The result of
 * pthread_mutex_destroy() is not inspected.
 */
114  inline virtual ~Lockable() { pthread_mutex_destroy(&mutex_); }
115 
// Lock(), TryLock() and Unlock() are const-qualified and forward directly to
// the matching pthread_mutex_* call on mutex_.

/// Calls pthread_mutex_lock(); its return code is discarded.
116  void Lock() const { pthread_mutex_lock(&mutex_); }
/// Calls pthread_mutex_trylock() and passes its return code through unchanged
/// (POSIX: 0 means the mutex was acquired).
117  int TryLock() const { return pthread_mutex_trylock(&mutex_); }
/// Calls pthread_mutex_unlock(); its return code is discarded.
118  void Unlock() const { pthread_mutex_unlock(&mutex_); }
119 
120  protected:
122  const int retval = pthread_mutex_init(&mutex_, NULL);
123  assert(retval == 0);
124  }
125 
126  private:
127  mutable pthread_mutex_t mutex_;
128 };
129 
130 
131 //
132 // -----------------------------------------------------------------------------
133 //
134 
135 
149 template <typename T>
151  public:
153  value_(T(0)), maximal_value_(T(0)) { Initialize(); }
154 
155  explicit SynchronizingCounter(const T maximal_value)
156  : value_(T(0))
157  , maximal_value_(maximal_value)
158  {
159  assert(maximal_value > T(0));
160  Initialize();
161  }
162 
/// Delegates all cleanup to Destroy(), which is declared in this class but
/// defined outside this listing.
163  ~SynchronizingCounter() { Destroy(); }
164 
165  T Increment() {
166  MutexLockGuard l(mutex_);
167  WaitForFreeSlotUnprotected();
168  SetValueUnprotected(value_ + T(1));
169  return value_;
170  }
171 
172  T Decrement() {
173  MutexLockGuard l(mutex_);
174  SetValueUnprotected(value_ - T(1));
175  return value_;
176  }
177 
178  void WaitForZero() const {
179  MutexLockGuard l(mutex_);
180  while (value_ != T(0)) {
181  pthread_cond_wait(&became_zero_, &mutex_);
182  }
183  assert(value_ == T(0));
184  }
185 
/// True if an upper bound is set, i.e. maximal_value_ differs from T(0).
186  bool HasMaximalValue() const { return maximal_value_ != T(0); }
/// Returns the stored upper bound.
187  T maximal_value() const { return maximal_value_; }
188 
// Operator shorthands.  The prefix forms return whatever Increment() or
// Decrement() returns; the postfix forms derive the previous value from that
// result by subtracting or adding T(1).
189  T operator++() { return Increment(); }
190  const T operator++(int) { return Increment() - T(1); }
191  T operator--() { return Decrement(); }
192  const T operator--(int) { return Decrement() + T(1); }
193 
194  T Get() const {
195  MutexLockGuard l(mutex_);
196  return value_;
197  }
198 
200  MutexLockGuard l(mutex_);
201  SetValueUnprotected(other);
202  return *this;
203  }
204 
205  protected:
206  void SetValueUnprotected(const T new_value);
207  void WaitForFreeSlotUnprotected();
208 
209  private:
210  void Initialize();
211  void Destroy();
212 
213  private:
215  const T maximal_value_;
216 
217  mutable pthread_mutex_t mutex_;
218  mutable pthread_cond_t became_zero_;
219  pthread_cond_t free_slot_;
220 };
221 
222 
223 //
224 // -----------------------------------------------------------------------------
225 //
226 
227 
228 template <typename ParamT>
230 
231 
/**
 * Base class template for objects that keep a set of listener callbacks and
 * can notify them with a ParamT value.  It derives publicly from
 * Callbackable<ParamT> and (privately) from SingleCopy.
 *
 * NOTE(review): this listing skips lines 254, 294, 335 and 337 of the header.
 * According to the cross-reference index of the same listing, line 254 holds
 * the CallbackPtr typedef and line 335 the `Callbacks listeners_` member.
 */
250 template <typename ParamT>
251 class Observable : public Callbackable<ParamT>,
252  SingleCopy {
253  public:
255  protected:
// Container type for the registered callbacks: an ordered set of CallbackPtr.
256  typedef std::set<CallbackPtr> Callbacks;
257 
258  public:
259  virtual ~Observable();
260 
// Registers a listener given as a BoundClosure callback method plus the
// delegate object to call it on and a closure data value passed by value.
// Returns a CallbackPtr for the new registration.
274  template <class DelegateT, class ClosureDataT>
275  CallbackPtr RegisterListener(
276  typename BoundClosure<ParamT,
277  DelegateT,
278  ClosureDataT>::CallbackMethod method,
279  DelegateT *delegate,
280  ClosureDataT data);
281 
// Overload taking a delegate object.
// NOTE(review): the first parameter line (294) is missing from this listing,
// presumably the callback method to invoke on the delegate -- confirm
// against the real header.
292  template <class DelegateT>
293  CallbackPtr RegisterListener(
295  DelegateT *delegate);
296 
// Overload taking a Callback<ParamT>::CallbackFunction.
305  CallbackPtr RegisterListener(typename Callback<ParamT>::CallbackFunction fn);
306 
// Takes a CallbackPtr as returned by the RegisterListener() overloads above.
313  void UnregisterListener(CallbackPtr callback_object);
314 
// Parameterless counterpart of UnregisterListener(); presumably drops every
// registered listener -- confirm in the implementation file.
318  void UnregisterListeners();
319 
320  protected:
321  Observable(); // don't instantiate this as a stand alone object
322 
// Protected overload that takes a ready-made callback object.
323  void RegisterListener(CallbackPtr callback_object);
324 
// Passes `parameter` to the listeners; the dispatch details live in the
// implementation file and are not visible here.
332  void NotifyListeners(const ParamT &parameter);
333 
334  private:
// Read/write lock; mutable so that const methods can take it.  By its name
// it guards the listener set -- confirm in the implementation file.
336  mutable pthread_rwlock_t listeners_rw_lock_;
338 };
339 
340 
341 //
342 // -----------------------------------------------------------------------------
343 //
344 
345 
// Exported free function; only the declaration is visible in this header
// (the cross-reference index of this listing places the definition in
// concurrency.cc).
352 CVMFS_EXPORT unsigned int GetNumberOfCpuCores();
// Presumably the value reported when the core count cannot be determined --
// confirm against GetNumberOfCpuCores().
353 static const unsigned int kFallbackNumberOfCpus = 1;
354 
355 
360  public:
// Interface of Signal.  All member functions are defined outside this header
// listing; the notes below are limited to what the declarations show.
361  Signal();
362  ~Signal();
// NOTE(review): by their names, Wakeup() releases a thread blocked in Wait()
// and IsSleeping() reports whether one is blocked -- confirm in the
// implementation.
363  void Wakeup();
364  void Wait();
365  bool IsSleeping();
366 
367  private:
// State: a boolean flag plus a pthread mutex / condition variable pair.
368  bool fired_;
369  pthread_mutex_t lock_;
370  pthread_cond_t signal_;
371 };
372 
373 
374 //
375 // -----------------------------------------------------------------------------
376 //
377 
378 
/**
 * Queue of T built on std::queue<T> (protected inheritance), combined with a
 * pthread mutex and two condition variables.  All member functions are
 * defined outside this listing.
 *
 * NOTE(review): line 431 of the header is missing here; the cross-reference
 * index of this listing names `const size_t queue_drainout_threshold_` at
 * that position.
 */
386 template <class T>
387 class FifoChannel : protected std::queue<T> {
388  public:
// Takes the maximal queue length and a drain-out threshold; both are
// size_t values.
396  FifoChannel(const size_t maximal_length,
397  const size_t drainout_threshold);
398  virtual ~FifoChannel();
399 
// Adds `data` to the queue.  Presumably blocks while the queue is full (see
// queue_is_not_full_) -- confirm in the implementation file.
407  void Enqueue(const T &data);
408 
// Removes one item and returns it by value.  Presumably blocks while the
// queue is empty (see queue_is_not_empty_) -- confirm.
415  const T Dequeue();
416 
// Returns an unsigned count; presumably the number of discarded items --
// confirm.
422  unsigned int Drop();
423 
424  inline size_t GetItemCount() const;
425  inline bool IsEmpty() const;
426  inline size_t GetMaximalItemCount() const;
427 
428  private:
429  // general configuration
430  const size_t maximal_queue_length_;
432 
433  // thread synchronisation structures
// All three are mutable so that const member functions can use them.
434  mutable pthread_mutex_t mutex_;
435  mutable pthread_cond_t queue_is_not_empty_;
436  mutable pthread_cond_t queue_is_not_full_;
437 };
438 
439 
/**
 * Manages a configurable number of worker threads of type WorkerT plus one
 * callback thread.  It derives publicly from
 * Observable<typename WorkerT::returned_data>.
 *
 * NOTE(review): several header lines are missing from this listing (among
 * others 505, 507, 511, 512, 519, 678, 682, 691, 695-699, 702, 703).  The
 * cross-reference index of the listing names the affected entities:
 * RunBinding's constructor and `delegate` member, WorkerRunBinding's
 * constructor and `worker_context` member, `worker_threads_`, the JobQueue
 * and CallbackQueue typedefs, `jobs_queue_`, `jobs_pending_`, `jobs_failed_`,
 * `jobs_processed_` and `results_queue_`.
 */
458 template <class WorkerT>
459 class ConcurrentWorkers : public Observable<typename WorkerT::returned_data> {
460  public:
461  // these data types must be defined by the worker class
465  typedef typename WorkerT::expected_data expected_data_t;
469  typedef typename WorkerT::returned_data returned_data_t;
473  typedef typename WorkerT::worker_context worker_context_t;
474 
475  protected:
476  typedef std::vector<pthread_t> WorkerThreads;
477 
// Envelope for one unit of work.  Constructing a Job from data yields a
// regular job (is_death_sentence == false); a default-constructed Job
// carries a value-initialised payload and is_death_sentence == true.
483  template <class DataT>
484  struct Job {
485  explicit Job(const DataT &data) :
486  data(data),
487  is_death_sentence(false) {}
488  Job() :
489  data(),
490  is_death_sentence(true) {}
// job payload
491  const DataT data;
// death sentence flag
492  const bool is_death_sentence;
493  };
494  typedef Job<expected_data_t> WorkerJob;
495  typedef Job<returned_data_t> CallbackJob;
496 
// NOTE(review): the RunBinding struct and the WorkerRunBinding struct that
// follows are only partially present in this listing (constructor signature
// and member lines are missing); see the class comment.
504  struct RunBinding {
506  delegate(delegate) {}
508  };
510 
513  const worker_context_t *worker_context) :
514  RunBinding(delegate),
515  worker_context(worker_context) {}
520  };
521 
522  public:
// Takes the number of workers, the maximal job queue length and an optional
// worker context pointer (NULL by default).
532  ConcurrentWorkers(const size_t number_of_workers,
533  const size_t maximal_queue_length,
534  worker_context_t *worker_context = NULL);
535  virtual ~ConcurrentWorkers();
536 
543  bool Initialize();
544 
// Wraps `data` into a regular (non death sentence) WorkerJob and forwards it
// to the protected Schedule(WorkerJob) overload.
551  inline void Schedule(const expected_data_t &data) {
552  Schedule(WorkerJob(data));
553  }
554 
562  void Terminate();
563 
570  void WaitForEmptyQueue() const;
571 
579  void WaitForTermination();
580 
// Returns number_of_workers_; note that the size_t member is converted to
// unsigned int on return.
581  inline unsigned int GetNumberOfWorkers() const { return number_of_workers_; }
// Reads jobs_failed_ through atomic_read32().
582  inline unsigned int GetNumberOfFailedJobs() const {
583  return atomic_read32(&jobs_failed_);
584  }
585 
// Forwards to JobDone(data, true).
592  inline void JobSuccessful(const returned_data_t& data) {
593  JobDone(data, true);
594  }
595 
// Forwards to JobDone(data, false).
606  inline void JobFailed(const returned_data_t& data) { JobDone(data, false); }
607 
608  void RunCallbackThread();
609 
610  protected:
611  bool SpawnWorkers();
612 
// Static functions taking and returning void*, i.e. they have the signature
// of a pthread start routine.
621  static void* RunWorker(void *run_binding);
622 
623  static void* RunCallbackThreadWrapper(void *run_binding);
624 
629  void ReportStartedWorker() const;
630 
631  void Schedule(WorkerJob job);
632  void ScheduleDeathSentences();
633 
639  void TruncateJobQueue(const bool forget_pending = false);
640 
648  inline WorkerJob Acquire();
649 
// Common back end of JobSuccessful() and JobFailed(); `success` defaults to
// true.
657  void JobDone(const returned_data_t& data, const bool success = true);
658 
// The three helpers below set or read running_ while holding status_mutex_.
659  inline void StartRunning() {
660  MutexLockGuard guard(status_mutex_);
661  running_ = true;
662  }
663  inline void StopRunning() {
664  MutexLockGuard guard(status_mutex_);
665  running_ = false;
666  }
667  inline bool IsRunning() const {
668  MutexLockGuard guard(status_mutex_);
669  return running_;
670  }
671 
672  private:
673  // general configuration
// number of concurrent worker threads
674  const size_t number_of_workers_;
// the WorkerT defined context
675  const worker_context_t *worker_context_;
676 
679  WorkerRunBinding thread_context_;
680 
681  // status information
// running_ is accessed under status_mutex_ (see StartRunning(),
// StopRunning() and IsRunning()).
683  bool running_;
684  mutable unsigned int workers_started_;
685  mutable pthread_mutex_t status_mutex_;
686  mutable pthread_cond_t worker_started_;
687  mutable pthread_mutex_t jobs_all_done_mutex_;
688  mutable pthread_cond_t jobs_all_done_;
689 
690  // worker threads
// handles callback invokes
692  pthread_t callback_thread_;
693 
694  // job queue
700 
701  // callback channel
704 };
705 
706 
750 template <class DerivedWorkerT>
752  public:
// Virtual destructor with an empty body, so derived workers can be destroyed
// through a ConcurrentWorker pointer.
753  virtual ~ConcurrentWorker() {}
754 
// Default implementation does nothing and reports success; virtual, so a
// derived worker may override it.
761  virtual bool Initialize() { return true; }
762 
// Default implementation is a no-op; virtual, so a derived worker may
// override it.
767  virtual void TearDown() {}
768 
// The commented-out declaration below names the call operator that a derived
// worker is expected to provide.
780  // void operator()(const expected_data &data); // do the actual job of the
781  // worker
782 
783  protected:
// Protected constructor; master_ starts out as NULL.
784  ConcurrentWorker() : master_(NULL) {}
785 
// Returns the stored ConcurrentWorkers pointer, which is NULL until it has
// been assigned.
791  inline ConcurrentWorkers<DerivedWorkerT>* master() const { return master_; }
792 
793  private:
794  friend class ConcurrentWorkers<DerivedWorkerT>;
796  master_ = master;
797  }
798 
799  private:
801 };
802 
803 #ifdef CVMFS_NAMESPACE_GUARD
804 } // namespace CVMFS_NAMESPACE_GUARD
805 #endif
806 
807 #include "util/concurrency_impl.h"
808 
809 #endif // CVMFS_UTIL_CONCURRENCY_H_
JobQueue jobs_queue_
Definition: concurrency.h:696
Job< returned_data_t > CallbackJob
Definition: concurrency.h:495
T maximal_value() const
Definition: concurrency.h:187
void RegisterMaster(ConcurrentWorkers< DerivedWorkerT > *master)
Definition: concurrency.h:795
SynchronizingCounter< T > & operator=(const T &other)
Definition: concurrency.h:199
Callbackable< ParamT >::CallbackTN * CallbackPtr
Definition: concurrency.h:254
FifoChannel< WorkerJob > JobQueue
Definition: concurrency.h:695
pthread_t callback_thread_
handles callback invokes
Definition: concurrency.h:692
virtual bool Initialize()
Definition: concurrency.h:761
int64_t atomic_int64
Definition: atomic.h:18
pthread_mutex_t mutex_
Definition: concurrency.h:217
pthread_cond_t queue_is_not_empty_
Definition: concurrency.h:435
const size_t queue_drainout_threshold_
Definition: concurrency.h:431
~Channel()
Definition: concurrency.h:40
WorkerT::returned_data returned_data_t
Definition: concurrency.h:469
ConcurrentWorkers< DerivedWorkerT > * master() const
Definition: concurrency.h:791
ItemT * PopFront()
Definition: concurrency.h:83
int TryLock() const
Definition: concurrency.h:117
RunBinding(ConcurrentWorkers< WorkerT > *delegate)
Definition: concurrency.h:505
SynchronizingCounter(const T maximal_value)
Definition: concurrency.h:155
pthread_cond_t became_zero_
Definition: concurrency.h:218
pthread_cond_t signal_
Definition: concurrency.h:370
#define CVMFS_EXPORT
Definition: export.h:11
atomic_int32 jobs_pending_
Definition: concurrency.h:697
unsigned int GetNumberOfCpuCores()
Definition: concurrency.cc:18
unsigned int GetNumberOfWorkers() const
Definition: concurrency.h:581
pthread_cond_t cond_populated_
Definition: concurrency.h:104
Callbacks listeners_
Definition: concurrency.h:335
assert((mem||(size==0))&&"Out Of Memory")
WorkerThreads worker_threads_
list of worker threads
Definition: concurrency.h:691
FifoChannel< CallbackJob > CallbackQueue
Definition: concurrency.h:702
const T operator--(int)
Definition: concurrency.h:192
bool HasMaximalValue() const
Definition: concurrency.h:186
WorkerT::worker_context worker_context_t
Definition: concurrency.h:473
void JobSuccessful(const returned_data_t &data)
Definition: concurrency.h:592
pthread_cond_t jobs_all_done_
Definition: concurrency.h:688
Definition: async.h:45
void Schedule(const expected_data_t &data)
Definition: concurrency.h:551
bool fired_
Definition: concurrency.h:368
virtual ~ConcurrentWorker()
Definition: concurrency.h:753
int32_t atomic_int32
Definition: atomic.h:17
virtual void TearDown()
Definition: concurrency.h:767
pthread_cond_t worker_started_
Definition: concurrency.h:686
void Lock() const
Definition: concurrency.h:116
const worker_context_t * worker_context
Definition: concurrency.h:519
const worker_context_t * worker_context_
the WorkerT defined context
Definition: concurrency.h:675
std::vector< pthread_t > WorkerThreads
Definition: concurrency.h:476
atomic_int32 jobs_failed_
Definition: concurrency.h:698
const bool is_death_sentence
death sentence flag
Definition: concurrency.h:492
pthread_mutex_t mutex_
Definition: concurrency.h:127
Job< expected_data_t > WorkerJob
Definition: concurrency.h:494
pthread_mutex_t status_mutex_
Definition: concurrency.h:685
pthread_cond_t queue_is_not_full_
Definition: concurrency.h:436
WorkerRunBinding thread_context_
Definition: concurrency.h:679
static const unsigned int kFallbackNumberOfCpus
Definition: concurrency.h:353
const T operator++(int)
Definition: concurrency.h:190
virtual ~Lockable()
Definition: concurrency.h:114
const size_t maximal_queue_length_
Definition: concurrency.h:430
WorkerT::expected_data expected_data_t
Definition: concurrency.h:465
pthread_mutex_t mutex_
Definition: concurrency.h:434
std::vector< ItemT * > * StartEnqueueing()
Definition: concurrency.h:48
CallbackQueue results_queue_
Definition: concurrency.h:703
std::vector< ItemT * > items_
Definition: concurrency.h:96
pthread_cond_t free_slot_
Definition: concurrency.h:219
unsigned int workers_started_
Definition: concurrency.h:684
pthread_mutex_t lock_
Definition: concurrency.h:100
const DataT data
job payload
Definition: concurrency.h:491
void Unlock() const
Definition: concurrency.h:118
Job(const DataT &data)
Definition: concurrency.h:485
void WaitForZero() const
Definition: concurrency.h:178
ConcurrentWorkers< WorkerT > * delegate
Definition: concurrency.h:507
void UnregisterListener(ListenerHandle *handle)
Definition: mutex.h:42
atomic_int64 jobs_processed_
Definition: concurrency.h:699
unsigned int GetNumberOfFailedJobs() const
Definition: concurrency.h:582
const size_t number_of_workers_
number of concurrent worker threads
Definition: concurrency.h:674
void CommitEnqueueing()
Definition: concurrency.h:65
void PushBack(ItemT *item)
Definition: concurrency.h:72
void JobFailed(const returned_data_t &data)
Definition: concurrency.h:606
std::set< CallbackPtr > Callbacks
Definition: concurrency.h:256
pthread_mutex_t jobs_all_done_mutex_
Definition: concurrency.h:687
ConcurrentWorkers< DerivedWorkerT > * master_
Definition: concurrency.h:800
WorkerRunBinding(ConcurrentWorkers< WorkerT > *delegate, const worker_context_t *worker_context)
Definition: concurrency.h:512
void AbortEnqueueing()
Definition: concurrency.h:57
bool IsRunning() const
Definition: concurrency.h:667
pthread_mutex_t lock_
Definition: concurrency.h:369