GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/ingestion/task_read.cc Lines: 34 42 81.0 %
Date: 2019-02-03 02:48:13 Branches: 24 32 75.0 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#include "ingestion/task_read.h"
6
7
#include <errno.h>
8
#include <fcntl.h>
9
#include <unistd.h>
10
11
#include <cstring>
12
13
#include "backoff.h"
14
#include "logging.h"
15
#include "platform.h"
16
#include "smalloc.h"
17
#include "util/posix.h"
18
19
20
atomic_int64 TaskRead::tag_seq_ = 0;
21
22
23
155
void TaskRead::Process(FileItem *item) {
24
155
  BackoffThrottle throttle(kThrottleInitMs, kThrottleMaxMs, kThrottleResetMs);
25

155
  if ((high_watermark_ > 0) && (BlockItem::managed_bytes() > high_watermark_)) {
26
1
    atomic_inc64(&n_block_);
27
2
    do {
28
2
      throttle.Throttle();
29
    } while (BlockItem::managed_bytes() > low_watermark_);
30
  }
31
32
155
  if (item->Open() == false) {
33
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to open %s (%d)",
34
             item->path().c_str(), errno);
35
    abort();
36
  }
37
  uint64_t size;
38
155
  if (item->GetSize(&size) == false) {
39
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to fstat %s (%d)",
40
             item->path().c_str(), errno);
41
    abort();
42
  }
43
155
  item->set_size(size);
44
45
155
  if (item->may_have_chunks()) {
46
    item->set_may_have_chunks(
47
99
      item->chunk_detector()->MightFindChunks(item->size()));
48
  }
49
50
  unsigned char *buffer[kBlockSize];
51
155
  uint64_t tag = atomic_xadd64(&tag_seq_, 1);
52
155
  ssize_t nbytes = -1;
53
155
  unsigned cnt = 0;
54
216390
  do {
55
216390
    nbytes = item->Read(buffer, kBlockSize);
56
216390
    if (nbytes < 0) {
57
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to read %s (%d)",
58
               item->path().c_str(), errno);
59
      abort();
60
    }
61
62
216390
    BlockItem *block_item = new BlockItem(tag, allocator_);
63
216382
    block_item->SetFileItem(item);
64
216382
    if (nbytes == 0) {
65
155
      item->Close();
66
155
      block_item->MakeStop();
67
    } else {
68
      block_item->MakeDataCopy(reinterpret_cast<unsigned char *>(buffer),
69
216227
                               nbytes);
70
    }
71
216390
    tubes_out_->Dispatch(block_item);
72
73
215514
    cnt++;
74
215514
    if ((cnt % 32) == 0) {
75

6752
      if ((high_watermark_ > 0) &&
76
          (BlockItem::managed_bytes() > high_watermark_))
77
      {
78
        throttle.Throttle();
79
      }
80
    }
81
  } while (nbytes > 0);
82
155
}
83
84
85
705
void TaskRead::SetWatermarks(uint64_t low, uint64_t high) {
86
705
  assert(high > low);
87
705
  assert(low > 0);
88
705
  low_watermark_ = low;
89
705
  high_watermark_ = high;
90
705
}