| Directory: | cvmfs/ |
|---|---|
| File: | cvmfs/ingestion/task.h |
| Date: | 2026-01-25 02:35:50 |
| Exec | Total | Coverage | |
|---|---|---|---|
| Lines: | 43 | 44 | 97.7% |
| Branches: | 17 | 24 | 70.8% |
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /** | ||
| 2 | * This file is part of the CernVM File System. | ||
| 3 | */ | ||
| 4 | |||
| 5 | #ifndef CVMFS_INGESTION_TASK_H_ | ||
| 6 | #define CVMFS_INGESTION_TASK_H_ | ||
| 7 | |||
| 8 | #include <errno.h> | ||
| 9 | #include <pthread.h> | ||
| 10 | #include <unistd.h> | ||
| 11 | |||
| 12 | #include <cassert> | ||
| 13 | #include <vector> | ||
| 14 | |||
| 15 | #include "util/exception.h" | ||
| 16 | #include "util/single_copy.h" | ||
| 17 | #include "util/tube.h" | ||
| 18 | |||
| 19 | /** | ||
| 20 | * Forward declaration of TubeConsumerGroup so that it can be used as a friend | ||
| 21 | * class to TubeConsumer. | ||
| 22 | */ | ||
| 23 | template<typename ItemT> | ||
| 24 | class TubeConsumerGroup; | ||
| 25 | |||
| 26 | |||
| 27 | /** | ||
| 28 | * Base class for threads that processes items from a tube one by one. Concrete | ||
| 29 | * implementations overwrite the Process() method. | ||
| 30 | */ | ||
| 31 | template<class ItemT> | ||
| 32 | class TubeConsumer : SingleCopy { | ||
| 33 | friend class TubeConsumerGroup<ItemT>; | ||
| 34 | |||
| 35 | public: | ||
| 36 | 548328 | virtual ~TubeConsumer() { } | |
| 37 | |||
| 38 | protected: | ||
| 39 | 545023 | explicit TubeConsumer(Tube<ItemT> *tube) : tube_(tube) { } | |
| 40 | virtual void Process(ItemT *item) = 0; | ||
| 41 | 496914 | virtual void OnTerminate() { } | |
| 42 | |||
| 43 | Tube<ItemT> *tube_; | ||
| 44 | |||
| 45 | private: | ||
| 46 | 507391 | static void *MainConsumer(void *data) { | |
| 47 | 507391 | TubeConsumer<ItemT> *consumer = reinterpret_cast<TubeConsumer<ItemT> *>( | |
| 48 | data); | ||
| 49 | |||
| 50 | 224253333 | while (true) { | |
| 51 | 224760724 | ItemT *item = consumer->tube_->PopFront(); | |
| 52 |
2/2✓ Branch 1 taken 253560 times.
✓ Branch 2 taken 122734017 times.
|
223158781 | if (item->IsQuitBeacon()) { |
| 53 |
1/2✓ Branch 0 taken 253643 times.
✗ Branch 1 not taken.
|
503542 | delete item; |
| 54 | 502994 | break; | |
| 55 | } | ||
| 56 | 222195549 | consumer->Process(item); | |
| 57 | } | ||
| 58 | 502994 | consumer->OnTerminate(); | |
| 59 | 501428 | return NULL; | |
| 60 | } | ||
| 61 | }; | ||
| 62 | |||
| 63 | |||
| 64 | template<class ItemT> | ||
| 65 | class TubeConsumerGroup : SingleCopy { | ||
| 66 | public: | ||
| 67 | 27563 | TubeConsumerGroup() : is_active_(false) { } | |
| 68 | |||
| 69 | 27550 | ~TubeConsumerGroup() { | |
| 70 |
2/2✓ Branch 1 taken 274164 times.
✓ Branch 2 taken 15438 times.
|
572300 | for (unsigned i = 0; i < consumers_.size(); ++i) |
| 71 |
1/2✓ Branch 1 taken 274164 times.
✗ Branch 2 not taken.
|
544750 | delete consumers_[i]; |
| 72 | 27550 | } | |
| 73 | |||
| 74 | 545023 | void TakeConsumer(TubeConsumer<ItemT> *consumer) { | |
| 75 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 274301 times.
|
545023 | assert(!is_active_); |
| 76 | 545023 | consumers_.push_back(consumer); | |
| 77 | 545023 | } | |
| 78 | |||
| 79 | 26387 | void Spawn() { | |
| 80 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 14857 times.
|
26387 | assert(!is_active_); |
| 81 | 26387 | unsigned N = consumers_.size(); | |
| 82 | 26387 | threads_.resize(N); | |
| 83 |
2/2✓ Branch 0 taken 255485 times.
✓ Branch 1 taken 14857 times.
|
533778 | for (unsigned i = 0; i < N; ++i) { |
| 84 | 507391 | int retval = pthread_create( | |
| 85 | 507391 | &threads_[i], NULL, TubeConsumer<ItemT>::MainConsumer, consumers_[i]); | |
| 86 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 255485 times.
|
507391 | if (retval != 0) { |
| 87 | ✗ | PANIC(kLogStderr, "failed to create new thread (error: %d, pid: %d)", | |
| 88 | errno, getpid()); | ||
| 89 | } | ||
| 90 | } | ||
| 91 | 26387 | is_active_ = true; | |
| 92 | 26387 | } | |
| 93 | |||
| 94 | 26374 | void Terminate() { | |
| 95 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 14850 times.
|
26374 | assert(is_active_); |
| 96 | 26374 | unsigned N = consumers_.size(); | |
| 97 |
2/2✓ Branch 0 taken 255348 times.
✓ Branch 1 taken 14850 times.
|
533492 | for (unsigned i = 0; i < N; ++i) { |
| 98 | 507118 | consumers_[i]->tube_->EnqueueBack(ItemT::CreateQuitBeacon()); | |
| 99 | } | ||
| 100 |
2/2✓ Branch 0 taken 255348 times.
✓ Branch 1 taken 14850 times.
|
533492 | for (unsigned i = 0; i < N; ++i) { |
| 101 | 507118 | int retval = pthread_join(threads_[i], NULL); | |
| 102 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 255348 times.
|
507118 | assert(retval == 0); |
| 103 | } | ||
| 104 | 26374 | is_active_ = false; | |
| 105 | 26374 | } | |
| 106 | |||
| 107 | 3326 | bool is_active() { return is_active_; } | |
| 108 | |||
| 109 | private: | ||
| 110 | bool is_active_; | ||
| 111 | std::vector<TubeConsumer<ItemT> *> consumers_; | ||
| 112 | std::vector<pthread_t> threads_; | ||
| 113 | }; | ||
| 114 | |||
| 115 | #endif // CVMFS_INGESTION_TASK_H_ | ||
| 116 |