GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/util/tube.h
Date: 2025-04-20 02:34:28
Exec Total Coverage
Lines: 134 134 100.0%
Branches: 49 74 66.2%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_UTIL_TUBE_H_
6 #define CVMFS_UTIL_TUBE_H_
7
8 #include <pthread.h>
9 #include <stdint.h>
10
11 #include <cassert>
12 #include <vector>
13
14 #include "util/atomic.h"
15 #include "util/concurrency.h"
16 #include "util/pointer.h"
17 #include "util/single_copy.h"
18
19 /**
20 * A thread-safe, doubly linked list of links containing pointers to ItemT. The
21 * ItemT elements are not owned by the Tube. FIFO or LIFO semantics. Using
22 * Slice(), items at arbitrary locations in the tube can be removed, too.
23 *
24 *
25 * The layout of the linked list is as follows:
26 *
27 * --------------------------------------------------------------
28 * |                                                            |
29 * --> I$n$ (back) <--> ... <--> I2 <--> I1 (front) <--> HEAD <--
30 *
31 * The tube links the steps in the file processing pipeline. It connects
32 * multiple producers to multiple consumers and can throttle the producers if a
33 * limit for the tube size is set.
34 *
35 * Internally, uses condition variables to block when threads try to pop from
36 * the empty tube or insert into the full tube.
37 */
38 template <class ItemT>
39 class Tube : SingleCopy {
40 public:
41 class Link : SingleCopy {
42 friend class Tube<ItemT>;
43 public:
44 7945864 explicit Link(ItemT *item) : item_(item), next_(NULL), prev_(NULL) { }
45 1 ItemT *item() { return item_; }
46
47 private:
48 ItemT *item_;
49 Link *next_;
50 Link *prev_;
51 };
52
53 7057 Tube() : limit_(uint64_t(-1)), size_(0) { Init(); }
54 78 explicit Tube(uint64_t limit) : limit_(limit), size_(0) {
55 78 Init();
56 78 }
57 7303 ~Tube() {
58 7311 Link *cursor = head_;
59 do {
60 7312 Link *prev = cursor->prev_;
61
1/2
✓ Branch 0 taken 3723 times.
✗ Branch 1 not taken.
7312 delete cursor;
62 7312 cursor = prev;
63
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 3722 times.
7312 } while (cursor != head_);
64 7311 pthread_cond_destroy(&cond_populated_);
65 7311 pthread_cond_destroy(&cond_capacious_);
66 7311 pthread_cond_destroy(&cond_empty_);
67 7311 pthread_mutex_destroy(&lock_);
68 7311 }
69
70 /**
71 * Push an item to the back of the queue. Block if queue is currently full.
72 */
73 7590235 Link *EnqueueBack(ItemT *item) {
74
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4130716 times.
7590235 assert(item != NULL);
75 7590235 MutexLockGuard lock_guard(&lock_);
76
2/2
✓ Branch 0 taken 208657 times.
✓ Branch 1 taken 4106459 times.
7958836 while (size_ == limit_)
77
1/2
✓ Branch 1 taken 208011 times.
✗ Branch 2 not taken.
417314 pthread_cond_wait(&cond_capacious_, &lock_);
78
79
1/2
✓ Branch 1 taken 4110759 times.
✗ Branch 2 not taken.
7541522 Link *link = new Link(item);
80 7503292 link->next_ = head_->next_;
81 7503292 link->prev_ = head_;
82 7503292 head_->next_->prev_ = link;
83 7503292 head_->next_ = link;
84 7503292 size_++;
85 7503292 int retval = pthread_cond_signal(&cond_populated_);
86
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4144973 times.
7618550 assert(retval == 0);
87 7620359 return link;
88 7618550 }
89
90 /**
91 * Push an item to the front of the queue. Block if queue is currently full.
92 */
93 402589 Link *EnqueueFront(ItemT *item) {
94
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 402519 times.
402589 assert(item != NULL);
95 402589 MutexLockGuard lock_guard(&lock_);
96
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 402516 times.
402590 while (size_ == limit_)
97
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
4 pthread_cond_wait(&cond_capacious_, &lock_);
98
99
1/2
✓ Branch 1 taken 402517 times.
✗ Branch 2 not taken.
402586 Link *link = new Link(item);
100 402583 link->next_ = head_;
101 402583 link->prev_ = head_->prev_;
102 402583 head_->prev_->next_ = link;
103 402583 head_->prev_ = link;
104 402583 size_++;
105 402583 int retval = pthread_cond_signal(&cond_populated_);
106
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 402512 times.
402582 assert(retval == 0);
107 402590 return link;
108 402582 }
109
110 /**
111 * Remove any link from the queue and return its item, including first/last
112 * element.
113 */
114 2 ItemT *Slice(Link *link) {
115 2 MutexLockGuard lock_guard(&lock_);
116 2 return SliceUnlocked(link);
117 2 }
118
119 /**
120 * Remove and return the first element from the queue. Block if tube is
121 * empty.
122 */
123 7948323 ItemT *PopFront() {
124 7948323 MutexLockGuard lock_guard(&lock_);
125
2/2
✓ Branch 0 taken 1497817 times.
✓ Branch 1 taken 4488507 times.
10886348 while (size_ == 0)
126
1/2
✓ Branch 1 taken 1504141 times.
✗ Branch 2 not taken.
2949184 pthread_cond_wait(&cond_populated_, &lock_);
127 15893819 return SliceUnlocked(head_->prev_);
128 7886972 }
129
130 /**
131 * Remove and return the first element from the queue if there is any.
132 * Equivalent to an atomic
133 * ItemT item = NULL;
134 * if (!IsEmpty())
135 * item = PopFront();
136 */
137 402295 ItemT *TryPopFront() {
138 402295 MutexLockGuard lock_guard(&lock_);
139 // Note that we don't need to wait for a signal to arrive
140
2/2
✓ Branch 0 taken 368457 times.
✓ Branch 1 taken 33983 times.
402440 if (size_ == 0)
141 368457 return NULL;
142 33983 return SliceUnlocked(head_->prev_);
143 402440 }
144
145 /**
146 * Remove and return the last element from the queue. Block if tube is
147 * empty.
148 */
149 153 ItemT *PopBack() {
150 153 MutexLockGuard lock_guard(&lock_);
151
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 83 times.
181 while (size_ == 0)
152
1/2
✓ Branch 1 taken 14 times.
✗ Branch 2 not taken.
28 pthread_cond_wait(&cond_populated_, &lock_);
153 306 return SliceUnlocked(head_->next_);
154 153 }
155
156 /**
157 * Blocks until the tube is empty
158 */
159 70 void Wait() {
160 70 MutexLockGuard lock_guard(&lock_);
161
2/2
✓ Branch 0 taken 25 times.
✓ Branch 1 taken 70 times.
95 while (size_ > 0)
162
1/2
✓ Branch 1 taken 25 times.
✗ Branch 2 not taken.
25 pthread_cond_wait(&cond_empty_, &lock_);
163 70 }
164
165 421 bool IsEmpty() {
166 421 MutexLockGuard lock_guard(&lock_);
167 842 return size_ == 0;
168 421 }
169
170 19 uint64_t size() {
171 19 MutexLockGuard lock_guard(&lock_);
172 38 return size_;
173 19 }
174
175 private:
176 7181 void Init() {
177 7181 Link *sentinel = new Link(NULL);
178 7181 head_ = sentinel;
179 7181 head_->next_ = head_->prev_ = sentinel;
180
181 7181 int retval = pthread_mutex_init(&lock_, NULL);
182
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3722 times.
7181 assert(retval == 0);
183 7181 retval = pthread_cond_init(&cond_populated_, NULL);
184
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3722 times.
7181 assert(retval == 0);
185 7181 retval = pthread_cond_init(&cond_capacious_, NULL);
186
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3722 times.
7181 assert(retval == 0);
187 7181 retval = pthread_cond_init(&cond_empty_, NULL);
188
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3722 times.
7181 assert(retval == 0);
189 7181 }
190
191 7944266 ItemT *SliceUnlocked(Link *link) {
192 // Cannot delete the sentinel link
193
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4509022 times.
7944266 assert(link != head_);
194 7944266 link->prev_->next_ = link->next_;
195 7944266 link->next_->prev_ = link->prev_;
196 7944266 ItemT *item = link->item_;
197
2/2
✓ Branch 0 taken 4491797 times.
✓ Branch 1 taken 17225 times.
7944266 delete link;
198 7999388 size_--;
199 7999388 int retval = pthread_cond_signal(&cond_capacious_);
200
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4507018 times.
7940258 assert(retval == 0);
201
2/2
✓ Branch 0 taken 1621797 times.
✓ Branch 1 taken 2885221 times.
7940258 if (size_ == 0) {
202 3149422 retval = pthread_cond_broadcast(&cond_empty_);
203
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1625943 times.
3151690 assert(retval == 0);
204 }
205 7948550 return item;
206 }
207
208
209 /**
210 * Adding a new item blocks as long as limit_ == size_
211 */
212 uint64_t limit_;
213 /**
214 * The current number of links in the list
215 */
216 uint64_t size_;
217 /**
218 * Sentinel element in front of the first (front) element
219 */
220 Link *head_;
221 /**
222 * Protects all internal state
223 */
224 pthread_mutex_t lock_;
225 /**
226 * Signals if there are items enqueued
227 */
228 pthread_cond_t cond_populated_;
229 /**
230 * Signals if there is space to enqueue more items
231 */
232 pthread_cond_t cond_capacious_;
233 /**
234 * Signals if the queue runs empty
235 */
236 pthread_cond_t cond_empty_;
237 };
238
239
240 /**
241 * A tube group manages a fixed set of Tubes and dispatches items among them in
242 * such a way that items with the same tag (a positive integer) are all sent
243 * to the same tube.
244 */
245 template <class ItemT>
246 class TubeGroup : SingleCopy {
247 public:
248 592 TubeGroup() : is_active_(false) {
249 592 atomic_init32(&round_robin_);
250 592 }
251
252 592 ~TubeGroup() {
253
2/2
✓ Branch 1 taken 3423 times.
✓ Branch 2 taken 354 times.
7297 for (unsigned i = 0; i < tubes_.size(); ++i)
254
1/2
✓ Branch 1 taken 3423 times.
✗ Branch 2 not taken.
6705 delete tubes_[i];
255 592 }
256
257 6705 void TakeTube(Tube<ItemT> *t) {
258
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3423 times.
6705 assert(!is_active_);
259 6705 tubes_.push_back(t);
260 6705 }
261
262 592 void Activate() {
263
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 354 times.
592 assert(!is_active_);
264
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 354 times.
592 assert(!tubes_.empty());
265 592 is_active_ = true;
266 592 }
267
268 /**
269 * Like Tube::EnqueueBack(), but pick a tube according to ItemT::tag()
270 */
271 3075412 typename Tube<ItemT>::Link *Dispatch(ItemT *item) {
272
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3075412 times.
3075412 assert(is_active_);
273
2/2
✓ Branch 1 taken 2379076 times.
✓ Branch 2 taken 689295 times.
3075412 unsigned tube_idx = (tubes_.size() == 1)
274 2379076 ? 0 : (item->tag() % tubes_.size());
275 3055153 return tubes_[tube_idx]->EnqueueBack(item);
276 }
277
278 /**
279 * Like Tube::EnqueueBack(), but pick the tubes one after another (round robin)
280 */
281 250091 typename Tube<ItemT>::Link *DispatchAny(ItemT *item) {
282
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 250091 times.
250091 assert(is_active_);
283
2/2
✓ Branch 1 taken 250089 times.
✓ Branch 2 taken 2 times.
250091 unsigned tube_idx = (tubes_.size() == 1)
284 250089 ? 0 : (atomic_xadd32(&round_robin_, 1) % tubes_.size());
285 250091 return tubes_[tube_idx]->EnqueueBack(item);
286 }
287
288 private:
289 bool is_active_;
290 std::vector<Tube<ItemT> *> tubes_;
291 atomic_int32 round_robin_;
292 };
293
294 #endif // CVMFS_UTIL_TUBE_H_
295