CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
pipeline.cc
Go to the documentation of this file.
1 
5 #include "cvmfs_config.h"
6 #include "pipeline.h"
7 
8 #include <algorithm>
9 #include <cstdlib>
10 
11 #include "ingestion/task_chunk.h"
13 #include "ingestion/task_hash.h"
14 #include "ingestion/task_read.h"
16 #include "ingestion/task_write.h"
17 #include "upload_facility.h"
19 #include "util/concurrency.h"
20 #include "util/exception.h"
21 #include "util/platform.h"
22 #include "util/string.h"
23 
// Default ceiling for the high memory watermark handed to the read tasks:
// 1 GiB. The IngestionPipeline constructor starts from this value, lowers it
// to platform_memsize() / 5 when that is smaller, and replaces it entirely
// when the _CVMFS_SERVER_PIPELINE_MB environment variable is set.
24 const uint64_t IngestionPipeline::kMaxPipelineMem = 1024 * 1024 * 1024;
25 
// IngestionPipeline constructor (the line carrying the function name is not
// part of this listing; several statements inside the loops are missing too).
//
// Parameters:
//   uploader            stored in uploader_; not otherwise used in the
//                       visible lines.
//   spooler_definition  source of the compression, hashing and chunking
//                       settings copied into the members below.
27  upload::AbstractUploader *uploader,
28  const upload::SpoolerDefinition &spooler_definition)
29  : compression_algorithm_(spooler_definition.compression_alg)
30  , hash_algorithm_(spooler_definition.hash_algorithm)
31  , generate_legacy_bulk_chunks_(spooler_definition.generate_legacy_bulk_chunks)
32  , chunking_enabled_(spooler_definition.use_file_chunking)
33  , minimal_chunk_size_(spooler_definition.min_file_chunk_size)
34  , average_chunk_size_(spooler_definition.avg_file_chunk_size)
35  , maximal_chunk_size_(spooler_definition.max_file_chunk_size)
36  , spawned_(false)
37  , uploader_(uploader)
// NOTE(review): presumably bounds the number of files in flight -- confirm
// against the Tube constructor.
38  , tube_ctr_inflight_pre_(kMaxFilesInFlight)
39 {
// Scaling unit for all stages: one per eight CPU cores, but at least one.
// Every loop below runs nfork_base times the stage's kNfork* constant.
40  unsigned nfork_base = std::max(1U, GetNumberOfCpuCores() / 8);
41 
// Register stage: each iteration allocates its own Tube<FileItem> and a
// TaskRegister bound to it (remaining arguments are missing from the listing).
42  for (unsigned i = 0; i < nfork_base * kNforkRegister; ++i) {
43  Tube<FileItem> *tube = new Tube<FileItem>();
45  TaskRegister *task = new TaskRegister(tube,
49  }
51 
// Write, hash, compress and chunk stages: loop bodies are missing from the
// listing; only the iteration counts are visible.
52  for (unsigned i = 0; i < nfork_base * kNforkWrite; ++i) {
56  }
58 
59  for (unsigned i = 0; i < nfork_base * kNforkHash; ++i) {
63  }
65 
66  for (unsigned i = 0; i < nfork_base * kNforkCompress; ++i) {
71  }
73 
74  for (unsigned i = 0; i < nfork_base * kNforkChunk; ++i) {
79  }
81 
// High watermark: kMaxPipelineMem, capped at a fifth of platform_memsize().
82  uint64_t high = kMaxPipelineMem;
83  high = std::min(high, platform_memsize() / 5);
// _CVMFS_SERVER_PIPELINE_MB, when set, overrides the value computed above.
// It is parsed as an integer and multiplied by 1024 * 1024; because the
// override happens after std::min, the cap above does not apply to it.
84  char *fixed_limit_mb = getenv("_CVMFS_SERVER_PIPELINE_MB");
85  if (fixed_limit_mb != NULL) {
86  high = String2Uint64(fixed_limit_mb) * 1024 * 1024;
87  }
// Low watermark is two thirds of the high watermark.
88  uint64_t low = (high * 2) / 3;
// Format string and arguments reporting both thresholds divided by
// 1024 * 1024 (the line opening the call is missing from the listing).
90  "pipeline memory thresholds %" PRIu64 "/%" PRIu64 " M",
91  low / (1024 * 1024), high / (1024 * 1024));
// Read stage: every TaskRead receives the same low/high watermarks and is
// then handed to tasks_read_ (the TaskRead construction line is missing).
92  for (unsigned i = 0; i < nfork_base * kNforkRead; ++i) {
93  TaskRead *task_read =
95  task_read->SetWatermarks(low, high);
96  tasks_read_.TakeConsumer(task_read);
97  }
98 }
99 
100 
// Fragment of an IngestionPipeline member function (name line and the guarded
// statements are missing from this listing). Whatever the guard protects runs
// only when spawned_ is true.
// NOTE(review): position after the constructor suggests this is the
// destructor -- confirm against the real source.
102  if (spawned_) {
109  }
110 }
111 
112 
// IngestionPipeline::OnFileProcessed (name line missing from this listing).
// Passes the given spooler result unchanged to NotifyListeners.
114  const upload::SpoolerResult &spooler_result)
115 {
116  NotifyListeners(spooler_result);
117 }
118 
119 
// IngestionPipeline::Process (name line and several FileItem constructor
// arguments are missing from this listing).
//
// Wraps `source` in a heap-allocated FileItem and appends it to tube_input_.
// The FileItem is not freed in this function.
//
// Parameters:
//   source          passed as the first FileItem constructor argument.
//   allow_chunking  per-file chunking request; combined with the pipeline-wide
//                   chunking_enabled_ flag below.
//   hash_suffix     forwarded to the FileItem constructor.
121  IngestionSource* source,
122  bool allow_chunking,
123  shash::Suffix hash_suffix)
124 {
125  FileItem *file_item = new FileItem(
126  source,
132  hash_suffix,
// True only if the caller allows chunking AND the pipeline has it enabled.
// NOTE(review): presumably the may_have_chunks argument, as annotated in
// the scrubbing pipeline's FileItem construction -- confirm.
133  allow_chunking && chunking_enabled_,
137  tube_input_.EnqueueBack(file_item);
138 }
139 
140 
// Fragment of an IngestionPipeline member function (name line and several
// statements are missing from this listing). The visible lines call Spawn()
// on tasks_hash_ and tasks_read_, then record that fact in spawned_, which
// the guarded teardown code checks.
144  tasks_hash_.Spawn();
147  tasks_read_.Spawn();
148  spawned_ = true;
149 }
150 
151 
154 }
155 
156 
157 //------------------------------------------------------------------------------
158 
159 
// Body of a Process(BlockItem *block_item) member function (name line and
// both case labels are missing from this listing).
// NOTE(review): the tooltip index names TaskScrubbingCallback only as a type
// used by the scrubbing pipeline; confirm that this is its Process method.
//
// Preconditions enforced by assert: the block has a file item with a
// non-empty path and a chunk item that is a bulk chunk.
161  FileItem *file_item = block_item->file_item();
162  assert(file_item != NULL);
163  assert(!file_item->path().empty());
164  ChunkItem *chunk_item = block_item->chunk_item();
165  assert(chunk_item != NULL);
166  assert(chunk_item->is_bulk_chunk());
167 
168  switch (block_item->type()) {
// First case (label missing): only the block itself is freed.
170  delete block_item;
171  break;
172 
// Second case (label missing): the chunk's hash must be set. Listeners are
// notified with a ScrubbingResult built from the file path and that hash,
// then the block, its chunk item and its file item are all freed here.
// One statement between the deletes and the break is missing from the listing.
174  assert(!chunk_item->hash_ptr()->IsNull());
175  NotifyListeners(ScrubbingResult(file_item->path(),
176  *chunk_item->hash_ptr()));
177  delete block_item;
178  delete chunk_item;
179  delete file_item;
181  break;
182 
// Any other block type aborts via PANIC.
183  default:
184  PANIC(NULL);
185  }
186 }
187 
188 
189 //------------------------------------------------------------------------------
190 
191 
// ScrubbingPipeline constructor (name line and most loop statements are
// missing from this listing). Starts in the not-spawned state.
193  : spawned_(false)
// NOTE(review): presumably bounds the number of files in flight -- confirm
// against the Tube constructor.
194  , tube_counter_(kMaxFilesInFlight)
195 {
// Scaling unit for all stages: one per eight CPU cores, but at least one.
// Every loop below runs nfork_base times the stage's kNfork* constant.
196  unsigned nfork_base = std::max(1U, GetNumberOfCpuCores() / 8);
197 
// Callback stage: one Tube<BlockItem> and one TaskScrubbingCallback per
// iteration (task construction arguments are missing from the listing).
198  for (unsigned i = 0; i < nfork_base * kNforkScrubbingCallback; ++i) {
199  Tube<BlockItem> *tube = new Tube<BlockItem>();
201  TaskScrubbingCallback *task =
205  }
207 
// Hash stage: one Tube<BlockItem> per iteration; the rest is missing.
208  for (unsigned i = 0; i < nfork_base * kNforkHash; ++i) {
209  Tube<BlockItem> *t = new Tube<BlockItem>();
212  }
214 
// Chunk stage: one Tube<BlockItem> per iteration; the rest is missing.
215  for (unsigned i = 0; i < nfork_base * kNforkChunk; ++i) {
216  Tube<BlockItem> *t = new Tube<BlockItem>();
220  }
222 
// Read stage: each TaskRead is handed to tasks_read_. Unlike the ingestion
// pipeline's constructor, no SetWatermarks call is visible in this loop.
223  for (unsigned i = 0; i < nfork_base * kNforkRead; ++i) {
224  TaskRead *task_read =
227  tasks_read_.TakeConsumer(task_read);
228  }
229 }
230 
231 
// Fragment of a ScrubbingPipeline member function (name line and the guarded
// statements are missing from this listing). Whatever the guard protects runs
// only when spawned_ is true.
// NOTE(review): position after the constructor suggests this is the
// destructor -- confirm against the real source.
233  if (spawned_) {
238  }
239 }
240 
241 
// ScrubbingPipeline::OnFileProcessed (name line missing from this listing).
// Passes the given scrubbing result unchanged to NotifyListeners.
243  const ScrubbingResult &scrubbing_result)
244 {
245  NotifyListeners(scrubbing_result);
246 }
247 
248 
// ScrubbingPipeline::Process (name line and one FileItem constructor argument
// are missing from this listing).
//
// Wraps `source` in a heap-allocated FileItem with chunking disabled and
// legacy bulk-chunk hashing enabled, then appends the same pointer to
// tube_counter_ and afterwards to tube_input_. The FileItem is not freed in
// this function.
//
// Parameters:
//   source          passed as the first FileItem constructor argument.
//   hash_algorithm  forwarded to the FileItem constructor.
//   hash_suffix     forwarded to the FileItem constructor.
250  IngestionSource *source,
251  shash::Algorithms hash_algorithm,
252  shash::Suffix hash_suffix)
253 {
254  FileItem *file_item = new FileItem(
255  source,
// NOTE(review): presumably the min/avg/max chunk sizes, unused because
// chunking is disabled below -- confirm against the FileItem constructor.
256  0, 0, 0,
258  hash_algorithm,
259  hash_suffix,
260  false, /* may_have_chunks */
261  true /* hash_legacy_bulk_chunk */);
// Enqueued on the counter tube first, then on the input tube.
262  tube_counter_.EnqueueBack(file_item);
263  tube_input_.EnqueueBack(file_item);
264 }
265 
266 
// Fragment of a ScrubbingPipeline member function (name line and several
// statements are missing from this listing). The visible lines call Spawn()
// on tasks_hash_ and tasks_read_, then record that fact in spawned_, which
// the guarded teardown code checks.
269  tasks_hash_.Spawn();
271  tasks_read_.Spawn();
272  spawned_ = true;
273 }
274 
275 
278 }
static const unsigned kNforkScrubbingCallback
Definition: pipeline.h:137
upload::AbstractUploader * uploader_
Definition: pipeline.h:57
CallbackPtr RegisterListener(typename BoundClosure< ParamT, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, ClosureDataT data)
TubeConsumerGroup< BlockItem > tasks_chunk_
Definition: pipeline.h:150
void OnFileProcessed(const upload::SpoolerResult &spooler_result)
Definition: pipeline.cc:113
void Wait()
Definition: tube.h:159
Tube< FileItem > tube_ctr_inflight_post_
Definition: pipeline.h:67
void TakeConsumer(TubeConsumer< ItemT > *consumer)
Definition: task.h:74
Tube< FileItem > tube_ctr_inflight_pre_
Definition: pipeline.h:64
static const unsigned kNforkHash
Definition: pipeline.h:138
Tube< FileItem > tube_counter_
Definition: pipeline.h:145
ItemAllocator item_allocator_
Definition: pipeline.h:88
virtual void Process(BlockItem *block_item)
Definition: pipeline.cc:160
#define PANIC(...)
Definition: exception.h:29
static const unsigned kNforkChunk
Definition: pipeline.h:45
void Process(IngestionSource *source, bool allow_chunking, shash::Suffix hash_suffix=shash::kSuffixNone)
Definition: pipeline.cc:120
void NotifyListeners(const upload::SpoolerResult &parameter)
static const unsigned kNforkRead
Definition: pipeline.h:140
TubeConsumerGroup< BlockItem > tasks_compress_
Definition: pipeline.h:77
TubeGroup< BlockItem > tubes_compress_
Definition: pipeline.h:76
TubeGroup< BlockItem > tubes_scrubbing_callback_
Definition: pipeline.h:155
unsigned int GetNumberOfCpuCores()
Definition: concurrency.cc:18
void Process(IngestionSource *source, shash::Algorithms hash_algorithm, shash::Suffix hash_suffix)
Definition: pipeline.cc:249
assert((mem||(size==0))&&"Out Of Memory")
TubeGroup< BlockItem > tubes_chunk_
Definition: pipeline.h:73
const size_t average_chunk_size_
Definition: pipeline.h:53
void SetWatermarks(uint64_t low, uint64_t high)
Definition: task_read.cc:80
ItemAllocator item_allocator_
Definition: pipeline.h:158
ItemT * PopFront()
Definition: tube.h:123
TubeGroup< BlockItem > tubes_hash_
Definition: pipeline.h:152
static const uint64_t kMaxPipelineMem
Definition: pipeline.h:39
Algorithms
Definition: hash.h:41
Tube< FileItem > * tube_counter_
Definition: pipeline.h:116
static const unsigned kNforkRegister
Definition: pipeline.h:41
uint64_t platform_memsize()
TubeGroup< BlockItem > tubes_write_
Definition: pipeline.h:82
void Activate()
Definition: tube.h:262
void Spawn()
Definition: task.h:79
static const unsigned kNforkHash
Definition: pipeline.h:43
const shash::Algorithms hash_algorithm_
Definition: pipeline.h:49
ChunkItem * chunk_item()
Definition: item.h:221
Tube< FileItem > tube_input_
Definition: pipeline.h:69
TubeGroup< BlockItem > tubes_hash_
Definition: pipeline.h:79
Link * EnqueueBack(ItemT *item)
Definition: tube.h:73
FileItem * file_item()
Definition: item.h:220
const size_t maximal_chunk_size_
Definition: pipeline.h:54
TubeConsumerGroup< BlockItem > tasks_hash_
Definition: pipeline.h:153
Definition: item.h:34
TubeConsumerGroup< FileItem > tasks_register_
Definition: pipeline.h:86
void TakeTube(Tube< ItemT > *t)
Definition: tube.h:257
static const uint64_t kMemHighWatermark
Definition: pipeline.h:135
IngestionPipeline(upload::AbstractUploader *uploader, const upload::SpoolerDefinition &spooler_definition)
Definition: pipeline.cc:26
static const unsigned kNforkChunk
Definition: pipeline.h:139
TubeGroup< BlockItem > tubes_chunk_
Definition: pipeline.h:149
char Suffix
Definition: hash.h:114
static const uint64_t kMemLowWatermark
Definition: pipeline.h:134
std::string path()
Definition: item.h:57
void OnFileProcessed(const ScrubbingResult &scrubbing_result)
Definition: pipeline.cc:242
const bool chunking_enabled_
Definition: pipeline.h:51
static const unsigned kNforkRead
Definition: pipeline.h:46
uint64_t String2Uint64(const string &value)
Definition: string.cc:246
TubeConsumerGroup< FileItem > tasks_read_
Definition: pipeline.h:71
TubeConsumerGroup< FileItem > tasks_read_
Definition: pipeline.h:147
TubeConsumerGroup< BlockItem > tasks_chunk_
Definition: pipeline.h:74
static const unsigned kNforkCompress
Definition: pipeline.h:44
TubeConsumerGroup< BlockItem > tasks_scrubbing_callback_
Definition: pipeline.h:156
BlockType type()
Definition: item.h:218
TubeConsumerGroup< BlockItem > tasks_hash_
Definition: pipeline.h:80
Tube< FileItem > tube_input_
Definition: pipeline.h:143
TubeConsumerGroup< BlockItem > tasks_write_
Definition: pipeline.h:83
static const unsigned kNforkWrite
Definition: pipeline.h:42
TubeGroup< FileItem > tubes_register_
Definition: pipeline.h:85
const zlib::Algorithms compression_algorithm_
Definition: pipeline.h:48
const size_t minimal_chunk_size_
Definition: pipeline.h:52
void Terminate()
Definition: task.h:94
const bool generate_legacy_bulk_chunks_
Definition: pipeline.h:50
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528