5 #ifndef CVMFS_UTIL_CONCURRENCY_H_
6 #define CVMFS_UTIL_CONCURRENCY_H_
21 #ifdef CVMFS_NAMESPACE_GUARD
22 namespace CVMFS_NAMESPACE_GUARD {
30 template <
class ItemT>
34 int retval = pthread_mutex_init(&lock_, NULL);
36 retval = pthread_cond_init(&cond_populated_, NULL);
41 pthread_cond_destroy(&cond_populated_);
42 pthread_mutex_destroy(&lock_);
49 int retval = pthread_mutex_lock(&lock_);
58 int retval = pthread_mutex_unlock(&lock_);
66 int retval = pthread_cond_signal(&cond_populated_);
68 retval = pthread_mutex_unlock(&lock_);
74 items_.push_back(item);
75 int retval = pthread_cond_signal(&cond_populated_);
85 while (items_.size() == 0)
86 pthread_cond_wait(&cond_populated_, &lock_);
87 ItemT *item = items_[0];
88 items_.erase(items_.begin());
/**
 * Destructor: releases the pthread mutex held in mutex_ by calling
 * pthread_mutex_destroy(). The return code of that call is not inspected.
 */
114 inline virtual ~Lockable() { pthread_mutex_destroy(&mutex_); }
/**
 * Acquires mutex_ through pthread_mutex_lock(). The return code is discarded,
 * so a failed lock attempt is not reported to the caller.
 * NOTE(review): this is a const method that passes &mutex_ to a mutating
 * call -- presumably mutex_ is declared mutable; confirm against the member
 * declaration, which is not visible here.
 */
116 void Lock()
const { pthread_mutex_lock(&mutex_); }
/**
 * Attempts to acquire mutex_ through pthread_mutex_trylock().
 *
 * @return the value returned by pthread_mutex_trylock(), passed through
 *         unchanged (per POSIX: 0 when the lock was acquired, an error
 *         number otherwise)
 */
117 int TryLock()
const {
return pthread_mutex_trylock(&mutex_); }
/**
 * Releases mutex_ through pthread_mutex_unlock(). The return code is
 * discarded, so an unlock error is not reported to the caller.
 */
118 void Unlock()
const { pthread_mutex_unlock(&mutex_); }
122 const int retval = pthread_mutex_init(&mutex_, NULL);
149 template <
typename T>
153 value_(T(0)), maximal_value_(T(0)) { Initialize(); }
157 , maximal_value_(maximal_value)
159 assert(maximal_value > T(0));
167 WaitForFreeSlotUnprotected();
168 SetValueUnprotected(value_ + T(1));
174 SetValueUnprotected(value_ - T(1));
180 while (value_ != T(0)) {
181 pthread_cond_wait(&became_zero_, &mutex_);
201 SetValueUnprotected(other);
206 void SetValueUnprotected(
const T new_value);
207 void WaitForFreeSlotUnprotected();
228 template <
typename ParamT>
250 template <
typename ParamT>
274 template <
class DelegateT,
class ClosureDataT>
278 ClosureDataT>::CallbackMethod method,
292 template <
class DelegateT>
295 DelegateT *delegate);
318 void UnregisterListeners();
323 void RegisterListener(
CallbackPtr callback_object);
/**
 * Declaration only; the definition is not visible in this excerpt.
 *
 * @param parameter  value handed over by const reference
 *                   NOTE(review): presumably forwarded to every registered
 *                   listener callback -- confirm against the definition.
 */
332 void NotifyListeners(
const ParamT &parameter);
336 mutable pthread_rwlock_t listeners_rw_lock_;
397 const size_t drainout_threshold);
407 void Enqueue(
const T &data);
424 inline size_t GetItemCount()
const;
425 inline bool IsEmpty()
const;
426 inline size_t GetMaximalItemCount()
const;
458 template <
class WorkerT>
483 template <
class DataT>
485 explicit Job(
const DataT &data) :
487 is_death_sentence(false) {}
490 is_death_sentence(true) {}
506 delegate(delegate) {}
515 worker_context(worker_context) {}
533 const size_t maximal_queue_length,
570 void WaitForEmptyQueue()
const;
579 void WaitForTermination();
583 return atomic_read32(&jobs_failed_);
608 void RunCallbackThread();
621 static void* RunWorker(
void *run_binding);
623 static void* RunCallbackThreadWrapper(
void *run_binding);
629 void ReportStartedWorker()
const;
631 void Schedule(WorkerJob job);
632 void ScheduleDeathSentences();
639 void TruncateJobQueue(
const bool forget_pending =
false);
648 inline WorkerJob Acquire();
657 void JobDone(
const returned_data_t& data,
const bool success =
true);
750 template <
class DerivedWorkerT>
803 #ifdef CVMFS_NAMESPACE_GUARD
809 #endif // CVMFS_UTIL_CONCURRENCY_H_
Job< returned_data_t > CallbackJob
void RegisterMaster(ConcurrentWorkers< DerivedWorkerT > *master)
SynchronizingCounter< T > & operator=(const T &other)
Callbackable< ParamT >::CallbackTN * CallbackPtr
FifoChannel< WorkerJob > JobQueue
pthread_t callback_thread_
handles callback invokes
virtual bool Initialize()
pthread_cond_t queue_is_not_empty_
const size_t queue_drainout_threshold_
WorkerT::returned_data returned_data_t
ConcurrentWorkers< DerivedWorkerT > * master() const
RunBinding(ConcurrentWorkers< WorkerT > *delegate)
SynchronizingCounter(const T maximal_value)
pthread_cond_t became_zero_
atomic_int32 jobs_pending_
unsigned int GetNumberOfCpuCores()
unsigned int GetNumberOfWorkers() const
pthread_cond_t cond_populated_
assert((mem||(size==0))&&"Out Of Memory")
WorkerThreads worker_threads_
list of worker threads
FifoChannel< CallbackJob > CallbackQueue
bool HasMaximalValue() const
WorkerT::worker_context worker_context_t
void JobSuccessful(const returned_data_t &data)
pthread_cond_t jobs_all_done_
void Schedule(const expected_data_t &data)
virtual ~ConcurrentWorker()
pthread_cond_t worker_started_
const worker_context_t * worker_context
const worker_context_t * worker_context_
the WorkerT defined context
std::vector< pthread_t > WorkerThreads
atomic_int32 jobs_failed_
const bool is_death_sentence
death sentence flag
Job< expected_data_t > WorkerJob
pthread_mutex_t status_mutex_
pthread_cond_t queue_is_not_full_
WorkerRunBinding thread_context_
static const unsigned int kFallbackNumberOfCpus
const size_t maximal_queue_length_
WorkerT::expected_data expected_data_t
std::vector< ItemT * > * StartEnqueueing()
CallbackQueue results_queue_
std::vector< ItemT * > items_
pthread_cond_t free_slot_
unsigned int workers_started_
const DataT data
job payload
ConcurrentWorkers< WorkerT > * delegate
void UnregisterListener(ListenerHandle *handle)
atomic_int64 jobs_processed_
unsigned int GetNumberOfFailedJobs() const
const size_t number_of_workers_
number of concurrent worker threads
void PushBack(ItemT *item)
void JobFailed(const returned_data_t &data)
std::set< CallbackPtr > Callbacks
pthread_mutex_t jobs_all_done_mutex_
ConcurrentWorkers< DerivedWorkerT > * master_
WorkerRunBinding(ConcurrentWorkers< WorkerT > *delegate, const worker_context_t *worker_context)