GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/ingestion/pipeline.h
Date: 2025-06-22 02:36:02
Metric     Exec  Total  Coverage
Lines:     5     5      100.0%
Branches:  1     2      50.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_INGESTION_PIPELINE_H_
6 #define CVMFS_INGESTION_PIPELINE_H_
7
8 #include <string>
9
10 #include "compression/compression.h"
11 #include "crypto/hash.h"
12 #include "ingestion/item.h"
13 #include "ingestion/item_mem.h"
14 #include "ingestion/task.h"
15 #include "upload_spooler_result.h"
16 #include "util/concurrency.h"
17 #include "util/tube.h"
18
// Forward declarations only: this header refers to these upload types solely
// through pointers and references, so their definitions are not needed here.
namespace upload {
struct SpoolerDefinition;
class AbstractUploader;
}  // namespace upload
23
24 class IngestionPipeline : public Observable<upload::SpoolerResult> {
25 public:
26 explicit IngestionPipeline(
27 upload::AbstractUploader *uploader,
28 const upload::SpoolerDefinition &spooler_definition);
29 ~IngestionPipeline();
30
31 void Spawn();
32 void Process(IngestionSource *source, bool allow_chunking,
33 shash::Suffix hash_suffix = shash::kSuffixNone);
34 void WaitFor();
35
36 void OnFileProcessed(const upload::SpoolerResult &spooler_result);
37
38 private:
39 static const uint64_t kMaxPipelineMem; // 1G
40 static const unsigned kMaxFilesInFlight = 8000;
41 static const unsigned kNforkRegister = 1;
42 static const unsigned kNforkWrite = 1;
43 static const unsigned kNforkHash = 2;
44 static const unsigned kNforkCompress = 4;
45 static const unsigned kNforkChunk = 1;
46 static const unsigned kNforkRead = 8;
47
48 const zlib::Algorithms compression_algorithm_;
49 const shash::Algorithms hash_algorithm_;
50 const bool generate_legacy_bulk_chunks_;
51 const bool chunking_enabled_;
52 const size_t minimal_chunk_size_;
53 const size_t average_chunk_size_;
54 const size_t maximal_chunk_size_;
55
56 bool spawned_;
57 upload::AbstractUploader *uploader_;
58 // TODO(jblomer): a semaphore would be faster!
59 // We need to have two in-flight counters: the pre-counter decreases
60 // before the final NotifyListeners() call, so that the callback can schedule
61 // a new job into the pipeline. This happens when finished child catalogs
62 // trigger uploading the parent catalog.
63 // The pre-counter sets the kMaxFilesInFlight limit.
64 Tube<FileItem> tube_ctr_inflight_pre_;
65 // The post counter is set after the final callback. It is used to wait
66 // for the pipeline to finish.
67 Tube<FileItem> tube_ctr_inflight_post_;
68
69 Tube<FileItem> tube_input_;
70
71 TubeConsumerGroup<FileItem> tasks_read_;
72
73 TubeGroup<BlockItem> tubes_chunk_;
74 TubeConsumerGroup<BlockItem> tasks_chunk_;
75
76 TubeGroup<BlockItem> tubes_compress_;
77 TubeConsumerGroup<BlockItem> tasks_compress_;
78
79 TubeGroup<BlockItem> tubes_hash_;
80 TubeConsumerGroup<BlockItem> tasks_hash_;
81
82 TubeGroup<BlockItem> tubes_write_;
83 TubeConsumerGroup<BlockItem> tasks_write_;
84
85 TubeGroup<FileItem> tubes_register_;
86 TubeConsumerGroup<FileItem> tasks_register_;
87
88 ItemAllocator item_allocator_;
89 }; // class IngestionPipeline
90
91
92 struct ScrubbingResult {
93
1/2
✓ Branch 2 taken 14 times.
✗ Branch 3 not taken.
14 ScrubbingResult() { }
94 14 ScrubbingResult(const std::string &p, const shash::Any &h)
95 14 : path(p), hash(h) { }
96 std::string path;
97 shash::Any hash;
98 };
99
100
101 class TaskScrubbingCallback : public TubeConsumer<BlockItem>,
102 public Observable<ScrubbingResult> {
103 public:
104 112 TaskScrubbingCallback(Tube<BlockItem> *tube_in, Tube<FileItem> *tube_counter)
105 112 : TubeConsumer<BlockItem>(tube_in), tube_counter_(tube_counter) { }
106
107 protected:
108 virtual void Process(BlockItem *block_item);
109
110 private:
111 Tube<FileItem> *tube_counter_;
112 };
113
114
115 class ScrubbingPipeline : public Observable<ScrubbingResult> {
116 public:
117 ScrubbingPipeline();
118 ~ScrubbingPipeline();
119
120 void Spawn();
121 void Process(IngestionSource *source,
122 shash::Algorithms hash_algorithm,
123 shash::Suffix hash_suffix);
124 void WaitFor();
125
126 void OnFileProcessed(const ScrubbingResult &scrubbing_result);
127
128 private:
129 static const uint64_t kMemLowWatermark = 384 * 1024 * 1024;
130 static const uint64_t kMemHighWatermark = 512 * 1024 * 1024;
131 static const unsigned kMaxFilesInFlight = 8000;
132 static const unsigned kNforkScrubbingCallback = 1;
133 static const unsigned kNforkHash = 2;
134 static const unsigned kNforkChunk = 1;
135 static const unsigned kNforkRead = 8;
136
137 bool spawned_;
138 Tube<FileItem> tube_input_;
139 // TODO(jblomer): a semaphore would be faster!
140 Tube<FileItem> tube_counter_;
141
142 TubeConsumerGroup<FileItem> tasks_read_;
143
144 TubeGroup<BlockItem> tubes_chunk_;
145 TubeConsumerGroup<BlockItem> tasks_chunk_;
146
147 TubeGroup<BlockItem> tubes_hash_;
148 TubeConsumerGroup<BlockItem> tasks_hash_;
149
150 TubeGroup<BlockItem> tubes_scrubbing_callback_;
151 TubeConsumerGroup<BlockItem> tasks_scrubbing_callback_;
152
153 ItemAllocator item_allocator_;
154 };
155
156
157 struct CompressHashResult {
158 CompressHashResult() { }
159 CompressHashResult(const std::string &p, const shash::Any &h)
160 : path(p), hash(h) { }
161 std::string path;
162 shash::Any hash;
163 };
164
165
166 class TaskCompressHashCallback
167 : public TubeConsumer<BlockItem>
168 , public Observable<CompressHashResult>
169 {
170 public:
171 TaskCompressHashCallback(Tube<BlockItem> *tube_in,
172 Tube<FileItem> *tube_counter)
173 : TubeConsumer<BlockItem>(tube_in)
174 , tube_counter_(tube_counter)
175 { }
176
177 protected:
178 virtual void Process(BlockItem *block_item);
179
180 private:
181 Tube<FileItem> *tube_counter_;
182 };
183
184
185 class CompressHashPipeline : public Observable<CompressHashResult> {
186 public:
187 CompressHashPipeline();
188 ~CompressHashPipeline();
189
190 void Spawn();
191 void Process(IngestionSource* source,
192 shash::Algorithms hash_algorithm,
193 shash::Suffix hash_suffix);
194 void WaitFor();
195
196 void OnFileProcessed(const CompressHashResult &compress_hash_result);
197
198 private:
199 static const uint64_t kMemLowWatermark = 64 * 1024 * 1024;
200 static const uint64_t kMemHighWatermark = 128 * 1024 * 1024;
201
202 bool spawned_;
203 Tube<FileItem> tube_input_;
204 Tube<FileItem> tube_counter_;
205
206 TubeConsumerGroup<FileItem> tasks_read_;
207
208 TubeGroup<BlockItem> tubes_chunk_;
209 TubeConsumerGroup<BlockItem> tasks_chunk_;
210
211 TubeGroup<BlockItem> tubes_compress_;
212 TubeConsumerGroup<BlockItem> tasks_compress_;
213
214 TubeGroup<BlockItem> tubes_hash_;
215 TubeConsumerGroup<BlockItem> tasks_hash_;
216
217 TubeGroup<BlockItem> tubes_compress_hash_callback_;
218 TubeConsumerGroup<BlockItem> tasks_compress_hash_callback_;
219
220 ItemAllocator item_allocator_;
221 };
222
223 #endif // CVMFS_INGESTION_PIPELINE_H_
224