CernVM-FS 2.13.0
concurrency_impl.h
/**
 * This file is part of the CernVM File System.
 */

#ifndef CVMFS_UTIL_CONCURRENCY_IMPL_H_
#define CVMFS_UTIL_CONCURRENCY_IMPL_H_

#include "util/logging.h"

#ifdef CVMFS_NAMESPACE_GUARD
namespace CVMFS_NAMESPACE_GUARD {
#endif

//
// +----------------------------------------------------------------------------
// | SynchronizingCounter
//

template<typename T>
void SynchronizingCounter<T>::SetValueUnprotected(const T new_value) {
  // make sure that 0 <= new_value <= maximal_value_ if maximal_value_ != 0
  assert(!HasMaximalValue()
         || (new_value >= T(0) && new_value <= maximal_value_));

  value_ = new_value;

  if (value_ == T(0)) {
    pthread_cond_broadcast(&became_zero_);
  }

  if (HasMaximalValue() && value_ < maximal_value_) {
    pthread_cond_broadcast(&free_slot_);
  }
}


template<typename T>
void SynchronizingCounter<T>::WaitForFreeSlotUnprotected() {
  while (HasMaximalValue() && value_ >= maximal_value_) {
    pthread_cond_wait(&free_slot_, &mutex_);
  }
  assert(!HasMaximalValue() || value_ < maximal_value_);
}


template<typename T>
void SynchronizingCounter<T>::Initialize() {
  const bool init_successful = (pthread_mutex_init(&mutex_, NULL) == 0
                                && pthread_cond_init(&became_zero_, NULL) == 0
                                && pthread_cond_init(&free_slot_, NULL) == 0);
  assert(init_successful);
}


template<typename T>
void SynchronizingCounter<T>::Destroy() {
  pthread_mutex_destroy(&mutex_);
  pthread_cond_destroy(&became_zero_);
  pthread_cond_destroy(&free_slot_);
}

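// Usage sketch (editor's illustration, not part of the original file). It
// assumes the public interface declared in concurrency.h, in particular a
// bounding constructor and Increment()/Decrement()/WaitForZero(), which are
// not shown in this implementation file:
//
//   SynchronizingCounter<int32_t> tasks_in_flight(10);  // at most 10 slots
//
//   tasks_in_flight.Increment();    // blocks while 10 tasks are in flight
//   // ... hand the task to another thread, which calls Decrement() ...
//   tasks_in_flight.WaitForZero();  // returns once the counter drops to 0
//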

//
// +----------------------------------------------------------------------------
// | Observable
//

template<typename ParamT>
Observable<ParamT>::Observable() {
  const int ret = pthread_rwlock_init(&listeners_rw_lock_, NULL);
  assert(ret == 0);
}


template<typename ParamT>
Observable<ParamT>::~Observable() {
  UnregisterListeners();
  pthread_rwlock_destroy(&listeners_rw_lock_);
}


template<typename ParamT>
template<class DelegateT, class ClosureDataT>
typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener(
    typename BoundClosure<ParamT, DelegateT, ClosureDataT>::CallbackMethod
        method,
    DelegateT *delegate,
    ClosureDataT data) {
  // create a new BoundClosure, register it and return the handle
  CallbackBase<ParamT> *callback = Observable<ParamT>::MakeClosure(
      method, delegate, data);
  RegisterListener(callback);
  return callback;
}


template<typename ParamT>
template<class DelegateT>
typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener(
    typename BoundCallback<ParamT, DelegateT>::CallbackMethod method,
    DelegateT *delegate) {
  // create a new BoundCallback, register it and return the handle
  CallbackBase<ParamT> *callback = Observable<ParamT>::MakeCallback(method,
                                                                    delegate);
  RegisterListener(callback);
  return callback;
}


template<typename ParamT>
typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener(
    typename Callback<ParamT>::CallbackFunction fn) {
  // create a new Callback, register it and return the handle
  CallbackBase<ParamT> *callback = Observable<ParamT>::MakeCallback(fn);
  RegisterListener(callback);
  return callback;
}


template<typename ParamT>
void Observable<ParamT>::RegisterListener(
    typename Observable<ParamT>::CallbackPtr callback_object) {
  // register a generic CallbackBase callback
  WriteLockGuard guard(listeners_rw_lock_);
  listeners_.insert(callback_object);
}


template<typename ParamT>
void Observable<ParamT>::UnregisterListener(
    typename Observable<ParamT>::CallbackPtr callback_object) {
  // remove a callback handle from the callbacks list
  // if it is not registered --> crash
  WriteLockGuard guard(listeners_rw_lock_);
  const size_t was_removed = listeners_.erase(callback_object);
  assert(was_removed > 0);
  delete callback_object;
}


template<typename ParamT>
void Observable<ParamT>::UnregisterListeners() {
  WriteLockGuard guard(listeners_rw_lock_);

  // remove all callbacks from the list
  typename Callbacks::const_iterator i = listeners_.begin();
  typename Callbacks::const_iterator iend = listeners_.end();
  for (; i != iend; ++i) {
    delete *i;
  }
  listeners_.clear();
}


template<typename ParamT>
void Observable<ParamT>::NotifyListeners(const ParamT &parameter) {
  ReadLockGuard guard(listeners_rw_lock_);

  // invoke all callbacks and inform them about new data
  typename Callbacks::const_iterator i = listeners_.begin();
  typename Callbacks::const_iterator iend = listeners_.end();
  for (; i != iend; ++i) {
    (**i)(parameter);
  }
}

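// Usage sketch (editor's illustration, not part of the original file). A
// subject class derives from Observable<ParamT>; listeners attach through
// one of the RegisterListener() overloads above and fire on
// NotifyListeners(). The function-pointer overload, for example:
//
//   void OnJobFinished(const int &exit_code) { /* react to the event */ }
//
//   class JobRunner : public Observable<int> { /* ... */ };  // hypothetical
//
//   JobRunner runner;
//   JobRunner::CallbackPtr handle = runner.RegisterListener(&OnJobFinished);
//   runner.NotifyListeners(0);           // invokes OnJobFinished(0)
//   runner.UnregisterListener(handle);   // also deletes the handle
//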

//
// +----------------------------------------------------------------------------
// | FifoChannel
//

template<class T>
FifoChannel<T>::FifoChannel(const size_t maximal_length,
                            const size_t drainout_threshold)
    : maximal_queue_length_(maximal_length)
    , queue_drainout_threshold_(drainout_threshold) {
  assert(drainout_threshold <= maximal_length);
  assert(drainout_threshold > 0);

  const bool successful = (pthread_mutex_init(&mutex_, NULL) == 0
                           && pthread_cond_init(&queue_is_not_empty_, NULL) == 0
                           && pthread_cond_init(&queue_is_not_full_, NULL) == 0);

  assert(successful);
}


template<class T>
FifoChannel<T>::~FifoChannel() {
  pthread_cond_destroy(&queue_is_not_empty_);
  pthread_cond_destroy(&queue_is_not_full_);
  pthread_mutex_destroy(&mutex_);
}


template<class T>
void FifoChannel<T>::Enqueue(const T &data) {
  MutexLockGuard lock(mutex_);

  // wait for space in the queue
  while (this->size() >= maximal_queue_length_) {
    pthread_cond_wait(&queue_is_not_full_, &mutex_);
  }

  // put something into the queue
  this->push(data);

  // wake all waiting threads
  pthread_cond_broadcast(&queue_is_not_empty_);
}


template<class T>
const T FifoChannel<T>::Dequeue() {
  MutexLockGuard lock(mutex_);

  // wait until there is something to do
  while (this->empty()) {
    pthread_cond_wait(&queue_is_not_empty_, &mutex_);
  }

  // get the item from the queue
  T data = this->front();
  this->pop();

  // signal waiting threads about the free space
  if (this->size() < queue_drainout_threshold_) {
    pthread_cond_broadcast(&queue_is_not_full_);
  }

  // return the acquired job
  return data;
}


template<class T>
unsigned int FifoChannel<T>::Drop() {
  MutexLockGuard lock(mutex_);

  unsigned int dropped_items = 0;
  while (!this->empty()) {
    this->pop();
    ++dropped_items;
  }

  pthread_cond_broadcast(&queue_is_not_full_);

  return dropped_items;
}


template<class T>
size_t FifoChannel<T>::GetItemCount() const {
  MutexLockGuard lock(mutex_);
  return this->size();
}


template<class T>
bool FifoChannel<T>::IsEmpty() const {
  MutexLockGuard lock(mutex_);
  return this->empty();
}


template<class T>
size_t FifoChannel<T>::GetMaximalItemCount() const {
  return maximal_queue_length_;
}

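// Usage sketch (editor's illustration, not part of the original file),
// using only the methods defined above:
//
//   FifoChannel<int> channel(/* maximal_length = */ 100,
//                            /* drainout_threshold = */ 10);
//
//   // producer side: blocks while the channel already holds 100 items
//   channel.Enqueue(42);
//
//   // consumer side: blocks while the channel is empty; producers are
//   // woken once the fill level drops below the drain-out threshold
//   const int item = channel.Dequeue();
//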
//
// +----------------------------------------------------------------------------
// | ConcurrentWorkers
//

template<class WorkerT>
ConcurrentWorkers<WorkerT>::ConcurrentWorkers(
    const size_t number_of_workers,
    const size_t maximal_queue_length,
    ConcurrentWorkers<WorkerT>::worker_context_t *worker_context)
    : number_of_workers_(number_of_workers)
    , worker_context_(worker_context)
    , thread_context_(this, worker_context_)
    , initialized_(false)
    , running_(false)
    , workers_started_(0)
    , jobs_queue_(maximal_queue_length, maximal_queue_length / 4 + 1)
    , results_queue_(maximal_queue_length, 1) {
  assert(maximal_queue_length >= number_of_workers);
  assert(number_of_workers > 0);

  atomic_init32(&jobs_pending_);
  atomic_init32(&jobs_failed_);
  atomic_init64(&jobs_processed_);
}


template<class WorkerT>
ConcurrentWorkers<WorkerT>::~ConcurrentWorkers() {
  if (IsRunning()) {
    Terminate();
  }

  // destroy some synchronisation data structures
  pthread_cond_destroy(&worker_started_);
  pthread_cond_destroy(&jobs_all_done_);
  pthread_mutex_destroy(&status_mutex_);
  pthread_mutex_destroy(&jobs_all_done_mutex_);
}


template<class WorkerT>
bool ConcurrentWorkers<WorkerT>::Initialize() {
  LogCvmfs(kLogConcurrency, kLogVerboseMsg,
           "Initializing ConcurrentWorker object with %lu worker threads "
           "and a queue length of %zu",
           number_of_workers_, jobs_queue_.GetMaximalItemCount());
  // LogCvmfs(kLogConcurrency, kLogStdout, "sizeof(expected_data_t): %d\n"
  //          "sizeof(returned_data_t): %d",
  //          sizeof(expected_data_t), sizeof(returned_data_t));

  // initialize synchronisation for job queue (Workers)
  if (pthread_mutex_init(&status_mutex_, NULL) != 0
      || pthread_mutex_init(&jobs_all_done_mutex_, NULL) != 0
      || pthread_cond_init(&worker_started_, NULL) != 0
      || pthread_cond_init(&jobs_all_done_, NULL) != 0) {
    return false;
  }

  // spawn the Worker objects in their own threads
  if (!SpawnWorkers()) {
    LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn workers");
    return false;
  }

  // all done...
  initialized_ = true;
  return true;
}


template<class WorkerT>
bool ConcurrentWorkers<WorkerT>::SpawnWorkers() {
  assert(worker_threads_.size() == 0);
  worker_threads_.resize(number_of_workers_);

  // set the running flag to trap workers in their treadmills
  StartRunning();

  // spawn the swarm and make them work
  bool success = true;
  WorkerThreads::iterator i = worker_threads_.begin();
  WorkerThreads::const_iterator iend = worker_threads_.end();
  for (; i != iend; ++i) {
    pthread_t *thread = &(*i);
    const int retval = pthread_create(
        thread,
        NULL,
        &ConcurrentWorkers<WorkerT>::RunWorker,
        reinterpret_cast<void *>(&thread_context_));
    if (retval != 0) {
      LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn a Worker");
      success = false;
    }
  }

  // spawn the callback processing thread
  const int retval = pthread_create(
      &callback_thread_,
      NULL,
      &ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper,
      reinterpret_cast<void *>(&thread_context_));
  if (retval != 0) {
    LogCvmfs(kLogConcurrency, kLogWarning,
             "Failed to spawn the callback worker thread");
    success = false;
  }

  // wait for all workers to report in...
  {
    MutexLockGuard guard(status_mutex_);
    // +1 -> callback thread
    while (workers_started_ < number_of_workers_ + 1) {
      pthread_cond_wait(&worker_started_, &status_mutex_);
    }
  }

  // all done...
  return success;
}

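// Editor's note: RunWorker() below spells out the contract a WorkerT has to
// fulfill: construction from a const worker_context_t pointer, plus
// RegisterMaster(), Initialize(), operator() for the actual work, and
// TearDown().
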
template<class WorkerT>
void *ConcurrentWorkers<WorkerT>::RunWorker(void *run_binding) {
  // NOTE: This is the actual worker thread code!

  //
  // INITIALIZATION
  /////////////////

  // get contextual information
  const WorkerRunBinding &binding = *(
      static_cast<WorkerRunBinding *>(run_binding));
  ConcurrentWorkers<WorkerT> *master = binding.delegate;
  const worker_context_t *worker_context = binding.worker_context;

  // boot up the worker object and make sure it works
  WorkerT worker(worker_context);
  worker.RegisterMaster(master);
  const bool init_success = worker.Initialize();

  // tell the master that this worker was started
  master->ReportStartedWorker();

  if (!init_success) {
    LogCvmfs(kLogConcurrency, kLogWarning,
             "Worker was not initialized properly... it will die now!");
    return NULL;
  }

  //
  // PROCESSING LOOP
  //////////////////

  // start the processing loop
  LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Starting Worker...");
  while (master->IsRunning()) {
    // acquire a new job
    WorkerJob job = master->Acquire();

    // check if we need to terminate
    if (job.is_death_sentence)
      break;

    // do what you are supposed to do
    worker(job.data);
  }

  //
  // TEAR DOWN
  ////////////

  // give the worker the chance to tidy up
  worker.TearDown();

  // good bye thread...
  LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Worker...");
  return NULL;
}


template<class WorkerT>
void *ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper(void *run_binding) {
  const RunBinding &binding = *(static_cast<RunBinding *>(run_binding));
  ConcurrentWorkers<WorkerT> *master = binding.delegate;

  master->ReportStartedWorker();

  LogCvmfs(kLogConcurrency, kLogVerboseMsg,
           "Started dedicated callback worker");
  master->RunCallbackThread();
  LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Callback Worker...");

  return NULL;
}


template<class WorkerT>
void ConcurrentWorkers<WorkerT>::RunCallbackThread() {
  while (IsRunning()) {
    const CallbackJob callback_job = results_queue_.Dequeue();

    // stop callback processing if needed
    if (callback_job.is_death_sentence) {
      break;
    }

    // notify all observers about the finished job
    this->NotifyListeners(callback_job.data);

    // remove the job from the pending 'list' and add it to the ready 'list'
    atomic_dec32(&jobs_pending_);
    atomic_inc64(&jobs_processed_);

    // signal the Spooler that all jobs are done...
    if (atomic_read32(&jobs_pending_) == 0) {
      pthread_cond_broadcast(&jobs_all_done_);
    }
  }
}


template<class WorkerT>
void ConcurrentWorkers<WorkerT>::ReportStartedWorker() const {
  MutexLockGuard lock(status_mutex_);
  ++workers_started_;
  pthread_cond_signal(&worker_started_);
}


template<class WorkerT>
void ConcurrentWorkers<WorkerT>::Schedule(WorkerJob job) {
  // Note: This method can be called from arbitrary threads. Thus we do not
  //       necessarily have just one producer in the system.

  // check if it makes sense to schedule this job
  if (!IsRunning() && !job.is_death_sentence) {
    LogCvmfs(kLogConcurrency, kLogWarning,
             "Tried to schedule a job but concurrency was not running...");
    return;
  }

  jobs_queue_.Enqueue(job);
  if (!job.is_death_sentence) {
    atomic_inc32(&jobs_pending_);
  }
}


template<class WorkerT>
void ConcurrentWorkers<WorkerT>::ScheduleDeathSentences() {
  assert(!IsRunning());

  // make sure that the queue is empty before we schedule a death sentence
  TruncateJobQueue();

  // schedule a death sentence for each running thread
  const unsigned int number_of_workers = GetNumberOfWorkers();
  for (unsigned int i = 0; i < number_of_workers; ++i) {
    Schedule(WorkerJob());
  }

  // schedule a death sentence for the callback thread
  results_queue_.Enqueue(CallbackJob());
}


template<class WorkerT>
typename ConcurrentWorkers<WorkerT>::WorkerJob
ConcurrentWorkers<WorkerT>::Acquire() {
  // Note: This method is exclusively called inside the worker threads!
  //       Any other usage might produce undefined behavior.
  return jobs_queue_.Dequeue();
}


template<class WorkerT>
void ConcurrentWorkers<WorkerT>::TruncateJobQueue(const bool forget_pending) {
  // Note: This method will throw away all jobs currently waiting in the job
  //       queue. These jobs will not be processed!
  const unsigned int dropped_jobs = jobs_queue_.Drop();

  // if desired, we remove the jobs from the pending 'list'
  if (forget_pending) {
    atomic_xadd32(&jobs_pending_, -dropped_jobs);
  }
}


template<class WorkerT>
void ConcurrentWorkers<WorkerT>::Terminate() {
  // Note: this method causes workers to die immediately after they finished
  //       their last acquired job. To make sure that each worker will check
  //       the running state, we schedule empty jobs or Death Sentences.

  assert(IsRunning());

  // unset the running flag (causing threads to die on the next checkpoint)
  StopRunning();

  // schedule empty jobs to make sure that each worker will actually reach the
  // next checkpoint in their processing loop and terminate as expected
  ScheduleDeathSentences();

  // wait for the worker threads to return
  WorkerThreads::const_iterator i = worker_threads_.begin();
  WorkerThreads::const_iterator iend = worker_threads_.end();
  for (; i != iend; ++i) {
    pthread_join(*i, NULL);
  }

  // wait for the callback worker thread
  pthread_join(callback_thread_, NULL);

  // check if we finished all pending jobs
  const int pending = atomic_read32(&jobs_pending_);
  if (pending > 0) {
    LogCvmfs(kLogConcurrency, kLogWarning,
             "Job queue was not fully processed. Still %d jobs were pending "
             "and will not be executed anymore.",
             pending);
  }

  // check if we had failed jobs
  const int failed = atomic_read32(&jobs_failed_);
  if (failed > 0) {
    LogCvmfs(kLogConcurrency, kLogWarning, "We've had %d failed jobs.", failed);
  }

  // thanks, and good bye...
  LogCvmfs(kLogConcurrency, kLogVerboseMsg,
           "All workers stopped. They processed %ld jobs. Terminating...",
           atomic_read64(&jobs_processed_));
}


template<class WorkerT>
void ConcurrentWorkers<WorkerT>::WaitForEmptyQueue() const {
  LogCvmfs(kLogConcurrency, kLogVerboseMsg,
           "Waiting for %d jobs to be finished", atomic_read32(&jobs_pending_));

  // wait until all pending jobs are processed
  {
    MutexLockGuard lock(jobs_all_done_mutex_);
    while (atomic_read32(&jobs_pending_) > 0) {
      pthread_cond_wait(&jobs_all_done_, &jobs_all_done_mutex_);
    }
  }

  LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Jobs are done... go on");
}


template<class WorkerT>
void ConcurrentWorkers<WorkerT>::WaitForTermination() {
  if (!IsRunning())
    return;

  WaitForEmptyQueue();
  Terminate();
}


template<class WorkerT>
void ConcurrentWorkers<WorkerT>::JobDone(
    const ConcurrentWorkers<WorkerT>::returned_data_t &data,
    const bool success) {
  // BEWARE!
  // This is a callback method that might be called from a different thread!

  // check if the finished job was successful
  if (!success) {
    atomic_inc32(&jobs_failed_);
    LogCvmfs(kLogConcurrency, kLogWarning, "Job failed");
  }

  // queue the result in the callback channel
  results_queue_.Enqueue(CallbackJob(data));
}

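// Usage sketch (editor's illustration, not part of the original file). The
// worker type is hypothetical; it follows the WorkerT contract exercised by
// RunWorker() above and hands results back through JobDone(), which routes
// them to registered listeners via the callback thread:
//
//   class Squarer : public ConcurrentWorker<Squarer> {  // CRTP helper base
//    public:
//     typedef int expected_data;  // job payload
//     typedef int returned_data;  // result payload
//     // ... constructor taking const worker_context *, Initialize(), ...
//     void operator()(const expected_data &input) {
//       master()->JobDone(input * input, true);
//     }
//   };
//
//   ConcurrentWorkers<Squarer> workers(4, 400);  // 4 threads, queue of 400
//   assert(workers.Initialize());
//   workers.RegisterListener(&OnResult);  // void OnResult(const int &);
//   workers.Schedule(3);                  // eventually yields 9
//   workers.WaitForEmptyQueue();
//   workers.Terminate();
//
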
#ifdef CVMFS_NAMESPACE_GUARD
}  // namespace CVMFS_NAMESPACE_GUARD
#endif

#endif  // CVMFS_UTIL_CONCURRENCY_IMPL_H_