CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
concurrency.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_UTIL_CONCURRENCY_H_
6 #define CVMFS_UTIL_CONCURRENCY_H_
7 
8 #include <pthread.h>
9 
10 #include <cassert>
11 #include <queue>
12 #include <set>
13 #include <vector>
14 
15 #include "util/async.h"
16 #include "util/atomic.h"
17 #include "util/export.h"
18 #include "util/mutex.h"
19 #include "util/single_copy.h"
20 
21 #ifdef CVMFS_NAMESPACE_GUARD
22 namespace CVMFS_NAMESPACE_GUARD {
23 #endif
24 
30 template<class ItemT>
32  public:
// Default constructor: sets up lock_ and cond_populated_ with default
// attributes (NULL attribute pointers). A failing init call is only caught by
// assert(), so it goes undetected in builds compiled with NDEBUG.
33  Channel() {
34  int retval = pthread_mutex_init(&lock_, NULL);
35  assert(retval == 0);
36  retval = pthread_cond_init(&cond_populated_, NULL);
37  assert(retval == 0);
38  }
39 
41  pthread_cond_destroy(&cond_populated_);
42  pthread_mutex_destroy(&lock_);
43  }
44 
// Begins a bulk insertion: acquires lock_ and returns a pointer to the
// internal item vector so that the caller can modify it directly.
// lock_ is still held when this function returns; the caller has to release
// it again through AbortEnqueueing() or CommitEnqueueing().
48  std::vector<ItemT *> *StartEnqueueing() {
49  int retval = pthread_mutex_lock(&lock_);
50  assert(retval == 0);
51  return &items_;
52  }
53 
// Ends a bulk insertion without notifying consumers: only releases lock_,
// cond_populated_ is not signalled. Expects lock_ to be held by the calling
// thread, which is the state StartEnqueueing() leaves behind.
57  void AbortEnqueueing() {
58  int retval = pthread_mutex_unlock(&lock_);
59  assert(retval == 0);
60  }
61 
66  int retval = pthread_cond_signal(&cond_populated_);
67  assert(retval == 0);
68  retval = pthread_mutex_unlock(&lock_);
69  assert(retval == 0);
70  }
71 
// Appends a single item pointer to the end of items_ and signals
// cond_populated_. Only the pointer is stored; the pointee is not copied.
// NOTE(review): lock_guard is presumably a scope guard from util/mutex.h that
// keeps lock_ locked until the end of this function -- confirm there.
72  void PushBack(ItemT *item) {
73  MutexLockGuard lock_guard(&lock_);
74  items_.push_back(item);
75  int retval = pthread_cond_signal(&cond_populated_);
76  assert(retval == 0);
77  }
78 
// Removes and returns the first item pointer (items_[0]), blocking while the
// channel is empty. The emptiness check sits in a while loop, so it is
// re-evaluated after every return from pthread_cond_wait(), which also covers
// spurious wakeups.
83  ItemT *PopFront() {
84  MutexLockGuard lock_guard(&lock_);
85  while (items_.size() == 0)
    // the return code of pthread_cond_wait() is not checked here
86  pthread_cond_wait(&cond_populated_, &lock_);
87  ItemT *item = items_[0];
    // erasing the first element of a std::vector shifts all remaining elements
88  items_.erase(items_.begin());
89  return item;
90  }
91 
92  private:
96  std::vector<ItemT *> items_;  // queued item pointers; PopFront() takes items_[0]
100  pthread_mutex_t lock_;  // held while items_ is read or modified
104  pthread_cond_t cond_populated_;  // signalled by PushBack() after an insert; PopFront() waits on it
105 };
106 
113  public:
// Virtual destructor; destroys mutex_. The return code of
// pthread_mutex_destroy() is discarded.
114  inline virtual ~Lockable() { pthread_mutex_destroy(&mutex_); }
115 
// Thin wrappers around the pthread mutex calls on mutex_. They can be const
// because mutex_ is declared mutable. Lock() and Unlock() discard the pthread
// return code.
116  void Lock() const { pthread_mutex_lock(&mutex_); }
// Passes the result of pthread_mutex_trylock() through unchanged: 0 if the
// mutex was acquired, a non-zero error number otherwise.
117  int TryLock() const { return pthread_mutex_trylock(&mutex_); }
118  void Unlock() const { pthread_mutex_unlock(&mutex_); }
119 
120  protected:
122  const int retval = pthread_mutex_init(&mutex_, NULL);
123  assert(retval == 0);
124  }
125 
126  private:
127  mutable pthread_mutex_t mutex_;
128 };
129 
130 
131 //
132 // -----------------------------------------------------------------------------
133 //
134 
135 
149 template<typename T>
151  public:
// Creates a counter that starts at zero and has maximal_value_ == 0, i.e. a
// counter for which HasMaximalValue() returns false.
152  SynchronizingCounter() : value_(T(0)), maximal_value_(T(0)) { Initialize(); }
153 
// Creates a counter that starts at zero and records maximal_value as its
// maximal value. maximal_value must be greater than zero; this is enforced by
// assert() only and therefore not checked in NDEBUG builds.
154  explicit SynchronizingCounter(const T maximal_value)
155  : value_(T(0)), maximal_value_(maximal_value) {
156  assert(maximal_value > T(0));
157  Initialize();
158  }
159 
// Delegates all cleanup to Destroy() (declared further down in this class).
160  ~SynchronizingCounter() { Destroy(); }
161 
// Raises the counter by one and returns value_ as read after the update.
// WaitForFreeSlotUnprotected() is called first, with mutex_ already held.
// NOTE(review): that helper is defined outside this listing and presumably
// blocks while the counter sits at maximal_value_ -- confirm.
162  T Increment() {
163  MutexLockGuard l(mutex_);
164  WaitForFreeSlotUnprotected();
165  SetValueUnprotected(value_ + T(1));
166  return value_;
167  }
168 
// Lowers the counter by one and returns value_ as read after the update.
// No lower bound is checked in this function itself.
169  T Decrement() {
170  MutexLockGuard l(mutex_);
171  SetValueUnprotected(value_ - T(1));
172  return value_;
173  }
174 
// Blocks the caller until value_ equals T(0), waiting on became_zero_.
// The method is const; it can still lock and wait because mutex_ and
// became_zero_ are declared mutable.
// NOTE(review): became_zero_ is presumably signalled from
// SetValueUnprotected(), which is defined outside this listing -- confirm.
175  void WaitForZero() const {
176  MutexLockGuard l(mutex_);
    // re-check the condition after every wakeup
177  while (value_ != T(0)) {
178  pthread_cond_wait(&became_zero_, &mutex_);
179  }
180  assert(value_ == T(0));
181  }
182 
// True if a non-zero maximal value was configured; the default constructor
// leaves maximal_value_ at zero.
183  bool HasMaximalValue() const { return maximal_value_ != T(0); }
// Returns the configured maximal value. maximal_value_ is const, so no lock
// is taken.
184  T maximal_value() const { return maximal_value_; }
185 
// Operator forms of Increment() / Decrement(). All four return by value.
// The prefix forms return the result of Increment() / Decrement() directly.
// The postfix forms derive their result arithmetically from that return value
// (minus or plus one) rather than reading value_ before the update.
186  T operator++() { return Increment(); }
187  const T operator++(int) { return Increment() - T(1); }
188  T operator--() { return Decrement(); }
189  const T operator--(int) { return Decrement() + T(1); }
190 
// Returns a copy of the current counter value, read while a MutexLockGuard on
// mutex_ is in scope.
191  T Get() const {
192  MutexLockGuard l(mutex_);
193  return value_;
194  }
195 
197  MutexLockGuard l(mutex_);
198  SetValueUnprotected(other);
199  return *this;
200  }
201 
202  protected:
// Declarations only; the definitions are not part of this listing.
// NOTE(review): they presumably live in util/concurrency_impl.h, which is
// included at the end of this header -- confirm.
// Increment() and Decrement() call the two "Unprotected" helpers after they
// have created a MutexLockGuard on mutex_.
203  void SetValueUnprotected(const T new_value);
204  void WaitForFreeSlotUnprotected();
205 
206  private:
// Called from the constructors and the destructor, respectively.
207  void Initialize();
208  void Destroy();
209 
210  private:
212  const T maximal_value_;  // T(0) means no maximal value, see HasMaximalValue()
213 
214  mutable pthread_mutex_t mutex_;  // mutable: locked from const methods such as Get()
215  mutable pthread_cond_t became_zero_;  // WaitForZero() waits on this
216  pthread_cond_t free_slot_;  // NOTE(review): presumably used by WaitForFreeSlotUnprotected() -- confirm
217 };
218 
219 
220 //
221 // -----------------------------------------------------------------------------
222 //
223 
224 
225 template<typename ParamT>
227 
228 
// Base class for objects that keep a set of registered callbacks (type
// Callbacks, a std::set of CallbackPtr). It derives publicly from
// Callbackable<ParamT> and, as no access specifier is given, privately from
// SingleCopy. All member functions are only declared here.
// NOTE(review): several lines of the original header are absent from this
// listing (e.g. the CallbackPtr typedef and the listeners_ member that the
// cross-reference index places at header lines 250 and 331) -- consult the
// real header before editing.
247 template<typename ParamT>
248 class Observable : public Callbackable<ParamT>, SingleCopy {
249  public:
251 
252  protected:
253  typedef std::set<CallbackPtr> Callbacks;
254 
255  public:
256  virtual ~Observable();
257 
    // Overload taking a method, a delegate object and closure data; returns a
    // CallbackPtr. NOTE(review): presumably the handle to pass to
    // UnregisterListener() later -- confirm.
271  template<class DelegateT, class ClosureDataT>
272  CallbackPtr RegisterListener(
274  method,
275  DelegateT *delegate,
276  ClosureDataT data);
277 
    // Overload taking a delegate object but no closure data.
288  template<class DelegateT>
289  CallbackPtr RegisterListener(
291  DelegateT *delegate);
292 
    // Overload taking a Callback<ParamT>::CallbackFunction.
301  CallbackPtr RegisterListener(typename Callback<ParamT>::CallbackFunction fn);
302 
    // NOTE(review): judging by the names, the first removes one callback and
    // the second removes all of them -- confirm in the implementation.
309  void UnregisterListener(CallbackPtr callback_object);
310 
314  void UnregisterListeners();
315 
316  protected:
317  Observable(); // don't instantiate this as a stand alone object
318 
    // Protected overload that takes an already existing callback object.
319  void RegisterListener(CallbackPtr callback_object);
320 
    // NOTE(review): presumably invokes the registered callbacks with
    // `parameter` -- confirm in the implementation.
328  void NotifyListeners(const ParamT &parameter);
329 
330  private:
    // mutable, so it can also be locked from const member functions
332  mutable pthread_rwlock_t listeners_rw_lock_;
334 };
335 
336 
337 //
338 // -----------------------------------------------------------------------------
339 //
340 
341 
// Declaration only; according to the cross-reference index the definition is
// in concurrency.cc. CVMFS_EXPORT is a macro from util/export.h.
348 CVMFS_EXPORT unsigned int GetNumberOfCpuCores();
// NOTE(review): presumably the value reported when the number of cores cannot
// be determined -- confirm in concurrency.cc. As a namespace-scope
// `static const` in a header, every translation unit gets its own copy.
349 static const unsigned int kFallbackNumberOfCpus = 1;
350 
351 
356  public:
// Interface of the Signal class: declarations only, no implementation is
// visible in this listing.
// NOTE(review): the notes below are inferred from the member names and types
// -- confirm against the implementation.
357  Signal();
358  ~Signal();
359  void Wakeup();  // presumably wakes a thread blocked in Wait()
360  void Wait();  // presumably blocks until Wakeup() is called
361  bool IsSleeping();
362 
363  private:
364  bool fired_;  // presumably records that Wakeup() has been called
365  pthread_mutex_t lock_;
366  pthread_cond_t signal_;
367 };
368 
369 
370 //
371 // -----------------------------------------------------------------------------
372 //
373 
374 
// Queue type built on std::queue<T> through protected inheritance, so the
// std::queue interface is not part of this class's public API. It carries a
// mutex and two condition variables; all member functions are only declared
// here.
// NOTE(review): the names suggest a bounded, blocking producer/consumer queue
// -- confirm in the implementation.
382 template<class T>
383 class FifoChannel : protected std::queue<T> {
384  public:
    // NOTE(review): maximal_length presumably initializes
    // maximal_queue_length_; the cross-reference index also lists a member
    // queue_drainout_threshold_ (header line 426) that is absent from this
    // listing.
392  FifoChannel(const size_t maximal_length, const size_t drainout_threshold);
393  virtual ~FifoChannel();
394 
    // Takes the item by const reference.
402  void Enqueue(const T &data);
403 
    // Returns the item by value.
410  const T Dequeue();
411 
    // NOTE(review): presumably discards queued items and returns how many
    // were dropped -- confirm.
417  unsigned int Drop();
418 
419  inline size_t GetItemCount() const;
420  inline bool IsEmpty() const;
421  inline size_t GetMaximalItemCount() const;
422 
423  private:
424  // general configuration
425  const size_t maximal_queue_length_;
427 
428  // thread synchronisation structures
    // all three are mutable, so const member functions can use them as well
429  mutable pthread_mutex_t mutex_;
430  mutable pthread_cond_t queue_is_not_empty_;
431  mutable pthread_cond_t queue_is_not_full_;
432 };
433 
434 
// Manages a configurable number of worker threads plus one callback thread
// (callback_thread_) for a worker type WorkerT. WorkerT has to provide the
// nested types expected_data, returned_data and worker_context. The class
// derives from Observable<WorkerT::returned_data>.
// NOTE(review): a number of lines of the original header are absent from this
// listing (parts of RunBinding / WorkerRunBinding as well as the
// worker_threads_, jobs_queue_, jobs_pending_, jobs_failed_, jobs_processed_
// and results_queue_ members named in the cross-reference index) -- consult
// the real header before editing.
453 template<class WorkerT>
454 class ConcurrentWorkers : public Observable<typename WorkerT::returned_data> {
455  public:
456  // these data types must be defined by the worker class
460  typedef typename WorkerT::expected_data expected_data_t;
464  typedef typename WorkerT::returned_data returned_data_t;
468  typedef typename WorkerT::worker_context worker_context_t;
469 
470  protected:
471  typedef std::vector<pthread_t> WorkerThreads;
472 
    // Immutable pairing of a payload with a "death sentence" flag.
    // The one-argument constructor copies the payload and clears the flag;
    // the default constructor value-initializes the payload and sets the flag.
478  template<class DataT>
479  struct Job {
480  explicit Job(const DataT &data) : data(data), is_death_sentence(false) { }
481  Job() : data(), is_death_sentence(true) { }
482  const DataT data;
483  const bool is_death_sentence;
484  };
485  typedef Job<expected_data_t> WorkerJob;
486  typedef Job<returned_data_t> CallbackJob;
487 
    // RunBinding stores a pointer to its ConcurrentWorkers object (delegate).
    // The constructor fragment at listing lines 504-505 (WorkerRunBinding
    // according to the cross-reference index) forwards delegate to RunBinding
    // and additionally stores a worker_context pointer.
495  struct RunBinding {
497  : delegate(delegate) { }
499  };
501 
504  const worker_context_t *worker_context)
505  : RunBinding(delegate), worker_context(worker_context) { }
510  };
511 
512  public:
    // worker_context is optional and defaults to NULL.
523  ConcurrentWorkers(const size_t number_of_workers,
524  const size_t maximal_queue_length,
525  worker_context_t *worker_context = NULL);
526  virtual ~ConcurrentWorkers();
527 
534  bool Initialize();
535 
    // Public entry point for new work: wraps data in a WorkerJob (death
    // sentence flag cleared) and forwards it to the protected
    // Schedule(WorkerJob) overload.
543  inline void Schedule(const expected_data_t &data) {
544  Schedule(WorkerJob(data));
545  }
546 
554  void Terminate();
555 
562  void WaitForEmptyQueue() const;
563 
571  void WaitForTermination();
572 
    // number_of_workers_ is a size_t; it is implicitly converted to the
    // unsigned int return type.
573  inline unsigned int GetNumberOfWorkers() const { return number_of_workers_; }
    // Reads jobs_failed_ through atomic_read32().
574  inline unsigned int GetNumberOfFailedJobs() const {
575  return atomic_read32(&jobs_failed_);
576  }
577 
    // JobSuccessful() and JobFailed() both forward to JobDone() and differ
    // only in the success flag they pass (true resp. false).
584  inline void JobSuccessful(const returned_data_t &data) {
585  JobDone(data, true);
586  }
587 
598  inline void JobFailed(const returned_data_t &data) { JobDone(data, false); }
599 
600  void RunCallbackThread();
601 
602  protected:
603  bool SpawnWorkers();
604 
    // Static functions with the void *(*)(void *) signature that
    // pthread_create() expects for a thread start routine.
613  static void *RunWorker(void *run_binding);
614 
615  static void *RunCallbackThreadWrapper(void *run_binding);
616 
621  void ReportStartedWorker() const;
622 
623  void Schedule(WorkerJob job);
624  void ScheduleDeathSentences();
625 
632  void TruncateJobQueue(const bool forget_pending = false);
633 
641  inline WorkerJob Acquire();
642 
650  void JobDone(const returned_data_t &data, const bool success = true);
651 
    // The three functions below read or write running_ only while a
    // MutexLockGuard on status_mutex_ is in scope.
652  inline void StartRunning() {
653  MutexLockGuard guard(status_mutex_);
654  running_ = true;
655  }
656  inline void StopRunning() {
657  MutexLockGuard guard(status_mutex_);
658  running_ = false;
659  }
660  inline bool IsRunning() const {
661  MutexLockGuard guard(status_mutex_);
662  return running_;
663  }
664 
665  private:
666  // general configuration
667  const size_t number_of_workers_;
668  const worker_context_t *worker_context_;
669 
672  WorkerRunBinding thread_context_;
673 
674  // status information
676  bool running_;
    // the synchronisation members are mutable so that const member functions
    // (e.g. IsRunning()) can use them
677  mutable unsigned int workers_started_;
678  mutable pthread_mutex_t status_mutex_;
679  mutable pthread_cond_t worker_started_;
680  mutable pthread_mutex_t jobs_all_done_mutex_;
681  mutable pthread_cond_t jobs_all_done_;
682 
683  // worker threads
685  pthread_t callback_thread_;
686 
687  // job queue
693 
694  // callback channel
697 };
698 
699 
743 template<class DerivedWorkerT>
745  public:
// Virtual destructor with an empty body.
746  virtual ~ConcurrentWorker() { }
747 
// Virtual hook whose default implementation does nothing and reports success
// (returns true). NOTE(review): presumably called for each worker before it
// starts processing jobs -- confirm in concurrency_impl.h.
754  virtual bool Initialize() { return true; }
755 
// Virtual hook whose default implementation does nothing.
760  virtual void TearDown() { }
761 
773  // void operator()(const expected_data &data); // do the actual job of the
774  // worker
775 
776  protected:
// Protected constructor: the class can only be constructed through a derived
// class. master_ starts out as NULL.
777  ConcurrentWorker() : master_(NULL) { }
778 
// Returns the stored master_ pointer, which is NULL until it has been set
// (the class declares ConcurrentWorkers<DerivedWorkerT> a friend).
784  inline ConcurrentWorkers<DerivedWorkerT> *master() const { return master_; }
785 
786  private:
787  friend class ConcurrentWorkers<DerivedWorkerT>;
789  master_ = master;
790  }
791 
792  private:
794 };
795 
796 #ifdef CVMFS_NAMESPACE_GUARD
797 } // namespace CVMFS_NAMESPACE_GUARD
798 #endif
799 
800 #include "util/concurrency_impl.h"
801 
802 #endif // CVMFS_UTIL_CONCURRENCY_H_
JobQueue jobs_queue_
Definition: concurrency.h:689
Job< returned_data_t > CallbackJob
Definition: concurrency.h:486
T maximal_value() const
Definition: concurrency.h:184
void RegisterMaster(ConcurrentWorkers< DerivedWorkerT > *master)
Definition: concurrency.h:788
SynchronizingCounter< T > & operator=(const T &other)
Definition: concurrency.h:196
Callbackable< ParamT >::CallbackTN * CallbackPtr
Definition: concurrency.h:250
pthread_t callback_thread_
handles callback invokes
Definition: concurrency.h:685
virtual bool Initialize()
Definition: concurrency.h:754
int64_t atomic_int64
Definition: atomic.h:18
pthread_mutex_t mutex_
Definition: concurrency.h:214
pthread_cond_t queue_is_not_empty_
Definition: concurrency.h:430
const size_t queue_drainout_threshold_
Definition: concurrency.h:426
~Channel()
Definition: concurrency.h:40
WorkerT::returned_data returned_data_t
Definition: concurrency.h:464
ConcurrentWorkers< DerivedWorkerT > * master() const
Definition: concurrency.h:784
ItemT * PopFront()
Definition: concurrency.h:83
int TryLock() const
Definition: concurrency.h:117
RunBinding(ConcurrentWorkers< WorkerT > *delegate)
Definition: concurrency.h:496
SynchronizingCounter(const T maximal_value)
Definition: concurrency.h:154
pthread_cond_t became_zero_
Definition: concurrency.h:215
pthread_cond_t signal_
Definition: concurrency.h:366
#define CVMFS_EXPORT
Definition: export.h:11
atomic_int32 jobs_pending_
Definition: concurrency.h:690
unsigned int GetNumberOfCpuCores()
Definition: concurrency.cc:18
unsigned int GetNumberOfWorkers() const
Definition: concurrency.h:573
pthread_cond_t cond_populated_
Definition: concurrency.h:104
Callbacks listeners_
Definition: concurrency.h:331
assert((mem||(size==0))&&"Out Of Memory")
WorkerThreads worker_threads_
list of worker threads
Definition: concurrency.h:684
const T operator--(int)
Definition: concurrency.h:189
bool HasMaximalValue() const
Definition: concurrency.h:183
WorkerT::worker_context worker_context_t
Definition: concurrency.h:468
void JobSuccessful(const returned_data_t &data)
Definition: concurrency.h:584
pthread_cond_t jobs_all_done_
Definition: concurrency.h:681
Definition: async.h:45
void Schedule(const expected_data_t &data)
Definition: concurrency.h:543
bool fired_
Definition: concurrency.h:364
virtual ~ConcurrentWorker()
Definition: concurrency.h:746
int32_t atomic_int32
Definition: atomic.h:17
virtual void TearDown()
Definition: concurrency.h:760
pthread_cond_t worker_started_
Definition: concurrency.h:679
void Lock() const
Definition: concurrency.h:116
const worker_context_t * worker_context
Definition: concurrency.h:509
const worker_context_t * worker_context_
the WorkerT defined context
Definition: concurrency.h:668
std::vector< pthread_t > WorkerThreads
Definition: concurrency.h:471
FifoChannel< CallbackJob > CallbackQueue
Definition: concurrency.h:695
atomic_int32 jobs_failed_
Definition: concurrency.h:691
const bool is_death_sentence
death sentence flag
Definition: concurrency.h:483
pthread_mutex_t mutex_
Definition: concurrency.h:127
Job< expected_data_t > WorkerJob
Definition: concurrency.h:485
pthread_mutex_t status_mutex_
Definition: concurrency.h:678
FifoChannel< WorkerJob > JobQueue
Definition: concurrency.h:688
pthread_cond_t queue_is_not_full_
Definition: concurrency.h:431
WorkerRunBinding thread_context_
Definition: concurrency.h:672
static const unsigned int kFallbackNumberOfCpus
Definition: concurrency.h:349
const T operator++(int)
Definition: concurrency.h:187
virtual ~Lockable()
Definition: concurrency.h:114
const size_t maximal_queue_length_
Definition: concurrency.h:425
WorkerT::expected_data expected_data_t
Definition: concurrency.h:460
pthread_mutex_t mutex_
Definition: concurrency.h:429
std::vector< ItemT * > * StartEnqueueing()
Definition: concurrency.h:48
CallbackQueue results_queue_
Definition: concurrency.h:696
std::vector< ItemT * > items_
Definition: concurrency.h:96
pthread_cond_t free_slot_
Definition: concurrency.h:216
unsigned int workers_started_
Definition: concurrency.h:677
pthread_mutex_t lock_
Definition: concurrency.h:100
const DataT data
job payload
Definition: concurrency.h:482
void Unlock() const
Definition: concurrency.h:118
Job(const DataT &data)
Definition: concurrency.h:480
void WaitForZero() const
Definition: concurrency.h:175
ConcurrentWorkers< WorkerT > * delegate
Definition: concurrency.h:498
void UnregisterListener(ListenerHandle *handle)
Definition: mutex.h:42
atomic_int64 jobs_processed_
Definition: concurrency.h:692
unsigned int GetNumberOfFailedJobs() const
Definition: concurrency.h:574
const size_t number_of_workers_
number of concurrent worker threads
Definition: concurrency.h:667
void CommitEnqueueing()
Definition: concurrency.h:65
void PushBack(ItemT *item)
Definition: concurrency.h:72
void JobFailed(const returned_data_t &data)
Definition: concurrency.h:598
std::set< CallbackPtr > Callbacks
Definition: concurrency.h:253
pthread_mutex_t jobs_all_done_mutex_
Definition: concurrency.h:680
ConcurrentWorkers< DerivedWorkerT > * master_
Definition: concurrency.h:793
WorkerRunBinding(ConcurrentWorkers< WorkerT > *delegate, const worker_context_t *worker_context)
Definition: concurrency.h:503
void AbortEnqueueing()
Definition: concurrency.h:57
bool IsRunning() const
Definition: concurrency.h:660
pthread_mutex_t lock_
Definition: concurrency.h:365