GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/util/tube.h
Date: 2024-04-28 02:33:07
Exec Total Coverage
Lines: 134 134 100.0%
Branches: 50 74 67.6%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_UTIL_TUBE_H_
6 #define CVMFS_UTIL_TUBE_H_
7
8 #include <pthread.h>
9 #include <stdint.h>
10
11 #include <cassert>
12 #include <vector>
13
14 #include "util/atomic.h"
15 #include "util/concurrency.h"
16 #include "util/pointer.h"
17 #include "util/single_copy.h"
18
19 /**
20 * A thread-safe, doubly linked list of links containing pointers to ItemT. The
21 * ItemT elements are not owned by the Tube. FIFO or LIFO semantics. Using
22 * Slice(), items at arbitrary locations in the tube can be removed, too.
23 *
24 *
25 * The layout of the linked list is as follows:
26 *
27 * --------------------------------------------------------------
28 * | |
29 * --> I$n$ (back) <--> I2 <--> ... <--> I1 (front) <--> HEAD <--
30 *
31 * The tube links the steps in the file processing pipeline. It connects
32 * multiple producers to multiple consumers and can throttle the producers if a
33 * limit for the tube size is set.
34 *
35 * Internally, uses conditional variables to block when threads try to pop from
36 * the empty tube or insert into the full tube.
37 */
38 template <class ItemT>
39 class Tube : SingleCopy {
40 public:
41 class Link : SingleCopy {
42 friend class Tube<ItemT>;
43 public:
44 7947873 explicit Link(ItemT *item) : item_(item), next_(NULL), prev_(NULL) { }
45 1 ItemT *item() { return item_; }
46
47 private:
48 ItemT *item_;
49 Link *next_;
50 Link *prev_;
51 };
52
53 6759 Tube() : limit_(uint64_t(-1)), size_(0) { Init(); }
54 76 explicit Tube(uint64_t limit) : limit_(limit), size_(0) {
55 76 Init();
56 76 }
57 6999 ~Tube() {
58 7007 Link *cursor = head_;
59 do {
60 7008 Link *prev = cursor->prev_;
61
1/2
✓ Branch 0 taken 3571 times.
✗ Branch 1 not taken.
7008 delete cursor;
62 7008 cursor = prev;
63
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 3570 times.
7008 } while (cursor != head_);
64 7007 pthread_cond_destroy(&cond_populated_);
65 7007 pthread_cond_destroy(&cond_capacious_);
66 7007 pthread_cond_destroy(&cond_empty_);
67 7007 pthread_mutex_destroy(&lock_);
68 7007 }
69
70 /**
71 * Push an item to the back of the queue. Block if queue is currently full.
72 */
73 7585528 Link *EnqueueBack(ItemT *item) {
74
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4128343 times.
7585528 assert(item != NULL);
75 7585528 MutexLockGuard lock_guard(&lock_);
76
2/2
✓ Branch 0 taken 219319 times.
✓ Branch 1 taken 4116733 times.
8000726 while (size_ == limit_)
77
1/2
✓ Branch 1 taken 219412 times.
✗ Branch 2 not taken.
438638 pthread_cond_wait(&cond_capacious_, &lock_);
78
79
1/2
✓ Branch 1 taken 4120082 times.
✗ Branch 2 not taken.
7562088 Link *link = new Link(item);
80 7497166 link->next_ = head_->next_;
81 7497166 link->prev_ = head_;
82 7497166 head_->next_->prev_ = link;
83 7497166 head_->next_ = link;
84 7497166 size_++;
85 7497166 int retval = pthread_cond_signal(&cond_populated_);
86
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4135763 times.
7600148 assert(retval == 0);
87 7584028 return link;
88 7600148 }
89
90 /**
91 * Push an item to the front of the queue. Block if queue currently full.
92 */
93 402593 Link *EnqueueFront(ItemT *item) {
94
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 402523 times.
402593 assert(item != NULL);
95 402593 MutexLockGuard lock_guard(&lock_);
96
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 402521 times.
402597 while (size_ == limit_)
97
1/2
✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
6 pthread_cond_wait(&cond_capacious_, &lock_);
98
99
1/2
✓ Branch 1 taken 402520 times.
✗ Branch 2 not taken.
402591 Link *link = new Link(item);
100 402590 link->next_ = head_;
101 402590 link->prev_ = head_->prev_;
102 402590 head_->prev_->next_ = link;
103 402590 head_->prev_ = link;
104 402590 size_++;
105 402590 int retval = pthread_cond_signal(&cond_populated_);
106
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 402519 times.
402589 assert(retval == 0);
107 402593 return link;
108 402589 }
109
110 /**
111 * Remove any link from the queue and return its item, including first/last
112 * element.
113 */
114 2 ItemT *Slice(Link *link) {
115 2 MutexLockGuard lock_guard(&lock_);
116 2 return SliceUnlocked(link);
117 2 }
118
119 /**
120 * Remove and return the first element from the queue. Block if tube is
121 * empty.
122 */
123 7929794 ItemT *PopFront() {
124 7929794 MutexLockGuard lock_guard(&lock_);
125
2/2
✓ Branch 0 taken 1545673 times.
✓ Branch 1 taken 4501869 times.
10990328 while (size_ == 0)
126
1/2
✓ Branch 1 taken 1567627 times.
✗ Branch 2 not taken.
3026421 pthread_cond_wait(&cond_populated_, &lock_);
127 15875524 return SliceUnlocked(head_->prev_);
128 7912466 }
129
130 /**
131 * Remove and return the first element from the queue if there is any.
132 * Equivalent to an antomic
133 * ItemT item = NULL;
134 * if (!IsEmpty())
135 * item = PopFront();
136 */
137 402151 ItemT *TryPopFront() {
138 402151 MutexLockGuard lock_guard(&lock_);
139 // Note that we don't need to wait for a signal to arrive
140
2/2
✓ Branch 0 taken 368457 times.
✓ Branch 1 taken 33983 times.
402440 if (size_ == 0)
141 368457 return NULL;
142 33983 return SliceUnlocked(head_->prev_);
143 402440 }
144
145 /**
146 * Remove and return the last element from the queue. Block if tube is
147 * empty.
148 */
149 153 ItemT *PopBack() {
150 153 MutexLockGuard lock_guard(&lock_);
151
2/2
✓ Branch 0 taken 11 times.
✓ Branch 1 taken 83 times.
175 while (size_ == 0)
152
1/2
✓ Branch 1 taken 11 times.
✗ Branch 2 not taken.
22 pthread_cond_wait(&cond_populated_, &lock_);
153 306 return SliceUnlocked(head_->next_);
154 153 }
155
156 /**
157 * Blocks until the tube is empty
158 */
159 66 void Wait() {
160 66 MutexLockGuard lock_guard(&lock_);
161
2/2
✓ Branch 0 taken 25 times.
✓ Branch 1 taken 66 times.
91 while (size_ > 0)
162
1/2
✓ Branch 1 taken 25 times.
✗ Branch 2 not taken.
25 pthread_cond_wait(&cond_empty_, &lock_);
163 66 }
164
165 421 bool IsEmpty() {
166 421 MutexLockGuard lock_guard(&lock_);
167 842 return size_ == 0;
168 421 }
169
170 19 uint64_t size() {
171 19 MutexLockGuard lock_guard(&lock_);
172 38 return size_;
173 19 }
174
175 private:
176 6879 void Init() {
177 6879 Link *sentinel = new Link(NULL);
178 6879 head_ = sentinel;
179 6879 head_->next_ = head_->prev_ = sentinel;
180
181 6879 int retval = pthread_mutex_init(&lock_, NULL);
182
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3570 times.
6879 assert(retval == 0);
183 6879 retval = pthread_cond_init(&cond_populated_, NULL);
184
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3570 times.
6879 assert(retval == 0);
185 6879 retval = pthread_cond_init(&cond_capacious_, NULL);
186
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3570 times.
6879 assert(retval == 0);
187 6879 retval = pthread_cond_init(&cond_empty_, NULL);
188
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3570 times.
6879 assert(retval == 0);
189 6879 }
190
191 7959437 ItemT *SliceUnlocked(Link *link) {
192 // Cannot delete the sentinel link
193
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4516598 times.
7959437 assert(link != head_);
194 7959437 link->prev_->next_ = link->next_;
195 7959437 link->next_->prev_ = link->prev_;
196 7959437 ItemT *item = link->item_;
197
2/2
✓ Branch 0 taken 4504377 times.
✓ Branch 1 taken 12221 times.
7959437 delete link;
198 8001815 size_--;
199 8001815 int retval = pthread_cond_signal(&cond_capacious_);
200
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4534382 times.
7995004 assert(retval == 0);
201
2/2
✓ Branch 0 taken 1765071 times.
✓ Branch 1 taken 2769311 times.
7995004 if (size_ == 0) {
202 3414396 retval = pthread_cond_broadcast(&cond_empty_);
203
2/2
✓ Branch 0 taken 7898 times.
✓ Branch 1 taken 1760061 times.
3420172 assert(retval == 0);
204 }
205 7984984 return item;
206 }
207
208
209 /**
210 * Adding new item blocks as long as limit_ == size_
211 */
212 uint64_t limit_;
213 /**
214 * The current number of links in the list
215 */
216 uint64_t size_;
217 /**
218 * Sentinel element in front of the first (front) element
219 */
220 Link *head_;
221 /**
222 * Protects all internal state
223 */
224 pthread_mutex_t lock_;
225 /**
226 * Signals if there are items enqueued
227 */
228 pthread_cond_t cond_populated_;
229 /**
230 * Signals if there is space to enqueue more items
231 */
232 pthread_cond_t cond_capacious_;
233 /**
234 * Signals if the queue runs empty
235 */
236 pthread_cond_t cond_empty_;
237 };
238
239
240 /**
241 * A tube group manages a fixed set of Tubes and dispatches items among them in
242 * such a way that items with the same tag (a positive integer) are all sent
243 * to the same tube.
244 */
245 template <class ItemT>
246 class TubeGroup : SingleCopy {
247 public:
248 570 TubeGroup() : is_active_(false) {
249 570 atomic_init32(&round_robin_);
250 570 }
251
252 570 ~TubeGroup() {
253
2/2
✓ Branch 1 taken 3277 times.
✓ Branch 2 taken 342 times.
6985 for (unsigned i = 0; i < tubes_.size(); ++i)
254
1/2
✓ Branch 1 taken 3277 times.
✗ Branch 2 not taken.
6415 delete tubes_[i];
255 570 }
256
257 6415 void TakeTube(Tube<ItemT> *t) {
258
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3277 times.
6415 assert(!is_active_);
259 6415 tubes_.push_back(t);
260 6415 }
261
262 570 void Activate() {
263
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 342 times.
570 assert(!is_active_);
264
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 342 times.
570 assert(!tubes_.empty());
265 570 is_active_ = true;
266 570 }
267
268 /**
269 * Like Tube::EnqueueBack(), but pick a tube according to ItemT::tag()
270 */
271 3079648 typename Tube<ItemT>::Link *Dispatch(ItemT *item) {
272
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3079648 times.
3079648 assert(is_active_);
273
2/2
✓ Branch 1 taken 2382870 times.
✓ Branch 2 taken 689258 times.
3079648 unsigned tube_idx = (tubes_.size() == 1)
274 2382870 ? 0 : (item->tag() % tubes_.size());
275 3052483 return tubes_[tube_idx]->EnqueueBack(item);
276 }
277
278 /**
279 * Like Tube::EnqueueBack(), use tubes one after another
280 */
281 250083 typename Tube<ItemT>::Link *DispatchAny(ItemT *item) {
282
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 250083 times.
250083 assert(is_active_);
283
2/2
✓ Branch 1 taken 250081 times.
✓ Branch 2 taken 2 times.
250083 unsigned tube_idx = (tubes_.size() == 1)
284 250081 ? 0 : (atomic_xadd32(&round_robin_, 1) % tubes_.size());
285 250083 return tubes_[tube_idx]->EnqueueBack(item);
286 }
287
288 private:
289 bool is_active_;
290 std::vector<Tube<ItemT> *> tubes_;
291 atomic_int32 round_robin_;
292 };
293
294 #endif // CVMFS_UTIL_TUBE_H_
295