5 #include "cvmfs_config.h"
29 : compression_algorithm_(spooler_definition.compression_alg)
30 , hash_algorithm_(spooler_definition.hash_algorithm)
31 , generate_legacy_bulk_chunks_(spooler_definition.generate_legacy_bulk_chunks)
32 , chunking_enabled_(spooler_definition.use_file_chunking)
33 , minimal_chunk_size_(spooler_definition.min_file_chunk_size)
34 , average_chunk_size_(spooler_definition.avg_file_chunk_size)
35 , maximal_chunk_size_(spooler_definition.max_file_chunk_size)
38 , tube_ctr_inflight_pre_(kMaxFilesInFlight)
52 for (
unsigned i = 0; i < nfork_base *
kNforkWrite; ++i) {
59 for (
unsigned i = 0; i < nfork_base *
kNforkHash; ++i) {
74 for (
unsigned i = 0; i < nfork_base *
kNforkChunk; ++i) {
84 char *fixed_limit_mb = getenv(
"_CVMFS_SERVER_PIPELINE_MB");
85 if (fixed_limit_mb != NULL) {
88 uint64_t low = (high * 2) / 3;
90 "pipeline memory thresholds %" PRIu64
"/%" PRIu64
" M",
91 low / (1024 * 1024), high / (1024 * 1024));
92 for (
unsigned i = 0; i < nfork_base *
kNforkRead; ++i) {
162 assert(file_item != NULL);
165 assert(chunk_item != NULL);
166 assert(chunk_item->is_bulk_chunk());
168 switch (block_item->
type()) {
174 assert(!chunk_item->hash_ptr()->IsNull());
176 *chunk_item->hash_ptr()));
194 , tube_counter_(kMaxFilesInFlight)
208 for (
unsigned i = 0; i < nfork_base *
kNforkHash; ++i) {
215 for (
unsigned i = 0; i < nfork_base *
kNforkChunk; ++i) {
223 for (
unsigned i = 0; i < nfork_base *
kNforkRead; ++i) {
static const unsigned kNforkScrubbingCallback
upload::AbstractUploader * uploader_
CallbackPtr RegisterListener(typename BoundClosure< ParamT, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, ClosureDataT data)
TubeConsumerGroup< BlockItem > tasks_chunk_
void OnFileProcessed(const upload::SpoolerResult &spooler_result)
Tube< FileItem > tube_ctr_inflight_post_
void TakeConsumer(TubeConsumer< ItemT > *consumer)
Tube< FileItem > tube_ctr_inflight_pre_
static const unsigned kNforkHash
Tube< FileItem > tube_counter_
ItemAllocator item_allocator_
virtual void Process(BlockItem *block_item)
static const unsigned kNforkChunk
void Process(IngestionSource *source, bool allow_chunking, shash::Suffix hash_suffix=shash::kSuffixNone)
void NotifyListeners(const upload::SpoolerResult ¶meter)
static const unsigned kNforkRead
TubeConsumerGroup< BlockItem > tasks_compress_
TubeGroup< BlockItem > tubes_compress_
TubeGroup< BlockItem > tubes_scrubbing_callback_
unsigned int GetNumberOfCpuCores()
void Process(IngestionSource *source, shash::Algorithms hash_algorithm, shash::Suffix hash_suffix)
assert((mem||(size==0))&&"Out Of Memory")
TubeGroup< BlockItem > tubes_chunk_
const size_t average_chunk_size_
void SetWatermarks(uint64_t low, uint64_t high)
ItemAllocator item_allocator_
TubeGroup< BlockItem > tubes_hash_
static const uint64_t kMaxPipelineMem
Tube< FileItem > * tube_counter_
static const unsigned kNforkRegister
TubeGroup< BlockItem > tubes_write_
static const unsigned kNforkHash
const shash::Algorithms hash_algorithm_
Tube< FileItem > tube_input_
TubeGroup< BlockItem > tubes_hash_
Link * EnqueueBack(ItemT *item)
const size_t maximal_chunk_size_
TubeConsumerGroup< BlockItem > tasks_hash_
TubeConsumerGroup< FileItem > tasks_register_
void TakeTube(Tube< ItemT > *t)
static const uint64_t kMemHighWatermark
IngestionPipeline(upload::AbstractUploader *uploader, const upload::SpoolerDefinition &spooler_definition)
static const unsigned kNforkChunk
TubeGroup< BlockItem > tubes_chunk_
static const uint64_t kMemLowWatermark
void OnFileProcessed(const ScrubbingResult &scrubbing_result)
const bool chunking_enabled_
static const unsigned kNforkRead
uint64_t String2Uint64(const string &value)
TubeConsumerGroup< FileItem > tasks_read_
TubeConsumerGroup< FileItem > tasks_read_
TubeConsumerGroup< BlockItem > tasks_chunk_
static const unsigned kNforkCompress
TubeConsumerGroup< BlockItem > tasks_scrubbing_callback_
TubeConsumerGroup< BlockItem > tasks_hash_
Tube< FileItem > tube_input_
TubeConsumerGroup< BlockItem > tasks_write_
static const unsigned kNforkWrite
TubeGroup< FileItem > tubes_register_
const zlib::Algorithms compression_algorithm_
const size_t minimal_chunk_size_
const bool generate_legacy_bulk_chunks_
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)