CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
payload_processor.cc
Go to the documentation of this file.
1 
5 #include "payload_processor.h"
6 
7 #include <fcntl.h>
8 #include <unistd.h>
9 #include <vector>
10 
11 #include "params.h"
12 #include "util/logging.h"
13 #include "util/posix.h"
14 #include "util/string.h"
15 
16 namespace {
17 
18 const size_t kConsumerBuffer = 10 * 1024 * 1024; // 10 MB
19 
20 }
21 
22 namespace receiver {
23 
25  : handle(NULL),
26  total_size(0),
27  current_size(0),
28  hash_context(),
29  hash_buffer()
30 {}
31 
33  : handle(NULL),
34  total_size(event.size),
35  current_size(0),
36  hash_context(shash::ContextPtr(event.id.algorithm)),
37  hash_buffer(hash_context.size, 0)
38 {
41 }
42 
44  : handle(other.handle),
45  total_size(other.total_size),
46  current_size(other.current_size),
47  hash_context(other.hash_context),
48  hash_buffer(other.hash_buffer)
49 {
51 }
52 
54  handle = other.handle;
55  total_size = other.total_size;
56  current_size = other.current_size;
57  hash_context = other.hash_context;
58  hash_buffer = other.hash_buffer;
60 
61  return *this;
62 }
63 
65  : pending_files_(),
66  current_repo_(),
67  uploader_(),
68  temp_dir_(),
69  num_errors_(0),
70  statistics_(NULL) {}
71 
73 
75  int fdin, const std::string& header_digest, const std::string& path,
76  uint64_t header_size) {
78  "PayloadProcessor - lease_path: %s, header digest: %s, header "
79  "size: %ld",
80  path.c_str(), header_digest.c_str(), header_size);
81 
82  const size_t first_slash_idx = path.find('/', 0);
83 
84  current_repo_ = path.substr(0, first_slash_idx);
85 
86  Result init_result = Initialize();
87  if (init_result != kSuccess) {
88  return init_result;
89  }
90 
91  // Set up object pack deserialization
93 
94  ObjectPackConsumer deserializer(digest, header_size);
96 
97  int nb = 0;
99  std::vector<unsigned char> buffer(kConsumerBuffer, 0);
100  do {
101  nb = read(fdin, &buffer[0], buffer.size());
102  consumer_state = deserializer.ConsumeNext(nb, &buffer[0]);
103  if (consumer_state != ObjectPackBuild::kStateContinue &&
104  consumer_state != ObjectPackBuild::kStateDone) {
106  "PayloadProcessor - error: %d encountered when consuming object "
107  "pack.",
108  consumer_state);
109  break;
110  }
111  } while (nb > 0 && consumer_state != ObjectPackBuild::kStateDone);
112 
113  assert(pending_files_.empty());
114 
115  Result res = Finalize();
116 
117  deserializer.UnregisterListeners();
118 
119  return res;
120 }
121 
123  const ObjectPackBuild::Event& event) {
124  std::string path("");
125 
126  if (event.object_type == ObjectPack::kCas) {
127  path = event.id.MakePath();
128  } else if (event.object_type == ObjectPack::kNamed) {
129  path = event.object_name;
130  } else {
131  // kEmpty - this is an error.
133  "PayloadProcessor - error: Event received with unknown object.");
134  num_errors_++;
135  return;
136  }
137 
138  FileIterator it = pending_files_.find(event.id);
139  if (it == pending_files_.end()) {
140  // Schedule file upload if it's not being uploaded yet.
141  // Uploaders later check if the file is already present
142  // in the upstream storage and will not upload it twice.
143  FileInfo info(event);
144  // info.handle is later deleted by FinalizeStreamedUpload
145  info.handle = uploader_->InitStreamedUpload(NULL);
146  pending_files_[event.id] = info;
147  }
148 
149  FileInfo& info = pending_files_[event.id];
150 
151  void *buf_copied = smalloc(event.buf_size);
152  memcpy(buf_copied, event.buf, event.buf_size);
153  upload::AbstractUploader::UploadBuffer buf(event.buf_size, buf_copied);
154  uploader_->ScheduleUpload(info.handle, buf,
156  &PayloadProcessor::OnUploadJobComplete, this, buf_copied));
157 
158  shash::Update(static_cast<const unsigned char*>(event.buf),
159  event.buf_size,
160  info.hash_context);
161 
162  info.current_size += event.buf_size;
163 
164  if (info.current_size == info.total_size) {
165  shash::Any file_hash(event.id.algorithm);
166  shash::Final(info.hash_context, &file_hash);
167 
168  if (file_hash != event.id) {
169  LogCvmfs(
171  "PayloadProcessor - error: Hash mismatch for unpacked file: event "
172  "size: %ld, file size: %ld, event hash: %s, file hash: %s",
173  event.size, info.current_size,
174  event.id.ToString(true).c_str(), file_hash.ToString(true).c_str());
175  num_errors_++;
176  return;
177  }
178  // override final remote path if not CAS object
179  if (event.object_type == ObjectPack::kNamed) {
180  info.handle->remote_path = path;
181  }
182  uploader_->ScheduleCommit(info.handle, event.id);
183 
184  pending_files_.erase(event.id);
185  }
186 }
187 
189  const upload::UploaderResults &results,
190  void *buffer)
191 {
192  free(buffer);
193 }
194 
196  statistics_ = new perf::StatisticsTemplate("publish", st);
197 }
198 
200  Params params;
201  if (!GetParamsFromFile(current_repo_, &params)) {
202  LogCvmfs(
204  "PayloadProcessor - error: Could not get configuration parameters.");
205  return kOtherError;
206  }
207 
208  const std::string spooler_temp_dir =
210  assert(!spooler_temp_dir.empty());
211  assert(MkdirDeep(spooler_temp_dir + "/receiver", 0770, true));
212  temp_dir_ =
213  RaiiTempDir::Create(spooler_temp_dir + "/receiver/payload_processor");
214 
215  upload::SpoolerDefinition definition(
216  params.spooler_configuration, params.hash_alg, params.compression_alg,
218  params.min_chunk_size, params.avg_chunk_size, params.max_chunk_size,
219  "dummy_token", "dummy_key");
220 
221  uploader_.Destroy();
222 
223  // configure the uploader environment
225  if (!uploader_.IsValid()) {
227  "Failed to initialize backend upload "
228  "facility in PayloadProcessor.");
229  return kUploaderError;
230  }
231 
232  if (statistics_.IsValid()) {
233  uploader_->InitCounters(statistics_.weak_ref());
234  }
235 
236  return kSuccess;
237 }
238 
240  uploader_->WaitForUpload();
241  temp_dir_.Destroy();
242 
243  const unsigned num_uploader_errors = uploader_->GetNumberOfErrors();
244  uploader_->TearDown();
245  if (num_uploader_errors > 0) {
247  "PayloadProcessor - error: Uploader - %d upload(s) failed.",
248  num_uploader_errors);
249  return kUploaderError;
250  }
251 
252  if (GetNumErrors() > 0) {
254  "PayloadProcessor - error: %d unpacking error(s).", GetNumErrors());
255  return kOtherError;
256  }
257 
258  return kSuccess;
259 }
260 
261 } // namespace receiver
CallbackPtr RegisterListener(typename BoundClosure< ParamT, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, ClosureDataT data)
upload::UploadStreamHandle * handle
FileInfo & operator=(const FileInfo &other)
size_t avg_chunk_size
Definition: params.h:28
std::string spooler_configuration
Definition: params.h:20
T * weak_ref() const
Definition: pointer.h:42
UniquePtr< upload::AbstractUploader > uploader_
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:249
zlib::Algorithms compression_alg
Definition: params.h:24
UniquePtr< perf::StatisticsTemplate > statistics_
const int kLogWarning
bool GetParamsFromFile(const std::string &repo_name, Params *params)
Definition: params.cc:20
perf::Statistics * statistics_
Definition: repository.h:139
virtual void OnUploadJobComplete(const upload::UploaderResults &results, void *buffer)
assert((mem||(size==0))&&"Out Of Memory")
std::string GetSpoolerTempDir(const std::string &spooler_config)
Definition: params.cc:14
Algorithms algorithm
Definition: hash.h:125
virtual void ConsumerEventCallback(const ObjectPackBuild::Event &event)
char algorithm
std::vector< unsigned char > hash_buffer
void Init(ContextPtr context)
Definition: hash.cc:164
bool use_file_chunking
Definition: params.h:26
static AbstractUploader * Construct(const SpoolerDefinition &param)
Definition: plugin.h:188
size_t max_chunk_size
Definition: params.h:29
ObjectPackBuild::State ConsumeNext(const unsigned buf_size, const unsigned char *buf)
Definition: pack.cc:281
static RaiiTempDir * Create(const std::string &prefix)
Definition: raii_temp_dir.cc:9
shash::ContextPtr hash_context
bool MkdirDeep(const std::string &path, const mode_t mode, bool verify_writable)
Definition: posix.cc:857
shash::Any id
Definition: pack.h:149
std::map< shash::Any, FileInfo > pending_files_
unsigned char digest[20]
UniquePtr< RaiiTempDir > temp_dir_
void Final(ContextPtr context, Any *any_digest)
Definition: hash.cc:221
const void * buf
Definition: pack.h:152
Result Process(int fdin, const std::string &header_digest, const std::string &path, uint64_t header_size)
bool IsValid() const
Definition: pointer.h:43
void * buffer
Definition: hash.h:501
ObjectPack::BucketContentType object_type
Definition: pack.h:153
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
Definition: hash.cc:190
size_t min_chunk_size
Definition: params.h:27
void UnregisterListeners()
Any MkFromHexPtr(const HexPtr hex, const char suffix)
Definition: hash.cc:83
std::map< shash::Any, FileInfo >::iterator FileIterator
bool generate_legacy_bulk_chunks
Definition: params.h:25
void SetStatistics(perf::Statistics *st)
static void size_t size
Definition: smalloc.h:54
unsigned buf_size
Definition: pack.h:151
void Destroy()
Definition: pointer.h:45
shash::Algorithms hash_alg
Definition: params.h:22
static CallbackTN * MakeClosure(typename BoundClosure< UploaderResults, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, const ClosureDataT &closure_data)
Definition: async.h:204
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528