GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/ingestion/pipeline.h
Date: 2025-07-13 02:35:07
Exec Total Coverage
Lines: 5 5 100.0%
Branches: 1 2 50.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_INGESTION_PIPELINE_H_
6 #define CVMFS_INGESTION_PIPELINE_H_
7
8 #include <string>
9
10 #include "compression/compression.h"
11 #include "crypto/hash.h"
12 #include "ingestion/item.h"
13 #include "ingestion/item_mem.h"
14 #include "ingestion/task.h"
15 #include "upload_spooler_result.h"
16 #include "util/concurrency.h"
17 #include "util/tube.h"
18
19 namespace upload {
20 class AbstractUploader;
21 struct SpoolerDefinition;
22 } // namespace upload
23
24 class IngestionPipeline : public Observable<upload::SpoolerResult> {
25 public:
26 explicit IngestionPipeline(
27 upload::AbstractUploader *uploader,
28 const upload::SpoolerDefinition &spooler_definition);
29 ~IngestionPipeline();
30
31 void Spawn();
32 void Process(IngestionSource *source, bool allow_chunking,
33 shash::Suffix hash_suffix = shash::kSuffixNone);
34 void WaitFor();
35
36 void OnFileProcessed(const upload::SpoolerResult &spooler_result);
37
38 private:
39 static const uint64_t kMaxPipelineMem; // 1G
40 static const unsigned kMaxFilesInFlight = 8000;
41 static const unsigned kNforkRegister = 1;
42 static const unsigned kNforkWrite = 1;
43 static const unsigned kNforkHash = 2;
44 static const unsigned kNforkCompress = 4;
45 static const unsigned kNforkChunk = 1;
46 static const unsigned kNforkRead = 8;
47
48 const zlib::Algorithms compression_algorithm_;
49 const shash::Algorithms hash_algorithm_;
50 const bool generate_legacy_bulk_chunks_;
51 const bool chunking_enabled_;
52 const size_t minimal_chunk_size_;
53 const size_t average_chunk_size_;
54 const size_t maximal_chunk_size_;
55
56 bool spawned_;
57 upload::AbstractUploader *uploader_;
58 // TODO(jblomer): a semaphore would be faster!
59 // We need to have two in-flight counters: the pre-counter decreases
60 // before the final NotifyListeners() call, so that the callback can schedule
61 // a new job into the pipeline. This happens when finished child catalogs
62 // trigger uploading the parent catalog.
63 // The pre-counter sets the kMaxFilesInFlight limit.
64 Tube<FileItem> tube_ctr_inflight_pre_;
65 // The post counter is set after the final callback. It is used to wait
66 // for the pipeline to finish.
67 Tube<FileItem> tube_ctr_inflight_post_;
68
69 Tube<FileItem> tube_input_;
70
71 TubeConsumerGroup<FileItem> tasks_read_;
72
73 TubeGroup<BlockItem> tubes_chunk_;
74 TubeConsumerGroup<BlockItem> tasks_chunk_;
75
76 TubeGroup<BlockItem> tubes_compress_;
77 TubeConsumerGroup<BlockItem> tasks_compress_;
78
79 TubeGroup<BlockItem> tubes_hash_;
80 TubeConsumerGroup<BlockItem> tasks_hash_;
81
82 TubeGroup<BlockItem> tubes_write_;
83 TubeConsumerGroup<BlockItem> tasks_write_;
84
85 TubeGroup<FileItem> tubes_register_;
86 TubeConsumerGroup<FileItem> tasks_register_;
87
88 ItemAllocator item_allocator_;
89 }; // class IngestionPipeline
90
91
92 struct ScrubbingResult {
93
1/2
✓ Branch 2 taken 7 times.
✗ Branch 3 not taken.
7 ScrubbingResult() { }
94 7 ScrubbingResult(const std::string &p, const shash::Any &h)
95 7 : path(p), hash(h) { }
96 std::string path;
97 shash::Any hash;
98 };
99
100
101 class TaskScrubbingCallback : public TubeConsumer<BlockItem>,
102 public Observable<ScrubbingResult> {
103 public:
104 56 TaskScrubbingCallback(Tube<BlockItem> *tube_in, Tube<FileItem> *tube_counter)
105 56 : TubeConsumer<BlockItem>(tube_in), tube_counter_(tube_counter) { }
106
107 protected:
108 virtual void Process(BlockItem *block_item);
109
110 private:
111 Tube<FileItem> *tube_counter_;
112 };
113
114
115 class ScrubbingPipeline : public Observable<ScrubbingResult> {
116 public:
117 ScrubbingPipeline();
118 ~ScrubbingPipeline();
119
120 void Spawn();
121 void Process(IngestionSource *source,
122 shash::Algorithms hash_algorithm,
123 shash::Suffix hash_suffix);
124 void WaitFor();
125
126 void OnFileProcessed(const ScrubbingResult &scrubbing_result);
127
128 private:
129 static const uint64_t kMemLowWatermark = 384 * 1024 * 1024;
130 static const uint64_t kMemHighWatermark = 512 * 1024 * 1024;
131 static const unsigned kMaxFilesInFlight = 8000;
132 static const unsigned kNforkScrubbingCallback = 1;
133 static const unsigned kNforkHash = 2;
134 static const unsigned kNforkChunk = 1;
135 static const unsigned kNforkRead = 8;
136
137 bool spawned_;
138 Tube<FileItem> tube_input_;
139 // TODO(jblomer): a semaphore would be faster!
140 Tube<FileItem> tube_counter_;
141
142 TubeConsumerGroup<FileItem> tasks_read_;
143
144 TubeGroup<BlockItem> tubes_chunk_;
145 TubeConsumerGroup<BlockItem> tasks_chunk_;
146
147 TubeGroup<BlockItem> tubes_hash_;
148 TubeConsumerGroup<BlockItem> tasks_hash_;
149
150 TubeGroup<BlockItem> tubes_scrubbing_callback_;
151 TubeConsumerGroup<BlockItem> tasks_scrubbing_callback_;
152
153 ItemAllocator item_allocator_;
154 };
155
156
157 struct CompressHashResult {
158 CompressHashResult() { }
159 CompressHashResult(const std::string &p, const shash::Any &h)
160 : path(p), hash(h) { }
161 std::string path;
162 shash::Any hash;
163 };
164
165
166 class TaskCompressHashCallback : public TubeConsumer<BlockItem>,
167 public Observable<CompressHashResult> {
168 public:
169 TaskCompressHashCallback(Tube<BlockItem> *tube_in,
170 Tube<FileItem> *tube_counter)
171 : TubeConsumer<BlockItem>(tube_in), tube_counter_(tube_counter) { }
172
173 protected:
174 virtual void Process(BlockItem *block_item);
175
176 private:
177 Tube<FileItem> *tube_counter_;
178 };
179
180
181 class CompressHashPipeline : public Observable<CompressHashResult> {
182 public:
183 CompressHashPipeline();
184 ~CompressHashPipeline();
185
186 void Spawn();
187 void Process(IngestionSource *source,
188 shash::Algorithms hash_algorithm,
189 shash::Suffix hash_suffix);
190 void WaitFor();
191
192 void OnFileProcessed(const CompressHashResult &compress_hash_result);
193
194 private:
195 static const uint64_t kMemLowWatermark = 64 * 1024 * 1024;
196 static const uint64_t kMemHighWatermark = 128 * 1024 * 1024;
197
198 bool spawned_;
199 Tube<FileItem> tube_input_;
200 Tube<FileItem> tube_counter_;
201
202 TubeConsumerGroup<FileItem> tasks_read_;
203
204 TubeGroup<BlockItem> tubes_chunk_;
205 TubeConsumerGroup<BlockItem> tasks_chunk_;
206
207 TubeGroup<BlockItem> tubes_compress_;
208 TubeConsumerGroup<BlockItem> tasks_compress_;
209
210 TubeGroup<BlockItem> tubes_hash_;
211 TubeConsumerGroup<BlockItem> tasks_hash_;
212
213 TubeGroup<BlockItem> tubes_compress_hash_callback_;
214 TubeConsumerGroup<BlockItem> tasks_compress_hash_callback_;
215
216 ItemAllocator item_allocator_;
217 };
218
219 #endif // CVMFS_INGESTION_PIPELINE_H_
220