5 #ifndef CVMFS_UTIL_CONCURRENCY_H_
6 #define CVMFS_UTIL_CONCURRENCY_H_
21 #ifdef CVMFS_NAMESPACE_GUARD
22 namespace CVMFS_NAMESPACE_GUARD {
// Virtual destructor: releases the pthread mutex stored in mutex_ by calling
// pthread_mutex_destroy(). The return code of that call is not inspected.
32 inline virtual ~Lockable() { pthread_mutex_destroy(&mutex_); }
// Acquires mutex_ through pthread_mutex_lock(); the call's return code is
// discarded. Declared const, so it can be invoked on const objects.
34 void Lock()
const { pthread_mutex_lock(&mutex_); }
// Attempts to take mutex_ without waiting, via pthread_mutex_trylock().
// Returns that call's result unchanged (per POSIX: 0 when the lock was
// acquired, otherwise an error number such as EBUSY).
35 int TryLock()
const {
return pthread_mutex_trylock(&mutex_); }
// Releases mutex_ through pthread_mutex_unlock(); the call's return code is
// discarded. Declared const, mirroring Lock() and TryLock().
36 void Unlock()
const { pthread_mutex_unlock(&mutex_); }
40 const int retval = pthread_mutex_init(&mutex_, NULL);
// Declaration only (no body visible here): Set() accepts the new object by
// const reference and returns nothing.
// NOTE(review): presumably stores a copy and wakes threads waiting for the
// object -- confirm against the definition.
77 void Set(
const T &
object);
116 template <
typename T>
120 value_(T(0)), maximal_value_(T(0)) { Initialize(); }
124 , maximal_value_(maximal_value)
126 assert(maximal_value > T(0));
134 WaitForFreeSlotUnprotected();
135 SetValueUnprotected(value_ + T(1));
141 SetValueUnprotected(value_ - T(1));
147 while (value_ != T(0)) {
148 pthread_cond_wait(&became_zero_, &mutex_);
168 SetValueUnprotected(other);
// Internal helpers; only their declarations are visible here.
// SetValueUnprotected() receives the new counter value by value. Elsewhere in
// this listing it is called with value_ + T(1), value_ - T(1) and an
// externally supplied value.
// NOTE(review): the "Unprotected" suffix suggests the caller must already
// hold the mutex -- confirm against the definitions.
173 void SetValueUnprotected(
const T new_value);
// Takes no arguments; in this listing it is invoked directly before the
// counter is incremented.
174 void WaitForFreeSlotUnprotected();
195 template <
typename ParamT>
217 template <
typename ParamT>
241 template <
class DelegateT,
class ClosureDataT>
245 ClosureDataT>::CallbackMethod method,
259 template <
class DelegateT>
262 DelegateT *delegate);
// Declaration only: takes no arguments and returns nothing.
// NOTE(review): presumably drops every registered callback -- confirm
// against the definition.
285 void UnregisterListeners();
// Declaration only: receives the callback object as a CallbackPtr, passed by
// value, and returns nothing.
290 void RegisterListener(
CallbackPtr callback_object);
// Declaration only: receives the notification payload by const reference
// (type ParamT) and returns nothing.
// NOTE(review): presumably invokes each registered listener with this
// parameter -- confirm against the definition.
299 void NotifyListeners(
const ParamT &parameter);
// POSIX read-write lock. It is declared mutable, so const member functions
// may lock and unlock it.
// NOTE(review): presumably guards the set of registered listeners -- confirm.
303 mutable pthread_rwlock_t listeners_rw_lock_;
364 const size_t drainout_threshold);
// Declaration only: takes the item to add by const reference and returns
// nothing.
// NOTE(review): presumably blocks while the channel is full (see the
// queue_is_not_full_ condition variable) -- confirm against the definition.
374 void Enqueue(
const T &data);
// Const, inline accessors; only the declarations are visible here.
// GetItemCount() reports a size_t count of items.
391 inline size_t GetItemCount()
const;
// IsEmpty() reports a bool.
// NOTE(review): presumably true when no items are held -- confirm.
392 inline bool IsEmpty()
const;
// GetMaximalItemCount() reports a size_t.
// NOTE(review): presumably the capacity limit of the channel -- confirm.
393 inline size_t GetMaximalItemCount()
const;
425 template <
class WorkerT>
450 template <
class DataT>
452 explicit Job(
const DataT &data) :
454 is_death_sentence(false) {}
457 is_death_sentence(true) {}
473 delegate(delegate) {}
482 worker_context(worker_context) {}
500 const size_t maximal_queue_length,
// Declaration only: const member, no arguments, no return value.
// NOTE(review): presumably blocks the caller until no jobs are pending --
// confirm against the definition.
537 void WaitForEmptyQueue()
const;
// Declaration only: non-const member, no arguments, no return value.
// NOTE(review): presumably waits for the worker threads to finish -- confirm.
546 void WaitForTermination();
550 return atomic_read32(&jobs_failed_);
// Declaration only: member function without arguments or return value.
575 void RunCallbackThread();
// Static entry point with the void* (void*) signature that pthread_create()
// expects for a start routine. The opaque argument is named run_binding.
// NOTE(review): presumably points to a RunBinding/WorkerRunBinding -- confirm.
588 static void* RunWorker(
void *run_binding);
// Static entry point with the same start-routine signature as RunWorker().
// NOTE(review): presumably forwards to RunCallbackThread() on the delegate
// found in run_binding -- confirm against the definition.
590 static void* RunCallbackThreadWrapper(
void *run_binding);
// Declaration only: const member, no arguments, no return value.
596 void ReportStartedWorker()
const;
// Declaration only: takes a WorkerJob by value.
598 void Schedule(WorkerJob job);
// Declaration only: no arguments, no return value.
// NOTE(review): presumably enqueues jobs flagged is_death_sentence so that
// workers terminate -- confirm against the definition.
599 void ScheduleDeathSentences();
// Declaration only: forget_pending defaults to false when the caller omits
// the argument.
606 void TruncateJobQueue(
const bool forget_pending =
false);
// Declaration only: inline member that returns a WorkerJob by value.
// NOTE(review): presumably takes the next job off the job queue -- confirm.
615 inline WorkerJob Acquire();
// Declaration only: receives the job's result by const reference plus a
// success flag that defaults to true when omitted.
624 void JobDone(
const returned_data_t& data,
const bool success =
true);
717 template <
class DerivedWorkerT>
770 #ifdef CVMFS_NAMESPACE_GUARD
776 #endif // CVMFS_UTIL_CONCURRENCY_H_
Job< returned_data_t > CallbackJob
void RegisterMaster(ConcurrentWorkers< DerivedWorkerT > *master)
SynchronizingCounter< T > & operator=(const T &other)
Callbackable< ParamT >::CallbackTN * CallbackPtr
FifoChannel< WorkerJob > JobQueue
pthread_t callback_thread_
handles callback invocations
virtual bool Initialize()
pthread_cond_t queue_is_not_empty_
const size_t queue_drainout_threshold_
WorkerT::returned_data returned_data_t
ConcurrentWorkers< DerivedWorkerT > * master() const
RunBinding(ConcurrentWorkers< WorkerT > *delegate)
SynchronizingCounter(const T maximal_value)
pthread_cond_t became_zero_
pthread_cond_t object_set_
atomic_int32 jobs_pending_
unsigned int GetNumberOfCpuCores()
unsigned int GetNumberOfWorkers() const
assert((mem||(size==0))&&"Out Of Memory")
WorkerThreads worker_threads_
list of worker threads
FifoChannel< CallbackJob > CallbackQueue
bool HasMaximalValue() const
WorkerT::worker_context worker_context_t
void JobSuccessful(const returned_data_t &data)
pthread_cond_t jobs_all_done_
void Schedule(const expected_data_t &data)
virtual ~ConcurrentWorker()
pthread_cond_t worker_started_
const worker_context_t * worker_context
const worker_context_t * worker_context_
the WorkerT-defined context
std::vector< pthread_t > WorkerThreads
atomic_int32 jobs_failed_
const bool is_death_sentence
death sentence flag
Job< expected_data_t > WorkerJob
pthread_mutex_t status_mutex_
pthread_cond_t queue_is_not_full_
WorkerRunBinding thread_context_
static const unsigned int kFallbackNumberOfCpus
const size_t maximal_queue_length_
WorkerT::expected_data expected_data_t
CallbackQueue results_queue_
pthread_cond_t free_slot_
unsigned int workers_started_
const DataT data
job payload
ConcurrentWorkers< WorkerT > * delegate
void UnregisterListener(ListenerHandle *handle)
atomic_int64 jobs_processed_
unsigned int GetNumberOfFailedJobs() const
const size_t number_of_workers_
number of concurrent worker threads
void JobFailed(const returned_data_t &data)
std::set< CallbackPtr > Callbacks
pthread_mutex_t jobs_all_done_mutex_
ConcurrentWorkers< DerivedWorkerT > * master_
WorkerRunBinding(ConcurrentWorkers< WorkerT > *delegate, const worker_context_t *worker_context)