Directory: | cvmfs/ |
---|---|
File: | cvmfs/util/tube.h |
Date: | 2025-04-20 02:34:28 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 134 | 134 | 100.0% |
Branches: | 49 | 74 | 66.2% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #ifndef CVMFS_UTIL_TUBE_H_ | ||
6 | #define CVMFS_UTIL_TUBE_H_ | ||
7 | |||
8 | #include <pthread.h> | ||
9 | #include <stdint.h> | ||
10 | |||
11 | #include <cassert> | ||
12 | #include <vector> | ||
13 | |||
14 | #include "util/atomic.h" | ||
15 | #include "util/concurrency.h" | ||
16 | #include "util/pointer.h" | ||
17 | #include "util/single_copy.h" | ||
18 | |||
19 | /** | ||
20 | * A thread-safe, doubly linked list of links containing pointers to ItemT. The | ||
21 | * ItemT elements are not owned by the Tube. FIFO or LIFO semantics. Using | ||
22 | * Slice(), items at arbitrary locations in the tube can be removed, too. | ||
23 | * | ||
24 | * | ||
25 | * The layout of the linked list is as follows: | ||
26 | * | ||
27 | * -------------------------------------------------------------- | ||
28 | * | | | ||
29 | * --> I$n$ (back) <--> I2 <--> ... <--> I1 (front) <--> HEAD <-- | ||
30 | * | ||
31 | * The tube links the steps in the file processing pipeline. It connects | ||
32 | * multiple producers to multiple consumers and can throttle the producers if a | ||
33 | * limit for the tube size is set. | ||
34 | * | ||
35 | * Internally, uses conditional variables to block when threads try to pop from | ||
36 | * the empty tube or insert into the full tube. | ||
37 | */ | ||
38 | template <class ItemT> | ||
39 | class Tube : SingleCopy { | ||
40 | public: | ||
41 | class Link : SingleCopy { | ||
42 | friend class Tube<ItemT>; | ||
43 | public: | ||
44 | 7945864 | explicit Link(ItemT *item) : item_(item), next_(NULL), prev_(NULL) { } | |
45 | 1 | ItemT *item() { return item_; } | |
46 | |||
47 | private: | ||
48 | ItemT *item_; | ||
49 | Link *next_; | ||
50 | Link *prev_; | ||
51 | }; | ||
52 | |||
53 | 7057 | Tube() : limit_(uint64_t(-1)), size_(0) { Init(); } | |
54 | 78 | explicit Tube(uint64_t limit) : limit_(limit), size_(0) { | |
55 | 78 | Init(); | |
56 | 78 | } | |
57 | 7303 | ~Tube() { | |
58 | 7311 | Link *cursor = head_; | |
59 | do { | ||
60 | 7312 | Link *prev = cursor->prev_; | |
61 |
1/2✓ Branch 0 taken 3723 times.
✗ Branch 1 not taken.
|
7312 | delete cursor; |
62 | 7312 | cursor = prev; | |
63 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 3722 times.
|
7312 | } while (cursor != head_); |
64 | 7311 | pthread_cond_destroy(&cond_populated_); | |
65 | 7311 | pthread_cond_destroy(&cond_capacious_); | |
66 | 7311 | pthread_cond_destroy(&cond_empty_); | |
67 | 7311 | pthread_mutex_destroy(&lock_); | |
68 | 7311 | } | |
69 | |||
70 | /** | ||
71 | * Push an item to the back of the queue. Block if queue is currently full. | ||
72 | */ | ||
73 | 7590235 | Link *EnqueueBack(ItemT *item) { | |
74 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 4130716 times.
|
7590235 | assert(item != NULL); |
75 | 7590235 | MutexLockGuard lock_guard(&lock_); | |
76 |
2/2✓ Branch 0 taken 208657 times.
✓ Branch 1 taken 4106459 times.
|
7958836 | while (size_ == limit_) |
77 |
1/2✓ Branch 1 taken 208011 times.
✗ Branch 2 not taken.
|
417314 | pthread_cond_wait(&cond_capacious_, &lock_); |
78 | |||
79 |
1/2✓ Branch 1 taken 4110759 times.
✗ Branch 2 not taken.
|
7541522 | Link *link = new Link(item); |
80 | 7503292 | link->next_ = head_->next_; | |
81 | 7503292 | link->prev_ = head_; | |
82 | 7503292 | head_->next_->prev_ = link; | |
83 | 7503292 | head_->next_ = link; | |
84 | 7503292 | size_++; | |
85 | 7503292 | int retval = pthread_cond_signal(&cond_populated_); | |
86 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 4144973 times.
|
7618550 | assert(retval == 0); |
87 | 7620359 | return link; | |
88 | 7618550 | } | |
89 | |||
90 | /** | ||
91 | * Push an item to the front of the queue. Block if queue currently full. | ||
92 | */ | ||
93 | 402589 | Link *EnqueueFront(ItemT *item) { | |
94 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 402519 times.
|
402589 | assert(item != NULL); |
95 | 402589 | MutexLockGuard lock_guard(&lock_); | |
96 |
2/2✓ Branch 0 taken 2 times.
✓ Branch 1 taken 402516 times.
|
402590 | while (size_ == limit_) |
97 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
4 | pthread_cond_wait(&cond_capacious_, &lock_); |
98 | |||
99 |
1/2✓ Branch 1 taken 402517 times.
✗ Branch 2 not taken.
|
402586 | Link *link = new Link(item); |
100 | 402583 | link->next_ = head_; | |
101 | 402583 | link->prev_ = head_->prev_; | |
102 | 402583 | head_->prev_->next_ = link; | |
103 | 402583 | head_->prev_ = link; | |
104 | 402583 | size_++; | |
105 | 402583 | int retval = pthread_cond_signal(&cond_populated_); | |
106 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 402512 times.
|
402582 | assert(retval == 0); |
107 | 402590 | return link; | |
108 | 402582 | } | |
109 | |||
110 | /** | ||
111 | * Remove any link from the queue and return its item, including first/last | ||
112 | * element. | ||
113 | */ | ||
114 | 2 | ItemT *Slice(Link *link) { | |
115 | 2 | MutexLockGuard lock_guard(&lock_); | |
116 | 2 | return SliceUnlocked(link); | |
117 | 2 | } | |
118 | |||
119 | /** | ||
120 | * Remove and return the first element from the queue. Block if tube is | ||
121 | * empty. | ||
122 | */ | ||
123 | 7948323 | ItemT *PopFront() { | |
124 | 7948323 | MutexLockGuard lock_guard(&lock_); | |
125 |
2/2✓ Branch 0 taken 1497817 times.
✓ Branch 1 taken 4488507 times.
|
10886348 | while (size_ == 0) |
126 |
1/2✓ Branch 1 taken 1504141 times.
✗ Branch 2 not taken.
|
2949184 | pthread_cond_wait(&cond_populated_, &lock_); |
127 | 15893819 | return SliceUnlocked(head_->prev_); | |
128 | 7886972 | } | |
129 | |||
130 | /** | ||
131 | * Remove and return the first element from the queue if there is any. | ||
132 | * Equivalent to an antomic | ||
133 | * ItemT item = NULL; | ||
134 | * if (!IsEmpty()) | ||
135 | * item = PopFront(); | ||
136 | */ | ||
137 | 402295 | ItemT *TryPopFront() { | |
138 | 402295 | MutexLockGuard lock_guard(&lock_); | |
139 | // Note that we don't need to wait for a signal to arrive | ||
140 |
2/2✓ Branch 0 taken 368457 times.
✓ Branch 1 taken 33983 times.
|
402440 | if (size_ == 0) |
141 | 368457 | return NULL; | |
142 | 33983 | return SliceUnlocked(head_->prev_); | |
143 | 402440 | } | |
144 | |||
145 | /** | ||
146 | * Remove and return the last element from the queue. Block if tube is | ||
147 | * empty. | ||
148 | */ | ||
149 | 153 | ItemT *PopBack() { | |
150 | 153 | MutexLockGuard lock_guard(&lock_); | |
151 |
2/2✓ Branch 0 taken 14 times.
✓ Branch 1 taken 83 times.
|
181 | while (size_ == 0) |
152 |
1/2✓ Branch 1 taken 14 times.
✗ Branch 2 not taken.
|
28 | pthread_cond_wait(&cond_populated_, &lock_); |
153 | 306 | return SliceUnlocked(head_->next_); | |
154 | 153 | } | |
155 | |||
156 | /** | ||
157 | * Blocks until the tube is empty | ||
158 | */ | ||
159 | 70 | void Wait() { | |
160 | 70 | MutexLockGuard lock_guard(&lock_); | |
161 |
2/2✓ Branch 0 taken 25 times.
✓ Branch 1 taken 70 times.
|
95 | while (size_ > 0) |
162 |
1/2✓ Branch 1 taken 25 times.
✗ Branch 2 not taken.
|
25 | pthread_cond_wait(&cond_empty_, &lock_); |
163 | 70 | } | |
164 | |||
165 | 421 | bool IsEmpty() { | |
166 | 421 | MutexLockGuard lock_guard(&lock_); | |
167 | 842 | return size_ == 0; | |
168 | 421 | } | |
169 | |||
170 | 19 | uint64_t size() { | |
171 | 19 | MutexLockGuard lock_guard(&lock_); | |
172 | 38 | return size_; | |
173 | 19 | } | |
174 | |||
175 | private: | ||
176 | 7181 | void Init() { | |
177 | 7181 | Link *sentinel = new Link(NULL); | |
178 | 7181 | head_ = sentinel; | |
179 | 7181 | head_->next_ = head_->prev_ = sentinel; | |
180 | |||
181 | 7181 | int retval = pthread_mutex_init(&lock_, NULL); | |
182 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3722 times.
|
7181 | assert(retval == 0); |
183 | 7181 | retval = pthread_cond_init(&cond_populated_, NULL); | |
184 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3722 times.
|
7181 | assert(retval == 0); |
185 | 7181 | retval = pthread_cond_init(&cond_capacious_, NULL); | |
186 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3722 times.
|
7181 | assert(retval == 0); |
187 | 7181 | retval = pthread_cond_init(&cond_empty_, NULL); | |
188 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3722 times.
|
7181 | assert(retval == 0); |
189 | 7181 | } | |
190 | |||
191 | 7944266 | ItemT *SliceUnlocked(Link *link) { | |
192 | // Cannot delete the sentinel link | ||
193 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 4509022 times.
|
7944266 | assert(link != head_); |
194 | 7944266 | link->prev_->next_ = link->next_; | |
195 | 7944266 | link->next_->prev_ = link->prev_; | |
196 | 7944266 | ItemT *item = link->item_; | |
197 |
2/2✓ Branch 0 taken 4491797 times.
✓ Branch 1 taken 17225 times.
|
7944266 | delete link; |
198 | 7999388 | size_--; | |
199 | 7999388 | int retval = pthread_cond_signal(&cond_capacious_); | |
200 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 4507018 times.
|
7940258 | assert(retval == 0); |
201 |
2/2✓ Branch 0 taken 1621797 times.
✓ Branch 1 taken 2885221 times.
|
7940258 | if (size_ == 0) { |
202 | 3149422 | retval = pthread_cond_broadcast(&cond_empty_); | |
203 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1625943 times.
|
3151690 | assert(retval == 0); |
204 | } | ||
205 | 7948550 | return item; | |
206 | } | ||
207 | |||
208 | |||
209 | /** | ||
210 | * Adding new item blocks as long as limit_ == size_ | ||
211 | */ | ||
212 | uint64_t limit_; | ||
213 | /** | ||
214 | * The current number of links in the list | ||
215 | */ | ||
216 | uint64_t size_; | ||
217 | /** | ||
218 | * Sentinel element in front of the first (front) element | ||
219 | */ | ||
220 | Link *head_; | ||
221 | /** | ||
222 | * Protects all internal state | ||
223 | */ | ||
224 | pthread_mutex_t lock_; | ||
225 | /** | ||
226 | * Signals if there are items enqueued | ||
227 | */ | ||
228 | pthread_cond_t cond_populated_; | ||
229 | /** | ||
230 | * Signals if there is space to enqueue more items | ||
231 | */ | ||
232 | pthread_cond_t cond_capacious_; | ||
233 | /** | ||
234 | * Signals if the queue runs empty | ||
235 | */ | ||
236 | pthread_cond_t cond_empty_; | ||
237 | }; | ||
238 | |||
239 | |||
240 | /** | ||
241 | * A tube group manages a fixed set of Tubes and dispatches items among them in | ||
242 | * such a way that items with the same tag (a positive integer) are all sent | ||
243 | * to the same tube. | ||
244 | */ | ||
245 | template <class ItemT> | ||
246 | class TubeGroup : SingleCopy { | ||
247 | public: | ||
248 | 592 | TubeGroup() : is_active_(false) { | |
249 | 592 | atomic_init32(&round_robin_); | |
250 | 592 | } | |
251 | |||
252 | 592 | ~TubeGroup() { | |
253 |
2/2✓ Branch 1 taken 3423 times.
✓ Branch 2 taken 354 times.
|
7297 | for (unsigned i = 0; i < tubes_.size(); ++i) |
254 |
1/2✓ Branch 1 taken 3423 times.
✗ Branch 2 not taken.
|
6705 | delete tubes_[i]; |
255 | 592 | } | |
256 | |||
257 | 6705 | void TakeTube(Tube<ItemT> *t) { | |
258 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3423 times.
|
6705 | assert(!is_active_); |
259 | 6705 | tubes_.push_back(t); | |
260 | 6705 | } | |
261 | |||
262 | 592 | void Activate() { | |
263 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 354 times.
|
592 | assert(!is_active_); |
264 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 354 times.
|
592 | assert(!tubes_.empty()); |
265 | 592 | is_active_ = true; | |
266 | 592 | } | |
267 | |||
268 | /** | ||
269 | * Like Tube::EnqueueBack(), but pick a tube according to ItemT::tag() | ||
270 | */ | ||
271 | 3075412 | typename Tube<ItemT>::Link *Dispatch(ItemT *item) { | |
272 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3075412 times.
|
3075412 | assert(is_active_); |
273 |
2/2✓ Branch 1 taken 2379076 times.
✓ Branch 2 taken 689295 times.
|
3075412 | unsigned tube_idx = (tubes_.size() == 1) |
274 | 2379076 | ? 0 : (item->tag() % tubes_.size()); | |
275 | 3055153 | return tubes_[tube_idx]->EnqueueBack(item); | |
276 | } | ||
277 | |||
278 | /** | ||
279 | * Like Tube::EnqueueBack(), use tubes one after another | ||
280 | */ | ||
281 | 250091 | typename Tube<ItemT>::Link *DispatchAny(ItemT *item) { | |
282 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 250091 times.
|
250091 | assert(is_active_); |
283 |
2/2✓ Branch 1 taken 250089 times.
✓ Branch 2 taken 2 times.
|
250091 | unsigned tube_idx = (tubes_.size() == 1) |
284 | 250089 | ? 0 : (atomic_xadd32(&round_robin_, 1) % tubes_.size()); | |
285 | 250091 | return tubes_[tube_idx]->EnqueueBack(item); | |
286 | } | ||
287 | |||
288 | private: | ||
289 | bool is_active_; | ||
290 | std::vector<Tube<ItemT> *> tubes_; | ||
291 | atomic_int32 round_robin_; | ||
292 | }; | ||
293 | |||
294 | #endif // CVMFS_UTIL_TUBE_H_ | ||
295 |