34 , total_size(event.
size)
36 , hash_context(shash::ContextPtr(event.id.
algorithm))
37 , hash_buffer(hash_context.
size, 0) {
43 : handle(other.handle)
44 , total_size(other.total_size)
45 , current_size(other.current_size)
46 , hash_context(other.hash_context)
47 , hash_buffer(other.hash_buffer) {
73 int fdin,
const std::string &header_digest,
const std::string &path,
74 uint64_t header_size) {
76 "PayloadProcessor - lease_path: %s, header digest: %s, header "
78 path.c_str(), header_digest.c_str(), header_size);
80 const size_t first_slash_idx = path.find(
'/', 0);
99 nb = read(fdin, &buffer[0], buffer.size());
100 consumer_state = deserializer.
ConsumeNext(nb, &buffer[0]);
104 "PayloadProcessor - error: %d encountered when consuming object "
122 std::string path(
"");
125 path =
event.id.MakePath();
127 path =
event.object_name;
131 "PayloadProcessor - error: Event received with unknown object.");
149 void *buf_copied = smalloc(event.
buf_size);
167 if (file_hash != event.
id) {
170 "PayloadProcessor - error: Hash mismatch for unpacked file: event "
171 "size: %ld, file size: %ld, event hash: %s, file hash: %s",
201 "PayloadProcessor - error: Could not get configuration parameters.");
207 assert(!spooler_temp_dir.empty());
210 +
"/receiver/payload_processor");
216 "dummy_token",
"dummy_key");
224 "Failed to initialize backend upload "
225 "facility in PayloadProcessor.");
240 const unsigned num_uploader_errors =
uploader_->GetNumberOfErrors();
242 if (num_uploader_errors > 0) {
244 "PayloadProcessor - error: Uploader - %d upload(s) failed.",
245 num_uploader_errors);
251 "PayloadProcessor - error: %d unpacking error(s).",
CallbackPtr RegisterListener(typename BoundClosure< ParamT, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, ClosureDataT data)
virtual Result Finalize()
upload::UploadStreamHandle * handle
FileInfo & operator=(const FileInfo &other)
const size_t kConsumerBuffer
std::string spooler_configuration
UniquePtr< upload::AbstractUploader > uploader_
std::string ToString(const bool with_suffix=false) const
zlib::Algorithms compression_alg
UniquePtr< perf::StatisticsTemplate > statistics_
virtual ~PayloadProcessor()
bool GetParamsFromFile(const std::string &repo_name, Params *params)
perf::Statistics * statistics_
virtual void OnUploadJobComplete(const upload::UploaderResults &results, void *buffer)
assert((mem||(size==0))&&"Out Of Memory")
std::string GetSpoolerTempDir(const std::string &spooler_config)
virtual Result Initialize()
virtual void ConsumerEventCallback(const ObjectPackBuild::Event &event)
std::vector< unsigned char > hash_buffer
void Init(ContextPtr context)
static AbstractUploader * Construct(const SpoolerDefinition &param)
ObjectPackBuild::State ConsumeNext(const unsigned buf_size, const unsigned char *buf)
static RaiiTempDir * Create(const std::string &prefix)
shash::ContextPtr hash_context
bool MkdirDeep(const std::string &path, const mode_t mode, bool verify_writable)
std::map< shash::Any, FileInfo > pending_files_
UniquePtr< RaiiTempDir > temp_dir_
void Final(ContextPtr context, Any *any_digest)
Result Process(int fdin, const std::string &header_digest, const std::string &path, uint64_t header_size)
ObjectPack::BucketContentType object_type
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
void UnregisterListeners()
Any MkFromHexPtr(const HexPtr hex, const char suffix)
std::map< shash::Any, FileInfo >::iterator FileIterator
std::string current_repo_
bool generate_legacy_bulk_chunks
void SetStatistics(perf::Statistics *st)
shash::Algorithms hash_alg
static CallbackTN * MakeClosure(typename BoundClosure< UploaderResults, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, const ClosureDataT &closure_data)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)