GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/util/concurrency.h
Date: 2025-07-13 02:35:07
            Exec   Total   Coverage
Lines:        85     115      73.9%
Branches:     21      38      55.3%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_UTIL_CONCURRENCY_H_
6 #define CVMFS_UTIL_CONCURRENCY_H_
7
8 #include <pthread.h>
9
10 #include <cassert>
11 #include <cstddef>
12 #include <queue>
13 #include <set>
14 #include <vector>
15
16 #include "util/async.h"
17 #include "util/atomic.h"
18 #include "util/export.h"
19 #include "util/mutex.h"
20 #include "util/single_copy.h"
21
22 #ifdef CVMFS_NAMESPACE_GUARD
23 namespace CVMFS_NAMESPACE_GUARD {
24 #endif
25
26 /**
27 * A thread-safe, unbounded vector of items that implements a FIFO channel.
28 * Uses condition variables to block threads that try to pop from an empty
29 * channel.
30 */
31 template<class ItemT>
32 class Channel : SingleCopy {
33 public:
34 71 Channel() {
35 71 int retval = pthread_mutex_init(&lock_, NULL);
36
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 71 times.
71 assert(retval == 0);
37 71 retval = pthread_cond_init(&cond_populated_, NULL);
38
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 71 times.
71 assert(retval == 0);
39 71 }
40
41 71 ~Channel() {
42 71 pthread_cond_destroy(&cond_populated_);
43 71 pthread_mutex_destroy(&lock_);
44 71 }
45
46 /**
47 * Returns the queue locked and ready for appending 1 item.
48 */
49 460000 std::vector<ItemT *> *StartEnqueueing() {
50 460000 int retval = pthread_mutex_lock(&lock_);
51
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 460000 times.
460000 assert(retval == 0);
52 460000 return &items_;
53 }
54
55 /**
56 * Unlocks the queue. The queue must remain unchanged when this is called.
57 */
58 230000 void AbortEnqueueing() {
59 230000 int retval = pthread_mutex_unlock(&lock_);
60
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 230000 times.
230000 assert(retval == 0);
61 230000 }
62
63 /**
64 * 1 new item was added to the queue. Unlock and signal to reader thread.
65 */
66 230000 void CommitEnqueueing() {
67 230000 int retval = pthread_cond_signal(&cond_populated_);
68
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 230000 times.
230000 assert(retval == 0);
69 230000 retval = pthread_mutex_unlock(&lock_);
70
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 230000 times.
230000 assert(retval == 0);
71 230000 }
72
73 45 void PushBack(ItemT *item) {
74 45 MutexLockGuard lock_guard(&lock_);
75
1/2
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
45 items_.push_back(item);
76 45 int retval = pthread_cond_signal(&cond_populated_);
77
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 45 times.
45 assert(retval == 0);
78 45 }
79
80 /**
81 * Removes and returns the first element of the queue. Blocks if the channel
82 * is empty.
83 */
84 230045 ItemT *PopFront() {
85 230045 MutexLockGuard lock_guard(&lock_);
86
2/2
✓ Branch 1 taken 35 times.
✓ Branch 2 taken 230045 times.
230080 while (items_.size() == 0)
87
1/2
✓ Branch 1 taken 35 times.
✗ Branch 2 not taken.
35 pthread_cond_wait(&cond_populated_, &lock_);
88 230045 ItemT *item = items_[0];
89
1/2
✓ Branch 3 taken 230045 times.
✗ Branch 4 not taken.
230045 items_.erase(items_.begin());
90 230045 return item;
91 230045 }
92
93 private:
94 /**
95 * The locked queue/channel
96 */
97 std::vector<ItemT *> items_;
98 /**
99 * Protects all internal state
100 */
101 pthread_mutex_t lock_;
102 /**
103 * Signals if there are items enqueued
104 */
105 pthread_cond_t cond_populated_;
106 };
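
A minimal usage sketch of the three-step enqueue protocol above, assuming util/concurrency.h is included; the Work payload type and the producer/consumer split are illustrative, not part of the covered source:

struct Work { int id; };            // hypothetical payload type

Channel<Work> channel;

void Produce(Work *work) {          // called from a producer thread
  std::vector<Work *> *queue = channel.StartEnqueueing();  // locks the channel
  queue->push_back(work);
  channel.CommitEnqueueing();       // unlocks and signals the reader;
                                    // AbortEnqueueing() would unlock unchanged
}

void Consume() {                    // called from a consumer thread
  Work *work = channel.PopFront();  // blocks while the channel is empty
  delete work;
}

PushBack() bundles the same lock/append/signal sequence into a single call when no work has to happen under the channel's lock.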
107
108 /**
109 * Implements a simple interface to lock objects of derived classes. Classes
110 * that inherit from Lockable are also usable with the LockGuard template for
111 * scoped locking semantics.
112 */
113 class CVMFS_EXPORT Lockable : SingleCopy {
114 public:
115 92 inline virtual ~Lockable() { pthread_mutex_destroy(&mutex_); }
116
117 46 void Lock() const { pthread_mutex_lock(&mutex_); }
118 92 int TryLock() const { return pthread_mutex_trylock(&mutex_); }
119 46 void Unlock() const { pthread_mutex_unlock(&mutex_); }
120
121 protected:
122 46 Lockable() {
123 46 const int retval = pthread_mutex_init(&mutex_, NULL);
124
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 46 times.
46 assert(retval == 0);
125 46 }
126
127 private:
128 mutable pthread_mutex_t mutex_;
129 };
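
A brief sketch of a class inheriting from Lockable; the Registry class and its contents are illustrative and only the Lock()/Unlock() calls come from this header:

class Registry : public Lockable {   // hypothetical derived class
 public:
  void Add(int value) {
    Lock();                          // locks the object itself
    values_.push_back(value);
    Unlock();
  }

 private:
  std::vector<int> values_;
};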
130
131
132 //
133 // -----------------------------------------------------------------------------
134 //
135
136
137 /**
138 * This counter can be counted up and down using the usual increment/decrement
139 * operators. It allows threads to wait for it to become zero as well as to
140 * block when a specified maximal value would be exceeded by an increment.
141 *
142 * Note: If a maximal value is specified on creation, the SynchronizingCounter
143 * is assumed to never leave the interval [0, maximal_value]! Otherwise
144 * the numerical limits of the specified template parameter define this
145 * interval and an increment _never_ blocks.
146 *
147 * Caveat: This implementation uses a simple mutex mechanism and therefore might
148 * become a scalability bottleneck!
149 */
150 template<typename T>
151 class SynchronizingCounter : SingleCopy {
152 public:
153 32 SynchronizingCounter() : value_(T(0)), maximal_value_(T(0)) { Initialize(); }
154
155 1983 explicit SynchronizingCounter(const T maximal_value)
156 1983 : value_(T(0)), maximal_value_(maximal_value) {
157
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1983 times.
1983 assert(maximal_value > T(0));
158 1983 Initialize();
159 1983 }
160
161 2014 ~SynchronizingCounter() { Destroy(); }
162
163 287734071 T Increment() {
164 287734071 MutexLockGuard l(mutex_);
165
1/2
✓ Branch 1 taken 292495233 times.
✗ Branch 2 not taken.
292495233 WaitForFreeSlotUnprotected();
166
1/2
✓ Branch 1 taken 3447100 times.
✗ Branch 2 not taken.
292495233 SetValueUnprotected(value_ + T(1));
167 289333006 return value_;
168 292495233 }
169
170 289747857 T Decrement() {
171 289747857 MutexLockGuard l(mutex_);
172
1/2
✓ Branch 1 taken 3447100 times.
✗ Branch 2 not taken.
292495233 SetValueUnprotected(value_ - T(1));
173 579324322 return value_;
174 292495233 }
175
176 2775 void WaitForZero() const {
177 2775 MutexLockGuard l(mutex_);
178
2/2
✓ Branch 0 taken 191 times.
✓ Branch 1 taken 2775 times.
2966 while (value_ != T(0)) {
179
1/2
✓ Branch 1 taken 191 times.
✗ Branch 2 not taken.
191 pthread_cond_wait(&became_zero_, &mutex_);
180 }
181
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2775 times.
2775 assert(value_ == T(0));
182 2775 }
183
184 1771712780 bool HasMaximalValue() const { return maximal_value_ != T(0); }
185 88 T maximal_value() const { return maximal_value_; }
186
187 3441787 T operator++() { return Increment(); }
188 23 const T operator++(int) { return Increment() - T(1); }
189 3444075 T operator--() { return Decrement(); }
190 23 const T operator--(int) { return Decrement() + T(1); }
191
192 13920 T Get() const {
193 13920 MutexLockGuard l(mutex_);
194 27840 return value_;
195 13920 }
196
197 91 SynchronizingCounter<T> &operator=(const T &other) {
198 91 MutexLockGuard l(mutex_);
199 91 SetValueUnprotected(other);
200 182 return *this;
201 91 }
202
203 protected:
204 void SetValueUnprotected(const T new_value);
205 void WaitForFreeSlotUnprotected();
206
207 private:
208 void Initialize();
209 void Destroy();
210
211 private:
212 T value_;
213 const T maximal_value_;
214
215 mutable pthread_mutex_t mutex_;
216 mutable pthread_cond_t became_zero_;
217 pthread_cond_t free_slot_;
218 };
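
A sketch of the bounded "in-flight jobs" pattern described above. DispatchJob() is a hypothetical asynchronous submission function; only the counter calls come from this header:

// Hypothetical: runs the job on another thread and decrements *in_flight when done.
void DispatchJob(int job_id, SynchronizingCounter<int> *in_flight);

void ProcessAll() {
  SynchronizingCounter<int> jobs_in_flight(4);  // increments block above 4

  for (int i = 0; i < 100; ++i) {
    ++jobs_in_flight;                 // blocks while 4 jobs are already in flight
    DispatchJob(i, &jobs_in_flight);  // each finished job decrements the counter
  }
  jobs_in_flight.WaitForZero();       // returns once every job has decremented
}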
219
220
221 //
222 // -----------------------------------------------------------------------------
223 //
224
225
226 template<typename ParamT>
227 class Observable;
228
229
230 /**
231 * This is a base class for classes that need to expose a callback interface for
232 * asynchronous callback methods. One can register an arbitrary number of
233 * observers on an Observable that get notified when the method
234 * NotifyListeners() is invoked.
235 *
236 * Note: the registration and invocation of callbacks in Observable are
237 * thread-safe, but be aware that the callbacks of observing classes might
238 * run in arbitrary threads. When using these classes, you should take extra
239 * care to ensure thread safety.
240 *
241 * Note: The RegisterListener() methods return a pointer to a CallbackBase.
242 * You MUST NOT free these objects, they are managed by the Observable
243 * class. Use them only as handles to unregister specific callbacks.
244 *
245 * @param ParamT the type of the parameter that is passed to every callback
246 * invocation.
247 */
248 template<typename ParamT>
249 class Observable : public Callbackable<ParamT>, SingleCopy {
250 public:
251 typedef typename Callbackable<ParamT>::CallbackTN *CallbackPtr;
252
253 protected:
254 typedef std::set<CallbackPtr> Callbacks;
255
256 public:
257 virtual ~Observable();
258
259 /**
260 * Registers a method of a specific object as a listener to the Observable
261 * object. The method is invoked on the given delegate when the callback is
262 * fired by the observed object using NotifyListeners(). Since this is meant
263 * to be a closure, it also passes the third argument to the method
264 * invoked by the Observable object.
265 *
266 * @param DelegateT the type of the delegate object
267 * @param method a pointer to the method to be invoked by the callback
268 * @param delegate a pointer to the object to invoke the callback on
269 * @param closure something to be passed to `method`
270 * @return a handle to the registered callback
271 */
272 template<class DelegateT, class ClosureDataT>
273 CallbackPtr RegisterListener(
274 typename BoundClosure<ParamT, DelegateT, ClosureDataT>::CallbackMethod
275 method,
276 DelegateT *delegate,
277 ClosureDataT data);
278
279 /**
280 * Registers a method of a specific object as a listener to the Observable
281 * object. The method is invoked on the given delegate when the callback is
282 * fired by the observed object using NotifyListeners().
283 *
284 * @param DelegateT the type of the delegate object
285 * @param method a pointer to the method to be invoked by the callback
286 * @param delegate a pointer to the object to invoke the callback on
287 * @return a handle to the registered callback
288 */
289 template<class DelegateT>
290 CallbackPtr RegisterListener(
291 typename BoundCallback<ParamT, DelegateT>::CallbackMethod method,
292 DelegateT *delegate);
293
294 /**
295 * Registers a static class member or a C-like function as a callback to the
296 * Observable object. The function is invoked when the callback is fired by
297 * the observed object using NotifyListeners().
298 *
299 * @param fn a pointer to the function to be called by the callback
300 * @return a handle to the registered callback
301 */
302 CallbackPtr RegisterListener(typename Callback<ParamT>::CallbackFunction fn);
303
304 /**
305 * Removes the given callback from the listeners group of this Observable.
306 *
307 * @param callback_object a callback handle that was returned by
308 * RegisterListener() before.
309 */
310 void UnregisterListener(CallbackPtr callback_object);
311
312 /**
313 * Removes all listeners from the Observable
314 */
315 void UnregisterListeners();
316
317 protected:
318 Observable(); // don't instantiate this as a stand-alone object
319
320 void RegisterListener(CallbackPtr callback_object);
321
322 /**
323 * Notifies all registered listeners and passes them the provided argument
324 * This method should be called by a derived class to send out asynchronous
325 * messages to registered observers.
326 *
327 * @param parameter the data to be passed to the observers
328 */
329 void NotifyListeners(const ParamT &parameter);
330
331 private:
332 Callbacks listeners_; //!< the set of registered
333 //!< callback objects
334 mutable pthread_rwlock_t listeners_rw_lock_;
335 };
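
A sketch of the listener interface described above. The exact callback signature is defined in util/async.h (not shown here); this sketch assumes the bound method takes a const reference to the parameter type, and all class and member names are illustrative:

struct UploadResult { bool success; };               // hypothetical callback payload

class Uploader : public Observable<UploadResult> {   // the observed object
 public:
  void FinishUpload() {
    UploadResult result;
    result.success = true;
    NotifyListeners(result);   // fans the result out to all registered listeners
  }
};

class Logger {                                       // an observer
 public:
  void OnUploadDone(const UploadResult &result) { /* react to the result */ }
};

void WireUp() {
  Uploader uploader;
  Logger logger;
  Uploader::CallbackPtr handle =
      uploader.RegisterListener(&Logger::OnUploadDone, &logger);
  uploader.FinishUpload();
  uploader.UnregisterListener(handle);  // the handle stays owned by the Observable
}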
336
337
338 //
339 // -----------------------------------------------------------------------------
340 //
341
342
343 /**
344 * Returns the number of CPU cores present in the system or a fallback number
345 * if the number of CPU cores cannot be determined.
346 *
347 * @return the number of active CPU cores in the system
348 */
349 CVMFS_EXPORT unsigned int GetNumberOfCpuCores();
350 static const unsigned int kFallbackNumberOfCpus = 1;
351
352
353 /**
354 * A blocking signal for thread synchronization
355 */
356 class CVMFS_EXPORT Signal : SingleCopy {
357 public:
358 Signal();
359 ~Signal();
360 void Wakeup();
361 void Wait();
362 bool IsSleeping();
363
364 private:
365 bool fired_;
366 pthread_mutex_t lock_;
367 pthread_cond_t signal_;
368 };
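
A sketch of a one-shot startup handshake using Signal. The worker entry point and driver function are illustrative; whether a Signal may be reused after Wakeup() is not specified in this header, so the sketch fires it only once:

void *WorkerMain(void *signal) {
  // ... perform one-time setup ...
  static_cast<Signal *>(signal)->Wakeup();  // release the waiting thread
  return NULL;
}

void LaunchWorker() {
  Signal startup_done;
  pthread_t thread;
  const int retval = pthread_create(&thread, NULL, WorkerMain, &startup_done);
  assert(retval == 0);
  startup_done.Wait();                      // blocks until the worker fired Wakeup()
  pthread_join(thread, NULL);
}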
369
370
371 //
372 // -----------------------------------------------------------------------------
373 //
374
375
376 /**
377 * Asynchronous FIFO channel template.
378 * Implements a thread-safe FIFO queue that blocks the calling thread when the
379 * queue is full or empty.
380 *
381 * @param T the data type to be enqueued in the queue
382 */
383 template<class T>
384 class FifoChannel : protected std::queue<T> {
385 public:
386 /**
387 * Creates a new FIFO channel.
388 *
389 * @param maximal_length the maximal number of items that can be enqueued
390 * @param drainout_threshold if fewer than this many elements are in the
391 * queue, it is considered to be "not full"
392 */
393 FifoChannel(const size_t maximal_length, const size_t drainout_threshold);
394 virtual ~FifoChannel();
395
396 /**
397 * Adds a new item to the end of the FIFO channel. If the queue is full, this
398 * call will block until enough items have been dequeued by another thread
399 * to allow the insertion.
400 *
401 * @param data the data to be enqueued into the FIFO channel
402 */
403 void Enqueue(const T &data);
404
405 /**
406 * Removes the next element from the channel. If the queue is empty, this will
407 * block until another thread enqueues an item into the channel.
408 *
409 * @return the first item in the channel queue
410 */
411 const T Dequeue();
412
413 /**
414 * Clears all items in the FIFO channel. The cleared items will be lost.
415 *
416 * @return the number of dropped items
417 */
418 unsigned int Drop();
419
420 inline size_t GetItemCount() const;
421 inline bool IsEmpty() const;
422 inline size_t GetMaximalItemCount() const;
423
424 private:
425 // general configuration
426 const size_t maximal_queue_length_;
427 const size_t queue_drainout_threshold_;
428
429 // thread synchronisation structures
430 mutable pthread_mutex_t mutex_;
431 mutable pthread_cond_t queue_is_not_empty_;
432 mutable pthread_cond_t queue_is_not_full_;
433 };
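
A single-threaded sketch of the FifoChannel interface; the capacity and threshold values are illustrative, and in real use producers and consumers would run in different threads:

void FifoChannelSketch() {
  FifoChannel<int> channel(100, 10);    // at most 100 items; "not full" below 10

  channel.Enqueue(42);                  // would block if 100 items were queued
  const int value = channel.Dequeue();  // would block if the channel were empty
  assert(value == 42);

  assert(channel.IsEmpty());
  assert(channel.GetMaximalItemCount() == 100);
  const unsigned int dropped = channel.Drop();  // discards everything still queued
  assert(dropped == 0);
}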
434
435
436 /**
437 * This template implements a generic producer/consumer approach to concurrent
438 * worker tasks. It spawns a given number of Workers derived from the base class
439 * ConcurrentWorker and uses them to distribute the workload onto concurrent
440 * threads.
441 * One can have multiple producers that use Schedule() to post new work into
442 * a central job queue, which in turn is processed concurrently by the Worker
443 * objects in multiple threads. Furthermore the template provides an interface
444 * to control the worker swarm, i.e. to wait for their completion or cancel them
445 * before all jobs are processed.
446 *
447 * Note: A worker is a class inheriting from ConcurrentWorker that needs to meet
448 * a couple of requirements. See the documentation of ConcurrentWorker for
449 * additional details.
450 *
451 * @param WorkerT the class to be used as a worker for a concurrent worker
452 * swarm
453 */
454 template<class WorkerT>
455 class ConcurrentWorkers : public Observable<typename WorkerT::returned_data> {
456 public:
457 // these data types must be defined by the worker class
458 /**
459 * Input data type
460 */
461 typedef typename WorkerT::expected_data expected_data_t;
462 /**
463 * Output data type
464 */
465 typedef typename WorkerT::returned_data returned_data_t;
466 /**
467 * Common context type
468 */
469 typedef typename WorkerT::worker_context worker_context_t;
470
471 protected:
472 typedef std::vector<pthread_t> WorkerThreads;
473
474 /**
475 * This is a simple wrapper structure to piggy-back control information on
476 * scheduled jobs. Job structures are scheduled into a central FIFO queue and
477 * are then processed concurrently by the workers.
478 */
479 template<class DataT>
480 struct Job {
481 explicit Job(const DataT &data) : data(data), is_death_sentence(false) { }
482 Job() : data(), is_death_sentence(true) { }
483 const DataT data; //!< job payload
484 const bool is_death_sentence; //!< death sentence flag
485 };
486 typedef Job<expected_data_t> WorkerJob;
487 typedef Job<returned_data_t> CallbackJob;
488
489 /**
490 * Provides a wrapper for initialization data that is passed to newly spawned
491 * worker threads.
492 * It contains a pointer to the spawning ConcurrentWorkers master object as
493 * well as a pointer to a context object defined by the concrete worker to be
494 * spawned.
495 */
496 struct RunBinding {
497 explicit RunBinding(ConcurrentWorkers<WorkerT> *delegate)
498 : delegate(delegate) { }
499 ConcurrentWorkers<WorkerT> *delegate; //!< delegate to the Concurrent-
500 //!< Workers master
501 };
502
503 struct WorkerRunBinding : RunBinding {
504 WorkerRunBinding(ConcurrentWorkers<WorkerT> *delegate,
505 const worker_context_t *worker_context)
506 : RunBinding(delegate), worker_context(worker_context) { }
507 /**
508 * WorkerT defined context objects for worker init.
509 */
510 const worker_context_t *worker_context;
511 };
512
513 public:
514 /**
515 * Creates a ConcurrentWorkers master object that encapsulates the actual
516 * workers.
517 *
518 * @param number_of_workers the number of concurrent workers to be spawned
519 * @param maximal_queue_length the maximal length of the job queue
520 * (>= number_of_workers)
521 * @param worker_context a pointer to the WorkerT defined context
522 * object
523 */
524 ConcurrentWorkers(const size_t number_of_workers,
525 const size_t maximal_queue_length,
526 worker_context_t *worker_context = NULL);
527 virtual ~ConcurrentWorkers();
528
529 /**
530 * Initializes the ConcurrentWorkers swarm, spawns a thread for each new
531 * worker object and puts everything into working state.
532 *
533 * @return true if all went fine
534 */
535 bool Initialize();
536
537 /**
538 * Schedules a new job for processing in the internal job queue. This method
539 * will block if the job queue is already full and wait for a free
540 * slot.
541 *
542 * @param data the data to be processed
543 */
544 inline void Schedule(const expected_data_t &data) {
545 Schedule(WorkerJob(data));
546 }
547
548 /**
549 * Shuts down the ConcurrentWorkers object as well as the encapsulated workers
550 * as soon as possible. Workers will finish their current job and terminate
551 * afterwards. If no jobs are scheduled, they will simply stop waiting for
552 * new ones and terminate.
553 * This method MUST NOT be called more than once per ConcurrentWorkers.
554 */
555 void Terminate();
556
557 /**
558 * Waits until the job queue is fully processed
559 *
560 * Note: this might lead to undefined behaviour or infinite waiting if other
561 * producers still schedule jobs into the job queue.
562 */
563 void WaitForEmptyQueue() const;
564
565 /**
566 * Waits until the ConcurrentWorkers swarm has fully processed the current job
567 * queue and shuts down afterwards.
568 *
569 * Note: just as for WaitForEmptyQueue(), this assumes that no other producers
570 * schedule jobs in the meantime.
571 */
572 void WaitForTermination();
573
574 inline unsigned int GetNumberOfWorkers() const { return number_of_workers_; }
575 inline unsigned int GetNumberOfFailedJobs() const {
576 return atomic_read32(&jobs_failed_);
577 }
578
579 /**
580 * Defines a job as successfully finished.
581 * DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT!
582 *
583 * @param data the data to be returned back to the user
584 */
585 inline void JobSuccessful(const returned_data_t &data) {
586 JobDone(data, true);
587 }
588
589 /**
590 * Defines a job as failed.
591 * DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT!
592 *
593 * Note: Even for failed jobs the user will get a callback with a data object.
594 * You might want to make sure that this data contains a status flag as
595 * well, telling the user what went wrong.
596 *
597 * @param data the data to be returned back to the user
598 */
599 inline void JobFailed(const returned_data_t &data) { JobDone(data, false); }
600
601 void RunCallbackThread();
602
603 protected:
604 bool SpawnWorkers();
605
606 /**
607 * POSIX-conformant function for the thread entry point. It is invoked for
608 * every new worker thread and contains the initialization, processing loop
609 * and tear-down of the unique worker objects.
610 *
611 * @param run_binding void pointer to a RunBinding structure (C interface)
612 * @return NULL in any case
613 */
614 static void *RunWorker(void *run_binding);
615
616 static void *RunCallbackThreadWrapper(void *run_binding);
617
618 /**
619 * Tells the master that a worker thread did start. This does not mean that
620 * it was initialized successfully.
621 */
622 void ReportStartedWorker() const;
623
624 void Schedule(WorkerJob job);
625 void ScheduleDeathSentences();
626
627 /**
628 * Empties the job queue
629 *
630 * @param forget_pending controls whether cancelled jobs should be seen as
631 * finished
632 */
633 void TruncateJobQueue(const bool forget_pending = false);
634
635 /**
636 * Retrieves a job from the job queue. If the job queue is empty it will block
637 * until there is a new job available for processing.
638 * THIS METHOD MUST ONLY BE CALLED INSIDE THE WORKER OBJECTS
639 *
640 * @return a job to be processed by a worker
641 */
642 inline WorkerJob Acquire();
643
644 /**
645 * Controls the asynchronous finishing of a job.
646 * DO NOT CALL THIS, use JobSuccessful() or JobFailed() wrappers instead.
647 *
648 * @param data the data to be returned to the user
649 * @param success flag if job was successful
650 */
651 void JobDone(const returned_data_t &data, const bool success = true);
652
653 inline void StartRunning() {
654 MutexLockGuard guard(status_mutex_);
655 running_ = true;
656 }
657 inline void StopRunning() {
658 MutexLockGuard guard(status_mutex_);
659 running_ = false;
660 }
661 inline bool IsRunning() const {
662 MutexLockGuard guard(status_mutex_);
663 return running_;
664 }
665
666 private:
667 // general configuration
668 const size_t number_of_workers_; //!< number of concurrent worker threads
669 const worker_context_t *worker_context_; //!< the WorkerT defined context
670 /**
671 * The thread context passed to newly spawned threads
672 */
673 WorkerRunBinding thread_context_;
674
675 // status information
676 bool initialized_;
677 bool running_;
678 mutable unsigned int workers_started_;
679 mutable pthread_mutex_t status_mutex_;
680 mutable pthread_cond_t worker_started_;
681 mutable pthread_mutex_t jobs_all_done_mutex_;
682 mutable pthread_cond_t jobs_all_done_;
683
684 // worker threads
685 WorkerThreads worker_threads_; //!< list of worker threads
686 pthread_t callback_thread_; //!< handles callback invokes
687
688 // job queue
689 typedef FifoChannel<WorkerJob> JobQueue;
690 JobQueue jobs_queue_;
691 mutable atomic_int32 jobs_pending_;
692 mutable atomic_int32 jobs_failed_;
693 mutable atomic_int64 jobs_processed_;
694
695 // callback channel
696 typedef FifoChannel<CallbackJob> CallbackQueue;
697 CallbackQueue results_queue_;
698 };
699
700
701 /**
702 * Base class for worker classes that should be used in a ConcurrentWorkers
703 * swarm. These classes need to fulfill a number of requirements in order to
704 * satisfy the needs of the ConcurrentWorkers template.
705 *
706 * Requirements:
707 * -> needs to define the following types:
708 * - expected_data - input data structure of the worker
709 * - returned_data - output data structure of the worker
710 * - worker_context - context structure for initialization information
711 *
712 * -> implement a constructor that takes a pointer to its worker_context
713 * as its only parameter:
714 * AwesomeWorker(const AwesomeWorker::worker_context*)
715 * Note: do not rely on the context object to be available after the
716 * constructor has returned!
717 *
718 * -> needs to define the calling-operator expecting one parameter of type:
719 * const expected_data& and returning void
720 * This will be invoked for every new job the worker should process
721 *
722 * -> inside the implementation of the described calling-operator it needs to
723 * invoke either:
724 * master()->JobSuccessful(const returned_data&)
725 * or:
726 * master()->JobFailed(const returned_data&)
727 * as its LAST operation before returning.
728 * This will keep track of finished jobs and inform the user of
729 * ConcurrentWorkers about them.
730 *
731 * -> [optional] override Initialize() and/or TearDown() to do environmental
732 * setup work before or after jobs are processed, respectively
733 *
734 * General Reminder:
735 * You will be running in a multi-threaded environment here! Buckle up and
736 * make suitable preparations to shield yourself from serious headaches.
737 *
738 * Note: This implements a Curiously Recurring Template Pattern
739 * (http://en.wikipedia.org/wiki/Curiously_recurring_template_pattern)
740 *
741 * @param DerivedWorkerT the class name of the inheriting class
742 * (e.g. class AwesomeWorker : public ConcurrentWorker<AwesomeWorker>)
743 */
744 template<class DerivedWorkerT>
745 class ConcurrentWorker : SingleCopy {
746 public:
747 virtual ~ConcurrentWorker() { }
748
749 /**
750 * Does general initialization before any jobs get scheduled. You do not
751 * need to up-call this initialize method, since it is only a dummy here.
752 *
753 * @returns true on successful initialization
754 */
755 virtual bool Initialize() { return true; }
756
757 /**
758 * Does general clean-up after the last job has been processed and the worker
759 * object is about to vanish. You do not need to up-call this method.
760 */
761 virtual void TearDown() { }
762
763 /**
764 * The actual job-processing entry point. See the description of the
765 * inheriting class requirements to learn about the semantics of this method.
766 * DO NOT FORGET TO CALL master()->JobSuccessful() OR master()->JobFailed()
767 * at the end of this method!!
768 *
769 * Note: There is no way to generally define this operator, it is therefore
770 * commented out and placed here just as a reminder.
771 *
772 * @param data the data to be processed.
773 */
774 // void operator()(const expected_data &data); // do the actual job of the
775 // worker
776
777 protected:
778 ConcurrentWorker() : master_(NULL) { }
779
780 /**
781 * Gets a pointer to the ConcurrentWorkers object that this worker resides in
782 *
783 * @returns a pointer to the ConcurrentWorkers object
784 */
785 inline ConcurrentWorkers<DerivedWorkerT> *master() const { return master_; }
786
787 private:
788 friend class ConcurrentWorkers<DerivedWorkerT>;
789 void RegisterMaster(ConcurrentWorkers<DerivedWorkerT> *master) {
790 master_ = master;
791 }
792
793 private:
794 ConcurrentWorkers<DerivedWorkerT> *master_;
795 };
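
To tie the pieces together, here is a sketch of a worker that fulfills the requirements listed above, plus a driver that feeds it through a ConcurrentWorkers swarm. All names (SquareWorker, RunSquareWorkers, the empty context) are illustrative; results would normally be collected by registering a listener on the swarm, since ConcurrentWorkers is an Observable of the worker's returned_data.

class SquareWorker : public ConcurrentWorker<SquareWorker> {
 public:
  struct Context { };               // hypothetical, empty initialization info
  typedef int expected_data;        // job input
  typedef int returned_data;        // job output
  typedef Context worker_context;   // per-swarm initialization data

  explicit SquareWorker(const worker_context *context) { }

  void operator()(const expected_data &data) {
    const returned_data result = data * data;
    master()->JobSuccessful(result);  // must be the last action of the job
  }
};

void RunSquareWorkers() {
  SquareWorker::worker_context context;
  ConcurrentWorkers<SquareWorker> workers(GetNumberOfCpuCores(),  // worker threads
                                          128,                    // job queue length
                                          &context);
  const bool initialized = workers.Initialize();
  assert(initialized);

  for (int i = 0; i < 1000; ++i)
    workers.Schedule(i);            // blocks while the job queue is full

  workers.WaitForEmptyQueue();      // safe here: this is the only producer
  workers.Terminate();
}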
796
797 #ifdef CVMFS_NAMESPACE_GUARD
798 } // namespace CVMFS_NAMESPACE_GUARD
799 #endif
800
801 #include "util/concurrency_impl.h"
802
803 #endif // CVMFS_UTIL_CONCURRENCY_H_
804