| Directory: | cvmfs/ |
|---|---|
| File: | cvmfs/util/concurrency.h |
| Date: | 2025-11-09 02:35:23 |
| Exec | Total | Coverage | |
|---|---|---|---|
| Lines: | 85 | 115 | 73.9% |
| Branches: | 21 | 38 | 55.3% |
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /** | ||
| 2 | * This file is part of the CernVM File System. | ||
| 3 | */ | ||
| 4 | |||
| 5 | #ifndef CVMFS_UTIL_CONCURRENCY_H_ | ||
| 6 | #define CVMFS_UTIL_CONCURRENCY_H_ | ||
| 7 | |||
| 8 | #include <pthread.h> | ||
| 9 | |||
| 10 | #include <cassert> | ||
| 11 | #include <cstddef> | ||
| 12 | #include <queue> | ||
| 13 | #include <set> | ||
| 14 | #include <vector> | ||
| 15 | |||
| 16 | #include "util/async.h" | ||
| 17 | #include "util/atomic.h" | ||
| 18 | #include "util/export.h" | ||
| 19 | #include "util/mutex.h" | ||
| 20 | #include "util/single_copy.h" | ||
| 21 | |||
| 22 | #ifdef CVMFS_NAMESPACE_GUARD | ||
| 23 | namespace CVMFS_NAMESPACE_GUARD { | ||
| 24 | #endif | ||
| 25 | |||
| 26 | /** | ||
| 27 | * A thread-safe, unbounded vector of items that implement a FIFO channel. | ||
| 28 | * Uses conditional variables to block when threads try to pop from the empty | ||
| 29 | * channel. | ||
| 30 | */ | ||
| 31 | template<class ItemT> | ||
| 32 | class Channel : SingleCopy { | ||
| 33 | public: | ||
| 34 | 91 | Channel() { | |
| 35 | 91 | int retval = pthread_mutex_init(&lock_, NULL); | |
| 36 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 91 times.
|
91 | assert(retval == 0); |
| 37 | 91 | retval = pthread_cond_init(&cond_populated_, NULL); | |
| 38 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 91 times.
|
91 | assert(retval == 0); |
| 39 | 91 | } | |
| 40 | |||
| 41 | 91 | ~Channel() { | |
| 42 | 91 | pthread_cond_destroy(&cond_populated_); | |
| 43 | 91 | pthread_mutex_destroy(&lock_); | |
| 44 | 91 | } | |
| 45 | |||
| 46 | /** | ||
| 47 | * Returns the queue locked and ready for appending 1 item. | ||
| 48 | */ | ||
| 49 | 460000 | std::vector<ItemT *> *StartEnqueueing() { | |
| 50 | 460000 | int retval = pthread_mutex_lock(&lock_); | |
| 51 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 460000 times.
|
460000 | assert(retval == 0); |
| 52 | 460000 | return &items_; | |
| 53 | } | ||
| 54 | |||
| 55 | /** | ||
| 56 | * Unlocks the queue. The queue must remain unchanged when this is called. | ||
| 57 | */ | ||
| 58 | 230000 | void AbortEnqueueing() { | |
| 59 | 230000 | int retval = pthread_mutex_unlock(&lock_); | |
| 60 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 230000 times.
|
230000 | assert(retval == 0); |
| 61 | 230000 | } | |
| 62 | |||
| 63 | /** | ||
| 64 | * 1 new item was added to the queue. Unlock and signal to reader thread. | ||
| 65 | */ | ||
| 66 | 230000 | void CommitEnqueueing() { | |
| 67 | 230000 | int retval = pthread_cond_signal(&cond_populated_); | |
| 68 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 230000 times.
|
230000 | assert(retval == 0); |
| 69 | 230000 | retval = pthread_mutex_unlock(&lock_); | |
| 70 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 230000 times.
|
230000 | assert(retval == 0); |
| 71 | 230000 | } | |
| 72 | |||
| 73 | 81 | void PushBack(ItemT *item) { | |
| 74 | 81 | MutexLockGuard lock_guard(&lock_); | |
| 75 |
1/2✓ Branch 1 taken 81 times.
✗ Branch 2 not taken.
|
81 | items_.push_back(item); |
| 76 | 81 | int retval = pthread_cond_signal(&cond_populated_); | |
| 77 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 81 times.
|
81 | assert(retval == 0); |
| 78 | 81 | } | |
| 79 | |||
| 80 | /** | ||
| 81 | * Remove and return the first element from the queue. Block if tube is | ||
| 82 | * empty. | ||
| 83 | */ | ||
| 84 | 230081 | ItemT *PopFront() { | |
| 85 | 230081 | MutexLockGuard lock_guard(&lock_); | |
| 86 |
2/2✓ Branch 1 taken 845 times.
✓ Branch 2 taken 230081 times.
|
230926 | while (items_.size() == 0) |
| 87 |
1/2✓ Branch 1 taken 845 times.
✗ Branch 2 not taken.
|
845 | pthread_cond_wait(&cond_populated_, &lock_); |
| 88 | 230081 | ItemT *item = items_[0]; | |
| 89 |
1/2✓ Branch 3 taken 230081 times.
✗ Branch 4 not taken.
|
230081 | items_.erase(items_.begin()); |
| 90 | 230081 | return item; | |
| 91 | 230081 | } | |
| 92 | |||
| 93 | private: | ||
| 94 | /** | ||
| 95 | * The locked queue/channel | ||
| 96 | */ | ||
| 97 | std::vector<ItemT *> items_; | ||
| 98 | /** | ||
| 99 | * Protects all internal state | ||
| 100 | */ | ||
| 101 | pthread_mutex_t lock_; | ||
| 102 | /** | ||
| 103 | * Signals if there are items enqueued | ||
| 104 | */ | ||
| 105 | pthread_cond_t cond_populated_; | ||
| 106 | }; | ||
| 107 | |||
| 108 | /** | ||
| 109 | * Implements a simple interface to lock objects of derived classes. Classes | ||
| 110 | * that inherit from Lockable are also usable with the LockGuard template for | ||
| 111 | * scoped locking semantics. | ||
| 112 | */ | ||
| 113 | class CVMFS_EXPORT Lockable : SingleCopy { | ||
| 114 | public: | ||
| 115 | 92 | inline virtual ~Lockable() { pthread_mutex_destroy(&mutex_); } | |
| 116 | |||
| 117 | 46 | void Lock() const { pthread_mutex_lock(&mutex_); } | |
| 118 | 92 | int TryLock() const { return pthread_mutex_trylock(&mutex_); } | |
| 119 | 46 | void Unlock() const { pthread_mutex_unlock(&mutex_); } | |
| 120 | |||
| 121 | protected: | ||
| 122 | 46 | Lockable() { | |
| 123 | 46 | const int retval = pthread_mutex_init(&mutex_, NULL); | |
| 124 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 46 times.
|
46 | assert(retval == 0); |
| 125 | 46 | } | |
| 126 | |||
| 127 | private: | ||
| 128 | mutable pthread_mutex_t mutex_; | ||
| 129 | }; | ||
| 130 | |||
| 131 | |||
| 132 | // | ||
| 133 | // ----------------------------------------------------------------------------- | ||
| 134 | // | ||
| 135 | |||
| 136 | |||
| 137 | /** | ||
| 138 | * This counter can be counted up and down using the usual increment/decrement | ||
| 139 | * operators. It allows threads to wait for it to become zero as well as to | ||
| 140 | * block when a specified maximal value would be exceeded by an increment. | ||
| 141 | * | ||
| 142 | * Note: If a maximal value is specified on creation, the SynchronizingCounter | ||
| 143 | * is assumed to never leave the interval [0, maximal_value]! Otherwise | ||
| 144 | * the numerical limits of the specified template parameter define this | ||
| 145 | * interval and an increment _never_ blocks. | ||
| 146 | * | ||
| 147 | * Caveat: This implementation uses a simple mutex mechanism and therefore might | ||
| 148 | * become a scalability bottle neck! | ||
| 149 | */ | ||
| 150 | template<typename T> | ||
| 151 | class SynchronizingCounter : SingleCopy { | ||
| 152 | public: | ||
| 153 | 136 | SynchronizingCounter() : value_(T(0)), maximal_value_(T(0)) { Initialize(); } | |
| 154 | |||
| 155 | 3514 | explicit SynchronizingCounter(const T maximal_value) | |
| 156 | 3514 | : value_(T(0)), maximal_value_(maximal_value) { | |
| 157 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3514 times.
|
3514 | assert(maximal_value > T(0)); |
| 158 | 3514 | Initialize(); | |
| 159 | 3514 | } | |
| 160 | |||
| 161 | 3649 | ~SynchronizingCounter() { Destroy(); } | |
| 162 | |||
| 163 | 1229384668 | T Increment() { | |
| 164 | 1229384668 | MutexLockGuard l(mutex_); | |
| 165 |
1/2✓ Branch 1 taken 1247694975 times.
✗ Branch 2 not taken.
|
1247694975 | WaitForFreeSlotUnprotected(); |
| 166 |
1/2✓ Branch 1 taken 19240781 times.
✗ Branch 2 not taken.
|
1247694975 | SetValueUnprotected(value_ + T(1)); |
| 167 | 1233129252 | return value_; | |
| 168 | 1247694975 | } | |
| 169 | |||
| 170 | 1235710404 | T Decrement() { | |
| 171 | 1235710404 | MutexLockGuard l(mutex_); | |
| 172 |
1/2✓ Branch 1 taken 19240781 times.
✗ Branch 2 not taken.
|
1247694975 | SetValueUnprotected(value_ - T(1)); |
| 173 | 2465679056 | return value_; | |
| 174 | 1247694975 | } | |
| 175 | |||
| 176 | 3751 | void WaitForZero() const { | |
| 177 | 3751 | MutexLockGuard l(mutex_); | |
| 178 |
2/2✓ Branch 0 taken 905 times.
✓ Branch 1 taken 3751 times.
|
4656 | while (value_ != T(0)) { |
| 179 |
1/2✓ Branch 1 taken 905 times.
✗ Branch 2 not taken.
|
905 | pthread_cond_wait(&became_zero_, &mutex_); |
| 180 | } | ||
| 181 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3751 times.
|
3751 | assert(value_ == T(0)); |
| 182 | 3751 | } | |
| 183 | |||
| 184 | 7580947185 | bool HasMaximalValue() const { return maximal_value_ != T(0); } | |
| 185 | 176 | T maximal_value() const { return maximal_value_; } | |
| 186 | |||
| 187 | 19215174 | T operator++() { return Increment(); } | |
| 188 | 73 | const T operator++(int) { return Increment() - T(1); } | |
| 189 | 19227392 | T operator--() { return Decrement(); } | |
| 190 | 73 | const T operator--(int) { return Decrement() + T(1); } | |
| 191 | |||
| 192 | 60690 | T Get() const { | |
| 193 | 60690 | MutexLockGuard l(mutex_); | |
| 194 | 121380 | return value_; | |
| 195 | 60690 | } | |
| 196 | |||
| 197 | 263 | SynchronizingCounter<T> &operator=(const T &other) { | |
| 198 | 263 | MutexLockGuard l(mutex_); | |
| 199 | 263 | SetValueUnprotected(other); | |
| 200 | 526 | return *this; | |
| 201 | 263 | } | |
| 202 | |||
| 203 | protected: | ||
| 204 | void SetValueUnprotected(const T new_value); | ||
| 205 | void WaitForFreeSlotUnprotected(); | ||
| 206 | |||
| 207 | private: | ||
| 208 | void Initialize(); | ||
| 209 | void Destroy(); | ||
| 210 | |||
| 211 | private: | ||
| 212 | T value_; | ||
| 213 | const T maximal_value_; | ||
| 214 | |||
| 215 | mutable pthread_mutex_t mutex_; | ||
| 216 | mutable pthread_cond_t became_zero_; | ||
| 217 | pthread_cond_t free_slot_; | ||
| 218 | }; | ||
| 219 | |||
| 220 | |||
| 221 | // | ||
| 222 | // ----------------------------------------------------------------------------- | ||
| 223 | // | ||
| 224 | |||
| 225 | |||
| 226 | template<typename ParamT> | ||
| 227 | class Observable; | ||
| 228 | |||
| 229 | |||
| 230 | /** | ||
| 231 | * This is a base class for classes that need to expose a callback interface for | ||
| 232 | * asynchronous callback methods. One can register an arbitrary number of | ||
| 233 | * observers on an Observable that get notified when the method | ||
| 234 | * NotifyListeners() is invoked. | ||
| 235 | * | ||
| 236 | * Note: the registration and invocation of callbacks in Observable is thread- | ||
| 237 | * safe, but be aware that the callbacks of observing classes might run in | ||
| 238 | * arbitrary threads. When using these classes, you should take extra care | ||
| 239 | * for thread-safety. | ||
| 240 | * | ||
| 241 | * Note: The RegisterListener() methods return a pointer to a CallbackBase. | ||
| 242 | * You MUST NOT free these objects, they are managed by the Observable | ||
| 243 | * class. Use them only as handles to unregister specific callbacks. | ||
| 244 | * | ||
| 245 | * @param ParamT the type of the parameter that is passed to every callback | ||
| 246 | * invocation. | ||
| 247 | */ | ||
| 248 | template<typename ParamT> | ||
| 249 | class Observable : public Callbackable<ParamT>, SingleCopy { | ||
| 250 | public: | ||
| 251 | typedef typename Callbackable<ParamT>::CallbackTN *CallbackPtr; | ||
| 252 | |||
| 253 | protected: | ||
| 254 | typedef std::set<CallbackPtr> Callbacks; | ||
| 255 | |||
| 256 | public: | ||
| 257 | virtual ~Observable(); | ||
| 258 | |||
| 259 | /** | ||
| 260 | * Registers a method of a specific object as a listener to the Observable | ||
| 261 | * object. The method is invoked on the given delegate when the callback is | ||
| 262 | * fired by the observed object using NotifyListeners(). Since this is meant | ||
| 263 | * to be a closure, it also passes the third argument to the method being in- | ||
| 264 | * voked by the Observable object. | ||
| 265 | * | ||
| 266 | * @param DelegateT the type of the delegate object | ||
| 267 | * @param method a pointer to the method to be invoked by the callback | ||
| 268 | * @param delegate a pointer to the object to invoke the callback on | ||
| 269 | * @param closure something to be passed to `method` | ||
| 270 | * @return a handle to the registered callback | ||
| 271 | */ | ||
| 272 | template<class DelegateT, class ClosureDataT> | ||
| 273 | CallbackPtr RegisterListener( | ||
| 274 | typename BoundClosure<ParamT, DelegateT, ClosureDataT>::CallbackMethod | ||
| 275 | method, | ||
| 276 | DelegateT *delegate, | ||
| 277 | ClosureDataT data); | ||
| 278 | |||
| 279 | /** | ||
| 280 | * Registers a method of a specific object as a listener to the Observable | ||
| 281 | * object. The method is invoked on the given delegate when the callback is | ||
| 282 | * fired by the observed object using NotifyListeners(). | ||
| 283 | * | ||
| 284 | * @param DelegateT the type of the delegate object | ||
| 285 | * @param method a pointer to the method to be invoked by the callback | ||
| 286 | * @param delegate a pointer to the object to invoke the callback on | ||
| 287 | * @return a handle to the registered callback | ||
| 288 | */ | ||
| 289 | template<class DelegateT> | ||
| 290 | CallbackPtr RegisterListener( | ||
| 291 | typename BoundCallback<ParamT, DelegateT>::CallbackMethod method, | ||
| 292 | DelegateT *delegate); | ||
| 293 | |||
| 294 | /** | ||
| 295 | * Registers a static class member or a C-like function as a callback to the | ||
| 296 | * Observable object. The function is invoked when the callback is fired by | ||
| 297 | * the observed object using NotifyListeners(). | ||
| 298 | * | ||
| 299 | * @param fn a pointer to the function to be called by the callback | ||
| 300 | * @return a handle to the registered callback | ||
| 301 | */ | ||
| 302 | CallbackPtr RegisterListener(typename Callback<ParamT>::CallbackFunction fn); | ||
| 303 | |||
| 304 | /** | ||
| 305 | * Removes the given callback from the listeners group of this Observable. | ||
| 306 | * | ||
| 307 | * @param callback_object a callback handle that was returned by | ||
| 308 | * RegisterListener() before. | ||
| 309 | */ | ||
| 310 | void UnregisterListener(CallbackPtr callback_object); | ||
| 311 | |||
| 312 | /** | ||
| 313 | * Removes all listeners from the Observable | ||
| 314 | */ | ||
| 315 | void UnregisterListeners(); | ||
| 316 | |||
| 317 | protected: | ||
| 318 | Observable(); // don't instantiate this as a stand alone object | ||
| 319 | |||
| 320 | void RegisterListener(CallbackPtr callback_object); | ||
| 321 | |||
| 322 | /** | ||
| 323 | * Notifies all registered listeners and passes them the provided argument | ||
| 324 | * This method should be called by a derived class to send out asynchronous | ||
| 325 | * messages to registered observers. | ||
| 326 | * | ||
| 327 | * @param parameter the data to be passed to the observers | ||
| 328 | */ | ||
| 329 | void NotifyListeners(const ParamT ¶meter); | ||
| 330 | |||
| 331 | private: | ||
| 332 | Callbacks listeners_; //!< the set of registered | ||
| 333 | //!< callback objects | ||
| 334 | mutable pthread_rwlock_t listeners_rw_lock_; | ||
| 335 | }; | ||
| 336 | |||
| 337 | |||
| 338 | // | ||
| 339 | // ----------------------------------------------------------------------------- | ||
| 340 | // | ||
| 341 | |||
| 342 | |||
| 343 | /** | ||
| 344 | * Returns the number of CPU cores present in the system or a fallback number | ||
| 345 | * if it failed to determine the number of CPU cores. | ||
| 346 | * | ||
| 347 | * @return the number of active CPU cores in the system | ||
| 348 | */ | ||
| 349 | CVMFS_EXPORT unsigned int GetNumberOfCpuCores(); | ||
| 350 | static const unsigned int kFallbackNumberOfCpus = 1; | ||
| 351 | |||
| 352 | |||
| 353 | /** | ||
| 354 | * A blocking signal for thread synchronization | ||
| 355 | */ | ||
| 356 | class CVMFS_EXPORT Signal : SingleCopy { | ||
| 357 | public: | ||
| 358 | Signal(); | ||
| 359 | ~Signal(); | ||
| 360 | void Wakeup(); | ||
| 361 | void Wait(); | ||
| 362 | bool IsSleeping(); | ||
| 363 | |||
| 364 | private: | ||
| 365 | bool fired_; | ||
| 366 | pthread_mutex_t lock_; | ||
| 367 | pthread_cond_t signal_; | ||
| 368 | }; | ||
| 369 | |||
| 370 | |||
| 371 | // | ||
| 372 | // ----------------------------------------------------------------------------- | ||
| 373 | // | ||
| 374 | |||
| 375 | |||
| 376 | /** | ||
| 377 | * Asynchronous FIFO channel template | ||
| 378 | * Implements a thread safe FIFO queue that handles thread blocking if the queue | ||
| 379 | * is full or empty. | ||
| 380 | * | ||
| 381 | * @param T the data type to be enqueued in the queue | ||
| 382 | */ | ||
| 383 | template<class T> | ||
| 384 | class FifoChannel : protected std::queue<T> { | ||
| 385 | public: | ||
| 386 | /** | ||
| 387 | * Creates a new FIFO channel. | ||
| 388 | * | ||
| 389 | * @param maximal_length the maximal number of items that can be enqueued | ||
| 390 | * @param drainout_threshold if less than xx elements are in the queue it is | ||
| 391 | * considered to be "not full" | ||
| 392 | */ | ||
| 393 | FifoChannel(const size_t maximal_length, const size_t drainout_threshold); | ||
| 394 | virtual ~FifoChannel(); | ||
| 395 | |||
| 396 | /** | ||
| 397 | * Adds a new item to the end of the FIFO channel. If the queue is full, this | ||
| 398 | * call will block until items were dequeued by another thread allowing the | ||
| 399 | * desired insertion. | ||
| 400 | * | ||
| 401 | * @param data the data to be enqueued into the FIFO channel | ||
| 402 | */ | ||
| 403 | void Enqueue(const T &data); | ||
| 404 | |||
| 405 | /** | ||
| 406 | * Removes the next element from the channel. If the queue is empty, this will | ||
| 407 | * block until another thread enqueues an item into the channel. | ||
| 408 | * | ||
| 409 | * @return the first item in the channel queue | ||
| 410 | */ | ||
| 411 | const T Dequeue(); | ||
| 412 | |||
| 413 | /** | ||
| 414 | * Clears all items in the FIFO channel. The cleared items will be lost. | ||
| 415 | * | ||
| 416 | * @return the number of dropped items | ||
| 417 | */ | ||
| 418 | unsigned int Drop(); | ||
| 419 | |||
| 420 | inline size_t GetItemCount() const; | ||
| 421 | inline bool IsEmpty() const; | ||
| 422 | inline size_t GetMaximalItemCount() const; | ||
| 423 | |||
| 424 | private: | ||
| 425 | // general configuration | ||
| 426 | const size_t maximal_queue_length_; | ||
| 427 | const size_t queue_drainout_threshold_; | ||
| 428 | |||
| 429 | // thread synchronisation structures | ||
| 430 | mutable pthread_mutex_t mutex_; | ||
| 431 | mutable pthread_cond_t queue_is_not_empty_; | ||
| 432 | mutable pthread_cond_t queue_is_not_full_; | ||
| 433 | }; | ||
| 434 | |||
| 435 | |||
| 436 | /** | ||
| 437 | * This template implements a generic producer/consumer approach to concurrent | ||
| 438 | * worker tasks. It spawns a given number of Workers derived from the base class | ||
| 439 | * ConcurrentWorker and uses them to distribute the work load onto concurrent | ||
| 440 | * threads. | ||
| 441 | * One can have multiple producers, that use Schedule() to post new work into | ||
| 442 | * a central job queue, which in turn is processed concurrently by the Worker | ||
| 443 | * objects in multiple threads. Furthermore the template provides an interface | ||
| 444 | * to control the worker swarm, i.e. to wait for their completion or cancel them | ||
| 445 | * before all jobs are processed. | ||
| 446 | * | ||
| 447 | * Note: A worker is a class inheriting from ConcurrentWorker that needs to meet | ||
| 448 | * a couple of requirements. See the documentation of ConcurrentWorker for | ||
| 449 | * additional details. | ||
| 450 | * | ||
| 451 | * @param WorkerT the class to be used as a worker for a concurrent worker | ||
| 452 | * swarm | ||
| 453 | */ | ||
| 454 | template<class WorkerT> | ||
| 455 | class ConcurrentWorkers : public Observable<typename WorkerT::returned_data> { | ||
| 456 | public: | ||
| 457 | // these data types must be defined by the worker class | ||
| 458 | /** | ||
| 459 | * Input data type | ||
| 460 | */ | ||
| 461 | typedef typename WorkerT::expected_data expected_data_t; | ||
| 462 | /** | ||
| 463 | * Output data type | ||
| 464 | */ | ||
| 465 | typedef typename WorkerT::returned_data returned_data_t; | ||
| 466 | /** | ||
| 467 | * Common context type | ||
| 468 | */ | ||
| 469 | typedef typename WorkerT::worker_context worker_context_t; | ||
| 470 | |||
| 471 | protected: | ||
| 472 | typedef std::vector<pthread_t> WorkerThreads; | ||
| 473 | |||
| 474 | /** | ||
| 475 | * This is a simple wrapper structure to piggy-back control information on | ||
| 476 | * scheduled jobs. Job structures are scheduled into a central FIFO queue and | ||
| 477 | * are then processed concurrently by the workers. | ||
| 478 | */ | ||
| 479 | template<class DataT> | ||
| 480 | struct Job { | ||
| 481 | ✗ | explicit Job(const DataT &data) : data(data), is_death_sentence(false) { } | |
| 482 | ✗ | Job() : data(), is_death_sentence(true) { } | |
| 483 | const DataT data; //!< job payload | ||
| 484 | const bool is_death_sentence; //!< death sentence flag | ||
| 485 | }; | ||
| 486 | typedef Job<expected_data_t> WorkerJob; | ||
| 487 | typedef Job<returned_data_t> CallbackJob; | ||
| 488 | |||
| 489 | /** | ||
| 490 | * Provides a wrapper for initialization data passed to newly spawned worker | ||
| 491 | * threads for initialization. | ||
| 492 | * It contains a pointer to the spawning ConcurrentWorkers master object as | ||
| 493 | * well as a pointer to a context object defined by the concrete worker to be | ||
| 494 | * spawned. | ||
| 495 | */ | ||
| 496 | struct RunBinding { | ||
| 497 | ✗ | explicit RunBinding(ConcurrentWorkers<WorkerT> *delegate) | |
| 498 | ✗ | : delegate(delegate) { } | |
| 499 | ConcurrentWorkers<WorkerT> *delegate; //!< delegate to the Concurrent- | ||
| 500 | //!< Workers master | ||
| 501 | }; | ||
| 502 | |||
| 503 | struct WorkerRunBinding : RunBinding { | ||
| 504 | ✗ | WorkerRunBinding(ConcurrentWorkers<WorkerT> *delegate, | |
| 505 | const worker_context_t *worker_context) | ||
| 506 | ✗ | : RunBinding(delegate), worker_context(worker_context) { } | |
| 507 | /** | ||
| 508 | * WorkerT defined context objects for worker init. | ||
| 509 | */ | ||
| 510 | const worker_context_t *worker_context; | ||
| 511 | }; | ||
| 512 | |||
| 513 | public: | ||
| 514 | /** | ||
| 515 | * Creates a ConcurrentWorkers master object that encapsulates the actual | ||
| 516 | * workers. | ||
| 517 | * | ||
| 518 | * @param number_of_workers the number of concurrent workers to be spawned | ||
| 519 | * @param maximal_queue_length the maximal length of the job queue | ||
| 520 | * (>= number_of_workers) | ||
| 521 | * @param worker_context a pointer to the WorkerT defined context | ||
| 522 | * object | ||
| 523 | */ | ||
| 524 | ConcurrentWorkers(const size_t number_of_workers, | ||
| 525 | const size_t maximal_queue_length, | ||
| 526 | worker_context_t *worker_context = NULL); | ||
| 527 | virtual ~ConcurrentWorkers(); | ||
| 528 | |||
| 529 | /** | ||
| 530 | * Initializes the ConcurrentWorkers swarm, spawnes a thread for each new | ||
| 531 | * worker object and puts everything into working state. | ||
| 532 | * | ||
| 533 | * @return true if all went fine | ||
| 534 | */ | ||
| 535 | bool Initialize(); | ||
| 536 | |||
| 537 | /** | ||
| 538 | * Schedules a new job for processing into the internal job queue. This method | ||
| 539 | * will block in case the job queue is already full and wait for an empty | ||
| 540 | * slot. | ||
| 541 | * | ||
| 542 | * @param data the data to be processed | ||
| 543 | */ | ||
| 544 | ✗ | inline void Schedule(const expected_data_t &data) { | |
| 545 | ✗ | Schedule(WorkerJob(data)); | |
| 546 | } | ||
| 547 | |||
| 548 | /** | ||
| 549 | * Shuts down the ConcurrentWorkers object as well as the encapsulated workers | ||
| 550 | * as soon as possible. Workers will finish their current job and will termi- | ||
| 551 | * nate afterwards. If no jobs are scheduled they will simply stop waiting for | ||
| 552 | * new ones and terminate afterwards. | ||
| 553 | * This method MUST not be called more than once per ConcurrentWorkers. | ||
| 554 | */ | ||
| 555 | void Terminate(); | ||
| 556 | |||
| 557 | /** | ||
| 558 | * Waits until the job queue is fully processed | ||
| 559 | * | ||
| 560 | * Note: this might lead to undefined behaviour or infinite waiting if other | ||
| 561 | * producers still schedule jobs into the job queue. | ||
| 562 | */ | ||
| 563 | void WaitForEmptyQueue() const; | ||
| 564 | |||
| 565 | /** | ||
| 566 | * Waits until the ConcurrentWorkers swarm fully processed the current job | ||
| 567 | * queue and shuts down afterwards. | ||
| 568 | * | ||
| 569 | * Note: just as for WaitForEmptyQueue() this assumes that no other producers | ||
| 570 | * schedule jobs in the mean time. | ||
| 571 | */ | ||
| 572 | void WaitForTermination(); | ||
| 573 | |||
| 574 | ✗ | inline unsigned int GetNumberOfWorkers() const { return number_of_workers_; } | |
| 575 | ✗ | inline unsigned int GetNumberOfFailedJobs() const { | |
| 576 | ✗ | return atomic_read32(&jobs_failed_); | |
| 577 | } | ||
| 578 | |||
| 579 | /** | ||
| 580 | * Defines a job as successfully finished. | ||
| 581 | * DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT! | ||
| 582 | * | ||
| 583 | * @param data the data to be returned back to the user | ||
| 584 | */ | ||
| 585 | ✗ | inline void JobSuccessful(const returned_data_t &data) { | |
| 586 | ✗ | JobDone(data, true); | |
| 587 | } | ||
| 588 | |||
| 589 | /** | ||
| 590 | * Defines a job as failed. | ||
| 591 | * DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT! | ||
| 592 | * | ||
| 593 | * Note: Even for failed jobs the user will get a callback with a data object. | ||
| 594 | * You might want to make sure, that this data contains a status flag as | ||
| 595 | * well, telling the user what went wrong. | ||
| 596 | * | ||
| 597 | * @param data the data to be returned back to the user | ||
| 598 | */ | ||
| 599 | ✗ | inline void JobFailed(const returned_data_t &data) { JobDone(data, false); } | |
| 600 | |||
| 601 | void RunCallbackThread(); | ||
| 602 | |||
| 603 | protected: | ||
| 604 | bool SpawnWorkers(); | ||
| 605 | |||
| 606 | /** | ||
| 607 | * POSIX conform function for thread entry point. Is invoked for every new | ||
| 608 | * worker thread and contains the initialization, processing loop and tear | ||
| 609 | * down of the unique worker objects | ||
| 610 | * | ||
| 611 | * @param run_binding void pointer to a RunBinding structure (C interface) | ||
| 612 | * @return NULL in any case | ||
| 613 | */ | ||
| 614 | static void *RunWorker(void *run_binding); | ||
| 615 | |||
| 616 | static void *RunCallbackThreadWrapper(void *run_binding); | ||
| 617 | |||
| 618 | /** | ||
| 619 | * Tells the master that a worker thread did start. This does not mean, that | ||
| 620 | * it was initialized successfully. | ||
| 621 | */ | ||
| 622 | void ReportStartedWorker() const; | ||
| 623 | |||
| 624 | void Schedule(WorkerJob job); | ||
| 625 | void ScheduleDeathSentences(); | ||
| 626 | |||
| 627 | /** | ||
| 628 | * Empties the job queue | ||
| 629 | * | ||
| 630 | * @param forget_pending controls if cancelled jobs should be seen as | ||
| 631 | * finished | ||
| 632 | */ | ||
| 633 | void TruncateJobQueue(const bool forget_pending = false); | ||
| 634 | |||
| 635 | /** | ||
| 636 | * Retrieves a job from the job queue. If the job queue is empty it will block | ||
| 637 | * until there is a new job available for processing. | ||
| 638 | * THIS METHOD MUST ONLY BE CALLED INSIDE THE WORKER OBJECTS | ||
| 639 | * | ||
| 640 | * @return a job to be processed by a worker | ||
| 641 | */ | ||
| 642 | inline WorkerJob Acquire(); | ||
| 643 | |||
| 644 | /** | ||
| 645 | * Controls the asynchronous finishing of a job. | ||
| 646 | * DO NOT CALL THIS, use JobSuccessful() or JobFailed() wrappers instead. | ||
| 647 | * | ||
| 648 | * @param data the data to be returned to the user | ||
| 649 | * @param success flag if job was successful | ||
| 650 | */ | ||
| 651 | void JobDone(const returned_data_t &data, const bool success = true); | ||
| 652 | |||
| 653 | ✗ | inline void StartRunning() { | |
| 654 | ✗ | MutexLockGuard guard(status_mutex_); | |
| 655 | ✗ | running_ = true; | |
| 656 | } | ||
| 657 | ✗ | inline void StopRunning() { | |
| 658 | ✗ | MutexLockGuard guard(status_mutex_); | |
| 659 | ✗ | running_ = false; | |
| 660 | } | ||
| 661 | ✗ | inline bool IsRunning() const { | |
| 662 | ✗ | MutexLockGuard guard(status_mutex_); | |
| 663 | ✗ | return running_; | |
| 664 | } | ||
| 665 | |||
| 666 | private: | ||
| 667 | // general configuration | ||
| 668 | const size_t number_of_workers_; //!< number of concurrent worker threads | ||
| 669 | const worker_context_t *worker_context_; //!< the WorkerT defined context | ||
| 670 | /** | ||
| 671 | * The thread context passed to newly spawned threads | ||
| 672 | */ | ||
| 673 | WorkerRunBinding thread_context_; | ||
| 674 | |||
| 675 | // status information | ||
| 676 | bool initialized_; | ||
| 677 | bool running_; | ||
| 678 | mutable unsigned int workers_started_; | ||
| 679 | mutable pthread_mutex_t status_mutex_; | ||
| 680 | mutable pthread_cond_t worker_started_; | ||
| 681 | mutable pthread_mutex_t jobs_all_done_mutex_; | ||
| 682 | mutable pthread_cond_t jobs_all_done_; | ||
| 683 | |||
| 684 | // worker threads | ||
| 685 | WorkerThreads worker_threads_; //!< list of worker threads | ||
| 686 | pthread_t callback_thread_; //!< handles callback invokes | ||
| 687 | |||
| 688 | // job queue | ||
| 689 | typedef FifoChannel<WorkerJob> JobQueue; | ||
| 690 | JobQueue jobs_queue_; | ||
| 691 | mutable atomic_int32 jobs_pending_; | ||
| 692 | mutable atomic_int32 jobs_failed_; | ||
| 693 | mutable atomic_int64 jobs_processed_; | ||
| 694 | |||
| 695 | // callback channel | ||
| 696 | typedef FifoChannel<CallbackJob> CallbackQueue; | ||
| 697 | CallbackQueue results_queue_; | ||
| 698 | }; | ||
| 699 | |||
| 700 | |||
| 701 | /** | ||
| 702 | * Base class for worker classes that should be used in a ConcurrentWorkers | ||
| 703 | * swarm. These classes need to fulfill a number of requirements in order to | ||
| 704 | * satisfy the needs of the ConcurrentWorkers template. | ||
| 705 | * | ||
| 706 | * Requirements: | ||
| 707 | * -> needs to define the following types: | ||
| 708 | * - expected_data - input data structure of the worker | ||
| 709 | * - returned_data - output data structure of the worker | ||
| 710 | * - worker_context - context structure for initialization information | ||
| 711 | * | ||
| 712 | * -> implement a constructor that takes a pointer to its worker_context | ||
| 713 | * as its only parameter: | ||
| 714 | * AwesomeWorker(const AwesomeWorker::worker_context*) | ||
| 715 | * Note: do not rely on the context object to be available after the | ||
| 716 | * constructor has returned! | ||
| 717 | * | ||
| 718 | * -> needs to define the calling-operator expecting one parameter of type: | ||
| 719 | * const expected_data& and returning void | ||
| 720 | * This will be invoked for every new job the worker should process | ||
| 721 | * | ||
| 722 | * -> inside the implementation of the described calling-operator it needs to | ||
| 723 | * invoke either: | ||
| 724 | * master()->JobSuccessful(const returned_data&) | ||
| 725 | * or: | ||
| 726 | * master()->JobFailed(const returned_data&) | ||
| 727 | * as its LAST operation before returning. | ||
| 728 | * This will keep track of finished jobs and inform the user of Concurrent- | ||
| 729 | * Workers about finished jobs. | ||
| 730 | * | ||
| 731 | * -> [optional] overwrite Initialize() and/or TearDown() to do environmental | ||
| 732 | * setup work, before or respectively after jobs will be processed | ||
| 733 | * | ||
| 734 | * General Reminder: | ||
| 735 | * You will be running in a multi-threaded environment here! Buckle up and | ||
| 736 | * make suitable preparations to shield yourself from serious head-ache. | ||
| 737 | * | ||
| 738 | * Note: This implements a Curiously Recurring Template Pattern | ||
| 739 | * (http://en.wikipedia.org/wiki/Curiously_recurring_template_pattern) | ||
| 740 | * | ||
| 741 | * @param DerivedWorkerT the class name of the inheriting class | ||
| 742 | * (f.e. class AwesomeWorker : public ConcurrentWorker<AwesomeWorker>) | ||
| 743 | */ | ||
| 744 | template<class DerivedWorkerT> | ||
| 745 | class ConcurrentWorker : SingleCopy { | ||
| 746 | public: | ||
| 747 | ✗ | virtual ~ConcurrentWorker() { } | |
| 748 | |||
| 749 | /** | ||
| 750 | * Does general initialization before any jobs will get scheduled. You do not | ||
| 751 | * need to up-call this initialize method, since it is seen as a dummy here. | ||
| 752 | * | ||
| 753 | * @returns true one successful initialization | ||
| 754 | */ | ||
| 755 | ✗ | virtual bool Initialize() { return true; } | |
| 756 | |||
| 757 | /** | ||
| 758 | * Does general clean-up after the last job was processed in the worker object | ||
| 759 | * and it is about to vanish. You do not need to up-call this method. | ||
| 760 | */ | ||
| 761 | ✗ | virtual void TearDown() { } | |
| 762 | |||
| 763 | /** | ||
| 764 | * The actual job-processing entry point. See the description of the | ||
| 765 | * inheriting class requirements to learn about the semantics of this methods. | ||
| 766 | * DO NOT FORGET TO CALL master()->JobSuccessful() OR master()->JobFinished() | ||
| 767 | * at the end of thismethod!! | ||
| 768 | * | ||
| 769 | * Note: There is no way to generally define this operator, it is therefore | ||
| 770 | * commented out and placed here just as a reminder. | ||
| 771 | * | ||
| 772 | * @param data the data to be processed. | ||
| 773 | */ | ||
| 774 | // void operator()(const expected_data &data); // do the actual job of the | ||
| 775 | // worker | ||
| 776 | |||
| 777 | protected: | ||
| 778 | ✗ | ConcurrentWorker() : master_(NULL) { } | |
| 779 | |||
| 780 | /** | ||
| 781 | * Gets a pointer to the ConcurrentWorkers object that this worker resides in | ||
| 782 | * | ||
| 783 | * @returns a pointer to the ConcurrentWorkers object | ||
| 784 | */ | ||
| 785 | ✗ | inline ConcurrentWorkers<DerivedWorkerT> *master() const { return master_; } | |
| 786 | |||
| 787 | private: | ||
| 788 | friend class ConcurrentWorkers<DerivedWorkerT>; | ||
| 789 | ✗ | void RegisterMaster(ConcurrentWorkers<DerivedWorkerT> *master) { | |
| 790 | ✗ | master_ = master; | |
| 791 | } | ||
| 792 | |||
| 793 | private: | ||
| 794 | ConcurrentWorkers<DerivedWorkerT> *master_; | ||
| 795 | }; | ||
| 796 | |||
| 797 | #ifdef CVMFS_NAMESPACE_GUARD | ||
| 798 | } // namespace CVMFS_NAMESPACE_GUARD | ||
| 799 | #endif | ||
| 800 | |||
| 801 | #include "util/concurrency_impl.h" | ||
| 802 | |||
| 803 | #endif // CVMFS_UTIL_CONCURRENCY_H_ | ||
| 804 |