GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/ingestion/tube.h Lines: 102 103 99.0 %
Date: 2019-02-03 02:48:13 Branches: 70 115 60.9 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#ifndef CVMFS_INGESTION_TUBE_H_
6
#define CVMFS_INGESTION_TUBE_H_
7
8
#include <pthread.h>
9
#include <stdint.h>
10
11
#include <cassert>
12
#include <vector>
13
14
#include "atomic.h"
15
#include "util/pointer.h"
16
#include "util/single_copy.h"
17
#include "util_concurrency.h"
18
19
/**
20
 * A thread-safe, doubly linked list of links containing pointers to ItemT.  The
21
 * ItemT elements are not owned by the Tube.  FIFO semantics; items are pushed
22
 * to the back and popped from the front.  Using Slice(), items at arbitrary
23
 * locations in the tube can be removed, too.
24
 *
25
 * The tube links the steps in the file processing pipeline.  It connects
26
 * multiple producers to multiple consumers and can throttle the producers if a
27
 * limit for the tube size is set.
28
 *
29
 * Internally, uses condition variables to block when threads try to pop from
30
 * the empty tube or insert into the full tube.
31
 */
32
template <class ItemT>
33
class Tube : SingleCopy {
34
 public:
35
  class Link : SingleCopy {
36
    friend class Tube<ItemT>;
37
   public:
38
2321121
    explicit Link(ItemT *item) : item_(item), next_(NULL), prev_(NULL) { }
39
4
    ItemT *item() { return item_; }
40
41
   private:
42
    ItemT *item_;
43
    Link *next_;
44
    Link *prev_;
45
  };
46
47
1270
  Tube() : limit_(uint64_t(-1)), size_(0) { Init(); }
48
88
  explicit Tube(uint64_t limit) : limit_(limit), size_(0) {
49
88
    Init();
50
88
  }
51
1358
  ~Tube() {
52
1358
    Link *cursor = head_;
53


1358
    do {
54
1358
      Link *prev = cursor->prev_;
55
1358
      delete cursor;
56
1358
      cursor = prev;
57
    } while (cursor != head_);
58
1358
    pthread_cond_destroy(&cond_populated_);
59
1358
    pthread_cond_destroy(&cond_capacious_);
60
1358
    pthread_cond_destroy(&cond_empty_);
61
1358
    pthread_mutex_destroy(&lock_);
62
1358
  }
63
64
  /**
65
   * Push an item to the back of the queue.  Block if queue is currently full.
66
   */
67
2319573
  Link *Enqueue(ItemT *item) {
68

2319573
    assert(item != NULL);
69
2319573
    MutexLockGuard lock_guard(&lock_);
70

4639646
    while (size_ == limit_)
71
      pthread_cond_wait(&cond_capacious_, &lock_);
72
73
2319825
    Link *link = new Link(item);
74
2319679
    link->next_ = tail_;
75
2319679
    link->prev_ = tail_->prev_;
76
2319679
    tail_->prev_->next_ = link;
77
2319679
    tail_->prev_ = link;
78
2319679
    tail_ = link;
79
2319679
    size_++;
80
2319679
    int retval = pthread_cond_signal(&cond_populated_);
81

2319920
    assert(retval == 0);
82
2319920
    return link;
83
  }
84
85
  /**
86
   * Remove any link from the queue and return its item, including first/last
87
   * element.
88
   */
89
4
  ItemT *Slice(Link *link) {
90
4
    MutexLockGuard lock_guard(&lock_);
91
4
    return SliceUnlocked(link);
92
  }
93
94
  /**
95
   * Remove and return the first element from the queue.  Block if tube is
96
   * empty.
97
   */
98
2319881
  ItemT *Pop() {
99
2319881
    MutexLockGuard lock_guard(&lock_);
100

4789473
    while (size_ == 0)
101
149628
      pthread_cond_wait(&cond_populated_, &lock_);
102
2319905
    return SliceUnlocked(head_->prev_);
103
  }
104
105
  /**
106
   * Blocks until the tube is empty
107
   */
108
105
  void Wait() {
109
105
    MutexLockGuard lock_guard(&lock_);
110
285
    while (size_ > 0)
111
75
      pthread_cond_wait(&cond_empty_, &lock_);
112
105
  }
113
114
13
  bool IsEmpty() {
115
13
    MutexLockGuard lock_guard(&lock_);
116
13
    return size_ == 0;
117
  }
118
119
38
  uint64_t size() {
120
38
    MutexLockGuard lock_guard(&lock_);
121
38
    return size_;
122
  }
123
124
 private:
125
1358
  void Init() {
126
1358
    Link *sentinel = new Link(NULL);
127
1358
    head_ = tail_ = sentinel;
128
1358
    head_->next_ = head_->prev_ = sentinel;
129
1358
    tail_->next_ = tail_->prev_ = sentinel;
130
131
1358
    int retval = pthread_mutex_init(&lock_, NULL);
132

1358
    assert(retval == 0);
133
1358
    retval = pthread_cond_init(&cond_populated_, NULL);
134

1358
    assert(retval == 0);
135
1358
    retval = pthread_cond_init(&cond_capacious_, NULL);
136

1358
    assert(retval == 0);
137
1358
    retval = pthread_cond_init(&cond_empty_, NULL);
138

1358
    assert(retval == 0);
139
1358
  }
140
141
2319890
  ItemT *SliceUnlocked(Link *link) {
142
2319890
    link->prev_->next_ = link->next_;
143
2319890
    link->next_->prev_ = link->prev_;
144

2319890
    if (link == tail_)
145
143303
      tail_ = head_;
146
2319890
    ItemT *item = link->item_;
147
2319890
    delete link;
148
2319781
    size_--;
149
2319781
    int retval = pthread_cond_signal(&cond_capacious_);
150

2319896
    assert(retval == 0);
151

2319896
    if (size_ == 0) {
152
143307
      retval = pthread_cond_broadcast(&cond_empty_);
153

143303
      assert(retval == 0);
154
    }
155
2319892
    return item;
156
  }
157
158
159
  /**
160
   * Adding new item blocks as long as limit_ == size_
161
   */
162
  uint64_t limit_;
163
  /**
164
   * The current number of links in the list
165
   */
166
  uint64_t size_;
167
  /**
168
   * In front of the first element (next in line for Pop())
169
   */
170
  Link *head_;
171
  /**
172
   * Points to the last inserted element
173
   */
174
  Link *tail_;
175
  /**
176
   * Protects all internal state
177
   */
178
  pthread_mutex_t lock_;
179
  /**
180
   * Signals if there are items enqueued
181
   */
182
  pthread_cond_t cond_populated_;
183
  /**
184
   * Signals if there is space to enqueue more items
185
   */
186
  pthread_cond_t cond_capacious_;
187
  /**
188
   * Signals if the queue runs empty
189
   */
190
  pthread_cond_t cond_empty_;
191
};
192
193
194
/**
195
 * A tube group manages a fixed set of Tubes and dispatches items among them in
196
 * such a way that items with the same tag (a positive integer) are all sent
197
 * to the same tube.
198
 */
199
template <class ItemT>
200
class TubeGroup : SingleCopy {
201
 public:
202
687
  TubeGroup() : is_active_(false) {
203
687
    atomic_init32(&round_robin_);
204
687
  }
205
206
687
  ~TubeGroup() {
207

1837
    for (unsigned i = 0; i < tubes_.size(); ++i)
208

1158
      delete tubes_[i];
209
687
  }
210
211
1150
  void TakeTube(Tube<ItemT> *t) {
212

1150
    assert(!is_active_);
213
1150
    tubes_.push_back(t);
214
1150
  }
215
216
687
  void Activate() {
217

687
    assert(!is_active_);
218

687
    assert(!tubes_.empty());
219
687
    is_active_ = true;
220
687
  }
221
222
  /**
223
   * Like Tube::Enqueue(), but pick a tube according to ItemT::tag()
224
   */
225
2244041
  typename Tube<ItemT>::Link *Dispatch(ItemT *item) {
226
2244041
    assert(is_active_);
227
    unsigned tube_idx = (tubes_.size() == 1)
228
2244041
                        ? 0 : (item->tag() % tubes_.size());
229
2244201
    return tubes_[tube_idx]->Enqueue(item);
230
  }
231
232
  /**
233
   * Like Tube::Enqueue(), use tubes one after another
234
   */
235
151
  typename Tube<ItemT>::Link *DispatchAny(ItemT *item) {
236
151
    assert(is_active_);
237
    unsigned tube_idx = (tubes_.size() == 1)
238
151
                        ? 0 : (atomic_xadd32(&round_robin_, 1) % tubes_.size());
239
151
    return tubes_[tube_idx]->Enqueue(item);
240
  }
241
242
 private:
243
  bool is_active_;
244
  std::vector<Tube<ItemT> *> tubes_;
245
  atomic_int32 round_robin_;
246
};
247
248
#endif  // CVMFS_INGESTION_TUBE_H_