CernVM-FS  2.10.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
util_concurrency.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_UTIL_CONCURRENCY_H_
6 #define CVMFS_UTIL_CONCURRENCY_H_
7 
8 #include <pthread.h>
9 
10 #include <cassert>
11 #include <queue>
12 #include <set>
13 #include <vector>
14 
15 #include "atomic.h"
16 #include "util/async.h"
17 #include "util/mutex.h"
18 #include "util/single_copy.h"
19 
20 #ifdef CVMFS_NAMESPACE_GUARD
21 namespace CVMFS_NAMESPACE_GUARD {
22 #endif
23 
// Lockable: wraps a single pthread mutex behind Lock()/TryLock()/Unlock().
// NOTE(review): the class header line and the constructor signature are
// missing from this listing (gaps in the embedded line numbers).
30  public:
// Destroys the wrapped mutex; the return code of pthread_mutex_destroy() is
// not checked.
31  inline virtual ~Lockable() { pthread_mutex_destroy(&mutex_); }
32 
// Lock() and Unlock() discard the pthread return codes; TryLock() hands the
// return code of pthread_mutex_trylock() back to the caller unchanged.
// All three are const, which works because mutex_ is declared mutable.
33  void Lock() const { pthread_mutex_lock(&mutex_); }
34  int TryLock() const { return pthread_mutex_trylock(&mutex_); }
35  void Unlock() const { pthread_mutex_unlock(&mutex_); }
36 
37  protected:
// Presumably the body of the protected constructor (signature not visible):
// initializes the mutex with default attributes and asserts success.
39  const int retval = pthread_mutex_init(&mutex_, NULL);
40  assert(retval == 0);
41  }
42 
43  private:
// mutable so that the const locking methods above can operate on it.
44  mutable pthread_mutex_t mutex_;
45 };
46 
47 
48 //
49 // -----------------------------------------------------------------------------
50 //
51 
52 
// Future<T>: class template that inherits privately from SingleCopy.
// Only declarations are visible here; the member definitions are presumably
// provided by util_concurrency_impl.h - TODO confirm.
65 template <typename T>
66 class Future : SingleCopy {
67  public:
68  Future();
69  virtual ~Future();
70 
// Takes the object by const reference.
// NOTE(review): presumably stores a copy and signals object_set_ - confirm
// against the implementation.
76  void Set(const T &object);
77 
// Mutable and const accessors returning a reference to a T.
// NOTE(review): presumably these block (via Wait()) until Set() was called -
// confirm against the implementation.
83  T& Get();
84  const T& Get() const;
85 
86  protected:
87  void Wait() const;
88 
89  private:
// Two member declarations are missing from this listing at this point (gaps
// in the embedded line numbers). The synchronization primitives below are
// mutable so that const members can use them.
91  mutable pthread_mutex_t mutex_;
92  mutable pthread_cond_t object_set_;
94 };
95 
96 
97 //
98 // -----------------------------------------------------------------------------
99 //
100 
101 
// SynchronizingCounter<T>: counter whose accessors and modifiers run under
// mutex_, with an optional maximal value.
// NOTE(review): the class header line, the default constructor signature and
// the operator= signature are missing from this listing.
115 template <typename T>
117  public:
// Initializer list and body of the default constructor (signature not
// visible): both value_ and maximal_value_ start at T(0), so
// HasMaximalValue() reports false for such a counter.
119  value_(T(0)), maximal_value_(T(0)) { Initialize(); }
120 
// Constructs a counter with an upper bound; the bound must be greater than
// T(0), which is enforced by an assert only.
121  explicit SynchronizingCounter(const T maximal_value)
122  : value_(T(0))
123  , maximal_value_(maximal_value)
124  {
125  assert(maximal_value > T(0));
126  Initialize();
127  }
128 
129  ~SynchronizingCounter() { Destroy(); }
130 
// Under mutex_: calls WaitForFreeSlotUnprotected(), then sets the value to
// value_ + T(1) through SetValueUnprotected() and returns value_ as read
// afterwards.
// NOTE(review): WaitForFreeSlotUnprotected() presumably blocks while the
// maximal value is reached - defined elsewhere, confirm.
131  T Increment() {
132  MutexLockGuard l(mutex_);
133  WaitForFreeSlotUnprotected();
134  SetValueUnprotected(value_ + T(1));
135  return value_;
136  }
137 
// Under mutex_: sets the value to value_ - T(1) and returns value_ as read
// afterwards. There is no check against decrementing below T(0) here.
138  T Decrement() {
139  MutexLockGuard l(mutex_);
140  SetValueUnprotected(value_ - T(1));
141  return value_;
142  }
143 
// Waits on became_zero_ until value_ equals T(0). The predicate is re-checked
// after every wakeup of pthread_cond_wait().
144  void WaitForZero() const {
145  MutexLockGuard l(mutex_);
146  while (value_ != T(0)) {
147  pthread_cond_wait(&became_zero_, &mutex_);
148  }
149  assert(value_ == T(0));
150  }
151 
// A maximal value of T(0) is treated as "no maximal value set".
152  bool HasMaximalValue() const { return maximal_value_ != T(0); }
153  T maximal_value() const { return maximal_value_; }
154 
// Operator forms of Increment()/Decrement(). The postfix variants derive the
// previous value arithmetically from the value returned by the call.
155  T operator++() { return Increment(); }
156  const T operator++(int) { return Increment() - T(1); }
157  T operator--() { return Decrement(); }
158  const T operator--(int) { return Decrement() + T(1); }
159 
// Returns the current value, read while holding mutex_.
160  T Get() const {
161  MutexLockGuard l(mutex_);
162  return value_;
163  }
164 
// Body of the assignment from a plain value (signature not visible in this
// listing): sets the counter to `other` under mutex_ and returns *this.
166  MutexLockGuard l(mutex_);
167  SetValueUnprotected(other);
168  return *this;
169  }
170 
171  protected:
// The *Unprotected helpers are only called with mutex_ already held in the
// code visible above; they are defined outside of this listing.
172  void SetValueUnprotected(const T new_value);
173  void WaitForFreeSlotUnprotected();
174 
175  private:
176  void Initialize();
177  void Destroy();
178 
179  private:
// The declaration of value_ is missing from this listing (gap in numbering).
181  const T maximal_value_;
182 
// mutex_ and became_zero_ are mutable because the const members Get() and
// WaitForZero() use them; free_slot_ is not declared mutable.
183  mutable pthread_mutex_t mutex_;
184  mutable pthread_cond_t became_zero_;
185  pthread_cond_t free_slot_;
186 };
187 
188 
189 //
190 // -----------------------------------------------------------------------------
191 //
192 
193 
194 template <typename ParamT>
196 
197 
// Observable<ParamT>: inherits publicly from Callbackable<ParamT> and
// privately from SingleCopy. Only declarations are visible here.
// NOTE(review): several lines (e.g. the CallbackPtr typedef and the listener
// container member) are missing from this listing.
216 template <typename ParamT>
217 class Observable : public Callbackable<ParamT>,
218  SingleCopy {
219  public:
221  protected:
// Registered callbacks are kept in a std::set keyed by CallbackPtr.
222  typedef std::set<CallbackPtr> Callbacks;
223 
224  public:
225  virtual ~Observable();
226 
// Overload taking a BoundClosure callback method, the delegate object it is
// invoked on, and a closure data value passed by value.
240  template <class DelegateT, class ClosureDataT>
241  CallbackPtr RegisterListener(
242  typename BoundClosure<ParamT,
243  DelegateT,
244  ClosureDataT>::CallbackMethod method,
245  DelegateT *delegate,
246  ClosureDataT data);
247 
// Overload taking a delegate pointer; the line declaring its first parameter
// is missing from this listing.
258  template <class DelegateT>
259  CallbackPtr RegisterListener(
261  DelegateT *delegate);
262 
// Overload taking a Callback<ParamT>::CallbackFunction.
271  CallbackPtr RegisterListener(typename Callback<ParamT>::CallbackFunction fn);
272 
// Takes a CallbackPtr, i.e. the type returned by the RegisterListener()
// overloads above.
279  void UnregisterListener(CallbackPtr callback_object);
280 
284  void UnregisterListeners();
285 
286  protected:
287  Observable(); // don't instantiate this as a stand alone object
288 
// Protected overload taking an already constructed callback object.
289  void RegisterListener(CallbackPtr callback_object);
290 
// NOTE(review): presumably invokes every registered callback with
// `parameter` - defined elsewhere, confirm.
298  void NotifyListeners(const ParamT &parameter);
299 
300  private:
// Read/write lock, mutable so that const members can take it.
// NOTE(review): presumably guards the listener set - confirm.
302  mutable pthread_rwlock_t listeners_rw_lock_;
304 };
305 
306 
307 //
308 // -----------------------------------------------------------------------------
309 //
310 
311 
// Declared here and defined elsewhere.
// NOTE(review): kFallbackNumberOfCpus is presumably the value used when the
// number of cores cannot be determined - confirm against the definition.
318 unsigned int GetNumberOfCpuCores();
319 static const unsigned int kFallbackNumberOfCpus = 1;
320 
321 
// Signal: holds a boolean flag together with a pthread mutex and a condition
// variable. Only declarations are visible here.
// NOTE(review): the class header line is missing from this listing.
326  public:
327  Signal();
328  ~Signal();
// NOTE(review): presumably Wait() blocks until another thread calls Wakeup(),
// and IsSleeping() reports whether the signal has not been fired yet -
// confirm against the implementation.
329  void Wakeup();
330  void Wait();
331  bool IsSleeping();
332 
333  private:
334  bool fired_;
335  pthread_mutex_t lock_;
336  pthread_cond_t signal_;
337 };
338 
339 
340 //
341 // -----------------------------------------------------------------------------
342 //
343 
344 
// FifoChannel<T>: built on std::queue<T> through protected inheritance, so
// the queue interface is not part of the public API. Only declarations are
// visible here.
352 template <class T>
353 class FifoChannel : protected std::queue<T> {
354  public:
// Takes the maximal queue length and a drain-out threshold.
// NOTE(review): the exact blocking semantics of both values are defined in
// the implementation - confirm there.
362  FifoChannel(const size_t maximal_length,
363  const size_t drainout_threshold);
364  virtual ~FifoChannel();
365 
// Enqueue() takes the item by const reference; Dequeue() returns an item by
// value.
373  void Enqueue(const T &data);
374 
381  const T Dequeue();
382 
// NOTE(review): presumably discards all queued items and returns how many
// were dropped - confirm.
388  unsigned int Drop();
389 
390  inline size_t GetItemCount() const;
391  inline bool IsEmpty() const;
392  inline size_t GetMaximalItemCount() const;
393 
394  private:
395  // general configuration
396  const size_t maximal_queue_length_;
// A second configuration member is missing from this listing at this point
// (gap in the embedded line numbers).
398 
399  // thread synchronisation structures
400  mutable pthread_mutex_t mutex_;
401  mutable pthread_cond_t queue_is_not_empty_;
402  mutable pthread_cond_t queue_is_not_full_;
403 };
404 
405 
// ConcurrentWorkers<WorkerT>: derives publicly from
// Observable<typename WorkerT::returned_data>.
// NOTE(review): this listing omits a number of original lines (gaps in the
// embedded line numbers), including struct headers and several data members.
424 template <class WorkerT>
425 class ConcurrentWorkers : public Observable<typename WorkerT::returned_data> {
426  public:
427  // these data types must be defined by the worker class
431  typedef typename WorkerT::expected_data expected_data_t;
435  typedef typename WorkerT::returned_data returned_data_t;
439  typedef typename WorkerT::worker_context worker_context_t;
440 
441  protected:
442  typedef std::vector<pthread_t> WorkerThreads;
443 
// Job<DataT>: immutable pair of a payload and a flag. Constructing it from
// data yields a regular job (is_death_sentence == false); the default
// constructor value-initializes the payload and marks the job as a death
// sentence.
449  template <class DataT>
450  struct Job {
451  explicit Job(const DataT &data) :
452  data(data),
453  is_death_sentence(false) {}
454  Job() :
455  data(),
456  is_death_sentence(true) {}
457  const DataT data;
458  const bool is_death_sentence;
459  };
460  typedef Job<expected_data_t> WorkerJob;
461  typedef Job<returned_data_t> CallbackJob;
462 
// RunBinding: its constructor signature and the `delegate` member declaration
// are missing from this listing; the visible initializer stores the delegate.
470  struct RunBinding {
472  delegate(delegate) {}
474  };
476 
// Fragment of a second binding type whose header lines are missing from this
// listing: its constructor forwards `delegate` to RunBinding and stores
// `worker_context`.
479  const worker_context_t *worker_context) :
480  RunBinding(delegate),
481  worker_context(worker_context) {}
486  };
487 
488  public:
// Takes the number of workers, the maximal queue length and an optional
// worker context pointer that defaults to NULL.
498  ConcurrentWorkers(const size_t number_of_workers,
499  const size_t maximal_queue_length,
500  worker_context_t *worker_context = NULL);
501  virtual ~ConcurrentWorkers();
502 
509  bool Initialize();
510 
// Wraps the payload into a WorkerJob and forwards it to the protected
// Schedule(WorkerJob) overload.
517  inline void Schedule(const expected_data_t &data) {
518  Schedule(WorkerJob(data));
519  }
520 
528  void Terminate();
529 
536  void WaitForEmptyQueue() const;
537 
545  void WaitForTermination();
546 
// number_of_workers_ is a size_t and is converted to unsigned int on return.
547  inline unsigned int GetNumberOfWorkers() const { return number_of_workers_; }
// Reads jobs_failed_ through atomic_read32().
548  inline unsigned int GetNumberOfFailedJobs() const {
549  return atomic_read32(&jobs_failed_);
550  }
551 
// JobSuccessful() and JobFailed() both delegate to JobDone(), passing true
// respectively false as the success flag.
558  inline void JobSuccessful(const returned_data_t& data) {
559  JobDone(data, true);
560  }
561 
572  inline void JobFailed(const returned_data_t& data) { JobDone(data, false); }
573 
574  void RunCallbackThread();
575 
576  protected:
577  bool SpawnWorkers();
578 
// Static functions with the void* (*)(void*) signature expected by
// pthread_create(); they receive the run binding as an opaque pointer.
587  static void* RunWorker(void *run_binding);
588 
589  static void* RunCallbackThreadWrapper(void *run_binding);
590 
595  void ReportStartedWorker() const;
596 
597  void Schedule(WorkerJob job);
598  void ScheduleDeathSentences();
599 
605  void TruncateJobQueue(const bool forget_pending = false);
606 
614  inline WorkerJob Acquire();
615 
623  void JobDone(const returned_data_t& data, const bool success = true);
624 
// The running_ flag is only read or written while holding status_mutex_.
625  inline void StartRunning() {
626  MutexLockGuard guard(status_mutex_);
627  running_ = true;
628  }
629  inline void StopRunning() {
630  MutexLockGuard guard(status_mutex_);
631  running_ = false;
632  }
633  inline bool IsRunning() const {
634  MutexLockGuard guard(status_mutex_);
635  return running_;
636  }
637 
638  private:
639  // general configuration
640  const size_t number_of_workers_;
641  const worker_context_t *worker_context_;
642 
645  WorkerRunBinding thread_context_;
646 
647  // status information
// One member line is missing from this listing here (gap in numbering).
// The mutable members below can be modified from const member functions.
649  bool running_;
650  mutable unsigned int workers_started_;
651  mutable pthread_mutex_t status_mutex_;
652  mutable pthread_cond_t worker_started_;
653  mutable pthread_mutex_t jobs_all_done_mutex_;
654  mutable pthread_cond_t jobs_all_done_;
655 
656  // worker threads
// The declaration of the worker thread list is missing from this listing.
658  pthread_t callback_thread_;
659 
660  // job queue
// The job queue members are missing from this listing.
666 
667  // callback channel
// The callback channel members are missing from this listing.
670 };
671 
672 
// ConcurrentWorker<DerivedWorkerT>: base class template for worker
// implementations used with ConcurrentWorkers<DerivedWorkerT>.
// NOTE(review): the class header line, the signature of the private setter
// and the master_ member declaration are missing from this listing.
716 template <class DerivedWorkerT>
718  public:
719  virtual ~ConcurrentWorker() {}
720 
// Default implementation reports success without doing anything; declared
// virtual so that it can be overridden.
727  virtual bool Initialize() { return true; }
728 
// Default implementation is a no-op; declared virtual so that it can be
// overridden.
733  virtual void TearDown() {}
734 
746  // void operator()(const expected_data &data); // do the actual job of the
747  // worker
748 
749  protected:
// Protected constructor; master_ starts out as NULL until the private setter
// below assigns it.
750  ConcurrentWorker() : master_(NULL) {}
751 
// Returns the stored ConcurrentWorkers pointer (NULL until it has been set).
757  inline ConcurrentWorkers<DerivedWorkerT>* master() const { return master_; }
758 
759  private:
// ConcurrentWorkers is a friend and can therefore call the private setter
// whose body follows (its signature line is not visible here).
760  friend class ConcurrentWorkers<DerivedWorkerT>;
762  master_ = master;
763  }
764 
765  private:
767 };
768 
769 #ifdef CVMFS_NAMESPACE_GUARD
770 } // namespace CVMFS_NAMESPACE_GUARD
771 #endif
772 
773 #include "util_concurrency_impl.h"
774 
775 #endif // CVMFS_UTIL_CONCURRENCY_H_
Job< returned_data_t > CallbackJob
void RegisterMaster(ConcurrentWorkers< DerivedWorkerT > *master)
SynchronizingCounter< T > & operator=(const T &other)
Callbackable< ParamT >::CallbackTN * CallbackPtr
FifoChannel< WorkerJob > JobQueue
pthread_t callback_thread_
handles callback invokes
virtual bool Initialize()
int64_t atomic_int64
Definition: atomic.h:18
pthread_mutex_t mutex_
pthread_cond_t queue_is_not_empty_
unsigned int GetNumberOfCpuCores()
const size_t queue_drainout_threshold_
WorkerT::returned_data returned_data_t
ConcurrentWorkers< DerivedWorkerT > * master() const
int TryLock() const
bool object_was_set_
RunBinding(ConcurrentWorkers< WorkerT > *delegate)
SynchronizingCounter(const T maximal_value)
pthread_cond_t became_zero_
pthread_cond_t signal_
pthread_cond_t object_set_
atomic_int32 jobs_pending_
unsigned int GetNumberOfWorkers() const
Callbacks listeners_
assert((mem||(size==0))&&"Out Of Memory")
WorkerThreads worker_threads_
list of worker threads
FifoChannel< CallbackJob > CallbackQueue
bool HasMaximalValue() const
WorkerT::worker_context worker_context_t
void JobSuccessful(const returned_data_t &data)
pthread_cond_t jobs_all_done_
Definition: async.h:45
void Schedule(const expected_data_t &data)
virtual ~ConcurrentWorker()
int32_t atomic_int32
Definition: atomic.h:17
virtual void TearDown()
pthread_cond_t worker_started_
void Lock() const
const worker_context_t * worker_context
const worker_context_t * worker_context_
the WorkerT defined context
static const unsigned int kFallbackNumberOfCpus
std::vector< pthread_t > WorkerThreads
atomic_int32 jobs_failed_
const bool is_death_sentence
death sentence flag
pthread_mutex_t mutex_
Job< expected_data_t > WorkerJob
pthread_mutex_t status_mutex_
pthread_cond_t queue_is_not_full_
WorkerRunBinding thread_context_
virtual ~Lockable()
const size_t maximal_queue_length_
WorkerT::expected_data expected_data_t
pthread_mutex_t mutex_
CallbackQueue results_queue_
pthread_cond_t free_slot_
unsigned int workers_started_
const DataT data
job payload
pthread_mutex_t mutex_
void Unlock() const
Job(const DataT &data)
void WaitForZero() const
ConcurrentWorkers< WorkerT > * delegate
void UnregisterListener(ListenerHandle *handle)
Definition: mutex.h:42
atomic_int64 jobs_processed_
unsigned int GetNumberOfFailedJobs() const
const size_t number_of_workers_
number of concurrent worker threads
void JobFailed(const returned_data_t &data)
std::set< CallbackPtr > Callbacks
pthread_mutex_t jobs_all_done_mutex_
ConcurrentWorkers< DerivedWorkerT > * master_
WorkerRunBinding(ConcurrentWorkers< WorkerT > *delegate, const worker_context_t *worker_context)
bool IsRunning() const
pthread_mutex_t lock_