CernVM-FS  2.9.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ConcurrentWorkers< WorkerT > Class Template Reference

#include <util_concurrency.h>

Inheritance diagram for ConcurrentWorkers< WorkerT >:
Collaboration diagram for ConcurrentWorkers< WorkerT >:

Classes

struct  Job
 
struct  RunBinding
 
struct  WorkerRunBinding
 

Public Types

typedef WorkerT::expected_data expected_data_t
 
typedef WorkerT::returned_data returned_data_t
 
typedef WorkerT::worker_context worker_context_t
 
- Public Types inherited from Observable< WorkerT::returned_data >
typedef Callbackable
< WorkerT::returned_data >
::CallbackTN
CallbackPtr
 
- Public Types inherited from Callbackable< WorkerT::returned_data >
typedef CallbackBase
< WorkerT::returned_data > 
CallbackTN
 

Public Member Functions

 ConcurrentWorkers (const size_t number_of_workers, const size_t maximal_queue_length, worker_context_t *worker_context=NULL)
 
virtual ~ConcurrentWorkers ()
 
bool Initialize ()
 
void Schedule (const expected_data_t &data)
 
void Terminate ()
 
void WaitForEmptyQueue () const
 
void WaitForTermination ()
 
unsigned int GetNumberOfWorkers () const
 
unsigned int GetNumberOfFailedJobs () const
 
void JobSuccessful (const returned_data_t &data)
 
void JobFailed (const returned_data_t &data)
 
void RunCallbackThread ()
 
- Public Member Functions inherited from Observable< WorkerT::returned_data >
virtual ~Observable ()
 
CallbackPtr RegisterListener (typename BoundClosure< WorkerT::returned_data, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, ClosureDataT data)
 
CallbackPtr RegisterListener (typename BoundCallback< WorkerT::returned_data, DelegateT >::CallbackMethod method, DelegateT *delegate)
 
CallbackPtr RegisterListener (typename Callback< WorkerT::returned_data >::CallbackFunction fn)
 
void UnregisterListener (CallbackPtr callback_object)
 
void UnregisterListeners ()
 

Protected Types

typedef std::vector< pthread_t > WorkerThreads
 
typedef Job< expected_data_tWorkerJob
 
typedef Job< returned_data_tCallbackJob
 
- Protected Types inherited from Observable< WorkerT::returned_data >
typedef std::set< CallbackPtrCallbacks
 

Protected Member Functions

bool SpawnWorkers ()
 
void ReportStartedWorker () const
 
void Schedule (WorkerJob job)
 
void ScheduleDeathSentences ()
 
void TruncateJobQueue (const bool forget_pending=false)
 
WorkerJob Acquire ()
 
void JobDone (const returned_data_t &data, const bool success=true)
 
void StartRunning ()
 
void StopRunning ()
 
bool IsRunning () const
 
- Protected Member Functions inherited from Observable< WorkerT::returned_data >
void RegisterListener (CallbackPtr callback_object)
 
 Observable ()
 
void NotifyListeners (const WorkerT::returned_data &parameter)
 

Static Protected Member Functions

static void * RunWorker (void *run_binding)
 
static void * RunCallbackThreadWrapper (void *run_binding)
 

Private Types

typedef FifoChannel< WorkerJobJobQueue
 
typedef FifoChannel< CallbackJobCallbackQueue
 

Private Attributes

const size_t number_of_workers_
 number of concurrent worker threads More...
 
const worker_context_tworker_context_
 the WorkerT defined context More...
 
WorkerRunBinding thread_context_
 
bool initialized_
 
bool running_
 
unsigned int workers_started_
 
pthread_mutex_t status_mutex_
 
pthread_cond_t worker_started_
 
pthread_mutex_t jobs_all_done_mutex_
 
pthread_cond_t jobs_all_done_
 
WorkerThreads worker_threads_
 list of worker threads More...
 
pthread_t callback_thread_
 handles callback invokes More...
 
JobQueue jobs_queue_
 
atomic_int32 jobs_pending_
 
atomic_int32 jobs_failed_
 
atomic_int64 jobs_processed_
 
CallbackQueue results_queue_
 

Additional Inherited Members

- Static Public Member Functions inherited from Callbackable< WorkerT::returned_data >
static CallbackTNMakeClosure (typename BoundClosure< WorkerT::returned_data, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, const ClosureDataT &closure_data)
 
static CallbackTNMakeCallback (typename BoundCallback< WorkerT::returned_data, DelegateT >::CallbackMethod method, DelegateT *delegate)
 
static CallbackTNMakeCallback (typename Callback< WorkerT::returned_data >::CallbackFunction function)
 

Detailed Description

template<class WorkerT>
class ConcurrentWorkers< WorkerT >

This template implements a generic producer/consumer approach to concurrent worker tasks. It spawns a given number of Workers derived from the base class ConcurrentWorker and uses them to distribute the work load onto concurrent threads. One can have multiple producers, that use Schedule() to post new work into a central job queue, which in turn is processed concurrently by the Worker objects in multiple threads. Furthermore the template provides an interface to control the worker swarm, i.e. to wait for their completion or cancel them before all jobs are processed.

Note: A worker is a class inheriting from ConcurrentWorker that needs to meet a couple of requirements. See the documentation of ConcurrentWorker for additional details.

Parameters
WorkerTthe class to be used as a worker for a concurrent worker swarm

Definition at line 518 of file util_concurrency.h.

Member Typedef Documentation

template<class WorkerT>
typedef Job<returned_data_t> ConcurrentWorkers< WorkerT >::CallbackJob
protected

Definition at line 554 of file util_concurrency.h.

template<class WorkerT>
typedef FifoChannel<CallbackJob > ConcurrentWorkers< WorkerT >::CallbackQueue
private

Definition at line 761 of file util_concurrency.h.

template<class WorkerT>
typedef WorkerT::expected_data ConcurrentWorkers< WorkerT >::expected_data_t

Input data type

Definition at line 524 of file util_concurrency.h.

template<class WorkerT>
typedef FifoChannel<WorkerJob > ConcurrentWorkers< WorkerT >::JobQueue
private

Definition at line 754 of file util_concurrency.h.

template<class WorkerT>
typedef WorkerT::returned_data ConcurrentWorkers< WorkerT >::returned_data_t

Output data type

Definition at line 528 of file util_concurrency.h.

template<class WorkerT>
typedef WorkerT::worker_context ConcurrentWorkers< WorkerT >::worker_context_t

Common context type

Definition at line 532 of file util_concurrency.h.

template<class WorkerT>
typedef Job<expected_data_t> ConcurrentWorkers< WorkerT >::WorkerJob
protected

Definition at line 553 of file util_concurrency.h.

template<class WorkerT>
typedef std::vector<pthread_t> ConcurrentWorkers< WorkerT >::WorkerThreads
protected

Definition at line 535 of file util_concurrency.h.

Constructor & Destructor Documentation

template<class WorkerT>
ConcurrentWorkers< WorkerT >::ConcurrentWorkers ( const size_t  number_of_workers,
const size_t  maximal_queue_length,
worker_context_t worker_context = NULL 
)

Creates a ConcurrentWorkers master object that encapsulates the actual workers.

Parameters
number_of_workersthe number of concurrent workers to be spawned
maximal_queue_lengththe maximal length of the job queue (>= number_of_workers)
worker_contexta pointer to the WorkerT defined context object

Definition at line 342 of file util_concurrency_impl.h.

Here is the call graph for this function:

template<class WorkerT >
ConcurrentWorkers< WorkerT >::~ConcurrentWorkers ( )
virtual

Definition at line 365 of file util_concurrency_impl.h.

Member Function Documentation

template<class WorkerT >
ConcurrentWorkers< WorkerT >::WorkerJob ConcurrentWorkers< WorkerT >::Acquire ( )
inlineprotected

Retrieves a job from the job queue. If the job queue is empty it will block until there is a new job available for processing. THIS METHOD MUST ONLY BE CALLED INSIDE THE WORKER OBJECTS

Returns
a job to be processed by a worker

Definition at line 607 of file util_concurrency_impl.h.

Referenced by ConcurrentWorkers< WorkerT >::RunWorker().

Here is the caller graph for this function:

template<class WorkerT>
unsigned int ConcurrentWorkers< WorkerT >::GetNumberOfFailedJobs ( ) const
inline

Definition at line 641 of file util_concurrency.h.

Referenced by swissknife::CommandMigrate::DoMigrationAndCommit().

Here is the caller graph for this function:

template<class WorkerT>
unsigned int ConcurrentWorkers< WorkerT >::GetNumberOfWorkers ( ) const
inline

Definition at line 640 of file util_concurrency.h.

template<class WorkerT >
bool ConcurrentWorkers< WorkerT >::Initialize ( )

Initializes the ConcurrentWorkers swarm, spawnes a thread for each new worker object and puts everything into working state.

Returns
true if all went fine

Definition at line 379 of file util_concurrency_impl.h.

Referenced by swissknife::CommandMigrate::DoMigrationAndCommit().

Here is the caller graph for this function:

template<class WorkerT>
bool ConcurrentWorkers< WorkerT >::IsRunning ( ) const
inlineprotected

Definition at line 726 of file util_concurrency.h.

Referenced by ConcurrentWorkers< WorkerT >::RunWorker().

Here is the caller graph for this function:

template<class WorkerT>
void ConcurrentWorkers< WorkerT >::JobDone ( const returned_data_t data,
const bool  success = true 
)
protected

Controls the asynchronous finishing of a job. DO NOT CALL THIS, use JobSuccessful() or JobFailed() wrappers instead.

Parameters
datathe data to be returned to the user
successflag if job was successful

Definition at line 704 of file util_concurrency_impl.h.

template<class WorkerT>
void ConcurrentWorkers< WorkerT >::JobFailed ( const returned_data_t data)
inline

Defines a job as failed. DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT!

Note: Even for failed jobs the user will get a callback with a data object. You might want to make sure, that this data contains a status flag as well, telling the user what went wrong.

Parameters
datathe data to be returned back to the user

Definition at line 665 of file util_concurrency.h.

Referenced by swissknife::CommandMigrate::AbstractMigrationWorker< DerivedT >::operator()().

Here is the caller graph for this function:

template<class WorkerT>
void ConcurrentWorkers< WorkerT >::JobSuccessful ( const returned_data_t data)
inline

Defines a job as successfully finished. DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT!

Parameters
datathe data to be returned back to the user

Definition at line 651 of file util_concurrency.h.

Referenced by swissknife::CommandMigrate::AbstractMigrationWorker< DerivedT >::operator()().

Here is the caller graph for this function:

template<class WorkerT >
void ConcurrentWorkers< WorkerT >::ReportStartedWorker ( ) const
protected

Tells the master that a worker thread did start. This does not mean, that it was initialized successfully.

Definition at line 561 of file util_concurrency_impl.h.

Referenced by ConcurrentWorkers< WorkerT >::RunCallbackThreadWrapper(), and ConcurrentWorkers< WorkerT >::RunWorker().

Here is the caller graph for this function:

template<class WorkerT >
void ConcurrentWorkers< WorkerT >::RunCallbackThread ( )

Definition at line 536 of file util_concurrency_impl.h.

Referenced by ConcurrentWorkers< WorkerT >::RunCallbackThreadWrapper().

Here is the caller graph for this function:

template<class WorkerT >
void * ConcurrentWorkers< WorkerT >::RunCallbackThreadWrapper ( void *  run_binding)
staticprotected

Definition at line 520 of file util_concurrency_impl.h.

Here is the call graph for this function:

template<class WorkerT >
void * ConcurrentWorkers< WorkerT >::RunWorker ( void *  run_binding)
staticprotected

POSIX conform function for thread entry point. Is invoked for every new worker thread and contains the initialization, processing loop and tear down of the unique worker objects

Parameters
run_bindingvoid pointer to a RunBinding structure (C interface)
Returns
NULL in any case

Definition at line 461 of file util_concurrency_impl.h.

Here is the call graph for this function:

template<class WorkerT>
void ConcurrentWorkers< WorkerT >::Schedule ( const expected_data_t data)
inline

Schedules a new job for processing into the internal job queue. This method will block in case the job queue is already full and wait for an empty slot.

Parameters
datathe data to be processed

Definition at line 610 of file util_concurrency.h.

template<class WorkerT >
void ConcurrentWorkers< WorkerT >::Schedule ( WorkerJob  job)
protected

Definition at line 569 of file util_concurrency_impl.h.

template<class WorkerT >
void ConcurrentWorkers< WorkerT >::ScheduleDeathSentences ( )
protected

Definition at line 588 of file util_concurrency_impl.h.

Here is the call graph for this function:

template<class WorkerT >
bool ConcurrentWorkers< WorkerT >::SpawnWorkers ( )
protected

Definition at line 409 of file util_concurrency_impl.h.

Here is the call graph for this function:

template<class WorkerT>
void ConcurrentWorkers< WorkerT >::StartRunning ( )
inlineprotected

Definition at line 718 of file util_concurrency.h.

template<class WorkerT>
void ConcurrentWorkers< WorkerT >::StopRunning ( )
inlineprotected

Definition at line 722 of file util_concurrency.h.

template<class WorkerT >
void ConcurrentWorkers< WorkerT >::Terminate ( )

Shuts down the ConcurrentWorkers object as well as the encapsulated workers as soon as possible. Workers will finish their current job and will termi- nate afterwards. If no jobs are scheduled they will simply stop waiting for new ones and terminate afterwards. This method MUST not be called more than once per ConcurrentWorkers.

Definition at line 629 of file util_concurrency_impl.h.

Here is the call graph for this function:

template<class WorkerT >
void ConcurrentWorkers< WorkerT >::TruncateJobQueue ( const bool  forget_pending = false)
protected

Empties the job queue

Parameters
forget_pendingcontrols if cancelled jobs should be seen as finished

Definition at line 616 of file util_concurrency_impl.h.

template<class WorkerT >
void ConcurrentWorkers< WorkerT >::WaitForEmptyQueue ( ) const

Waits until the job queue is fully processed

Note: this might lead to undefined behaviour or infinite waiting if other producers still schedule jobs into the job queue.

Definition at line 677 of file util_concurrency_impl.h.

Referenced by swissknife::CommandMigrate::DoMigrationAndCommit().

Here is the caller graph for this function:

template<class WorkerT >
void ConcurrentWorkers< WorkerT >::WaitForTermination ( )

Waits until the ConcurrentWorkers swarm fully processed the current job queue and shuts down afterwards.

Note: just as for WaitForEmptyQueue() this assumes that no other producers schedule jobs in the mean time.

Definition at line 694 of file util_concurrency_impl.h.

Member Data Documentation

template<class WorkerT>
pthread_t ConcurrentWorkers< WorkerT >::callback_thread_
private

handles callback invokes

Definition at line 751 of file util_concurrency.h.

template<class WorkerT>
bool ConcurrentWorkers< WorkerT >::initialized_
private

Definition at line 741 of file util_concurrency.h.

template<class WorkerT>
pthread_cond_t ConcurrentWorkers< WorkerT >::jobs_all_done_
mutableprivate

Definition at line 747 of file util_concurrency.h.

template<class WorkerT>
pthread_mutex_t ConcurrentWorkers< WorkerT >::jobs_all_done_mutex_
mutableprivate

Definition at line 746 of file util_concurrency.h.

template<class WorkerT>
atomic_int32 ConcurrentWorkers< WorkerT >::jobs_failed_
mutableprivate

Definition at line 757 of file util_concurrency.h.

Referenced by ConcurrentWorkers< WorkerT >::ConcurrentWorkers().

template<class WorkerT>
atomic_int32 ConcurrentWorkers< WorkerT >::jobs_pending_
mutableprivate

Definition at line 756 of file util_concurrency.h.

Referenced by ConcurrentWorkers< WorkerT >::ConcurrentWorkers().

template<class WorkerT>
atomic_int64 ConcurrentWorkers< WorkerT >::jobs_processed_
mutableprivate

Definition at line 758 of file util_concurrency.h.

Referenced by ConcurrentWorkers< WorkerT >::ConcurrentWorkers().

template<class WorkerT>
JobQueue ConcurrentWorkers< WorkerT >::jobs_queue_
private

Definition at line 755 of file util_concurrency.h.

template<class WorkerT>
const size_t ConcurrentWorkers< WorkerT >::number_of_workers_
private

number of concurrent worker threads

Definition at line 733 of file util_concurrency.h.

template<class WorkerT>
CallbackQueue ConcurrentWorkers< WorkerT >::results_queue_
private

Definition at line 762 of file util_concurrency.h.

template<class WorkerT>
bool ConcurrentWorkers< WorkerT >::running_
private

Definition at line 742 of file util_concurrency.h.

template<class WorkerT>
pthread_mutex_t ConcurrentWorkers< WorkerT >::status_mutex_
mutableprivate

Definition at line 744 of file util_concurrency.h.

template<class WorkerT>
WorkerRunBinding ConcurrentWorkers< WorkerT >::thread_context_
private

The thread context passed to newly spawned threads

Definition at line 738 of file util_concurrency.h.

template<class WorkerT>
const worker_context_t* ConcurrentWorkers< WorkerT >::worker_context_
private

the WorkerT defined context

Definition at line 734 of file util_concurrency.h.

template<class WorkerT>
pthread_cond_t ConcurrentWorkers< WorkerT >::worker_started_
mutableprivate

Definition at line 745 of file util_concurrency.h.

template<class WorkerT>
WorkerThreads ConcurrentWorkers< WorkerT >::worker_threads_
private

list of worker threads

Definition at line 750 of file util_concurrency.h.

template<class WorkerT>
unsigned int ConcurrentWorkers< WorkerT >::workers_started_
mutableprivate

Definition at line 743 of file util_concurrency.h.


The documentation for this class was generated from the following files: