34 "bucket-owner-full-control",
107 const std::vector<std::string> config =
SplitString(
109 if (config.size() != 2) {
111 "Failed to parse spooler configuration string '%s'.\n"
112 "Provide: <repo_alias>@/path/to/s3.conf",
117 const std::string &config_path = config[1];
121 config_path.c_str());
128 options_manager.
ParsePath(config_path,
false);
129 std::string parameter;
133 "Failed to parse CVMFS_S3_HOST from '%s'", config_path.c_str());
138 "Failed to parse CVMFS_S3_ACCESS_KEY from '%s'.",
139 config_path.c_str());
144 "Failed to parse CVMFS_S3_SECRET_KEY from '%s'.",
145 config_path.c_str());
150 "Failed to parse CVMFS_S3_BUCKET from '%s'.", config_path.c_str());
153 if (options_manager.
GetValue(
"CVMFS_S3_DNS_BUCKETS", ¶meter)) {
154 if (parameter ==
"false") {
158 if (options_manager.
GetValue(
"CVMFS_S3_MAX_NUMBER_OF_PARALLEL_CONNECTIONS",
162 if (options_manager.
GetValue(
"CVMFS_S3_MAX_RETRIES", ¶meter)) {
165 if (options_manager.
GetValue(
"CVMFS_S3_TIMEOUT", ¶meter)) {
174 }
else if (
flavor_ ==
"awsv2") {
176 }
else if (
flavor_ ==
"awsv4") {
180 "Failed to parse CVMFS_S3_FLAVOR from '%s', "
181 "valid options are azure, awsv2 or awsv4",
182 config_path.c_str());
186 if (options_manager.
GetValue(
"CVMFS_S3_PEEK_BEFORE_PUT", ¶meter)) {
189 if (options_manager.
GetValue(
"CVMFS_S3_X_AMZ_ACL", ¶meter)) {
190 bool isAllowed =
false;
193 for (
size_t i = 0; i < len; i++) {
201 "%s is not an allowed value for CVMFS_S3_X_AMZ_ACL",
208 if (options_manager.
GetValue(
"CVMFS_S3_USE_HTTPS", ¶meter)) {
212 if (options_manager.
GetValue(
"CVMFS_S3_PORT", ¶meter)) {
218 if (options_manager.
IsDefined(
"CVMFS_S3_PROXY")) {
237 std::string request_content;
239 request_content = std::string(
"<CreateBucketConfiguration xmlns="
240 "\"http://s3.amazonaws.com/doc/2006-03-01/\">"
241 "<LocationConstraint>")
243 +
"</LocationConstraint>"
244 "</CreateBucketConfiguration>";
245 info->
origin->Append(request_content.data(), request_content.length());
251 info->
callback =
const_cast<void *
>(
static_cast<void const *
>(
284 "Upload job for '%s' failed. (error code: %d - %s)",
309 static_cast<CallbackTN *>(info->
callback),
325 bool rvb = source->
Open();
342 origin->
Append(buffer, nbytes);
355 const_cast<void *>(static_cast<void const *>(callback)),
358 if (
HasPrefix(remote_path,
".cvmfs",
false )) {
360 }
else if (
HasSuffix(remote_path,
".html",
false)) {
371 info->
callback =
const_cast<void *
>(
static_cast<void const *
>(
414 std::string final_path;
421 s3_handle->
buffer->Commit();
423 size_t bytes_uploaded = s3_handle->
buffer->GetSize();
427 const_cast<void *>(static_cast<void const *>(handle->
commit_callback)),
489 info->
callback =
const_cast<void *
>(
static_cast<void const *
>(
AuthzMethods authz_method
static FileBackedBuffer * Create(uint64_t in_memory_threshold, const std::string &tmp_dir="/tmp/")
DriverType driver_type
the type of the spooler driver
virtual void DoUpload(const std::string &remote_path, IngestionSource *source, const CallbackTN *callback)
CVMFS_EXPORT const LogSource source
void Respond(const CallbackTN *callback, const UploaderResults &result) const
virtual void FinalizeStreamedUpload(UploadStreamHandle *handle, const shash::Any &content_hash)
bool IsOn(const std::string ¶m_value) const
int64_t DoGetObjectSize(const std::string &file_name)
S3Uploader(const SpoolerDefinition &spooler_definition)
void DecUploadedChunks() const
const std::string object_key
assert((mem||(size==0))&&"Out Of Memory")
const SpoolerDefinition & spooler_definition() const
virtual bool GetSize(uint64_t *size)=0
static const unsigned kDefaultNumRetries
s3fanout::JobInfo * CreateJobInfo(const std::string &path) const
std::string hostname_port
void CountUploadedChunks() const
static bool WillHandle(const SpoolerDefinition &spooler_definition)
s3fanout::AuthzMethods authz_method_
void MakePipe(int pipe_fd[2])
static const unsigned kDefaultTimeoutSec
static void * MainCollectResults(void *data)
std::string repository_alias_
void OnReqComplete(const upload::UploaderResults &results, RequestCtrl *ctrl)
bool FileExists(const std::string &path)
const CallbackTN * commit_callback
UniquePtr< s3fanout::S3FanoutManager > s3fanout_mgr_
virtual void DoRemoveAsync(const std::string &file_to_delete)
std::string original_path
vector< string > SplitString(const string &str, char delim)
const char kSuffixPartial
virtual bool Peek(const std::string &path)
bool HasSuffix(const std::string &str, const std::string &suffix, const bool ignore_case)
const char kSuffixCatalog
virtual UploadStreamHandle * InitStreamedUpload(const CallbackTN *callback=NULL)
int num_parallel_uploads_
unsigned opt_backoff_max_ms
UniquePtr< FileBackedBuffer > origin
virtual bool Mkdir(const std::string &path)
void CountUploadedBytes(int64_t bytes_written) const
uint32_t pool_max_handles
UniquePtr< FileBackedBuffer > buffer
pthread_t thread_collect_results_
static const unsigned kDefaultBackoffMaxMs
static const unsigned kDefaultNumParallelUploads
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
bool GetValue(const std::string &key, std::string *value) const
unsigned opt_backoff_init_ms
void CountDuplicates() const
virtual std::string GetPath() const =0
void ParsePath(const std::string &config_file, const bool external)
void UploadJobInfo(s3fanout::JobInfo *info)
void Append(const void *source, uint64_t len)
void CountUploadedCatalogBytes(int64_t bytes_written) const
static const char * x_amz_acl_allowed_values_[8]
uint64_t String2Uint64(const string &value)
virtual bool PlaceBootstrappingShortcut(const shash::Any &object)
const CallbackTN * callback_forward
static const unsigned kDefaultBackoffInitMs
void CountUploadedCatalogs() const
virtual ssize_t Read(void *buffer, size_t nbyte)=0
bool ParseSpoolerDefinition(const SpoolerDefinition &spooler_definition)
std::string MakePath() const
virtual unsigned int GetNumberOfErrors() const
static const unsigned kInMemoryObjectThreshold
virtual void StreamedUpload(UploadStreamHandle *handle, UploadBuffer buffer, const CallbackTN *callback)
const std::string temporary_path_
bool IsDefined(const std::string &key)
void WritePipe(int fd, const void *buf, size_t nbyte)
void ReadPipe(int fd, void *buf, size_t nbyte)
std::string host_name_port_
void ClosePipe(int pipe_fd[2])
std::string spooler_configuration
const char * Code2Ascii(const Failures error)
static CallbackTN * MakeClosure(typename BoundClosure< UploaderResults, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, const ClosureDataT &closure_data)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)