34 total_size(event.
size),
36 hash_context(shash::ContextPtr(event.id.
algorithm)),
37 hash_buffer(hash_context.
size, 0)
44 : handle(other.handle),
45 total_size(other.total_size),
46 current_size(other.current_size),
47 hash_context(other.hash_context),
48 hash_buffer(other.hash_buffer)
75 int fdin,
const std::string& header_digest,
const std::string& path,
76 uint64_t header_size) {
78 "PayloadProcessor - lease_path: %s, header digest: %s, header "
80 path.c_str(), header_digest.c_str(), header_size);
82 const size_t first_slash_idx = path.find(
'/', 0);
101 nb = read(fdin, &buffer[0], buffer.size());
102 consumer_state = deserializer.
ConsumeNext(nb, &buffer[0]);
106 "PayloadProcessor - error: %d encountered when consuming object "
124 std::string path(
"");
127 path =
event.id.MakePath();
129 path =
event.object_name;
133 "PayloadProcessor - error: Event received with unknown object.");
151 void *buf_copied = smalloc(event.
buf_size);
168 if (file_hash != event.
id) {
171 "PayloadProcessor - error: Hash mismatch for unpacked file: event "
172 "size: %ld, file size: %ld, event hash: %s, file hash: %s",
204 "PayloadProcessor - error: Could not get configuration parameters.");
208 const std::string spooler_temp_dir =
210 assert(!spooler_temp_dir.empty());
219 "dummy_token",
"dummy_key");
227 "Failed to initialize backend upload "
228 "facility in PayloadProcessor.");
243 const unsigned num_uploader_errors =
uploader_->GetNumberOfErrors();
245 if (num_uploader_errors > 0) {
247 "PayloadProcessor - error: Uploader - %d upload(s) failed.",
248 num_uploader_errors);
254 "PayloadProcessor - error: %d unpacking error(s).",
GetNumErrors());
CallbackPtr RegisterListener(typename BoundClosure< ParamT, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, ClosureDataT data)
virtual Result Finalize()
upload::UploadStreamHandle * handle
FileInfo & operator=(const FileInfo &other)
const size_t kConsumerBuffer
std::string spooler_configuration
UniquePtr< upload::AbstractUploader > uploader_
std::string ToString(const bool with_suffix=false) const
zlib::Algorithms compression_alg
UniquePtr< perf::StatisticsTemplate > statistics_
virtual ~PayloadProcessor()
bool GetParamsFromFile(const std::string &repo_name, Params *params)
perf::Statistics * statistics_
virtual void OnUploadJobComplete(const upload::UploaderResults &results, void *buffer)
assert((mem||(size==0))&&"Out Of Memory")
std::string GetSpoolerTempDir(const std::string &spooler_config)
virtual Result Initialize()
virtual void ConsumerEventCallback(const ObjectPackBuild::Event &event)
std::vector< unsigned char > hash_buffer
void Init(ContextPtr context)
static AbstractUploader * Construct(const SpoolerDefinition ¶m)
ObjectPackBuild::State ConsumeNext(const unsigned buf_size, const unsigned char *buf)
static RaiiTempDir * Create(const std::string &prefix)
shash::ContextPtr hash_context
bool MkdirDeep(const std::string &path, const mode_t mode, bool verify_writable)
std::map< shash::Any, FileInfo > pending_files_
UniquePtr< RaiiTempDir > temp_dir_
void Final(ContextPtr context, Any *any_digest)
Result Process(int fdin, const std::string &header_digest, const std::string &path, uint64_t header_size)
ObjectPack::BucketContentType object_type
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
void UnregisterListeners()
Any MkFromHexPtr(const HexPtr hex, const char suffix)
std::map< shash::Any, FileInfo >::iterator FileIterator
std::string current_repo_
bool generate_legacy_bulk_chunks
void SetStatistics(perf::Statistics *st)
shash::Algorithms hash_alg
static CallbackTN * MakeClosure(typename BoundClosure< UploaderResults, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, const ClosureDataT &closure_data)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)