| Directory: | cvmfs/ |
|---|---|
| File: | cvmfs/ingestion/pipeline.cc |
| Date: | 2025-11-30 02:35:17 |
| Exec | Total | Coverage | |
|---|---|---|---|
| Lines: | 154 | 160 | 96.2% |
| Branches: | 107 | 199 | 53.8% |
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /** | ||
| 2 | * This file is part of the CernVM File System. | ||
| 3 | */ | ||
| 4 | |||
| 5 | |||
| 6 | #include "pipeline.h" | ||
| 7 | |||
| 8 | #include <algorithm> | ||
| 9 | #include <cstdlib> | ||
| 10 | |||
| 11 | #include "ingestion/task_chunk.h" | ||
| 12 | #include "ingestion/task_compress.h" | ||
| 13 | #include "ingestion/task_hash.h" | ||
| 14 | #include "ingestion/task_read.h" | ||
| 15 | #include "ingestion/task_register.h" | ||
| 16 | #include "ingestion/task_write.h" | ||
| 17 | #include "upload_facility.h" | ||
| 18 | #include "upload_spooler_definition.h" | ||
| 19 | #include "util/concurrency.h" | ||
| 20 | #include "util/exception.h" | ||
| 21 | #include "util/platform.h" | ||
| 22 | #include "util/string.h" | ||
| 23 | |||
| 24 | const uint64_t IngestionPipeline::kMaxPipelineMem = 1024 * 1024 * 1024; | ||
| 25 | |||
| 26 | 1422 | IngestionPipeline::IngestionPipeline( | |
| 27 | upload::AbstractUploader *uploader, | ||
| 28 | 1422 | const upload::SpoolerDefinition &spooler_definition) | |
| 29 | 1422 | : compression_algorithm_(spooler_definition.compression_alg) | |
| 30 | 1422 | , hash_algorithm_(spooler_definition.hash_algorithm) | |
| 31 | 1422 | , generate_legacy_bulk_chunks_( | |
| 32 | 1422 | spooler_definition.generate_legacy_bulk_chunks) | |
| 33 | 1422 | , chunking_enabled_(spooler_definition.use_file_chunking) | |
| 34 | 1422 | , minimal_chunk_size_(spooler_definition.min_file_chunk_size) | |
| 35 | 1422 | , average_chunk_size_(spooler_definition.avg_file_chunk_size) | |
| 36 | 1422 | , maximal_chunk_size_(spooler_definition.max_file_chunk_size) | |
| 37 | 1422 | , spawned_(false) | |
| 38 | 1422 | , uploader_(uploader) | |
| 39 |
4/8✓ Branch 2 taken 1422 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1422 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 1422 times.
✗ Branch 9 not taken.
✓ Branch 22 taken 1422 times.
✗ Branch 23 not taken.
|
1422 | , tube_ctr_inflight_pre_(kMaxFilesInFlight) { |
| 40 |
1/2✓ Branch 1 taken 1422 times.
✗ Branch 2 not taken.
|
1422 | const unsigned nfork_base = std::max(1U, GetNumberOfCpuCores() / 8); |
| 41 | |||
| 42 |
2/2✓ Branch 0 taken 11376 times.
✓ Branch 1 taken 1422 times.
|
12798 | for (unsigned i = 0; i < nfork_base * kNforkRegister; ++i) { |
| 43 |
2/4✓ Branch 1 taken 11376 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 11376 times.
✗ Branch 5 not taken.
|
11376 | Tube<FileItem> *tube = new Tube<FileItem>(); |
| 44 |
1/2✓ Branch 1 taken 11376 times.
✗ Branch 2 not taken.
|
11376 | tubes_register_.TakeTube(tube); |
| 45 | TaskRegister *task = new TaskRegister(tube, &tube_ctr_inflight_pre_, | ||
| 46 |
2/4✓ Branch 1 taken 11376 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 11376 times.
✗ Branch 5 not taken.
|
11376 | &tube_ctr_inflight_post_); |
| 47 |
1/2✓ Branch 1 taken 11376 times.
✗ Branch 2 not taken.
|
11376 | task->RegisterListener(&IngestionPipeline::OnFileProcessed, this); |
| 48 |
1/2✓ Branch 1 taken 11376 times.
✗ Branch 2 not taken.
|
11376 | tasks_register_.TakeConsumer(task); |
| 49 | } | ||
| 50 | 1422 | tubes_register_.Activate(); | |
| 51 | |||
| 52 |
2/2✓ Branch 0 taken 11376 times.
✓ Branch 1 taken 1422 times.
|
12798 | for (unsigned i = 0; i < nfork_base * kNforkWrite; ++i) { |
| 53 |
2/4✓ Branch 1 taken 11376 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 11376 times.
✗ Branch 5 not taken.
|
11376 | Tube<BlockItem> *t = new Tube<BlockItem>(); |
| 54 |
1/2✓ Branch 1 taken 11376 times.
✗ Branch 2 not taken.
|
11376 | tubes_write_.TakeTube(t); |
| 55 |
3/6✓ Branch 1 taken 11376 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 11376 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 11376 times.
✗ Branch 8 not taken.
|
11376 | tasks_write_.TakeConsumer(new TaskWrite(t, &tubes_register_, uploader_)); |
| 56 | } | ||
| 57 | 1422 | tubes_write_.Activate(); | |
| 58 | |||
| 59 |
2/2✓ Branch 0 taken 22752 times.
✓ Branch 1 taken 1422 times.
|
24174 | for (unsigned i = 0; i < nfork_base * kNforkHash; ++i) { |
| 60 |
2/4✓ Branch 1 taken 22752 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 22752 times.
✗ Branch 5 not taken.
|
22752 | Tube<BlockItem> *t = new Tube<BlockItem>(); |
| 61 |
1/2✓ Branch 1 taken 22752 times.
✗ Branch 2 not taken.
|
22752 | tubes_hash_.TakeTube(t); |
| 62 |
3/6✓ Branch 1 taken 22752 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 22752 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 22752 times.
✗ Branch 8 not taken.
|
22752 | tasks_hash_.TakeConsumer(new TaskHash(t, &tubes_write_)); |
| 63 | } | ||
| 64 | 1422 | tubes_hash_.Activate(); | |
| 65 | |||
| 66 |
2/2✓ Branch 0 taken 45504 times.
✓ Branch 1 taken 1422 times.
|
46926 | for (unsigned i = 0; i < nfork_base * kNforkCompress; ++i) { |
| 67 |
2/4✓ Branch 1 taken 45504 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 45504 times.
✗ Branch 5 not taken.
|
45504 | Tube<BlockItem> *t = new Tube<BlockItem>(); |
| 68 |
1/2✓ Branch 1 taken 45504 times.
✗ Branch 2 not taken.
|
45504 | tubes_compress_.TakeTube(t); |
| 69 |
1/2✓ Branch 1 taken 45504 times.
✗ Branch 2 not taken.
|
45504 | tasks_compress_.TakeConsumer( |
| 70 |
2/4✓ Branch 1 taken 45504 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 45504 times.
✗ Branch 5 not taken.
|
45504 | new TaskCompress(t, &tubes_hash_, &item_allocator_)); |
| 71 | } | ||
| 72 | 1422 | tubes_compress_.Activate(); | |
| 73 | |||
| 74 |
2/2✓ Branch 0 taken 11376 times.
✓ Branch 1 taken 1422 times.
|
12798 | for (unsigned i = 0; i < nfork_base * kNforkChunk; ++i) { |
| 75 |
2/4✓ Branch 1 taken 11376 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 11376 times.
✗ Branch 5 not taken.
|
11376 | Tube<BlockItem> *t = new Tube<BlockItem>(); |
| 76 |
1/2✓ Branch 1 taken 11376 times.
✗ Branch 2 not taken.
|
11376 | tubes_chunk_.TakeTube(t); |
| 77 |
1/2✓ Branch 1 taken 11376 times.
✗ Branch 2 not taken.
|
11376 | tasks_chunk_.TakeConsumer( |
| 78 |
2/4✓ Branch 1 taken 11376 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 11376 times.
✗ Branch 5 not taken.
|
11376 | new TaskChunk(t, &tubes_compress_, &item_allocator_)); |
| 79 | } | ||
| 80 | 1422 | tubes_chunk_.Activate(); | |
| 81 | |||
| 82 | 1422 | uint64_t high = kMaxPipelineMem; | |
| 83 | 1422 | high = std::min(high, platform_memsize() / 5); | |
| 84 | 1422 | char *fixed_limit_mb = getenv("_CVMFS_SERVER_PIPELINE_MB"); | |
| 85 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1422 times.
|
1422 | if (fixed_limit_mb != NULL) { |
| 86 | ✗ | high = String2Uint64(fixed_limit_mb) * 1024 * 1024; | |
| 87 | } | ||
| 88 | 1422 | const uint64_t low = (high * 2) / 3; | |
| 89 |
1/2✓ Branch 1 taken 1422 times.
✗ Branch 2 not taken.
|
1422 | LogCvmfs(kLogCvmfs, kLogDebug, |
| 90 | "pipeline memory thresholds %" PRIu64 "/%" PRIu64 " M", | ||
| 91 | low / (1024 * 1024), high / (1024 * 1024)); | ||
| 92 |
2/2✓ Branch 0 taken 91008 times.
✓ Branch 1 taken 1422 times.
|
92430 | for (unsigned i = 0; i < nfork_base * kNforkRead; ++i) { |
| 93 | TaskRead *task_read = new TaskRead(&tube_input_, &tubes_chunk_, | ||
| 94 |
2/4✓ Branch 1 taken 91008 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 91008 times.
✗ Branch 5 not taken.
|
91008 | &item_allocator_); |
| 95 |
1/2✓ Branch 1 taken 91008 times.
✗ Branch 2 not taken.
|
91008 | task_read->SetWatermarks(low, high); |
| 96 |
1/2✓ Branch 1 taken 91008 times.
✗ Branch 2 not taken.
|
91008 | tasks_read_.TakeConsumer(task_read); |
| 97 | } | ||
| 98 | 1422 | } | |
| 99 | |||
| 100 | |||
| 101 | 4120 | IngestionPipeline::~IngestionPipeline() { | |
| 102 |
1/2✓ Branch 0 taken 1421 times.
✗ Branch 1 not taken.
|
2842 | if (spawned_) { |
| 103 | 2842 | tasks_read_.Terminate(); | |
| 104 | 2842 | tasks_chunk_.Terminate(); | |
| 105 | 2842 | tasks_compress_.Terminate(); | |
| 106 | 2842 | tasks_hash_.Terminate(); | |
| 107 | 2842 | tasks_write_.Terminate(); | |
| 108 | 2842 | tasks_register_.Terminate(); | |
| 109 | } | ||
| 110 | 4120 | } | |
| 111 | |||
| 112 | |||
| 113 | 11501176 | void IngestionPipeline::OnFileProcessed( | |
| 114 | const upload::SpoolerResult &spooler_result) { | ||
| 115 | 11501176 | NotifyListeners(spooler_result); | |
| 116 | 11501590 | } | |
| 117 | |||
| 118 | |||
| 119 | 11503246 | void IngestionPipeline::Process(IngestionSource *source, | |
| 120 | bool allow_chunking, | ||
| 121 | shash::Suffix hash_suffix) { | ||
| 122 | FileItem *file_item = new FileItem(source, | ||
| 123 | 11503246 | minimal_chunk_size_, | |
| 124 | 11503246 | average_chunk_size_, | |
| 125 | 11503246 | maximal_chunk_size_, | |
| 126 | 11503246 | compression_algorithm_, | |
| 127 | 11503246 | hash_algorithm_, | |
| 128 | hash_suffix, | ||
| 129 |
1/2✓ Branch 0 taken 11501150 times.
✗ Branch 1 not taken.
|
11503246 | allow_chunking && chunking_enabled_, |
| 130 |
3/4✓ Branch 1 taken 11501150 times.
✓ Branch 2 taken 2096 times.
✓ Branch 4 taken 11503246 times.
✗ Branch 5 not taken.
|
23006492 | generate_legacy_bulk_chunks_); |
| 131 | 11503246 | tube_ctr_inflight_post_.EnqueueBack(file_item); | |
| 132 | 11503246 | tube_ctr_inflight_pre_.EnqueueBack(file_item); | |
| 133 | 11503246 | tube_input_.EnqueueBack(file_item); | |
| 134 | 11503246 | } | |
| 135 | |||
| 136 | |||
| 137 | 1422 | void IngestionPipeline::Spawn() { | |
| 138 | 1422 | tasks_register_.Spawn(); | |
| 139 | 1422 | tasks_write_.Spawn(); | |
| 140 | 1422 | tasks_hash_.Spawn(); | |
| 141 | 1422 | tasks_compress_.Spawn(); | |
| 142 | 1422 | tasks_chunk_.Spawn(); | |
| 143 | 1422 | tasks_read_.Spawn(); | |
| 144 | 1422 | spawned_ = true; | |
| 145 | 1422 | } | |
| 146 | |||
| 147 | |||
| 148 | 1972 | void IngestionPipeline::WaitFor() { tube_ctr_inflight_post_.Wait(); } | |
| 149 | |||
| 150 | |||
| 151 | //------------------------------------------------------------------------------ | ||
| 152 | |||
| 153 | |||
| 154 | 46 | void TaskScrubbingCallback::Process(BlockItem *block_item) { | |
| 155 | 46 | FileItem *file_item = block_item->file_item(); | |
| 156 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 46 times.
|
46 | assert(file_item != NULL); |
| 157 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 46 times.
|
46 | assert(!file_item->path().empty()); |
| 158 | 46 | ChunkItem *chunk_item = block_item->chunk_item(); | |
| 159 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 46 times.
|
46 | assert(chunk_item != NULL); |
| 160 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 46 times.
|
46 | assert(chunk_item->is_bulk_chunk()); |
| 161 | |||
| 162 |
1/3✗ Branch 1 not taken.
✓ Branch 2 taken 46 times.
✗ Branch 3 not taken.
|
46 | switch (block_item->type()) { |
| 163 | ✗ | case BlockItem::kBlockData: | |
| 164 | ✗ | delete block_item; | |
| 165 | ✗ | break; | |
| 166 | |||
| 167 | 46 | case BlockItem::kBlockStop: | |
| 168 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 46 times.
|
46 | assert(!chunk_item->hash_ptr()->IsNull()); |
| 169 |
1/2✓ Branch 1 taken 46 times.
✗ Branch 2 not taken.
|
46 | NotifyListeners( |
| 170 |
1/2✓ Branch 3 taken 46 times.
✗ Branch 4 not taken.
|
92 | ScrubbingResult(file_item->path(), *chunk_item->hash_ptr())); |
| 171 |
1/2✓ Branch 0 taken 46 times.
✗ Branch 1 not taken.
|
46 | delete block_item; |
| 172 |
1/2✓ Branch 0 taken 46 times.
✗ Branch 1 not taken.
|
46 | delete chunk_item; |
| 173 |
1/2✓ Branch 0 taken 46 times.
✗ Branch 1 not taken.
|
46 | delete file_item; |
| 174 | 46 | tube_counter_->PopFront(); | |
| 175 | 46 | break; | |
| 176 | |||
| 177 | ✗ | default: | |
| 178 | ✗ | PANIC(NULL); | |
| 179 | } | ||
| 180 | 46 | } | |
| 181 | |||
| 182 | |||
| 183 | //------------------------------------------------------------------------------ | ||
| 184 | |||
| 185 | |||
| 186 | 46 | ScrubbingPipeline::ScrubbingPipeline() | |
| 187 |
3/6✓ Branch 2 taken 46 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 46 times.
✗ Branch 6 not taken.
✓ Branch 15 taken 46 times.
✗ Branch 16 not taken.
|
46 | : spawned_(false), tube_counter_(kMaxFilesInFlight) { |
| 188 |
1/2✓ Branch 1 taken 46 times.
✗ Branch 2 not taken.
|
46 | const unsigned nfork_base = std::max(1U, GetNumberOfCpuCores() / 8); |
| 189 | |||
| 190 |
2/2✓ Branch 0 taken 368 times.
✓ Branch 1 taken 46 times.
|
414 | for (unsigned i = 0; i < nfork_base * kNforkScrubbingCallback; ++i) { |
| 191 |
2/4✓ Branch 1 taken 368 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 368 times.
✗ Branch 5 not taken.
|
368 | Tube<BlockItem> *tube = new Tube<BlockItem>(); |
| 192 |
1/2✓ Branch 1 taken 368 times.
✗ Branch 2 not taken.
|
368 | tubes_scrubbing_callback_.TakeTube(tube); |
| 193 | TaskScrubbingCallback *task = new TaskScrubbingCallback(tube, | ||
| 194 |
2/4✓ Branch 1 taken 368 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 368 times.
✗ Branch 5 not taken.
|
368 | &tube_counter_); |
| 195 |
1/2✓ Branch 1 taken 368 times.
✗ Branch 2 not taken.
|
368 | task->RegisterListener(&ScrubbingPipeline::OnFileProcessed, this); |
| 196 |
1/2✓ Branch 1 taken 368 times.
✗ Branch 2 not taken.
|
368 | tasks_scrubbing_callback_.TakeConsumer(task); |
| 197 | } | ||
| 198 | 46 | tubes_scrubbing_callback_.Activate(); | |
| 199 | |||
| 200 |
2/2✓ Branch 0 taken 736 times.
✓ Branch 1 taken 46 times.
|
782 | for (unsigned i = 0; i < nfork_base * kNforkHash; ++i) { |
| 201 |
2/4✓ Branch 1 taken 736 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 736 times.
✗ Branch 5 not taken.
|
736 | Tube<BlockItem> *t = new Tube<BlockItem>(); |
| 202 |
1/2✓ Branch 1 taken 736 times.
✗ Branch 2 not taken.
|
736 | tubes_hash_.TakeTube(t); |
| 203 |
3/6✓ Branch 1 taken 736 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 736 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 736 times.
✗ Branch 8 not taken.
|
736 | tasks_hash_.TakeConsumer(new TaskHash(t, &tubes_scrubbing_callback_)); |
| 204 | } | ||
| 205 | 46 | tubes_hash_.Activate(); | |
| 206 | |||
| 207 |
2/2✓ Branch 0 taken 368 times.
✓ Branch 1 taken 46 times.
|
414 | for (unsigned i = 0; i < nfork_base * kNforkChunk; ++i) { |
| 208 |
2/4✓ Branch 1 taken 368 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 368 times.
✗ Branch 5 not taken.
|
368 | Tube<BlockItem> *t = new Tube<BlockItem>(); |
| 209 |
1/2✓ Branch 1 taken 368 times.
✗ Branch 2 not taken.
|
368 | tubes_chunk_.TakeTube(t); |
| 210 |
3/6✓ Branch 1 taken 368 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 368 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 368 times.
✗ Branch 8 not taken.
|
368 | tasks_chunk_.TakeConsumer(new TaskChunk(t, &tubes_hash_, &item_allocator_)); |
| 211 | } | ||
| 212 | 46 | tubes_chunk_.Activate(); | |
| 213 | |||
| 214 |
2/2✓ Branch 0 taken 2944 times.
✓ Branch 1 taken 46 times.
|
2990 | for (unsigned i = 0; i < nfork_base * kNforkRead; ++i) { |
| 215 | TaskRead *task_read = new TaskRead(&tube_input_, &tubes_chunk_, | ||
| 216 |
2/4✓ Branch 1 taken 2944 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2944 times.
✗ Branch 5 not taken.
|
2944 | &item_allocator_); |
| 217 |
1/2✓ Branch 1 taken 2944 times.
✗ Branch 2 not taken.
|
2944 | task_read->SetWatermarks(kMemLowWatermark, kMemHighWatermark); |
| 218 |
1/2✓ Branch 1 taken 2944 times.
✗ Branch 2 not taken.
|
2944 | tasks_read_.TakeConsumer(task_read); |
| 219 | } | ||
| 220 | 46 | } | |
| 221 | |||
| 222 | |||
| 223 | 184 | ScrubbingPipeline::~ScrubbingPipeline() { | |
| 224 |
1/2✓ Branch 0 taken 46 times.
✗ Branch 1 not taken.
|
92 | if (spawned_) { |
| 225 | 92 | tasks_read_.Terminate(); | |
| 226 | 92 | tasks_chunk_.Terminate(); | |
| 227 | 92 | tasks_hash_.Terminate(); | |
| 228 | 92 | tasks_scrubbing_callback_.Terminate(); | |
| 229 | } | ||
| 230 | 184 | } | |
| 231 | |||
| 232 | |||
| 233 | 46 | void ScrubbingPipeline::OnFileProcessed( | |
| 234 | const ScrubbingResult &scrubbing_result) { | ||
| 235 | 46 | NotifyListeners(scrubbing_result); | |
| 236 | 46 | } | |
| 237 | |||
| 238 | |||
| 239 | 46 | void ScrubbingPipeline::Process(IngestionSource *source, | |
| 240 | shash::Algorithms hash_algorithm, | ||
| 241 | shash::Suffix hash_suffix) { | ||
| 242 | FileItem *file_item = new FileItem(source, 0, 0, 0, zlib::kNoCompression, | ||
| 243 | hash_algorithm, hash_suffix, | ||
| 244 | false, /* may_have_chunks */ | ||
| 245 |
1/2✓ Branch 2 taken 46 times.
✗ Branch 3 not taken.
|
46 | true /* hash_legacy_bulk_chunk */); |
| 246 | 46 | tube_counter_.EnqueueBack(file_item); | |
| 247 | 46 | tube_input_.EnqueueBack(file_item); | |
| 248 | 46 | } | |
| 249 | |||
| 250 | |||
| 251 | 46 | void ScrubbingPipeline::Spawn() { | |
| 252 | 46 | tasks_scrubbing_callback_.Spawn(); | |
| 253 | 46 | tasks_hash_.Spawn(); | |
| 254 | 46 | tasks_chunk_.Spawn(); | |
| 255 | 46 | tasks_read_.Spawn(); | |
| 256 | 46 | spawned_ = true; | |
| 257 | 46 | } | |
| 258 | |||
| 259 | |||
| 260 | 46 | void ScrubbingPipeline::WaitFor() { tube_counter_.Wait(); } | |
| 261 |