Directory: | cvmfs/ |
---|---|
File: | cvmfs/util/concurrency_impl.h |
Date: | 2025-06-22 02:36:02 |
 | Exec | Total | Coverage |
---|---|---|---|
Lines: | 119 | 264 | 45.1% |
Branches: | 54 | 142 | 38.0% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #ifndef CVMFS_UTIL_CONCURRENCY_IMPL_H_ | ||
6 | #define CVMFS_UTIL_CONCURRENCY_IMPL_H_ | ||
7 | |||
8 | #include "util/logging.h" | ||
9 | |||
10 | #ifdef CVMFS_NAMESPACE_GUARD | ||
11 | namespace CVMFS_NAMESPACE_GUARD { | ||
12 | #endif | ||
13 | |||
14 | // | ||
15 | // +---------------------------------------------------------------------------- | ||
16 | // | SynchronizingCounter | ||
17 | // | ||
18 | |||
19 | |||
20 | template<typename T> | ||
21 | 1303124860 | void SynchronizingCounter<T>::SetValueUnprotected(const T new_value) { | |
22 | // make sure that 0 <= new_value <= maximal_value_ if maximal_value_ != 0 | ||
23 | 5/6 | 1303124860 | assert(!HasMaximalValue()
24 | || (new_value >= T(0) && new_value <= maximal_value_)); | ||
25 | |||
26 | 1303124860 | value_ = new_value; | |
27 | |||
28 | 2/2 | 1303124860 | if (value_ == T(0)) {
29 | 86795 | pthread_cond_broadcast(&became_zero_); | |
30 | } | ||
31 | |||
32 | 6/6 | 1303124860 | if (HasMaximalValue() && value_ < maximal_value_) {
33 | 1813891 | pthread_cond_broadcast(&free_slot_); | |
34 | } | ||
35 | 1303124860 | } | |
36 | |||
37 | |||
38 | template<typename T> | ||
39 | 651562387 | void SynchronizingCounter<T>::WaitForFreeSlotUnprotected() { | |
40 | 6/6 | 655078006 | while (HasMaximalValue() && value_ >= maximal_value_) {
41 | 3515619 | pthread_cond_wait(&free_slot_, &mutex_); | |
42 | } | ||
43 | 3/4 | 651562387 | assert(!HasMaximalValue() || value_ < maximal_value_);
44 | 651562387 | } | |
45 | |||
46 | |||
47 | template<typename T> | ||
48 | 2962 | void SynchronizingCounter<T>::Initialize() { | |
49 | 2962 | const bool init_successful = (pthread_mutex_init(&mutex_, NULL) == 0 | |
50 | 1/2 | 2962 | && pthread_cond_init(&became_zero_, NULL) == 0
51 | 2/4 | 5924 | && pthread_cond_init(&free_slot_, NULL) == 0);
52 | 1/2 | 2962 | assert(init_successful);
53 | 2962 | } | |
54 | |||
55 | |||
56 | template<typename T> | ||
57 | 2961 | void SynchronizingCounter<T>::Destroy() { | |
58 | 2961 | pthread_mutex_destroy(&mutex_); | |
59 | 2961 | pthread_cond_destroy(&became_zero_); | |
60 | 2961 | pthread_cond_destroy(&free_slot_); | |
61 | 2961 | } | |
62 | |||
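The counter above is essentially a semaphore built from one mutex and two condition variables: `SetValueUnprotected()` broadcasts `became_zero_` whenever the value reaches zero and `free_slot_` whenever it drops below the cap, while `WaitForFreeSlotUnprotected()` blocks as long as the cap is reached. A minimal standalone sketch of the same pattern; the `BoundedCounter` class and its method names are illustrative, not the actual `SynchronizingCounter` interface:

```cpp
// Sketch of the bounded-counter pattern from the listing above.
#include <cassert>
#include <pthread.h>

class BoundedCounter {
 public:
  explicit BoundedCounter(int maximal_value) : value_(0), max_(maximal_value) {
    const bool ok = (pthread_mutex_init(&mutex_, NULL) == 0
                     && pthread_cond_init(&became_zero_, NULL) == 0
                     && pthread_cond_init(&free_slot_, NULL) == 0);
    assert(ok);
  }
  ~BoundedCounter() {
    pthread_mutex_destroy(&mutex_);
    pthread_cond_destroy(&became_zero_);
    pthread_cond_destroy(&free_slot_);
  }

  // Blocks while the counter sits at its cap, mirroring
  // WaitForFreeSlotUnprotected() in the listing.
  void Increment() {
    pthread_mutex_lock(&mutex_);
    while (max_ != 0 && value_ >= max_)
      pthread_cond_wait(&free_slot_, &mutex_);
    ++value_;
    pthread_mutex_unlock(&mutex_);
  }

  // Mirrors SetValueUnprotected(): broadcast became_zero_ at zero and
  // free_slot_ once a slot below the cap opens up.
  void Decrement() {
    pthread_mutex_lock(&mutex_);
    assert(value_ > 0);
    --value_;
    if (value_ == 0) pthread_cond_broadcast(&became_zero_);
    if (max_ != 0 && value_ < max_) pthread_cond_broadcast(&free_slot_);
    pthread_mutex_unlock(&mutex_);
  }

  void WaitForZero() {
    pthread_mutex_lock(&mutex_);
    while (value_ != 0)
      pthread_cond_wait(&became_zero_, &mutex_);
    pthread_mutex_unlock(&mutex_);
  }

 private:
  int value_;
  const int max_;  // 0 means "no maximal value", as in HasMaximalValue()
  pthread_mutex_t mutex_;
  pthread_cond_t became_zero_;
  pthread_cond_t free_slot_;
};
```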
63 | |||
64 | // | ||
65 | // +---------------------------------------------------------------------------- | ||
66 | // | Observable | ||
67 | // | ||
68 | |||
69 | |||
70 | template<typename ParamT> | ||
71 | 36106223 | Observable<ParamT>::Observable() { | |
72 | 36106223 | const int ret = pthread_rwlock_init(&listeners_rw_lock_, NULL); | |
73 | 1/2 | 36106223 | assert(ret == 0);
74 | 36106223 | } | |
75 | |||
76 | |||
77 | template<typename ParamT> | ||
78 | 36103296 | Observable<ParamT>::~Observable() { | |
79 | 36103296 | UnregisterListeners(); | |
80 | 36102414 | pthread_rwlock_destroy(&listeners_rw_lock_); | |
81 | } | ||
82 | |||
83 | |||
84 | template<typename ParamT> | ||
85 | template<class DelegateT, class ClosureDataT> | ||
86 | 8999867 | typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener( | |
87 | typename BoundClosure<ParamT, DelegateT, ClosureDataT>::CallbackMethod | ||
88 | method, | ||
89 | DelegateT *delegate, | ||
90 | ClosureDataT data) { | ||
91 | // create a new BoundClosure, register it and return the handle | ||
92 | 8999867 | CallbackBase<ParamT> *callback = Observable<ParamT>::MakeClosure( | |
93 | method, delegate, data); | ||
94 | 8999867 | RegisterListener(callback); | |
95 | 8999867 | return callback; | |
96 | } | ||
97 | |||
98 | |||
99 | template<typename ParamT> | ||
100 | template<class DelegateT> | ||
101 | 25796 | typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener( | |
102 | typename BoundCallback<ParamT, DelegateT>::CallbackMethod method, | ||
103 | DelegateT *delegate) { | ||
104 | // create a new BoundCallback, register it and return the handle | ||
105 | 25796 | CallbackBase<ParamT> *callback = Observable<ParamT>::MakeCallback(method, | |
106 | delegate); | ||
107 | 25796 | RegisterListener(callback); | |
108 | 25796 | return callback; | |
109 | } | ||
110 | |||
111 | |||
112 | template<typename ParamT> | ||
113 | 2349 | typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener( | |
114 | typename Callback<ParamT>::CallbackFunction fn) { | ||
115 | // create a new Callback, register it and return the handle | ||
116 | 2349 | CallbackBase<ParamT> *callback = Observable<ParamT>::MakeCallback(fn); | |
117 | 2349 | RegisterListener(callback); | |
118 | 2349 | return callback; | |
119 | } | ||
120 | |||
121 | |||
122 | template<typename ParamT> | ||
123 | 18028039 | void Observable<ParamT>::RegisterListener( | |
124 | Observable<ParamT>::CallbackPtr callback_object) { | ||
125 | // register a generic CallbackBase callback | ||
126 | 18028039 | WriteLockGuard guard(listeners_rw_lock_); | |
127 | 1/2 | 18028039 | listeners_.insert(callback_object);
128 | 18028039 | } | |
129 | |||
130 | |||
131 | template<typename ParamT> | ||
132 | 1866 | void Observable<ParamT>::UnregisterListener( | |
133 | typename Observable<ParamT>::CallbackPtr callback_object) { | ||
134 | // remove a callback handle from the callbacks list | ||
135 | // if it is not registered --> crash | ||
136 | 1866 | WriteLockGuard guard(listeners_rw_lock_); | |
137 | 1/2 | 1866 | const size_t was_removed = listeners_.erase(callback_object);
138 | 1/2 | 1866 | assert(was_removed > 0);
139 | 1/2 | 1866 | delete callback_object;
140 | 1866 | } | |
141 | |||
142 | |||
143 | template<typename ParamT> | ||
144 | 36091826 | void Observable<ParamT>::UnregisterListeners() { | |
145 | 36091826 | WriteLockGuard guard(listeners_rw_lock_); | |
146 | |||
147 | // remove all callbacks from the list | ||
148 | 36094864 | typename Callbacks::const_iterator i = listeners_.begin(); | |
149 | 36089964 | typename Callbacks::const_iterator iend = listeners_.end(); | |
150 | 2/2 | 54100551 | for (; i != iend; ++i) {
151 | 1/2 | 18008235 | delete *i;
152 | } | ||
153 | 36083594 | listeners_.clear(); | |
154 | 36087514 | } | |
155 | |||
156 | |||
157 | template<typename ParamT> | ||
158 | 55855309 | void Observable<ParamT>::NotifyListeners(const ParamT ¶meter) { | |
159 | 55855309 | ReadLockGuard guard(listeners_rw_lock_); | |
160 | |||
161 | // invoke all callbacks and inform them about new data | ||
162 | 55855279 | typename Callbacks::const_iterator i = listeners_.begin(); | |
163 | 55852953 | typename Callbacks::const_iterator iend = listeners_.end(); | |
164 | 2/2 | 111701632 | for (; i != iend; ++i) {
165 | 1/2 | 55848789 | (**i)(parameter);
166 | } | ||
167 | 55840265 | } | |
168 | |||
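`Observable<ParamT>` guards its listener set with a read/write lock: `RegisterListener()`, `UnregisterListener()`, and `UnregisterListeners()` take the write lock, while `NotifyListeners()` takes only the read lock, so notifications issued from several threads can run concurrently. A self-contained sketch of that locking discipline, using a plain function pointer in place of the CVMFS callback hierarchy; all names here are illustrative:

```cpp
// Sketch of the read/write-locked observer pattern from the listing above.
#include <cassert>
#include <pthread.h>
#include <set>

typedef void (*Listener)(const int &param);

class IntObservable {
 public:
  IntObservable() {
    const int ret = pthread_rwlock_init(&lock_, NULL);
    assert(ret == 0);
  }
  ~IntObservable() { pthread_rwlock_destroy(&lock_); }

  void RegisterListener(Listener l) {
    pthread_rwlock_wrlock(&lock_);  // exclusive: mutates the listener set
    listeners_.insert(l);
    pthread_rwlock_unlock(&lock_);
  }

  void UnregisterListener(Listener l) {
    pthread_rwlock_wrlock(&lock_);
    const size_t removed = listeners_.erase(l);
    assert(removed > 0);  // as in the listing: unknown handles are a bug
    pthread_rwlock_unlock(&lock_);
  }

  void NotifyListeners(const int &param) {
    pthread_rwlock_rdlock(&lock_);  // shared: notifiers may run concurrently
    for (std::set<Listener>::const_iterator i = listeners_.begin();
         i != listeners_.end(); ++i) {
      (*i)(param);
    }
    pthread_rwlock_unlock(&lock_);
  }

 private:
  std::set<Listener> listeners_;
  pthread_rwlock_t lock_;
};
```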
169 | |||
170 | // | ||
171 | // +---------------------------------------------------------------------------- | ||
172 | // | FifoChannel | ||
173 | // | ||
174 | |||
175 | |||
176 | template<class T> | ||
177 | 117 | FifoChannel<T>::FifoChannel(const size_t maximal_length, | |
178 | const size_t drainout_threshold) | ||
179 | 117 | : maximal_queue_length_(maximal_length) | |
180 | 117 | , queue_drainout_threshold_(drainout_threshold) { | |
181 | 1/2 | 117 | assert(drainout_threshold <= maximal_length);
182 | 1/2 | 117 | assert(drainout_threshold > 0);
183 | |||
184 | 117 | const bool successful = (pthread_mutex_init(&mutex_, NULL) == 0 | |
185 | 1/2 | 117 | && pthread_cond_init(&queue_is_not_empty_, NULL) == 0
186 | 2/4 | 234 | && pthread_cond_init(&queue_is_not_full_, NULL)
187 | == 0); | ||
188 | |||
189 | 1/2 | 117 | assert(successful);
190 | 117 | } | |
191 | |||
192 | |||
193 | template<class T> | ||
194 | 346 | FifoChannel<T>::~FifoChannel() { | |
195 | 234 | pthread_cond_destroy(&queue_is_not_empty_); | |
196 | 234 | pthread_cond_destroy(&queue_is_not_full_); | |
197 | 234 | pthread_mutex_destroy(&mutex_); | |
198 | 580 | } | |
199 | |||
200 | |||
201 | template<class T> | ||
202 | 20004237 | void FifoChannel<T>::Enqueue(const T &data) { | |
203 | 20004237 | MutexLockGuard lock(mutex_); | |
204 | |||
205 | // wait for space in the queue | ||
206 | 2/2 | 20004257 | while (this->size() >= maximal_queue_length_) {
207 | 1/2 | 20 | pthread_cond_wait(&queue_is_not_full_, &mutex_);
208 | } | ||
209 | |||
210 | // put something into the queue | ||
211 | 1/2 | 20004237 | this->push(data);
212 | |||
213 | // wake all waiting threads | ||
214 | 20004237 | pthread_cond_broadcast(&queue_is_not_empty_); | |
215 | 20004237 | } | |
216 | |||
217 | |||
218 | template<class T> | ||
219 | 19723977 | const T FifoChannel<T>::Dequeue() { | |
220 | 19723977 | MutexLockGuard lock(mutex_); | |
221 | |||
222 | // wait until there is something to do | ||
223 | 2/2 | 78679986 | while (this->empty()) {
224 | 1/2 | 58677729 | pthread_cond_wait(&queue_is_not_empty_, &mutex_);
225 | } | ||
226 | |||
227 | // get the item from the queue | ||
228 | 20002257 | T data = this->front(); | |
229 | 20002257 | this->pop(); | |
230 | |||
231 | // signal waiting threads about the free space | ||
232 | 2/2 | 20002257 | if (this->size() < queue_drainout_threshold_) {
233 | 19995937 | pthread_cond_broadcast(&queue_is_not_full_); | |
234 | } | ||
235 | |||
236 | // return the acquired job | ||
237 | 19777277 | return data; | |
238 | 20002257 | } | |
239 | |||
240 | |||
241 | template<class T> | ||
242 | 20 | unsigned int FifoChannel<T>::Drop() { | |
243 | 20 | MutexLockGuard lock(mutex_); | |
244 | |||
245 | 20 | unsigned int dropped_items = 0; | |
246 | 2/2 | 2000 | while (!this->empty()) {
247 | 1980 | this->pop(); | |
248 | 1980 | ++dropped_items; | |
249 | } | ||
250 | |||
251 | 20 | pthread_cond_broadcast(&queue_is_not_full_); | |
252 | |||
253 | 40 | return dropped_items; | |
254 | 20 | } | |
255 | |||
256 | |||
257 | template<class T> | ||
258 | 40 | size_t FifoChannel<T>::GetItemCount() const { | |
259 | 40 | MutexLockGuard lock(mutex_); | |
260 | 40 | return this->size(); | |
261 | 40 | } | |
262 | |||
263 | |||
264 | template<class T> | ||
265 | 100 | bool FifoChannel<T>::IsEmpty() const { | |
266 | 100 | MutexLockGuard lock(mutex_); | |
267 | 100 | return this->empty(); | |
268 | 100 | } | |
269 | |||
270 | |||
271 | template<class T> | ||
272 | 20 | size_t FifoChannel<T>::GetMaximalItemCount() const { | |
273 | 20 | return maximal_queue_length_; | |
274 | } | ||
275 | |||
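The `FifoChannel<T>` interface visible above is a bounded blocking queue: `Enqueue()` blocks while `maximal_queue_length_` items are in flight, `Dequeue()` blocks on an empty queue, and `queue_drainout_threshold_` delays the not-full broadcast until the fill level has dropped, batching producer wake-ups. A hypothetical producer/consumer usage sketch; the include path and the poison-pill convention are assumptions, not part of this file:

```cpp
#include <pthread.h>
#include <cstdio>

#include "util/concurrency.h"  // assumed include path for FifoChannel<T>

// One shared channel: capacity 1000; blocked producers are only woken up
// again once the consumer has drained the queue below 250 items.
static FifoChannel<int> channel(1000, 250);

static void *Producer(void * /*unused*/) {
  for (int i = 0; i < 10000; ++i)
    channel.Enqueue(i);  // blocks while the queue holds 1000 items
  channel.Enqueue(-1);   // poison pill (a convention of this sketch)
  return NULL;
}

static void *Consumer(void * /*unused*/) {
  for (;;) {
    const int item = channel.Dequeue();  // blocks on an empty queue
    if (item < 0)
      break;
  }
  return NULL;
}

int main() {
  pthread_t producer, consumer;
  pthread_create(&producer, NULL, Producer, NULL);
  pthread_create(&consumer, NULL, Consumer, NULL);
  pthread_join(producer, NULL);
  pthread_join(consumer, NULL);
  printf("items left behind: %lu\n",
         static_cast<unsigned long>(channel.GetItemCount()));
  return 0;
}
```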
276 | |||
277 | // | ||
278 | // +---------------------------------------------------------------------------- | ||
279 | // | ConcurrentWorkers | ||
280 | // | ||
281 | |||
282 | |||
283 | template<class WorkerT> | ||
284 | ✗ | ConcurrentWorkers<WorkerT>::ConcurrentWorkers( | |
285 | const size_t number_of_workers, | ||
286 | const size_t maximal_queue_length, | ||
287 | ConcurrentWorkers<WorkerT>::worker_context_t *worker_context) | ||
288 | ✗ | : number_of_workers_(number_of_workers) | |
289 | ✗ | , worker_context_(worker_context) | |
290 | ✗ | , thread_context_(this, worker_context_) | |
291 | ✗ | , initialized_(false) | |
292 | ✗ | , running_(false) | |
293 | ✗ | , workers_started_(0) | |
294 | ✗ | , jobs_queue_(maximal_queue_length, maximal_queue_length / 4 + 1) | |
295 | ✗ | , results_queue_(maximal_queue_length, 1) { | |
296 | ✗ | assert(maximal_queue_length >= number_of_workers); | |
297 | ✗ | assert(number_of_workers > 0); | |
298 | |||
299 | ✗ | atomic_init32(&jobs_pending_); | |
300 | ✗ | atomic_init32(&jobs_failed_); | |
301 | ✗ | atomic_init64(&jobs_processed_); | |
302 | } | ||
303 | |||
304 | |||
305 | template<class WorkerT> | ||
306 | ✗ | ConcurrentWorkers<WorkerT>::~ConcurrentWorkers() { | |
307 | ✗ | if (IsRunning()) { | |
308 | ✗ | Terminate(); | |
309 | } | ||
310 | |||
311 | // destroy some synchronisation data structures | ||
312 | ✗ | pthread_cond_destroy(&worker_started_); | |
313 | ✗ | pthread_cond_destroy(&jobs_all_done_); | |
314 | ✗ | pthread_mutex_destroy(&status_mutex_); | |
315 | ✗ | pthread_mutex_destroy(&jobs_all_done_mutex_); | |
316 | } | ||
317 | |||
318 | |||
319 | template<class WorkerT> | ||
320 | ✗ | bool ConcurrentWorkers<WorkerT>::Initialize() { | |
321 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, | |
322 | "Initializing ConcurrentWorker " | ||
323 | "object with %lu worker threads " | ||
324 | "and a queue length of %zu", | ||
325 | number_of_workers_, jobs_queue_.GetMaximalItemCount()); | ||
326 | // LogCvmfs(kLogConcurrency, kLogStdout, "sizeof(expected_data_t): %d\n" | ||
327 | // "sizeof(returned_data_t): %d", | ||
328 | // sizeof(expected_data_t), sizeof(returned_data_t)); | ||
329 | |||
330 | // initialize synchronisation for job queue (Workers) | ||
331 | ✗ | if (pthread_mutex_init(&status_mutex_, NULL) != 0 | |
332 | ✗ | || pthread_mutex_init(&jobs_all_done_mutex_, NULL) != 0 | |
333 | ✗ | || pthread_cond_init(&worker_started_, NULL) != 0 | |
334 | ✗ | || pthread_cond_init(&jobs_all_done_, NULL) != 0) { | |
335 | ✗ | return false; | |
336 | } | ||
337 | |||
338 | // spawn the Worker objects in their own threads | ||
339 | ✗ | if (!SpawnWorkers()) { | |
340 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn workers"); | |
341 | ✗ | return false; | |
342 | } | ||
343 | |||
344 | // all done... | ||
345 | ✗ | initialized_ = true; | |
346 | ✗ | return true; | |
347 | } | ||
348 | |||
349 | |||
350 | template<class WorkerT> | ||
351 | ✗ | bool ConcurrentWorkers<WorkerT>::SpawnWorkers() { | |
352 | ✗ | assert(worker_threads_.size() == 0); | |
353 | ✗ | worker_threads_.resize(number_of_workers_); | |
354 | |||
355 | // set the running flag to trap workers in their treadmills | ||
356 | ✗ | StartRunning(); | |
357 | |||
358 | // spawn the swarm and make them work | ||
359 | ✗ | bool success = true; | |
360 | ✗ | WorkerThreads::iterator i = worker_threads_.begin(); | |
361 | ✗ | WorkerThreads::const_iterator iend = worker_threads_.end(); | |
362 | ✗ | for (; i != iend; ++i) { | |
363 | ✗ | pthread_t *thread = &(*i); | |
364 | ✗ | const int retval = pthread_create( | |
365 | thread, | ||
366 | NULL, | ||
367 | &ConcurrentWorkers<WorkerT>::RunWorker, | ||
368 | ✗ | reinterpret_cast<void *>(&thread_context_)); | |
369 | ✗ | if (retval != 0) { | |
370 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn a Worker"); | |
371 | ✗ | success = false; | |
372 | } | ||
373 | } | ||
374 | |||
375 | // spawn the callback processing thread | ||
376 | ✗ | const int retval = pthread_create( | |
377 | &callback_thread_, | ||
378 | NULL, | ||
379 | &ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper, | ||
380 | ✗ | reinterpret_cast<void *>(&thread_context_)); | |
381 | ✗ | if (retval != 0) { | |
382 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, | |
383 | "Failed to spawn the callback " | ||
384 | "worker thread"); | ||
385 | ✗ | success = false; | |
386 | } | ||
387 | |||
388 | // wait for all workers to report in... | ||
389 | { | ||
390 | ✗ | MutexLockGuard guard(status_mutex_); | |
391 | // +1 -> callback thread | ||
392 | ✗ | while (workers_started_ < number_of_workers_ + 1) { | |
393 | ✗ | pthread_cond_wait(&worker_started_, &status_mutex_); | |
394 | } | ||
395 | } | ||
396 | |||
397 | // all done... | ||
398 | ✗ | return success; | |
399 | } | ||
400 | |||
401 | |||
402 | template<class WorkerT> | ||
403 | ✗ | void *ConcurrentWorkers<WorkerT>::RunWorker(void *run_binding) { | |
404 | // NOTE: This is the actual worker thread code! | ||
405 | |||
406 | // | ||
407 | // INITIALIZATION | ||
408 | ///////////////// | ||
409 | |||
410 | // get contextual information | ||
411 | ✗ | const WorkerRunBinding &binding = *( | |
412 | static_cast<WorkerRunBinding *>(run_binding)); | ||
413 | ✗ | ConcurrentWorkers<WorkerT> *master = binding.delegate; | |
414 | ✗ | const worker_context_t *worker_context = binding.worker_context; | |
415 | |||
416 | // boot up the worker object and make sure it works | ||
417 | ✗ | WorkerT worker(worker_context); | |
418 | ✗ | worker.RegisterMaster(master); | |
419 | ✗ | const bool init_success = worker.Initialize(); | |
420 | |||
421 | // tell the master that this worker was started | ||
422 | ✗ | master->ReportStartedWorker(); | |
423 | |||
424 | ✗ | if (!init_success) { | |
425 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, | |
426 | "Worker was not initialized " | ||
427 | "properly... it will die now!"); | ||
428 | ✗ | return NULL; | |
429 | } | ||
430 | |||
431 | // | ||
432 | // PROCESSING LOOP | ||
433 | ////////////////// | ||
434 | |||
435 | // start the processing loop | ||
436 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Starting Worker..."); | |
437 | ✗ | while (master->IsRunning()) { | |
438 | // acquire a new job | ||
439 | ✗ | WorkerJob job = master->Acquire(); | |
440 | |||
441 | // check if we need to terminate | ||
442 | ✗ | if (job.is_death_sentence) | |
443 | ✗ | break; | |
444 | |||
445 | // do what you are supposed to do | ||
446 | ✗ | worker(job.data); | |
447 | } | ||
448 | |||
449 | // | ||
450 | // TEAR DOWN | ||
451 | //////////// | ||
452 | |||
453 | // give the worker the chance to tidy up | ||
454 | ✗ | worker.TearDown(); | |
455 | |||
456 | // good bye thread... | ||
457 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Worker..."); | |
458 | ✗ | return NULL; | |
459 | } | ||
460 | |||
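`RunWorker()` spells out the contract a `WorkerT` has to fulfill: construction from a `worker_context_t` pointer, `RegisterMaster()`, an `Initialize()`/`TearDown()` pair, and `operator()` for the actual job. A minimal sketch of a conforming worker; everything beyond the calls visible in the listing (including `SquareWorker` itself and the include path) is illustrative:

```cpp
#include "util/concurrency.h"  // assumed include path for ConcurrentWorkers

class SquareWorker {
 public:
  typedef int expected_data_t;   // type of WorkerJob::data fed to operator()
  typedef int returned_data_t;   // type reported back through JobDone()
  typedef int worker_context_t;  // shared read-only context (unused here)

  explicit SquareWorker(const worker_context_t *context)
      : master_(NULL), context_(context) {}

  // RunWorker() hands every worker its master right after construction.
  void RegisterMaster(ConcurrentWorkers<SquareWorker> *master) {
    master_ = master;
  }

  // Per-thread setup/teardown hooks; returning false kills the thread.
  bool Initialize() { return true; }
  void TearDown() {}

  // Invoked once per acquired job inside the worker's processing loop.
  void operator()(const expected_data_t &data) {
    const returned_data_t result = data * data;
    master_->JobDone(result, true);  // queued for the callback thread
  }

 private:
  ConcurrentWorkers<SquareWorker> *master_;
  const worker_context_t *context_;
};
```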
461 | |||
462 | template<class WorkerT> | ||
463 | ✗ | void *ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper(void *run_binding) { | |
464 | ✗ | const RunBinding &binding = *(static_cast<RunBinding *>(run_binding)); | |
465 | ✗ | ConcurrentWorkers<WorkerT> *master = binding.delegate; | |
466 | |||
467 | ✗ | master->ReportStartedWorker(); | |
468 | |||
469 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, | |
470 | "Started dedicated callback worker"); | ||
471 | ✗ | master->RunCallbackThread(); | |
472 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Callback Worker..."); | |
473 | |||
474 | ✗ | return NULL; | |
475 | } | ||
476 | |||
477 | |||
478 | template<class WorkerT> | ||
479 | ✗ | void ConcurrentWorkers<WorkerT>::RunCallbackThread() { | |
480 | ✗ | while (IsRunning()) { | |
481 | ✗ | const CallbackJob callback_job = results_queue_.Dequeue(); | |
482 | |||
483 | // stop callback processing if needed | ||
484 | ✗ | if (callback_job.is_death_sentence) { | |
485 | ✗ | break; | |
486 | } | ||
487 | |||
488 | // notify all observers about the finished job | ||
489 | ✗ | this->NotifyListeners(callback_job.data); | |
490 | |||
491 | // remove the job from the pending 'list' and add it to the ready 'list' | ||
492 | ✗ | atomic_dec32(&jobs_pending_); | |
493 | ✗ | atomic_inc64(&jobs_processed_); | |
494 | |||
495 | // signal the Spooler that all jobs are done... | ||
496 | ✗ | if (atomic_read32(&jobs_pending_) == 0) { | |
497 | ✗ | pthread_cond_broadcast(&jobs_all_done_); | |
498 | } | ||
499 | } | ||
500 | } | ||
501 | |||
502 | |||
503 | template<class WorkerT> | ||
504 | ✗ | void ConcurrentWorkers<WorkerT>::ReportStartedWorker() const { | |
505 | ✗ | MutexLockGuard lock(status_mutex_); | |
506 | ✗ | ++workers_started_; | |
507 | ✗ | pthread_cond_signal(&worker_started_); | |
508 | } | ||
509 | |||
510 | |||
511 | template<class WorkerT> | ||
512 | ✗ | void ConcurrentWorkers<WorkerT>::Schedule(WorkerJob job) { | |
513 | // Note: This method can be called from arbitrary threads. Thus we do not | ||
514 | // necessarily have just one producer in the system. | ||
515 | |||
516 | // check if it makes sense to schedule this job | ||
517 | ✗ | if (!IsRunning() && !job.is_death_sentence) { | |
518 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, | |
519 | "Tried to schedule a job but " | ||
520 | "concurrency was not running..."); | ||
521 | ✗ | return; | |
522 | } | ||
523 | |||
524 | ✗ | jobs_queue_.Enqueue(job); | |
525 | ✗ | if (!job.is_death_sentence) { | |
526 | ✗ | atomic_inc32(&jobs_pending_); | |
527 | } | ||
528 | } | ||
529 | |||
530 | |||
531 | template<class WorkerT> | ||
532 | ✗ | void ConcurrentWorkers<WorkerT>::ScheduleDeathSentences() { | |
533 | ✗ | assert(!IsRunning()); | |
534 | |||
535 | // make sure that the queue is empty before we schedule a death sentence | ||
536 | ✗ | TruncateJobQueue(); | |
537 | |||
538 | // schedule a death sentence for each running thread | ||
539 | ✗ | const unsigned int number_of_workers = GetNumberOfWorkers(); | |
540 | ✗ | for (unsigned int i = 0; i < number_of_workers; ++i) { | |
541 | ✗ | Schedule(WorkerJob()); | |
542 | } | ||
543 | |||
544 | // schedule a death sentence for the callback thread | ||
545 | ✗ | results_queue_.Enqueue(CallbackJob()); | |
546 | } | ||
547 | |||
548 | |||
549 | template<class WorkerT> | ||
550 | typename ConcurrentWorkers<WorkerT>::WorkerJob | ||
551 | ✗ | ConcurrentWorkers<WorkerT>::Acquire() { | |
552 | // Note: This method is exclusively called inside the worker threads! | ||
553 | // Any other usage might produce undefined behavior. | ||
554 | ✗ | return jobs_queue_.Dequeue(); | |
555 | } | ||
556 | |||
557 | |||
558 | template<class WorkerT> | ||
559 | ✗ | void ConcurrentWorkers<WorkerT>::TruncateJobQueue(const bool forget_pending) { | |
560 | // Note: This method will throw away all jobs currently waiting in the job | ||
561 | // queue. These jobs will not be processed! | ||
562 | ✗ | const unsigned int dropped_jobs = jobs_queue_.Drop(); | |
563 | |||
564 | // if desired, we remove the jobs from the pending 'list' | ||
565 | ✗ | if (forget_pending) { | |
566 | ✗ | atomic_xadd32(&jobs_pending_, -dropped_jobs); | |
567 | } | ||
568 | } | ||
569 | |||
570 | |||
571 | template<class WorkerT> | ||
572 | ✗ | void ConcurrentWorkers<WorkerT>::Terminate() { | |
573 | // Note: this method causes workers to die immediately after they finished | ||
574 | // their last acquired job. To make sure that each worker will check | ||
575 | // the running state, we schedule empty jobs or Death Sentences. | ||
576 | |||
577 | ✗ | assert(IsRunning()); | |
578 | |||
579 | // unset the running flag (causing threads to die on the next checkpoint) | ||
580 | ✗ | StopRunning(); | |
581 | |||
582 | // schedule empty jobs to make sure that each worker will actually reach the | ||
583 | // next checkpoint in their processing loop and terminate as expected | ||
584 | ✗ | ScheduleDeathSentences(); | |
585 | |||
586 | // wait for the worker threads to return | ||
587 | ✗ | WorkerThreads::const_iterator i = worker_threads_.begin(); | |
588 | ✗ | WorkerThreads::const_iterator iend = worker_threads_.end(); | |
589 | ✗ | for (; i != iend; ++i) { | |
590 | ✗ | pthread_join(*i, NULL); | |
591 | } | ||
592 | |||
593 | // wait for the callback worker thread | ||
594 | ✗ | pthread_join(callback_thread_, NULL); | |
595 | |||
596 | // check if we finished all pending jobs | ||
597 | ✗ | const int pending = atomic_read32(&jobs_pending_); | |
598 | ✗ | if (pending > 0) { | |
599 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, | |
600 | "Job queue was not fully processed. " | ||
601 | "Still %d jobs were pending and " | ||
602 | "will not be executed anymore.", | ||
603 | pending); | ||
604 | } | ||
605 | |||
606 | // check if we had failed jobs | ||
607 | ✗ | const int failed = atomic_read32(&jobs_failed_); | |
608 | ✗ | if (failed > 0) { | |
609 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, "We've had %d failed jobs.", failed); | |
610 | } | ||
611 | |||
612 | // thanks, and good bye... | ||
613 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, | |
614 | "All workers stopped. They processed %ld jobs. Terminating...", | ||
615 | atomic_read64(&jobs_processed_)); | ||
616 | } | ||
617 | |||
618 | |||
619 | template<class WorkerT> | ||
620 | ✗ | void ConcurrentWorkers<WorkerT>::WaitForEmptyQueue() const { | |
621 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, | |
622 | "Waiting for %d jobs to be finished", atomic_read32(&jobs_pending_)); | ||
623 | |||
624 | // wait until all pending jobs are processed | ||
625 | { | ||
626 | ✗ | MutexLockGuard lock(jobs_all_done_mutex_); | |
627 | ✗ | while (atomic_read32(&jobs_pending_) > 0) { | |
628 | ✗ | pthread_cond_wait(&jobs_all_done_, &jobs_all_done_mutex_); | |
629 | } | ||
630 | } | ||
631 | |||
632 | ✗ | LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Jobs are done... go on"); | |
633 | } | ||
634 | |||
635 | |||
636 | template<class WorkerT> | ||
637 | void ConcurrentWorkers<WorkerT>::WaitForTermination() { | ||
638 | if (!IsRunning()) | ||
639 | return; | ||
640 | |||
641 | WaitForEmptyQueue(); | ||
642 | Terminate(); | ||
643 | } | ||
644 | |||
645 | |||
646 | template<class WorkerT> | ||
647 | ✗ | void ConcurrentWorkers<WorkerT>::JobDone( | |
648 | const ConcurrentWorkers<WorkerT>::returned_data_t &data, | ||
649 | const bool success) { | ||
650 | // BEWARE! | ||
651 | // This is a callback method that might be called from a different thread! | ||
652 | |||
653 | // check if the finished job was successful | ||
654 | ✗ | if (!success) { | |
655 | ✗ | atomic_inc32(&jobs_failed_); | |
656 | ✗ | LogCvmfs(kLogConcurrency, kLogWarning, "Job failed"); | |
657 | } | ||
658 | |||
659 | // queue the result in the callback channel | ||
660 | ✗ | results_queue_.Enqueue(CallbackJob(data)); | |
661 | } | ||
662 | |||
663 | #ifdef CVMFS_NAMESPACE_GUARD | ||
664 | } // namespace CVMFS_NAMESPACE_GUARD | ||
665 | #endif | ||
666 | |||
667 | #endif // CVMFS_UTIL_CONCURRENCY_IMPL_H_ | ||
668 |