Directory: | cvmfs/ |
---|---|
File: | cvmfs/util/tube.h |
Date: | 2025-07-06 02:35:01 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 130 | 130 | 100.0% |
Branches: | 49 | 74 | 66.2% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #ifndef CVMFS_UTIL_TUBE_H_ | ||
6 | #define CVMFS_UTIL_TUBE_H_ | ||
7 | |||
8 | #include <pthread.h> | ||
9 | #include <stdint.h> | ||
10 | |||
11 | #include <cassert> | ||
12 | #include <cstddef> | ||
13 | #include <vector> | ||
14 | |||
15 | #include "util/atomic.h" | ||
16 | #include "util/mutex.h" | ||
17 | #include "util/single_copy.h" | ||
18 | |||
19 | /** | ||
20 | * A thread-safe, doubly linked list of links containing pointers to ItemT. The | ||
21 | * ItemT elements are not owned by the Tube. FIFO or LIFO semantics. Using | ||
22 | * Slice(), items at arbitrary locations in the tube can be removed, too. | ||
23 | * | ||
24 | * | ||
25 | * The layout of the linked list is as follows: | ||
26 | * | ||
27 | * -------------------------------------------------------------- | ||
28 | * | | | ||
29 | * --> I$n$ (back) <--> I2 <--> ... <--> I1 (front) <--> HEAD <-- | ||
30 | * | ||
31 | * The tube links the steps in the file processing pipeline. It connects | ||
32 | * multiple producers to multiple consumers and can throttle the producers if a | ||
33 | * limit for the tube size is set. | ||
34 | * | ||
35 | * Internally, uses conditional variables to block when threads try to pop from | ||
36 | * the empty tube or insert into the full tube. | ||
37 | */ | ||
38 | template<class ItemT> | ||
39 | class Tube : SingleCopy { | ||
40 | public: | ||
41 | class Link : SingleCopy { | ||
42 | friend class Tube<ItemT>; | ||
43 | |||
44 | public: | ||
45 | 237113636 | explicit Link(ItemT *item) : item_(item), next_(NULL), prev_(NULL) { } | |
46 | 40 | ItemT *item() { return item_; } | |
47 | |||
48 | private: | ||
49 | ItemT *item_; | ||
50 | Link *next_; | ||
51 | Link *prev_; | ||
52 | }; | ||
53 | |||
54 | 188082 | Tube() : limit_(uint64_t(-1)), size_(0) { Init(); } | |
55 | 2695 | explicit Tube(uint64_t limit) : limit_(limit), size_(0) { Init(); } | |
56 | 193890 | ~Tube() { | |
57 | 194256 | Link *cursor = head_; | |
58 | do { | ||
59 | 194295 | Link *prev = cursor->prev_; | |
60 |
1/2✓ Branch 0 taken 98949 times.
✗ Branch 1 not taken.
|
194295 | delete cursor; |
61 | 194295 | cursor = prev; | |
62 |
2/2✓ Branch 0 taken 39 times.
✓ Branch 1 taken 98910 times.
|
194295 | } while (cursor != head_); |
63 | 194256 | pthread_cond_destroy(&cond_populated_); | |
64 | 194256 | pthread_cond_destroy(&cond_capacious_); | |
65 | 194256 | pthread_cond_destroy(&cond_empty_); | |
66 | 194256 | pthread_mutex_destroy(&lock_); | |
67 | 194256 | } | |
68 | |||
69 | /** | ||
70 | * Push an item to the back of the queue. Block if queue is currently full. | ||
71 | */ | ||
72 | 221852394 | Link *EnqueueBack(ItemT *item) { | |
73 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 120571268 times.
|
221852394 | assert(item != NULL); |
74 | 221852394 | MutexLockGuard lock_guard(&lock_); | |
75 |
2/2✓ Branch 0 taken 5539740 times.
✓ Branch 1 taken 120712663 times.
|
233208154 | while (size_ == limit_) |
76 |
1/2✓ Branch 1 taken 5552280 times.
✗ Branch 2 not taken.
|
11079480 | pthread_cond_wait(&cond_capacious_, &lock_); |
77 | |||
78 |
1/2✓ Branch 1 taken 120758053 times.
✗ Branch 2 not taken.
|
222128674 | Link *link = new Link(item); |
79 | 220046454 | link->next_ = head_->next_; | |
80 | 220046454 | link->prev_ = head_; | |
81 | 220046454 | head_->next_->prev_ = link; | |
82 | 220046454 | head_->next_ = link; | |
83 | 220046454 | size_++; | |
84 | 220046454 | int retval = pthread_cond_signal(&cond_populated_); | |
85 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 121030833 times.
|
222765014 | assert(retval == 0); |
86 | 221727644 | return link; | |
87 | 222765014 | } | |
88 | |||
89 | /** | ||
90 | * Push an item to the front of the queue. Block if queue currently full. | ||
91 | */ | ||
92 | 15688797 | Link *EnqueueFront(ItemT *item) { | |
93 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 15685455 times.
|
15688797 | assert(item != NULL); |
94 | 15688797 | MutexLockGuard lock_guard(&lock_); | |
95 |
2/2✓ Branch 0 taken 48 times.
✓ Branch 1 taken 15685689 times.
|
15689127 | while (size_ == limit_) |
96 |
1/2✓ Branch 1 taken 48 times.
✗ Branch 2 not taken.
|
96 | pthread_cond_wait(&cond_capacious_, &lock_); |
97 | |||
98 |
1/2✓ Branch 1 taken 15685689 times.
✗ Branch 2 not taken.
|
15689031 | Link *link = new Link(item); |
99 | 15688953 | link->next_ = head_; | |
100 | 15688953 | link->prev_ = head_->prev_; | |
101 | 15688953 | head_->prev_->next_ = link; | |
102 | 15688953 | head_->prev_ = link; | |
103 | 15688953 | size_++; | |
104 | 15688953 | int retval = pthread_cond_signal(&cond_populated_); | |
105 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 15685572 times.
|
15688914 | assert(retval == 0); |
106 | 15688797 | return link; | |
107 | 15688914 | } | |
108 | |||
109 | /** | ||
110 | * Remove any link from the queue and return its item, including first/last | ||
111 | * element. | ||
112 | */ | ||
113 | 48 | ItemT *Slice(Link *link) { | |
114 | 48 | MutexLockGuard lock_guard(&lock_); | |
115 | 48 | return SliceUnlocked(link); | |
116 | 48 | } | |
117 | |||
118 | /** | ||
119 | * Remove and return the first element from the queue. Block if tube is | ||
120 | * empty. | ||
121 | */ | ||
122 | 235537428 | ItemT *PopFront() { | |
123 | 235537428 | MutexLockGuard lock_guard(&lock_); | |
124 |
2/2✓ Branch 0 taken 44343994 times.
✓ Branch 1 taken 135283137 times.
|
324284077 | while (size_ == 0) |
125 |
1/2✓ Branch 1 taken 44791030 times.
✗ Branch 2 not taken.
|
87372191 | pthread_cond_wait(&cond_populated_, &lock_); |
126 | 471557247 | return SliceUnlocked(head_->prev_); | |
127 | 234268316 | } | |
128 | |||
129 | /** | ||
130 | * Remove and return the first element from the queue if there is any. | ||
131 | * Equivalent to an antomic | ||
132 | * ItemT item = NULL; | ||
133 | * if (!IsEmpty()) | ||
134 | * item = PopFront(); | ||
135 | */ | ||
136 | 15677089 | ItemT *TryPopFront() { | |
137 | 15677089 | MutexLockGuard lock_guard(&lock_); | |
138 | // Note that we don't need to wait for a signal to arrive | ||
139 |
2/2✓ Branch 0 taken 14357856 times.
✓ Branch 1 taken 1324420 times.
|
15682276 | if (size_ == 0) |
140 | 14357856 | return NULL; | |
141 | 1324420 | return SliceUnlocked(head_->prev_); | |
142 | 15682276 | } | |
143 | |||
144 | /** | ||
145 | * Remove and return the last element from the queue. Block if tube is | ||
146 | * empty. | ||
147 | */ | ||
148 | 6788 | ItemT *PopBack() { | |
149 | 6788 | MutexLockGuard lock_guard(&lock_); | |
150 |
2/2✓ Branch 0 taken 672 times.
✓ Branch 1 taken 3446 times.
|
8132 | while (size_ == 0) |
151 |
1/2✓ Branch 1 taken 672 times.
✗ Branch 2 not taken.
|
1344 | pthread_cond_wait(&cond_populated_, &lock_); |
152 | 13576 | return SliceUnlocked(head_->next_); | |
153 | 6788 | } | |
154 | |||
155 | /** | ||
156 | * Blocks until the tube is empty | ||
157 | */ | ||
158 | 1827 | void Wait() { | |
159 | 1827 | MutexLockGuard lock_guard(&lock_); | |
160 |
2/2✓ Branch 0 taken 637 times.
✓ Branch 1 taken 1827 times.
|
2464 | while (size_ > 0) |
161 |
1/2✓ Branch 1 taken 637 times.
✗ Branch 2 not taken.
|
637 | pthread_cond_wait(&cond_empty_, &lock_); |
162 | 1827 | } | |
163 | |||
164 | 9665 | bool IsEmpty() { | |
165 | 9665 | MutexLockGuard lock_guard(&lock_); | |
166 | 19330 | return size_ == 0; | |
167 | 9665 | } | |
168 | |||
169 | 390 | uint64_t size() { | |
170 | 390 | MutexLockGuard lock_guard(&lock_); | |
171 | 780 | return size_; | |
172 | 390 | } | |
173 | |||
174 | private: | ||
175 | 192008 | void Init() { | |
176 | 192008 | Link *sentinel = new Link(NULL); | |
177 | 192008 | head_ = sentinel; | |
178 | 192008 | head_->next_ = head_->prev_ = sentinel; | |
179 | |||
180 | 192008 | int retval = pthread_mutex_init(&lock_, NULL); | |
181 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 98986 times.
|
192008 | assert(retval == 0); |
182 | 192008 | retval = pthread_cond_init(&cond_populated_, NULL); | |
183 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 98986 times.
|
192008 | assert(retval == 0); |
184 | 192008 | retval = pthread_cond_init(&cond_capacious_, NULL); | |
185 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 98986 times.
|
192008 | assert(retval == 0); |
186 | 192008 | retval = pthread_cond_init(&cond_empty_, NULL); | |
187 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 98986 times.
|
192008 | assert(retval == 0); |
188 | 192008 | } | |
189 | |||
190 | 236663480 | ItemT *SliceUnlocked(Link *link) { | |
191 | // Cannot delete the sentinel link | ||
192 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 135819549 times.
|
236663480 | assert(link != head_); |
193 | 236663480 | link->prev_->next_ = link->next_; | |
194 | 236663480 | link->next_->prev_ = link->prev_; | |
195 | 236663480 | ItemT *item = link->item_; | |
196 |
2/2✓ Branch 0 taken 135386490 times.
✓ Branch 1 taken 433059 times.
|
236663480 | delete link; |
197 | 238286280 | size_--; | |
198 | 238286280 | int retval = pthread_cond_signal(&cond_capacious_); | |
199 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 135670922 times.
|
236366226 | assert(retval == 0); |
200 |
2/2✓ Branch 0 taken 49987276 times.
✓ Branch 1 taken 85683646 times.
|
236366226 | if (size_ == 0) { |
201 | 96759540 | retval = pthread_cond_broadcast(&cond_empty_); | |
202 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 50088966 times.
|
96760788 | assert(retval == 0); |
203 | } | ||
204 | 236569645 | return item; | |
205 | } | ||
206 | |||
207 | |||
208 | /** | ||
209 | * Adding new item blocks as long as limit_ == size_ | ||
210 | */ | ||
211 | uint64_t limit_; | ||
212 | /** | ||
213 | * The current number of links in the list | ||
214 | */ | ||
215 | uint64_t size_; | ||
216 | /** | ||
217 | * Sentinel element in front of the first (front) element | ||
218 | */ | ||
219 | Link *head_; | ||
220 | /** | ||
221 | * Protects all internal state | ||
222 | */ | ||
223 | pthread_mutex_t lock_; | ||
224 | /** | ||
225 | * Signals if there are items enqueued | ||
226 | */ | ||
227 | pthread_cond_t cond_populated_; | ||
228 | /** | ||
229 | * Signals if there is space to enqueue more items | ||
230 | */ | ||
231 | pthread_cond_t cond_capacious_; | ||
232 | /** | ||
233 | * Signals if the queue runs empty | ||
234 | */ | ||
235 | pthread_cond_t cond_empty_; | ||
236 | }; | ||
237 | |||
238 | |||
239 | /** | ||
240 | * A tube group manages a fixed set of Tubes and dispatches items among them in | ||
241 | * such a way that items with the same tag (a positive integer) are all sent | ||
242 | * to the same tube. | ||
243 | */ | ||
244 | template<class ItemT> | ||
245 | class TubeGroup : SingleCopy { | ||
246 | public: | ||
247 | 14816 | TubeGroup() : is_active_(false) { atomic_init32(&round_robin_); } | |
248 | |||
249 | 14805 | ~TubeGroup() { | |
250 |
2/2✓ Branch 1 taken 91295 times.
✓ Branch 2 taken 8615 times.
|
194510 | for (unsigned i = 0; i < tubes_.size(); ++i) |
251 |
1/2✓ Branch 1 taken 91295 times.
✗ Branch 2 not taken.
|
179705 | delete tubes_[i]; |
252 | 14805 | } | |
253 | |||
254 | 179850 | void TakeTube(Tube<ItemT> *t) { | |
255 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 91368 times.
|
179850 | assert(!is_active_); |
256 | 179850 | tubes_.push_back(t); | |
257 | 179850 | } | |
258 | |||
259 | 14816 | void Activate() { | |
260 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 8621 times.
|
14816 | assert(!is_active_); |
261 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 8621 times.
|
14816 | assert(!tubes_.empty()); |
262 | 14816 | is_active_ = true; | |
263 | 14816 | } | |
264 | |||
265 | /** | ||
266 | * Like Tube::EnqueueBack(), but pick a tube according to ItemT::tag() | ||
267 | */ | ||
268 | 90810144 | typename Tube<ItemT>::Link *Dispatch(ItemT *item) { | |
269 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 90810144 times.
|
90810144 | assert(is_active_); |
270 |
2/2✓ Branch 1 taken 71296532 times.
✓ Branch 2 taken 19355212 times.
|
90810144 | unsigned tube_idx = (tubes_.size() == 1) ? 0 |
271 | 71296532 | : (item->tag() % tubes_.size()); | |
272 | 90192557 | return tubes_[tube_idx]->EnqueueBack(item); | |
273 | } | ||
274 | |||
275 | /** | ||
276 | * Like Tube::EnqueueBack(), use tubes one after another | ||
277 | */ | ||
278 | 7502211 | typename Tube<ItemT>::Link *DispatchAny(ItemT *item) { | |
279 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7502211 times.
|
7502211 | assert(is_active_); |
280 |
2/2✓ Branch 1 taken 7502201 times.
✓ Branch 2 taken 10 times.
|
7502211 | unsigned tube_idx = (tubes_.size() == 1) |
281 | ? 0 | ||
282 | 7502201 | : (atomic_xadd32(&round_robin_, 1) % tubes_.size()); | |
283 | 7502211 | return tubes_[tube_idx]->EnqueueBack(item); | |
284 | } | ||
285 | |||
286 | private: | ||
287 | bool is_active_; | ||
288 | std::vector<Tube<ItemT> *> tubes_; | ||
289 | atomic_int32 round_robin_; | ||
290 | }; | ||
291 | |||
292 | #endif // CVMFS_UTIL_TUBE_H_ | ||
293 |