| Directory: | cvmfs/ |
|---|---|
| File: | cvmfs/ingestion/task.h |
| Date: | 2025-10-26 02:35:25 |
| Exec | Total | Coverage | |
|---|---|---|---|
| Lines: | 43 | 44 | 97.7% |
| Branches: | 18 | 24 | 75.0% |
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /** | ||
| 2 | * This file is part of the CernVM File System. | ||
| 3 | */ | ||
| 4 | |||
| 5 | #ifndef CVMFS_INGESTION_TASK_H_ | ||
| 6 | #define CVMFS_INGESTION_TASK_H_ | ||
| 7 | |||
| 8 | #include <errno.h> | ||
| 9 | #include <pthread.h> | ||
| 10 | #include <unistd.h> | ||
| 11 | |||
| 12 | #include <cassert> | ||
| 13 | #include <vector> | ||
| 14 | |||
| 15 | #include "util/exception.h" | ||
| 16 | #include "util/single_copy.h" | ||
| 17 | #include "util/tube.h" | ||
| 18 | |||
/**
 * TubeConsumerGroup is declared ahead of its definition because TubeConsumer
 * names it as a friend class.
 */
template<class ItemT>
class TubeConsumerGroup;
| 25 | |||
| 26 | |||
| 27 | /** | ||
| 28 | * Base class for threads that processes items from a tube one by one. Concrete | ||
| 29 | * implementations overwrite the Process() method. | ||
| 30 | */ | ||
| 31 | template<class ItemT> | ||
| 32 | class TubeConsumer : SingleCopy { | ||
| 33 | friend class TubeConsumerGroup<ItemT>; | ||
| 34 | |||
| 35 | public: | ||
| 36 | 306238 | virtual ~TubeConsumer() { } | |
| 37 | |||
| 38 | protected: | ||
| 39 | 303192 | explicit TubeConsumer(Tube<ItemT> *tube) : tube_(tube) { } | |
| 40 | virtual void Process(ItemT *item) = 0; | ||
| 41 | 282555 | virtual void OnTerminate() { } | |
| 42 | |||
| 43 | Tube<ItemT> *tube_; | ||
| 44 | |||
| 45 | private: | ||
| 46 | 287832 | static void *MainConsumer(void *data) { | |
| 47 | 287832 | TubeConsumer<ItemT> *consumer = reinterpret_cast<TubeConsumer<ItemT> *>( | |
| 48 | data); | ||
| 49 | |||
| 50 | 117930706 | while (true) { | |
| 51 | 118218538 | ItemT *item = consumer->tube_->PopFront(); | |
| 52 |
2/2✓ Branch 1 taken 144302 times.
✓ Branch 2 taken 64305326 times.
|
117082427 | if (item->IsQuitBeacon()) { |
| 53 |
2/2✓ Branch 0 taken 143987 times.
✓ Branch 1 taken 315 times.
|
285285 | delete item; |
| 54 | 285419 | break; | |
| 55 | } | ||
| 56 | 116459850 | consumer->Process(item); | |
| 57 | } | ||
| 58 | 285419 | consumer->OnTerminate(); | |
| 59 | 284323 | return NULL; | |
| 60 | } | ||
| 61 | }; | ||
| 62 | |||
| 63 | |||
| 64 | template<class ItemT> | ||
| 65 | class TubeConsumerGroup : SingleCopy { | ||
| 66 | public: | ||
| 67 | 16236 | TubeConsumerGroup() : is_active_(false) { } | |
| 68 | |||
| 69 | 16223 | ~TubeConsumerGroup() { | |
| 70 |
2/2✓ Branch 1 taken 153119 times.
✓ Branch 2 taken 9543 times.
|
319142 | for (unsigned i = 0; i < consumers_.size(); ++i) |
| 71 |
1/2✓ Branch 1 taken 153119 times.
✗ Branch 2 not taken.
|
302919 | delete consumers_[i]; |
| 72 | 16223 | } | |
| 73 | |||
| 74 | 303192 | void TakeConsumer(TubeConsumer<ItemT> *consumer) { | |
| 75 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 153256 times.
|
303192 | assert(!is_active_); |
| 76 | 303192 | consumers_.push_back(consumer); | |
| 77 | 303192 | } | |
| 78 | |||
| 79 | 15756 | void Spawn() { | |
| 80 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 9310 times.
|
15756 | assert(!is_active_); |
| 81 | 15756 | unsigned N = consumers_.size(); | |
| 82 | 15756 | threads_.resize(N); | |
| 83 |
2/2✓ Branch 0 taken 145576 times.
✓ Branch 1 taken 9310 times.
|
303588 | for (unsigned i = 0; i < N; ++i) { |
| 84 | 287832 | int retval = pthread_create( | |
| 85 | 287832 | &threads_[i], NULL, TubeConsumer<ItemT>::MainConsumer, consumers_[i]); | |
| 86 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 145576 times.
|
287832 | if (retval != 0) { |
| 87 | ✗ | PANIC(kLogStderr, "failed to create new thread (error: %d, pid: %d)", | |
| 88 | errno, getpid()); | ||
| 89 | } | ||
| 90 | } | ||
| 91 | 15756 | is_active_ = true; | |
| 92 | 15756 | } | |
| 93 | |||
| 94 | 15743 | void Terminate() { | |
| 95 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 9303 times.
|
15743 | assert(is_active_); |
| 96 | 15743 | unsigned N = consumers_.size(); | |
| 97 |
2/2✓ Branch 0 taken 145439 times.
✓ Branch 1 taken 9303 times.
|
303302 | for (unsigned i = 0; i < N; ++i) { |
| 98 | 287559 | consumers_[i]->tube_->EnqueueBack(ItemT::CreateQuitBeacon()); | |
| 99 | } | ||
| 100 |
2/2✓ Branch 0 taken 145439 times.
✓ Branch 1 taken 9303 times.
|
303302 | for (unsigned i = 0; i < N; ++i) { |
| 101 | 287559 | int retval = pthread_join(threads_[i], NULL); | |
| 102 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 145439 times.
|
287559 | assert(retval == 0); |
| 103 | } | ||
| 104 | 15743 | is_active_ = false; | |
| 105 | 15743 | } | |
| 106 | |||
| 107 | 2863 | bool is_active() { return is_active_; } | |
| 108 | |||
| 109 | private: | ||
| 110 | bool is_active_; | ||
| 111 | std::vector<TubeConsumer<ItemT> *> consumers_; | ||
| 112 | std::vector<pthread_t> threads_; | ||
| 113 | }; | ||
| 114 | |||
| 115 | #endif // CVMFS_INGESTION_TASK_H_ | ||
| 116 |