CernVM-FS  2.9.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
tube.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_INGESTION_TUBE_H_
6 #define CVMFS_INGESTION_TUBE_H_
7 
8 #include <pthread.h>
9 #include <stdint.h>
10 
11 #include <cassert>
12 #include <vector>
13 
14 #include "atomic.h"
15 #include "util/pointer.h"
16 #include "util/single_copy.h"
17 #include "util_concurrency.h"
18 
38 template <class ItemT>
39 class Tube : SingleCopy {
40  public:
41  class Link : SingleCopy {
42  friend class Tube<ItemT>;
43  public:
44  explicit Link(ItemT *item) : item_(item), next_(NULL), prev_(NULL) { }
45  ItemT *item() { return item_; }
46 
47  private:
48  ItemT *item_;
51  };
52 
53  Tube() : limit_(uint64_t(-1)), size_(0) { Init(); }
54  explicit Tube(uint64_t limit) : limit_(limit), size_(0) {
55  Init();
56  }
57  ~Tube() {
58  Link *cursor = head_;
59  do {
60  Link *prev = cursor->prev_;
61  delete cursor;
62  cursor = prev;
63  } while (cursor != head_);
64  pthread_cond_destroy(&cond_populated_);
65  pthread_cond_destroy(&cond_capacious_);
66  pthread_cond_destroy(&cond_empty_);
67  pthread_mutex_destroy(&lock_);
68  }
69 
73  Link *EnqueueBack(ItemT *item) {
74  assert(item != NULL);
75  MutexLockGuard lock_guard(&lock_);
76  while (size_ == limit_)
77  pthread_cond_wait(&cond_capacious_, &lock_);
78 
79  Link *link = new Link(item);
80  link->next_ = head_->next_;
81  link->prev_ = head_;
82  head_->next_->prev_ = link;
83  head_->next_ = link;
84  size_++;
85  int retval = pthread_cond_signal(&cond_populated_);
86  assert(retval == 0);
87  return link;
88  }
89 
93  Link *EnqueueFront(ItemT *item) {
94  assert(item != NULL);
95  MutexLockGuard lock_guard(&lock_);
96  while (size_ == limit_)
97  pthread_cond_wait(&cond_capacious_, &lock_);
98 
99  Link *link = new Link(item);
100  link->next_ = head_;
101  link->prev_ = head_->prev_;
102  head_->prev_->next_ = link;
103  head_->prev_ = link;
104  size_++;
105  int retval = pthread_cond_signal(&cond_populated_);
106  assert(retval == 0);
107  return link;
108  }
109 
114  ItemT *Slice(Link *link) {
115  MutexLockGuard lock_guard(&lock_);
116  return SliceUnlocked(link);
117  }
118 
123  ItemT *PopFront() {
124  MutexLockGuard lock_guard(&lock_);
125  while (size_ == 0)
126  pthread_cond_wait(&cond_populated_, &lock_);
127  return SliceUnlocked(head_->prev_);
128  }
129 
134  ItemT *PopBack() {
135  MutexLockGuard lock_guard(&lock_);
136  while (size_ == 0)
137  pthread_cond_wait(&cond_populated_, &lock_);
138  return SliceUnlocked(head_->next_);
139  }
140 
144  void Wait() {
145  MutexLockGuard lock_guard(&lock_);
146  while (size_ > 0)
147  pthread_cond_wait(&cond_empty_, &lock_);
148  }
149 
150  bool IsEmpty() {
151  MutexLockGuard lock_guard(&lock_);
152  return size_ == 0;
153  }
154 
155  uint64_t size() {
156  MutexLockGuard lock_guard(&lock_);
157  return size_;
158  }
159 
160  private:
161  void Init() {
162  Link *sentinel = new Link(NULL);
163  head_ = sentinel;
164  head_->next_ = head_->prev_ = sentinel;
165 
166  int retval = pthread_mutex_init(&lock_, NULL);
167  assert(retval == 0);
168  retval = pthread_cond_init(&cond_populated_, NULL);
169  assert(retval == 0);
170  retval = pthread_cond_init(&cond_capacious_, NULL);
171  assert(retval == 0);
172  retval = pthread_cond_init(&cond_empty_, NULL);
173  assert(retval == 0);
174  }
175 
176  ItemT *SliceUnlocked(Link *link) {
177  // Cannot delete the sentinel link
178  assert(link != head_);
179  link->prev_->next_ = link->next_;
180  link->next_->prev_ = link->prev_;
181  ItemT *item = link->item_;
182  delete link;
183  size_--;
184  int retval = pthread_cond_signal(&cond_capacious_);
185  assert(retval == 0);
186  if (size_ == 0) {
187  retval = pthread_cond_broadcast(&cond_empty_);
188  assert(retval == 0);
189  }
190  return item;
191  }
192 
193 
197  uint64_t limit_;
201  uint64_t size_;
205  Link *head_;
209  pthread_mutex_t lock_;
213  pthread_cond_t cond_populated_;
217  pthread_cond_t cond_capacious_;
221  pthread_cond_t cond_empty_;
222 };
223 
224 
230 template <class ItemT>
232  public:
233  TubeGroup() : is_active_(false) {
234  atomic_init32(&round_robin_);
235  }
236 
238  for (unsigned i = 0; i < tubes_.size(); ++i)
239  delete tubes_[i];
240  }
241 
243  assert(!is_active_);
244  tubes_.push_back(t);
245  }
246 
247  void Activate() {
248  assert(!is_active_);
249  assert(!tubes_.empty());
250  is_active_ = true;
251  }
252 
256  typename Tube<ItemT>::Link *Dispatch(ItemT *item) {
258  unsigned tube_idx = (tubes_.size() == 1)
259  ? 0 : (item->tag() % tubes_.size());
260  return tubes_[tube_idx]->EnqueueBack(item);
261  }
262 
266  typename Tube<ItemT>::Link *DispatchAny(ItemT *item) {
268  unsigned tube_idx = (tubes_.size() == 1)
269  ? 0 : (atomic_xadd32(&round_robin_, 1) % tubes_.size());
270  return tubes_[tube_idx]->EnqueueBack(item);
271  }
272 
273  private:
275  std::vector<Tube<ItemT> *> tubes_;
277 };
278 
279 #endif // CVMFS_INGESTION_TUBE_H_
void Wait()
Definition: tube.h:144
pthread_mutex_t lock_
Definition: tube.h:209
bool is_active_
Definition: tube.h:274
TubeGroup()
Definition: tube.h:233
uint64_t size_
Definition: tube.h:201
Link * head_
Definition: tube.h:205
Tube< ItemT >::Link * DispatchAny(ItemT *item)
Definition: tube.h:266
ItemT * Slice(Link *link)
Definition: tube.h:114
assert((mem||(size==0))&&"Out Of Memory")
uint64_t limit_
Definition: tube.h:197
~Tube()
Definition: tube.h:57
ItemT * PopFront()
Definition: tube.h:123
bool IsEmpty()
Definition: tube.h:150
pthread_cond_t cond_empty_
Definition: tube.h:221
atomic_int32 round_robin_
Definition: tube.h:276
int32_t atomic_int32
Definition: atomic.h:17
uint64_t size()
Definition: tube.h:155
void Activate()
Definition: tube.h:247
pthread_cond_t cond_populated_
Definition: tube.h:213
ItemT * SliceUnlocked(Link *link)
Definition: tube.h:176
Link * EnqueueFront(ItemT *item)
Definition: tube.h:93
Tube()
Definition: tube.h:53
Link * EnqueueBack(ItemT *item)
Definition: tube.h:73
~TubeGroup()
Definition: tube.h:237
Tube< ItemT >::Link * Dispatch(ItemT *item)
Definition: tube.h:256
void TakeTube(Tube< ItemT > *t)
Definition: tube.h:242
ItemT * PopBack()
Definition: tube.h:134
void Init()
Definition: tube.h:161
pthread_cond_t cond_capacious_
Definition: tube.h:217
Definition: tube.h:39
Tube(uint64_t limit)
Definition: tube.h:54
std::vector< Tube< ItemT > * > tubes_
Definition: tube.h:275