GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/util/tube.h
Date: 2026-02-01 02:35:56
Exec Total Coverage
Lines: 130 130 100.0%
Branches: 50 74 67.6%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_UTIL_TUBE_H_
6 #define CVMFS_UTIL_TUBE_H_
7
8 #include <pthread.h>
9 #include <stdint.h>
10
11 #include <cassert>
12 #include <cstddef>
13 #include <vector>
14
15 #include "util/atomic.h"
16 #include "util/mutex.h"
17 #include "util/single_copy.h"
18
19 /**
20 * A thread-safe, doubly linked list of links containing pointers to ItemT. The
21 * ItemT elements are not owned by the Tube. FIFO or LIFO semantics. Using
22 * Slice(), items at arbitrary locations in the tube can be removed, too.
23 *
24 *
25 * The layout of the linked list is as follows:
26 *
27 * --------------------------------------------------------------
28 * | |
29 * --> I$n$ (back) <--> I2 <--> ... <--> I1 (front) <--> HEAD <--
30 *
31 * The tube links the steps in the file processing pipeline. It connects
32 * multiple producers to multiple consumers and can throttle the producers if a
33 * limit for the tube size is set.
34 *
35 * Internally, uses conditional variables to block when threads try to pop from
36 * the empty tube or insert into the full tube.
37 */
38 template<class ItemT>
39 class Tube : SingleCopy {
40 public:
41 class Link : SingleCopy {
42 friend class Tube<ItemT>;
43
44 public:
45 311723791 explicit Link(ItemT *item) : item_(item), next_(NULL), prev_(NULL) { }
46 47 ItemT *item() { return item_; }
47
48 private:
49 ItemT *item_;
50 Link *next_;
51 Link *prev_;
52 };
53
54 226273 Tube() : limit_(uint64_t(-1)), size_(0) { Init(); }
55 2448 explicit Tube(uint64_t limit) : limit_(limit), size_(0) { Init(); }
56 235149 ~Tube() {
57 235396 Link *cursor = head_;
58 do {
59 235435 Link *prev = cursor->prev_;
60
1/2
✓ Branch 0 taken 120493 times.
✗ Branch 1 not taken.
235435 delete cursor;
61 235435 cursor = prev;
62
2/2
✓ Branch 0 taken 39 times.
✓ Branch 1 taken 120454 times.
235435 } while (cursor != head_);
63 235396 pthread_cond_destroy(&cond_populated_);
64 235396 pthread_cond_destroy(&cond_capacious_);
65 235396 pthread_cond_destroy(&cond_empty_);
66 235396 pthread_mutex_destroy(&lock_);
67 235396 }
68
69 /**
70 * Push an item to the back of the queue. Block if queue is currently full.
71 */
72 296790186 Link *EnqueueBack(ItemT *item) {
73
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 161656064 times.
296790186 assert(item != NULL);
74 296790186 MutexLockGuard lock_guard(&lock_);
75
2/2
✓ Branch 0 taken 7441200 times.
✓ Branch 1 taken 161524798 times.
311402995 while (size_ == limit_)
76
1/2
✓ Branch 1 taken 7398222 times.
✗ Branch 2 not taken.
14882400 pthread_cond_wait(&cond_capacious_, &lock_);
77
78
1/2
✓ Branch 1 taken 161690465 times.
✗ Branch 2 not taken.
296520595 Link *link = new Link(item);
79 294340641 link->next_ = head_->next_;
80 294340641 link->prev_ = head_;
81 294340641 head_->next_->prev_ = link;
82 294340641 head_->next_ = link;
83 294340641 size_++;
84 294340641 int retval = pthread_cond_signal(&cond_populated_);
85
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 162480686 times.
298432371 assert(retval == 0);
86 297471153 return link;
87 298432371 }
88
89 /**
90 * Push an item to the front of the queue. Block if queue currently full.
91 */
92 15703227 Link *EnqueueFront(ItemT *item) {
93
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15701120 times.
15703227 assert(item != NULL);
94 15703227 MutexLockGuard lock_guard(&lock_);
95
2/2
✓ Branch 0 taken 90 times.
✓ Branch 1 taken 15701315 times.
15703602 while (size_ == limit_)
96
1/2
✓ Branch 1 taken 90 times.
✗ Branch 2 not taken.
180 pthread_cond_wait(&cond_capacious_, &lock_);
97
98
1/2
✓ Branch 1 taken 15701354 times.
✗ Branch 2 not taken.
15703422 Link *link = new Link(item);
99 15703305 link->next_ = head_;
100 15703305 link->prev_ = head_->prev_;
101 15703305 head_->prev_->next_ = link;
102 15703305 head_->prev_ = link;
103 15703305 size_++;
104 15703305 int retval = pthread_cond_signal(&cond_populated_);
105
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15701198 times.
15703305 assert(retval == 0);
106 15703461 return link;
107 15703305 }
108
109 /**
110 * Remove any link from the queue and return its item, including first/last
111 * element.
112 */
113 71 ItemT *Slice(Link *link) {
114 71 MutexLockGuard lock_guard(&lock_);
115 71 return SliceUnlocked(link);
116 71 }
117
118 /**
119 * Remove and return the first element from the queue. Block if tube is
120 * empty.
121 */
122 310987407 ItemT *PopFront() {
123 310987407 MutexLockGuard lock_guard(&lock_);
124
2/2
✓ Branch 0 taken 58747199 times.
✓ Branch 1 taken 176201546 times.
427140392 while (size_ == 0)
125
1/2
✓ Branch 1 taken 58405653 times.
✗ Branch 2 not taken.
115639462 pthread_cond_wait(&cond_populated_, &lock_);
126 621404858 return SliceUnlocked(head_->prev_);
127 309669658 }
128
129 /**
130 * Remove and return the first element from the queue if there is any.
131 * Equivalent to an antomic
132 * ItemT item = NULL;
133 * if (!IsEmpty())
134 * item = PopFront();
135 */
136 15692492 ItemT *TryPopFront() {
137 15692492 MutexLockGuard lock_guard(&lock_);
138 // Note that we don't need to wait for a signal to arrive
139
2/2
✓ Branch 0 taken 14373351 times.
✓ Branch 1 taken 1325615 times.
15698966 if (size_ == 0)
140 14373351 return NULL;
141 1325615 return SliceUnlocked(head_->prev_);
142 15698966 }
143
144 /**
145 * Remove and return the last element from the queue. Block if tube is
146 * empty.
147 */
148 4526 ItemT *PopBack() {
149 4526 MutexLockGuard lock_guard(&lock_);
150
2/2
✓ Branch 0 taken 300 times.
✓ Branch 1 taken 2419 times.
5126 while (size_ == 0)
151
1/2
✓ Branch 1 taken 300 times.
✗ Branch 2 not taken.
600 pthread_cond_wait(&cond_populated_, &lock_);
152 9052 return SliceUnlocked(head_->next_);
153 4526 }
154
155 /**
156 * Blocks until the tube is empty
157 */
158 2130 void Wait() {
159 2130 MutexLockGuard lock_guard(&lock_);
160
2/2
✓ Branch 0 taken 1034 times.
✓ Branch 1 taken 2130 times.
3164 while (size_ > 0)
161
1/2
✓ Branch 1 taken 1034 times.
✗ Branch 2 not taken.
1034 pthread_cond_wait(&cond_empty_, &lock_);
162 2130 }
163
164 17894 bool IsEmpty() {
165 17894 MutexLockGuard lock_guard(&lock_);
166 35788 return size_ == 0;
167 17894 }
168
169 790 uint64_t size() {
170 790 MutexLockGuard lock_guard(&lock_);
171 1580 return size_;
172 790 }
173
174 private:
175 230181 void Init() {
176 230181 Link *sentinel = new Link(NULL);
177 230181 head_ = sentinel;
178 230181 head_->next_ = head_->prev_ = sentinel;
179
180 230181 int retval = pthread_mutex_init(&lock_, NULL);
181
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 120530 times.
230181 assert(retval == 0);
182 230181 retval = pthread_cond_init(&cond_populated_, NULL);
183
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 120530 times.
230181 assert(retval == 0);
184 230181 retval = pthread_cond_init(&cond_capacious_, NULL);
185
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 120530 times.
230181 assert(retval == 0);
186 230181 retval = pthread_cond_init(&cond_empty_, NULL);
187
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 120530 times.
230181 assert(retval == 0);
188 230181 }
189
190 311858683 ItemT *SliceUnlocked(Link *link) {
191 // Cannot delete the sentinel link
192
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 177042368 times.
311858683 assert(link != head_);
193 311858683 link->prev_->next_ = link->next_;
194 311858683 link->next_->prev_ = link->prev_;
195 311858683 ItemT *item = link->item_;
196
2/2
✓ Branch 0 taken 176500984 times.
✓ Branch 1 taken 541384 times.
311858683 delete link;
197 314101737 size_--;
198 314101737 int retval = pthread_cond_signal(&cond_capacious_);
199
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 177778158 times.
313330263 assert(retval == 0);
200
2/2
✓ Branch 0 taken 65763785 times.
✓ Branch 1 taken 112014373 times.
313330263 if (size_ == 0) {
201 127785212 retval = pthread_cond_broadcast(&cond_empty_);
202
2/2
✓ Branch 0 taken 721583 times.
✓ Branch 1 taken 65049065 times.
127798938 assert(retval == 0);
203 }
204 311900862 return item;
205 }
206
207
208 /**
209 * Adding new item blocks as long as limit_ == size_
210 */
211 uint64_t limit_;
212 /**
213 * The current number of links in the list
214 */
215 uint64_t size_;
216 /**
217 * Sentinel element in front of the first (front) element
218 */
219 Link *head_;
220 /**
221 * Protects all internal state
222 */
223 pthread_mutex_t lock_;
224 /**
225 * Signals if there are items enqueued
226 */
227 pthread_cond_t cond_populated_;
228 /**
229 * Signals if there is space to enqueue more items
230 */
231 pthread_cond_t cond_capacious_;
232 /**
233 * Signals if the queue runs empty
234 */
235 pthread_cond_t cond_empty_;
236 };
237
238
239 /**
240 * A tube group manages a fixed set of Tubes and dispatches items among them in
241 * such a way that items with the same tag (a positive integer) are all sent
242 * to the same tube.
243 */
244 template<class ItemT>
245 class TubeGroup : SingleCopy {
246 public:
247 19916 TubeGroup() : is_active_(false) { atomic_init32(&round_robin_); }
248
249 19905 ~TubeGroup() {
250
2/2
✓ Branch 1 taken 109332 times.
✓ Branch 2 taken 12218 times.
232815 for (unsigned i = 0; i < tubes_.size(); ++i)
251
1/2
✓ Branch 1 taken 109332 times.
✗ Branch 2 not taken.
212910 delete tubes_[i];
252 19905 }
253
254 213055 void TakeTube(Tube<ItemT> *t) {
255
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 109405 times.
213055 assert(!is_active_);
256 213055 tubes_.push_back(t);
257 213055 }
258
259 19916 void Activate() {
260
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12224 times.
19916 assert(!is_active_);
261
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 12224 times.
19916 assert(!tubes_.empty());
262 19916 is_active_ = true;
263 19916 }
264
265 /**
266 * Like Tube::EnqueueBack(), but pick a tube according to ItemT::tag()
267 */
268 120217632 typename Tube<ItemT>::Link *Dispatch(ItemT *item) {
269
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 120217632 times.
120217632 assert(is_active_);
270
2/2
✓ Branch 1 taken 92642504 times.
✓ Branch 2 taken 27410392 times.
120217632 unsigned tube_idx = (tubes_.size() == 1) ? 0
271 92642504 : (item->tag() % tubes_.size());
272 119196768 return tubes_[tube_idx]->EnqueueBack(item);
273 }
274
275 /**
276 * Like Tube::EnqueueBack(), use tubes one after another
277 */
278 9752875 typename Tube<ItemT>::Link *DispatchAny(ItemT *item) {
279
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9752875 times.
9752875 assert(is_active_);
280
2/2
✓ Branch 1 taken 9752777 times.
✓ Branch 2 taken 98 times.
9752875 unsigned tube_idx = (tubes_.size() == 1)
281 ? 0
282 9752777 : (atomic_xadd32(&round_robin_, 1) % tubes_.size());
283 9752875 return tubes_[tube_idx]->EnqueueBack(item);
284 }
285
286 private:
287 bool is_active_;
288 std::vector<Tube<ItemT> *> tubes_;
289 atomic_int32 round_robin_;
290 };
291
292 #endif // CVMFS_UTIL_TUBE_H_
293