CernVM-FS  2.11.0
concurrency_impl.h
/**
 * This file is part of the CernVM File System.
 */

#ifndef CVMFS_UTIL_CONCURRENCY_IMPL_H_
#define CVMFS_UTIL_CONCURRENCY_IMPL_H_

#include "util/logging.h"

#ifdef CVMFS_NAMESPACE_GUARD
namespace CVMFS_NAMESPACE_GUARD {
#endif

//
// +----------------------------------------------------------------------------
// | Future
//


template <typename T>
Future<T>::Future() : object_(), object_was_set_(false) {
  const bool init_successful = (pthread_mutex_init(&mutex_, NULL) == 0 &&
                                pthread_cond_init(&object_set_, NULL) == 0);
  assert(init_successful);
}


template <typename T>
Future<T>::~Future() {
  pthread_cond_destroy(&object_set_);
  pthread_mutex_destroy(&mutex_);
}


template <typename T>
void Future<T>::Set(const T &object) {
  MutexLockGuard guard(mutex_);
  assert(!object_was_set_);
  object_ = object;
  object_was_set_ = true;
  pthread_cond_broadcast(&object_set_);
}


template <typename T>
void Future<T>::Wait() const {
  MutexLockGuard guard(mutex_);
  if (!object_was_set_) {
    pthread_cond_wait(&object_set_, &mutex_);
  }
  assert(object_was_set_);
}


template <typename T>
T& Future<T>::Get() {
  Wait();
  return object_;
}


template <typename T>
const T& Future<T>::Get() const {
  Wait();
  return object_;
}


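//
// Usage sketch (illustrative, not part of this file): one thread publishes a
// value with Set() exactly once, another thread blocks in Get() until the
// value is available.
//
//   Future<int> result;
//   // producer thread:
//   result.Set(42);                   // must be called at most once
//   // consumer thread:
//   const int value = result.Get();   // blocks until Set() happened
//
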
//
// +----------------------------------------------------------------------------
// | SynchronizingCounter
//


template <typename T>
void SynchronizingCounter<T>::SetValueUnprotected(const T new_value) {
  // make sure that 0 <= new_value <= maximal_value_ if maximal_value_ != 0
  assert(!HasMaximalValue() ||
         (new_value >= T(0) && new_value <= maximal_value_));

  value_ = new_value;

  if (value_ == T(0)) {
    pthread_cond_broadcast(&became_zero_);
  }

  if (HasMaximalValue() && value_ < maximal_value_) {
    pthread_cond_broadcast(&free_slot_);
  }
}


template <typename T>
void SynchronizingCounter<T>::WaitForFreeSlotUnprotected() {
  while (HasMaximalValue() && value_ >= maximal_value_) {
    pthread_cond_wait(&free_slot_, &mutex_);
  }
  assert(!HasMaximalValue() || value_ < maximal_value_);
}


template <typename T>
void SynchronizingCounter<T>::Initialize() {
  const bool init_successful = (
    pthread_mutex_init(&mutex_, NULL) == 0 &&
    pthread_cond_init(&became_zero_, NULL) == 0 &&
    pthread_cond_init(&free_slot_, NULL) == 0);
  assert(init_successful);
}


template <typename T>
void SynchronizingCounter<T>::Destroy() {
  pthread_mutex_destroy(&mutex_);
  pthread_cond_destroy(&became_zero_);
  pthread_cond_destroy(&free_slot_);
}


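//
// Usage sketch (illustrative; the public interface lives in concurrency.h and
// the exact operator/method names below are assumptions): the counter acts as
// a semaphore-like throttle that blocks producers once maximal_value_ is
// reached and lets waiters block until it drops back to zero.
//
//   SynchronizingCounter<int32_t> jobs_in_flight(10);  // at most 10 at a time
//   ++jobs_in_flight;              // blocks while the counter is at its limit
//   // ... do the work, then from the finishing thread:
//   --jobs_in_flight;
//   jobs_in_flight.WaitForZero();  // wait until everything has drained
//
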
//
// +----------------------------------------------------------------------------
// | Observable
//


template <typename ParamT>
Observable<ParamT>::Observable() {
  const int ret = pthread_rwlock_init(&listeners_rw_lock_, NULL);
  assert(ret == 0);
}


template <typename ParamT>
Observable<ParamT>::~Observable() {
  UnregisterListeners();
  pthread_rwlock_destroy(&listeners_rw_lock_);
}


template <typename ParamT>
template <class DelegateT, class ClosureDataT>
typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener(
    typename BoundClosure<ParamT,
                          DelegateT,
                          ClosureDataT>::CallbackMethod method,
    DelegateT *delegate,
    ClosureDataT data) {
  // create a new BoundClosure, register it and return the handle
  CallbackBase<ParamT> *callback =
    Observable<ParamT>::MakeClosure(method, delegate, data);
  RegisterListener(callback);
  return callback;
}


template <typename ParamT>
template <class DelegateT>
typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener(
    typename BoundCallback<ParamT, DelegateT>::CallbackMethod method,
    DelegateT *delegate) {
  // create a new BoundCallback, register it and return the handle
  CallbackBase<ParamT> *callback =
    Observable<ParamT>::MakeCallback(method, delegate);
  RegisterListener(callback);
  return callback;
}


template <typename ParamT>
typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener(
    typename Callback<ParamT>::CallbackFunction fn) {
  // create a new Callback, register it and return the handle
  CallbackBase<ParamT> *callback =
    Observable<ParamT>::MakeCallback(fn);
  RegisterListener(callback);
  return callback;
}


template <typename ParamT>
void Observable<ParamT>::RegisterListener(
    Observable<ParamT>::CallbackPtr callback_object) {
  // register a generic CallbackBase callback
  WriteLockGuard guard(listeners_rw_lock_);
  listeners_.insert(callback_object);
}


template <typename ParamT>
void Observable<ParamT>::UnregisterListener(
    typename Observable<ParamT>::CallbackPtr callback_object) {
  // remove a callback handle from the callbacks list
  // if it is not registered --> crash
  WriteLockGuard guard(listeners_rw_lock_);
  const size_t was_removed = listeners_.erase(callback_object);
  assert(was_removed > 0);
  delete callback_object;
}


template <typename ParamT>
void Observable<ParamT>::UnregisterListeners() {
  WriteLockGuard guard(listeners_rw_lock_);

  // remove all callbacks from the list
  typename Callbacks::const_iterator i = listeners_.begin();
  typename Callbacks::const_iterator iend = listeners_.end();
  for (; i != iend; ++i) {
    delete *i;
  }
  listeners_.clear();
}


template <typename ParamT>
void Observable<ParamT>::NotifyListeners(const ParamT &parameter) {
  ReadLockGuard guard(listeners_rw_lock_);

  // invoke all callbacks and inform them about new data
  typename Callbacks::const_iterator i = listeners_.begin();
  typename Callbacks::const_iterator iend = listeners_.end();
  for (; i != iend; ++i) {
    (**i)(parameter);
  }
}


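//
// Usage sketch (illustrative): a class derives from Observable<ParamT>, emits
// events through NotifyListeners(), and clients attach member-function
// callbacks via RegisterListener(). The names DownloadManager, Client and
// OnFinished are made up for the example.
//
//   class DownloadManager : public Observable<int> {
//    public:
//     void FinishJob(int return_code) { NotifyListeners(return_code); }
//   };
//
//   class Client {
//    public:
//     void OnFinished(const int &return_code) { /* react to the event */ }
//   };
//
//   DownloadManager manager;
//   Client client;
//   Observable<int>::CallbackPtr handle =
//     manager.RegisterListener(&Client::OnFinished, &client);
//   manager.FinishJob(0);                // invokes Client::OnFinished(0)
//   manager.UnregisterListener(handle);  // also frees the callback object
//
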
//
// +----------------------------------------------------------------------------
// | FifoChannel
//


template <class T>
FifoChannel<T>::FifoChannel(const size_t maximal_length,
                            const size_t drainout_threshold) :
  maximal_queue_length_(maximal_length),
  queue_drainout_threshold_(drainout_threshold)
{
  assert(drainout_threshold <= maximal_length);
  assert(drainout_threshold > 0);

  const bool successful = (
    pthread_mutex_init(&mutex_, NULL) == 0 &&
    pthread_cond_init(&queue_is_not_empty_, NULL) == 0 &&
    pthread_cond_init(&queue_is_not_full_, NULL) == 0);

  assert(successful);
}


template <class T>
FifoChannel<T>::~FifoChannel() {
  pthread_cond_destroy(&queue_is_not_empty_);
  pthread_cond_destroy(&queue_is_not_full_);
  pthread_mutex_destroy(&mutex_);
}


template <class T>
void FifoChannel<T>::Enqueue(const T &data) {
  MutexLockGuard lock(mutex_);

  // wait for space in the queue
  while (this->size() >= maximal_queue_length_) {
    pthread_cond_wait(&queue_is_not_full_, &mutex_);
  }

  // put something into the queue
  this->push(data);

  // wake all waiting threads
  pthread_cond_broadcast(&queue_is_not_empty_);
}


template <class T>
const T FifoChannel<T>::Dequeue() {
  MutexLockGuard lock(mutex_);

  // wait until there is something to do
  while (this->empty()) {
    pthread_cond_wait(&queue_is_not_empty_, &mutex_);
  }

  // get the item from the queue
  T data = this->front(); this->pop();

  // signal waiting threads about the free space
  if (this->size() < queue_drainout_threshold_) {
    pthread_cond_broadcast(&queue_is_not_full_);
  }

  // return the acquired job
  return data;
}


template <class T>
unsigned int FifoChannel<T>::Drop() {
  MutexLockGuard lock(mutex_);

  unsigned int dropped_items = 0;
  while (!this->empty()) {
    this->pop();
    ++dropped_items;
  }

  pthread_cond_broadcast(&queue_is_not_full_);

  return dropped_items;
}


template <class T>
size_t FifoChannel<T>::GetItemCount() const {
  MutexLockGuard lock(mutex_);
  return this->size();
}


template <class T>
bool FifoChannel<T>::IsEmpty() const {
  MutexLockGuard lock(mutex_);
  return this->empty();
}


template <class T>
size_t FifoChannel<T>::GetMaximalItemCount() const {
  return maximal_queue_length_;
}


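//
// Usage sketch (illustrative): a bounded producer/consumer queue. Enqueue()
// blocks while the channel is full; Dequeue() blocks while it is empty and
// wakes producers again once the fill level drops below the drain-out
// threshold.
//
//   FifoChannel<std::string> channel(100 /* capacity */, 10 /* threshold */);
//   // producer thread:
//   channel.Enqueue("some work item");
//   // consumer thread:
//   const std::string item = channel.Dequeue();
//
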
//
// +----------------------------------------------------------------------------
// | ConcurrentWorkers
//


template <class WorkerT>
ConcurrentWorkers<WorkerT>::ConcurrentWorkers(
    const size_t number_of_workers,
    const size_t maximal_queue_length,
    ConcurrentWorkers<WorkerT>::worker_context_t *worker_context) :
  number_of_workers_(number_of_workers),
  worker_context_(worker_context),
  thread_context_(this, worker_context_),
  initialized_(false),
  running_(false),
  workers_started_(0),
  jobs_queue_(maximal_queue_length, maximal_queue_length / 4 + 1),
  results_queue_(maximal_queue_length, 1)
{
  assert(maximal_queue_length >= number_of_workers);
  assert(number_of_workers > 0);

  atomic_init32(&jobs_pending_);
  atomic_init32(&jobs_failed_);
  atomic_init64(&jobs_processed_);
}


template <class WorkerT>
ConcurrentWorkers<WorkerT>::~ConcurrentWorkers() {
  if (IsRunning()) {
    Terminate();
  }

  // destroy some synchronisation data structures
  pthread_cond_destroy(&worker_started_);
  pthread_cond_destroy(&jobs_all_done_);
  pthread_mutex_destroy(&status_mutex_);
  pthread_mutex_destroy(&jobs_all_done_mutex_);
}


template <class WorkerT>
bool ConcurrentWorkers<WorkerT>::Initialize() {
  LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Initializing ConcurrentWorker "
                                            "object with %d worker threads "
                                            "and a queue length of %d",
           number_of_workers_, jobs_queue_.GetMaximalItemCount());
  // LogCvmfs(kLogConcurrency, kLogStdout, "sizeof(expected_data_t): %d\n"
  //                                       "sizeof(returned_data_t): %d",
  //          sizeof(expected_data_t), sizeof(returned_data_t));

  // initialize synchronisation for job queue (Workers)
  if (pthread_mutex_init(&status_mutex_, NULL) != 0 ||
      pthread_mutex_init(&jobs_all_done_mutex_, NULL) != 0 ||
      pthread_cond_init(&worker_started_, NULL) != 0 ||
      pthread_cond_init(&jobs_all_done_, NULL) != 0) {
    return false;
  }

  // spawn the Worker objects in their own threads
  if (!SpawnWorkers()) {
    LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn workers");
    return false;
  }

  // all done...
  initialized_ = true;
  return true;
}


template <class WorkerT>
bool ConcurrentWorkers<WorkerT>::SpawnWorkers() {
  assert(worker_threads_.size() == 0);
  worker_threads_.resize(number_of_workers_);

  // set the running flag to trap workers in their treadmills
  StartRunning();

  // spawn the swarm and make them work
  bool success = true;
  WorkerThreads::iterator i = worker_threads_.begin();
  WorkerThreads::const_iterator iend = worker_threads_.end();
  for (; i != iend; ++i) {
    pthread_t* thread = &(*i);
    const int retval = pthread_create(
      thread,
      NULL,
      &ConcurrentWorkers<WorkerT>::RunWorker,
      reinterpret_cast<void *>(&thread_context_));
    if (retval != 0) {
      LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn a Worker");
      success = false;
    }
  }

  // spawn the callback processing thread
  const int retval =
    pthread_create(
      &callback_thread_,
      NULL,
      &ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper,
      reinterpret_cast<void *>(&thread_context_));
  if (retval != 0) {
    LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn the callback "
                                           "worker thread");
    success = false;
  }

  // wait for all workers to report in...
  {
    MutexLockGuard guard(status_mutex_);
    // +1 -> callback thread
    while (workers_started_ < number_of_workers_ + 1) {
      pthread_cond_wait(&worker_started_, &status_mutex_);
    }
  }

  // all done...
  return success;
}


template <class WorkerT>
void* ConcurrentWorkers<WorkerT>::RunWorker(void *run_binding) {
  // NOTE: This is the actual worker thread code!

  //
  // INITIALIZATION
  //

  // get contextual information
  const WorkerRunBinding &binding =
    *(static_cast<WorkerRunBinding*>(run_binding));
  ConcurrentWorkers<WorkerT> *master = binding.delegate;
  const worker_context_t *worker_context = binding.worker_context;

  // boot up the worker object and make sure it works
  WorkerT worker(worker_context);
  worker.RegisterMaster(master);
  const bool init_success = worker.Initialize();

  // tell the master that this worker was started
  master->ReportStartedWorker();

  if (!init_success) {
    LogCvmfs(kLogConcurrency, kLogWarning, "Worker was not initialized "
                                           "properly... it will die now!");
    return NULL;
  }

  //
  // PROCESSING LOOP
  //

  // start the processing loop
  LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Starting Worker...");
  while (master->IsRunning()) {
    // acquire a new job
    WorkerJob job = master->Acquire();

    // check if we need to terminate
    if (job.is_death_sentence)
      break;

    // do what you are supposed to do
    worker(job.data);
  }

  //
  // TEAR DOWN
  //

  // give the worker the chance to tidy up
  worker.TearDown();

  // good bye thread...
  LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Worker...");
  return NULL;
}


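//
// Worker contract sketch (illustrative): RunWorker() above expects WorkerT to
// look roughly like the outline below. The class name DemoWorker and the way
// results are reported back (JobDone() on the master) are assumptions drawn
// from this file; the actual worker base class is declared in concurrency.h.
//
//   class DemoWorker {
//    public:
//     typedef int expected_data;                 // type of scheduled jobs
//     typedef int returned_data;                 // type of produced results
//     struct worker_context { };                 // shared, read-only context
//
//     explicit DemoWorker(const worker_context *context) : master_(NULL) { }
//     void RegisterMaster(ConcurrentWorkers<DemoWorker> *master) {
//       master_ = master;
//     }
//     bool Initialize() { return true; }         // per-thread setup
//     void operator()(const expected_data &job) {
//       master_->JobDone(job * 2, true);         // report the result back
//     }
//     void TearDown() { }                        // per-thread cleanup
//
//    private:
//     ConcurrentWorkers<DemoWorker> *master_;
//   };
//
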
template <class WorkerT>
void* ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper(void *run_binding) {
  const RunBinding &binding = *(static_cast<RunBinding*>(run_binding));
  ConcurrentWorkers<WorkerT> *master = binding.delegate;

  master->ReportStartedWorker();

  LogCvmfs(kLogConcurrency, kLogVerboseMsg,
           "Started dedicated callback worker");
  master->RunCallbackThread();
  LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Callback Worker...");

  return NULL;
}


template <class WorkerT>
void ConcurrentWorkers<WorkerT>::RunCallbackThread() {
  while (IsRunning()) {
    const CallbackJob callback_job = results_queue_.Dequeue();

    // stop callback processing if needed
    if (callback_job.is_death_sentence) {
      break;
    }

    // notify all observers about the finished job
    this->NotifyListeners(callback_job.data);

    // remove the job from the pending 'list' and add it to the ready 'list'
    atomic_dec32(&jobs_pending_);
    atomic_inc64(&jobs_processed_);

    // signal the Spooler that all jobs are done...
    if (atomic_read32(&jobs_pending_) == 0) {
      pthread_cond_broadcast(&jobs_all_done_);
    }
  }
}


template <class WorkerT>
void ConcurrentWorkers<WorkerT>::ReportStartedWorker() const {
  MutexLockGuard lock(status_mutex_);
  ++workers_started_;
  pthread_cond_signal(&worker_started_);
}


template <class WorkerT>
void ConcurrentWorkers<WorkerT>::Schedule(WorkerJob job) {
  // Note: This method can be called from arbitrary threads. Thus we do not
  //       necessarily have just one producer in the system.

  // check if it makes sense to schedule this job
  if (!IsRunning() && !job.is_death_sentence) {
    LogCvmfs(kLogConcurrency, kLogWarning, "Tried to schedule a job but "
                                           "concurrency was not running...");
    return;
  }

  jobs_queue_.Enqueue(job);
  if (!job.is_death_sentence) {
    atomic_inc32(&jobs_pending_);
  }
}


template <class WorkerT>
void ConcurrentWorkers<WorkerT>::ScheduleDeathSentences() {
  assert(!IsRunning());

  // make sure that the queue is empty before we schedule a death sentence
  TruncateJobQueue();

  // schedule a death sentence for each running thread
  const unsigned int number_of_workers = GetNumberOfWorkers();
  for (unsigned int i = 0; i < number_of_workers; ++i) {
    Schedule(WorkerJob());
  }

  // schedule a death sentence for the callback thread
  results_queue_.Enqueue(CallbackJob());
}


template <class WorkerT>
typename ConcurrentWorkers<WorkerT>::WorkerJob
ConcurrentWorkers<WorkerT>::Acquire()
{
  // Note: This method is exclusively called inside the worker threads!
  //       Any other usage might produce undefined behavior.
  return jobs_queue_.Dequeue();
}


template <class WorkerT>
void ConcurrentWorkers<WorkerT>::TruncateJobQueue(const bool forget_pending) {
  // Note: This method will throw away all jobs currently waiting in the job
  //       queue. These jobs will not be processed!
  const unsigned int dropped_jobs = jobs_queue_.Drop();

  // if desired, we remove the jobs from the pending 'list'
  if (forget_pending) {
    atomic_xadd32(&jobs_pending_, -dropped_jobs);
  }
}


template <class WorkerT>
void ConcurrentWorkers<WorkerT>::Terminate() {
  // Note: this method causes workers to die immediately after they finished
  //       their last acquired job. To make sure that each worker will check
  //       the running state, we schedule empty jobs or Death Sentences.

  assert(IsRunning());

  // unset the running flag (causing threads to die on the next checkpoint)
  StopRunning();

  // schedule empty jobs to make sure that each worker will actually reach the
  // next checkpoint in their processing loop and terminate as expected
  ScheduleDeathSentences();

  // wait for the worker threads to return
  WorkerThreads::const_iterator i = worker_threads_.begin();
  WorkerThreads::const_iterator iend = worker_threads_.end();
  for (; i != iend; ++i) {
    pthread_join(*i, NULL);
  }

  // wait for the callback worker thread
  pthread_join(callback_thread_, NULL);

  // check if we finished all pending jobs
  const int pending = atomic_read32(&jobs_pending_);
  if (pending > 0) {
    LogCvmfs(kLogConcurrency, kLogWarning, "Job queue was not fully processed. "
                                           "Still %d jobs were pending and "
                                           "will not be executed anymore.",
             pending);
  }

  // check if we had failed jobs
  const int failed = atomic_read32(&jobs_failed_);
  if (failed > 0) {
    LogCvmfs(kLogConcurrency, kLogWarning, "We've had %d failed jobs.",
             failed);
  }

  // thanks, and good bye...
  LogCvmfs(kLogConcurrency, kLogVerboseMsg,
           "All workers stopped. They processed %d jobs. Terminating...",
           atomic_read64(&jobs_processed_));
}


template <class WorkerT>
void ConcurrentWorkers<WorkerT>::WaitForEmptyQueue() const {
  LogCvmfs(kLogConcurrency, kLogVerboseMsg,
           "Waiting for %d jobs to be finished", atomic_read32(&jobs_pending_));

  // wait until all pending jobs are processed
  {
    MutexLockGuard lock(jobs_all_done_mutex_);
    while (atomic_read32(&jobs_pending_) > 0) {
      pthread_cond_wait(&jobs_all_done_, &jobs_all_done_mutex_);
    }
  }

  LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Jobs are done... go on");
}


template <class WorkerT>
void ConcurrentWorkers<WorkerT>::WaitForTermination() {
  if (!IsRunning())
    return;

  WaitForEmptyQueue();
  Terminate();
}


template <class WorkerT>
void ConcurrentWorkers<WorkerT>::JobDone(
    const ConcurrentWorkers<WorkerT>::returned_data_t &data,
    const bool success) {
  // BEWARE!
  // This is a callback method that might be called from a different thread!

  // check if the finished job was successful
  if (!success) {
    atomic_inc32(&jobs_failed_);
    LogCvmfs(kLogConcurrency, kLogWarning, "Job failed");
  }

  // queue the result in the callback channel
  results_queue_.Enqueue(CallbackJob(data));
}

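//
// Usage sketch (illustrative): ConcurrentWorkers is instantiated with a worker
// class (see the DemoWorker outline above), a listener is attached for
// finished jobs, work is scheduled, and the pool is drained and shut down.
// The free function OnResult is made up for the example.
//
//   void OnResult(const DemoWorker::returned_data &result) { /* ... */ }
//
//   ConcurrentWorkers<DemoWorker> workers(4   /* worker threads   */,
//                                         100 /* max queue length */);
//   if (!workers.Initialize())
//     return;
//   workers.RegisterListener(&OnResult);  // results arrive asynchronously
//   workers.Schedule(21);                 // enqueue a job (expected_data)
//   workers.WaitForTermination();         // drain the queue, then shut down
//
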
#ifdef CVMFS_NAMESPACE_GUARD
}  // namespace CVMFS_NAMESPACE_GUARD
#endif

#endif  // CVMFS_UTIL_CONCURRENCY_IMPL_H_