| Directory: | cvmfs/ |
|---|---|
| File: | cvmfs/util/tube.h |
| Date: | 2026-05-10 02:36:07 |
| | Exec | Total | Coverage |
|---|---|---|---|
| Lines: | 130 | 130 | 100.0% |
| Branches: | 50 | 74 | 67.6% |
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /** | ||
| 2 | * This file is part of the CernVM File System. | ||
| 3 | */ | ||
| 4 | |||
| 5 | #ifndef CVMFS_UTIL_TUBE_H_ | ||
| 6 | #define CVMFS_UTIL_TUBE_H_ | ||
| 7 | |||
| 8 | #include <pthread.h> | ||
| 9 | #include <stdint.h> | ||
| 10 | |||
| 11 | #include <cassert> | ||
| 12 | #include <cstddef> | ||
| 13 | #include <vector> | ||
| 14 | |||
| 15 | #include "util/atomic.h" | ||
| 16 | #include "util/mutex.h" | ||
| 17 | #include "util/single_copy.h" | ||
| 18 | |||
| 19 | /** | ||
| 20 | * A thread-safe, doubly linked list of links containing pointers to ItemT. The | ||
| 21 | * ItemT elements are not owned by the Tube. FIFO or LIFO semantics. Using | ||
| 22 | * Slice(), items at arbitrary locations in the tube can be removed, too. | ||
| 23 | * | ||
| 24 | * | ||
| 25 | * The layout of the linked list is as follows: | ||
| 26 | * | ||
| 27 | * -------------------------------------------------------------- | ||
| 28 | * | | | ||
| 29 | * --> In (back) <--> ... <--> I2 <--> I1 (front) <--> HEAD <-- | ||
| 30 | * | ||
| 31 | * The tube links the steps in the file processing pipeline. It connects | ||
| 32 | * multiple producers to multiple consumers and can throttle the producers if a | ||
| 33 | * limit for the tube size is set. | ||
| 34 | * | ||
| 35 | * Internally, uses condition variables to block when threads try to pop from | ||
| 36 | * the empty tube or insert into the full tube. | ||
| 37 | */ | ||
| 38 | template<class ItemT> | ||
| 39 | class Tube : SingleCopy { | ||
| 40 | public: | ||
| 41 | class Link : SingleCopy { | ||
| 42 | friend class Tube<ItemT>; | ||
| 43 | |||
| 44 | public: | ||
| 45 | 294941685 | explicit Link(ItemT *item) : item_(item), next_(NULL), prev_(NULL) { } | |
| 46 | 43 | ItemT *item() { return item_; } | |
| 47 | |||
| 48 | private: | ||
| 49 | ItemT *item_; | ||
| 50 | Link *next_; | ||
| 51 | Link *prev_; | ||
| 52 | }; | ||
| 53 | |||
| 54 | 206291 | Tube() : limit_(uint64_t(-1)), size_(0) { Init(); } | |
| 55 | 2837 | explicit Tube(uint64_t limit) : limit_(limit), size_(0) { Init(); } | |
| 56 | 214023 | ~Tube() { | |
| 57 | 214397 | Link *cursor = head_; | |
| 58 | do { | ||
| 59 | 214440 | Link *prev = cursor->prev_; | |
| 60 |
1/2✓ Branch 0 taken 109141 times.
✗ Branch 1 not taken.
|
214440 | delete cursor; |
| 61 | 214440 | cursor = prev; | |
| 62 |
2/2✓ Branch 0 taken 43 times.
✓ Branch 1 taken 109098 times.
|
214440 | } while (cursor != head_); |
| 63 | 214397 | pthread_cond_destroy(&cond_populated_); | |
| 64 | 214397 | pthread_cond_destroy(&cond_capacious_); | |
| 65 | 214397 | pthread_cond_destroy(&cond_empty_); | |
| 66 | 214397 | pthread_mutex_destroy(&lock_); | |
| 67 | 214397 | } | |
| 68 | |||
| 69 | /** | ||
| 70 | * Push an item to the back of the queue. Block if queue is currently full. | ||
| 71 | */ | ||
| 72 | 279648960 | Link *EnqueueBack(ItemT *item) { | |
| 73 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 152021580 times.
|
279648960 | assert(item != NULL); |
| 74 | 279648960 | MutexLockGuard lock_guard(&lock_); | |
| 75 |
2/2✓ Branch 0 taken 6640120 times.
✓ Branch 1 taken 151111543 times.
|
291095902 | while (size_ == limit_) |
| 76 |
1/2✓ Branch 1 taken 6604400 times.
✗ Branch 2 not taken.
|
13280240 | pthread_cond_wait(&cond_capacious_, &lock_); |
| 77 | |||
| 78 |
1/2✓ Branch 1 taken 151310244 times.
✗ Branch 2 not taken.
|
277815662 | Link *link = new Link(item); |
| 79 | 276244044 | link->next_ = head_->next_; | |
| 80 | 276244044 | link->prev_ = head_; | |
| 81 | 276244044 | head_->next_->prev_ = link; | |
| 82 | 276244044 | head_->next_ = link; | |
| 83 | 276244044 | size_++; | |
| 84 | 276244044 | int retval = pthread_cond_signal(&cond_populated_); | |
| 85 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 152549690 times.
|
280691956 | assert(retval == 0); |
| 86 | 279505982 | return link; | |
| 87 | 280691956 | } | |
| 88 | |||
| 89 | /** | ||
| 90 | * Push an item to the front of the queue. Block if queue is currently full. | ||
| 91 | */ | ||
| 92 | 17296599 | Link *EnqueueFront(ItemT *item) { | |
| 93 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 17293311 times.
|
17296599 | assert(item != NULL); |
| 94 | 17296599 | MutexLockGuard lock_guard(&lock_); | |
| 95 |
2/2✓ Branch 0 taken 141 times.
✓ Branch 1 taken 17293225 times.
|
17296795 | while (size_ == limit_) |
| 96 |
1/2✓ Branch 1 taken 141 times.
✗ Branch 2 not taken.
|
282 | pthread_cond_wait(&cond_capacious_, &lock_); |
| 97 | |||
| 98 |
1/2✓ Branch 1 taken 17293268 times.
✗ Branch 2 not taken.
|
17296513 | Link *link = new Link(item); |
| 99 | 17296255 | link->next_ = head_; | |
| 100 | 17296255 | link->prev_ = head_->prev_; | |
| 101 | 17296255 | head_->prev_->next_ = link; | |
| 102 | 17296255 | head_->prev_ = link; | |
| 103 | 17296255 | size_++; | |
| 104 | 17296255 | int retval = pthread_cond_signal(&cond_populated_); | |
| 105 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 17292838 times.
|
17296126 | assert(retval == 0); |
| 106 | 17296427 | return link; | |
| 107 | 17296126 | } | |
| 108 | |||
| 109 | /** | ||
| 110 | * Remove any link from the queue and return its item, including first/last | ||
| 111 | * element. | ||
| 112 | */ | ||
| 113 | 47 | ItemT *Slice(Link *link) { | |
| 114 | 47 | MutexLockGuard lock_guard(&lock_); | |
| 115 | 47 | return SliceUnlocked(link); | |
| 116 | 47 | } | |
| 117 | |||
| 118 | /** | ||
| 119 | * Remove and return the first element from the queue. Block if tube is | ||
| 120 | * empty. | ||
| 121 | */ | ||
| 122 | 294771535 | ItemT *PopFront() { | |
| 123 | 294771535 | MutexLockGuard lock_guard(&lock_); | |
| 124 |
2/2✓ Branch 0 taken 55074445 times.
✓ Branch 1 taken 167571860 times.
|
403496285 | while (size_ == 0) |
| 125 |
1/2✓ Branch 1 taken 55227194 times.
✗ Branch 2 not taken.
|
108589783 | pthread_cond_wait(&cond_populated_, &lock_); |
| 126 | 588344940 | return SliceUnlocked(head_->prev_); | |
| 127 | 293433417 | } | |
| 128 | |||
| 129 | /** | ||
| 130 | * Remove and return the first element from the queue if there is any. | ||
| 131 | * Equivalent to an atomic | ||
| 132 | * ItemT item = NULL; | ||
| 133 | * if (!IsEmpty()) | ||
| 134 | * item = PopFront(); | ||
| 135 | */ | ||
| 136 | 17279131 | ItemT *TryPopFront() { | |
| 137 | 17279131 | MutexLockGuard lock_guard(&lock_); | |
| 138 | // Note that we don't need to wait for a signal to arrive | ||
| 139 |
2/2✓ Branch 0 taken 15829923 times.
✓ Branch 1 taken 1460216 times.
|
17290139 | if (size_ == 0) |
| 140 | 15829923 | return NULL; | |
| 141 | 1460216 | return SliceUnlocked(head_->prev_); | |
| 142 | 17290139 | } | |
| 143 | |||
| 144 | /** | ||
| 145 | * Remove and return the last element from the queue. Block if tube is | ||
| 146 | * empty. | ||
| 147 | */ | ||
| 148 | 6628 | ItemT *PopBack() { | |
| 149 | 6628 | MutexLockGuard lock_guard(&lock_); | |
| 150 |
2/2✓ Branch 0 taken 846 times.
✓ Branch 1 taken 3340 times.
|
8320 | while (size_ == 0) |
| 151 |
1/2✓ Branch 1 taken 846 times.
✗ Branch 2 not taken.
|
1692 | pthread_cond_wait(&cond_populated_, &lock_); |
| 152 | 13256 | return SliceUnlocked(head_->next_); | |
| 153 | 6628 | } | |
| 154 | |||
| 155 | /** | ||
| 156 | * Blocks until the tube is empty | ||
| 157 | */ | ||
| 158 | 2084 | void Wait() { | |
| 159 | 2084 | MutexLockGuard lock_guard(&lock_); | |
| 160 |
2/2✓ Branch 0 taken 738 times.
✓ Branch 1 taken 2084 times.
|
2822 | while (size_ > 0) |
| 161 |
1/2✓ Branch 1 taken 738 times.
✗ Branch 2 not taken.
|
738 | pthread_cond_wait(&cond_empty_, &lock_); |
| 162 | 2084 | } | |
| 163 | |||
| 164 | 10058 | bool IsEmpty() { | |
| 165 | 10058 | MutexLockGuard lock_guard(&lock_); | |
| 166 | 20116 | return size_ == 0; | |
| 167 | 10058 | } | |
| 168 | |||
| 169 | 388 | uint64_t size() { | |
| 170 | 388 | MutexLockGuard lock_guard(&lock_); | |
| 171 | 776 | return size_; | |
| 172 | 388 | } | |
| 173 | |||
| 174 | private: | ||
| 175 | 210469 | void Init() { | |
| 176 | 210469 | Link *sentinel = new Link(NULL); | |
| 177 | 210469 | head_ = sentinel; | |
| 178 | 210469 | head_->next_ = head_->prev_ = sentinel; | |
| 179 | |||
| 180 | 210469 | int retval = pthread_mutex_init(&lock_, NULL); | |
| 181 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 109174 times.
|
210469 | assert(retval == 0); |
| 182 | 210469 | retval = pthread_cond_init(&cond_populated_, NULL); | |
| 183 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 109174 times.
|
210469 | assert(retval == 0); |
| 184 | 210469 | retval = pthread_cond_init(&cond_capacious_, NULL); | |
| 185 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 109174 times.
|
210469 | assert(retval == 0); |
| 186 | 210469 | retval = pthread_cond_init(&cond_empty_, NULL); | |
| 187 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 109174 times.
|
210469 | assert(retval == 0); |
| 188 | 210469 | } | |
| 189 | |||
| 190 | 295565029 | ItemT *SliceUnlocked(Link *link) { | |
| 191 | // Cannot delete the sentinel link | ||
| 192 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 168629637 times.
|
295565029 | assert(link != head_); |
| 193 | 295565029 | link->prev_->next_ = link->next_; | |
| 194 | 295565029 | link->next_->prev_ = link->prev_; | |
| 195 | 295565029 | ItemT *item = link->item_; | |
| 196 |
2/2✓ Branch 0 taken 168059298 times.
✓ Branch 1 taken 570339 times.
|
295565029 | delete link; |
| 197 | 298333173 | size_--; | |
| 198 | 298333173 | int retval = pthread_cond_signal(&cond_capacious_); | |
| 199 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 169626785 times.
|
297559325 | assert(retval == 0); |
| 200 |
2/2✓ Branch 0 taken 63101425 times.
✓ Branch 1 taken 106525360 times.
|
297559325 | if (size_ == 0) { |
| 201 | 122688346 | retval = pthread_cond_broadcast(&cond_empty_); | |
| 202 |
2/2✓ Branch 0 taken 660738 times.
✓ Branch 1 taken 62471042 times.
|
122749056 | assert(retval == 0); |
| 203 | } | ||
| 204 | 296298559 | return item; | |
| 205 | } | ||
| 206 | |||
| 207 | |||
| 208 | /** | ||
| 209 | * Adding new item blocks as long as limit_ == size_ | ||
| 210 | */ | ||
| 211 | uint64_t limit_; | ||
| 212 | /** | ||
| 213 | * The current number of links in the list | ||
| 214 | */ | ||
| 215 | uint64_t size_; | ||
| 216 | /** | ||
| 217 | * Sentinel element in front of the first (front) element | ||
| 218 | */ | ||
| 219 | Link *head_; | ||
| 220 | /** | ||
| 221 | * Protects all internal state | ||
| 222 | */ | ||
| 223 | pthread_mutex_t lock_; | ||
| 224 | /** | ||
| 225 | * Signals if there are items enqueued | ||
| 226 | */ | ||
| 227 | pthread_cond_t cond_populated_; | ||
| 228 | /** | ||
| 229 | * Signals if there is space to enqueue more items | ||
| 230 | */ | ||
| 231 | pthread_cond_t cond_capacious_; | ||
| 232 | /** | ||
| 233 | * Signals if the queue runs empty | ||
| 234 | */ | ||
| 235 | pthread_cond_t cond_empty_; | ||
| 236 | }; | ||
| 237 | |||
| 238 | |||
| 239 | /** | ||
| 240 | * A tube group manages a fixed set of Tubes and dispatches items among them in | ||
| 241 | * such a way that items with the same tag (a positive integer) are all sent | ||
| 242 | * to the same tube. | ||
| 243 | */ | ||
| 244 | template<class ItemT> | ||
| 245 | class TubeGroup : SingleCopy { | ||
| 246 | public: | ||
| 247 | 16857 | TubeGroup() : is_active_(false) { atomic_init32(&round_robin_); } | |
| 248 | |||
| 249 | 16846 | ~TubeGroup() { | |
| 250 |
2/2✓ Branch 1 taken 100942 times.
✓ Branch 2 taken 10114 times.
|
214148 | for (unsigned i = 0; i < tubes_.size(); ++i) |
| 251 |
1/2✓ Branch 1 taken 100942 times.
✗ Branch 2 not taken.
|
197302 | delete tubes_[i]; |
| 252 | 16846 | } | |
| 253 | |||
| 254 | 197447 | void TakeTube(Tube<ItemT> *t) { | |
| 255 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 101015 times.
|
197447 | assert(!is_active_); |
| 256 | 197447 | tubes_.push_back(t); | |
| 257 | 197447 | } | |
| 258 | |||
| 259 | 16857 | void Activate() { | |
| 260 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 10120 times.
|
16857 | assert(!is_active_); |
| 261 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 10120 times.
|
16857 | assert(!tubes_.empty()); |
| 262 | 16857 | is_active_ = true; | |
| 263 | 16857 | } | |
| 264 | |||
| 265 | /** | ||
| 266 | * Like Tube::EnqueueBack(), but pick a tube according to ItemT::tag() | ||
| 267 | */ | ||
| 268 | 114679541 | typename Tube<ItemT>::Link *Dispatch(ItemT *item) { | |
| 269 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 114679541 times.
|
114679541 | assert(is_active_); |
| 270 |
2/2✓ Branch 1 taken 90191693 times.
✓ Branch 2 taken 24386122 times.
|
114679541 | unsigned tube_idx = (tubes_.size() == 1) ? 0 |
| 271 | 90191693 | : (item->tag() % tubes_.size()); | |
| 272 | 113812989 | return tubes_[tube_idx]->EnqueueBack(item); | |
| 273 | } | ||
| 274 | |||
| 275 | /** | ||
| 276 | * Like Tube::EnqueueBack(), but use the tubes one after another (round robin) | ||
| 277 | */ | ||
| 278 | 9503243 | typename Tube<ItemT>::Link *DispatchAny(ItemT *item) { | |
| 279 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 9503243 times.
|
9503243 | assert(is_active_); |
| 280 |
2/2✓ Branch 1 taken 9503235 times.
✓ Branch 2 taken 8 times.
|
9503243 | unsigned tube_idx = (tubes_.size() == 1) |
| 281 | ? 0 | ||
| 282 | 9503235 | : (atomic_xadd32(&round_robin_, 1) % tubes_.size()); | |
| 283 | 9503243 | return tubes_[tube_idx]->EnqueueBack(item); | |
| 284 | } | ||
| 285 | |||
| 286 | private: | ||
| 287 | bool is_active_; | ||
| 288 | std::vector<Tube<ItemT> *> tubes_; | ||
| 289 | atomic_int32 round_robin_; | ||
| 290 | }; | ||
| 291 | |||
| 292 | #endif // CVMFS_UTIL_TUBE_H_ | ||
| 293 |