CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
task.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_INGESTION_TASK_H_
6 #define CVMFS_INGESTION_TASK_H_
7 
8 #include <errno.h>
9 #include <pthread.h>
10 #include <unistd.h>
11 
12 #include <cassert>
13 #include <vector>
14 
15 #include "util/exception.h"
16 #include "util/single_copy.h"
17 #include "util/tube.h"
18 
23 template<typename ItemT>
25 
26 
31 template <class ItemT>
33  friend class TubeConsumerGroup<ItemT>;
34 
35  public:
// Virtual destructor with an empty body. Consumers are deleted through
// TubeConsumer<ItemT> pointers (see the delete loop over consumers_).
36  virtual ~TubeConsumer() { }
37 
38  protected:
// Remembers the given tube pointer in tube_; MainConsumer() pops its items
// from that tube. Nothing here takes ownership of the tube.
39  explicit TubeConsumer(Tube<ItemT> *tube) : tube_(tube) { }
// Pure virtual work hook: MainConsumer() calls it for every item popped from
// the tube that is not a quit beacon. MainConsumer() does not delete such
// items after the call.
40  virtual void Process(ItemT *item) = 0;
// Optional hook called by MainConsumer() once its loop has ended on a quit
// beacon; the default implementation does nothing.
41  virtual void OnTerminate() { }
42 
44 
45  private:
46  static void *MainConsumer(void *data) {
47  TubeConsumer<ItemT> *consumer =
48  reinterpret_cast<TubeConsumer<ItemT> *>(data);
49 
50  while (true) {
51  ItemT *item = consumer->tube_->PopFront();
52  if (item->IsQuitBeacon()) {
53  delete item;
54  break;
55  }
56  consumer->Process(item);
57  }
58  consumer->OnTerminate();
59  return NULL;
60  }
61 };
62 
63 
64 template <class ItemT>
66  public:
68 
70  for (unsigned i = 0; i < consumers_.size(); ++i)
71  delete consumers_[i];
72  }
73 
76  consumers_.push_back(consumer);
77  }
78 
79  void Spawn() {
81  unsigned N = consumers_.size();
82  threads_.resize(N);
83  for (unsigned i = 0; i < N; ++i) {
84  int retval = pthread_create(
86  if (retval != 0) {
87  PANIC(kLogStderr, "failed to create new thread (error: %d, pid: %d)",
88  errno, getpid());
89  }
90  }
91  is_active_ = true;
92  }
93 
94  void Terminate() {
96  unsigned N = consumers_.size();
97  for (unsigned i = 0; i < N; ++i) {
98  consumers_[i]->tube_->EnqueueBack(ItemT::CreateQuitBeacon());
99  }
100  for (unsigned i = 0; i < N; ++i) {
101  int retval = pthread_join(threads_[i], NULL);
102  assert(retval == 0);
103  }
104  is_active_ = false;
105  }
106 
107  bool is_active() { return is_active_; }
108 
109  private:
111  std::vector<TubeConsumer<ItemT> *> consumers_;
112  std::vector<pthread_t> threads_;
113 };
114 
115 #endif // CVMFS_INGESTION_TASK_H_
virtual void OnTerminate()
Definition: task.h:41
static void * MainConsumer(void *data)
Definition: task.h:46
std::vector< TubeConsumer< ItemT > * > consumers_
Definition: task.h:111
void TakeConsumer(TubeConsumer< ItemT > *consumer)
Definition: task.h:74
#define PANIC(...)
Definition: exception.h:29
Tube< ItemT > * tube_
Definition: task.h:43
TubeConsumer(Tube< ItemT > *tube)
Definition: task.h:39
assert((mem||(size==0))&&"Out Of Memory")
TubeConsumerGroup()
Definition: task.h:67
bool is_active_
Definition: task.h:110
void Spawn()
Definition: task.h:79
virtual ~TubeConsumer()
Definition: task.h:36
bool is_active()
Definition: task.h:107
std::vector< pthread_t > threads_
Definition: task.h:112
virtual void Process(ItemT *item)=0
Definition: tube.h:39
~TubeConsumerGroup()
Definition: task.h:69
void Terminate()
Definition: task.h:94