5 #ifndef CVMFS_UTIL_CONCURRENCY_H_
6 #define CVMFS_UTIL_CONCURRENCY_H_
21 #ifdef CVMFS_NAMESPACE_GUARD
22 namespace CVMFS_NAMESPACE_GUARD {
32 inline virtual ~Lockable() { pthread_mutex_destroy(&mutex_); }
34 void Lock()
const { pthread_mutex_lock(&mutex_); }
35 int TryLock()
const {
return pthread_mutex_trylock(&mutex_); }
36 void Unlock()
const { pthread_mutex_unlock(&mutex_); }
40 const int retval = pthread_mutex_init(&mutex_, NULL);
71 value_(T(0)), maximal_value_(T(0)) { Initialize(); }
75 , maximal_value_(maximal_value)
77 assert(maximal_value > T(0));
85 WaitForFreeSlotUnprotected();
86 SetValueUnprotected(value_ + T(1));
92 SetValueUnprotected(value_ - T(1));
98 while (value_ != T(0)) {
99 pthread_cond_wait(&became_zero_, &mutex_);
119 SetValueUnprotected(other);
124 void SetValueUnprotected(
const T new_value);
125 void WaitForFreeSlotUnprotected();
146 template <
typename ParamT>
168 template <
typename ParamT>
192 template <
class DelegateT,
class ClosureDataT>
196 ClosureDataT>::CallbackMethod method,
210 template <
class DelegateT>
213 DelegateT *delegate);
236 void UnregisterListeners();
241 void RegisterListener(
CallbackPtr callback_object);
250 void NotifyListeners(
const ParamT ¶meter);
254 mutable pthread_rwlock_t listeners_rw_lock_;
315 const size_t drainout_threshold);
325 void Enqueue(
const T &data);
342 inline size_t GetItemCount()
const;
343 inline bool IsEmpty()
const;
344 inline size_t GetMaximalItemCount()
const;
376 template <
class WorkerT>
401 template <
class DataT>
403 explicit Job(
const DataT &data) :
405 is_death_sentence(false) {}
408 is_death_sentence(true) {}
424 delegate(delegate) {}
433 worker_context(worker_context) {}
451 const size_t maximal_queue_length,
488 void WaitForEmptyQueue()
const;
497 void WaitForTermination();
501 return atomic_read32(&jobs_failed_);
526 void RunCallbackThread();
539 static void* RunWorker(
void *run_binding);
541 static void* RunCallbackThreadWrapper(
void *run_binding);
547 void ReportStartedWorker()
const;
549 void Schedule(WorkerJob job);
550 void ScheduleDeathSentences();
557 void TruncateJobQueue(
const bool forget_pending =
false);
566 inline WorkerJob Acquire();
575 void JobDone(
const returned_data_t& data,
const bool success =
true);
668 template <
class DerivedWorkerT>
721 #ifdef CVMFS_NAMESPACE_GUARD
727 #endif // CVMFS_UTIL_CONCURRENCY_H_
Job< returned_data_t > CallbackJob
void RegisterMaster(ConcurrentWorkers< DerivedWorkerT > *master)
SynchronizingCounter< T > & operator=(const T &other)
Callbackable< ParamT >::CallbackTN * CallbackPtr
FifoChannel< WorkerJob > JobQueue
pthread_t callback_thread_
handles callback invocations
virtual bool Initialize()
pthread_cond_t queue_is_not_empty_
const size_t queue_drainout_threshold_
WorkerT::returned_data returned_data_t
ConcurrentWorkers< DerivedWorkerT > * master() const
RunBinding(ConcurrentWorkers< WorkerT > *delegate)
SynchronizingCounter(const T maximal_value)
pthread_cond_t became_zero_
atomic_int32 jobs_pending_
unsigned int GetNumberOfCpuCores()
unsigned int GetNumberOfWorkers() const
assert((mem||(size==0))&&"Out Of Memory")
WorkerThreads worker_threads_
list of worker threads
FifoChannel< CallbackJob > CallbackQueue
bool HasMaximalValue() const
WorkerT::worker_context worker_context_t
void JobSuccessful(const returned_data_t &data)
pthread_cond_t jobs_all_done_
void Schedule(const expected_data_t &data)
virtual ~ConcurrentWorker()
pthread_cond_t worker_started_
const worker_context_t * worker_context
const worker_context_t * worker_context_
the WorkerT defined context
std::vector< pthread_t > WorkerThreads
atomic_int32 jobs_failed_
const bool is_death_sentence
death sentence flag
Job< expected_data_t > WorkerJob
pthread_mutex_t status_mutex_
pthread_cond_t queue_is_not_full_
WorkerRunBinding thread_context_
static const unsigned int kFallbackNumberOfCpus
const size_t maximal_queue_length_
WorkerT::expected_data expected_data_t
CallbackQueue results_queue_
pthread_cond_t free_slot_
unsigned int workers_started_
const DataT data
job payload
ConcurrentWorkers< WorkerT > * delegate
void UnregisterListener(ListenerHandle *handle)
atomic_int64 jobs_processed_
unsigned int GetNumberOfFailedJobs() const
const size_t number_of_workers_
number of concurrent worker threads
void JobFailed(const returned_data_t &data)
std::set< CallbackPtr > Callbacks
pthread_mutex_t jobs_all_done_mutex_
ConcurrentWorkers< DerivedWorkerT > * master_
WorkerRunBinding(ConcurrentWorkers< WorkerT > *delegate, const worker_context_t *worker_context)