CernVM-FS  2.9.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
upload_s3.cc
Go to the documentation of this file.
1 
5 #include "upload_s3.h"
6 
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <inttypes.h>
10 #include <unistd.h>
11 
12 #include <string>
13 #include <vector>
14 
15 #include "compression.h"
16 #include "logging.h"
17 #include "options.h"
18 #include "s3fanout.h"
19 #include "util/exception.h"
20 #include "util/posix.h"
21 #include "util/string.h"
22 
23 namespace upload {
24 
// NOTE(review): the signature (listing line 25) and listing line 29 are not
// part of this excerpt. Presumably this is RequestCtrl::WaitFor(), which the
// uploader calls as req_ctrl.WaitFor() -- confirm against upload_s3.h.
//
// Reads a single byte from the read end of pipe_wait and asserts that it is
// the 'c' token; OnReqComplete() writes that token to pipe_wait[1].
26  char c;
27  ReadPipe(pipe_wait[0], &c, 1);
28  assert(c == 'c');
30 }
31 
32 
// Constructor of S3Uploader (signature on listing line 33 is not visible in
// this excerpt). Parses the spooler definition, fills an s3config structure
// from the parsed members, spawns the fanout manager and starts a thread.
//
// Defaults visible in the initializer list: DNS-style buckets enabled,
// s3fanout::kAuthzAwsV2 authorization, peek-before-put enabled, HTTPS off.
// NOTE(review): listing lines 36-38 (further initializers) are missing here.
34  : AbstractUploader(spooler_definition)
35  , dns_buckets_(true)
39  , authz_method_(s3fanout::kAuthzAwsV2)
40  , peek_before_put_(true)
41  , use_https_(false)
42  , temporary_path_(spooler_definition.temporary_path)
43 {
 // Only valid spooler definitions with the S3 driver type are accepted.
44  assert(spooler_definition.IsValid() &&
45  spooler_definition.driver_type == SpoolerDefinition::S3);
46 
47  atomic_init32(&io_errors_);
48 
 // A spooler definition that cannot be parsed is fatal (PANIC).
49  if (!ParseSpoolerDefinition(spooler_definition)) {
50  PANIC(kLogStderr, "Error in parsing the spooler definition");
51  }
52 
 // Copy the parsed settings into s3config.
 // NOTE(review): the declaration of s3config (listing line 53) and listing
 // lines 62, 65 and 66 are not visible in this excerpt.
54  s3config.access_key = access_key_;
55  s3config.secret_key = secret_key_;
56  s3config.hostname_port = host_name_port_;
57  s3config.authz_method = authz_method_;
58  s3config.region = region_;
59  s3config.flavor = flavor_;
60  s3config.bucket = bucket_;
61  s3config.dns_buckets = dns_buckets_;
63  s3config.opt_timeout_sec = timeout_sec_;
64  s3config.opt_max_retries = num_retries_;
 // The protocol string follows the use_https_ flag.
67  if (use_https_) {
68  s3config.protocol = "https";
69  } else {
70  s3config.protocol = "http";
71  }
72 
 // NOTE(review): listing line 73 is missing; presumably it constructs
 // s3fanout_mgr_ from s3config -- confirm.
74  s3fanout_mgr_->Spawn();
75 
 // Starts a thread; the arguments (listing line 77) are not visible here.
 // The destructor joins thread_collect_results_.
76  int retval = pthread_create(
78  assert(retval == 0);
79 }
80 
81 
// Destructor of S3Uploader (signature on listing line 82 is not visible in
// this excerpt). Pushes a NULL entry into the completed-jobs queue, which
// makes MainCollectResults() leave its loop, and then joins
// thread_collect_results_.
83  // Signal termination to our own worker thread
84  s3fanout_mgr_->PushCompletedJob(NULL);
85  pthread_join(thread_collect_results_, NULL);
86 }
87 
88 
// S3Uploader::ParseSpoolerDefinition (the first signature line, listing line
// 89, is not visible in this excerpt).
//
// Splits the spooler configuration string at '@' into the repository alias
// and the path of an S3 configuration file, then reads the CVMFS_S3_*
// settings from that file into the corresponding members.
//
// Returns false if the configuration string does not have exactly two parts,
// if the configuration file does not exist, or if any of CVMFS_S3_HOST,
// CVMFS_S3_ACCESS_KEY, CVMFS_S3_SECRET_KEY or CVMFS_S3_BUCKET is missing;
// returns true otherwise.
//
// NOTE(review): the opening lines of the log calls (listing lines 95, 105,
// 118, 124, 130, 136) and listing lines 113, 149, 158, 162 and 175 are missing
// from this excerpt.
90  const SpoolerDefinition &spooler_definition)
91 {
92  const std::vector<std::string> config =
93  SplitString(spooler_definition.spooler_configuration, '@');
94  if (config.size() != 2) {
96  "Failed to parse spooler configuration string '%s'.\n"
97  "Provide: <repo_alias>@/path/to/s3.conf",
98  spooler_definition.spooler_configuration.c_str());
99  return false;
100  }
101  repository_alias_ = config[0];
102  const std::string &config_path = config[1];
103 
104  if (!FileExists(config_path)) {
106  "Cannot find S3 config file at '%s'",
107  config_path.c_str());
108  return false;
109  }
110 
111  // Parse S3 configuration
112  BashOptionsManager options_manager = BashOptionsManager(
114  options_manager.ParsePath(config_path, false);
115  std::string parameter;
116 
 // Mandatory settings: a missing value aborts parsing with false.
117  if (!options_manager.GetValue("CVMFS_S3_HOST", &host_name_)) {
119  "Failed to parse CVMFS_S3_HOST from '%s'",
120  config_path.c_str());
121  return false;
122  }
123  if (!options_manager.GetValue("CVMFS_S3_ACCESS_KEY", &access_key_)) {
125  "Failed to parse CVMFS_S3_ACCESS_KEY from '%s'.",
126  config_path.c_str());
127  return false;
128  }
129  if (!options_manager.GetValue("CVMFS_S3_SECRET_KEY", &secret_key_)) {
131  "Failed to parse CVMFS_S3_SECRET_KEY from '%s'.",
132  config_path.c_str());
133  return false;
134  }
135  if (!options_manager.GetValue("CVMFS_S3_BUCKET", &bucket_)) {
137  "Failed to parse CVMFS_S3_BUCKET from '%s'.",
138  config_path.c_str());
139  return false;
140  }
 // Optional settings: members keep their previous value when a key is absent.
 // NOTE(review): CVMFS_S3_DNS_BUCKETS is only switched off by the exact
 // string "false", whereas CVMFS_S3_PEEK_BEFORE_PUT and CVMFS_S3_USE_HTTPS
 // below go through options_manager.IsOn() -- confirm this is intended.
141  if (options_manager.GetValue("CVMFS_S3_DNS_BUCKETS", &parameter)) {
142  if (parameter == "false") {
143  dns_buckets_ = false;
144  }
145  }
146  if (options_manager.GetValue("CVMFS_S3_MAX_NUMBER_OF_PARALLEL_CONNECTIONS",
147  &parameter))
148  {
150  }
 // num_retries_ and timeout_sec_ are declared as unsigned; the parsed value
 // comes from String2Uint64().
151  if (options_manager.GetValue("CVMFS_S3_MAX_RETRIES", &parameter)) {
152  num_retries_ = String2Uint64(parameter);
153  }
154  if (options_manager.GetValue("CVMFS_S3_TIMEOUT", &parameter)) {
155  timeout_sec_ = String2Uint64(parameter);
156  }
157  if (options_manager.GetValue("CVMFS_S3_REGION", &region_)) {
159  }
160  if (options_manager.GetValue("CVMFS_S3_FLAVOR", &flavor_)) {
161  if (flavor_ == "azure") {
163  }
164  }
165  if (options_manager.GetValue("CVMFS_S3_PEEK_BEFORE_PUT", &parameter)) {
166  peek_before_put_ = options_manager.IsOn(parameter);
167  }
168  if (options_manager.GetValue("CVMFS_S3_USE_HTTPS", &parameter)) {
169  use_https_ = options_manager.IsOn(parameter);
170  }
171 
 // With an explicit port the endpoint becomes "<host>:<port>"; the else
 // branch (listing line 175) is not visible in this excerpt.
172  if (options_manager.GetValue("CVMFS_S3_PORT", &parameter)) {
173  host_name_port_ = host_name_ + ":" + parameter;
174  } else {
176  }
177 
178  return true;
179 }
180 
181 
182 bool S3Uploader::WillHandle(const SpoolerDefinition &spooler_definition) {
183  return spooler_definition.driver_type == SpoolerDefinition::S3;
184 }
185 
186 
// S3Uploader::Create (signature on listing line 187 is not visible in this
// excerpt). Returns false right away when dns_buckets_ is off. Otherwise it
// submits a job created with an empty object key and waits for it to
// complete; the result is true iff the reported return code is 0.
// NOTE(review): listing line 192 is missing from this excerpt.
188  if (!dns_buckets_)
189  return false;
190 
191  s3fanout::JobInfo *info = CreateJobInfo("");
193  std::string request_content;
 // With a region configured, the request body is a CreateBucketConfiguration
 // XML document that carries the region as LocationConstraint.
194  if (!region_.empty()) {
195  request_content =
196  std::string("<CreateBucketConfiguration xmlns="
197  "\"http://s3.amazonaws.com/doc/2006-03-01/\">"
198  "<LocationConstraint>") + region_ + "</LocationConstraint>"
199  "</CreateBucketConfiguration>";
200  info->origin->Append(request_content.data(), request_content.length());
201  info->origin->Commit();
202  }
203 
 // req_ctrl lives on this stack frame; OnReqComplete() stores the return code
 // in it and writes to its pipe, which WaitFor() below reads from.
204  RequestCtrl req_ctrl;
205  MakePipe(req_ctrl.pipe_wait);
206  info->callback = const_cast<void*>(static_cast<void const*>(MakeClosure(
207  &S3Uploader::OnReqComplete, this, &req_ctrl)));
208 
209  IncJobsInFlight();
210  UploadJobInfo(info);
211  req_ctrl.WaitFor();
212 
213  return req_ctrl.return_code == 0;
214 }
215 
216 
217 unsigned int S3Uploader::GetNumberOfErrors() const {
218  return atomic_read32(&io_errors_);
219 }
220 
221 
// S3Uploader::MainCollectResults (signature on listing line 225 is not
// visible in this excerpt; the index declares it as
// `static void *MainCollectResults(void *data)`).
//
// Pops completed jobs from the fanout manager until a NULL job is returned,
// translates each job's outcome into a reply code, forwards it through
// Respond() and deletes the job. Always returns NULL.
//
// Reply codes set here: 0 on success, 99 on failure (also counted in
// io_errors_), 1 for a HEAD-only request that ended with kFailNotFound.
// NOTE(review): listing lines 238, 242, 252, 255 and 265 are missing.
226  LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread started.");
227  S3Uploader *uploader = reinterpret_cast<S3Uploader *>(data);
228 
229  while (true) {
230  s3fanout::JobInfo *info = uploader->s3fanout_mgr_->PopCompletedJob();
 // A NULL job is the termination signal (pushed by the destructor).
231  if (!info)
232  break;
233  // Report completed job
234  int reply_code = 0;
235  if (info->error_code != s3fanout::kFailOk) {
 // "Not found" on a HEAD-only request is not treated as an I/O error.
236  if ((info->request != s3fanout::JobInfo::kReqHeadOnly) ||
237  (info->error_code != s3fanout::kFailNotFound)) {
239  "Upload job for '%s' failed. (error code: %d - %s)",
240  info->object_key.c_str(),
241  info->error_code,
243  reply_code = 99;
244  atomic_inc32(&uploader->io_errors_);
245  }
246  }
 // Delete jobs respond with a NULL callback and default results.
247  if (info->request == s3fanout::JobInfo::kReqDelete) {
248  uploader->Respond(NULL, UploaderResults());
249  } else if (info->request == s3fanout::JobInfo::kReqHeadOnly) {
250  if (info->error_code == s3fanout::kFailNotFound) reply_code = 1;
251  uploader->Respond(static_cast<CallbackTN*>(info->callback),
253  reply_code));
254  } else {
 // NOTE(review): the condition guarding the duplicate accounting below
 // (listing line 255) is not visible in this excerpt.
256  // The HEAD request was not transformed into a PUT request, thus this
257  // was a duplicate
258  // Uploaded catalogs are always unique ->
259  // assume this was a regular file and decrease appropriate counters
260  uploader->CountDuplicates();
261  uploader->DecUploadedChunks();
 // NOTE(review): payload_size is declared uint64_t, so the negation is done
 // in unsigned arithmetic and relies on the conversion to the int64_t
 // parameter of CountUploadedBytes() -- confirm this is intended.
262  uploader->CountUploadedBytes(-(info->payload_size));
263  }
264  uploader->Respond(static_cast<CallbackTN*>(info->callback),
266  reply_code));
267 
 // At this point the job must no longer hold its origin buffer.
268  assert(!info->origin.IsValid());
269  }
270  delete info;
271  }
272 
273  LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread finished.");
274  return NULL;
275 }
276 
277 
// S3Uploader::DoUpload (the first signature line, listing line 278, is not
// visible in this excerpt).
//
// Copies the whole ingestion source into a FileBackedBuffer, wraps it in a
// job keyed "<repository_alias_>/<remote_path>", submits the job and waits
// until OnReqComplete() signals completion.
//
// If the source cannot be opened or a read fails, the callback is answered
// through Respond() with return code 100 and the source path, and the
// function returns without submitting a job.
// NOTE(review): listing lines 293, 318, 320 and 323 are missing from this
// excerpt, so the creation of `origin` and the bodies of the branches on
// remote_path / peek_before_put_ are not visible.
279  const std::string &remote_path,
280  IngestionSource *source,
281  const CallbackTN *callback
282 ) {
283  bool rvb = source->Open();
284  if (!rvb) {
285  Respond(callback, UploaderResults(100, source->GetPath()));
286  return;
287  }
 // NOTE(review): `size` is fetched but not used in the visible lines.
288  uint64_t size;
289  rvb = source->GetSize(&size);
290  assert(rvb);
291 
292  FileBackedBuffer *origin =
294  spooler_definition().temporary_path);
295 
 // Read the source in kPageSize pieces and append them to origin.
 // NOTE(review): the loop ends on the first read that returns fewer than
 // kPageSize bytes -- confirm that IngestionSource::Read() only returns a
 // short count at end of input.
296  unsigned char buffer[kPageSize];
297  ssize_t nbytes;
298  do {
299  nbytes = source->Read(buffer, kPageSize);
300  if (nbytes > 0) origin->Append(buffer, nbytes);
301  if (nbytes < 0) {
302  source->Close();
303  delete origin;
304  Respond(callback, UploaderResults(100, source->GetPath()));
305  return;
306  }
307  } while (nbytes == kPageSize);
308  source->Close();
309  origin->Commit();
310 
311  s3fanout::JobInfo *info =
312  new s3fanout::JobInfo(repository_alias_ + "/" + remote_path,
313  const_cast<void*>(
314  static_cast<void const*>(callback)),
315  origin);
316 
317  if (HasPrefix(remote_path, ".cvmfs", false /*ignore_case*/)) {
319  } else if (HasSuffix(remote_path, ".html", false)) {
321  } else {
322  if (peek_before_put_)
324  }
325 
 // The job's callback is replaced by a closure on OnReqComplete(); the
 // caller's callback is kept in req_ctrl.callback_forward and invoked from
 // there with the source path instead of the object key.
326  RequestCtrl req_ctrl;
327  MakePipe(req_ctrl.pipe_wait);
328  req_ctrl.callback_forward = callback;
329  req_ctrl.original_path = source->GetPath();
330  info->callback = const_cast<void*>(static_cast<void const*>(MakeClosure(
331  &S3Uploader::OnReqComplete, this, &req_ctrl)));
332 
333  UploadJobInfo(info);
334  req_ctrl.WaitFor();
335  LogCvmfs(kLogUploadS3, kLogDebug, "Uploading from source finished: %s",
336  source->GetPath().c_str());
337 }
338 
339 
// S3Uploader::UploadJobInfo (signature on listing line 340 and the opening
// line of the log call, listing line 341, are not visible in this excerpt).
// Logs the object key, bucket and host of the job and hands the job to the
// fanout manager via PushNewJob().
342  "Uploading:\n"
343  "--> Object: '%s'\n"
344  "--> Bucket: '%s'\n"
345  "--> Host: '%s'\n",
346  info->object_key.c_str(),
347  bucket_.c_str(),
348  host_name_port_.c_str());
349 
350  s3fanout_mgr_->PushNewJob(info);
351 }
352 
353 
// S3Uploader::InitStreamedUpload (signature on listing line 354 is not
// visible in this excerpt). Returns a newly allocated S3StreamHandle built
// from the callback, kInMemoryObjectThreshold and the spooler's temporary
// path. FinalizeStreamedUpload() deletes the handle it is given.
355  return new S3StreamHandle(callback, kInMemoryObjectThreshold,
356  spooler_definition().temporary_path);
357 }
358 
359 
// S3Uploader::StreamedUpload (the first signature line, listing line 360, is
// not visible in this excerpt). Appends the given upload buffer to the
// buffer owned by the stream handle.
// NOTE(review): listing line 368 is missing; how `callback` is used cannot
// be seen from here.
361  UploadStreamHandle *handle,
362  UploadBuffer buffer,
363  const CallbackTN *callback)
364 {
 // The handle is taken to be an S3StreamHandle (unchecked static_cast).
365  S3StreamHandle *s3_handle = static_cast<S3StreamHandle*>(handle);
366 
367  s3_handle->buffer->Append(buffer.data, buffer.size);
369 }
370 
371 
// S3Uploader::FinalizeStreamedUpload (the first signature line, listing line
// 372, is not visible in this excerpt).
//
// Commits the handle's buffer, submits it as a job under the final object
// key, deletes the handle and updates the upload statistics.
// NOTE(review): listing lines 398, 407 and 410 are missing from this
// excerpt (the statement guarded by peek_before_put_ and the first statement
// of each statistics branch).
373  UploadStreamHandle *handle,
374  const shash::Any &content_hash)
375 {
376  S3StreamHandle *s3_handle = static_cast<S3StreamHandle*>(handle);
377 
378  // New file name based on content hash or remote_path override
379  std::string final_path;
380  if (s3_handle->remote_path != "") {
381  final_path = repository_alias_ + "/" + s3_handle->remote_path;
382  } else {
383  final_path = repository_alias_ + "/data/" + content_hash.MakePath();
384  }
385 
386  s3_handle->buffer->Commit();
387 
 // Remember the size now: the buffer is released to the job just below.
388  size_t bytes_uploaded = s3_handle->buffer->GetSize();
389 
 // Release() hands the buffer pointer over to the JobInfo.
390  s3fanout::JobInfo *info =
391  new s3fanout::JobInfo(final_path,
392  const_cast<void*>(
393  static_cast<void const*>(
394  handle->commit_callback)),
395  s3_handle->buffer.Release());
396 
397  if (peek_before_put_)
399  UploadJobInfo(info);
400 
401  // Remove the temporary file
402  delete s3_handle;
403 
 // Hashes without suffix or with the partial suffix are counted as uploaded
 // bytes; hashes with the catalog suffix are counted as catalog bytes.
404  // Update statistics counters
405  if (!content_hash.HasSuffix() ||
406  content_hash.suffix == shash::kSuffixPartial) {
408  CountUploadedBytes(bytes_uploaded);
409  } else if (content_hash.suffix == shash::kSuffixCatalog) {
411  CountUploadedCatalogBytes(bytes_uploaded);
412  }
413 }
414 
415 
// Returns a newly allocated s3fanout::JobInfo for the given path with a NULL
// callback and the buffer `buf`.
// NOTE(review): the line that creates `buf` (listing line 417) is missing
// from this excerpt.
416 s3fanout::JobInfo *S3Uploader::CreateJobInfo(const std::string& path) const {
418  return new s3fanout::JobInfo(path, NULL, buf);
419 }
420 
421 
// Queues a job for the object "<repository_alias_>/<file_to_delete>" with the
// fanout manager and returns without waiting for it.
// NOTE(review): listing line 426 is missing from this excerpt; presumably it
// marks the job as a delete request (MainCollectResults() has a dedicated
// kReqDelete branch) -- confirm.
422 void S3Uploader::DoRemoveAsync(const std::string& file_to_delete) {
423  const std::string mangled_path = repository_alias_ + "/" + file_to_delete;
424  s3fanout::JobInfo *info = CreateJobInfo(mangled_path);
425 
427 
428  LogCvmfs(kLogUploadS3, kLogDebug, "Asynchronously removing %s/%s",
429  bucket_.c_str(), info->object_key.c_str());
430  s3fanout_mgr_->PushNewJob(info);
431 }
432 
433 
// S3Uploader::OnReqComplete (the first signature line, listing line 434, is
// not visible in this excerpt).
//
// Completion handler bound to a RequestCtrl: stores the return code in
// ctrl, invokes and deletes the forwarded callback if one is set, and then
// writes the 'c' token to ctrl->pipe_wait[1].
435  const upload::UploaderResults &results,
436  RequestCtrl *ctrl)
437 {
438  ctrl->return_code = results.return_code;
439  if (ctrl->callback_forward != NULL) {
440  // We are already in Respond() so we must not call it again
 // The forwarded result carries ctrl->original_path rather than the path
 // from `results`.
441  upload::UploaderResults fix_path(results.return_code, ctrl->original_path);
442  (*(ctrl->callback_forward))(fix_path);
 // The forwarded callback is deleted here and cleared so it cannot be
 // invoked twice.
443  delete ctrl->callback_forward;
444  ctrl->callback_forward = NULL;
445  }
446  char c = 'c';
447  WritePipe(ctrl->pipe_wait[1], &c, 1);
448 }
449 
450 
// Submits a job for the object "<repository_alias_>/<path>", waits for its
// completion and returns true iff the reported return code is 0.
// NOTE(review): listing line 457 is missing from this excerpt; presumably it
// marks the job as a HEAD-only request, in which case MainCollectResults()
// reports 1 for a missing object and this returns false -- confirm.
451 bool S3Uploader::Peek(const std::string& path) {
452  const std::string mangled_path = repository_alias_ + "/" + path;
453  s3fanout::JobInfo *info = CreateJobInfo(mangled_path);
454 
 // req_ctrl lives on this stack frame; OnReqComplete() stores the return code
 // in it and writes to its pipe, which WaitFor() below reads from.
455  RequestCtrl req_ctrl;
456  MakePipe(req_ctrl.pipe_wait);
458  info->callback = const_cast<void*>(static_cast<void const*>(MakeClosure(
459  &S3Uploader::OnReqComplete, this, &req_ctrl)));
460 
461  IncJobsInFlight();
462  UploadJobInfo(info);
463  req_ctrl.WaitFor();
464 
465  return req_ctrl.return_code == 0;
466 }
467 
468 
469 // noop: no mkdir needed in S3 storage
470 bool S3Uploader::Mkdir(const std::string &path) {
471  return true;
472 }
473 
474 
// S3Uploader::PlaceBootstrappingShortcut (signature on listing line 475 is
// not visible in this excerpt). Not implemented: always returns false.
476  return false; // TODO(rmeusel): implement
477 }
478 
479 
480 int64_t S3Uploader::DoGetObjectSize(const std::string &file_name) {
481  // TODO(dosarudaniel): use a head request for byte count
482  // Re-enable 661 integration test when done
483  return -EOPNOTSUPP;
484 }
485 
486 } // namespace upload
#define LogCvmfs(source, mask,...)
Definition: logging.h:20
std::string host_name_
Definition: upload_s3.h:121
virtual bool Create()
Definition: upload_s3.cc:187
bool GetValue(const std::string &key, std::string *value)
Definition: options.cc:376
static FileBackedBuffer * Create(uint64_t in_memory_threshold, const std::string &tmp_dir="/tmp/")
DriverType driver_type
the type of the spooler driver
vector< string > SplitString(const string &str, const char delim, const unsigned max_chunks)
Definition: string.cc:288
#define PANIC(...)
Definition: exception.h:26
void Respond(const CallbackTN *callback, const UploaderResults &result) const
virtual void FinalizeStreamedUpload(UploadStreamHandle *handle, const shash::Any &content_hash)
Definition: upload_s3.cc:372
std::string secret_key_
Definition: upload_s3.h:130
bool HasSuffix() const
Definition: hash.h:235
int64_t DoGetObjectSize(const std::string &file_name)
Definition: upload_s3.cc:480
virtual bool Close()=0
S3Uploader(const SpoolerDefinition &spooler_definition)
Definition: upload_s3.cc:33
const std::string object_key
Definition: s3fanout.h:108
assert((mem||(size==0))&&"Out Of Memory")
const SpoolerDefinition & spooler_definition() const
virtual bool GetSize(uint64_t *size)=0
static const unsigned kDefaultNumRetries
Definition: upload_s3.h:88
s3fanout::JobInfo * CreateJobInfo(const std::string &path) const
Definition: upload_s3.cc:416
void CountUploadedChunks() const
static bool WillHandle(const SpoolerDefinition &spooler_definition)
Definition: upload_s3.cc:182
s3fanout::AuthzMethods authz_method_
Definition: upload_s3.h:131
void MakePipe(int pipe_fd[2])
Definition: posix.cc:525
static const unsigned kDefaultTimeoutSec
Definition: upload_s3.h:89
static void * MainCollectResults(void *data)
Definition: upload_s3.cc:225
Failures error_code
Definition: s3fanout.h:146
virtual ~S3Uploader()
Definition: upload_s3.cc:82
std::string repository_alias_
Definition: upload_s3.h:119
void OnReqComplete(const upload::UploaderResults &results, RequestCtrl *ctrl)
Definition: upload_s3.cc:434
bool FileExists(const std::string &path)
Definition: posix.cc:816
const CallbackTN * commit_callback
virtual void DoUpload(const std::string &remote_path, IngestionSource *source, const CallbackTN *callback=NULL)
Definition: upload_s3.cc:278
virtual bool Open()=0
atomic_int32 io_errors_
Definition: upload_s3.h:136
UniquePtr< s3fanout::S3FanoutManager > s3fanout_mgr_
Definition: upload_s3.h:118
virtual void DoRemoveAsync(const std::string &file_to_delete)
Definition: upload_s3.cc:422
uint64_t payload_size
Definition: s3fanout.h:144
const char kSuffixPartial
Definition: hash.h:55
virtual bool Peek(const std::string &path)
Definition: upload_s3.cc:451
bool HasSuffix(const std::string &str, const std::string &suffix, const bool ignore_case)
Definition: string.cc:279
const char kSuffixCatalog
Definition: hash.h:52
virtual UploadStreamHandle * InitStreamedUpload(const CallbackTN *callback=NULL)
Definition: upload_s3.cc:354
std::string access_key_
Definition: upload_s3.h:129
UniquePtr< FileBackedBuffer > origin
Definition: s3fanout.h:110
virtual bool Mkdir(const std::string &path)
Definition: upload_s3.cc:470
std::string region_
Definition: upload_s3.h:122
void CountUploadedBytes(int64_t bytes_written) const
UniquePtr< FileBackedBuffer > buffer
Definition: upload_s3.h:34
pthread_t thread_collect_results_
Definition: upload_s3.h:137
static const unsigned kDefaultBackoffMaxMs
Definition: upload_s3.h:91
bool IsValid() const
Definition: pointer.h:43
static const unsigned kDefaultNumParallelUploads
Definition: upload_s3.h:87
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:265
virtual std::string GetPath() const =0
void ParsePath(const std::string &config_file, const bool external)
Definition: options.cc:146
virtual void StreamedUpload(UploadStreamHandle *handle, UploadBuffer buffer, const CallbackTN *callback=NULL)
Definition: upload_s3.cc:360
void UploadJobInfo(s3fanout::JobInfo *info)
Definition: upload_s3.cc:340
void Append(const void *source, uint64_t len)
void CountUploadedCatalogBytes(int64_t bytes_written) const
RequestType request
Definition: s3fanout.h:145
uint64_t String2Uint64(const string &value)
Definition: string.cc:228
T * Release()
Definition: pointer.h:44
virtual bool PlaceBootstrappingShortcut(const shash::Any &object)
Definition: upload_s3.cc:475
const unsigned kPageSize
Definition: posix.h:29
const CallbackTN * callback_forward
Definition: upload_s3.h:104
static const unsigned kDefaultBackoffInitMs
Definition: upload_s3.h:90
void CountUploadedCatalogs() const
virtual ssize_t Read(void *buffer, size_t nbyte)=0
bool ParseSpoolerDefinition(const SpoolerDefinition &spooler_definition)
Definition: upload_s3.cc:89
Suffix suffix
Definition: hash.h:124
std::string MakePath() const
Definition: hash.h:312
virtual unsigned int GetNumberOfErrors() const
Definition: upload_s3.cc:217
static const unsigned kInMemoryObjectThreshold
Definition: upload_s3.h:92
const std::string temporary_path_
Definition: upload_s3.h:135
unsigned num_retries_
Definition: upload_s3.h:127
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:534
static void size_t size
Definition: smalloc.h:47
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:546
std::string host_name_port_
Definition: upload_s3.h:120
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:584
bool IsOn(const std::string &param_value)
Definition: options.cc:409
std::string flavor_
Definition: upload_s3.h:123
const char * Code2Ascii(const Failures error)
Definition: s3fanout.h:57
unsigned timeout_sec_
Definition: upload_s3.h:128
std::string bucket_
Definition: upload_s3.h:124
static CallbackTN * MakeClosure(typename BoundClosure< UploaderResults, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, const ClosureDataT &closure_data)
Definition: async.h:204