GCC Code Coverage Report | |||||||||||||||||||||
|
|||||||||||||||||||||
Line | Branch | Exec | Source |
1 |
/** |
||
2 |
* This file is part of the CernVM File System. |
||
3 |
*/ |
||
4 |
|||
5 |
#ifndef CVMFS_INGESTION_TUBE_H_ |
||
6 |
#define CVMFS_INGESTION_TUBE_H_ |
||
7 |
|||
8 |
#include <pthread.h> |
||
9 |
#include <stdint.h> |
||
10 |
|||
11 |
#include <cassert> |
||
12 |
#include <vector> |
||
13 |
|||
14 |
#include "atomic.h" |
||
15 |
#include "util/pointer.h" |
||
16 |
#include "util/single_copy.h" |
||
17 |
#include "util_concurrency.h" |
||
18 |
|||
19 |
/** |
||
20 |
* A thread-safe, doubly linked list of links containing pointers to ItemT. The |
||
21 |
* ItemT elements are not owned by the Tube. FIFO semantics; items are pushed |
||
22 |
* to the back and popped from the front. Using Slice(), items at arbitrary |
||
23 |
* locations in the tube can be removed, too. |
||
24 |
* |
||
25 |
* The tube links the steps in the file processing pipeline. It connects |
||
26 |
* multiple producers to multiple consumers and can throttle the producers if a |
||
27 |
* limit for the tube size is set. |
||
28 |
* |
||
29 |
* Internally, uses condition variables to block when threads try to pop from |
||
30 |
* the empty tube or insert into the full tube. |
||
31 |
*/ |
||
32 |
template <class ItemT> |
||
33 |
class Tube : SingleCopy { |
||
34 |
public: |
||
35 |
class Link : SingleCopy { |
||
36 |
friend class Tube<ItemT>; |
||
37 |
public: |
||
38 |
2321121 |
explicit Link(ItemT *item) : item_(item), next_(NULL), prev_(NULL) { } |
|
39 |
4 |
ItemT *item() { return item_; } |
|
40 |
|||
41 |
private: |
||
42 |
ItemT *item_; |
||
43 |
Link *next_; |
||
44 |
Link *prev_; |
||
45 |
}; |
||
46 |
|||
47 |
1270 |
Tube() : limit_(uint64_t(-1)), size_(0) { Init(); } |
|
48 |
88 |
explicit Tube(uint64_t limit) : limit_(limit), size_(0) { |
|
49 |
88 |
Init(); |
|
50 |
88 |
} |
|
51 |
1358 |
~Tube() { |
|
52 |
1358 |
Link *cursor = head_; |
|
53 |
✗✓✗✗ ✗✓✗✓ |
1358 |
do { |
54 |
1358 |
Link *prev = cursor->prev_; |
|
55 |
1358 |
delete cursor; |
|
56 |
1358 |
cursor = prev; |
|
57 |
} while (cursor != head_); |
||
58 |
1358 |
pthread_cond_destroy(&cond_populated_); |
|
59 |
1358 |
pthread_cond_destroy(&cond_capacious_); |
|
60 |
1358 |
pthread_cond_destroy(&cond_empty_); |
|
61 |
1358 |
pthread_mutex_destroy(&lock_); |
|
62 |
1358 |
} |
|
63 |
|||
64 |
/** |
||
65 |
* Push an item to the back of the queue. Block if queue is currently full. |
||
66 |
*/ |
||
67 |
2319573 |
Link *Enqueue(ItemT *item) { |
|
68 |
✗✓✗✓ ✗✓ |
2319573 |
assert(item != NULL); |
69 |
2319573 |
MutexLockGuard lock_guard(&lock_); |
|
70 |
✗✓✗✓ ✗✓ |
4639646 |
while (size_ == limit_) |
71 |
pthread_cond_wait(&cond_capacious_, &lock_); |
||
72 |
|||
73 |
2319825 |
Link *link = new Link(item); |
|
74 |
2319679 |
link->next_ = tail_; |
|
75 |
2319679 |
link->prev_ = tail_->prev_; |
|
76 |
2319679 |
tail_->prev_->next_ = link; |
|
77 |
2319679 |
tail_->prev_ = link; |
|
78 |
2319679 |
tail_ = link; |
|
79 |
2319679 |
size_++; |
|
80 |
2319679 |
int retval = pthread_cond_signal(&cond_populated_); |
|
81 |
✗✓✗✓ ✗✓ |
2319920 |
assert(retval == 0); |
82 |
2319920 |
return link; |
|
83 |
} |
||
84 |
|||
85 |
/** |
||
86 |
* Remove any link from the queue and return its item, including first/last |
||
87 |
* element. |
||
88 |
*/ |
||
89 |
4 |
ItemT *Slice(Link *link) { |
|
90 |
4 |
MutexLockGuard lock_guard(&lock_); |
|
91 |
4 |
return SliceUnlocked(link); |
|
92 |
} |
||
93 |
|||
94 |
/** |
||
95 |
* Remove and return the first element from the queue. Block if tube is |
||
96 |
* empty. |
||
97 |
*/ |
||
98 |
2319881 |
ItemT *Pop() { |
|
99 |
2319881 |
MutexLockGuard lock_guard(&lock_); |
|
100 |
✓✓✓✓ ✓✓ |
4789473 |
while (size_ == 0) |
101 |
149628 |
pthread_cond_wait(&cond_populated_, &lock_); |
|
102 |
2319905 |
return SliceUnlocked(head_->prev_); |
|
103 |
} |
||
104 |
|||
105 |
/** |
||
106 |
* Blocks until the tube is empty |
||
107 |
*/ |
||
108 |
105 |
void Wait() { |
|
109 |
105 |
MutexLockGuard lock_guard(&lock_); |
|
110 |
✓✓ | 285 |
while (size_ > 0) |
111 |
75 |
pthread_cond_wait(&cond_empty_, &lock_); |
|
112 |
105 |
} |
|
113 |
|||
114 |
13 |
bool IsEmpty() { |
|
115 |
13 |
MutexLockGuard lock_guard(&lock_); |
|
116 |
13 |
return size_ == 0; |
|
117 |
} |
||
118 |
|||
119 |
38 |
uint64_t size() { |
|
120 |
38 |
MutexLockGuard lock_guard(&lock_); |
|
121 |
38 |
return size_; |
|
122 |
} |
||
123 |
|||
124 |
private: |
||
125 |
1358 |
void Init() { |
|
126 |
1358 |
Link *sentinel = new Link(NULL); |
|
127 |
1358 |
head_ = tail_ = sentinel; |
|
128 |
1358 |
head_->next_ = head_->prev_ = sentinel; |
|
129 |
1358 |
tail_->next_ = tail_->prev_ = sentinel; |
|
130 |
|||
131 |
1358 |
int retval = pthread_mutex_init(&lock_, NULL); |
|
132 |
✗✓✗✓ ✗✓ |
1358 |
assert(retval == 0); |
133 |
1358 |
retval = pthread_cond_init(&cond_populated_, NULL); |
|
134 |
✗✓✗✓ ✗✓ |
1358 |
assert(retval == 0); |
135 |
1358 |
retval = pthread_cond_init(&cond_capacious_, NULL); |
|
136 |
✗✓✗✓ ✗✓ |
1358 |
assert(retval == 0); |
137 |
1358 |
retval = pthread_cond_init(&cond_empty_, NULL); |
|
138 |
✗✓✗✓ ✗✓ |
1358 |
assert(retval == 0); |
139 |
1358 |
} |
|
140 |
|||
141 |
2319890 |
ItemT *SliceUnlocked(Link *link) { |
|
142 |
2319890 |
link->prev_->next_ = link->next_; |
|
143 |
2319890 |
link->next_->prev_ = link->prev_; |
|
144 |
✓✓✓✓ ✓✓ |
2319890 |
if (link == tail_) |
145 |
143303 |
tail_ = head_; |
|
146 |
2319890 |
ItemT *item = link->item_; |
|
147 |
2319890 |
delete link; |
|
148 |
2319781 |
size_--; |
|
149 |
2319781 |
int retval = pthread_cond_signal(&cond_capacious_); |
|
150 |
✗✓✗✓ ✗✓ |
2319896 |
assert(retval == 0); |
151 |
✓✓✓✓ ✓✓ |
2319896 |
if (size_ == 0) { |
152 |
143307 |
retval = pthread_cond_broadcast(&cond_empty_); |
|
153 |
✗✓✗✓ ✗✓ |
143303 |
assert(retval == 0); |
154 |
} |
||
155 |
2319892 |
return item; |
|
156 |
} |
||
157 |
|||
158 |
|||
159 |
/** |
||
160 |
* Adding a new item blocks as long as limit_ == size_ |
||
161 |
*/ |
||
162 |
uint64_t limit_; |
||
163 |
/** |
||
164 |
* The current number of links in the list |
||
165 |
*/ |
||
166 |
uint64_t size_; |
||
167 |
/** |
||
168 |
* In front of the first element (next in line for Pop()) |
||
169 |
*/ |
||
170 |
Link *head_; |
||
171 |
/** |
||
172 |
* Points to the last inserted element |
||
173 |
*/ |
||
174 |
Link *tail_; |
||
175 |
/** |
||
176 |
* Protects all internal state |
||
177 |
*/ |
||
178 |
pthread_mutex_t lock_; |
||
179 |
/** |
||
180 |
* Signals if there are items enqueued |
||
181 |
*/ |
||
182 |
pthread_cond_t cond_populated_; |
||
183 |
/** |
||
184 |
* Signals if there is space to enqueue more items |
||
185 |
*/ |
||
186 |
pthread_cond_t cond_capacious_; |
||
187 |
/** |
||
188 |
* Signals if the queue runs empty |
||
189 |
*/ |
||
190 |
pthread_cond_t cond_empty_; |
||
191 |
}; |
||
192 |
|||
193 |
|||
194 |
/** |
||
195 |
* A tube group manages a fixed set of Tubes and dispatches items among them in |
||
196 |
* such a way that items with the same tag (a positive integer) are all sent |
||
197 |
* to the same tube. |
||
198 |
*/ |
||
199 |
template <class ItemT> |
||
200 |
class TubeGroup : SingleCopy { |
||
201 |
public: |
||
202 |
687 |
TubeGroup() : is_active_(false) { |
|
203 |
687 |
atomic_init32(&round_robin_); |
|
204 |
687 |
} |
|
205 |
|||
206 |
687 |
~TubeGroup() { |
|
207 |
✓✓✓✓ ✓✓ |
1837 |
for (unsigned i = 0; i < tubes_.size(); ++i) |
208 |
✓✗✗✓ ✗✓✗ |
1158 |
delete tubes_[i]; |
209 |
687 |
} |
|
210 |
|||
211 |
1150 |
void TakeTube(Tube<ItemT> *t) { |
|
212 |
✗✓✗✓ |
1150 |
assert(!is_active_); |
213 |
1150 |
tubes_.push_back(t); |
|
214 |
1150 |
} |
|
215 |
|||
216 |
687 |
void Activate() { |
|
217 |
✗✓✗✓ |
687 |
assert(!is_active_); |
218 |
✗✓✗✓ |
687 |
assert(!tubes_.empty()); |
219 |
687 |
is_active_ = true; |
|
220 |
687 |
} |
|
221 |
|||
222 |
/** |
||
223 |
* Like Tube::Enqueue(), but pick a tube according to ItemT::tag() |
||
224 |
*/ |
||
225 |
2244041 |
typename Tube<ItemT>::Link *Dispatch(ItemT *item) { |
|
226 |
✗✓ | 2244041 |
assert(is_active_); |
227 |
unsigned tube_idx = (tubes_.size() == 1) |
||
228 |
✓✓ | 2244041 |
? 0 : (item->tag() % tubes_.size()); |
229 |
2244201 |
return tubes_[tube_idx]->Enqueue(item); |
|
230 |
} |
||
231 |
|||
232 |
/** |
||
233 |
* Like Tube::Enqueue(), but picks the tubes one after another (round-robin) |
||
234 |
*/ |
||
235 |
151 |
typename Tube<ItemT>::Link *DispatchAny(ItemT *item) { |
|
236 |
✗✓ | 151 |
assert(is_active_); |
237 |
unsigned tube_idx = (tubes_.size() == 1) |
||
238 |
✗✓ | 151 |
? 0 : (atomic_xadd32(&round_robin_, 1) % tubes_.size()); |
239 |
151 |
return tubes_[tube_idx]->Enqueue(item); |
|
240 |
} |
||
241 |
|||
242 |
private: |
||
243 |
bool is_active_; |
||
244 |
std::vector<Tube<ItemT> *> tubes_; |
||
245 |
atomic_int32 round_robin_; |
||
246 |
}; |
||
247 |
|||
248 |
#endif // CVMFS_INGESTION_TUBE_H_ |
Generated by: GCOVR (Version 4.1) |