CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
payload_processor.cc
Go to the documentation of this file.
1 
5 #include "payload_processor.h"
6 
7 #include <fcntl.h>
8 #include <unistd.h>
9 
10 #include <vector>
11 
12 #include "params.h"
13 #include "util/logging.h"
14 #include "util/posix.h"
15 #include "util/string.h"
16 
17 namespace {
18 
// Size in bytes of the scratch buffer that Process() fills with each read()
// from the payload file descriptor before handing the bytes to the object
// pack consumer.
19 const size_t kConsumerBuffer = 10 * 1024 * 1024; // 10 MB
20 
21 }
22 
23 namespace receiver {
24 
// Member initializer list of a FileInfo constructor. The signature line is
// missing from this listing; presumably this is the default constructor
// (confirm against payload_processor.h).
// Produces an empty record: no upload handle, zero expected and received
// sizes, and default-constructed hash state.
26  : handle(NULL)
27  , total_size(0)
28  , current_size(0)
29  , hash_context()
30  , hash_buffer() { }
31 
// Member initializer list of the FileInfo constructor that takes an object
// pack event (the signature line is missing from this listing).
// The expected size comes from event.size, the hash context is created for
// the algorithm of event.id, and no upload handle is attached yet (the
// caller assigns `handle` afterwards).
33  : handle(NULL)
34  , total_size(event.size)
35  , current_size(0)
36  , hash_context(shash::ContextPtr(event.id.algorithm))
// hash_buffer is sized from hash_context.size, so this initializer relies on
// hash_context being initialized first, i.e. declared before hash_buffer in
// the struct. TODO confirm the member order in the header.
37  , hash_buffer(hash_context.size, 0) {
// NOTE(review): two body lines are missing from this listing here;
// presumably they attach hash_buffer to hash_context and initialize the
// hash. Confirm against the real source.
40 }
41 
// Member initializer list of the FileInfo copy constructor (the signature
// line is missing from this listing).
// Every member is copied from `other`. `handle` is a raw pointer, so the
// copy refers to the same upload handle as the original.
43  : handle(other.handle)
44  , total_size(other.total_size)
45  , current_size(other.current_size)
46  , hash_context(other.hash_context)
47  , hash_buffer(other.hash_buffer) {
// NOTE(review): one body line is missing from this listing here; presumably
// it re-points the copied hash context at this object's own hash_buffer.
// Confirm against the real source.
49 }
50 
// Body of FileInfo::operator=(const FileInfo &other); the signature line is
// missing from this listing.
// Copies every member from `other` and returns *this. `handle` is copied as
// a pointer, so both objects refer to the same upload handle afterwards.
// There is no self-assignment check in the visible lines.
52  handle = other.handle;
53  total_size = other.total_size;
54  current_size = other.current_size;
55  hash_context = other.hash_context;
56  hash_buffer = other.hash_buffer;
// NOTE(review): one line is missing from this listing here; presumably it
// re-points hash_context at this object's hash_buffer. Confirm against the
// real source.
58 
59  return *this;
60 }
61 
// Member initializer list of the PayloadProcessor constructor (the signature
// line is missing from this listing).
// Starts with no pending files, an empty repository name, no uploader, no
// temporary directory, a zero error count and no statistics object.
63  : pending_files_()
64  , current_repo_()
65  , uploader_()
66  , temp_dir_()
67  , num_errors_(0)
68  , statistics_(NULL) { }
69 
71 
// PayloadProcessor::Process: reads a serialized object pack from `fdin` and
// feeds it to an ObjectPackConsumer, then finalizes the uploads.
//
// Parameters:
//   fdin          - file descriptor the payload is read from
//   header_digest - digest of the object pack header, as a string
//   path          - lease path; the text before the first '/' is used as the
//                   repository name
//   header_size   - size of the object pack header
// Returns the result of Initialize() if that fails, otherwise the result of
// Finalize().
//
// NOTE(review): several lines of this function are missing from this
// listing: the line carrying the return type and function name, the opening
// of the first log call, the declarations of `digest` and `consumer_state`,
// and the listener registration. The text below is not compilable as shown.
73  int fdin, const std::string &header_digest, const std::string &path,
74  uint64_t header_size) {
// NOTE(review): header_size is a uint64_t but is formatted with "%ld";
// confirm this is correct on every supported platform.
76  "PayloadProcessor - lease_path: %s, header digest: %s, header "
77  "size: %ld",
78  path.c_str(), header_digest.c_str(), header_size);
79 
// If `path` contains no '/', find() returns npos and substr() yields the
// whole path as the repository name.
80  const size_t first_slash_idx = path.find('/', 0);
81 
82  current_repo_ = path.substr(0, first_slash_idx);
83 
84  Result init_result = Initialize();
85  if (init_result != kSuccess) {
86  return init_result;
87  }
88 
89  // Set up object pack deserialization
91 
92  ObjectPackConsumer deserializer(digest, header_size);
94 
95  int nb = 0;
97  std::vector<unsigned char> buffer(kConsumerBuffer, 0);
// Read/consume loop: stops on end of input (nb == 0), when the consumer
// reports kStateDone, or on any consumer state other than continue/done.
98  do {
// NOTE(review): read() returns -1 on error. That value is passed straight
// to ConsumeNext(), whose first parameter is `const unsigned buf_size`, so
// it would be converted to a very large unsigned count. Confirm whether an
// explicit nb < 0 check is needed before this call.
99  nb = read(fdin, &buffer[0], buffer.size());
100  consumer_state = deserializer.ConsumeNext(nb, &buffer[0]);
101  if (consumer_state != ObjectPackBuild::kStateContinue
102  && consumer_state != ObjectPackBuild::kStateDone) {
104  "PayloadProcessor - error: %d encountered when consuming object "
105  "pack.",
106  consumer_state);
107  break;
108  }
109  } while (nb > 0 && consumer_state != ObjectPackBuild::kStateDone);
110 
// NOTE(review): ConsumerEventCallback() returns without erasing its
// pending_files_ entry when a hash mismatch is detected, and the loop above
// can also break early on a consumer error. In both cases this assertion
// appears able to fire instead of the error being reported through the
// return value. Confirm the intended behavior.
111  assert(pending_files_.empty());
112 
113  Result res = Finalize();
114 
115  deserializer.UnregisterListeners();
116 
117  return res;
118 }
119 
// PayloadProcessor::ConsumerEventCallback: handles one chunk of one object
// delivered by the object pack consumer.
//
// For each event it:
//   1. determines the object's path from its type (an unknown type is
//      counted as an error and the event is dropped),
//   2. creates a pending-file record and a streamed upload handle the first
//      time an object id is seen,
//   3. schedules the chunk for upload and updates the running hash,
//   4. once all expected bytes have arrived, verifies the hash against the
//      event id and schedules the commit.
//
// NOTE(review): the line carrying the return type and function name, and the
// opening lines of some calls (two log calls, the upload completion closure),
// are missing from this listing.
121  const ObjectPackBuild::Event &event) {
122  std::string path("");
123 
// For kCas objects the path computed here is not used again in the visible
// code; only kNamed objects override the handle's remote path further down.
124  if (event.object_type == ObjectPack::kCas) {
125  path = event.id.MakePath();
126  } else if (event.object_type == ObjectPack::kNamed) {
127  path = event.object_name;
128  } else {
129  // kEmpty - this is an error.
131  "PayloadProcessor - error: Event received with unknown object.");
132  num_errors_++;
133  return;
134  }
135 
136  FileIterator it = pending_files_.find(event.id);
137  if (it == pending_files_.end()) {
138  // Schedule file upload if it's not being uploaded yet.
139  // Uploaders later check if the file is already present
140  // in the upstream storage and will not upload it twice.
141  FileInfo info(event);
142  // info.handle is later deleted by FinalizeStreamedUpload
143  info.handle = uploader_->InitStreamedUpload(NULL);
144  pending_files_[event.id] = info;
145  }
146 
// The entry is guaranteed to exist at this point, so operator[] does not
// insert a default-constructed record here.
147  FileInfo &info = pending_files_[event.id];
148 
// The chunk is copied into a separately allocated buffer that is handed to
// the uploader. The completion callback OnUploadJobComplete receives that
// pointer and frees it.
149  void *buf_copied = smalloc(event.buf_size);
150  memcpy(buf_copied, event.buf, event.buf_size);
151  upload::AbstractUploader::UploadBuffer buf(event.buf_size, buf_copied);
152  uploader_->ScheduleUpload(
153  info.handle, buf,
155  &PayloadProcessor::OnUploadJobComplete, this, buf_copied));
156 
// The running hash is updated from the event's own buffer, not the copy.
157  shash::Update(static_cast<const unsigned char *>(event.buf),
158  event.buf_size,
159  info.hash_context);
160 
161  info.current_size += event.buf_size;
162 
// The object is complete only when the received byte count equals the
// expected size exactly.
163  if (info.current_size == info.total_size) {
164  shash::Any file_hash(event.id.algorithm);
165  shash::Final(info.hash_context, &file_hash);
166 
// NOTE(review): on a hash mismatch the function returns without scheduling
// a commit and without erasing the pending_files_ entry, so the record and
// its upload handle stay pending. Confirm that this is intended and how the
// handle is released in that case.
167  if (file_hash != event.id) {
168  LogCvmfs(
170  "PayloadProcessor - error: Hash mismatch for unpacked file: event "
171  "size: %ld, file size: %ld, event hash: %s, file hash: %s",
172  event.size, info.current_size, event.id.ToString(true).c_str(),
173  file_hash.ToString(true).c_str());
174  num_errors_++;
175  return;
176  }
177  // override final remote path if not CAS object
178  if (event.object_type == ObjectPack::kNamed) {
179  info.handle->remote_path = path;
180  }
181  uploader_->ScheduleCommit(info.handle, event.id);
182 
183  pending_files_.erase(event.id);
184  }
185 }
186 
// PayloadProcessor::OnUploadJobComplete: upload completion callback (the
// line carrying the return type and function name is missing from this
// listing).
// Frees the chunk copy that ConsumerEventCallback allocated and passed along
// as closure data. `results` is not inspected here.
188  const upload::UploaderResults &results, void *buffer) {
189  free(buffer);
190 }
191 
// Body of PayloadProcessor::SetStatistics(perf::Statistics *st); the
// signature line is missing from this listing.
// Wraps `st` in a newly allocated StatisticsTemplate named "publish" and
// stores it in statistics_. NOTE(review): presumably statistics_ is an owning
// pointer that releases any previous object on assignment. Confirm in the
// header.
193  statistics_ = new perf::StatisticsTemplate("publish", st);
194 }
195 
// Body of what is presumably PayloadProcessor::Initialize() (the signature
// line is missing from this listing; Process() calls Initialize() and
// compares its result against kSuccess).
//
// Loads the configuration for current_repo_, prepares a temporary directory
// under the spooler's temp dir, builds a spooler definition, recreates the
// uploader and, when statistics are set, registers the upload counters.
// Returns kOtherError if the parameters cannot be read, kUploaderError if the
// uploader is not valid after construction, and kSuccess otherwise.
197  Params params;
198  if (!GetParamsFromFile(current_repo_, &params)) {
199  LogCvmfs(
201  "PayloadProcessor - error: Could not get configuration parameters.");
202  return kOtherError;
203  }
204 
205  const std::string spooler_temp_dir = GetSpoolerTempDir(
206  params.spooler_configuration);
207  assert(!spooler_temp_dir.empty());
// NOTE(review): the MkdirDeep() call is the argument of assert(). With the
// standard assert macro the expression is not evaluated when NDEBUG is
// defined, so the directory would not be created in such builds. Confirm how
// assert is defined and built in this project.
208  assert(MkdirDeep(spooler_temp_dir + "/receiver", 0770, true));
209  temp_dir_ = RaiiTempDir::Create(spooler_temp_dir
210  + "/receiver/payload_processor");
211 
// NOTE(review): one argument line of this constructor call is missing from
// this listing.
212  upload::SpoolerDefinition definition(
213  params.spooler_configuration, params.hash_alg, params.compression_alg,
215  params.min_chunk_size, params.avg_chunk_size, params.max_chunk_size,
216  "dummy_token", "dummy_key");
217 
// Drop any uploader left over from a previous run before creating a new one.
218  uploader_.Destroy();
219 
220  // configure the uploader environment
// NOTE(review): the line that constructs the new uploader from `definition`
// is missing from this listing here.
222  if (!uploader_.IsValid()) {
224  "Failed to initialize backend upload "
225  "facility in PayloadProcessor.");
226  return kUploaderError;
227  }
228 
229  if (statistics_.IsValid()) {
230  uploader_->InitCounters(statistics_.weak_ref());
231  }
232 
233  return kSuccess;
234 }
235 
// Body of what is presumably PayloadProcessor::Finalize() (the signature
// line and the opening lines of both log calls are missing from this
// listing; Process() returns the result of Finalize()).
//
// Waits for outstanding uploads, removes the temporary directory, tears the
// uploader down and reports the outcome. Uploader errors take precedence:
// returns kUploaderError if the uploader counted any failures, else
// kOtherError if any unpacking errors were recorded, else kSuccess.
237  uploader_->WaitForUpload();
238  temp_dir_.Destroy();
239 
// The error count is read before TearDown() is called.
240  const unsigned num_uploader_errors = uploader_->GetNumberOfErrors();
241  uploader_->TearDown();
242  if (num_uploader_errors > 0) {
// NOTE(review): num_uploader_errors is unsigned but is formatted with "%d".
244  "PayloadProcessor - error: Uploader - %d upload(s) failed.",
245  num_uploader_errors);
246  return kUploaderError;
247  }
248 
249  if (GetNumErrors() > 0) {
251  "PayloadProcessor - error: %d unpacking error(s).",
252  GetNumErrors());
253  return kOtherError;
254  }
255 
256  return kSuccess;
257 }
258 
259 } // namespace receiver
CallbackPtr RegisterListener(typename BoundClosure< ParamT, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, ClosureDataT data)
upload::UploadStreamHandle * handle
FileInfo & operator=(const FileInfo &other)
size_t avg_chunk_size
Definition: params.h:28
std::string spooler_configuration
Definition: params.h:20
T * weak_ref() const
Definition: pointer.h:46
UniquePtr< upload::AbstractUploader > uploader_
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:241
zlib::Algorithms compression_alg
Definition: params.h:24
UniquePtr< perf::StatisticsTemplate > statistics_
const int kLogWarning
bool GetParamsFromFile(const std::string &repo_name, Params *params)
Definition: params.cc:20
perf::Statistics * statistics_
Definition: repository.h:138
virtual void OnUploadJobComplete(const upload::UploaderResults &results, void *buffer)
assert((mem||(size==0))&&"Out Of Memory")
std::string GetSpoolerTempDir(const std::string &spooler_config)
Definition: params.cc:14
Algorithms algorithm
Definition: hash.h:122
virtual void ConsumerEventCallback(const ObjectPackBuild::Event &event)
char algorithm
std::vector< unsigned char > hash_buffer
void Init(ContextPtr context)
Definition: hash.cc:166
bool use_file_chunking
Definition: params.h:26
static AbstractUploader * Construct(const SpoolerDefinition &param)
Definition: plugin.h:170
size_t max_chunk_size
Definition: params.h:29
ObjectPackBuild::State ConsumeNext(const unsigned buf_size, const unsigned char *buf)
Definition: pack.cc:288
static RaiiTempDir * Create(const std::string &prefix)
Definition: raii_temp_dir.cc:9
shash::ContextPtr hash_context
bool MkdirDeep(const std::string &path, const mode_t mode, bool verify_writable)
Definition: posix.cc:855
shash::Any id
Definition: pack.h:153
std::map< shash::Any, FileInfo > pending_files_
unsigned char digest[20]
UniquePtr< RaiiTempDir > temp_dir_
void Final(ContextPtr context, Any *any_digest)
Definition: hash.cc:223
const void * buf
Definition: pack.h:156
Result Process(int fdin, const std::string &header_digest, const std::string &path, uint64_t header_size)
bool IsValid() const
Definition: pointer.h:47
void * buffer
Definition: hash.h:489
ObjectPack::BucketContentType object_type
Definition: pack.h:157
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
Definition: hash.cc:192
size_t min_chunk_size
Definition: params.h:27
void UnregisterListeners()
Any MkFromHexPtr(const HexPtr hex, const char suffix)
Definition: hash.cc:82
std::map< shash::Any, FileInfo >::iterator FileIterator
bool generate_legacy_bulk_chunks
Definition: params.h:25
void SetStatistics(perf::Statistics *st)
static void size_t size
Definition: smalloc.h:54
unsigned buf_size
Definition: pack.h:155
void Destroy()
Definition: pointer.h:53
shash::Algorithms hash_alg
Definition: params.h:22
static CallbackTN * MakeClosure(typename BoundClosure< UploaderResults, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, const ClosureDataT &closure_data)
Definition: async.h:197
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545