CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
concurrency.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_UTIL_CONCURRENCY_H_
6 #define CVMFS_UTIL_CONCURRENCY_H_
7 
8 #include <pthread.h>
9 
10 #include <cassert>
11 #include <queue>
12 #include <set>
13 #include <vector>
14 
15 #include "util/async.h"
16 #include "util/atomic.h"
17 #include "util/export.h"
18 #include "util/mutex.h"
19 #include "util/single_copy.h"
20 
21 #ifdef CVMFS_NAMESPACE_GUARD
22 namespace CVMFS_NAMESPACE_GUARD {
23 #endif
24 
31  public:
32  inline virtual ~Lockable() { pthread_mutex_destroy(&mutex_); }
33 
34  void Lock() const { pthread_mutex_lock(&mutex_); }
35  int TryLock() const { return pthread_mutex_trylock(&mutex_); }
36  void Unlock() const { pthread_mutex_unlock(&mutex_); }
37 
38  protected:
40  const int retval = pthread_mutex_init(&mutex_, NULL);
41  assert(retval == 0);
42  }
43 
44  private:
45  mutable pthread_mutex_t mutex_;
46 };
47 
48 
49 //
50 // -----------------------------------------------------------------------------
51 //
52 
53 
67 template <typename T>
69  public:
71  value_(T(0)), maximal_value_(T(0)) { Initialize(); }
72 
73  explicit SynchronizingCounter(const T maximal_value)
74  : value_(T(0))
75  , maximal_value_(maximal_value)
76  {
77  assert(maximal_value > T(0));
78  Initialize();
79  }
80 
81  ~SynchronizingCounter() { Destroy(); }
82 
83  T Increment() {
84  MutexLockGuard l(mutex_);
85  WaitForFreeSlotUnprotected();
86  SetValueUnprotected(value_ + T(1));
87  return value_;
88  }
89 
90  T Decrement() {
91  MutexLockGuard l(mutex_);
92  SetValueUnprotected(value_ - T(1));
93  return value_;
94  }
95 
96  void WaitForZero() const {
97  MutexLockGuard l(mutex_);
98  while (value_ != T(0)) {
99  pthread_cond_wait(&became_zero_, &mutex_);
100  }
101  assert(value_ == T(0));
102  }
103 
104  bool HasMaximalValue() const { return maximal_value_ != T(0); }
105  T maximal_value() const { return maximal_value_; }
106 
107  T operator++() { return Increment(); }
108  const T operator++(int) { return Increment() - T(1); }
109  T operator--() { return Decrement(); }
110  const T operator--(int) { return Decrement() + T(1); }
111 
112  T Get() const {
113  MutexLockGuard l(mutex_);
114  return value_;
115  }
116 
118  MutexLockGuard l(mutex_);
119  SetValueUnprotected(other);
120  return *this;
121  }
122 
123  protected:
124  void SetValueUnprotected(const T new_value);
125  void WaitForFreeSlotUnprotected();
126 
127  private:
128  void Initialize();
129  void Destroy();
130 
131  private:
133  const T maximal_value_;
134 
135  mutable pthread_mutex_t mutex_;
136  mutable pthread_cond_t became_zero_;
137  pthread_cond_t free_slot_;
138 };
139 
140 
141 //
142 // -----------------------------------------------------------------------------
143 //
144 
145 
146 template <typename ParamT>
148 
149 
168 template <typename ParamT>
169 class Observable : public Callbackable<ParamT>,
170  SingleCopy {
171  public:
173  protected:
174  typedef std::set<CallbackPtr> Callbacks;
175 
176  public:
177  virtual ~Observable();
178 
192  template <class DelegateT, class ClosureDataT>
193  CallbackPtr RegisterListener(
194  typename BoundClosure<ParamT,
195  DelegateT,
196  ClosureDataT>::CallbackMethod method,
197  DelegateT *delegate,
198  ClosureDataT data);
199 
210  template <class DelegateT>
211  CallbackPtr RegisterListener(
213  DelegateT *delegate);
214 
223  CallbackPtr RegisterListener(typename Callback<ParamT>::CallbackFunction fn);
224 
231  void UnregisterListener(CallbackPtr callback_object);
232 
236  void UnregisterListeners();
237 
238  protected:
239  Observable(); // don't instantiate this as a stand alone object
240 
241  void RegisterListener(CallbackPtr callback_object);
242 
250  void NotifyListeners(const ParamT &parameter);
251 
252  private:
254  mutable pthread_rwlock_t listeners_rw_lock_;
256 };
257 
258 
259 //
260 // -----------------------------------------------------------------------------
261 //
262 
263 
270 CVMFS_EXPORT unsigned int GetNumberOfCpuCores();
271 static const unsigned int kFallbackNumberOfCpus = 1;
272 
273 
278  public:
279  Signal();
280  ~Signal();
281  void Wakeup();
282  void Wait();
283  bool IsSleeping();
284 
285  private:
286  bool fired_;
287  pthread_mutex_t lock_;
288  pthread_cond_t signal_;
289 };
290 
291 
292 //
293 // -----------------------------------------------------------------------------
294 //
295 
296 
304 template <class T>
305 class FifoChannel : protected std::queue<T> {
306  public:
314  FifoChannel(const size_t maximal_length,
315  const size_t drainout_threshold);
316  virtual ~FifoChannel();
317 
325  void Enqueue(const T &data);
326 
333  const T Dequeue();
334 
340  unsigned int Drop();
341 
342  inline size_t GetItemCount() const;
343  inline bool IsEmpty() const;
344  inline size_t GetMaximalItemCount() const;
345 
346  private:
347  // general configuration
348  const size_t maximal_queue_length_;
350 
351  // thread synchronisation structures
352  mutable pthread_mutex_t mutex_;
353  mutable pthread_cond_t queue_is_not_empty_;
354  mutable pthread_cond_t queue_is_not_full_;
355 };
356 
357 
376 template <class WorkerT>
377 class ConcurrentWorkers : public Observable<typename WorkerT::returned_data> {
378  public:
379  // these data types must be defined by the worker class
383  typedef typename WorkerT::expected_data expected_data_t;
387  typedef typename WorkerT::returned_data returned_data_t;
391  typedef typename WorkerT::worker_context worker_context_t;
392 
393  protected:
394  typedef std::vector<pthread_t> WorkerThreads;
395 
401  template <class DataT>
402  struct Job {
403  explicit Job(const DataT &data) :
404  data(data),
405  is_death_sentence(false) {}
406  Job() :
407  data(),
408  is_death_sentence(true) {}
409  const DataT data;
410  const bool is_death_sentence;
411  };
412  typedef Job<expected_data_t> WorkerJob;
413  typedef Job<returned_data_t> CallbackJob;
414 
422  struct RunBinding {
424  delegate(delegate) {}
426  };
428 
431  const worker_context_t *worker_context) :
432  RunBinding(delegate),
433  worker_context(worker_context) {}
438  };
439 
440  public:
450  ConcurrentWorkers(const size_t number_of_workers,
451  const size_t maximal_queue_length,
452  worker_context_t *worker_context = NULL);
453  virtual ~ConcurrentWorkers();
454 
461  bool Initialize();
462 
469  inline void Schedule(const expected_data_t &data) {
470  Schedule(WorkerJob(data));
471  }
472 
480  void Terminate();
481 
488  void WaitForEmptyQueue() const;
489 
497  void WaitForTermination();
498 
499  inline unsigned int GetNumberOfWorkers() const { return number_of_workers_; }
500  inline unsigned int GetNumberOfFailedJobs() const {
501  return atomic_read32(&jobs_failed_);
502  }
503 
510  inline void JobSuccessful(const returned_data_t& data) {
511  JobDone(data, true);
512  }
513 
524  inline void JobFailed(const returned_data_t& data) { JobDone(data, false); }
525 
526  void RunCallbackThread();
527 
528  protected:
529  bool SpawnWorkers();
530 
539  static void* RunWorker(void *run_binding);
540 
541  static void* RunCallbackThreadWrapper(void *run_binding);
542 
547  void ReportStartedWorker() const;
548 
549  void Schedule(WorkerJob job);
550  void ScheduleDeathSentences();
551 
557  void TruncateJobQueue(const bool forget_pending = false);
558 
566  inline WorkerJob Acquire();
567 
575  void JobDone(const returned_data_t& data, const bool success = true);
576 
577  inline void StartRunning() {
578  MutexLockGuard guard(status_mutex_);
579  running_ = true;
580  }
581  inline void StopRunning() {
582  MutexLockGuard guard(status_mutex_);
583  running_ = false;
584  }
585  inline bool IsRunning() const {
586  MutexLockGuard guard(status_mutex_);
587  return running_;
588  }
589 
590  private:
591  // general configuration
592  const size_t number_of_workers_;
593  const worker_context_t *worker_context_;
594 
597  WorkerRunBinding thread_context_;
598 
599  // status information
601  bool running_;
602  mutable unsigned int workers_started_;
603  mutable pthread_mutex_t status_mutex_;
604  mutable pthread_cond_t worker_started_;
605  mutable pthread_mutex_t jobs_all_done_mutex_;
606  mutable pthread_cond_t jobs_all_done_;
607 
608  // worker threads
610  pthread_t callback_thread_;
611 
612  // job queue
618 
619  // callback channel
622 };
623 
624 
668 template <class DerivedWorkerT>
670  public:
671  virtual ~ConcurrentWorker() {}
672 
679  virtual bool Initialize() { return true; }
680 
685  virtual void TearDown() {}
686 
698  // void operator()(const expected_data &data); // do the actual job of the
699  // worker
700 
701  protected:
702  ConcurrentWorker() : master_(NULL) {}
703 
709  inline ConcurrentWorkers<DerivedWorkerT>* master() const { return master_; }
710 
711  private:
712  friend class ConcurrentWorkers<DerivedWorkerT>;
714  master_ = master;
715  }
716 
717  private:
719 };
720 
721 #ifdef CVMFS_NAMESPACE_GUARD
722 } // namespace CVMFS_NAMESPACE_GUARD
723 #endif
724 
725 #include "util/concurrency_impl.h"
726 
727 #endif // CVMFS_UTIL_CONCURRENCY_H_
JobQueue jobs_queue_
Definition: concurrency.h:614
Job< returned_data_t > CallbackJob
Definition: concurrency.h:413
T maximal_value() const
Definition: concurrency.h:105
void RegisterMaster(ConcurrentWorkers< DerivedWorkerT > *master)
Definition: concurrency.h:713
SynchronizingCounter< T > & operator=(const T &other)
Definition: concurrency.h:117
Callbackable< ParamT >::CallbackTN * CallbackPtr
Definition: concurrency.h:172
FifoChannel< WorkerJob > JobQueue
Definition: concurrency.h:613
pthread_t callback_thread_
handles callback invokes
Definition: concurrency.h:610
virtual bool Initialize()
Definition: concurrency.h:679
int64_t atomic_int64
Definition: atomic.h:18
pthread_mutex_t mutex_
Definition: concurrency.h:135
pthread_cond_t queue_is_not_empty_
Definition: concurrency.h:353
const size_t queue_drainout_threshold_
Definition: concurrency.h:349
WorkerT::returned_data returned_data_t
Definition: concurrency.h:387
ConcurrentWorkers< DerivedWorkerT > * master() const
Definition: concurrency.h:709
int TryLock() const
Definition: concurrency.h:35
RunBinding(ConcurrentWorkers< WorkerT > *delegate)
Definition: concurrency.h:423
SynchronizingCounter(const T maximal_value)
Definition: concurrency.h:73
pthread_cond_t became_zero_
Definition: concurrency.h:136
pthread_cond_t signal_
Definition: concurrency.h:288
#define CVMFS_EXPORT
Definition: export.h:11
atomic_int32 jobs_pending_
Definition: concurrency.h:615
unsigned int GetNumberOfCpuCores()
Definition: concurrency.cc:18
unsigned int GetNumberOfWorkers() const
Definition: concurrency.h:499
Callbacks listeners_
Definition: concurrency.h:253
assert((mem||(size==0))&&"Out Of Memory")
WorkerThreads worker_threads_
list of worker threads
Definition: concurrency.h:609
FifoChannel< CallbackJob > CallbackQueue
Definition: concurrency.h:620
const T operator--(int)
Definition: concurrency.h:110
bool HasMaximalValue() const
Definition: concurrency.h:104
WorkerT::worker_context worker_context_t
Definition: concurrency.h:391
void JobSuccessful(const returned_data_t &data)
Definition: concurrency.h:510
pthread_cond_t jobs_all_done_
Definition: concurrency.h:606
Definition: async.h:45
void Schedule(const expected_data_t &data)
Definition: concurrency.h:469
bool fired_
Definition: concurrency.h:286
virtual ~ConcurrentWorker()
Definition: concurrency.h:671
int32_t atomic_int32
Definition: atomic.h:17
virtual void TearDown()
Definition: concurrency.h:685
pthread_cond_t worker_started_
Definition: concurrency.h:604
void Lock() const
Definition: concurrency.h:34
const worker_context_t * worker_context
Definition: concurrency.h:437
const worker_context_t * worker_context_
the WorkerT defined context
Definition: concurrency.h:593
std::vector< pthread_t > WorkerThreads
Definition: concurrency.h:394
atomic_int32 jobs_failed_
Definition: concurrency.h:616
const bool is_death_sentence
death sentence flag
Definition: concurrency.h:410
pthread_mutex_t mutex_
Definition: concurrency.h:45
Job< expected_data_t > WorkerJob
Definition: concurrency.h:412
pthread_mutex_t status_mutex_
Definition: concurrency.h:603
pthread_cond_t queue_is_not_full_
Definition: concurrency.h:354
WorkerRunBinding thread_context_
Definition: concurrency.h:597
static const unsigned int kFallbackNumberOfCpus
Definition: concurrency.h:271
const T operator++(int)
Definition: concurrency.h:108
virtual ~Lockable()
Definition: concurrency.h:32
const size_t maximal_queue_length_
Definition: concurrency.h:348
WorkerT::expected_data expected_data_t
Definition: concurrency.h:383
pthread_mutex_t mutex_
Definition: concurrency.h:352
CallbackQueue results_queue_
Definition: concurrency.h:621
pthread_cond_t free_slot_
Definition: concurrency.h:137
unsigned int workers_started_
Definition: concurrency.h:602
const DataT data
job payload
Definition: concurrency.h:409
void Unlock() const
Definition: concurrency.h:36
Job(const DataT &data)
Definition: concurrency.h:403
void WaitForZero() const
Definition: concurrency.h:96
ConcurrentWorkers< WorkerT > * delegate
Definition: concurrency.h:425
void UnregisterListener(ListenerHandle *handle)
Definition: mutex.h:42
atomic_int64 jobs_processed_
Definition: concurrency.h:617
unsigned int GetNumberOfFailedJobs() const
Definition: concurrency.h:500
const size_t number_of_workers_
number of concurrent worker threads
Definition: concurrency.h:592
void JobFailed(const returned_data_t &data)
Definition: concurrency.h:524
std::set< CallbackPtr > Callbacks
Definition: concurrency.h:174
pthread_mutex_t jobs_all_done_mutex_
Definition: concurrency.h:605
ConcurrentWorkers< DerivedWorkerT > * master_
Definition: concurrency.h:718
WorkerRunBinding(ConcurrentWorkers< WorkerT > *delegate, const worker_context_t *worker_context)
Definition: concurrency.h:430
bool IsRunning() const
Definition: concurrency.h:585
pthread_mutex_t lock_
Definition: concurrency.h:287