| Directory: | cvmfs/ |
|---|---|
| File: | cvmfs/ingestion/pipeline.cc |
| Date: | 2025-11-02 02:35:35 |
| Exec | Total | Coverage | |
|---|---|---|---|
| Lines: | 154 | 160 | 96.2% |
| Branches: | 107 | 199 | 53.8% |
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /** | ||
| 2 | * This file is part of the CernVM File System. | ||
| 3 | */ | ||
| 4 | |||
| 5 | |||
| 6 | #include "pipeline.h" | ||
| 7 | |||
| 8 | #include <algorithm> | ||
| 9 | #include <cstdlib> | ||
| 10 | |||
| 11 | #include "ingestion/task_chunk.h" | ||
| 12 | #include "ingestion/task_compress.h" | ||
| 13 | #include "ingestion/task_hash.h" | ||
| 14 | #include "ingestion/task_read.h" | ||
| 15 | #include "ingestion/task_register.h" | ||
| 16 | #include "ingestion/task_write.h" | ||
| 17 | #include "upload_facility.h" | ||
| 18 | #include "upload_spooler_definition.h" | ||
| 19 | #include "util/concurrency.h" | ||
| 20 | #include "util/exception.h" | ||
| 21 | #include "util/platform.h" | ||
| 22 | #include "util/string.h" | ||
| 23 | |||
| 24 | const uint64_t IngestionPipeline::kMaxPipelineMem = 1024 * 1024 * 1024; | ||
| 25 | |||
| 26 | 1343 | IngestionPipeline::IngestionPipeline( | |
| 27 | upload::AbstractUploader *uploader, | ||
| 28 | 1343 | const upload::SpoolerDefinition &spooler_definition) | |
| 29 | 1343 | : compression_algorithm_(spooler_definition.compression_alg) | |
| 30 | 1343 | , hash_algorithm_(spooler_definition.hash_algorithm) | |
| 31 | 1343 | , generate_legacy_bulk_chunks_( | |
| 32 | 1343 | spooler_definition.generate_legacy_bulk_chunks) | |
| 33 | 1343 | , chunking_enabled_(spooler_definition.use_file_chunking) | |
| 34 | 1343 | , minimal_chunk_size_(spooler_definition.min_file_chunk_size) | |
| 35 | 1343 | , average_chunk_size_(spooler_definition.avg_file_chunk_size) | |
| 36 | 1343 | , maximal_chunk_size_(spooler_definition.max_file_chunk_size) | |
| 37 | 1343 | , spawned_(false) | |
| 38 | 1343 | , uploader_(uploader) | |
| 39 |
4/8✓ Branch 2 taken 1343 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1343 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 1343 times.
✗ Branch 9 not taken.
✓ Branch 22 taken 1343 times.
✗ Branch 23 not taken.
|
1343 | , tube_ctr_inflight_pre_(kMaxFilesInFlight) { |
| 40 |
1/2✓ Branch 1 taken 1343 times.
✗ Branch 2 not taken.
|
1343 | const unsigned nfork_base = std::max(1U, GetNumberOfCpuCores() / 8); |
| 41 | |||
| 42 |
2/2✓ Branch 0 taken 10744 times.
✓ Branch 1 taken 1343 times.
|
12087 | for (unsigned i = 0; i < nfork_base * kNforkRegister; ++i) { |
| 43 |
2/4✓ Branch 1 taken 10744 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 10744 times.
✗ Branch 5 not taken.
|
10744 | Tube<FileItem> *tube = new Tube<FileItem>(); |
| 44 |
1/2✓ Branch 1 taken 10744 times.
✗ Branch 2 not taken.
|
10744 | tubes_register_.TakeTube(tube); |
| 45 | TaskRegister *task = new TaskRegister(tube, &tube_ctr_inflight_pre_, | ||
| 46 |
2/4✓ Branch 1 taken 10744 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 10744 times.
✗ Branch 5 not taken.
|
10744 | &tube_ctr_inflight_post_); |
| 47 |
1/2✓ Branch 1 taken 10744 times.
✗ Branch 2 not taken.
|
10744 | task->RegisterListener(&IngestionPipeline::OnFileProcessed, this); |
| 48 |
1/2✓ Branch 1 taken 10744 times.
✗ Branch 2 not taken.
|
10744 | tasks_register_.TakeConsumer(task); |
| 49 | } | ||
| 50 | 1343 | tubes_register_.Activate(); | |
| 51 | |||
| 52 |
2/2✓ Branch 0 taken 10744 times.
✓ Branch 1 taken 1343 times.
|
12087 | for (unsigned i = 0; i < nfork_base * kNforkWrite; ++i) { |
| 53 |
2/4✓ Branch 1 taken 10744 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 10744 times.
✗ Branch 5 not taken.
|
10744 | Tube<BlockItem> *t = new Tube<BlockItem>(); |
| 54 |
1/2✓ Branch 1 taken 10744 times.
✗ Branch 2 not taken.
|
10744 | tubes_write_.TakeTube(t); |
| 55 |
3/6✓ Branch 1 taken 10744 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 10744 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 10744 times.
✗ Branch 8 not taken.
|
10744 | tasks_write_.TakeConsumer(new TaskWrite(t, &tubes_register_, uploader_)); |
| 56 | } | ||
| 57 | 1343 | tubes_write_.Activate(); | |
| 58 | |||
| 59 |
2/2✓ Branch 0 taken 21488 times.
✓ Branch 1 taken 1343 times.
|
22831 | for (unsigned i = 0; i < nfork_base * kNforkHash; ++i) { |
| 60 |
2/4✓ Branch 1 taken 21488 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 21488 times.
✗ Branch 5 not taken.
|
21488 | Tube<BlockItem> *t = new Tube<BlockItem>(); |
| 61 |
1/2✓ Branch 1 taken 21488 times.
✗ Branch 2 not taken.
|
21488 | tubes_hash_.TakeTube(t); |
| 62 |
3/6✓ Branch 1 taken 21488 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 21488 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 21488 times.
✗ Branch 8 not taken.
|
21488 | tasks_hash_.TakeConsumer(new TaskHash(t, &tubes_write_)); |
| 63 | } | ||
| 64 | 1343 | tubes_hash_.Activate(); | |
| 65 | |||
| 66 |
2/2✓ Branch 0 taken 42976 times.
✓ Branch 1 taken 1343 times.
|
44319 | for (unsigned i = 0; i < nfork_base * kNforkCompress; ++i) { |
| 67 |
2/4✓ Branch 1 taken 42976 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 42976 times.
✗ Branch 5 not taken.
|
42976 | Tube<BlockItem> *t = new Tube<BlockItem>(); |
| 68 |
1/2✓ Branch 1 taken 42976 times.
✗ Branch 2 not taken.
|
42976 | tubes_compress_.TakeTube(t); |
| 69 |
1/2✓ Branch 1 taken 42976 times.
✗ Branch 2 not taken.
|
42976 | tasks_compress_.TakeConsumer( |
| 70 |
2/4✓ Branch 1 taken 42976 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 42976 times.
✗ Branch 5 not taken.
|
42976 | new TaskCompress(t, &tubes_hash_, &item_allocator_)); |
| 71 | } | ||
| 72 | 1343 | tubes_compress_.Activate(); | |
| 73 | |||
| 74 |
2/2✓ Branch 0 taken 10744 times.
✓ Branch 1 taken 1343 times.
|
12087 | for (unsigned i = 0; i < nfork_base * kNforkChunk; ++i) { |
| 75 |
2/4✓ Branch 1 taken 10744 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 10744 times.
✗ Branch 5 not taken.
|
10744 | Tube<BlockItem> *t = new Tube<BlockItem>(); |
| 76 |
1/2✓ Branch 1 taken 10744 times.
✗ Branch 2 not taken.
|
10744 | tubes_chunk_.TakeTube(t); |
| 77 |
1/2✓ Branch 1 taken 10744 times.
✗ Branch 2 not taken.
|
10744 | tasks_chunk_.TakeConsumer( |
| 78 |
2/4✓ Branch 1 taken 10744 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 10744 times.
✗ Branch 5 not taken.
|
10744 | new TaskChunk(t, &tubes_compress_, &item_allocator_)); |
| 79 | } | ||
| 80 | 1343 | tubes_chunk_.Activate(); | |
| 81 | |||
| 82 | 1343 | uint64_t high = kMaxPipelineMem; | |
| 83 | 1343 | high = std::min(high, platform_memsize() / 5); | |
| 84 | 1343 | char *fixed_limit_mb = getenv("_CVMFS_SERVER_PIPELINE_MB"); | |
| 85 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1343 times.
|
1343 | if (fixed_limit_mb != NULL) { |
| 86 | ✗ | high = String2Uint64(fixed_limit_mb) * 1024 * 1024; | |
| 87 | } | ||
| 88 | 1343 | const uint64_t low = (high * 2) / 3; | |
| 89 |
1/2✓ Branch 1 taken 1343 times.
✗ Branch 2 not taken.
|
1343 | LogCvmfs(kLogCvmfs, kLogDebug, |
| 90 | "pipeline memory thresholds %" PRIu64 "/%" PRIu64 " M", | ||
| 91 | low / (1024 * 1024), high / (1024 * 1024)); | ||
| 92 |
2/2✓ Branch 0 taken 85952 times.
✓ Branch 1 taken 1343 times.
|
87295 | for (unsigned i = 0; i < nfork_base * kNforkRead; ++i) { |
| 93 | TaskRead *task_read = new TaskRead(&tube_input_, &tubes_chunk_, | ||
| 94 |
2/4✓ Branch 1 taken 85952 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 85952 times.
✗ Branch 5 not taken.
|
85952 | &item_allocator_); |
| 95 |
1/2✓ Branch 1 taken 85952 times.
✗ Branch 2 not taken.
|
85952 | task_read->SetWatermarks(low, high); |
| 96 |
1/2✓ Branch 1 taken 85952 times.
✗ Branch 2 not taken.
|
85952 | tasks_read_.TakeConsumer(task_read); |
| 97 | } | ||
| 98 | 1343 | } | |
| 99 | |||
| 100 | |||
| 101 | 4382 | IngestionPipeline::~IngestionPipeline() { | |
| 102 |
1/2✓ Branch 0 taken 1342 times.
✗ Branch 1 not taken.
|
2684 | if (spawned_) { |
| 103 | 2684 | tasks_read_.Terminate(); | |
| 104 | 2684 | tasks_chunk_.Terminate(); | |
| 105 | 2684 | tasks_compress_.Terminate(); | |
| 106 | 2684 | tasks_hash_.Terminate(); | |
| 107 | 2684 | tasks_write_.Terminate(); | |
| 108 | 2684 | tasks_register_.Terminate(); | |
| 109 | } | ||
| 110 | 4382 | } | |
| 111 | |||
| 112 | |||
| 113 | 7250595 | void IngestionPipeline::OnFileProcessed( | |
| 114 | const upload::SpoolerResult &spooler_result) { | ||
| 115 | 7250595 | NotifyListeners(spooler_result); | |
| 116 | 7251233 | } | |
| 117 | |||
| 118 | |||
| 119 | 7252683 | void IngestionPipeline::Process(IngestionSource *source, | |
| 120 | bool allow_chunking, | ||
| 121 | shash::Suffix hash_suffix) { | ||
| 122 | FileItem *file_item = new FileItem(source, | ||
| 123 | 7252683 | minimal_chunk_size_, | |
| 124 | 7252683 | average_chunk_size_, | |
| 125 | 7252683 | maximal_chunk_size_, | |
| 126 | 7252683 | compression_algorithm_, | |
| 127 | 7252683 | hash_algorithm_, | |
| 128 | hash_suffix, | ||
| 129 |
1/2✓ Branch 0 taken 7250687 times.
✗ Branch 1 not taken.
|
7252683 | allow_chunking && chunking_enabled_, |
| 130 |
3/4✓ Branch 1 taken 7250687 times.
✓ Branch 2 taken 1996 times.
✓ Branch 4 taken 7252683 times.
✗ Branch 5 not taken.
|
14505366 | generate_legacy_bulk_chunks_); |
| 131 | 7252683 | tube_ctr_inflight_post_.EnqueueBack(file_item); | |
| 132 | 7252683 | tube_ctr_inflight_pre_.EnqueueBack(file_item); | |
| 133 | 7252683 | tube_input_.EnqueueBack(file_item); | |
| 134 | 7252683 | } | |
| 135 | |||
| 136 | |||
| 137 | 1343 | void IngestionPipeline::Spawn() { | |
| 138 | 1343 | tasks_register_.Spawn(); | |
| 139 | 1343 | tasks_write_.Spawn(); | |
| 140 | 1343 | tasks_hash_.Spawn(); | |
| 141 | 1343 | tasks_compress_.Spawn(); | |
| 142 | 1343 | tasks_chunk_.Spawn(); | |
| 143 | 1343 | tasks_read_.Spawn(); | |
| 144 | 1343 | spawned_ = true; | |
| 145 | 1343 | } | |
| 146 | |||
| 147 | |||
| 148 | 2039 | void IngestionPipeline::WaitFor() { tube_ctr_inflight_post_.Wait(); } | |
| 149 | |||
| 150 | |||
| 151 | //------------------------------------------------------------------------------ | ||
| 152 | |||
| 153 | |||
| 154 | 10 | void TaskScrubbingCallback::Process(BlockItem *block_item) { | |
| 155 | 10 | FileItem *file_item = block_item->file_item(); | |
| 156 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
|
10 | assert(file_item != NULL); |
| 157 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 10 times.
|
10 | assert(!file_item->path().empty()); |
| 158 | 10 | ChunkItem *chunk_item = block_item->chunk_item(); | |
| 159 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
|
10 | assert(chunk_item != NULL); |
| 160 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 10 times.
|
10 | assert(chunk_item->is_bulk_chunk()); |
| 161 | |||
| 162 |
1/3✗ Branch 1 not taken.
✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
|
10 | switch (block_item->type()) { |
| 163 | ✗ | case BlockItem::kBlockData: | |
| 164 | ✗ | delete block_item; | |
| 165 | ✗ | break; | |
| 166 | |||
| 167 | 10 | case BlockItem::kBlockStop: | |
| 168 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 10 times.
|
10 | assert(!chunk_item->hash_ptr()->IsNull()); |
| 169 |
1/2✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
|
10 | NotifyListeners( |
| 170 |
1/2✓ Branch 3 taken 10 times.
✗ Branch 4 not taken.
|
20 | ScrubbingResult(file_item->path(), *chunk_item->hash_ptr())); |
| 171 |
1/2✓ Branch 0 taken 10 times.
✗ Branch 1 not taken.
|
10 | delete block_item; |
| 172 |
1/2✓ Branch 0 taken 10 times.
✗ Branch 1 not taken.
|
10 | delete chunk_item; |
| 173 |
1/2✓ Branch 0 taken 10 times.
✗ Branch 1 not taken.
|
10 | delete file_item; |
| 174 | 10 | tube_counter_->PopFront(); | |
| 175 | 10 | break; | |
| 176 | |||
| 177 | ✗ | default: | |
| 178 | ✗ | PANIC(NULL); | |
| 179 | } | ||
| 180 | 10 | } | |
| 181 | |||
| 182 | |||
| 183 | //------------------------------------------------------------------------------ | ||
| 184 | |||
| 185 | |||
| 186 | 10 | ScrubbingPipeline::ScrubbingPipeline() | |
| 187 |
3/6✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 10 times.
✗ Branch 6 not taken.
✓ Branch 15 taken 10 times.
✗ Branch 16 not taken.
|
10 | : spawned_(false), tube_counter_(kMaxFilesInFlight) { |
| 188 |
1/2✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
|
10 | const unsigned nfork_base = std::max(1U, GetNumberOfCpuCores() / 8); |
| 189 | |||
| 190 |
2/2✓ Branch 0 taken 80 times.
✓ Branch 1 taken 10 times.
|
90 | for (unsigned i = 0; i < nfork_base * kNforkScrubbingCallback; ++i) { |
| 191 |
2/4✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 80 times.
✗ Branch 5 not taken.
|
80 | Tube<BlockItem> *tube = new Tube<BlockItem>(); |
| 192 |
1/2✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
|
80 | tubes_scrubbing_callback_.TakeTube(tube); |
| 193 | TaskScrubbingCallback *task = new TaskScrubbingCallback(tube, | ||
| 194 |
2/4✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 80 times.
✗ Branch 5 not taken.
|
80 | &tube_counter_); |
| 195 |
1/2✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
|
80 | task->RegisterListener(&ScrubbingPipeline::OnFileProcessed, this); |
| 196 |
1/2✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
|
80 | tasks_scrubbing_callback_.TakeConsumer(task); |
| 197 | } | ||
| 198 | 10 | tubes_scrubbing_callback_.Activate(); | |
| 199 | |||
| 200 |
2/2✓ Branch 0 taken 160 times.
✓ Branch 1 taken 10 times.
|
170 | for (unsigned i = 0; i < nfork_base * kNforkHash; ++i) { |
| 201 |
2/4✓ Branch 1 taken 160 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 160 times.
✗ Branch 5 not taken.
|
160 | Tube<BlockItem> *t = new Tube<BlockItem>(); |
| 202 |
1/2✓ Branch 1 taken 160 times.
✗ Branch 2 not taken.
|
160 | tubes_hash_.TakeTube(t); |
| 203 |
3/6✓ Branch 1 taken 160 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 160 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 160 times.
✗ Branch 8 not taken.
|
160 | tasks_hash_.TakeConsumer(new TaskHash(t, &tubes_scrubbing_callback_)); |
| 204 | } | ||
| 205 | 10 | tubes_hash_.Activate(); | |
| 206 | |||
| 207 |
2/2✓ Branch 0 taken 80 times.
✓ Branch 1 taken 10 times.
|
90 | for (unsigned i = 0; i < nfork_base * kNforkChunk; ++i) { |
| 208 |
2/4✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 80 times.
✗ Branch 5 not taken.
|
80 | Tube<BlockItem> *t = new Tube<BlockItem>(); |
| 209 |
1/2✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
|
80 | tubes_chunk_.TakeTube(t); |
| 210 |
3/6✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 80 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 80 times.
✗ Branch 8 not taken.
|
80 | tasks_chunk_.TakeConsumer(new TaskChunk(t, &tubes_hash_, &item_allocator_)); |
| 211 | } | ||
| 212 | 10 | tubes_chunk_.Activate(); | |
| 213 | |||
| 214 |
2/2✓ Branch 0 taken 640 times.
✓ Branch 1 taken 10 times.
|
650 | for (unsigned i = 0; i < nfork_base * kNforkRead; ++i) { |
| 215 | TaskRead *task_read = new TaskRead(&tube_input_, &tubes_chunk_, | ||
| 216 |
2/4✓ Branch 1 taken 640 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 640 times.
✗ Branch 5 not taken.
|
640 | &item_allocator_); |
| 217 |
1/2✓ Branch 1 taken 640 times.
✗ Branch 2 not taken.
|
640 | task_read->SetWatermarks(kMemLowWatermark, kMemHighWatermark); |
| 218 |
1/2✓ Branch 1 taken 640 times.
✗ Branch 2 not taken.
|
640 | tasks_read_.TakeConsumer(task_read); |
| 219 | } | ||
| 220 | 10 | } | |
| 221 | |||
| 222 | |||
| 223 | 40 | ScrubbingPipeline::~ScrubbingPipeline() { | |
| 224 |
1/2✓ Branch 0 taken 10 times.
✗ Branch 1 not taken.
|
20 | if (spawned_) { |
| 225 | 20 | tasks_read_.Terminate(); | |
| 226 | 20 | tasks_chunk_.Terminate(); | |
| 227 | 20 | tasks_hash_.Terminate(); | |
| 228 | 20 | tasks_scrubbing_callback_.Terminate(); | |
| 229 | } | ||
| 230 | 40 | } | |
| 231 | |||
| 232 | |||
| 233 | 10 | void ScrubbingPipeline::OnFileProcessed( | |
| 234 | const ScrubbingResult &scrubbing_result) { | ||
| 235 | 10 | NotifyListeners(scrubbing_result); | |
| 236 | 10 | } | |
| 237 | |||
| 238 | |||
| 239 | 10 | void ScrubbingPipeline::Process(IngestionSource *source, | |
| 240 | shash::Algorithms hash_algorithm, | ||
| 241 | shash::Suffix hash_suffix) { | ||
| 242 | FileItem *file_item = new FileItem(source, 0, 0, 0, zlib::kNoCompression, | ||
| 243 | hash_algorithm, hash_suffix, | ||
| 244 | false, /* may_have_chunks */ | ||
| 245 |
1/2✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
|
10 | true /* hash_legacy_bulk_chunk */); |
| 246 | 10 | tube_counter_.EnqueueBack(file_item); | |
| 247 | 10 | tube_input_.EnqueueBack(file_item); | |
| 248 | 10 | } | |
| 249 | |||
| 250 | |||
| 251 | 10 | void ScrubbingPipeline::Spawn() { | |
| 252 | 10 | tasks_scrubbing_callback_.Spawn(); | |
| 253 | 10 | tasks_hash_.Spawn(); | |
| 254 | 10 | tasks_chunk_.Spawn(); | |
| 255 | 10 | tasks_read_.Spawn(); | |
| 256 | 10 | spawned_ = true; | |
| 257 | 10 | } | |
| 258 | |||
| 259 | |||
| 260 | 10 | void ScrubbingPipeline::WaitFor() { tube_counter_.Wait(); } | |
| 261 |