5 #ifndef CVMFS_UTIL_CONCURRENCY_IMPL_H_
6 #define CVMFS_UTIL_CONCURRENCY_IMPL_H_
10 #ifdef CVMFS_NAMESPACE_GUARD
11 namespace CVMFS_NAMESPACE_GUARD {
22 const bool init_successful = (pthread_mutex_init(&
mutex_, NULL) == 0 &&
30 pthread_cond_destroy(&object_set_);
31 pthread_mutex_destroy(&mutex_);
40 object_was_set_ =
true;
41 pthread_cond_broadcast(&object_set_);
48 if (!object_was_set_) {
49 pthread_cond_wait(&object_set_, &mutex_);
78 assert(!HasMaximalValue() ||
79 (new_value >= T(0) && new_value <= maximal_value_));
84 pthread_cond_broadcast(&became_zero_);
87 if (HasMaximalValue() && value_ < maximal_value_) {
88 pthread_cond_broadcast(&free_slot_);
95 while (HasMaximalValue() && value_ >= maximal_value_) {
96 pthread_cond_wait(&free_slot_, &mutex_);
98 assert(!HasMaximalValue() || value_ < maximal_value_);
102 template <
typename T>
104 const bool init_successful = (
105 pthread_mutex_init(&mutex_, NULL) == 0 &&
106 pthread_cond_init(&became_zero_, NULL) == 0 &&
107 pthread_cond_init(&free_slot_, NULL) == 0);
112 template <
typename T>
114 pthread_mutex_destroy(&mutex_);
115 pthread_cond_destroy(&became_zero_);
116 pthread_cond_destroy(&free_slot_);
126 template <
typename ParamT>
128 const int ret = pthread_rwlock_init(&listeners_rw_lock_, NULL);
133 template <
typename ParamT>
135 UnregisterListeners();
136 pthread_rwlock_destroy(&listeners_rw_lock_);
140 template <
typename ParamT>
141 template <
class DelegateT,
class ClosureDataT>
145 ClosureDataT>::CallbackMethod method,
151 RegisterListener(callback);
156 template <
typename ParamT>
157 template <
class DelegateT>
160 DelegateT *delegate) {
164 RegisterListener(callback);
169 template <
typename ParamT>
175 RegisterListener(callback);
180 template <
typename ParamT>
185 listeners_.insert(callback_object);
189 template <
typename ParamT>
195 const size_t was_removed = listeners_.erase(callback_object);
197 delete callback_object;
201 template <
typename ParamT>
206 typename Callbacks::const_iterator i = listeners_.begin();
207 typename Callbacks::const_iterator iend = listeners_.end();
208 for (; i != iend; ++i) {
215 template <
typename ParamT>
220 typename Callbacks::const_iterator i = listeners_.begin();
221 typename Callbacks::const_iterator iend = listeners_.end();
222 for (; i != iend; ++i) {
236 const size_t drainout_threshold) :
237 maximal_queue_length_(maximal_length),
238 queue_drainout_threshold_(drainout_threshold)
240 assert(drainout_threshold <= maximal_length);
241 assert(drainout_threshold > 0);
243 const bool successful = (
244 pthread_mutex_init(&
mutex_, NULL) == 0 &&
254 pthread_cond_destroy(&queue_is_not_empty_);
255 pthread_cond_destroy(&queue_is_not_full_);
256 pthread_mutex_destroy(&mutex_);
265 while (this->
size() >= maximal_queue_length_) {
266 pthread_cond_wait(&queue_is_not_full_, &mutex_);
273 pthread_cond_broadcast(&queue_is_not_empty_);
282 while (this->empty()) {
283 pthread_cond_wait(&queue_is_not_empty_, &mutex_);
287 T data = this->front(); this->pop();
290 if (this->
size() < queue_drainout_threshold_) {
291 pthread_cond_broadcast(&queue_is_not_full_);
303 unsigned int dropped_items = 0;
304 while (!this->empty()) {
309 pthread_cond_broadcast(&queue_is_not_full_);
311 return dropped_items;
325 return this->empty();
331 return maximal_queue_length_;
341 template <
class WorkerT>
343 const size_t number_of_workers,
344 const size_t maximal_queue_length,
346 number_of_workers_(number_of_workers),
347 worker_context_(worker_context),
348 thread_context_(this, worker_context_),
352 jobs_queue_(maximal_queue_length, maximal_queue_length / 4 + 1),
353 results_queue_(maximal_queue_length, 1)
355 assert(maximal_queue_length >= number_of_workers);
356 assert(number_of_workers > 0);
364 template <
class WorkerT>
371 pthread_cond_destroy(&worker_started_);
372 pthread_cond_destroy(&jobs_all_done_);
373 pthread_mutex_destroy(&status_mutex_);
374 pthread_mutex_destroy(&jobs_all_done_mutex_);
378 template <
class WorkerT>
381 "object with %d worker threads "
382 "and a queue length of %d",
383 number_of_workers_, jobs_queue_.GetMaximalItemCount());
389 if (pthread_mutex_init(&status_mutex_, NULL) != 0 ||
390 pthread_mutex_init(&jobs_all_done_mutex_, NULL) != 0 ||
391 pthread_cond_init(&worker_started_, NULL) != 0 ||
392 pthread_cond_init(&jobs_all_done_, NULL) != 0) {
397 if (!SpawnWorkers()) {
408 template <
class WorkerT>
410 assert(worker_threads_.size() == 0);
411 worker_threads_.resize(number_of_workers_);
418 WorkerThreads::iterator i = worker_threads_.begin();
419 WorkerThreads::const_iterator iend = worker_threads_.end();
420 for (; i != iend; ++i) {
421 pthread_t* thread = &(*i);
422 const int retval = pthread_create(
426 reinterpret_cast<void *>(&thread_context_));
439 reinterpret_cast<void *>(&thread_context_));
450 while (workers_started_ < number_of_workers_ + 1) {
451 pthread_cond_wait(&worker_started_, &status_mutex_);
460 template <
class WorkerT>
475 WorkerT worker(worker_context);
476 worker.RegisterMaster(master);
477 const bool init_success = worker.Initialize();
484 "properly... it will die now!");
519 template <
class WorkerT>
527 "Started dedicated callback worker");
535 template <
class WorkerT>
537 while (IsRunning()) {
538 const CallbackJob callback_job = results_queue_.Dequeue();
546 this->NotifyListeners(callback_job.
data);
549 atomic_dec32(&jobs_pending_);
550 atomic_inc64(&jobs_processed_);
553 if (atomic_read32(&jobs_pending_) == 0) {
554 pthread_cond_broadcast(&jobs_all_done_);
560 template <
class WorkerT>
564 pthread_cond_signal(&worker_started_);
568 template <
class WorkerT>
576 "concurrency was not running...");
580 jobs_queue_.Enqueue(job);
582 atomic_inc32(&jobs_pending_);
587 template <
class WorkerT>
595 const unsigned int number_of_workers = GetNumberOfWorkers();
596 for (
unsigned int i = 0; i < number_of_workers; ++i) {
605 template <
class WorkerT>
611 return jobs_queue_.Dequeue();
615 template <
class WorkerT>
619 const unsigned int dropped_jobs = jobs_queue_.Drop();
622 if (forget_pending) {
623 atomic_xadd32(&jobs_pending_, -dropped_jobs);
628 template <
class WorkerT>
641 ScheduleDeathSentences();
644 WorkerThreads::const_iterator i = worker_threads_.begin();
645 WorkerThreads::const_iterator iend = worker_threads_.end();
646 for (; i != iend; ++i) {
647 pthread_join(*i, NULL);
651 pthread_join(callback_thread_, NULL);
654 const int pending = atomic_read32(&jobs_pending_);
657 "Still %d jobs were pending and "
658 "will not be executed anymore.",
663 const int failed = atomic_read32(&jobs_failed_);
671 "All workers stopped. They processed %d jobs. Terminating...",
672 atomic_read64(&jobs_processed_));
676 template <
class WorkerT>
679 "Waiting for %d jobs to be finished", atomic_read32(&jobs_pending_));
684 while (atomic_read32(&jobs_pending_) > 0) {
685 pthread_cond_wait(&jobs_all_done_, &jobs_all_done_mutex_);
693 template <
class WorkerT>
703 template <
class WorkerT>
706 const bool success) {
712 atomic_inc32(&jobs_failed_);
720 #ifdef CVMFS_NAMESPACE_GUARD
724 #endif // CVMFS_UTIL_CONCURRENCY_IMPL_H_
#define LogCvmfs(source, mask,...)
void Enqueue(const T &data)
CallbackPtr RegisterListener(typename BoundClosure< ParamT, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, ClosureDataT data)
pthread_cond_t queue_is_not_empty_
WorkerT::returned_data returned_data_t
void WaitForEmptyQueue() const
void SetValueUnprotected(const T new_value)
pthread_cond_t object_set_
void NotifyListeners(const ParamT &parameter)
atomic_int32 jobs_pending_
size_t GetMaximalItemCount() const
virtual ~ConcurrentWorkers()
assert((mem||(size==0))&&"Out Of Memory")
void TruncateJobQueue(const bool forget_pending=false)
static CallbackTN * MakeCallback(typename BoundCallback< ParamT, DelegateT >::CallbackMethod method, DelegateT *delegate)
void Set(const T &object)
WorkerT::worker_context worker_context_t
void UnregisterListener(CallbackPtr callback_object)
void Schedule(const expected_data_t &data)
static void * RunCallbackThreadWrapper(void *run_binding)
const worker_context_t * worker_context
FifoChannel(const size_t maximal_length, const size_t drainout_threshold)
ConcurrentWorkers(const size_t number_of_workers, const size_t maximal_queue_length, worker_context_t *worker_context=NULL)
void WaitForFreeSlotUnprotected()
void ReportStartedWorker() const
atomic_int32 jobs_failed_
void ScheduleDeathSentences()
size_t GetItemCount() const
const bool is_death_sentence
death sentence flag
pthread_cond_t queue_is_not_full_
void WaitForTermination()
const DataT data
job payload
void UnregisterListeners()
ConcurrentWorkers< WorkerT > * delegate
atomic_int64 jobs_processed_
static void * RunWorker(void *run_binding)
static CallbackTN * MakeClosure(typename BoundClosure< ParamT, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, const ClosureDataT &closure_data)
void JobDone(const returned_data_t &data, const bool success=true)