Directory: | cvmfs/
---|---
File: | cvmfs/util/concurrency_impl.h
Date: | 2025-07-06 02:35:01

 | Exec | Total | Coverage
---|---|---|---
Lines: | 119 | 264 | 45.1%
Branches: | 54 | 142 | 38.0%

Branch annotations in the listing below are condensed to `taken/total`, followed by the per-branch hit counts (✓ = branch taken N times, ✗ = branch never taken).

Line | Branch | Exec | Source
---|---|---|---
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #ifndef CVMFS_UTIL_CONCURRENCY_IMPL_H_ | ||
6 | #define CVMFS_UTIL_CONCURRENCY_IMPL_H_ | ||
7 | |||
8 | // clang-format off | ||
9 | // Only needed to let clang-tidy see the class definitions. | ||
10 | #ifndef CVMFS_UTIL_CONCURRENCY_H_ | ||
11 | #include "util/concurrency.h" | ||
12 | #endif | ||
13 | // clang-format on | ||
14 | |||
15 | #include <cstddef> | ||
16 | |||
17 | #include "util/logging.h" | ||
18 | |||
19 | #ifdef CVMFS_NAMESPACE_GUARD | ||
20 | namespace CVMFS_NAMESPACE_GUARD { | ||
21 | #endif | ||
22 | |||
23 | // | ||
24 | // +---------------------------------------------------------------------------- | ||
25 | // | SynchronizingCounter | ||
26 | // | ||
27 | |||
28 | |||
29 | template<typename T> | ||
30 | 4374394421 | void SynchronizingCounter<T>::SetValueUnprotected(const T new_value) { | |
31 | // make sure that 0 <= new_value <= maximal_value_ if maximal_value_ != 0 | ||
32 | 5/6 ✓38674631 ✓4335719790 ✓38637791 ✓36840 ✓38637791 ✗ | 4374394421 | assert(!HasMaximalValue()
33 | || (new_value >= T(0) && new_value <= maximal_value_)); | ||
34 | |||
35 | 4374394421 | value_ = new_value; | |
36 | |||
37 | 2/2 ✓1339250 ✓4373055171 | 4374394421 | if (value_ == T(0)) {
38 | 1339250 | pthread_cond_broadcast(&became_zero_); | |
39 | } | ||
40 | |||
41 | 6/6 ✓38674631 ✓4335719790 ✓22560706 ✓16113925 ✓22560706 ✓4351833715 | 4374394421 | if (HasMaximalValue() && value_ < maximal_value_) {
42 | 22560706 | pthread_cond_broadcast(&free_slot_); | |
43 | } | ||
44 | 4374394421 | } | |
45 | |||
46 | |||
47 | template<typename T> | ||
48 | 2187196983 | void SynchronizingCounter<T>::WaitForFreeSlotUnprotected() { | |
49 | 6/6 ✓114664137 ✓2167859760 ✓95326914 ✓19337223 ✓95326914 ✓2187196983 | 2282523897 | while (HasMaximalValue() && value_ >= maximal_value_) {
50 | 95326914 | pthread_cond_wait(&free_slot_, &mutex_); | |
51 | } | ||
52 | 3/4 ✓19337223 ✓2167859760 ✗ ✓19337223 | 2187196983 | assert(!HasMaximalValue() || value_ < maximal_value_);
53 | 2187196983 | } | |
54 | |||
55 | |||
56 | template<typename T> | ||
57 | 3205 | void SynchronizingCounter<T>::Initialize() { | |
58 | 3205 | const bool init_successful = (pthread_mutex_init(&mutex_, NULL) == 0 | |
59 | 1/2 ✓3205 ✗ | 3205 | && pthread_cond_init(&became_zero_, NULL) == 0
60 | 2/4 ✓3205 ✗ ✓3205 ✗ | 6410 | && pthread_cond_init(&free_slot_, NULL) == 0);
61 | 1/2 ✗ ✓3205 | 3205 | assert(init_successful);
62 | 3205 | } | |
63 | |||
64 | |||
65 | template<typename T> | ||
66 | 3204 | void SynchronizingCounter<T>::Destroy() { | |
67 | 3204 | pthread_mutex_destroy(&mutex_); | |
68 | 3204 | pthread_cond_destroy(&became_zero_); | |
69 | 3204 | pthread_cond_destroy(&free_slot_); | |
70 | 3204 | } | |
71 | |||
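Aside (sketch, not part of the coverage data): the counter above is typically used to bound in-flight work. The increment/decrement operators and `WaitForZero()` are assumed from the class interface in `util/concurrency.h`; `SubmitAsyncJob()` is a hypothetical stand-in for an asynchronous dispatch whose completion handler decrements the counter.

```cpp
// Sketch only: bounding in-flight jobs with SynchronizingCounter.
// Assumed: operator++/operator-- and WaitForZero() from util/concurrency.h.
// Hypothetical: SubmitAsyncJob(), whose completion handler runs --in_flight.
#include "util/concurrency.h"

void SubmitAsyncJob(int job_id);  // hypothetical async dispatch

void ProduceBounded() {
  SynchronizingCounter<int> in_flight(100);  // maximal_value_ = 100

  for (int i = 0; i < 1000; ++i) {
    ++in_flight;        // blocks in WaitForFreeSlotUnprotected() at the cap
    SubmitAsyncJob(i);  // worker decrements the counter when the job is done
  }

  in_flight.WaitForZero();  // wakes up on the became_zero_ broadcast
}
```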
72 | |||
73 | // | ||
74 | // +---------------------------------------------------------------------------- | ||
75 | // | Observable | ||
76 | // | ||
77 | |||
78 | |||
79 | template<typename ParamT> | ||
80 | 28748222 | Observable<ParamT>::Observable() { | |
81 | 28748222 | const int ret = pthread_rwlock_init(&listeners_rw_lock_, NULL); | |
82 | 1/2 ✗ ✓14375221 | 28748222 | assert(ret == 0);
83 | 28748222 | } | |
84 | |||
85 | |||
86 | template<typename ParamT> | ||
87 | 28748550 | Observable<ParamT>::~Observable() { | |
88 | 28748550 | UnregisterListeners(); | |
89 | 28747146 | pthread_rwlock_destroy(&listeners_rw_lock_); | |
90 | } | ||
91 | |||
92 | |||
93 | template<typename ParamT> | ||
94 | template<class DelegateT, class ClosureDataT> | ||
95 | 7163059 | typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener( | |
96 | typename BoundClosure<ParamT, DelegateT, ClosureDataT>::CallbackMethod | ||
97 | method, | ||
98 | DelegateT *delegate, | ||
99 | ClosureDataT data) { | ||
100 | // create a new BoundClosure, register it and return the handle | ||
101 | 7163059 | CallbackBase<ParamT> *callback = Observable<ParamT>::MakeClosure( | |
102 | method, delegate, data); | ||
103 | 7163059 | RegisterListener(callback); | |
104 | 7163059 | return callback; | |
105 | } | ||
106 | |||
107 | |||
108 | template<typename ParamT> | ||
109 | template<class DelegateT> | ||
110 | 27692 | typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener( | |
111 | typename BoundCallback<ParamT, DelegateT>::CallbackMethod method, | ||
112 | DelegateT *delegate) { | ||
113 | // create a new BoundCallback, register it and return the handle | ||
114 | 27692 | CallbackBase<ParamT> *callback = Observable<ParamT>::MakeCallback(method, | |
115 | delegate); | ||
116 | 27692 | RegisterListener(callback); | |
117 | 27692 | return callback; | |
118 | } | ||
119 | |||
120 | |||
121 | template<typename ParamT> | ||
122 | 2610 | typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener( | |
123 | typename Callback<ParamT>::CallbackFunction fn) { | ||
124 | // create a new Callback, register it and return the handle | ||
125 | 2610 | CallbackBase<ParamT> *callback = Observable<ParamT>::MakeCallback(fn); | |
126 | 2610 | RegisterListener(callback); | |
127 | 2610 | return callback; | |
128 | } | ||
129 | |||
130 | |||
131 | template<typename ParamT> | ||
132 | 14356464 | void Observable<ParamT>::RegisterListener( | |
133 | Observable<ParamT>::CallbackPtr callback_object) { | ||
134 | // register a generic CallbackBase callback | ||
135 | 14356464 | WriteLockGuard guard(listeners_rw_lock_); | |
136 | 1/2 ✓7179327 ✗ | 14356464 | listeners_.insert(callback_object);
137 | 14356464 | } | |
138 | |||
139 | |||
140 | template<typename ParamT> | ||
141 | 1485 | void Observable<ParamT>::UnregisterListener( | |
142 | typename Observable<ParamT>::CallbackPtr callback_object) { | ||
143 | // remove a callback handle from the callbacks list | ||
144 | // if it is not registered --> crash | ||
145 | 1485 | WriteLockGuard guard(listeners_rw_lock_); | |
146 | 1/2 ✓1485 ✗ | 1485 | const size_t was_removed = listeners_.erase(callback_object);
147 | 1/2 ✗ ✓1485 | 1485 | assert(was_removed > 0);
148 | 1/2 ✓1485 ✗ | 1485 | delete callback_object;
149 | 1485 | } | |
150 | |||
151 | |||
152 | template<typename ParamT> | ||
153 | 28734856 | void Observable<ParamT>::UnregisterListeners() { | |
154 | 28734856 | WriteLockGuard guard(listeners_rw_lock_); | |
155 | |||
156 | // remove all callbacks from the list | ||
157 | 28736104 | typename Callbacks::const_iterator i = listeners_.begin(); | |
158 | 28733842 | typename Callbacks::const_iterator iend = listeners_.end(); | |
159 | 2/2 ✓7176078 ✓14372888 | 43073412 | for (; i != iend; ++i) {
160 | 1/2 ✓7175844 ✗ | 14338790 | delete *i;
161 | } | ||
162 | 28731190 | listeners_.clear(); | |
163 | 28732048 | } | |
164 | |||
165 | |||
166 | template<typename ParamT> | ||
167 | 74146828 | void Observable<ParamT>::NotifyListeners(const ParamT ¶meter) { | |
168 | 74146828 | ReadLockGuard guard(listeners_rw_lock_); | |
169 | |||
170 | // invoke all callbacks and inform them about new data | ||
171 | 74145124 | typename Callbacks::const_iterator i = listeners_.begin(); | |
172 | 74139340 | typename Callbacks::const_iterator iend = listeners_.end(); | |
173 | 2/2 ✓37559393 ✓37549559 | 148272242 | for (; i != iend; ++i) {
174 | 1/2 ✓37559468 ✗ | 74134276 | (**i)(parameter);
175 | } | ||
176 | 74114608 | } | |
177 | |||
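Aside (sketch, not part of the coverage data): the listener life cycle exercised by the `RegisterListener`/`UnregisterListener`/`NotifyListeners` rows above. `UploadResult`, `Spooler`, `Finish`, and `OnUpload` are invented names for illustration.

```cpp
// Sketch only: UploadResult, Spooler, and OnUpload are invented names; the
// Observable calls are the ones covered in the listing above.
#include "util/concurrency.h"

struct UploadResult { int retcode; };

void OnUpload(const UploadResult &result) { /* react to the result */ }

class Spooler : public Observable<UploadResult> {
 public:
  // Derived classes fire the event; listeners run in the notifying thread.
  void Finish(const UploadResult &result) { NotifyListeners(result); }
};

void Wire(Spooler *spooler) {
  // The Observable owns the returned handle: it is deleted in
  // UnregisterListener() or, at the latest, in UnregisterListeners().
  Observable<UploadResult>::CallbackPtr handle =
      spooler->RegisterListener(&OnUpload);
  // ...
  spooler->UnregisterListener(handle);  // asserts if handle is unknown
}
```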
178 | |||
179 | // | ||
180 | // +---------------------------------------------------------------------------- | ||
181 | // | FifoChannel | ||
182 | // | ||
183 | |||
184 | |||
185 | template<class T> | ||
186 | 250 | FifoChannel<T>::FifoChannel(const size_t maximal_length, | |
187 | const size_t drainout_threshold) | ||
188 | 250 | : maximal_queue_length_(maximal_length) | |
189 | 250 | , queue_drainout_threshold_(drainout_threshold) { | |
190 | 1/2 ✗ ✓250 | 250 | assert(drainout_threshold <= maximal_length);
191 | 1/2 ✗ ✓250 | 250 | assert(drainout_threshold > 0);
192 | |||
193 | 250 | const bool successful = (pthread_mutex_init(&mutex_, NULL) == 0 | |
194 | 1/2 ✓250 ✗ | 250 | && pthread_cond_init(&queue_is_not_empty_, NULL) == 0
195 | 2/4 ✓250 ✗ ✓250 ✗ | 500 | && pthread_cond_init(&queue_is_not_full_, NULL)
196 | == 0); | ||
197 | |||
198 | 1/2 ✗ ✓250 | 250 | assert(successful);
199 | 250 | } | |
200 | |||
201 | |||
202 | template<class T> | ||
203 | 740 | FifoChannel<T>::~FifoChannel() { | |
204 | 500 | pthread_cond_destroy(&queue_is_not_empty_); | |
205 | 500 | pthread_cond_destroy(&queue_is_not_full_); | |
206 | 500 | pthread_mutex_destroy(&mutex_); | |
207 | 1240 | } | |
208 | |||
209 | |||
210 | template<class T> | ||
211 | 5001280 | void FifoChannel<T>::Enqueue(const T &data) { | |
212 | 5001280 | MutexLockGuard lock(mutex_); | |
213 | |||
214 | // wait for space in the queue | ||
215 | 2/2 ✓5 ✓5001280 | 5001285 | while (this->size() >= maximal_queue_length_) {
216 | 1/2 ✓5 ✗ | 5 | pthread_cond_wait(&queue_is_not_full_, &mutex_);
217 | } | ||
218 | |||
219 | // put something into the queue | ||
220 | 1/2 ✓5001280 ✗ | 5001280 | this->push(data);
221 | |||
222 | // wake all waiting threads | ||
223 | 5001280 | pthread_cond_broadcast(&queue_is_not_empty_); | |
224 | 5001280 | } | |
225 | |||
226 | |||
227 | template<class T> | ||
228 | 4954255 | const T FifoChannel<T>::Dequeue() { | |
229 | 4954255 | MutexLockGuard lock(mutex_); | |
230 | |||
231 | // wait until there is something to do | ||
232 | 2/2 ✓14385625 ✓5000785 | 19386410 | while (this->empty()) {
233 | 1/2 ✓14385625 ✗ | 14385625 | pthread_cond_wait(&queue_is_not_empty_, &mutex_);
234 | } | ||
235 | |||
236 | // get the item from the queue | ||
237 | 5000785 | T data = this->front(); | |
238 | 5000785 | this->pop(); | |
239 | |||
240 | // signal waiting threads about the free space | ||
241 | 2/2 ✓4999210 ✓1575 | 5000785 | if (this->size() < queue_drainout_threshold_) {
242 | 4999210 | pthread_cond_broadcast(&queue_is_not_full_); | |
243 | } | ||
244 | |||
245 | // return the acquired job | ||
246 | 4958000 | return data; | |
247 | 5000785 | } | |
248 | |||
249 | |||
250 | template<class T> | ||
251 | 5 | unsigned int FifoChannel<T>::Drop() { | |
252 | 5 | MutexLockGuard lock(mutex_); | |
253 | |||
254 | 5 | unsigned int dropped_items = 0; | |
255 | 2/2 ✓495 ✓5 | 500 | while (!this->empty()) {
256 | 495 | this->pop(); | |
257 | 495 | ++dropped_items; | |
258 | } | ||
259 | |||
260 | 5 | pthread_cond_broadcast(&queue_is_not_full_); | |
261 | |||
262 | 10 | return dropped_items; | |
263 | 5 | } | |
264 | |||
265 | |||
266 | template<class T> | ||
267 | 10 | size_t FifoChannel<T>::GetItemCount() const { | |
268 | 10 | MutexLockGuard lock(mutex_); | |
269 | 10 | return this->size(); | |
270 | 10 | } | |
271 | |||
272 | |||
273 | template<class T> | ||
274 | 25 | bool FifoChannel<T>::IsEmpty() const { | |
275 | 25 | MutexLockGuard lock(mutex_); | |
276 | 25 | return this->empty(); | |
277 | 25 | } | |
278 | |||
279 | |||
280 | template<class T> | ||
281 | 5 | size_t FifoChannel<T>::GetMaximalItemCount() const { | |
282 | 5 | return maximal_queue_length_; | |
283 | } | ||
284 | |||
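Aside (sketch, not part of the coverage data): a one-producer/one-consumer pairing of the `Enqueue`/`Dequeue` rows above. `Process()` is hypothetical, and the negative sentinel is a convention of this sketch, not of `FifoChannel`.

```cpp
// Sketch only: Process() is hypothetical; the -1 sentinel is a convention
// of this sketch, not part of FifoChannel itself.
#include <pthread.h>
#include "util/concurrency.h"

void Process(int item) { /* do the actual work */ }

// At most 64 queued items; consumers re-awaken blocked producers once the
// backlog drains below 16 (the drainout threshold).
FifoChannel<int> channel(64, 16);

void *Consumer(void * /* unused */) {
  for (;;) {
    const int item = channel.Dequeue();  // blocks while the queue is empty
    if (item < 0)
      break;                             // sentinel: stop consuming
    Process(item);
  }
  return NULL;
}

void Produce() {
  for (int i = 0; i < 1000; ++i)
    channel.Enqueue(i);  // blocks while size() >= maximal_queue_length_
  channel.Enqueue(-1);   // sentinel wakes the consumer one last time
}
```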
285 | |||
286 | // | ||
287 | // +---------------------------------------------------------------------------- | ||
288 | // | ConcurrentWorkers | ||
289 | // | ||
290 | |||
291 | |||
292 | template<class WorkerT> | ||
293 | ✗ | ConcurrentWorkers<WorkerT>::ConcurrentWorkers( | |
294 | const size_t number_of_workers, | ||
295 | const size_t maximal_queue_length, | ||
296 | ConcurrentWorkers<WorkerT>::worker_context_t *worker_context) | ||
297 | ✗ | : number_of_workers_(number_of_workers) | |
298 | ✗ | , worker_context_(worker_context) | |
299 | ✗ | , thread_context_(this, worker_context_) | |
300 | ✗ | , initialized_(false) | |
301 | ✗ | , running_(false) | |
302 | ✗ | , workers_started_(0) | |
303 | ✗ | , jobs_queue_(maximal_queue_length, maximal_queue_length / 4 + 1) | |
304 | ✗ | , results_queue_(maximal_queue_length, 1) { | |
305 | ✗ | assert(maximal_queue_length >= number_of_workers); | |
306 | ✗ | assert(number_of_workers > 0); | |
307 | |||
308 | ✗ | atomic_init32(&jobs_pending_); | |
309 | ✗ | atomic_init32(&jobs_failed_); | |
310 | ✗ | atomic_init64(&jobs_processed_); | |
311 | } | ||
312 | |||
313 | |||
314 | template<class WorkerT> | ||
315 | ✗ | ConcurrentWorkers<WorkerT>::~ConcurrentWorkers() { | |
316 | ✗ | if (IsRunning()) { | |
317 | ✗ | Terminate(); | |
318 | } | ||
319 | |||
320 | // destroy some synchronisation data structures | ||
321 | ✗ | pthread_cond_destroy(&worker_started_); | |
322 | ✗ | pthread_cond_destroy(&jobs_all_done_); | |
323 | ✗ | pthread_mutex_destroy(&status_mutex_); | |
324 | ✗ | pthread_mutex_destroy(&jobs_all_done_mutex_); | |
325 | } | ||
326 | |||
327 | |||
328 | template<class WorkerT> | ||
329 | ✗ | bool ConcurrentWorkers<WorkerT>::Initialize() { | |
330 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, | |
331 | "Initializing ConcurrentWorker " | ||
332 | "object with %lu worker threads " | ||
333 | "and a queue length of %zu", | ||
334 | number_of_workers_, jobs_queue_.GetMaximalItemCount()); | ||
335 | // LogCvmfs(kLogConcurrency, kLogStdout, "sizeof(expected_data_t): %d\n" | ||
336 | // "sizeof(returned_data_t): %d", | ||
337 | // sizeof(expected_data_t), sizeof(returned_data_t)); | ||
338 | |||
339 | // initialize synchronisation for job queue (Workers) | ||
340 | ✗ | if (pthread_mutex_init(&status_mutex_, NULL) != 0 | |
341 | ✗ | || pthread_mutex_init(&jobs_all_done_mutex_, NULL) != 0 | |
342 | ✗ | || pthread_cond_init(&worker_started_, NULL) != 0 | |
343 | ✗ | || pthread_cond_init(&jobs_all_done_, NULL) != 0) { | |
344 | ✗ | return false; | |
345 | } | ||
346 | |||
347 | // spawn the Worker objects in their own threads | ||
348 | ✗ | if (!SpawnWorkers()) { | |
349 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn workers"); | |
350 | ✗ | return false; | |
351 | } | ||
352 | |||
353 | // all done... | ||
354 | ✗ | initialized_ = true; | |
355 | ✗ | return true; | |
356 | } | ||
357 | |||
358 | |||
359 | template<class WorkerT> | ||
360 | ✗ | bool ConcurrentWorkers<WorkerT>::SpawnWorkers() { | |
361 | ✗ | assert(worker_threads_.size() == 0); | |
362 | ✗ | worker_threads_.resize(number_of_workers_); | |
363 | |||
364 | // set the running flag to trap workers in their treadmills | ||
365 | ✗ | StartRunning(); | |
366 | |||
367 | // spawn the swarm and make them work | ||
368 | ✗ | bool success = true; | |
369 | ✗ | WorkerThreads::iterator i = worker_threads_.begin(); | |
370 | ✗ | WorkerThreads::const_iterator iend = worker_threads_.end(); | |
371 | ✗ | for (; i != iend; ++i) { | |
372 | ✗ | pthread_t *thread = &(*i); | |
373 | ✗ | const int retval = pthread_create( | |
374 | thread, | ||
375 | NULL, | ||
376 | &ConcurrentWorkers<WorkerT>::RunWorker, | ||
377 | ✗ | reinterpret_cast<void *>(&thread_context_)); | |
378 | ✗ | if (retval != 0) { | |
379 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn a Worker"); | |
380 | ✗ | success = false; | |
381 | } | ||
382 | } | ||
383 | |||
384 | // spawn the callback processing thread | ||
385 | ✗ | const int retval = pthread_create( | |
386 | &callback_thread_, | ||
387 | NULL, | ||
388 | &ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper, | ||
389 | ✗ | reinterpret_cast<void *>(&thread_context_)); | |
390 | ✗ | if (retval != 0) { | |
391 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, | |
392 | "Failed to spawn the callback " | ||
393 | "worker thread"); | ||
394 | ✗ | success = false; | |
395 | } | ||
396 | |||
397 | // wait for all workers to report in... | ||
398 | { | ||
399 | ✗ | MutexLockGuard guard(status_mutex_); | |
400 | // +1 -> callback thread | ||
401 | ✗ | while (workers_started_ < number_of_workers_ + 1) { | |
402 | ✗ | pthread_cond_wait(&worker_started_, &status_mutex_); | |
403 | } | ||
404 | } | ||
405 | |||
406 | // all done... | ||
407 | ✗ | return success; | |
408 | } | ||
409 | |||
410 | |||
411 | template<class WorkerT> | ||
412 | ✗ | void *ConcurrentWorkers<WorkerT>::RunWorker(void *run_binding) { | |
413 | // NOTE: This is the actual worker thread code! | ||
414 | |||
415 | // | ||
416 | // INITIALIZATION | ||
417 | ///////////////// | ||
418 | |||
419 | // get contextual information | ||
420 | ✗ | const WorkerRunBinding &binding = *( | |
421 | static_cast<WorkerRunBinding *>(run_binding)); | ||
422 | ✗ | ConcurrentWorkers<WorkerT> *master = binding.delegate; | |
423 | ✗ | const worker_context_t *worker_context = binding.worker_context; | |
424 | |||
425 | // boot up the worker object and make sure it works | ||
426 | ✗ | WorkerT worker(worker_context); | |
427 | ✗ | worker.RegisterMaster(master); | |
428 | ✗ | const bool init_success = worker.Initialize(); | |
429 | |||
430 | // tell the master that this worker was started | ||
431 | ✗ | master->ReportStartedWorker(); | |
432 | |||
433 | ✗ | if (!init_success) { | |
434 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, | |
435 | "Worker was not initialized " | ||
436 | "properly... it will die now!"); | ||
437 | ✗ | return NULL; | |
438 | } | ||
439 | |||
440 | // | ||
441 | // PROCESSING LOOP | ||
442 | ////////////////// | ||
443 | |||
444 | // start the processing loop | ||
445 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Starting Worker..."); | |
446 | ✗ | while (master->IsRunning()) { | |
447 | // acquire a new job | ||
448 | ✗ | WorkerJob job = master->Acquire(); | |
449 | |||
450 | // check if we need to terminate | ||
451 | ✗ | if (job.is_death_sentence) | |
452 | ✗ | break; | |
453 | |||
454 | // do what you are supposed to do | ||
455 | ✗ | worker(job.data); | |
456 | } | ||
457 | |||
458 | // | ||
459 | // TEAR DOWN | ||
460 | //////////// | ||
461 | |||
462 | // give the worker the chance to tidy up | ||
463 | ✗ | worker.TearDown(); | |
464 | |||
465 | // good bye thread... | ||
466 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Worker..."); | |
467 | ✗ | return NULL; | |
468 | } | ||
469 | |||
470 | |||
471 | template<class WorkerT> | ||
472 | ✗ | void *ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper(void *run_binding) { | |
473 | ✗ | const RunBinding &binding = *(static_cast<RunBinding *>(run_binding)); | |
474 | ✗ | ConcurrentWorkers<WorkerT> *master = binding.delegate; | |
475 | |||
476 | ✗ | master->ReportStartedWorker(); | |
477 | |||
478 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, | |
479 | "Started dedicated callback worker"); | ||
480 | ✗ | master->RunCallbackThread(); | |
481 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Callback Worker..."); | |
482 | |||
483 | ✗ | return NULL; | |
484 | } | ||
485 | |||
486 | |||
487 | template<class WorkerT> | ||
488 | ✗ | void ConcurrentWorkers<WorkerT>::RunCallbackThread() { | |
489 | ✗ | while (IsRunning()) { | |
490 | ✗ | const CallbackJob callback_job = results_queue_.Dequeue(); | |
491 | |||
492 | // stop callback processing if needed | ||
493 | ✗ | if (callback_job.is_death_sentence) { | |
494 | ✗ | break; | |
495 | } | ||
496 | |||
497 | // notify all observers about the finished job | ||
498 | ✗ | this->NotifyListeners(callback_job.data); | |
499 | |||
500 | // remove the job from the pending 'list' and add it to the ready 'list' | ||
501 | ✗ | atomic_dec32(&jobs_pending_); | |
502 | ✗ | atomic_inc64(&jobs_processed_); | |
503 | |||
504 | // signal the Spooler that all jobs are done... | ||
505 | ✗ | if (atomic_read32(&jobs_pending_) == 0) { | |
506 | ✗ | pthread_cond_broadcast(&jobs_all_done_); | |
507 | } | ||
508 | } | ||
509 | } | ||
510 | |||
511 | |||
512 | template<class WorkerT> | ||
513 | ✗ | void ConcurrentWorkers<WorkerT>::ReportStartedWorker() const { | |
514 | ✗ | MutexLockGuard lock(status_mutex_); | |
515 | ✗ | ++workers_started_; | |
516 | ✗ | pthread_cond_signal(&worker_started_); | |
517 | } | ||
518 | |||
519 | |||
520 | template<class WorkerT> | ||
521 | ✗ | void ConcurrentWorkers<WorkerT>::Schedule(WorkerJob job) { | |
522 | // Note: This method can be called from arbitrary threads. Thus we do not | ||
523 | // necessarily have just one producer in the system. | ||
524 | |||
525 | // check if it makes sense to schedule this job | ||
526 | ✗ | if (!IsRunning() && !job.is_death_sentence) { | |
527 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, | |
528 | "Tried to schedule a job but " | ||
529 | "concurrency was not running..."); | ||
530 | ✗ | return; | |
531 | } | ||
532 | |||
533 | ✗ | jobs_queue_.Enqueue(job); | |
534 | ✗ | if (!job.is_death_sentence) { | |
535 | ✗ | atomic_inc32(&jobs_pending_); | |
536 | } | ||
537 | } | ||
538 | |||
539 | |||
540 | template<class WorkerT> | ||
541 | ✗ | void ConcurrentWorkers<WorkerT>::ScheduleDeathSentences() { | |
542 | ✗ | assert(!IsRunning()); | |
543 | |||
544 | // make sure that the queue is empty before we schedule a death sentence | ||
545 | ✗ | TruncateJobQueue(); | |
546 | |||
547 | // schedule a death sentence for each running thread | ||
548 | ✗ | const unsigned int number_of_workers = GetNumberOfWorkers(); | |
549 | ✗ | for (unsigned int i = 0; i < number_of_workers; ++i) { | |
550 | ✗ | Schedule(WorkerJob()); | |
551 | } | ||
552 | |||
553 | // schedule a death sentence for the callback thread | ||
554 | ✗ | results_queue_.Enqueue(CallbackJob()); | |
555 | } | ||
556 | |||
557 | |||
558 | template<class WorkerT> | ||
559 | typename ConcurrentWorkers<WorkerT>::WorkerJob | ||
560 | ✗ | ConcurrentWorkers<WorkerT>::Acquire() { | |
561 | // Note: This method is exclusively called inside the worker threads! | ||
562 | // Any other usage might produce undefined behavior. | ||
563 | ✗ | return jobs_queue_.Dequeue(); | |
564 | } | ||
565 | |||
566 | |||
567 | template<class WorkerT> | ||
568 | ✗ | void ConcurrentWorkers<WorkerT>::TruncateJobQueue(const bool forget_pending) { | |
569 | // Note: This method will throw away all jobs currently waiting in the job | ||
570 | // queue. These jobs will not be processed! | ||
571 | ✗ | const unsigned int dropped_jobs = jobs_queue_.Drop(); | |
572 | |||
573 | // if desired, we remove the jobs from the pending 'list' | ||
574 | ✗ | if (forget_pending) { | |
575 | ✗ | atomic_xadd32(&jobs_pending_, -dropped_jobs); | |
576 | } | ||
577 | } | ||
578 | |||
579 | |||
580 | template<class WorkerT> | ||
581 | ✗ | void ConcurrentWorkers<WorkerT>::Terminate() { | |
582 | // Note: this method causes workers to die immediately after they finished | ||
583 | // their last acquired job. To make sure that each worker will check | ||
584 | // the running state, we schedule empty jobs or Death Sentences. | ||
585 | |||
586 | ✗ | assert(IsRunning()); | |
587 | |||
588 | // unset the running flag (causing threads to die on the next checkpoint) | ||
589 | ✗ | StopRunning(); | |
590 | |||
591 | // schedule empty jobs to make sure that each worker will actually reach the | ||
592 | // next checkpoint in their processing loop and terminate as expected | ||
593 | ✗ | ScheduleDeathSentences(); | |
594 | |||
595 | // wait for the worker threads to return | ||
596 | ✗ | WorkerThreads::const_iterator i = worker_threads_.begin(); | |
597 | ✗ | WorkerThreads::const_iterator iend = worker_threads_.end(); | |
598 | ✗ | for (; i != iend; ++i) { | |
599 | ✗ | pthread_join(*i, NULL); | |
600 | } | ||
601 | |||
602 | // wait for the callback worker thread | ||
603 | ✗ | pthread_join(callback_thread_, NULL); | |
604 | |||
605 | // check if we finished all pending jobs | ||
606 | ✗ | const int pending = atomic_read32(&jobs_pending_); | |
607 | ✗ | if (pending > 0) { | |
608 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, | |
609 | "Job queue was not fully processed. " | ||
610 | "Still %d jobs were pending and " | ||
611 | "will not be executed anymore.", | ||
612 | pending); | ||
613 | } | ||
614 | |||
615 | // check if we had failed jobs | ||
616 | ✗ | const int failed = atomic_read32(&jobs_failed_); | |
617 | ✗ | if (failed > 0) { | |
618 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, "We've had %d failed jobs.", failed); | |
619 | } | ||
620 | |||
621 | // thanks, and good bye... | ||
622 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, | |
623 | "All workers stopped. They processed %ld jobs. Terminating...", | ||
624 | atomic_read64(&jobs_processed_)); | ||
625 | } | ||
626 | |||
627 | |||
628 | template<class WorkerT> | ||
629 | ✗ | void ConcurrentWorkers<WorkerT>::WaitForEmptyQueue() const { | |
630 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, | |
631 | "Waiting for %d jobs to be finished", atomic_read32(&jobs_pending_)); | ||
632 | |||
633 | // wait until all pending jobs are processed | ||
634 | { | ||
635 | ✗ | MutexLockGuard lock(jobs_all_done_mutex_); | |
636 | ✗ | while (atomic_read32(&jobs_pending_) > 0) { | |
637 | ✗ | pthread_cond_wait(&jobs_all_done_, &jobs_all_done_mutex_); | |
638 | } | ||
639 | } | ||
640 | |||
641 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Jobs are done... go on"); | |
642 | } | ||
643 | |||
644 | |||
645 | template<class WorkerT> | ||
646 | void ConcurrentWorkers<WorkerT>::WaitForTermination() { | ||
647 | if (!IsRunning()) | ||
648 | return; | ||
649 | |||
650 | WaitForEmptyQueue(); | ||
651 | Terminate(); | ||
652 | } | ||
653 | |||
654 | |||
655 | template<class WorkerT> | ||
656 | ✗ | void ConcurrentWorkers<WorkerT>::JobDone( | |
657 | const ConcurrentWorkers<WorkerT>::returned_data_t &data, | ||
658 | const bool success) { | ||
659 | // BEWARE! | ||
660 | // This is a callback method that might be called from a different thread! | ||
661 | |||
662 | // check if the finished job was successful | ||
663 | ✗ | if (!success) { | |
664 | ✗ | atomic_inc32(&jobs_failed_); | |
665 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, "Job failed"); | |
666 | } | ||
667 | |||
668 | // queue the result in the callback channel | ||
669 | ✗ | results_queue_.Enqueue(CallbackJob(data)); | |
670 | } | ||
671 | |||
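Aside (sketch, not part of the coverage data): the `ConcurrentWorkers` paths above were never executed in this run (✗ throughout). For orientation, a hypothetical end-to-end sketch: `HashWorker` and `OnJobFinished` are invented stand-ins, `HashWorker` only mirrors the calls `RunWorker()` makes (construction from `worker_context_t *`, `RegisterMaster()`, `Initialize()`, `operator()`, `TearDown()`), and the `Schedule(expected_data_t)` convenience overload is an assumption about `util/concurrency.h`, where the authoritative WorkerT contract lives.

```cpp
// Hypothetical sketch; HashWorker mirrors the calls made by RunWorker()
// above. The authoritative WorkerT contract is in util/concurrency.h.
#include "util/concurrency.h"

class HashWorker {
 public:
  typedef int expected_data_t;   // job input handed to operator()
  typedef int returned_data_t;   // job result passed to listeners
  typedef int worker_context_t;  // shared read-only context (unused here)

  explicit HashWorker(const worker_context_t * /* context */)
      : master_(NULL) {}

  void RegisterMaster(ConcurrentWorkers<HashWorker> *master) {
    master_ = master;
  }
  bool Initialize() { return true; }  // false would kill the worker thread
  void TearDown() {}

  void operator()(const expected_data_t &input) {
    // ... do the actual work, then report back (JobDone is shown above)
    master_->JobDone(input * 2, /* success = */ true);
  }

 private:
  ConcurrentWorkers<HashWorker> *master_;
};

void OnJobFinished(const HashWorker::returned_data_t & /* result */) {}

void RunFarm() {
  ConcurrentWorkers<HashWorker> workers(4,      // number_of_workers
                                        100,    // maximal_queue_length
                                        NULL);  // worker_context
  if (!workers.Initialize())  // spawns 4 worker threads + callback thread
    return;

  workers.RegisterListener(&OnJobFinished);  // runs on the callback thread

  for (int i = 0; i < 1000; ++i)
    workers.Schedule(i);  // assumed Schedule(expected_data_t) overload

  workers.WaitForTermination();  // WaitForEmptyQueue(), then Terminate()
}
```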
672 | #ifdef CVMFS_NAMESPACE_GUARD | ||
673 | } // namespace CVMFS_NAMESPACE_GUARD | ||
674 | #endif | ||
675 | |||
676 | #endif // CVMFS_UTIL_CONCURRENCY_IMPL_H_ | ||
677 |