GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/ingestion/task.h
Date: 2026-03-22 02:40:38
Exec Total Coverage
Lines: 43 44 97.7%
Branches: 18 24 75.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_INGESTION_TASK_H_
6 #define CVMFS_INGESTION_TASK_H_
7
8 #include <errno.h>
9 #include <pthread.h>
10 #include <unistd.h>
11
12 #include <cassert>
13 #include <vector>
14
15 #include "util/exception.h"
16 #include "util/single_copy.h"
17 #include "util/tube.h"
18
19 /**
20 * Forward declaration of TubeConsumerGroup so that it can be used as a friend
21 * class to TubeConsumer.
22 */
23 template<typename ItemT>
24 class TubeConsumerGroup;
25
26
27 /**
28 * Base class for threads that processes items from a tube one by one. Concrete
29 * implementations overwrite the Process() method.
30 */
31 template<class ItemT>
32 class TubeConsumer : SingleCopy {
33 friend class TubeConsumerGroup<ItemT>;
34
35 public:
36 318264 virtual ~TubeConsumer() { }
37
38 protected:
39 314835 explicit TubeConsumer(Tube<ItemT> *tube) : tube_(tube) { }
40 virtual void Process(ItemT *item) = 0;
41 277322 virtual void OnTerminate() { }
42
43 Tube<ItemT> *tube_;
44
45 private:
46 284883 static void *MainConsumer(void *data) {
47 284883 TubeConsumer<ItemT> *consumer = reinterpret_cast<TubeConsumer<ItemT> *>(
48 data);
49
50 13410241 while (true) {
51 13695124 ItemT *item = consumer->tube_->PopFront();
52
2/2
✓ Branch 1 taken 143403 times.
✓ Branch 2 taken 7638179 times.
13615683 if (item->IsQuitBeacon()) {
53
2/2
✓ Branch 0 taken 143233 times.
✓ Branch 1 taken 170 times.
283104 delete item;
54 282412 break;
55 }
56 13307925 consumer->Process(item);
57 }
58 282412 consumer->OnTerminate();
59 281666 return NULL;
60 }
61 };
62
63
64 template<class ItemT>
65 class TubeConsumerGroup : SingleCopy {
66 public:
67 17071 TubeConsumerGroup() : is_active_(false) { }
68
69 17058 ~TubeConsumerGroup() {
70
2/2
✓ Branch 1 taken 159132 times.
✓ Branch 2 taken 10062 times.
331620 for (unsigned i = 0; i < consumers_.size(); ++i)
71
1/2
✓ Branch 1 taken 159132 times.
✗ Branch 2 not taken.
314562 delete consumers_[i];
72 17058 }
73
74 314835 void TakeConsumer(TubeConsumer<ItemT> *consumer) {
75
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 159269 times.
314835 assert(!is_active_);
76 314835 consumers_.push_back(consumer);
77 314835 }
78
79 16135 void Spawn() {
80
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9601 times.
16135 assert(!is_active_);
81 16135 unsigned N = consumers_.size();
82 16135 threads_.resize(N);
83
2/2
✓ Branch 0 taken 144293 times.
✓ Branch 1 taken 9601 times.
301018 for (unsigned i = 0; i < N; ++i) {
84 284883 int retval = pthread_create(
85 284883 &threads_[i], NULL, TubeConsumer<ItemT>::MainConsumer, consumers_[i]);
86
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 144293 times.
284883 if (retval != 0) {
87 PANIC(kLogStderr, "failed to create new thread (error: %d, pid: %d)",
88 errno, getpid());
89 }
90 }
91 16135 is_active_ = true;
92 16135 }
93
94 16122 void Terminate() {
95
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9594 times.
16122 assert(is_active_);
96 16122 unsigned N = consumers_.size();
97
2/2
✓ Branch 0 taken 144156 times.
✓ Branch 1 taken 9594 times.
300732 for (unsigned i = 0; i < N; ++i) {
98 284610 consumers_[i]->tube_->EnqueueBack(ItemT::CreateQuitBeacon());
99 }
100
2/2
✓ Branch 0 taken 144156 times.
✓ Branch 1 taken 9594 times.
300732 for (unsigned i = 0; i < N; ++i) {
101 284610 int retval = pthread_join(threads_[i], NULL);
102
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 144156 times.
284610 assert(retval == 0);
103 }
104 16122 is_active_ = false;
105 16122 }
106
107 3066 bool is_active() { return is_active_; }
108
109 private:
110 bool is_active_;
111 std::vector<TubeConsumer<ItemT> *> consumers_;
112 std::vector<pthread_t> threads_;
113 };
114
115 #endif // CVMFS_INGESTION_TASK_H_
116