CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
task_read.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_INGESTION_TASK_READ_H_
6 #define CVMFS_INGESTION_TASK_READ_H_
7 
8 #include <stdint.h>
9 
10 #include "ingestion/item.h"
11 #include "ingestion/task.h"
12 #include "util/atomic.h"
13 #include "util/posix.h"
14 #include "util/tube.h"
15 
16 class ItemAllocator;  // Forward declaration: this header only uses ItemAllocator through a pointer.
17 // Tube consumer for FileItem objects; keeps an output TubeGroup<BlockItem> and an ItemAllocator.
18 class TaskRead : public TubeConsumer<FileItem> {
19  public:
20  static const unsigned kThrottleInitMs = 50;
21  static const unsigned kThrottleMaxMs = 500;
22  static const unsigned kThrottleResetMs = 2000;  // NOTE(review): the three kThrottle* values look like millisecond timings (by name) -- confirm in task_read.cc
23  static const unsigned kBlockSize = kPageSize * 4;  // four times kPageSize (util/posix.h)
24  // Registers tube_in with the TubeConsumer base, stores tubes_out and allocator, zeroes both watermarks and initializes n_block_.
25  TaskRead(
26  Tube<FileItem> *tube_in,
27  TubeGroup<BlockItem> *tubes_out,
28  ItemAllocator *allocator)
29  : TubeConsumer<FileItem>(tube_in)
30  , tubes_out_(tubes_out)
31  , allocator_(allocator)
32  , low_watermark_(0)
33  , high_watermark_(0)
34  { atomic_init64(&n_block_); }
35  // Defined in task_read.cc. NOTE(review): presumably updates low_watermark_ and high_watermark_ -- confirm.
36  void SetWatermarks(uint64_t low, uint64_t high);
37  // Returns the current value of the n_block_ counter (atomic read).
38  uint64_t n_block() { return atomic_read64(&n_block_); }
39 
40  protected:
41  virtual void Process(FileItem *item);
42 
43  private:
44  /**
45  * Atomic sequence counter shared by all TaskRead instances (static).
46  * NOTE(review): its use is not visible in this header -- see task_read.cc.
47  */
48  static atomic_int64 tag_seq_;
49 
50  TubeGroup<BlockItem> *tubes_out_;
51  ItemAllocator *allocator_;
52  /**
53  * Lower watermark; 0 after construction. NOTE(review): presumably changed
54  * through SetWatermarks() -- units and exact semantics to be confirmed.
55  */
56  uint64_t low_watermark_;
57  /**
58  * Upper watermark; 0 after construction. NOTE(review): presumably changed
59  * through SetWatermarks() -- units and exact semantics to be confirmed.
60  */
61  uint64_t high_watermark_;
62  /**
63  * Atomic counter exposed through n_block(); initialized in the constructor.
64  */
65  atomic_int64 n_block_;
66 };
67 
68 #endif // CVMFS_INGESTION_TASK_READ_H_
ItemAllocator * allocator_
Definition: task_read.h:51
uint64_t n_block()
Definition: task_read.h:38
int64_t atomic_int64
Definition: atomic.h:18
TubeGroup< BlockItem > * tubes_out_
Definition: task_read.h:50
uint64_t low_watermark_
Definition: task_read.h:56
uint64_t high_watermark_
Definition: task_read.h:61
static const unsigned kBlockSize
Definition: task_read.h:23
static const unsigned kThrottleMaxMs
Definition: task_read.h:21
TaskRead(Tube< FileItem > *tube_in, TubeGroup< BlockItem > *tubes_out, ItemAllocator *allocator)
Definition: task_read.h:25
void SetWatermarks(uint64_t low, uint64_t high)
Definition: task_read.cc:80
atomic_int64 n_block_
Definition: task_read.h:65
static const unsigned kThrottleResetMs
Definition: task_read.h:22
Definition: item.h:34
static atomic_int64 tag_seq_
Definition: task_read.h:48
const unsigned kPageSize
Definition: posix.h:30
virtual void Process(FileItem *item)
Definition: task_read.cc:24
static const unsigned kThrottleInitMs
Definition: task_read.h:20