1 |
|
|
/** |
2 |
|
|
* This file is part of the CernVM File System. |
3 |
|
|
*/ |
4 |
|
|
|
5 |
|
|
#ifndef CVMFS_UTIL_CONCURRENCY_H_ |
6 |
|
|
#define CVMFS_UTIL_CONCURRENCY_H_ |
7 |
|
|
|
8 |
|
|
#include <pthread.h> |
9 |
|
|
|
10 |
|
|
#include <cassert> |
11 |
|
|
#include <queue> |
12 |
|
|
#include <set> |
13 |
|
|
#include <vector> |
14 |
|
|
|
15 |
|
|
#include "atomic.h" |
16 |
|
|
#include "util/async.h" |
17 |
|
|
#include "util/single_copy.h" |
18 |
|
|
|
19 |
|
|
#ifdef CVMFS_NAMESPACE_GUARD |
20 |
|
|
namespace CVMFS_NAMESPACE_GUARD { |
21 |
|
|
#endif |
22 |
|
|
|
23 |
|
|
/** |
24 |
|
|
* Implements a simple interface to lock objects of derived classes. Classes that |
25 |
|
|
* inherit from Lockable are also usable with the LockGuard template for scoped |
26 |
|
|
* locking semantics. |
27 |
|
|
* |
28 |
|
|
* Note: a Lockable object should not be copied! |
29 |
|
|
*/ |
30 |
|
|
class Lockable : SingleCopy { |
31 |
|
|
public: |
32 |
✗✓ |
1 |
inline virtual ~Lockable() { pthread_mutex_destroy(&mutex_); } |
33 |
|
|
|
34 |
|
1 |
void Lock() const { pthread_mutex_lock(&mutex_); } |
35 |
|
2 |
int TryLock() const { return pthread_mutex_trylock(&mutex_); } |
36 |
|
1 |
void Unlock() const { pthread_mutex_unlock(&mutex_); } |
37 |
|
|
|
38 |
|
|
protected: |
39 |
|
1 |
Lockable() { |
40 |
|
1 |
const int retval = pthread_mutex_init(&mutex_, NULL); |
41 |
✗✓ |
1 |
assert(retval == 0); |
42 |
|
1 |
} |
43 |
|
|
|
44 |
|
|
private: |
45 |
|
|
mutable pthread_mutex_t mutex_; |
46 |
|
|
}; |
47 |
|
|
|
48 |
|
|
|
49 |
|
|
// |
50 |
|
|
// ----------------------------------------------------------------------------- |
51 |
|
|
// |
52 |
|
|
|
53 |
|
|
/** |
54 |
|
|
* Used to allow for static polymorphism in the RAII template to statically |
55 |
|
|
* decide which 'lock' functions to use, if we have more than one possiblity. |
56 |
|
|
* (I.e. Read/Write locks) |
57 |
|
|
* Note: Static Polymorphism - Strategy Pattern |
58 |
|
|
* |
59 |
|
|
* TODO: eventually replace this by C++11 typed enum |
60 |
|
|
*/ |
61 |
|
|
struct _RAII_Polymorphism { |
62 |
|
|
enum T { |
63 |
|
|
None, |
64 |
|
|
ReadLock, |
65 |
|
|
WriteLock |
66 |
|
|
}; |
67 |
|
|
}; |
68 |
|
|
|
69 |
|
|
|
70 |
|
|
/** |
71 |
|
|
* Basic template wrapper class for any kind of RAII-like behavior. |
72 |
|
|
* The user is supposed to provide a template specialization of Enter() and |
73 |
|
|
* Leave(). On creation of the RAII object it will call Enter() respectively |
74 |
|
|
* Leave() on destruction. The gold standard example is a LockGard (see below). |
75 |
|
|
* |
76 |
|
|
* Note: Resource Acquisition Is Initialization (Bjarne Stroustrup) |
77 |
|
|
*/ |
78 |
|
|
template <typename T, _RAII_Polymorphism::T P = _RAII_Polymorphism::None> |
79 |
|
|
class RAII : SingleCopy { |
80 |
|
|
public: |
81 |
|
148841949 |
inline explicit RAII(T &object) : ref_(object) { Enter(); } |
82 |
|
4854119 |
inline explicit RAII(T *object) : ref_(*object) { Enter(); } |
83 |
|
154762214 |
inline ~RAII() { Leave(); } |
84 |
|
|
|
85 |
|
|
protected: |
86 |
|
2 |
inline void Enter() { ref_.Lock(); } |
87 |
|
2 |
inline void Leave() { ref_.Unlock(); } |
88 |
|
|
|
89 |
|
|
private: |
90 |
|
|
T &ref_; |
91 |
|
|
}; |
92 |
|
|
|
93 |
|
|
|
94 |
|
|
/** |
95 |
|
|
* This is a simple scoped lock implementation. Every object that provides the |
96 |
|
|
* methods Lock() and Unlock() should work with it. Classes that will be used |
97 |
|
|
* with this template should therefore simply inherit from Lockable. |
98 |
|
|
* |
99 |
|
|
* Creating a LockGuard object on the stack will lock the provided object. When |
100 |
|
|
* the LockGuard runs out of scope it will automatically release the lock. This |
101 |
|
|
* ensures a clean unlock in a lot of situations! |
102 |
|
|
* |
103 |
|
|
* TODO: C++11 replace this by a type alias to RAII |
104 |
|
|
*/ |
105 |
|
|
template <typename LockableT> |
106 |
|
2 |
class LockGuard : public RAII<LockableT> { |
107 |
|
|
public: |
108 |
|
2 |
inline explicit LockGuard(LockableT *object) : RAII<LockableT>(object) {} |
109 |
|
|
}; |
110 |
|
|
|
111 |
|
|
|
112 |
|
|
template <> |
113 |
|
153548895 |
inline void RAII<pthread_mutex_t>::Enter() { pthread_mutex_lock(&ref_); } |
114 |
|
|
template <> |
115 |
|
154677108 |
inline void RAII<pthread_mutex_t>::Leave() { pthread_mutex_unlock(&ref_); } |
116 |
|
|
typedef RAII<pthread_mutex_t> MutexLockGuard; |
117 |
|
|
|
118 |
|
|
|
119 |
|
|
template <> |
120 |
|
39182 |
inline void RAII<pthread_rwlock_t, |
121 |
|
|
_RAII_Polymorphism::ReadLock>::Enter() { |
122 |
|
39182 |
pthread_rwlock_rdlock(&ref_); |
123 |
|
39182 |
} |
124 |
|
|
template <> |
125 |
|
39182 |
inline void RAII<pthread_rwlock_t, |
126 |
|
|
_RAII_Polymorphism::ReadLock>::Leave() { |
127 |
|
39182 |
pthread_rwlock_unlock(&ref_); |
128 |
|
39182 |
} |
129 |
|
|
template <> |
130 |
|
46282 |
inline void RAII<pthread_rwlock_t, |
131 |
|
|
_RAII_Polymorphism::WriteLock>::Enter() { |
132 |
|
46282 |
pthread_rwlock_wrlock(&ref_); |
133 |
|
46282 |
} |
134 |
|
|
template <> |
135 |
|
46282 |
inline void RAII<pthread_rwlock_t, |
136 |
|
|
_RAII_Polymorphism::WriteLock>::Leave() { |
137 |
|
46282 |
pthread_rwlock_unlock(&ref_); |
138 |
|
46282 |
} |
139 |
|
|
typedef RAII<pthread_rwlock_t, _RAII_Polymorphism::ReadLock> ReadLockGuard; |
140 |
|
|
typedef RAII<pthread_rwlock_t, _RAII_Polymorphism::WriteLock> WriteLockGuard; |
141 |
|
|
|
142 |
|
|
|
143 |
|
|
// |
144 |
|
|
// ----------------------------------------------------------------------------- |
145 |
|
|
// |
146 |
|
|
|
147 |
|
|
|
148 |
|
|
/** |
149 |
|
|
* This is a simple implementation of a Future wrapper template. |
150 |
|
|
* It is used as a proxy for results that are computed asynchronously and might |
151 |
|
|
* not be available on the first access. |
152 |
|
|
* Since this is a very simple implementation one needs to use the Future's |
153 |
|
|
* Get() and Set() methods to obtain the containing data resp. to write it. |
154 |
|
|
* Note: More than a single call to Set() is prohibited! |
155 |
|
|
* If Get() is called before Set() the calling thread will block until |
156 |
|
|
* the value has been set by a different thread. |
157 |
|
|
* |
158 |
|
|
* @param T the value type wrapped by this Future template |
159 |
|
|
*/ |
160 |
|
|
template <typename T> |
161 |
|
|
class Future : SingleCopy { |
162 |
|
|
public: |
163 |
|
|
Future(); |
164 |
|
|
virtual ~Future(); |
165 |
|
|
|
166 |
|
|
/** |
167 |
|
|
* Save an asynchronously computed value into the Future. This potentially |
168 |
|
|
* unblocks threads that already wait for the value. |
169 |
|
|
* @param object the value object to be set |
170 |
|
|
*/ |
171 |
|
|
void Set(const T &object); |
172 |
|
|
|
173 |
|
|
/** |
174 |
|
|
* Retrieves the wrapped value object. If the value is not yet available it |
175 |
|
|
* will automatically block until a different thread calls Set(). |
176 |
|
|
* @return the containing value object |
177 |
|
|
*/ |
178 |
|
|
T& Get(); |
179 |
|
|
const T& Get() const; |
180 |
|
|
|
181 |
|
|
protected: |
182 |
|
|
void Wait() const; |
183 |
|
|
|
184 |
|
|
private: |
185 |
|
|
T object_; |
186 |
|
|
mutable pthread_mutex_t mutex_; |
187 |
|
|
mutable pthread_cond_t object_set_; |
188 |
|
|
bool object_was_set_; |
189 |
|
|
}; |
190 |
|
|
|
191 |
|
|
|
192 |
|
|
// |
193 |
|
|
// ----------------------------------------------------------------------------- |
194 |
|
|
// |
195 |
|
|
|
196 |
|
|
|
197 |
|
|
/** |
198 |
|
|
* This counter can be counted up and down using the usual increment/decrement |
199 |
|
|
* operators. It allows threads to wait for it to become zero as well as to |
200 |
|
|
* block when a specified maximal value would be exceeded by an increment. |
201 |
|
|
* |
202 |
|
|
* Note: If a maximal value is specified on creation, the SynchronizingCounter |
203 |
|
|
* is assumed to never leave the interval [0, maximal_value]! Otherwise |
204 |
|
|
* the numerical limits of the specified template parameter define this |
205 |
|
|
* interval and an increment _never_ blocks. |
206 |
|
|
* |
207 |
|
|
* Caveat: This implementation uses a simple mutex mechanism and therefore might |
208 |
|
|
* become a scalability bottle neck! |
209 |
|
|
*/ |
210 |
|
|
template <typename T> |
211 |
|
|
class SynchronizingCounter : SingleCopy { |
212 |
|
|
public: |
213 |
|
8 |
SynchronizingCounter() : |
214 |
|
8 |
value_(T(0)), maximal_value_(T(0)) { Initialize(); } |
215 |
|
|
|
216 |
|
338 |
explicit SynchronizingCounter(const T maximal_value) |
217 |
|
|
: value_(T(0)) |
218 |
|
338 |
, maximal_value_(maximal_value) |
219 |
|
|
{ |
220 |
✗✓ |
338 |
assert(maximal_value > T(0)); |
221 |
|
338 |
Initialize(); |
222 |
|
338 |
} |
223 |
|
|
|
224 |
|
346 |
~SynchronizingCounter() { Destroy(); } |
225 |
|
|
|
226 |
|
72787506 |
T Increment() { |
227 |
|
72787506 |
MutexLockGuard l(mutex_); |
228 |
|
72855615 |
WaitForFreeSlotUnprotected(); |
229 |
|
72855615 |
SetValueUnprotected(value_ + T(1)); |
230 |
|
72855615 |
return value_; |
231 |
|
|
} |
232 |
|
|
|
233 |
|
72576889 |
T Decrement() { |
234 |
|
72576889 |
MutexLockGuard l(mutex_); |
235 |
|
72855682 |
SetValueUnprotected(value_ - T(1)); |
236 |
|
72855682 |
return value_; |
237 |
|
|
} |
238 |
|
|
|
239 |
|
188 |
void WaitForZero() const { |
240 |
|
188 |
MutexLockGuard l(mutex_); |
241 |
✓✓ |
505 |
while (value_ != T(0)) { |
242 |
|
129 |
pthread_cond_wait(&became_zero_, &mutex_); |
243 |
|
|
} |
244 |
✗✓ |
188 |
assert(value_ == T(0)); |
245 |
|
188 |
} |
246 |
|
|
|
247 |
|
437266806 |
bool HasMaximalValue() const { return maximal_value_ != T(0); } |
248 |
|
9 |
T maximal_value() const { return maximal_value_; } |
249 |
|
|
|
250 |
|
590650 |
T operator++() { return Increment(); } |
251 |
|
4 |
T operator++(int) { return Increment() - T(1); } |
252 |
|
590674 |
T operator--() { return Decrement(); } |
253 |
|
32 |
T operator--(int) { return Decrement() + T(1); } |
254 |
|
|
|
255 |
|
13491 |
operator T() const { |
256 |
|
13491 |
MutexLockGuard l(mutex_); |
257 |
|
13491 |
return value_; |
258 |
|
|
} |
259 |
|
|
|
260 |
|
44 |
SynchronizingCounter<T>& operator=(const T &other) { |
261 |
|
44 |
MutexLockGuard l(mutex_); |
262 |
|
44 |
SetValueUnprotected(other); |
263 |
|
44 |
return *this; |
264 |
|
|
} |
265 |
|
|
|
266 |
|
|
protected: |
267 |
|
|
void SetValueUnprotected(const T new_value); |
268 |
|
|
void WaitForFreeSlotUnprotected(); |
269 |
|
|
|
270 |
|
|
private: |
271 |
|
|
void Initialize(); |
272 |
|
|
void Destroy(); |
273 |
|
|
|
274 |
|
|
private: |
275 |
|
|
T value_; |
276 |
|
|
const T maximal_value_; |
277 |
|
|
|
278 |
|
|
mutable pthread_mutex_t mutex_; |
279 |
|
|
mutable pthread_cond_t became_zero_; |
280 |
|
|
pthread_cond_t free_slot_; |
281 |
|
|
}; |
282 |
|
|
|
283 |
|
|
|
284 |
|
|
// |
285 |
|
|
// ----------------------------------------------------------------------------- |
286 |
|
|
// |
287 |
|
|
|
288 |
|
|
|
289 |
|
|
template <typename ParamT> |
290 |
|
|
class Observable; |
291 |
|
|
|
292 |
|
|
|
293 |
|
|
/** |
294 |
|
|
* This is a base class for classes that need to expose a callback interface for |
295 |
|
|
* asynchronous callback methods. One can register an arbitrary number of |
296 |
|
|
* observers on an Observable that get notified when the method NotifyListeners() |
297 |
|
|
* is invoked. |
298 |
|
|
* |
299 |
|
|
* Note: the registration and invocation of callbacks in Observable is thread- |
300 |
|
|
* safe, but be aware that the callbacks of observing classes might run in |
301 |
|
|
* arbitrary threads. When using these classes, you should take extra care |
302 |
|
|
* for thread-safety. |
303 |
|
|
* |
304 |
|
|
* Note: The RegisterListener() methods return a pointer to a CallbackBase. |
305 |
|
|
* You MUST NOT free these objects, they are managed by the Observable |
306 |
|
|
* class. Use them only as handles to unregister specific callbacks. |
307 |
|
|
* |
308 |
|
|
* @param ParamT the type of the parameter that is passed to every callback |
309 |
|
|
* invocation. |
310 |
|
|
*/ |
311 |
|
|
template <typename ParamT> |
312 |
|
|
class Observable : public Callbackable<ParamT>, |
313 |
|
|
SingleCopy { |
314 |
|
|
public: |
315 |
|
|
typedef typename Callbackable<ParamT>::CallbackTN* CallbackPtr; |
316 |
|
|
protected: |
317 |
|
|
typedef std::set<CallbackPtr> Callbacks; |
318 |
|
|
|
319 |
|
|
public: |
320 |
|
|
virtual ~Observable(); |
321 |
|
|
|
322 |
|
|
/** |
323 |
|
|
* Registers a method of a specific object as a listener to the Observable |
324 |
|
|
* object. The method is invoked on the given delegate when the callback is |
325 |
|
|
* fired by the observed object using NotifyListeners(). Since this is meant |
326 |
|
|
* to be a closure, it also passes the third argument to the method being in- |
327 |
|
|
* voked by the Observable object. |
328 |
|
|
* |
329 |
|
|
* @param DelegateT the type of the delegate object |
330 |
|
|
* @param method a pointer to the method to be invoked by the callback |
331 |
|
|
* @param delegate a pointer to the object to invoke the callback on |
332 |
|
|
* @param closure something to be passed to `method` |
333 |
|
|
* @return a handle to the registered callback |
334 |
|
|
*/ |
335 |
|
|
template <class DelegateT, class ClosureDataT> |
336 |
|
|
CallbackPtr RegisterListener( |
337 |
|
|
typename BoundClosure<ParamT, |
338 |
|
|
DelegateT, |
339 |
|
|
ClosureDataT>::CallbackMethod method, |
340 |
|
|
DelegateT *delegate, |
341 |
|
|
ClosureDataT data); |
342 |
|
|
|
343 |
|
|
/** |
344 |
|
|
* Registers a method of a specific object as a listener to the Observable |
345 |
|
|
* object. The method is invoked on the given delegate when the callback is |
346 |
|
|
* fired by the observed object using NotifyListeners(). |
347 |
|
|
* |
348 |
|
|
* @param DelegateT the type of the delegate object |
349 |
|
|
* @param method a pointer to the method to be invoked by the callback |
350 |
|
|
* @param delegate a pointer to the object to invoke the callback on |
351 |
|
|
* @return a handle to the registered callback |
352 |
|
|
*/ |
353 |
|
|
template <class DelegateT> |
354 |
|
|
CallbackPtr RegisterListener( |
355 |
|
|
typename BoundCallback<ParamT, DelegateT>::CallbackMethod method, |
356 |
|
|
DelegateT *delegate); |
357 |
|
|
|
358 |
|
|
/** |
359 |
|
|
* Registers a static class member or a C-like function as a callback to the |
360 |
|
|
* Observable object. The function is invoked when the callback is fired by |
361 |
|
|
* the observed object using NotifyListeners(). |
362 |
|
|
* |
363 |
|
|
* @param fn a pointer to the function to be called by the callback |
364 |
|
|
* @return a handle to the registered callback |
365 |
|
|
*/ |
366 |
|
|
CallbackPtr RegisterListener(typename Callback<ParamT>::CallbackFunction fn); |
367 |
|
|
|
368 |
|
|
/** |
369 |
|
|
* Removes the given callback from the listeners group of this Observable. |
370 |
|
|
* |
371 |
|
|
* @param callback_object a callback handle that was returned by |
372 |
|
|
* RegisterListener() before. |
373 |
|
|
*/ |
374 |
|
|
void UnregisterListener(CallbackPtr callback_object); |
375 |
|
|
|
376 |
|
|
/** |
377 |
|
|
* Removes all listeners from the Observable |
378 |
|
|
*/ |
379 |
|
|
void UnregisterListeners(); |
380 |
|
|
|
381 |
|
|
protected: |
382 |
|
|
Observable(); // don't instantiate this as a stand alone object |
383 |
|
|
|
384 |
|
|
void RegisterListener(CallbackPtr callback_object); |
385 |
|
|
|
386 |
|
|
/** |
387 |
|
|
* Notifies all registered listeners and passes them the provided argument |
388 |
|
|
* This method should be called by a derived class to send out asynchronous |
389 |
|
|
* messages to registered observers. |
390 |
|
|
* |
391 |
|
|
* @param parameter the data to be passed to the observers |
392 |
|
|
*/ |
393 |
|
|
void NotifyListeners(const ParamT ¶meter); |
394 |
|
|
|
395 |
|
|
private: |
396 |
|
|
Callbacks listeners_; //!< the set of registered |
397 |
|
|
//!< callback objects |
398 |
|
|
mutable pthread_rwlock_t listeners_rw_lock_; |
399 |
|
|
}; |
400 |
|
|
|
401 |
|
|
|
402 |
|
|
// |
403 |
|
|
// ----------------------------------------------------------------------------- |
404 |
|
|
// |
405 |
|
|
|
406 |
|
|
|
407 |
|
|
/** |
408 |
|
|
* Returns the number of CPU cores present in the system or a fallback number |
409 |
|
|
* if it failed to determine the number of CPU cores. |
410 |
|
|
* |
411 |
|
|
* @return the number of active CPU cores in the system |
412 |
|
|
*/ |
413 |
|
|
unsigned int GetNumberOfCpuCores(); |
414 |
|
|
static const unsigned int kFallbackNumberOfCpus = 1; |
415 |
|
|
|
416 |
|
|
|
417 |
|
|
/** |
418 |
|
|
* A blocking signal for thread synchronization |
419 |
|
|
*/ |
420 |
|
|
class Signal : SingleCopy { |
421 |
|
|
public: |
422 |
|
|
Signal(); |
423 |
|
|
~Signal(); |
424 |
|
|
void Wakeup(); |
425 |
|
|
void Wait(); |
426 |
|
|
bool IsSleeping(); |
427 |
|
|
|
428 |
|
|
private: |
429 |
|
|
bool fired_; |
430 |
|
|
pthread_mutex_t lock_; |
431 |
|
|
pthread_cond_t signal_; |
432 |
|
|
}; |
433 |
|
|
|
434 |
|
|
|
435 |
|
|
// |
436 |
|
|
// ----------------------------------------------------------------------------- |
437 |
|
|
// |
438 |
|
|
|
439 |
|
|
|
440 |
|
|
/** |
441 |
|
|
* Asynchronous FIFO channel template |
442 |
|
|
* Implements a thread safe FIFO queue that handles thread blocking if the queue |
443 |
|
|
* is full or empty. |
444 |
|
|
* |
445 |
|
|
* @param T the data type to be enqueued in the queue |
446 |
|
|
*/ |
447 |
|
|
template <class T> |
448 |
|
|
class FifoChannel : protected std::queue<T> { |
449 |
|
|
public: |
450 |
|
|
/** |
451 |
|
|
* Creates a new FIFO channel. |
452 |
|
|
* |
453 |
|
|
* @param maximal_length the maximal number of items that can be enqueued |
454 |
|
|
* @param drainout_threshold if less than xx elements are in the queue it is |
455 |
|
|
* considered to be "not full" |
456 |
|
|
*/ |
457 |
|
|
FifoChannel(const size_t maximal_length, |
458 |
|
|
const size_t drainout_threshold); |
459 |
|
|
virtual ~FifoChannel(); |
460 |
|
|
|
461 |
|
|
/** |
462 |
|
|
* Adds a new item to the end of the FIFO channel. If the queue is full, this |
463 |
|
|
* call will block until items were dequeued by another thread allowing the |
464 |
|
|
* desired insertion. |
465 |
|
|
* |
466 |
|
|
* @param data the data to be enqueued into the FIFO channel |
467 |
|
|
*/ |
468 |
|
|
void Enqueue(const T &data); |
469 |
|
|
|
470 |
|
|
/** |
471 |
|
|
* Removes the next element from the channel. If the queue is empty, this will |
472 |
|
|
* block until another thread enqueues an item into the channel. |
473 |
|
|
* |
474 |
|
|
* @return the first item in the channel queue |
475 |
|
|
*/ |
476 |
|
|
const T Dequeue(); |
477 |
|
|
|
478 |
|
|
/** |
479 |
|
|
* Clears all items in the FIFO channel. The cleared items will be lost. |
480 |
|
|
* |
481 |
|
|
* @return the number of dropped items |
482 |
|
|
*/ |
483 |
|
|
unsigned int Drop(); |
484 |
|
|
|
485 |
|
|
inline size_t GetItemCount() const; |
486 |
|
|
inline bool IsEmpty() const; |
487 |
|
|
inline size_t GetMaximalItemCount() const; |
488 |
|
|
|
489 |
|
|
private: |
490 |
|
|
// general configuration |
491 |
|
|
const size_t maximal_queue_length_; |
492 |
|
|
const size_t queue_drainout_threshold_; |
493 |
|
|
|
494 |
|
|
// thread synchronisation structures |
495 |
|
|
mutable pthread_mutex_t mutex_; |
496 |
|
|
mutable pthread_cond_t queue_is_not_empty_; |
497 |
|
|
mutable pthread_cond_t queue_is_not_full_; |
498 |
|
|
}; |
499 |
|
|
|
500 |
|
|
|
501 |
|
|
/** |
502 |
|
|
* This template implements a generic producer/consumer approach to concurrent |
503 |
|
|
* worker tasks. It spawns a given number of Workers derived from the base class |
504 |
|
|
* ConcurrentWorker and uses them to distribute the work load onto concurrent |
505 |
|
|
* threads. |
506 |
|
|
* One can have multiple producers, that use Schedule() to post new work into |
507 |
|
|
* a central job queue, which in turn is processed concurrently by the Worker |
508 |
|
|
* objects in multiple threads. Furthermore the template provides an interface |
509 |
|
|
* to control the worker swarm, i.e. to wait for their completion or cancel them |
510 |
|
|
* before all jobs are processed. |
511 |
|
|
* |
512 |
|
|
* Note: A worker is a class inheriting from ConcurrentWorker that needs to meet |
513 |
|
|
* a couple of requirements. See the documentation of ConcurrentWorker for |
514 |
|
|
* additional details. |
515 |
|
|
* |
516 |
|
|
* @param WorkerT the class to be used as a worker for a concurrent worker |
517 |
|
|
* swarm |
518 |
|
|
*/ |
519 |
|
|
template <class WorkerT> |
520 |
|
|
class ConcurrentWorkers : public Observable<typename WorkerT::returned_data> { |
521 |
|
|
public: |
522 |
|
|
// these data types must be defined by the worker class |
523 |
|
|
/** |
524 |
|
|
* Input data type |
525 |
|
|
*/ |
526 |
|
|
typedef typename WorkerT::expected_data expected_data_t; |
527 |
|
|
/** |
528 |
|
|
* Output data type |
529 |
|
|
*/ |
530 |
|
|
typedef typename WorkerT::returned_data returned_data_t; |
531 |
|
|
/** |
532 |
|
|
* Common context type |
533 |
|
|
*/ |
534 |
|
|
typedef typename WorkerT::worker_context worker_context_t; |
535 |
|
|
|
536 |
|
|
protected: |
537 |
|
|
typedef std::vector<pthread_t> WorkerThreads; |
538 |
|
|
|
539 |
|
|
/** |
540 |
|
|
* This is a simple wrapper structure to piggy-back control information on |
541 |
|
|
* scheduled jobs. Job structures are scheduled into a central FIFO queue and |
542 |
|
|
* are then processed concurrently by the workers. |
543 |
|
|
*/ |
544 |
|
|
template <class DataT> |
545 |
|
|
struct Job { |
546 |
|
|
explicit Job(const DataT &data) : |
547 |
|
|
data(data), |
548 |
|
|
is_death_sentence(false) {} |
549 |
|
|
Job() : |
550 |
|
|
data(), |
551 |
|
|
is_death_sentence(true) {} |
552 |
|
|
const DataT data; //!< job payload |
553 |
|
|
const bool is_death_sentence; //!< death sentence flag |
554 |
|
|
}; |
555 |
|
|
typedef Job<expected_data_t> WorkerJob; |
556 |
|
|
typedef Job<returned_data_t> CallbackJob; |
557 |
|
|
|
558 |
|
|
/** |
559 |
|
|
* Provides a wrapper for initialization data passed to newly spawned worker |
560 |
|
|
* threads for initialization. |
561 |
|
|
* It contains a pointer to the spawning ConcurrentWorkers master object as |
562 |
|
|
* well as a pointer to a context object defined by the concrete worker to be |
563 |
|
|
* spawned. |
564 |
|
|
*/ |
565 |
|
|
struct RunBinding { |
566 |
|
|
explicit RunBinding(ConcurrentWorkers<WorkerT> *delegate) : |
567 |
|
|
delegate(delegate) {} |
568 |
|
|
ConcurrentWorkers<WorkerT> *delegate; //!< delegate to the Concurrent- |
569 |
|
|
//!< Workers master |
570 |
|
|
}; |
571 |
|
|
|
572 |
|
|
struct WorkerRunBinding : RunBinding { |
573 |
|
|
WorkerRunBinding(ConcurrentWorkers<WorkerT> *delegate, |
574 |
|
|
const worker_context_t *worker_context) : |
575 |
|
|
RunBinding(delegate), |
576 |
|
|
worker_context(worker_context) {} |
577 |
|
|
/** |
578 |
|
|
* WorkerT defined context objects for worker init. |
579 |
|
|
*/ |
580 |
|
|
const worker_context_t *worker_context; |
581 |
|
|
}; |
582 |
|
|
|
583 |
|
|
public: |
584 |
|
|
/** |
585 |
|
|
* Creates a ConcurrentWorkers master object that encapsulates the actual |
586 |
|
|
* workers. |
587 |
|
|
* |
588 |
|
|
* @param number_of_workers the number of concurrent workers to be spawned |
589 |
|
|
* @param maximal_queue_length the maximal length of the job queue |
590 |
|
|
* (>= number_of_workers) |
591 |
|
|
* @param worker_context a pointer to the WorkerT defined context object |
592 |
|
|
*/ |
593 |
|
|
ConcurrentWorkers(const size_t number_of_workers, |
594 |
|
|
const size_t maximal_queue_length, |
595 |
|
|
worker_context_t *worker_context = NULL); |
596 |
|
|
virtual ~ConcurrentWorkers(); |
597 |
|
|
|
598 |
|
|
/** |
599 |
|
|
* Initializes the ConcurrentWorkers swarm, spawnes a thread for each new |
600 |
|
|
* worker object and puts everything into working state. |
601 |
|
|
* |
602 |
|
|
* @return true if all went fine |
603 |
|
|
*/ |
604 |
|
|
bool Initialize(); |
605 |
|
|
|
606 |
|
|
/** |
607 |
|
|
* Schedules a new job for processing into the internal job queue. This method |
608 |
|
|
* will block in case the job queue is already full and wait for an empty slot. |
609 |
|
|
* |
610 |
|
|
* @param data the data to be processed |
611 |
|
|
*/ |
612 |
|
|
inline void Schedule(const expected_data_t &data) { |
613 |
|
|
Schedule(WorkerJob(data)); |
614 |
|
|
} |
615 |
|
|
|
616 |
|
|
/** |
617 |
|
|
* Shuts down the ConcurrentWorkers object as well as the encapsulated workers |
618 |
|
|
* as soon as possible. Workers will finish their current job and will termi- |
619 |
|
|
* nate afterwards. If no jobs are scheduled they will simply stop waiting for |
620 |
|
|
* new ones and terminate afterwards. |
621 |
|
|
* This method MUST not be called more than once per ConcurrentWorkers. |
622 |
|
|
*/ |
623 |
|
|
void Terminate(); |
624 |
|
|
|
625 |
|
|
/** |
626 |
|
|
* Waits until the job queue is fully processed |
627 |
|
|
* |
628 |
|
|
* Note: this might lead to undefined behaviour or infinite waiting if other |
629 |
|
|
* producers still schedule jobs into the job queue. |
630 |
|
|
*/ |
631 |
|
|
void WaitForEmptyQueue() const; |
632 |
|
|
|
633 |
|
|
/** |
634 |
|
|
* Waits until the ConcurrentWorkers swarm fully processed the current job |
635 |
|
|
* queue and shuts down afterwards. |
636 |
|
|
* |
637 |
|
|
* Note: just as for WaitForEmptyQueue() this assumes that no other producers |
638 |
|
|
* schedule jobs in the mean time. |
639 |
|
|
*/ |
640 |
|
|
void WaitForTermination(); |
641 |
|
|
|
642 |
|
|
inline unsigned int GetNumberOfWorkers() const { return number_of_workers_; } |
643 |
|
|
inline unsigned int GetNumberOfFailedJobs() const { |
644 |
|
|
return atomic_read32(&jobs_failed_); |
645 |
|
|
} |
646 |
|
|
|
647 |
|
|
/** |
648 |
|
|
* Defines a job as successfully finished. |
649 |
|
|
* DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT! |
650 |
|
|
* |
651 |
|
|
* @param data the data to be returned back to the user |
652 |
|
|
*/ |
653 |
|
|
inline void JobSuccessful(const returned_data_t& data) { |
654 |
|
|
JobDone(data, true); |
655 |
|
|
} |
656 |
|
|
|
657 |
|
|
/** |
658 |
|
|
* Defines a job as failed. |
659 |
|
|
* DO NOT CALL THIS OUTSIDE OF A WORKER OBJECT! |
660 |
|
|
* |
661 |
|
|
* Note: Even for failed jobs the user will get a callback with a data object. |
662 |
|
|
* You might want to make sure, that this data contains a status flag as |
663 |
|
|
* well, telling the user what went wrong. |
664 |
|
|
* |
665 |
|
|
* @param data the data to be returned back to the user |
666 |
|
|
*/ |
667 |
|
|
inline void JobFailed(const returned_data_t& data) { JobDone(data, false); } |
668 |
|
|
|
669 |
|
|
void RunCallbackThread(); |
670 |
|
|
|
671 |
|
|
protected: |
672 |
|
|
bool SpawnWorkers(); |
673 |
|
|
|
674 |
|
|
/** |
675 |
|
|
* POSIX conform function for thread entry point. Is invoked for every new |
676 |
|
|
* worker thread and contains the initialization, processing loop and tear |
677 |
|
|
* down of the unique worker objects |
678 |
|
|
* |
679 |
|
|
* @param run_binding void pointer to a RunBinding structure (C interface) |
680 |
|
|
* @return NULL in any case |
681 |
|
|
*/ |
682 |
|
|
static void* RunWorker(void *run_binding); |
683 |
|
|
|
684 |
|
|
static void* RunCallbackThreadWrapper(void *run_binding); |
685 |
|
|
|
686 |
|
|
/** |
687 |
|
|
* Tells the master that a worker thread did start. This does not mean, that |
688 |
|
|
* it was initialized successfully. |
689 |
|
|
*/ |
690 |
|
|
void ReportStartedWorker() const; |
691 |
|
|
|
692 |
|
|
void Schedule(WorkerJob job); |
693 |
|
|
void ScheduleDeathSentences(); |
694 |
|
|
|
695 |
|
|
/** |
696 |
|
|
* Empties the job queue |
697 |
|
|
* |
698 |
|
|
* @param forget_pending controls if cancelled jobs should be seen as finished |
699 |
|
|
*/ |
700 |
|
|
void TruncateJobQueue(const bool forget_pending = false); |
701 |
|
|
|
702 |
|
|
/** |
703 |
|
|
* Retrieves a job from the job queue. If the job queue is empty it will block |
704 |
|
|
* until there is a new job available for processing. |
705 |
|
|
* THIS METHOD MUST ONLY BE CALLED INSIDE THE WORKER OBJECTS |
706 |
|
|
* |
707 |
|
|
* @return a job to be processed by a worker |
708 |
|
|
*/ |
709 |
|
|
inline WorkerJob Acquire(); |
710 |
|
|
|
711 |
|
|
/** |
712 |
|
|
* Controls the asynchronous finishing of a job. |
713 |
|
|
* DO NOT CALL THIS, use JobSuccessful() or JobFailed() wrappers instead. |
714 |
|
|
* |
715 |
|
|
* @param data the data to be returned to the user |
716 |
|
|
* @param success flag if job was successful |
717 |
|
|
*/ |
718 |
|
|
void JobDone(const returned_data_t& data, const bool success = true); |
719 |
|
|
|
720 |
|
|
inline void StartRunning() { |
721 |
|
|
MutexLockGuard guard(status_mutex_); |
722 |
|
|
running_ = true; |
723 |
|
|
} |
724 |
|
|
inline void StopRunning() { |
725 |
|
|
MutexLockGuard guard(status_mutex_); |
726 |
|
|
running_ = false; |
727 |
|
|
} |
728 |
|
|
inline bool IsRunning() const { |
729 |
|
|
MutexLockGuard guard(status_mutex_); |
730 |
|
|
return running_; |
731 |
|
|
} |
732 |
|
|
|
733 |
|
|
private: |
734 |
|
|
// general configuration |
735 |
|
|
const size_t number_of_workers_; //!< number of concurrent worker threads |
736 |
|
|
const worker_context_t *worker_context_; //!< the WorkerT defined context |
737 |
|
|
/** |
738 |
|
|
* The thread context passed to newly spawned threads |
739 |
|
|
*/ |
740 |
|
|
WorkerRunBinding thread_context_; |
741 |
|
|
|
742 |
|
|
// status information |
743 |
|
|
bool initialized_; |
744 |
|
|
bool running_; |
745 |
|
|
mutable unsigned int workers_started_; |
746 |
|
|
mutable pthread_mutex_t status_mutex_; |
747 |
|
|
mutable pthread_cond_t worker_started_; |
748 |
|
|
mutable pthread_mutex_t jobs_all_done_mutex_; |
749 |
|
|
mutable pthread_cond_t jobs_all_done_; |
750 |
|
|
|
751 |
|
|
// worker threads |
752 |
|
|
WorkerThreads worker_threads_; //!< list of worker threads |
753 |
|
|
pthread_t callback_thread_; //!< handles callback invokes |
754 |
|
|
|
755 |
|
|
// job queue |
756 |
|
|
typedef FifoChannel<WorkerJob > JobQueue; |
757 |
|
|
JobQueue jobs_queue_; |
758 |
|
|
mutable atomic_int32 jobs_pending_; |
759 |
|
|
mutable atomic_int32 jobs_failed_; |
760 |
|
|
mutable atomic_int64 jobs_processed_; |
761 |
|
|
|
762 |
|
|
// callback channel |
763 |
|
|
typedef FifoChannel<CallbackJob > CallbackQueue; |
764 |
|
|
CallbackQueue results_queue_; |
765 |
|
|
}; |
766 |
|
|
|
767 |
|
|
|
768 |
|
|
/** |
769 |
|
|
* Base class for worker classes that should be used in a ConcurrentWorkers |
770 |
|
|
* swarm. These classes need to fulfill a number of requirements in order to |
771 |
|
|
* satisfy the needs of the ConcurrentWorkers template. |
772 |
|
|
* |
773 |
|
|
* Requirements: |
774 |
|
|
* -> needs to define the following types: |
775 |
|
|
* - expected_data - input data structure of the worker |
776 |
|
|
* - returned_data - output data structure of the worker |
777 |
|
|
* - worker_context - context structure for initialization information |
778 |
|
|
* |
779 |
|
|
* -> implement a constructor that takes a pointer to its worker_context |
780 |
|
|
* as its only parameter: |
781 |
|
|
* AwesomeWorker(const AwesomeWorker::worker_context*) |
782 |
|
|
* Note: do not rely on the context object to be available after the |
783 |
|
|
* consturctor has returned! |
784 |
|
|
* |
785 |
|
|
* -> needs to define the calling-operator expecting one parameter of type: |
786 |
|
|
* const expected_data& and returning void |
787 |
|
|
* This will be invoked for every new job the worker should process |
788 |
|
|
* |
789 |
|
|
* -> inside the implementation of the described calling-operator it needs to |
790 |
|
|
* invoke either: |
791 |
|
|
* master()->JobSuccessful(const returned_data&) |
792 |
|
|
* or: |
793 |
|
|
* master()->JobFailed(const returned_data&) |
794 |
|
|
* as its LAST operation before returning. |
795 |
|
|
* This will keep track of finished jobs and inform the user of Concurrent- |
796 |
|
|
* Workers about finished jobs. |
797 |
|
|
* |
798 |
|
|
* -> [optional] overwrite Initialize() and/or TearDown() to do environmental |
799 |
|
|
* setup work, before or respectively after jobs will be processed |
800 |
|
|
* |
801 |
|
|
* General Reminder: |
802 |
|
|
* You will be running in a multi-threaded environment here! Buckle up and |
803 |
|
|
* make suitable preparations to shield yourself from serious head-ache. |
804 |
|
|
* |
805 |
|
|
* Note: This implements a Curiously Recurring Template Pattern |
806 |
|
|
* (http://en.wikipedia.org/wiki/Curiously_recurring_template_pattern) |
807 |
|
|
* |
808 |
|
|
* @param DerivedWorkerT the class name of the inheriting class |
809 |
|
|
* (f.e. class AwesomeWorker : public ConcurrentWorker<AwesomeWorker>) |
810 |
|
|
*/ |
811 |
|
|
template <class DerivedWorkerT> |
812 |
|
|
class ConcurrentWorker : SingleCopy { |
813 |
|
|
public: |
814 |
|
|
virtual ~ConcurrentWorker() {} |
815 |
|
|
|
816 |
|
|
/** |
817 |
|
|
* Does general initialization before any jobs will get scheduled. You do not |
818 |
|
|
* need to up-call this initialize method, since it is seen as a dummy here. |
819 |
|
|
* |
820 |
|
|
* @returns true one successful initialization |
821 |
|
|
*/ |
822 |
|
|
virtual bool Initialize() { return true; } |
823 |
|
|
|
824 |
|
|
/** |
825 |
|
|
* Does general clean-up after the last job was processed in the worker object |
826 |
|
|
* and it is about to vanish. You do not need to up-call this method. |
827 |
|
|
*/ |
828 |
|
|
virtual void TearDown() {} |
829 |
|
|
|
830 |
|
|
/** |
831 |
|
|
* The actual job-processing entry point. See the description of the inheriting |
832 |
|
|
* class requirements to learn about the semantics of this methods. |
833 |
|
|
* DO NOT FORGET TO CALL master()->JobSuccessful() OR master()->JobFinished() |
834 |
|
|
* at the end of thismethod!! |
835 |
|
|
* |
836 |
|
|
* Note: There is no way to generally define this operator, it is therefore |
837 |
|
|
* commented out and placed here just as a reminder. |
838 |
|
|
* |
839 |
|
|
* @param data the data to be processed. |
840 |
|
|
*/ |
841 |
|
|
// void operator()(const expected_data &data); // do the actual job of the |
842 |
|
|
// worker |
843 |
|
|
|
844 |
|
|
protected: |
845 |
|
|
ConcurrentWorker() : master_(NULL) {} |
846 |
|
|
|
847 |
|
|
/** |
848 |
|
|
* Gets a pointer to the ConcurrentWorkers object that this worker resides in |
849 |
|
|
* |
850 |
|
|
* @returns a pointer to the ConcurrentWorkers object |
851 |
|
|
*/ |
852 |
|
|
inline ConcurrentWorkers<DerivedWorkerT>* master() const { return master_; } |
853 |
|
|
|
854 |
|
|
private: |
855 |
|
|
friend class ConcurrentWorkers<DerivedWorkerT>; |
856 |
|
|
void RegisterMaster(ConcurrentWorkers<DerivedWorkerT> *master) { |
857 |
|
|
master_ = master; |
858 |
|
|
} |
859 |
|
|
|
860 |
|
|
private: |
861 |
|
|
ConcurrentWorkers<DerivedWorkerT> *master_; |
862 |
|
|
}; |
863 |
|
|
|
864 |
|
|
#ifdef CVMFS_NAMESPACE_GUARD |
865 |
|
|
} // namespace CVMFS_NAMESPACE_GUARD |
866 |
|
|
#endif |
867 |
|
|
|
868 |
|
|
#include "util_concurrency_impl.h" |
869 |
|
|
|
870 |
|
|
#endif // CVMFS_UTIL_CONCURRENCY_H_ |