5 #ifndef CVMFS_UTIL_CONCURRENCY_IMPL_H_
6 #define CVMFS_UTIL_CONCURRENCY_IMPL_H_
10 #ifdef CVMFS_NAMESPACE_GUARD
11 namespace CVMFS_NAMESPACE_GUARD {
24 || (new_value >= T(0) && new_value <= maximal_value_));
29 pthread_cond_broadcast(&became_zero_);
32 if (HasMaximalValue() && value_ < maximal_value_) {
33 pthread_cond_broadcast(&free_slot_);
40 while (HasMaximalValue() && value_ >= maximal_value_) {
41 pthread_cond_wait(&free_slot_, &mutex_);
43 assert(!HasMaximalValue() || value_ < maximal_value_);
49 const bool init_successful = (pthread_mutex_init(&mutex_, NULL) == 0
50 && pthread_cond_init(&became_zero_, NULL) == 0
51 && pthread_cond_init(&free_slot_, NULL) == 0);
58 pthread_mutex_destroy(&mutex_);
59 pthread_cond_destroy(&became_zero_);
60 pthread_cond_destroy(&free_slot_);
70 template<
typename ParamT>
72 const int ret = pthread_rwlock_init(&listeners_rw_lock_, NULL);
77 template<
typename ParamT>
79 UnregisterListeners();
80 pthread_rwlock_destroy(&listeners_rw_lock_);
84 template<
typename ParamT>
85 template<
class DelegateT,
class ClosureDataT>
93 method, delegate, data);
94 RegisterListener(callback);
99 template<
typename ParamT>
100 template<
class DelegateT>
103 DelegateT *delegate) {
107 RegisterListener(callback);
112 template<
typename ParamT>
117 RegisterListener(callback);
122 template<
typename ParamT>
127 listeners_.insert(callback_object);
131 template<
typename ParamT>
137 const size_t was_removed = listeners_.erase(callback_object);
139 delete callback_object;
143 template<
typename ParamT>
148 typename Callbacks::const_iterator i = listeners_.begin();
149 typename Callbacks::const_iterator iend = listeners_.end();
150 for (; i != iend; ++i) {
157 template<
typename ParamT>
162 typename Callbacks::const_iterator i = listeners_.begin();
163 typename Callbacks::const_iterator iend = listeners_.end();
164 for (; i != iend; ++i) {
178 const size_t drainout_threshold)
179 : maximal_queue_length_(maximal_length)
180 , queue_drainout_threshold_(drainout_threshold) {
181 assert(drainout_threshold <= maximal_length);
182 assert(drainout_threshold > 0);
184 const bool successful = (pthread_mutex_init(&
mutex_, NULL) == 0
195 pthread_cond_destroy(&queue_is_not_empty_);
196 pthread_cond_destroy(&queue_is_not_full_);
197 pthread_mutex_destroy(&mutex_);
206 while (this->
size() >= maximal_queue_length_) {
207 pthread_cond_wait(&queue_is_not_full_, &mutex_);
214 pthread_cond_broadcast(&queue_is_not_empty_);
223 while (this->empty()) {
224 pthread_cond_wait(&queue_is_not_empty_, &mutex_);
228 T data = this->front();
232 if (this->
size() < queue_drainout_threshold_) {
233 pthread_cond_broadcast(&queue_is_not_full_);
245 unsigned int dropped_items = 0;
246 while (!this->empty()) {
251 pthread_cond_broadcast(&queue_is_not_full_);
253 return dropped_items;
267 return this->empty();
273 return maximal_queue_length_;
283 template<
class WorkerT>
285 const size_t number_of_workers,
286 const size_t maximal_queue_length,
288 : number_of_workers_(number_of_workers)
289 , worker_context_(worker_context)
290 , thread_context_(this, worker_context_)
291 , initialized_(false)
293 , workers_started_(0)
294 , jobs_queue_(maximal_queue_length, maximal_queue_length / 4 + 1)
295 , results_queue_(maximal_queue_length, 1) {
296 assert(maximal_queue_length >= number_of_workers);
297 assert(number_of_workers > 0);
305 template<
class WorkerT>
312 pthread_cond_destroy(&worker_started_);
313 pthread_cond_destroy(&jobs_all_done_);
314 pthread_mutex_destroy(&status_mutex_);
315 pthread_mutex_destroy(&jobs_all_done_mutex_);
319 template<
class WorkerT>
322 "Initializing ConcurrentWorker "
323 "object with %lu worker threads "
324 "and a queue length of %zu",
325 number_of_workers_, jobs_queue_.GetMaximalItemCount());
331 if (pthread_mutex_init(&status_mutex_, NULL) != 0
332 || pthread_mutex_init(&jobs_all_done_mutex_, NULL) != 0
333 || pthread_cond_init(&worker_started_, NULL) != 0
334 || pthread_cond_init(&jobs_all_done_, NULL) != 0) {
339 if (!SpawnWorkers()) {
350 template<
class WorkerT>
352 assert(worker_threads_.size() == 0);
353 worker_threads_.resize(number_of_workers_);
360 WorkerThreads::iterator i = worker_threads_.begin();
361 WorkerThreads::const_iterator iend = worker_threads_.end();
362 for (; i != iend; ++i) {
363 pthread_t *thread = &(*i);
364 const int retval = pthread_create(
368 reinterpret_cast<void *>(&thread_context_));
376 const int retval = pthread_create(
380 reinterpret_cast<void *>(&thread_context_));
383 "Failed to spawn the callback "
392 while (workers_started_ < number_of_workers_ + 1) {
393 pthread_cond_wait(&worker_started_, &status_mutex_);
402 template<
class WorkerT>
417 WorkerT worker(worker_context);
418 worker.RegisterMaster(master);
419 const bool init_success = worker.Initialize();
426 "Worker was not initialized "
427 "properly... it will die now!");
462 template<
class WorkerT>
470 "Started dedicated callback worker");
478 template<
class WorkerT>
480 while (IsRunning()) {
481 const CallbackJob callback_job = results_queue_.Dequeue();
489 this->NotifyListeners(callback_job.
data);
492 atomic_dec32(&jobs_pending_);
493 atomic_inc64(&jobs_processed_);
496 if (atomic_read32(&jobs_pending_) == 0) {
497 pthread_cond_broadcast(&jobs_all_done_);
503 template<
class WorkerT>
507 pthread_cond_signal(&worker_started_);
511 template<
class WorkerT>
519 "Tried to schedule a job but "
520 "concurrency was not running...");
524 jobs_queue_.Enqueue(job);
526 atomic_inc32(&jobs_pending_);
531 template<
class WorkerT>
539 const unsigned int number_of_workers = GetNumberOfWorkers();
540 for (
unsigned int i = 0; i < number_of_workers; ++i) {
549 template<
class WorkerT>
554 return jobs_queue_.Dequeue();
558 template<
class WorkerT>
562 const unsigned int dropped_jobs = jobs_queue_.Drop();
565 if (forget_pending) {
566 atomic_xadd32(&jobs_pending_, -dropped_jobs);
571 template<
class WorkerT>
584 ScheduleDeathSentences();
587 WorkerThreads::const_iterator i = worker_threads_.begin();
588 WorkerThreads::const_iterator iend = worker_threads_.end();
589 for (; i != iend; ++i) {
590 pthread_join(*i, NULL);
594 pthread_join(callback_thread_, NULL);
597 const int pending = atomic_read32(&jobs_pending_);
600 "Job queue was not fully processed. "
601 "Still %d jobs were pending and "
602 "will not be executed anymore.",
607 const int failed = atomic_read32(&jobs_failed_);
614 "All workers stopped. They processed %ld jobs. Terminating...",
615 atomic_read64(&jobs_processed_));
619 template<
class WorkerT>
622 "Waiting for %d jobs to be finished", atomic_read32(&jobs_pending_));
627 while (atomic_read32(&jobs_pending_) > 0) {
628 pthread_cond_wait(&jobs_all_done_, &jobs_all_done_mutex_);
636 template<
class WorkerT>
646 template<
class WorkerT>
649 const bool success) {
655 atomic_inc32(&jobs_failed_);
663 #ifdef CVMFS_NAMESPACE_GUARD
667 #endif // CVMFS_UTIL_CONCURRENCY_IMPL_H_
void Enqueue(const T &data)
CallbackPtr RegisterListener(typename BoundClosure< ParamT, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, ClosureDataT data)
pthread_cond_t queue_is_not_empty_
WorkerT::returned_data returned_data_t
void WaitForEmptyQueue() const
void SetValueUnprotected(const T new_value)
void NotifyListeners(const ParamT ¶meter)
atomic_int32 jobs_pending_
size_t GetMaximalItemCount() const
virtual ~ConcurrentWorkers()
assert((mem||(size==0))&&"Out Of Memory")
void TruncateJobQueue(const bool forget_pending=false)
static CallbackTN * MakeCallback(typename BoundCallback< ParamT, DelegateT >::CallbackMethod method, DelegateT *delegate)
WorkerT::worker_context worker_context_t
void UnregisterListener(CallbackPtr callback_object)
void Schedule(const expected_data_t &data)
static void * RunCallbackThreadWrapper(void *run_binding)
const worker_context_t * worker_context
FifoChannel(const size_t maximal_length, const size_t drainout_threshold)
ConcurrentWorkers(const size_t number_of_workers, const size_t maximal_queue_length, worker_context_t *worker_context=NULL)
void WaitForFreeSlotUnprotected()
void ReportStartedWorker() const
atomic_int32 jobs_failed_
void ScheduleDeathSentences()
size_t GetItemCount() const
const bool is_death_sentence
death sentence flag
pthread_cond_t queue_is_not_full_
void WaitForTermination()
const DataT data
job payload
void UnregisterListeners()
ConcurrentWorkers< WorkerT > * delegate
atomic_int64 jobs_processed_
static void * RunWorker(void *run_binding)
static CallbackTN * MakeClosure(typename BoundClosure< ParamT, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, const ClosureDataT &closure_data)
void JobDone(const returned_data_t &data, const bool success=true)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)