Directory: | cvmfs/ |
---|---|
File: | cvmfs/util/concurrency.h |
Date: | 2025-04-20 02:34:28 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 87 | 121 | 71.9% |
Branches: | 21 | 38 | 55.3% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #ifndef CVMFS_UTIL_CONCURRENCY_H_ | ||
6 | #define CVMFS_UTIL_CONCURRENCY_H_ | ||
7 | |||
8 | #include <pthread.h> | ||
9 | |||
10 | #include <cassert> | ||
11 | #include <queue> | ||
12 | #include <set> | ||
13 | #include <vector> | ||
14 | |||
15 | #include "util/async.h" | ||
16 | #include "util/atomic.h" | ||
17 | #include "util/export.h" | ||
18 | #include "util/mutex.h" | ||
19 | #include "util/single_copy.h" | ||
20 | |||
21 | #ifdef CVMFS_NAMESPACE_GUARD | ||
22 | namespace CVMFS_NAMESPACE_GUARD { | ||
23 | #endif | ||
24 | |||
25 | /** | ||
26 | * A thread-safe, unbounded vector of items that implements a FIFO channel. | ||
27 | * Uses condition variables to block when threads try to pop from the empty | ||
28 | * channel. | ||
29 | */ | ||
30 | template <class ItemT> | ||
31 | class Channel : SingleCopy { | ||
32 | public: | ||
33 | 6 | Channel() { | |
34 | 6 | int retval = pthread_mutex_init(&lock_, NULL); | |
35 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | assert(retval == 0); |
36 | 6 | retval = pthread_cond_init(&cond_populated_, NULL); | |
37 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | assert(retval == 0); |
38 | 6 | } | |
39 | |||
40 | 6 | ~Channel() { | |
41 | 6 | pthread_cond_destroy(&cond_populated_); | |
42 | 6 | pthread_mutex_destroy(&lock_); | |
43 | 6 | } | |
44 | |||
45 | /** | ||
46 | * Returns the queue locked and ready for appending 1 item. | ||
47 | */ | ||
48 | 10000 | std::vector<ItemT *> *StartEnqueueing() { | |
49 | 10000 | int retval = pthread_mutex_lock(&lock_); | |
50 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 10000 times.
|
10000 | assert(retval == 0); |
51 | 10000 | return &items_; | |
52 | } | ||
53 | |||
54 | /** | ||
55 | * Unlocks the queue. The queue must remain unchanged when this is called. | ||
56 | */ | ||
57 | 5000 | void AbortEnqueueing() { | |
58 | 5000 | int retval = pthread_mutex_unlock(&lock_); | |
59 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 5000 times.
|
5000 | assert(retval == 0); |
60 | 5000 | } | |
61 | |||
62 | /** | ||
63 | * 1 new item was added to the queue. Unlock and signal to reader thread. | ||
64 | */ | ||
65 | 5000 | void CommitEnqueueing() { | |
66 | 5000 | int retval = pthread_cond_signal(&cond_populated_); | |
67 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 5000 times.
|
5000 | assert(retval == 0); |
68 | 5000 | retval = pthread_mutex_unlock(&lock_); | |
69 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 5000 times.
|
5000 | assert(retval == 0); |
70 | 5000 | } | |
71 | |||
72 | 9 | void PushBack(ItemT *item) { | |
73 | 9 | MutexLockGuard lock_guard(&lock_); | |
74 |
1/2✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
|
9 | items_.push_back(item); |
75 | 9 | int retval = pthread_cond_signal(&cond_populated_); | |
76 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
|
9 | assert(retval == 0); |
77 | 9 | } | |
78 | |||
79 | /** | ||
80 | * Remove and return the first element from the queue. Blocks while the | ||
81 | * channel is empty. | ||
82 | */ | ||
83 | 5009 | ItemT *PopFront() { | |
84 | 5009 | MutexLockGuard lock_guard(&lock_); | |
85 |
2/2✓ Branch 1 taken 7 times.
✓ Branch 2 taken 5009 times.
|
5016 | while (items_.size() == 0) |
86 |
1/2✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
|
7 | pthread_cond_wait(&cond_populated_, &lock_); |
87 | 5009 | ItemT *item = items_[0]; | |
88 |
1/2✓ Branch 3 taken 5009 times.
✗ Branch 4 not taken.
|
5009 | items_.erase(items_.begin()); |
89 | 5009 | return item; | |
90 | 5009 | } | |
91 | |||
92 | private: | ||
93 | /** | ||
94 | * The locked queue/channel | ||
95 | */ | ||
96 | std::vector<ItemT *> items_; | ||
97 | /** | ||
98 | * Protects all internal state | ||
99 | */ | ||
100 | pthread_mutex_t lock_; | ||
101 | /** | ||
102 | * Signals if there are items enqueued | ||
103 | */ | ||
104 | pthread_cond_t cond_populated_; | ||
105 | }; | ||
106 | |||
107 | /** | ||
108 | * Implements a simple interface to lock objects of derived classes. Classes that | ||
109 | * inherit from Lockable are also usable with the LockGuard template for scoped | ||
110 | * locking semantics. | ||
111 | */ | ||
112 | class CVMFS_EXPORT Lockable : SingleCopy { | ||
113 | public: | ||
114 | 2 | inline virtual ~Lockable() { pthread_mutex_destroy(&mutex_); } | |
115 | |||
116 | 1 | void Lock() const { pthread_mutex_lock(&mutex_); } | |
117 | 2 | int TryLock() const { return pthread_mutex_trylock(&mutex_); } | |
118 | 1 | void Unlock() const { pthread_mutex_unlock(&mutex_); } | |
119 | |||
120 | protected: | ||
121 | 1 | Lockable() { | |
122 | 1 | const int retval = pthread_mutex_init(&mutex_, NULL); | |
123 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | assert(retval == 0); |
124 | 1 | } | |
125 | |||
126 | private: | ||
127 | mutable pthread_mutex_t mutex_; | ||
128 | }; | ||
129 | |||
130 | |||
131 | // | ||
132 | // ----------------------------------------------------------------------------- | ||
133 | // | ||
134 | |||
135 | |||
136 | /** | ||
137 | * This counter can be counted up and down using the usual increment/decrement | ||
138 | * operators. It allows threads to wait for it to become zero as well as to | ||
139 | * block when a specified maximal value would be exceeded by an increment. | ||
140 | * | ||
141 | * Note: If a maximal value is specified on creation, the SynchronizingCounter | ||
142 | * is assumed to never leave the interval [0, maximal_value]! Otherwise | ||
143 | * the numerical limits of the specified template parameter define this | ||
144 | * interval and an increment _never_ blocks. | ||
145 | * | ||
146 | * Caveat: This implementation uses a simple mutex mechanism and therefore might | ||
147 | * become a scalability bottleneck! | ||
148 | */ | ||
149 | template <typename T> | ||
150 | class SynchronizingCounter : SingleCopy { | ||
151 | public: | ||
152 | 8 | SynchronizingCounter() : | |
153 | 8 | value_(T(0)), maximal_value_(T(0)) { Initialize(); } | |
154 | |||
155 | 133 | explicit SynchronizingCounter(const T maximal_value) | |
156 | 133 | : value_(T(0)) | |
157 | 133 | , maximal_value_(maximal_value) | |
158 | { | ||
159 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 133 times.
|
133 | assert(maximal_value > T(0)); |
160 | 133 | Initialize(); | |
161 | 133 | } | |
162 | |||
163 | 141 | ~SynchronizingCounter() { Destroy(); } | |
164 | |||
165 | 71947402 | T Increment() { | |
166 | 71947402 | MutexLockGuard l(mutex_); | |
167 |
1/2✓ Branch 1 taken 72935256 times.
✗ Branch 2 not taken.
|
72935256 | WaitForFreeSlotUnprotected(); |
168 |
1/2✓ Branch 1 taken 673249 times.
✗ Branch 2 not taken.
|
72935256 | SetValueUnprotected(value_ + T(1)); |
169 | 72087140 | return value_; | |
170 | 72935256 | } | |
171 | |||
172 | 72222515 | T Decrement() { | |
173 | 72222515 | MutexLockGuard l(mutex_); | |
174 |
1/2✓ Branch 1 taken 673249 times.
✗ Branch 2 not taken.
|
72935256 | SetValueUnprotected(value_ - T(1)); |
175 | 144534228 | return value_; | |
176 | 72935256 | } | |
177 | |||
178 | 149 | void WaitForZero() const { | |
179 | 149 | MutexLockGuard l(mutex_); | |
180 |
2/2✓ Branch 0 taken 43 times.
✓ Branch 1 taken 151 times.
|
194 | while (value_ != T(0)) { |
181 |
1/2✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
|
43 | pthread_cond_wait(&became_zero_, &mutex_); |
182 | } | ||
183 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 151 times.
|
151 | assert(value_ == T(0)); |
184 | 151 | } | |
185 | |||
186 | 440621653 | bool HasMaximalValue() const { return maximal_value_ != T(0); } | |
187 | 8 | T maximal_value() const { return maximal_value_; } | |
188 | |||
189 | 672143 | T operator++() { return Increment(); } | |
190 | 4 | const T operator++(int) { return Increment() - T(1); } | |
191 | 672644 | T operator--() { return Decrement(); } | |
192 | 4 | const T operator--(int) { return Decrement() + T(1); } | |
193 | |||
194 | 2765 | T Get() const { | |
195 | 2765 | MutexLockGuard l(mutex_); | |
196 | 5530 | return value_; | |
197 | 2765 | } | |
198 | |||
199 | 14 | SynchronizingCounter<T>& operator=(const T &other) { | |
200 | 14 | MutexLockGuard l(mutex_); | |
201 | 14 | SetValueUnprotected(other); | |
202 | 28 | return *this; | |
203 | 14 | } | |
204 | |||
205 | protected: | ||
206 | void SetValueUnprotected(const T new_value); | ||
207 | void WaitForFreeSlotUnprotected(); | ||
208 | |||
209 | private: | ||
210 | void Initialize(); | ||
211 | void Destroy(); | ||
212 | |||
213 | private: | ||
214 | T value_; | ||
215 | const T maximal_value_; | ||
216 | |||
217 | mutable pthread_mutex_t mutex_; | ||
218 | mutable pthread_cond_t became_zero_; | ||
219 | pthread_cond_t free_slot_; | ||
220 | }; | ||
221 | |||
222 | |||
223 | // | ||
224 | // ----------------------------------------------------------------------------- | ||
225 | // | ||
226 | |||
227 | |||
228 | template <typename ParamT> | ||
229 | class Observable; | ||
230 | |||
231 | |||
232 | /** | ||
233 | * This is a base class for classes that need to expose a callback interface for | ||
234 | * asynchronous callback methods. One can register an arbitrary number of | ||
235 | * observers on an Observable that get notified when the method NotifyListeners() | ||
236 | * is invoked. | ||
237 | * | ||
238 | * Note: the registration and invocation of callbacks in Observable is thread- | ||
239 | * safe, but be aware that the callbacks of observing classes might run in | ||
240 | * arbitrary threads. When using these classes, you should take extra care | ||
241 | * for thread-safety. | ||
242 | * | ||
243 | * Note: The RegisterListener() methods return a pointer to a CallbackBase. | ||
244 | * You MUST NOT free these objects, they are managed by the Observable | ||
245 | * class. Use them only as handles to unregister specific callbacks. | ||
246 | * | ||
247 | * @param ParamT the type of the parameter that is passed to every callback | ||
248 | * invocation. | ||
249 | */ | ||
250 | template <typename ParamT> | ||
251 | class Observable : public Callbackable<ParamT>, | ||
252 | SingleCopy { | ||
253 | public: | ||
254 | typedef typename Callbackable<ParamT>::CallbackTN* CallbackPtr; | ||
255 | protected: | ||
256 | typedef std::set<CallbackPtr> Callbacks; | ||
257 | |||
258 | public: | ||
259 | virtual ~Observable(); | ||
260 | |||
261 | /** | ||
262 | * Registers a method of a specific object as a listener to the Observable | ||
263 | * object. The method is invoked on the given delegate when the callback is | ||
264 | * fired by the observed object using NotifyListeners(). Since this is meant | ||
265 | * to be a closure, it also passes the third argument to the method being in- | ||
266 | * voked by the Observable object. | ||
267 | * | ||
268 | * @param DelegateT the type of the delegate object | ||
269 | * @param method a pointer to the method to be invoked by the callback | ||
270 | * @param delegate a pointer to the object to invoke the callback on | ||
271 | * @param data something to be passed to `method` | ||
272 | * @return a handle to the registered callback | ||
273 | */ | ||
274 | template <class DelegateT, class ClosureDataT> | ||
275 | CallbackPtr RegisterListener( | ||
276 | typename BoundClosure<ParamT, | ||
277 | DelegateT, | ||
278 | ClosureDataT>::CallbackMethod method, | ||
279 | DelegateT *delegate, | ||
280 | ClosureDataT data); | ||
281 | |||
282 | /** | ||
283 | * Registers a method of a specific object as a listener to the Observable | ||
284 | * object. The method is invoked on the given delegate when the callback is | ||
285 | * fired by the observed object using NotifyListeners(). | ||
286 | * | ||
287 | * @param DelegateT the type of the delegate object | ||
288 | * @param method a pointer to the method to be invoked by the callback | ||
289 | * @param delegate a pointer to the object to invoke the callback on | ||
290 | * @return a handle to the registered callback | ||
291 | */ | ||
292 | template <class DelegateT> | ||
293 | CallbackPtr RegisterListener( | ||
294 | typename BoundCallback<ParamT, DelegateT>::CallbackMethod method, | ||
295 | DelegateT *delegate); | ||
296 | |||
297 | /** | ||
298 | * Registers a static class member or a C-like function as a callback to the | ||
299 | * Observable object. The function is invoked when the callback is fired by | ||
300 | * the observed object using NotifyListeners(). | ||
301 | * | ||
302 | * @param fn a pointer to the function to be called by the callback | ||
303 | * @return a handle to the registered callback | ||
304 | */ | ||
305 | CallbackPtr RegisterListener(typename Callback<ParamT>::CallbackFunction fn); | ||
306 | |||
307 | /** | ||
308 | * Removes the given callback from the listeners group of this Observable. | ||
309 | * | ||
310 | * @param callback_object a callback handle that was returned by | ||
311 | * RegisterListener() before. | ||
312 | */ | ||
313 | void UnregisterListener(CallbackPtr callback_object); | ||
314 | |||
315 | /** | ||
316 | * Removes all listeners from the Observable | ||
317 | */ | ||
318 | void UnregisterListeners(); | ||
319 | |||
320 | protected: | ||
321 | Observable(); // don't instantiate this as a stand alone object | ||
322 | |||
323 | void RegisterListener(CallbackPtr callback_object); | ||
324 | |||
325 | /** | ||
326 | * Notifies all registered listeners and passes them the provided argument | ||
327 | * This method should be called by a derived class to send out asynchronous | ||
328 | * messages to registered observers. | ||
329 | * | ||
330 | * @param parameter the data to be passed to the observers | ||
331 | */ | ||
332 | void NotifyListeners(const ParamT &parameter); | ||
333 | |||
334 | private: | ||
335 | Callbacks listeners_; //!< the set of registered | ||
336 | //!< callback objects | ||
337 | mutable pthread_rwlock_t listeners_rw_lock_; | ||
338 | }; | ||
339 | |||
340 | |||
341 | // | ||
342 | // ----------------------------------------------------------------------------- | ||
343 | // | ||
344 | |||
345 | |||
346 | /** | ||
347 | * Returns the number of CPU cores present in the system or a fallback number | ||
348 | * if it failed to determine the number of CPU cores. | ||
349 | * | ||
350 | * @return the number of active CPU cores in the system | ||
351 | */ | ||
352 | CVMFS_EXPORT unsigned int GetNumberOfCpuCores(); | ||
353 | static const unsigned int kFallbackNumberOfCpus = 1; | ||
354 | |||
355 | |||
356 | /** | ||
357 | * A blocking signal for thread synchronization | ||
358 | */ | ||
359 | class CVMFS_EXPORT Signal : SingleCopy { | ||
360 | public: | ||
361 | Signal(); | ||
362 | ~Signal(); | ||
363 | void Wakeup(); | ||
364 | void Wait(); | ||
365 | bool IsSleeping(); | ||
366 | |||
367 | private: | ||
368 | bool fired_; | ||
369 | pthread_mutex_t lock_; | ||
370 | pthread_cond_t signal_; | ||
371 | }; | ||
372 | |||
373 | |||
374 | // | ||
375 | // ----------------------------------------------------------------------------- | ||
376 | // | ||
377 | |||
378 | |||
379 | /** | ||
380 | * Asynchronous FIFO channel template | ||
381 | * Implements a thread safe FIFO queue that handles thread blocking if the queue | ||
382 | * is full or empty. | ||
383 | * | ||
384 | * @param T the data type to be enqueued in the queue | ||
385 | */ | ||
386 | template <class T> | ||
387 | class FifoChannel : protected std::queue<T> { | ||
388 | public: | ||
389 | /** | ||
390 | * Creates a new FIFO channel. | ||
391 | * | ||
392 | * @param maximal_length the maximal number of items that can be enqueued | ||
393 | * @param drainout_threshold if fewer than this many elements are in the | ||
394 | * queue it is considered to be "not full" | ||
395 | */ | ||
396 | FifoChannel(const size_t maximal_length, | ||
397 | const size_t drainout_threshold); | ||
398 | virtual ~FifoChannel(); | ||
399 | |||
400 | /** | ||
401 | * Adds a new item to the end of the FIFO channel. If the queue is full, this | ||
402 | * call will block until items were dequeued by another thread allowing the | ||
403 | * desired insertion. | ||
404 | * | ||
405 | * @param data the data to be enqueued into the FIFO channel | ||
406 | */ | ||
407 | void Enqueue(const T &data); | ||
408 | |||
409 | /** | ||
410 | * Removes the next element from the channel. If the queue is empty, this will | ||
411 | * block until another thread enqueues an item into the channel. | ||
412 | * | ||
413 | * @return the first item in the channel queue | ||
414 | */ | ||
415 | const T Dequeue(); | ||
416 | |||
417 | /** | ||
418 | * Clears all items in the FIFO channel. The cleared items will be lost. | ||
419 | * | ||
420 | * @return the number of dropped items | ||
421 | */ | ||
422 | unsigned int Drop(); | ||
423 | |||
424 | inline size_t GetItemCount() const; | ||
425 | inline bool IsEmpty() const; | ||
426 | inline size_t GetMaximalItemCount() const; | ||
427 | |||
428 | private: | ||
429 | // general configuration | ||
430 | const size_t maximal_queue_length_; | ||
431 | const size_t queue_drainout_threshold_; | ||
432 | |||
433 | // thread synchronisation structures | ||
434 | mutable pthread_mutex_t mutex_; | ||
435 | mutable pthread_cond_t queue_is_not_empty_; | ||
436 | mutable pthread_cond_t queue_is_not_full_; | ||
437 | }; | ||
438 | |||
439 | |||
440 | /** | ||
441 | * This template implements a generic producer/consumer approach to concurrent | ||
442 | * worker tasks. It spawns a given number of Workers derived from the base class | ||
443 | * ConcurrentWorker and uses them to distribute the work load onto concurrent | ||
444 | * threads. | ||
445 | * One can have multiple producers, that use Schedule() to post new work into | ||
446 | * a central job queue, which in turn is processed concurrently by the Worker | ||
447 | * objects in multiple threads. Furthermore the template provides an interface | ||
448 | * to control the worker swarm, i.e. to wait for their completion or cancel them | ||
449 | * before all jobs are processed. | ||
450 | * | ||
451 | * Note: A worker is a class inheriting from ConcurrentWorker that needs to meet | ||
452 | * a couple of requirements. See the documentation of ConcurrentWorker for | ||
453 | * additional details. | ||
454 | * | ||
455 | * @param WorkerT the class to be used as a worker for a concurrent worker | ||
456 | * swarm | ||
457 | */ | ||
458 | template <class WorkerT> | ||
459 | class ConcurrentWorkers : public Observable<typename WorkerT::returned_data> { | ||
460 | public: | ||
461 | // these data types must be defined by the worker class | ||
462 | /** | ||
463 | * Input data type | ||
464 | */ | ||
465 | typedef typename WorkerT::expected_data expected_data_t; | ||
466 | /** | ||
467 | * Output data type | ||
468 | */ | ||
469 | typedef typename WorkerT::returned_data returned_data_t; | ||
470 | /** | ||
471 | * Common context type | ||
472 | */ | ||
473 | typedef typename WorkerT::worker_context worker_context_t; | ||
474 | |||
475 | protected: | ||
476 | typedef std::vector<pthread_t> WorkerThreads; | ||
477 | |||
478 | /** | ||
479 | * This is a simple wrapper structure to piggy-back control information on | ||
480 | * scheduled jobs. Job structures are scheduled into a central FIFO queue and | ||
481 | * are then processed concurrently by the workers. | ||
482 | */ | ||
483 | template <class DataT> | ||
484 | struct Job { | ||
485 | ✗ | explicit Job(const DataT &data) : | |
486 | ✗ | data(data), | |
487 | ✗ | is_death_sentence(false) {} | |
488 | ✗ | Job() : | |
489 | ✗ | data(), | |
490 | ✗ | is_death_sentence(true) {} | |
491 | const DataT data; //!< job payload | ||
492 | const bool is_death_sentence; //!< death sentence flag | ||
493 | }; | ||
494 | typedef Job<expected_data_t> WorkerJob; | ||
495 | typedef Job<returned_data_t> CallbackJob; | ||
496 | |||
497 | /** | ||
498 | * Provides a wrapper for initialization data passed to newly spawned worker | ||
499 | * threads for initialization. | ||
500 | * It contains a pointer to the spawning ConcurrentWorkers master object as | ||
501 | * well as a pointer to a context object defined by the concrete worker to be | ||
502 | * spawned. | ||
503 | */ | ||
504 | struct RunBinding { | ||
505 | ✗ | explicit RunBinding(ConcurrentWorkers<WorkerT> *delegate) : | |
506 | ✗ | delegate(delegate) {} | |
507 | ConcurrentWorkers<WorkerT> *delegate; //!< delegate to the Concurrent- | ||
508 | //!< Workers master | ||
509 | }; | ||
510 | |||
511 | struct WorkerRunBinding : RunBinding { | ||
512 | ✗ | WorkerRunBinding(ConcurrentWorkers<WorkerT> *delegate, | |
513 | const worker_context_t *worker_context) : | ||
514 | RunBinding(delegate), | ||
515 | ✗ | worker_context(worker_context) {} | |
516 | /** | ||
517 | * WorkerT defined context objects for worker init. | ||
518 | */ | ||
519 | const worker_context_t *worker_context; | ||
520 | }; | ||
521 | |||
522 | public: | ||
523 | /** | ||
524 | * Creates a ConcurrentWorkers master object that encapsulates the actual | ||
525 | * workers. | ||
526 | * | ||
527 | * @param number_of_workers the number of concurrent workers to be spawned | ||
528 | * @param maximal_queue_length the maximal length of the job queue | ||
529 | * (>= number_of_workers) | ||
530 | * @param worker_context a pointer to the WorkerT defined context object | ||
531 | */ | ||
532 | ConcurrentWorkers(const size_t number_of_workers, | ||
533 | const size_t maximal_queue_length, | ||
534 | worker_context_t *worker_context = NULL); | ||
535 | virtual ~ConcurrentWorkers(); | ||
536 | |||
537 | /** | ||
538 | * Initializes the ConcurrentWorkers swarm, spawns a thread for each new | ||
539 | * worker object and puts everything into working state. | ||
540 | * | ||
541 | * @return true if all went fine | ||
542 | */ | ||
543 | bool Initialize(); | ||
544 | |||
545 | /** | ||
546 | * Schedules a new job for processing into the internal job queue. This method | ||
547 | * will block in case the job queue is already full and wait for an empty slot. | ||
548 | * | ||
549 | * @param data the data to be processed | ||
550 | */ | ||
551 | ✗ | inline void Schedule(const expected_data_t &data) { | |
552 | ✗ | Schedule(WorkerJob(data)); | |
553 | } | ||
554 | |||
555 | /** | ||
556 | * Shuts down the ConcurrentWorkers object as well as the encapsulated workers | ||
557 | * as soon as possible. Workers will finish their current job and will termi- | ||
558 | * nate afterwards. If no jobs are scheduled they will simply stop waiting for | ||
559 | * new ones and terminate afterwards. | ||
560 | * This method MUST not be called more than once per ConcurrentWorkers. | ||
561 | */ | ||
562 | void Terminate(); | ||
563 | |||
564 | /** | ||
565 | * Waits until the job queue is fully processed | ||
566 | * | ||
567 | * Note: this might lead to undefined behaviour or infinite waiting if other | ||
568 | * producers still schedule jobs into the job queue. | ||
569 | */ | ||
570 | void WaitForEmptyQueue() const; | ||
571 | |||
572 | /** | ||
573 | * Waits until the ConcurrentWorkers swarm fully processed the current job | ||
574 | * queue and shuts down afterwards. | ||
575 | * | ||
576 | * Note: just as for WaitForEmptyQueue() this assumes that no other producers | ||
577 | * schedule jobs in the mean time. | ||
578 | */ | ||
579 | void WaitForTermination(); | ||
580 | |||
581 | ✗ | inline unsigned int GetNumberOfWorkers() const { return number_of_workers_; } | |
582 | ✗ | inline unsigned int GetNumberOfFailedJobs() const { | |
583 | ✗ | return atomic_read32(&jobs_failed_); | |
584 | } | ||
585 | |||
586 | /** | ||
587 | * Defines a job as successfully finished. | ||
588 | * DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT! | ||
589 | * | ||
590 | * @param data the data to be returned back to the user | ||
591 | */ | ||
592 | ✗ | inline void JobSuccessful(const returned_data_t& data) { | |
593 | ✗ | JobDone(data, true); | |
594 | } | ||
595 | |||
596 | /** | ||
597 | * Defines a job as failed. | ||
598 | * DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT! | ||
599 | * | ||
600 | * Note: Even for failed jobs the user will get a callback with a data object. | ||
601 | * You might want to make sure, that this data contains a status flag as | ||
602 | * well, telling the user what went wrong. | ||
603 | * | ||
604 | * @param data the data to be returned back to the user | ||
605 | */ | ||
606 | ✗ | inline void JobFailed(const returned_data_t& data) { JobDone(data, false); } | |
607 | |||
608 | void RunCallbackThread(); | ||
609 | |||
610 | protected: | ||
611 | bool SpawnWorkers(); | ||
612 | |||
613 | /** | ||
614 | * POSIX-conforming function for thread entry point. Is invoked for every new | ||
615 | * worker thread and contains the initialization, processing loop and tear | ||
616 | * down of the unique worker objects | ||
617 | * | ||
618 | * @param run_binding void pointer to a RunBinding structure (C interface) | ||
619 | * @return NULL in any case | ||
620 | */ | ||
621 | static void* RunWorker(void *run_binding); | ||
622 | |||
623 | static void* RunCallbackThreadWrapper(void *run_binding); | ||
624 | |||
625 | /** | ||
626 | * Tells the master that a worker thread did start. This does not mean, that | ||
627 | * it was initialized successfully. | ||
628 | */ | ||
629 | void ReportStartedWorker() const; | ||
630 | |||
631 | void Schedule(WorkerJob job); | ||
632 | void ScheduleDeathSentences(); | ||
633 | |||
634 | /** | ||
635 | * Empties the job queue | ||
636 | * | ||
637 | * @param forget_pending controls if cancelled jobs should be seen as finished | ||
638 | */ | ||
639 | void TruncateJobQueue(const bool forget_pending = false); | ||
640 | |||
641 | /** | ||
642 | * Retrieves a job from the job queue. If the job queue is empty it will block | ||
643 | * until there is a new job available for processing. | ||
644 | * THIS METHOD MUST ONLY BE CALLED INSIDE THE WORKER OBJECTS | ||
645 | * | ||
646 | * @return a job to be processed by a worker | ||
647 | */ | ||
648 | inline WorkerJob Acquire(); | ||
649 | |||
650 | /** | ||
651 | * Controls the asynchronous finishing of a job. | ||
652 | * DO NOT CALL THIS, use JobSuccessful() or JobFailed() wrappers instead. | ||
653 | * | ||
654 | * @param data the data to be returned to the user | ||
655 | * @param success flag if job was successful | ||
656 | */ | ||
657 | void JobDone(const returned_data_t& data, const bool success = true); | ||
658 | |||
659 | ✗ | inline void StartRunning() { | |
660 | ✗ | MutexLockGuard guard(status_mutex_); | |
661 | ✗ | running_ = true; | |
662 | } | ||
663 | ✗ | inline void StopRunning() { | |
664 | ✗ | MutexLockGuard guard(status_mutex_); | |
665 | ✗ | running_ = false; | |
666 | } | ||
667 | ✗ | inline bool IsRunning() const { | |
668 | ✗ | MutexLockGuard guard(status_mutex_); | |
669 | ✗ | return running_; | |
670 | } | ||
671 | |||
672 | private: | ||
673 | // general configuration | ||
674 | const size_t number_of_workers_; //!< number of concurrent worker threads | ||
675 | const worker_context_t *worker_context_; //!< the WorkerT defined context | ||
676 | /** | ||
677 | * The thread context passed to newly spawned threads | ||
678 | */ | ||
679 | WorkerRunBinding thread_context_; | ||
680 | |||
681 | // status information | ||
682 | bool initialized_; | ||
683 | bool running_; | ||
684 | mutable unsigned int workers_started_; | ||
685 | mutable pthread_mutex_t status_mutex_; | ||
686 | mutable pthread_cond_t worker_started_; | ||
687 | mutable pthread_mutex_t jobs_all_done_mutex_; | ||
688 | mutable pthread_cond_t jobs_all_done_; | ||
689 | |||
690 | // worker threads | ||
691 | WorkerThreads worker_threads_; //!< list of worker threads | ||
692 | pthread_t callback_thread_; //!< handles callback invokes | ||
693 | |||
694 | // job queue | ||
695 | typedef FifoChannel<WorkerJob > JobQueue; | ||
696 | JobQueue jobs_queue_; | ||
697 | mutable atomic_int32 jobs_pending_; | ||
698 | mutable atomic_int32 jobs_failed_; | ||
699 | mutable atomic_int64 jobs_processed_; | ||
700 | |||
701 | // callback channel | ||
702 | typedef FifoChannel<CallbackJob > CallbackQueue; | ||
703 | CallbackQueue results_queue_; | ||
704 | }; | ||
705 | |||
706 | |||
707 | /** | ||
708 | * Base class for worker classes that should be used in a ConcurrentWorkers | ||
709 | * swarm. These classes need to fulfill a number of requirements in order to | ||
710 | * satisfy the needs of the ConcurrentWorkers template. | ||
711 | * | ||
712 | * Requirements: | ||
713 | * -> needs to define the following types: | ||
714 | * - expected_data - input data structure of the worker | ||
715 | * - returned_data - output data structure of the worker | ||
716 | * - worker_context - context structure for initialization information | ||
717 | * | ||
718 | * -> implement a constructor that takes a pointer to its worker_context | ||
719 | * as its only parameter: | ||
720 | * AwesomeWorker(const AwesomeWorker::worker_context*) | ||
721 | * Note: do not rely on the context object to be available after the | ||
722 | * constructor has returned! | ||
723 | * | ||
724 | * -> needs to define the calling-operator expecting one parameter of type: | ||
725 | * const expected_data& and returning void | ||
726 | * This will be invoked for every new job the worker should process | ||
727 | * | ||
728 | * -> inside the implementation of the described calling-operator it needs to | ||
729 | * invoke either: | ||
730 | * master()->JobSuccessful(const returned_data&) | ||
731 | * or: | ||
732 | * master()->JobFailed(const returned_data&) | ||
733 | * as its LAST operation before returning. | ||
734 | * This will keep track of finished jobs and inform the user of Concurrent- | ||
735 | * Workers about finished jobs. | ||
736 | * | ||
737 | * -> [optional] override Initialize() and/or TearDown() to do environmental | ||
738 | * setup work, before or respectively after jobs will be processed | ||
739 | * | ||
740 | * General Reminder: | ||
741 | * You will be running in a multi-threaded environment here! Buckle up and | ||
742 | * make suitable preparations to shield yourself from serious head-ache. | ||
743 | * | ||
744 | * Note: This implements a Curiously Recurring Template Pattern | ||
745 | * (http://en.wikipedia.org/wiki/Curiously_recurring_template_pattern) | ||
746 | * | ||
747 | * @param DerivedWorkerT the class name of the inheriting class | ||
748 | * (e.g. class AwesomeWorker : public ConcurrentWorker<AwesomeWorker>) | ||
749 | */ | ||
750 | template <class DerivedWorkerT> | ||
751 | class ConcurrentWorker : SingleCopy { | ||
752 | public: | ||
753 | ✗ | virtual ~ConcurrentWorker() {} | |
754 | |||
755 | /** | ||
756 | * Does general initialization before any jobs will get scheduled. You do not | ||
757 | * need to up-call this initialize method, since it is seen as a dummy here. | ||
758 | * | ||
759 | * @returns true on successful initialization | ||
760 | */ | ||
761 | ✗ | virtual bool Initialize() { return true; } | |
762 | |||
763 | /** | ||
764 | * Does general clean-up after the last job was processed in the worker object | ||
765 | * and it is about to vanish. You do not need to up-call this method. | ||
766 | */ | ||
767 | ✗ | virtual void TearDown() {} | |
768 | |||
769 | /** | ||
770 | * The actual job-processing entry point. See the description of the inheriting | ||
771 | * class requirements to learn about the semantics of this method. | ||
772 | * DO NOT FORGET TO CALL master()->JobSuccessful() OR master()->JobFailed() | ||
773 | * at the end of this method!! | ||
774 | * | ||
775 | * Note: There is no way to generally define this operator, it is therefore | ||
776 | * commented out and placed here just as a reminder. | ||
777 | * | ||
778 | * @param data the data to be processed. | ||
779 | */ | ||
780 | // void operator()(const expected_data &data); // do the actual job of the | ||
781 | // worker | ||
782 | |||
783 | protected: | ||
784 | ✗ | ConcurrentWorker() : master_(NULL) {} | |
785 | |||
786 | /** | ||
787 | * Gets a pointer to the ConcurrentWorkers object that this worker resides in | ||
788 | * | ||
789 | * @returns a pointer to the ConcurrentWorkers object | ||
790 | */ | ||
791 | ✗ | inline ConcurrentWorkers<DerivedWorkerT>* master() const { return master_; } | |
792 | |||
793 | private: | ||
794 | friend class ConcurrentWorkers<DerivedWorkerT>; | ||
795 | ✗ | void RegisterMaster(ConcurrentWorkers<DerivedWorkerT> *master) { | |
796 | ✗ | master_ = master; | |
797 | } | ||
798 | |||
799 | private: | ||
800 | ConcurrentWorkers<DerivedWorkerT> *master_; | ||
801 | }; | ||
802 | |||
803 | #ifdef CVMFS_NAMESPACE_GUARD | ||
804 | } // namespace CVMFS_NAMESPACE_GUARD | ||
805 | #endif | ||
806 | |||
807 | #include "util/concurrency_impl.h" | ||
808 | |||
809 | #endif // CVMFS_UTIL_CONCURRENCY_H_ | ||
810 |