GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/util/tube.h
Date: 2025-05-11 02:35:43
Exec Total Coverage
Lines: 134 134 100.0%
Branches: 50 74 67.6%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_UTIL_TUBE_H_
6 #define CVMFS_UTIL_TUBE_H_
7
8 #include <pthread.h>
9 #include <stdint.h>
10
11 #include <cassert>
12 #include <vector>
13
14 #include "util/atomic.h"
15 #include "util/concurrency.h"
16 #include "util/pointer.h"
17 #include "util/single_copy.h"
18
19 /**
20 * A thread-safe, doubly linked list of links containing pointers to ItemT. The
21 * ItemT elements are not owned by the Tube. FIFO or LIFO semantics. Using
22 * Slice(), items at arbitrary locations in the tube can be removed, too.
23 *
24 *
25 * The layout of the linked list is as follows:
26 *
27 * --------------------------------------------------------------
28 * | |
29 * --> I$n$ (back) <--> I2 <--> ... <--> I1 (front) <--> HEAD <--
30 *
31 * The tube links the steps in the file processing pipeline. It connects
32 * multiple producers to multiple consumers and can throttle the producers if a
33 * limit for the tube size is set.
34 *
35 * Internally, uses condition variables to block when threads try to pop from
36 * the empty tube or insert into the full tube.
37 */
38 template <class ItemT>
39 class Tube : SingleCopy {
40 public:
41 class Link : SingleCopy {
42 friend class Tube<ItemT>;
43 public:
44 7962096 explicit Link(ItemT *item) : item_(item), next_(NULL), prev_(NULL) { }
45 1 ItemT *item() { return item_; }
46
47 private:
48 ItemT *item_;
49 Link *next_;
50 Link *prev_;
51 };
52
53 7057 Tube() : limit_(uint64_t(-1)), size_(0) { Init(); }
54 78 explicit Tube(uint64_t limit) : limit_(limit), size_(0) {
55 78 Init();
56 78 }
57 7303 ~Tube() {
58 7311 Link *cursor = head_;
59 do {
60 7312 Link *prev = cursor->prev_;
61
1/2
✓ Branch 0 taken 3723 times.
✗ Branch 1 not taken.
7312 delete cursor;
62 7312 cursor = prev;
63
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 3722 times.
7312 } while (cursor != head_);
64 7311 pthread_cond_destroy(&cond_populated_);
65 7311 pthread_cond_destroy(&cond_capacious_);
66 7311 pthread_cond_destroy(&cond_empty_);
67 7311 pthread_mutex_destroy(&lock_);
68 7311 }
69
70 /**
71 * Push an item to the back of the queue. Block if queue is currently full.
72 */
73 7573493 Link *EnqueueBack(ItemT *item) {
74
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4122348 times.
7573493 assert(item != NULL);
75 7573493 MutexLockGuard lock_guard(&lock_);
76
2/2
✓ Branch 0 taken 193181 times.
✓ Branch 1 taken 4128311 times.
7971588 while (size_ == limit_)
77
1/2
✓ Branch 1 taken 193651 times.
✗ Branch 2 not taken.
386362 pthread_cond_wait(&cond_capacious_, &lock_);
78
79
1/2
✓ Branch 1 taken 4130101 times.
✗ Branch 2 not taken.
7585226 Link *link = new Link(item);
80 7500706 link->next_ = head_->next_;
81 7500706 link->prev_ = head_;
82 7500706 head_->next_->prev_ = link;
83 7500706 head_->next_ = link;
84 7500706 size_++;
85 7500706 int retval = pthread_cond_signal(&cond_populated_);
86
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4133821 times.
7596246 assert(retval == 0);
87 7568375 return link;
88 7596246 }
89
90 /**
91 * Push an item to the front of the queue. Block if queue is currently full.
92 */
93 402592 Link *EnqueueFront(ItemT *item) {
94
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 402522 times.
402592 assert(item != NULL);
95 402592 MutexLockGuard lock_guard(&lock_);
96
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 402523 times.
402599 while (size_ == limit_)
97
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
6 pthread_cond_wait(&cond_capacious_, &lock_);
98
99
1/2
✓ Branch 1 taken 402523 times.
✗ Branch 2 not taken.
402593 Link *link = new Link(item);
100 402593 link->next_ = head_;
101 402593 link->prev_ = head_->prev_;
102 402593 head_->prev_->next_ = link;
103 402593 head_->prev_ = link;
104 402593 size_++;
105 402593 int retval = pthread_cond_signal(&cond_populated_);
106
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 402523 times.
402593 assert(retval == 0);
107 402590 return link;
108 402593 }
109
110 /**
111 * Remove any link from the queue and return its item, including first/last
112 * element.
113 */
114 2 ItemT *Slice(Link *link) {
115 2 MutexLockGuard lock_guard(&lock_);
116 2 return SliceUnlocked(link);
117 2 }
118
119 /**
120 * Remove and return the first element from the queue. Block if tube is
121 * empty.
122 */
123 7923138 ItemT *PopFront() {
124 7923138 MutexLockGuard lock_guard(&lock_);
125
2/2
✓ Branch 0 taken 1498830 times.
✓ Branch 1 taken 4508818 times.
10932951 while (size_ == 0)
126
1/2
✓ Branch 1 taken 1512788 times.
✗ Branch 2 not taken.
2955165 pthread_cond_wait(&cond_populated_, &lock_);
127 15881091 return SliceUnlocked(head_->prev_);
128 7895161 }
129
130 /**
131 * Remove and return the first element from the queue if there is any.
132 * Equivalent to an atomic
133 * ItemT item = NULL;
134 * if (!IsEmpty())
135 * item = PopFront();
136 */
137 402293 ItemT *TryPopFront() {
138 402293 MutexLockGuard lock_guard(&lock_);
139 // Note that we don't need to wait for a signal to arrive
140
2/2
✓ Branch 0 taken 368457 times.
✓ Branch 1 taken 33983 times.
402440 if (size_ == 0)
141 368457 return NULL;
142 33983 return SliceUnlocked(head_->prev_);
143 402440 }
144
145 /**
146 * Remove and return the last element from the queue. Block if tube is
147 * empty.
148 */
149 153 ItemT *PopBack() {
150 153 MutexLockGuard lock_guard(&lock_);
151
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 83 times.
171 while (size_ == 0)
152
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
18 pthread_cond_wait(&cond_populated_, &lock_);
153 306 return SliceUnlocked(head_->next_);
154 153 }
155
156 /**
157 * Blocks until the tube is empty
158 */
159 70 void Wait() {
160 70 MutexLockGuard lock_guard(&lock_);
161
2/2
✓ Branch 0 taken 24 times.
✓ Branch 1 taken 70 times.
94 while (size_ > 0)
162
1/2
✓ Branch 1 taken 24 times.
✗ Branch 2 not taken.
24 pthread_cond_wait(&cond_empty_, &lock_);
163 70 }
164
165 421 bool IsEmpty() {
166 421 MutexLockGuard lock_guard(&lock_);
167 842 return size_ == 0;
168 421 }
169
170 19 uint64_t size() {
171 19 MutexLockGuard lock_guard(&lock_);
172 38 return size_;
173 19 }
174
175 private:
176 7181 void Init() {
177 7181 Link *sentinel = new Link(NULL);
178 7181 head_ = sentinel;
179 7181 head_->next_ = head_->prev_ = sentinel;
180
181 7181 int retval = pthread_mutex_init(&lock_, NULL);
182
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3722 times.
7181 assert(retval == 0);
183 7181 retval = pthread_cond_init(&cond_populated_, NULL);
184
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3722 times.
7181 assert(retval == 0);
185 7181 retval = pthread_cond_init(&cond_capacious_, NULL);
186
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3722 times.
7181 assert(retval == 0);
187 7181 retval = pthread_cond_init(&cond_empty_, NULL);
188
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3722 times.
7181 assert(retval == 0);
189 7181 }
190
191 7958314 ItemT *SliceUnlocked(Link *link) {
192 // Cannot delete the sentinel link
193
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4516046 times.
7958314 assert(link != head_);
194 7958314 link->prev_->next_ = link->next_;
195 7958314 link->next_->prev_ = link->prev_;
196 7958314 ItemT *item = link->item_;
197
2/2
✓ Branch 0 taken 4502594 times.
✓ Branch 1 taken 13452 times.
7958314 delete link;
198 8023345 size_--;
199 8023345 int retval = pthread_cond_signal(&cond_capacious_);
200
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4538911 times.
8004044 assert(retval == 0);
201
2/2
✓ Branch 0 taken 1698045 times.
✓ Branch 1 taken 2840866 times.
8004044 if (size_ == 0) {
202 3298982 retval = pthread_cond_broadcast(&cond_empty_);
203
2/2
✓ Branch 0 taken 20756 times.
✓ Branch 1 taken 1677687 times.
3299778 assert(retval == 0);
204 }
205 7963329 return item;
206 }
207
208
209 /**
210 * Adding a new item blocks as long as limit_ == size_
211 */
212 uint64_t limit_;
213 /**
214 * The current number of links in the list
215 */
216 uint64_t size_;
217 /**
218 * Sentinel element in front of the first (front) element
219 */
220 Link *head_;
221 /**
222 * Protects all internal state
223 */
224 pthread_mutex_t lock_;
225 /**
226 * Signals if there are items enqueued
227 */
228 pthread_cond_t cond_populated_;
229 /**
230 * Signals if there is space to enqueue more items
231 */
232 pthread_cond_t cond_capacious_;
233 /**
234 * Signals if the queue runs empty
235 */
236 pthread_cond_t cond_empty_;
237 };
238
239
240 /**
241 * A tube group manages a fixed set of Tubes and dispatches items among them in
242 * such a way that items with the same tag (a positive integer) are all sent
243 * to the same tube.
244 */
245 template <class ItemT>
246 class TubeGroup : SingleCopy {
247 public:
248 592 TubeGroup() : is_active_(false) {
249 592 atomic_init32(&round_robin_);
250 592 }
251
252 592 ~TubeGroup() {
253
2/2
✓ Branch 1 taken 3423 times.
✓ Branch 2 taken 354 times.
7297 for (unsigned i = 0; i < tubes_.size(); ++i)
254
1/2
✓ Branch 1 taken 3423 times.
✗ Branch 2 not taken.
6705 delete tubes_[i];
255 592 }
256
257 6705 void TakeTube(Tube<ItemT> *t) {
258
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3423 times.
6705 assert(!is_active_);
259 6705 tubes_.push_back(t);
260 6705 }
261
262 592 void Activate() {
263
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 354 times.
592 assert(!is_active_);
264
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 354 times.
592 assert(!tubes_.empty());
265 592 is_active_ = true;
266 592 }
267
268 /**
269 * Like Tube::EnqueueBack(), but pick a tube according to ItemT::tag()
270 */
271 3069983 typename Tube<ItemT>::Link *Dispatch(ItemT *item) {
272
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3069983 times.
3069983 assert(is_active_);
273
2/2
✓ Branch 1 taken 2376731 times.
✓ Branch 2 taken 689298 times.
3069983 unsigned tube_idx = (tubes_.size() == 1)
274 2376731 ? 0 : (item->tag() % tubes_.size());
275 3048640 return tubes_[tube_idx]->EnqueueBack(item);
276 }
277
278 /**
279 * Like Tube::EnqueueBack(), but pick the tubes one after another (round robin)
280 */
281 250091 typename Tube<ItemT>::Link *DispatchAny(ItemT *item) {
282
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 250091 times.
250091 assert(is_active_);
283
2/2
✓ Branch 1 taken 250089 times.
✓ Branch 2 taken 2 times.
250091 unsigned tube_idx = (tubes_.size() == 1)
284 250089 ? 0 : (atomic_xadd32(&round_robin_, 1) % tubes_.size());
285 250091 return tubes_[tube_idx]->EnqueueBack(item);
286 }
287
288 private:
289 bool is_active_;
290 std::vector<Tube<ItemT> *> tubes_;
291 atomic_int32 round_robin_;
292 };
293
294 #endif // CVMFS_UTIL_TUBE_H_
295