5 #ifndef CVMFS_UTIL_CONCURRENCY_H_
6 #define CVMFS_UTIL_CONCURRENCY_H_
20 #ifdef CVMFS_NAMESPACE_GUARD
21 namespace CVMFS_NAMESPACE_GUARD {
31 inline virtual ~Lockable() { pthread_mutex_destroy(&mutex_); }
33 void Lock()
const { pthread_mutex_lock(&mutex_); }
34 int TryLock()
const {
return pthread_mutex_trylock(&mutex_); }
35 void Unlock()
const { pthread_mutex_unlock(&mutex_); }
39 const int retval = pthread_mutex_init(&mutex_, NULL);
76 void Set(
const T &
object);
115 template <
typename T>
119 value_(T(0)), maximal_value_(T(0)) { Initialize(); }
123 , maximal_value_(maximal_value)
125 assert(maximal_value > T(0));
133 WaitForFreeSlotUnprotected();
134 SetValueUnprotected(value_ + T(1));
140 SetValueUnprotected(value_ - T(1));
146 while (value_ != T(0)) {
147 pthread_cond_wait(&became_zero_, &mutex_);
167 SetValueUnprotected(other);
172 void SetValueUnprotected(
const T new_value);
173 void WaitForFreeSlotUnprotected();
194 template <
typename ParamT>
216 template <
typename ParamT>
240 template <
class DelegateT,
class ClosureDataT>
244 ClosureDataT>::CallbackMethod method,
258 template <
class DelegateT>
261 DelegateT *delegate);
284 void UnregisterListeners();
289 void RegisterListener(
CallbackPtr callback_object);
298 void NotifyListeners(
const ParamT &parameter);
302 mutable pthread_rwlock_t listeners_rw_lock_;
363 const size_t drainout_threshold);
373 void Enqueue(
const T &data);
390 inline size_t GetItemCount()
const;
391 inline bool IsEmpty()
const;
392 inline size_t GetMaximalItemCount()
const;
424 template <
class WorkerT>
449 template <
class DataT>
451 explicit Job(
const DataT &data) :
453 is_death_sentence(false) {}
456 is_death_sentence(true) {}
472 delegate(delegate) {}
481 worker_context(worker_context) {}
499 const size_t maximal_queue_length,
536 void WaitForEmptyQueue()
const;
545 void WaitForTermination();
549 return atomic_read32(&jobs_failed_);
574 void RunCallbackThread();
587 static void* RunWorker(
void *run_binding);
589 static void* RunCallbackThreadWrapper(
void *run_binding);
595 void ReportStartedWorker()
const;
597 void Schedule(WorkerJob job);
598 void ScheduleDeathSentences();
605 void TruncateJobQueue(
const bool forget_pending =
false);
614 inline WorkerJob Acquire();
623 void JobDone(
const returned_data_t& data,
const bool success =
true);
716 template <
class DerivedWorkerT>
769 #ifdef CVMFS_NAMESPACE_GUARD
775 #endif // CVMFS_UTIL_CONCURRENCY_H_
Job< returned_data_t > CallbackJob
void RegisterMaster(ConcurrentWorkers< DerivedWorkerT > *master)
SynchronizingCounter< T > & operator=(const T &other)
Callbackable< ParamT >::CallbackTN * CallbackPtr
FifoChannel< WorkerJob > JobQueue
pthread_t callback_thread_
handles callback invokes
virtual bool Initialize()
pthread_cond_t queue_is_not_empty_
unsigned int GetNumberOfCpuCores()
const size_t queue_drainout_threshold_
WorkerT::returned_data returned_data_t
ConcurrentWorkers< DerivedWorkerT > * master() const
RunBinding(ConcurrentWorkers< WorkerT > *delegate)
SynchronizingCounter(const T maximal_value)
pthread_cond_t became_zero_
pthread_cond_t object_set_
atomic_int32 jobs_pending_
unsigned int GetNumberOfWorkers() const
assert((mem||(size==0))&&"Out Of Memory")
WorkerThreads worker_threads_
list of worker threads
FifoChannel< CallbackJob > CallbackQueue
bool HasMaximalValue() const
WorkerT::worker_context worker_context_t
void JobSuccessful(const returned_data_t &data)
pthread_cond_t jobs_all_done_
void Schedule(const expected_data_t &data)
virtual ~ConcurrentWorker()
pthread_cond_t worker_started_
const worker_context_t * worker_context
const worker_context_t * worker_context_
the WorkerT defined context
static const unsigned int kFallbackNumberOfCpus
std::vector< pthread_t > WorkerThreads
atomic_int32 jobs_failed_
const bool is_death_sentence
death sentence flag
Job< expected_data_t > WorkerJob
pthread_mutex_t status_mutex_
pthread_cond_t queue_is_not_full_
WorkerRunBinding thread_context_
const size_t maximal_queue_length_
WorkerT::expected_data expected_data_t
CallbackQueue results_queue_
pthread_cond_t free_slot_
unsigned int workers_started_
const DataT data
job payload
ConcurrentWorkers< WorkerT > * delegate
void UnregisterListener(ListenerHandle *handle)
atomic_int64 jobs_processed_
unsigned int GetNumberOfFailedJobs() const
const size_t number_of_workers_
number of concurrent worker threads
void JobFailed(const returned_data_t &data)
std::set< CallbackPtr > Callbacks
pthread_mutex_t jobs_all_done_mutex_
ConcurrentWorkers< DerivedWorkerT > * master_
WorkerRunBinding(ConcurrentWorkers< WorkerT > *delegate, const worker_context_t *worker_context)