CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
upload_s3.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_UPLOAD_S3_H_
6 #define CVMFS_UPLOAD_S3_H_
7 
8 #include <pthread.h>
9 
10 #include <string>
11 #include <utility>
12 #include <vector>
13 
14 #include "network/s3fanout.h"
15 #include "upload_facility.h"
16 #include "util/atomic.h"
18 #include "util/pointer.h"
19 #include "util/single_copy.h"
20 
21 namespace upload {
22 
// Tail of the S3StreamHandle constructor. NOTE(review): the struct header and
// the first constructor lines (listing lines 23-25) are absent from this
// listing; per the cross-reference index the constructor also takes
// `const CallbackTN *commit_callback` as its first parameter.
26  uint64_t in_memory_threshold,
27  const std::string &tmp_dir = "/tmp/")
   // Hands commit_callback on to the UploadStreamHandle base.
28  : UploadStreamHandle(commit_callback)
29  {
   // Both arguments are passed unchanged to FileBackedBuffer::Create().
   // Presumably in_memory_threshold is the size above which the buffer is
   // backed by a file under tmp_dir -- TODO confirm in FileBackedBuffer.
30  buffer = FileBackedBuffer::Create(in_memory_threshold, tmp_dir);
31  }
32 
33  // Ownership is later transferred to the S3 fanout
   // NOTE(review): the comment above refers to the `buffer` member
   // (UniquePtr<FileBackedBuffer>), whose declaration on listing line 34 is
   // absent from this listing.
35 };
36 
// Uploader backend that reports its name as "S3"; derives from
// AbstractUploader.
// NOTE(review): this listing omits several declarations that the
// cross-reference index shows to exist (the constructor taking a
// SpoolerDefinition, the first line of InitStreamedUpload, the signature of
// GetS3FanoutManager, the header of the nested request-control struct and a
// few data members). Consult the real header before editing.
43 class S3Uploader : public AbstractUploader {
44  public:
46  virtual ~S3Uploader();
   // Static check against a spooler definition; presumably reports whether
   // this backend is responsible for it -- confirm in upload_s3.cc.
47  static bool WillHandle(const SpoolerDefinition &spooler_definition);
48 
   // Identifies this uploader as "S3".
49  virtual std::string name() const { return "S3"; }
50 
51  virtual bool Create();
52 
   // Presumably uploads `source` to `remote_path` and reports through
   // `callback` -- TODO confirm against the implementation.
60  virtual void DoUpload(const std::string &remote_path,
61  IngestionSource *source,
62  const CallbackTN *callback);
63 
   // Remainder of the InitStreamedUpload declaration, which returns an
   // UploadStreamHandle*; its first line is absent from this listing.
65  const CallbackTN *callback = NULL);
66  virtual void StreamedUpload(UploadStreamHandle *handle, UploadBuffer buffer,
67  const CallbackTN *callback);
68  virtual void FinalizeStreamedUpload(UploadStreamHandle *handle,
69  const shash::Any &content_hash);
70 
71  virtual void DoRemoveAsync(const std::string &file_to_delete);
72  virtual bool Peek(const std::string &path);
73  virtual bool Mkdir(const std::string &path);
74  virtual bool PlaceBootstrappingShortcut(const shash::Any &object);
75 
76  virtual unsigned int GetNumberOfErrors() const;
77  int64_t DoGetObjectSize(const std::string &file_name);
78 
79  // Only for testing
   // Body of GetS3FanoutManager() (signature absent from this listing):
   // returns the raw pointer obtained from s3fanout_mgr_.weak_ref().
81  return s3fanout_mgr_.weak_ref();
82  }
83 
84  private:
   // Default values and limits. NOTE(review): units are inferred from the
   // name suffixes (Sec, Ms); where each constant is applied is not visible
   // in this header.
85  static const unsigned kDefaultPort = 80;
86  static const unsigned kHttpsPort = 443;
87  static const unsigned kDefaultNumParallelUploads = 16;
88  static const unsigned kDefaultNumRetries = 3;
89  static const unsigned kDefaultTimeoutSec = 60;
90  static const unsigned kDefaultBackoffInitMs = 100;
91  static const unsigned kDefaultBackoffMaxMs = 2000;
92  static const unsigned kInMemoryObjectThreshold = 500*1024; // 500KiB
93 
94  // Used to make the async HTTP requests synchronous in Peek(), Create(),
95  // and Upload() of single bits
   // NOTE(review): the lines up to the closing `};` are the interior of a
   // nested struct, presumably RequestCtrl (the type used by OnReqComplete);
   // its header, constructor signature and the `callback_forward` member are
   // absent from this listing. The constructor body marks both pipe
   // descriptors as unset (-1).
98  pipe_wait[0] = pipe_wait[1] = -1;
99  }
100 
101  void WaitFor();
102 
105  std::string original_path;
106  int pipe_wait[2];
107  };
108 
109  void OnReqComplete(const upload::UploaderResults &results, RequestCtrl *ctrl);
110 
   // Static function with a pthread-compatible signature; presumably the
   // entry point of the thread_collect_results_ thread -- TODO confirm.
111  static void *MainCollectResults(void *data);
112 
113  bool ParseSpoolerDefinition(const SpoolerDefinition &spooler_definition);
114  void UploadJobInfo(s3fanout::JobInfo *info);
115 
116  s3fanout::JobInfo *CreateJobInfo(const std::string &path) const;
117 
   // Connection, bucket and credential settings. Presumably populated by
   // ParseSpoolerDefinition() -- confirm in upload_s3.cc.
   // NOTE(review): s3fanout_mgr_, authz_method_, io_errors_ and
   // thread_collect_results_ are also members according to the
   // cross-reference index, but their declarations are absent from this
   // listing.
119  std::string repository_alias_;
120  std::string host_name_port_;
121  std::string host_name_;
122  std::string region_;
123  std::string flavor_;
124  std::string bucket_;
127  unsigned num_retries_;
128  unsigned timeout_sec_;
129  std::string access_key_;
130  std::string secret_key_;
134  std::string proxy_;
135 
136  const std::string temporary_path_;
139 
   // Presumably the value sent as the x-amz-acl request header -- TODO
   // confirm.
140  std::string x_amz_acl_;
141 }; // S3Uploader
142 
143 } // namespace upload
144 
145 #endif // CVMFS_UPLOAD_S3_H_
std::string host_name_
Definition: upload_s3.h:121
virtual bool Create()
Definition: upload_s3.cc:239
s3fanout::S3FanoutManager * GetS3FanoutManager()
Definition: upload_s3.h:80
std::string proxy_
Definition: upload_s3.h:134
static const unsigned kDefaultPort
Definition: upload_s3.h:85
static FileBackedBuffer * Create(uint64_t in_memory_threshold, const std::string &tmp_dir="/tmp/")
T * weak_ref() const
Definition: pointer.h:42
virtual void DoUpload(const std::string &remote_path, IngestionSource *source, const CallbackTN *callback)
Definition: upload_s3.cc:330
virtual void FinalizeStreamedUpload(UploadStreamHandle *handle, const shash::Any &content_hash)
Definition: upload_s3.cc:424
std::string secret_key_
Definition: upload_s3.h:130
int64_t DoGetObjectSize(const std::string &file_name)
Definition: upload_s3.cc:532
S3Uploader(const SpoolerDefinition &spooler_definition)
Definition: upload_s3.cc:47
const SpoolerDefinition & spooler_definition() const
S3StreamHandle(const CallbackTN *commit_callback, uint64_t in_memory_threshold, const std::string &tmp_dir="/tmp/")
Definition: upload_s3.h:24
static const unsigned kDefaultNumRetries
Definition: upload_s3.h:88
s3fanout::JobInfo * CreateJobInfo(const std::string &path) const
Definition: upload_s3.cc:468
static bool WillHandle(const SpoolerDefinition &spooler_definition)
Definition: upload_s3.cc:234
s3fanout::AuthzMethods authz_method_
Definition: upload_s3.h:131
static const unsigned kDefaultTimeoutSec
Definition: upload_s3.h:89
static void * MainCollectResults(void *data)
Definition: upload_s3.cc:277
virtual ~S3Uploader()
Definition: upload_s3.cc:101
std::string repository_alias_
Definition: upload_s3.h:119
AuthzMethods
Definition: s3fanout.h:32
void OnReqComplete(const upload::UploaderResults &results, RequestCtrl *ctrl)
Definition: upload_s3.cc:486
int32_t atomic_int32
Definition: atomic.h:17
const CallbackTN * commit_callback
atomic_int32 io_errors_
Definition: upload_s3.h:137
UniquePtr< s3fanout::S3FanoutManager > s3fanout_mgr_
Definition: upload_s3.h:118
virtual void DoRemoveAsync(const std::string &file_to_delete)
Definition: upload_s3.cc:474
virtual bool Peek(const std::string &path)
Definition: upload_s3.cc:503
virtual UploadStreamHandle * InitStreamedUpload(const CallbackTN *callback=NULL)
Definition: upload_s3.cc:406
std::string x_amz_acl_
Definition: upload_s3.h:140
std::string access_key_
Definition: upload_s3.h:129
virtual bool Mkdir(const std::string &path)
Definition: upload_s3.cc:522
std::string region_
Definition: upload_s3.h:122
UniquePtr< FileBackedBuffer > buffer
Definition: upload_s3.h:34
pthread_t thread_collect_results_
Definition: upload_s3.h:138
static const unsigned kDefaultBackoffMaxMs
Definition: upload_s3.h:91
static const unsigned kDefaultNumParallelUploads
Definition: upload_s3.h:87
void UploadJobInfo(s3fanout::JobInfo *info)
Definition: upload_s3.cc:392
AbstractUploader::CallbackTN CallbackTN
virtual bool PlaceBootstrappingShortcut(const shash::Any &object)
Definition: upload_s3.cc:527
const CallbackTN * callback_forward
Definition: upload_s3.h:104
static const unsigned kDefaultBackoffInitMs
Definition: upload_s3.h:90
virtual std::string name() const
Definition: upload_s3.h:49
bool ParseSpoolerDefinition(const SpoolerDefinition &spooler_definition)
Definition: upload_s3.cc:108
static const unsigned kHttpsPort
Definition: upload_s3.h:86
virtual unsigned int GetNumberOfErrors() const
Definition: upload_s3.cc:269
static const unsigned kInMemoryObjectThreshold
Definition: upload_s3.h:92
virtual void StreamedUpload(UploadStreamHandle *handle, UploadBuffer buffer, const CallbackTN *callback)
Definition: upload_s3.cc:412
const std::string temporary_path_
Definition: upload_s3.h:136
unsigned num_retries_
Definition: upload_s3.h:127
CallbackBase< UploaderResults > CallbackTN
Definition: async.h:192
std::string host_name_port_
Definition: upload_s3.h:120
std::string flavor_
Definition: upload_s3.h:123
unsigned timeout_sec_
Definition: upload_s3.h:128
std::string bucket_
Definition: upload_s3.h:124