| | |
|---|---|
| Directory: | cvmfs/ |
| File: | cvmfs/util/concurrency_impl.h |
| Date: | 2025-10-19 02:35:28 |

| | Exec | Total | Coverage |
|---|---|---|---|
| Lines: | 119 | 264 | 45.1% |
| Branches: | 54 | 142 | 38.0% |

| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /** | ||
| 2 | * This file is part of the CernVM File System. | ||
| 3 | */ | ||
| 4 | |||
| 5 | #ifndef CVMFS_UTIL_CONCURRENCY_IMPL_H_ | ||
| 6 | #define CVMFS_UTIL_CONCURRENCY_IMPL_H_ | ||
| 7 | |||
| 8 | // clang-format off | ||
| 9 | // Only needed to let clang-tidy see the class definitions. | ||
| 10 | #ifndef CVMFS_UTIL_CONCURRENCY_H_ | ||
| 11 | #include "util/concurrency.h" | ||
| 12 | #endif | ||
| 13 | // clang-format on | ||
| 14 | |||
| 15 | #include <cstddef> | ||
| 16 | |||
| 17 | #include "util/logging.h" | ||
| 18 | |||
| 19 | #ifdef CVMFS_NAMESPACE_GUARD | ||
| 20 | namespace CVMFS_NAMESPACE_GUARD { | ||
| 21 | #endif | ||
| 22 | |||
| 23 | // | ||
| 24 | // +---------------------------------------------------------------------------- | ||
| 25 | // | SynchronizingCounter | ||
| 26 | // | ||
| 27 | |||
| 28 | |||
| 29 | template<typename T> | ||
| 30 | 6221870937 | void SynchronizingCounter<T>::SetValueUnprotected(const T new_value) { | |
| 31 | // make sure that 0 <= new_value <= maximal_value_ if maximal_value_ != 0 | ||
| 32 | 5/6 | 6221870937 | assert(!HasMaximalValue() |
| 33 | || (new_value >= T(0) && new_value <= maximal_value_)); | ||
| 34 | |||
| 35 | 6221870937 | value_ = new_value; | |
| 36 | |||
| 37 | 2/2 | 6221870937 | if (value_ == T(0)) { |
| 38 | 251474 | pthread_cond_broadcast(&became_zero_); | |
| 39 | } | ||
| 40 | |||
| 41 | 6/6 | 6221870937 | if (HasMaximalValue() && value_ < maximal_value_) { |
| 42 | 5168510 | pthread_cond_broadcast(&free_slot_); | |
| 43 | } | ||
| 44 | 6221870937 | } | |
| 45 | |||
| 46 | |||
| 47 | template<typename T> | ||
| 48 | 3110935165 | void SynchronizingCounter<T>::WaitForFreeSlotUnprotected() { | |
| 49 | 6/6 | 3123310875 | while (HasMaximalValue() && value_ >= maximal_value_) { |
| 50 | 12375710 | pthread_cond_wait(&free_slot_, &mutex_); | |
| 51 | } | ||
| 52 | 3/4 | 3110935165 | assert(!HasMaximalValue() || value_ < maximal_value_); |
| 53 | 3110935165 | } | |
| 54 | |||
| 55 | |||
| 56 | template<typename T> | ||
| 57 | 3739 | void SynchronizingCounter<T>::Initialize() { | |
| 58 | 3739 | const bool init_successful = (pthread_mutex_init(&mutex_, NULL) == 0 | |
| 59 | 1/2 | 3739 | && pthread_cond_init(&became_zero_, NULL) == 0 |
| 60 | 2/4 | 7478 | && pthread_cond_init(&free_slot_, NULL) == 0); |
| 61 | 1/2 | 3739 | assert(init_successful); |
| 62 | 3739 | } | |
| 63 | |||
| 64 | |||
| 65 | template<typename T> | ||
| 66 | 3738 | void SynchronizingCounter<T>::Destroy() { | |
| 67 | 3738 | pthread_mutex_destroy(&mutex_); | |
| 68 | 3738 | pthread_cond_destroy(&became_zero_); | |
| 69 | 3738 | pthread_cond_destroy(&free_slot_); | |
| 70 | 3738 | } | |
| 71 | |||
| 72 | |||
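
A minimal usage sketch for the counter implemented above, assuming the `Increment()`, `Decrement()`, and `WaitForZero()` interface declared in `util/concurrency.h` (this file only shows the unprotected helpers) and a constructor taking the maximal value. The `Job` struct and `Worker` function are illustrative, not part of CVMFS:

```cpp
#include <pthread.h>

#include "util/concurrency.h"

struct Job {
  SynchronizingCounter<int> *in_flight;  // shared bounded counter
};

void *Worker(void *data) {
  Job *job = static_cast<Job *>(data);
  // ... perform the actual work here ...
  // Decrement() goes through SetValueUnprotected(), which broadcasts
  // free_slot_ and may wake a producer blocked in Increment().
  job->in_flight->Decrement();
  return NULL;
}

void ProduceBounded() {
  SynchronizingCounter<int> in_flight(8);  // at most 8 jobs in flight
  Job job = {&in_flight};
  for (int i = 0; i < 64; ++i) {
    in_flight.Increment();  // blocks while 8 jobs are already pending
    pthread_t thread;
    pthread_create(&thread, NULL, Worker, &job);
    pthread_detach(thread);
  }
  in_flight.WaitForZero();  // returns once became_zero_ is broadcast at 0
}
```

| Line | Branch | Exec | Source |
|---|---|---|---|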
| 73 | // | ||
| 74 | // +---------------------------------------------------------------------------- | ||
| 75 | // | Observable | ||
| 76 | // | ||
| 77 | |||
| 78 | |||
| 79 | template<typename ParamT> | ||
| 80 | 29481244 | Observable<ParamT>::Observable() { | |
| 81 | 29481244 | const int ret = pthread_rwlock_init(&listeners_rw_lock_, NULL); | |
| 82 | 1/2 | 29481244 | assert(ret == 0); |
| 83 | 29481244 | } | |
| 84 | |||
| 85 | |||
| 86 | template<typename ParamT> | ||
| 87 | 29479634 | Observable<ParamT>::~Observable() { | |
| 88 | 29479634 | UnregisterListeners(); | |
| 89 | 29481634 | pthread_rwlock_destroy(&listeners_rw_lock_); | |
| 90 | } | ||
| 91 | |||
| 92 | |||
| 93 | template<typename ParamT> | ||
| 94 | template<class DelegateT, class ClosureDataT> | ||
| 95 | 7346904 | typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener( | |
| 96 | typename BoundClosure<ParamT, DelegateT, ClosureDataT>::CallbackMethod | ||
| 97 | method, | ||
| 98 | DelegateT *delegate, | ||
| 99 | ClosureDataT data) { | ||
| 100 | // create a new BoundClosure, register it and return the handle | ||
| 101 | 7346904 | CallbackBase<ParamT> *callback = Observable<ParamT>::MakeClosure( | |
| 102 | method, delegate, data); | ||
| 103 | 7346904 | RegisterListener(callback); | |
| 104 | 7346904 | return callback; | |
| 105 | } | ||
| 106 | |||
| 107 | |||
| 108 | template<typename ParamT> | ||
| 109 | template<class DelegateT> | ||
| 110 | 27091 | typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener( | |
| 111 | typename BoundCallback<ParamT, DelegateT>::CallbackMethod method, | ||
| 112 | DelegateT *delegate) { | ||
| 113 | // create a new BoundCallback, register it and return the handle | ||
| 114 | 27091 | CallbackBase<ParamT> *callback = Observable<ParamT>::MakeCallback(method, | |
| 115 | delegate); | ||
| 116 | 27091 | RegisterListener(callback); | |
| 117 | 27091 | return callback; | |
| 118 | } | ||
| 119 | |||
| 120 | |||
| 121 | template<typename ParamT> | ||
| 122 | 2450 | typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener( | |
| 123 | typename Callback<ParamT>::CallbackFunction fn) { | ||
| 124 | // create a new Callback, register it and return the handle | ||
| 125 | 2450 | CallbackBase<ParamT> *callback = Observable<ParamT>::MakeCallback(fn); | |
| 126 | 2450 | RegisterListener(callback); | |
| 127 | 2450 | return callback; | |
| 128 | } | ||
| 129 | |||
| 130 | |||
| 131 | template<typename ParamT> | ||
| 132 | 14722363 | void Observable<ParamT>::RegisterListener( | |
| 133 | Observable<ParamT>::CallbackPtr callback_object) { | ||
| 134 | // register a generic CallbackBase callback | ||
| 135 | 14722363 | WriteLockGuard guard(listeners_rw_lock_); | |
| 136 | 1/2 | 14722363 | listeners_.insert(callback_object); |
| 137 | 14722363 | } | |
| 138 | |||
| 139 | |||
| 140 | template<typename ParamT> | ||
| 141 | 1893 | void Observable<ParamT>::UnregisterListener( | |
| 142 | typename Observable<ParamT>::CallbackPtr callback_object) { | ||
| 143 | // remove a callback handle from the callbacks list | ||
| 144 | // if it is not registered --> crash | ||
| 145 | 1893 | WriteLockGuard guard(listeners_rw_lock_); | |
| 146 | 1/2 | 1893 | const size_t was_removed = listeners_.erase(callback_object); |
| 147 | 1/2 | 1893 | assert(was_removed > 0); |
| 148 | 1/2 | 1893 | delete callback_object; |
| 149 | 1893 | } | |
| 150 | |||
| 151 | |||
| 152 | template<typename ParamT> | ||
| 153 | 29467199 | void Observable<ParamT>::UnregisterListeners() { | |
| 154 | 29467199 | WriteLockGuard guard(listeners_rw_lock_); | |
| 155 | |||
| 156 | // remove all callbacks from the list | ||
| 157 | 29471439 | typename Callbacks::const_iterator i = listeners_.begin(); | |
| 158 | 29468799 | typename Callbacks::const_iterator iend = listeners_.end(); | |
| 159 | 2/2 | 44175390 | for (; i != iend; ++i) { |
| 160 | 1/2 | 14705791 | delete *i; |
| 161 | } | ||
| 162 | 29459919 | listeners_.clear(); | |
| 163 | 29465439 | } | |
| 164 | |||
| 165 | |||
| 166 | template<typename ParamT> | ||
| 167 | 49657797 | void Observable<ParamT>::NotifyListeners(const ParamT ¶meter) { | |
| 168 | 49657797 | ReadLockGuard guard(listeners_rw_lock_); | |
| 169 | |||
| 170 | // invoke all callbacks and inform them about new data | ||
| 171 | 49657989 | typename Callbacks::const_iterator i = listeners_.begin(); | |
| 172 | 49656637 | typename Callbacks::const_iterator iend = listeners_.end(); | |
| 173 | 2/2 | 99309198 | for (; i != iend; ++i) { |
| 174 | 1/2 | 49653929 | (**i)(parameter); |
| 175 | } | ||
| 176 | 49652037 | } | |
| 177 | |||
| 178 | |||
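
A sketch of the observer pattern implemented above. `Downloader` and `Statistics` are illustrative names; the bound-method signature `void (DelegateT::*)(const ParamT &)` is assumed from the `BoundCallback` declaration in the headers:

```cpp
#include <cstdio>

#include "util/concurrency.h"

// The subject derives from Observable and fires NotifyListeners().
class Downloader : public Observable<int> {
 public:
  void FinishTransfer(const int retcode) {
    NotifyListeners(retcode);  // invokes every registered callback
  }
};

// A delegate whose member function is bound as a listener.
class Statistics {
 public:
  void OnTransferDone(const int &retcode) {
    printf("transfer finished with code %d\n", retcode);
  }
};

void Subscribe() {
  Downloader downloader;
  Statistics statistics;
  // The returned handle can later be passed to UnregisterListener(),
  // which erases the closure and deletes it (see lines 146-148 above).
  Observable<int>::CallbackPtr handle = downloader.RegisterListener(
      &Statistics::OnTransferDone, &statistics);
  downloader.FinishTransfer(0);
  downloader.UnregisterListener(handle);
}
```

Note that `~Observable()` deletes all still-registered closures through `UnregisterListeners()`, so callback handles must not be used after the subject is destroyed.

| Line | Branch | Exec | Source |
|---|---|---|---|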
| 179 | // | ||
| 180 | // +---------------------------------------------------------------------------- | ||
| 181 | // | FifoChannel | ||
| 182 | // | ||
| 183 | |||
| 184 | |||
| 185 | template<class T> | ||
| 186 | 304 | FifoChannel<T>::FifoChannel(const size_t maximal_length, | |
| 187 | const size_t drainout_threshold) | ||
| 188 | 304 | : maximal_queue_length_(maximal_length) | |
| 189 | 304 | , queue_drainout_threshold_(drainout_threshold) { | |
| 190 | 1/2 | 304 | assert(drainout_threshold <= maximal_length); |
| 191 | 1/2 | 304 | assert(drainout_threshold > 0); |
| 192 | |||
| 193 | 304 | const bool successful = (pthread_mutex_init(&mutex_, NULL) == 0 | |
| 194 | 1/2 | 304 | && pthread_cond_init(&queue_is_not_empty_, NULL) == 0 |
| 195 | 2/4 | 608 | && pthread_cond_init(&queue_is_not_full_, NULL) |
| 196 | == 0); | ||
| 197 | |||
| 198 | 1/2 | 304 | assert(successful); |
| 199 | 304 | } | |
| 200 | |||
| 201 | |||
| 202 | template<class T> | ||
| 203 | 856 | FifoChannel<T>::~FifoChannel() { | |
| 204 | 608 | pthread_cond_destroy(&queue_is_not_empty_); | |
| 205 | 608 | pthread_cond_destroy(&queue_is_not_full_); | |
| 206 | 608 | pthread_mutex_destroy(&mutex_); | |
| 207 | 1464 | } | |
| 208 | |||
| 209 | |||
| 210 | template<class T> | ||
| 211 | 43009162 | void FifoChannel<T>::Enqueue(const T &data) { | |
| 212 | 43009162 | MutexLockGuard lock(mutex_); | |
| 213 | |||
| 214 | // wait for space in the queue | ||
| 215 | 2/2 | 43009205 | while (this->size() >= maximal_queue_length_) { |
| 216 | 1/2 | 43 | pthread_cond_wait(&queue_is_not_full_, &mutex_); |
| 217 | } | ||
| 218 | |||
| 219 | // put something into the queue | ||
| 220 |
1/2✓ Branch 1 taken 43009162 times.
✗ Branch 2 not taken.
|
43009162 | this->push(data); |
| 221 | |||
| 222 | // wake all waiting threads | ||
| 223 | 43009162 | pthread_cond_broadcast(&queue_is_not_empty_); | |
| 224 | 43009162 | } | |
| 225 | |||
| 226 | |||
| 227 | template<class T> | ||
| 228 | 42777005 | const T FifoChannel<T>::Dequeue() { | |
| 229 | 42777005 | MutexLockGuard lock(mutex_); | |
| 230 | |||
| 231 | // wait until there is something to do | ||
| 232 | 2/2 | 157102919 | while (this->empty()) { |
| 233 | 1/2 | 114098014 | pthread_cond_wait(&queue_is_not_empty_, &mutex_); |
| 234 | } | ||
| 235 | |||
| 236 | // get the item from the queue | ||
| 237 | 43004905 | T data = this->front(); | |
| 238 | 43004905 | this->pop(); | |
| 239 | |||
| 240 | // signal waiting threads about the free space | ||
| 241 | 2/2 | 43004905 | if (this->size() < queue_drainout_threshold_) { |
| 242 | 42990414 | pthread_cond_broadcast(&queue_is_not_full_); | |
| 243 | } | ||
| 244 | |||
| 245 | // return the acquired job | ||
| 246 | 42899555 | return data; | |
| 247 | 43004905 | } | |
| 248 | |||
| 249 | |||
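
The `queue_drainout_threshold_` checked above adds hysteresis: `Dequeue()` broadcasts `queue_is_not_full_` only while the fill level is below the threshold, so producers blocked in `Enqueue()` sleep until consumers have drained most of the queue and then refill it in one batch (`ConcurrentWorkers`, below, picks `maximal_queue_length / 4 + 1`). A hypothetical parameter choice:

```cpp
#include "util/concurrency.h"

void ThresholdExample() {
  // A producer that filled the queue to 64 items stays blocked until
  // consumers drain it below 16 items; only then does the broadcast in
  // Dequeue() wake it again.
  FifoChannel<int> channel(64 /* maximal_length */, 16 /* drainout_threshold */);
  channel.Enqueue(1);
  const int item = channel.Dequeue();
  (void)item;
}
```

| Line | Branch | Exec | Source |
|---|---|---|---|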
| 250 | template<class T> | ||
| 251 | 43 | unsigned int FifoChannel<T>::Drop() { | |
| 252 | 43 | MutexLockGuard lock(mutex_); | |
| 253 | |||
| 254 | 43 | unsigned int dropped_items = 0; | |
| 255 | 2/2 | 4300 | while (!this->empty()) { |
| 256 | 4257 | this->pop(); | |
| 257 | 4257 | ++dropped_items; | |
| 258 | } | ||
| 259 | |||
| 260 | 43 | pthread_cond_broadcast(&queue_is_not_full_); | |
| 261 | |||
| 262 | 86 | return dropped_items; | |
| 263 | 43 | } | |
| 264 | |||
| 265 | |||
| 266 | template<class T> | ||
| 267 | 86 | size_t FifoChannel<T>::GetItemCount() const { | |
| 268 | 86 | MutexLockGuard lock(mutex_); | |
| 269 | 86 | return this->size(); | |
| 270 | 86 | } | |
| 271 | |||
| 272 | |||
| 273 | template<class T> | ||
| 274 | 215 | bool FifoChannel<T>::IsEmpty() const { | |
| 275 | 215 | MutexLockGuard lock(mutex_); | |
| 276 | 215 | return this->empty(); | |
| 277 | 215 | } | |
| 278 | |||
| 279 | |||
| 280 | template<class T> | ||
| 281 | 43 | size_t FifoChannel<T>::GetMaximalItemCount() const { | |
| 282 | 43 | return maximal_queue_length_; | |
| 283 | } | ||
| 284 | |||
| 285 | |||
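
A self-contained producer/consumer sketch using only the `FifoChannel` interface shown above. The poison-pill value is an assumption of this example; `FifoChannel` itself has no built-in termination signal:

```cpp
#include <pthread.h>

#include <cstdio>

#include "util/concurrency.h"

void *Consume(void *queue) {
  FifoChannel<int> *channel = static_cast<FifoChannel<int> *>(queue);
  while (true) {
    const int item = channel->Dequeue();  // blocks on an empty queue
    if (item < 0)
      break;  // poison pill: stop consuming
    printf("consumed %d\n", item);
  }
  return NULL;
}

int main() {
  FifoChannel<int> channel(64 /* maximal_length */, 16 /* drainout_threshold */);
  pthread_t consumer;
  pthread_create(&consumer, NULL, Consume, &channel);
  for (int i = 0; i < 1000; ++i) {
    channel.Enqueue(i);  // blocks while the queue holds 64 items
  }
  channel.Enqueue(-1);  // ask the consumer to shut down
  pthread_join(consumer, NULL);
  return 0;
}
```

| Line | Branch | Exec | Source |
|---|---|---|---|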
| 286 | // | ||
| 287 | // +---------------------------------------------------------------------------- | ||
| 288 | // | ConcurrentWorkers | ||
| 289 | // | ||
| 290 | |||
| 291 | |||
| 292 | template<class WorkerT> | ||
| 293 | ✗ | ConcurrentWorkers<WorkerT>::ConcurrentWorkers( | |
| 294 | const size_t number_of_workers, | ||
| 295 | const size_t maximal_queue_length, | ||
| 296 | ConcurrentWorkers<WorkerT>::worker_context_t *worker_context) | ||
| 297 | ✗ | : number_of_workers_(number_of_workers) | |
| 298 | ✗ | , worker_context_(worker_context) | |
| 299 | ✗ | , thread_context_(this, worker_context_) | |
| 300 | ✗ | , initialized_(false) | |
| 301 | ✗ | , running_(false) | |
| 302 | ✗ | , workers_started_(0) | |
| 303 | ✗ | , jobs_queue_(maximal_queue_length, maximal_queue_length / 4 + 1) | |
| 304 | ✗ | , results_queue_(maximal_queue_length, 1) { | |
| 305 | ✗ | assert(maximal_queue_length >= number_of_workers); | |
| 306 | ✗ | assert(number_of_workers > 0); | |
| 307 | |||
| 308 | ✗ | atomic_init32(&jobs_pending_); | |
| 309 | ✗ | atomic_init32(&jobs_failed_); | |
| 310 | ✗ | atomic_init64(&jobs_processed_); | |
| 311 | } | ||
| 312 | |||
| 313 | |||
| 314 | template<class WorkerT> | ||
| 315 | ✗ | ConcurrentWorkers<WorkerT>::~ConcurrentWorkers() { | |
| 316 | ✗ | if (IsRunning()) { | |
| 317 | ✗ | Terminate(); | |
| 318 | } | ||
| 319 | |||
| 320 | // destroy some synchronisation data structures | ||
| 321 | ✗ | pthread_cond_destroy(&worker_started_); | |
| 322 | ✗ | pthread_cond_destroy(&jobs_all_done_); | |
| 323 | ✗ | pthread_mutex_destroy(&status_mutex_); | |
| 324 | ✗ | pthread_mutex_destroy(&jobs_all_done_mutex_); | |
| 325 | } | ||
| 326 | |||
| 327 | |||
| 328 | template<class WorkerT> | ||
| 329 | ✗ | bool ConcurrentWorkers<WorkerT>::Initialize() { | |
| 330 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, | |
| 331 | "Initializing ConcurrentWorker " | ||
| 332 | "object with %lu worker threads " | ||
| 333 | "and a queue length of %zu", | ||
| 334 | number_of_workers_, jobs_queue_.GetMaximalItemCount()); | ||
| 335 | // LogCvmfs(kLogConcurrency, kLogStdout, "sizeof(expected_data_t): %d\n" | ||
| 336 | // "sizeof(returned_data_t): %d", | ||
| 337 | // sizeof(expected_data_t), sizeof(returned_data_t)); | ||
| 338 | |||
| 339 | // initialize synchronisation for job queue (Workers) | ||
| 340 | ✗ | if (pthread_mutex_init(&status_mutex_, NULL) != 0 | |
| 341 | ✗ | || pthread_mutex_init(&jobs_all_done_mutex_, NULL) != 0 | |
| 342 | ✗ | || pthread_cond_init(&worker_started_, NULL) != 0 | |
| 343 | ✗ | || pthread_cond_init(&jobs_all_done_, NULL) != 0) { | |
| 344 | ✗ | return false; | |
| 345 | } | ||
| 346 | |||
| 347 | // spawn the Worker objects in their own threads | ||
| 348 | ✗ | if (!SpawnWorkers()) { | |
| 349 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn workers"); | |
| 350 | ✗ | return false; | |
| 351 | } | ||
| 352 | |||
| 353 | // all done... | ||
| 354 | ✗ | initialized_ = true; | |
| 355 | ✗ | return true; | |
| 356 | } | ||
| 357 | |||
| 358 | |||
| 359 | template<class WorkerT> | ||
| 360 | ✗ | bool ConcurrentWorkers<WorkerT>::SpawnWorkers() { | |
| 361 | ✗ | assert(worker_threads_.size() == 0); | |
| 362 | ✗ | worker_threads_.resize(number_of_workers_); | |
| 363 | |||
| 364 | // set the running flag to trap workers in their treadmills | ||
| 365 | ✗ | StartRunning(); | |
| 366 | |||
| 367 | // spawn the swarm and make them work | ||
| 368 | ✗ | bool success = true; | |
| 369 | ✗ | WorkerThreads::iterator i = worker_threads_.begin(); | |
| 370 | ✗ | WorkerThreads::const_iterator iend = worker_threads_.end(); | |
| 371 | ✗ | for (; i != iend; ++i) { | |
| 372 | ✗ | pthread_t *thread = &(*i); | |
| 373 | ✗ | const int retval = pthread_create( | |
| 374 | thread, | ||
| 375 | NULL, | ||
| 376 | &ConcurrentWorkers<WorkerT>::RunWorker, | ||
| 377 | ✗ | reinterpret_cast<void *>(&thread_context_)); | |
| 378 | ✗ | if (retval != 0) { | |
| 379 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn a Worker"); | |
| 380 | ✗ | success = false; | |
| 381 | } | ||
| 382 | } | ||
| 383 | |||
| 384 | // spawn the callback processing thread | ||
| 385 | ✗ | const int retval = pthread_create( | |
| 386 | &callback_thread_, | ||
| 387 | NULL, | ||
| 388 | &ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper, | ||
| 389 | ✗ | reinterpret_cast<void *>(&thread_context_)); | |
| 390 | ✗ | if (retval != 0) { | |
| 391 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, | |
| 392 | "Failed to spawn the callback " | ||
| 393 | "worker thread"); | ||
| 394 | ✗ | success = false; | |
| 395 | } | ||
| 396 | |||
| 397 | // wait for all workers to report in... | ||
| 398 | { | ||
| 399 | ✗ | MutexLockGuard guard(status_mutex_); | |
| 400 | // +1 -> callback thread | ||
| 401 | ✗ | while (workers_started_ < number_of_workers_ + 1) { | |
| 402 | ✗ | pthread_cond_wait(&worker_started_, &status_mutex_); | |
| 403 | } | ||
| 404 | } | ||
| 405 | |||
| 406 | // all done... | ||
| 407 | ✗ | return success; | |
| 408 | } | ||
| 409 | |||
| 410 | |||
| 411 | template<class WorkerT> | ||
| 412 | ✗ | void *ConcurrentWorkers<WorkerT>::RunWorker(void *run_binding) { | |
| 413 | // NOTE: This is the actual worker thread code! | ||
| 414 | |||
| 415 | // | ||
| 416 | // INITIALIZATION | ||
| 417 | ///////////////// | ||
| 418 | |||
| 419 | // get contextual information | ||
| 420 | ✗ | const WorkerRunBinding &binding = *( | |
| 421 | static_cast<WorkerRunBinding *>(run_binding)); | ||
| 422 | ✗ | ConcurrentWorkers<WorkerT> *master = binding.delegate; | |
| 423 | ✗ | const worker_context_t *worker_context = binding.worker_context; | |
| 424 | |||
| 425 | // boot up the worker object and make sure it works | ||
| 426 | ✗ | WorkerT worker(worker_context); | |
| 427 | ✗ | worker.RegisterMaster(master); | |
| 428 | ✗ | const bool init_success = worker.Initialize(); | |
| 429 | |||
| 430 | // tell the master that this worker was started | ||
| 431 | ✗ | master->ReportStartedWorker(); | |
| 432 | |||
| 433 | ✗ | if (!init_success) { | |
| 434 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, | |
| 435 | "Worker was not initialized " | ||
| 436 | "properly... it will die now!"); | ||
| 437 | ✗ | return NULL; | |
| 438 | } | ||
| 439 | |||
| 440 | // | ||
| 441 | // PROCESSING LOOP | ||
| 442 | ////////////////// | ||
| 443 | |||
| 444 | // start the processing loop | ||
| 445 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Starting Worker..."); | |
| 446 | ✗ | while (master->IsRunning()) { | |
| 447 | // acquire a new job | ||
| 448 | ✗ | WorkerJob job = master->Acquire(); | |
| 449 | |||
| 450 | // check if we need to terminate | ||
| 451 | ✗ | if (job.is_death_sentence) | |
| 452 | ✗ | break; | |
| 453 | |||
| 454 | // do what you are supposed to do | ||
| 455 | ✗ | worker(job.data); | |
| 456 | } | ||
| 457 | |||
| 458 | // | ||
| 459 | // TEAR DOWN | ||
| 460 | //////////// | ||
| 461 | |||
| 462 | // give the worker the chance to tidy up | ||
| 463 | ✗ | worker.TearDown(); | |
| 464 | |||
| 465 | // good bye thread... | ||
| 466 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Worker..."); | |
| 467 | ✗ | return NULL; | |
| 468 | } | ||
| 469 | |||
| 470 | |||
| 471 | template<class WorkerT> | ||
| 472 | ✗ | void *ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper(void *run_binding) { | |
| 473 | ✗ | const RunBinding &binding = *(static_cast<RunBinding *>(run_binding)); | |
| 474 | ✗ | ConcurrentWorkers<WorkerT> *master = binding.delegate; | |
| 475 | |||
| 476 | ✗ | master->ReportStartedWorker(); | |
| 477 | |||
| 478 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, | |
| 479 | "Started dedicated callback worker"); | ||
| 480 | ✗ | master->RunCallbackThread(); | |
| 481 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Callback Worker..."); | |
| 482 | |||
| 483 | ✗ | return NULL; | |
| 484 | } | ||
| 485 | |||
| 486 | |||
| 487 | template<class WorkerT> | ||
| 488 | ✗ | void ConcurrentWorkers<WorkerT>::RunCallbackThread() { | |
| 489 | ✗ | while (IsRunning()) { | |
| 490 | ✗ | const CallbackJob callback_job = results_queue_.Dequeue(); | |
| 491 | |||
| 492 | // stop callback processing if needed | ||
| 493 | ✗ | if (callback_job.is_death_sentence) { | |
| 494 | ✗ | break; | |
| 495 | } | ||
| 496 | |||
| 497 | // notify all observers about the finished job | ||
| 498 | ✗ | this->NotifyListeners(callback_job.data); | |
| 499 | |||
| 500 | // remove the job from the pending 'list' and add it to the ready 'list' | ||
| 501 | ✗ | atomic_dec32(&jobs_pending_); | |
| 502 | ✗ | atomic_inc64(&jobs_processed_); | |
| 503 | |||
| 504 | // signal the Spooler that all jobs are done... | ||
| 505 | ✗ | if (atomic_read32(&jobs_pending_) == 0) { | |
| 506 | ✗ | pthread_cond_broadcast(&jobs_all_done_); | |
| 507 | } | ||
| 508 | } | ||
| 509 | } | ||
| 510 | |||
| 511 | |||
| 512 | template<class WorkerT> | ||
| 513 | ✗ | void ConcurrentWorkers<WorkerT>::ReportStartedWorker() const { | |
| 514 | ✗ | MutexLockGuard lock(status_mutex_); | |
| 515 | ✗ | ++workers_started_; | |
| 516 | ✗ | pthread_cond_signal(&worker_started_); | |
| 517 | } | ||
| 518 | |||
| 519 | |||
| 520 | template<class WorkerT> | ||
| 521 | ✗ | void ConcurrentWorkers<WorkerT>::Schedule(WorkerJob job) { | |
| 522 | // Note: This method can be called from arbitrary threads. Thus we do not | ||
| 523 | // necessarily have just one producer in the system. | ||
| 524 | |||
| 525 | // check if it makes sense to schedule this job | ||
| 526 | ✗ | if (!IsRunning() && !job.is_death_sentence) { | |
| 527 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, | |
| 528 | "Tried to schedule a job but " | ||
| 529 | "concurrency was not running..."); | ||
| 530 | ✗ | return; | |
| 531 | } | ||
| 532 | |||
| 533 | ✗ | jobs_queue_.Enqueue(job); | |
| 534 | ✗ | if (!job.is_death_sentence) { | |
| 535 | ✗ | atomic_inc32(&jobs_pending_); | |
| 536 | } | ||
| 537 | } | ||
| 538 | |||
| 539 | |||
| 540 | template<class WorkerT> | ||
| 541 | ✗ | void ConcurrentWorkers<WorkerT>::ScheduleDeathSentences() { | |
| 542 | ✗ | assert(!IsRunning()); | |
| 543 | |||
| 544 | // make sure that the queue is empty before we schedule a death sentence | ||
| 545 | ✗ | TruncateJobQueue(); | |
| 546 | |||
| 547 | // schedule a death sentence for each running thread | ||
| 548 | ✗ | const unsigned int number_of_workers = GetNumberOfWorkers(); | |
| 549 | ✗ | for (unsigned int i = 0; i < number_of_workers; ++i) { | |
| 550 | ✗ | Schedule(WorkerJob()); | |
| 551 | } | ||
| 552 | |||
| 553 | // schedule a death sentence for the callback thread | ||
| 554 | ✗ | results_queue_.Enqueue(CallbackJob()); | |
| 555 | } | ||
| 556 | |||
| 557 | |||
| 558 | template<class WorkerT> | ||
| 559 | typename ConcurrentWorkers<WorkerT>::WorkerJob | ||
| 560 | ✗ | ConcurrentWorkers<WorkerT>::Acquire() { | |
| 561 | // Note: This method is exclusively called inside the worker threads! | ||
| 562 | // Any other usage might produce undefined behavior. | ||
| 563 | ✗ | return jobs_queue_.Dequeue(); | |
| 564 | } | ||
| 565 | |||
| 566 | |||
| 567 | template<class WorkerT> | ||
| 568 | ✗ | void ConcurrentWorkers<WorkerT>::TruncateJobQueue(const bool forget_pending) { | |
| 569 | // Note: This method will throw away all jobs currently waiting in the job | ||
| 570 | // queue. These jobs will not be processed! | ||
| 571 | ✗ | const unsigned int dropped_jobs = jobs_queue_.Drop(); | |
| 572 | |||
| 573 | // if desired, we remove the jobs from the pending 'list' | ||
| 574 | ✗ | if (forget_pending) { | |
| 575 | ✗ | atomic_xadd32(&jobs_pending_, -dropped_jobs); | |
| 576 | } | ||
| 577 | } | ||
| 578 | |||
| 579 | |||
| 580 | template<class WorkerT> | ||
| 581 | ✗ | void ConcurrentWorkers<WorkerT>::Terminate() { | |
| 582 | // Note: this method causes workers to die immediately after they finished | ||
| 583 | // their last acquired job. To make sure that each worker will check | ||
| 584 | // the running state, we schedule empty jobs or Death Sentences. | ||
| 585 | |||
| 586 | ✗ | assert(IsRunning()); | |
| 587 | |||
| 588 | // unset the running flag (causing threads to die on the next checkpoint) | ||
| 589 | ✗ | StopRunning(); | |
| 590 | |||
| 591 | // schedule empty jobs to make sure that each worker will actually reach the | ||
| 592 | // next checkpoint in their processing loop and terminate as expected | ||
| 593 | ✗ | ScheduleDeathSentences(); | |
| 594 | |||
| 595 | // wait for the worker threads to return | ||
| 596 | ✗ | WorkerThreads::const_iterator i = worker_threads_.begin(); | |
| 597 | ✗ | WorkerThreads::const_iterator iend = worker_threads_.end(); | |
| 598 | ✗ | for (; i != iend; ++i) { | |
| 599 | ✗ | pthread_join(*i, NULL); | |
| 600 | } | ||
| 601 | |||
| 602 | // wait for the callback worker thread | ||
| 603 | ✗ | pthread_join(callback_thread_, NULL); | |
| 604 | |||
| 605 | // check if we finished all pending jobs | ||
| 606 | ✗ | const int pending = atomic_read32(&jobs_pending_); | |
| 607 | ✗ | if (pending > 0) { | |
| 608 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, | |
| 609 | "Job queue was not fully processed. " | ||
| 610 | "Still %d jobs were pending and " | ||
| 611 | "will not be executed anymore.", | ||
| 612 | pending); | ||
| 613 | } | ||
| 614 | |||
| 615 | // check if we had failed jobs | ||
| 616 | ✗ | const int failed = atomic_read32(&jobs_failed_); | |
| 617 | ✗ | if (failed > 0) { | |
| 618 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, "We've had %d failed jobs.", failed); | |
| 619 | } | ||
| 620 | |||
| 621 | // thanks, and good bye... | ||
| 622 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, | |
| 623 | "All workers stopped. They processed %ld jobs. Terminating...", | ||
| 624 | atomic_read64(&jobs_processed_)); | ||
| 625 | } | ||
| 626 | |||
| 627 | |||
| 628 | template<class WorkerT> | ||
| 629 | ✗ | void ConcurrentWorkers<WorkerT>::WaitForEmptyQueue() const { | |
| 630 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, | |
| 631 | "Waiting for %d jobs to be finished", atomic_read32(&jobs_pending_)); | ||
| 632 | |||
| 633 | // wait until all pending jobs are processed | ||
| 634 | { | ||
| 635 | ✗ | MutexLockGuard lock(jobs_all_done_mutex_); | |
| 636 | ✗ | while (atomic_read32(&jobs_pending_) > 0) { | |
| 637 | ✗ | pthread_cond_wait(&jobs_all_done_, &jobs_all_done_mutex_); | |
| 638 | } | ||
| 639 | } | ||
| 640 | |||
| 641 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Jobs are done... go on"); | |
| 642 | } | ||
| 643 | |||
| 644 | |||
| 645 | template<class WorkerT> | ||
| 646 | void ConcurrentWorkers<WorkerT>::WaitForTermination() { | ||
| 647 | if (!IsRunning()) | ||
| 648 | return; | ||
| 649 | |||
| 650 | WaitForEmptyQueue(); | ||
| 651 | Terminate(); | ||
| 652 | } | ||
| 653 | |||
| 654 | |||
| 655 | template<class WorkerT> | ||
| 656 | ✗ | void ConcurrentWorkers<WorkerT>::JobDone( | |
| 657 | const ConcurrentWorkers<WorkerT>::returned_data_t &data, | ||
| 658 | const bool success) { | ||
| 659 | // BEWARE! | ||
| 660 | // This is a callback method that might be called from a different thread! | ||
| 661 | |||
| 662 | // check if the finished job was successful | ||
| 663 | ✗ | if (!success) { | |
| 664 | ✗ | atomic_inc32(&jobs_failed_); | |
| 665 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, "Job failed"); | |
| 666 | } | ||
| 667 | |||
| 668 | // queue the result in the callback channel | ||
| 669 | ✗ | results_queue_.Enqueue(CallbackJob(data)); | |
| 670 | } | ||
| 671 | |||
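
A sketch of a complete `WorkerT` for the template above. It assumes the `ConcurrentWorker<DerivedT>` base class (which provides `master()`), the `expected_data_t`/`returned_data_t`/`worker_context_t` typedef convention, and a `Schedule(const expected_data_t &)` convenience overload wrapping `WorkerJob`, all from `util/concurrency.h`; `SquareWorker` and `PrintResult` are illustrative:

```cpp
#include <cstdio>

#include "util/concurrency.h"

class SquareWorker : public ConcurrentWorker<SquareWorker> {
 public:
  typedef int expected_data_t;  // job input, consumed by operator()
  typedef int returned_data_t;  // job result, reported through JobDone()
  struct worker_context_t {};   // shared read-only context (unused here)

  explicit SquareWorker(const worker_context_t *context) {}

  void operator()(const expected_data_t &data) {
    // JobDone() enqueues the result into results_queue_; the dedicated
    // callback thread then hands it to NotifyListeners().
    master()->JobDone(data * data, true /* success */);
  }
};

// ConcurrentWorkers is an Observable of the returned data type.
void PrintResult(const int &result) { printf("result: %d\n", result); }

void RunSquares() {
  SquareWorker::worker_context_t context;
  ConcurrentWorkers<SquareWorker> workers(4 /* worker threads */,
                                          128 /* queue length */, &context);
  if (!workers.Initialize())
    return;
  workers.RegisterListener(&PrintResult);
  for (int i = 0; i < 10; ++i) {
    workers.Schedule(i);
  }
  workers.WaitForTermination();  // drains the queue, then joins all threads
}
```

| Line | Branch | Exec | Source |
|---|---|---|---|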
| 672 | #ifdef CVMFS_NAMESPACE_GUARD | ||
| 673 | } // namespace CVMFS_NAMESPACE_GUARD | ||
| 674 | #endif | ||
| 675 | |||
| 676 | #endif // CVMFS_UTIL_CONCURRENCY_IMPL_H_ | ||
| 677 |