CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
upload_s3.cc
Go to the documentation of this file.
1 
5 #include "upload_s3.h"
6 
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <inttypes.h>
10 #include <unistd.h>
11 
12 #include <string>
13 #include <vector>
14 
16 #include "network/s3fanout.h"
17 #include "options.h"
18 #include "util/exception.h"
19 #include "util/logging.h"
20 #include "util/posix.h"
21 #include "util/string.h"
22 
23 namespace upload {
24 
25 /*
26  * Allowed values of x-amz-acl according to S3 API
 *
 * ParseSpoolerDefinition() accepts a CVMFS_S3_X_AMZ_ACL setting only if it is
 * equal to one of the entries below. The trailing empty string is a regular
 * entry, not a terminator: the lookup walks all 8 slots of the array, so an
 * empty ACL value passes the check as well.
27  */
28 static const char *x_amz_acl_allowed_values_[8] = {"private",
29  "public-read",
30  "public-write",
31  "authenticated-read",
32  "aws-exec-read",
33  "bucket-owner-read",
34  "bucket-owner-full-control",
35  ""};
36 
38  char c;
39  ReadPipe(pipe_wait[0], &c, 1);
40  assert(c == 'c');
42 }
43 
44 
46  : AbstractUploader(spooler_definition)
47  , dns_buckets_(true)
51  , authz_method_(s3fanout::kAuthzAwsV2)
52  , peek_before_put_(true)
53  , use_https_(false)
54  , proxy_("")
55  , temporary_path_(spooler_definition.temporary_path)
56  , x_amz_acl_("public-read") {
57  assert(spooler_definition.IsValid()
58  && spooler_definition.driver_type == SpoolerDefinition::S3);
59 
60  atomic_init32(&io_errors_);
61 
62  if (!ParseSpoolerDefinition(spooler_definition)) {
63  PANIC(kLogStderr, "Error in parsing the spooler definition");
64  }
65 
67  s3config.access_key = access_key_;
68  s3config.secret_key = secret_key_;
69  s3config.hostname_port = host_name_port_;
70  s3config.authz_method = authz_method_;
71  s3config.region = region_;
72  s3config.flavor = flavor_;
73  s3config.bucket = bucket_;
74  s3config.dns_buckets = dns_buckets_;
76  s3config.opt_timeout_sec = timeout_sec_;
77  s3config.opt_max_retries = num_retries_;
80  s3config.x_amz_acl = x_amz_acl_;
81 
82  if (use_https_) {
83  s3config.protocol = "https";
84  } else {
85  s3config.protocol = "http";
86  }
87  s3config.proxy = proxy_;
88 
90  s3fanout_mgr_->Spawn();
91 
92  int retval = pthread_create(&thread_collect_results_, NULL,
93  MainCollectResults, this);
94  assert(retval == 0);
95 }
96 
97 
99  // Signal termination to our own worker thread
100  s3fanout_mgr_->PushCompletedJob(NULL);
101  pthread_join(thread_collect_results_, NULL);
102 }
103 
104 
106  const SpoolerDefinition &spooler_definition) {
107  const std::vector<std::string> config = SplitString(
108  spooler_definition.spooler_configuration, '@');
109  if (config.size() != 2) {
111  "Failed to parse spooler configuration string '%s'.\n"
112  "Provide: <repo_alias>@/path/to/s3.conf",
113  spooler_definition.spooler_configuration.c_str());
114  return false;
115  }
116  repository_alias_ = config[0];
117  const std::string &config_path = config[1];
118 
119  if (!FileExists(config_path)) {
120  LogCvmfs(kLogUploadS3, kLogStderr, "Cannot find S3 config file at '%s'",
121  config_path.c_str());
122  return false;
123  }
124 
125  // Parse S3 configuration
126  BashOptionsManager options_manager = BashOptionsManager(
128  options_manager.ParsePath(config_path, false);
129  std::string parameter;
130 
131  if (!options_manager.GetValue("CVMFS_S3_HOST", &host_name_)) {
133  "Failed to parse CVMFS_S3_HOST from '%s'", config_path.c_str());
134  return false;
135  }
136  if (!options_manager.GetValue("CVMFS_S3_ACCESS_KEY", &access_key_)) {
138  "Failed to parse CVMFS_S3_ACCESS_KEY from '%s'.",
139  config_path.c_str());
140  return false;
141  }
142  if (!options_manager.GetValue("CVMFS_S3_SECRET_KEY", &secret_key_)) {
144  "Failed to parse CVMFS_S3_SECRET_KEY from '%s'.",
145  config_path.c_str());
146  return false;
147  }
148  if (!options_manager.GetValue("CVMFS_S3_BUCKET", &bucket_)) {
150  "Failed to parse CVMFS_S3_BUCKET from '%s'.", config_path.c_str());
151  return false;
152  }
153  if (options_manager.GetValue("CVMFS_S3_DNS_BUCKETS", &parameter)) {
154  if (parameter == "false") {
155  dns_buckets_ = false;
156  }
157  }
158  if (options_manager.GetValue("CVMFS_S3_MAX_NUMBER_OF_PARALLEL_CONNECTIONS",
159  &parameter)) {
161  }
162  if (options_manager.GetValue("CVMFS_S3_MAX_RETRIES", &parameter)) {
163  num_retries_ = String2Uint64(parameter);
164  }
165  if (options_manager.GetValue("CVMFS_S3_TIMEOUT", &parameter)) {
166  timeout_sec_ = String2Uint64(parameter);
167  }
168  if (options_manager.GetValue("CVMFS_S3_REGION", &region_)) {
170  }
171  if (options_manager.GetValue("CVMFS_S3_FLAVOR", &flavor_)) {
172  if (flavor_ == "azure") {
174  } else if (flavor_ == "awsv2") {
176  } else if (flavor_ == "awsv4") {
178  } else {
180  "Failed to parse CVMFS_S3_FLAVOR from '%s', "
181  "valid options are azure, awsv2 or awsv4",
182  config_path.c_str());
183  return false;
184  }
185  }
186  if (options_manager.GetValue("CVMFS_S3_PEEK_BEFORE_PUT", &parameter)) {
187  peek_before_put_ = options_manager.IsOn(parameter);
188  }
189  if (options_manager.GetValue("CVMFS_S3_X_AMZ_ACL", &parameter)) {
190  bool isAllowed = false;
191  size_t const len = sizeof(x_amz_acl_allowed_values_)
192  / sizeof(x_amz_acl_allowed_values_[0]);
193  for (size_t i = 0; i < len; i++) {
194  if (x_amz_acl_allowed_values_[i] == parameter) {
195  isAllowed = true;
196  break;
197  }
198  }
199  if (!isAllowed) {
201  "%s is not an allowed value for CVMFS_S3_X_AMZ_ACL",
202  parameter.c_str());
203  return false;
204  }
205  x_amz_acl_ = parameter;
206  }
207 
208  if (options_manager.GetValue("CVMFS_S3_USE_HTTPS", &parameter)) {
209  use_https_ = options_manager.IsOn(parameter);
210  }
211 
212  if (options_manager.GetValue("CVMFS_S3_PORT", &parameter)) {
213  host_name_port_ = host_name_ + ":" + parameter;
214  } else {
216  }
217 
218  if (options_manager.IsDefined("CVMFS_S3_PROXY")) {
219  options_manager.GetValue("CVMFS_S3_PROXY", &proxy_);
220  }
221 
222  return true;
223 }
224 
225 
226 bool S3Uploader::WillHandle(const SpoolerDefinition &spooler_definition) {
227  return spooler_definition.driver_type == SpoolerDefinition::S3;
228 }
229 
230 
232  if (!dns_buckets_)
233  return false;
234 
235  s3fanout::JobInfo *info = CreateJobInfo("");
237  std::string request_content;
238  if (!region_.empty()) {
239  request_content = std::string("<CreateBucketConfiguration xmlns="
240  "\"http://s3.amazonaws.com/doc/2006-03-01/\">"
241  "<LocationConstraint>")
242  + region_
243  + "</LocationConstraint>"
244  "</CreateBucketConfiguration>";
245  info->origin->Append(request_content.data(), request_content.length());
246  info->origin->Commit();
247  }
248 
249  RequestCtrl req_ctrl;
250  MakePipe(req_ctrl.pipe_wait);
251  info->callback = const_cast<void *>(static_cast<void const *>(
252  MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl)));
253 
254  IncJobsInFlight();
255  UploadJobInfo(info);
256  req_ctrl.WaitFor();
257 
258  return req_ctrl.return_code == 0;
259 }
260 
261 
262 unsigned int S3Uploader::GetNumberOfErrors() const {
263  return atomic_read32(&io_errors_);
264 }
265 
266 
271  LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread started.");
272  S3Uploader *uploader = reinterpret_cast<S3Uploader *>(data);
273 
274  while (true) {
275  s3fanout::JobInfo *info = uploader->s3fanout_mgr_->PopCompletedJob();
276  if (!info)
277  break;
278  // Report completed job
279  int reply_code = 0;
280  if (info->error_code != s3fanout::kFailOk) {
282  || (info->error_code != s3fanout::kFailNotFound)) {
284  "Upload job for '%s' failed. (error code: %d - %s)",
285  info->object_key.c_str(), info->error_code,
287  reply_code = 99;
288  atomic_inc32(&uploader->io_errors_);
289  }
290  }
291  if (info->request == s3fanout::JobInfo::kReqDelete) {
292  uploader->Respond(NULL, UploaderResults());
293  } else if (info->request == s3fanout::JobInfo::kReqHeadOnly) {
294  if (info->error_code == s3fanout::kFailNotFound)
295  reply_code = 1;
296  uploader->Respond(static_cast<CallbackTN *>(info->callback),
298  } else {
300  // The HEAD request was not transformed into a PUT request, thus this
301  // was a duplicate
302  // Uploaded catalogs are always unique ->
303  // assume this was a regular file and decrease appropriate counters
304  uploader->CountDuplicates();
305  uploader->DecUploadedChunks();
306  uploader->CountUploadedBytes(-(info->payload_size));
307  }
308  uploader->Respond(
309  static_cast<CallbackTN *>(info->callback),
311 
312  assert(!info->origin.IsValid());
313  }
314  delete info;
315  }
316 
317  LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread finished.");
318  return NULL;
319 }
320 
321 
// Copies the content of an ingestion source into a buffer, submits it as a
// job for the object key "<repository_alias_>/<remote_path>" and blocks until
// the request has completed.
//
// remote_path: destination path below the repository alias
// callback:    notified with the result; on the normal path it is handed to
//              the request control block and invoked from OnReqComplete()
//
// Failure to open or to read the source is reported through `callback` with
// return code 100 and the source path; nothing is submitted in that case.
//
// NOTE(review): this listing omits several lines of the function (the
// embedded line numbers skip 323, 334, 353, 359, 361 and 364). The missing
// lines include the declaration of the `source` parameter and the statements
// that define `origin` and `info`; confirm details against the full file.
322 void S3Uploader::DoUpload(const std::string &remote_path,
324  const CallbackTN *callback) {
325  bool rvb = source->Open();
326  if (!rvb) {
327  Respond(callback, UploaderResults(100, source->GetPath()));
328  return;
329  }
// The size query must succeed; `size` itself is not used in the lines
// visible here.
330  uint64_t size;
331  rvb = source->GetSize(&size);
332  assert(rvb);
333 
335  kInMemoryObjectThreshold, spooler_definition().temporary_path);
336 
// Copy the source page by page. A read shorter than kPageSize ends the loop;
// a negative read aborts the upload and releases the buffer.
337  unsigned char buffer[kPageSize];
338  ssize_t nbytes;
339  do {
340  nbytes = source->Read(buffer, kPageSize);
341  if (nbytes > 0)
342  origin->Append(buffer, nbytes);
343  if (nbytes < 0) {
344  source->Close();
345  delete origin;
346  Respond(callback, UploaderResults(100, source->GetPath()));
347  return;
348  }
349  } while (nbytes == kPageSize);
350  source->Close();
351  origin->Commit();
352 
354  repository_alias_ + "/" + remote_path,
355  const_cast<void *>(static_cast<void const *>(callback)),
356  origin);
357 
// The branch bodies are on lines missing from this listing.
// NOTE(review): presumably they select the request type per kind of path
// (".cvmfs" prefix, ".html" suffix, everything else) -- confirm.
358  if (HasPrefix(remote_path, ".cvmfs", false /*ignore_case*/)) {
360  } else if (HasSuffix(remote_path, ".html", false)) {
362  } else {
363  if (peek_before_put_)
365  }
366 
// Replace the job's callback by a closure on OnReqComplete(), which forwards
// the result to the original callback (reporting the source path) and then
// writes to the pipe that WaitFor() blocks on.
367  RequestCtrl req_ctrl;
368  MakePipe(req_ctrl.pipe_wait);
369  req_ctrl.callback_forward = callback;
370  req_ctrl.original_path = source->GetPath();
371  info->callback = const_cast<void *>(static_cast<void const *>(
372  MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl)));
373 
374  UploadJobInfo(info);
375  req_ctrl.WaitFor();
376  LogCvmfs(kLogUploadS3, kLogDebug, "Uploading from source finished: %s",
377  source->GetPath().c_str());
378 }
379 
380 
383  "Uploading:\n"
384  "--> Object: '%s'\n"
385  "--> Bucket: '%s'\n"
386  "--> Host: '%s'\n",
387  info->object_key.c_str(), bucket_.c_str(), host_name_port_.c_str());
388 
389  s3fanout_mgr_->PushNewJob(info);
390 }
391 
392 
394  return new S3StreamHandle(callback, kInMemoryObjectThreshold,
395  spooler_definition().temporary_path);
396 }
397 
398 
400  UploadBuffer buffer,
401  const CallbackTN *callback) {
402  S3StreamHandle *s3_handle = static_cast<S3StreamHandle *>(handle);
403 
404  s3_handle->buffer->Append(buffer.data, buffer.size);
406 }
407 
408 
410  const shash::Any &content_hash) {
411  S3StreamHandle *s3_handle = static_cast<S3StreamHandle *>(handle);
412 
413  // New file name based on content hash or remote_path override
414  std::string final_path;
415  if (s3_handle->remote_path != "") {
416  final_path = repository_alias_ + "/" + s3_handle->remote_path;
417  } else {
418  final_path = repository_alias_ + "/data/" + content_hash.MakePath();
419  }
420 
421  s3_handle->buffer->Commit();
422 
423  size_t bytes_uploaded = s3_handle->buffer->GetSize();
424 
426  final_path,
427  const_cast<void *>(static_cast<void const *>(handle->commit_callback)),
428  s3_handle->buffer.Release());
429 
430  if (peek_before_put_)
432  UploadJobInfo(info);
433 
434  // Remove the temporary file
435  delete s3_handle;
436 
437  // Update statistics counters
438  if (!content_hash.HasSuffix()
439  || content_hash.suffix == shash::kSuffixPartial) {
441  CountUploadedBytes(bytes_uploaded);
442  } else if (content_hash.suffix == shash::kSuffixCatalog) {
444  CountUploadedCatalogBytes(bytes_uploaded);
445  }
446 }
447 
448 
// Allocates a new s3fanout::JobInfo for the given object path with a NULL
// callback. The object is heap-allocated with `new`; in this file, jobs that
// pass through MainCollectResults() are deleted there.
//
// NOTE(review): `buf` is defined on a line that is missing from this listing
// (embedded line 450) -- confirm what buffer is attached to the job.
449 s3fanout::JobInfo *S3Uploader::CreateJobInfo(const std::string &path) const {
451  return new s3fanout::JobInfo(path, NULL, buf);
452 }
453 
454 
// Queues a job for the object "<repository_alias_>/<file_to_delete>" and
// returns immediately without waiting for the request to complete.
//
// file_to_delete: object path below the repository alias
//
// NOTE(review): the statement between creating the job and logging is missing
// from this listing (embedded line 459); presumably it marks the job as a
// delete request -- confirm against the full file.
455 void S3Uploader::DoRemoveAsync(const std::string &file_to_delete) {
456  const std::string mangled_path = repository_alias_ + "/" + file_to_delete;
457  s3fanout::JobInfo *info = CreateJobInfo(mangled_path);
458 
460 
461  LogCvmfs(kLogUploadS3, kLogDebug, "Asynchronously removing %s/%s",
462  bucket_.c_str(), info->object_key.c_str());
463  s3fanout_mgr_->PushNewJob(info);
464 }
465 
466 
468  RequestCtrl *ctrl) {
469  ctrl->return_code = results.return_code;
470  if (ctrl->callback_forward != NULL) {
471  // We are already in Respond() so we must not call it again
472  upload::UploaderResults fix_path(results.return_code, ctrl->original_path);
473  (*(ctrl->callback_forward))(fix_path);
474  delete ctrl->callback_forward;
475  ctrl->callback_forward = NULL;
476  }
477  char c = 'c';
478  WritePipe(ctrl->pipe_wait[1], &c, 1);
479 }
480 
481 
// Submits a request for the object "<repository_alias_>/<path>" and blocks
// until it has completed.
//
// path: object path below the repository alias
// Returns true if the return code delivered to OnReqComplete() is 0.
//
// NOTE(review): the statement following MakePipe() is missing from this
// listing (embedded line 488); presumably it turns the job into a HEAD-only
// request -- confirm against the full file.
482 bool S3Uploader::Peek(const std::string &path) {
483  const std::string mangled_path = repository_alias_ + "/" + path;
484  s3fanout::JobInfo *info = CreateJobInfo(mangled_path);
485 
// req_ctrl lives on this stack frame; OnReqComplete() stores the return code
// in it and writes to the pipe, which releases WaitFor() below.
486  RequestCtrl req_ctrl;
487  MakePipe(req_ctrl.pipe_wait);
489  info->callback = const_cast<void *>(static_cast<void const *>(
490  MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl)));
491 
492  IncJobsInFlight();
493  UploadJobInfo(info);
494  req_ctrl.WaitFor();
495 
496  return req_ctrl.return_code == 0;
497 }
498 
499 
500 // noop: no mkdir needed in S3 storage
501 bool S3Uploader::Mkdir(const std::string &path) { return true; }
502 
503 
505  return false; // TODO(rmeusel): implement
506 }
507 
508 
509 int64_t S3Uploader::DoGetObjectSize(const std::string &file_name) {
510  // TODO(dosarudaniel): use a head request for byte count
511  // Re-enable 661 integration test when done
512  return -EOPNOTSUPP;
513 }
514 
515 } // namespace upload
std::string host_name_
Definition: upload_s3.h:119
virtual bool Create()
Definition: upload_s3.cc:231
std::string proxy_
Definition: upload_s3.h:132
static FileBackedBuffer * Create(uint64_t in_memory_threshold, const std::string &tmp_dir="/tmp/")
DriverType driver_type
the type of the spooler driver
virtual void DoUpload(const std::string &remote_path, IngestionSource *source, const CallbackTN *callback)
Definition: upload_s3.cc:322
#define PANIC(...)
Definition: exception.h:29
CVMFS_EXPORT const LogSource source
Definition: exception.h:33
void Respond(const CallbackTN *callback, const UploaderResults &result) const
virtual void FinalizeStreamedUpload(UploadStreamHandle *handle, const shash::Any &content_hash)
Definition: upload_s3.cc:409
bool IsOn(const std::string &param_value) const
Definition: options.cc:401
std::string secret_key_
Definition: upload_s3.h:128
bool HasSuffix() const
Definition: hash.h:231
int64_t DoGetObjectSize(const std::string &file_name)
Definition: upload_s3.cc:509
virtual bool Close()=0
S3Uploader(const SpoolerDefinition &spooler_definition)
Definition: upload_s3.cc:45
const std::string object_key
Definition: s3fanout.h:107
assert((mem||(size==0))&&"Out Of Memory")
const SpoolerDefinition & spooler_definition() const
virtual bool GetSize(uint64_t *size)=0
static const unsigned kDefaultNumRetries
Definition: upload_s3.h:86
s3fanout::JobInfo * CreateJobInfo(const std::string &path) const
Definition: upload_s3.cc:449
void CountUploadedChunks() const
static bool WillHandle(const SpoolerDefinition &spooler_definition)
Definition: upload_s3.cc:226
s3fanout::AuthzMethods authz_method_
Definition: upload_s3.h:129
void MakePipe(int pipe_fd[2])
Definition: posix.cc:487
static const unsigned kDefaultTimeoutSec
Definition: upload_s3.h:87
static void * MainCollectResults(void *data)
Definition: upload_s3.cc:270
Failures error_code
Definition: s3fanout.h:140
virtual ~S3Uploader()
Definition: upload_s3.cc:98
std::string repository_alias_
Definition: upload_s3.h:117
void OnReqComplete(const upload::UploaderResults &results, RequestCtrl *ctrl)
Definition: upload_s3.cc:467
bool FileExists(const std::string &path)
Definition: posix.cc:803
const CallbackTN * commit_callback
virtual bool Open()=0
atomic_int32 io_errors_
Definition: upload_s3.h:135
UniquePtr< s3fanout::S3FanoutManager > s3fanout_mgr_
Definition: upload_s3.h:116
virtual void DoRemoveAsync(const std::string &file_to_delete)
Definition: upload_s3.cc:455
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:306
uint64_t payload_size
Definition: s3fanout.h:138
const char kSuffixPartial
Definition: hash.h:57
virtual bool Peek(const std::string &path)
Definition: upload_s3.cc:482
bool HasSuffix(const std::string &str, const std::string &suffix, const bool ignore_case)
Definition: string.cc:296
const char kSuffixCatalog
Definition: hash.h:54
virtual UploadStreamHandle * InitStreamedUpload(const CallbackTN *callback=NULL)
Definition: upload_s3.cc:393
std::string x_amz_acl_
Definition: upload_s3.h:138
std::string access_key_
Definition: upload_s3.h:127
UniquePtr< FileBackedBuffer > origin
Definition: s3fanout.h:109
virtual bool Mkdir(const std::string &path)
Definition: upload_s3.cc:501
std::string region_
Definition: upload_s3.h:120
void CountUploadedBytes(int64_t bytes_written) const
UniquePtr< FileBackedBuffer > buffer
Definition: upload_s3.h:32
pthread_t thread_collect_results_
Definition: upload_s3.h:136
static const unsigned kDefaultBackoffMaxMs
Definition: upload_s3.h:89
bool IsValid() const
Definition: pointer.h:47
static const unsigned kDefaultNumParallelUploads
Definition: upload_s3.h:85
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:279
bool GetValue(const std::string &key, std::string *value) const
Definition: options.cc:368
virtual std::string GetPath() const =0
void ParsePath(const std::string &config_file, const bool external)
Definition: options.cc:144
void UploadJobInfo(s3fanout::JobInfo *info)
Definition: upload_s3.cc:381
void Append(const void *source, uint64_t len)
void CountUploadedCatalogBytes(int64_t bytes_written) const
static const char * x_amz_acl_allowed_values_[8]
Definition: upload_s3.cc:28
RequestType request
Definition: s3fanout.h:139
uint64_t String2Uint64(const string &value)
Definition: string.cc:240
T * Release()
Definition: pointer.h:48
virtual bool PlaceBootstrappingShortcut(const shash::Any &object)
Definition: upload_s3.cc:504
const unsigned kPageSize
Definition: posix.h:30
const CallbackTN * callback_forward
Definition: upload_s3.h:102
static const unsigned kDefaultBackoffInitMs
Definition: upload_s3.h:88
void CountUploadedCatalogs() const
virtual ssize_t Read(void *buffer, size_t nbyte)=0
bool ParseSpoolerDefinition(const SpoolerDefinition &spooler_definition)
Definition: upload_s3.cc:105
Suffix suffix
Definition: hash.h:123
std::string MakePath() const
Definition: hash.h:306
virtual unsigned int GetNumberOfErrors() const
Definition: upload_s3.cc:262
static const unsigned kInMemoryObjectThreshold
Definition: upload_s3.h:90
virtual void StreamedUpload(UploadStreamHandle *handle, UploadBuffer buffer, const CallbackTN *callback)
Definition: upload_s3.cc:399
const std::string temporary_path_
Definition: upload_s3.h:134
unsigned num_retries_
Definition: upload_s3.h:125
bool IsDefined(const std::string &key)
Definition: options.cc:362
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:496
static void size_t size
Definition: smalloc.h:54
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:508
std::string host_name_port_
Definition: upload_s3.h:118
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:559
std::string flavor_
Definition: upload_s3.h:121
const char * Code2Ascii(const Failures error)
Definition: s3fanout.h:57
unsigned timeout_sec_
Definition: upload_s3.h:126
std::string bucket_
Definition: upload_s3.h:122
static CallbackTN * MakeClosure(typename BoundClosure< UploaderResults, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, const ClosureDataT &closure_data)
Definition: async.h:197
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545