Directory: | cvmfs/ |
---|---|
File: | cvmfs/util/concurrency.h |
Date: | 2024-04-21 02:33:16 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 49 | 83 | 59.0% |
Branches: | 9 | 16 | 56.2% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #ifndef CVMFS_UTIL_CONCURRENCY_H_ | ||
6 | #define CVMFS_UTIL_CONCURRENCY_H_ | ||
7 | |||
8 | #include <pthread.h> | ||
9 | |||
10 | #include <cassert> | ||
11 | #include <queue> | ||
12 | #include <set> | ||
13 | #include <vector> | ||
14 | |||
15 | #include "util/async.h" | ||
16 | #include "util/atomic.h" | ||
17 | #include "util/export.h" | ||
18 | #include "util/mutex.h" | ||
19 | #include "util/single_copy.h" | ||
20 | |||
21 | #ifdef CVMFS_NAMESPACE_GUARD | ||
22 | namespace CVMFS_NAMESPACE_GUARD { | ||
23 | #endif | ||
24 | |||
25 | /** | ||
26 | * Implements a simple interface to lock objects of derived classes. Classes that | ||
27 | * inherit from Lockable are also usable with the LockGuard template for scoped | ||
28 | * locking semantics. | ||
29 | */ | ||
30 | class CVMFS_EXPORT Lockable : SingleCopy { | ||
31 | public: | ||
32 | 2 | inline virtual ~Lockable() { pthread_mutex_destroy(&mutex_); } | |
33 | |||
34 | 1 | void Lock() const { pthread_mutex_lock(&mutex_); } | |
35 | 2 | int TryLock() const { return pthread_mutex_trylock(&mutex_); } | |
36 | 1 | void Unlock() const { pthread_mutex_unlock(&mutex_); } | |
37 | |||
38 | protected: | ||
39 | 1 | Lockable() { | |
40 | 1 | const int retval = pthread_mutex_init(&mutex_, NULL); | |
41 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | assert(retval == 0); |
42 | 1 | } | |
43 | |||
44 | private: | ||
45 | mutable pthread_mutex_t mutex_; | ||
46 | }; | ||
47 | |||
48 | |||
49 | // | ||
50 | // ----------------------------------------------------------------------------- | ||
51 | // | ||
52 | |||
53 | |||
54 | /** | ||
55 | * This counter can be counted up and down using the usual increment/decrement | ||
56 | * operators. It allows threads to wait for it to become zero as well as to | ||
57 | * block when a specified maximal value would be exceeded by an increment. | ||
58 | * | ||
59 | * Note: If a maximal value is specified on creation, the SynchronizingCounter | ||
60 | * is assumed to never leave the interval [0, maximal_value]! Otherwise | ||
61 | * the numerical limits of the specified template parameter define this | ||
62 | * interval and an increment _never_ blocks. | ||
63 | * | ||
64 | * Caveat: This implementation uses a simple mutex mechanism and therefore might | ||
65 | * become a scalability bottle neck! | ||
66 | */ | ||
67 | template <typename T> | ||
68 | class SynchronizingCounter : SingleCopy { | ||
69 | public: | ||
70 | 8 | SynchronizingCounter() : | |
71 | 8 | value_(T(0)), maximal_value_(T(0)) { Initialize(); } | |
72 | |||
73 | 131 | explicit SynchronizingCounter(const T maximal_value) | |
74 | 131 | : value_(T(0)) | |
75 | 131 | , maximal_value_(maximal_value) | |
76 | { | ||
77 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 131 times.
|
131 | assert(maximal_value > T(0)); |
78 | 131 | Initialize(); | |
79 | 131 | } | |
80 | |||
81 | 139 | ~SynchronizingCounter() { Destroy(); } | |
82 | |||
83 | 71888251 | T Increment() { | |
84 | 71888251 | MutexLockGuard l(mutex_); | |
85 |
1/2✓ Branch 1 taken 72935238 times.
✗ Branch 2 not taken.
|
72935238 | WaitForFreeSlotUnprotected(); |
86 |
1/2✓ Branch 1 taken 673231 times.
✗ Branch 2 not taken.
|
72935238 | SetValueUnprotected(value_ + T(1)); |
87 | 72042438 | return value_; | |
88 | 72935238 | } | |
89 | |||
90 | 72210462 | T Decrement() { | |
91 | 72210462 | MutexLockGuard l(mutex_); | |
92 |
1/2✓ Branch 1 taken 673231 times.
✗ Branch 2 not taken.
|
72935238 | SetValueUnprotected(value_ - T(1)); |
93 | 144554863 | return value_; | |
94 | 72935238 | } | |
95 | |||
96 | 147 | void WaitForZero() const { | |
97 | 147 | MutexLockGuard l(mutex_); | |
98 |
2/2✓ Branch 0 taken 43 times.
✓ Branch 1 taken 147 times.
|
190 | while (value_ != T(0)) { |
99 |
1/2✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
|
43 | pthread_cond_wait(&became_zero_, &mutex_); |
100 | } | ||
101 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 147 times.
|
147 | assert(value_ == T(0)); |
102 | 147 | } | |
103 | |||
104 | 440766463 | bool HasMaximalValue() const { return maximal_value_ != T(0); } | |
105 | 8 | T maximal_value() const { return maximal_value_; } | |
106 | |||
107 | 671969 | T operator++() { return Increment(); } | |
108 | 4 | const T operator++(int) { return Increment() - T(1); } | |
109 | 672626 | T operator--() { return Decrement(); } | |
110 | 4 | const T operator--(int) { return Decrement() + T(1); } | |
111 | |||
112 | 2765 | T Get() const { | |
113 | 2765 | MutexLockGuard l(mutex_); | |
114 | 5530 | return value_; | |
115 | 2765 | } | |
116 | |||
117 | 14 | SynchronizingCounter<T>& operator=(const T &other) { | |
118 | 14 | MutexLockGuard l(mutex_); | |
119 | 14 | SetValueUnprotected(other); | |
120 | 28 | return *this; | |
121 | 14 | } | |
122 | |||
123 | protected: | ||
124 | void SetValueUnprotected(const T new_value); | ||
125 | void WaitForFreeSlotUnprotected(); | ||
126 | |||
127 | private: | ||
128 | void Initialize(); | ||
129 | void Destroy(); | ||
130 | |||
131 | private: | ||
132 | T value_; | ||
133 | const T maximal_value_; | ||
134 | |||
135 | mutable pthread_mutex_t mutex_; | ||
136 | mutable pthread_cond_t became_zero_; | ||
137 | pthread_cond_t free_slot_; | ||
138 | }; | ||
139 | |||
140 | |||
141 | // | ||
142 | // ----------------------------------------------------------------------------- | ||
143 | // | ||
144 | |||
145 | |||
146 | template <typename ParamT> | ||
147 | class Observable; | ||
148 | |||
149 | |||
150 | /** | ||
151 | * This is a base class for classes that need to expose a callback interface for | ||
152 | * asynchronous callback methods. One can register an arbitrary number of | ||
153 | * observers on an Observable that get notified when the method NotifyListeners() | ||
154 | * is invoked. | ||
155 | * | ||
156 | * Note: the registration and invocation of callbacks in Observable is thread- | ||
157 | * safe, but be aware that the callbacks of observing classes might run in | ||
158 | * arbitrary threads. When using these classes, you should take extra care | ||
159 | * for thread-safety. | ||
160 | * | ||
161 | * Note: The RegisterListener() methods return a pointer to a CallbackBase. | ||
162 | * You MUST NOT free these objects, they are managed by the Observable | ||
163 | * class. Use them only as handles to unregister specific callbacks. | ||
164 | * | ||
165 | * @param ParamT the type of the parameter that is passed to every callback | ||
166 | * invocation. | ||
167 | */ | ||
168 | template <typename ParamT> | ||
169 | class Observable : public Callbackable<ParamT>, | ||
170 | SingleCopy { | ||
171 | public: | ||
172 | typedef typename Callbackable<ParamT>::CallbackTN* CallbackPtr; | ||
173 | protected: | ||
174 | typedef std::set<CallbackPtr> Callbacks; | ||
175 | |||
176 | public: | ||
177 | virtual ~Observable(); | ||
178 | |||
179 | /** | ||
180 | * Registers a method of a specific object as a listener to the Observable | ||
181 | * object. The method is invoked on the given delegate when the callback is | ||
182 | * fired by the observed object using NotifyListeners(). Since this is meant | ||
183 | * to be a closure, it also passes the third argument to the method being in- | ||
184 | * voked by the Observable object. | ||
185 | * | ||
186 | * @param DelegateT the type of the delegate object | ||
187 | * @param method a pointer to the method to be invoked by the callback | ||
188 | * @param delegate a pointer to the object to invoke the callback on | ||
189 | * @param closure something to be passed to `method` | ||
190 | * @return a handle to the registered callback | ||
191 | */ | ||
192 | template <class DelegateT, class ClosureDataT> | ||
193 | CallbackPtr RegisterListener( | ||
194 | typename BoundClosure<ParamT, | ||
195 | DelegateT, | ||
196 | ClosureDataT>::CallbackMethod method, | ||
197 | DelegateT *delegate, | ||
198 | ClosureDataT data); | ||
199 | |||
200 | /** | ||
201 | * Registers a method of a specific object as a listener to the Observable | ||
202 | * object. The method is invoked on the given delegate when the callback is | ||
203 | * fired by the observed object using NotifyListeners(). | ||
204 | * | ||
205 | * @param DelegateT the type of the delegate object | ||
206 | * @param method a pointer to the method to be invoked by the callback | ||
207 | * @param delegate a pointer to the object to invoke the callback on | ||
208 | * @return a handle to the registered callback | ||
209 | */ | ||
210 | template <class DelegateT> | ||
211 | CallbackPtr RegisterListener( | ||
212 | typename BoundCallback<ParamT, DelegateT>::CallbackMethod method, | ||
213 | DelegateT *delegate); | ||
214 | |||
215 | /** | ||
216 | * Registers a static class member or a C-like function as a callback to the | ||
217 | * Observable object. The function is invoked when the callback is fired by | ||
218 | * the observed object using NotifyListeners(). | ||
219 | * | ||
220 | * @param fn a pointer to the function to be called by the callback | ||
221 | * @return a handle to the registered callback | ||
222 | */ | ||
223 | CallbackPtr RegisterListener(typename Callback<ParamT>::CallbackFunction fn); | ||
224 | |||
225 | /** | ||
226 | * Removes the given callback from the listeners group of this Observable. | ||
227 | * | ||
228 | * @param callback_object a callback handle that was returned by | ||
229 | * RegisterListener() before. | ||
230 | */ | ||
231 | void UnregisterListener(CallbackPtr callback_object); | ||
232 | |||
233 | /** | ||
234 | * Removes all listeners from the Observable | ||
235 | */ | ||
236 | void UnregisterListeners(); | ||
237 | |||
238 | protected: | ||
239 | Observable(); // don't instantiate this as a stand alone object | ||
240 | |||
241 | void RegisterListener(CallbackPtr callback_object); | ||
242 | |||
243 | /** | ||
244 | * Notifies all registered listeners and passes them the provided argument | ||
245 | * This method should be called by a derived class to send out asynchronous | ||
246 | * messages to registered observers. | ||
247 | * | ||
248 | * @param parameter the data to be passed to the observers | ||
249 | */ | ||
250 | void NotifyListeners(const ParamT ¶meter); | ||
251 | |||
252 | private: | ||
253 | Callbacks listeners_; //!< the set of registered | ||
254 | //!< callback objects | ||
255 | mutable pthread_rwlock_t listeners_rw_lock_; | ||
256 | }; | ||
257 | |||
258 | |||
259 | // | ||
260 | // ----------------------------------------------------------------------------- | ||
261 | // | ||
262 | |||
263 | |||
264 | /** | ||
265 | * Returns the number of CPU cores present in the system or a fallback number | ||
266 | * if it failed to determine the number of CPU cores. | ||
267 | * | ||
268 | * @return the number of active CPU cores in the system | ||
269 | */ | ||
270 | CVMFS_EXPORT unsigned int GetNumberOfCpuCores(); | ||
271 | static const unsigned int kFallbackNumberOfCpus = 1; | ||
272 | |||
273 | |||
274 | /** | ||
275 | * A blocking signal for thread synchronization | ||
276 | */ | ||
277 | class CVMFS_EXPORT Signal : SingleCopy { | ||
278 | public: | ||
279 | Signal(); | ||
280 | ~Signal(); | ||
281 | void Wakeup(); | ||
282 | void Wait(); | ||
283 | bool IsSleeping(); | ||
284 | |||
285 | private: | ||
286 | bool fired_; | ||
287 | pthread_mutex_t lock_; | ||
288 | pthread_cond_t signal_; | ||
289 | }; | ||
290 | |||
291 | |||
292 | // | ||
293 | // ----------------------------------------------------------------------------- | ||
294 | // | ||
295 | |||
296 | |||
297 | /** | ||
298 | * Asynchronous FIFO channel template | ||
299 | * Implements a thread safe FIFO queue that handles thread blocking if the queue | ||
300 | * is full or empty. | ||
301 | * | ||
302 | * @param T the data type to be enqueued in the queue | ||
303 | */ | ||
304 | template <class T> | ||
305 | class FifoChannel : protected std::queue<T> { | ||
306 | public: | ||
307 | /** | ||
308 | * Creates a new FIFO channel. | ||
309 | * | ||
310 | * @param maximal_length the maximal number of items that can be enqueued | ||
311 | * @param drainout_threshold if less than xx elements are in the queue it is | ||
312 | * considered to be "not full" | ||
313 | */ | ||
314 | FifoChannel(const size_t maximal_length, | ||
315 | const size_t drainout_threshold); | ||
316 | virtual ~FifoChannel(); | ||
317 | |||
318 | /** | ||
319 | * Adds a new item to the end of the FIFO channel. If the queue is full, this | ||
320 | * call will block until items were dequeued by another thread allowing the | ||
321 | * desired insertion. | ||
322 | * | ||
323 | * @param data the data to be enqueued into the FIFO channel | ||
324 | */ | ||
325 | void Enqueue(const T &data); | ||
326 | |||
327 | /** | ||
328 | * Removes the next element from the channel. If the queue is empty, this will | ||
329 | * block until another thread enqueues an item into the channel. | ||
330 | * | ||
331 | * @return the first item in the channel queue | ||
332 | */ | ||
333 | const T Dequeue(); | ||
334 | |||
335 | /** | ||
336 | * Clears all items in the FIFO channel. The cleared items will be lost. | ||
337 | * | ||
338 | * @return the number of dropped items | ||
339 | */ | ||
340 | unsigned int Drop(); | ||
341 | |||
342 | inline size_t GetItemCount() const; | ||
343 | inline bool IsEmpty() const; | ||
344 | inline size_t GetMaximalItemCount() const; | ||
345 | |||
346 | private: | ||
347 | // general configuration | ||
348 | const size_t maximal_queue_length_; | ||
349 | const size_t queue_drainout_threshold_; | ||
350 | |||
351 | // thread synchronisation structures | ||
352 | mutable pthread_mutex_t mutex_; | ||
353 | mutable pthread_cond_t queue_is_not_empty_; | ||
354 | mutable pthread_cond_t queue_is_not_full_; | ||
355 | }; | ||
356 | |||
357 | |||
358 | /** | ||
359 | * This template implements a generic producer/consumer approach to concurrent | ||
360 | * worker tasks. It spawns a given number of Workers derived from the base class | ||
361 | * ConcurrentWorker and uses them to distribute the work load onto concurrent | ||
362 | * threads. | ||
363 | * One can have multiple producers, that use Schedule() to post new work into | ||
364 | * a central job queue, which in turn is processed concurrently by the Worker | ||
365 | * objects in multiple threads. Furthermore the template provides an interface | ||
366 | * to control the worker swarm, i.e. to wait for their completion or cancel them | ||
367 | * before all jobs are processed. | ||
368 | * | ||
369 | * Note: A worker is a class inheriting from ConcurrentWorker that needs to meet | ||
370 | * a couple of requirements. See the documentation of ConcurrentWorker for | ||
371 | * additional details. | ||
372 | * | ||
373 | * @param WorkerT the class to be used as a worker for a concurrent worker | ||
374 | * swarm | ||
375 | */ | ||
376 | template <class WorkerT> | ||
377 | class ConcurrentWorkers : public Observable<typename WorkerT::returned_data> { | ||
378 | public: | ||
379 | // these data types must be defined by the worker class | ||
380 | /** | ||
381 | * Input data type | ||
382 | */ | ||
383 | typedef typename WorkerT::expected_data expected_data_t; | ||
384 | /** | ||
385 | * Output data type | ||
386 | */ | ||
387 | typedef typename WorkerT::returned_data returned_data_t; | ||
388 | /** | ||
389 | * Common context type | ||
390 | */ | ||
391 | typedef typename WorkerT::worker_context worker_context_t; | ||
392 | |||
393 | protected: | ||
394 | typedef std::vector<pthread_t> WorkerThreads; | ||
395 | |||
396 | /** | ||
397 | * This is a simple wrapper structure to piggy-back control information on | ||
398 | * scheduled jobs. Job structures are scheduled into a central FIFO queue and | ||
399 | * are then processed concurrently by the workers. | ||
400 | */ | ||
401 | template <class DataT> | ||
402 | struct Job { | ||
403 | ✗ | explicit Job(const DataT &data) : | |
404 | ✗ | data(data), | |
405 | ✗ | is_death_sentence(false) {} | |
406 | ✗ | Job() : | |
407 | ✗ | data(), | |
408 | ✗ | is_death_sentence(true) {} | |
409 | const DataT data; //!< job payload | ||
410 | const bool is_death_sentence; //!< death sentence flag | ||
411 | }; | ||
412 | typedef Job<expected_data_t> WorkerJob; | ||
413 | typedef Job<returned_data_t> CallbackJob; | ||
414 | |||
415 | /** | ||
416 | * Provides a wrapper for initialization data passed to newly spawned worker | ||
417 | * threads for initialization. | ||
418 | * It contains a pointer to the spawning ConcurrentWorkers master object as | ||
419 | * well as a pointer to a context object defined by the concrete worker to be | ||
420 | * spawned. | ||
421 | */ | ||
422 | struct RunBinding { | ||
423 | ✗ | explicit RunBinding(ConcurrentWorkers<WorkerT> *delegate) : | |
424 | ✗ | delegate(delegate) {} | |
425 | ConcurrentWorkers<WorkerT> *delegate; //!< delegate to the Concurrent- | ||
426 | //!< Workers master | ||
427 | }; | ||
428 | |||
429 | struct WorkerRunBinding : RunBinding { | ||
430 | ✗ | WorkerRunBinding(ConcurrentWorkers<WorkerT> *delegate, | |
431 | const worker_context_t *worker_context) : | ||
432 | RunBinding(delegate), | ||
433 | ✗ | worker_context(worker_context) {} | |
434 | /** | ||
435 | * WorkerT defined context objects for worker init. | ||
436 | */ | ||
437 | const worker_context_t *worker_context; | ||
438 | }; | ||
439 | |||
440 | public: | ||
441 | /** | ||
442 | * Creates a ConcurrentWorkers master object that encapsulates the actual | ||
443 | * workers. | ||
444 | * | ||
445 | * @param number_of_workers the number of concurrent workers to be spawned | ||
446 | * @param maximal_queue_length the maximal length of the job queue | ||
447 | * (>= number_of_workers) | ||
448 | * @param worker_context a pointer to the WorkerT defined context object | ||
449 | */ | ||
450 | ConcurrentWorkers(const size_t number_of_workers, | ||
451 | const size_t maximal_queue_length, | ||
452 | worker_context_t *worker_context = NULL); | ||
453 | virtual ~ConcurrentWorkers(); | ||
454 | |||
455 | /** | ||
456 | * Initializes the ConcurrentWorkers swarm, spawnes a thread for each new | ||
457 | * worker object and puts everything into working state. | ||
458 | * | ||
459 | * @return true if all went fine | ||
460 | */ | ||
461 | bool Initialize(); | ||
462 | |||
463 | /** | ||
464 | * Schedules a new job for processing into the internal job queue. This method | ||
465 | * will block in case the job queue is already full and wait for an empty slot. | ||
466 | * | ||
467 | * @param data the data to be processed | ||
468 | */ | ||
469 | ✗ | inline void Schedule(const expected_data_t &data) { | |
470 | ✗ | Schedule(WorkerJob(data)); | |
471 | } | ||
472 | |||
473 | /** | ||
474 | * Shuts down the ConcurrentWorkers object as well as the encapsulated workers | ||
475 | * as soon as possible. Workers will finish their current job and will termi- | ||
476 | * nate afterwards. If no jobs are scheduled they will simply stop waiting for | ||
477 | * new ones and terminate afterwards. | ||
478 | * This method MUST not be called more than once per ConcurrentWorkers. | ||
479 | */ | ||
480 | void Terminate(); | ||
481 | |||
482 | /** | ||
483 | * Waits until the job queue is fully processed | ||
484 | * | ||
485 | * Note: this might lead to undefined behaviour or infinite waiting if other | ||
486 | * producers still schedule jobs into the job queue. | ||
487 | */ | ||
488 | void WaitForEmptyQueue() const; | ||
489 | |||
490 | /** | ||
491 | * Waits until the ConcurrentWorkers swarm fully processed the current job | ||
492 | * queue and shuts down afterwards. | ||
493 | * | ||
494 | * Note: just as for WaitForEmptyQueue() this assumes that no other producers | ||
495 | * schedule jobs in the mean time. | ||
496 | */ | ||
497 | void WaitForTermination(); | ||
498 | |||
499 | ✗ | inline unsigned int GetNumberOfWorkers() const { return number_of_workers_; } | |
500 | ✗ | inline unsigned int GetNumberOfFailedJobs() const { | |
501 | ✗ | return atomic_read32(&jobs_failed_); | |
502 | } | ||
503 | |||
504 | /** | ||
505 | * Defines a job as successfully finished. | ||
506 | * DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT! | ||
507 | * | ||
508 | * @param data the data to be returned back to the user | ||
509 | */ | ||
510 | ✗ | inline void JobSuccessful(const returned_data_t& data) { | |
511 | ✗ | JobDone(data, true); | |
512 | } | ||
513 | |||
514 | /** | ||
515 | * Defines a job as failed. | ||
516 | * DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT! | ||
517 | * | ||
518 | * Note: Even for failed jobs the user will get a callback with a data object. | ||
519 | * You might want to make sure, that this data contains a status flag as | ||
520 | * well, telling the user what went wrong. | ||
521 | * | ||
522 | * @param data the data to be returned back to the user | ||
523 | */ | ||
524 | ✗ | inline void JobFailed(const returned_data_t& data) { JobDone(data, false); } | |
525 | |||
526 | void RunCallbackThread(); | ||
527 | |||
528 | protected: | ||
529 | bool SpawnWorkers(); | ||
530 | |||
531 | /** | ||
532 | * POSIX conform function for thread entry point. Is invoked for every new | ||
533 | * worker thread and contains the initialization, processing loop and tear | ||
534 | * down of the unique worker objects | ||
535 | * | ||
536 | * @param run_binding void pointer to a RunBinding structure (C interface) | ||
537 | * @return NULL in any case | ||
538 | */ | ||
539 | static void* RunWorker(void *run_binding); | ||
540 | |||
541 | static void* RunCallbackThreadWrapper(void *run_binding); | ||
542 | |||
543 | /** | ||
544 | * Tells the master that a worker thread did start. This does not mean, that | ||
545 | * it was initialized successfully. | ||
546 | */ | ||
547 | void ReportStartedWorker() const; | ||
548 | |||
549 | void Schedule(WorkerJob job); | ||
550 | void ScheduleDeathSentences(); | ||
551 | |||
552 | /** | ||
553 | * Empties the job queue | ||
554 | * | ||
555 | * @param forget_pending controls if cancelled jobs should be seen as finished | ||
556 | */ | ||
557 | void TruncateJobQueue(const bool forget_pending = false); | ||
558 | |||
559 | /** | ||
560 | * Retrieves a job from the job queue. If the job queue is empty it will block | ||
561 | * until there is a new job available for processing. | ||
562 | * THIS METHOD MUST ONLY BE CALLED INSIDE THE WORKER OBJECTS | ||
563 | * | ||
564 | * @return a job to be processed by a worker | ||
565 | */ | ||
566 | inline WorkerJob Acquire(); | ||
567 | |||
568 | /** | ||
569 | * Controls the asynchronous finishing of a job. | ||
570 | * DO NOT CALL THIS, use JobSuccessful() or JobFailed() wrappers instead. | ||
571 | * | ||
572 | * @param data the data to be returned to the user | ||
573 | * @param success flag if job was successful | ||
574 | */ | ||
575 | void JobDone(const returned_data_t& data, const bool success = true); | ||
576 | |||
577 | ✗ | inline void StartRunning() { | |
578 | ✗ | MutexLockGuard guard(status_mutex_); | |
579 | ✗ | running_ = true; | |
580 | } | ||
581 | ✗ | inline void StopRunning() { | |
582 | ✗ | MutexLockGuard guard(status_mutex_); | |
583 | ✗ | running_ = false; | |
584 | } | ||
585 | ✗ | inline bool IsRunning() const { | |
586 | ✗ | MutexLockGuard guard(status_mutex_); | |
587 | ✗ | return running_; | |
588 | } | ||
589 | |||
590 | private: | ||
591 | // general configuration | ||
592 | const size_t number_of_workers_; //!< number of concurrent worker threads | ||
593 | const worker_context_t *worker_context_; //!< the WorkerT defined context | ||
594 | /** | ||
595 | * The thread context passed to newly spawned threads | ||
596 | */ | ||
597 | WorkerRunBinding thread_context_; | ||
598 | |||
599 | // status information | ||
600 | bool initialized_; | ||
601 | bool running_; | ||
602 | mutable unsigned int workers_started_; | ||
603 | mutable pthread_mutex_t status_mutex_; | ||
604 | mutable pthread_cond_t worker_started_; | ||
605 | mutable pthread_mutex_t jobs_all_done_mutex_; | ||
606 | mutable pthread_cond_t jobs_all_done_; | ||
607 | |||
608 | // worker threads | ||
609 | WorkerThreads worker_threads_; //!< list of worker threads | ||
610 | pthread_t callback_thread_; //!< handles callback invokes | ||
611 | |||
612 | // job queue | ||
613 | typedef FifoChannel<WorkerJob > JobQueue; | ||
614 | JobQueue jobs_queue_; | ||
615 | mutable atomic_int32 jobs_pending_; | ||
616 | mutable atomic_int32 jobs_failed_; | ||
617 | mutable atomic_int64 jobs_processed_; | ||
618 | |||
619 | // callback channel | ||
620 | typedef FifoChannel<CallbackJob > CallbackQueue; | ||
621 | CallbackQueue results_queue_; | ||
622 | }; | ||
623 | |||
624 | |||
625 | /** | ||
626 | * Base class for worker classes that should be used in a ConcurrentWorkers | ||
627 | * swarm. These classes need to fulfill a number of requirements in order to | ||
628 | * satisfy the needs of the ConcurrentWorkers template. | ||
629 | * | ||
630 | * Requirements: | ||
631 | * -> needs to define the following types: | ||
632 | * - expected_data - input data structure of the worker | ||
633 | * - returned_data - output data structure of the worker | ||
634 | * - worker_context - context structure for initialization information | ||
635 | * | ||
636 | * -> implement a constructor that takes a pointer to its worker_context | ||
637 | * as its only parameter: | ||
638 | * AwesomeWorker(const AwesomeWorker::worker_context*) | ||
639 | * Note: do not rely on the context object to be available after the | ||
640 | * constructor has returned! | ||
641 | * | ||
642 | * -> needs to define the calling-operator expecting one parameter of type: | ||
643 | * const expected_data& and returning void | ||
644 | * This will be invoked for every new job the worker should process | ||
645 | * | ||
646 | * -> inside the implementation of the described calling-operator it needs to | ||
647 | * invoke either: | ||
648 | * master()->JobSuccessful(const returned_data&) | ||
649 | * or: | ||
650 | * master()->JobFailed(const returned_data&) | ||
651 | * as its LAST operation before returning. | ||
652 | * This will keep track of finished jobs and inform the user of Concurrent- | ||
653 | * Workers about finished jobs. | ||
654 | * | ||
655 | * -> [optional] overwrite Initialize() and/or TearDown() to do environmental | ||
656 | * setup work, before or respectively after jobs will be processed | ||
657 | * | ||
658 | * General Reminder: | ||
659 | * You will be running in a multi-threaded environment here! Buckle up and | ||
660 | * make suitable preparations to shield yourself from serious head-ache. | ||
661 | * | ||
662 | * Note: This implements a Curiously Recurring Template Pattern | ||
663 | * (http://en.wikipedia.org/wiki/Curiously_recurring_template_pattern) | ||
664 | * | ||
665 | * @param DerivedWorkerT the class name of the inheriting class | ||
666 | * (f.e. class AwesomeWorker : public ConcurrentWorker<AwesomeWorker>) | ||
667 | */ | ||
668 | template <class DerivedWorkerT> | ||
669 | class ConcurrentWorker : SingleCopy { | ||
670 | public: | ||
671 | ✗ | virtual ~ConcurrentWorker() {} | |
672 | |||
673 | /** | ||
674 | * Does general initialization before any jobs will get scheduled. You do not | ||
675 | * need to up-call this initialize method, since it is seen as a dummy here. | ||
676 | * | ||
677 | * @returns true one successful initialization | ||
678 | */ | ||
679 | ✗ | virtual bool Initialize() { return true; } | |
680 | |||
681 | /** | ||
682 | * Does general clean-up after the last job was processed in the worker object | ||
683 | * and it is about to vanish. You do not need to up-call this method. | ||
684 | */ | ||
685 | ✗ | virtual void TearDown() {} | |
686 | |||
687 | /** | ||
688 | * The actual job-processing entry point. See the description of the inheriting | ||
689 | * class requirements to learn about the semantics of this methods. | ||
690 | * DO NOT FORGET TO CALL master()->JobSuccessful() OR master()->JobFinished() | ||
691 | * at the end of thismethod!! | ||
692 | * | ||
693 | * Note: There is no way to generally define this operator, it is therefore | ||
694 | * commented out and placed here just as a reminder. | ||
695 | * | ||
696 | * @param data the data to be processed. | ||
697 | */ | ||
698 | // void operator()(const expected_data &data); // do the actual job of the | ||
699 | // worker | ||
700 | |||
701 | protected: | ||
702 | ✗ | ConcurrentWorker() : master_(NULL) {} | |
703 | |||
704 | /** | ||
705 | * Gets a pointer to the ConcurrentWorkers object that this worker resides in | ||
706 | * | ||
707 | * @returns a pointer to the ConcurrentWorkers object | ||
708 | */ | ||
709 | ✗ | inline ConcurrentWorkers<DerivedWorkerT>* master() const { return master_; } | |
710 | |||
711 | private: | ||
712 | friend class ConcurrentWorkers<DerivedWorkerT>; | ||
713 | ✗ | void RegisterMaster(ConcurrentWorkers<DerivedWorkerT> *master) { | |
714 | ✗ | master_ = master; | |
715 | } | ||
716 | |||
717 | private: | ||
718 | ConcurrentWorkers<DerivedWorkerT> *master_; | ||
719 | }; | ||
720 | |||
721 | #ifdef CVMFS_NAMESPACE_GUARD | ||
722 | } // namespace CVMFS_NAMESPACE_GUARD | ||
723 | #endif | ||
724 | |||
725 | #include "util/concurrency_impl.h" | ||
726 | |||
727 | #endif // CVMFS_UTIL_CONCURRENCY_H_ | ||
728 |