1 |
|
|
/** |
2 |
|
|
* This file is part of the CernVM File System. |
3 |
|
|
*/ |
4 |
|
|
|
5 |
|
|
#include <unistd.h> |
6 |
|
|
|
7 |
|
|
#include <cassert> |
8 |
|
|
|
9 |
|
|
#include "ingestion/task_chunk.h" |
10 |
|
|
|
11 |
|
|
/** |
12 |
|
|
* The tags from the read stage in the pipeline and the tags given in the |
13 |
|
|
* chunking stage can safely overlap. Nevertheless, debugging might be easier |
14 |
|
|
* if they don't. So let's start with a high number. |
15 |
|
|
*/ |
16 |
|
|
atomic_int64 TaskChunk::tag_seq_ = 2 << 28; |
17 |
|
|
|
18 |
|
|
/** |
19 |
|
|
* Consumes the stream of input blocks and produces new output blocks according |
20 |
|
|
* to cut marks. The output blocks correspond to chunks. |
21 |
|
|
*/ |
22 |
|
226394 |
void TaskChunk::Process(BlockItem *input_block) { |
23 |
|
226394 |
FileItem *file_item = input_block->file_item(); |
24 |
|
226394 |
int64_t input_tag = input_block->tag(); |
25 |
✓✗✗✓
|
226394 |
assert((file_item != NULL) && (input_tag >= 0)); |
26 |
|
|
|
27 |
|
226394 |
ChunkInfo chunk_info; |
28 |
|
|
// Do we see blocks of the file for the first time? |
29 |
|
226394 |
TagMap::const_iterator iter_find = tag_map_.find(input_tag); |
30 |
✓✓ |
226394 |
if (iter_find == tag_map_.end()) { |
31 |
|
|
// We may have only regular chunks, only a bulk chunk, or both. We may |
32 |
|
|
// end up in a situation where we produced only a single non-bulk chunk. |
33 |
|
|
// This needs to be fixed up later in the pipeline by the write task. |
34 |
✓✓ |
155 |
if (file_item->may_have_chunks()) { |
35 |
|
48 |
chunk_info.next_chunk = new ChunkItem(file_item, 0); |
36 |
|
48 |
chunk_info.output_tag_chunk = atomic_xadd64(&tag_seq_, 1); |
37 |
✓✓ |
48 |
if (file_item->has_legacy_bulk_chunk()) { |
38 |
|
41 |
chunk_info.bulk_chunk = new ChunkItem(file_item, 0); |
39 |
|
|
} |
40 |
|
|
} else { |
41 |
|
107 |
chunk_info.bulk_chunk = new ChunkItem(file_item, 0); |
42 |
|
|
} |
43 |
|
|
|
44 |
✓✓ |
155 |
if (chunk_info.bulk_chunk != NULL) { |
45 |
|
148 |
chunk_info.bulk_chunk->MakeBulkChunk(); |
46 |
|
148 |
chunk_info.bulk_chunk->set_size(file_item->size()); |
47 |
|
148 |
chunk_info.output_tag_bulk = atomic_xadd64(&tag_seq_, 1); |
48 |
|
|
} |
49 |
|
155 |
tag_map_[input_tag] = chunk_info; |
50 |
|
|
} else { |
51 |
|
226239 |
chunk_info = iter_find->second; |
52 |
|
|
} |
53 |
✓✓✗✓
|
226394 |
assert((chunk_info.bulk_chunk != NULL) || (chunk_info.next_chunk != NULL)); |
54 |
|
|
|
55 |
|
226394 |
BlockItem *output_block_bulk = NULL; |
56 |
✓✓ |
226394 |
if (chunk_info.bulk_chunk != NULL) { |
57 |
|
215347 |
output_block_bulk = new BlockItem(chunk_info.output_tag_bulk, allocator_); |
58 |
|
215347 |
output_block_bulk->SetFileItem(file_item); |
59 |
|
215347 |
output_block_bulk->SetChunkItem(chunk_info.bulk_chunk); |
60 |
|
|
} |
61 |
|
|
|
62 |
|
226394 |
ChunkDetector *chunk_detector = file_item->chunk_detector(); |
63 |
✓✓✗ |
226394 |
switch (input_block->type()) { |
64 |
|
|
case BlockItem::kBlockStop: |
65 |
|
|
// End of the file, no more new chunks |
66 |
|
155 |
file_item->set_is_fully_chunked(); |
67 |
✓✓ |
155 |
if (output_block_bulk) output_block_bulk->MakeStop(); |
68 |
✓✓ |
155 |
if (chunk_info.next_chunk != NULL) { |
69 |
✗✓ |
48 |
assert(file_item->size() >= chunk_info.next_chunk->offset()); |
70 |
|
|
chunk_info.next_chunk->set_size( |
71 |
|
48 |
file_item->size() - chunk_info.next_chunk->offset()); |
72 |
|
|
BlockItem *block_stop = |
73 |
|
48 |
new BlockItem(chunk_info.output_tag_chunk, allocator_); |
74 |
|
48 |
block_stop->SetFileItem(file_item); |
75 |
|
48 |
block_stop->SetChunkItem(chunk_info.next_chunk); |
76 |
|
48 |
block_stop->MakeStop(); |
77 |
|
48 |
tubes_out_->Dispatch(block_stop); |
78 |
|
|
} |
79 |
|
155 |
tag_map_.erase(input_tag); |
80 |
|
155 |
break; |
81 |
|
|
|
82 |
|
|
case BlockItem::kBlockData: |
83 |
✓✓ |
226239 |
if (output_block_bulk) { |
84 |
✓✓ |
215199 |
if (chunk_info.next_chunk != NULL) { |
85 |
|
|
// Reserve zero-copy for the regular chunk |
86 |
|
|
output_block_bulk->MakeDataCopy(input_block->data(), |
87 |
|
108544 |
input_block->size()); |
88 |
|
|
} else { |
89 |
|
|
// There is only the bulk chunk, zero copy |
90 |
|
106655 |
output_block_bulk->MakeDataMove(input_block); |
91 |
|
|
} |
92 |
|
|
} |
93 |
|
|
|
94 |
✓✓ |
226239 |
if (chunk_info.next_chunk != NULL) { |
95 |
|
119584 |
unsigned offset_in_block = 0; |
96 |
|
119584 |
uint64_t cut_mark = 0; |
97 |
✓✓ |
245761 |
while ((cut_mark = chunk_detector->FindNextCutMark(input_block)) != 0) { |
98 |
✗✓ |
6593 |
assert(cut_mark >= chunk_info.offset + offset_in_block); |
99 |
|
6593 |
uint64_t cut_mark_in_block = cut_mark - chunk_info.offset; |
100 |
✗✓ |
6593 |
assert(cut_mark_in_block >= offset_in_block); |
101 |
✗✓ |
6593 |
assert(cut_mark_in_block <= input_block->size()); |
102 |
|
6593 |
unsigned tail_size = cut_mark_in_block - offset_in_block; |
103 |
|
|
|
104 |
✓✗ |
6593 |
if (tail_size > 0) { |
105 |
|
|
BlockItem *block_tail = |
106 |
|
6593 |
new BlockItem(chunk_info.output_tag_chunk, allocator_); |
107 |
|
6593 |
block_tail->SetFileItem(file_item); |
108 |
|
6593 |
block_tail->SetChunkItem(chunk_info.next_chunk); |
109 |
|
|
block_tail->MakeDataCopy(input_block->data() + offset_in_block, |
110 |
|
6593 |
tail_size); |
111 |
|
6593 |
tubes_out_->Dispatch(block_tail); |
112 |
|
|
} |
113 |
|
|
|
114 |
✗✓ |
6593 |
assert(cut_mark >= chunk_info.next_chunk->offset()); |
115 |
|
|
// If the cut mark happens to at the end of file, let the final |
116 |
|
|
// incoming stop block schedule dispatch of the chunk stop block |
117 |
✓✓ |
6593 |
if (cut_mark < file_item->size()) { |
118 |
|
|
chunk_info.next_chunk->set_size( |
119 |
|
6592 |
cut_mark - chunk_info.next_chunk->offset()); |
120 |
|
|
BlockItem *block_stop = |
121 |
|
6592 |
new BlockItem(chunk_info.output_tag_chunk, allocator_); |
122 |
|
6592 |
block_stop->SetFileItem(file_item); |
123 |
|
6592 |
block_stop->SetChunkItem(chunk_info.next_chunk); |
124 |
|
6592 |
block_stop->MakeStop(); |
125 |
|
6592 |
tubes_out_->Dispatch(block_stop); |
126 |
|
|
|
127 |
|
6592 |
chunk_info.next_chunk = new ChunkItem(file_item, cut_mark); |
128 |
|
6592 |
chunk_info.output_tag_chunk = atomic_xadd64(&tag_seq_, 1); |
129 |
|
|
} |
130 |
|
6593 |
offset_in_block = cut_mark_in_block; |
131 |
|
|
} |
132 |
|
119584 |
chunk_info.offset += offset_in_block; |
133 |
|
|
|
134 |
✗✓ |
119584 |
assert(input_block->size() >= offset_in_block); |
135 |
|
119584 |
unsigned tail_size = input_block->size() - offset_in_block; |
136 |
✓✓ |
119584 |
if (tail_size > 0) { |
137 |
|
|
BlockItem *block_tail = |
138 |
|
119370 |
new BlockItem(chunk_info.output_tag_chunk, allocator_); |
139 |
|
119370 |
block_tail->SetFileItem(file_item); |
140 |
|
119370 |
block_tail->SetChunkItem(chunk_info.next_chunk); |
141 |
|
|
block_tail->MakeDataCopy(input_block->data() + offset_in_block, |
142 |
|
119370 |
tail_size); |
143 |
|
119370 |
tubes_out_->Dispatch(block_tail); |
144 |
|
119370 |
chunk_info.offset += tail_size; |
145 |
|
|
} |
146 |
|
|
|
147 |
|
|
// Delete data from incoming block |
148 |
|
119584 |
input_block->Reset(); |
149 |
|
|
} |
150 |
|
|
|
151 |
|
226239 |
tag_map_[input_tag] = chunk_info; |
152 |
|
226239 |
break; |
153 |
|
|
|
154 |
|
|
default: |
155 |
|
|
abort(); |
156 |
|
|
} |
157 |
|
|
|
158 |
✓✗ |
226394 |
delete input_block; |
159 |
✓✓ |
226394 |
if (output_block_bulk) tubes_out_->Dispatch(output_block_bulk); |
160 |
|
226394 |
} |