GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/ingestion/pipeline.cc
Date: 2024-04-28 02:33:07
Exec Total Coverage
Lines: 159 165 96.4%
Branches: 107 199 53.8%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "cvmfs_config.h"
6 #include "pipeline.h"
7
8 #include <algorithm>
9 #include <cstdlib>
10
11 #include "ingestion/task_chunk.h"
12 #include "ingestion/task_compress.h"
13 #include "ingestion/task_hash.h"
14 #include "ingestion/task_read.h"
15 #include "ingestion/task_register.h"
16 #include "ingestion/task_write.h"
17 #include "upload_facility.h"
18 #include "upload_spooler_definition.h"
19 #include "util/concurrency.h"
20 #include "util/exception.h"
21 #include "util/platform.h"
22 #include "util/string.h"
23
24 const uint64_t IngestionPipeline::kMaxPipelineMem = 1024 * 1024 * 1024;
25
26 43 IngestionPipeline::IngestionPipeline(
27 upload::AbstractUploader *uploader,
28 43 const upload::SpoolerDefinition &spooler_definition)
29 43 : compression_algorithm_(spooler_definition.compression_alg)
30 43 , hash_algorithm_(spooler_definition.hash_algorithm)
31 43 , generate_legacy_bulk_chunks_(spooler_definition.generate_legacy_bulk_chunks)
32 43 , chunking_enabled_(spooler_definition.use_file_chunking)
33 43 , minimal_chunk_size_(spooler_definition.min_file_chunk_size)
34 43 , average_chunk_size_(spooler_definition.avg_file_chunk_size)
35 43 , maximal_chunk_size_(spooler_definition.max_file_chunk_size)
36 43 , spawned_(false)
37 43 , uploader_(uploader)
38
4/8
✓ Branch 2 taken 43 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 43 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 43 times.
✗ Branch 9 not taken.
✓ Branch 22 taken 43 times.
✗ Branch 23 not taken.
43 , tube_ctr_inflight_pre_(kMaxFilesInFlight)
39 {
40
1/2
✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
43 unsigned nfork_base = std::max(1U, GetNumberOfCpuCores() / 8);
41
42
2/2
✓ Branch 0 taken 344 times.
✓ Branch 1 taken 43 times.
387 for (unsigned i = 0; i < nfork_base * kNforkRegister; ++i) {
43
2/4
✓ Branch 1 taken 344 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 344 times.
✗ Branch 5 not taken.
344 Tube<FileItem> *tube = new Tube<FileItem>();
44
1/2
✓ Branch 1 taken 344 times.
✗ Branch 2 not taken.
344 tubes_register_.TakeTube(tube);
45 TaskRegister *task = new TaskRegister(tube,
46
2/4
✓ Branch 1 taken 344 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 344 times.
✗ Branch 5 not taken.
344 &tube_ctr_inflight_pre_, &tube_ctr_inflight_post_);
47
1/2
✓ Branch 1 taken 344 times.
✗ Branch 2 not taken.
344 task->RegisterListener(&IngestionPipeline::OnFileProcessed, this);
48
1/2
✓ Branch 1 taken 344 times.
✗ Branch 2 not taken.
344 tasks_register_.TakeConsumer(task);
49 }
50 43 tubes_register_.Activate();
51
52
2/2
✓ Branch 0 taken 344 times.
✓ Branch 1 taken 43 times.
387 for (unsigned i = 0; i < nfork_base * kNforkWrite; ++i) {
53
2/4
✓ Branch 1 taken 344 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 344 times.
✗ Branch 5 not taken.
344 Tube<BlockItem> *t = new Tube<BlockItem>();
54
1/2
✓ Branch 1 taken 344 times.
✗ Branch 2 not taken.
344 tubes_write_.TakeTube(t);
55
3/6
✓ Branch 1 taken 344 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 344 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 344 times.
✗ Branch 8 not taken.
344 tasks_write_.TakeConsumer(new TaskWrite(t, &tubes_register_, uploader_));
56 }
57 43 tubes_write_.Activate();
58
59
2/2
✓ Branch 0 taken 688 times.
✓ Branch 1 taken 43 times.
731 for (unsigned i = 0; i < nfork_base * kNforkHash; ++i) {
60
2/4
✓ Branch 1 taken 688 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 688 times.
✗ Branch 5 not taken.
688 Tube<BlockItem> *t = new Tube<BlockItem>();
61
1/2
✓ Branch 1 taken 688 times.
✗ Branch 2 not taken.
688 tubes_hash_.TakeTube(t);
62
3/6
✓ Branch 1 taken 688 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 688 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 688 times.
✗ Branch 8 not taken.
688 tasks_hash_.TakeConsumer(new TaskHash(t, &tubes_write_));
63 }
64 43 tubes_hash_.Activate();
65
66
2/2
✓ Branch 0 taken 1376 times.
✓ Branch 1 taken 43 times.
1419 for (unsigned i = 0; i < nfork_base * kNforkCompress; ++i) {
67
2/4
✓ Branch 1 taken 1376 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1376 times.
✗ Branch 5 not taken.
1376 Tube<BlockItem> *t = new Tube<BlockItem>();
68
1/2
✓ Branch 1 taken 1376 times.
✗ Branch 2 not taken.
1376 tubes_compress_.TakeTube(t);
69
1/2
✓ Branch 1 taken 1376 times.
✗ Branch 2 not taken.
1376 tasks_compress_.TakeConsumer(
70
2/4
✓ Branch 1 taken 1376 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1376 times.
✗ Branch 5 not taken.
1376 new TaskCompress(t, &tubes_hash_, &item_allocator_));
71 }
72 43 tubes_compress_.Activate();
73
74
2/2
✓ Branch 0 taken 344 times.
✓ Branch 1 taken 43 times.
387 for (unsigned i = 0; i < nfork_base * kNforkChunk; ++i) {
75
2/4
✓ Branch 1 taken 344 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 344 times.
✗ Branch 5 not taken.
344 Tube<BlockItem> *t = new Tube<BlockItem>();
76
1/2
✓ Branch 1 taken 344 times.
✗ Branch 2 not taken.
344 tubes_chunk_.TakeTube(t);
77
1/2
✓ Branch 1 taken 344 times.
✗ Branch 2 not taken.
344 tasks_chunk_.TakeConsumer(
78
2/4
✓ Branch 1 taken 344 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 344 times.
✗ Branch 5 not taken.
344 new TaskChunk(t, &tubes_compress_, &item_allocator_));
79 }
80 43 tubes_chunk_.Activate();
81
82 43 uint64_t high = kMaxPipelineMem;
83 43 high = std::min(high, platform_memsize() / 5);
84 43 char *fixed_limit_mb = getenv("_CVMFS_SERVER_PIPELINE_MB");
85
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 43 times.
43 if (fixed_limit_mb != NULL) {
86 high = String2Uint64(fixed_limit_mb) * 1024 * 1024;
87 }
88 43 uint64_t low = (high * 2) / 3;
89
1/2
✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
43 LogCvmfs(kLogCvmfs, kLogDebug,
90 "pipeline memory thresholds %" PRIu64 "/%" PRIu64 " M",
91 low / (1024 * 1024), high / (1024 * 1024));
92
2/2
✓ Branch 0 taken 2752 times.
✓ Branch 1 taken 43 times.
2795 for (unsigned i = 0; i < nfork_base * kNforkRead; ++i) {
93 TaskRead *task_read =
94
2/4
✓ Branch 1 taken 2752 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2752 times.
✗ Branch 5 not taken.
2752 new TaskRead(&tube_input_, &tubes_chunk_, &item_allocator_);
95
1/2
✓ Branch 1 taken 2752 times.
✗ Branch 2 not taken.
2752 task_read->SetWatermarks(low, high);
96
1/2
✓ Branch 1 taken 2752 times.
✗ Branch 2 not taken.
2752 tasks_read_.TakeConsumer(task_read);
97 }
98 43 }
99
100
101 138 IngestionPipeline::~IngestionPipeline() {
102
1/2
✓ Branch 0 taken 43 times.
✗ Branch 1 not taken.
86 if (spawned_) {
103 86 tasks_read_.Terminate();
104 86 tasks_chunk_.Terminate();
105 86 tasks_compress_.Terminate();
106 86 tasks_hash_.Terminate();
107 86 tasks_write_.Terminate();
108 86 tasks_register_.Terminate();
109 }
110 138 }
111
112
113 250058 void IngestionPipeline::OnFileProcessed(
114 const upload::SpoolerResult &spooler_result)
115 {
116 250058 NotifyListeners(spooler_result);
117 250065 }
118
119
120 250081 void IngestionPipeline::Process(
121 IngestionSource* source,
122 bool allow_chunking,
123 shash::Suffix hash_suffix)
124 {
125 FileItem *file_item = new FileItem(
126 source,
127 250081 minimal_chunk_size_,
128 250081 average_chunk_size_,
129 250081 maximal_chunk_size_,
130 250081 compression_algorithm_,
131 250081 hash_algorithm_,
132 hash_suffix,
133
1/2
✓ Branch 0 taken 250025 times.
✗ Branch 1 not taken.
250081 allow_chunking && chunking_enabled_,
134
3/4
✓ Branch 1 taken 250025 times.
✓ Branch 2 taken 56 times.
✓ Branch 4 taken 250081 times.
✗ Branch 5 not taken.
500162 generate_legacy_bulk_chunks_);
135 250081 tube_ctr_inflight_post_.EnqueueBack(file_item);
136 250081 tube_ctr_inflight_pre_.EnqueueBack(file_item);
137 250081 tube_input_.EnqueueBack(file_item);
138 250081 }
139
140
141 43 void IngestionPipeline::Spawn() {
142 43 tasks_register_.Spawn();
143 43 tasks_write_.Spawn();
144 43 tasks_hash_.Spawn();
145 43 tasks_compress_.Spawn();
146 43 tasks_chunk_.Spawn();
147 43 tasks_read_.Spawn();
148 43 spawned_ = true;
149 43 }
150
151
152 63 void IngestionPipeline::WaitFor() {
153 63 tube_ctr_inflight_post_.Wait();
154 63 }
155
156
157 //------------------------------------------------------------------------------
158
159
160 1 void TaskScrubbingCallback::Process(BlockItem *block_item) {
161 1 FileItem *file_item = block_item->file_item();
162
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 assert(file_item != NULL);
163
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 1 times.
1 assert(!file_item->path().empty());
164 1 ChunkItem *chunk_item = block_item->chunk_item();
165
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 assert(chunk_item != NULL);
166
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
1 assert(chunk_item->is_bulk_chunk());
167
168
1/3
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 switch (block_item->type()) {
169 case BlockItem::kBlockData:
170 delete block_item;
171 break;
172
173 1 case BlockItem::kBlockStop:
174
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 1 times.
1 assert(!chunk_item->hash_ptr()->IsNull());
175
2/4
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1 times.
✗ Branch 6 not taken.
1 NotifyListeners(ScrubbingResult(file_item->path(),
176 1 *chunk_item->hash_ptr()));
177
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 delete block_item;
178
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 delete chunk_item;
179
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 delete file_item;
180 1 tube_counter_->PopFront();
181 1 break;
182
183 default:
184 PANIC(NULL);
185 }
186 1 }
187
188
189 //------------------------------------------------------------------------------
190
191
192 1 ScrubbingPipeline::ScrubbingPipeline()
193 1 : spawned_(false)
194
3/6
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1 times.
✗ Branch 6 not taken.
✓ Branch 15 taken 1 times.
✗ Branch 16 not taken.
1 , tube_counter_(kMaxFilesInFlight)
195 {
196
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 unsigned nfork_base = std::max(1U, GetNumberOfCpuCores() / 8);
197
198
2/2
✓ Branch 0 taken 8 times.
✓ Branch 1 taken 1 times.
9 for (unsigned i = 0; i < nfork_base * kNforkScrubbingCallback; ++i) {
199
2/4
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 8 times.
✗ Branch 5 not taken.
8 Tube<BlockItem> *tube = new Tube<BlockItem>();
200
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8 tubes_scrubbing_callback_.TakeTube(tube);
201 TaskScrubbingCallback *task =
202
2/4
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 8 times.
✗ Branch 5 not taken.
8 new TaskScrubbingCallback(tube, &tube_counter_);
203
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8 task->RegisterListener(&ScrubbingPipeline::OnFileProcessed, this);
204
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8 tasks_scrubbing_callback_.TakeConsumer(task);
205 }
206 1 tubes_scrubbing_callback_.Activate();
207
208
2/2
✓ Branch 0 taken 16 times.
✓ Branch 1 taken 1 times.
17 for (unsigned i = 0; i < nfork_base * kNforkHash; ++i) {
209
2/4
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 16 times.
✗ Branch 5 not taken.
16 Tube<BlockItem> *t = new Tube<BlockItem>();
210
1/2
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
16 tubes_hash_.TakeTube(t);
211
3/6
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 16 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 16 times.
✗ Branch 8 not taken.
16 tasks_hash_.TakeConsumer(new TaskHash(t, &tubes_scrubbing_callback_));
212 }
213 1 tubes_hash_.Activate();
214
215
2/2
✓ Branch 0 taken 8 times.
✓ Branch 1 taken 1 times.
9 for (unsigned i = 0; i < nfork_base * kNforkChunk; ++i) {
216
2/4
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 8 times.
✗ Branch 5 not taken.
8 Tube<BlockItem> *t = new Tube<BlockItem>();
217
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8 tubes_chunk_.TakeTube(t);
218
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8 tasks_chunk_.TakeConsumer(
219
2/4
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 8 times.
✗ Branch 5 not taken.
8 new TaskChunk(t, &tubes_hash_, &item_allocator_));
220 }
221 1 tubes_chunk_.Activate();
222
223
2/2
✓ Branch 0 taken 64 times.
✓ Branch 1 taken 1 times.
65 for (unsigned i = 0; i < nfork_base * kNforkRead; ++i) {
224 TaskRead *task_read =
225
2/4
✓ Branch 1 taken 64 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 64 times.
✗ Branch 5 not taken.
64 new TaskRead(&tube_input_, &tubes_chunk_, &item_allocator_);
226
1/2
✓ Branch 1 taken 64 times.
✗ Branch 2 not taken.
64 task_read->SetWatermarks(kMemLowWatermark, kMemHighWatermark);
227
1/2
✓ Branch 1 taken 64 times.
✗ Branch 2 not taken.
64 tasks_read_.TakeConsumer(task_read);
228 }
229 1 }
230
231
232 4 ScrubbingPipeline::~ScrubbingPipeline() {
233
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
2 if (spawned_) {
234 2 tasks_read_.Terminate();
235 2 tasks_chunk_.Terminate();
236 2 tasks_hash_.Terminate();
237 2 tasks_scrubbing_callback_.Terminate();
238 }
239 4 }
240
241
242 1 void ScrubbingPipeline::OnFileProcessed(
243 const ScrubbingResult &scrubbing_result)
244 {
245 1 NotifyListeners(scrubbing_result);
246 1 }
247
248
249 1 void ScrubbingPipeline::Process(
250 IngestionSource *source,
251 shash::Algorithms hash_algorithm,
252 shash::Suffix hash_suffix)
253 {
254 FileItem *file_item = new FileItem(
255 source,
256 0, 0, 0,
257 zlib::kNoCompression,
258 hash_algorithm,
259 hash_suffix,
260 false, /* may_have_chunks */
261
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 true /* hash_legacy_bulk_chunk */);
262 1 tube_counter_.EnqueueBack(file_item);
263 1 tube_input_.EnqueueBack(file_item);
264 1 }
265
266
267 1 void ScrubbingPipeline::Spawn() {
268 1 tasks_scrubbing_callback_.Spawn();
269 1 tasks_hash_.Spawn();
270 1 tasks_chunk_.Spawn();
271 1 tasks_read_.Spawn();
272 1 spawned_ = true;
273 1 }
274
275
276 1 void ScrubbingPipeline::WaitFor() {
277 1 tube_counter_.Wait();
278 1 }
279