CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
pipeline.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_INGESTION_PIPELINE_H_
6 #define CVMFS_INGESTION_PIPELINE_H_
7 
8 #include <string>
9 
11 #include "crypto/hash.h"
12 #include "ingestion/item.h"
13 #include "ingestion/item_mem.h"
14 #include "ingestion/task.h"
15 #include "upload_spooler_result.h"
16 #include "util/concurrency.h"
17 #include "util/tube.h"
18 
19 namespace upload {
20 class AbstractUploader;
21 struct SpoolerDefinition;
22 }
23 
24 class IngestionPipeline : public Observable<upload::SpoolerResult> {
25  public:
26  explicit IngestionPipeline(
27  upload::AbstractUploader *uploader,
28  const upload::SpoolerDefinition &spooler_definition);
30 
31  void Spawn();
32  void Process(IngestionSource* source, bool allow_chunking,
33  shash::Suffix hash_suffix = shash::kSuffixNone);
34  void WaitFor();
35 
36  void OnFileProcessed(const upload::SpoolerResult &spooler_result);
37 
38  private:
39  static const uint64_t kMaxPipelineMem; // 1G; declared without an in-class initializer, unlike the constants below
40  static const unsigned kMaxFilesInFlight = 8000; // limit enforced through the pre-counter described further down
41  static const unsigned kNforkRegister = 1; // kNfork*: presumably the number of parallel tasks per stage -- TODO confirm in pipeline.cc
42  static const unsigned kNforkWrite = 1;
43  static const unsigned kNforkHash = 2;
44  static const unsigned kNforkCompress = 4;
45  static const unsigned kNforkChunk = 1;
46  static const unsigned kNforkRead = 8;
47 
51  const bool chunking_enabled_;
52  const size_t minimal_chunk_size_;
53  const size_t average_chunk_size_;
54  const size_t maximal_chunk_size_;
55 
56  bool spawned_;
58 // TODO(jblomer): a semaphore would be faster!
59 // Two in-flight counters are needed. The pre-counter is decremented
60 // before the final NotifyListeners() call, so that the callback itself can
61 // schedule a new job into the pipeline. This happens when finished child
62 // catalogs trigger the upload of their parent catalog.
63 // The pre-counter enforces the kMaxFilesInFlight limit.
65 // The post-counter is updated only after the final callback. It is used to
66 // wait for the pipeline to finish.
68 
70 
72 
75 
78 
81 
84 
87 
89 }; // class IngestionPipeline
90 
91 
94  ScrubbingResult(const std::string &p, const shash::Any &h)
95  : path(p), hash(h) { }
96  std::string path;
98 };
99 
100 
102  : public TubeConsumer<BlockItem>
103  , public Observable<ScrubbingResult>
104 {
105  public:
107  Tube<FileItem> *tube_counter)
108  : TubeConsumer<BlockItem>(tube_in)
109  , tube_counter_(tube_counter)
110  { }
111 
112  protected:
113  virtual void Process(BlockItem *block_item);
114 
115  private:
117 };
118 
119 
120 class ScrubbingPipeline : public Observable<ScrubbingResult> {
121  public:
124 
125  void Spawn();
126  void Process(IngestionSource* source,
127  shash::Algorithms hash_algorithm,
128  shash::Suffix hash_suffix); // no default for hash_suffix here, unlike IngestionPipeline::Process
129  void WaitFor();
130 
131  void OnFileProcessed(const ScrubbingResult &scrubbing_result);
132 
133  private:
134  static const uint64_t kMemLowWatermark = 384 * 1024 * 1024; // 384 * 2^20; presumably bytes (384 MiB) -- TODO confirm
135  static const uint64_t kMemHighWatermark = 512 * 1024 * 1024; // 512 * 2^20; presumably bytes (512 MiB) -- TODO confirm
136  static const unsigned kMaxFilesInFlight = 8000; // same value as IngestionPipeline::kMaxFilesInFlight
137  static const unsigned kNforkScrubbingCallback = 1; // kNfork*: presumably the number of parallel tasks per stage -- TODO confirm in pipeline.cc
138  static const unsigned kNforkHash = 2;
139  static const unsigned kNforkChunk = 1;
140  static const unsigned kNforkRead = 8;
141 
142  bool spawned_;
144 // TODO(jblomer): a semaphore would be faster!
146 
148 
151 
154 
157 
159 };
160 
161 #endif // CVMFS_INGESTION_PIPELINE_H_
static const unsigned kNforkScrubbingCallback
Definition: pipeline.h:137
upload::AbstractUploader * uploader_
Definition: pipeline.h:57
TubeConsumerGroup< BlockItem > tasks_chunk_
Definition: pipeline.h:150
void OnFileProcessed(const upload::SpoolerResult &spooler_result)
Definition: pipeline.cc:113
shash::Any hash
Definition: pipeline.h:97
Tube< FileItem > tube_ctr_inflight_post_
Definition: pipeline.h:67
Tube< FileItem > tube_ctr_inflight_pre_
Definition: pipeline.h:64
static const unsigned kNforkHash
Definition: pipeline.h:138
Tube< FileItem > tube_counter_
Definition: pipeline.h:145
ItemAllocator item_allocator_
Definition: pipeline.h:88
virtual void Process(BlockItem *block_item)
Definition: pipeline.cc:160
static const unsigned kNforkChunk
Definition: pipeline.h:45
void Process(IngestionSource *source, bool allow_chunking, shash::Suffix hash_suffix=shash::kSuffixNone)
Definition: pipeline.cc:120
static const unsigned kNforkRead
Definition: pipeline.h:140
TubeConsumerGroup< BlockItem > tasks_compress_
Definition: pipeline.h:77
TubeGroup< BlockItem > tubes_compress_
Definition: pipeline.h:76
TubeGroup< BlockItem > tubes_scrubbing_callback_
Definition: pipeline.h:155
void Process(IngestionSource *source, shash::Algorithms hash_algorithm, shash::Suffix hash_suffix)
Definition: pipeline.cc:249
TubeGroup< BlockItem > tubes_chunk_
Definition: pipeline.h:73
const size_t average_chunk_size_
Definition: pipeline.h:53
ItemAllocator item_allocator_
Definition: pipeline.h:158
static const unsigned kMaxFilesInFlight
Definition: pipeline.h:136
TubeGroup< BlockItem > tubes_hash_
Definition: pipeline.h:152
TaskScrubbingCallback(Tube< BlockItem > *tube_in, Tube< FileItem > *tube_counter)
Definition: pipeline.h:106
static const uint64_t kMaxPipelineMem
Definition: pipeline.h:39
Algorithms
Definition: hash.h:41
Tube< FileItem > * tube_counter_
Definition: pipeline.h:116
static const unsigned kNforkRegister
Definition: pipeline.h:41
ScrubbingResult(const std::string &p, const shash::Any &h)
Definition: pipeline.h:94
Algorithms
Definition: compression.h:44
TubeGroup< BlockItem > tubes_write_
Definition: pipeline.h:82
static const unsigned kNforkHash
Definition: pipeline.h:43
const shash::Algorithms hash_algorithm_
Definition: pipeline.h:49
Tube< FileItem > tube_input_
Definition: pipeline.h:69
TubeGroup< BlockItem > tubes_hash_
Definition: pipeline.h:79
const size_t maximal_chunk_size_
Definition: pipeline.h:54
TubeConsumerGroup< BlockItem > tasks_hash_
Definition: pipeline.h:153
TubeConsumerGroup< FileItem > tasks_register_
Definition: pipeline.h:86
static const uint64_t kMemHighWatermark
Definition: pipeline.h:135
IngestionPipeline(upload::AbstractUploader *uploader, const upload::SpoolerDefinition &spooler_definition)
Definition: pipeline.cc:26
static const unsigned kNforkChunk
Definition: pipeline.h:139
TubeGroup< BlockItem > tubes_chunk_
Definition: pipeline.h:149
char Suffix
Definition: hash.h:114
static const uint64_t kMemLowWatermark
Definition: pipeline.h:134
void OnFileProcessed(const ScrubbingResult &scrubbing_result)
Definition: pipeline.cc:242
const bool chunking_enabled_
Definition: pipeline.h:51
static const unsigned kNforkRead
Definition: pipeline.h:46
TubeConsumerGroup< FileItem > tasks_read_
Definition: pipeline.h:71
TubeConsumerGroup< FileItem > tasks_read_
Definition: pipeline.h:147
TubeConsumerGroup< BlockItem > tasks_chunk_
Definition: pipeline.h:74
static const unsigned kNforkCompress
Definition: pipeline.h:44
TubeConsumerGroup< BlockItem > tasks_scrubbing_callback_
Definition: pipeline.h:156
TubeConsumerGroup< BlockItem > tasks_hash_
Definition: pipeline.h:80
Tube< FileItem > tube_input_
Definition: pipeline.h:143
TubeConsumerGroup< BlockItem > tasks_write_
Definition: pipeline.h:83
static const unsigned kNforkWrite
Definition: pipeline.h:42
static const unsigned kMaxFilesInFlight
Definition: pipeline.h:40
const char kSuffixNone
Definition: hash.h:53
TubeGroup< FileItem > tubes_register_
Definition: pipeline.h:85
std::string path
Definition: pipeline.h:96
const zlib::Algorithms compression_algorithm_
Definition: pipeline.h:48
const size_t minimal_chunk_size_
Definition: pipeline.h:52
const bool generate_legacy_bulk_chunks_
Definition: pipeline.h:50