GCC Code Coverage Report | |||||||||||||||||||||
|
|||||||||||||||||||||
Line | Branch | Exec | Source |
1 |
/** |
||
2 |
* This file is part of the CernVM File System. |
||
3 |
*/ |
||
4 |
|||
5 |
#ifndef CVMFS_UTIL_CONCURRENCY_IMPL_H_ |
||
6 |
#define CVMFS_UTIL_CONCURRENCY_IMPL_H_ |
||
7 |
|||
8 |
#include "logging.h" |
||
9 |
|||
10 |
#ifdef CVMFS_NAMESPACE_GUARD |
||
11 |
namespace CVMFS_NAMESPACE_GUARD { |
||
12 |
#endif |
||
13 |
|||
14 |
// |
||
15 |
// +---------------------------------------------------------------------------- |
||
16 |
// | Future |
||
17 |
// |
||
18 |
|||
19 |
|||
20 |
template <typename T> |
||
21 |
39 |
Future<T>::Future() : object_(), object_was_set_(false) { |
|
22 |
const bool init_successful = (pthread_mutex_init(&mutex_, NULL) == 0 && |
||
23 |
✓✗✓✗ ✗✗✗✗ ✗✗✗✗ ✗✗✗✗ |
39 |
pthread_cond_init(&object_set_, NULL) == 0); |
24 |
✗✓✗✗ ✗✗✗✗ |
39 |
assert(init_successful); |
25 |
39 |
} |
|
26 |
|||
27 |
|||
28 |
template <typename T> |
||
29 |
70 |
Future<T>::~Future() { |
|
30 |
39 |
pthread_cond_destroy(&object_set_); |
|
31 |
✗✓✗✗ ✗✗✗✗ |
39 |
pthread_mutex_destroy(&mutex_); |
32 |
109 |
} |
|
33 |
|||
34 |
|||
35 |
template <typename T> |
||
36 |
39 |
void Future<T>::Set(const T &object) { |
|
37 |
39 |
MutexLockGuard guard(mutex_); |
|
38 |
✗✓✗✗ ✗✗✗✗ |
39 |
assert(!object_was_set_); |
39 |
✗✗✗ | 39 |
object_ = object; |
40 |
39 |
object_was_set_ = true; |
|
41 |
39 |
pthread_cond_broadcast(&object_set_); |
|
42 |
39 |
} |
|
43 |
|||
44 |
|||
45 |
template <typename T> |
||
46 |
39 |
void Future<T>::Wait() const { |
|
47 |
39 |
MutexLockGuard guard(mutex_); |
|
48 |
✓✓✗✗ ✗✗✗✗ |
39 |
if (!object_was_set_) { |
49 |
12 |
pthread_cond_wait(&object_set_, &mutex_); |
|
50 |
} |
||
51 |
✗✓✗✗ ✗✗✗✗ |
39 |
assert(object_was_set_); |
52 |
39 |
} |
|
53 |
|||
54 |
|||
55 |
template <typename T> |
||
56 |
39 |
T& Future<T>::Get() { |
|
57 |
39 |
Wait(); |
|
58 |
39 |
return object_; |
|
59 |
} |
||
60 |
|||
61 |
|||
62 |
template <typename T> |
||
63 |
const T& Future<T>::Get() const { |
||
64 |
Wait(); |
||
65 |
return object_; |
||
66 |
} |
||
67 |
|||
68 |
|||
69 |
// |
||
70 |
// +---------------------------------------------------------------------------- |
||
71 |
// | SynchronizingCounter |
||
72 |
// |
||
73 |
|||
74 |
|||
75 |
template <typename T> |
||
76 |
145711341 |
void SynchronizingCounter<T>::SetValueUnprotected(const T new_value) { |
|
77 |
// make sure that 0 <= new_value <= maximal_value_ if maximal_value_ != 0 |
||
78 |
✓✓✓✓ ✗✓ |
145711341 |
assert(!HasMaximalValue() || |
79 |
(new_value >= T(0) && new_value <= maximal_value_)); |
||
80 |
|||
81 |
145711341 |
value_ = new_value; |
|
82 |
|||
83 |
✓✓ | 145711341 |
if (value_ == T(0)) { |
84 |
3483 |
pthread_cond_broadcast(&became_zero_); |
|
85 |
} |
||
86 |
|||
87 |
✓✓✓✓ ✓✓ |
145711341 |
if (HasMaximalValue() && value_ < maximal_value_) { |
88 |
1052233 |
pthread_cond_broadcast(&free_slot_); |
|
89 |
} |
||
90 |
145711341 |
} |
|
91 |
|||
92 |
|||
93 |
template <typename T> |
||
94 |
72855615 |
void SynchronizingCounter<T>::WaitForFreeSlotUnprotected() { |
|
95 |
✓✓✓✓ ✓✓ |
145844108 |
while (HasMaximalValue() && value_ >= maximal_value_) { |
96 |
132878 |
pthread_cond_wait(&free_slot_, &mutex_); |
|
97 |
} |
||
98 |
✓✓✗✓ |
72855615 |
assert(!HasMaximalValue() || value_ < maximal_value_); |
99 |
72855615 |
} |
|
100 |
|||
101 |
|||
102 |
template <typename T> |
||
103 |
346 |
void SynchronizingCounter<T>::Initialize() { |
|
104 |
const bool init_successful = ( |
||
105 |
pthread_mutex_init(&mutex_, NULL) == 0 && |
||
106 |
pthread_cond_init(&became_zero_, NULL) == 0 && |
||
107 |
✓✗✓✗ ✓✗ |
346 |
pthread_cond_init(&free_slot_, NULL) == 0); |
108 |
✗✓ | 346 |
assert(init_successful); |
109 |
346 |
} |
|
110 |
|||
111 |
|||
112 |
template <typename T> |
||
113 |
346 |
void SynchronizingCounter<T>::Destroy() { |
|
114 |
346 |
pthread_mutex_destroy(&mutex_); |
|
115 |
346 |
pthread_cond_destroy(&became_zero_); |
|
116 |
346 |
pthread_cond_destroy(&free_slot_); |
|
117 |
346 |
} |
|
118 |
|||
119 |
|||
120 |
// |
||
121 |
// +---------------------------------------------------------------------------- |
||
122 |
// | Observable |
||
123 |
// |
||
124 |
|||
125 |
|||
126 |
template <typename ParamT> |
||
127 |
426 |
Observable<ParamT>::Observable() { |
|
128 |
426 |
const int ret = pthread_rwlock_init(&listeners_rw_lock_, NULL); |
|
129 |
✗✓✗✓ |
426 |
assert(ret == 0); |
130 |
426 |
} |
|
131 |
|||
132 |
|||
133 |
template <typename ParamT> |
||
134 |
426 |
Observable<ParamT>::~Observable() { |
|
135 |
426 |
UnregisterListeners(); |
|
136 |
✗✗✓✗ ✗✗✓ |
426 |
pthread_rwlock_destroy(&listeners_rw_lock_); |
137 |
852 |
} |
|
138 |
|||
139 |
|||
140 |
template <typename ParamT> |
||
141 |
template <class DelegateT, class ClosureDataT> |
||
142 |
10 |
typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener( |
|
143 |
typename BoundClosure<ParamT, |
||
144 |
DelegateT, |
||
145 |
ClosureDataT>::CallbackMethod method, |
||
146 |
DelegateT *delegate, |
||
147 |
ClosureDataT data) { |
||
148 |
// create a new BoundClosure, register it and return the handle |
||
149 |
CallbackBase<ParamT> *callback = |
||
150 |
10 |
Observable<ParamT>::MakeClosure(method, delegate, data); |
|
151 |
10 |
RegisterListener(callback); |
|
152 |
10 |
return callback; |
|
153 |
} |
||
154 |
|||
155 |
|||
156 |
template <typename ParamT> |
||
157 |
template <class DelegateT> |
||
158 |
228 |
typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener( |
|
159 |
typename BoundCallback<ParamT, DelegateT>::CallbackMethod method, |
||
160 |
DelegateT *delegate) { |
||
161 |
// create a new BoundCallback, register it and return the handle |
||
162 |
CallbackBase<ParamT> *callback = |
||
163 |
228 |
Observable<ParamT>::MakeCallback(method, delegate); |
|
164 |
228 |
RegisterListener(callback); |
|
165 |
228 |
return callback; |
|
166 |
} |
||
167 |
|||
168 |
|||
169 |
template <typename ParamT> |
||
170 |
138 |
typename Observable<ParamT>::CallbackPtr Observable<ParamT>::RegisterListener( |
|
171 |
typename Callback<ParamT>::CallbackFunction fn) { |
||
172 |
// create a new Callback, register it and return the handle |
||
173 |
CallbackBase<ParamT> *callback = |
||
174 |
138 |
Observable<ParamT>::MakeCallback(fn); |
|
175 |
138 |
RegisterListener(callback); |
|
176 |
138 |
return callback; |
|
177 |
} |
||
178 |
|||
179 |
|||
180 |
template <typename ParamT> |
||
181 |
376 |
void Observable<ParamT>::RegisterListener( |
|
182 |
Observable<ParamT>::CallbackPtr callback_object) { |
||
183 |
// register a generic CallbackBase callback |
||
184 |
376 |
WriteLockGuard guard(listeners_rw_lock_); |
|
185 |
376 |
listeners_.insert(callback_object); |
|
186 |
376 |
} |
|
187 |
|||
188 |
|||
189 |
template <typename ParamT> |
||
190 |
45 |
void Observable<ParamT>::UnregisterListener( |
|
191 |
typename Observable<ParamT>::CallbackPtr callback_object) { |
||
192 |
// remove a callback handle from the callbacks list |
||
193 |
// if it is not registered --> crash |
||
194 |
45 |
WriteLockGuard guard(listeners_rw_lock_); |
|
195 |
45 |
const size_t was_removed = listeners_.erase(callback_object); |
|
196 |
✗✓ | 45 |
assert(was_removed > 0); |
197 |
✓✗ | 45 |
delete callback_object; |
198 |
45 |
} |
|
199 |
|||
200 |
|||
201 |
template <typename ParamT> |
||
202 |
437 |
void Observable<ParamT>::UnregisterListeners() { |
|
203 |
437 |
WriteLockGuard guard(listeners_rw_lock_); |
|
204 |
|||
205 |
// remove all callbacks from the list |
||
206 |
437 |
typename Callbacks::const_iterator i = listeners_.begin(); |
|
207 |
437 |
typename Callbacks::const_iterator iend = listeners_.end(); |
|
208 |
✓✓✗✗ ✗✗ |
768 |
for (; i != iend; ++i) { |
209 |
✓✗✗✗ ✗✗ |
331 |
delete *i; |
210 |
} |
||
211 |
437 |
listeners_.clear(); |
|
212 |
437 |
} |
|
213 |
|||
214 |
|||
215 |
template <typename ParamT> |
||
216 |
38734 |
void Observable<ParamT>::NotifyListeners(const ParamT ¶meter) { |
|
217 |
38734 |
ReadLockGuard guard(listeners_rw_lock_); |
|
218 |
|||
219 |
// invoke all callbacks and inform them about new data |
||
220 |
38734 |
typename Callbacks::const_iterator i = listeners_.begin(); |
|
221 |
38734 |
typename Callbacks::const_iterator iend = listeners_.end(); |
|
222 |
✓✓✓✓ |
77327 |
for (; i != iend; ++i) { |
223 |
38593 |
(**i)(parameter); |
|
224 |
} |
||
225 |
38734 |
} |
|
226 |
|||
227 |
|||
228 |
// |
||
229 |
// +---------------------------------------------------------------------------- |
||
230 |
// | FifoChannel |
||
231 |
// |
||
232 |
|||
233 |
|||
234 |
template <class T> |
||
235 |
43 |
FifoChannel<T>::FifoChannel(const size_t maximal_length, |
|
236 |
const size_t drainout_threshold) : |
||
237 |
maximal_queue_length_(maximal_length), |
||
238 |
43 |
queue_drainout_threshold_(drainout_threshold) |
|
239 |
{ |
||
240 |
✗✓✗✓ ✗✗✗✗ |
43 |
assert(drainout_threshold <= maximal_length); |
241 |
✗✓✗✓ ✗✗✗✗ |
43 |
assert(drainout_threshold > 0); |
242 |
|||
243 |
const bool successful = ( |
||
244 |
pthread_mutex_init(&mutex_, NULL) == 0 && |
||
245 |
pthread_cond_init(&queue_is_not_empty_, NULL) == 0 && |
||
246 |
✓✗✓✗ ✓✗✓✗ ✓✗✓✗ ✗✗✗✗ ✗✗✗✗ ✗✗✗✗ |
43 |
pthread_cond_init(&queue_is_not_full_, NULL) == 0); |
247 |
|||
248 |
✗✓✗✓ ✗✗✗✗ |
43 |
assert(successful); |
249 |
43 |
} |
|
250 |
|||
251 |
|||
252 |
template <class T> |
||
253 |
58 |
FifoChannel<T>::~FifoChannel() { |
|
254 |
43 |
pthread_cond_destroy(&queue_is_not_empty_); |
|
255 |
43 |
pthread_cond_destroy(&queue_is_not_full_); |
|
256 |
✗✓✗✗ ✗✓✗✗ |
43 |
pthread_mutex_destroy(&mutex_); |
257 |
101 |
} |
|
258 |
|||
259 |
|||
260 |
template <class T> |
||
261 |
1000289 |
void FifoChannel<T>::Enqueue(const T &data) { |
|
262 |
1000289 |
MutexLockGuard lock(mutex_); |
|
263 |
|||
264 |
// wait for space in the queue |
||
265 |
✓✓✗✓ ✗✓✗✗ |
2003512 |
while (this->size() >= maximal_queue_length_) { |
266 |
2934 |
pthread_cond_wait(&queue_is_not_full_, &mutex_); |
|
267 |
} |
||
268 |
|||
269 |
// put something into the queue |
||
270 |
1000289 |
this->push(data); |
|
271 |
|||
272 |
// wake all waiting threads |
||
273 |
1000289 |
pthread_cond_broadcast(&queue_is_not_empty_); |
|
274 |
1000289 |
} |
|
275 |
|||
276 |
|||
277 |
template <class T> |
||
278 |
999832 |
const T FifoChannel<T>::Dequeue() { |
|
279 |
999832 |
MutexLockGuard lock(mutex_); |
|
280 |
|||
281 |
// wait until there is something to do |
||
282 |
✓✓✗✗ ✗✓✗✗ |
2162793 |
while (this->empty()) { |
283 |
162435 |
pthread_cond_wait(&queue_is_not_empty_, &mutex_); |
|
284 |
} |
||
285 |
|||
286 |
// get the item from the queue |
||
287 |
1000179 |
T data = this->front(); this->pop(); |
|
288 |
|||
289 |
// signal waiting threads about the free space |
||
290 |
✓✓✗✗ ✓✗✗✗ |
1000179 |
if (this->size() < queue_drainout_threshold_) { |
291 |
393502 |
pthread_cond_broadcast(&queue_is_not_full_); |
|
292 |
} |
||
293 |
|||
294 |
// return the acquired job |
||
295 |
1000179 |
return data; |
|
296 |
} |
||
297 |
|||
298 |
|||
299 |
template <class T> |
||
300 |
34 |
unsigned int FifoChannel<T>::Drop() { |
|
301 |
34 |
MutexLockGuard lock(mutex_); |
|
302 |
|||
303 |
34 |
unsigned int dropped_items = 0; |
|
304 |
✓✓✗✓ ✗✓✗✗ |
167 |
while (!this->empty()) { |
305 |
99 |
this->pop(); |
|
306 |
99 |
++dropped_items; |
|
307 |
} |
||
308 |
|||
309 |
34 |
pthread_cond_broadcast(&queue_is_not_full_); |
|
310 |
|||
311 |
34 |
return dropped_items; |
|
312 |
} |
||
313 |
|||
314 |
|||
315 |
template <class T> |
||
316 |
2 |
size_t FifoChannel<T>::GetItemCount() const { |
|
317 |
2 |
MutexLockGuard lock(mutex_); |
|
318 |
2 |
return this->size(); |
|
319 |
} |
||
320 |
|||
321 |
|||
322 |
template <class T> |
||
323 |
8972 |
bool FifoChannel<T>::IsEmpty() const { |
|
324 |
8972 |
MutexLockGuard lock(mutex_); |
|
325 |
8972 |
return this->empty(); |
|
326 |
} |
||
327 |
|||
328 |
|||
329 |
template <class T> |
||
330 |
1 |
size_t FifoChannel<T>::GetMaximalItemCount() const { |
|
331 |
1 |
return maximal_queue_length_; |
|
332 |
} |
||
333 |
|||
334 |
|||
335 |
// |
||
336 |
// +---------------------------------------------------------------------------- |
||
337 |
// | ConcurrentWorkers |
||
338 |
// |
||
339 |
|||
340 |
|||
341 |
template <class WorkerT> |
||
342 |
ConcurrentWorkers<WorkerT>::ConcurrentWorkers( |
||
343 |
const size_t number_of_workers, |
||
344 |
const size_t maximal_queue_length, |
||
345 |
ConcurrentWorkers<WorkerT>::worker_context_t *worker_context) : |
||
346 |
number_of_workers_(number_of_workers), |
||
347 |
worker_context_(worker_context), |
||
348 |
thread_context_(this, worker_context_), |
||
349 |
initialized_(false), |
||
350 |
running_(false), |
||
351 |
workers_started_(0), |
||
352 |
jobs_queue_(maximal_queue_length, maximal_queue_length / 4 + 1), |
||
353 |
results_queue_(maximal_queue_length, 1) |
||
354 |
{ |
||
355 |
assert(maximal_queue_length >= number_of_workers); |
||
356 |
assert(number_of_workers > 0); |
||
357 |
|||
358 |
atomic_init32(&jobs_pending_); |
||
359 |
atomic_init32(&jobs_failed_); |
||
360 |
atomic_init64(&jobs_processed_); |
||
361 |
} |
||
362 |
|||
363 |
|||
364 |
template <class WorkerT> |
||
365 |
ConcurrentWorkers<WorkerT>::~ConcurrentWorkers() { |
||
366 |
if (IsRunning()) { |
||
367 |
Terminate(); |
||
368 |
} |
||
369 |
|||
370 |
// destroy some synchronisation data structures |
||
371 |
pthread_cond_destroy(&worker_started_); |
||
372 |
pthread_cond_destroy(&jobs_all_done_); |
||
373 |
pthread_mutex_destroy(&status_mutex_); |
||
374 |
pthread_mutex_destroy(&jobs_all_done_mutex_); |
||
375 |
} |
||
376 |
|||
377 |
|||
378 |
template <class WorkerT> |
||
379 |
bool ConcurrentWorkers<WorkerT>::Initialize() { |
||
380 |
LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Initializing ConcurrentWorker " |
||
381 |
"object with %d worker threads " |
||
382 |
"and a queue length of %d", |
||
383 |
number_of_workers_, jobs_queue_.GetMaximalItemCount()); |
||
384 |
// LogCvmfs(kLogConcurrency, kLogStdout, "sizeof(expected_data_t): %d\n" |
||
385 |
// "sizeof(returned_data_t): %d", |
||
386 |
// sizeof(expected_data_t), sizeof(returned_data_t)); |
||
387 |
|||
388 |
// initialize synchronisation for job queue (Workers) |
||
389 |
if (pthread_mutex_init(&status_mutex_, NULL) != 0 || |
||
390 |
pthread_mutex_init(&jobs_all_done_mutex_, NULL) != 0 || |
||
391 |
pthread_cond_init(&worker_started_, NULL) != 0 || |
||
392 |
pthread_cond_init(&jobs_all_done_, NULL) != 0) { |
||
393 |
return false; |
||
394 |
} |
||
395 |
|||
396 |
// spawn the Worker objects in their own threads |
||
397 |
if (!SpawnWorkers()) { |
||
398 |
LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn workers"); |
||
399 |
return false; |
||
400 |
} |
||
401 |
|||
402 |
// all done... |
||
403 |
initialized_ = true; |
||
404 |
return true; |
||
405 |
} |
||
406 |
|||
407 |
|||
408 |
template <class WorkerT> |
||
409 |
bool ConcurrentWorkers<WorkerT>::SpawnWorkers() { |
||
410 |
assert(worker_threads_.size() == 0); |
||
411 |
worker_threads_.resize(number_of_workers_); |
||
412 |
|||
413 |
// set the running flag to trap workers in their treadmills |
||
414 |
StartRunning(); |
||
415 |
|||
416 |
// spawn the swarm and make them work |
||
417 |
bool success = true; |
||
418 |
WorkerThreads::iterator i = worker_threads_.begin(); |
||
419 |
WorkerThreads::const_iterator iend = worker_threads_.end(); |
||
420 |
for (; i != iend; ++i) { |
||
421 |
pthread_t* thread = &(*i); |
||
422 |
const int retval = pthread_create( |
||
423 |
thread, |
||
424 |
NULL, |
||
425 |
&ConcurrentWorkers<WorkerT>::RunWorker, |
||
426 |
reinterpret_cast<void *>(&thread_context_)); |
||
427 |
if (retval != 0) { |
||
428 |
LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn a Worker"); |
||
429 |
success = false; |
||
430 |
} |
||
431 |
} |
||
432 |
|||
433 |
// spawn the callback processing thread |
||
434 |
const int retval = |
||
435 |
pthread_create( |
||
436 |
&callback_thread_, |
||
437 |
NULL, |
||
438 |
&ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper, |
||
439 |
reinterpret_cast<void *>(&thread_context_)); |
||
440 |
if (retval != 0) { |
||
441 |
LogCvmfs(kLogConcurrency, kLogWarning, "Failed to spawn the callback " |
||
442 |
"worker thread"); |
||
443 |
success = false; |
||
444 |
} |
||
445 |
|||
446 |
// wait for all workers to report in... |
||
447 |
{ |
||
448 |
MutexLockGuard guard(status_mutex_); |
||
449 |
// +1 -> callback thread |
||
450 |
while (workers_started_ < number_of_workers_ + 1) { |
||
451 |
pthread_cond_wait(&worker_started_, &status_mutex_); |
||
452 |
} |
||
453 |
} |
||
454 |
|||
455 |
// all done... |
||
456 |
return success; |
||
457 |
} |
||
458 |
|||
459 |
|||
460 |
template <class WorkerT> |
||
461 |
void* ConcurrentWorkers<WorkerT>::RunWorker(void *run_binding) { |
||
462 |
// NOTE: This is the actual worker thread code! |
||
463 |
|||
464 |
// |
||
465 |
// INITIALIZATION |
||
466 |
///////////////// |
||
467 |
|||
468 |
// get contextual information |
||
469 |
const WorkerRunBinding &binding = |
||
470 |
*(static_cast<WorkerRunBinding*>(run_binding)); |
||
471 |
ConcurrentWorkers<WorkerT> *master = binding.delegate; |
||
472 |
const worker_context_t *worker_context = binding.worker_context; |
||
473 |
|||
474 |
// boot up the worker object and make sure it works |
||
475 |
WorkerT worker(worker_context); |
||
476 |
worker.RegisterMaster(master); |
||
477 |
const bool init_success = worker.Initialize(); |
||
478 |
|||
479 |
// tell the master that this worker was started |
||
480 |
master->ReportStartedWorker(); |
||
481 |
|||
482 |
if (!init_success) { |
||
483 |
LogCvmfs(kLogConcurrency, kLogWarning, "Worker was not initialized " |
||
484 |
"properly... it will die now!"); |
||
485 |
return NULL; |
||
486 |
} |
||
487 |
|||
488 |
// |
||
489 |
// PROCESSING LOOP |
||
490 |
////////////////// |
||
491 |
|||
492 |
// start the processing loop |
||
493 |
LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Starting Worker..."); |
||
494 |
while (master->IsRunning()) { |
||
495 |
// acquire a new job |
||
496 |
WorkerJob job = master->Acquire(); |
||
497 |
|||
498 |
// check if we need to terminate |
||
499 |
if (job.is_death_sentence) |
||
500 |
break; |
||
501 |
|||
502 |
// do what you are supposed to do |
||
503 |
worker(job.data); |
||
504 |
} |
||
505 |
|||
506 |
// |
||
507 |
// TEAR DOWN |
||
508 |
//////////// |
||
509 |
|||
510 |
// give the worker the chance to tidy up |
||
511 |
worker.TearDown(); |
||
512 |
|||
513 |
// good bye thread... |
||
514 |
LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Worker..."); |
||
515 |
return NULL; |
||
516 |
} |
||
517 |
|||
518 |
|||
519 |
template <class WorkerT> |
||
520 |
void* ConcurrentWorkers<WorkerT>::RunCallbackThreadWrapper(void *run_binding) { |
||
521 |
const RunBinding &binding = *(static_cast<RunBinding*>(run_binding)); |
||
522 |
ConcurrentWorkers<WorkerT> *master = binding.delegate; |
||
523 |
|||
524 |
master->ReportStartedWorker(); |
||
525 |
|||
526 |
LogCvmfs(kLogConcurrency, kLogVerboseMsg, |
||
527 |
"Started dedicated callback worker"); |
||
528 |
master->RunCallbackThread(); |
||
529 |
LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Terminating Callback Worker..."); |
||
530 |
|||
531 |
return NULL; |
||
532 |
} |
||
533 |
|||
534 |
|||
535 |
template <class WorkerT> |
||
536 |
void ConcurrentWorkers<WorkerT>::RunCallbackThread() { |
||
537 |
while (IsRunning()) { |
||
538 |
const CallbackJob callback_job = results_queue_.Dequeue(); |
||
539 |
|||
540 |
// stop callback processing if needed |
||
541 |
if (callback_job.is_death_sentence) { |
||
542 |
break; |
||
543 |
} |
||
544 |
|||
545 |
// notify all observers about the finished job |
||
546 |
this->NotifyListeners(callback_job.data); |
||
547 |
|||
548 |
// remove the job from the pending 'list' and add it to the ready 'list' |
||
549 |
atomic_dec32(&jobs_pending_); |
||
550 |
atomic_inc64(&jobs_processed_); |
||
551 |
|||
552 |
// signal the Spooler that all jobs are done... |
||
553 |
if (atomic_read32(&jobs_pending_) == 0) { |
||
554 |
pthread_cond_broadcast(&jobs_all_done_); |
||
555 |
} |
||
556 |
} |
||
557 |
} |
||
558 |
|||
559 |
|||
560 |
template <class WorkerT> |
||
561 |
void ConcurrentWorkers<WorkerT>::ReportStartedWorker() const { |
||
562 |
MutexLockGuard lock(status_mutex_); |
||
563 |
++workers_started_; |
||
564 |
pthread_cond_signal(&worker_started_); |
||
565 |
} |
||
566 |
|||
567 |
|||
568 |
template <class WorkerT> |
||
569 |
void ConcurrentWorkers<WorkerT>::Schedule(WorkerJob job) { |
||
570 |
// Note: This method can be called from arbitrary threads. Thus we do not |
||
571 |
// necessarily have just one producer in the system. |
||
572 |
|||
573 |
// check if it makes sense to schedule this job |
||
574 |
if (!IsRunning() && !job.is_death_sentence) { |
||
575 |
LogCvmfs(kLogConcurrency, kLogWarning, "Tried to schedule a job but " |
||
576 |
"concurrency was not running..."); |
||
577 |
return; |
||
578 |
} |
||
579 |
|||
580 |
jobs_queue_.Enqueue(job); |
||
581 |
if (!job.is_death_sentence) { |
||
582 |
atomic_inc32(&jobs_pending_); |
||
583 |
} |
||
584 |
} |
||
585 |
|||
586 |
|||
587 |
template <class WorkerT> |
||
588 |
void ConcurrentWorkers<WorkerT>::ScheduleDeathSentences() { |
||
589 |
assert(!IsRunning()); |
||
590 |
|||
591 |
// make sure that the queue is empty before we schedule a death sentence |
||
592 |
TruncateJobQueue(); |
||
593 |
|||
594 |
// schedule a death sentence for each running thread |
||
595 |
const unsigned int number_of_workers = GetNumberOfWorkers(); |
||
596 |
for (unsigned int i = 0; i < number_of_workers; ++i) { |
||
597 |
Schedule(WorkerJob()); |
||
598 |
} |
||
599 |
|||
600 |
// schedule a death sentence for the callback thread |
||
601 |
results_queue_.Enqueue(CallbackJob()); |
||
602 |
} |
||
603 |
|||
604 |
|||
605 |
template <class WorkerT> |
||
606 |
typename ConcurrentWorkers<WorkerT>::WorkerJob |
||
607 |
ConcurrentWorkers<WorkerT>::Acquire() |
||
608 |
{ |
||
609 |
// Note: This method is exclusively called inside the worker threads! |
||
610 |
// Any other usage might produce undefined behavior. |
||
611 |
return jobs_queue_.Dequeue(); |
||
612 |
} |
||
613 |
|||
614 |
|||
615 |
template <class WorkerT> |
||
616 |
void ConcurrentWorkers<WorkerT>::TruncateJobQueue(const bool forget_pending) { |
||
617 |
// Note: This method will throw away all jobs currently waiting in the job |
||
618 |
// queue. These jobs will not be processed! |
||
619 |
const unsigned int dropped_jobs = jobs_queue_.Drop(); |
||
620 |
|||
621 |
// if desired, we remove the jobs from the pending 'list' |
||
622 |
if (forget_pending) { |
||
623 |
atomic_xadd32(&jobs_pending_, -dropped_jobs); |
||
624 |
} |
||
625 |
} |
||
626 |
|||
627 |
|||
628 |
template <class WorkerT> |
||
629 |
void ConcurrentWorkers<WorkerT>::Terminate() { |
||
630 |
// Note: this method causes workers to die immediately after they finished |
||
631 |
// their last acquired job. To make sure that each worker will check |
||
632 |
// the running state, we schedule empty jobs or Death Sentences. |
||
633 |
|||
634 |
assert(IsRunning()); |
||
635 |
|||
636 |
// unset the running flag (causing threads to die on the next checkpoint) |
||
637 |
StopRunning(); |
||
638 |
|||
639 |
// schedule empty jobs to make sure that each worker will actually reach the |
||
640 |
// next checkpoint in their processing loop and terminate as expected |
||
641 |
ScheduleDeathSentences(); |
||
642 |
|||
643 |
// wait for the worker threads to return |
||
644 |
WorkerThreads::const_iterator i = worker_threads_.begin(); |
||
645 |
WorkerThreads::const_iterator iend = worker_threads_.end(); |
||
646 |
for (; i != iend; ++i) { |
||
647 |
pthread_join(*i, NULL); |
||
648 |
} |
||
649 |
|||
650 |
// wait for the callback worker thread |
||
651 |
pthread_join(callback_thread_, NULL); |
||
652 |
|||
653 |
// check if we finished all pending jobs |
||
654 |
const int pending = atomic_read32(&jobs_pending_); |
||
655 |
if (pending > 0) { |
||
656 |
LogCvmfs(kLogConcurrency, kLogWarning, "Job queue was not fully processed. " |
||
657 |
"Still %d jobs were pending and " |
||
658 |
"will not be executed anymore.", |
||
659 |
pending); |
||
660 |
} |
||
661 |
|||
662 |
// check if we had failed jobs |
||
663 |
const int failed = atomic_read32(&jobs_failed_); |
||
664 |
if (failed > 0) { |
||
665 |
LogCvmfs(kLogConcurrency, kLogWarning, "We've had %d failed jobs.", |
||
666 |
failed); |
||
667 |
} |
||
668 |
|||
669 |
// thanks, and good bye... |
||
670 |
LogCvmfs(kLogConcurrency, kLogVerboseMsg, |
||
671 |
"All workers stopped. They processed %d jobs. Terminating...", |
||
672 |
atomic_read64(&jobs_processed_)); |
||
673 |
} |
||
674 |
|||
675 |
|||
676 |
template <class WorkerT> |
||
677 |
void ConcurrentWorkers<WorkerT>::WaitForEmptyQueue() const { |
||
678 |
LogCvmfs(kLogConcurrency, kLogVerboseMsg, |
||
679 |
"Waiting for %d jobs to be finished", atomic_read32(&jobs_pending_)); |
||
680 |
|||
681 |
// wait until all pending jobs are processed |
||
682 |
{ |
||
683 |
MutexLockGuard lock(jobs_all_done_mutex_); |
||
684 |
while (atomic_read32(&jobs_pending_) > 0) { |
||
685 |
pthread_cond_wait(&jobs_all_done_, &jobs_all_done_mutex_); |
||
686 |
} |
||
687 |
} |
||
688 |
|||
689 |
LogCvmfs(kLogConcurrency, kLogVerboseMsg, "Jobs are done... go on"); |
||
690 |
} |
||
691 |
|||
692 |
|||
693 |
template <class WorkerT> |
||
694 |
void ConcurrentWorkers<WorkerT>::WaitForTermination() { |
||
695 |
if (!IsRunning()) |
||
696 |
return; |
||
697 |
|||
698 |
WaitForEmptyQueue(); |
||
699 |
Terminate(); |
||
700 |
} |
||
701 |
|||
702 |
|||
703 |
template <class WorkerT> |
||
704 |
void ConcurrentWorkers<WorkerT>::JobDone( |
||
705 |
const ConcurrentWorkers<WorkerT>::returned_data_t& data, |
||
706 |
const bool success) { |
||
707 |
// BEWARE! |
||
708 |
// This is a callback method that might be called from a different thread! |
||
709 |
|||
710 |
// check if the finished job was successful |
||
711 |
if (!success) { |
||
712 |
atomic_inc32(&jobs_failed_); |
||
713 |
LogCvmfs(kLogConcurrency, kLogWarning, "Job failed"); |
||
714 |
} |
||
715 |
|||
716 |
// queue the result in the callback channel |
||
717 |
results_queue_.Enqueue(CallbackJob(data)); |
||
718 |
} |
||
719 |
|||
720 |
#ifdef CVMFS_NAMESPACE_GUARD |
||
721 |
} // namespace CVMFS_NAMESPACE_GUARD |
||
722 |
#endif |
||
723 |
|||
724 |
#endif // CVMFS_UTIL_CONCURRENCY_IMPL_H_ |
Generated by: GCOVR (Version 4.1) |