CernVM-FS  2.9.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
pipeline.cc
Go to the documentation of this file.
1 
5 #include "cvmfs_config.h"
6 #include "pipeline.h"
7 
8 #include <algorithm>
9 #include <cstdlib>
10 
11 #include "ingestion/task_chunk.h"
13 #include "ingestion/task_hash.h"
14 #include "ingestion/task_read.h"
16 #include "ingestion/task_write.h"
17 #include "platform.h"
18 #include "upload_facility.h"
20 #include "util/exception.h"
21 #include "util/string.h"
22 #include "util_concurrency.h"
23 
// Default ceiling for the pipeline's memory high watermark: 1 GiB. The
// IngestionPipeline constructor starts from this value before applying the
// platform_memsize() cap and the optional environment override.
24 const uint64_t IngestionPipeline::kMaxPipelineMem = 1024 * 1024 * 1024;
25 
// IngestionPipeline constructor. NOTE(review): the opening line of the
// signature (listing line 26) is not part of this listing.
//
// Copies the compression, hash and chunking settings out of
// spooler_definition, stores the uploader pointer, sizes each processing stage
// relative to the CPU core count, and computes the memory watermarks handed to
// the read tasks.
27  upload::AbstractUploader *uploader,
28  const upload::SpoolerDefinition &spooler_definition)
29  : compression_algorithm_(spooler_definition.compression_alg)
30  , hash_algorithm_(spooler_definition.hash_algorithm)
31  , generate_legacy_bulk_chunks_(spooler_definition.generate_legacy_bulk_chunks)
32  , chunking_enabled_(spooler_definition.use_file_chunking)
33  , minimal_chunk_size_(spooler_definition.min_file_chunk_size)
34  , average_chunk_size_(spooler_definition.avg_file_chunk_size)
35  , maximal_chunk_size_(spooler_definition.max_file_chunk_size)
36  , spawned_(false)
37  , uploader_(uploader)
38  , tube_counter_(kMaxFilesInFlight)
39 {
// Common multiplier for all per-stage task counts: one per eight CPU cores,
// but never less than one.
40  unsigned nfork_base = std::max(1U, GetNumberOfCpuCores() / 8);
41 
// Register stage: one tube and one TaskRegister per iteration.
// NOTE(review): listing lines 44, 46 and 47 are missing, so the statements
// that hand `tube` and `task` over to their groups are not visible here.
42  for (unsigned i = 0; i < nfork_base * kNforkRegister; ++i) {
43  Tube<FileItem> *tube = new Tube<FileItem>();
45  TaskRegister *task = new TaskRegister(tube, &tube_counter_);
48  }
50 
// Write, hash, compress and chunk stages follow, each with
// nfork_base * kNfork<Stage> iterations.
// NOTE(review): the loop bodies (listing lines 52-54, 59-61, 66-69, 74-77)
// are missing from this listing.
51  for (unsigned i = 0; i < nfork_base * kNforkWrite; ++i) {
55  }
57 
58  for (unsigned i = 0; i < nfork_base * kNforkHash; ++i) {
62  }
64 
65  for (unsigned i = 0; i < nfork_base * kNforkCompress; ++i) {
70  }
72 
73  for (unsigned i = 0; i < nfork_base * kNforkChunk; ++i) {
78  }
80 
// High watermark: kMaxPipelineMem, lowered to a fifth of platform_memsize()
// if that is smaller.
81  uint64_t high = kMaxPipelineMem;
82  high = std::min(high, platform_memsize() / 5);
// If _CVMFS_SERVER_PIPELINE_MB is set in the environment it replaces the
// computed limit entirely; the value is interpreted as a number of MiB.
83  char *fixed_limit_mb = getenv("_CVMFS_SERVER_PIPELINE_MB");
84  if (fixed_limit_mb != NULL) {
85  high = String2Uint64(fixed_limit_mb) * 1024 * 1024;
86  }
// Low watermark is two thirds of the high watermark.
87  uint64_t low = (high * 2) / 3;
// Arguments of a logging call that reports both thresholds in MiB.
// NOTE(review): the line opening the call (listing line 88) is missing.
89  "pipeline memory thresholds %" PRIu64 "/%" PRIu64 " M",
90  low / (1024 * 1024), high / (1024 * 1024));
// Read stage: every TaskRead receives the same low/high watermarks and is
// handed to tasks_read_.
// NOTE(review): the initializer of task_read (listing line 93) is missing.
91  for (unsigned i = 0; i < nfork_base * kNforkRead; ++i) {
92  TaskRead *task_read =
94  task_read->SetWatermarks(low, high);
95  tasks_read_.TakeConsumer(task_read);
96  }
97 }
98 
99 
// NOTE(review): the header (listing line 100) and the guarded statements
// (listing lines 102-107) are missing; presumably this is the
// IngestionPipeline destructor -- confirm against the original file.
// Whatever the omitted statements do, they run only when spawned_ is set.
101  if (spawned_) {
108  }
109 }
110 
111 
// IngestionPipeline::OnFileProcessed (the line carrying the function name,
// listing line 112, is missing from this listing).
// Passes the received SpoolerResult unchanged to NotifyListeners().
113  const upload::SpoolerResult &spooler_result)
114 {
115  NotifyListeners(spooler_result);
116 }
117 
118 
// IngestionPipeline::Process (the line carrying the function name, listing
// line 119, is missing from this listing).
//
// Wraps `source` in a heap-allocated FileItem and enqueues it.
// NOTE(review): constructor arguments on listing lines 126-130 and 133 are
// missing, so the full FileItem argument list is not visible here.
120  IngestionSource* source,
121  bool allow_chunking,
122  shash::Suffix hash_suffix)
123 {
124  FileItem *file_item = new FileItem(
125  source,
131  hash_suffix,
// Chunking must be both requested by the caller and enabled for the pipeline.
132  allow_chunking && chunking_enabled_,
// The item goes to tube_counter_ first, then to tube_input_.
// NOTE(review): presumably tube_counter_ (constructed with kMaxFilesInFlight)
// bounds the number of files in flight -- confirm in tube.h.
134  tube_counter_.EnqueueBack(file_item);
135  tube_input_.EnqueueBack(file_item);
136 }
137 
138 
// NOTE(review): the header and listing lines 139-141 and 143-144 are missing;
// presumably this is IngestionPipeline::Spawn and the omitted lines spawn the
// remaining task groups -- confirm against the original file.
// Visible part: spawns the hash and read task groups, then records that the
// pipeline has been spawned.
142  tasks_hash_.Spawn();
145  tasks_read_.Spawn();
146  spawned_ = true;
147 }
148 
149 
152 }
153 
154 
155 //------------------------------------------------------------------------------
156 
157 
// Process(BlockItem *block_item) of the scrubbing callback task (the header,
// listing line 158, is missing from this listing).
//
// Requires that the block references a file item with a non-empty path and a
// chunk item that is a bulk chunk; all of these are enforced by assert().
159  FileItem *file_item = block_item->file_item();
160  assert(file_item != NULL);
161  assert(!file_item->path().empty());
162  ChunkItem *chunk_item = block_item->chunk_item();
163  assert(chunk_item != NULL);
164  assert(chunk_item->is_bulk_chunk());
165 
// NOTE(review): both case labels (listing lines 167 and 171) are missing, so
// the block types handled by the two branches are not visible here.
166  switch (block_item->type()) {
// First branch: the block is simply released.
168  delete block_item;
169  break;
170 
// Second branch: the chunk hash must be set; listeners get a ScrubbingResult
// built from the file path and that hash, after which the block, chunk and
// file items are released.
// NOTE(review): listing line 178 (between the deletes and the break) is missing.
172  assert(!chunk_item->hash_ptr()->IsNull());
173  NotifyListeners(ScrubbingResult(file_item->path(),
174  *chunk_item->hash_ptr()));
175  delete block_item;
176  delete chunk_item;
177  delete file_item;
179  break;
180 
// Any other block type aborts via PANIC.
181  default:
182  PANIC(NULL);
183  }
184 }
185 
186 
187 //------------------------------------------------------------------------------
188 
189 
// Constructor of the scrubbing pipeline. NOTE(review): the header (listing
// line 190) is missing and the class name is not visible in this listing.
//
// Sizes the scrubbing-callback, hash, chunk and read stages relative to the
// CPU core count.
191  : spawned_(false)
192  , tube_counter_(kMaxFilesInFlight)
193 {
// Common multiplier for all per-stage task counts: one per eight CPU cores,
// but never less than one.
194  unsigned nfork_base = std::max(1U, GetNumberOfCpuCores() / 8);
195 
// Scrubbing-callback stage: one BlockItem tube per iteration.
// NOTE(review): listing lines 198 and 200-202 are missing, including the
// initializer of `task`.
196  for (unsigned i = 0; i < nfork_base * kNforkScrubbingCallback; ++i) {
197  Tube<BlockItem> *tube = new Tube<BlockItem>();
199  TaskScrubbingCallback *task =
203  }
205 
// Hash stage. NOTE(review): listing lines 208-209 are missing.
206  for (unsigned i = 0; i < nfork_base * kNforkHash; ++i) {
207  Tube<BlockItem> *t = new Tube<BlockItem>();
210  }
212 
// Chunk stage. NOTE(review): listing lines 215-217 are missing.
213  for (unsigned i = 0; i < nfork_base * kNforkChunk; ++i) {
214  Tube<BlockItem> *t = new Tube<BlockItem>();
218  }
220 
// Read stage: each TaskRead is handed to tasks_read_.
// NOTE(review): listing lines 223-224, including the initializer of
// task_read, are missing.
221  for (unsigned i = 0; i < nfork_base * kNforkRead; ++i) {
222  TaskRead *task_read =
225  tasks_read_.TakeConsumer(task_read);
226  }
227 }
228 
229 
// NOTE(review): the header (listing line 230) and the guarded statements
// (listing lines 232-235) are missing; presumably this is the scrubbing
// pipeline's destructor -- confirm against the original file.
// Whatever the omitted statements do, they run only when spawned_ is set.
231  if (spawned_) {
236  }
237 }
238 
239 
// OnFileProcessed of the scrubbing pipeline (the line carrying the function
// name, listing line 240, is missing from this listing).
// Passes the received ScrubbingResult unchanged to NotifyListeners().
241  const ScrubbingResult &scrubbing_result)
242 {
243  NotifyListeners(scrubbing_result);
244 }
245 
246 
// Process of the scrubbing pipeline (the line carrying the function name,
// listing line 247, is missing from this listing).
//
// Wraps `source` in a heap-allocated FileItem using the caller-supplied hash
// algorithm and suffix, with chunking disabled and legacy bulk chunk hashing
// enabled, then enqueues it.
// NOTE(review): the constructor argument on listing line 255 is missing, and
// the meaning of the three zero arguments is not visible here.
248  IngestionSource *source,
249  shash::Algorithms hash_algorithm,
250  shash::Suffix hash_suffix)
251 {
252  FileItem *file_item = new FileItem(
253  source,
254  0, 0, 0,
256  hash_algorithm,
257  hash_suffix,
258  false, /* may_have_chunks */
259  true /* hash_legacy_bulk_chunk */);
// The item goes to tube_counter_ first, then to tube_input_.
260  tube_counter_.EnqueueBack(file_item);
261  tube_input_.EnqueueBack(file_item);
262 }
263 
264 
// NOTE(review): the header and listing lines 265-266 and 268 are missing;
// presumably this is the scrubbing pipeline's Spawn and the omitted lines
// spawn the remaining task groups -- confirm against the original file.
// Visible part: spawns the hash and read task groups, then records that the
// pipeline has been spawned.
267  tasks_hash_.Spawn();
269  tasks_read_.Spawn();
270  spawned_ = true;
271 }
272 
273 
276 }
static const unsigned kNforkScrubbingCallback
Definition: pipeline.h:128
upload::AbstractUploader * uploader_
Definition: pipeline.h:57
#define LogCvmfs(source, mask,...)
Definition: logging.h:20
CallbackPtr RegisterListener(typename BoundClosure< ParamT, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, ClosureDataT data)
TubeConsumerGroup< BlockItem > tasks_chunk_
Definition: pipeline.h:141
void OnFileProcessed(const upload::SpoolerResult &spooler_result)
Definition: pipeline.cc:112
void Wait()
Definition: tube.h:144
void TakeConsumer(TubeConsumer< ItemT > *consumer)
Definition: task.h:74
unsigned int GetNumberOfCpuCores()
static const unsigned kNforkHash
Definition: pipeline.h:129
Tube< FileItem > tube_counter_
Definition: pipeline.h:136
ItemAllocator item_allocator_
Definition: pipeline.h:79
virtual void Process(BlockItem *block_item)
Definition: pipeline.cc:158
#define PANIC(...)
Definition: exception.h:26
static const unsigned kNforkChunk
Definition: pipeline.h:45
void Process(IngestionSource *source, bool allow_chunking, shash::Suffix hash_suffix=shash::kSuffixNone)
Definition: pipeline.cc:119
void NotifyListeners(const upload::SpoolerResult &parameter)
static const unsigned kNforkRead
Definition: pipeline.h:131
TubeConsumerGroup< BlockItem > tasks_compress_
Definition: pipeline.h:68
TubeGroup< BlockItem > tubes_compress_
Definition: pipeline.h:67
TubeGroup< BlockItem > tubes_scrubbing_callback_
Definition: pipeline.h:146
void Process(IngestionSource *source, shash::Algorithms hash_algorithm, shash::Suffix hash_suffix)
Definition: pipeline.cc:247
assert((mem||(size==0))&&"Out Of Memory")
TubeGroup< BlockItem > tubes_chunk_
Definition: pipeline.h:64
const size_t average_chunk_size_
Definition: pipeline.h:53
void SetWatermarks(uint64_t low, uint64_t high)
Definition: task_read.cc:80
ItemAllocator item_allocator_
Definition: pipeline.h:149
ItemT * PopFront()
Definition: tube.h:123
TubeGroup< BlockItem > tubes_hash_
Definition: pipeline.h:143
static const uint64_t kMaxPipelineMem
Definition: pipeline.h:39
Algorithms
Definition: hash.h:39
Tube< FileItem > * tube_counter_
Definition: pipeline.h:107
static const unsigned kNforkRegister
Definition: pipeline.h:41
uint64_t platform_memsize()
TubeGroup< BlockItem > tubes_write_
Definition: pipeline.h:73
void Activate()
Definition: tube.h:247
void Spawn()
Definition: task.h:79
static const unsigned kNforkHash
Definition: pipeline.h:43
const shash::Algorithms hash_algorithm_
Definition: pipeline.h:49
ChunkItem * chunk_item()
Definition: item.h:221
Tube< FileItem > tube_input_
Definition: pipeline.h:60
TubeGroup< BlockItem > tubes_hash_
Definition: pipeline.h:70
Link * EnqueueBack(ItemT *item)
Definition: tube.h:73
FileItem * file_item()
Definition: item.h:220
const size_t maximal_chunk_size_
Definition: pipeline.h:54
TubeConsumerGroup< BlockItem > tasks_hash_
Definition: pipeline.h:144
Definition: item.h:34
Tube< FileItem > tube_counter_
Definition: pipeline.h:59
TubeConsumerGroup< FileItem > tasks_register_
Definition: pipeline.h:77
void TakeTube(Tube< ItemT > *t)
Definition: tube.h:242
static const uint64_t kMemHighWatermark
Definition: pipeline.h:126
IngestionPipeline(upload::AbstractUploader *uploader, const upload::SpoolerDefinition &spooler_definition)
Definition: pipeline.cc:26
static const unsigned kNforkChunk
Definition: pipeline.h:130
TubeGroup< BlockItem > tubes_chunk_
Definition: pipeline.h:140
char Suffix
Definition: hash.h:112
static const uint64_t kMemLowWatermark
Definition: pipeline.h:125
std::string path()
Definition: item.h:57
void OnFileProcessed(const ScrubbingResult &scrubbing_result)
Definition: pipeline.cc:240
const bool chunking_enabled_
Definition: pipeline.h:51
static const unsigned kNforkRead
Definition: pipeline.h:46
uint64_t String2Uint64(const string &value)
Definition: string.cc:227
TubeConsumerGroup< FileItem > tasks_read_
Definition: pipeline.h:62
TubeConsumerGroup< FileItem > tasks_read_
Definition: pipeline.h:138
TubeConsumerGroup< BlockItem > tasks_chunk_
Definition: pipeline.h:65
static const unsigned kNforkCompress
Definition: pipeline.h:44
TubeConsumerGroup< BlockItem > tasks_scrubbing_callback_
Definition: pipeline.h:147
BlockType type()
Definition: item.h:218
TubeConsumerGroup< BlockItem > tasks_hash_
Definition: pipeline.h:71
Tube< FileItem > tube_input_
Definition: pipeline.h:134
TubeConsumerGroup< BlockItem > tasks_write_
Definition: pipeline.h:74
static const unsigned kNforkWrite
Definition: pipeline.h:42
TubeGroup< FileItem > tubes_register_
Definition: pipeline.h:76
const zlib::Algorithms compression_algorithm_
Definition: pipeline.h:48
const size_t minimal_chunk_size_
Definition: pipeline.h:52
void Terminate()
Definition: task.h:94
const bool generate_legacy_bulk_chunks_
Definition: pipeline.h:50