CernVM-FS  2.9.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
util_concurrency.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_UTIL_CONCURRENCY_H_
6 #define CVMFS_UTIL_CONCURRENCY_H_
7 
8 #include <pthread.h>
9 
10 #include <cassert>
11 #include <queue>
12 #include <set>
13 #include <vector>
14 
15 #include "atomic.h"
16 #include "util/async.h"
17 #include "util/single_copy.h"
18 
19 #ifdef CVMFS_NAMESPACE_GUARD
20 namespace CVMFS_NAMESPACE_GUARD {
21 #endif
22 
29  public:
30  inline virtual ~Lockable() { pthread_mutex_destroy(&mutex_); }
31 
32  void Lock() const { pthread_mutex_lock(&mutex_); }
33  int TryLock() const { return pthread_mutex_trylock(&mutex_); }
34  void Unlock() const { pthread_mutex_unlock(&mutex_); }
35 
36  protected:
38  const int retval = pthread_mutex_init(&mutex_, NULL);
39  assert(retval == 0);
40  }
41 
42  private:
43  mutable pthread_mutex_t mutex_;
44 };
45 
46 
47 //
48 // -----------------------------------------------------------------------------
49 //
50 
60  enum T {
63  WriteLock
64  };
65 };
66 
67 
76 template <typename T, _RAII_Polymorphism::T P = _RAII_Polymorphism::None>
77 class RAII : SingleCopy {
78  public:
79  inline explicit RAII(T &object) : ref_(object) { Enter(); }
80  inline explicit RAII(T *object) : ref_(*object) { Enter(); }
81  inline ~RAII() { Leave(); }
82 
83  protected:
84  inline void Enter() { ref_.Lock(); }
85  inline void Leave() { ref_.Unlock(); }
86 
87  private:
88  T &ref_;
89 };
90 
91 
103 template <typename LockableT>
104 class LockGuard : public RAII<LockableT> {
105  public:
106  inline explicit LockGuard(LockableT *object) : RAII<LockableT>(object) {}
107 };
108 
109 
110 template <>
111 inline void RAII<pthread_mutex_t>::Enter() { pthread_mutex_lock(&ref_); }
112 template <>
113 inline void RAII<pthread_mutex_t>::Leave() { pthread_mutex_unlock(&ref_); }
115 
116 
117 template <>
118 inline void RAII<pthread_rwlock_t,
120  pthread_rwlock_rdlock(&ref_);
121 }
122 template <>
123 inline void RAII<pthread_rwlock_t,
125  pthread_rwlock_unlock(&ref_);
126 }
127 template <>
128 inline void RAII<pthread_rwlock_t,
130  pthread_rwlock_wrlock(&ref_);
131 }
132 template <>
133 inline void RAII<pthread_rwlock_t,
135  pthread_rwlock_unlock(&ref_);
136 }
139 
140 
141 //
142 // -----------------------------------------------------------------------------
143 //
144 
145 
158 template <typename T>
160  public:
161  Future();
162  virtual ~Future();
163 
169  void Set(const T &object);
170 
176  T& Get();
177  const T& Get() const;
178 
179  protected:
180  void Wait() const;
181 
182  private:
184  mutable pthread_mutex_t mutex_;
185  mutable pthread_cond_t object_set_;
187 };
188 
189 
190 //
191 // -----------------------------------------------------------------------------
192 //
193 
194 
208 template <typename T>
210  public:
212  value_(T(0)), maximal_value_(T(0)) { Initialize(); }
213 
214  explicit SynchronizingCounter(const T maximal_value)
215  : value_(T(0))
216  , maximal_value_(maximal_value)
217  {
218  assert(maximal_value > T(0));
219  Initialize();
220  }
221 
222  ~SynchronizingCounter() { Destroy(); }
223 
224  T Increment() {
225  MutexLockGuard l(mutex_);
226  WaitForFreeSlotUnprotected();
227  SetValueUnprotected(value_ + T(1));
228  return value_;
229  }
230 
231  T Decrement() {
232  MutexLockGuard l(mutex_);
233  SetValueUnprotected(value_ - T(1));
234  return value_;
235  }
236 
237  void WaitForZero() const {
238  MutexLockGuard l(mutex_);
239  while (value_ != T(0)) {
240  pthread_cond_wait(&became_zero_, &mutex_);
241  }
242  assert(value_ == T(0));
243  }
244 
245  bool HasMaximalValue() const { return maximal_value_ != T(0); }
246  T maximal_value() const { return maximal_value_; }
247 
248  T operator++() { return Increment(); }
249  T operator++(int) { return Increment() - T(1); }
250  T operator--() { return Decrement(); }
251  T operator--(int) { return Decrement() + T(1); }
252 
253  operator T() const {
254  MutexLockGuard l(mutex_);
255  return value_;
256  }
257 
259  MutexLockGuard l(mutex_);
260  SetValueUnprotected(other);
261  return *this;
262  }
263 
264  protected:
265  void SetValueUnprotected(const T new_value);
266  void WaitForFreeSlotUnprotected();
267 
268  private:
269  void Initialize();
270  void Destroy();
271 
272  private:
274  const T maximal_value_;
275 
276  mutable pthread_mutex_t mutex_;
277  mutable pthread_cond_t became_zero_;
278  pthread_cond_t free_slot_;
279 };
280 
281 
282 //
283 // -----------------------------------------------------------------------------
284 //
285 
286 
287 template <typename ParamT>
289 
290 
309 template <typename ParamT>
310 class Observable : public Callbackable<ParamT>,
311  SingleCopy {
312  public:
314  protected:
315  typedef std::set<CallbackPtr> Callbacks;
316 
317  public:
318  virtual ~Observable();
319 
333  template <class DelegateT, class ClosureDataT>
334  CallbackPtr RegisterListener(
335  typename BoundClosure<ParamT,
336  DelegateT,
337  ClosureDataT>::CallbackMethod method,
338  DelegateT *delegate,
339  ClosureDataT data);
340 
351  template <class DelegateT>
352  CallbackPtr RegisterListener(
354  DelegateT *delegate);
355 
364  CallbackPtr RegisterListener(typename Callback<ParamT>::CallbackFunction fn);
365 
372  void UnregisterListener(CallbackPtr callback_object);
373 
377  void UnregisterListeners();
378 
379  protected:
380  Observable(); // don't instantiate this as a stand alone object
381 
382  void RegisterListener(CallbackPtr callback_object);
383 
391  void NotifyListeners(const ParamT &parameter);
392 
393  private:
395  mutable pthread_rwlock_t listeners_rw_lock_;
397 };
398 
399 
400 //
401 // -----------------------------------------------------------------------------
402 //
403 
404 
411 unsigned int GetNumberOfCpuCores();
412 static const unsigned int kFallbackNumberOfCpus = 1;
413 
414 
419  public:
420  Signal();
421  ~Signal();
422  void Wakeup();
423  void Wait();
424  bool IsSleeping();
425 
426  private:
427  bool fired_;
428  pthread_mutex_t lock_;
429  pthread_cond_t signal_;
430 };
431 
432 
433 //
434 // -----------------------------------------------------------------------------
435 //
436 
437 
445 template <class T>
446 class FifoChannel : protected std::queue<T> {
447  public:
455  FifoChannel(const size_t maximal_length,
456  const size_t drainout_threshold);
457  virtual ~FifoChannel();
458 
466  void Enqueue(const T &data);
467 
474  const T Dequeue();
475 
481  unsigned int Drop();
482 
483  inline size_t GetItemCount() const;
484  inline bool IsEmpty() const;
485  inline size_t GetMaximalItemCount() const;
486 
487  private:
488  // general configuration
489  const size_t maximal_queue_length_;
491 
492  // thread synchronisation structures
493  mutable pthread_mutex_t mutex_;
494  mutable pthread_cond_t queue_is_not_empty_;
495  mutable pthread_cond_t queue_is_not_full_;
496 };
497 
498 
517 template <class WorkerT>
518 class ConcurrentWorkers : public Observable<typename WorkerT::returned_data> {
519  public:
520  // these data types must be defined by the worker class
524  typedef typename WorkerT::expected_data expected_data_t;
528  typedef typename WorkerT::returned_data returned_data_t;
532  typedef typename WorkerT::worker_context worker_context_t;
533 
534  protected:
535  typedef std::vector<pthread_t> WorkerThreads;
536 
542  template <class DataT>
543  struct Job {
544  explicit Job(const DataT &data) :
545  data(data),
546  is_death_sentence(false) {}
547  Job() :
548  data(),
549  is_death_sentence(true) {}
550  const DataT data;
551  const bool is_death_sentence;
552  };
553  typedef Job<expected_data_t> WorkerJob;
554  typedef Job<returned_data_t> CallbackJob;
555 
563  struct RunBinding {
565  delegate(delegate) {}
567  };
569 
572  const worker_context_t *worker_context) :
573  RunBinding(delegate),
574  worker_context(worker_context) {}
579  };
580 
581  public:
591  ConcurrentWorkers(const size_t number_of_workers,
592  const size_t maximal_queue_length,
593  worker_context_t *worker_context = NULL);
594  virtual ~ConcurrentWorkers();
595 
602  bool Initialize();
603 
610  inline void Schedule(const expected_data_t &data) {
611  Schedule(WorkerJob(data));
612  }
613 
621  void Terminate();
622 
629  void WaitForEmptyQueue() const;
630 
638  void WaitForTermination();
639 
640  inline unsigned int GetNumberOfWorkers() const { return number_of_workers_; }
641  inline unsigned int GetNumberOfFailedJobs() const {
642  return atomic_read32(&jobs_failed_);
643  }
644 
651  inline void JobSuccessful(const returned_data_t& data) {
652  JobDone(data, true);
653  }
654 
665  inline void JobFailed(const returned_data_t& data) { JobDone(data, false); }
666 
667  void RunCallbackThread();
668 
669  protected:
670  bool SpawnWorkers();
671 
680  static void* RunWorker(void *run_binding);
681 
682  static void* RunCallbackThreadWrapper(void *run_binding);
683 
688  void ReportStartedWorker() const;
689 
690  void Schedule(WorkerJob job);
691  void ScheduleDeathSentences();
692 
698  void TruncateJobQueue(const bool forget_pending = false);
699 
707  inline WorkerJob Acquire();
708 
716  void JobDone(const returned_data_t& data, const bool success = true);
717 
718  inline void StartRunning() {
719  MutexLockGuard guard(status_mutex_);
720  running_ = true;
721  }
722  inline void StopRunning() {
723  MutexLockGuard guard(status_mutex_);
724  running_ = false;
725  }
726  inline bool IsRunning() const {
727  MutexLockGuard guard(status_mutex_);
728  return running_;
729  }
730 
731  private:
732  // general configuration
733  const size_t number_of_workers_;
734  const worker_context_t *worker_context_;
735 
738  WorkerRunBinding thread_context_;
739 
740  // status information
742  bool running_;
743  mutable unsigned int workers_started_;
744  mutable pthread_mutex_t status_mutex_;
745  mutable pthread_cond_t worker_started_;
746  mutable pthread_mutex_t jobs_all_done_mutex_;
747  mutable pthread_cond_t jobs_all_done_;
748 
749  // worker threads
751  pthread_t callback_thread_;
752 
753  // job queue
759 
760  // callback channel
763 };
764 
765 
809 template <class DerivedWorkerT>
811  public:
812  virtual ~ConcurrentWorker() {}
813 
820  virtual bool Initialize() { return true; }
821 
826  virtual void TearDown() {}
827 
839  // void operator()(const expected_data &data); // do the actual job of the
840  // worker
841 
842  protected:
843  ConcurrentWorker() : master_(NULL) {}
844 
850  inline ConcurrentWorkers<DerivedWorkerT>* master() const { return master_; }
851 
852  private:
853  friend class ConcurrentWorkers<DerivedWorkerT>;
855  master_ = master;
856  }
857 
858  private:
860 };
861 
862 #ifdef CVMFS_NAMESPACE_GUARD
863 } // namespace CVMFS_NAMESPACE_GUARD
864 #endif
865 
866 #include "util_concurrency_impl.h"
867 
868 #endif // CVMFS_UTIL_CONCURRENCY_H_
Job< returned_data_t > CallbackJob
void RegisterMaster(ConcurrentWorkers< DerivedWorkerT > *master)
SynchronizingCounter< T > & operator=(const T &other)
Callbackable< ParamT >::CallbackTN * CallbackPtr
FifoChannel< WorkerJob > JobQueue
pthread_t callback_thread_
handles callback invokes
virtual bool Initialize()
int64_t atomic_int64
Definition: atomic.h:18
pthread_mutex_t mutex_
pthread_cond_t queue_is_not_empty_
unsigned int GetNumberOfCpuCores()
void Leave()
const size_t queue_drainout_threshold_
WorkerT::returned_data returned_data_t
ConcurrentWorkers< DerivedWorkerT > * master() const
RAII< pthread_rwlock_t, _RAII_Polymorphism::WriteLock > WriteLockGuard
RAII(T &object)
int TryLock() const
bool object_was_set_
RunBinding(ConcurrentWorkers< WorkerT > *delegate)
SynchronizingCounter(const T maximal_value)
pthread_cond_t became_zero_
pthread_cond_t signal_
RAII< pthread_mutex_t > MutexLockGuard
pthread_cond_t object_set_
atomic_int32 jobs_pending_
unsigned int GetNumberOfWorkers() const
Callbacks listeners_
assert((mem||(size==0))&&"Out Of Memory")
WorkerThreads worker_threads_
list of worker threads
FifoChannel< CallbackJob > CallbackQueue
RAII(T *object)
bool HasMaximalValue() const
WorkerT::worker_context worker_context_t
void JobSuccessful(const returned_data_t &data)
pthread_cond_t jobs_all_done_
Definition: async.h:45
void Schedule(const expected_data_t &data)
virtual ~ConcurrentWorker()
int32_t atomic_int32
Definition: atomic.h:17
virtual void TearDown()
pthread_cond_t worker_started_
void Lock() const
LockGuard(LockableT *object)
const worker_context_t * worker_context
const worker_context_t * worker_context_
the WorkerT defined context
static const unsigned int kFallbackNumberOfCpus
std::vector< pthread_t > WorkerThreads
atomic_int32 jobs_failed_
const bool is_death_sentence
death sentence flag
pthread_mutex_t mutex_
Job< expected_data_t > WorkerJob
pthread_mutex_t status_mutex_
pthread_cond_t queue_is_not_full_
WorkerRunBinding thread_context_
virtual ~Lockable()
const size_t maximal_queue_length_
WorkerT::expected_data expected_data_t
pthread_mutex_t mutex_
CallbackQueue results_queue_
pthread_cond_t free_slot_
unsigned int workers_started_
const DataT data
job payload
pthread_mutex_t mutex_
void Unlock() const
Job(const DataT &data)
void WaitForZero() const
ConcurrentWorkers< WorkerT > * delegate
void UnregisterListener(ListenerHandle *handle)
atomic_int64 jobs_processed_
unsigned int GetNumberOfFailedJobs() const
const size_t number_of_workers_
number of concurrent worker threads
void JobFailed(const returned_data_t &data)
std::set< CallbackPtr > Callbacks
pthread_mutex_t jobs_all_done_mutex_
RAII< pthread_rwlock_t, _RAII_Polymorphism::ReadLock > ReadLockGuard
ConcurrentWorkers< DerivedWorkerT > * master_
WorkerRunBinding(ConcurrentWorkers< WorkerT > *delegate, const worker_context_t *worker_context)
bool IsRunning() const
void Enter()
pthread_mutex_t lock_