CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
pipeline.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_INGESTION_PIPELINE_H_
6 #define CVMFS_INGESTION_PIPELINE_H_
7 
8 #include <string>
9 
11 #include "crypto/hash.h"
12 #include "ingestion/item.h"
13 #include "ingestion/item_mem.h"
14 #include "ingestion/task.h"
15 #include "upload_spooler_result.h"
16 #include "util/concurrency.h"
17 #include "util/tube.h"
18 
// Forward declarations of the two upload types that appear only as a pointer
// (AbstractUploader) and a const reference (SpoolerDefinition) in the
// IngestionPipeline constructor declared below.
19 namespace upload {
20 class AbstractUploader;
21 struct SpoolerDefinition;
22 } // namespace upload
23 
// IngestionPipeline is an Observable that publishes upload::SpoolerResult
// values. It is constructed from an uploader and a spooler definition and
// exposes Spawn() / Process() / WaitFor() plus the OnFileProcessed() callback.
//
// NOTE(review): this rendering of the header drops several listing lines
// (29, 48-50, 57, 64, 67, 69, 71, 73-74, 76-77, 79-80, 82-83, 85-86, 88).
// The cross-reference index at the end of this page attributes the following
// members to those lines:
//   48 compression_algorithm_ (const zlib::Algorithms)
//   49 hash_algorithm_ (const shash::Algorithms)
//   50 generate_legacy_bulk_chunks_ (const bool)
//   57 uploader_ (upload::AbstractUploader *)
//   64 tube_ctr_inflight_pre_, 67 tube_ctr_inflight_post_, 69 tube_input_
//      (all Tube<FileItem>)
//   71 tasks_read_ (TubeConsumerGroup<FileItem>)
//   73/74 tubes_chunk_ / tasks_chunk_, 76/77 tubes_compress_ / tasks_compress_,
//   79/80 tubes_hash_ / tasks_hash_, 82/83 tubes_write_ / tasks_write_
//      (TubeGroup<BlockItem> / TubeConsumerGroup<BlockItem>)
//   85/86 tubes_register_ / tasks_register_
//      (TubeGroup<FileItem> / TubeConsumerGroup<FileItem>)
//   88 item_allocator_ (ItemAllocator)
// Line 29 is not covered by the index; its content cannot be told from here.
24 class IngestionPipeline : public Observable<upload::SpoolerResult> {
25  public:
  // Takes the uploader by pointer and the spooler definition by const
  // reference; the index shows the pointer is kept in uploader_.
  // NOTE(review): ownership of the uploader is not visible here -- confirm.
26  explicit IngestionPipeline(
27  upload::AbstractUploader *uploader,
28  const upload::SpoolerDefinition &spooler_definition);
30 
  // Presumably starts the pipeline's tasks (see spawned_ below) -- the
  // definition lives in pipeline.cc and is not shown here.
31  void Spawn();
  // Submits one source to the pipeline. hash_suffix defaults to
  // shash::kSuffixNone when the caller does not pass one.
32  void Process(IngestionSource *source, bool allow_chunking,
33  shash::Suffix hash_suffix = shash::kSuffixNone);
  // Per the in-flight counter comment below, waiting for the pipeline to
  // finish relies on the "post" counter.
34  void WaitFor();
35 
  // Callback receiving the result for a processed file.
36  void OnFileProcessed(const upload::SpoolerResult &spooler_result);
37 
38  private:
  // Declared without an in-class initializer, so the value is defined
  // elsewhere; the trailing comment states the intended size (1G).
39  static const uint64_t kMaxPipelineMem; // 1G
  // Upper bound on files in flight; enforced through the "pre" counter
  // according to the comment block further down.
40  static const unsigned kMaxFilesInFlight = 8000;
  // kNfork* constants: presumably the number of parallel tasks per pipeline
  // stage (register, write, hash, compress, chunk, read) -- TODO confirm
  // against pipeline.cc.
41  static const unsigned kNforkRegister = 1;
42  static const unsigned kNforkWrite = 1;
43  static const unsigned kNforkHash = 2;
44  static const unsigned kNforkCompress = 4;
45  static const unsigned kNforkChunk = 1;
46  static const unsigned kNforkRead = 8;
47 
  // Chunking configuration; all four members are const and therefore fixed
  // at construction time.
51  const bool chunking_enabled_;
52  const size_t minimal_chunk_size_;
53  const size_t average_chunk_size_;
54  const size_t maximal_chunk_size_;
55 
56  bool spawned_;
58  // TODO(jblomer): a semaphore would be faster!
59  // We need to have two in-flight counters: the pre-counter decreases
60  // before the final NotifyListeners() call, so that the callback can schedule
61  // a new job into the pipeline. This happens when finished child catalogs
62  // trigger uploading the parent catalog.
63  // The pre-counter sets the kMaxFilesInFlight limit.
65  // The post counter is set after the final callback. It is used to wait
66  // for the pipeline to finish.
68 
70 
72 
75 
78 
81 
84 
87 
89 }; // class IngestionPipeline
90 
91 
// ScrubbingResult: value carrying a path and a content hash; it is the type
// published by the Observable<ScrubbingResult> classes declared below.
// NOTE(review): listing lines 92-93 (the opening of this definition) and
// line 97 are missing from this rendering. The cross-reference index at the
// end of the page lists `shash::Any hash` at pipeline.h:97.
  // Copies p into path and h into hash.
94  ScrubbingResult(const std::string &p, const shash::Any &h)
95  : path(p), hash(h) { }
96  std::string path;
98 };
99 
100 
// TaskScrubbingCallback consumes BlockItem objects from a tube and is an
// Observable that publishes ScrubbingResult values.
// NOTE(review): listing lines 104 and 111 are missing from this rendering.
// The cross-reference index at the end of the page gives them as
//   104 TaskScrubbingCallback(Tube<BlockItem> *tube_in,
//                             Tube<FileItem> *tube_counter)
//   111 Tube<FileItem> *tube_counter_
101 class TaskScrubbingCallback : public TubeConsumer<BlockItem>,
102  public Observable<ScrubbingResult> {
103  public:
  // Constructor initializer list: forwards tube_in to the TubeConsumer base
  // and stores the tube_counter pointer in tube_counter_.
105  : TubeConsumer<BlockItem>(tube_in), tube_counter_(tube_counter) { }
106 
107  protected:
  // Per-item handler; virtual, defined in pipeline.cc (not shown here).
108  virtual void Process(BlockItem *block_item);
109 
110  private:
112 };
113 
114 
// ScrubbingPipeline is an Observable that publishes ScrubbingResult values
// and exposes Spawn() / Process() / WaitFor() plus the OnFileProcessed()
// callback.
//
// NOTE(review): this rendering drops listing lines 117-118, 121, 138, 140,
// 142, 144-145, 147-148, 150-151 and 153. The cross-reference index at the
// end of the page attributes the following to those lines:
//   121 first line of Process(IngestionSource *source,
//                             shash::Algorithms hash_algorithm,
//                             shash::Suffix hash_suffix)
//   138 tube_input_, 140 tube_counter_ (both Tube<FileItem>)
//   142 tasks_read_ (TubeConsumerGroup<FileItem>)
//   144/145 tubes_chunk_ / tasks_chunk_, 147/148 tubes_hash_ / tasks_hash_,
//   150/151 tubes_scrubbing_callback_ / tasks_scrubbing_callback_
//      (TubeGroup<BlockItem> / TubeConsumerGroup<BlockItem>)
//   153 item_allocator_ (ItemAllocator)
// Lines 117-118 are not covered by the index; presumably the constructor and
// destructor -- TODO confirm against the original header.
115 class ScrubbingPipeline : public Observable<ScrubbingResult> {
116  public:
119 
120  void Spawn();
  // Remaining parameters of Process(); the first parameter line (121) is
  // missing here, see the note at the top of the class.
122  shash::Algorithms hash_algorithm,
123  shash::Suffix hash_suffix);
124  void WaitFor();
125 
  // Callback receiving the result for a processed file.
126  void OnFileProcessed(const ScrubbingResult &scrubbing_result);
127 
128  private:
  // Memory watermarks: 384 MiB (low) and 512 MiB (high).
  // NOTE(review): how they are applied is not visible in this header.
129  static const uint64_t kMemLowWatermark = 384 * 1024 * 1024;
130  static const uint64_t kMemHighWatermark = 512 * 1024 * 1024;
131  static const unsigned kMaxFilesInFlight = 8000;
  // kNfork* constants: presumably the number of parallel tasks per pipeline
  // stage -- TODO confirm against pipeline.cc.
132  static const unsigned kNforkScrubbingCallback = 1;
133  static const unsigned kNforkHash = 2;
134  static const unsigned kNforkChunk = 1;
135  static const unsigned kNforkRead = 8;
136 
137  bool spawned_;
139  // TODO(jblomer): a semaphore would be faster!
141 
143 
146 
149 
152 
154 };
155 
156 #endif // CVMFS_INGESTION_PIPELINE_H_
static const unsigned kNforkScrubbingCallback
Definition: pipeline.h:132
upload::AbstractUploader * uploader_
Definition: pipeline.h:57
TubeConsumerGroup< BlockItem > tasks_chunk_
Definition: pipeline.h:145
void OnFileProcessed(const upload::SpoolerResult &spooler_result)
Definition: pipeline.cc:113
shash::Any hash
Definition: pipeline.h:97
Tube< FileItem > tube_ctr_inflight_post_
Definition: pipeline.h:67
Tube< FileItem > tube_ctr_inflight_pre_
Definition: pipeline.h:64
static const unsigned kNforkHash
Definition: pipeline.h:133
Tube< FileItem > tube_counter_
Definition: pipeline.h:140
ItemAllocator item_allocator_
Definition: pipeline.h:88
virtual void Process(BlockItem *block_item)
Definition: pipeline.cc:154
CVMFS_EXPORT const LogSource source
Definition: exception.h:33
static const unsigned kNforkChunk
Definition: pipeline.h:45
void Process(IngestionSource *source, bool allow_chunking, shash::Suffix hash_suffix=shash::kSuffixNone)
Definition: pipeline.cc:119
static const unsigned kNforkRead
Definition: pipeline.h:135
TubeConsumerGroup< BlockItem > tasks_compress_
Definition: pipeline.h:77
TubeGroup< BlockItem > tubes_compress_
Definition: pipeline.h:76
TubeGroup< BlockItem > tubes_scrubbing_callback_
Definition: pipeline.h:150
void Process(IngestionSource *source, shash::Algorithms hash_algorithm, shash::Suffix hash_suffix)
Definition: pipeline.cc:239
TubeGroup< BlockItem > tubes_chunk_
Definition: pipeline.h:73
const size_t average_chunk_size_
Definition: pipeline.h:53
ItemAllocator item_allocator_
Definition: pipeline.h:153
static const unsigned kMaxFilesInFlight
Definition: pipeline.h:131
TubeGroup< BlockItem > tubes_hash_
Definition: pipeline.h:147
TaskScrubbingCallback(Tube< BlockItem > *tube_in, Tube< FileItem > *tube_counter)
Definition: pipeline.h:104
static const uint64_t kMaxPipelineMem
Definition: pipeline.h:39
Algorithms
Definition: hash.h:41
Tube< FileItem > * tube_counter_
Definition: pipeline.h:111
static const unsigned kNforkRegister
Definition: pipeline.h:41
ScrubbingResult(const std::string &p, const shash::Any &h)
Definition: pipeline.h:94
Algorithms
Definition: compression.h:44
TubeGroup< BlockItem > tubes_write_
Definition: pipeline.h:82
static const unsigned kNforkHash
Definition: pipeline.h:43
const shash::Algorithms hash_algorithm_
Definition: pipeline.h:49
Tube< FileItem > tube_input_
Definition: pipeline.h:69
TubeGroup< BlockItem > tubes_hash_
Definition: pipeline.h:79
const size_t maximal_chunk_size_
Definition: pipeline.h:54
TubeConsumerGroup< BlockItem > tasks_hash_
Definition: pipeline.h:148
TubeConsumerGroup< FileItem > tasks_register_
Definition: pipeline.h:86
static const uint64_t kMemHighWatermark
Definition: pipeline.h:130
IngestionPipeline(upload::AbstractUploader *uploader, const upload::SpoolerDefinition &spooler_definition)
Definition: pipeline.cc:26
static const unsigned kNforkChunk
Definition: pipeline.h:134
TubeGroup< BlockItem > tubes_chunk_
Definition: pipeline.h:144
char Suffix
Definition: hash.h:111
static const uint64_t kMemLowWatermark
Definition: pipeline.h:129
void OnFileProcessed(const ScrubbingResult &scrubbing_result)
Definition: pipeline.cc:233
const bool chunking_enabled_
Definition: pipeline.h:51
static const unsigned kNforkRead
Definition: pipeline.h:46
TubeConsumerGroup< FileItem > tasks_read_
Definition: pipeline.h:71
TubeConsumerGroup< FileItem > tasks_read_
Definition: pipeline.h:142
TubeConsumerGroup< BlockItem > tasks_chunk_
Definition: pipeline.h:74
static const unsigned kNforkCompress
Definition: pipeline.h:44
TubeConsumerGroup< BlockItem > tasks_scrubbing_callback_
Definition: pipeline.h:151
TubeConsumerGroup< BlockItem > tasks_hash_
Definition: pipeline.h:80
Tube< FileItem > tube_input_
Definition: pipeline.h:138
TubeConsumerGroup< BlockItem > tasks_write_
Definition: pipeline.h:83
static const unsigned kNforkWrite
Definition: pipeline.h:42
static const unsigned kMaxFilesInFlight
Definition: pipeline.h:40
const char kSuffixNone
Definition: hash.h:53
TubeGroup< FileItem > tubes_register_
Definition: pipeline.h:85
std::string path
Definition: pipeline.h:96
const zlib::Algorithms compression_algorithm_
Definition: pipeline.h:48
const size_t minimal_chunk_size_
Definition: pipeline.h:52
const bool generate_legacy_bulk_chunks_
Definition: pipeline.h:50