![]() |
CernVM-FS
2.12.0
|
#include <concurrency.h>
Classes | |
struct | Job |
struct | RunBinding |
struct | WorkerRunBinding |
Public Types | |
typedef WorkerT::expected_data | expected_data_t |
typedef WorkerT::returned_data | returned_data_t |
typedef WorkerT::worker_context | worker_context_t |
![]() | |
typedef Callbackable < WorkerT::returned_data > ::CallbackTN * | CallbackPtr |
![]() | |
typedef CallbackBase < WorkerT::returned_data > | CallbackTN |
Public Member Functions | |
ConcurrentWorkers (const size_t number_of_workers, const size_t maximal_queue_length, worker_context_t *worker_context=NULL) | |
virtual | ~ConcurrentWorkers () |
bool | Initialize () |
void | Schedule (const expected_data_t &data) |
void | Terminate () |
void | WaitForEmptyQueue () const |
void | WaitForTermination () |
unsigned int | GetNumberOfWorkers () const |
unsigned int | GetNumberOfFailedJobs () const |
void | JobSuccessful (const returned_data_t &data) |
void | JobFailed (const returned_data_t &data) |
void | RunCallbackThread () |
![]() | |
virtual | ~Observable () |
CallbackPtr | RegisterListener (typename BoundClosure< WorkerT::returned_data, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, ClosureDataT data) |
CallbackPtr | RegisterListener (typename BoundCallback< WorkerT::returned_data, DelegateT >::CallbackMethod method, DelegateT *delegate) |
CallbackPtr | RegisterListener (typename Callback< WorkerT::returned_data >::CallbackFunction fn) |
void | UnregisterListener (CallbackPtr callback_object) |
void | UnregisterListeners () |
Protected Types | |
typedef std::vector< pthread_t > | WorkerThreads |
typedef Job< expected_data_t > | WorkerJob |
typedef Job< returned_data_t > | CallbackJob |
![]() | |
typedef std::set< CallbackPtr > | Callbacks |
Protected Member Functions | |
bool | SpawnWorkers () |
void | ReportStartedWorker () const |
void | Schedule (WorkerJob job) |
void | ScheduleDeathSentences () |
void | TruncateJobQueue (const bool forget_pending=false) |
WorkerJob | Acquire () |
void | JobDone (const returned_data_t &data, const bool success=true) |
void | StartRunning () |
void | StopRunning () |
bool | IsRunning () const |
![]() | |
void | RegisterListener (CallbackPtr callback_object) |
Observable () | |
void | NotifyListeners (const WorkerT::returned_data ¶meter) |
Static Protected Member Functions | |
static void * | RunWorker (void *run_binding) |
static void * | RunCallbackThreadWrapper (void *run_binding) |
Private Types | |
typedef FifoChannel< WorkerJob > | JobQueue |
typedef FifoChannel< CallbackJob > | CallbackQueue |
Private Attributes | |
const size_t | number_of_workers_ |
number of concurrent worker threads More... | |
const worker_context_t * | worker_context_ |
the WorkerT defined context More... | |
WorkerRunBinding | thread_context_ |
bool | initialized_ |
bool | running_ |
unsigned int | workers_started_ |
pthread_mutex_t | status_mutex_ |
pthread_cond_t | worker_started_ |
pthread_mutex_t | jobs_all_done_mutex_ |
pthread_cond_t | jobs_all_done_ |
WorkerThreads | worker_threads_ |
list of worker threads More... | |
pthread_t | callback_thread_ |
handles callback invokes More... | |
JobQueue | jobs_queue_ |
atomic_int32 | jobs_pending_ |
atomic_int32 | jobs_failed_ |
atomic_int64 | jobs_processed_ |
CallbackQueue | results_queue_ |
Additional Inherited Members | |
![]() | |
static CallbackTN * | MakeClosure (typename BoundClosure< WorkerT::returned_data, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, const ClosureDataT &closure_data) |
static CallbackTN * | MakeCallback (typename BoundCallback< WorkerT::returned_data, DelegateT >::CallbackMethod method, DelegateT *delegate) |
static CallbackTN * | MakeCallback (typename Callback< WorkerT::returned_data >::CallbackFunction function) |
This template implements a generic producer/consumer approach to concurrent worker tasks. It spawns a given number of Workers derived from the base class ConcurrentWorker and uses them to distribute the work load onto concurrent threads. One can have multiple producers, that use Schedule() to post new work into a central job queue, which in turn is processed concurrently by the Worker objects in multiple threads. Furthermore the template provides an interface to control the worker swarm, i.e. to wait for their completion or cancel them before all jobs are processed.
Note: A worker is a class inheriting from ConcurrentWorker that needs to meet a couple of requirements. See the documentation of ConcurrentWorker for additional details.
WorkerT | the class to be used as a worker for a concurrent worker swarm |
Definition at line 426 of file concurrency.h.
|
protected |
Definition at line 462 of file concurrency.h.
|
private |
Definition at line 669 of file concurrency.h.
typedef WorkerT::expected_data ConcurrentWorkers< WorkerT >::expected_data_t |
Input data type
Definition at line 432 of file concurrency.h.
|
private |
Definition at line 662 of file concurrency.h.
typedef WorkerT::returned_data ConcurrentWorkers< WorkerT >::returned_data_t |
Output data type
Definition at line 436 of file concurrency.h.
typedef WorkerT::worker_context ConcurrentWorkers< WorkerT >::worker_context_t |
Common context type
Definition at line 440 of file concurrency.h.
|
protected |
Definition at line 461 of file concurrency.h.
|
protected |
Definition at line 443 of file concurrency.h.
ConcurrentWorkers< WorkerT >::ConcurrentWorkers | ( | const size_t | number_of_workers, |
const size_t | maximal_queue_length, | ||
worker_context_t * | worker_context = NULL |
||
) |
Creates a ConcurrentWorkers master object that encapsulates the actual workers.
number_of_workers | the number of concurrent workers to be spawned |
maximal_queue_length | the maximal length of the job queue (>= number_of_workers) |
worker_context | a pointer to the WorkerT defined context object |
Definition at line 342 of file concurrency_impl.h.
|
virtual |
Definition at line 365 of file concurrency_impl.h.
|
inlineprotected |
Retrieves a job from the job queue. If the job queue is empty it will block until there is a new job available for processing. THIS METHOD MUST ONLY BE CALLED INSIDE THE WORKER OBJECTS
Definition at line 607 of file concurrency_impl.h.
Referenced by ConcurrentWorkers< WorkerT >::RunWorker().
|
inline |
Definition at line 549 of file concurrency.h.
Referenced by swissknife::CommandMigrate::DoMigrationAndCommit().
|
inline |
Definition at line 548 of file concurrency.h.
bool ConcurrentWorkers< WorkerT >::Initialize | ( | ) |
Initializes the ConcurrentWorkers swarm, spawnes a thread for each new worker object and puts everything into working state.
Definition at line 379 of file concurrency_impl.h.
Referenced by swissknife::CommandMigrate::DoMigrationAndCommit().
|
inlineprotected |
Definition at line 634 of file concurrency.h.
Referenced by ConcurrentWorkers< WorkerT >::RunWorker().
|
protected |
Controls the asynchronous finishing of a job. DO NOT CALL THIS, use JobSuccessful() or JobFailed() wrappers instead.
data | the data to be returned to the user |
success | flag if job was successful |
Definition at line 704 of file concurrency_impl.h.
|
inline |
Defines a job as failed. DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT!
Note: Even for failed jobs the user will get a callback with a data object. You might want to make sure, that this data contains a status flag as well, telling the user what went wrong.
data | the data to be returned back to the user |
Definition at line 573 of file concurrency.h.
Referenced by swissknife::CommandMigrate::AbstractMigrationWorker< DerivedT >::operator()().
|
inline |
Defines a job as successfully finished. DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT!
data | the data to be returned back to the user |
Definition at line 559 of file concurrency.h.
Referenced by swissknife::CommandMigrate::AbstractMigrationWorker< DerivedT >::operator()().
|
protected |
Tells the master that a worker thread did start. This does not mean, that it was initialized successfully.
Definition at line 561 of file concurrency_impl.h.
Referenced by ConcurrentWorkers< WorkerT >::RunCallbackThreadWrapper(), and ConcurrentWorkers< WorkerT >::RunWorker().
void ConcurrentWorkers< WorkerT >::RunCallbackThread | ( | ) |
Definition at line 536 of file concurrency_impl.h.
Referenced by ConcurrentWorkers< WorkerT >::RunCallbackThreadWrapper().
|
staticprotected |
|
staticprotected |
POSIX conform function for thread entry point. Is invoked for every new worker thread and contains the initialization, processing loop and tear down of the unique worker objects
run_binding | void pointer to a RunBinding structure (C interface) |
Definition at line 461 of file concurrency_impl.h.
|
inline |
Schedules a new job for processing into the internal job queue. This method will block in case the job queue is already full and wait for an empty slot.
data | the data to be processed |
Definition at line 518 of file concurrency.h.
|
protected |
Definition at line 569 of file concurrency_impl.h.
|
protected |
|
protected |
|
inlineprotected |
Definition at line 626 of file concurrency.h.
|
inlineprotected |
Definition at line 630 of file concurrency.h.
void ConcurrentWorkers< WorkerT >::Terminate | ( | ) |
Shuts down the ConcurrentWorkers object as well as the encapsulated workers as soon as possible. Workers will finish their current job and will termi- nate afterwards. If no jobs are scheduled they will simply stop waiting for new ones and terminate afterwards. This method MUST not be called more than once per ConcurrentWorkers.
Definition at line 629 of file concurrency_impl.h.
|
protected |
Empties the job queue
forget_pending | controls if cancelled jobs should be seen as finished |
Definition at line 616 of file concurrency_impl.h.
void ConcurrentWorkers< WorkerT >::WaitForEmptyQueue | ( | ) | const |
Waits until the job queue is fully processed
Note: this might lead to undefined behaviour or infinite waiting if other producers still schedule jobs into the job queue.
Definition at line 677 of file concurrency_impl.h.
Referenced by swissknife::CommandMigrate::DoMigrationAndCommit().
void ConcurrentWorkers< WorkerT >::WaitForTermination | ( | ) |
Waits until the ConcurrentWorkers swarm fully processed the current job queue and shuts down afterwards.
Note: just as for WaitForEmptyQueue() this assumes that no other producers schedule jobs in the mean time.
Definition at line 694 of file concurrency_impl.h.
|
private |
handles callback invokes
Definition at line 659 of file concurrency.h.
|
private |
Definition at line 649 of file concurrency.h.
|
mutableprivate |
Definition at line 655 of file concurrency.h.
|
mutableprivate |
Definition at line 654 of file concurrency.h.
|
mutableprivate |
Definition at line 665 of file concurrency.h.
Referenced by ConcurrentWorkers< WorkerT >::ConcurrentWorkers().
|
mutableprivate |
Definition at line 664 of file concurrency.h.
Referenced by ConcurrentWorkers< WorkerT >::ConcurrentWorkers().
|
mutableprivate |
Definition at line 666 of file concurrency.h.
Referenced by ConcurrentWorkers< WorkerT >::ConcurrentWorkers().
|
private |
Definition at line 663 of file concurrency.h.
|
private |
number of concurrent worker threads
Definition at line 641 of file concurrency.h.
|
private |
Definition at line 670 of file concurrency.h.
|
private |
Definition at line 650 of file concurrency.h.
|
mutableprivate |
Definition at line 652 of file concurrency.h.
|
private |
The thread context passed to newly spawned threads
Definition at line 646 of file concurrency.h.
|
private |
the WorkerT defined context
Definition at line 642 of file concurrency.h.
|
mutableprivate |
Definition at line 653 of file concurrency.h.
|
private |
list of worker threads
Definition at line 658 of file concurrency.h.
|
mutableprivate |
Definition at line 651 of file concurrency.h.