CernVM-FS  2.11.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
concurrency.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_UTIL_CONCURRENCY_H_
6 #define CVMFS_UTIL_CONCURRENCY_H_
7 
8 #include <pthread.h>
9 
10 #include <cassert>
11 #include <queue>
12 #include <set>
13 #include <vector>
14 
15 #include "util/async.h"
16 #include "util/atomic.h"
17 #include "util/export.h"
18 #include "util/mutex.h"
19 #include "util/single_copy.h"
20 
21 #ifdef CVMFS_NAMESPACE_GUARD
22 namespace CVMFS_NAMESPACE_GUARD {
23 #endif
24 
31  public:
32  inline virtual ~Lockable() { pthread_mutex_destroy(&mutex_); }  // Destroys mutex_; the return code of pthread_mutex_destroy() is not checked.
33 
34  void Lock() const { pthread_mutex_lock(&mutex_); }  // Acquires mutex_ (blocking); callable on const objects because mutex_ is mutable. The pthread return code is discarded.
35  int TryLock() const { return pthread_mutex_trylock(&mutex_); }  // Non-blocking attempt; forwards the raw pthread_mutex_trylock() result: 0 on success, an error number (e.g. EBUSY) otherwise -- not a bool.
36  void Unlock() const { pthread_mutex_unlock(&mutex_); }  // Releases mutex_; the pthread return code is discarded.
37 
38  protected:
40  const int retval = pthread_mutex_init(&mutex_, NULL);
41  assert(retval == 0);
42  }
43 
44  private:
45  mutable pthread_mutex_t mutex_;
46 };
47 
48 
49 //
50 // -----------------------------------------------------------------------------
51 //
52 
53 
66 template <typename T>
67 class Future : SingleCopy {  // Holder for a value of type T with Set()/Get() accessors plus a mutex and a condition variable; derives privately from SingleCopy (class default access).
68  public:
69  Future();
70  virtual ~Future();
71 
77  void Set(const T &object);  // Only declared here; the body is not part of this listing (presumably in util/concurrency_impl.h, which the header includes at its end -- TODO confirm).
78 
84  T& Get();  // Returns a mutable reference. NOTE(review): whether this blocks until Set() has been called cannot be seen from the declaration -- confirm in the implementation.
85  const T& Get() const;  // Const overload of Get().
86 
87  protected:
88  void Wait() const;  // Const method: of the members listed below it can only modify the ones declared mutable.
89 
90  private:
92  mutable pthread_mutex_t mutex_;  // mutable, hence usable from the const methods (Get() const, Wait())
93  mutable pthread_cond_t object_set_;  // condition variable; judging by the name it is signalled once the object has been set -- TODO confirm
95 };
96 
97 
98 //
99 // -----------------------------------------------------------------------------
100 //
101 
102 
116 template <typename T>
118  public:
120  value_(T(0)), maximal_value_(T(0)) { Initialize(); }
121 
122  explicit SynchronizingCounter(const T maximal_value)  // Starts the counter at T(0) and records maximal_value as the limit reported by maximal_value().
123  : value_(T(0))
124  , maximal_value_(maximal_value)
125  {
126  assert(maximal_value > T(0));  // T(0) is rejected here because HasMaximalValue() treats maximal_value_ == T(0) as "no limit"; the check disappears when NDEBUG is defined
127  Initialize();  // declared further down; its body is not part of this listing
128  }
129 
130  ~SynchronizingCounter() { Destroy(); }  // Delegates all cleanup to Destroy(), whose body is not part of this listing.
131 
132  T Increment() {  // Raises the counter by one while holding mutex_ and returns value_ as left by SetValueUnprotected().
133  MutexLockGuard l(mutex_);  // scoped lock on mutex_ for the rest of the function
134  WaitForFreeSlotUnprotected();  // called with mutex_ held; body not shown here -- presumably blocks while the counter sits at maximal_value_ (TODO confirm)
135  SetValueUnprotected(value_ + T(1));
136  return value_;  // read before the guard goes out of scope
137  }
138 
139  T Decrement() {  // Lowers the counter by one while holding mutex_ and returns value_ as left by SetValueUnprotected().
140  MutexLockGuard l(mutex_);  // scoped lock on mutex_ for the rest of the function
141  SetValueUnprotected(value_ - T(1));  // no guard against going below T(0) is visible in this method
142  return value_;  // read before the guard goes out of scope
143  }
144 
145  void WaitForZero() const {  // Blocks the calling thread until value_ equals T(0).
146  MutexLockGuard l(mutex_);  // mutex_ must be held around pthread_cond_wait()
147  while (value_ != T(0)) {  // re-check after every wakeup: pthread_cond_wait() may return spuriously
148  pthread_cond_wait(&became_zero_, &mutex_);  // releases mutex_ while waiting and re-acquires it before returning
149  }
150  assert(value_ == T(0));  // follows from the loop condition above
151  }
152 
153  bool HasMaximalValue() const { return maximal_value_ != T(0); }  // True when a non-zero limit is stored; T(0) is what the default constructor stores.
154  T maximal_value() const { return maximal_value_; }  // Plain accessor; takes no lock, maximal_value_ is const.
155 
156  T operator++() { return Increment(); }  // Prefix form: yields whatever Increment() returns.
157  const T operator++(int) { return Increment() - T(1); }  // Postfix form: derives the pre-increment value from Increment()'s result.
158  T operator--() { return Decrement(); }  // Prefix form: yields whatever Decrement() returns.
159  const T operator--(int) { return Decrement() + T(1); }  // Postfix form: derives the pre-decrement value from Decrement()'s result.
160 
161  T Get() const {  // Returns a copy of value_ read while holding mutex_; the copy can be outdated as soon as the lock is released.
162  MutexLockGuard l(mutex_);  // possible in a const method because mutex_ is mutable
163  return value_;
164  }
165 
167  MutexLockGuard l(mutex_);
168  SetValueUnprotected(other);
169  return *this;
170  }
171 
172  protected:
173  void SetValueUnprotected(const T new_value);
174  void WaitForFreeSlotUnprotected();
175 
176  private:
177  void Initialize();
178  void Destroy();
179 
180  private:
182  const T maximal_value_;
183 
184  mutable pthread_mutex_t mutex_;
185  mutable pthread_cond_t became_zero_;
186  pthread_cond_t free_slot_;
187 };
188 
189 
190 //
191 // -----------------------------------------------------------------------------
192 //
193 
194 
195 template <typename ParamT>
197 
198 
217 template <typename ParamT>
218 class Observable : public Callbackable<ParamT>,
219  SingleCopy {
220  public:
222  protected:
223  typedef std::set<CallbackPtr> Callbacks;
224 
225  public:
226  virtual ~Observable();
227 
241  template <class DelegateT, class ClosureDataT>
242  CallbackPtr RegisterListener(
243  typename BoundClosure<ParamT,
244  DelegateT,
245  ClosureDataT>::CallbackMethod method,
246  DelegateT *delegate,
247  ClosureDataT data);
248 
259  template <class DelegateT>
260  CallbackPtr RegisterListener(
262  DelegateT *delegate);
263 
272  CallbackPtr RegisterListener(typename Callback<ParamT>::CallbackFunction fn);
273 
280  void UnregisterListener(CallbackPtr callback_object);
281 
285  void UnregisterListeners();
286 
287  protected:
288  Observable(); // don't instantiate this as a stand alone object
289 
290  void RegisterListener(CallbackPtr callback_object);
291 
299  void NotifyListeners(const ParamT &parameter);
300 
301  private:
303  mutable pthread_rwlock_t listeners_rw_lock_;
305 };
306 
307 
308 //
309 // -----------------------------------------------------------------------------
310 //
311 
312 
319 CVMFS_EXPORT unsigned int GetNumberOfCpuCores();  // Declaration only; per the page's definition index the body lives in concurrency.cc and CVMFS_EXPORT is defined in export.h.
320 static const unsigned int kFallbackNumberOfCpus = 1;  // Presumably the value used when the core count cannot be determined -- TODO confirm in concurrency.cc. Being static const in a header, each translation unit gets its own copy.
321 
322 
327  public:
328  Signal();
329  ~Signal();
330  void Wakeup();
331  void Wait();
332  bool IsSleeping();
333 
334  private:
335  bool fired_;
336  pthread_mutex_t lock_;
337  pthread_cond_t signal_;
338 };
339 
340 
341 //
342 // -----------------------------------------------------------------------------
343 //
344 
345 
353 template <class T>
354 class FifoChannel : protected std::queue<T> {  // Queue of T layered on std::queue through protected inheritance, so the std::queue interface is not exposed to users; carries a mutex and two condition variables.
355  public:
363  FifoChannel(const size_t maximal_length,
364  const size_t drainout_threshold);  // declaration only -- the bodies of all members are outside this listing
365  virtual ~FifoChannel();
366 
374  void Enqueue(const T &data);
375 
382  const T Dequeue();  // hands the item back by value
383 
389  unsigned int Drop();  // NOTE(review): presumably discards queued items and returns how many were dropped -- confirm in the implementation
390 
391  inline size_t GetItemCount() const;
392  inline bool IsEmpty() const;
393  inline size_t GetMaximalItemCount() const;
394 
395  private:
396  // general configuration
397  const size_t maximal_queue_length_;  // const: fixed at construction
399 
400  // thread synchronisation structures
401  mutable pthread_mutex_t mutex_;  // mutable, hence usable from the const query methods above
402  mutable pthread_cond_t queue_is_not_empty_;
403  mutable pthread_cond_t queue_is_not_full_;
404 };
405 
406 
425 template <class WorkerT>
426 class ConcurrentWorkers : public Observable<typename WorkerT::returned_data> {
427  public:
428  // these data types must be defined by the worker class
432  typedef typename WorkerT::expected_data expected_data_t;
436  typedef typename WorkerT::returned_data returned_data_t;
440  typedef typename WorkerT::worker_context worker_context_t;
441 
442  protected:
443  typedef std::vector<pthread_t> WorkerThreads;
444 
450  template <class DataT>
451  struct Job {  // Queue item: a payload of type DataT plus a flag marking the item as a "death sentence".
452  explicit Job(const DataT &data) :  // regular job: stores a copy of data, flag cleared
453  data(data),
454  is_death_sentence(false) {}
455  Job() :  // payload-less job with the flag set; needs DataT to be value-initializable
456  data(),
457  is_death_sentence(true) {}
458  const DataT data;  // job payload (const members: a Job can be copy-constructed but not assigned)
459  const bool is_death_sentence;  // death sentence flag; NOTE(review): presumably tells the receiving thread to stop -- confirm in RunWorker()
460  };
461  typedef Job<expected_data_t> WorkerJob;
462  typedef Job<returned_data_t> CallbackJob;
463 
471  struct RunBinding {
473  delegate(delegate) {}
475  };
477 
480  const worker_context_t *worker_context) :
481  RunBinding(delegate),
482  worker_context(worker_context) {}
487  };
488 
489  public:
499  ConcurrentWorkers(const size_t number_of_workers,
500  const size_t maximal_queue_length,
501  worker_context_t *worker_context = NULL);
502  virtual ~ConcurrentWorkers();
503 
510  bool Initialize();
511 
518  inline void Schedule(const expected_data_t &data) {  // Public entry point: wraps data in a WorkerJob and passes it to the protected Schedule(WorkerJob) overload.
519  Schedule(WorkerJob(data));  // WorkerJob(data) is the non-death-sentence Job constructor
520  }
521 
529  void Terminate();
530 
537  void WaitForEmptyQueue() const;
538 
546  void WaitForTermination();
547 
548  inline unsigned int GetNumberOfWorkers() const { return number_of_workers_; }  // number_of_workers_ is declared as const size_t, so the value is implicitly narrowed to unsigned int here.
549  inline unsigned int GetNumberOfFailedJobs() const {  // Returns the current value of the jobs_failed_ counter.
550  return atomic_read32(&jobs_failed_);  // jobs_failed_ is an atomic_int32 (int32_t per the page's definition index); the result is converted to unsigned int on return
551  }
552 
559  inline void JobSuccessful(const returned_data_t& data) {  // Reports data as the outcome of a job that succeeded.
560  JobDone(data, true);  // second argument is JobDone()'s success flag
561  }
562 
573  inline void JobFailed(const returned_data_t& data) { JobDone(data, false); }  // Reports data as the outcome of a job that failed: forwards to JobDone() with success = false.
574 
575  void RunCallbackThread();
576 
577  protected:
578  bool SpawnWorkers();
579 
588  static void* RunWorker(void *run_binding);
589 
590  static void* RunCallbackThreadWrapper(void *run_binding);
591 
596  void ReportStartedWorker() const;
597 
598  void Schedule(WorkerJob job);
599  void ScheduleDeathSentences();
600 
606  void TruncateJobQueue(const bool forget_pending = false);
607 
615  inline WorkerJob Acquire();
616 
624  void JobDone(const returned_data_t& data, const bool success = true);
625 
626  inline void StartRunning() {  // Sets running_ to true while holding status_mutex_.
627  MutexLockGuard guard(status_mutex_);  // scoped lock covering the write below
628  running_ = true;
629  }
630  inline void StopRunning() {  // Sets running_ to false while holding status_mutex_.
631  MutexLockGuard guard(status_mutex_);  // scoped lock covering the write below
632  running_ = false;
633  }
634  inline bool IsRunning() const {  // Returns running_ as read while holding status_mutex_.
635  MutexLockGuard guard(status_mutex_);  // status_mutex_ is mutable, so it can be locked from this const method
636  return running_;
637  }
638 
639  private:
640  // general configuration
641  const size_t number_of_workers_;
642  const worker_context_t *worker_context_;
643 
646  WorkerRunBinding thread_context_;
647 
648  // status information
650  bool running_;
651  mutable unsigned int workers_started_;
652  mutable pthread_mutex_t status_mutex_;
653  mutable pthread_cond_t worker_started_;
654  mutable pthread_mutex_t jobs_all_done_mutex_;
655  mutable pthread_cond_t jobs_all_done_;
656 
657  // worker threads
659  pthread_t callback_thread_;
660 
661  // job queue
667 
668  // callback channel
671 };
672 
673 
717 template <class DerivedWorkerT>
719  public:
720  virtual ~ConcurrentWorker() {}  // Empty virtual destructor.
721 
728  virtual bool Initialize() { return true; }  // Default implementation: does nothing and reports success; virtual, so a derived class can provide its own.
729 
734  virtual void TearDown() {}  // Default implementation: no-op; virtual, so a derived class can provide its own.
735 
747  // void operator()(const expected_data &data); // do the actual job of the
748  // worker
749 
750  protected:
751  ConcurrentWorker() : master_(NULL) {}  // Protected constructor; master_ starts out as NULL and is only assigned in RegisterMaster().
752 
758  inline ConcurrentWorkers<DerivedWorkerT>* master() const { return master_; }  // Accessor for master_; returns NULL until RegisterMaster() has stored a pointer.
759 
760  private:
761  friend class ConcurrentWorkers<DerivedWorkerT>;
763  master_ = master;
764  }
765 
766  private:
768 };
769 
770 #ifdef CVMFS_NAMESPACE_GUARD
771 } // namespace CVMFS_NAMESPACE_GUARD
772 #endif
773 
774 #include "util/concurrency_impl.h"
775 
776 #endif // CVMFS_UTIL_CONCURRENCY_H_
JobQueue jobs_queue_
Definition: concurrency.h:663
Job< returned_data_t > CallbackJob
Definition: concurrency.h:462
T maximal_value() const
Definition: concurrency.h:154
void RegisterMaster(ConcurrentWorkers< DerivedWorkerT > *master)
Definition: concurrency.h:762
SynchronizingCounter< T > & operator=(const T &other)
Definition: concurrency.h:166
Callbackable< ParamT >::CallbackTN * CallbackPtr
Definition: concurrency.h:221
FifoChannel< WorkerJob > JobQueue
Definition: concurrency.h:662
pthread_t callback_thread_
handles callback invocations
Definition: concurrency.h:659
virtual bool Initialize()
Definition: concurrency.h:728
int64_t atomic_int64
Definition: atomic.h:18
pthread_mutex_t mutex_
Definition: concurrency.h:184
pthread_cond_t queue_is_not_empty_
Definition: concurrency.h:402
const size_t queue_drainout_threshold_
Definition: concurrency.h:398
WorkerT::returned_data returned_data_t
Definition: concurrency.h:436
ConcurrentWorkers< DerivedWorkerT > * master() const
Definition: concurrency.h:758
int TryLock() const
Definition: concurrency.h:35
bool object_was_set_
Definition: concurrency.h:94
RunBinding(ConcurrentWorkers< WorkerT > *delegate)
Definition: concurrency.h:472
SynchronizingCounter(const T maximal_value)
Definition: concurrency.h:122
pthread_cond_t became_zero_
Definition: concurrency.h:185
pthread_cond_t signal_
Definition: concurrency.h:337
pthread_cond_t object_set_
Definition: concurrency.h:93
#define CVMFS_EXPORT
Definition: export.h:11
atomic_int32 jobs_pending_
Definition: concurrency.h:664
unsigned int GetNumberOfCpuCores()
Definition: concurrency.cc:18
unsigned int GetNumberOfWorkers() const
Definition: concurrency.h:548
T object_
Definition: concurrency.h:91
Callbacks listeners_
Definition: concurrency.h:302
assert((mem||(size==0))&&"Out Of Memory")
WorkerThreads worker_threads_
list of worker threads
Definition: concurrency.h:658
FifoChannel< CallbackJob > CallbackQueue
Definition: concurrency.h:669
const T operator--(int)
Definition: concurrency.h:159
bool HasMaximalValue() const
Definition: concurrency.h:153
WorkerT::worker_context worker_context_t
Definition: concurrency.h:440
void JobSuccessful(const returned_data_t &data)
Definition: concurrency.h:559
pthread_cond_t jobs_all_done_
Definition: concurrency.h:655
Definition: async.h:45
void Schedule(const expected_data_t &data)
Definition: concurrency.h:518
bool fired_
Definition: concurrency.h:335
virtual ~ConcurrentWorker()
Definition: concurrency.h:720
int32_t atomic_int32
Definition: atomic.h:17
virtual void TearDown()
Definition: concurrency.h:734
pthread_cond_t worker_started_
Definition: concurrency.h:653
void Lock() const
Definition: concurrency.h:34
const worker_context_t * worker_context
Definition: concurrency.h:486
const worker_context_t * worker_context_
the WorkerT defined context
Definition: concurrency.h:642
std::vector< pthread_t > WorkerThreads
Definition: concurrency.h:443
atomic_int32 jobs_failed_
Definition: concurrency.h:665
const bool is_death_sentence
death sentence flag
Definition: concurrency.h:459
pthread_mutex_t mutex_
Definition: concurrency.h:45
Job< expected_data_t > WorkerJob
Definition: concurrency.h:461
pthread_mutex_t status_mutex_
Definition: concurrency.h:652
pthread_cond_t queue_is_not_full_
Definition: concurrency.h:403
WorkerRunBinding thread_context_
Definition: concurrency.h:646
static const unsigned int kFallbackNumberOfCpus
Definition: concurrency.h:320
const T operator++(int)
Definition: concurrency.h:157
virtual ~Lockable()
Definition: concurrency.h:32
const size_t maximal_queue_length_
Definition: concurrency.h:397
WorkerT::expected_data expected_data_t
Definition: concurrency.h:432
pthread_mutex_t mutex_
Definition: concurrency.h:401
CallbackQueue results_queue_
Definition: concurrency.h:670
pthread_cond_t free_slot_
Definition: concurrency.h:186
unsigned int workers_started_
Definition: concurrency.h:651
const DataT data
job payload
Definition: concurrency.h:458
pthread_mutex_t mutex_
Definition: concurrency.h:92
void Unlock() const
Definition: concurrency.h:36
Job(const DataT &data)
Definition: concurrency.h:452
void WaitForZero() const
Definition: concurrency.h:145
ConcurrentWorkers< WorkerT > * delegate
Definition: concurrency.h:474
void UnregisterListener(ListenerHandle *handle)
Definition: mutex.h:42
atomic_int64 jobs_processed_
Definition: concurrency.h:666
unsigned int GetNumberOfFailedJobs() const
Definition: concurrency.h:549
const size_t number_of_workers_
number of concurrent worker threads
Definition: concurrency.h:641
void JobFailed(const returned_data_t &data)
Definition: concurrency.h:573
std::set< CallbackPtr > Callbacks
Definition: concurrency.h:223
pthread_mutex_t jobs_all_done_mutex_
Definition: concurrency.h:654
ConcurrentWorkers< DerivedWorkerT > * master_
Definition: concurrency.h:767
WorkerRunBinding(ConcurrentWorkers< WorkerT > *delegate, const worker_context_t *worker_context)
Definition: concurrency.h:479
bool IsRunning() const
Definition: concurrency.h:634
pthread_mutex_t lock_
Definition: concurrency.h:336