CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
upload_s3.cc
Go to the documentation of this file.
1 
5 #include "upload_s3.h"
6 
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <inttypes.h>
10 #include <unistd.h>
11 
12 #include <string>
13 #include <vector>
14 
15 #include "compression.h"
16 #include "network/s3fanout.h"
17 #include "options.h"
18 #include "util/exception.h"
19 #include "util/logging.h"
20 #include "util/posix.h"
21 #include "util/string.h"
22 
23 namespace upload {
24 
/*
 * Allowed values of x-amz-acl according to the S3 API.
 *
 * The table is fully immutable: both the array of pointers and the strings
 * they point to are const. Its length is deduced from the initializer, so
 * adding or removing an entry can never get out of sync with the
 * sizeof-based element count used when validating CVMFS_S3_X_AMZ_ACL.
 * The trailing empty string is accepted as a valid value as well.
 * NOTE(review): presumably "" means "send no explicit ACL"; confirm
 * against the s3fanout request builder.
 */
static const char *const x_amz_acl_allowed_values_[] = {
  "private",
  "public-read",
  "public-write",
  "authenticated-read",
  "aws-exec-read",
  "bucket-owner-read",
  "bucket-owner-full-control",
  ""
};
38 
40  char c;
41  ReadPipe(pipe_wait[0], &c, 1);
42  assert(c == 'c');
44 }
45 
46 
48  : AbstractUploader(spooler_definition)
49  , dns_buckets_(true)
53  , authz_method_(s3fanout::kAuthzAwsV2)
54  , peek_before_put_(true)
55  , use_https_(false)
56  , proxy_("")
57  , temporary_path_(spooler_definition.temporary_path)
58  , x_amz_acl_("public-read")
59 {
60  assert(spooler_definition.IsValid() &&
61  spooler_definition.driver_type == SpoolerDefinition::S3);
62 
63  atomic_init32(&io_errors_);
64 
65  if (!ParseSpoolerDefinition(spooler_definition)) {
66  PANIC(kLogStderr, "Error in parsing the spooler definition");
67  }
68 
70  s3config.access_key = access_key_;
71  s3config.secret_key = secret_key_;
72  s3config.hostname_port = host_name_port_;
73  s3config.authz_method = authz_method_;
74  s3config.region = region_;
75  s3config.flavor = flavor_;
76  s3config.bucket = bucket_;
77  s3config.dns_buckets = dns_buckets_;
79  s3config.opt_timeout_sec = timeout_sec_;
80  s3config.opt_max_retries = num_retries_;
83  s3config.x_amz_acl = x_amz_acl_;
84 
85  if (use_https_) {
86  s3config.protocol = "https";
87  } else {
88  s3config.protocol = "http";
89  }
90  s3config.proxy = proxy_;
91 
93  s3fanout_mgr_->Spawn();
94 
95  int retval = pthread_create(
97  assert(retval == 0);
98 }
99 
100 
102  // Signal termination to our own worker thread
103  s3fanout_mgr_->PushCompletedJob(NULL);
104  pthread_join(thread_collect_results_, NULL);
105 }
106 
107 
109  const SpoolerDefinition &spooler_definition)
110 {
111  const std::vector<std::string> config =
112  SplitString(spooler_definition.spooler_configuration, '@');
113  if (config.size() != 2) {
115  "Failed to parse spooler configuration string '%s'.\n"
116  "Provide: <repo_alias>@/path/to/s3.conf",
117  spooler_definition.spooler_configuration.c_str());
118  return false;
119  }
120  repository_alias_ = config[0];
121  const std::string &config_path = config[1];
122 
123  if (!FileExists(config_path)) {
125  "Cannot find S3 config file at '%s'",
126  config_path.c_str());
127  return false;
128  }
129 
130  // Parse S3 configuration
131  BashOptionsManager options_manager = BashOptionsManager(
133  options_manager.ParsePath(config_path, false);
134  std::string parameter;
135 
136  if (!options_manager.GetValue("CVMFS_S3_HOST", &host_name_)) {
138  "Failed to parse CVMFS_S3_HOST from '%s'",
139  config_path.c_str());
140  return false;
141  }
142  if (!options_manager.GetValue("CVMFS_S3_ACCESS_KEY", &access_key_)) {
144  "Failed to parse CVMFS_S3_ACCESS_KEY from '%s'.",
145  config_path.c_str());
146  return false;
147  }
148  if (!options_manager.GetValue("CVMFS_S3_SECRET_KEY", &secret_key_)) {
150  "Failed to parse CVMFS_S3_SECRET_KEY from '%s'.",
151  config_path.c_str());
152  return false;
153  }
154  if (!options_manager.GetValue("CVMFS_S3_BUCKET", &bucket_)) {
156  "Failed to parse CVMFS_S3_BUCKET from '%s'.",
157  config_path.c_str());
158  return false;
159  }
160  if (options_manager.GetValue("CVMFS_S3_DNS_BUCKETS", &parameter)) {
161  if (parameter == "false") {
162  dns_buckets_ = false;
163  }
164  }
165  if (options_manager.GetValue("CVMFS_S3_MAX_NUMBER_OF_PARALLEL_CONNECTIONS",
166  &parameter))
167  {
169  }
170  if (options_manager.GetValue("CVMFS_S3_MAX_RETRIES", &parameter)) {
171  num_retries_ = String2Uint64(parameter);
172  }
173  if (options_manager.GetValue("CVMFS_S3_TIMEOUT", &parameter)) {
174  timeout_sec_ = String2Uint64(parameter);
175  }
176  if (options_manager.GetValue("CVMFS_S3_REGION", &region_)) {
178  }
179  if (options_manager.GetValue("CVMFS_S3_FLAVOR", &flavor_)) {
180  if (flavor_ == "azure") {
182  } else if (flavor_ == "awsv2") {
184  } else if (flavor_ == "awsv4") {
186  } else {
188  "Failed to parse CVMFS_S3_FLAVOR from '%s', "
189  "valid options are azure, awsv2 or awsv4",
190  config_path.c_str());
191  return false;
192  }
193  }
194  if (options_manager.GetValue("CVMFS_S3_PEEK_BEFORE_PUT", &parameter)) {
195  peek_before_put_ = options_manager.IsOn(parameter);
196  }
197  if (options_manager.GetValue("CVMFS_S3_X_AMZ_ACL", &parameter)) {
198  bool isAllowed = false;
199  size_t const len = sizeof(x_amz_acl_allowed_values_) /
200  sizeof(x_amz_acl_allowed_values_[0]);
201  for (size_t i = 0; i < len; i++) {
202  if (x_amz_acl_allowed_values_[i] == parameter) {
203  isAllowed = true;
204  break;
205  }
206  }
207  if (!isAllowed) {
209  "%s is not an allowed value for CVMFS_S3_X_AMZ_ACL",
210  parameter.c_str());
211  return false;
212  }
213  x_amz_acl_ = parameter;
214  }
215 
216  if (options_manager.GetValue("CVMFS_S3_USE_HTTPS", &parameter)) {
217  use_https_ = options_manager.IsOn(parameter);
218  }
219 
220  if (options_manager.GetValue("CVMFS_S3_PORT", &parameter)) {
221  host_name_port_ = host_name_ + ":" + parameter;
222  } else {
224  }
225 
226  if (options_manager.IsDefined("CVMFS_S3_PROXY")) {
227  options_manager.GetValue("CVMFS_S3_PROXY", &proxy_);
228  }
229 
230  return true;
231 }
232 
233 
234 bool S3Uploader::WillHandle(const SpoolerDefinition &spooler_definition) {
235  return spooler_definition.driver_type == SpoolerDefinition::S3;
236 }
237 
238 
240  if (!dns_buckets_)
241  return false;
242 
243  s3fanout::JobInfo *info = CreateJobInfo("");
245  std::string request_content;
246  if (!region_.empty()) {
247  request_content =
248  std::string("<CreateBucketConfiguration xmlns="
249  "\"http://s3.amazonaws.com/doc/2006-03-01/\">"
250  "<LocationConstraint>") + region_ + "</LocationConstraint>"
251  "</CreateBucketConfiguration>";
252  info->origin->Append(request_content.data(), request_content.length());
253  info->origin->Commit();
254  }
255 
256  RequestCtrl req_ctrl;
257  MakePipe(req_ctrl.pipe_wait);
258  info->callback = const_cast<void*>(static_cast<void const*>(MakeClosure(
259  &S3Uploader::OnReqComplete, this, &req_ctrl)));
260 
261  IncJobsInFlight();
262  UploadJobInfo(info);
263  req_ctrl.WaitFor();
264 
265  return req_ctrl.return_code == 0;
266 }
267 
268 
269 unsigned int S3Uploader::GetNumberOfErrors() const {
270  return atomic_read32(&io_errors_);
271 }
272 
273 
278  LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread started.");
279  S3Uploader *uploader = reinterpret_cast<S3Uploader *>(data);
280 
281  while (true) {
282  s3fanout::JobInfo *info = uploader->s3fanout_mgr_->PopCompletedJob();
283  if (!info)
284  break;
285  // Report completed job
286  int reply_code = 0;
287  if (info->error_code != s3fanout::kFailOk) {
288  if ((info->request != s3fanout::JobInfo::kReqHeadOnly) ||
289  (info->error_code != s3fanout::kFailNotFound)) {
291  "Upload job for '%s' failed. (error code: %d - %s)",
292  info->object_key.c_str(),
293  info->error_code,
295  reply_code = 99;
296  atomic_inc32(&uploader->io_errors_);
297  }
298  }
299  if (info->request == s3fanout::JobInfo::kReqDelete) {
300  uploader->Respond(NULL, UploaderResults());
301  } else if (info->request == s3fanout::JobInfo::kReqHeadOnly) {
302  if (info->error_code == s3fanout::kFailNotFound) reply_code = 1;
303  uploader->Respond(static_cast<CallbackTN*>(info->callback),
305  reply_code));
306  } else {
308  // The HEAD request was not transformed into a PUT request, thus this
309  // was a duplicate
310  // Uploaded catalogs are always unique ->
311  // assume this was a regular file and decrease appropriate counters
312  uploader->CountDuplicates();
313  uploader->DecUploadedChunks();
314  uploader->CountUploadedBytes(-(info->payload_size));
315  }
316  uploader->Respond(static_cast<CallbackTN*>(info->callback),
318  reply_code));
319 
320  assert(!info->origin.IsValid());
321  }
322  delete info;
323  }
324 
325  LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread finished.");
326  return NULL;
327 }
328 
329 
331  const std::string &remote_path,
332  IngestionSource *source,
333  const CallbackTN *callback
334 ) {
335  bool rvb = source->Open();
336  if (!rvb) {
337  Respond(callback, UploaderResults(100, source->GetPath()));
338  return;
339  }
340  uint64_t size;
341  rvb = source->GetSize(&size);
342  assert(rvb);
343 
344  FileBackedBuffer *origin =
346  spooler_definition().temporary_path);
347 
348  unsigned char buffer[kPageSize];
349  ssize_t nbytes;
350  do {
351  nbytes = source->Read(buffer, kPageSize);
352  if (nbytes > 0) origin->Append(buffer, nbytes);
353  if (nbytes < 0) {
354  source->Close();
355  delete origin;
356  Respond(callback, UploaderResults(100, source->GetPath()));
357  return;
358  }
359  } while (nbytes == kPageSize);
360  source->Close();
361  origin->Commit();
362 
363  s3fanout::JobInfo *info =
364  new s3fanout::JobInfo(repository_alias_ + "/" + remote_path,
365  const_cast<void*>(
366  static_cast<void const*>(callback)),
367  origin);
368 
369  if (HasPrefix(remote_path, ".cvmfs", false /*ignore_case*/)) {
371  } else if (HasSuffix(remote_path, ".html", false)) {
373  } else {
374  if (peek_before_put_)
376  }
377 
378  RequestCtrl req_ctrl;
379  MakePipe(req_ctrl.pipe_wait);
380  req_ctrl.callback_forward = callback;
381  req_ctrl.original_path = source->GetPath();
382  info->callback = const_cast<void*>(static_cast<void const*>(MakeClosure(
383  &S3Uploader::OnReqComplete, this, &req_ctrl)));
384 
385  UploadJobInfo(info);
386  req_ctrl.WaitFor();
387  LogCvmfs(kLogUploadS3, kLogDebug, "Uploading from source finished: %s",
388  source->GetPath().c_str());
389 }
390 
391 
394  "Uploading:\n"
395  "--> Object: '%s'\n"
396  "--> Bucket: '%s'\n"
397  "--> Host: '%s'\n",
398  info->object_key.c_str(),
399  bucket_.c_str(),
400  host_name_port_.c_str());
401 
402  s3fanout_mgr_->PushNewJob(info);
403 }
404 
405 
407  return new S3StreamHandle(callback, kInMemoryObjectThreshold,
408  spooler_definition().temporary_path);
409 }
410 
411 
413  UploadStreamHandle *handle,
414  UploadBuffer buffer,
415  const CallbackTN *callback)
416 {
417  S3StreamHandle *s3_handle = static_cast<S3StreamHandle*>(handle);
418 
419  s3_handle->buffer->Append(buffer.data, buffer.size);
421 }
422 
423 
425  UploadStreamHandle *handle,
426  const shash::Any &content_hash)
427 {
428  S3StreamHandle *s3_handle = static_cast<S3StreamHandle*>(handle);
429 
430  // New file name based on content hash or remote_path override
431  std::string final_path;
432  if (s3_handle->remote_path != "") {
433  final_path = repository_alias_ + "/" + s3_handle->remote_path;
434  } else {
435  final_path = repository_alias_ + "/data/" + content_hash.MakePath();
436  }
437 
438  s3_handle->buffer->Commit();
439 
440  size_t bytes_uploaded = s3_handle->buffer->GetSize();
441 
442  s3fanout::JobInfo *info =
443  new s3fanout::JobInfo(final_path,
444  const_cast<void*>(
445  static_cast<void const*>(
446  handle->commit_callback)),
447  s3_handle->buffer.Release());
448 
449  if (peek_before_put_)
451  UploadJobInfo(info);
452 
453  // Remove the temporary file
454  delete s3_handle;
455 
456  // Update statistics counters
457  if (!content_hash.HasSuffix() ||
458  content_hash.suffix == shash::kSuffixPartial) {
460  CountUploadedBytes(bytes_uploaded);
461  } else if (content_hash.suffix == shash::kSuffixCatalog) {
463  CountUploadedCatalogBytes(bytes_uploaded);
464  }
465 }
466 
467 
468 s3fanout::JobInfo *S3Uploader::CreateJobInfo(const std::string& path) const {
470  return new s3fanout::JobInfo(path, NULL, buf);
471 }
472 
473 
474 void S3Uploader::DoRemoveAsync(const std::string& file_to_delete) {
475  const std::string mangled_path = repository_alias_ + "/" + file_to_delete;
476  s3fanout::JobInfo *info = CreateJobInfo(mangled_path);
477 
479 
480  LogCvmfs(kLogUploadS3, kLogDebug, "Asynchronously removing %s/%s",
481  bucket_.c_str(), info->object_key.c_str());
482  s3fanout_mgr_->PushNewJob(info);
483 }
484 
485 
487  const upload::UploaderResults &results,
488  RequestCtrl *ctrl)
489 {
490  ctrl->return_code = results.return_code;
491  if (ctrl->callback_forward != NULL) {
492  // We are already in Respond() so we must not call it again
493  upload::UploaderResults fix_path(results.return_code, ctrl->original_path);
494  (*(ctrl->callback_forward))(fix_path);
495  delete ctrl->callback_forward;
496  ctrl->callback_forward = NULL;
497  }
498  char c = 'c';
499  WritePipe(ctrl->pipe_wait[1], &c, 1);
500 }
501 
502 
503 bool S3Uploader::Peek(const std::string& path) {
504  const std::string mangled_path = repository_alias_ + "/" + path;
505  s3fanout::JobInfo *info = CreateJobInfo(mangled_path);
506 
507  RequestCtrl req_ctrl;
508  MakePipe(req_ctrl.pipe_wait);
510  info->callback = const_cast<void*>(static_cast<void const*>(MakeClosure(
511  &S3Uploader::OnReqComplete, this, &req_ctrl)));
512 
513  IncJobsInFlight();
514  UploadJobInfo(info);
515  req_ctrl.WaitFor();
516 
517  return req_ctrl.return_code == 0;
518 }
519 
520 
521 // noop: no mkdir needed in S3 storage
522 bool S3Uploader::Mkdir(const std::string &path) {
523  return true;
524 }
525 
526 
528  return false; // TODO(rmeusel): implement
529 }
530 
531 
532 int64_t S3Uploader::DoGetObjectSize(const std::string &file_name) {
533  // TODO(dosarudaniel): use a head request for byte count
534  // Re-enable 661 integration test when done
535  return -EOPNOTSUPP;
536 }
537 
538 } // namespace upload
std::string host_name_
Definition: upload_s3.h:121
virtual bool Create()
Definition: upload_s3.cc:239
std::string proxy_
Definition: upload_s3.h:134
static FileBackedBuffer * Create(uint64_t in_memory_threshold, const std::string &tmp_dir="/tmp/")
DriverType driver_type
the type of the spooler driver
virtual void DoUpload(const std::string &remote_path, IngestionSource *source, const CallbackTN *callback)
Definition: upload_s3.cc:330
#define PANIC(...)
Definition: exception.h:29
void Respond(const CallbackTN *callback, const UploaderResults &result) const
virtual void FinalizeStreamedUpload(UploadStreamHandle *handle, const shash::Any &content_hash)
Definition: upload_s3.cc:424
bool IsOn(const std::string &param_value) const
Definition: options.cc:409
std::string secret_key_
Definition: upload_s3.h:130
bool HasSuffix() const
Definition: hash.h:239
int64_t DoGetObjectSize(const std::string &file_name)
Definition: upload_s3.cc:532
virtual bool Close()=0
S3Uploader(const SpoolerDefinition &spooler_definition)
Definition: upload_s3.cc:47
const std::string object_key
Definition: s3fanout.h:108
assert((mem||(size==0))&&"Out Of Memory")
const SpoolerDefinition & spooler_definition() const
virtual bool GetSize(uint64_t *size)=0
static const unsigned kDefaultNumRetries
Definition: upload_s3.h:88
s3fanout::JobInfo * CreateJobInfo(const std::string &path) const
Definition: upload_s3.cc:468
void CountUploadedChunks() const
static bool WillHandle(const SpoolerDefinition &spooler_definition)
Definition: upload_s3.cc:234
s3fanout::AuthzMethods authz_method_
Definition: upload_s3.h:131
void MakePipe(int pipe_fd[2])
Definition: posix.cc:492
static const unsigned kDefaultTimeoutSec
Definition: upload_s3.h:89
static void * MainCollectResults(void *data)
Definition: upload_s3.cc:277
Failures error_code
Definition: s3fanout.h:146
virtual ~S3Uploader()
Definition: upload_s3.cc:101
std::string repository_alias_
Definition: upload_s3.h:119
void OnReqComplete(const upload::UploaderResults &results, RequestCtrl *ctrl)
Definition: upload_s3.cc:486
bool FileExists(const std::string &path)
Definition: posix.cc:791
const CallbackTN * commit_callback
virtual bool Open()=0
atomic_int32 io_errors_
Definition: upload_s3.h:137
UniquePtr< s3fanout::S3FanoutManager > s3fanout_mgr_
Definition: upload_s3.h:118
virtual void DoRemoveAsync(const std::string &file_to_delete)
Definition: upload_s3.cc:474
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:290
uint64_t payload_size
Definition: s3fanout.h:144
const char kSuffixPartial
Definition: hash.h:57
virtual bool Peek(const std::string &path)
Definition: upload_s3.cc:503
bool HasSuffix(const std::string &str, const std::string &suffix, const bool ignore_case)
Definition: string.cc:281
const char kSuffixCatalog
Definition: hash.h:54
virtual UploadStreamHandle * InitStreamedUpload(const CallbackTN *callback=NULL)
Definition: upload_s3.cc:406
std::string x_amz_acl_
Definition: upload_s3.h:140
std::string access_key_
Definition: upload_s3.h:129
UniquePtr< FileBackedBuffer > origin
Definition: s3fanout.h:110
virtual bool Mkdir(const std::string &path)
Definition: upload_s3.cc:522
std::string region_
Definition: upload_s3.h:122
void CountUploadedBytes(int64_t bytes_written) const
UniquePtr< FileBackedBuffer > buffer
Definition: upload_s3.h:34
pthread_t thread_collect_results_
Definition: upload_s3.h:138
static const unsigned kDefaultBackoffMaxMs
Definition: upload_s3.h:91
bool IsValid() const
Definition: pointer.h:43
static const unsigned kDefaultNumParallelUploads
Definition: upload_s3.h:87
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:267
bool GetValue(const std::string &key, std::string *value) const
Definition: options.cc:376
virtual std::string GetPath() const =0
void ParsePath(const std::string &config_file, const bool external)
Definition: options.cc:146
void UploadJobInfo(s3fanout::JobInfo *info)
Definition: upload_s3.cc:392
void Append(const void *source, uint64_t len)
void CountUploadedCatalogBytes(int64_t bytes_written) const
static const char * x_amz_acl_allowed_values_[8]
Definition: upload_s3.cc:28
RequestType request
Definition: s3fanout.h:145
uint64_t String2Uint64(const string &value)
Definition: string.cc:228
T * Release()
Definition: pointer.h:44
virtual bool PlaceBootstrappingShortcut(const shash::Any &object)
Definition: upload_s3.cc:527
const unsigned kPageSize
Definition: posix.h:30
const CallbackTN * callback_forward
Definition: upload_s3.h:104
static const unsigned kDefaultBackoffInitMs
Definition: upload_s3.h:90
void CountUploadedCatalogs() const
virtual ssize_t Read(void *buffer, size_t nbyte)=0
bool ParseSpoolerDefinition(const SpoolerDefinition &spooler_definition)
Definition: upload_s3.cc:108
Suffix suffix
Definition: hash.h:126
std::string MakePath() const
Definition: hash.h:316
virtual unsigned int GetNumberOfErrors() const
Definition: upload_s3.cc:269
static const unsigned kInMemoryObjectThreshold
Definition: upload_s3.h:92
virtual void StreamedUpload(UploadStreamHandle *handle, UploadBuffer buffer, const CallbackTN *callback)
Definition: upload_s3.cc:412
const std::string temporary_path_
Definition: upload_s3.h:136
unsigned num_retries_
Definition: upload_s3.h:127
bool IsDefined(const std::string &key)
Definition: options.cc:370
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:501
static void size_t size
Definition: smalloc.h:54
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:513
std::string host_name_port_
Definition: upload_s3.h:120
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:551
std::string flavor_
Definition: upload_s3.h:123
const char * Code2Ascii(const Failures error)
Definition: s3fanout.h:57
unsigned timeout_sec_
Definition: upload_s3.h:128
std::string bucket_
Definition: upload_s3.h:124
static CallbackTN * MakeClosure(typename BoundClosure< UploaderResults, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, const ClosureDataT &closure_data)
Definition: async.h:204
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528