GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/util/concurrency.h
Date: 2025-04-20 02:34:28
            Exec    Total    Coverage
Lines:        87      121       71.9%
Branches:     21       38       55.3%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_UTIL_CONCURRENCY_H_
6 #define CVMFS_UTIL_CONCURRENCY_H_
7
8 #include <pthread.h>
9
10 #include <cassert>
11 #include <queue>
12 #include <set>
13 #include <vector>
14
15 #include "util/async.h"
16 #include "util/atomic.h"
17 #include "util/export.h"
18 #include "util/mutex.h"
19 #include "util/single_copy.h"
20
21 #ifdef CVMFS_NAMESPACE_GUARD
22 namespace CVMFS_NAMESPACE_GUARD {
23 #endif
24
25 /**
26 * A thread-safe, unbounded vector of items that implements a FIFO channel.
27 * Uses condition variables to block when threads try to pop from an empty
28 * channel.
29 */
30 template <class ItemT>
31 class Channel : SingleCopy {
32 public:
33 6 Channel() {
34 6 int retval = pthread_mutex_init(&lock_, NULL);
35
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 assert(retval == 0);
36 6 retval = pthread_cond_init(&cond_populated_, NULL);
37
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 assert(retval == 0);
38 6 }
39
40 6 ~Channel() {
41 6 pthread_cond_destroy(&cond_populated_);
42 6 pthread_mutex_destroy(&lock_);
43 6 }
44
45 /**
46 * Returns the queue locked and ready for appending 1 item.
47 */
48 10000 std::vector<ItemT *> *StartEnqueueing() {
49 10000 int retval = pthread_mutex_lock(&lock_);
50
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 10000 times.
10000 assert(retval == 0);
51 10000 return &items_;
52 }
53
54 /**
55 * Unlocks the queue. The queue must remain unchanged when this is called.
56 */
57 5000 void AbortEnqueueing() {
58 5000 int retval = pthread_mutex_unlock(&lock_);
59
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5000 times.
5000 assert(retval == 0);
60 5000 }
61
62 /**
63 * One new item was added to the queue. Unlock and signal the reader thread.
64 */
65 5000 void CommitEnqueueing() {
66 5000 int retval = pthread_cond_signal(&cond_populated_);
67
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5000 times.
5000 assert(retval == 0);
68 5000 retval = pthread_mutex_unlock(&lock_);
69
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5000 times.
5000 assert(retval == 0);
70 5000 }
71
72 9 void PushBack(ItemT *item) {
73 9 MutexLockGuard lock_guard(&lock_);
74
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 items_.push_back(item);
75 9 int retval = pthread_cond_signal(&cond_populated_);
76
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
9 assert(retval == 0);
77 9 }
78
79 /**
80 * Remove and return the first element from the queue. Block if the channel
81 * is empty.
82 */
83 5009 ItemT *PopFront() {
84 5009 MutexLockGuard lock_guard(&lock_);
85
2/2
✓ Branch 1 taken 7 times.
✓ Branch 2 taken 5009 times.
5016 while (items_.size() == 0)
86
1/2
✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
7 pthread_cond_wait(&cond_populated_, &lock_);
87 5009 ItemT *item = items_[0];
88
1/2
✓ Branch 3 taken 5009 times.
✗ Branch 4 not taken.
5009 items_.erase(items_.begin());
89 5009 return item;
90 5009 }
91
92 private:
93 /**
94 * The locked queue/channel
95 */
96 std::vector<ItemT *> items_;
97 /**
98 * Protects all internal state
99 */
100 pthread_mutex_t lock_;
101 /**
102 * Signals if there are items enqueued
103 */
104 pthread_cond_t cond_populated_;
105 };
106
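A minimal usage sketch for Channel, based only on the interface above (items are raw pointers whose ownership stays with the caller; the consumer typically runs in a different thread):

    void ProducerConsumerExample() {
      Channel<int> channel;

      // Producer: the two-phase enqueue keeps the channel locked in between.
      std::vector<int *> *queue = channel.StartEnqueueing();  // locks the channel
      queue->push_back(new int(42));
      channel.CommitEnqueueing();    // unlocks and wakes a blocked reader
      channel.PushBack(new int(7));  // one-shot variant: lock, append, signal

      // Consumer: blocks while the channel is empty.
      int *item = channel.PopFront();
      delete item;
    }
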
107 /**
108 * Implements a simple interface to lock objects of derived classes. Classes that
109 * inherit from Lockable are also usable with the LockGuard template for scoped
110 * locking semantics.
111 */
112 class CVMFS_EXPORT Lockable : SingleCopy {
113 public:
114 2 inline virtual ~Lockable() { pthread_mutex_destroy(&mutex_); }
115
116 1 void Lock() const { pthread_mutex_lock(&mutex_); }
117 2 int TryLock() const { return pthread_mutex_trylock(&mutex_); }
118 1 void Unlock() const { pthread_mutex_unlock(&mutex_); }
119
120 protected:
121 1 Lockable() {
122 1 const int retval = pthread_mutex_init(&mutex_, NULL);
123
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 assert(retval == 0);
124 1 }
125
126 private:
127 mutable pthread_mutex_t mutex_;
128 };
129
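A short sketch of a class deriving from Lockable (the Registry class is made up for illustration; callers could also use the LockGuard template mentioned above instead of explicit Lock()/Unlock() pairs):

    class Registry : public Lockable {   // hypothetical derived class
     public:
      void Add(const int value) {
        Lock();                          // inherited from Lockable
        values_.push_back(value);
        Unlock();
      }
     private:
      std::vector<int> values_;
    };
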
130
131 //
132 // -----------------------------------------------------------------------------
133 //
134
135
136 /**
137 * This counter can be counted up and down using the usual increment/decrement
138 * operators. It allows threads to wait for it to become zero as well as to
139 * block when a specified maximal value would be exceeded by an increment.
140 *
141 * Note: If a maximal value is specified on creation, the SynchronizingCounter
142 * is assumed to never leave the interval [0, maximal_value]! Otherwise
143 * the numerical limits of the specified template parameter define this
144 * interval and an increment _never_ blocks.
145 *
146 * Caveat: This implementation uses a simple mutex mechanism and therefore might
147 * become a scalability bottleneck!
148 */
149 template <typename T>
150 class SynchronizingCounter : SingleCopy {
151 public:
152 8 SynchronizingCounter() :
153 8 value_(T(0)), maximal_value_(T(0)) { Initialize(); }
154
155 133 explicit SynchronizingCounter(const T maximal_value)
156 133 : value_(T(0))
157 133 , maximal_value_(maximal_value)
158 {
159
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 133 times.
133 assert(maximal_value > T(0));
160 133 Initialize();
161 133 }
162
163 141 ~SynchronizingCounter() { Destroy(); }
164
165 71947402 T Increment() {
166 71947402 MutexLockGuard l(mutex_);
167
1/2
✓ Branch 1 taken 72935256 times.
✗ Branch 2 not taken.
72935256 WaitForFreeSlotUnprotected();
168
1/2
✓ Branch 1 taken 673249 times.
✗ Branch 2 not taken.
72935256 SetValueUnprotected(value_ + T(1));
169 72087140 return value_;
170 72935256 }
171
172 72222515 T Decrement() {
173 72222515 MutexLockGuard l(mutex_);
174
1/2
✓ Branch 1 taken 673249 times.
✗ Branch 2 not taken.
72935256 SetValueUnprotected(value_ - T(1));
175 144534228 return value_;
176 72935256 }
177
178 149 void WaitForZero() const {
179 149 MutexLockGuard l(mutex_);
180
2/2
✓ Branch 0 taken 43 times.
✓ Branch 1 taken 151 times.
194 while (value_ != T(0)) {
181
1/2
✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
43 pthread_cond_wait(&became_zero_, &mutex_);
182 }
183
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 151 times.
151 assert(value_ == T(0));
184 151 }
185
186 440621653 bool HasMaximalValue() const { return maximal_value_ != T(0); }
187 8 T maximal_value() const { return maximal_value_; }
188
189 672143 T operator++() { return Increment(); }
190 4 const T operator++(int) { return Increment() - T(1); }
191 672644 T operator--() { return Decrement(); }
192 4 const T operator--(int) { return Decrement() + T(1); }
193
194 2765 T Get() const {
195 2765 MutexLockGuard l(mutex_);
196 5530 return value_;
197 2765 }
198
199 14 SynchronizingCounter<T>& operator=(const T &other) {
200 14 MutexLockGuard l(mutex_);
201 14 SetValueUnprotected(other);
202 28 return *this;
203 14 }
204
205 protected:
206 void SetValueUnprotected(const T new_value);
207 void WaitForFreeSlotUnprotected();
208
209 private:
210 void Initialize();
211 void Destroy();
212
213 private:
214 T value_;
215 const T maximal_value_;
216
217 mutable pthread_mutex_t mutex_;
218 mutable pthread_cond_t became_zero_;
219 pthread_cond_t free_slot_;
220 };
221
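A usage sketch based on the interface above: producers increment the counter and block once the maximum number of in-flight items is reached, consumers decrement it, and a coordinator waits for it to drain to zero.

    SynchronizingCounter<int32_t> inflight(10);   // increments block above 10

    void Producer() {
      ++inflight;              // blocks while the counter is at maximal_value()
      // ... hand a work item to a consumer ...
    }

    void Consumer() {
      // ... process the work item ...
      --inflight;              // may unblock a waiting producer
    }

    void Coordinator() {
      inflight.WaitForZero();  // returns once all items are accounted for
    }
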
222
223 //
224 // -----------------------------------------------------------------------------
225 //
226
227
228 template <typename ParamT>
229 class Observable;
230
231
232 /**
233 * This is a base class for classes that need to expose a callback interface for
234 * asynchronous callback methods. One can register an arbitrary number of
235 * observers on an Observable that get notified when the method NotifyListeners()
236 * is invoked.
237 *
238 * Note: the registration and invocation of callbacks in Observable are thread-
239 * safe, but be aware that the callbacks of observing classes might run in
240 * arbitrary threads. When using these classes, you should take extra care
241 * regarding thread-safety.
242 *
243 * Note: The RegisterListener() methods return a pointer to a CallbackBase.
244 * You MUST NOT free these objects; they are managed by the Observable
245 * class. Use them only as handles to unregister specific callbacks.
246 *
247 * @param ParamT the type of the parameter that is passed to every callback
248 * invocation.
249 */
250 template <typename ParamT>
251 class Observable : public Callbackable<ParamT>,
252 SingleCopy {
253 public:
254 typedef typename Callbackable<ParamT>::CallbackTN* CallbackPtr;
255 protected:
256 typedef std::set<CallbackPtr> Callbacks;
257
258 public:
259 virtual ~Observable();
260
261 /**
262 * Registers a method of a specific object as a listener to the Observable
263 * object. The method is invoked on the given delegate when the callback is
264 * fired by the observed object using NotifyListeners(). Since this is meant
265 * to be a closure, it also passes the third argument to the method being in-
266 * voked by the Observable object.
267 *
268 * @param DelegateT the type of the delegate object
269 * @param method a pointer to the method to be invoked by the callback
270 * @param delegate a pointer to the object to invoke the callback on
271 * @param closure something to be passed to `method`
272 * @return a handle to the registered callback
273 */
274 template <class DelegateT, class ClosureDataT>
275 CallbackPtr RegisterListener(
276 typename BoundClosure<ParamT,
277 DelegateT,
278 ClosureDataT>::CallbackMethod method,
279 DelegateT *delegate,
280 ClosureDataT data);
281
282 /**
283 * Registers a method of a specific object as a listener to the Observable
284 * object. The method is invoked on the given delegate when the callback is
285 * fired by the observed object using NotifyListeners().
286 *
287 * @param DelegateT the type of the delegate object
288 * @param method a pointer to the method to be invoked by the callback
289 * @param delegate a pointer to the object to invoke the callback on
290 * @return a handle to the registered callback
291 */
292 template <class DelegateT>
293 CallbackPtr RegisterListener(
294 typename BoundCallback<ParamT, DelegateT>::CallbackMethod method,
295 DelegateT *delegate);
296
297 /**
298 * Registers a static class member or a C-like function as a callback to the
299 * Observable object. The function is invoked when the callback is fired by
300 * the observed object using NotifyListeners().
301 *
302 * @param fn a pointer to the function to be called by the callback
303 * @return a handle to the registered callback
304 */
305 CallbackPtr RegisterListener(typename Callback<ParamT>::CallbackFunction fn);
306
307 /**
308 * Removes the given callback from the listeners group of this Observable.
309 *
310 * @param callback_object a callback handle that was returned by
311 * RegisterListener() before.
312 */
313 void UnregisterListener(CallbackPtr callback_object);
314
315 /**
316 * Removes all listeners from the Observable
317 */
318 void UnregisterListeners();
319
320 protected:
321 Observable(); // don't instantiate this as a stand alone object
322
323 void RegisterListener(CallbackPtr callback_object);
324
325 /**
326 * Notifies all registered listeners and passes them the provided argument
327 * This method should be called by a derived class to send out asynchronous
328 * messages to registered observers.
329 *
330 * @param parameter the data to be passed to the observers
331 */
332 void NotifyListeners(const ParamT &parameter);
333
334 private:
335 Callbacks listeners_; //!< the set of registered
336 //!< callback objects
337 mutable pthread_rwlock_t listeners_rw_lock_;
338 };
339
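A sketch of the intended use, with made-up class names (Downloader, Ui, OnProgress); the exact callback signatures come from util/async.h and are assumed here to take a const reference to ParamT:

    // Hypothetical subject that reports progress percentages to its listeners.
    class Downloader : public Observable<int> {
     public:
      void Download() {
        // ... do the work ...
        this->NotifyListeners(100);        // protected, called by the subclass
      }
    };

    struct Ui {
      void OnProgress(const int &percent) { /* update a progress bar */ }
    };

    void Example() {
      Downloader downloader;
      Ui ui;
      Downloader::CallbackPtr handle =
          downloader.RegisterListener(&Ui::OnProgress, &ui);
      downloader.Download();                  // invokes Ui::OnProgress(100)
      downloader.UnregisterListener(handle);  // handle stays owned by Observable
    }
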
340
341 //
342 // -----------------------------------------------------------------------------
343 //
344
345
346 /**
347 * Returns the number of CPU cores present in the system or a fallback number
348 * if it failed to determine the number of CPU cores.
349 *
350 * @return the number of active CPU cores in the system
351 */
352 CVMFS_EXPORT unsigned int GetNumberOfCpuCores();
353 static const unsigned int kFallbackNumberOfCpus = 1;
354
355
356 /**
357 * A blocking signal for thread synchronization
358 */
359 class CVMFS_EXPORT Signal : SingleCopy {
360 public:
361 Signal();
362 ~Signal();
363 void Wakeup();
364 void Wait();
365 bool IsSleeping();
366
367 private:
368 bool fired_;
369 pthread_mutex_t lock_;
370 pthread_cond_t signal_;
371 };
372
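A sketch of the intended handshake, assuming Wait() blocks until another thread has called Wakeup():

    void *ThreadMain(void *data) {
      Signal *started = static_cast<Signal *>(data);
      // ... thread initialization ...
      started->Wakeup();         // releases the thread blocked in Wait()
      return NULL;
    }

    // Spawning side:
    //   Signal started;
    //   pthread_t thread;
    //   pthread_create(&thread, NULL, ThreadMain, &started);
    //   started.Wait();          // returns once ThreadMain() called Wakeup()
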
373
374 //
375 // -----------------------------------------------------------------------------
376 //
377
378
379 /**
380 * Asynchronous FIFO channel template
381 * Implements a thread-safe FIFO queue that handles thread blocking if the queue
382 * is full or empty.
383 *
384 * @param T the data type to be enqueued in the queue
385 */
386 template <class T>
387 class FifoChannel : protected std::queue<T> {
388 public:
389 /**
390 * Creates a new FIFO channel.
391 *
392 * @param maximal_length the maximal number of items that can be enqueued
393 * @param drainout_threshold if fewer than this many elements are in the queue
394 * it is considered to be "not full"
395 */
396 FifoChannel(const size_t maximal_length,
397 const size_t drainout_threshold);
398 virtual ~FifoChannel();
399
400 /**
401 * Adds a new item to the end of the FIFO channel. If the queue is full, this
402 * call will block until items have been dequeued by another thread, allowing
403 * the desired insertion.
404 *
405 * @param data the data to be enqueued into the FIFO channel
406 */
407 void Enqueue(const T &data);
408
409 /**
410 * Removes the next element from the channel. If the queue is empty, this will
411 * block until another thread enqueues an item into the channel.
412 *
413 * @return the first item in the channel queue
414 */
415 const T Dequeue();
416
417 /**
418 * Clears all items in the FIFO channel. The cleared items will be lost.
419 *
420 * @return the number of dropped items
421 */
422 unsigned int Drop();
423
424 inline size_t GetItemCount() const;
425 inline bool IsEmpty() const;
426 inline size_t GetMaximalItemCount() const;
427
428 private:
429 // general configuration
430 const size_t maximal_queue_length_;
431 const size_t queue_drainout_threshold_;
432
433 // thread synchronisation structures
434 mutable pthread_mutex_t mutex_;
435 mutable pthread_cond_t queue_is_not_empty_;
436 mutable pthread_cond_t queue_is_not_full_;
437 };
438
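A short sketch using the interface above (the concrete capacity and threshold values are arbitrary):

    void FifoExample() {
      // Holds at most 64 items; counts as "not full" again once it has drained
      // below the threshold of 8 items.
      FifoChannel<int> jobs(64, 8);

      jobs.Enqueue(42);                          // blocks while the channel is full
      const int next = jobs.Dequeue();           // blocks while the channel is empty
      const unsigned int dropped = jobs.Drop();  // discard whatever is still queued
    }
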
439
440 /**
441 * This template implements a generic producer/consumer approach to concurrent
442 * worker tasks. It spawns a given number of Workers derived from the base class
443 * ConcurrentWorker and uses them to distribute the workload onto concurrent
444 * threads.
445 * One can have multiple producers that use Schedule() to post new work into
446 * a central job queue, which in turn is processed concurrently by the Worker
447 * objects in multiple threads. Furthermore, the template provides an interface
448 * to control the worker swarm, i.e. to wait for their completion or cancel them
449 * before all jobs are processed.
450 *
451 * Note: A worker is a class inheriting from ConcurrentWorker that needs to meet
452 * a couple of requirements. See the documentation of ConcurrentWorker for
453 * additional details.
454 *
455 * @param WorkerT the class to be used as a worker for a concurrent worker
456 * swarm
457 */
458 template <class WorkerT>
459 class ConcurrentWorkers : public Observable<typename WorkerT::returned_data> {
460 public:
461 // these data types must be defined by the worker class
462 /**
463 * Input data type
464 */
465 typedef typename WorkerT::expected_data expected_data_t;
466 /**
467 * Output data type
468 */
469 typedef typename WorkerT::returned_data returned_data_t;
470 /**
471 * Common context type
472 */
473 typedef typename WorkerT::worker_context worker_context_t;
474
475 protected:
476 typedef std::vector<pthread_t> WorkerThreads;
477
478 /**
479 * This is a simple wrapper structure to piggy-back control information on
480 * scheduled jobs. Job structures are scheduled into a central FIFO queue and
481 * are then processed concurrently by the workers.
482 */
483 template <class DataT>
484 struct Job {
485 explicit Job(const DataT &data) :
486 data(data),
487 is_death_sentence(false) {}
488 Job() :
489 data(),
490 is_death_sentence(true) {}
491 const DataT data; //!< job payload
492 const bool is_death_sentence; //!< death sentence flag
493 };
494 typedef Job<expected_data_t> WorkerJob;
495 typedef Job<returned_data_t> CallbackJob;
496
497 /**
498 * Provides a wrapper for initialization data passed to newly spawned worker
499 * threads.
500 * It contains a pointer to the spawning ConcurrentWorkers master object as
501 * well as a pointer to a context object defined by the concrete worker to be
502 * spawned.
503 */
504 struct RunBinding {
505 explicit RunBinding(ConcurrentWorkers<WorkerT> *delegate) :
506 delegate(delegate) {}
507 ConcurrentWorkers<WorkerT> *delegate; //!< delegate to the Concurrent-
508 //!< Workers master
509 };
510
511 struct WorkerRunBinding : RunBinding {
512 WorkerRunBinding(ConcurrentWorkers<WorkerT> *delegate,
513 const worker_context_t *worker_context) :
514 RunBinding(delegate),
515 worker_context(worker_context) {}
516 /**
517 * WorkerT-defined context object for worker initialization.
518 */
519 const worker_context_t *worker_context;
520 };
521
522 public:
523 /**
524 * Creates a ConcurrentWorkers master object that encapsulates the actual
525 * workers.
526 *
527 * @param number_of_workers the number of concurrent workers to be spawned
528 * @param maximal_queue_length the maximal length of the job queue
529 * (>= number_of_workers)
530 * @param worker_context a pointer to the WorkerT defined context object
531 */
532 ConcurrentWorkers(const size_t number_of_workers,
533 const size_t maximal_queue_length,
534 worker_context_t *worker_context = NULL);
535 virtual ~ConcurrentWorkers();
536
537 /**
538 * Initializes the ConcurrentWorkers swarm, spawns a thread for each new
539 * worker object and puts everything into working state.
540 *
541 * @return true if all went fine
542 */
543 bool Initialize();
544
545 /**
546 * Schedules a new job for processing into the internal job queue. This method
547 * will block if the job queue is already full and wait for a free slot.
548 *
549 * @param data the data to be processed
550 */
551 inline void Schedule(const expected_data_t &data) {
552 Schedule(WorkerJob(data));
553 }
554
555 /**
556 * Shuts down the ConcurrentWorkers object as well as the encapsulated workers
557 * as soon as possible. Workers will finish their current job and will termi-
558 * nate afterwards. If no jobs are scheduled they will simply stop waiting for
559 * new ones and terminate afterwards.
560 * This method MUST NOT be called more than once per ConcurrentWorkers.
561 */
562 void Terminate();
563
564 /**
565 * Waits until the job queue is fully processed
566 *
567 * Note: this might lead to undefined behaviour or infinite waiting if other
568 * producers still schedule jobs into the job queue.
569 */
570 void WaitForEmptyQueue() const;
571
572 /**
573 * Waits until the ConcurrentWorkers swarm fully processed the current job
574 * queue and shuts down afterwards.
575 *
576 * Note: just as for WaitForEmptyQueue() this assumes that no other producers
577 * schedule jobs in the mean time.
578 */
579 void WaitForTermination();
580
581 inline unsigned int GetNumberOfWorkers() const { return number_of_workers_; }
582 inline unsigned int GetNumberOfFailedJobs() const {
583 return atomic_read32(&jobs_failed_);
584 }
585
586 /**
587 * Defines a job as successfully finished.
588 * DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT!
589 *
590 * @param data the data to be returned back to the user
591 */
592 inline void JobSuccessful(const returned_data_t& data) {
593 JobDone(data, true);
594 }
595
596 /**
597 * Defines a job as failed.
598 * DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT!
599 *
600 * Note: Even for failed jobs the user will get a callback with a data object.
601 * You might want to make sure that this data contains a status flag as
602 * well, telling the user what went wrong.
603 *
604 * @param data the data to be returned back to the user
605 */
606 inline void JobFailed(const returned_data_t& data) { JobDone(data, false); }
607
608 void RunCallbackThread();
609
610 protected:
611 bool SpawnWorkers();
612
613 /**
614 * POSIX-conforming thread entry-point function. It is invoked for every new
615 * worker thread and contains the initialization, processing loop, and tear-
616 * down of the unique worker objects.
617 *
618 * @param run_binding void pointer to a RunBinding structure (C interface)
619 * @return NULL in any case
620 */
621 static void* RunWorker(void *run_binding);
622
623 static void* RunCallbackThreadWrapper(void *run_binding);
624
625 /**
626 * Tells the master that a worker thread has started. This does not mean that
627 * it was initialized successfully.
628 */
629 void ReportStartedWorker() const;
630
631 void Schedule(WorkerJob job);
632 void ScheduleDeathSentences();
633
634 /**
635 * Empties the job queue
636 *
637 * @param forget_pending controls whether cancelled jobs should be treated as finished
638 */
639 void TruncateJobQueue(const bool forget_pending = false);
640
641 /**
642 * Retrieves a job from the job queue. If the job queue is empty it will block
643 * until there is a new job available for processing.
644 * THIS METHOD MUST ONLY BE CALLED INSIDE THE WORKER OBJECTS
645 *
646 * @return a job to be processed by a worker
647 */
648 inline WorkerJob Acquire();
649
650 /**
651 * Controls the asynchronous finishing of a job.
652 * DO NOT CALL THIS, use JobSuccessful() or JobFailed() wrappers instead.
653 *
654 * @param data the data to be returned to the user
655 * @param success flag if job was successful
656 */
657 void JobDone(const returned_data_t& data, const bool success = true);
658
659 inline void StartRunning() {
660 MutexLockGuard guard(status_mutex_);
661 running_ = true;
662 }
663 inline void StopRunning() {
664 MutexLockGuard guard(status_mutex_);
665 running_ = false;
666 }
667 inline bool IsRunning() const {
668 MutexLockGuard guard(status_mutex_);
669 return running_;
670 }
671
672 private:
673 // general configuration
674 const size_t number_of_workers_; //!< number of concurrent worker threads
675 const worker_context_t *worker_context_; //!< the WorkerT defined context
676 /**
677 * The thread context passed to newly spawned threads
678 */
679 WorkerRunBinding thread_context_;
680
681 // status information
682 bool initialized_;
683 bool running_;
684 mutable unsigned int workers_started_;
685 mutable pthread_mutex_t status_mutex_;
686 mutable pthread_cond_t worker_started_;
687 mutable pthread_mutex_t jobs_all_done_mutex_;
688 mutable pthread_cond_t jobs_all_done_;
689
690 // worker threads
691 WorkerThreads worker_threads_; //!< list of worker threads
692 pthread_t callback_thread_; //!< handles callback invokes
693
694 // job queue
695 typedef FifoChannel<WorkerJob > JobQueue;
696 JobQueue jobs_queue_;
697 mutable atomic_int32 jobs_pending_;
698 mutable atomic_int32 jobs_failed_;
699 mutable atomic_int64 jobs_processed_;
700
701 // callback channel
702 typedef FifoChannel<CallbackJob > CallbackQueue;
703 CallbackQueue results_queue_;
704 };
705
706
707 /**
708 * Base class for worker classes that should be used in a ConcurrentWorkers
709 * swarm. These classes need to fulfill a number of requirements in order to
710 * satisfy the needs of the ConcurrentWorkers template.
711 *
712 * Requirements:
713 * -> needs to define the following types:
714 * - expected_data - input data structure of the worker
715 * - returned_data - output data structure of the worker
716 * - worker_context - context structure for initialization information
717 *
718 * -> implement a constructor that takes a pointer to its worker_context
719 * as its only parameter:
720 * AwesomeWorker(const AwesomeWorker::worker_context*)
721 * Note: do not rely on the context object to be available after the
722 * constructor has returned!
723 *
724 * -> needs to define the calling-operator expecting one parameter of type:
725 * const expected_data& and returning void
726 * This will be invoked for every new job the worker should process
727 *
728 * -> inside the implementation of the described calling-operator it needs to
729 * invoke either:
730 * master()->JobSuccessful(const returned_data&)
731 * or:
732 * master()->JobFailed(const returned_data&)
733 * as its LAST operation before returning.
734 * This will keep track of finished jobs and inform the user of Concurrent-
735 * Workers about finished jobs.
736 *
737 * -> [optional] override Initialize() and/or TearDown() to do environmental
738 * setup work before or after jobs are processed, respectively
739 *
740 * General Reminder:
741 * You will be running in a multi-threaded environment here! Buckle up and
742 * make suitable preparations to shield yourself from serious headaches.
743 *
744 * Note: This implements a Curiously Recurring Template Pattern
745 * (http://en.wikipedia.org/wiki/Curiously_recurring_template_pattern)
746 *
747 * @param DerivedWorkerT the class name of the inheriting class
748 * (e.g. class AwesomeWorker : public ConcurrentWorker<AwesomeWorker>)
749 */
750 template <class DerivedWorkerT>
751 class ConcurrentWorker : SingleCopy {
752 public:
753 virtual ~ConcurrentWorker() {}
754
755 /**
756 * Does general initialization before any jobs are scheduled. You do not
757 * need to up-call this Initialize() method; the base implementation is a no-op.
758 *
759 * @returns true on successful initialization
760 */
761 virtual bool Initialize() { return true; }
762
763 /**
764 * Does general clean-up after the last job was processed in the worker object
765 * and it is about to vanish. You do not need to up-call this method.
766 */
767 virtual void TearDown() {}
768
769 /**
770 * The actual job-processing entry point. See the description of the inheriting
771 * class requirements to learn about the semantics of this method.
772 * DO NOT FORGET TO CALL master()->JobSuccessful() OR master()->JobFailed()
773 * at the end of this method!!
774 *
775 * Note: There is no way to generally define this operator, it is therefore
776 * commented out and placed here just as a reminder.
777 *
778 * @param data the data to be processed.
779 */
780 // void operator()(const expected_data &data); // do the actual job of the
781 // worker
782
783 protected:
784 ConcurrentWorker() : master_(NULL) {}
785
786 /**
787 * Gets a pointer to the ConcurrentWorkers object that this worker resides in
788 *
789 * @returns a pointer to the ConcurrentWorkers object
790 */
791 inline ConcurrentWorkers<DerivedWorkerT>* master() const { return master_; }
792
793 private:
794 friend class ConcurrentWorkers<DerivedWorkerT>;
795 void RegisterMaster(ConcurrentWorkers<DerivedWorkerT> *master) {
796 master_ = master;
797 }
798
799 private:
800 ConcurrentWorkers<DerivedWorkerT> *master_;
801 };
802
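A condensed sketch of a worker class satisfying the requirements listed above, together with the code driving the swarm. All names (SquareWorker, Collector, OnResult, RunExample) are made up, and the listener-callback signature is assumed from util/async.h:

    class SquareWorker : public ConcurrentWorker<SquareWorker> {
     public:
      typedef int expected_data;            // job input type
      typedef int returned_data;            // job output type
      struct worker_context { };            // no per-worker context needed

      explicit SquareWorker(const worker_context *context) { }

      void operator()(const expected_data &data) {
        const returned_data result = data * data;
        master()->JobSuccessful(result);    // must be the last call
      }
    };

    struct Collector {
      void OnResult(const SquareWorker::returned_data &result) { /* ... */ }
    };

    void RunExample() {
      Collector collector;
      ConcurrentWorkers<SquareWorker> workers(4   /* worker threads */,
                                              128 /* job queue length */);
      workers.RegisterListener(&Collector::OnResult, &collector);
      workers.Initialize();
      for (int i = 0; i < 100; ++i)
        workers.Schedule(i);
      workers.WaitForEmptyQueue();          // results arrive via OnResult()
      workers.Terminate();
    }
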
803 #ifdef CVMFS_NAMESPACE_GUARD
804 } // namespace CVMFS_NAMESPACE_GUARD
805 #endif
806
807 #include "util/concurrency_impl.h"
808
809 #endif // CVMFS_UTIL_CONCURRENCY_H_
810