CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
tube.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_UTIL_TUBE_H_
6 #define CVMFS_UTIL_TUBE_H_
7 
8 #include <pthread.h>
9 #include <stdint.h>
10 
11 #include <cassert>
12 #include <vector>
13 
14 #include "util/atomic.h"
15 #include "util/concurrency.h"
16 #include "util/pointer.h"
17 #include "util/single_copy.h"
18 
38 template <class ItemT>
39 class Tube : SingleCopy {
40  public:
41  class Link : SingleCopy {
42  friend class Tube<ItemT>;
43  public:
44  explicit Link(ItemT *item) : item_(item), next_(NULL), prev_(NULL) { }
45  ItemT *item() { return item_; }
46 
47  private:
48  ItemT *item_;
51  };
52 
53  Tube() : limit_(uint64_t(-1)), size_(0) { Init(); }
54  explicit Tube(uint64_t limit) : limit_(limit), size_(0) {
55  Init();
56  }
57  ~Tube() {
58  Link *cursor = head_;
59  do {
60  Link *prev = cursor->prev_;
61  delete cursor;
62  cursor = prev;
63  } while (cursor != head_);
64  pthread_cond_destroy(&cond_populated_);
65  pthread_cond_destroy(&cond_capacious_);
66  pthread_cond_destroy(&cond_empty_);
67  pthread_mutex_destroy(&lock_);
68  }
69 
73  Link *EnqueueBack(ItemT *item) {
74  assert(item != NULL);
75  MutexLockGuard lock_guard(&lock_);
76  while (size_ == limit_)
77  pthread_cond_wait(&cond_capacious_, &lock_);
78 
79  Link *link = new Link(item);
80  link->next_ = head_->next_;
81  link->prev_ = head_;
82  head_->next_->prev_ = link;
83  head_->next_ = link;
84  size_++;
85  int retval = pthread_cond_signal(&cond_populated_);
86  assert(retval == 0);
87  return link;
88  }
89 
93  Link *EnqueueFront(ItemT *item) {
94  assert(item != NULL);
95  MutexLockGuard lock_guard(&lock_);
96  while (size_ == limit_)
97  pthread_cond_wait(&cond_capacious_, &lock_);
98 
99  Link *link = new Link(item);
100  link->next_ = head_;
101  link->prev_ = head_->prev_;
102  head_->prev_->next_ = link;
103  head_->prev_ = link;
104  size_++;
105  int retval = pthread_cond_signal(&cond_populated_);
106  assert(retval == 0);
107  return link;
108  }
109 
114  ItemT *Slice(Link *link) {
115  MutexLockGuard lock_guard(&lock_);
116  return SliceUnlocked(link);
117  }
118 
123  ItemT *PopFront() {
124  MutexLockGuard lock_guard(&lock_);
125  while (size_ == 0)
126  pthread_cond_wait(&cond_populated_, &lock_);
127  return SliceUnlocked(head_->prev_);
128  }
129 
137  ItemT *TryPopFront() {
138  MutexLockGuard lock_guard(&lock_);
139  // Note that we don't need to wait for a signal to arrive
140  if (size_ == 0)
141  return NULL;
142  return SliceUnlocked(head_->prev_);
143  }
144 
149  ItemT *PopBack() {
150  MutexLockGuard lock_guard(&lock_);
151  while (size_ == 0)
152  pthread_cond_wait(&cond_populated_, &lock_);
153  return SliceUnlocked(head_->next_);
154  }
155 
159  void Wait() {
160  MutexLockGuard lock_guard(&lock_);
161  while (size_ > 0)
162  pthread_cond_wait(&cond_empty_, &lock_);
163  }
164 
165  bool IsEmpty() {
166  MutexLockGuard lock_guard(&lock_);
167  return size_ == 0;
168  }
169 
170  uint64_t size() {
171  MutexLockGuard lock_guard(&lock_);
172  return size_;
173  }
174 
175  private:
176  void Init() {
177  Link *sentinel = new Link(NULL);
178  head_ = sentinel;
179  head_->next_ = head_->prev_ = sentinel;
180 
181  int retval = pthread_mutex_init(&lock_, NULL);
182  assert(retval == 0);
183  retval = pthread_cond_init(&cond_populated_, NULL);
184  assert(retval == 0);
185  retval = pthread_cond_init(&cond_capacious_, NULL);
186  assert(retval == 0);
187  retval = pthread_cond_init(&cond_empty_, NULL);
188  assert(retval == 0);
189  }
190 
191  ItemT *SliceUnlocked(Link *link) {
192  // Cannot delete the sentinel link
193  assert(link != head_);
194  link->prev_->next_ = link->next_;
195  link->next_->prev_ = link->prev_;
196  ItemT *item = link->item_;
197  delete link;
198  size_--;
199  int retval = pthread_cond_signal(&cond_capacious_);
200  assert(retval == 0);
201  if (size_ == 0) {
202  retval = pthread_cond_broadcast(&cond_empty_);
203  assert(retval == 0);
204  }
205  return item;
206  }
207 
208 
212  uint64_t limit_;
216  uint64_t size_;
220  Link *head_;
224  pthread_mutex_t lock_;
228  pthread_cond_t cond_populated_;
232  pthread_cond_t cond_capacious_;
236  pthread_cond_t cond_empty_;
237 };
238 
239 
245 template <class ItemT>
247  public:
248  TubeGroup() : is_active_(false) {
249  atomic_init32(&round_robin_);
250  }
251 
253  for (unsigned i = 0; i < tubes_.size(); ++i)
254  delete tubes_[i];
255  }
256 
258  assert(!is_active_);
259  tubes_.push_back(t);
260  }
261 
262  void Activate() {
263  assert(!is_active_);
264  assert(!tubes_.empty());
265  is_active_ = true;
266  }
267 
271  typename Tube<ItemT>::Link *Dispatch(ItemT *item) {
273  unsigned tube_idx = (tubes_.size() == 1)
274  ? 0 : (item->tag() % tubes_.size());
275  return tubes_[tube_idx]->EnqueueBack(item);
276  }
277 
281  typename Tube<ItemT>::Link *DispatchAny(ItemT *item) {
283  unsigned tube_idx = (tubes_.size() == 1)
284  ? 0 : (atomic_xadd32(&round_robin_, 1) % tubes_.size());
285  return tubes_[tube_idx]->EnqueueBack(item);
286  }
287 
288  private:
290  std::vector<Tube<ItemT> *> tubes_;
292 };
293 
294 #endif // CVMFS_UTIL_TUBE_H_
void Wait()
Definition: tube.h:159
pthread_mutex_t lock_
Definition: tube.h:224
bool is_active_
Definition: tube.h:289
TubeGroup()
Definition: tube.h:248
uint64_t size_
Definition: tube.h:216
Link * head_
Definition: tube.h:220
Tube< ItemT >::Link * DispatchAny(ItemT *item)
Definition: tube.h:281
ItemT * Slice(Link *link)
Definition: tube.h:114
assert((mem||(size==0))&&"Out Of Memory")
uint64_t limit_
Definition: tube.h:212
~Tube()
Definition: tube.h:57
ItemT * PopFront()
Definition: tube.h:123
bool IsEmpty()
Definition: tube.h:165
ItemT * TryPopFront()
Definition: tube.h:137
pthread_cond_t cond_empty_
Definition: tube.h:236
atomic_int32 round_robin_
Definition: tube.h:291
int32_t atomic_int32
Definition: atomic.h:17
uint64_t size()
Definition: tube.h:170
void Activate()
Definition: tube.h:262
pthread_cond_t cond_populated_
Definition: tube.h:228
ItemT * SliceUnlocked(Link *link)
Definition: tube.h:191
Link * EnqueueFront(ItemT *item)
Definition: tube.h:93
Tube()
Definition: tube.h:53
Link * EnqueueBack(ItemT *item)
Definition: tube.h:73
~TubeGroup()
Definition: tube.h:252
Tube< ItemT >::Link * Dispatch(ItemT *item)
Definition: tube.h:271
void TakeTube(Tube< ItemT > *t)
Definition: tube.h:257
ItemT * PopBack()
Definition: tube.h:149
void Init()
Definition: tube.h:176
Definition: mutex.h:42
pthread_cond_t cond_capacious_
Definition: tube.h:232
Definition: tube.h:39
Tube(uint64_t limit)
Definition: tube.h:54
std::vector< Tube< ItemT > * > tubes_
Definition: tube.h:290