GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/ingestion/tube.h
Date: 2024-02-04 02:33:02
Exec Total Coverage
Lines: 132 134 98.5%
Branches: 45 74 60.8%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_INGESTION_TUBE_H_
6 #define CVMFS_INGESTION_TUBE_H_
7
8 #include <pthread.h>
9 #include <stdint.h>
10
11 #include <cassert>
12 #include <vector>
13
14 #include "util/atomic.h"
15 #include "util/concurrency.h"
16 #include "util/pointer.h"
17 #include "util/single_copy.h"
18
19 /**
20 * A thread-safe, doubly linked list of links containing pointers to ItemT. The
21 * ItemT elements are not owned by the Tube. FIFO or LIFO semantics. Using
22 * Slice(), items at arbitrary locations in the tube can be removed, too.
23 *
24 *
25 * The layout of the linked list is as follows:
26 *
27 * --------------------------------------------------------------
28 * | |
29 * --> I$n$ (back) <--> I2 <--> ... <--> I1 (front) <--> HEAD <--
30 *
31 * The tube links the steps in the file processing pipeline. It connects
32 * multiple producers to multiple consumers and can throttle the producers if a
33 * limit for the tube size is set.
34 *
35 * Internally, uses conditional variables to block when threads try to pop from
36 * the empty tube or insert into the full tube.
37 */
38 template <class ItemT>
39 class Tube : SingleCopy {
40 public:
41 class Link : SingleCopy {
42 friend class Tube<ItemT>;
43 public:
44 7969254 explicit Link(ItemT *item) : item_(item), next_(NULL), prev_(NULL) { }
45 1 ItemT *item() { return item_; }
46
47 private:
48 ItemT *item_;
49 Link *next_;
50 Link *prev_;
51 };
52
53 6759 Tube() : limit_(uint64_t(-1)), size_(0) { Init(); }
54 44 explicit Tube(uint64_t limit) : limit_(limit), size_(0) {
55 44 Init();
56 44 }
57 6983 ~Tube() {
58 6983 Link *cursor = head_;
59 do {
60 6984 Link *prev = cursor->prev_;
61
1/2
✓ Branch 0 taken 3555 times.
✗ Branch 1 not taken.
6984 delete cursor;
62 6984 cursor = prev;
63
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 3554 times.
6984 } while (cursor != head_);
64 6983 pthread_cond_destroy(&cond_populated_);
65 6983 pthread_cond_destroy(&cond_capacious_);
66 6983 pthread_cond_destroy(&cond_empty_);
67 6983 pthread_mutex_destroy(&lock_);
68 6983 }
69
70 /**
71 * Push an item to the back of the queue. Block if queue is currently full.
72 */
73 7594257 Link *EnqueueBack(ItemT *item) {
74
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4132757 times.
7594257 assert(item != NULL);
75 7594257 MutexLockGuard lock_guard(&lock_);
76
2/2
✓ Branch 0 taken 224446 times.
✓ Branch 1 taken 4119530 times.
8016574 while (size_ == limit_)
77
1/2
✓ Branch 1 taken 222283 times.
✗ Branch 2 not taken.
448892 pthread_cond_wait(&cond_capacious_, &lock_);
78
79
1/2
✓ Branch 1 taken 4125400 times.
✗ Branch 2 not taken.
7567682 Link *link = new Link(item);
80 7529790 link->next_ = head_->next_;
81 7529790 link->prev_ = head_;
82 7529790 head_->next_->prev_ = link;
83 7529790 head_->next_ = link;
84 7529790 size_++;
85 7529790 int retval = pthread_cond_signal(&cond_populated_);
86
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4144083 times.
7616788 assert(retval == 0);
87 7585605 return link;
88 7616788 }
89
90 /**
91 * Push an item to the front of the queue. Block if queue currently full.
92 */
93 402450 Link *EnqueueFront(ItemT *item) {
94
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 402450 times.
402450 assert(item != NULL);
95 402450 MutexLockGuard lock_guard(&lock_);
96
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 402453 times.
402453 while (size_ == limit_)
97 pthread_cond_wait(&cond_capacious_, &lock_);
98
99
1/2
✓ Branch 1 taken 402453 times.
✗ Branch 2 not taken.
402453 Link *link = new Link(item);
100 402453 link->next_ = head_;
101 402453 link->prev_ = head_->prev_;
102 402453 head_->prev_->next_ = link;
103 402453 head_->prev_ = link;
104 402453 size_++;
105 402453 int retval = pthread_cond_signal(&cond_populated_);
106
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 402452 times.
402452 assert(retval == 0);
107 402451 return link;
108 402452 }
109
110 /**
111 * Remove any link from the queue and return its item, including first/last
112 * element.
113 */
114 2 ItemT *Slice(Link *link) {
115 2 MutexLockGuard lock_guard(&lock_);
116 2 return SliceUnlocked(link);
117 2 }
118
119 /**
120 * Remove and return the first element from the queue. Block if tube is
121 * empty.
122 */
123 7942614 ItemT *PopFront() {
124 7942614 MutexLockGuard lock_guard(&lock_);
125
2/2
✓ Branch 0 taken 1561333 times.
✓ Branch 1 taken 4494845 times.
11000107 while (size_ == 0)
126
1/2
✓ Branch 1 taken 1550449 times.
✗ Branch 2 not taken.
3050249 pthread_cond_wait(&cond_populated_, &lock_);
127 15858895 return SliceUnlocked(head_->prev_);
128 7908814 }
129
130 /**
131 * Remove and return the first element from the queue if there is any.
132 * Equivalent to an antomic
133 * ItemT item = NULL;
134 * if (!IsEmpty())
135 * item = PopFront();
136 */
137 402157 ItemT *TryPopFront() {
138 402157 MutexLockGuard lock_guard(&lock_);
139 // Note that we don't need to wait for a signal to arrive
140
2/2
✓ Branch 0 taken 368457 times.
✓ Branch 1 taken 33983 times.
402440 if (size_ == 0)
141 368457 return NULL;
142 33983 return SliceUnlocked(head_->prev_);
143 402440 }
144
145 /**
146 * Remove and return the last element from the queue. Block if tube is
147 * empty.
148 */
149 13 ItemT *PopBack() {
150 13 MutexLockGuard lock_guard(&lock_);
151
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13 times.
13 while (size_ == 0)
152 pthread_cond_wait(&cond_populated_, &lock_);
153 26 return SliceUnlocked(head_->next_);
154 13 }
155
156 /**
157 * Blocks until the tube is empty
158 */
159 66 void Wait() {
160 66 MutexLockGuard lock_guard(&lock_);
161
2/2
✓ Branch 0 taken 26 times.
✓ Branch 1 taken 66 times.
92 while (size_ > 0)
162
1/2
✓ Branch 1 taken 26 times.
✗ Branch 2 not taken.
26 pthread_cond_wait(&cond_empty_, &lock_);
163 66 }
164
165 374 bool IsEmpty() {
166 374 MutexLockGuard lock_guard(&lock_);
167 748 return size_ == 0;
168 374 }
169
170 19 uint64_t size() {
171 19 MutexLockGuard lock_guard(&lock_);
172 38 return size_;
173 19 }
174
175 private:
176 6847 void Init() {
177 6847 Link *sentinel = new Link(NULL);
178 6847 head_ = sentinel;
179 6847 head_->next_ = head_->prev_ = sentinel;
180
181 6847 int retval = pthread_mutex_init(&lock_, NULL);
182
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3554 times.
6847 assert(retval == 0);
183 6847 retval = pthread_cond_init(&cond_populated_, NULL);
184
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3554 times.
6847 assert(retval == 0);
185 6847 retval = pthread_cond_init(&cond_capacious_, NULL);
186
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3554 times.
6847 assert(retval == 0);
187 6847 retval = pthread_cond_init(&cond_empty_, NULL);
188
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3554 times.
6847 assert(retval == 0);
189 6847 }
190
191 7960367 ItemT *SliceUnlocked(Link *link) {
192 // Cannot delete the sentinel link
193
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4517098 times.
7960367 assert(link != head_);
194 7960367 link->prev_->next_ = link->next_;
195 7960367 link->next_->prev_ = link->prev_;
196 7960367 ItemT *item = link->item_;
197
2/2
✓ Branch 0 taken 4506732 times.
✓ Branch 1 taken 10366 times.
7960367 delete link;
198 8005666 size_--;
199 8005666 int retval = pthread_cond_signal(&cond_capacious_);
200
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4516543 times.
7959258 assert(retval == 0);
201
2/2
✓ Branch 0 taken 1781732 times.
✓ Branch 1 taken 2734811 times.
7959258 if (size_ == 0) {
202 3437059 retval = pthread_cond_broadcast(&cond_empty_);
203
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1786377 times.
3438511 assert(retval == 0);
204 }
205 7968549 return item;
206 }
207
208
209 /**
210 * Adding new item blocks as long as limit_ == size_
211 */
212 uint64_t limit_;
213 /**
214 * The current number of links in the list
215 */
216 uint64_t size_;
217 /**
218 * Sentinel element in front of the first (front) element
219 */
220 Link *head_;
221 /**
222 * Protects all internal state
223 */
224 pthread_mutex_t lock_;
225 /**
226 * Signals if there are items enqueued
227 */
228 pthread_cond_t cond_populated_;
229 /**
230 * Signals if there is space to enqueue more items
231 */
232 pthread_cond_t cond_capacious_;
233 /**
234 * Signals if the queue runs empty
235 */
236 pthread_cond_t cond_empty_;
237 };
238
239
240 /**
241 * A tube group manages a fixed set of Tubes and dispatches items among them in
242 * such a way that items with the same tag (a positive integer) are all sent
243 * to the same tube.
244 */
245 template <class ItemT>
246 class TubeGroup : SingleCopy {
247 public:
248 570 TubeGroup() : is_active_(false) {
249 570 atomic_init32(&round_robin_);
250 570 }
251
252 570 ~TubeGroup() {
253
2/2
✓ Branch 1 taken 3277 times.
✓ Branch 2 taken 342 times.
6985 for (unsigned i = 0; i < tubes_.size(); ++i)
254
1/2
✓ Branch 1 taken 3277 times.
✗ Branch 2 not taken.
6415 delete tubes_[i];
255 570 }
256
257 6415 void TakeTube(Tube<ItemT> *t) {
258
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3277 times.
6415 assert(!is_active_);
259 6415 tubes_.push_back(t);
260 6415 }
261
262 570 void Activate() {
263
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 342 times.
570 assert(!is_active_);
264
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 342 times.
570 assert(!tubes_.empty());
265 570 is_active_ = true;
266 570 }
267
268 /**
269 * Like Tube::EnqueueBack(), but pick a tube according to ItemT::tag()
270 */
271 3077443 typename Tube<ItemT>::Link *Dispatch(ItemT *item) {
272
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3077443 times.
3077443 assert(is_active_);
273
2/2
✓ Branch 1 taken 2384176 times.
✓ Branch 2 taken 689367 times.
3077443 unsigned tube_idx = (tubes_.size() == 1)
274 2384176 ? 0 : (item->tag() % tubes_.size());
275 3060420 return tubes_[tube_idx]->EnqueueBack(item);
276 }
277
278 /**
279 * Like Tube::EnqueueBack(), use tubes one after another
280 */
281 250083 typename Tube<ItemT>::Link *DispatchAny(ItemT *item) {
282
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 250083 times.
250083 assert(is_active_);
283
2/2
✓ Branch 1 taken 250081 times.
✓ Branch 2 taken 2 times.
250083 unsigned tube_idx = (tubes_.size() == 1)
284 250081 ? 0 : (atomic_xadd32(&round_robin_, 1) % tubes_.size());
285 250083 return tubes_[tube_idx]->EnqueueBack(item);
286 }
287
288 private:
289 bool is_active_;
290 std::vector<Tube<ItemT> *> tubes_;
291 atomic_int32 round_robin_;
292 };
293
294 #endif // CVMFS_INGESTION_TUBE_H_
295