CernVM-FS  2.12.0
concurrency_impl.h
#ifndef CVMFS_UTIL_CONCURRENCY_IMPL_H_
#define CVMFS_UTIL_CONCURRENCY_IMPL_H_

#include "util/logging.h"

#ifdef CVMFS_NAMESPACE_GUARD
namespace CVMFS_NAMESPACE_GUARD {
#endif

//
// +----------------------------------------------------------------------------
// | SynchronizingCounter
//


template <typename T>
void SynchronizingCounter<T>::SetValueUnprotected(const T new_value) {
  // make sure that 0 <= new_value <= maximal_value_ if maximal_value_ != 0
  assert(!HasMaximalValue() ||
         (new_value >= T(0) && new_value <= maximal_value_));

  value_ = new_value;

  if (value_ == T(0)) {
    pthread_cond_broadcast(&became_zero_);
  }

  if (HasMaximalValue() && value_ < maximal_value_) {
    pthread_cond_broadcast(&free_slot_);
  }
}


template <typename T>
void SynchronizingCounter<T>::WaitForFreeSlotUnprotected() {
  while (HasMaximalValue() && value_ >= maximal_value_) {
    pthread_cond_wait(&free_slot_, &mutex_);
  }
  assert(!HasMaximalValue() || value_ < maximal_value_);
}


template <typename T>
void SynchronizingCounter<T>::Initialize() {
  const bool init_successful = (
          pthread_mutex_init(&mutex_, NULL)      == 0 &&
          pthread_cond_init(&became_zero_, NULL) == 0 &&
          pthread_cond_init(&free_slot_, NULL)   == 0);
  assert(init_successful);
}


template <typename T>
void SynchronizingCounter<T>::Destroy() {
  pthread_mutex_destroy(&mutex_);
  pthread_cond_destroy(&became_zero_);
  pthread_cond_destroy(&free_slot_);
}
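

//
// Usage sketch (illustrative only, not part of this file): a
// SynchronizingCounter can act as an in-flight job limiter.  The interface
// assumed below (a constructor taking the maximal value, Increment(),
// Decrement(), WaitForZero()) is inferred from the members used above and
// from the declarations in concurrency.h; DispatchAsyncJob() is a made-up
// helper.
//
//   SynchronizingCounter<int32_t> inflight(10);  // at most 10 pending jobs
//
//   void Submit() {
//     inflight.Increment();    // blocks while the maximal value is reached
//     DispatchAsyncJob();
//   }
//
//   void OnJobFinished() {
//     inflight.Decrement();    // broadcasts free_slot_ / became_zero_
//   }
//
//   void Shutdown() {
//     inflight.WaitForZero();  // returns once the counter dropped to zero
//   }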


//
// +----------------------------------------------------------------------------
// | Observable
//


template <typename ParamT>
Observable<ParamT>::Observable() {
  const int ret = pthread_rwlock_init(&listeners_rw_lock_, NULL);
  assert(ret == 0);
}


template <typename ParamT>
Observable<ParamT>::~Observable() {
  UnregisterListeners();
  pthread_rwlock_destroy(&listeners_rw_lock_);
}


template <typename ParamT>
template <class DelegateT, class ClosureDataT>
typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener(
    typename BoundClosure<ParamT,
                          DelegateT,
                          ClosureDataT>::CallbackMethod method,
    DelegateT *delegate,
    ClosureDataT data) {
  // create a new BoundClosure, register it and return the handle
  CallbackBase<ParamT> *callback =
    Observable<ParamT>::MakeClosure(method, delegate, data);
  RegisterListener(callback);
  return callback;
}


template <typename ParamT>
template <class DelegateT>
typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener(
    typename BoundCallback<ParamT, DelegateT>::CallbackMethod method,
    DelegateT *delegate) {
  // create a new BoundCallback, register it and return the handle
  CallbackBase<ParamT> *callback =
    Observable<ParamT>::MakeCallback(method, delegate);
  RegisterListener(callback);
  return callback;
}


template <typename ParamT>
typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener(
    typename Callback<ParamT>::CallbackFunction fn) {
  // create a new Callback, register it and return the handle
  CallbackBase<ParamT> *callback =
    Observable<ParamT>::MakeCallback(fn);
  RegisterListener(callback);
  return callback;
}


template <typename ParamT>
void Observable<ParamT>::RegisterListener(
    Observable<ParamT>::CallbackPtr callback_object) {
  // register a generic CallbackBase callback
  WriteLockGuard guard(listeners_rw_lock_);
  listeners_.insert(callback_object);
}


template <typename ParamT>
void Observable<ParamT>::UnregisterListener(
    typename Observable<ParamT>::CallbackPtr callback_object) {
  // remove a callback handle from the callbacks list
  // if it is not registered --> crash
  WriteLockGuard guard(listeners_rw_lock_);
  const size_t was_removed = listeners_.erase(callback_object);
  assert(was_removed > 0);
  delete callback_object;
}


template <typename ParamT>
void Observable<ParamT>::UnregisterListeners() {
  WriteLockGuard guard(listeners_rw_lock_);

  // remove all callbacks from the list
  typename Callbacks::const_iterator i    = listeners_.begin();
  typename Callbacks::const_iterator iend = listeners_.end();
  for (; i != iend; ++i) {
    delete *i;
  }
  listeners_.clear();
}


template <typename ParamT>
void Observable<ParamT>::NotifyListeners(const ParamT &parameter) {
  ReadLockGuard guard(listeners_rw_lock_);

  // invoke all callbacks and inform them about new data
  typename Callbacks::const_iterator i    = listeners_.begin();
  typename Callbacks::const_iterator iend = listeners_.end();
  for (; i != iend; ++i) {
    (**i)(parameter);
  }
}
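

//
// Usage sketch (illustrative only, not part of this file): a producer class
// derives from Observable<ResultT> and calls NotifyListeners() whenever a
// result is ready; consumers register member functions as listeners.  The
// Downloader and Client classes below are made up for illustration.
//
//   class Downloader : public Observable<int> {
//    public:
//     void Finish(int error_code) { NotifyListeners(error_code); }
//   };
//
//   class Client {
//    public:
//     void OnDownloadDone(const int &error_code) { /* react to result */ }
//   };
//
//   Downloader downloader;
//   Client client;
//   Observable<int>::CallbackPtr handle =
//     downloader.RegisterListener(&Client::OnDownloadDone, &client);
//   downloader.Finish(0);                   // invokes Client::OnDownloadDone
//   downloader.UnregisterListener(handle);  // also frees the callback object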


//
// +----------------------------------------------------------------------------
// | FifoChannel
//


template <class T>
FifoChannel<T>::FifoChannel(const size_t maximal_length,
                            const size_t drainout_threshold) :
  maximal_queue_length_(maximal_length),
  queue_drainout_threshold_(drainout_threshold)
{
  assert(drainout_threshold <= maximal_length);
  assert(drainout_threshold > 0);

  const bool successful = (
    pthread_mutex_init(&mutex_, NULL)             == 0 &&
    pthread_cond_init(&queue_is_not_empty_, NULL) == 0 &&
    pthread_cond_init(&queue_is_not_full_, NULL)  == 0);

  assert(successful);
}


template <class T>
FifoChannel<T>::~FifoChannel() {
  pthread_cond_destroy(&queue_is_not_empty_);
  pthread_cond_destroy(&queue_is_not_full_);
  pthread_mutex_destroy(&mutex_);
}


template <class T>
void FifoChannel<T>::Enqueue(const T &data) {
  MutexLockGuard lock(mutex_);

  // wait for space in the queue
  while (this->size() >= maximal_queue_length_) {
    pthread_cond_wait(&queue_is_not_full_, &mutex_);
  }

  // put something into the queue
  this->push(data);

  // wake all waiting threads
  pthread_cond_broadcast(&queue_is_not_empty_);
}


template <class T>
const T FifoChannel<T>::Dequeue() {
  MutexLockGuard lock(mutex_);

  // wait until there is something to do
  while (this->empty()) {
    pthread_cond_wait(&queue_is_not_empty_, &mutex_);
  }

  // get the item from the queue
  T data = this->front(); this->pop();

  // signal waiting threads about the free space
  if (this->size() < queue_drainout_threshold_) {
    pthread_cond_broadcast(&queue_is_not_full_);
  }

  // return the acquired job
  return data;
}


template <class T>
unsigned int FifoChannel<T>::Drop() {
  MutexLockGuard lock(mutex_);

  unsigned int dropped_items = 0;
  while (!this->empty()) {
    this->pop();
    ++dropped_items;
  }

  pthread_cond_broadcast(&queue_is_not_full_);

  return dropped_items;
}


template <class T>
size_t FifoChannel<T>::GetItemCount() const {
  MutexLockGuard lock(mutex_);
  return this->size();
}


template <class T>
bool FifoChannel<T>::IsEmpty() const {
  MutexLockGuard lock(mutex_);
  return this->empty();
}


template <class T>
size_t FifoChannel<T>::GetMaximalItemCount() const {
  return maximal_queue_length_;
}
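

//
// Usage sketch (illustrative only, not part of this file): FifoChannel acts
// as a bounded, blocking producer/consumer queue.  A sentinel value is one
// simple way to tell the consumer to stop; the sentinel convention and the
// Process() helper below are assumptions of this example, not something
// FifoChannel prescribes.
//
//   FifoChannel<int> channel(100, 10);  // capacity 100, wake producers at 10
//
//   void Producer() {
//     for (int i = 1; i <= 1000; ++i) {
//       channel.Enqueue(i);   // blocks while the channel holds 100 items
//     }
//     channel.Enqueue(-1);    // sentinel: tells the consumer to stop
//   }
//
//   void Consumer() {
//     while (true) {
//       const int item = channel.Dequeue();  // blocks while the channel is empty
//       if (item == -1) break;
//       Process(item);
//     }
//   }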


//
// +----------------------------------------------------------------------------
// | ConcurrentWorkers
//


template <class WorkerT>
ConcurrentWorkers<WorkerT>::ConcurrentWorkers(
          const size_t                                  number_of_workers,
          const size_t                                  maximal_queue_length,
          ConcurrentWorkers<WorkerT>::worker_context_t *worker_context) :
  number_of_workers_(number_of_workers),
  worker_context_(worker_context),
  thread_context_(this, worker_context_),
  initialized_(false),
  running_(false),
  workers_started_(0),
  jobs_queue_(maximal_queue_length, maximal_queue_length / 4 + 1),
  results_queue_(maximal_queue_length, 1)
{
  assert(maximal_queue_length >= number_of_workers);
  assert(number_of_workers > 0);

  atomic_init32(&jobs_pending_);
  atomic_init32(&jobs_failed_);
  atomic_init64(&jobs_processed_);
}
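

//
// Contract sketch (illustrative only, not part of this file): the WorkerT
// template parameter is expected to provide the typedefs and hooks used in
// this file: expected_data, returned_data and worker_context, a constructor
// taking a worker_context pointer, Initialize(), TearDown() and operator().
// RegisterMaster()/master() are assumed to come from the ConcurrentWorker
// base class in concurrency.h; all other names below (HashWorker, the struct
// fields, ComputeHash) are made up for illustration.
//
//   class HashWorker : public ConcurrentWorker<HashWorker> {
//    public:
//     struct Parameters { std::string path; };          // job input
//     struct Results    { std::string path, digest; };  // job output
//
//     typedef Parameters expected_data;   // what operator() consumes
//     typedef Results    returned_data;   // what JobDone() reports
//     typedef int        worker_context;  // shared, read-only context
//
//     explicit HashWorker(const worker_context *context) {}
//
//     bool Initialize() { return true; }  // per-thread setup
//     void TearDown() {}                  // per-thread cleanup
//
//     void operator()(const expected_data &job) {
//       Results result;
//       result.path   = job.path;
//       result.digest = ComputeHash(job.path);
//       master()->JobDone(result, true);  // success flag feeds jobs_failed_
//     }
//   };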


template <class WorkerT>
ConcurrentWorkers<WorkerT>::~ConcurrentWorkers() {
  if (IsRunning()) {
    Terminate();
  }

  // destroy some synchronisation data structures
  pthread_cond_destroy(&worker_started_);
  pthread_cond_destroy(&jobs_all_done_);
  pthread_mutex_destroy(&status_mutex_);
  pthread_mutex_destroy(&jobs_all_done_mutex_);
}


template <class WorkerT>
bool ConcurrentWorkers<WorkerT>::Initialize() {
  LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Initializing ConcurrentWorker "
                                            "object with %lu worker threads "
                                            "and a queue length of %zu",
           number_of_workers_, jobs_queue_.GetMaximalItemCount());
  // LogCvmfs(kLogConcurrency, kLogStdout, "sizeof(expected_data_t): %d\n"
  //          "sizeof(returned_data_t): %d",
  //          sizeof(expected_data_t), sizeof(returned_data_t));

  // initialize synchronisation for job queue (Workers)
  if (pthread_mutex_init(&status_mutex_, NULL)        != 0 ||
      pthread_mutex_init(&jobs_all_done_mutex_, NULL) != 0 ||
      pthread_cond_init(&worker_started_, NULL)       != 0 ||
      pthread_cond_init(&jobs_all_done_, NULL)        != 0) {
    return false;
  }

  // spawn the Worker objects in their own threads
  if (!SpawnWorkers()) {
    LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn workers");
    return false;
  }

  // all done...
  initialized_ = true;
  return true;
}


template <class WorkerT>
bool ConcurrentWorkers<WorkerT>::SpawnWorkers() {
  assert(worker_threads_.size() == 0);
  worker_threads_.resize(number_of_workers_);

  // set the running flag to trap workers in their treadmills
  StartRunning();

  // spawn the swarm and make them work
  bool success = true;
  WorkerThreads::iterator i          = worker_threads_.begin();
  WorkerThreads::const_iterator iend = worker_threads_.end();
  for (; i != iend; ++i) {
    pthread_t* thread = &(*i);
    const int retval = pthread_create(
      thread,
      NULL,
      &ConcurrentWorkers<WorkerT>::RunWorker,
      reinterpret_cast<void *>(&thread_context_));
    if (retval != 0) {
      LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn a Worker");
      success = false;
    }
  }

  // spawn the callback processing thread
  const int retval =
    pthread_create(
      &callback_thread_,
      NULL,
      &ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper,
      reinterpret_cast<void *>(&thread_context_));
  if (retval != 0) {
    LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn the callback "
                                           "worker thread");
    success = false;
  }

  // wait for all workers to report in...
  {
    MutexLockGuard guard(status_mutex_);
    // +1 -> callback thread
    while (workers_started_ < number_of_workers_ + 1) {
      pthread_cond_wait(&worker_started_, &status_mutex_);
    }
  }

  // all done...
  return success;
}


template <class WorkerT>
void* ConcurrentWorkers<WorkerT>::RunWorker(void *run_binding) {
  // NOTE: This is the actual worker thread code!

  //
  // INITIALIZATION
  /////////////////

  // get contextual information
  const WorkerRunBinding &binding =
    *(static_cast<WorkerRunBinding*>(run_binding));
  ConcurrentWorkers<WorkerT> *master = binding.delegate;
  const worker_context_t *worker_context = binding.worker_context;

  // boot up the worker object and make sure it works
  WorkerT worker(worker_context);
  worker.RegisterMaster(master);
  const bool init_success = worker.Initialize();

  // tell the master that this worker was started
  master->ReportStartedWorker();

  if (!init_success) {
    LogCvmfs(kLogConcurrency, kLogWarning, "Worker was not initialized "
                                           "properly... it will die now!");
    return NULL;
  }

  //
  // PROCESSING LOOP
  //////////////////

  // start the processing loop
  LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Starting Worker...");
  while (master->IsRunning()) {
    // acquire a new job
    WorkerJob job = master->Acquire();

    // check if we need to terminate
    if (job.is_death_sentence)
      break;

    // do what you are supposed to do
    worker(job.data);
  }

  //
  // TEAR DOWN
  ////////////

  // give the worker the chance to tidy up
  worker.TearDown();

  // good bye thread...
  LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Worker...");
  return NULL;
}


template <class WorkerT>
void* ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper(void *run_binding) {
  const RunBinding &binding = *(static_cast<RunBinding*>(run_binding));
  ConcurrentWorkers<WorkerT> *master = binding.delegate;

  master->ReportStartedWorker();

  LogCvmfs(kLogConcurrency, kLogVerboseMsg,
           "Started dedicated callback worker");
  master->RunCallbackThread();
  LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Callback Worker...");

  return NULL;
}


template <class WorkerT>
void ConcurrentWorkers<WorkerT>::RunCallbackThread() {
  while (IsRunning()) {
    const CallbackJob callback_job = results_queue_.Dequeue();

    // stop callback processing if needed
    if (callback_job.is_death_sentence) {
      break;
    }

    // notify all observers about the finished job
    this->NotifyListeners(callback_job.data);

    // remove the job from the pending 'list' and add it to the ready 'list'
    atomic_dec32(&jobs_pending_);
    atomic_inc64(&jobs_processed_);

    // signal the Spooler that all jobs are done...
    if (atomic_read32(&jobs_pending_) == 0) {
      pthread_cond_broadcast(&jobs_all_done_);
    }
  }
}


template <class WorkerT>
void ConcurrentWorkers<WorkerT>::ReportStartedWorker() const {
  MutexLockGuard lock(status_mutex_);
  ++workers_started_;
  pthread_cond_signal(&worker_started_);
}


template <class WorkerT>
void ConcurrentWorkers<WorkerT>::Schedule(WorkerJob job) {
  // Note: This method can be called from arbitrary threads. Thus we do not
  //       necessarily have just one producer in the system.

  // check if it makes sense to schedule this job
  if (!IsRunning() && !job.is_death_sentence) {
    LogCvmfs(kLogConcurrency, kLogWarning, "Tried to schedule a job but "
                                           "concurrency was not running...");
    return;
  }

  jobs_queue_.Enqueue(job);
  if (!job.is_death_sentence) {
    atomic_inc32(&jobs_pending_);
  }
}


template <class WorkerT>
void ConcurrentWorkers<WorkerT>::ScheduleDeathSentences() {
  assert(!IsRunning());

  // make sure that the queue is empty before we schedule a death sentence
  TruncateJobQueue();

  // schedule a death sentence for each running thread
  const unsigned int number_of_workers = GetNumberOfWorkers();
  for (unsigned int i = 0; i < number_of_workers; ++i) {
    Schedule(WorkerJob());
  }

  // schedule a death sentence for the callback thread
  results_queue_.Enqueue(CallbackJob());
}


template <class WorkerT>
typename ConcurrentWorkers<WorkerT>::WorkerJob
  ConcurrentWorkers<WorkerT>::Acquire()
{
  // Note: This method is exclusively called inside the worker threads!
  //       Any other usage might produce undefined behavior.
  return jobs_queue_.Dequeue();
}


template <class WorkerT>
void ConcurrentWorkers<WorkerT>::TruncateJobQueue(const bool forget_pending) {
  // Note: This method will throw away all jobs currently waiting in the job
  //       queue. These jobs will not be processed!
  const unsigned int dropped_jobs = jobs_queue_.Drop();

  // if desired, we remove the jobs from the pending 'list'
  if (forget_pending) {
    atomic_xadd32(&jobs_pending_, -dropped_jobs);
  }
}


template <class WorkerT>
void ConcurrentWorkers<WorkerT>::Terminate() {
  // Note: this method causes workers to die immediately after they finished
  //       their last acquired job. To make sure that each worker will check
  //       the running state, we schedule empty jobs or Death Sentences.

  assert(IsRunning());

  // unset the running flag (causing threads to die on the next checkpoint)
  StopRunning();

  // schedule empty jobs to make sure that each worker will actually reach the
  // next checkpoint in their processing loop and terminate as expected
  ScheduleDeathSentences();

  // wait for the worker threads to return
  WorkerThreads::const_iterator i    = worker_threads_.begin();
  WorkerThreads::const_iterator iend = worker_threads_.end();
  for (; i != iend; ++i) {
    pthread_join(*i, NULL);
  }

  // wait for the callback worker thread
  pthread_join(callback_thread_, NULL);

  // check if we finished all pending jobs
  const int pending = atomic_read32(&jobs_pending_);
  if (pending > 0) {
    LogCvmfs(kLogConcurrency, kLogWarning, "Job queue was not fully processed. "
                                           "Still %d jobs were pending and "
                                           "will not be executed anymore.",
             pending);
  }

  // check if we had failed jobs
  const int failed = atomic_read32(&jobs_failed_);
  if (failed > 0) {
    LogCvmfs(kLogConcurrency, kLogWarning, "We've had %d failed jobs.",
             failed);
  }

  // thanks, and good bye...
  LogCvmfs(kLogConcurrency, kLogVerboseMsg,
           "All workers stopped. They processed %ld jobs. Terminating...",
           atomic_read64(&jobs_processed_));
}


template <class WorkerT>
void ConcurrentWorkers<WorkerT>::WaitForEmptyQueue() const {
  LogCvmfs(kLogConcurrency, kLogVerboseMsg,
           "Waiting for %d jobs to be finished", atomic_read32(&jobs_pending_));

  // wait until all pending jobs are processed
  {
    MutexLockGuard lock(jobs_all_done_mutex_);
    while (atomic_read32(&jobs_pending_) > 0) {
      pthread_cond_wait(&jobs_all_done_, &jobs_all_done_mutex_);
    }
  }

  LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Jobs are done... go on");
}


template <class WorkerT>
void ConcurrentWorkers<WorkerT>::WaitForTermination() {
  if (!IsRunning())
    return;

  WaitForEmptyQueue();
  Terminate();
}


template <class WorkerT>
void ConcurrentWorkers<WorkerT>::JobDone(
              const ConcurrentWorkers<WorkerT>::returned_data_t& data,
              const bool                                         success) {
  // BEWARE!
  // This is a callback method that might be called from a different thread!

  // check if the finished job was successful
  if (!success) {
    atomic_inc32(&jobs_failed_);
    LogCvmfs(kLogConcurrency, kLogWarning, "Job failed");
  }

  // queue the result in the callback channel
  results_queue_.Enqueue(CallbackJob(data));
}
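

//
// Driver sketch (illustrative only, not part of this file): typical life
// cycle of a ConcurrentWorkers pool, reusing the hypothetical HashWorker
// outlined above; MyDelegate and its HashReady() method are made up as well.
//
//   ConcurrentWorkers<HashWorker> workers(4 /* workers */, 100 /* queue */);
//   const bool initialized = workers.Initialize();
//   assert(initialized);
//
//   // results arrive via the Observable interface on the callback thread,
//   // e.g. void MyDelegate::HashReady(const HashWorker::Results &result)
//   workers.RegisterListener(&MyDelegate::HashReady, &my_delegate);
//
//   for (size_t i = 0; i < paths.size(); ++i) {
//     HashWorker::Parameters job;
//     job.path = paths[i];
//     workers.Schedule(job);        // blocks when the job queue is full
//   }
//
//   workers.WaitForTermination();   // waits for pending jobs, then stops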

#ifdef CVMFS_NAMESPACE_GUARD
}  // namespace CVMFS_NAMESPACE_GUARD
#endif

#endif  // CVMFS_UTIL_CONCURRENCY_IMPL_H_