CernVM-FS  2.9.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
pipeline.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_INGESTION_PIPELINE_H_
6 #define CVMFS_INGESTION_PIPELINE_H_
7 
8 #include <string>
9 
10 #include "compression.h"
11 #include "hash.h"
12 #include "ingestion/item.h"
13 #include "ingestion/item_mem.h"
14 #include "ingestion/task.h"
15 #include "ingestion/tube.h"
16 #include "upload_spooler_result.h"
17 #include "util_concurrency.h"
18 
19 namespace upload {
20 class AbstractUploader;
21 struct SpoolerDefinition;
22 }
23 
24 class IngestionPipeline : public Observable<upload::SpoolerResult> {
25  public:
26  explicit IngestionPipeline(
27  upload::AbstractUploader *uploader,
28  const upload::SpoolerDefinition &spooler_definition);
30 
31  void Spawn();
32  void Process(IngestionSource* source, bool allow_chunking,
33  shash::Suffix hash_suffix = shash::kSuffixNone);
34  void WaitFor();
35 
36  void OnFileProcessed(const upload::SpoolerResult &spooler_result);
37 
38  private:
39  static const uint64_t kMaxPipelineMem; // 1G
40  static const unsigned kMaxFilesInFlight = 8000;
41  static const unsigned kNforkRegister = 1;
42  static const unsigned kNforkWrite = 1;
43  static const unsigned kNforkHash = 2;
44  static const unsigned kNforkCompress = 4;
45  static const unsigned kNforkChunk = 1;
46  static const unsigned kNforkRead = 8;
47 
51  const bool chunking_enabled_;
52  const size_t minimal_chunk_size_;
53  const size_t average_chunk_size_;
54  const size_t maximal_chunk_size_;
55 
56  bool spawned_;
58  // TODO(jblomer): a semaphore would be faster!
61 
63 
66 
69 
72 
75 
78 
80 }; // class IngestionPipeline
81 
82 
85  ScrubbingResult(const std::string &p, const shash::Any &h)
86  : path(p), hash(h) { }
87  std::string path;
89 };
90 
91 
93  : public TubeConsumer<BlockItem>
94  , public Observable<ScrubbingResult>
95 {
96  public:
98  Tube<FileItem> *tube_counter)
99  : TubeConsumer<BlockItem>(tube_in)
100  , tube_counter_(tube_counter)
101  { }
102 
103  protected:
104  virtual void Process(BlockItem *block_item);
105 
106  private:
108 };
109 
110 
111 class ScrubbingPipeline : public Observable<ScrubbingResult> {
112  public:
115 
116  void Spawn();
117  void Process(IngestionSource* source,
118  shash::Algorithms hash_algorithm,
119  shash::Suffix hash_suffix);
120  void WaitFor();
121 
122  void OnFileProcessed(const ScrubbingResult &scrubbing_result);
123 
124  private:
125  static const uint64_t kMemLowWatermark = 384 * 1024 * 1024;
126  static const uint64_t kMemHighWatermark = 512 * 1024 * 1024;
127  static const unsigned kMaxFilesInFlight = 8000;
128  static const unsigned kNforkScrubbingCallback = 1;
129  static const unsigned kNforkHash = 2;
130  static const unsigned kNforkChunk = 1;
131  static const unsigned kNforkRead = 8;
132 
133  bool spawned_;
135  // TODO(jblomer): a semaphore would be faster!
137 
139 
142 
145 
148 
150 };
151 
152 #endif // CVMFS_INGESTION_PIPELINE_H_
static const unsigned kNforkScrubbingCallback
Definition: pipeline.h:128
upload::AbstractUploader * uploader_
Definition: pipeline.h:57
TubeConsumerGroup< BlockItem > tasks_chunk_
Definition: pipeline.h:141
void OnFileProcessed(const upload::SpoolerResult &spooler_result)
Definition: pipeline.cc:112
shash::Any hash
Definition: pipeline.h:88
static const unsigned kNforkHash
Definition: pipeline.h:129
Tube< FileItem > tube_counter_
Definition: pipeline.h:136
ItemAllocator item_allocator_
Definition: pipeline.h:79
virtual void Process(BlockItem *block_item)
Definition: pipeline.cc:158
static const unsigned kNforkChunk
Definition: pipeline.h:45
void Process(IngestionSource *source, bool allow_chunking, shash::Suffix hash_suffix=shash::kSuffixNone)
Definition: pipeline.cc:119
static const unsigned kNforkRead
Definition: pipeline.h:131
TubeConsumerGroup< BlockItem > tasks_compress_
Definition: pipeline.h:68
TubeGroup< BlockItem > tubes_compress_
Definition: pipeline.h:67
TubeGroup< BlockItem > tubes_scrubbing_callback_
Definition: pipeline.h:146
void Process(IngestionSource *source, shash::Algorithms hash_algorithm, shash::Suffix hash_suffix)
Definition: pipeline.cc:247
TubeGroup< BlockItem > tubes_chunk_
Definition: pipeline.h:64
const size_t average_chunk_size_
Definition: pipeline.h:53
ItemAllocator item_allocator_
Definition: pipeline.h:149
static const unsigned kMaxFilesInFlight
Definition: pipeline.h:127
TubeGroup< BlockItem > tubes_hash_
Definition: pipeline.h:143
TaskScrubbingCallback(Tube< BlockItem > *tube_in, Tube< FileItem > *tube_counter)
Definition: pipeline.h:97
static const uint64_t kMaxPipelineMem
Definition: pipeline.h:39
Algorithms
Definition: hash.h:39
Tube< FileItem > * tube_counter_
Definition: pipeline.h:107
static const unsigned kNforkRegister
Definition: pipeline.h:41
ScrubbingResult(const std::string &p, const shash::Any &h)
Definition: pipeline.h:85
Algorithms
Definition: compression.h:44
TubeGroup< BlockItem > tubes_write_
Definition: pipeline.h:73
static const unsigned kNforkHash
Definition: pipeline.h:43
const shash::Algorithms hash_algorithm_
Definition: pipeline.h:49
Tube< FileItem > tube_input_
Definition: pipeline.h:60
TubeGroup< BlockItem > tubes_hash_
Definition: pipeline.h:70
const size_t maximal_chunk_size_
Definition: pipeline.h:54
TubeConsumerGroup< BlockItem > tasks_hash_
Definition: pipeline.h:144
Tube< FileItem > tube_counter_
Definition: pipeline.h:59
TubeConsumerGroup< FileItem > tasks_register_
Definition: pipeline.h:77
static const uint64_t kMemHighWatermark
Definition: pipeline.h:126
IngestionPipeline(upload::AbstractUploader *uploader, const upload::SpoolerDefinition &spooler_definition)
Definition: pipeline.cc:26
static const unsigned kNforkChunk
Definition: pipeline.h:130
TubeGroup< BlockItem > tubes_chunk_
Definition: pipeline.h:140
char Suffix
Definition: hash.h:112
static const uint64_t kMemLowWatermark
Definition: pipeline.h:125
void OnFileProcessed(const ScrubbingResult &scrubbing_result)
Definition: pipeline.cc:240
const bool chunking_enabled_
Definition: pipeline.h:51
static const unsigned kNforkRead
Definition: pipeline.h:46
TubeConsumerGroup< FileItem > tasks_read_
Definition: pipeline.h:62
TubeConsumerGroup< FileItem > tasks_read_
Definition: pipeline.h:138
TubeConsumerGroup< BlockItem > tasks_chunk_
Definition: pipeline.h:65
static const unsigned kNforkCompress
Definition: pipeline.h:44
TubeConsumerGroup< BlockItem > tasks_scrubbing_callback_
Definition: pipeline.h:147
TubeConsumerGroup< BlockItem > tasks_hash_
Definition: pipeline.h:71
Tube< FileItem > tube_input_
Definition: pipeline.h:134
TubeConsumerGroup< BlockItem > tasks_write_
Definition: pipeline.h:74
static const unsigned kNforkWrite
Definition: pipeline.h:42
static const unsigned kMaxFilesInFlight
Definition: pipeline.h:40
const char kSuffixNone
Definition: hash.h:51
TubeGroup< FileItem > tubes_register_
Definition: pipeline.h:76
std::string path
Definition: pipeline.h:87
const zlib::Algorithms compression_algorithm_
Definition: pipeline.h:48
const size_t minimal_chunk_size_
Definition: pipeline.h:52
const bool generate_legacy_bulk_chunks_
Definition: pipeline.h:50