GCC Code Coverage Report | |||||||||||||||||||||
|
|||||||||||||||||||||
Line | Branch | Exec | Source |
1 |
/** |
||
2 |
* This file is part of the CernVM File System. |
||
3 |
*/ |
||
4 |
|||
5 |
#ifndef CVMFS_INGESTION_TASK_H_ |
||
6 |
#define CVMFS_INGESTION_TASK_H_ |
||
7 |
|||
8 |
#include <pthread.h> |
||
9 |
|||
10 |
#include <cassert> |
||
11 |
#include <vector> |
||
12 |
|||
13 |
#include "ingestion/tube.h" |
||
14 |
#include "util/single_copy.h" |
||
15 |
|||
16 |
/** |
||
17 |
* Forward declaration of TubeConsumerGroup so that it can be used as a friend |
||
18 |
* class to TubeConsumer. |
||
19 |
*/ |
||
20 |
template<typename ItemT> |
||
21 |
class TubeConsumerGroup; |
||
22 |
|||
23 |
|||
24 |
/** |
||
25 |
* Base class for threads that processes items from a tube one by one. Concrete |
||
26 |
* implementations overwrite the Process() method. |
||
27 |
*/ |
||
28 |
template <class ItemT> |
||
29 |
class TubeConsumer : SingleCopy { |
||
30 |
friend class TubeConsumerGroup<ItemT>; |
||
31 |
|||
32 |
public: |
||
33 |
✗✓✗✓ ✗✓ |
2290 |
virtual ~TubeConsumer() { } |
34 |
|||
35 |
protected: |
||
36 |
2290 |
explicit TubeConsumer(Tube<ItemT> *tube) : tube_(tube) { } |
|
37 |
virtual void Process(ItemT *item) = 0; |
||
38 |
1842 |
virtual void OnTerminate() { } |
|
39 |
|||
40 |
Tube<ItemT> *tube_; |
||
41 |
|||
42 |
private: |
||
43 |
1906 |
static void *MainConsumer(void *data) { |
|
44 |
TubeConsumer<ItemT> *consumer = |
||
45 |
1906 |
reinterpret_cast<TubeConsumer<ItemT> *>(data); |
|
46 |
|||
47 |
2297721 |
while (true) { |
|
48 |
2299627 |
ItemT *item = consumer->tube_->Pop(); |
|
49 |
✓✓✓✓ ✓✓ |
2299573 |
if (item->IsQuitBeacon()) { |
50 |
✓✗✗✓ ✗ |
1906 |
delete item; |
51 |
1906 |
break; |
|
52 |
} |
||
53 |
2297617 |
consumer->Process(item); |
|
54 |
} |
||
55 |
1906 |
consumer->OnTerminate(); |
|
56 |
1906 |
return NULL; |
|
57 |
} |
||
58 |
}; |
||
59 |
|||
60 |
|||
61 |
template <class ItemT> |
||
62 |
class TubeConsumerGroup : SingleCopy { |
||
63 |
public: |
||
64 |
781 |
TubeConsumerGroup() : is_active_(false) { } |
|
65 |
|||
66 |
781 |
~TubeConsumerGroup() { |
|
67 |
✓✓✗✗ ✓✓✓✓ |
3071 |
for (unsigned i = 0; i < consumers_.size(); ++i) |
68 |
✓✗✗✗ ✗✗✓✗ ✓✗ |
2290 |
delete consumers_[i]; |
69 |
781 |
} |
|
70 |
|||
71 |
2290 |
void TakeConsumer(TubeConsumer<ItemT> *consumer) { |
|
72 |
✗✓✗✓ ✗✓ |
2290 |
assert(!is_active_); |
73 |
2290 |
consumers_.push_back(consumer); |
|
74 |
2290 |
} |
|
75 |
|||
76 |
769 |
void Spawn() { |
|
77 |
✗✓✗✓ ✗✓ |
769 |
assert(!is_active_); |
78 |
769 |
unsigned N = consumers_.size(); |
|
79 |
769 |
threads_.resize(N); |
|
80 |
✓✓✓✓ ✓✓ |
2675 |
for (unsigned i = 0; i < N; ++i) { |
81 |
int retval = pthread_create( |
||
82 |
1906 |
&threads_[i], NULL, TubeConsumer<ItemT>::MainConsumer, consumers_[i]); |
|
83 |
✗✓✗✓ ✗✓ |
1906 |
assert(retval == 0); |
84 |
} |
||
85 |
769 |
is_active_ = true; |
|
86 |
769 |
} |
|
87 |
|||
88 |
769 |
void Terminate() { |
|
89 |
✗✓✗✓ ✗✓ |
769 |
assert(is_active_); |
90 |
769 |
unsigned N = consumers_.size(); |
|
91 |
✓✓✓✓ ✓✓ |
2675 |
for (unsigned i = 0; i < N; ++i) { |
92 |
1906 |
consumers_[i]->tube_->Enqueue(ItemT::CreateQuitBeacon()); |
|
93 |
} |
||
94 |
✓✓✓✓ ✓✓ |
2675 |
for (unsigned i = 0; i < N; ++i) { |
95 |
1906 |
int retval = pthread_join(threads_[i], NULL); |
|
96 |
✗✓✗✓ ✗✓ |
1906 |
assert(retval == 0); |
97 |
} |
||
98 |
769 |
is_active_ = false; |
|
99 |
769 |
} |
|
100 |
|||
101 |
231 |
bool is_active() { return is_active_; } |
|
102 |
|||
103 |
private: |
||
104 |
bool is_active_; |
||
105 |
std::vector<TubeConsumer<ItemT> *> consumers_; |
||
106 |
std::vector<pthread_t> threads_; |
||
107 |
}; |
||
108 |
|||
109 |
#endif // CVMFS_INGESTION_TASK_H_ |
Generated by: GCOVR (Version 4.1) |