CernVM-FS
2.12.0
|
#include <concurrency.h>
Classes | |
struct | Job |
struct | RunBinding |
struct | WorkerRunBinding |
Public Types | |
typedef WorkerT::expected_data | expected_data_t |
typedef WorkerT::returned_data | returned_data_t |
typedef WorkerT::worker_context | worker_context_t |
Public Types inherited from Observable< WorkerT::returned_data > | |
typedef Callbackable < WorkerT::returned_data > ::CallbackTN * | CallbackPtr |
Public Types inherited from Callbackable< WorkerT::returned_data > | |
typedef CallbackBase < WorkerT::returned_data > | CallbackTN |
Public Member Functions | |
ConcurrentWorkers (const size_t number_of_workers, const size_t maximal_queue_length, worker_context_t *worker_context=NULL) | |
virtual | ~ConcurrentWorkers () |
bool | Initialize () |
void | Schedule (const expected_data_t &data) |
void | Terminate () |
void | WaitForEmptyQueue () const |
void | WaitForTermination () |
unsigned int | GetNumberOfWorkers () const |
unsigned int | GetNumberOfFailedJobs () const |
void | JobSuccessful (const returned_data_t &data) |
void | JobFailed (const returned_data_t &data) |
void | RunCallbackThread () |
Public Member Functions inherited from Observable< WorkerT::returned_data > | |
virtual | ~Observable () |
CallbackPtr | RegisterListener (typename BoundClosure< WorkerT::returned_data, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, ClosureDataT data) |
CallbackPtr | RegisterListener (typename BoundCallback< WorkerT::returned_data, DelegateT >::CallbackMethod method, DelegateT *delegate) |
CallbackPtr | RegisterListener (typename Callback< WorkerT::returned_data >::CallbackFunction fn) |
void | UnregisterListener (CallbackPtr callback_object) |
void | UnregisterListeners () |
Protected Types | |
typedef std::vector< pthread_t > | WorkerThreads |
typedef Job< expected_data_t > | WorkerJob |
typedef Job< returned_data_t > | CallbackJob |
Protected Types inherited from Observable< WorkerT::returned_data > | |
typedef std::set< CallbackPtr > | Callbacks |
Protected Member Functions | |
bool | SpawnWorkers () |
void | ReportStartedWorker () const |
void | Schedule (WorkerJob job) |
void | ScheduleDeathSentences () |
void | TruncateJobQueue (const bool forget_pending=false) |
WorkerJob | Acquire () |
void | JobDone (const returned_data_t &data, const bool success=true) |
void | StartRunning () |
void | StopRunning () |
bool | IsRunning () const |
Protected Member Functions inherited from Observable< WorkerT::returned_data > | |
void | RegisterListener (CallbackPtr callback_object) |
Observable () | |
void | NotifyListeners (const WorkerT::returned_data &parameter) |
Static Protected Member Functions | |
static void * | RunWorker (void *run_binding) |
static void * | RunCallbackThreadWrapper (void *run_binding) |
Private Types | |
typedef FifoChannel< WorkerJob > | JobQueue |
typedef FifoChannel< CallbackJob > | CallbackQueue |
Private Attributes | |
const size_t | number_of_workers_ |
number of concurrent worker threads | |
const worker_context_t * | worker_context_ |
the WorkerT defined context | |
WorkerRunBinding | thread_context_ |
bool | initialized_ |
bool | running_ |
unsigned int | workers_started_ |
pthread_mutex_t | status_mutex_ |
pthread_cond_t | worker_started_ |
pthread_mutex_t | jobs_all_done_mutex_ |
pthread_cond_t | jobs_all_done_ |
WorkerThreads | worker_threads_ |
list of worker threads | |
pthread_t | callback_thread_ |
handles callback invokes | |
JobQueue | jobs_queue_ |
atomic_int32 | jobs_pending_ |
atomic_int32 | jobs_failed_ |
atomic_int64 | jobs_processed_ |
CallbackQueue | results_queue_ |
Additional Inherited Members | |
Static Public Member Functions inherited from Callbackable< WorkerT::returned_data > | |
static CallbackTN * | MakeClosure (typename BoundClosure< WorkerT::returned_data, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, const ClosureDataT &closure_data) |
static CallbackTN * | MakeCallback (typename BoundCallback< WorkerT::returned_data, DelegateT >::CallbackMethod method, DelegateT *delegate) |
static CallbackTN * | MakeCallback (typename Callback< WorkerT::returned_data >::CallbackFunction function) |
This template implements a generic producer/consumer approach to concurrent worker tasks. It spawns a given number of workers derived from the base class ConcurrentWorker and uses them to distribute the workload across concurrent threads. Multiple producers can call Schedule() to post new work into a central job queue, which the worker objects then process concurrently in multiple threads. The template also provides an interface to control the worker swarm, i.e. to wait for its completion or to cancel it before all jobs are processed.
Note: A worker is a class inheriting from ConcurrentWorker that needs to meet a couple of requirements. See the documentation of ConcurrentWorker for additional details.
WorkerT | the class to be used as a worker for a concurrent worker swarm |
Definition at line 377 of file concurrency.h.
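The producer/consumer pattern described above can be illustrated with a small self-contained sketch that uses only the C++ standard library. `MiniSwarm`, `SquareWorker` and `SumOfSquares` are hypothetical names invented for this example and are not part of CernVM-FS; only the nested type names `expected_data`, `returned_data` and `worker_context` mirror the typedefs listed on this page. The real class uses pthreads and a callback thread, which this sketch leaves out.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical worker type. Only the three nested type names mirror this page.
struct SquareWorker {
  typedef int expected_data;    // input data type
  typedef long returned_data;   // output data type
  typedef long worker_context;  // common context type (here: an offset)

  explicit SquareWorker(const worker_context *context)
      : offset_(context ? *context : 0) {}

  // Processes one job and produces one result.
  returned_data operator()(const expected_data &input) const {
    return static_cast<returned_data>(input) * input + offset_;
  }

  long offset_;
};

// Hypothetical, heavily simplified stand-in for a worker swarm.
template <class WorkerT>
class MiniSwarm {
 public:
  typedef typename WorkerT::expected_data expected_data_t;
  typedef typename WorkerT::returned_data returned_data_t;
  typedef typename WorkerT::worker_context worker_context_t;

  MiniSwarm(std::size_t number_of_workers, const worker_context_t *context)
      : context_(context), draining_(false) {
    for (std::size_t i = 0; i < number_of_workers; ++i)
      threads_.emplace_back(&MiniSwarm::RunWorker, this);
  }
  ~MiniSwarm() { Finish(); }

  // Any number of producers may post work into the central job queue.
  void Schedule(const expected_data_t &data) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      jobs_.push(data);
    }
    wakeup_.notify_one();
  }

  // Lets the workers drain the queue, joins them and returns all results.
  std::vector<returned_data_t> Finish() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      draining_ = true;
    }
    wakeup_.notify_all();
    for (std::thread &thread : threads_) thread.join();
    threads_.clear();
    return results_;
  }

 private:
  void RunWorker() {
    WorkerT worker(context_);  // one worker object per thread
    for (;;) {
      std::unique_lock<std::mutex> lock(mutex_);
      wakeup_.wait(lock, [this] { return draining_ || !jobs_.empty(); });
      if (jobs_.empty()) return;  // draining and nothing left to do
      expected_data_t job = jobs_.front();
      jobs_.pop();
      lock.unlock();  // process the job without holding the lock
      returned_data_t result = worker(job);
      lock.lock();
      results_.push_back(result);
    }
  }

  const worker_context_t *context_;
  bool draining_;
  std::mutex mutex_;
  std::condition_variable wakeup_;
  std::queue<expected_data_t> jobs_;
  std::vector<returned_data_t> results_;
  std::vector<std::thread> threads_;
};

// Schedules the jobs 1..count and sums up the results.
long SumOfSquares(int count, std::size_t number_of_workers, long offset) {
  MiniSwarm<SquareWorker> swarm(number_of_workers, &offset);
  for (int i = 1; i <= count; ++i) swarm.Schedule(i);
  long sum = 0;
  for (long result : swarm.Finish()) sum += result;
  return sum;
}
```

Calling `SumOfSquares(4, 3, 0)` distributes four jobs over three threads. The order in which results arrive is not deterministic, which is why the example only sums them.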
|
protected |
Definition at line 413 of file concurrency.h.
|
private |
Definition at line 620 of file concurrency.h.
typedef WorkerT::expected_data ConcurrentWorkers< WorkerT >::expected_data_t |
Input data type
Definition at line 383 of file concurrency.h.
|
private |
Definition at line 613 of file concurrency.h.
typedef WorkerT::returned_data ConcurrentWorkers< WorkerT >::returned_data_t |
Output data type
Definition at line 387 of file concurrency.h.
typedef WorkerT::worker_context ConcurrentWorkers< WorkerT >::worker_context_t |
Common context type
Definition at line 391 of file concurrency.h.
|
protected |
Definition at line 412 of file concurrency.h.
|
protected |
Definition at line 394 of file concurrency.h.
ConcurrentWorkers< WorkerT >::ConcurrentWorkers | ( | const size_t | number_of_workers, |
const size_t | maximal_queue_length, | ||
worker_context_t * | worker_context = NULL |
||
) |
Creates a ConcurrentWorkers master object that encapsulates the actual workers.
number_of_workers | the number of concurrent workers to be spawned |
maximal_queue_length | the maximal length of the job queue (>= number_of_workers) |
worker_context | a pointer to the WorkerT defined context object |
Definition at line 287 of file concurrency_impl.h.
|
virtual |
Definition at line 310 of file concurrency_impl.h.
|
inlineprotected |
Retrieves a job from the job queue. If the job queue is empty, it blocks until a new job is available for processing. THIS METHOD MUST ONLY BE CALLED INSIDE THE WORKER OBJECTS.
Definition at line 552 of file concurrency_impl.h.
Referenced by ConcurrentWorkers< WorkerT >::RunWorker().
|
inline |
Definition at line 500 of file concurrency.h.
Referenced by swissknife::CommandMigrate::DoMigrationAndCommit().
|
inline |
Definition at line 499 of file concurrency.h.
bool ConcurrentWorkers< WorkerT >::Initialize | ( | ) |
Initializes the ConcurrentWorkers swarm, spawns a thread for each new worker object and puts everything into working state.
Definition at line 324 of file concurrency_impl.h.
Referenced by swissknife::CommandMigrate::DoMigrationAndCommit().
|
inlineprotected |
Definition at line 585 of file concurrency.h.
Referenced by ConcurrentWorkers< WorkerT >::RunWorker().
|
protected |
Controls the asynchronous finishing of a job. DO NOT CALL THIS, use JobSuccessful() or JobFailed() wrappers instead.
data | the data to be returned to the user |
success | flag if job was successful |
Definition at line 649 of file concurrency_impl.h.
|
inline |
Defines a job as failed. DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT!
Note: Even for failed jobs, the user will get a callback with a data object. You might want to make sure that this data also contains a status flag telling the user what went wrong.
data | the data to be returned back to the user |
Definition at line 524 of file concurrency.h.
Referenced by swissknife::CommandMigrate::AbstractMigrationWorker< DerivedT >::operator()().
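One way to follow the advice in the note is to put a status field into the returned data type. The sketch below is only an illustration: `DemoResult` and `ResultCollector` are hypothetical names, and the listener is a plain member function rather than a callback registered through RegisterListener(), whose exact callback signature is not shown on this page.

```cpp
#include <string>
#include <vector>

// Hypothetical returned data type: the status flag travels with the payload,
// so a listener can tell failed jobs from successful ones.
struct DemoResult {
  enum Status { kOk, kFileNotFound, kReadError };

  DemoResult(const std::string &p, Status s) : path(p), status(s) {}

  std::string path;
  Status status;
};

// Hypothetical listener that keeps track of what went wrong.
struct ResultCollector {
  ResultCollector() : failed(0) {}

  // Would be invoked once per finished job, successful or not.
  void OnJobDone(const DemoResult &result) {
    if (result.status != DemoResult::kOk) {
      ++failed;
      failed_paths.push_back(result.path);
    }
  }

  unsigned int failed;
  std::vector<std::string> failed_paths;
};
```

With a type like this, a worker can pass the same structure to either JobSuccessful() or JobFailed(), and the listener does not need a second channel to learn the reason for a failure.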
|
inline |
Defines a job as successfully finished. DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT!
data | the data to be returned back to the user |
Definition at line 510 of file concurrency.h.
Referenced by swissknife::CommandMigrate::AbstractMigrationWorker< DerivedT >::operator()().
|
protected |
Tells the master that a worker thread has started. This does not mean that it was initialized successfully.
Definition at line 506 of file concurrency_impl.h.
Referenced by ConcurrentWorkers< WorkerT >::RunCallbackThreadWrapper(), and ConcurrentWorkers< WorkerT >::RunWorker().
void ConcurrentWorkers< WorkerT >::RunCallbackThread | ( | ) |
Definition at line 481 of file concurrency_impl.h.
Referenced by ConcurrentWorkers< WorkerT >::RunCallbackThreadWrapper().
|
staticprotected |
|
staticprotected |
POSIX-conforming thread entry point. It is invoked for every new worker thread and contains the initialization, processing loop and tear down of the unique worker objects.
run_binding | void pointer to a RunBinding structure (C interface) |
Definition at line 406 of file concurrency_impl.h.
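The general shape of such an entry point, a static function that receives its context through a `void *` and casts it back, can be sketched as follows. `DemoBinding`, `DemoMaster` and `RunOnThread` are hypothetical names for this example, and the fields of the real RunBinding structure are not shown on this page, so the layout used here is an assumption.

```cpp
#include <pthread.h>

#include <cstddef>

// Hypothetical binding handed to the new thread through the void* argument.
struct DemoBinding {
  int input;
  int output;
};

class DemoMaster {
 public:
  // Has the signature pthread_create() expects: void *(*)(void *).
  static void *RunWorker(void *run_binding) {
    DemoBinding *binding = static_cast<DemoBinding *>(run_binding);

    // 1. initialization of the per-thread state
    int accumulator = 0;

    // 2. processing loop (here: a fixed amount of work instead of a queue)
    for (int i = 1; i <= binding->input; ++i) accumulator += i;

    // 3. tear down: hand the result back through the binding
    binding->output = accumulator;
    return NULL;
  }
};

// Spawns one thread on the entry point above and waits for it to finish.
// Returns -1 if the thread could not be created.
int RunOnThread(int input) {
  DemoBinding binding = {input, 0};
  pthread_t thread;
  if (pthread_create(&thread, NULL, &DemoMaster::RunWorker, &binding) != 0)
    return -1;
  pthread_join(thread, NULL);
  return binding.output;
}
```

The binding must outlive the thread that reads it. In this sketch that holds because `RunOnThread` joins the thread before `binding` goes out of scope.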
|
inline |
Schedules a new job for processing into the internal job queue. If the job queue is already full, this method blocks until a slot becomes free.
data | the data to be processed |
Definition at line 469 of file concurrency.h.
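The blocking behaviour on a full queue (and, on the consumer side, on an empty one) is typical of a bounded FIFO guarded by two condition variables. The following sketch shows that idea with the C++ standard library. `BoundedFifo` and `PassThrough` are hypothetical names; how the FifoChannel type listed on this page is actually implemented is not shown here.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical bounded FIFO; max_length must be at least 1.
template <class T>
class BoundedFifo {
 public:
  explicit BoundedFifo(std::size_t max_length) : max_length_(max_length) {}

  // Blocks while the queue is full, then appends the item.
  void Enqueue(const T &item) {
    std::unique_lock<std::mutex> lock(mutex_);
    not_full_.wait(lock, [this] { return queue_.size() < max_length_; });
    queue_.push(item);
    not_empty_.notify_one();
  }

  // Blocks while the queue is empty, then removes the oldest item.
  T Dequeue() {
    std::unique_lock<std::mutex> lock(mutex_);
    not_empty_.wait(lock, [this] { return !queue_.empty(); });
    T item = queue_.front();
    queue_.pop();
    not_full_.notify_one();
    return item;
  }

 private:
  const std::size_t max_length_;
  std::mutex mutex_;
  std::condition_variable not_full_;
  std::condition_variable not_empty_;
  std::queue<T> queue_;
};

// Pushes 1..count through a queue of the given capacity using one producer
// and one consumer thread, and returns the items in the order received.
std::vector<int> PassThrough(int count, std::size_t capacity) {
  BoundedFifo<int> fifo(capacity);
  std::vector<int> received;
  std::thread consumer([&] {
    for (int i = 0; i < count; ++i) received.push_back(fifo.Dequeue());
  });
  for (int i = 1; i <= count; ++i) fifo.Enqueue(i);  // may block when full
  consumer.join();
  return received;
}
```

With a capacity smaller than `count`, the producer loop has to wait for the consumer from time to time, which is the same back-pressure a caller of Schedule() experiences when the job queue is full.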
|
protected |
|
protected |
|
protected |
|
inlineprotected |
Definition at line 577 of file concurrency.h.
|
inlineprotected |
Definition at line 581 of file concurrency.h.
void ConcurrentWorkers< WorkerT >::Terminate | ( | ) |
Shuts down the ConcurrentWorkers object as well as the encapsulated workers as soon as possible. Workers will finish their current job and terminate afterwards. If no jobs are scheduled, they will simply stop waiting for new ones and terminate. This method MUST NOT be called more than once per ConcurrentWorkers object.
Definition at line 574 of file concurrency_impl.h.
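A common way to get this kind of shutdown is to drop the pending jobs and then queue one special "stop" job per worker. The member names ScheduleDeathSentences() and TruncateJobQueue() on this page suggest something along those lines, but the sketch below is an assumption about the technique and not the actual implementation. `DemoJob`, `DemoPool` and `TerminateAfter` are hypothetical names.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical job type: a payload plus a flag marking a "stop" job.
struct DemoJob {
  int payload;
  bool is_death_sentence;
};

class DemoPool {
 public:
  explicit DemoPool(std::size_t number_of_workers) : processed_(0) {
    for (std::size_t i = 0; i < number_of_workers; ++i)
      threads_.emplace_back(&DemoPool::RunWorker, this);
  }
  // Unlike the documented Terminate(), this sketch tolerates repeated calls.
  ~DemoPool() { Terminate(); }

  void Schedule(int payload) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      jobs_.push_back(DemoJob{payload, false});
    }
    wakeup_.notify_one();
  }

  // Drops pending jobs, queues one death sentence per worker, joins them.
  void Terminate() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      jobs_.clear();
      for (std::size_t i = 0; i < threads_.size(); ++i)
        jobs_.push_back(DemoJob{0, true});
    }
    wakeup_.notify_all();
    for (std::thread &thread : threads_) thread.join();
    threads_.clear();
  }

  int processed() {
    std::lock_guard<std::mutex> lock(mutex_);
    return processed_;
  }

 private:
  void RunWorker() {
    for (;;) {
      std::unique_lock<std::mutex> lock(mutex_);
      wakeup_.wait(lock, [this] { return !jobs_.empty(); });
      DemoJob job = jobs_.front();
      jobs_.pop_front();
      if (job.is_death_sentence) return;  // each worker consumes exactly one
      ++processed_;  // stands in for the real work, done under the lock here
    }
  }

  int processed_;
  std::mutex mutex_;
  std::condition_variable wakeup_;
  std::deque<DemoJob> jobs_;
  std::vector<std::thread> threads_;
};

// Schedules some jobs, terminates right away and reports how many ran.
int TerminateAfter(int job_count, std::size_t number_of_workers) {
  DemoPool pool(number_of_workers);
  for (int i = 0; i < job_count; ++i) pool.Schedule(i);
  pool.Terminate();
  return pool.processed();
}
```

How many jobs run before the shutdown takes effect depends on thread timing, so `TerminateAfter` can return anything between zero and `job_count`. The point of the sketch is that every worker wakes up, sees exactly one stop job and exits, even if it was idle.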
|
protected |
Empties the job queue
forget_pending | controls if cancelled jobs should be seen as finished |
Definition at line 561 of file concurrency_impl.h.
void ConcurrentWorkers< WorkerT >::WaitForEmptyQueue | ( | ) | const |
Waits until the job queue is fully processed
Note: this might lead to undefined behaviour or infinite waiting if other producers still schedule jobs into the job queue.
Definition at line 622 of file concurrency_impl.h.
Referenced by swissknife::CommandMigrate::DoMigrationAndCommit().
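Waiting for an empty queue is usually built from a counter of pending jobs and a condition variable that is signalled when the counter drops to zero. The attribute names jobs_pending_, jobs_all_done_mutex_ and jobs_all_done_ on this page point in that direction. The sketch below shows the idea with `std::mutex` and `std::condition_variable` instead of pthreads and an atomic counter; `PendingTracker` and `FinishJobsAndWait` are hypothetical names.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical bookkeeping for jobs that are scheduled but not yet finished.
class PendingTracker {
 public:
  PendingTracker() : jobs_pending_(0) {}

  void JobScheduled() {
    std::lock_guard<std::mutex> lock(mutex_);
    ++jobs_pending_;
  }

  // Called by a worker after it finished a job.
  void JobDone() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (--jobs_pending_ == 0) all_done_.notify_all();
  }

  // Blocks until no job is pending. If producers keep scheduling, the
  // counter may never reach zero and this call may never return.
  void WaitForEmptyQueue() {
    std::unique_lock<std::mutex> lock(mutex_);
    all_done_.wait(lock, [this] { return jobs_pending_ == 0; });
  }

  int pending() {
    std::lock_guard<std::mutex> lock(mutex_);
    return jobs_pending_;
  }

 private:
  int jobs_pending_;
  std::mutex mutex_;
  std::condition_variable all_done_;
};

// Schedules all jobs up front, lets one thread per job finish it, waits for
// the counter to reach zero and returns the number of jobs still pending.
int FinishJobsAndWait(int job_count) {
  PendingTracker tracker;
  for (int i = 0; i < job_count; ++i) tracker.JobScheduled();

  std::vector<std::thread> workers;
  for (int i = 0; i < job_count; ++i)
    workers.emplace_back([&tracker] { tracker.JobDone(); });

  tracker.WaitForEmptyQueue();
  int remaining = tracker.pending();
  for (std::thread &worker : workers) worker.join();
  return remaining;
}
```

The sketch only gives a meaningful answer because all jobs are scheduled before the wait starts, which is the same precondition the note above places on WaitForEmptyQueue().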
void ConcurrentWorkers< WorkerT >::WaitForTermination | ( | ) |
Waits until the ConcurrentWorkers swarm has fully processed the current job queue and shuts down afterwards.
Note: just as for WaitForEmptyQueue(), this assumes that no other producers schedule jobs in the meantime.
Definition at line 639 of file concurrency_impl.h.
|
private |
handles callback invokes
Definition at line 610 of file concurrency.h.
|
private |
Definition at line 600 of file concurrency.h.
|
mutableprivate |
Definition at line 606 of file concurrency.h.
|
mutableprivate |
Definition at line 605 of file concurrency.h.
|
mutableprivate |
Definition at line 616 of file concurrency.h.
Referenced by ConcurrentWorkers< WorkerT >::ConcurrentWorkers().
|
mutableprivate |
Definition at line 615 of file concurrency.h.
Referenced by ConcurrentWorkers< WorkerT >::ConcurrentWorkers().
|
mutableprivate |
Definition at line 617 of file concurrency.h.
Referenced by ConcurrentWorkers< WorkerT >::ConcurrentWorkers().
|
private |
Definition at line 614 of file concurrency.h.
|
private |
number of concurrent worker threads
Definition at line 592 of file concurrency.h.
|
private |
Definition at line 621 of file concurrency.h.
|
private |
Definition at line 601 of file concurrency.h.
|
mutableprivate |
Definition at line 603 of file concurrency.h.
|
private |
The thread context passed to newly spawned threads
Definition at line 597 of file concurrency.h.
|
private |
the WorkerT defined context
Definition at line 593 of file concurrency.h.
|
mutableprivate |
Definition at line 604 of file concurrency.h.
|
private |
list of worker threads
Definition at line 609 of file concurrency.h.
|
mutableprivate |
Definition at line 602 of file concurrency.h.