Directory: | cvmfs/ |
---|---|
File: | cvmfs/ingestion/task_chunk.cc |
Date: | 2025-02-09 02:34:19 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 87 | 89 | 97.8% |
Branches: | 89 | 149 | 59.7% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #include <unistd.h> | ||
6 | |||
7 | #include <cassert> | ||
8 | |||
9 | #include "ingestion/task_chunk.h" | ||
10 | #include "util/exception.h" | ||
11 | |||
12 | /** | ||
13 | * The tags from the read stage in the pipeline and the tags given in the | ||
14 | * chunking stage can safely overlap. Nevertheless, debugging might be easier | ||
15 | * if they don't. So let's start with a high number. | ||
16 | */ | ||
17 | atomic_int64 TaskChunk::tag_seq_ = 2 << 28; | ||
18 | |||
19 | /** | ||
20 | * Consumes the stream of input blocks and produces new output blocks according | ||
21 | * to cut marks. The output blocks correspond to chunks. | ||
22 | */ | ||
23 | 559367 | void TaskChunk::Process(BlockItem *input_block) { | |
24 | 559367 | FileItem *file_item = input_block->file_item(); | |
25 | 559114 | int64_t input_tag = input_block->tag(); | |
26 |
2/4✓ Branch 0 taken 559574 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 559581 times.
✗ Branch 3 not taken.
|
559540 | assert((file_item != NULL) && (input_tag >= 0)); |
27 | |||
28 | 559581 | ChunkInfo chunk_info; | |
29 | // Do we see blocks of the file for the first time? | ||
30 |
3/4✓ Branch 1 taken 560752 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 249641 times.
✓ Branch 4 taken 311111 times.
|
559201 | if (!tag_map_.Lookup(input_tag, &chunk_info)) { |
31 | // We may have only regular chunks, only a bulk chunk, or both. We may | ||
32 | // end up in a situation where we produced only a single non-bulk chunk. | ||
33 | // This needs to be fixed up later in the pipeline by the write task. | ||
34 |
2/2✓ Branch 1 taken 15 times.
✓ Branch 2 taken 249614 times.
|
249641 | if (file_item->may_have_chunks()) { |
35 |
2/4✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 15 times.
✗ Branch 5 not taken.
|
15 | chunk_info.next_chunk = new ChunkItem(file_item, 0); |
36 | 15 | chunk_info.output_tag_chunk = atomic_xadd64(&tag_seq_, 1); | |
37 |
2/2✓ Branch 1 taken 11 times.
✓ Branch 2 taken 4 times.
|
15 | if (file_item->has_legacy_bulk_chunk()) { |
38 |
2/4✓ Branch 1 taken 11 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 11 times.
✗ Branch 5 not taken.
|
11 | chunk_info.bulk_chunk = new ChunkItem(file_item, 0); |
39 | } | ||
40 | } else { | ||
41 |
2/4✓ Branch 1 taken 249741 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 249685 times.
✗ Branch 5 not taken.
|
249614 | chunk_info.bulk_chunk = new ChunkItem(file_item, 0); |
42 | } | ||
43 | |||
44 |
1/2✓ Branch 0 taken 249706 times.
✗ Branch 1 not taken.
|
249700 | if (chunk_info.bulk_chunk != NULL) { |
45 |
1/2✓ Branch 1 taken 249516 times.
✗ Branch 2 not taken.
|
249706 | chunk_info.bulk_chunk->MakeBulkChunk(); |
46 | 249516 | chunk_info.bulk_chunk->set_size(file_item->size()); | |
47 | 249419 | chunk_info.output_tag_bulk = atomic_xadd64(&tag_seq_, 1); | |
48 | } | ||
49 |
1/2✓ Branch 1 taken 249489 times.
✗ Branch 2 not taken.
|
250009 | tag_map_.Insert(input_tag, chunk_info); |
50 | } | ||
51 |
3/4✓ Branch 0 taken 10276 times.
✓ Branch 1 taken 550324 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 10276 times.
|
560600 | assert((chunk_info.bulk_chunk != NULL) || (chunk_info.next_chunk != NULL)); |
52 | |||
53 | 560600 | BlockItem *output_block_bulk = NULL; | |
54 |
2/2✓ Branch 0 taken 551043 times.
✓ Branch 1 taken 9557 times.
|
560600 | if (chunk_info.bulk_chunk != NULL) { |
55 |
2/4✓ Branch 1 taken 551234 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 550334 times.
✗ Branch 5 not taken.
|
551043 | output_block_bulk = new BlockItem(chunk_info.output_tag_bulk, allocator_); |
56 |
1/2✓ Branch 1 taken 550393 times.
✗ Branch 2 not taken.
|
550334 | output_block_bulk->SetFileItem(file_item); |
57 |
1/2✓ Branch 1 taken 550695 times.
✗ Branch 2 not taken.
|
550393 | output_block_bulk->SetChunkItem(chunk_info.bulk_chunk); |
58 | } | ||
59 | |||
60 | 560252 | ChunkDetector *chunk_detector = file_item->chunk_detector(); | |
61 |
2/3✓ Branch 1 taken 249122 times.
✓ Branch 2 taken 312695 times.
✗ Branch 3 not taken.
|
560818 | switch (input_block->type()) { |
62 | 249122 | case BlockItem::kBlockStop: | |
63 | // End of the file, no more new chunks | ||
64 | 249122 | file_item->set_is_fully_chunked(); | |
65 |
2/4✓ Branch 0 taken 249810 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 249311 times.
✗ Branch 4 not taken.
|
249808 | if (output_block_bulk) output_block_bulk->MakeStop(); |
66 |
2/2✓ Branch 0 taken 15 times.
✓ Branch 1 taken 249294 times.
|
249309 | if (chunk_info.next_chunk != NULL) { |
67 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 15 times.
|
15 | assert(file_item->size() >= chunk_info.next_chunk->offset()); |
68 | 15 | chunk_info.next_chunk->set_size( | |
69 | 15 | file_item->size() - chunk_info.next_chunk->offset()); | |
70 | BlockItem *block_stop = | ||
71 |
2/4✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 15 times.
✗ Branch 5 not taken.
|
15 | new BlockItem(chunk_info.output_tag_chunk, allocator_); |
72 |
1/2✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
|
15 | block_stop->SetFileItem(file_item); |
73 |
1/2✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
|
15 | block_stop->SetChunkItem(chunk_info.next_chunk); |
74 |
1/2✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
|
15 | block_stop->MakeStop(); |
75 |
1/2✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
|
15 | tubes_out_->Dispatch(block_stop); |
76 | } | ||
77 |
1/2✓ Branch 1 taken 249004 times.
✗ Branch 2 not taken.
|
249309 | tag_map_.Erase(input_tag); |
78 | 249004 | break; | |
79 | |||
80 | 312695 | case BlockItem::kBlockData: | |
81 |
2/2✓ Branch 0 taken 302437 times.
✓ Branch 1 taken 10258 times.
|
312695 | if (output_block_bulk) { |
82 |
2/2✓ Branch 0 taken 27106 times.
✓ Branch 1 taken 275331 times.
|
302437 | if (chunk_info.next_chunk != NULL) { |
83 | // Reserve zero-copy for the regular chunk | ||
84 |
1/2✓ Branch 3 taken 27153 times.
✗ Branch 4 not taken.
|
27106 | output_block_bulk->MakeDataCopy(input_block->data(), |
85 | input_block->size()); | ||
86 | } else { | ||
87 | // There is only the bulk chunk, zero copy | ||
88 |
1/2✓ Branch 1 taken 275331 times.
✗ Branch 2 not taken.
|
275331 | output_block_bulk->MakeDataMove(input_block); |
89 | } | ||
90 | } | ||
91 | |||
92 |
2/2✓ Branch 0 taken 37404 times.
✓ Branch 1 taken 275338 times.
|
312742 | if (chunk_info.next_chunk != NULL) { |
93 | 37404 | unsigned offset_in_block = 0; | |
94 | 37404 | uint64_t cut_mark = 0; | |
95 |
3/4✓ Branch 1 taken 42802 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 5399 times.
✓ Branch 4 taken 37403 times.
|
42803 | while ((cut_mark = chunk_detector->FindNextCutMark(input_block)) != 0) { |
96 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 5399 times.
|
5399 | assert(cut_mark >= chunk_info.offset + offset_in_block); |
97 | 5399 | uint64_t cut_mark_in_block = cut_mark - chunk_info.offset; | |
98 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 5399 times.
|
5399 | assert(cut_mark_in_block >= offset_in_block); |
99 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 5399 times.
|
5399 | assert(cut_mark_in_block <= input_block->size()); |
100 | 5399 | unsigned tail_size = cut_mark_in_block - offset_in_block; | |
101 | |||
102 |
1/2✓ Branch 0 taken 5399 times.
✗ Branch 1 not taken.
|
5399 | if (tail_size > 0) { |
103 | BlockItem *block_tail = | ||
104 |
2/4✓ Branch 1 taken 5399 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 5399 times.
✗ Branch 5 not taken.
|
5399 | new BlockItem(chunk_info.output_tag_chunk, allocator_); |
105 |
1/2✓ Branch 1 taken 5399 times.
✗ Branch 2 not taken.
|
5399 | block_tail->SetFileItem(file_item); |
106 |
1/2✓ Branch 1 taken 5399 times.
✗ Branch 2 not taken.
|
5399 | block_tail->SetChunkItem(chunk_info.next_chunk); |
107 |
1/2✓ Branch 2 taken 5399 times.
✗ Branch 3 not taken.
|
5399 | block_tail->MakeDataCopy(input_block->data() + offset_in_block, |
108 | tail_size); | ||
109 |
1/2✓ Branch 1 taken 5399 times.
✗ Branch 2 not taken.
|
5399 | tubes_out_->Dispatch(block_tail); |
110 | } | ||
111 | |||
112 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 5399 times.
|
5399 | assert(cut_mark >= chunk_info.next_chunk->offset()); |
113 | // If the cut mark happens to be at the end of file, let the final | ||
114 | // incoming stop block schedule dispatch of the chunk stop block | ||
115 |
2/2✓ Branch 1 taken 5398 times.
✓ Branch 2 taken 1 times.
|
5399 | if (cut_mark < file_item->size()) { |
116 | 5398 | chunk_info.next_chunk->set_size( | |
117 | 5398 | cut_mark - chunk_info.next_chunk->offset()); | |
118 | BlockItem *block_stop = | ||
119 |
2/4✓ Branch 1 taken 5398 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 5398 times.
✗ Branch 5 not taken.
|
5398 | new BlockItem(chunk_info.output_tag_chunk, allocator_); |
120 |
1/2✓ Branch 1 taken 5398 times.
✗ Branch 2 not taken.
|
5398 | block_stop->SetFileItem(file_item); |
121 |
1/2✓ Branch 1 taken 5398 times.
✗ Branch 2 not taken.
|
5398 | block_stop->SetChunkItem(chunk_info.next_chunk); |
122 |
1/2✓ Branch 1 taken 5398 times.
✗ Branch 2 not taken.
|
5398 | block_stop->MakeStop(); |
123 |
1/2✓ Branch 1 taken 5398 times.
✗ Branch 2 not taken.
|
5398 | tubes_out_->Dispatch(block_stop); |
124 | |||
125 |
2/4✓ Branch 1 taken 5398 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 5398 times.
✗ Branch 5 not taken.
|
5398 | chunk_info.next_chunk = new ChunkItem(file_item, cut_mark); |
126 | 5398 | chunk_info.output_tag_chunk = atomic_xadd64(&tag_seq_, 1); | |
127 | } | ||
128 | 5399 | offset_in_block = cut_mark_in_block; | |
129 | } | ||
130 | 37403 | chunk_info.offset += offset_in_block; | |
131 | |||
132 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 37402 times.
|
37403 | assert(input_block->size() >= offset_in_block); |
133 | 37402 | unsigned tail_size = input_block->size() - offset_in_block; | |
134 |
2/2✓ Branch 0 taken 37346 times.
✓ Branch 1 taken 55 times.
|
37401 | if (tail_size > 0) { |
135 | BlockItem *block_tail = | ||
136 |
2/4✓ Branch 1 taken 37340 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 37333 times.
✗ Branch 5 not taken.
|
37346 | new BlockItem(chunk_info.output_tag_chunk, allocator_); |
137 |
1/2✓ Branch 1 taken 37334 times.
✗ Branch 2 not taken.
|
37333 | block_tail->SetFileItem(file_item); |
138 |
1/2✓ Branch 1 taken 37337 times.
✗ Branch 2 not taken.
|
37334 | block_tail->SetChunkItem(chunk_info.next_chunk); |
139 |
1/2✓ Branch 2 taken 37344 times.
✗ Branch 3 not taken.
|
37337 | block_tail->MakeDataCopy(input_block->data() + offset_in_block, |
140 | tail_size); | ||
141 |
1/2✓ Branch 1 taken 37346 times.
✗ Branch 2 not taken.
|
37344 | tubes_out_->Dispatch(block_tail); |
142 | 37346 | chunk_info.offset += tail_size; | |
143 | } | ||
144 | |||
145 | // Delete data from incoming block | ||
146 |
1/2✓ Branch 1 taken 37394 times.
✗ Branch 2 not taken.
|
37401 | input_block->Reset(); |
147 | } | ||
148 | |||
149 |
1/2✓ Branch 1 taken 313026 times.
✗ Branch 2 not taken.
|
312732 | tag_map_.Insert(input_tag, chunk_info); |
150 | 313026 | break; | |
151 | |||
152 | ✗ | default: | |
153 | ✗ | PANIC(NULL); | |
154 | } | ||
155 | |||
156 |
2/2✓ Branch 0 taken 560998 times.
✓ Branch 1 taken 1032 times.
|
562030 | delete input_block; |
157 |
3/4✓ Branch 0 taken 552327 times.
✓ Branch 1 taken 10276 times.
✓ Branch 3 taken 550129 times.
✗ Branch 4 not taken.
|
562603 | if (output_block_bulk) tubes_out_->Dispatch(output_block_bulk); |
158 | 560405 | } | |
159 |