Directory: | cvmfs/ |
---|---|
File: | cvmfs/util/concurrency.h |
Date: | 2025-06-22 02:36:02 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 85 | 115 | 73.9% |
Branches: | 21 | 38 | 55.3% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #ifndef CVMFS_UTIL_CONCURRENCY_H_ | ||
6 | #define CVMFS_UTIL_CONCURRENCY_H_ | ||
7 | |||
8 | #include <pthread.h> | ||
9 | |||
10 | #include <cassert> | ||
11 | #include <queue> | ||
12 | #include <set> | ||
13 | #include <vector> | ||
14 | |||
15 | #include "util/async.h" | ||
16 | #include "util/atomic.h" | ||
17 | #include "util/export.h" | ||
18 | #include "util/mutex.h" | ||
19 | #include "util/single_copy.h" | ||
20 | |||
21 | #ifdef CVMFS_NAMESPACE_GUARD | ||
22 | namespace CVMFS_NAMESPACE_GUARD { | ||
23 | #endif | ||
24 | |||
25 | /** | ||
26 | * A thread-safe, unbounded vector of items that implements a FIFO channel. | ||
27 | * Uses condition variables to block when threads try to pop from the empty | ||
28 | * channel. | ||
29 | */ | ||
30 | template<class ItemT> | ||
31 | class Channel : SingleCopy { | ||
32 | public: | ||
33 | 90 | Channel() { | |
34 | 90 | int retval = pthread_mutex_init(&lock_, NULL); | |
35 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 90 times.
|
90 | assert(retval == 0); |
36 | 90 | retval = pthread_cond_init(&cond_populated_, NULL); | |
37 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 90 times.
|
90 | assert(retval == 0); |
38 | 90 | } | |
39 | |||
40 | 90 | ~Channel() { | |
41 | 90 | pthread_cond_destroy(&cond_populated_); | |
42 | 90 | pthread_mutex_destroy(&lock_); | |
43 | 90 | } | |
44 | |||
45 | /** | ||
46 | * Returns the queue locked and ready for appending 1 item. | ||
47 | */ | ||
48 | 200000 | std::vector<ItemT *> *StartEnqueueing() { | |
49 | 200000 | int retval = pthread_mutex_lock(&lock_); | |
50 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 200000 times.
|
200000 | assert(retval == 0); |
51 | 200000 | return &items_; | |
52 | } | ||
53 | |||
54 | /** | ||
55 | * Unlocks the queue. The queue must remain unchanged when this is called. | ||
56 | */ | ||
57 | 100000 | void AbortEnqueueing() { | |
58 | 100000 | int retval = pthread_mutex_unlock(&lock_); | |
59 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 100000 times.
|
100000 | assert(retval == 0); |
60 | 100000 | } | |
61 | |||
62 | /** | ||
63 | * 1 new item was added to the queue. Unlock and signal to reader thread. | ||
64 | */ | ||
65 | 100000 | void CommitEnqueueing() { | |
66 | 100000 | int retval = pthread_cond_signal(&cond_populated_); | |
67 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 100000 times.
|
100000 | assert(retval == 0); |
68 | 100000 | retval = pthread_mutex_unlock(&lock_); | |
69 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 100000 times.
|
100000 | assert(retval == 0); |
70 | 100000 | } | |
71 | |||
72 | 126 | void PushBack(ItemT *item) { | |
73 | 126 | MutexLockGuard lock_guard(&lock_); | |
74 |
1/2✓ Branch 1 taken 126 times.
✗ Branch 2 not taken.
|
126 | items_.push_back(item); |
75 | 126 | int retval = pthread_cond_signal(&cond_populated_); | |
76 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 126 times.
|
126 | assert(retval == 0); |
77 | 126 | } | |
78 | |||
79 | /** | ||
80 | * Remove and return the first element from the queue. Block if the channel | ||
81 | * is empty. | ||
82 | */ | ||
83 | 100126 | ItemT *PopFront() { | |
84 | 100126 | MutexLockGuard lock_guard(&lock_); | |
85 |
2/2✓ Branch 1 taken 98 times.
✓ Branch 2 taken 100126 times.
|
100224 | while (items_.size() == 0) |
86 |
1/2✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
|
98 | pthread_cond_wait(&cond_populated_, &lock_); |
87 | 100126 | ItemT *item = items_[0]; | |
88 |
1/2✓ Branch 3 taken 100126 times.
✗ Branch 4 not taken.
|
100126 | items_.erase(items_.begin()); |
89 | 100126 | return item; | |
90 | 100126 | } | |
91 | |||
92 | private: | ||
93 | /** | ||
94 | * The locked queue/channel | ||
95 | */ | ||
96 | std::vector<ItemT *> items_; | ||
97 | /** | ||
98 | * Protects all internal state | ||
99 | */ | ||
100 | pthread_mutex_t lock_; | ||
101 | /** | ||
102 | * Signals if there are items enqueued | ||
103 | */ | ||
104 | pthread_cond_t cond_populated_; | ||
105 | }; | ||
106 | |||
107 | /** | ||
108 | * Implements a simple interface to lock objects of derived classes. Classes | ||
109 | * that inherit from Lockable are also usable with the LockGuard template for | ||
110 | * scoped locking semantics. | ||
111 | */ | ||
112 | class CVMFS_EXPORT Lockable : SingleCopy { | ||
113 | public: | ||
114 | 40 | inline virtual ~Lockable() { pthread_mutex_destroy(&mutex_); } | |
115 | |||
116 | 20 | void Lock() const { pthread_mutex_lock(&mutex_); } | |
117 | 40 | int TryLock() const { return pthread_mutex_trylock(&mutex_); } | |
118 | 20 | void Unlock() const { pthread_mutex_unlock(&mutex_); } | |
119 | |||
120 | protected: | ||
121 | 20 | Lockable() { | |
122 | 20 | const int retval = pthread_mutex_init(&mutex_, NULL); | |
123 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 20 times.
|
20 | assert(retval == 0); |
124 | 20 | } | |
125 | |||
126 | private: | ||
127 | mutable pthread_mutex_t mutex_; | ||
128 | }; | ||
129 | |||
130 | |||
131 | // | ||
132 | // ----------------------------------------------------------------------------- | ||
133 | // | ||
134 | |||
135 | |||
136 | /** | ||
137 | * This counter can be counted up and down using the usual increment/decrement | ||
138 | * operators. It allows threads to wait for it to become zero as well as to | ||
139 | * block when a specified maximal value would be exceeded by an increment. | ||
140 | * | ||
141 | * Note: If a maximal value is specified on creation, the SynchronizingCounter | ||
142 | * is assumed to never leave the interval [0, maximal_value]! Otherwise | ||
143 | * the numerical limits of the specified template parameter define this | ||
144 | * interval and an increment _never_ blocks. | ||
145 | * | ||
146 | * Caveat: This implementation uses a simple mutex mechanism and therefore might | ||
147 | * become a scalability bottleneck! | ||
148 | */ | ||
149 | template<typename T> | ||
150 | class SynchronizingCounter : SingleCopy { | ||
151 | public: | ||
152 | 72 | SynchronizingCounter() : value_(T(0)), maximal_value_(T(0)) { Initialize(); } | |
153 | |||
154 | 2890 | explicit SynchronizingCounter(const T maximal_value) | |
155 | 2890 | : value_(T(0)), maximal_value_(maximal_value) { | |
156 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2890 times.
|
2890 | assert(maximal_value > T(0)); |
157 | 2890 | Initialize(); | |
158 | 2890 | } | |
159 | |||
160 | 2961 | ~SynchronizingCounter() { Destroy(); } | |
161 | |||
162 | 643686918 | T Increment() { | |
163 | 643686918 | MutexLockGuard l(mutex_); | |
164 |
1/2✓ Branch 1 taken 651562387 times.
✗ Branch 2 not taken.
|
651562387 | WaitForFreeSlotUnprotected(); |
165 |
1/2✓ Branch 1 taken 1204444 times.
✗ Branch 2 not taken.
|
651562387 | SetValueUnprotected(value_ + T(1)); |
166 | 644772101 | return value_; | |
167 | 651562387 | } | |
168 | |||
169 | 644477857 | T Decrement() { | |
170 | 644477857 | MutexLockGuard l(mutex_); | |
171 |
1/2✓ Branch 1 taken 1204444 times.
✗ Branch 2 not taken.
|
651562387 | SetValueUnprotected(value_ - T(1)); |
172 | 1295706739 | return value_; | |
173 | 651562387 | } | |
174 | |||
175 | 3829 | void WaitForZero() const { | |
176 | 3829 | MutexLockGuard l(mutex_); | |
177 |
2/2✓ Branch 0 taken 548 times.
✓ Branch 1 taken 3829 times.
|
4377 | while (value_ != T(0)) { |
178 |
1/2✓ Branch 1 taken 548 times.
✗ Branch 2 not taken.
|
548 | pthread_cond_wait(&became_zero_, &mutex_); |
179 | } | ||
180 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3829 times.
|
3829 | assert(value_ == T(0)); |
181 | 3829 | } | |
182 | |||
183 | 3912890192 | bool HasMaximalValue() const { return maximal_value_ != T(0); } | |
184 | 8 | T maximal_value() const { return maximal_value_; } | |
185 | |||
186 | 1195565 | T operator++() { return Increment(); } | |
187 | 28 | const T operator++(int) { return Increment() - T(1); } | |
188 | 1195905 | T operator--() { return Decrement(); } | |
189 | 28 | const T operator--(int) { return Decrement() + T(1); } | |
190 | |||
191 | 38364 | T Get() const { | |
192 | 38364 | MutexLockGuard l(mutex_); | |
193 | 76728 | return value_; | |
194 | 38364 | } | |
195 | |||
196 | 86 | SynchronizingCounter<T> &operator=(const T &other) { | |
197 | 86 | MutexLockGuard l(mutex_); | |
198 | 86 | SetValueUnprotected(other); | |
199 | 172 | return *this; | |
200 | 86 | } | |
201 | |||
202 | protected: | ||
203 | void SetValueUnprotected(const T new_value); | ||
204 | void WaitForFreeSlotUnprotected(); | ||
205 | |||
206 | private: | ||
207 | void Initialize(); | ||
208 | void Destroy(); | ||
209 | |||
210 | private: | ||
211 | T value_; | ||
212 | const T maximal_value_; | ||
213 | |||
214 | mutable pthread_mutex_t mutex_; | ||
215 | mutable pthread_cond_t became_zero_; | ||
216 | pthread_cond_t free_slot_; | ||
217 | }; | ||
218 | |||
219 | |||
220 | // | ||
221 | // ----------------------------------------------------------------------------- | ||
222 | // | ||
223 | |||
224 | |||
225 | template<typename ParamT> | ||
226 | class Observable; | ||
227 | |||
228 | |||
229 | /** | ||
230 | * This is a base class for classes that need to expose a callback interface for | ||
231 | * asynchronous callback methods. One can register an arbitrary number of | ||
232 | * observers on an Observable that get notified when the method | ||
233 | * NotifyListeners() is invoked. | ||
234 | * | ||
235 | * Note: the registration and invocation of callbacks in Observable is thread- | ||
236 | * safe, but be aware that the callbacks of observing classes might run in | ||
237 | * arbitrary threads. When using these classes, you should take extra care | ||
238 | * for thread-safety. | ||
239 | * | ||
240 | * Note: The RegisterListener() methods return a pointer to a CallbackBase. | ||
241 | * You MUST NOT free these objects, they are managed by the Observable | ||
242 | * class. Use them only as handles to unregister specific callbacks. | ||
243 | * | ||
244 | * @param ParamT the type of the parameter that is passed to every callback | ||
245 | * invocation. | ||
246 | */ | ||
247 | template<typename ParamT> | ||
248 | class Observable : public Callbackable<ParamT>, SingleCopy { | ||
249 | public: | ||
250 | typedef typename Callbackable<ParamT>::CallbackTN *CallbackPtr; | ||
251 | |||
252 | protected: | ||
253 | typedef std::set<CallbackPtr> Callbacks; | ||
254 | |||
255 | public: | ||
256 | virtual ~Observable(); | ||
257 | |||
258 | /** | ||
259 | * Registers a method of a specific object as a listener to the Observable | ||
260 | * object. The method is invoked on the given delegate when the callback is | ||
261 | * fired by the observed object using NotifyListeners(). Since this is meant | ||
262 | * to be a closure, it also passes the third argument to the method being in- | ||
263 | * voked by the Observable object. | ||
264 | * | ||
265 | * @param DelegateT the type of the delegate object | ||
266 | * @param method a pointer to the method to be invoked by the callback | ||
267 | * @param delegate a pointer to the object to invoke the callback on | ||
268 | * @param closure something to be passed to `method` | ||
269 | * @return a handle to the registered callback | ||
270 | */ | ||
271 | template<class DelegateT, class ClosureDataT> | ||
272 | CallbackPtr RegisterListener( | ||
273 | typename BoundClosure<ParamT, DelegateT, ClosureDataT>::CallbackMethod | ||
274 | method, | ||
275 | DelegateT *delegate, | ||
276 | ClosureDataT data); | ||
277 | |||
278 | /** | ||
279 | * Registers a method of a specific object as a listener to the Observable | ||
280 | * object. The method is invoked on the given delegate when the callback is | ||
281 | * fired by the observed object using NotifyListeners(). | ||
282 | * | ||
283 | * @param DelegateT the type of the delegate object | ||
284 | * @param method a pointer to the method to be invoked by the callback | ||
285 | * @param delegate a pointer to the object to invoke the callback on | ||
286 | * @return a handle to the registered callback | ||
287 | */ | ||
288 | template<class DelegateT> | ||
289 | CallbackPtr RegisterListener( | ||
290 | typename BoundCallback<ParamT, DelegateT>::CallbackMethod method, | ||
291 | DelegateT *delegate); | ||
292 | |||
293 | /** | ||
294 | * Registers a static class member or a C-like function as a callback to the | ||
295 | * Observable object. The function is invoked when the callback is fired by | ||
296 | * the observed object using NotifyListeners(). | ||
297 | * | ||
298 | * @param fn a pointer to the function to be called by the callback | ||
299 | * @return a handle to the registered callback | ||
300 | */ | ||
301 | CallbackPtr RegisterListener(typename Callback<ParamT>::CallbackFunction fn); | ||
302 | |||
303 | /** | ||
304 | * Removes the given callback from the listeners group of this Observable. | ||
305 | * | ||
306 | * @param callback_object a callback handle that was returned by | ||
307 | * RegisterListener() before. | ||
308 | */ | ||
309 | void UnregisterListener(CallbackPtr callback_object); | ||
310 | |||
311 | /** | ||
312 | * Removes all listeners from the Observable | ||
313 | */ | ||
314 | void UnregisterListeners(); | ||
315 | |||
316 | protected: | ||
317 | Observable(); // don't instantiate this as a stand alone object | ||
318 | |||
319 | void RegisterListener(CallbackPtr callback_object); | ||
320 | |||
321 | /** | ||
322 | * Notifies all registered listeners and passes them the provided argument | ||
323 | * This method should be called by a derived class to send out asynchronous | ||
324 | * messages to registered observers. | ||
325 | * | ||
326 | * @param parameter the data to be passed to the observers | ||
327 | */ | ||
328 | void NotifyListeners(const ParamT &parameter); | ||
329 | |||
330 | private: | ||
331 | Callbacks listeners_; //!< the set of registered | ||
332 | //!< callback objects | ||
333 | mutable pthread_rwlock_t listeners_rw_lock_; | ||
334 | }; | ||
335 | |||
336 | |||
337 | // | ||
338 | // ----------------------------------------------------------------------------- | ||
339 | // | ||
340 | |||
341 | |||
342 | /** | ||
343 | * Returns the number of CPU cores present in the system or a fallback number | ||
344 | * if it failed to determine the number of CPU cores. | ||
345 | * | ||
346 | * @return the number of active CPU cores in the system | ||
347 | */ | ||
348 | CVMFS_EXPORT unsigned int GetNumberOfCpuCores(); | ||
349 | static const unsigned int kFallbackNumberOfCpus = 1; | ||
350 | |||
351 | |||
352 | /** | ||
353 | * A blocking signal for thread synchronization | ||
354 | */ | ||
355 | class CVMFS_EXPORT Signal : SingleCopy { | ||
356 | public: | ||
357 | Signal(); | ||
358 | ~Signal(); | ||
359 | void Wakeup(); | ||
360 | void Wait(); | ||
361 | bool IsSleeping(); | ||
362 | |||
363 | private: | ||
364 | bool fired_; | ||
365 | pthread_mutex_t lock_; | ||
366 | pthread_cond_t signal_; | ||
367 | }; | ||
368 | |||
369 | |||
370 | // | ||
371 | // ----------------------------------------------------------------------------- | ||
372 | // | ||
373 | |||
374 | |||
375 | /** | ||
376 | * Asynchronous FIFO channel template | ||
377 | * Implements a thread safe FIFO queue that handles thread blocking if the queue | ||
378 | * is full or empty. | ||
379 | * | ||
380 | * @param T the data type to be enqueued in the queue | ||
381 | */ | ||
382 | template<class T> | ||
383 | class FifoChannel : protected std::queue<T> { | ||
384 | public: | ||
385 | /** | ||
386 | * Creates a new FIFO channel. | ||
387 | * | ||
388 | * @param maximal_length the maximal number of items that can be enqueued | ||
389 | * @param drainout_threshold if fewer than this many elements are in the queue, | ||
390 | * it is considered to be "not full" | ||
391 | */ | ||
392 | FifoChannel(const size_t maximal_length, const size_t drainout_threshold); | ||
393 | virtual ~FifoChannel(); | ||
394 | |||
395 | /** | ||
396 | * Adds a new item to the end of the FIFO channel. If the queue is full, this | ||
397 | * call will block until items were dequeued by another thread allowing the | ||
398 | * desired insertion. | ||
399 | * | ||
400 | * @param data the data to be enqueued into the FIFO channel | ||
401 | */ | ||
402 | void Enqueue(const T &data); | ||
403 | |||
404 | /** | ||
405 | * Removes the next element from the channel. If the queue is empty, this will | ||
406 | * block until another thread enqueues an item into the channel. | ||
407 | * | ||
408 | * @return the first item in the channel queue | ||
409 | */ | ||
410 | const T Dequeue(); | ||
411 | |||
412 | /** | ||
413 | * Clears all items in the FIFO channel. The cleared items will be lost. | ||
414 | * | ||
415 | * @return the number of dropped items | ||
416 | */ | ||
417 | unsigned int Drop(); | ||
418 | |||
419 | inline size_t GetItemCount() const; | ||
420 | inline bool IsEmpty() const; | ||
421 | inline size_t GetMaximalItemCount() const; | ||
422 | |||
423 | private: | ||
424 | // general configuration | ||
425 | const size_t maximal_queue_length_; | ||
426 | const size_t queue_drainout_threshold_; | ||
427 | |||
428 | // thread synchronisation structures | ||
429 | mutable pthread_mutex_t mutex_; | ||
430 | mutable pthread_cond_t queue_is_not_empty_; | ||
431 | mutable pthread_cond_t queue_is_not_full_; | ||
432 | }; | ||
433 | |||
434 | |||
435 | /** | ||
436 | * This template implements a generic producer/consumer approach to concurrent | ||
437 | * worker tasks. It spawns a given number of Workers derived from the base class | ||
438 | * ConcurrentWorker and uses them to distribute the work load onto concurrent | ||
439 | * threads. | ||
440 | * One can have multiple producers, that use Schedule() to post new work into | ||
441 | * a central job queue, which in turn is processed concurrently by the Worker | ||
442 | * objects in multiple threads. Furthermore the template provides an interface | ||
443 | * to control the worker swarm, i.e. to wait for their completion or cancel them | ||
444 | * before all jobs are processed. | ||
445 | * | ||
446 | * Note: A worker is a class inheriting from ConcurrentWorker that needs to meet | ||
447 | * a couple of requirements. See the documentation of ConcurrentWorker for | ||
448 | * additional details. | ||
449 | * | ||
450 | * @param WorkerT the class to be used as a worker for a concurrent worker | ||
451 | * swarm | ||
452 | */ | ||
453 | template<class WorkerT> | ||
454 | class ConcurrentWorkers : public Observable<typename WorkerT::returned_data> { | ||
455 | public: | ||
456 | // these data types must be defined by the worker class | ||
457 | /** | ||
458 | * Input data type | ||
459 | */ | ||
460 | typedef typename WorkerT::expected_data expected_data_t; | ||
461 | /** | ||
462 | * Output data type | ||
463 | */ | ||
464 | typedef typename WorkerT::returned_data returned_data_t; | ||
465 | /** | ||
466 | * Common context type | ||
467 | */ | ||
468 | typedef typename WorkerT::worker_context worker_context_t; | ||
469 | |||
470 | protected: | ||
471 | typedef std::vector<pthread_t> WorkerThreads; | ||
472 | |||
473 | /** | ||
474 | * This is a simple wrapper structure to piggy-back control information on | ||
475 | * scheduled jobs. Job structures are scheduled into a central FIFO queue and | ||
476 | * are then processed concurrently by the workers. | ||
477 | */ | ||
478 | template<class DataT> | ||
479 | struct Job { | ||
480 | ✗ | explicit Job(const DataT &data) : data(data), is_death_sentence(false) { } | |
481 | ✗ | Job() : data(), is_death_sentence(true) { } | |
482 | const DataT data; //!< job payload | ||
483 | const bool is_death_sentence; //!< death sentence flag | ||
484 | }; | ||
485 | typedef Job<expected_data_t> WorkerJob; | ||
486 | typedef Job<returned_data_t> CallbackJob; | ||
487 | |||
488 | /** | ||
489 | * Provides a wrapper for initialization data passed to newly spawned worker | ||
490 | * threads for initialization. | ||
491 | * It contains a pointer to the spawning ConcurrentWorkers master object as | ||
492 | * well as a pointer to a context object defined by the concrete worker to be | ||
493 | * spawned. | ||
494 | */ | ||
495 | struct RunBinding { | ||
496 | ✗ | explicit RunBinding(ConcurrentWorkers<WorkerT> *delegate) | |
497 | ✗ | : delegate(delegate) { } | |
498 | ConcurrentWorkers<WorkerT> *delegate; //!< delegate to the Concurrent- | ||
499 | //!< Workers master | ||
500 | }; | ||
501 | |||
502 | struct WorkerRunBinding : RunBinding { | ||
503 | ✗ | WorkerRunBinding(ConcurrentWorkers<WorkerT> *delegate, | |
504 | const worker_context_t *worker_context) | ||
505 | ✗ | : RunBinding(delegate), worker_context(worker_context) { } | |
506 | /** | ||
507 | * WorkerT defined context objects for worker init. | ||
508 | */ | ||
509 | const worker_context_t *worker_context; | ||
510 | }; | ||
511 | |||
512 | public: | ||
513 | /** | ||
514 | * Creates a ConcurrentWorkers master object that encapsulates the actual | ||
515 | * workers. | ||
516 | * | ||
517 | * @param number_of_workers the number of concurrent workers to be spawned | ||
518 | * @param maximal_queue_length the maximal length of the job queue | ||
519 | * (>= number_of_workers) | ||
520 | * @param worker_context a pointer to the WorkerT defined context | ||
521 | * object | ||
522 | */ | ||
523 | ConcurrentWorkers(const size_t number_of_workers, | ||
524 | const size_t maximal_queue_length, | ||
525 | worker_context_t *worker_context = NULL); | ||
526 | virtual ~ConcurrentWorkers(); | ||
527 | |||
528 | /** | ||
529 | * Initializes the ConcurrentWorkers swarm, spawns a thread for each new | ||
530 | * worker object and puts everything into working state. | ||
531 | * | ||
532 | * @return true if all went fine | ||
533 | */ | ||
534 | bool Initialize(); | ||
535 | |||
536 | /** | ||
537 | * Schedules a new job for processing into the internal job queue. This method | ||
538 | * will block in case the job queue is already full and wait for an empty | ||
539 | * slot. | ||
540 | * | ||
541 | * @param data the data to be processed | ||
542 | */ | ||
543 | ✗ | inline void Schedule(const expected_data_t &data) { | |
544 | ✗ | Schedule(WorkerJob(data)); | |
545 | } | ||
546 | |||
547 | /** | ||
548 | * Shuts down the ConcurrentWorkers object as well as the encapsulated workers | ||
549 | * as soon as possible. Workers will finish their current job and will termi- | ||
550 | * nate afterwards. If no jobs are scheduled they will simply stop waiting for | ||
551 | * new ones and terminate afterwards. | ||
552 | * This method MUST NOT be called more than once per ConcurrentWorkers. | ||
553 | */ | ||
554 | void Terminate(); | ||
555 | |||
556 | /** | ||
557 | * Waits until the job queue is fully processed | ||
558 | * | ||
559 | * Note: this might lead to undefined behaviour or infinite waiting if other | ||
560 | * producers still schedule jobs into the job queue. | ||
561 | */ | ||
562 | void WaitForEmptyQueue() const; | ||
563 | |||
564 | /** | ||
565 | * Waits until the ConcurrentWorkers swarm fully processed the current job | ||
566 | * queue and shuts down afterwards. | ||
567 | * | ||
568 | * Note: just as for WaitForEmptyQueue() this assumes that no other producers | ||
569 | * schedule jobs in the mean time. | ||
570 | */ | ||
571 | void WaitForTermination(); | ||
572 | |||
573 | ✗ | inline unsigned int GetNumberOfWorkers() const { return number_of_workers_; } | |
574 | ✗ | inline unsigned int GetNumberOfFailedJobs() const { | |
575 | ✗ | return atomic_read32(&jobs_failed_); | |
576 | } | ||
577 | |||
578 | /** | ||
579 | * Defines a job as successfully finished. | ||
580 | * DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT! | ||
581 | * | ||
582 | * @param data the data to be returned back to the user | ||
583 | */ | ||
584 | ✗ | inline void JobSuccessful(const returned_data_t &data) { | |
585 | ✗ | JobDone(data, true); | |
586 | } | ||
587 | |||
588 | /** | ||
589 | * Defines a job as failed. | ||
590 | * DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT! | ||
591 | * | ||
592 | * Note: Even for failed jobs the user will get a callback with a data object. | ||
593 | * You might want to make sure, that this data contains a status flag as | ||
594 | * well, telling the user what went wrong. | ||
595 | * | ||
596 | * @param data the data to be returned back to the user | ||
597 | */ | ||
598 | ✗ | inline void JobFailed(const returned_data_t &data) { JobDone(data, false); } | |
599 | |||
600 | void RunCallbackThread(); | ||
601 | |||
602 | protected: | ||
603 | bool SpawnWorkers(); | ||
604 | |||
605 | /** | ||
606 | * POSIX-conforming function for thread entry point. Is invoked for every new | ||
607 | * worker thread and contains the initialization, processing loop and tear | ||
608 | * down of the unique worker objects. | ||
609 | * | ||
610 | * @param run_binding void pointer to a RunBinding structure (C interface) | ||
611 | * @return NULL in any case | ||
612 | */ | ||
613 | static void *RunWorker(void *run_binding); | ||
614 | |||
615 | static void *RunCallbackThreadWrapper(void *run_binding); | ||
616 | |||
617 | /** | ||
618 | * Tells the master that a worker thread did start. This does not mean, that | ||
619 | * it was initialized successfully. | ||
620 | */ | ||
621 | void ReportStartedWorker() const; | ||
622 | |||
623 | void Schedule(WorkerJob job); | ||
624 | void ScheduleDeathSentences(); | ||
625 | |||
626 | /** | ||
627 | * Empties the job queue | ||
628 | * | ||
629 | * @param forget_pending controls if cancelled jobs should be seen as | ||
630 | * finished | ||
631 | */ | ||
632 | void TruncateJobQueue(const bool forget_pending = false); | ||
633 | |||
634 | /** | ||
635 | * Retrieves a job from the job queue. If the job queue is empty it will block | ||
636 | * until there is a new job available for processing. | ||
637 | * THIS METHOD MUST ONLY BE CALLED INSIDE THE WORKER OBJECTS | ||
638 | * | ||
639 | * @return a job to be processed by a worker | ||
640 | */ | ||
641 | inline WorkerJob Acquire(); | ||
642 | |||
643 | /** | ||
644 | * Controls the asynchronous finishing of a job. | ||
645 | * DO NOT CALL THIS, use JobSuccessful() or JobFailed() wrappers instead. | ||
646 | * | ||
647 | * @param data the data to be returned to the user | ||
648 | * @param success flag if job was successful | ||
649 | */ | ||
650 | void JobDone(const returned_data_t &data, const bool success = true); | ||
651 | |||
652 | ✗ | inline void StartRunning() { | |
653 | ✗ | MutexLockGuard guard(status_mutex_); | |
654 | ✗ | running_ = true; | |
655 | } | ||
656 | ✗ | inline void StopRunning() { | |
657 | ✗ | MutexLockGuard guard(status_mutex_); | |
658 | ✗ | running_ = false; | |
659 | } | ||
660 | ✗ | inline bool IsRunning() const { | |
661 | ✗ | MutexLockGuard guard(status_mutex_); | |
662 | ✗ | return running_; | |
663 | } | ||
664 | |||
665 | private: | ||
666 | // general configuration | ||
667 | const size_t number_of_workers_; //!< number of concurrent worker threads | ||
668 | const worker_context_t *worker_context_; //!< the WorkerT defined context | ||
669 | /** | ||
670 | * The thread context passed to newly spawned threads | ||
671 | */ | ||
672 | WorkerRunBinding thread_context_; | ||
673 | |||
674 | // status information | ||
675 | bool initialized_; | ||
676 | bool running_; | ||
677 | mutable unsigned int workers_started_; | ||
678 | mutable pthread_mutex_t status_mutex_; | ||
679 | mutable pthread_cond_t worker_started_; | ||
680 | mutable pthread_mutex_t jobs_all_done_mutex_; | ||
681 | mutable pthread_cond_t jobs_all_done_; | ||
682 | |||
683 | // worker threads | ||
684 | WorkerThreads worker_threads_; //!< list of worker threads | ||
685 | pthread_t callback_thread_; //!< handles callback invokes | ||
686 | |||
687 | // job queue | ||
688 | typedef FifoChannel<WorkerJob> JobQueue; | ||
689 | JobQueue jobs_queue_; | ||
690 | mutable atomic_int32 jobs_pending_; | ||
691 | mutable atomic_int32 jobs_failed_; | ||
692 | mutable atomic_int64 jobs_processed_; | ||
693 | |||
694 | // callback channel | ||
695 | typedef FifoChannel<CallbackJob> CallbackQueue; | ||
696 | CallbackQueue results_queue_; | ||
697 | }; | ||
698 | |||
699 | |||
700 | /** | ||
701 | * Base class for worker classes that should be used in a ConcurrentWorkers | ||
702 | * swarm. These classes need to fulfill a number of requirements in order to | ||
703 | * satisfy the needs of the ConcurrentWorkers template. | ||
704 | * | ||
705 | * Requirements: | ||
706 | * -> needs to define the following types: | ||
707 | * - expected_data - input data structure of the worker | ||
708 | * - returned_data - output data structure of the worker | ||
709 | * - worker_context - context structure for initialization information | ||
710 | * | ||
711 | * -> implement a constructor that takes a pointer to its worker_context | ||
712 | * as its only parameter: | ||
713 | * AwesomeWorker(const AwesomeWorker::worker_context*) | ||
714 | * Note: do not rely on the context object to be available after the | ||
715 | * constructor has returned! | ||
716 | * | ||
717 | * -> needs to define the calling-operator expecting one parameter of type: | ||
718 | * const expected_data& and returning void | ||
719 | * This will be invoked for every new job the worker should process | ||
720 | * | ||
721 | * -> inside the implementation of the described calling-operator it needs to | ||
722 | * invoke either: | ||
723 | * master()->JobSuccessful(const returned_data&) | ||
724 | * or: | ||
725 | * master()->JobFailed(const returned_data&) | ||
726 | * as its LAST operation before returning. | ||
727 | * This will keep track of finished jobs and inform the user of Concurrent- | ||
728 | * Workers about finished jobs. | ||
729 | * | ||
730 | * -> [optional] override Initialize() and/or TearDown() to do environmental | ||
731 | * setup work, before or respectively after jobs will be processed | ||
732 | * | ||
733 | * General Reminder: | ||
734 | * You will be running in a multi-threaded environment here! Buckle up and | ||
735 | * make suitable preparations to shield yourself from serious head-ache. | ||
736 | * | ||
737 | * Note: This implements a Curiously Recurring Template Pattern | ||
738 | * (http://en.wikipedia.org/wiki/Curiously_recurring_template_pattern) | ||
739 | * | ||
740 | * @param DerivedWorkerT the class name of the inheriting class | ||
741 | * (e.g. class AwesomeWorker : public ConcurrentWorker<AwesomeWorker>) | ||
742 | */ | ||
743 | template<class DerivedWorkerT> | ||
744 | class ConcurrentWorker : SingleCopy { | ||
745 | public: | ||
746 | ✗ | virtual ~ConcurrentWorker() { } | |
747 | |||
748 | /** | ||
749 | * Does general initialization before any jobs will get scheduled. You do not | ||
750 | * need to up-call this initialize method, since it is seen as a dummy here. | ||
751 | * | ||
752 | * @returns true on successful initialization | ||
753 | */ | ||
754 | ✗ | virtual bool Initialize() { return true; } | |
755 | |||
756 | /** | ||
757 | * Does general clean-up after the last job was processed in the worker object | ||
758 | * and it is about to vanish. You do not need to up-call this method. | ||
759 | */ | ||
760 | ✗ | virtual void TearDown() { } | |
761 | |||
762 | /** | ||
763 | * The actual job-processing entry point. See the description of the | ||
764 | * inheriting class requirements to learn about the semantics of this method. | ||
765 | * DO NOT FORGET TO CALL master()->JobSuccessful() OR master()->JobFailed() | ||
766 | * at the end of this method!! | ||
767 | * | ||
768 | * Note: There is no way to generally define this operator, it is therefore | ||
769 | * commented out and placed here just as a reminder. | ||
770 | * | ||
771 | * @param data the data to be processed. | ||
772 | */ | ||
773 | // void operator()(const expected_data &data); // do the actual job of the | ||
774 | // worker | ||
775 | |||
776 | protected: | ||
777 | ✗ | ConcurrentWorker() : master_(NULL) { } | |
778 | |||
779 | /** | ||
780 | * Gets a pointer to the ConcurrentWorkers object that this worker resides in | ||
781 | * | ||
782 | * @returns a pointer to the ConcurrentWorkers object | ||
783 | */ | ||
784 | ✗ | inline ConcurrentWorkers<DerivedWorkerT> *master() const { return master_; } | |
785 | |||
786 | private: | ||
787 | friend class ConcurrentWorkers<DerivedWorkerT>; | ||
788 | ✗ | void RegisterMaster(ConcurrentWorkers<DerivedWorkerT> *master) { | |
789 | ✗ | master_ = master; | |
790 | } | ||
791 | |||
792 | private: | ||
793 | ConcurrentWorkers<DerivedWorkerT> *master_; | ||
794 | }; | ||
795 | |||
796 | #ifdef CVMFS_NAMESPACE_GUARD | ||
797 | } // namespace CVMFS_NAMESPACE_GUARD | ||
798 | #endif | ||
799 | |||
800 | #include "util/concurrency_impl.h" | ||
801 | |||
802 | #endif // CVMFS_UTIL_CONCURRENCY_H_ | ||
803 |