Directory: | cvmfs/ |
---|---|
File: | cvmfs/ingestion/pipeline.cc |
Date: | 2025-07-06 02:35:01 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 154 | 160 | 96.2% |
Branches: | 107 | 199 | 53.8% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | |||
6 | #include "pipeline.h" | ||
7 | |||
8 | #include <algorithm> | ||
9 | #include <cstdlib> | ||
10 | |||
11 | #include "ingestion/task_chunk.h" | ||
12 | #include "ingestion/task_compress.h" | ||
13 | #include "ingestion/task_hash.h" | ||
14 | #include "ingestion/task_read.h" | ||
15 | #include "ingestion/task_register.h" | ||
16 | #include "ingestion/task_write.h" | ||
17 | #include "upload_facility.h" | ||
18 | #include "upload_spooler_definition.h" | ||
19 | #include "util/concurrency.h" | ||
20 | #include "util/exception.h" | ||
21 | #include "util/platform.h" | ||
22 | #include "util/string.h" | ||
23 | |||
24 | const uint64_t IngestionPipeline::kMaxPipelineMem = 1024 * 1024 * 1024; | ||
25 | |||
26 | 1226 | IngestionPipeline::IngestionPipeline( | |
27 | upload::AbstractUploader *uploader, | ||
28 | 1226 | const upload::SpoolerDefinition &spooler_definition) | |
29 | 1226 | : compression_algorithm_(spooler_definition.compression_alg) | |
30 | 1226 | , hash_algorithm_(spooler_definition.hash_algorithm) | |
31 | 1226 | , generate_legacy_bulk_chunks_( | |
32 | 1226 | spooler_definition.generate_legacy_bulk_chunks) | |
33 | 1226 | , chunking_enabled_(spooler_definition.use_file_chunking) | |
34 | 1226 | , minimal_chunk_size_(spooler_definition.min_file_chunk_size) | |
35 | 1226 | , average_chunk_size_(spooler_definition.avg_file_chunk_size) | |
36 | 1226 | , maximal_chunk_size_(spooler_definition.max_file_chunk_size) | |
37 | 1226 | , spawned_(false) | |
38 | 1226 | , uploader_(uploader) | |
39 |
4/8✓ Branch 2 taken 1226 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1226 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 1226 times.
✗ Branch 9 not taken.
✓ Branch 22 taken 1226 times.
✗ Branch 23 not taken.
|
1226 | , tube_ctr_inflight_pre_(kMaxFilesInFlight) { |
40 |
1/2✓ Branch 1 taken 1226 times.
✗ Branch 2 not taken.
|
1226 | const unsigned nfork_base = std::max(1U, GetNumberOfCpuCores() / 8); |
41 | |||
42 |
2/2✓ Branch 0 taken 9808 times.
✓ Branch 1 taken 1226 times.
|
11034 | for (unsigned i = 0; i < nfork_base * kNforkRegister; ++i) { |
43 |
2/4✓ Branch 1 taken 9808 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 9808 times.
✗ Branch 5 not taken.
|
9808 | Tube<FileItem> *tube = new Tube<FileItem>(); |
44 |
1/2✓ Branch 1 taken 9808 times.
✗ Branch 2 not taken.
|
9808 | tubes_register_.TakeTube(tube); |
45 | TaskRegister *task = new TaskRegister(tube, &tube_ctr_inflight_pre_, | ||
46 |
2/4✓ Branch 1 taken 9808 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 9808 times.
✗ Branch 5 not taken.
|
9808 | &tube_ctr_inflight_post_); |
47 |
1/2✓ Branch 1 taken 9808 times.
✗ Branch 2 not taken.
|
9808 | task->RegisterListener(&IngestionPipeline::OnFileProcessed, this); |
48 |
1/2✓ Branch 1 taken 9808 times.
✗ Branch 2 not taken.
|
9808 | tasks_register_.TakeConsumer(task); |
49 | } | ||
50 | 1226 | tubes_register_.Activate(); | |
51 | |||
52 |
2/2✓ Branch 0 taken 9808 times.
✓ Branch 1 taken 1226 times.
|
11034 | for (unsigned i = 0; i < nfork_base * kNforkWrite; ++i) { |
53 |
2/4✓ Branch 1 taken 9808 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 9808 times.
✗ Branch 5 not taken.
|
9808 | Tube<BlockItem> *t = new Tube<BlockItem>(); |
54 |
1/2✓ Branch 1 taken 9808 times.
✗ Branch 2 not taken.
|
9808 | tubes_write_.TakeTube(t); |
55 |
3/6✓ Branch 1 taken 9808 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 9808 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 9808 times.
✗ Branch 8 not taken.
|
9808 | tasks_write_.TakeConsumer(new TaskWrite(t, &tubes_register_, uploader_)); |
56 | } | ||
57 | 1226 | tubes_write_.Activate(); | |
58 | |||
59 |
2/2✓ Branch 0 taken 19616 times.
✓ Branch 1 taken 1226 times.
|
20842 | for (unsigned i = 0; i < nfork_base * kNforkHash; ++i) { |
60 |
2/4✓ Branch 1 taken 19616 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 19616 times.
✗ Branch 5 not taken.
|
19616 | Tube<BlockItem> *t = new Tube<BlockItem>(); |
61 |
1/2✓ Branch 1 taken 19616 times.
✗ Branch 2 not taken.
|
19616 | tubes_hash_.TakeTube(t); |
62 |
3/6✓ Branch 1 taken 19616 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 19616 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 19616 times.
✗ Branch 8 not taken.
|
19616 | tasks_hash_.TakeConsumer(new TaskHash(t, &tubes_write_)); |
63 | } | ||
64 | 1226 | tubes_hash_.Activate(); | |
65 | |||
66 |
2/2✓ Branch 0 taken 39232 times.
✓ Branch 1 taken 1226 times.
|
40458 | for (unsigned i = 0; i < nfork_base * kNforkCompress; ++i) { |
67 |
2/4✓ Branch 1 taken 39232 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 39232 times.
✗ Branch 5 not taken.
|
39232 | Tube<BlockItem> *t = new Tube<BlockItem>(); |
68 |
1/2✓ Branch 1 taken 39232 times.
✗ Branch 2 not taken.
|
39232 | tubes_compress_.TakeTube(t); |
69 |
1/2✓ Branch 1 taken 39232 times.
✗ Branch 2 not taken.
|
39232 | tasks_compress_.TakeConsumer( |
70 |
2/4✓ Branch 1 taken 39232 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 39232 times.
✗ Branch 5 not taken.
|
39232 | new TaskCompress(t, &tubes_hash_, &item_allocator_)); |
71 | } | ||
72 | 1226 | tubes_compress_.Activate(); | |
73 | |||
74 |
2/2✓ Branch 0 taken 9808 times.
✓ Branch 1 taken 1226 times.
|
11034 | for (unsigned i = 0; i < nfork_base * kNforkChunk; ++i) { |
75 |
2/4✓ Branch 1 taken 9808 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 9808 times.
✗ Branch 5 not taken.
|
9808 | Tube<BlockItem> *t = new Tube<BlockItem>(); |
76 |
1/2✓ Branch 1 taken 9808 times.
✗ Branch 2 not taken.
|
9808 | tubes_chunk_.TakeTube(t); |
77 |
1/2✓ Branch 1 taken 9808 times.
✗ Branch 2 not taken.
|
9808 | tasks_chunk_.TakeConsumer( |
78 |
2/4✓ Branch 1 taken 9808 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 9808 times.
✗ Branch 5 not taken.
|
9808 | new TaskChunk(t, &tubes_compress_, &item_allocator_)); |
79 | } | ||
80 | 1226 | tubes_chunk_.Activate(); | |
81 | |||
82 | 1226 | uint64_t high = kMaxPipelineMem; | |
83 | 1226 | high = std::min(high, platform_memsize() / 5); | |
84 | 1226 | char *fixed_limit_mb = getenv("_CVMFS_SERVER_PIPELINE_MB"); | |
85 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1226 times.
|
1226 | if (fixed_limit_mb != NULL) { |
86 | ✗ | high = String2Uint64(fixed_limit_mb) * 1024 * 1024; | |
87 | } | ||
88 | 1226 | const uint64_t low = (high * 2) / 3; | |
89 |
1/2✓ Branch 1 taken 1226 times.
✗ Branch 2 not taken.
|
1226 | LogCvmfs(kLogCvmfs, kLogDebug, |
90 | "pipeline memory thresholds %" PRIu64 "/%" PRIu64 " M", | ||
91 | low / (1024 * 1024), high / (1024 * 1024)); | ||
92 |
2/2✓ Branch 0 taken 78464 times.
✓ Branch 1 taken 1226 times.
|
79690 | for (unsigned i = 0; i < nfork_base * kNforkRead; ++i) { |
93 | TaskRead *task_read = new TaskRead(&tube_input_, &tubes_chunk_, | ||
94 |
2/4✓ Branch 1 taken 78464 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 78464 times.
✗ Branch 5 not taken.
|
78464 | &item_allocator_); |
95 |
1/2✓ Branch 1 taken 78464 times.
✗ Branch 2 not taken.
|
78464 | task_read->SetWatermarks(low, high); |
96 |
1/2✓ Branch 1 taken 78464 times.
✗ Branch 2 not taken.
|
78464 | tasks_read_.TakeConsumer(task_read); |
97 | } | ||
98 | 1226 | } | |
99 | |||
100 | |||
101 | 3880 | IngestionPipeline::~IngestionPipeline() { | |
102 |
1/2✓ Branch 0 taken 1225 times.
✗ Branch 1 not taken.
|
2450 | if (spawned_) { |
103 | 2450 | tasks_read_.Terminate(); | |
104 | 2450 | tasks_chunk_.Terminate(); | |
105 | 2450 | tasks_compress_.Terminate(); | |
106 | 2450 | tasks_hash_.Terminate(); | |
107 | 2450 | tasks_write_.Terminate(); | |
108 | 2450 | tasks_register_.Terminate(); | |
109 | } | ||
110 | 3880 | } | |
111 | |||
112 | |||
113 | 7498961 | void IngestionPipeline::OnFileProcessed( | |
114 | const upload::SpoolerResult &spooler_result) { | ||
115 | 7498961 | NotifyListeners(spooler_result); | |
116 | 7498061 | } | |
117 | |||
118 | |||
119 | 7502201 | void IngestionPipeline::Process(IngestionSource *source, | |
120 | bool allow_chunking, | ||
121 | shash::Suffix hash_suffix) { | ||
122 | FileItem *file_item = new FileItem(source, | ||
123 | 7502201 | minimal_chunk_size_, | |
124 | 7502201 | average_chunk_size_, | |
125 | 7502201 | maximal_chunk_size_, | |
126 | 7502201 | compression_algorithm_, | |
127 | 7502201 | hash_algorithm_, | |
128 | hash_suffix, | ||
129 |
1/2✓ Branch 0 taken 7500700 times.
✗ Branch 1 not taken.
|
7502201 | allow_chunking && chunking_enabled_, |
130 |
3/4✓ Branch 1 taken 7500700 times.
✓ Branch 2 taken 1501 times.
✓ Branch 4 taken 7502201 times.
✗ Branch 5 not taken.
|
15004402 | generate_legacy_bulk_chunks_); |
131 | 7502201 | tube_ctr_inflight_post_.EnqueueBack(file_item); | |
132 | 7502201 | tube_ctr_inflight_pre_.EnqueueBack(file_item); | |
133 | 7502201 | tube_input_.EnqueueBack(file_item); | |
134 | 7502201 | } | |
135 | |||
136 | |||
137 | 1226 | void IngestionPipeline::Spawn() { | |
138 | 1226 | tasks_register_.Spawn(); | |
139 | 1226 | tasks_write_.Spawn(); | |
140 | 1226 | tasks_hash_.Spawn(); | |
141 | 1226 | tasks_compress_.Spawn(); | |
142 | 1226 | tasks_chunk_.Spawn(); | |
143 | 1226 | tasks_read_.Spawn(); | |
144 | 1226 | spawned_ = true; | |
145 | 1226 | } | |
146 | |||
147 | |||
148 | 1812 | void IngestionPipeline::WaitFor() { tube_ctr_inflight_post_.Wait(); } | |
149 | |||
150 | |||
151 | //------------------------------------------------------------------------------ | ||
152 | |||
153 | |||
154 | 5 | void TaskScrubbingCallback::Process(BlockItem *block_item) { | |
155 | 5 | FileItem *file_item = block_item->file_item(); | |
156 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 5 times.
|
5 | assert(file_item != NULL); |
157 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 5 times.
|
5 | assert(!file_item->path().empty()); |
158 | 5 | ChunkItem *chunk_item = block_item->chunk_item(); | |
159 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 5 times.
|
5 | assert(chunk_item != NULL); |
160 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
|
5 | assert(chunk_item->is_bulk_chunk()); |
161 | |||
162 |
1/3✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
✗ Branch 3 not taken.
|
5 | switch (block_item->type()) { |
163 | ✗ | case BlockItem::kBlockData: | |
164 | ✗ | delete block_item; | |
165 | ✗ | break; | |
166 | |||
167 | 5 | case BlockItem::kBlockStop: | |
168 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 5 times.
|
5 | assert(!chunk_item->hash_ptr()->IsNull()); |
169 |
1/2✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
|
5 | NotifyListeners( |
170 |
1/2✓ Branch 3 taken 5 times.
✗ Branch 4 not taken.
|
10 | ScrubbingResult(file_item->path(), *chunk_item->hash_ptr())); |
171 |
1/2✓ Branch 0 taken 5 times.
✗ Branch 1 not taken.
|
5 | delete block_item; |
172 |
1/2✓ Branch 0 taken 5 times.
✗ Branch 1 not taken.
|
5 | delete chunk_item; |
173 |
1/2✓ Branch 0 taken 5 times.
✗ Branch 1 not taken.
|
5 | delete file_item; |
174 | 5 | tube_counter_->PopFront(); | |
175 | 5 | break; | |
176 | |||
177 | ✗ | default: | |
178 | ✗ | PANIC(NULL); | |
179 | } | ||
180 | 5 | } | |
181 | |||
182 | |||
183 | //------------------------------------------------------------------------------ | ||
184 | |||
185 | |||
186 | 5 | ScrubbingPipeline::ScrubbingPipeline() | |
187 |
3/6✓ Branch 2 taken 5 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 5 times.
✗ Branch 6 not taken.
✓ Branch 15 taken 5 times.
✗ Branch 16 not taken.
|
5 | : spawned_(false), tube_counter_(kMaxFilesInFlight) { |
188 |
1/2✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
|
5 | const unsigned nfork_base = std::max(1U, GetNumberOfCpuCores() / 8); |
189 | |||
190 |
2/2✓ Branch 0 taken 40 times.
✓ Branch 1 taken 5 times.
|
45 | for (unsigned i = 0; i < nfork_base * kNforkScrubbingCallback; ++i) { |
191 |
2/4✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 40 times.
✗ Branch 5 not taken.
|
40 | Tube<BlockItem> *tube = new Tube<BlockItem>(); |
192 |
1/2✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
|
40 | tubes_scrubbing_callback_.TakeTube(tube); |
193 | TaskScrubbingCallback *task = new TaskScrubbingCallback(tube, | ||
194 |
2/4✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 40 times.
✗ Branch 5 not taken.
|
40 | &tube_counter_); |
195 |
1/2✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
|
40 | task->RegisterListener(&ScrubbingPipeline::OnFileProcessed, this); |
196 |
1/2✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
|
40 | tasks_scrubbing_callback_.TakeConsumer(task); |
197 | } | ||
198 | 5 | tubes_scrubbing_callback_.Activate(); | |
199 | |||
200 |
2/2✓ Branch 0 taken 80 times.
✓ Branch 1 taken 5 times.
|
85 | for (unsigned i = 0; i < nfork_base * kNforkHash; ++i) { |
201 |
2/4✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 80 times.
✗ Branch 5 not taken.
|
80 | Tube<BlockItem> *t = new Tube<BlockItem>(); |
202 |
1/2✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
|
80 | tubes_hash_.TakeTube(t); |
203 |
3/6✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 80 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 80 times.
✗ Branch 8 not taken.
|
80 | tasks_hash_.TakeConsumer(new TaskHash(t, &tubes_scrubbing_callback_)); |
204 | } | ||
205 | 5 | tubes_hash_.Activate(); | |
206 | |||
207 |
2/2✓ Branch 0 taken 40 times.
✓ Branch 1 taken 5 times.
|
45 | for (unsigned i = 0; i < nfork_base * kNforkChunk; ++i) { |
208 |
2/4✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 40 times.
✗ Branch 5 not taken.
|
40 | Tube<BlockItem> *t = new Tube<BlockItem>(); |
209 |
1/2✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
|
40 | tubes_chunk_.TakeTube(t); |
210 |
3/6✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 40 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 40 times.
✗ Branch 8 not taken.
|
40 | tasks_chunk_.TakeConsumer(new TaskChunk(t, &tubes_hash_, &item_allocator_)); |
211 | } | ||
212 | 5 | tubes_chunk_.Activate(); | |
213 | |||
214 |
2/2✓ Branch 0 taken 320 times.
✓ Branch 1 taken 5 times.
|
325 | for (unsigned i = 0; i < nfork_base * kNforkRead; ++i) { |
215 | TaskRead *task_read = new TaskRead(&tube_input_, &tubes_chunk_, | ||
216 |
2/4✓ Branch 1 taken 320 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 320 times.
✗ Branch 5 not taken.
|
320 | &item_allocator_); |
217 |
1/2✓ Branch 1 taken 320 times.
✗ Branch 2 not taken.
|
320 | task_read->SetWatermarks(kMemLowWatermark, kMemHighWatermark); |
218 |
1/2✓ Branch 1 taken 320 times.
✗ Branch 2 not taken.
|
320 | tasks_read_.TakeConsumer(task_read); |
219 | } | ||
220 | 5 | } | |
221 | |||
222 | |||
223 | 20 | ScrubbingPipeline::~ScrubbingPipeline() { | |
224 |
1/2✓ Branch 0 taken 5 times.
✗ Branch 1 not taken.
|
10 | if (spawned_) { |
225 | 10 | tasks_read_.Terminate(); | |
226 | 10 | tasks_chunk_.Terminate(); | |
227 | 10 | tasks_hash_.Terminate(); | |
228 | 10 | tasks_scrubbing_callback_.Terminate(); | |
229 | } | ||
230 | 20 | } | |
231 | |||
232 | |||
233 | 5 | void ScrubbingPipeline::OnFileProcessed( | |
234 | const ScrubbingResult &scrubbing_result) { | ||
235 | 5 | NotifyListeners(scrubbing_result); | |
236 | 5 | } | |
237 | |||
238 | |||
239 | 5 | void ScrubbingPipeline::Process(IngestionSource *source, | |
240 | shash::Algorithms hash_algorithm, | ||
241 | shash::Suffix hash_suffix) { | ||
242 | FileItem *file_item = new FileItem(source, 0, 0, 0, zlib::kNoCompression, | ||
243 | hash_algorithm, hash_suffix, | ||
244 | false, /* may_have_chunks */ | ||
245 |
1/2✓ Branch 2 taken 5 times.
✗ Branch 3 not taken.
|
5 | true /* hash_legacy_bulk_chunk */); |
246 | 5 | tube_counter_.EnqueueBack(file_item); | |
247 | 5 | tube_input_.EnqueueBack(file_item); | |
248 | 5 | } | |
249 | |||
250 | |||
251 | 5 | void ScrubbingPipeline::Spawn() { | |
252 | 5 | tasks_scrubbing_callback_.Spawn(); | |
253 | 5 | tasks_hash_.Spawn(); | |
254 | 5 | tasks_chunk_.Spawn(); | |
255 | 5 | tasks_read_.Spawn(); | |
256 | 5 | spawned_ = true; | |
257 | 5 | } | |
258 | |||
259 | |||
260 | 5 | void ScrubbingPipeline::WaitFor() { tube_counter_.Wait(); } | |
261 |