CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
task_read.cc
Go to the documentation of this file.
1 
5 #include "ingestion/task_read.h"
6 
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <unistd.h>
10 
11 #include <cstring>
12 
13 #include "backoff.h"
14 #include "util/exception.h"
15 #include "util/logging.h"
16 #include "util/platform.h"
17 #include "util/posix.h"
18 #include "util/smalloc.h"
19 
20 
22 
23 
27  atomic_inc64(&n_block_);
28  do {
29  throttle.Throttle();
31  }
32 
33  if (item->Open() == false) {
34  PANIC(kLogStderr, "failed to open %s (%d)", item->path().c_str(), errno);
35  }
36  uint64_t size;
37  if (item->GetSize(&size) == false) {
38  PANIC(kLogStderr, "failed to fstat %s (%d)", item->path().c_str(), errno);
39  }
40  item->set_size(size);
41 
42  if (item->may_have_chunks()) {
43  item->set_may_have_chunks(
44  item->chunk_detector()->MightFindChunks(item->size()));
45  }
46 
47  unsigned char *buffer[kBlockSize];
48  uint64_t tag = atomic_xadd64(&tag_seq_, 1);
49  ssize_t nbytes = -1;
50  unsigned cnt = 0;
51  do {
52  nbytes = item->Read(buffer, kBlockSize);
53  if (nbytes < 0) {
54  PANIC(kLogStderr, "failed to read %s (%d)", item->path().c_str(), errno);
55  }
56 
57  BlockItem *block_item = new BlockItem(tag, allocator_);
58  block_item->SetFileItem(item);
59  if (nbytes == 0) {
60  item->Close();
61  block_item->MakeStop();
62  } else {
63  block_item->MakeDataCopy(reinterpret_cast<unsigned char *>(buffer),
64  nbytes);
65  }
66  tubes_out_->Dispatch(block_item);
67 
68  cnt++;
69  if ((cnt % 32) == 0) {
70  if ((high_watermark_ > 0) &&
72  {
73  throttle.Throttle();
74  }
75  }
76  } while (nbytes > 0);
77 }
78 
79 
80 void TaskRead::SetWatermarks(uint64_t low, uint64_t high) {
81  assert(high > low);
82  assert(low > 0);
83  low_watermark_ = low;
84  high_watermark_ = high;
85 }
ItemAllocator * allocator_
Definition: task_read.h:51
int64_t atomic_int64
Definition: atomic.h:18
bool GetSize(uint64_t *size)
Definition: item.h:81
TubeGroup< BlockItem > * tubes_out_
Definition: task_read.h:50
uint64_t low_watermark_
Definition: task_read.h:56
#define PANIC(...)
Definition: exception.h:29
bool MightFindChunks(const uint64_t size) const
uint64_t high_watermark_
Definition: task_read.h:61
static const unsigned kBlockSize
Definition: task_read.h:23
static const unsigned kThrottleMaxMs
Definition: task_read.h:21
bool may_have_chunks()
Definition: item.h:64
assert((mem||(size==0))&&"Out Of Memory")
void SetFileItem(FileItem *item)
Definition: item.cc:223
Xor32Detector * chunk_detector()
Definition: item.h:59
void SetWatermarks(uint64_t low, uint64_t high)
Definition: task_read.cc:80
void set_size(uint64_t val)
Definition: item.h:67
void Throttle()
Definition: backoff.cc:50
atomic_int64 n_block_
Definition: task_read.h:65
static const unsigned kThrottleResetMs
Definition: task_read.h:22
void set_may_have_chunks(bool val)
Definition: item.h:68
Tube< ItemT >::Link * Dispatch(ItemT *item)
Definition: tube.h:271
Definition: item.h:34
uint64_t size()
Definition: item.h:58
std::string path()
Definition: item.h:57
static atomic_int64 tag_seq_
Definition: task_read.h:48
bool Open()
Definition: item.h:76
virtual void Process(FileItem *item)
Definition: task_read.cc:24
bool Close()
Definition: item.h:80
ssize_t Read(void *buffer, size_t nbyte)
Definition: item.h:77
void MakeDataCopy(const unsigned char *data, uint32_t size)
Definition: item.cc:189
void MakeStop()
Definition: item.cc:151
static void size_t size
Definition: smalloc.h:54
static uint64_t managed_bytes()
Definition: item.h:222
static const unsigned kThrottleInitMs
Definition: task_read.h:20