CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
pipeline.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_INGESTION_PIPELINE_H_
6 #define CVMFS_INGESTION_PIPELINE_H_
7 
8 #include <string>
9 
11 #include "crypto/hash.h"
12 #include "ingestion/item.h"
13 #include "ingestion/item_mem.h"
14 #include "ingestion/task.h"
15 #include "upload_spooler_result.h"
16 #include "util/concurrency.h"
17 #include "util/tube.h"
18 
19 namespace upload {
20 class AbstractUploader;
21 struct SpoolerDefinition;
22 } // namespace upload
23 
24 class IngestionPipeline : public Observable<upload::SpoolerResult> {
25  public:
26  explicit IngestionPipeline(
27  upload::AbstractUploader *uploader,
28  const upload::SpoolerDefinition &spooler_definition);
30 
31  void Spawn();
32  void Process(IngestionSource *source, bool allow_chunking,
33  shash::Suffix hash_suffix = shash::kSuffixNone);
34  void WaitFor();
35 
36  void OnFileProcessed(const upload::SpoolerResult &spooler_result);
37 
38  private:
39  static const uint64_t kMaxPipelineMem; // 1G
40  static const unsigned kMaxFilesInFlight = 8000;
41  static const unsigned kNforkRegister = 1;
42  static const unsigned kNforkWrite = 1;
43  static const unsigned kNforkHash = 2;
44  static const unsigned kNforkCompress = 4;
45  static const unsigned kNforkChunk = 1;
46  static const unsigned kNforkRead = 8;
47 
51  const bool chunking_enabled_;
52  const size_t minimal_chunk_size_;
53  const size_t average_chunk_size_;
54  const size_t maximal_chunk_size_;
55 
56  bool spawned_;
58  // TODO(jblomer): a semaphore would be faster!
59  // We need to have two in-flight counters: the pre-counter decreases
60  // before the final NotifyListeners() call, so that the callback can schedule
61  // a new job into the pipeline. This happens when finished child catalogs
62  // trigger uploading the parent catalog.
63  // The pre-counter sets the kMaxFilesInFlight limit.
65  // The post counter is set after the final callback. It is used to wait
66  // for the pipeline to finish.
68 
70 
72 
75 
78 
81 
84 
87 
89 }; // class IngestionPipeline
90 
91 
94  ScrubbingResult(const std::string &p, const shash::Any &h)
95  : path(p), hash(h) { }
96  std::string path;
98 };
99 
100 
101 class TaskScrubbingCallback : public TubeConsumer<BlockItem>,
102  public Observable<ScrubbingResult> {
103  public:
105  : TubeConsumer<BlockItem>(tube_in), tube_counter_(tube_counter) { }
106 
107  protected:
108  virtual void Process(BlockItem *block_item);
109 
110  private:
112 };
113 
114 
115 class ScrubbingPipeline : public Observable<ScrubbingResult> {
116  public:
119 
120  void Spawn();
122  shash::Algorithms hash_algorithm,
123  shash::Suffix hash_suffix);
124  void WaitFor();
125 
126  void OnFileProcessed(const ScrubbingResult &scrubbing_result);
127 
128  private:
129  static const uint64_t kMemLowWatermark = 384 * 1024 * 1024;
130  static const uint64_t kMemHighWatermark = 512 * 1024 * 1024;
131  static const unsigned kMaxFilesInFlight = 8000;
132  static const unsigned kNforkScrubbingCallback = 1;
133  static const unsigned kNforkHash = 2;
134  static const unsigned kNforkChunk = 1;
135  static const unsigned kNforkRead = 8;
136 
137  bool spawned_;
139  // TODO(jblomer): a semaphore would be faster!
141 
143 
146 
149 
152 
154 };
155 
156 
159  CompressHashResult(const std::string &p, const shash::Any &h)
160  : path(p), hash(h) { }
161  std::string path;
163 };
164 
165 
167  : public TubeConsumer<BlockItem>
168  , public Observable<CompressHashResult>
169 {
170  public:
172  Tube<FileItem> *tube_counter)
173  : TubeConsumer<BlockItem>(tube_in)
174  , tube_counter_(tube_counter)
175  { }
176 
177  protected:
178  virtual void Process(BlockItem *block_item);
179 
180  private:
182 };
183 
184 
185 class CompressHashPipeline : public Observable<CompressHashResult> {
186  public:
189 
190  void Spawn();
192  shash::Algorithms hash_algorithm,
193  shash::Suffix hash_suffix);
194  void WaitFor();
195 
196  void OnFileProcessed(const CompressHashResult &compress_hash_result);
197 
198  private:
199  static const uint64_t kMemLowWatermark = 64 * 1024 * 1024;
200  static const uint64_t kMemHighWatermark = 128 * 1024 * 1024;
201 
202  bool spawned_;
205 
207 
210 
213 
216 
219 
221 };
222 
223 #endif // CVMFS_INGESTION_PIPELINE_H_
static const unsigned kNforkScrubbingCallback
Definition: pipeline.h:132
upload::AbstractUploader * uploader_
Definition: pipeline.h:57
TubeGroup< BlockItem > tubes_compress_hash_callback_
Definition: pipeline.h:217
TubeConsumerGroup< BlockItem > tasks_chunk_
Definition: pipeline.h:145
void OnFileProcessed(const upload::SpoolerResult &spooler_result)
Definition: pipeline.cc:113
shash::Any hash
Definition: pipeline.h:97
Tube< FileItem > tube_ctr_inflight_post_
Definition: pipeline.h:67
Tube< FileItem > tube_ctr_inflight_pre_
Definition: pipeline.h:64
TubeConsumerGroup< BlockItem > tasks_chunk_
Definition: pipeline.h:209
TubeGroup< BlockItem > tubes_chunk_
Definition: pipeline.h:208
TaskCompressHashCallback(Tube< BlockItem > *tube_in, Tube< FileItem > *tube_counter)
Definition: pipeline.h:171
static const unsigned kNforkHash
Definition: pipeline.h:133
static const uint64_t kMemHighWatermark
Definition: pipeline.h:200
Tube< FileItem > tube_counter_
Definition: pipeline.h:140
ItemAllocator item_allocator_
Definition: pipeline.h:88
virtual void Process(BlockItem *block_item)
Definition: pipeline.cc:154
CVMFS_EXPORT const LogSource source
Definition: exception.h:33
TubeGroup< BlockItem > tubes_hash_
Definition: pipeline.h:214
static const unsigned kNforkChunk
Definition: pipeline.h:45
void Process(IngestionSource *source, bool allow_chunking, shash::Suffix hash_suffix=shash::kSuffixNone)
Definition: pipeline.cc:119
static const unsigned kNforkRead
Definition: pipeline.h:135
TubeConsumerGroup< BlockItem > tasks_compress_
Definition: pipeline.h:77
TubeGroup< BlockItem > tubes_compress_
Definition: pipeline.h:76
TubeGroup< BlockItem > tubes_scrubbing_callback_
Definition: pipeline.h:150
void Process(IngestionSource *source, shash::Algorithms hash_algorithm, shash::Suffix hash_suffix)
Definition: pipeline.cc:239
TubeGroup< BlockItem > tubes_chunk_
Definition: pipeline.h:73
TubeConsumerGroup< FileItem > tasks_read_
Definition: pipeline.h:206
const size_t average_chunk_size_
Definition: pipeline.h:53
ItemAllocator item_allocator_
Definition: pipeline.h:153
static const unsigned kMaxFilesInFlight
Definition: pipeline.h:131
TubeGroup< BlockItem > tubes_hash_
Definition: pipeline.h:147
TaskScrubbingCallback(Tube< BlockItem > *tube_in, Tube< FileItem > *tube_counter)
Definition: pipeline.h:104
static const uint64_t kMaxPipelineMem
Definition: pipeline.h:39
Tube< FileItem > tube_input_
Definition: pipeline.h:203
Algorithms
Definition: hash.h:41
Tube< FileItem > * tube_counter_
Definition: pipeline.h:111
static const unsigned kNforkRegister
Definition: pipeline.h:41
ScrubbingResult(const std::string &p, const shash::Any &h)
Definition: pipeline.h:94
Algorithms
Definition: compression.h:44
TubeGroup< BlockItem > tubes_write_
Definition: pipeline.h:82
void OnFileProcessed(const CompressHashResult &compress_hash_result)
static const unsigned kNforkHash
Definition: pipeline.h:43
CompressHashResult(const std::string &p, const shash::Any &h)
Definition: pipeline.h:159
const shash::Algorithms hash_algorithm_
Definition: pipeline.h:49
Tube< FileItem > tube_input_
Definition: pipeline.h:69
TubeGroup< BlockItem > tubes_hash_
Definition: pipeline.h:79
const size_t maximal_chunk_size_
Definition: pipeline.h:54
TubeConsumerGroup< BlockItem > tasks_compress_hash_callback_
Definition: pipeline.h:218
TubeConsumerGroup< BlockItem > tasks_hash_
Definition: pipeline.h:148
TubeConsumerGroup< FileItem > tasks_register_
Definition: pipeline.h:86
static const uint64_t kMemHighWatermark
Definition: pipeline.h:130
IngestionPipeline(upload::AbstractUploader *uploader, const upload::SpoolerDefinition &spooler_definition)
Definition: pipeline.cc:26
static const unsigned kNforkChunk
Definition: pipeline.h:134
static const uint64_t kMemLowWatermark
Definition: pipeline.h:199
TubeGroup< BlockItem > tubes_chunk_
Definition: pipeline.h:144
char Suffix
Definition: hash.h:111
static const uint64_t kMemLowWatermark
Definition: pipeline.h:129
void OnFileProcessed(const ScrubbingResult &scrubbing_result)
Definition: pipeline.cc:233
Tube< FileItem > * tube_counter_
Definition: pipeline.h:181
const bool chunking_enabled_
Definition: pipeline.h:51
static const unsigned kNforkRead
Definition: pipeline.h:46
shash::Any hash
Definition: pipeline.h:162
TubeConsumerGroup< FileItem > tasks_read_
Definition: pipeline.h:71
TubeConsumerGroup< FileItem > tasks_read_
Definition: pipeline.h:142
std::string path
Definition: pipeline.h:161
virtual void Process(BlockItem *block_item)
TubeGroup< BlockItem > tubes_compress_
Definition: pipeline.h:211
void Process(IngestionSource *source, shash::Algorithms hash_algorithm, shash::Suffix hash_suffix)
TubeConsumerGroup< BlockItem > tasks_chunk_
Definition: pipeline.h:74
ItemAllocator item_allocator_
Definition: pipeline.h:220
static const unsigned kNforkCompress
Definition: pipeline.h:44
TubeConsumerGroup< BlockItem > tasks_compress_
Definition: pipeline.h:212
TubeConsumerGroup< BlockItem > tasks_scrubbing_callback_
Definition: pipeline.h:151
TubeConsumerGroup< BlockItem > tasks_hash_
Definition: pipeline.h:80
Tube< FileItem > tube_input_
Definition: pipeline.h:138
TubeConsumerGroup< BlockItem > tasks_write_
Definition: pipeline.h:83
static const unsigned kNforkWrite
Definition: pipeline.h:42
static const unsigned kMaxFilesInFlight
Definition: pipeline.h:40
TubeConsumerGroup< BlockItem > tasks_hash_
Definition: pipeline.h:215
const char kSuffixNone
Definition: hash.h:53
TubeGroup< FileItem > tubes_register_
Definition: pipeline.h:85
std::string path
Definition: pipeline.h:96
Tube< FileItem > tube_counter_
Definition: pipeline.h:204
const zlib::Algorithms compression_algorithm_
Definition: pipeline.h:48
const size_t minimal_chunk_size_
Definition: pipeline.h:52
const bool generate_legacy_bulk_chunks_
Definition: pipeline.h:50