CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ConcurrentWorkers< WorkerT > Class Template Reference

#include <concurrency.h>

Inheritance diagram for ConcurrentWorkers< WorkerT >:
Collaboration diagram for ConcurrentWorkers< WorkerT >:

Classes

struct  Job
 
struct  RunBinding
 
struct  WorkerRunBinding
 

Public Types

typedef WorkerT::expected_data expected_data_t
 
typedef WorkerT::returned_data returned_data_t
 
typedef WorkerT::worker_context worker_context_t
 
- Public Types inherited from Observable< WorkerT::returned_data >
typedef Callbackable
< WorkerT::returned_data >
::CallbackTN
CallbackPtr
 
- Public Types inherited from Callbackable< WorkerT::returned_data >
typedef CallbackBase
< WorkerT::returned_data > 
CallbackTN
 

Public Member Functions

 ConcurrentWorkers (const size_t number_of_workers, const size_t maximal_queue_length, worker_context_t *worker_context=NULL)
 
virtual ~ConcurrentWorkers ()
 
bool Initialize ()
 
void Schedule (const expected_data_t &data)
 
void Terminate ()
 
void WaitForEmptyQueue () const
 
void WaitForTermination ()
 
unsigned int GetNumberOfWorkers () const
 
unsigned int GetNumberOfFailedJobs () const
 
void JobSuccessful (const returned_data_t &data)
 
void JobFailed (const returned_data_t &data)
 
void RunCallbackThread ()
 
- Public Member Functions inherited from Observable< WorkerT::returned_data >
virtual ~Observable ()
 
CallbackPtr RegisterListener (typename BoundClosure< WorkerT::returned_data, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, ClosureDataT data)
 
CallbackPtr RegisterListener (typename BoundCallback< WorkerT::returned_data, DelegateT >::CallbackMethod method, DelegateT *delegate)
 
CallbackPtr RegisterListener (typename Callback< WorkerT::returned_data >::CallbackFunction fn)
 
void UnregisterListener (CallbackPtr callback_object)
 
void UnregisterListeners ()
 

Protected Types

typedef std::vector< pthread_t > WorkerThreads
 
typedef Job< expected_data_tWorkerJob
 
typedef Job< returned_data_tCallbackJob
 
- Protected Types inherited from Observable< WorkerT::returned_data >
typedef std::set< CallbackPtrCallbacks
 

Protected Member Functions

bool SpawnWorkers ()
 
void ReportStartedWorker () const
 
void Schedule (WorkerJob job)
 
void ScheduleDeathSentences ()
 
void TruncateJobQueue (const bool forget_pending=false)
 
WorkerJob Acquire ()
 
void JobDone (const returned_data_t &data, const bool success=true)
 
void StartRunning ()
 
void StopRunning ()
 
bool IsRunning () const
 
- Protected Member Functions inherited from Observable< WorkerT::returned_data >
void RegisterListener (CallbackPtr callback_object)
 
 Observable ()
 
void NotifyListeners (const WorkerT::returned_data &parameter)
 

Static Protected Member Functions

static void * RunWorker (void *run_binding)
 
static void * RunCallbackThreadWrapper (void *run_binding)
 

Private Types

typedef FifoChannel< WorkerJobJobQueue
 
typedef FifoChannel< CallbackJobCallbackQueue
 

Private Attributes

const size_t number_of_workers_
 number of concurrent worker threads More...
 
const worker_context_tworker_context_
 the WorkerT defined context More...
 
WorkerRunBinding thread_context_
 
bool initialized_
 
bool running_
 
unsigned int workers_started_
 
pthread_mutex_t status_mutex_
 
pthread_cond_t worker_started_
 
pthread_mutex_t jobs_all_done_mutex_
 
pthread_cond_t jobs_all_done_
 
WorkerThreads worker_threads_
 list of worker threads More...
 
pthread_t callback_thread_
 handles callback invokes More...
 
JobQueue jobs_queue_
 
atomic_int32 jobs_pending_
 
atomic_int32 jobs_failed_
 
atomic_int64 jobs_processed_
 
CallbackQueue results_queue_
 

Additional Inherited Members

- Static Public Member Functions inherited from Callbackable< WorkerT::returned_data >
static CallbackTNMakeClosure (typename BoundClosure< WorkerT::returned_data, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, const ClosureDataT &closure_data)
 
static CallbackTNMakeCallback (typename BoundCallback< WorkerT::returned_data, DelegateT >::CallbackMethod method, DelegateT *delegate)
 
static CallbackTNMakeCallback (typename Callback< WorkerT::returned_data >::CallbackFunction function)
 

Detailed Description

template<class WorkerT>
class ConcurrentWorkers< WorkerT >

This template implements a generic producer/consumer approach to concurrent worker tasks. It spawns a given number of Workers derived from the base class ConcurrentWorker and uses them to distribute the work load onto concurrent threads. One can have multiple producers, that use Schedule() to post new work into a central job queue, which in turn is processed concurrently by the Worker objects in multiple threads. Furthermore the template provides an interface to control the worker swarm, i.e. to wait for their completion or cancel them before all jobs are processed.

Note: A worker is a class inheriting from ConcurrentWorker that needs to meet a couple of requirements. See the documentation of ConcurrentWorker for additional details.

Parameters
WorkerTthe class to be used as a worker for a concurrent worker swarm

Definition at line 377 of file concurrency.h.

Member Typedef Documentation

template<class WorkerT>
typedef Job<returned_data_t> ConcurrentWorkers< WorkerT >::CallbackJob
protected

Definition at line 413 of file concurrency.h.

template<class WorkerT>
typedef FifoChannel<CallbackJob > ConcurrentWorkers< WorkerT >::CallbackQueue
private

Definition at line 620 of file concurrency.h.

template<class WorkerT>
typedef WorkerT::expected_data ConcurrentWorkers< WorkerT >::expected_data_t

Input data type

Definition at line 383 of file concurrency.h.

template<class WorkerT>
typedef FifoChannel<WorkerJob > ConcurrentWorkers< WorkerT >::JobQueue
private

Definition at line 613 of file concurrency.h.

template<class WorkerT>
typedef WorkerT::returned_data ConcurrentWorkers< WorkerT >::returned_data_t

Output data type

Definition at line 387 of file concurrency.h.

template<class WorkerT>
typedef WorkerT::worker_context ConcurrentWorkers< WorkerT >::worker_context_t

Common context type

Definition at line 391 of file concurrency.h.

template<class WorkerT>
typedef Job<expected_data_t> ConcurrentWorkers< WorkerT >::WorkerJob
protected

Definition at line 412 of file concurrency.h.

template<class WorkerT>
typedef std::vector<pthread_t> ConcurrentWorkers< WorkerT >::WorkerThreads
protected

Definition at line 394 of file concurrency.h.

Constructor & Destructor Documentation

template<class WorkerT>
ConcurrentWorkers< WorkerT >::ConcurrentWorkers ( const size_t  number_of_workers,
const size_t  maximal_queue_length,
worker_context_t worker_context = NULL 
)

Creates a ConcurrentWorkers master object that encapsulates the actual workers.

Parameters
number_of_workersthe number of concurrent workers to be spawned
maximal_queue_lengththe maximal length of the job queue (>= number_of_workers)
worker_contexta pointer to the WorkerT defined context object

Definition at line 287 of file concurrency_impl.h.

Here is the call graph for this function:

template<class WorkerT >
ConcurrentWorkers< WorkerT >::~ConcurrentWorkers ( )
virtual

Definition at line 310 of file concurrency_impl.h.

Member Function Documentation

template<class WorkerT >
ConcurrentWorkers< WorkerT >::WorkerJob ConcurrentWorkers< WorkerT >::Acquire ( )
inlineprotected

Retrieves a job from the job queue. If the job queue is empty it will block until there is a new job available for processing. THIS METHOD MUST ONLY BE CALLED INSIDE THE WORKER OBJECTS

Returns
a job to be processed by a worker

Definition at line 552 of file concurrency_impl.h.

Referenced by ConcurrentWorkers< WorkerT >::RunWorker().

Here is the caller graph for this function:

template<class WorkerT>
unsigned int ConcurrentWorkers< WorkerT >::GetNumberOfFailedJobs ( ) const
inline

Definition at line 500 of file concurrency.h.

Referenced by swissknife::CommandMigrate::DoMigrationAndCommit().

Here is the caller graph for this function:

template<class WorkerT>
unsigned int ConcurrentWorkers< WorkerT >::GetNumberOfWorkers ( ) const
inline

Definition at line 499 of file concurrency.h.

template<class WorkerT >
bool ConcurrentWorkers< WorkerT >::Initialize ( )

Initializes the ConcurrentWorkers swarm, spawnes a thread for each new worker object and puts everything into working state.

Returns
true if all went fine

Definition at line 324 of file concurrency_impl.h.

Referenced by swissknife::CommandMigrate::DoMigrationAndCommit().

Here is the call graph for this function:

Here is the caller graph for this function:

template<class WorkerT>
bool ConcurrentWorkers< WorkerT >::IsRunning ( ) const
inlineprotected

Definition at line 585 of file concurrency.h.

Referenced by ConcurrentWorkers< WorkerT >::RunWorker().

Here is the caller graph for this function:

template<class WorkerT>
void ConcurrentWorkers< WorkerT >::JobDone ( const returned_data_t data,
const bool  success = true 
)
protected

Controls the asynchronous finishing of a job. DO NOT CALL THIS, use JobSuccessful() or JobFailed() wrappers instead.

Parameters
datathe data to be returned to the user
successflag if job was successful

Definition at line 649 of file concurrency_impl.h.

Here is the call graph for this function:

template<class WorkerT>
void ConcurrentWorkers< WorkerT >::JobFailed ( const returned_data_t data)
inline

Defines a job as failed. DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT!

Note: Even for failed jobs the user will get a callback with a data object. You might want to make sure, that this data contains a status flag as well, telling the user what went wrong.

Parameters
datathe data to be returned back to the user

Definition at line 524 of file concurrency.h.

Referenced by swissknife::CommandMigrate::AbstractMigrationWorker< DerivedT >::operator()().

Here is the caller graph for this function:

template<class WorkerT>
void ConcurrentWorkers< WorkerT >::JobSuccessful ( const returned_data_t data)
inline

Defines a job as successfully finished. DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT!

Parameters
datathe data to be returned back to the user

Definition at line 510 of file concurrency.h.

Referenced by swissknife::CommandMigrate::AbstractMigrationWorker< DerivedT >::operator()().

Here is the caller graph for this function:

template<class WorkerT >
void ConcurrentWorkers< WorkerT >::ReportStartedWorker ( ) const
protected

Tells the master that a worker thread did start. This does not mean, that it was initialized successfully.

Definition at line 506 of file concurrency_impl.h.

Referenced by ConcurrentWorkers< WorkerT >::RunCallbackThreadWrapper(), and ConcurrentWorkers< WorkerT >::RunWorker().

Here is the caller graph for this function:

template<class WorkerT >
void ConcurrentWorkers< WorkerT >::RunCallbackThread ( )

Definition at line 481 of file concurrency_impl.h.

Referenced by ConcurrentWorkers< WorkerT >::RunCallbackThreadWrapper().

Here is the caller graph for this function:

template<class WorkerT >
void * ConcurrentWorkers< WorkerT >::RunCallbackThreadWrapper ( void *  run_binding)
staticprotected

Definition at line 465 of file concurrency_impl.h.

Here is the call graph for this function:

template<class WorkerT >
void * ConcurrentWorkers< WorkerT >::RunWorker ( void *  run_binding)
staticprotected

POSIX conform function for thread entry point. Is invoked for every new worker thread and contains the initialization, processing loop and tear down of the unique worker objects

Parameters
run_bindingvoid pointer to a RunBinding structure (C interface)
Returns
NULL in any case

Definition at line 406 of file concurrency_impl.h.

Here is the call graph for this function:

template<class WorkerT>
void ConcurrentWorkers< WorkerT >::Schedule ( const expected_data_t data)
inline

Schedules a new job for processing into the internal job queue. This method will block in case the job queue is already full and wait for an empty slot.

Parameters
datathe data to be processed

Definition at line 469 of file concurrency.h.

template<class WorkerT >
void ConcurrentWorkers< WorkerT >::Schedule ( WorkerJob  job)
protected

Definition at line 514 of file concurrency_impl.h.

Here is the call graph for this function:

template<class WorkerT >
void ConcurrentWorkers< WorkerT >::ScheduleDeathSentences ( )
protected

Definition at line 533 of file concurrency_impl.h.

Here is the call graph for this function:

template<class WorkerT >
bool ConcurrentWorkers< WorkerT >::SpawnWorkers ( )
protected

Definition at line 354 of file concurrency_impl.h.

Here is the call graph for this function:

template<class WorkerT>
void ConcurrentWorkers< WorkerT >::StartRunning ( )
inlineprotected

Definition at line 577 of file concurrency.h.

template<class WorkerT>
void ConcurrentWorkers< WorkerT >::StopRunning ( )
inlineprotected

Definition at line 581 of file concurrency.h.

template<class WorkerT >
void ConcurrentWorkers< WorkerT >::Terminate ( )

Shuts down the ConcurrentWorkers object as well as the encapsulated workers as soon as possible. Workers will finish their current job and will termi- nate afterwards. If no jobs are scheduled they will simply stop waiting for new ones and terminate afterwards. This method MUST not be called more than once per ConcurrentWorkers.

Definition at line 574 of file concurrency_impl.h.

Here is the call graph for this function:

template<class WorkerT >
void ConcurrentWorkers< WorkerT >::TruncateJobQueue ( const bool  forget_pending = false)
protected

Empties the job queue

Parameters
forget_pendingcontrols if cancelled jobs should be seen as finished

Definition at line 561 of file concurrency_impl.h.

template<class WorkerT >
void ConcurrentWorkers< WorkerT >::WaitForEmptyQueue ( ) const

Waits until the job queue is fully processed

Note: this might lead to undefined behaviour or infinite waiting if other producers still schedule jobs into the job queue.

Definition at line 622 of file concurrency_impl.h.

Referenced by swissknife::CommandMigrate::DoMigrationAndCommit().

Here is the call graph for this function:

Here is the caller graph for this function:

template<class WorkerT >
void ConcurrentWorkers< WorkerT >::WaitForTermination ( )

Waits until the ConcurrentWorkers swarm fully processed the current job queue and shuts down afterwards.

Note: just as for WaitForEmptyQueue() this assumes that no other producers schedule jobs in the mean time.

Definition at line 639 of file concurrency_impl.h.

Member Data Documentation

template<class WorkerT>
pthread_t ConcurrentWorkers< WorkerT >::callback_thread_
private

handles callback invokes

Definition at line 610 of file concurrency.h.

template<class WorkerT>
bool ConcurrentWorkers< WorkerT >::initialized_
private

Definition at line 600 of file concurrency.h.

template<class WorkerT>
pthread_cond_t ConcurrentWorkers< WorkerT >::jobs_all_done_
mutableprivate

Definition at line 606 of file concurrency.h.

template<class WorkerT>
pthread_mutex_t ConcurrentWorkers< WorkerT >::jobs_all_done_mutex_
mutableprivate

Definition at line 605 of file concurrency.h.

template<class WorkerT>
atomic_int32 ConcurrentWorkers< WorkerT >::jobs_failed_
mutableprivate

Definition at line 616 of file concurrency.h.

Referenced by ConcurrentWorkers< WorkerT >::ConcurrentWorkers().

template<class WorkerT>
atomic_int32 ConcurrentWorkers< WorkerT >::jobs_pending_
mutableprivate

Definition at line 615 of file concurrency.h.

Referenced by ConcurrentWorkers< WorkerT >::ConcurrentWorkers().

template<class WorkerT>
atomic_int64 ConcurrentWorkers< WorkerT >::jobs_processed_
mutableprivate

Definition at line 617 of file concurrency.h.

Referenced by ConcurrentWorkers< WorkerT >::ConcurrentWorkers().

template<class WorkerT>
JobQueue ConcurrentWorkers< WorkerT >::jobs_queue_
private

Definition at line 614 of file concurrency.h.

template<class WorkerT>
const size_t ConcurrentWorkers< WorkerT >::number_of_workers_
private

number of concurrent worker threads

Definition at line 592 of file concurrency.h.

template<class WorkerT>
CallbackQueue ConcurrentWorkers< WorkerT >::results_queue_
private

Definition at line 621 of file concurrency.h.

template<class WorkerT>
bool ConcurrentWorkers< WorkerT >::running_
private

Definition at line 601 of file concurrency.h.

template<class WorkerT>
pthread_mutex_t ConcurrentWorkers< WorkerT >::status_mutex_
mutableprivate

Definition at line 603 of file concurrency.h.

template<class WorkerT>
WorkerRunBinding ConcurrentWorkers< WorkerT >::thread_context_
private

The thread context passed to newly spawned threads

Definition at line 597 of file concurrency.h.

template<class WorkerT>
const worker_context_t* ConcurrentWorkers< WorkerT >::worker_context_
private

the WorkerT defined context

Definition at line 593 of file concurrency.h.

template<class WorkerT>
pthread_cond_t ConcurrentWorkers< WorkerT >::worker_started_
mutableprivate

Definition at line 604 of file concurrency.h.

template<class WorkerT>
WorkerThreads ConcurrentWorkers< WorkerT >::worker_threads_
private

list of worker threads

Definition at line 609 of file concurrency.h.

template<class WorkerT>
unsigned int ConcurrentWorkers< WorkerT >::workers_started_
mutableprivate

Definition at line 602 of file concurrency.h.


The documentation for this class was generated from the following files: