GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/ingestion/task.h Lines: 42 42 100.0 %
Date: 2019-02-03 02:48:13 Branches: 53 83 63.9 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#ifndef CVMFS_INGESTION_TASK_H_
6
#define CVMFS_INGESTION_TASK_H_
7
8
#include <pthread.h>
9
10
#include <cassert>
11
#include <vector>
12
13
#include "ingestion/tube.h"
14
#include "util/single_copy.h"
15
16
/**
17
 * Forward declaration of TubeConsumerGroup so that it can be used as a friend
18
 * class to TubeConsumer.
19
 */
20
template <class ItemT>
class TubeConsumerGroup;
22
23
24
/**
25
 * Base class for threads that process items from a tube one by one.  Concrete
26
 * implementations override the Process() method.
27
 */
28
template <class ItemT>
29
class TubeConsumer : SingleCopy {
30
  friend class TubeConsumerGroup<ItemT>;
31
32
 public:
33

2290
  virtual ~TubeConsumer() { }
34
35
 protected:
36
2290
  explicit TubeConsumer(Tube<ItemT> *tube) : tube_(tube) { }
37
  virtual void Process(ItemT *item) = 0;
38
1842
  virtual void OnTerminate() { }
39
40
  Tube<ItemT> *tube_;
41
42
 private:
43
1906
  static void *MainConsumer(void *data) {
44
    TubeConsumer<ItemT> *consumer =
45
1906
      reinterpret_cast<TubeConsumer<ItemT> *>(data);
46
47
2297721
    while (true) {
48
2299627
      ItemT *item = consumer->tube_->Pop();
49

2299573
      if (item->IsQuitBeacon()) {
50

1906
        delete item;
51
1906
        break;
52
      }
53
2297617
      consumer->Process(item);
54
    }
55
1906
    consumer->OnTerminate();
56
1906
    return NULL;
57
  }
58
};
59
60
61
template <class ItemT>
62
class TubeConsumerGroup : SingleCopy {
63
 public:
64
781
  TubeConsumerGroup() : is_active_(false) { }
65
66
781
  ~TubeConsumerGroup() {
67


3071
    for (unsigned i = 0; i < consumers_.size(); ++i)
68


2290
      delete consumers_[i];
69
781
  }
70
71
2290
  void TakeConsumer(TubeConsumer<ItemT> *consumer) {
72

2290
    assert(!is_active_);
73
2290
    consumers_.push_back(consumer);
74
2290
  }
75
76
769
  void Spawn() {
77

769
    assert(!is_active_);
78
769
    unsigned N = consumers_.size();
79
769
    threads_.resize(N);
80

2675
    for (unsigned i = 0; i < N; ++i) {
81
      int retval = pthread_create(
82
1906
        &threads_[i], NULL, TubeConsumer<ItemT>::MainConsumer, consumers_[i]);
83

1906
      assert(retval == 0);
84
    }
85
769
    is_active_ = true;
86
769
  }
87
88
769
  void Terminate() {
89

769
    assert(is_active_);
90
769
    unsigned N = consumers_.size();
91

2675
    for (unsigned i = 0; i < N; ++i) {
92
1906
      consumers_[i]->tube_->Enqueue(ItemT::CreateQuitBeacon());
93
    }
94

2675
    for (unsigned i = 0; i < N; ++i) {
95
1906
      int retval = pthread_join(threads_[i], NULL);
96

1906
      assert(retval == 0);
97
    }
98
769
    is_active_ = false;
99
769
  }
100
101
231
  bool is_active() { return is_active_; }
102
103
 private:
104
  bool is_active_;
105
  std::vector<TubeConsumer<ItemT> *> consumers_;
106
  std::vector<pthread_t> threads_;
107
};
108
109
#endif  // CVMFS_INGESTION_TASK_H_