1 |
|
|
/** |
2 |
|
|
* This file is part of the CernVM File System. |
3 |
|
|
*/ |
4 |
|
|
|
5 |
|
|
#include "ingestion/task_read.h" |
6 |
|
|
|
7 |
|
|
#include <errno.h> |
8 |
|
|
#include <fcntl.h> |
9 |
|
|
#include <unistd.h> |
10 |
|
|
|
11 |
|
|
#include <cstring> |
12 |
|
|
|
13 |
|
|
#include "backoff.h" |
14 |
|
|
#include "logging.h" |
15 |
|
|
#include "platform.h" |
16 |
|
|
#include "smalloc.h" |
17 |
|
|
#include "util/posix.h" |
18 |
|
|
|
19 |
|
|
|
20 |
|
|
// Counter from which Process() draws block tags: each call fetches a value
// with atomic_xadd64(&tag_seq_, 1) and passes it to every BlockItem it creates.
atomic_int64 TaskRead::tag_seq_ = 0; |
21 |
|
|
|
22 |
|
|
|
23 |
|
155 |
/**
 * Reads the file behind item in pieces of at most kBlockSize bytes and
 * dispatches each piece as a BlockItem to tubes_out_.  All blocks created by
 * one call share a single tag drawn from tag_seq_.  Once Read() returns 0, the
 * item is closed and a final stop block is dispatched.
 *
 * A failure to open, to determine the size of, or to read the file is logged
 * to stderr and the process is aborted.
 */
void TaskRead::Process(FileItem *item) {
  BackoffThrottle throttle(kThrottleInitMs, kThrottleMaxMs, kThrottleResetMs);
  // Too many bytes in flight: count the stall in n_block_ and back off until
  // the managed bytes are no longer above the low watermark.
  if ((high_watermark_ > 0) && (BlockItem::managed_bytes() > high_watermark_)) {
    atomic_inc64(&n_block_);
    do {
      throttle.Throttle();
    } while (BlockItem::managed_bytes() > low_watermark_);
  }

  if (item->Open() == false) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to open %s (%d)",
             item->path().c_str(), errno);
    abort();
  }
  uint64_t size;
  if (item->GetSize(&size) == false) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to fstat %s (%d)",
             item->path().c_str(), errno);
    abort();
  }
  item->set_size(size);

  // Re-evaluate the chunking hint now that the file size is known.
  if (item->may_have_chunks()) {
    item->set_may_have_chunks(
      item->chunk_detector()->MightFindChunks(item->size()));
  }

  // A plain byte buffer of kBlockSize bytes.  Declaring it as an array of
  // pointers would reserve sizeof(unsigned char *) times as much stack for
  // the very same kBlockSize bytes that Read() is asked to fill.
  unsigned char buffer[kBlockSize];
  uint64_t tag = atomic_xadd64(&tag_seq_, 1);
  ssize_t nbytes = -1;
  unsigned cnt = 0;
  do {
    nbytes = item->Read(buffer, kBlockSize);
    if (nbytes < 0) {
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to read %s (%d)",
               item->path().c_str(), errno);
      abort();
    }

    BlockItem *block_item = new BlockItem(tag, allocator_);
    block_item->SetFileItem(item);
    if (nbytes == 0) {
      // End of file: close the source and emit the stop block.
      item->Close();
      block_item->MakeStop();
    } else {
      block_item->MakeDataCopy(buffer, nbytes);
    }
    tubes_out_->Dispatch(block_item);

    // Every 32 blocks, check the high watermark again and back off once if
    // it is exceeded.
    cnt++;
    if ((cnt % 32) == 0) {
      if ((high_watermark_ > 0) &&
          (BlockItem::managed_bytes() > high_watermark_))
      {
        throttle.Throttle();
      }
    }
  } while (nbytes > 0);
}
83 |
|
|
|
84 |
|
|
|
85 |
|
705 |
/**
 * Installs the thresholds that Process() compares against
 * BlockItem::managed_bytes(): it starts throttling above the high watermark
 * and keeps backing off while the value is above the low watermark.
 *
 * Both preconditions are asserted: high must be strictly greater than low,
 * and low must be greater than zero.
 */
void TaskRead::SetWatermarks(uint64_t low, uint64_t high) {
  assert(high > low);
  assert(low > 0);
  high_watermark_ = high;
  low_watermark_ = low;
}