Directory: | cvmfs/ |
---|---|
File: | cvmfs/util/concurrency_impl.h |
Date: | 2025-02-09 02:34:19 |

| Exec | Total | Coverage |
---|---|---|---|
Lines: | 119 | 265 | 44.9% |
Branches: | 52 | 142 | 36.6% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #ifndef CVMFS_UTIL_CONCURRENCY_IMPL_H_ | ||
6 | #define CVMFS_UTIL_CONCURRENCY_IMPL_H_ | ||
7 | |||
8 | #include "util/logging.h" | ||
9 | |||
10 | #ifdef CVMFS_NAMESPACE_GUARD | ||
11 | namespace CVMFS_NAMESPACE_GUARD { | ||
12 | #endif | ||
13 | |||
14 | // | ||
15 | // +---------------------------------------------------------------------------- | ||
16 | // | SynchronizingCounter | ||
17 | // | ||
18 | |||
19 | |||
20 | template <typename T> | ||
21 | 145870490 | void SynchronizingCounter<T>::SetValueUnprotected(const T new_value) { | |
22 | // make sure that 0 <= new_value <= maximal_value_ if maximal_value_ != 0 | ||
23 | 5/6: ✓1346497 ✓144523993 ✓1345269 ✓1228 ✓1345269 ✗ | 145870490 | assert(!HasMaximalValue() || |
24 | (new_value >= T(0) && new_value <= maximal_value_)); | ||
25 | |||
26 | 145870490 | value_ = new_value; | |
27 | |||
28 | 2/2: ✓37960 ✓145832530 | 145870490 | if (value_ == T(0)) { |
29 | 37960 | pthread_cond_broadcast(&became_zero_); | |
30 | } | ||
31 | |||
32 | 6/6: ✓1346497 ✓144523993 ✓801817 ✓544680 ✓801817 ✓145068673 | 145870490 | if (HasMaximalValue() && value_ < maximal_value_) { |
33 | 801817 | pthread_cond_broadcast(&free_slot_); | |
34 | } | ||
35 | 145870490 | } | |
36 | |||
37 | |||
38 | template <typename T> | ||
39 | 72935238 | void SynchronizingCounter<T>::WaitForFreeSlotUnprotected() { | |
40 | 6/6: ✓3831413 ✓72261992 ✓3158167 ✓673246 ✓3158167 ✓72935238 | 76093405 | while (HasMaximalValue() && value_ >= maximal_value_) { |
41 | 3158167 | pthread_cond_wait(&free_slot_, &mutex_); | |
42 | } | ||
43 | 3/4: ✓673246 ✓72261992 ✗ ✓673246 | 72935238 | assert(!HasMaximalValue() || value_ < maximal_value_); |
44 | 72935238 | } | |
45 | |||
46 | |||
47 | template <typename T> | ||
48 | 139 | void SynchronizingCounter<T>::Initialize() { | |
49 | 139 | const bool init_successful = ( | |
50 | 1/2: ✓139 ✗ | 278 | pthread_mutex_init(&mutex_, NULL) == 0 && |
51 | 2/4: ✓139 ✗ ✓139 ✗ | 278 | pthread_cond_init(&became_zero_, NULL) == 0 && |
52 | 139 | pthread_cond_init(&free_slot_, NULL) == 0); | |
53 | 1/2: ✗ ✓139 | 139 | assert(init_successful); |
54 | 139 | } | |
55 | |||
56 | |||
57 | template <typename T> | ||
58 | 139 | void SynchronizingCounter<T>::Destroy() { | |
59 | 139 | pthread_mutex_destroy(&mutex_); | |
60 | 139 | pthread_cond_destroy(&became_zero_); | |
61 | 139 | pthread_cond_destroy(&free_slot_); | |
62 | 139 | } | |
63 | |||
64 | |||
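The counter above couples a plain value with two condition variables: `became_zero_` wakes threads waiting for the count to drain, and `free_slot_` wakes producers once the count drops below `maximal_value_`. A minimal usage sketch, assuming the public wrappers (`operator++`, `operator--`, `WaitForZero()` and a capacity-taking constructor) that the matching header is expected to declare around the `*Unprotected` helpers shown here:

```cpp
#include "util/concurrency.h"  // assumed header for SynchronizingCounter

// Throttle: allow at most 16 jobs in flight (maximal_value_ = 16).
SynchronizingCounter<int32_t> tasks_in_flight(16);

void OnDispatch() {
  // Blocks in WaitForFreeSlotUnprotected() while 16 jobs are in flight.
  ++tasks_in_flight;
}

void OnJobDone() {
  // Dropping below the maximum broadcasts free_slot_; reaching zero
  // broadcasts became_zero_ (see SetValueUnprotected() above).
  --tasks_in_flight;
}

void Drain() {
  tasks_in_flight.WaitForZero();  // returns once the last job has finished
}
```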
65 | // | ||
66 | // +---------------------------------------------------------------------------- | ||
67 | // | Observable | ||
68 | // | ||
69 | |||
70 | |||
71 | template <typename ParamT> | ||
72 | 738054 | Observable<ParamT>::Observable() { | |
73 | 738054 | const int ret = pthread_rwlock_init(&listeners_rw_lock_, NULL); | |
74 | 1/2: ✗ ✓369064 | 738054 | assert(ret == 0); |
75 | 738054 | } | |
76 | |||
77 | |||
78 | template <typename ParamT> | ||
79 | 738034 | Observable<ParamT>::~Observable() { | |
80 | 738034 | UnregisterListeners(); | |
81 | 738050 | pthread_rwlock_destroy(&listeners_rw_lock_); | |
82 | } | ||
83 | |||
84 | |||
85 | template <typename ParamT> | ||
86 | template <class DelegateT, class ClosureDataT> | ||
87 | 183697 | typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener( | |
88 | typename BoundClosure<ParamT, | ||
89 | DelegateT, | ||
90 | ClosureDataT>::CallbackMethod method, | ||
91 | DelegateT *delegate, | ||
92 | ClosureDataT data) { | ||
93 | // create a new BoundClosure, register it and return the handle | ||
94 | CallbackBase<ParamT> *callback = | ||
95 | 183697 | Observable<ParamT>::MakeClosure(method, delegate, data); | |
96 | 183697 | RegisterListener(callback); | |
97 | 183697 | return callback; | |
98 | } | ||
99 | |||
100 | |||
101 | template <typename ParamT> | ||
102 | template <class DelegateT> | ||
103 | 1046 | typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener( | |
104 | typename BoundCallback<ParamT, DelegateT>::CallbackMethod method, | ||
105 | DelegateT *delegate) { | ||
106 | // create a new BoundCallback, register it and return the handle | ||
107 | CallbackBase<ParamT> *callback = | ||
108 | 1046 | Observable<ParamT>::MakeCallback(method, delegate); | |
109 | 1046 | RegisterListener(callback); | |
110 | 1046 | return callback; | |
111 | } | ||
112 | |||
113 | |||
114 | template <typename ParamT> | ||
115 | 71 | typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener( | |
116 | typename Callback<ParamT>::CallbackFunction fn) { | ||
117 | // create a new Callback, register it and return the handle | ||
118 | CallbackBase<ParamT> *callback = | ||
119 | 71 | Observable<ParamT>::MakeCallback(fn); | |
120 | 71 | RegisterListener(callback); | |
121 | 71 | return callback; | |
122 | } | ||
123 | |||
124 | |||
125 | template <typename ParamT> | ||
126 | 368537 | void Observable<ParamT>::RegisterListener( | |
127 | Observable<ParamT>::CallbackPtr callback_object) { | ||
128 | // register a generic CallbackBase callback | ||
129 | 368537 | WriteLockGuard guard(listeners_rw_lock_); | |
130 | 1/2: ✓184305 ✗ | 368537 | listeners_.insert(callback_object); |
131 | 368537 | } | |
132 | |||
133 | |||
134 | template <typename ParamT> | ||
135 | 87 | void Observable<ParamT>::UnregisterListener( | |
136 | typename Observable<ParamT>::CallbackPtr callback_object) { | ||
137 | // remove a callback handle from the callbacks list | ||
138 | // if it is not registered --> crash | ||
139 | 87 | WriteLockGuard guard(listeners_rw_lock_); | |
140 | 1/2: ✓87 ✗ | 87 | const size_t was_removed = listeners_.erase(callback_object); |
141 | 1/2: ✗ ✓87 | 87 | assert(was_removed > 0); |
142 | 1/2: ✓87 ✗ | 87 | delete callback_object; |
143 | 87 | } | |
144 | |||
145 | |||
146 | template <typename ParamT> | ||
147 | 737557 | void Observable<ParamT>::UnregisterListeners() { | |
148 | 737557 | WriteLockGuard guard(listeners_rw_lock_); | |
149 | |||
150 | // remove all callbacks from the list | ||
151 | 737605 | typename Callbacks::const_iterator i = listeners_.begin(); | |
152 | 737577 | typename Callbacks::const_iterator iend = listeners_.end(); | |
153 | 2/2: ✓184180 ✓368957 | 1105504 | for (; i != iend; ++i) { |
154 | 1/2: ✓184183 ✗ | 367895 | delete *i; |
155 | } | ||
156 | 737407 | listeners_.clear(); | |
157 | 737539 | } | |
158 | |||
159 | |||
160 | template <typename ParamT> | ||
161 | 2139956 | void Observable<ParamT>::NotifyListeners(const ParamT ¶meter) { | |
162 | 2139956 | ReadLockGuard guard(listeners_rw_lock_); | |
163 | |||
164 | // invoke all callbacks and inform them about new data | ||
165 | 2140074 | typename Callbacks::const_iterator i = listeners_.begin(); | |
166 | 2139924 | typename Callbacks::const_iterator iend = listeners_.end(); | |
167 | 2/2: ✓1085907 ✓1085705 | 4279594 | for (; i != iend; ++i) { |
168 | 1/2: ✓1085919 ✗ | 2139690 | (**i)(parameter); |
169 | } | ||
170 | 2139286 | } | |
171 | |||
172 | |||
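`Observable<ParamT>` is a thread-safe observer: registration and removal take the write lock, while `NotifyListeners()` fans out under the read lock, so concurrent notifications do not serialize against each other. A sketch of the registration flavours, assuming `Observable` is subclassed the way `ConcurrentWorkers` does below (the exact callback typedefs live in the header, not in this excerpt):

```cpp
#include "util/concurrency.h"  // assumed header for Observable

// An event source publishing int result codes to its listeners.
class ChunkUploader : public Observable<int> {
 public:
  void FinishChunk(int retval) {
    this->NotifyListeners(retval);  // fan-out under the read lock
  }
};

class Statistics {
 public:
  void Count(const int &retval) { /* tally result codes */ }
};

static void LogRetval(const int &retval) { /* free-function listener */ }

void Wire() {
  ChunkUploader uploader;
  Statistics stats;

  // bound member method and plain function, respectively
  ChunkUploader::CallbackPtr h1 =
      uploader.RegisterListener(&Statistics::Count, &stats);
  uploader.RegisterListener(&LogRetval);

  uploader.FinishChunk(0);          // both listeners fire

  uploader.UnregisterListener(h1);  // erases and deletes h1; remaining
                                    // handles are freed in the destructor
                                    // via UnregisterListeners()
}
```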
173 | // | ||
174 | // +---------------------------------------------------------------------------- | ||
175 | // | FifoChannel | ||
176 | // | ||
177 | |||
178 | |||
179 | template <class T> | ||
180 | 10 | FifoChannel<T>::FifoChannel(const size_t maximal_length, | |
181 | const size_t drainout_threshold) : | ||
182 | 10 | maximal_queue_length_(maximal_length), | |
183 | 10 | queue_drainout_threshold_(drainout_threshold) | |
184 | { | ||
185 | 1/2: ✗ ✓10 | 10 | assert(drainout_threshold <= maximal_length); |
186 | 1/2: ✗ ✓10 | 10 | assert(drainout_threshold > 0); |
187 | |||
188 | 10 | const bool successful = ( | |
189 | 1/2: ✓10 ✗ | 20 | pthread_mutex_init(&mutex_, NULL) == 0 && |
190 | 2/4: ✓10 ✗ ✓10 ✗ | 20 | pthread_cond_init(&queue_is_not_empty_, NULL) == 0 && |
191 | 10 | pthread_cond_init(&queue_is_not_full_, NULL) == 0); | |
192 | |||
193 | 1/2: ✗ ✓10 | 10 | assert(successful); |
194 | 10 | } | |
195 | |||
196 | |||
197 | template <class T> | ||
198 | 28 | FifoChannel<T>::~FifoChannel() { | |
199 | 20 | pthread_cond_destroy(&queue_is_not_empty_); | |
200 | 20 | pthread_cond_destroy(&queue_is_not_full_); | |
201 | 20 | pthread_mutex_destroy(&mutex_); | |
202 | 48 | } | |
203 | |||
204 | |||
205 | template <class T> | ||
206 | 1000216 | void FifoChannel<T>::Enqueue(const T &data) { | |
207 | 1000216 | MutexLockGuard lock(mutex_); | |
208 | |||
209 | // wait for space in the queue | ||
210 | 1/2: ✗ ✓1000216 | 1000216 | while (this->size() >= maximal_queue_length_) { |
211 | ✗ | pthread_cond_wait(&queue_is_not_full_, &mutex_); | |
212 | } | ||
213 | |||
214 | // put something into the queue | ||
215 | 1/2: ✓1000216 ✗ | 1000216 | this->push(data); |
216 | |||
217 | // wake all waiting threads | ||
218 | 1000216 | pthread_cond_broadcast(&queue_is_not_empty_); | |
219 | 1000216 | } | |
220 | |||
221 | |||
222 | template <class T> | ||
223 | 989104 | const T FifoChannel<T>::Dequeue() { | |
224 | 989104 | MutexLockGuard lock(mutex_); | |
225 | |||
226 | // wait until there is something to do | ||
227 | 2/2: ✓2934670 ✓1000117 | 3934787 | while (this->empty()) { |
228 | 1/2: ✓2934670 ✗ | 2934670 | pthread_cond_wait(&queue_is_not_empty_, &mutex_); |
229 | } | ||
230 | |||
231 | // get the item from the queue | ||
232 | 1000117 | T data = this->front(); this->pop(); | |
233 | |||
234 | // signal waiting threads about the free space | ||
235 | 2/2: ✓999923 ✓194 | 1000117 | if (this->size() < queue_drainout_threshold_) { |
236 | 999923 | pthread_cond_broadcast(&queue_is_not_full_); | |
237 | } | ||
238 | |||
239 | // return the acquired job | ||
240 | 989831 | return data; | |
241 | 1000117 | } | |
242 | |||
243 | |||
244 | template <class T> | ||
245 | 1 | unsigned int FifoChannel<T>::Drop() { | |
246 | 1 | MutexLockGuard lock(mutex_); | |
247 | |||
248 | 1 | unsigned int dropped_items = 0; | |
249 | 2/2: ✓99 ✓1 | 100 | while (!this->empty()) { |
250 | 99 | this->pop(); | |
251 | 99 | ++dropped_items; | |
252 | } | ||
253 | |||
254 | 1 | pthread_cond_broadcast(&queue_is_not_full_); | |
255 | |||
256 | 2 | return dropped_items; | |
257 | 1 | } | |
258 | |||
259 | |||
260 | template <class T> | ||
261 | 2 | size_t FifoChannel<T>::GetItemCount() const { | |
262 | 2 | MutexLockGuard lock(mutex_); | |
263 | 2 | return this->size(); | |
264 | 2 | } | |
265 | |||
266 | |||
267 | template <class T> | ||
268 | 5 | bool FifoChannel<T>::IsEmpty() const { | |
269 | 5 | MutexLockGuard lock(mutex_); | |
270 | 5 | return this->empty(); | |
271 | 5 | } | |
272 | |||
273 | |||
274 | template <class T> | ||
275 | 1 | size_t FifoChannel<T>::GetMaximalItemCount() const { | |
276 | 1 | return maximal_queue_length_; | |
277 | } | ||
278 | |||
279 | |||
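`FifoChannel<T>` is a bounded blocking queue with an asymmetric wake-up policy: `Enqueue()` broadcasts `queue_is_not_empty_` on every insert, but `Dequeue()` only broadcasts `queue_is_not_full_` once the fill level sinks below `queue_drainout_threshold_`, batching producer wake-ups instead of signalling per removed item. A producer/consumer sketch (the poison-pill value is this example's convention, not part of the class):

```cpp
#include <pthread.h>
#include "util/concurrency.h"  // assumed header for FifoChannel

// Capacity 64; producers are only woken once the queue drains below 16.
static FifoChannel<int> g_jobs(64, 16);

static void *Consume(void * /* unused */) {
  for (;;) {
    const int job = g_jobs.Dequeue();  // blocks while the channel is empty
    if (job < 0)                       // poison pill: stop consuming
      return NULL;
    // ... process job ...
  }
}

static void ProduceAndJoin() {
  pthread_t consumer;
  pthread_create(&consumer, NULL, Consume, NULL);
  for (int i = 0; i < 1000; ++i)
    g_jobs.Enqueue(i);  // blocks while 64 items are queued
  g_jobs.Enqueue(-1);   // shut the consumer down
  pthread_join(consumer, NULL);
}
```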
280 | // | ||
281 | // +---------------------------------------------------------------------------- | ||
282 | // | ConcurrentWorkers | ||
283 | // | ||
284 | |||
285 | |||
286 | template <class WorkerT> | ||
287 | ✗ | ConcurrentWorkers<WorkerT>::ConcurrentWorkers( | |
288 | const size_t number_of_workers, | ||
289 | const size_t maximal_queue_length, | ||
290 | ConcurrentWorkers<WorkerT>::worker_context_t *worker_context) : | ||
291 | ✗ | number_of_workers_(number_of_workers), | |
292 | ✗ | worker_context_(worker_context), | |
293 | ✗ | thread_context_(this, worker_context_), | |
294 | ✗ | initialized_(false), | |
295 | ✗ | running_(false), | |
296 | ✗ | workers_started_(0), | |
297 | ✗ | jobs_queue_(maximal_queue_length, maximal_queue_length / 4 + 1), | |
298 | ✗ | results_queue_(maximal_queue_length, 1) | |
299 | { | ||
300 | ✗ | assert(maximal_queue_length >= number_of_workers); | |
301 | ✗ | assert(number_of_workers > 0); | |
302 | |||
303 | ✗ | atomic_init32(&jobs_pending_); | |
304 | ✗ | atomic_init32(&jobs_failed_); | |
305 | ✗ | atomic_init64(&jobs_processed_); | |
306 | } | ||
307 | |||
308 | |||
309 | template <class WorkerT> | ||
310 | ✗ | ConcurrentWorkers<WorkerT>::~ConcurrentWorkers() { | |
311 | ✗ | if (IsRunning()) { | |
312 | ✗ | Terminate(); | |
313 | } | ||
314 | |||
315 | // destroy some synchronisation data structures | ||
316 | ✗ | pthread_cond_destroy(&worker_started_); | |
317 | ✗ | pthread_cond_destroy(&jobs_all_done_); | |
318 | ✗ | pthread_mutex_destroy(&status_mutex_); | |
319 | ✗ | pthread_mutex_destroy(&jobs_all_done_mutex_); | |
320 | } | ||
321 | |||
322 | |||
323 | template <class WorkerT> | ||
324 | ✗ | bool ConcurrentWorkers<WorkerT>::Initialize() { | |
325 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Initializing ConcurrentWorker " | |
326 | "object with %lu worker threads " | ||
327 | "and a queue length of %zu", | ||
328 | number_of_workers_, jobs_queue_.GetMaximalItemCount()); | ||
329 | // LogCvmfs(kLogConcurrency, kLogStdout, "sizeof(expected_data_t): %d\n" | ||
330 | // "sizeof(returned_data_t): %d", | ||
331 | // sizeof(expected_data_t), sizeof(returned_data_t)); | ||
332 | |||
333 | // initialize synchronisation for job queue (Workers) | ||
334 | ✗ | if (pthread_mutex_init(&status_mutex_, NULL) != 0 || | |
335 | ✗ | pthread_mutex_init(&jobs_all_done_mutex_, NULL) != 0 || | |
336 | ✗ | pthread_cond_init(&worker_started_, NULL) != 0 || | |
337 | ✗ | pthread_cond_init(&jobs_all_done_, NULL) != 0) { | |
338 | ✗ | return false; | |
339 | } | ||
340 | |||
341 | // spawn the Worker objects in their own threads | ||
342 | ✗ | if (!SpawnWorkers()) { | |
343 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn workers"); | |
344 | ✗ | return false; | |
345 | } | ||
346 | |||
347 | // all done... | ||
348 | ✗ | initialized_ = true; | |
349 | ✗ | return true; | |
350 | } | ||
351 | |||
352 | |||
353 | template <class WorkerT> | ||
354 | ✗ | bool ConcurrentWorkers<WorkerT>::SpawnWorkers() { | |
355 | ✗ | assert(worker_threads_.size() == 0); | |
356 | ✗ | worker_threads_.resize(number_of_workers_); | |
357 | |||
358 | // set the running flag to trap workers in their treadmills | ||
359 | ✗ | StartRunning(); | |
360 | |||
361 | // spawn the swarm and make them work | ||
362 | ✗ | bool success = true; | |
363 | ✗ | WorkerThreads::iterator i = worker_threads_.begin(); | |
364 | ✗ | WorkerThreads::const_iterator iend = worker_threads_.end(); | |
365 | ✗ | for (; i != iend; ++i) { | |
366 | ✗ | pthread_t* thread = &(*i); | |
367 | ✗ | const int retval = pthread_create( | |
368 | thread, | ||
369 | NULL, | ||
370 | &ConcurrentWorkers<WorkerT>::RunWorker, | ||
371 | ✗ | reinterpret_cast<void *>(&thread_context_)); | |
372 | ✗ | if (retval != 0) { | |
373 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn a Worker"); | |
374 | ✗ | success = false; | |
375 | } | ||
376 | } | ||
377 | |||
378 | // spawn the callback processing thread | ||
379 | const int retval = | ||
380 | ✗ | pthread_create( | |
381 | &callback_thread_, | ||
382 | NULL, | ||
383 | &ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper, | ||
384 | ✗ | reinterpret_cast<void *>(&thread_context_)); | |
385 | ✗ | if (retval != 0) { | |
386 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn the callback " | |
387 | "worker thread"); | ||
388 | ✗ | success = false; | |
389 | } | ||
390 | |||
391 | // wait for all workers to report in... | ||
392 | { | ||
393 | ✗ | MutexLockGuard guard(status_mutex_); | |
394 | // +1 -> callback thread | ||
395 | ✗ | while (workers_started_ < number_of_workers_ + 1) { | |
396 | ✗ | pthread_cond_wait(&worker_started_, &status_mutex_); | |
397 | } | ||
398 | } | ||
399 | |||
400 | // all done... | ||
401 | ✗ | return success; | |
402 | } | ||
403 | |||
404 | |||
405 | template <class WorkerT> | ||
406 | ✗ | void* ConcurrentWorkers<WorkerT>::RunWorker(void *run_binding) { | |
407 | // NOTE: This is the actual worker thread code! | ||
408 | |||
409 | // | ||
410 | // INITIALIZATION | ||
411 | ///////////////// | ||
412 | |||
413 | // get contextual information | ||
414 | ✗ | const WorkerRunBinding &binding = | |
415 | *(static_cast<WorkerRunBinding*>(run_binding)); | ||
416 | ✗ | ConcurrentWorkers<WorkerT> *master = binding.delegate; | |
417 | ✗ | const worker_context_t *worker_context = binding.worker_context; | |
418 | |||
419 | // boot up the worker object and make sure it works | ||
420 | ✗ | WorkerT worker(worker_context); | |
421 | ✗ | worker.RegisterMaster(master); | |
422 | ✗ | const bool init_success = worker.Initialize(); | |
423 | |||
424 | // tell the master that this worker was started | ||
425 | ✗ | master->ReportStartedWorker(); | |
426 | |||
427 | ✗ | if (!init_success) { | |
428 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, "Worker was not initialized " | |
429 | "properly... it will die now!"); | ||
430 | ✗ | return NULL; | |
431 | } | ||
432 | |||
433 | // | ||
434 | // PROCESSING LOOP | ||
435 | ////////////////// | ||
436 | |||
437 | // start the processing loop | ||
438 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Starting Worker..."); | |
439 | ✗ | while (master->IsRunning()) { | |
440 | // acquire a new job | ||
441 | ✗ | WorkerJob job = master->Acquire(); | |
442 | |||
443 | // check if we need to terminate | ||
444 | ✗ | if (job.is_death_sentence) | |
445 | ✗ | break; | |
446 | |||
447 | // do what you are supposed to do | ||
448 | ✗ | worker(job.data); | |
449 | } | ||
450 | |||
451 | // | ||
452 | // TEAR DOWN | ||
453 | //////////// | ||
454 | |||
455 | // give the worker the chance to tidy up | ||
456 | ✗ | worker.TearDown(); | |
457 | |||
458 | // good bye thread... | ||
459 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Worker..."); | |
460 | ✗ | return NULL; | |
461 | } | ||
462 | |||
463 | |||
464 | template <class WorkerT> | ||
465 | ✗ | void* ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper(void *run_binding) { | |
466 | ✗ | const RunBinding &binding = *(static_cast<RunBinding*>(run_binding)); | |
467 | ✗ | ConcurrentWorkers<WorkerT> *master = binding.delegate; | |
468 | |||
469 | ✗ | master->ReportStartedWorker(); | |
470 | |||
471 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, | |
472 | "Started dedicated callback worker"); | ||
473 | ✗ | master->RunCallbackThread(); | |
474 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Callback Worker..."); | |
475 | |||
476 | ✗ | return NULL; | |
477 | } | ||
478 | |||
479 | |||
480 | template <class WorkerT> | ||
481 | ✗ | void ConcurrentWorkers<WorkerT>::RunCallbackThread() { | |
482 | ✗ | while (IsRunning()) { | |
483 | ✗ | const CallbackJob callback_job = results_queue_.Dequeue(); | |
484 | |||
485 | // stop callback processing if needed | ||
486 | ✗ | if (callback_job.is_death_sentence) { | |
487 | ✗ | break; | |
488 | } | ||
489 | |||
490 | // notify all observers about the finished job | ||
491 | ✗ | this->NotifyListeners(callback_job.data); | |
492 | |||
493 | // remove the job from the pending 'list' and add it to the ready 'list' | ||
494 | ✗ | atomic_dec32(&jobs_pending_); | |
495 | ✗ | atomic_inc64(&jobs_processed_); | |
496 | |||
497 | // signal the Spooler that all jobs are done... | ||
498 | ✗ | if (atomic_read32(&jobs_pending_) == 0) { | |
499 | ✗ | pthread_cond_broadcast(&jobs_all_done_); | |
500 | } | ||
501 | } | ||
502 | } | ||
503 | |||
504 | |||
505 | template <class WorkerT> | ||
506 | ✗ | void ConcurrentWorkers<WorkerT>::ReportStartedWorker() const { | |
507 | ✗ | MutexLockGuard lock(status_mutex_); | |
508 | ✗ | ++workers_started_; | |
509 | ✗ | pthread_cond_signal(&worker_started_); | |
510 | } | ||
511 | |||
512 | |||
513 | template <class WorkerT> | ||
514 | ✗ | void ConcurrentWorkers<WorkerT>::Schedule(WorkerJob job) { | |
515 | // Note: This method can be called from arbitrary threads. Thus we do not | ||
516 | // necessarily have just one producer in the system. | ||
517 | |||
518 | // check if it makes sense to schedule this job | ||
519 | ✗ | if (!IsRunning() && !job.is_death_sentence) { | |
520 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, "Tried to schedule a job but " | |
521 | "concurrency was not running..."); | ||
522 | ✗ | return; | |
523 | } | ||
524 | |||
525 | ✗ | jobs_queue_.Enqueue(job); | |
526 | ✗ | if (!job.is_death_sentence) { | |
527 | ✗ | atomic_inc32(&jobs_pending_); | |
528 | } | ||
529 | } | ||
530 | |||
531 | |||
532 | template <class WorkerT> | ||
533 | ✗ | void ConcurrentWorkers<WorkerT>::ScheduleDeathSentences() { | |
534 | ✗ | assert(!IsRunning()); | |
535 | |||
536 | // make sure that the queue is empty before we schedule a death sentence | ||
537 | ✗ | TruncateJobQueue(); | |
538 | |||
539 | // schedule a death sentence for each running thread | ||
540 | ✗ | const unsigned int number_of_workers = GetNumberOfWorkers(); | |
541 | ✗ | for (unsigned int i = 0; i < number_of_workers; ++i) { | |
542 | ✗ | Schedule(WorkerJob()); | |
543 | } | ||
544 | |||
545 | // schedule a death sentence for the callback thread | ||
546 | ✗ | results_queue_.Enqueue(CallbackJob()); | |
547 | } | ||
548 | |||
549 | |||
550 | template <class WorkerT> | ||
551 | typename ConcurrentWorkers<WorkerT>::WorkerJob | ||
552 | ✗ | ConcurrentWorkers<WorkerT>::Acquire() | |
553 | { | ||
554 | // Note: This method is exclusively called inside the worker threads! | ||
555 | // Any other usage might produce undefined behavior. | ||
556 | ✗ | return jobs_queue_.Dequeue(); | |
557 | } | ||
558 | |||
559 | |||
560 | template <class WorkerT> | ||
561 | ✗ | void ConcurrentWorkers<WorkerT>::TruncateJobQueue(const bool forget_pending) { | |
562 | // Note: This method will throw away all jobs currently waiting in the job | ||
563 | // queue. These jobs will not be processed! | ||
564 | ✗ | const unsigned int dropped_jobs = jobs_queue_.Drop(); | |
565 | |||
566 | // if desired, we remove the jobs from the pending 'list' | ||
567 | ✗ | if (forget_pending) { | |
568 | ✗ | atomic_xadd32(&jobs_pending_, -dropped_jobs); | |
569 | } | ||
570 | } | ||
571 | |||
572 | |||
573 | template <class WorkerT> | ||
574 | ✗ | void ConcurrentWorkers<WorkerT>::Terminate() { | |
575 | // Note: this method causes workers to die immediately after they finished | ||
576 | // their last acquired job. To make sure that each worker will check | ||
577 | // the running state, we schedule empty jobs or Death Sentences. | ||
578 | |||
579 | ✗ | assert(IsRunning()); | |
580 | |||
581 | // unset the running flag (causing threads to die on the next checkpoint) | ||
582 | ✗ | StopRunning(); | |
583 | |||
584 | // schedule empty jobs to make sure that each worker will actually reach the | ||
585 | // next checkpoint in their processing loop and terminate as expected | ||
586 | ✗ | ScheduleDeathSentences(); | |
587 | |||
588 | // wait for the worker threads to return | ||
589 | ✗ | WorkerThreads::const_iterator i = worker_threads_.begin(); | |
590 | ✗ | WorkerThreads::const_iterator iend = worker_threads_.end(); | |
591 | ✗ | for (; i != iend; ++i) { | |
592 | ✗ | pthread_join(*i, NULL); | |
593 | } | ||
594 | |||
595 | // wait for the callback worker thread | ||
596 | ✗ | pthread_join(callback_thread_, NULL); | |
597 | |||
598 | // check if we finished all pending jobs | ||
599 | ✗ | const int pending = atomic_read32(&jobs_pending_); | |
600 | ✗ | if (pending > 0) { | |
601 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, "Job queue was not fully processed. " | |
602 | "Still %d jobs were pending and " | ||
603 | "will not be executed anymore.", | ||
604 | pending); | ||
605 | } | ||
606 | |||
607 | // check if we had failed jobs | ||
608 | ✗ | const int failed = atomic_read32(&jobs_failed_); | |
609 | ✗ | if (failed > 0) { | |
610 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, "We've had %d failed jobs.", | |
611 | failed); | ||
612 | } | ||
613 | |||
614 | // thanks, and good bye... | ||
615 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, | |
616 | "All workers stopped. They processed %ld jobs. Terminating...", | ||
617 | atomic_read64(&jobs_processed_)); | ||
618 | } | ||
619 | |||
620 | |||
621 | template <class WorkerT> | ||
622 | ✗ | void ConcurrentWorkers<WorkerT>::WaitForEmptyQueue() const { | |
623 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, | |
624 | "Waiting for %d jobs to be finished", atomic_read32(&jobs_pending_)); | ||
625 | |||
626 | // wait until all pending jobs are processed | ||
627 | { | ||
628 | ✗ | MutexLockGuard lock(jobs_all_done_mutex_); | |
629 | ✗ | while (atomic_read32(&jobs_pending_) > 0) { | |
630 | ✗ | pthread_cond_wait(&jobs_all_done_, &jobs_all_done_mutex_); | |
631 | } | ||
632 | } | ||
633 | |||
634 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Jobs are done... go on"); | |
635 | } | ||
636 | |||
637 | |||
638 | template <class WorkerT> | ||
639 | void ConcurrentWorkers<WorkerT>::WaitForTermination() { | ||
640 | if (!IsRunning()) | ||
641 | return; | ||
642 | |||
643 | WaitForEmptyQueue(); | ||
644 | Terminate(); | ||
645 | } | ||
646 | |||
647 | |||
648 | template <class WorkerT> | ||
649 | ✗ | void ConcurrentWorkers<WorkerT>::JobDone( | |
650 | const ConcurrentWorkers<WorkerT>::returned_data_t& data, | ||
651 | const bool success) { | ||
652 | // BEWARE! | ||
653 | // This is a callback method that might be called from a different thread! | ||
654 | |||
655 | // check if the finished job was successful | ||
656 | ✗ | if (!success) { | |
657 | ✗ | atomic_inc32(&jobs_failed_); | |
658 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, "Job failed"); | |
659 | } | ||
660 | |||
661 | // queue the result in the callback channel | ||
662 | ✗ | results_queue_.Enqueue(CallbackJob(data)); | |
663 | } | ||
664 | |||
665 | #ifdef CVMFS_NAMESPACE_GUARD | ||
666 | } // namespace CVMFS_NAMESPACE_GUARD | ||
667 | #endif | ||
668 | |||
669 | #endif // CVMFS_UTIL_CONCURRENCY_IMPL_H_ | ||
670 |