CernVM-FS  2.13.0
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
tube.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_UTIL_TUBE_H_
6 #define CVMFS_UTIL_TUBE_H_
7 
8 #include <pthread.h>
9 #include <stdint.h>
10 
11 #include <cassert>
12 #include <vector>
13 
14 #include "util/atomic.h"
15 #include "util/concurrency.h"
16 #include "util/pointer.h"
17 #include "util/single_copy.h"
18 
38 template<class ItemT>
39 class Tube : SingleCopy {
40  public:
41  class Link : SingleCopy {
42  friend class Tube<ItemT>;
43 
44  public:
45  explicit Link(ItemT *item) : item_(item), next_(NULL), prev_(NULL) { }
46  ItemT *item() { return item_; }
47 
48  private:
49  ItemT *item_;
52  };
53 
54  Tube() : limit_(uint64_t(-1)), size_(0) { Init(); }
55  explicit Tube(uint64_t limit) : limit_(limit), size_(0) { Init(); }
56  ~Tube() {
57  Link *cursor = head_;
58  do {
59  Link *prev = cursor->prev_;
60  delete cursor;
61  cursor = prev;
62  } while (cursor != head_);
63  pthread_cond_destroy(&cond_populated_);
64  pthread_cond_destroy(&cond_capacious_);
65  pthread_cond_destroy(&cond_empty_);
66  pthread_mutex_destroy(&lock_);
67  }
68 
72  Link *EnqueueBack(ItemT *item) {
73  assert(item != NULL);
74  MutexLockGuard lock_guard(&lock_);
75  while (size_ == limit_)
76  pthread_cond_wait(&cond_capacious_, &lock_);
77 
78  Link *link = new Link(item);
79  link->next_ = head_->next_;
80  link->prev_ = head_;
81  head_->next_->prev_ = link;
82  head_->next_ = link;
83  size_++;
84  int retval = pthread_cond_signal(&cond_populated_);
85  assert(retval == 0);
86  return link;
87  }
88 
92  Link *EnqueueFront(ItemT *item) {
93  assert(item != NULL);
94  MutexLockGuard lock_guard(&lock_);
95  while (size_ == limit_)
96  pthread_cond_wait(&cond_capacious_, &lock_);
97 
98  Link *link = new Link(item);
99  link->next_ = head_;
100  link->prev_ = head_->prev_;
101  head_->prev_->next_ = link;
102  head_->prev_ = link;
103  size_++;
104  int retval = pthread_cond_signal(&cond_populated_);
105  assert(retval == 0);
106  return link;
107  }
108 
113  ItemT *Slice(Link *link) {
114  MutexLockGuard lock_guard(&lock_);
115  return SliceUnlocked(link);
116  }
117 
122  ItemT *PopFront() {
123  MutexLockGuard lock_guard(&lock_);
124  while (size_ == 0)
125  pthread_cond_wait(&cond_populated_, &lock_);
126  return SliceUnlocked(head_->prev_);
127  }
128 
136  ItemT *TryPopFront() {
137  MutexLockGuard lock_guard(&lock_);
138  // Note that we don't need to wait for a signal to arrive
139  if (size_ == 0)
140  return NULL;
141  return SliceUnlocked(head_->prev_);
142  }
143 
148  ItemT *PopBack() {
149  MutexLockGuard lock_guard(&lock_);
150  while (size_ == 0)
151  pthread_cond_wait(&cond_populated_, &lock_);
152  return SliceUnlocked(head_->next_);
153  }
154 
158  void Wait() {
159  MutexLockGuard lock_guard(&lock_);
160  while (size_ > 0)
161  pthread_cond_wait(&cond_empty_, &lock_);
162  }
163 
164  bool IsEmpty() {
165  MutexLockGuard lock_guard(&lock_);
166  return size_ == 0;
167  }
168 
169  uint64_t size() {
170  MutexLockGuard lock_guard(&lock_);
171  return size_;
172  }
173 
174  private:
175  void Init() {
176  Link *sentinel = new Link(NULL);
177  head_ = sentinel;
178  head_->next_ = head_->prev_ = sentinel;
179 
180  int retval = pthread_mutex_init(&lock_, NULL);
181  assert(retval == 0);
182  retval = pthread_cond_init(&cond_populated_, NULL);
183  assert(retval == 0);
184  retval = pthread_cond_init(&cond_capacious_, NULL);
185  assert(retval == 0);
186  retval = pthread_cond_init(&cond_empty_, NULL);
187  assert(retval == 0);
188  }
189 
190  ItemT *SliceUnlocked(Link *link) {
191  // Cannot delete the sentinel link
192  assert(link != head_);
193  link->prev_->next_ = link->next_;
194  link->next_->prev_ = link->prev_;
195  ItemT *item = link->item_;
196  delete link;
197  size_--;
198  int retval = pthread_cond_signal(&cond_capacious_);
199  assert(retval == 0);
200  if (size_ == 0) {
201  retval = pthread_cond_broadcast(&cond_empty_);
202  assert(retval == 0);
203  }
204  return item;
205  }
206 
207 
211  uint64_t limit_;
215  uint64_t size_;
219  Link *head_;
223  pthread_mutex_t lock_;
227  pthread_cond_t cond_populated_;
231  pthread_cond_t cond_capacious_;
235  pthread_cond_t cond_empty_;
236 };
237 
238 
244 template<class ItemT>
246  public:
247  TubeGroup() : is_active_(false) { atomic_init32(&round_robin_); }
248 
250  for (unsigned i = 0; i < tubes_.size(); ++i)
251  delete tubes_[i];
252  }
253 
255  assert(!is_active_);
256  tubes_.push_back(t);
257  }
258 
259  void Activate() {
260  assert(!is_active_);
261  assert(!tubes_.empty());
262  is_active_ = true;
263  }
264 
268  typename Tube<ItemT>::Link *Dispatch(ItemT *item) {
270  unsigned tube_idx = (tubes_.size() == 1) ? 0
271  : (item->tag() % tubes_.size());
272  return tubes_[tube_idx]->EnqueueBack(item);
273  }
274 
278  typename Tube<ItemT>::Link *DispatchAny(ItemT *item) {
280  unsigned tube_idx = (tubes_.size() == 1)
281  ? 0
282  : (atomic_xadd32(&round_robin_, 1) % tubes_.size());
283  return tubes_[tube_idx]->EnqueueBack(item);
284  }
285 
286  private:
288  std::vector<Tube<ItemT> *> tubes_;
290 };
291 
292 #endif // CVMFS_UTIL_TUBE_H_
void Wait()
Definition: tube.h:158
pthread_mutex_t lock_
Definition: tube.h:223
bool is_active_
Definition: tube.h:287
TubeGroup()
Definition: tube.h:247
uint64_t size_
Definition: tube.h:215
Link * head_
Definition: tube.h:219
Tube< ItemT >::Link * DispatchAny(ItemT *item)
Definition: tube.h:278
ItemT * Slice(Link *link)
Definition: tube.h:113
assert((mem||(size==0))&&"Out Of Memory")
uint64_t limit_
Definition: tube.h:211
~Tube()
Definition: tube.h:56
ItemT * PopFront()
Definition: tube.h:122
bool IsEmpty()
Definition: tube.h:164
ItemT * TryPopFront()
Definition: tube.h:136
pthread_cond_t cond_empty_
Definition: tube.h:235
atomic_int32 round_robin_
Definition: tube.h:289
int32_t atomic_int32
Definition: atomic.h:17
uint64_t size()
Definition: tube.h:169
void Activate()
Definition: tube.h:259
pthread_cond_t cond_populated_
Definition: tube.h:227
ItemT * SliceUnlocked(Link *link)
Definition: tube.h:190
Link * EnqueueFront(ItemT *item)
Definition: tube.h:92
Tube()
Definition: tube.h:54
Link * EnqueueBack(ItemT *item)
Definition: tube.h:72
~TubeGroup()
Definition: tube.h:249
Tube< ItemT >::Link * Dispatch(ItemT *item)
Definition: tube.h:268
void TakeTube(Tube< ItemT > *t)
Definition: tube.h:254
ItemT * PopBack()
Definition: tube.h:148
void Init()
Definition: tube.h:175
Definition: mutex.h:42
pthread_cond_t cond_capacious_
Definition: tube.h:231
Definition: tube.h:39
Tube(uint64_t limit)
Definition: tube.h:55
std::vector< Tube< ItemT > * > tubes_
Definition: tube.h:288