GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/ingestion/pipeline.h
Date: 2024-04-21 02:33:16
Exec Total Coverage
Lines: 7 7 100.0%
Branches: 1 2 50.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_INGESTION_PIPELINE_H_
6 #define CVMFS_INGESTION_PIPELINE_H_
7
8 #include <string>
9
10 #include "compression.h"
11 #include "crypto/hash.h"
12 #include "ingestion/item.h"
13 #include "ingestion/item_mem.h"
14 #include "ingestion/task.h"
15 #include "upload_spooler_result.h"
16 #include "util/concurrency.h"
17 #include "util/tube.h"
18
// Forward declarations from the upload namespace. This header only uses
// AbstractUploader through a pointer and SpoolerDefinition through a const
// reference (see the IngestionPipeline constructor and its uploader_ member),
// so the full definitions are not needed here.
19 namespace upload {
20 class AbstractUploader;
21 struct SpoolerDefinition;
22 }
23
// Pipeline class that is observable for upload::SpoolerResult values.
// It owns an input tube plus, per stage (read, chunk, compress, hash, write,
// register), a group of tubes and a group of tube consumers. All member
// functions are only declared here; their definitions live outside this header.
// NOTE(review): the stage order is inferred from member names and declaration
// order only -- confirm against the implementation file.
24 class IngestionPipeline : public Observable<upload::SpoolerResult> {
25 public:
// Takes a non-const pointer to the uploader and a const reference to the
// spooler definition. NOTE(review): presumably the const configuration
// members below are filled from spooler_definition -- confirm in the .cc file.
26 explicit IngestionPipeline(
27 upload::AbstractUploader *uploader,
28 const upload::SpoolerDefinition &spooler_definition);
29 ~IngestionPipeline();
30
// NOTE(review): presumably starts the consumer tasks and sets spawned_ --
// not visible in this header, confirm in the implementation.
31 void Spawn();
// Submits one source to the pipeline.
//   source         - the item to process, passed by pointer (ownership is not
//                    expressed in this declaration -- TODO confirm)
//   allow_chunking - per-call switch related to chunking
//   hash_suffix    - optional; defaults to shash::kSuffixNone
32 void Process(IngestionSource* source, bool allow_chunking,
33 shash::Suffix hash_suffix = shash::kSuffixNone);
// NOTE(review): presumably blocks until in-flight work is finished (see the
// comment on tube_ctr_inflight_post_ below) -- confirm in the implementation.
34 void WaitFor();
35
// Receives the result of one processed file as a const reference.
36 void OnFileProcessed(const upload::SpoolerResult &spooler_result);
37
38 private:
// Declared without an in-class initializer, so the value is supplied by an
// out-of-line definition; the trailing comment documents it as 1G.
39 static const uint64_t kMaxPipelineMem; // 1G
// Limit on in-flight files; enforced via the pre-counter (see below).
40 static const unsigned kMaxFilesInFlight = 8000;
// Per-stage kNfork* constants. NOTE(review): presumably the number of
// parallel consumer tasks created for each stage -- confirm in Spawn()/ctor.
41 static const unsigned kNforkRegister = 1;
42 static const unsigned kNforkWrite = 1;
43 static const unsigned kNforkHash = 2;
44 static const unsigned kNforkCompress = 4;
45 static const unsigned kNforkChunk = 1;
46 static const unsigned kNforkRead = 8;
47
// Immutable configuration: all of these members are const and therefore must
// be set in the constructor's initializer list.
48 const zlib::Algorithms compression_algorithm_;
49 const shash::Algorithms hash_algorithm_;
50 const bool generate_legacy_bulk_chunks_;
51 const bool chunking_enabled_;
52 const size_t minimal_chunk_size_;
53 const size_t average_chunk_size_;
54 const size_t maximal_chunk_size_;
55
56 bool spawned_;
// Raw pointer to the uploader handed to the constructor.
// NOTE(review): looks non-owning -- confirm against the destructor.
57 upload::AbstractUploader *uploader_;
58 // TODO(jblomer): a semaphore would be faster!
59 // We need to have two in-flight counters: the pre-counter decreases
60 // before the final NotifyListeners() call, so that the callback can schedule
61 // a new job into the pipeline. This happens when finished child catalogs
62 // trigger uploading the parent catalog.
63 // The pre-counter sets the kMaxFilesInFlight limit.
64 Tube<FileItem> tube_ctr_inflight_pre_;
65 // The post counter is set after the final callback. It is used to wait
66 // for the pipeline to finish.
67 Tube<FileItem> tube_ctr_inflight_post_;
68
// Entry tube for FileItem objects.
69 Tube<FileItem> tube_input_;
70
// Read stage: consumers of FileItem.
71 TubeConsumerGroup<FileItem> tasks_read_;
72
// Chunk, compress, hash and write stages: each has a group of BlockItem tubes
// and a group of BlockItem consumers.
73 TubeGroup<BlockItem> tubes_chunk_;
74 TubeConsumerGroup<BlockItem> tasks_chunk_;
75
76 TubeGroup<BlockItem> tubes_compress_;
77 TubeConsumerGroup<BlockItem> tasks_compress_;
78
79 TubeGroup<BlockItem> tubes_hash_;
80 TubeConsumerGroup<BlockItem> tasks_hash_;
81
82 TubeGroup<BlockItem> tubes_write_;
83 TubeConsumerGroup<BlockItem> tasks_write_;
84
// Register stage: operates on FileItem again rather than BlockItem.
85 TubeGroup<FileItem> tubes_register_;
86 TubeConsumerGroup<FileItem> tasks_register_;
87
88 ItemAllocator item_allocator_;
89 }; // class IngestionPipeline
90
91
// Plain result record pairing a path with a hash. It is the value type used
// by the Observable<ScrubbingResult> bases of TaskScrubbingCallback and
// ScrubbingPipeline in this header.
92 struct ScrubbingResult {
// Default constructor: leaves path and hash default-constructed.
93

1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 ScrubbingResult() { }
// Copies p into path and h into hash.
94 1 ScrubbingResult(const std::string &p, const shash::Any &h)
95 1 : path(p), hash(h) { }
96 std::string path;
97 shash::Any hash;
98 };
99
100
// Tube consumer for BlockItem that is also observable for ScrubbingResult.
// NOTE(review): presumably the final pipeline stage that reports each result
// to listeners -- Process() is defined elsewhere, confirm there.
101 class TaskScrubbingCallback
102 : public TubeConsumer<BlockItem>
103 , public Observable<ScrubbingResult>
104 {
105 public:
// tube_in is forwarded to the TubeConsumer<BlockItem> base; tube_counter is
// stored as-is in tube_counter_. Neither pointer is checked here.
106 8 TaskScrubbingCallback(Tube<BlockItem> *tube_in,
107 Tube<FileItem> *tube_counter)
108 8 : TubeConsumer<BlockItem>(tube_in)
109 8 , tube_counter_(tube_counter)
110 8 { }
111
112 protected:
// Per-item handler; virtual, declared here and defined out of line.
113 virtual void Process(BlockItem *block_item);
114
115 private:
// Pointer received in the constructor.
// NOTE(review): looks non-owning; presumably the in-flight file counter
// tube of the owning pipeline -- confirm against ScrubbingPipeline.
116 Tube<FileItem> *tube_counter_;
117 };
118
119
// Pipeline class that is observable for ScrubbingResult values. Compared with
// IngestionPipeline in this header it declares only read, chunk and hash
// stages plus a scrubbing-callback stage; there are no compress, write or
// register members. All member functions are defined outside this header.
120 class ScrubbingPipeline : public Observable<ScrubbingResult> {
121 public:
122 ScrubbingPipeline();
123 ~ScrubbingPipeline();
124
// NOTE(review): presumably starts the consumer tasks and sets spawned_ --
// confirm in the implementation.
125 void Spawn();
// Submits one source for processing. Unlike IngestionPipeline::Process, the
// hash algorithm is given per call and hash_suffix has no default value.
126 void Process(IngestionSource* source,
127 shash::Algorithms hash_algorithm,
128 shash::Suffix hash_suffix);
// NOTE(review): presumably blocks until all submitted files are done --
// confirm in the implementation.
129 void WaitFor();
130
// Receives the result of one processed file as a const reference.
131 void OnFileProcessed(const ScrubbingResult &scrubbing_result);
132
133 private:
// 384 * 1024 * 1024 bytes (384 MiB) and 512 * 1024 * 1024 bytes (512 MiB).
// NOTE(review): presumably memory thresholds handed to item_allocator_ or
// used for back-pressure -- the use site is not visible here.
134 static const uint64_t kMemLowWatermark = 384 * 1024 * 1024;
135 static const uint64_t kMemHighWatermark = 512 * 1024 * 1024;
136 static const unsigned kMaxFilesInFlight = 8000;
// Per-stage kNfork* constants. NOTE(review): presumably the number of
// parallel consumer tasks per stage -- confirm in Spawn()/ctor.
137 static const unsigned kNforkScrubbingCallback = 1;
138 static const unsigned kNforkHash = 2;
139 static const unsigned kNforkChunk = 1;
140 static const unsigned kNforkRead = 8;
141
142 bool spawned_;
// Entry tube for FileItem objects.
143 Tube<FileItem> tube_input_;
144 // TODO(jblomer): a semaphore would be faster!
// A Tube used as a counter (see the TODO above).
// NOTE(review): presumably tracks files in flight, bounded by
// kMaxFilesInFlight -- confirm in the implementation.
145 Tube<FileItem> tube_counter_;
146
// Read stage: consumers of FileItem.
147 TubeConsumerGroup<FileItem> tasks_read_;
148
// Chunk, hash and scrubbing-callback stages: each has a group of BlockItem
// tubes and a group of BlockItem consumers.
149 TubeGroup<BlockItem> tubes_chunk_;
150 TubeConsumerGroup<BlockItem> tasks_chunk_;
151
152 TubeGroup<BlockItem> tubes_hash_;
153 TubeConsumerGroup<BlockItem> tasks_hash_;
154
155 TubeGroup<BlockItem> tubes_scrubbing_callback_;
156 TubeConsumerGroup<BlockItem> tasks_scrubbing_callback_;
157
158 ItemAllocator item_allocator_;
159 };
160
161 #endif // CVMFS_INGESTION_PIPELINE_H_
162