Directory: | cvmfs/ |
---|---|
File: | cvmfs/util/concurrency.h |
Date: | 2025-07-13 02:35:07 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 85 | 115 | 73.9% |
Branches: | 21 | 38 | 55.3% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #ifndef CVMFS_UTIL_CONCURRENCY_H_ | ||
6 | #define CVMFS_UTIL_CONCURRENCY_H_ | ||
7 | |||
8 | #include <pthread.h> | ||
9 | |||
10 | #include <cassert> | ||
11 | #include <cstddef> | ||
12 | #include <queue> | ||
13 | #include <set> | ||
14 | #include <vector> | ||
15 | |||
16 | #include "util/async.h" | ||
17 | #include "util/atomic.h" | ||
18 | #include "util/export.h" | ||
19 | #include "util/mutex.h" | ||
20 | #include "util/single_copy.h" | ||
21 | |||
22 | #ifdef CVMFS_NAMESPACE_GUARD | ||
23 | namespace CVMFS_NAMESPACE_GUARD { | ||
24 | #endif | ||
25 | |||
26 | /** | ||
27 | * A thread-safe, unbounded vector of items that implement a FIFO channel. | ||
28 | * Uses conditional variables to block when threads try to pop from the empty | ||
29 | * channel. | ||
30 | */ | ||
31 | template<class ItemT> | ||
32 | class Channel : SingleCopy { | ||
33 | public: | ||
34 | 71 | Channel() { | |
35 | 71 | int retval = pthread_mutex_init(&lock_, NULL); | |
36 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 71 times.
|
71 | assert(retval == 0); |
37 | 71 | retval = pthread_cond_init(&cond_populated_, NULL); | |
38 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 71 times.
|
71 | assert(retval == 0); |
39 | 71 | } | |
40 | |||
41 | 71 | ~Channel() { | |
42 | 71 | pthread_cond_destroy(&cond_populated_); | |
43 | 71 | pthread_mutex_destroy(&lock_); | |
44 | 71 | } | |
45 | |||
46 | /** | ||
47 | * Returns the queue locked and ready for appending 1 item. | ||
48 | */ | ||
49 | 460000 | std::vector<ItemT *> *StartEnqueueing() { | |
50 | 460000 | int retval = pthread_mutex_lock(&lock_); | |
51 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 460000 times.
|
460000 | assert(retval == 0); |
52 | 460000 | return &items_; | |
53 | } | ||
54 | |||
55 | /** | ||
56 | * Unlocks the queue. The queue must remain unchanged when this is called. | ||
57 | */ | ||
58 | 230000 | void AbortEnqueueing() { | |
59 | 230000 | int retval = pthread_mutex_unlock(&lock_); | |
60 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 230000 times.
|
230000 | assert(retval == 0); |
61 | 230000 | } | |
62 | |||
63 | /** | ||
64 | * 1 new item was added to the queue. Unlock and signal to reader thread. | ||
65 | */ | ||
66 | 230000 | void CommitEnqueueing() { | |
67 | 230000 | int retval = pthread_cond_signal(&cond_populated_); | |
68 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 230000 times.
|
230000 | assert(retval == 0); |
69 | 230000 | retval = pthread_mutex_unlock(&lock_); | |
70 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 230000 times.
|
230000 | assert(retval == 0); |
71 | 230000 | } | |
72 | |||
73 | 45 | void PushBack(ItemT *item) { | |
74 | 45 | MutexLockGuard lock_guard(&lock_); | |
75 |
1/2✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
|
45 | items_.push_back(item); |
76 | 45 | int retval = pthread_cond_signal(&cond_populated_); | |
77 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 45 times.
|
45 | assert(retval == 0); |
78 | 45 | } | |
79 | |||
80 | /** | ||
81 | * Remove and return the first element from the queue. Block if tube is | ||
82 | * empty. | ||
83 | */ | ||
84 | 230045 | ItemT *PopFront() { | |
85 | 230045 | MutexLockGuard lock_guard(&lock_); | |
86 |
2/2✓ Branch 1 taken 35 times.
✓ Branch 2 taken 230045 times.
|
230080 | while (items_.size() == 0) |
87 |
1/2✓ Branch 1 taken 35 times.
✗ Branch 2 not taken.
|
35 | pthread_cond_wait(&cond_populated_, &lock_); |
88 | 230045 | ItemT *item = items_[0]; | |
89 |
1/2✓ Branch 3 taken 230045 times.
✗ Branch 4 not taken.
|
230045 | items_.erase(items_.begin()); |
90 | 230045 | return item; | |
91 | 230045 | } | |
92 | |||
93 | private: | ||
94 | /** | ||
95 | * The locked queue/channel | ||
96 | */ | ||
97 | std::vector<ItemT *> items_; | ||
98 | /** | ||
99 | * Protects all internal state | ||
100 | */ | ||
101 | pthread_mutex_t lock_; | ||
102 | /** | ||
103 | * Signals if there are items enqueued | ||
104 | */ | ||
105 | pthread_cond_t cond_populated_; | ||
106 | }; | ||
107 | |||
108 | /** | ||
109 | * Implements a simple interface to lock objects of derived classes. Classes | ||
110 | * that inherit from Lockable are also usable with the LockGuard template for | ||
111 | * scoped locking semantics. | ||
112 | */ | ||
113 | class CVMFS_EXPORT Lockable : SingleCopy { | ||
114 | public: | ||
115 | 92 | inline virtual ~Lockable() { pthread_mutex_destroy(&mutex_); } | |
116 | |||
117 | 46 | void Lock() const { pthread_mutex_lock(&mutex_); } | |
118 | 92 | int TryLock() const { return pthread_mutex_trylock(&mutex_); } | |
119 | 46 | void Unlock() const { pthread_mutex_unlock(&mutex_); } | |
120 | |||
121 | protected: | ||
122 | 46 | Lockable() { | |
123 | 46 | const int retval = pthread_mutex_init(&mutex_, NULL); | |
124 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 46 times.
|
46 | assert(retval == 0); |
125 | 46 | } | |
126 | |||
127 | private: | ||
128 | mutable pthread_mutex_t mutex_; | ||
129 | }; | ||
130 | |||
131 | |||
132 | // | ||
133 | // ----------------------------------------------------------------------------- | ||
134 | // | ||
135 | |||
136 | |||
137 | /** | ||
138 | * This counter can be counted up and down using the usual increment/decrement | ||
139 | * operators. It allows threads to wait for it to become zero as well as to | ||
140 | * block when a specified maximal value would be exceeded by an increment. | ||
141 | * | ||
142 | * Note: If a maximal value is specified on creation, the SynchronizingCounter | ||
143 | * is assumed to never leave the interval [0, maximal_value]! Otherwise | ||
144 | * the numerical limits of the specified template parameter define this | ||
145 | * interval and an increment _never_ blocks. | ||
146 | * | ||
147 | * Caveat: This implementation uses a simple mutex mechanism and therefore might | ||
148 | * become a scalability bottle neck! | ||
149 | */ | ||
150 | template<typename T> | ||
151 | class SynchronizingCounter : SingleCopy { | ||
152 | public: | ||
153 | 32 | SynchronizingCounter() : value_(T(0)), maximal_value_(T(0)) { Initialize(); } | |
154 | |||
155 | 1983 | explicit SynchronizingCounter(const T maximal_value) | |
156 | 1983 | : value_(T(0)), maximal_value_(maximal_value) { | |
157 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1983 times.
|
1983 | assert(maximal_value > T(0)); |
158 | 1983 | Initialize(); | |
159 | 1983 | } | |
160 | |||
161 | 2014 | ~SynchronizingCounter() { Destroy(); } | |
162 | |||
163 | 287734071 | T Increment() { | |
164 | 287734071 | MutexLockGuard l(mutex_); | |
165 |
1/2✓ Branch 1 taken 292495233 times.
✗ Branch 2 not taken.
|
292495233 | WaitForFreeSlotUnprotected(); |
166 |
1/2✓ Branch 1 taken 3447100 times.
✗ Branch 2 not taken.
|
292495233 | SetValueUnprotected(value_ + T(1)); |
167 | 289333006 | return value_; | |
168 | 292495233 | } | |
169 | |||
170 | 289747857 | T Decrement() { | |
171 | 289747857 | MutexLockGuard l(mutex_); | |
172 |
1/2✓ Branch 1 taken 3447100 times.
✗ Branch 2 not taken.
|
292495233 | SetValueUnprotected(value_ - T(1)); |
173 | 579324322 | return value_; | |
174 | 292495233 | } | |
175 | |||
176 | 2775 | void WaitForZero() const { | |
177 | 2775 | MutexLockGuard l(mutex_); | |
178 |
2/2✓ Branch 0 taken 191 times.
✓ Branch 1 taken 2775 times.
|
2966 | while (value_ != T(0)) { |
179 |
1/2✓ Branch 1 taken 191 times.
✗ Branch 2 not taken.
|
191 | pthread_cond_wait(&became_zero_, &mutex_); |
180 | } | ||
181 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2775 times.
|
2775 | assert(value_ == T(0)); |
182 | 2775 | } | |
183 | |||
184 | 1771712780 | bool HasMaximalValue() const { return maximal_value_ != T(0); } | |
185 | 88 | T maximal_value() const { return maximal_value_; } | |
186 | |||
187 | 3441787 | T operator++() { return Increment(); } | |
188 | 23 | const T operator++(int) { return Increment() - T(1); } | |
189 | 3444075 | T operator--() { return Decrement(); } | |
190 | 23 | const T operator--(int) { return Decrement() + T(1); } | |
191 | |||
192 | 13920 | T Get() const { | |
193 | 13920 | MutexLockGuard l(mutex_); | |
194 | 27840 | return value_; | |
195 | 13920 | } | |
196 | |||
197 | 91 | SynchronizingCounter<T> &operator=(const T &other) { | |
198 | 91 | MutexLockGuard l(mutex_); | |
199 | 91 | SetValueUnprotected(other); | |
200 | 182 | return *this; | |
201 | 91 | } | |
202 | |||
203 | protected: | ||
204 | void SetValueUnprotected(const T new_value); | ||
205 | void WaitForFreeSlotUnprotected(); | ||
206 | |||
207 | private: | ||
208 | void Initialize(); | ||
209 | void Destroy(); | ||
210 | |||
211 | private: | ||
212 | T value_; | ||
213 | const T maximal_value_; | ||
214 | |||
215 | mutable pthread_mutex_t mutex_; | ||
216 | mutable pthread_cond_t became_zero_; | ||
217 | pthread_cond_t free_slot_; | ||
218 | }; | ||
219 | |||
220 | |||
221 | // | ||
222 | // ----------------------------------------------------------------------------- | ||
223 | // | ||
224 | |||
225 | |||
226 | template<typename ParamT> | ||
227 | class Observable; | ||
228 | |||
229 | |||
230 | /** | ||
231 | * This is a base class for classes that need to expose a callback interface for | ||
232 | * asynchronous callback methods. One can register an arbitrary number of | ||
233 | * observers on an Observable that get notified when the method | ||
234 | * NotifyListeners() is invoked. | ||
235 | * | ||
236 | * Note: the registration and invocation of callbacks in Observable is thread- | ||
237 | * safe, but be aware that the callbacks of observing classes might run in | ||
238 | * arbitrary threads. When using these classes, you should take extra care | ||
239 | * for thread-safety. | ||
240 | * | ||
241 | * Note: The RegisterListener() methods return a pointer to a CallbackBase. | ||
242 | * You MUST NOT free these objects, they are managed by the Observable | ||
243 | * class. Use them only as handles to unregister specific callbacks. | ||
244 | * | ||
245 | * @param ParamT the type of the parameter that is passed to every callback | ||
246 | * invocation. | ||
247 | */ | ||
248 | template<typename ParamT> | ||
249 | class Observable : public Callbackable<ParamT>, SingleCopy { | ||
250 | public: | ||
251 | typedef typename Callbackable<ParamT>::CallbackTN *CallbackPtr; | ||
252 | |||
253 | protected: | ||
254 | typedef std::set<CallbackPtr> Callbacks; | ||
255 | |||
256 | public: | ||
257 | virtual ~Observable(); | ||
258 | |||
259 | /** | ||
260 | * Registers a method of a specific object as a listener to the Observable | ||
261 | * object. The method is invoked on the given delegate when the callback is | ||
262 | * fired by the observed object using NotifyListeners(). Since this is meant | ||
263 | * to be a closure, it also passes the third argument to the method being in- | ||
264 | * voked by the Observable object. | ||
265 | * | ||
266 | * @param DelegateT the type of the delegate object | ||
267 | * @param method a pointer to the method to be invoked by the callback | ||
268 | * @param delegate a pointer to the object to invoke the callback on | ||
269 | * @param closure something to be passed to `method` | ||
270 | * @return a handle to the registered callback | ||
271 | */ | ||
272 | template<class DelegateT, class ClosureDataT> | ||
273 | CallbackPtr RegisterListener( | ||
274 | typename BoundClosure<ParamT, DelegateT, ClosureDataT>::CallbackMethod | ||
275 | method, | ||
276 | DelegateT *delegate, | ||
277 | ClosureDataT data); | ||
278 | |||
279 | /** | ||
280 | * Registers a method of a specific object as a listener to the Observable | ||
281 | * object. The method is invoked on the given delegate when the callback is | ||
282 | * fired by the observed object using NotifyListeners(). | ||
283 | * | ||
284 | * @param DelegateT the type of the delegate object | ||
285 | * @param method a pointer to the method to be invoked by the callback | ||
286 | * @param delegate a pointer to the object to invoke the callback on | ||
287 | * @return a handle to the registered callback | ||
288 | */ | ||
289 | template<class DelegateT> | ||
290 | CallbackPtr RegisterListener( | ||
291 | typename BoundCallback<ParamT, DelegateT>::CallbackMethod method, | ||
292 | DelegateT *delegate); | ||
293 | |||
294 | /** | ||
295 | * Registers a static class member or a C-like function as a callback to the | ||
296 | * Observable object. The function is invoked when the callback is fired by | ||
297 | * the observed object using NotifyListeners(). | ||
298 | * | ||
299 | * @param fn a pointer to the function to be called by the callback | ||
300 | * @return a handle to the registered callback | ||
301 | */ | ||
302 | CallbackPtr RegisterListener(typename Callback<ParamT>::CallbackFunction fn); | ||
303 | |||
304 | /** | ||
305 | * Removes the given callback from the listeners group of this Observable. | ||
306 | * | ||
307 | * @param callback_object a callback handle that was returned by | ||
308 | * RegisterListener() before. | ||
309 | */ | ||
310 | void UnregisterListener(CallbackPtr callback_object); | ||
311 | |||
312 | /** | ||
313 | * Removes all listeners from the Observable | ||
314 | */ | ||
315 | void UnregisterListeners(); | ||
316 | |||
317 | protected: | ||
318 | Observable(); // don't instantiate this as a stand alone object | ||
319 | |||
320 | void RegisterListener(CallbackPtr callback_object); | ||
321 | |||
322 | /** | ||
323 | * Notifies all registered listeners and passes them the provided argument | ||
324 | * This method should be called by a derived class to send out asynchronous | ||
325 | * messages to registered observers. | ||
326 | * | ||
327 | * @param parameter the data to be passed to the observers | ||
328 | */ | ||
329 | void NotifyListeners(const ParamT ¶meter); | ||
330 | |||
331 | private: | ||
332 | Callbacks listeners_; //!< the set of registered | ||
333 | //!< callback objects | ||
334 | mutable pthread_rwlock_t listeners_rw_lock_; | ||
335 | }; | ||
336 | |||
337 | |||
338 | // | ||
339 | // ----------------------------------------------------------------------------- | ||
340 | // | ||
341 | |||
342 | |||
343 | /** | ||
344 | * Returns the number of CPU cores present in the system or a fallback number | ||
345 | * if it failed to determine the number of CPU cores. | ||
346 | * | ||
347 | * @return the number of active CPU cores in the system | ||
348 | */ | ||
349 | CVMFS_EXPORT unsigned int GetNumberOfCpuCores(); | ||
350 | static const unsigned int kFallbackNumberOfCpus = 1; | ||
351 | |||
352 | |||
353 | /** | ||
354 | * A blocking signal for thread synchronization | ||
355 | */ | ||
356 | class CVMFS_EXPORT Signal : SingleCopy { | ||
357 | public: | ||
358 | Signal(); | ||
359 | ~Signal(); | ||
360 | void Wakeup(); | ||
361 | void Wait(); | ||
362 | bool IsSleeping(); | ||
363 | |||
364 | private: | ||
365 | bool fired_; | ||
366 | pthread_mutex_t lock_; | ||
367 | pthread_cond_t signal_; | ||
368 | }; | ||
369 | |||
370 | |||
371 | // | ||
372 | // ----------------------------------------------------------------------------- | ||
373 | // | ||
374 | |||
375 | |||
376 | /** | ||
377 | * Asynchronous FIFO channel template | ||
378 | * Implements a thread safe FIFO queue that handles thread blocking if the queue | ||
379 | * is full or empty. | ||
380 | * | ||
381 | * @param T the data type to be enqueued in the queue | ||
382 | */ | ||
383 | template<class T> | ||
384 | class FifoChannel : protected std::queue<T> { | ||
385 | public: | ||
386 | /** | ||
387 | * Creates a new FIFO channel. | ||
388 | * | ||
389 | * @param maximal_length the maximal number of items that can be enqueued | ||
390 | * @param drainout_threshold if less than xx elements are in the queue it is | ||
391 | * considered to be "not full" | ||
392 | */ | ||
393 | FifoChannel(const size_t maximal_length, const size_t drainout_threshold); | ||
394 | virtual ~FifoChannel(); | ||
395 | |||
396 | /** | ||
397 | * Adds a new item to the end of the FIFO channel. If the queue is full, this | ||
398 | * call will block until items were dequeued by another thread allowing the | ||
399 | * desired insertion. | ||
400 | * | ||
401 | * @param data the data to be enqueued into the FIFO channel | ||
402 | */ | ||
403 | void Enqueue(const T &data); | ||
404 | |||
405 | /** | ||
406 | * Removes the next element from the channel. If the queue is empty, this will | ||
407 | * block until another thread enqueues an item into the channel. | ||
408 | * | ||
409 | * @return the first item in the channel queue | ||
410 | */ | ||
411 | const T Dequeue(); | ||
412 | |||
413 | /** | ||
414 | * Clears all items in the FIFO channel. The cleared items will be lost. | ||
415 | * | ||
416 | * @return the number of dropped items | ||
417 | */ | ||
418 | unsigned int Drop(); | ||
419 | |||
420 | inline size_t GetItemCount() const; | ||
421 | inline bool IsEmpty() const; | ||
422 | inline size_t GetMaximalItemCount() const; | ||
423 | |||
424 | private: | ||
425 | // general configuration | ||
426 | const size_t maximal_queue_length_; | ||
427 | const size_t queue_drainout_threshold_; | ||
428 | |||
429 | // thread synchronisation structures | ||
430 | mutable pthread_mutex_t mutex_; | ||
431 | mutable pthread_cond_t queue_is_not_empty_; | ||
432 | mutable pthread_cond_t queue_is_not_full_; | ||
433 | }; | ||
434 | |||
435 | |||
436 | /** | ||
437 | * This template implements a generic producer/consumer approach to concurrent | ||
438 | * worker tasks. It spawns a given number of Workers derived from the base class | ||
439 | * ConcurrentWorker and uses them to distribute the work load onto concurrent | ||
440 | * threads. | ||
441 | * One can have multiple producers, that use Schedule() to post new work into | ||
442 | * a central job queue, which in turn is processed concurrently by the Worker | ||
443 | * objects in multiple threads. Furthermore the template provides an interface | ||
444 | * to control the worker swarm, i.e. to wait for their completion or cancel them | ||
445 | * before all jobs are processed. | ||
446 | * | ||
447 | * Note: A worker is a class inheriting from ConcurrentWorker that needs to meet | ||
448 | * a couple of requirements. See the documentation of ConcurrentWorker for | ||
449 | * additional details. | ||
450 | * | ||
451 | * @param WorkerT the class to be used as a worker for a concurrent worker | ||
452 | * swarm | ||
453 | */ | ||
454 | template<class WorkerT> | ||
455 | class ConcurrentWorkers : public Observable<typename WorkerT::returned_data> { | ||
456 | public: | ||
457 | // these data types must be defined by the worker class | ||
458 | /** | ||
459 | * Input data type | ||
460 | */ | ||
461 | typedef typename WorkerT::expected_data expected_data_t; | ||
462 | /** | ||
463 | * Output data type | ||
464 | */ | ||
465 | typedef typename WorkerT::returned_data returned_data_t; | ||
466 | /** | ||
467 | * Common context type | ||
468 | */ | ||
469 | typedef typename WorkerT::worker_context worker_context_t; | ||
470 | |||
471 | protected: | ||
472 | typedef std::vector<pthread_t> WorkerThreads; | ||
473 | |||
474 | /** | ||
475 | * This is a simple wrapper structure to piggy-back control information on | ||
476 | * scheduled jobs. Job structures are scheduled into a central FIFO queue and | ||
477 | * are then processed concurrently by the workers. | ||
478 | */ | ||
479 | template<class DataT> | ||
480 | struct Job { | ||
481 | ✗ | explicit Job(const DataT &data) : data(data), is_death_sentence(false) { } | |
482 | ✗ | Job() : data(), is_death_sentence(true) { } | |
483 | const DataT data; //!< job payload | ||
484 | const bool is_death_sentence; //!< death sentence flag | ||
485 | }; | ||
486 | typedef Job<expected_data_t> WorkerJob; | ||
487 | typedef Job<returned_data_t> CallbackJob; | ||
488 | |||
489 | /** | ||
490 | * Provides a wrapper for initialization data passed to newly spawned worker | ||
491 | * threads for initialization. | ||
492 | * It contains a pointer to the spawning ConcurrentWorkers master object as | ||
493 | * well as a pointer to a context object defined by the concrete worker to be | ||
494 | * spawned. | ||
495 | */ | ||
496 | struct RunBinding { | ||
497 | ✗ | explicit RunBinding(ConcurrentWorkers<WorkerT> *delegate) | |
498 | ✗ | : delegate(delegate) { } | |
499 | ConcurrentWorkers<WorkerT> *delegate; //!< delegate to the Concurrent- | ||
500 | //!< Workers master | ||
501 | }; | ||
502 | |||
503 | struct WorkerRunBinding : RunBinding { | ||
504 | ✗ | WorkerRunBinding(ConcurrentWorkers<WorkerT> *delegate, | |
505 | const worker_context_t *worker_context) | ||
506 | ✗ | : RunBinding(delegate), worker_context(worker_context) { } | |
507 | /** | ||
508 | * WorkerT defined context objects for worker init. | ||
509 | */ | ||
510 | const worker_context_t *worker_context; | ||
511 | }; | ||
512 | |||
513 | public: | ||
514 | /** | ||
515 | * Creates a ConcurrentWorkers master object that encapsulates the actual | ||
516 | * workers. | ||
517 | * | ||
518 | * @param number_of_workers the number of concurrent workers to be spawned | ||
519 | * @param maximal_queue_length the maximal length of the job queue | ||
520 | * (>= number_of_workers) | ||
521 | * @param worker_context a pointer to the WorkerT defined context | ||
522 | * object | ||
523 | */ | ||
524 | ConcurrentWorkers(const size_t number_of_workers, | ||
525 | const size_t maximal_queue_length, | ||
526 | worker_context_t *worker_context = NULL); | ||
527 | virtual ~ConcurrentWorkers(); | ||
528 | |||
529 | /** | ||
530 | * Initializes the ConcurrentWorkers swarm, spawnes a thread for each new | ||
531 | * worker object and puts everything into working state. | ||
532 | * | ||
533 | * @return true if all went fine | ||
534 | */ | ||
535 | bool Initialize(); | ||
536 | |||
537 | /** | ||
538 | * Schedules a new job for processing into the internal job queue. This method | ||
539 | * will block in case the job queue is already full and wait for an empty | ||
540 | * slot. | ||
541 | * | ||
542 | * @param data the data to be processed | ||
543 | */ | ||
544 | ✗ | inline void Schedule(const expected_data_t &data) { | |
545 | ✗ | Schedule(WorkerJob(data)); | |
546 | } | ||
547 | |||
548 | /** | ||
549 | * Shuts down the ConcurrentWorkers object as well as the encapsulated workers | ||
550 | * as soon as possible. Workers will finish their current job and will termi- | ||
551 | * nate afterwards. If no jobs are scheduled they will simply stop waiting for | ||
552 | * new ones and terminate afterwards. | ||
553 | * This method MUST not be called more than once per ConcurrentWorkers. | ||
554 | */ | ||
555 | void Terminate(); | ||
556 | |||
557 | /** | ||
558 | * Waits until the job queue is fully processed | ||
559 | * | ||
560 | * Note: this might lead to undefined behaviour or infinite waiting if other | ||
561 | * producers still schedule jobs into the job queue. | ||
562 | */ | ||
563 | void WaitForEmptyQueue() const; | ||
564 | |||
565 | /** | ||
566 | * Waits until the ConcurrentWorkers swarm fully processed the current job | ||
567 | * queue and shuts down afterwards. | ||
568 | * | ||
569 | * Note: just as for WaitForEmptyQueue() this assumes that no other producers | ||
570 | * schedule jobs in the mean time. | ||
571 | */ | ||
572 | void WaitForTermination(); | ||
573 | |||
574 | ✗ | inline unsigned int GetNumberOfWorkers() const { return number_of_workers_; } | |
575 | ✗ | inline unsigned int GetNumberOfFailedJobs() const { | |
576 | ✗ | return atomic_read32(&jobs_failed_); | |
577 | } | ||
578 | |||
579 | /** | ||
580 | * Defines a job as successfully finished. | ||
581 | * DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT! | ||
582 | * | ||
583 | * @param data the data to be returned back to the user | ||
584 | */ | ||
585 | ✗ | inline void JobSuccessful(const returned_data_t &data) { | |
586 | ✗ | JobDone(data, true); | |
587 | } | ||
588 | |||
589 | /** | ||
590 | * Defines a job as failed. | ||
591 | * DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT! | ||
592 | * | ||
593 | * Note: Even for failed jobs the user will get a callback with a data object. | ||
594 | * You might want to make sure, that this data contains a status flag as | ||
595 | * well, telling the user what went wrong. | ||
596 | * | ||
597 | * @param data the data to be returned back to the user | ||
598 | */ | ||
599 | ✗ | inline void JobFailed(const returned_data_t &data) { JobDone(data, false); } | |
600 | |||
601 | void RunCallbackThread(); | ||
602 | |||
603 | protected: | ||
604 | bool SpawnWorkers(); | ||
605 | |||
606 | /** | ||
607 | * POSIX conform function for thread entry point. Is invoked for every new | ||
608 | * worker thread and contains the initialization, processing loop and tear | ||
609 | * down of the unique worker objects | ||
610 | * | ||
611 | * @param run_binding void pointer to a RunBinding structure (C interface) | ||
612 | * @return NULL in any case | ||
613 | */ | ||
614 | static void *RunWorker(void *run_binding); | ||
615 | |||
616 | static void *RunCallbackThreadWrapper(void *run_binding); | ||
617 | |||
618 | /** | ||
619 | * Tells the master that a worker thread did start. This does not mean, that | ||
620 | * it was initialized successfully. | ||
621 | */ | ||
622 | void ReportStartedWorker() const; | ||
623 | |||
624 | void Schedule(WorkerJob job); | ||
625 | void ScheduleDeathSentences(); | ||
626 | |||
627 | /** | ||
628 | * Empties the job queue | ||
629 | * | ||
630 | * @param forget_pending controls if cancelled jobs should be seen as | ||
631 | * finished | ||
632 | */ | ||
633 | void TruncateJobQueue(const bool forget_pending = false); | ||
634 | |||
635 | /** | ||
636 | * Retrieves a job from the job queue. If the job queue is empty it will block | ||
637 | * until there is a new job available for processing. | ||
638 | * THIS METHOD MUST ONLY BE CALLED INSIDE THE WORKER OBJECTS | ||
639 | * | ||
640 | * @return a job to be processed by a worker | ||
641 | */ | ||
642 | inline WorkerJob Acquire(); | ||
643 | |||
644 | /** | ||
645 | * Controls the asynchronous finishing of a job. | ||
646 | * DO NOT CALL THIS, use JobSuccessful() or JobFailed() wrappers instead. | ||
647 | * | ||
648 | * @param data the data to be returned to the user | ||
649 | * @param success flag if job was successful | ||
650 | */ | ||
651 | void JobDone(const returned_data_t &data, const bool success = true); | ||
652 | |||
653 | ✗ | inline void StartRunning() { | |
654 | ✗ | MutexLockGuard guard(status_mutex_); | |
655 | ✗ | running_ = true; | |
656 | } | ||
657 | ✗ | inline void StopRunning() { | |
658 | ✗ | MutexLockGuard guard(status_mutex_); | |
659 | ✗ | running_ = false; | |
660 | } | ||
661 | ✗ | inline bool IsRunning() const { | |
662 | ✗ | MutexLockGuard guard(status_mutex_); | |
663 | ✗ | return running_; | |
664 | } | ||
665 | |||
666 | private: | ||
667 | // general configuration | ||
668 | const size_t number_of_workers_; //!< number of concurrent worker threads | ||
669 | const worker_context_t *worker_context_; //!< the WorkerT defined context | ||
670 | /** | ||
671 | * The thread context passed to newly spawned threads | ||
672 | */ | ||
673 | WorkerRunBinding thread_context_; | ||
674 | |||
675 | // status information | ||
676 | bool initialized_; | ||
677 | bool running_; | ||
678 | mutable unsigned int workers_started_; | ||
679 | mutable pthread_mutex_t status_mutex_; | ||
680 | mutable pthread_cond_t worker_started_; | ||
681 | mutable pthread_mutex_t jobs_all_done_mutex_; | ||
682 | mutable pthread_cond_t jobs_all_done_; | ||
683 | |||
684 | // worker threads | ||
685 | WorkerThreads worker_threads_; //!< list of worker threads | ||
686 | pthread_t callback_thread_; //!< handles callback invokes | ||
687 | |||
688 | // job queue | ||
689 | typedef FifoChannel<WorkerJob> JobQueue; | ||
690 | JobQueue jobs_queue_; | ||
691 | mutable atomic_int32 jobs_pending_; | ||
692 | mutable atomic_int32 jobs_failed_; | ||
693 | mutable atomic_int64 jobs_processed_; | ||
694 | |||
695 | // callback channel | ||
696 | typedef FifoChannel<CallbackJob> CallbackQueue; | ||
697 | CallbackQueue results_queue_; | ||
698 | }; | ||
699 | |||
700 | |||
701 | /** | ||
702 | * Base class for worker classes that should be used in a ConcurrentWorkers | ||
703 | * swarm. These classes need to fulfill a number of requirements in order to | ||
704 | * satisfy the needs of the ConcurrentWorkers template. | ||
705 | * | ||
706 | * Requirements: | ||
707 | * -> needs to define the following types: | ||
708 | * - expected_data - input data structure of the worker | ||
709 | * - returned_data - output data structure of the worker | ||
710 | * - worker_context - context structure for initialization information | ||
711 | * | ||
712 | * -> implement a constructor that takes a pointer to its worker_context | ||
713 | * as its only parameter: | ||
714 | * AwesomeWorker(const AwesomeWorker::worker_context*) | ||
715 | * Note: do not rely on the context object to be available after the | ||
716 | * constructor has returned! | ||
717 | * | ||
718 | * -> needs to define the calling-operator expecting one parameter of type: | ||
719 | * const expected_data& and returning void | ||
720 | * This will be invoked for every new job the worker should process | ||
721 | * | ||
722 | * -> inside the implementation of the described calling-operator it needs to | ||
723 | * invoke either: | ||
724 | * master()->JobSuccessful(const returned_data&) | ||
725 | * or: | ||
726 | * master()->JobFailed(const returned_data&) | ||
727 | * as its LAST operation before returning. | ||
728 | * This will keep track of finished jobs and inform the user of Concurrent- | ||
729 | * Workers about finished jobs. | ||
730 | * | ||
731 | * -> [optional] overwrite Initialize() and/or TearDown() to do environmental | ||
732 | * setup work, before or respectively after jobs will be processed | ||
733 | * | ||
734 | * General Reminder: | ||
735 | * You will be running in a multi-threaded environment here! Buckle up and | ||
736 | * make suitable preparations to shield yourself from serious head-ache. | ||
737 | * | ||
738 | * Note: This implements a Curiously Recurring Template Pattern | ||
739 | * (http://en.wikipedia.org/wiki/Curiously_recurring_template_pattern) | ||
740 | * | ||
741 | * @param DerivedWorkerT the class name of the inheriting class | ||
742 | * (f.e. class AwesomeWorker : public ConcurrentWorker<AwesomeWorker>) | ||
743 | */ | ||
744 | template<class DerivedWorkerT> | ||
745 | class ConcurrentWorker : SingleCopy { | ||
746 | public: | ||
747 | ✗ | virtual ~ConcurrentWorker() { } | |
748 | |||
749 | /** | ||
750 | * Does general initialization before any jobs will get scheduled. You do not | ||
751 | * need to up-call this initialize method, since it is seen as a dummy here. | ||
752 | * | ||
753 | * @returns true one successful initialization | ||
754 | */ | ||
755 | ✗ | virtual bool Initialize() { return true; } | |
756 | |||
757 | /** | ||
758 | * Does general clean-up after the last job was processed in the worker object | ||
759 | * and it is about to vanish. You do not need to up-call this method. | ||
760 | */ | ||
761 | ✗ | virtual void TearDown() { } | |
762 | |||
763 | /** | ||
764 | * The actual job-processing entry point. See the description of the | ||
765 | * inheriting class requirements to learn about the semantics of this methods. | ||
766 | * DO NOT FORGET TO CALL master()->JobSuccessful() OR master()->JobFinished() | ||
767 | * at the end of thismethod!! | ||
768 | * | ||
769 | * Note: There is no way to generally define this operator, it is therefore | ||
770 | * commented out and placed here just as a reminder. | ||
771 | * | ||
772 | * @param data the data to be processed. | ||
773 | */ | ||
774 | // void operator()(const expected_data &data); // do the actual job of the | ||
775 | // worker | ||
776 | |||
777 | protected: | ||
778 | ✗ | ConcurrentWorker() : master_(NULL) { } | |
779 | |||
780 | /** | ||
781 | * Gets a pointer to the ConcurrentWorkers object that this worker resides in | ||
782 | * | ||
783 | * @returns a pointer to the ConcurrentWorkers object | ||
784 | */ | ||
785 | ✗ | inline ConcurrentWorkers<DerivedWorkerT> *master() const { return master_; } | |
786 | |||
787 | private: | ||
788 | friend class ConcurrentWorkers<DerivedWorkerT>; | ||
789 | ✗ | void RegisterMaster(ConcurrentWorkers<DerivedWorkerT> *master) { | |
790 | ✗ | master_ = master; | |
791 | } | ||
792 | |||
793 | private: | ||
794 | ConcurrentWorkers<DerivedWorkerT> *master_; | ||
795 | }; | ||
796 | |||
797 | #ifdef CVMFS_NAMESPACE_GUARD | ||
798 | } // namespace CVMFS_NAMESPACE_GUARD | ||
799 | #endif | ||
800 | |||
801 | #include "util/concurrency_impl.h" | ||
802 | |||
803 | #endif // CVMFS_UTIL_CONCURRENCY_H_ | ||
804 |