GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/util/concurrency.h
Date: 2024-04-21 02:33:16
            Exec   Total   Coverage
Lines:        49      83      59.0%
Branches:      9      16      56.2%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_UTIL_CONCURRENCY_H_
6 #define CVMFS_UTIL_CONCURRENCY_H_
7
8 #include <pthread.h>
9
10 #include <cassert>
11 #include <queue>
12 #include <set>
13 #include <vector>
14
15 #include "util/async.h"
16 #include "util/atomic.h"
17 #include "util/export.h"
18 #include "util/mutex.h"
19 #include "util/single_copy.h"
20
21 #ifdef CVMFS_NAMESPACE_GUARD
22 namespace CVMFS_NAMESPACE_GUARD {
23 #endif
24
25 /**
26 * Implements a simple interface to lock objects of derived classes. Classes that
27 * inherit from Lockable are also usable with the LockGuard template for scoped
28 * locking semantics.
29 */
30 class CVMFS_EXPORT Lockable : SingleCopy {
31 public:
32 2 inline virtual ~Lockable() { pthread_mutex_destroy(&mutex_); }
33
34 1 void Lock() const { pthread_mutex_lock(&mutex_); }
35 2 int TryLock() const { return pthread_mutex_trylock(&mutex_); }
36 1 void Unlock() const { pthread_mutex_unlock(&mutex_); }
37
38 protected:
39 1 Lockable() {
40 1 const int retval = pthread_mutex_init(&mutex_, NULL);
41
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 assert(retval == 0);
42 1 }
43
44 private:
45 mutable pthread_mutex_t mutex_;
46 };
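
A brief usage sketch (illustrative only, not part of the measured source; the
Counter class is made up):

  class Counter : public Lockable {   // inherits Lock()/TryLock()/Unlock()
   public:
    Counter() : value_(0) {}
    void Add(const int delta) { value_ += delta; }
    int value() const { return value_; }
   private:
    int value_;
  };

  Counter counter;
  counter.Lock();    // or use a scoped lock guard, as noted in the class comment
  counter.Add(1);
  counter.Unlock();
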
47
48
49 //
50 // -----------------------------------------------------------------------------
51 //
52
53
54 /**
55 * This counter can be counted up and down using the usual increment/decrement
56 * operators. It allows threads to wait for it to become zero as well as to
57 * block when a specified maximal value would be exceeded by an increment.
58 *
59 * Note: If a maximal value is specified on creation, the SynchronizingCounter
60 * is assumed to never leave the interval [0, maximal_value]! Otherwise
61 * the numerical limits of the specified template parameter define this
62 * interval and an increment _never_ blocks.
63 *
64 * Caveat: This implementation uses a simple mutex mechanism and therefore might
65 * become a scalability bottleneck!
66 */
67 template <typename T>
68 class SynchronizingCounter : SingleCopy {
69 public:
70 8 SynchronizingCounter() :
71 8 value_(T(0)), maximal_value_(T(0)) { Initialize(); }
72
73 131 explicit SynchronizingCounter(const T maximal_value)
74 131 : value_(T(0))
75 131 , maximal_value_(maximal_value)
76 {
77
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 131 times.
131 assert(maximal_value > T(0));
78 131 Initialize();
79 131 }
80
81 139 ~SynchronizingCounter() { Destroy(); }
82
83 71888251 T Increment() {
84 71888251 MutexLockGuard l(mutex_);
85
1/2
✓ Branch 1 taken 72935238 times.
✗ Branch 2 not taken.
72935238 WaitForFreeSlotUnprotected();
86
1/2
✓ Branch 1 taken 673231 times.
✗ Branch 2 not taken.
72935238 SetValueUnprotected(value_ + T(1));
87 72042438 return value_;
88 72935238 }
89
90 72210462 T Decrement() {
91 72210462 MutexLockGuard l(mutex_);
92
1/2
✓ Branch 1 taken 673231 times.
✗ Branch 2 not taken.
72935238 SetValueUnprotected(value_ - T(1));
93 144554863 return value_;
94 72935238 }
95
96 147 void WaitForZero() const {
97 147 MutexLockGuard l(mutex_);
98
2/2
✓ Branch 0 taken 43 times.
✓ Branch 1 taken 147 times.
190 while (value_ != T(0)) {
99
1/2
✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
43 pthread_cond_wait(&became_zero_, &mutex_);
100 }
101
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 147 times.
147 assert(value_ == T(0));
102 147 }
103
104 440766463 bool HasMaximalValue() const { return maximal_value_ != T(0); }
105 8 T maximal_value() const { return maximal_value_; }
106
107 671969 T operator++() { return Increment(); }
108 4 const T operator++(int) { return Increment() - T(1); }
109 672626 T operator--() { return Decrement(); }
110 4 const T operator--(int) { return Decrement() + T(1); }
111
112 2765 T Get() const {
113 2765 MutexLockGuard l(mutex_);
114 5530 return value_;
115 2765 }
116
117 14 SynchronizingCounter<T>& operator=(const T &other) {
118 14 MutexLockGuard l(mutex_);
119 14 SetValueUnprotected(other);
120 28 return *this;
121 14 }
122
123 protected:
124 void SetValueUnprotected(const T new_value);
125 void WaitForFreeSlotUnprotected();
126
127 private:
128 void Initialize();
129 void Destroy();
130
131 private:
132 T value_;
133 const T maximal_value_;
134
135 mutable pthread_mutex_t mutex_;
136 mutable pthread_cond_t became_zero_;
137 pthread_cond_t free_slot_;
138 };
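
A brief usage sketch (illustrative only; the variable names are made up):

  // Allow at most 10 tasks to be in flight at any point in time.
  SynchronizingCounter<int> inflight(10);

  ++inflight;              // blocks while 10 tasks are already in flight
  // ... hand the task to a worker thread, which calls --inflight when done ...
  inflight.WaitForZero();  // blocks until every started task has finished
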
139
140
141 //
142 // -----------------------------------------------------------------------------
143 //
144
145
146 template <typename ParamT>
147 class Observable;
148
149
150 /**
151 * This is a base class for classes that need to expose a callback interface for
152 * asynchronous callback methods. One can register an arbitrary number of
153 * observers on an Observable that get notified when the method NotifyListeners()
154 * is invoked.
155 *
156 * Note: the registration and invocation of callbacks in Observable are thread-
157 * safe, but be aware that the callbacks of observing classes might run in
158 * arbitrary threads. When using these classes, you should take extra care
159 * to ensure thread safety.
160 *
161 * Note: The RegisterListener() methods return a pointer to a CallbackBase.
162 * You MUST NOT free these objects, they are managed by the Observable
163 * class. Use them only as handles to unregister specific callbacks.
164 *
165 * @param ParamT the type of the parameter that is passed to every callback
166 * invocation.
167 */
168 template <typename ParamT>
169 class Observable : public Callbackable<ParamT>,
170 SingleCopy {
171 public:
172 typedef typename Callbackable<ParamT>::CallbackTN* CallbackPtr;
173 protected:
174 typedef std::set<CallbackPtr> Callbacks;
175
176 public:
177 virtual ~Observable();
178
179 /**
180 * Registers a method of a specific object as a listener to the Observable
181 * object. The method is invoked on the given delegate when the callback is
182 * fired by the observed object using NotifyListeners(). Since this is meant
183 * to be a closure, it also passes the provided closure data to the method
184 * invoked by the Observable object.
185 *
186 * @param DelegateT the type of the delegate object
187 * @param method a pointer to the method to be invoked by the callback
188 * @param delegate a pointer to the object to invoke the callback on
189 * @param closure something to be passed to `method`
190 * @return a handle to the registered callback
191 */
192 template <class DelegateT, class ClosureDataT>
193 CallbackPtr RegisterListener(
194 typename BoundClosure<ParamT,
195 DelegateT,
196 ClosureDataT>::CallbackMethod method,
197 DelegateT *delegate,
198 ClosureDataT data);
199
200 /**
201 * Registers a method of a specific object as a listener to the Observable
202 * object. The method is invoked on the given delegate when the callback is
203 * fired by the observed object using NotifyListeners().
204 *
205 * @param DelegateT the type of the delegate object
206 * @param method a pointer to the method to be invoked by the callback
207 * @param delegate a pointer to the object to invoke the callback on
208 * @return a handle to the registered callback
209 */
210 template <class DelegateT>
211 CallbackPtr RegisterListener(
212 typename BoundCallback<ParamT, DelegateT>::CallbackMethod method,
213 DelegateT *delegate);
214
215 /**
216 * Registers a static class member or a C-like function as a callback to the
217 * Observable object. The function is invoked when the callback is fired by
218 * the observed object using NotifyListeners().
219 *
220 * @param fn a pointer to the function to be called by the callback
221 * @return a handle to the registered callback
222 */
223 CallbackPtr RegisterListener(typename Callback<ParamT>::CallbackFunction fn);
224
225 /**
226 * Removes the given callback from the listeners group of this Observable.
227 *
228 * @param callback_object a callback handle that was returned by
229 * RegisterListener() before.
230 */
231 void UnregisterListener(CallbackPtr callback_object);
232
233 /**
234 * Removes all listeners from the Observable
235 */
236 void UnregisterListeners();
237
238 protected:
239 Observable(); // don't instantiate this as a stand alone object
240
241 void RegisterListener(CallbackPtr callback_object);
242
243 /**
244 * Notifies all registered listeners and passes them the provided argument
245 * This method should be called by a derived class to send out asynchronous
246 * messages to registered observers.
247 *
248 * @param parameter the data to be passed to the observers
249 */
250 void NotifyListeners(const ParamT &parameter);
251
252 private:
253 Callbacks listeners_; //!< the set of registered
254 //!< callback objects
255 mutable pthread_rwlock_t listeners_rw_lock_;
256 };
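
A brief usage sketch (illustrative only; Downloader, Finish(), and OnFinished()
are made-up names, and the plain-function callback is assumed to take a const
reference to the parameter type):

  class Downloader : public Observable<int> {
   public:
    void Finish(const int status) { NotifyListeners(status); }
  };

  void OnFinished(const int &status) { /* runs in the notifying thread */ }

  Downloader downloader;
  Observable<int>::CallbackPtr handle = downloader.RegisterListener(&OnFinished);
  downloader.Finish(0);                  // invokes OnFinished(0)
  downloader.UnregisterListener(handle); // do not free the handle yourself
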
257
258
259 //
260 // -----------------------------------------------------------------------------
261 //
262
263
264 /**
265 * Returns the number of CPU cores present in the system or a fallback number
266 * if it failed to determine the number of CPU cores.
267 *
268 * @return the number of active CPU cores in the system
269 */
270 CVMFS_EXPORT unsigned int GetNumberOfCpuCores();
271 static const unsigned int kFallbackNumberOfCpus = 1;
272
273
274 /**
275 * A blocking signal for thread synchronization
276 */
277 class CVMFS_EXPORT Signal : SingleCopy {
278 public:
279 Signal();
280 ~Signal();
281 void Wakeup();
282 void Wait();
283 bool IsSleeping();
284
285 private:
286 bool fired_;
287 pthread_mutex_t lock_;
288 pthread_cond_t signal_;
289 };
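
A brief usage sketch (illustrative only):

  Signal go;
  // thread A:
  go.Wait();    // blocks until another thread calls Wakeup()
  // thread B:
  go.Wakeup();  // releases the thread blocked in Wait()
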
290
291
292 //
293 // -----------------------------------------------------------------------------
294 //
295
296
297 /**
298 * Asynchronous FIFO channel template
299 * Implements a thread-safe FIFO queue that handles thread blocking if the queue
300 * is full or empty.
301 *
302 * @param T the data type to be enqueued in the queue
303 */
304 template <class T>
305 class FifoChannel : protected std::queue<T> {
306 public:
307 /**
308 * Creates a new FIFO channel.
309 *
310 * @param maximal_length the maximal number of items that can be enqueued
311 * @param drainout_threshold if fewer than this many elements are in the queue
312 * it is considered to be "not full"
313 */
314 FifoChannel(const size_t maximal_length,
315 const size_t drainout_threshold);
316 virtual ~FifoChannel();
317
318 /**
319 * Adds a new item to the end of the FIFO channel. If the queue is full, this
320 * call will block until items have been dequeued by another thread, allowing
321 * the desired insertion.
322 *
323 * @param data the data to be enqueued into the FIFO channel
324 */
325 void Enqueue(const T &data);
326
327 /**
328 * Removes the next element from the channel. If the queue is empty, this will
329 * block until another thread enqueues an item into the channel.
330 *
331 * @return the first item in the channel queue
332 */
333 const T Dequeue();
334
335 /**
336 * Clears all items in the FIFO channel. The cleared items will be lost.
337 *
338 * @return the number of dropped items
339 */
340 unsigned int Drop();
341
342 inline size_t GetItemCount() const;
343 inline bool IsEmpty() const;
344 inline size_t GetMaximalItemCount() const;
345
346 private:
347 // general configuration
348 const size_t maximal_queue_length_;
349 const size_t queue_drainout_threshold_;
350
351 // thread synchronisation structures
352 mutable pthread_mutex_t mutex_;
353 mutable pthread_cond_t queue_is_not_empty_;
354 mutable pthread_cond_t queue_is_not_full_;
355 };
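
A brief usage sketch (illustrative only; the parameters are made up):

  // Holds at most 64 items; once full, it is considered "not full" again
  // only after it has drained below 8 items.
  FifoChannel<int> channel(64, 8);

  channel.Enqueue(42);                  // blocks while the channel is full
  const int item = channel.Dequeue();   // blocks while the channel is empty
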
356
357
358 /**
359 * This template implements a generic producer/consumer approach to concurrent
360 * worker tasks. It spawns a given number of Workers derived from the base class
361 * ConcurrentWorker and uses them to distribute the workload onto concurrent
362 * threads.
363 * One can have multiple producers that use Schedule() to post new work into
364 * a central job queue, which in turn is processed concurrently by the Worker
365 * objects in multiple threads. Furthermore, the template provides an interface
366 * to control the worker swarm, i.e. to wait for their completion or cancel them
367 * before all jobs are processed.
368 *
369 * Note: A worker is a class inheriting from ConcurrentWorker that needs to meet
370 * a couple of requirements. See the documentation of ConcurrentWorker for
371 * additional details.
372 *
373 * @param WorkerT the class to be used as a worker for a concurrent worker
374 * swarm
375 */
376 template <class WorkerT>
377 class ConcurrentWorkers : public Observable<typename WorkerT::returned_data> {
378 public:
379 // these data types must be defined by the worker class
380 /**
381 * Input data type
382 */
383 typedef typename WorkerT::expected_data expected_data_t;
384 /**
385 * Output data type
386 */
387 typedef typename WorkerT::returned_data returned_data_t;
388 /**
389 * Common context type
390 */
391 typedef typename WorkerT::worker_context worker_context_t;
392
393 protected:
394 typedef std::vector<pthread_t> WorkerThreads;
395
396 /**
397 * This is a simple wrapper structure to piggy-back control information on
398 * scheduled jobs. Job structures are scheduled into a central FIFO queue and
399 * are then processed concurrently by the workers.
400 */
401 template <class DataT>
402 struct Job {
403 explicit Job(const DataT &data) :
404 data(data),
405 is_death_sentence(false) {}
406 Job() :
407 data(),
408 is_death_sentence(true) {}
409 const DataT data; //!< job payload
410 const bool is_death_sentence; //!< death sentence flag
411 };
412 typedef Job<expected_data_t> WorkerJob;
413 typedef Job<returned_data_t> CallbackJob;
414
415 /**
416 * Provides a wrapper for the initialization data that is passed to newly
417 * spawned worker threads.
418 * It contains a pointer to the spawning ConcurrentWorkers master object as
419 * well as a pointer to a context object defined by the concrete worker to be
420 * spawned.
421 */
422 struct RunBinding {
423 explicit RunBinding(ConcurrentWorkers<WorkerT> *delegate) :
424 delegate(delegate) {}
425 ConcurrentWorkers<WorkerT> *delegate; //!< delegate to the Concurrent-
426 //!< Workers master
427 };
428
429 struct WorkerRunBinding : RunBinding {
430 WorkerRunBinding(ConcurrentWorkers<WorkerT> *delegate,
431 const worker_context_t *worker_context) :
432 RunBinding(delegate),
433 worker_context(worker_context) {}
434 /**
435 * WorkerT defined context objects for worker init.
436 */
437 const worker_context_t *worker_context;
438 };
439
440 public:
441 /**
442 * Creates a ConcurrentWorkers master object that encapsulates the actual
443 * workers.
444 *
445 * @param number_of_workers the number of concurrent workers to be spawned
446 * @param maximal_queue_length the maximal length of the job queue
447 * (>= number_of_workers)
448 * @param worker_context a pointer to the WorkerT defined context object
449 */
450 ConcurrentWorkers(const size_t number_of_workers,
451 const size_t maximal_queue_length,
452 worker_context_t *worker_context = NULL);
453 virtual ~ConcurrentWorkers();
454
455 /**
456 * Initializes the ConcurrentWorkers swarm, spawns a thread for each new
457 * worker object and puts everything into working state.
458 *
459 * @return true if all went fine
460 */
461 bool Initialize();
462
463 /**
464 * Schedules a new job for processing into the internal job queue. This method
465 * will block if the job queue is already full and wait for an empty slot.
466 *
467 * @param data the data to be processed
468 */
469 inline void Schedule(const expected_data_t &data) {
470 Schedule(WorkerJob(data));
471 }
472
473 /**
474 * Shuts down the ConcurrentWorkers object as well as the encapsulated workers
475 * as soon as possible. Workers will finish their current job and will termi-
476 * nate afterwards. If no jobs are scheduled they will simply stop waiting for
477 * new ones and terminate afterwards.
478 * This method MUST NOT be called more than once per ConcurrentWorkers object.
479 */
480 void Terminate();
481
482 /**
483 * Waits until the job queue is fully processed
484 *
485 * Note: this might lead to undefined behaviour or infinite waiting if other
486 * producers still schedule jobs into the job queue.
487 */
488 void WaitForEmptyQueue() const;
489
490 /**
491 * Waits until the ConcurrentWorkers swarm has fully processed the current job
492 * queue and shuts down afterwards.
493 *
494 * Note: just as for WaitForEmptyQueue() this assumes that no other producers
495 * schedule jobs in the meantime.
496 */
497 void WaitForTermination();
498
499 inline unsigned int GetNumberOfWorkers() const { return number_of_workers_; }
500 inline unsigned int GetNumberOfFailedJobs() const {
501 return atomic_read32(&jobs_failed_);
502 }
503
504 /**
505 * Defines a job as successfully finished.
506 * DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT!
507 *
508 * @param data the data to be returned back to the user
509 */
510 inline void JobSuccessful(const returned_data_t& data) {
511 JobDone(data, true);
512 }
513
514 /**
515 * Defines a job as failed.
516 * DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT!
517 *
518 * Note: Even for failed jobs the user will get a callback with a data object.
519 * You might want to make sure that this data contains a status flag as
520 * well, telling the user what went wrong.
521 *
522 * @param data the data to be returned back to the user
523 */
524 inline void JobFailed(const returned_data_t& data) { JobDone(data, false); }
525
526 void RunCallbackThread();
527
528 protected:
529 bool SpawnWorkers();
530
531 /**
532 * POSIX-conforming thread entry point function. It is invoked for every new
533 * worker thread and contains the initialization, processing loop, and tear-
534 * down of the unique worker objects.
535 *
536 * @param run_binding void pointer to a RunBinding structure (C interface)
537 * @return NULL in any case
538 */
539 static void* RunWorker(void *run_binding);
540
541 static void* RunCallbackThreadWrapper(void *run_binding);
542
543 /**
544 * Tells the master that a worker thread has started. This does not mean that
545 * it was initialized successfully.
546 */
547 void ReportStartedWorker() const;
548
549 void Schedule(WorkerJob job);
550 void ScheduleDeathSentences();
551
552 /**
553 * Empties the job queue
554 *
555 * @param forget_pending controls whether cancelled jobs should be seen as finished
556 */
557 void TruncateJobQueue(const bool forget_pending = false);
558
559 /**
560 * Retrieves a job from the job queue. If the job queue is empty it will block
561 * until there is a new job available for processing.
562 * THIS METHOD MUST ONLY BE CALLED INSIDE THE WORKER OBJECTS
563 *
564 * @return a job to be processed by a worker
565 */
566 inline WorkerJob Acquire();
567
568 /**
569 * Controls the asynchronous finishing of a job.
570 * DO NOT CALL THIS; use the JobSuccessful() or JobFailed() wrappers instead.
571 *
572 * @param data the data to be returned to the user
573 * @param success flag if job was successful
574 */
575 void JobDone(const returned_data_t& data, const bool success = true);
576
577 inline void StartRunning() {
578 MutexLockGuard guard(status_mutex_);
579 running_ = true;
580 }
581 inline void StopRunning() {
582 MutexLockGuard guard(status_mutex_);
583 running_ = false;
584 }
585 inline bool IsRunning() const {
586 MutexLockGuard guard(status_mutex_);
587 return running_;
588 }
589
590 private:
591 // general configuration
592 const size_t number_of_workers_; //!< number of concurrent worker threads
593 const worker_context_t *worker_context_; //!< the WorkerT defined context
594 /**
595 * The thread context passed to newly spawned threads
596 */
597 WorkerRunBinding thread_context_;
598
599 // status information
600 bool initialized_;
601 bool running_;
602 mutable unsigned int workers_started_;
603 mutable pthread_mutex_t status_mutex_;
604 mutable pthread_cond_t worker_started_;
605 mutable pthread_mutex_t jobs_all_done_mutex_;
606 mutable pthread_cond_t jobs_all_done_;
607
608 // worker threads
609 WorkerThreads worker_threads_; //!< list of worker threads
610 pthread_t callback_thread_; //!< handles callback invokes
611
612 // job queue
613 typedef FifoChannel<WorkerJob > JobQueue;
614 JobQueue jobs_queue_;
615 mutable atomic_int32 jobs_pending_;
616 mutable atomic_int32 jobs_failed_;
617 mutable atomic_int64 jobs_processed_;
618
619 // callback channel
620 typedef FifoChannel<CallbackJob > CallbackQueue;
621 CallbackQueue results_queue_;
622 };
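
A brief usage sketch of the driver side (illustrative only; HashWorker,
OnHashComputed, some_input, and the queue length are made-up names and values
for a worker class that fulfills the requirements described below):

  // void OnHashComputed(const HashWorker::returned_data &result);  // listener

  HashWorker::worker_context context;  // WorkerT-defined init information
  ConcurrentWorkers<HashWorker> workers(GetNumberOfCpuCores(),  // worker threads
                                        100,                    // job queue length
                                        &context);
  if (!workers.Initialize()) { /* handle failure to spawn the workers */ }

  workers.RegisterListener(&OnHashComputed);  // called once per finished job
  workers.Schedule(some_input);               // may block if the queue is full
  workers.WaitForTermination();               // drain the queue and shut down
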
623
624
625 /**
626 * Base class for worker classes that should be used in a ConcurrentWorkers
627 * swarm. These classes need to fulfill a number of requirements in order to
628 * satisfy the needs of the ConcurrentWorkers template.
629 *
630 * Requirements:
631 * -> needs to define the following types:
632 * - expected_data - input data structure of the worker
633 * - returned_data - output data structure of the worker
634 * - worker_context - context structure for initialization information
635 *
636 * -> implement a constructor that takes a pointer to its worker_context
637 * as its only parameter:
638 * AwesomeWorker(const AwesomeWorker::worker_context*)
639 * Note: do not rely on the context object to be available after the
640 * constructor has returned!
641 *
642 * -> needs to define the calling-operator expecting one parameter of type:
643 * const expected_data& and returning void
644 * This will be invoked for every new job the worker should process
645 *
646 * -> inside the implementation of the described calling-operator it needs to
647 * invoke either:
648 * master()->JobSuccessful(const returned_data&)
649 * or:
650 * master()->JobFailed(const returned_data&)
651 * as its LAST operation before returning.
652 * This will keep track of finished jobs and inform the user of Concurrent-
653 * Workers about finished jobs.
654 *
655 * -> [optional] override Initialize() and/or TearDown() to do environment
656 * setup or tear-down work before or, respectively, after jobs are processed
657 *
658 * General Reminder:
659 * You will be running in a multi-threaded environment here! Buckle up and
660 * make suitable preparations to shield yourself from serious headaches.
661 *
662 * Note: This implements a Curiously Recurring Template Pattern
663 * (http://en.wikipedia.org/wiki/Curiously_recurring_template_pattern)
664 *
665 * @param DerivedWorkerT the class name of the inheriting class
666 * (e.g. class AwesomeWorker : public ConcurrentWorker<AwesomeWorker>)
667 */
668 template <class DerivedWorkerT>
669 class ConcurrentWorker : SingleCopy {
670 public:
671 virtual ~ConcurrentWorker() {}
672
673 /**
674 * Does general initialization before any jobs get scheduled. You do not need
675 * to up-call this Initialize() method; the base implementation is only a dummy.
676 *
677 * @returns true on successful initialization
678 */
679 virtual bool Initialize() { return true; }
680
681 /**
682 * Does general clean-up after the last job was processed in the worker object
683 * and it is about to vanish. You do not need to up-call this method.
684 */
685 virtual void TearDown() {}
686
687 /**
688 * The actual job-processing entry point. See the description of the inheriting
689 * class requirements to learn about the semantics of this method.
690 * DO NOT FORGET TO CALL master()->JobSuccessful() OR master()->JobFailed()
691 * at the end of this method!!
692 *
693 * Note: There is no way to define this operator generically; it is therefore
694 * commented out and placed here just as a reminder.
695 *
696 * @param data the data to be processed.
697 */
698 // void operator()(const expected_data &data); // do the actual job of the
699 // worker
700
701 protected:
702 ConcurrentWorker() : master_(NULL) {}
703
704 /**
705 * Gets a pointer to the ConcurrentWorkers object that this worker resides in
706 *
707 * @returns a pointer to the ConcurrentWorkers object
708 */
709 inline ConcurrentWorkers<DerivedWorkerT>* master() const { return master_; }
710
711 private:
712 friend class ConcurrentWorkers<DerivedWorkerT>;
713 void RegisterMaster(ConcurrentWorkers<DerivedWorkerT> *master) {
714 master_ = master;
715 }
716
717 private:
718 ConcurrentWorkers<DerivedWorkerT> *master_;
719 };
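
A minimal worker sketch that satisfies the requirements listed above
(illustrative only; every name is made up):

  class AwesomeWorker : public ConcurrentWorker<AwesomeWorker> {
   public:
    struct Parameters { /* shared initialization information */ };

    typedef int        expected_data;   // input handed to operator()
    typedef bool       returned_data;   // output delivered via the callbacks
    typedef Parameters worker_context;  // context passed to the constructor

    explicit AwesomeWorker(const worker_context *context) {
      // copy whatever is needed; the context may vanish after construction
    }

    void operator()(const expected_data &data) {
      const bool success = (data >= 0);  // the actual work goes here
      if (success)
        master()->JobSuccessful(true);   // must be the last action ...
      else
        master()->JobFailed(false);      // ... before returning
    }
  };
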
720
721 #ifdef CVMFS_NAMESPACE_GUARD
722 } // namespace CVMFS_NAMESPACE_GUARD
723 #endif
724
725 #include "util/concurrency_impl.h"
726
727 #endif // CVMFS_UTIL_CONCURRENCY_H_
728