Directory: | cvmfs/ |
---|---|
File: | cvmfs/ingestion/task_chunk.cc |
Date: | 2024-04-28 02:33:07 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 87 | 89 | 97.8% |
Branches: | 91 | 149 | 61.1% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #include <unistd.h> | ||
6 | |||
7 | #include <cassert> | ||
8 | |||
9 | #include "ingestion/task_chunk.h" | ||
10 | #include "util/exception.h" | ||
11 | |||
12 | /** | ||
13 | * The tags from the read stage in the pipeline and the tags given in the | ||
14 | * chunking stage can safely overlap. Nevertheless, debugging might be easier | ||
15 | * if they don't. So let's start with a high number. | ||
16 | */ | ||
17 | atomic_int64 TaskChunk::tag_seq_ = 2 << 28; | ||
18 | |||
19 | /** | ||
20 | * Consumes the stream of input blocks and produces new output blocks according | ||
21 | * to cut marks. The output blocks correspond to chunks. | ||
22 | */ | ||
23 | 560115 | void TaskChunk::Process(BlockItem *input_block) { | |
24 | 560115 | FileItem *file_item = input_block->file_item(); | |
25 | 562399 | int64_t input_tag = input_block->tag(); | |
26 |
3/4✓ Branch 0 taken 563228 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 561596 times.
✓ Branch 3 taken 1632 times.
|
562274 | assert((file_item != NULL) && (input_tag >= 0)); |
27 | |||
28 | 561596 | ChunkInfo chunk_info; | |
29 | // Do we see blocks of the file for the first time? | ||
30 |
3/4✓ Branch 1 taken 561830 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 249842 times.
✓ Branch 4 taken 311988 times.
|
561054 | if (!tag_map_.Lookup(input_tag, &chunk_info)) { |
31 | // We may have only regular chunks, only a bulk chunk, or both. We may | ||
32 | // end up in a situation where we produced only a single non-bulk chunk. | ||
33 | // This needs to be fixed up later in the pipeline by the write task. | ||
34 |
2/2✓ Branch 1 taken 15 times.
✓ Branch 2 taken 249729 times.
|
249842 | if (file_item->may_have_chunks()) { |
35 |
2/4✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 15 times.
✗ Branch 5 not taken.
|
15 | chunk_info.next_chunk = new ChunkItem(file_item, 0); |
36 | 15 | chunk_info.output_tag_chunk = atomic_xadd64(&tag_seq_, 1); | |
37 |
2/2✓ Branch 1 taken 11 times.
✓ Branch 2 taken 4 times.
|
15 | if (file_item->has_legacy_bulk_chunk()) { |
38 |
2/4✓ Branch 1 taken 11 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 11 times.
✗ Branch 5 not taken.
|
11 | chunk_info.bulk_chunk = new ChunkItem(file_item, 0); |
39 | } | ||
40 | } else { | ||
41 |
2/4✓ Branch 1 taken 249856 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 249742 times.
✗ Branch 5 not taken.
|
249729 | chunk_info.bulk_chunk = new ChunkItem(file_item, 0); |
42 | } | ||
43 | |||
44 |
2/2✓ Branch 0 taken 249754 times.
✓ Branch 1 taken 3 times.
|
249757 | if (chunk_info.bulk_chunk != NULL) { |
45 |
1/2✓ Branch 1 taken 249645 times.
✗ Branch 2 not taken.
|
249754 | chunk_info.bulk_chunk->MakeBulkChunk(); |
46 | 249645 | chunk_info.bulk_chunk->set_size(file_item->size()); | |
47 | 249615 | chunk_info.output_tag_bulk = atomic_xadd64(&tag_seq_, 1); | |
48 | } | ||
49 |
1/2✓ Branch 1 taken 249617 times.
✗ Branch 2 not taken.
|
250009 | tag_map_.Insert(input_tag, chunk_info); |
50 | } | ||
51 |
3/4✓ Branch 0 taken 10276 times.
✓ Branch 1 taken 551329 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 10276 times.
|
561605 | assert((chunk_info.bulk_chunk != NULL) || (chunk_info.next_chunk != NULL)); |
52 | |||
53 | 561605 | BlockItem *output_block_bulk = NULL; | |
54 |
2/2✓ Branch 0 taken 552088 times.
✓ Branch 1 taken 9517 times.
|
561605 | if (chunk_info.bulk_chunk != NULL) { |
55 |
2/4✓ Branch 1 taken 552162 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 551487 times.
✗ Branch 5 not taken.
|
552088 | output_block_bulk = new BlockItem(chunk_info.output_tag_bulk, allocator_); |
56 |
1/2✓ Branch 1 taken 551309 times.
✗ Branch 2 not taken.
|
551487 | output_block_bulk->SetFileItem(file_item); |
57 |
1/2✓ Branch 1 taken 551321 times.
✗ Branch 2 not taken.
|
551309 | output_block_bulk->SetChunkItem(chunk_info.bulk_chunk); |
58 | } | ||
59 | |||
60 | 560838 | ChunkDetector *chunk_detector = file_item->chunk_detector(); | |
61 |
2/3✓ Branch 1 taken 249313 times.
✓ Branch 2 taken 312856 times.
✗ Branch 3 not taken.
|
561570 | switch (input_block->type()) { |
62 | 249313 | case BlockItem::kBlockStop: | |
63 | // End of the file, no more new chunks | ||
64 | 249313 | file_item->set_is_fully_chunked(); | |
65 |
2/4✓ Branch 0 taken 249805 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 249477 times.
✗ Branch 4 not taken.
|
249804 | if (output_block_bulk) output_block_bulk->MakeStop(); |
66 |
2/2✓ Branch 0 taken 15 times.
✓ Branch 1 taken 249461 times.
|
249476 | if (chunk_info.next_chunk != NULL) { |
67 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 15 times.
|
15 | assert(file_item->size() >= chunk_info.next_chunk->offset()); |
68 | 15 | chunk_info.next_chunk->set_size( | |
69 | 15 | file_item->size() - chunk_info.next_chunk->offset()); | |
70 | BlockItem *block_stop = | ||
71 |
2/4✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 15 times.
✗ Branch 5 not taken.
|
15 | new BlockItem(chunk_info.output_tag_chunk, allocator_); |
72 |
1/2✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
|
15 | block_stop->SetFileItem(file_item); |
73 |
1/2✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
|
15 | block_stop->SetChunkItem(chunk_info.next_chunk); |
74 |
1/2✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
|
15 | block_stop->MakeStop(); |
75 |
1/2✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
|
15 | tubes_out_->Dispatch(block_stop); |
76 | } | ||
77 |
1/2✓ Branch 1 taken 249443 times.
✗ Branch 2 not taken.
|
249476 | tag_map_.Erase(input_tag); |
78 | 249443 | break; | |
79 | |||
80 | 312856 | case BlockItem::kBlockData: | |
81 |
2/2✓ Branch 0 taken 302568 times.
✓ Branch 1 taken 10288 times.
|
312856 | if (output_block_bulk) { |
82 |
2/2✓ Branch 0 taken 27114 times.
✓ Branch 1 taken 275454 times.
|
302568 | if (chunk_info.next_chunk != NULL) { |
83 | // Reserve zero-copy for the regular chunk | ||
84 |
1/2✓ Branch 3 taken 27113 times.
✗ Branch 4 not taken.
|
27114 | output_block_bulk->MakeDataCopy(input_block->data(), |
85 | input_block->size()); | ||
86 | } else { | ||
87 | // There is only the bulk chunk, zero copy | ||
88 |
1/2✓ Branch 1 taken 275491 times.
✗ Branch 2 not taken.
|
275454 | output_block_bulk->MakeDataMove(input_block); |
89 | } | ||
90 | } | ||
91 | |||
92 |
2/2✓ Branch 0 taken 37399 times.
✓ Branch 1 taken 275493 times.
|
312892 | if (chunk_info.next_chunk != NULL) { |
93 | 37399 | unsigned offset_in_block = 0; | |
94 | 37399 | uint64_t cut_mark = 0; | |
95 |
3/4✓ Branch 1 taken 42783 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 5399 times.
✓ Branch 4 taken 37384 times.
|
42798 | while ((cut_mark = chunk_detector->FindNextCutMark(input_block)) != 0) { |
96 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 5399 times.
|
5399 | assert(cut_mark >= chunk_info.offset + offset_in_block); |
97 | 5399 | uint64_t cut_mark_in_block = cut_mark - chunk_info.offset; | |
98 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 5399 times.
|
5399 | assert(cut_mark_in_block >= offset_in_block); |
99 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 5399 times.
|
5399 | assert(cut_mark_in_block <= input_block->size()); |
100 | 5399 | unsigned tail_size = cut_mark_in_block - offset_in_block; | |
101 | |||
102 |
1/2✓ Branch 0 taken 5399 times.
✗ Branch 1 not taken.
|
5399 | if (tail_size > 0) { |
103 | BlockItem *block_tail = | ||
104 |
2/4✓ Branch 1 taken 5399 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 5399 times.
✗ Branch 5 not taken.
|
5399 | new BlockItem(chunk_info.output_tag_chunk, allocator_); |
105 |
1/2✓ Branch 1 taken 5399 times.
✗ Branch 2 not taken.
|
5399 | block_tail->SetFileItem(file_item); |
106 |
1/2✓ Branch 1 taken 5399 times.
✗ Branch 2 not taken.
|
5399 | block_tail->SetChunkItem(chunk_info.next_chunk); |
107 |
1/2✓ Branch 2 taken 5399 times.
✗ Branch 3 not taken.
|
5399 | block_tail->MakeDataCopy(input_block->data() + offset_in_block, |
108 | tail_size); | ||
109 |
1/2✓ Branch 1 taken 5399 times.
✗ Branch 2 not taken.
|
5399 | tubes_out_->Dispatch(block_tail); |
110 | } | ||
111 | |||
112 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 5399 times.
|
5399 | assert(cut_mark >= chunk_info.next_chunk->offset()); |
113 | // If the cut mark happens to be at the end of file, let the final | ||
114 | // incoming stop block schedule dispatch of the chunk stop block | ||
115 |
2/2✓ Branch 1 taken 5398 times.
✓ Branch 2 taken 1 times.
|
5399 | if (cut_mark < file_item->size()) { |
116 | 5398 | chunk_info.next_chunk->set_size( | |
117 | 5398 | cut_mark - chunk_info.next_chunk->offset()); | |
118 | BlockItem *block_stop = | ||
119 |
2/4✓ Branch 1 taken 5398 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 5398 times.
✗ Branch 5 not taken.
|
5398 | new BlockItem(chunk_info.output_tag_chunk, allocator_); |
120 |
1/2✓ Branch 1 taken 5398 times.
✗ Branch 2 not taken.
|
5398 | block_stop->SetFileItem(file_item); |
121 |
1/2✓ Branch 1 taken 5398 times.
✗ Branch 2 not taken.
|
5398 | block_stop->SetChunkItem(chunk_info.next_chunk); |
122 |
1/2✓ Branch 1 taken 5398 times.
✗ Branch 2 not taken.
|
5398 | block_stop->MakeStop(); |
123 |
1/2✓ Branch 1 taken 5398 times.
✗ Branch 2 not taken.
|
5398 | tubes_out_->Dispatch(block_stop); |
124 | |||
125 |
2/4✓ Branch 1 taken 5398 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 5398 times.
✗ Branch 5 not taken.
|
5398 | chunk_info.next_chunk = new ChunkItem(file_item, cut_mark); |
126 | 5398 | chunk_info.output_tag_chunk = atomic_xadd64(&tag_seq_, 1); | |
127 | } | ||
128 | 5399 | offset_in_block = cut_mark_in_block; | |
129 | } | ||
130 | 37384 | chunk_info.offset += offset_in_block; | |
131 | |||
132 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 37384 times.
|
37384 | assert(input_block->size() >= offset_in_block); |
133 | 37384 | unsigned tail_size = input_block->size() - offset_in_block; | |
134 |
2/2✓ Branch 0 taken 37328 times.
✓ Branch 1 taken 55 times.
|
37383 | if (tail_size > 0) { |
135 | BlockItem *block_tail = | ||
136 |
2/4✓ Branch 1 taken 37334 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 37330 times.
✗ Branch 5 not taken.
|
37328 | new BlockItem(chunk_info.output_tag_chunk, allocator_); |
137 |
1/2✓ Branch 1 taken 37328 times.
✗ Branch 2 not taken.
|
37330 | block_tail->SetFileItem(file_item); |
138 |
1/2✓ Branch 1 taken 37330 times.
✗ Branch 2 not taken.
|
37328 | block_tail->SetChunkItem(chunk_info.next_chunk); |
139 |
1/2✓ Branch 2 taken 37343 times.
✗ Branch 3 not taken.
|
37330 | block_tail->MakeDataCopy(input_block->data() + offset_in_block, |
140 | tail_size); | ||
141 |
1/2✓ Branch 1 taken 37338 times.
✗ Branch 2 not taken.
|
37343 | tubes_out_->Dispatch(block_tail); |
142 | 37338 | chunk_info.offset += tail_size; | |
143 | } | ||
144 | |||
145 | // Delete data from incoming block | ||
146 |
1/2✓ Branch 1 taken 37406 times.
✗ Branch 2 not taken.
|
37393 | input_block->Reset(); |
147 | } | ||
148 | |||
149 |
1/2✓ Branch 1 taken 313323 times.
✗ Branch 2 not taken.
|
312899 | tag_map_.Insert(input_tag, chunk_info); |
150 | 313323 | break; | |
151 | |||
152 | ✗ | default: | |
153 | ✗ | PANIC(NULL); | |
154 | } | ||
155 | |||
156 |
2/2✓ Branch 0 taken 562341 times.
✓ Branch 1 taken 425 times.
|
562766 | delete input_block; |
157 |
3/4✓ Branch 0 taken 552643 times.
✓ Branch 1 taken 10276 times.
✓ Branch 3 taken 550578 times.
✗ Branch 4 not taken.
|
562919 | if (output_block_bulk) tubes_out_->Dispatch(output_block_bulk); |
158 | 560854 | } | |
159 |