GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/ingestion/pipeline.cc
Date: 2026-02-22 02:35:58
Exec Total Coverage
Lines: 154 160 96.2%
Branches: 107 199 53.8%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5
6 #include "pipeline.h"
7
8 #include <algorithm>
9 #include <cstdlib>
10
11 #include "ingestion/task_chunk.h"
12 #include "ingestion/task_compress.h"
13 #include "ingestion/task_hash.h"
14 #include "ingestion/task_read.h"
15 #include "ingestion/task_register.h"
16 #include "ingestion/task_write.h"
17 #include "upload_facility.h"
18 #include "upload_spooler_definition.h"
19 #include "util/concurrency.h"
20 #include "util/exception.h"
21 #include "util/platform.h"
22 #include "util/string.h"
23
24 const uint64_t IngestionPipeline::kMaxPipelineMem = 1024 * 1024 * 1024;
25
26 1092 IngestionPipeline::IngestionPipeline(
27 upload::AbstractUploader *uploader,
28 1092 const upload::SpoolerDefinition &spooler_definition)
29 1092 : compression_algorithm_(spooler_definition.compression_alg)
30 1092 , hash_algorithm_(spooler_definition.hash_algorithm)
31 1092 , generate_legacy_bulk_chunks_(
32 1092 spooler_definition.generate_legacy_bulk_chunks)
33 1092 , chunking_enabled_(spooler_definition.use_file_chunking)
34 1092 , minimal_chunk_size_(spooler_definition.min_file_chunk_size)
35 1092 , average_chunk_size_(spooler_definition.avg_file_chunk_size)
36 1092 , maximal_chunk_size_(spooler_definition.max_file_chunk_size)
37 1092 , spawned_(false)
38 1092 , uploader_(uploader)
39
4/8
✓ Branch 2 taken 1092 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1092 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 1092 times.
✗ Branch 9 not taken.
✓ Branch 22 taken 1092 times.
✗ Branch 23 not taken.
1092 , tube_ctr_inflight_pre_(kMaxFilesInFlight) {
40
1/2
✓ Branch 1 taken 1092 times.
✗ Branch 2 not taken.
1092 const unsigned nfork_base = std::max(1U, GetNumberOfCpuCores() / 8);
41
42
2/2
✓ Branch 0 taken 8736 times.
✓ Branch 1 taken 1092 times.
9828 for (unsigned i = 0; i < nfork_base * kNforkRegister; ++i) {
43
2/4
✓ Branch 1 taken 8736 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 8736 times.
✗ Branch 5 not taken.
8736 Tube<FileItem> *tube = new Tube<FileItem>();
44
1/2
✓ Branch 1 taken 8736 times.
✗ Branch 2 not taken.
8736 tubes_register_.TakeTube(tube);
45 TaskRegister *task = new TaskRegister(tube, &tube_ctr_inflight_pre_,
46
2/4
✓ Branch 1 taken 8736 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 8736 times.
✗ Branch 5 not taken.
8736 &tube_ctr_inflight_post_);
47
1/2
✓ Branch 1 taken 8736 times.
✗ Branch 2 not taken.
8736 task->RegisterListener(&IngestionPipeline::OnFileProcessed, this);
48
1/2
✓ Branch 1 taken 8736 times.
✗ Branch 2 not taken.
8736 tasks_register_.TakeConsumer(task);
49 }
50 1092 tubes_register_.Activate();
51
52
2/2
✓ Branch 0 taken 8736 times.
✓ Branch 1 taken 1092 times.
9828 for (unsigned i = 0; i < nfork_base * kNforkWrite; ++i) {
53
2/4
✓ Branch 1 taken 8736 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 8736 times.
✗ Branch 5 not taken.
8736 Tube<BlockItem> *t = new Tube<BlockItem>();
54
1/2
✓ Branch 1 taken 8736 times.
✗ Branch 2 not taken.
8736 tubes_write_.TakeTube(t);
55
3/6
✓ Branch 1 taken 8736 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 8736 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 8736 times.
✗ Branch 8 not taken.
8736 tasks_write_.TakeConsumer(new TaskWrite(t, &tubes_register_, uploader_));
56 }
57 1092 tubes_write_.Activate();
58
59
2/2
✓ Branch 0 taken 17472 times.
✓ Branch 1 taken 1092 times.
18564 for (unsigned i = 0; i < nfork_base * kNforkHash; ++i) {
60
2/4
✓ Branch 1 taken 17472 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 17472 times.
✗ Branch 5 not taken.
17472 Tube<BlockItem> *t = new Tube<BlockItem>();
61
1/2
✓ Branch 1 taken 17472 times.
✗ Branch 2 not taken.
17472 tubes_hash_.TakeTube(t);
62
3/6
✓ Branch 1 taken 17472 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 17472 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 17472 times.
✗ Branch 8 not taken.
17472 tasks_hash_.TakeConsumer(new TaskHash(t, &tubes_write_));
63 }
64 1092 tubes_hash_.Activate();
65
66
2/2
✓ Branch 0 taken 34944 times.
✓ Branch 1 taken 1092 times.
36036 for (unsigned i = 0; i < nfork_base * kNforkCompress; ++i) {
67
2/4
✓ Branch 1 taken 34944 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 34944 times.
✗ Branch 5 not taken.
34944 Tube<BlockItem> *t = new Tube<BlockItem>();
68
1/2
✓ Branch 1 taken 34944 times.
✗ Branch 2 not taken.
34944 tubes_compress_.TakeTube(t);
69
1/2
✓ Branch 1 taken 34944 times.
✗ Branch 2 not taken.
34944 tasks_compress_.TakeConsumer(
70
2/4
✓ Branch 1 taken 34944 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 34944 times.
✗ Branch 5 not taken.
34944 new TaskCompress(t, &tubes_hash_, &item_allocator_));
71 }
72 1092 tubes_compress_.Activate();
73
74
2/2
✓ Branch 0 taken 8736 times.
✓ Branch 1 taken 1092 times.
9828 for (unsigned i = 0; i < nfork_base * kNforkChunk; ++i) {
75
2/4
✓ Branch 1 taken 8736 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 8736 times.
✗ Branch 5 not taken.
8736 Tube<BlockItem> *t = new Tube<BlockItem>();
76
1/2
✓ Branch 1 taken 8736 times.
✗ Branch 2 not taken.
8736 tubes_chunk_.TakeTube(t);
77
1/2
✓ Branch 1 taken 8736 times.
✗ Branch 2 not taken.
8736 tasks_chunk_.TakeConsumer(
78
2/4
✓ Branch 1 taken 8736 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 8736 times.
✗ Branch 5 not taken.
8736 new TaskChunk(t, &tubes_compress_, &item_allocator_));
79 }
80 1092 tubes_chunk_.Activate();
81
82 1092 uint64_t high = kMaxPipelineMem;
83 1092 high = std::min(high, platform_memsize() / 5);
84 1092 char *fixed_limit_mb = getenv("_CVMFS_SERVER_PIPELINE_MB");
85
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1092 times.
1092 if (fixed_limit_mb != NULL) {
86 high = String2Uint64(fixed_limit_mb) * 1024 * 1024;
87 }
88 1092 const uint64_t low = (high * 2) / 3;
89
1/2
✓ Branch 1 taken 1092 times.
✗ Branch 2 not taken.
1092 LogCvmfs(kLogCvmfs, kLogDebug,
90 "pipeline memory thresholds %" PRIu64 "/%" PRIu64 " M",
91 low / (1024 * 1024), high / (1024 * 1024));
92
2/2
✓ Branch 0 taken 69888 times.
✓ Branch 1 taken 1092 times.
70980 for (unsigned i = 0; i < nfork_base * kNforkRead; ++i) {
93 TaskRead *task_read = new TaskRead(&tube_input_, &tubes_chunk_,
94
2/4
✓ Branch 1 taken 69888 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 69888 times.
✗ Branch 5 not taken.
69888 &item_allocator_);
95
1/2
✓ Branch 1 taken 69888 times.
✗ Branch 2 not taken.
69888 task_read->SetWatermarks(low, high);
96
1/2
✓ Branch 1 taken 69888 times.
✗ Branch 2 not taken.
69888 tasks_read_.TakeConsumer(task_read);
97 }
98 1092 }
99
100
101 3446 IngestionPipeline::~IngestionPipeline() {
102
1/2
✓ Branch 0 taken 1091 times.
✗ Branch 1 not taken.
2182 if (spawned_) {
103 2182 tasks_read_.Terminate();
104 2182 tasks_chunk_.Terminate();
105 2182 tasks_compress_.Terminate();
106 2182 tasks_hash_.Terminate();
107 2182 tasks_write_.Terminate();
108 2182 tasks_register_.Terminate();
109 }
110 3446 }
111
112
113 6750344 void IngestionPipeline::OnFileProcessed(
114 const upload::SpoolerResult &spooler_result) {
115 6750344 NotifyListeners(spooler_result);
116 6750209 }
117
118
119 6752126 void IngestionPipeline::Process(IngestionSource *source,
120 bool allow_chunking,
121 shash::Suffix hash_suffix) {
122 FileItem *file_item = new FileItem(source,
123 6752126 minimal_chunk_size_,
124 6752126 average_chunk_size_,
125 6752126 maximal_chunk_size_,
126 6752126 compression_algorithm_,
127 6752126 hash_algorithm_,
128 hash_suffix,
129
1/2
✓ Branch 0 taken 6750683 times.
✗ Branch 1 not taken.
6752126 allow_chunking && chunking_enabled_,
130
3/4
✓ Branch 1 taken 6750683 times.
✓ Branch 2 taken 1443 times.
✓ Branch 4 taken 6752126 times.
✗ Branch 5 not taken.
13504252 generate_legacy_bulk_chunks_);
131 6752126 tube_ctr_inflight_post_.EnqueueBack(file_item);
132 6752126 tube_ctr_inflight_pre_.EnqueueBack(file_item);
133 6752126 tube_input_.EnqueueBack(file_item);
134 6752126 }
135
136
137 1092 void IngestionPipeline::Spawn() {
138 1092 tasks_register_.Spawn();
139 1092 tasks_write_.Spawn();
140 1092 tasks_hash_.Spawn();
141 1092 tasks_compress_.Spawn();
142 1092 tasks_chunk_.Spawn();
143 1092 tasks_read_.Spawn();
144 1092 spawned_ = true;
145 1092 }
146
147
148 1549 void IngestionPipeline::WaitFor() { tube_ctr_inflight_post_.Wait(); }
149
150
151 //------------------------------------------------------------------------------
152
153
154 31 void TaskScrubbingCallback::Process(BlockItem *block_item) {
155 31 FileItem *file_item = block_item->file_item();
156
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31 times.
31 assert(file_item != NULL);
157
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 31 times.
31 assert(!file_item->path().empty());
158 31 ChunkItem *chunk_item = block_item->chunk_item();
159
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31 times.
31 assert(chunk_item != NULL);
160
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 31 times.
31 assert(chunk_item->is_bulk_chunk());
161
162
1/3
✗ Branch 1 not taken.
✓ Branch 2 taken 31 times.
✗ Branch 3 not taken.
31 switch (block_item->type()) {
163 case BlockItem::kBlockData:
164 delete block_item;
165 break;
166
167 31 case BlockItem::kBlockStop:
168
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 31 times.
31 assert(!chunk_item->hash_ptr()->IsNull());
169
1/2
✓ Branch 1 taken 31 times.
✗ Branch 2 not taken.
31 NotifyListeners(
170
1/2
✓ Branch 3 taken 31 times.
✗ Branch 4 not taken.
62 ScrubbingResult(file_item->path(), *chunk_item->hash_ptr()));
171
1/2
✓ Branch 0 taken 31 times.
✗ Branch 1 not taken.
31 delete block_item;
172
1/2
✓ Branch 0 taken 31 times.
✗ Branch 1 not taken.
31 delete chunk_item;
173
1/2
✓ Branch 0 taken 31 times.
✗ Branch 1 not taken.
31 delete file_item;
174 31 tube_counter_->PopFront();
175 31 break;
176
177 default:
178 PANIC(NULL);
179 }
180 31 }
181
182
183 //------------------------------------------------------------------------------
184
185
186 31 ScrubbingPipeline::ScrubbingPipeline()
187
3/6
✓ Branch 2 taken 31 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 31 times.
✗ Branch 6 not taken.
✓ Branch 15 taken 31 times.
✗ Branch 16 not taken.
31 : spawned_(false), tube_counter_(kMaxFilesInFlight) {
188
1/2
✓ Branch 1 taken 31 times.
✗ Branch 2 not taken.
31 const unsigned nfork_base = std::max(1U, GetNumberOfCpuCores() / 8);
189
190
2/2
✓ Branch 0 taken 248 times.
✓ Branch 1 taken 31 times.
279 for (unsigned i = 0; i < nfork_base * kNforkScrubbingCallback; ++i) {
191
2/4
✓ Branch 1 taken 248 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 248 times.
✗ Branch 5 not taken.
248 Tube<BlockItem> *tube = new Tube<BlockItem>();
192
1/2
✓ Branch 1 taken 248 times.
✗ Branch 2 not taken.
248 tubes_scrubbing_callback_.TakeTube(tube);
193 TaskScrubbingCallback *task = new TaskScrubbingCallback(tube,
194
2/4
✓ Branch 1 taken 248 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 248 times.
✗ Branch 5 not taken.
248 &tube_counter_);
195
1/2
✓ Branch 1 taken 248 times.
✗ Branch 2 not taken.
248 task->RegisterListener(&ScrubbingPipeline::OnFileProcessed, this);
196
1/2
✓ Branch 1 taken 248 times.
✗ Branch 2 not taken.
248 tasks_scrubbing_callback_.TakeConsumer(task);
197 }
198 31 tubes_scrubbing_callback_.Activate();
199
200
2/2
✓ Branch 0 taken 496 times.
✓ Branch 1 taken 31 times.
527 for (unsigned i = 0; i < nfork_base * kNforkHash; ++i) {
201
2/4
✓ Branch 1 taken 496 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 496 times.
✗ Branch 5 not taken.
496 Tube<BlockItem> *t = new Tube<BlockItem>();
202
1/2
✓ Branch 1 taken 496 times.
✗ Branch 2 not taken.
496 tubes_hash_.TakeTube(t);
203
3/6
✓ Branch 1 taken 496 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 496 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 496 times.
✗ Branch 8 not taken.
496 tasks_hash_.TakeConsumer(new TaskHash(t, &tubes_scrubbing_callback_));
204 }
205 31 tubes_hash_.Activate();
206
207
2/2
✓ Branch 0 taken 248 times.
✓ Branch 1 taken 31 times.
279 for (unsigned i = 0; i < nfork_base * kNforkChunk; ++i) {
208
2/4
✓ Branch 1 taken 248 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 248 times.
✗ Branch 5 not taken.
248 Tube<BlockItem> *t = new Tube<BlockItem>();
209
1/2
✓ Branch 1 taken 248 times.
✗ Branch 2 not taken.
248 tubes_chunk_.TakeTube(t);
210
3/6
✓ Branch 1 taken 248 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 248 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 248 times.
✗ Branch 8 not taken.
248 tasks_chunk_.TakeConsumer(new TaskChunk(t, &tubes_hash_, &item_allocator_));
211 }
212 31 tubes_chunk_.Activate();
213
214
2/2
✓ Branch 0 taken 1984 times.
✓ Branch 1 taken 31 times.
2015 for (unsigned i = 0; i < nfork_base * kNforkRead; ++i) {
215 TaskRead *task_read = new TaskRead(&tube_input_, &tubes_chunk_,
216
2/4
✓ Branch 1 taken 1984 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1984 times.
✗ Branch 5 not taken.
1984 &item_allocator_);
217
1/2
✓ Branch 1 taken 1984 times.
✗ Branch 2 not taken.
1984 task_read->SetWatermarks(kMemLowWatermark, kMemHighWatermark);
218
1/2
✓ Branch 1 taken 1984 times.
✗ Branch 2 not taken.
1984 tasks_read_.TakeConsumer(task_read);
219 }
220 31 }
221
222
223 124 ScrubbingPipeline::~ScrubbingPipeline() {
224
1/2
✓ Branch 0 taken 31 times.
✗ Branch 1 not taken.
62 if (spawned_) {
225 62 tasks_read_.Terminate();
226 62 tasks_chunk_.Terminate();
227 62 tasks_hash_.Terminate();
228 62 tasks_scrubbing_callback_.Terminate();
229 }
230 124 }
231
232
233 31 void ScrubbingPipeline::OnFileProcessed(
234 const ScrubbingResult &scrubbing_result) {
235 31 NotifyListeners(scrubbing_result);
236 31 }
237
238
239 31 void ScrubbingPipeline::Process(IngestionSource *source,
240 shash::Algorithms hash_algorithm,
241 shash::Suffix hash_suffix) {
242 FileItem *file_item = new FileItem(source, 0, 0, 0, zlib::kNoCompression,
243 hash_algorithm, hash_suffix,
244 false, /* may_have_chunks */
245
1/2
✓ Branch 2 taken 31 times.
✗ Branch 3 not taken.
31 true /* hash_legacy_bulk_chunk */);
246 31 tube_counter_.EnqueueBack(file_item);
247 31 tube_input_.EnqueueBack(file_item);
248 31 }
249
250
251 31 void ScrubbingPipeline::Spawn() {
252 31 tasks_scrubbing_callback_.Spawn();
253 31 tasks_hash_.Spawn();
254 31 tasks_chunk_.Spawn();
255 31 tasks_read_.Spawn();
256 31 spawned_ = true;
257 31 }
258
259
260 31 void ScrubbingPipeline::WaitFor() { tube_counter_.Wait(); }
261