CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
pipeline.cc
Go to the documentation of this file.
1 
6 #include "pipeline.h"
7 
8 #include <algorithm>
9 #include <cstdlib>
10 
11 #include "ingestion/task_chunk.h"
13 #include "ingestion/task_hash.h"
14 #include "ingestion/task_read.h"
16 #include "ingestion/task_write.h"
17 #include "upload_facility.h"
19 #include "util/concurrency.h"
20 #include "util/exception.h"
21 #include "util/platform.h"
22 #include "util/string.h"
23 
24 const uint64_t IngestionPipeline::kMaxPipelineMem = 1024 * 1024 * 1024;
25 
27  upload::AbstractUploader *uploader,
28  const upload::SpoolerDefinition &spooler_definition)
29  : compression_algorithm_(spooler_definition.compression_alg)
30  , hash_algorithm_(spooler_definition.hash_algorithm)
31  , generate_legacy_bulk_chunks_(
32  spooler_definition.generate_legacy_bulk_chunks)
33  , chunking_enabled_(spooler_definition.use_file_chunking)
34  , minimal_chunk_size_(spooler_definition.min_file_chunk_size)
35  , average_chunk_size_(spooler_definition.avg_file_chunk_size)
36  , maximal_chunk_size_(spooler_definition.max_file_chunk_size)
37  , spawned_(false)
38  , uploader_(uploader)
39  , tube_ctr_inflight_pre_(kMaxFilesInFlight) {
40  unsigned nfork_base = std::max(1U, GetNumberOfCpuCores() / 8);
41 
42  for (unsigned i = 0; i < nfork_base * kNforkRegister; ++i) {
43  Tube<FileItem> *tube = new Tube<FileItem>();
49  }
51 
52  for (unsigned i = 0; i < nfork_base * kNforkWrite; ++i) {
56  }
58 
59  for (unsigned i = 0; i < nfork_base * kNforkHash; ++i) {
63  }
65 
66  for (unsigned i = 0; i < nfork_base * kNforkCompress; ++i) {
71  }
73 
74  for (unsigned i = 0; i < nfork_base * kNforkChunk; ++i) {
79  }
81 
82  uint64_t high = kMaxPipelineMem;
83  high = std::min(high, platform_memsize() / 5);
84  char *fixed_limit_mb = getenv("_CVMFS_SERVER_PIPELINE_MB");
85  if (fixed_limit_mb != NULL) {
86  high = String2Uint64(fixed_limit_mb) * 1024 * 1024;
87  }
88  uint64_t low = (high * 2) / 3;
90  "pipeline memory thresholds %" PRIu64 "/%" PRIu64 " M",
91  low / (1024 * 1024), high / (1024 * 1024));
92  for (unsigned i = 0; i < nfork_base * kNforkRead; ++i) {
93  TaskRead *task_read = new TaskRead(&tube_input_, &tubes_chunk_,
95  task_read->SetWatermarks(low, high);
96  tasks_read_.TakeConsumer(task_read);
97  }
98 }
99 
100 
102  if (spawned_) {
109  }
110 }
111 
112 
114  const upload::SpoolerResult &spooler_result) {
115  NotifyListeners(spooler_result);
116 }
117 
118 
120  bool allow_chunking,
121  shash::Suffix hash_suffix) {
122  FileItem *file_item = new FileItem(source,
128  hash_suffix,
129  allow_chunking && chunking_enabled_,
133  tube_input_.EnqueueBack(file_item);
134 }
135 
136 
140  tasks_hash_.Spawn();
143  tasks_read_.Spawn();
144  spawned_ = true;
145 }
146 
147 
149 
150 
151 //------------------------------------------------------------------------------
152 
153 
155  FileItem *file_item = block_item->file_item();
156  assert(file_item != NULL);
157  assert(!file_item->path().empty());
158  ChunkItem *chunk_item = block_item->chunk_item();
159  assert(chunk_item != NULL);
160  assert(chunk_item->is_bulk_chunk());
161 
162  switch (block_item->type()) {
164  delete block_item;
165  break;
166 
168  assert(!chunk_item->hash_ptr()->IsNull());
170  ScrubbingResult(file_item->path(), *chunk_item->hash_ptr()));
171  delete block_item;
172  delete chunk_item;
173  delete file_item;
175  break;
176 
177  default:
178  PANIC(NULL);
179  }
180 }
181 
182 
183 //------------------------------------------------------------------------------
184 
185 
187  : spawned_(false), tube_counter_(kMaxFilesInFlight) {
188  unsigned nfork_base = std::max(1U, GetNumberOfCpuCores() / 8);
189 
190  for (unsigned i = 0; i < nfork_base * kNforkScrubbingCallback; ++i) {
191  Tube<BlockItem> *tube = new Tube<BlockItem>();
194  &tube_counter_);
197  }
199 
200  for (unsigned i = 0; i < nfork_base * kNforkHash; ++i) {
201  Tube<BlockItem> *t = new Tube<BlockItem>();
204  }
206 
207  for (unsigned i = 0; i < nfork_base * kNforkChunk; ++i) {
208  Tube<BlockItem> *t = new Tube<BlockItem>();
211  }
213 
214  for (unsigned i = 0; i < nfork_base * kNforkRead; ++i) {
215  TaskRead *task_read = new TaskRead(&tube_input_, &tubes_chunk_,
216  &item_allocator_);
218  tasks_read_.TakeConsumer(task_read);
219  }
220 }
221 
222 
224  if (spawned_) {
229  }
230 }
231 
232 
234  const ScrubbingResult &scrubbing_result) {
235  NotifyListeners(scrubbing_result);
236 }
237 
238 
240  shash::Algorithms hash_algorithm,
241  shash::Suffix hash_suffix) {
242  FileItem *file_item = new FileItem(source, 0, 0, 0, zlib::kNoCompression,
243  hash_algorithm, hash_suffix,
244  false, /* may_have_chunks */
245  true /* hash_legacy_bulk_chunk */);
246  tube_counter_.EnqueueBack(file_item);
247  tube_input_.EnqueueBack(file_item);
248 }
249 
250 
253  tasks_hash_.Spawn();
255  tasks_read_.Spawn();
256  spawned_ = true;
257 }
258 
259 
static const unsigned kNforkScrubbingCallback
Definition: pipeline.h:132
upload::AbstractUploader * uploader_
Definition: pipeline.h:57
CallbackPtr RegisterListener(typename BoundClosure< ParamT, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, ClosureDataT data)
TubeConsumerGroup< BlockItem > tasks_chunk_
Definition: pipeline.h:145
void OnFileProcessed(const upload::SpoolerResult &spooler_result)
Definition: pipeline.cc:113
void Wait()
Definition: tube.h:158
Tube< FileItem > tube_ctr_inflight_post_
Definition: pipeline.h:67
void TakeConsumer(TubeConsumer< ItemT > *consumer)
Definition: task.h:74
Tube< FileItem > tube_ctr_inflight_pre_
Definition: pipeline.h:64
static const unsigned kNforkHash
Definition: pipeline.h:133
Tube< FileItem > tube_counter_
Definition: pipeline.h:140
ItemAllocator item_allocator_
Definition: pipeline.h:88
virtual void Process(BlockItem *block_item)
Definition: pipeline.cc:154
#define PANIC(...)
Definition: exception.h:29
CVMFS_EXPORT const LogSource source
Definition: exception.h:33
static const unsigned kNforkChunk
Definition: pipeline.h:45
void Process(IngestionSource *source, bool allow_chunking, shash::Suffix hash_suffix=shash::kSuffixNone)
Definition: pipeline.cc:119
void NotifyListeners(const upload::SpoolerResult &parameter)
static const unsigned kNforkRead
Definition: pipeline.h:135
TubeConsumerGroup< BlockItem > tasks_compress_
Definition: pipeline.h:77
TubeGroup< BlockItem > tubes_compress_
Definition: pipeline.h:76
TubeGroup< BlockItem > tubes_scrubbing_callback_
Definition: pipeline.h:150
unsigned int GetNumberOfCpuCores()
Definition: concurrency.cc:18
void Process(IngestionSource *source, shash::Algorithms hash_algorithm, shash::Suffix hash_suffix)
Definition: pipeline.cc:239
assert((mem||(size==0))&&"Out Of Memory")
TubeGroup< BlockItem > tubes_chunk_
Definition: pipeline.h:73
const size_t average_chunk_size_
Definition: pipeline.h:53
void SetWatermarks(uint64_t low, uint64_t high)
Definition: task_read.cc:79
ItemAllocator item_allocator_
Definition: pipeline.h:153
ItemT * PopFront()
Definition: tube.h:122
TubeGroup< BlockItem > tubes_hash_
Definition: pipeline.h:147
static const uint64_t kMaxPipelineMem
Definition: pipeline.h:39
Algorithms
Definition: hash.h:41
Tube< FileItem > * tube_counter_
Definition: pipeline.h:111
static const unsigned kNforkRegister
Definition: pipeline.h:41
uint64_t platform_memsize()
TubeGroup< BlockItem > tubes_write_
Definition: pipeline.h:82
void Activate()
Definition: tube.h:259
void Spawn()
Definition: task.h:79
static const unsigned kNforkHash
Definition: pipeline.h:43
const shash::Algorithms hash_algorithm_
Definition: pipeline.h:49
ChunkItem * chunk_item()
Definition: item.h:222
Tube< FileItem > tube_input_
Definition: pipeline.h:69
TubeGroup< BlockItem > tubes_hash_
Definition: pipeline.h:79
Link * EnqueueBack(ItemT *item)
Definition: tube.h:72
FileItem * file_item()
Definition: item.h:221
const size_t maximal_chunk_size_
Definition: pipeline.h:54
TubeConsumerGroup< BlockItem > tasks_hash_
Definition: pipeline.h:148
Definition: item.h:34
TubeConsumerGroup< FileItem > tasks_register_
Definition: pipeline.h:86
void TakeTube(Tube< ItemT > *t)
Definition: tube.h:254
static const uint64_t kMemHighWatermark
Definition: pipeline.h:130
IngestionPipeline(upload::AbstractUploader *uploader, const upload::SpoolerDefinition &spooler_definition)
Definition: pipeline.cc:26
static const unsigned kNforkChunk
Definition: pipeline.h:134
TubeGroup< BlockItem > tubes_chunk_
Definition: pipeline.h:144
char Suffix
Definition: hash.h:111
static const uint64_t kMemLowWatermark
Definition: pipeline.h:129
std::string path()
Definition: item.h:56
void OnFileProcessed(const ScrubbingResult &scrubbing_result)
Definition: pipeline.cc:233
const bool chunking_enabled_
Definition: pipeline.h:51
static const unsigned kNforkRead
Definition: pipeline.h:46
uint64_t String2Uint64(const string &value)
Definition: string.cc:240
TubeConsumerGroup< FileItem > tasks_read_
Definition: pipeline.h:71
TubeConsumerGroup< FileItem > tasks_read_
Definition: pipeline.h:142
TubeConsumerGroup< BlockItem > tasks_chunk_
Definition: pipeline.h:74
static const unsigned kNforkCompress
Definition: pipeline.h:44
TubeConsumerGroup< BlockItem > tasks_scrubbing_callback_
Definition: pipeline.h:151
BlockType type()
Definition: item.h:219
TubeConsumerGroup< BlockItem > tasks_hash_
Definition: pipeline.h:80
Tube< FileItem > tube_input_
Definition: pipeline.h:138
TubeConsumerGroup< BlockItem > tasks_write_
Definition: pipeline.h:83
static const unsigned kNforkWrite
Definition: pipeline.h:42
TubeGroup< FileItem > tubes_register_
Definition: pipeline.h:85
const zlib::Algorithms compression_algorithm_
Definition: pipeline.h:48
const size_t minimal_chunk_size_
Definition: pipeline.h:52
void Terminate()
Definition: task.h:94
const bool generate_legacy_bulk_chunks_
Definition: pipeline.h:50
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545