5 #ifndef CVMFS_UTIL_CONCURRENCY_IMPL_H_
6 #define CVMFS_UTIL_CONCURRENCY_IMPL_H_
10 #ifdef CVMFS_NAMESPACE_GUARD
11 namespace CVMFS_NAMESPACE_GUARD {
23 assert(!HasMaximalValue() ||
24 (new_value >= T(0) && new_value <= maximal_value_));
29 pthread_cond_broadcast(&became_zero_);
32 if (HasMaximalValue() && value_ < maximal_value_) {
33 pthread_cond_broadcast(&free_slot_);
40 while (HasMaximalValue() && value_ >= maximal_value_) {
41 pthread_cond_wait(&free_slot_, &mutex_);
43 assert(!HasMaximalValue() || value_ < maximal_value_);
49 const bool init_successful = (
50 pthread_mutex_init(&mutex_, NULL) == 0 &&
51 pthread_cond_init(&became_zero_, NULL) == 0 &&
52 pthread_cond_init(&free_slot_, NULL) == 0);
59 pthread_mutex_destroy(&mutex_);
60 pthread_cond_destroy(&became_zero_);
61 pthread_cond_destroy(&free_slot_);
71 template <
typename ParamT>
73 const int ret = pthread_rwlock_init(&listeners_rw_lock_, NULL);
78 template <
typename ParamT>
80 UnregisterListeners();
81 pthread_rwlock_destroy(&listeners_rw_lock_);
85 template <
typename ParamT>
86 template <
class DelegateT,
class ClosureDataT>
90 ClosureDataT>::CallbackMethod method,
96 RegisterListener(callback);
101 template <
typename ParamT>
102 template <
class DelegateT>
105 DelegateT *delegate) {
109 RegisterListener(callback);
114 template <
typename ParamT>
120 RegisterListener(callback);
125 template <
typename ParamT>
130 listeners_.insert(callback_object);
134 template <
typename ParamT>
140 const size_t was_removed = listeners_.erase(callback_object);
142 delete callback_object;
146 template <
typename ParamT>
151 typename Callbacks::const_iterator i = listeners_.begin();
152 typename Callbacks::const_iterator iend = listeners_.end();
153 for (; i != iend; ++i) {
160 template <
typename ParamT>
165 typename Callbacks::const_iterator i = listeners_.begin();
166 typename Callbacks::const_iterator iend = listeners_.end();
167 for (; i != iend; ++i) {
181 const size_t drainout_threshold) :
182 maximal_queue_length_(maximal_length),
183 queue_drainout_threshold_(drainout_threshold)
185 assert(drainout_threshold <= maximal_length);
186 assert(drainout_threshold > 0);
188 const bool successful = (
189 pthread_mutex_init(&
mutex_, NULL) == 0 &&
199 pthread_cond_destroy(&queue_is_not_empty_);
200 pthread_cond_destroy(&queue_is_not_full_);
201 pthread_mutex_destroy(&mutex_);
210 while (this->
size() >= maximal_queue_length_) {
211 pthread_cond_wait(&queue_is_not_full_, &mutex_);
218 pthread_cond_broadcast(&queue_is_not_empty_);
227 while (this->empty()) {
228 pthread_cond_wait(&queue_is_not_empty_, &mutex_);
232 T data = this->front(); this->pop();
235 if (this->
size() < queue_drainout_threshold_) {
236 pthread_cond_broadcast(&queue_is_not_full_);
248 unsigned int dropped_items = 0;
249 while (!this->empty()) {
254 pthread_cond_broadcast(&queue_is_not_full_);
256 return dropped_items;
270 return this->empty();
276 return maximal_queue_length_;
286 template <
class WorkerT>
288 const size_t number_of_workers,
289 const size_t maximal_queue_length,
291 number_of_workers_(number_of_workers),
292 worker_context_(worker_context),
293 thread_context_(this, worker_context_),
297 jobs_queue_(maximal_queue_length, maximal_queue_length / 4 + 1),
298 results_queue_(maximal_queue_length, 1)
300 assert(maximal_queue_length >= number_of_workers);
301 assert(number_of_workers > 0);
309 template <
class WorkerT>
316 pthread_cond_destroy(&worker_started_);
317 pthread_cond_destroy(&jobs_all_done_);
318 pthread_mutex_destroy(&status_mutex_);
319 pthread_mutex_destroy(&jobs_all_done_mutex_);
323 template <
class WorkerT>
326 "object with %lu worker threads "
327 "and a queue length of %zu",
328 number_of_workers_, jobs_queue_.GetMaximalItemCount());
334 if (pthread_mutex_init(&status_mutex_, NULL) != 0 ||
335 pthread_mutex_init(&jobs_all_done_mutex_, NULL) != 0 ||
336 pthread_cond_init(&worker_started_, NULL) != 0 ||
337 pthread_cond_init(&jobs_all_done_, NULL) != 0) {
342 if (!SpawnWorkers()) {
353 template <
class WorkerT>
355 assert(worker_threads_.size() == 0);
356 worker_threads_.resize(number_of_workers_);
363 WorkerThreads::iterator i = worker_threads_.begin();
364 WorkerThreads::const_iterator iend = worker_threads_.end();
365 for (; i != iend; ++i) {
366 pthread_t* thread = &(*i);
367 const int retval = pthread_create(
371 reinterpret_cast<void *>(&thread_context_));
384 reinterpret_cast<void *>(&thread_context_));
395 while (workers_started_ < number_of_workers_ + 1) {
396 pthread_cond_wait(&worker_started_, &status_mutex_);
405 template <
class WorkerT>
420 WorkerT worker(worker_context);
421 worker.RegisterMaster(master);
422 const bool init_success = worker.Initialize();
429 "properly... it will die now!");
464 template <
class WorkerT>
472 "Started dedicated callback worker");
480 template <
class WorkerT>
482 while (IsRunning()) {
483 const CallbackJob callback_job = results_queue_.Dequeue();
491 this->NotifyListeners(callback_job.
data);
494 atomic_dec32(&jobs_pending_);
495 atomic_inc64(&jobs_processed_);
498 if (atomic_read32(&jobs_pending_) == 0) {
499 pthread_cond_broadcast(&jobs_all_done_);
505 template <
class WorkerT>
509 pthread_cond_signal(&worker_started_);
513 template <
class WorkerT>
521 "concurrency was not running...");
525 jobs_queue_.Enqueue(job);
527 atomic_inc32(&jobs_pending_);
532 template <
class WorkerT>
540 const unsigned int number_of_workers = GetNumberOfWorkers();
541 for (
unsigned int i = 0; i < number_of_workers; ++i) {
550 template <
class WorkerT>
556 return jobs_queue_.Dequeue();
560 template <
class WorkerT>
564 const unsigned int dropped_jobs = jobs_queue_.Drop();
567 if (forget_pending) {
568 atomic_xadd32(&jobs_pending_, -dropped_jobs);
573 template <
class WorkerT>
586 ScheduleDeathSentences();
589 WorkerThreads::const_iterator i = worker_threads_.begin();
590 WorkerThreads::const_iterator iend = worker_threads_.end();
591 for (; i != iend; ++i) {
592 pthread_join(*i, NULL);
596 pthread_join(callback_thread_, NULL);
599 const int pending = atomic_read32(&jobs_pending_);
602 "Still %d jobs were pending and "
603 "will not be executed anymore.",
608 const int failed = atomic_read32(&jobs_failed_);
616 "All workers stopped. They processed %ld jobs. Terminating...",
617 atomic_read64(&jobs_processed_));
621 template <
class WorkerT>
624 "Waiting for %d jobs to be finished", atomic_read32(&jobs_pending_));
629 while (atomic_read32(&jobs_pending_) > 0) {
630 pthread_cond_wait(&jobs_all_done_, &jobs_all_done_mutex_);
638 template <
class WorkerT>
648 template <
class WorkerT>
651 const bool success) {
657 atomic_inc32(&jobs_failed_);
665 #ifdef CVMFS_NAMESPACE_GUARD
669 #endif // CVMFS_UTIL_CONCURRENCY_IMPL_H_
void Enqueue(const T &data)
CallbackPtr RegisterListener(typename BoundClosure< ParamT, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, ClosureDataT data)
pthread_cond_t queue_is_not_empty_
WorkerT::returned_data returned_data_t
void WaitForEmptyQueue() const
void SetValueUnprotected(const T new_value)
void NotifyListeners(const ParamT ¶meter)
atomic_int32 jobs_pending_
size_t GetMaximalItemCount() const
virtual ~ConcurrentWorkers()
assert((mem||(size==0))&&"Out Of Memory")
void TruncateJobQueue(const bool forget_pending=false)
static CallbackTN * MakeCallback(typename BoundCallback< ParamT, DelegateT >::CallbackMethod method, DelegateT *delegate)
WorkerT::worker_context worker_context_t
void UnregisterListener(CallbackPtr callback_object)
void Schedule(const expected_data_t &data)
static void * RunCallbackThreadWrapper(void *run_binding)
const worker_context_t * worker_context
FifoChannel(const size_t maximal_length, const size_t drainout_threshold)
ConcurrentWorkers(const size_t number_of_workers, const size_t maximal_queue_length, worker_context_t *worker_context=NULL)
void WaitForFreeSlotUnprotected()
void ReportStartedWorker() const
atomic_int32 jobs_failed_
void ScheduleDeathSentences()
size_t GetItemCount() const
const bool is_death_sentence
death sentence flag
pthread_cond_t queue_is_not_full_
void WaitForTermination()
const DataT data
job payload
void UnregisterListeners()
ConcurrentWorkers< WorkerT > * delegate
atomic_int64 jobs_processed_
static void * RunWorker(void *run_binding)
static CallbackTN * MakeClosure(typename BoundClosure< ParamT, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, const ClosureDataT &closure_data)
void JobDone(const returned_data_t &data, const bool success=true)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)