GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/util/concurrency.h
Date: 2025-06-22 02:36:02
Exec Total Coverage
Lines: 85 115 73.9%
Branches: 21 38 55.3%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_UTIL_CONCURRENCY_H_
6 #define CVMFS_UTIL_CONCURRENCY_H_
7
8 #include <pthread.h>
9
10 #include <cassert>
11 #include <queue>
12 #include <set>
13 #include <vector>
14
15 #include "util/async.h"
16 #include "util/atomic.h"
17 #include "util/export.h"
18 #include "util/mutex.h"
19 #include "util/single_copy.h"
20
21 #ifdef CVMFS_NAMESPACE_GUARD
22 namespace CVMFS_NAMESPACE_GUARD {
23 #endif
24
25 /**
26 * A thread-safe, unbounded vector of items that implement a FIFO channel.
27 * Uses condition variables to block when threads try to pop from the empty
28 * channel.
29 */
30 template<class ItemT>
31 class Channel : SingleCopy {
32 public:
33 90 Channel() {
34 90 int retval = pthread_mutex_init(&lock_, NULL);
35
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 90 times.
90 assert(retval == 0);
36 90 retval = pthread_cond_init(&cond_populated_, NULL);
37
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 90 times.
90 assert(retval == 0);
38 90 }
39
40 90 ~Channel() {
41 90 pthread_cond_destroy(&cond_populated_);
42 90 pthread_mutex_destroy(&lock_);
43 90 }
44
45 /**
46 * Returns the queue locked and ready for appending 1 item.
47 */
48 200000 std::vector<ItemT *> *StartEnqueueing() {
49 200000 int retval = pthread_mutex_lock(&lock_);
50
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 200000 times.
200000 assert(retval == 0);
51 200000 return &items_;
52 }
53
54 /**
55 * Unlocks the queue. The queue must remain unchanged when this is called.
56 */
57 100000 void AbortEnqueueing() {
58 100000 int retval = pthread_mutex_unlock(&lock_);
59
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 100000 times.
100000 assert(retval == 0);
60 100000 }
61
62 /**
63 * 1 new item was added to the queue. Unlock and signal to reader thread.
64 */
65 100000 void CommitEnqueueing() {
66 100000 int retval = pthread_cond_signal(&cond_populated_);
67
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 100000 times.
100000 assert(retval == 0);
68 100000 retval = pthread_mutex_unlock(&lock_);
69
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 100000 times.
100000 assert(retval == 0);
70 100000 }
71
72 126 void PushBack(ItemT *item) {
73 126 MutexLockGuard lock_guard(&lock_);
74
1/2
✓ Branch 1 taken 126 times.
✗ Branch 2 not taken.
126 items_.push_back(item);
75 126 int retval = pthread_cond_signal(&cond_populated_);
76
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 126 times.
126 assert(retval == 0);
77 126 }
78
79 /**
80 * Remove and return the first element from the queue. Block if tube is
81 * empty.
82 */
83 100126 ItemT *PopFront() {
84 100126 MutexLockGuard lock_guard(&lock_);
85
2/2
✓ Branch 1 taken 98 times.
✓ Branch 2 taken 100126 times.
100224 while (items_.size() == 0)
86
1/2
✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
98 pthread_cond_wait(&cond_populated_, &lock_);
87 100126 ItemT *item = items_[0];
88
1/2
✓ Branch 3 taken 100126 times.
✗ Branch 4 not taken.
100126 items_.erase(items_.begin());
89 100126 return item;
90 100126 }
91
92 private:
93 /**
94 * The locked queue/channel
95 */
96 std::vector<ItemT *> items_;
97 /**
98 * Protects all internal state
99 */
100 pthread_mutex_t lock_;
101 /**
102 * Signals if there are items enqueued
103 */
104 pthread_cond_t cond_populated_;
105 };
106
107 /**
108 * Implements a simple interface to lock objects of derived classes. Classes
109 * that inherit from Lockable are also usable with the LockGuard template for
110 * scoped locking semantics.
111 */
112 class CVMFS_EXPORT Lockable : SingleCopy {
113 public:
114 40 inline virtual ~Lockable() { pthread_mutex_destroy(&mutex_); }
115
116 20 void Lock() const { pthread_mutex_lock(&mutex_); }
117 40 int TryLock() const { return pthread_mutex_trylock(&mutex_); }
118 20 void Unlock() const { pthread_mutex_unlock(&mutex_); }
119
120 protected:
121 20 Lockable() {
122 20 const int retval = pthread_mutex_init(&mutex_, NULL);
123
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20 times.
20 assert(retval == 0);
124 20 }
125
126 private:
127 mutable pthread_mutex_t mutex_;
128 };
129
130
131 //
132 // -----------------------------------------------------------------------------
133 //
134
135
136 /**
137 * This counter can be counted up and down using the usual increment/decrement
138 * operators. It allows threads to wait for it to become zero as well as to
139 * block when a specified maximal value would be exceeded by an increment.
140 *
141 * Note: If a maximal value is specified on creation, the SynchronizingCounter
142 * is assumed to never leave the interval [0, maximal_value]! Otherwise
143 * the numerical limits of the specified template parameter define this
144 * interval and an increment _never_ blocks.
145 *
146 * Caveat: This implementation uses a simple mutex mechanism and therefore might
147 * become a scalability bottleneck!
148 */
149 template<typename T>
150 class SynchronizingCounter : SingleCopy {
151 public:
152 72 SynchronizingCounter() : value_(T(0)), maximal_value_(T(0)) { Initialize(); }
153
154 2890 explicit SynchronizingCounter(const T maximal_value)
155 2890 : value_(T(0)), maximal_value_(maximal_value) {
156
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2890 times.
2890 assert(maximal_value > T(0));
157 2890 Initialize();
158 2890 }
159
160 2961 ~SynchronizingCounter() { Destroy(); }
161
162 643686918 T Increment() {
163 643686918 MutexLockGuard l(mutex_);
164
1/2
✓ Branch 1 taken 651562387 times.
✗ Branch 2 not taken.
651562387 WaitForFreeSlotUnprotected();
165
1/2
✓ Branch 1 taken 1204444 times.
✗ Branch 2 not taken.
651562387 SetValueUnprotected(value_ + T(1));
166 644772101 return value_;
167 651562387 }
168
169 644477857 T Decrement() {
170 644477857 MutexLockGuard l(mutex_);
171
1/2
✓ Branch 1 taken 1204444 times.
✗ Branch 2 not taken.
651562387 SetValueUnprotected(value_ - T(1));
172 1295706739 return value_;
173 651562387 }
174
175 3829 void WaitForZero() const {
176 3829 MutexLockGuard l(mutex_);
177
2/2
✓ Branch 0 taken 548 times.
✓ Branch 1 taken 3829 times.
4377 while (value_ != T(0)) {
178
1/2
✓ Branch 1 taken 548 times.
✗ Branch 2 not taken.
548 pthread_cond_wait(&became_zero_, &mutex_);
179 }
180
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3829 times.
3829 assert(value_ == T(0));
181 3829 }
182
183 3912890192 bool HasMaximalValue() const { return maximal_value_ != T(0); }
184 8 T maximal_value() const { return maximal_value_; }
185
186 1195565 T operator++() { return Increment(); }
187 28 const T operator++(int) { return Increment() - T(1); }
188 1195905 T operator--() { return Decrement(); }
189 28 const T operator--(int) { return Decrement() + T(1); }
190
191 38364 T Get() const {
192 38364 MutexLockGuard l(mutex_);
193 76728 return value_;
194 38364 }
195
196 86 SynchronizingCounter<T> &operator=(const T &other) {
197 86 MutexLockGuard l(mutex_);
198 86 SetValueUnprotected(other);
199 172 return *this;
200 86 }
201
202 protected:
203 void SetValueUnprotected(const T new_value);
204 void WaitForFreeSlotUnprotected();
205
206 private:
207 void Initialize();
208 void Destroy();
209
210 private:
211 T value_;
212 const T maximal_value_;
213
214 mutable pthread_mutex_t mutex_;
215 mutable pthread_cond_t became_zero_;
216 pthread_cond_t free_slot_;
217 };
218
219
220 //
221 // -----------------------------------------------------------------------------
222 //
223
224
225 template<typename ParamT>
226 class Observable;
227
228
229 /**
230 * This is a base class for classes that need to expose a callback interface for
231 * asynchronous callback methods. One can register an arbitrary number of
232 * observers on an Observable that get notified when the method
233 * NotifyListeners() is invoked.
234 *
235 * Note: the registration and invocation of callbacks in Observable is thread-
236 * safe, but be aware that the callbacks of observing classes might run in
237 * arbitrary threads. When using these classes, you should take extra care
238 * for thread-safety.
239 *
240 * Note: The RegisterListener() methods return a pointer to a CallbackBase.
241 * You MUST NOT free these objects, they are managed by the Observable
242 * class. Use them only as handles to unregister specific callbacks.
243 *
244 * @param ParamT the type of the parameter that is passed to every callback
245 * invocation.
246 */
247 template<typename ParamT>
248 class Observable : public Callbackable<ParamT>, SingleCopy {
249 public:
250 typedef typename Callbackable<ParamT>::CallbackTN *CallbackPtr;
251
252 protected:
253 typedef std::set<CallbackPtr> Callbacks;
254
255 public:
256 virtual ~Observable();
257
258 /**
259 * Registers a method of a specific object as a listener to the Observable
260 * object. The method is invoked on the given delegate when the callback is
261 * fired by the observed object using NotifyListeners(). Since this is meant
262 * to be a closure, it also passes the third argument to the method being in-
263 * voked by the Observable object.
264 *
265 * @param DelegateT the type of the delegate object
266 * @param method a pointer to the method to be invoked by the callback
267 * @param delegate a pointer to the object to invoke the callback on
268 * @param closure something to be passed to `method`
269 * @return a handle to the registered callback
270 */
271 template<class DelegateT, class ClosureDataT>
272 CallbackPtr RegisterListener(
273 typename BoundClosure<ParamT, DelegateT, ClosureDataT>::CallbackMethod
274 method,
275 DelegateT *delegate,
276 ClosureDataT data);
277
278 /**
279 * Registers a method of a specific object as a listener to the Observable
280 * object. The method is invoked on the given delegate when the callback is
281 * fired by the observed object using NotifyListeners().
282 *
283 * @param DelegateT the type of the delegate object
284 * @param method a pointer to the method to be invoked by the callback
285 * @param delegate a pointer to the object to invoke the callback on
286 * @return a handle to the registered callback
287 */
288 template<class DelegateT>
289 CallbackPtr RegisterListener(
290 typename BoundCallback<ParamT, DelegateT>::CallbackMethod method,
291 DelegateT *delegate);
292
293 /**
294 * Registers a static class member or a C-like function as a callback to the
295 * Observable object. The function is invoked when the callback is fired by
296 * the observed object using NotifyListeners().
297 *
298 * @param fn a pointer to the function to be called by the callback
299 * @return a handle to the registered callback
300 */
301 CallbackPtr RegisterListener(typename Callback<ParamT>::CallbackFunction fn);
302
303 /**
304 * Removes the given callback from the listeners group of this Observable.
305 *
306 * @param callback_object a callback handle that was returned by
307 * RegisterListener() before.
308 */
309 void UnregisterListener(CallbackPtr callback_object);
310
311 /**
312 * Removes all listeners from the Observable
313 */
314 void UnregisterListeners();
315
316 protected:
317 Observable(); // don't instantiate this as a stand alone object
318
319 void RegisterListener(CallbackPtr callback_object);
320
321 /**
322 * Notifies all registered listeners and passes them the provided argument
323 * This method should be called by a derived class to send out asynchronous
324 * messages to registered observers.
325 *
326 * @param parameter the data to be passed to the observers
327 */
328 void NotifyListeners(const ParamT &parameter);
329
330 private:
331 Callbacks listeners_; //!< the set of registered
332 //!< callback objects
333 mutable pthread_rwlock_t listeners_rw_lock_;
334 };
335
336
337 //
338 // -----------------------------------------------------------------------------
339 //
340
341
342 /**
343 * Returns the number of CPU cores present in the system or a fallback number
344 * if it failed to determine the number of CPU cores.
345 *
346 * @return the number of active CPU cores in the system
347 */
348 CVMFS_EXPORT unsigned int GetNumberOfCpuCores();
349 static const unsigned int kFallbackNumberOfCpus = 1;
350
351
352 /**
353 * A blocking signal for thread synchronization
354 */
355 class CVMFS_EXPORT Signal : SingleCopy {
356 public:
357 Signal();
358 ~Signal();
359 void Wakeup();
360 void Wait();
361 bool IsSleeping();
362
363 private:
364 bool fired_;
365 pthread_mutex_t lock_;
366 pthread_cond_t signal_;
367 };
368
369
370 //
371 // -----------------------------------------------------------------------------
372 //
373
374
375 /**
376 * Asynchronous FIFO channel template
377 * Implements a thread safe FIFO queue that handles thread blocking if the queue
378 * is full or empty.
379 *
380 * @param T the data type to be enqueued in the queue
381 */
382 template<class T>
383 class FifoChannel : protected std::queue<T> {
384 public:
385 /**
386 * Creates a new FIFO channel.
387 *
388 * @param maximal_length the maximal number of items that can be enqueued
389 * @param drainout_threshold if less than xx elements are in the queue it is
390 * considered to be "not full"
391 */
392 FifoChannel(const size_t maximal_length, const size_t drainout_threshold);
393 virtual ~FifoChannel();
394
395 /**
396 * Adds a new item to the end of the FIFO channel. If the queue is full, this
397 * call will block until items were dequeued by another thread allowing the
398 * desired insertion.
399 *
400 * @param data the data to be enqueued into the FIFO channel
401 */
402 void Enqueue(const T &data);
403
404 /**
405 * Removes the next element from the channel. If the queue is empty, this will
406 * block until another thread enqueues an item into the channel.
407 *
408 * @return the first item in the channel queue
409 */
410 const T Dequeue();
411
412 /**
413 * Clears all items in the FIFO channel. The cleared items will be lost.
414 *
415 * @return the number of dropped items
416 */
417 unsigned int Drop();
418
419 inline size_t GetItemCount() const;
420 inline bool IsEmpty() const;
421 inline size_t GetMaximalItemCount() const;
422
423 private:
424 // general configuration
425 const size_t maximal_queue_length_;
426 const size_t queue_drainout_threshold_;
427
428 // thread synchronisation structures
429 mutable pthread_mutex_t mutex_;
430 mutable pthread_cond_t queue_is_not_empty_;
431 mutable pthread_cond_t queue_is_not_full_;
432 };
433
434
435 /**
436 * This template implements a generic producer/consumer approach to concurrent
437 * worker tasks. It spawns a given number of Workers derived from the base class
438 * ConcurrentWorker and uses them to distribute the work load onto concurrent
439 * threads.
440 * One can have multiple producers, that use Schedule() to post new work into
441 * a central job queue, which in turn is processed concurrently by the Worker
442 * objects in multiple threads. Furthermore the template provides an interface
443 * to control the worker swarm, i.e. to wait for their completion or cancel them
444 * before all jobs are processed.
445 *
446 * Note: A worker is a class inheriting from ConcurrentWorker that needs to meet
447 * a couple of requirements. See the documentation of ConcurrentWorker for
448 * additional details.
449 *
450 * @param WorkerT the class to be used as a worker for a concurrent worker
451 * swarm
452 */
453 template<class WorkerT>
454 class ConcurrentWorkers : public Observable<typename WorkerT::returned_data> {
455 public:
456 // these data types must be defined by the worker class
457 /**
458 * Input data type
459 */
460 typedef typename WorkerT::expected_data expected_data_t;
461 /**
462 * Output data type
463 */
464 typedef typename WorkerT::returned_data returned_data_t;
465 /**
466 * Common context type
467 */
468 typedef typename WorkerT::worker_context worker_context_t;
469
470 protected:
471 typedef std::vector<pthread_t> WorkerThreads;
472
473 /**
474 * This is a simple wrapper structure to piggy-back control information on
475 * scheduled jobs. Job structures are scheduled into a central FIFO queue and
476 * are then processed concurrently by the workers.
477 */
478 template<class DataT>
479 struct Job {
480 explicit Job(const DataT &data) : data(data), is_death_sentence(false) { }
481 Job() : data(), is_death_sentence(true) { }
482 const DataT data; //!< job payload
483 const bool is_death_sentence; //!< death sentence flag
484 };
485 typedef Job<expected_data_t> WorkerJob;
486 typedef Job<returned_data_t> CallbackJob;
487
488 /**
489 * Provides a wrapper for initialization data passed to newly spawned worker
490 * threads for initialization.
491 * It contains a pointer to the spawning ConcurrentWorkers master object as
492 * well as a pointer to a context object defined by the concrete worker to be
493 * spawned.
494 */
495 struct RunBinding {
496 explicit RunBinding(ConcurrentWorkers<WorkerT> *delegate)
497 : delegate(delegate) { }
498 ConcurrentWorkers<WorkerT> *delegate; //!< delegate to the Concurrent-
499 //!< Workers master
500 };
501
502 struct WorkerRunBinding : RunBinding {
503 WorkerRunBinding(ConcurrentWorkers<WorkerT> *delegate,
504 const worker_context_t *worker_context)
505 : RunBinding(delegate), worker_context(worker_context) { }
506 /**
507 * WorkerT defined context objects for worker init.
508 */
509 const worker_context_t *worker_context;
510 };
511
512 public:
513 /**
514 * Creates a ConcurrentWorkers master object that encapsulates the actual
515 * workers.
516 *
517 * @param number_of_workers the number of concurrent workers to be spawned
518 * @param maximal_queue_length the maximal length of the job queue
519 * (>= number_of_workers)
520 * @param worker_context a pointer to the WorkerT defined context
521 * object
522 */
523 ConcurrentWorkers(const size_t number_of_workers,
524 const size_t maximal_queue_length,
525 worker_context_t *worker_context = NULL);
526 virtual ~ConcurrentWorkers();
527
528 /**
529 * Initializes the ConcurrentWorkers swarm, spawns a thread for each new
530 * worker object and puts everything into working state.
531 *
532 * @return true if all went fine
533 */
534 bool Initialize();
535
536 /**
537 * Schedules a new job for processing into the internal job queue. This method
538 * will block in case the job queue is already full and wait for an empty
539 * slot.
540 *
541 * @param data the data to be processed
542 */
543 inline void Schedule(const expected_data_t &data) {
544 Schedule(WorkerJob(data));
545 }
546
547 /**
548 * Shuts down the ConcurrentWorkers object as well as the encapsulated workers
549 * as soon as possible. Workers will finish their current job and will termi-
550 * nate afterwards. If no jobs are scheduled they will simply stop waiting for
551 * new ones and terminate afterwards.
552 * This method MUST not be called more than once per ConcurrentWorkers.
553 */
554 void Terminate();
555
556 /**
557 * Waits until the job queue is fully processed
558 *
559 * Note: this might lead to undefined behaviour or infinite waiting if other
560 * producers still schedule jobs into the job queue.
561 */
562 void WaitForEmptyQueue() const;
563
564 /**
565 * Waits until the ConcurrentWorkers swarm fully processed the current job
566 * queue and shuts down afterwards.
567 *
568 * Note: just as for WaitForEmptyQueue() this assumes that no other producers
569 * schedule jobs in the mean time.
570 */
571 void WaitForTermination();
572
573 inline unsigned int GetNumberOfWorkers() const { return number_of_workers_; }
574 inline unsigned int GetNumberOfFailedJobs() const {
575 return atomic_read32(&jobs_failed_);
576 }
577
578 /**
579 * Defines a job as successfully finished.
580 * DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT!
581 *
582 * @param data the data to be returned back to the user
583 */
584 inline void JobSuccessful(const returned_data_t &data) {
585 JobDone(data, true);
586 }
587
588 /**
589 * Defines a job as failed.
590 * DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT!
591 *
592 * Note: Even for failed jobs the user will get a callback with a data object.
593 * You might want to make sure, that this data contains a status flag as
594 * well, telling the user what went wrong.
595 *
596 * @param data the data to be returned back to the user
597 */
598 inline void JobFailed(const returned_data_t &data) { JobDone(data, false); }
599
600 void RunCallbackThread();
601
602 protected:
603 bool SpawnWorkers();
604
605 /**
606 * POSIX conform function for thread entry point. Is invoked for every new
607 * worker thread and contains the initialization, processing loop and tear
608 * down of the unique worker objects
609 *
610 * @param run_binding void pointer to a RunBinding structure (C interface)
611 * @return NULL in any case
612 */
613 static void *RunWorker(void *run_binding);
614
615 static void *RunCallbackThreadWrapper(void *run_binding);
616
617 /**
618 * Tells the master that a worker thread did start. This does not mean, that
619 * it was initialized successfully.
620 */
621 void ReportStartedWorker() const;
622
623 void Schedule(WorkerJob job);
624 void ScheduleDeathSentences();
625
626 /**
627 * Empties the job queue
628 *
629 * @param forget_pending controls if cancelled jobs should be seen as
630 * finished
631 */
632 void TruncateJobQueue(const bool forget_pending = false);
633
634 /**
635 * Retrieves a job from the job queue. If the job queue is empty it will block
636 * until there is a new job available for processing.
637 * THIS METHOD MUST ONLY BE CALLED INSIDE THE WORKER OBJECTS
638 *
639 * @return a job to be processed by a worker
640 */
641 inline WorkerJob Acquire();
642
643 /**
644 * Controls the asynchronous finishing of a job.
645 * DO NOT CALL THIS, use JobSuccessful() or JobFailed() wrappers instead.
646 *
647 * @param data the data to be returned to the user
648 * @param success flag if job was successful
649 */
650 void JobDone(const returned_data_t &data, const bool success = true);
651
652 inline void StartRunning() {
653 MutexLockGuard guard(status_mutex_);
654 running_ = true;
655 }
656 inline void StopRunning() {
657 MutexLockGuard guard(status_mutex_);
658 running_ = false;
659 }
660 inline bool IsRunning() const {
661 MutexLockGuard guard(status_mutex_);
662 return running_;
663 }
664
665 private:
666 // general configuration
667 const size_t number_of_workers_; //!< number of concurrent worker threads
668 const worker_context_t *worker_context_; //!< the WorkerT defined context
669 /**
670 * The thread context passed to newly spawned threads
671 */
672 WorkerRunBinding thread_context_;
673
674 // status information
675 bool initialized_;
676 bool running_;
677 mutable unsigned int workers_started_;
678 mutable pthread_mutex_t status_mutex_;
679 mutable pthread_cond_t worker_started_;
680 mutable pthread_mutex_t jobs_all_done_mutex_;
681 mutable pthread_cond_t jobs_all_done_;
682
683 // worker threads
684 WorkerThreads worker_threads_; //!< list of worker threads
685 pthread_t callback_thread_; //!< handles callback invokes
686
687 // job queue
688 typedef FifoChannel<WorkerJob> JobQueue;
689 JobQueue jobs_queue_;
690 mutable atomic_int32 jobs_pending_;
691 mutable atomic_int32 jobs_failed_;
692 mutable atomic_int64 jobs_processed_;
693
694 // callback channel
695 typedef FifoChannel<CallbackJob> CallbackQueue;
696 CallbackQueue results_queue_;
697 };
698
699
700 /**
701 * Base class for worker classes that should be used in a ConcurrentWorkers
702 * swarm. These classes need to fulfill a number of requirements in order to
703 * satisfy the needs of the ConcurrentWorkers template.
704 *
705 * Requirements:
706 * -> needs to define the following types:
707 * - expected_data - input data structure of the worker
708 * - returned_data - output data structure of the worker
709 * - worker_context - context structure for initialization information
710 *
711 * -> implement a constructor that takes a pointer to its worker_context
712 * as its only parameter:
713 * AwesomeWorker(const AwesomeWorker::worker_context*)
714 * Note: do not rely on the context object to be available after the
715 * constructor has returned!
716 *
717 * -> needs to define the calling-operator expecting one parameter of type:
718 * const expected_data& and returning void
719 * This will be invoked for every new job the worker should process
720 *
721 * -> inside the implementation of the described calling-operator it needs to
722 * invoke either:
723 * master()->JobSuccessful(const returned_data&)
724 * or:
725 * master()->JobFailed(const returned_data&)
726 * as its LAST operation before returning.
727 * This will keep track of finished jobs and inform the user of Concurrent-
728 * Workers about finished jobs.
729 *
730 * -> [optional] overwrite Initialize() and/or TearDown() to do environmental
731 * setup work, before or respectively after jobs will be processed
732 *
733 * General Reminder:
734 * You will be running in a multi-threaded environment here! Buckle up and
735 * make suitable preparations to shield yourself from serious head-ache.
736 *
737 * Note: This implements a Curiously Recurring Template Pattern
738 * (http://en.wikipedia.org/wiki/Curiously_recurring_template_pattern)
739 *
740 * @param DerivedWorkerT the class name of the inheriting class
741 * (f.e. class AwesomeWorker : public ConcurrentWorker<AwesomeWorker>)
742 */
743 template<class DerivedWorkerT>
744 class ConcurrentWorker : SingleCopy {
745 public:
746 virtual ~ConcurrentWorker() { }
747
748 /**
749 * Does general initialization before any jobs will get scheduled. You do not
750 * need to up-call this initialize method, since it is seen as a dummy here.
751 *
752 * @returns true on successful initialization
753 */
754 virtual bool Initialize() { return true; }
755
756 /**
757 * Does general clean-up after the last job was processed in the worker object
758 * and it is about to vanish. You do not need to up-call this method.
759 */
760 virtual void TearDown() { }
761
762 /**
763 * The actual job-processing entry point. See the description of the
764 * inheriting class requirements to learn about the semantics of this methods.
765 * DO NOT FORGET TO CALL master()->JobSuccessful() OR master()->JobFailed()
766 * at the end of this method!!
767 *
768 * Note: There is no way to generally define this operator, it is therefore
769 * commented out and placed here just as a reminder.
770 *
771 * @param data the data to be processed.
772 */
773 // void operator()(const expected_data &data); // do the actual job of the
774 // worker
775
776 protected:
777 ConcurrentWorker() : master_(NULL) { }
778
779 /**
780 * Gets a pointer to the ConcurrentWorkers object that this worker resides in
781 *
782 * @returns a pointer to the ConcurrentWorkers object
783 */
784 inline ConcurrentWorkers<DerivedWorkerT> *master() const { return master_; }
785
786 private:
787 friend class ConcurrentWorkers<DerivedWorkerT>;
788 void RegisterMaster(ConcurrentWorkers<DerivedWorkerT> *master) {
789 master_ = master;
790 }
791
792 private:
793 ConcurrentWorkers<DerivedWorkerT> *master_;
794 };
795
796 #ifdef CVMFS_NAMESPACE_GUARD
797 } // namespace CVMFS_NAMESPACE_GUARD
798 #endif
799
800 #include "util/concurrency_impl.h"
801
802 #endif // CVMFS_UTIL_CONCURRENCY_H_
803