35 "bucket-owner-full-control",
95 int retval = pthread_create(
111 const std::vector<std::string> config =
113 if (config.size() != 2) {
115 "Failed to parse spooler configuration string '%s'.\n"
116 "Provide: <repo_alias>@/path/to/s3.conf",
121 const std::string &config_path = config[1];
125 "Cannot find S3 config file at '%s'",
126 config_path.c_str());
133 options_manager.
ParsePath(config_path,
false);
134 std::string parameter;
138 "Failed to parse CVMFS_S3_HOST from '%s'",
139 config_path.c_str());
144 "Failed to parse CVMFS_S3_ACCESS_KEY from '%s'.",
145 config_path.c_str());
150 "Failed to parse CVMFS_S3_SECRET_KEY from '%s'.",
151 config_path.c_str());
156 "Failed to parse CVMFS_S3_BUCKET from '%s'.",
157 config_path.c_str());
160 if (options_manager.
GetValue(
"CVMFS_S3_DNS_BUCKETS", ¶meter)) {
161 if (parameter ==
"false") {
165 if (options_manager.
GetValue(
"CVMFS_S3_MAX_NUMBER_OF_PARALLEL_CONNECTIONS",
170 if (options_manager.
GetValue(
"CVMFS_S3_MAX_RETRIES", ¶meter)) {
173 if (options_manager.
GetValue(
"CVMFS_S3_TIMEOUT", ¶meter)) {
182 }
else if (
flavor_ ==
"awsv2") {
184 }
else if (
flavor_ ==
"awsv4") {
188 "Failed to parse CVMFS_S3_FLAVOR from '%s', "
189 "valid options are azure, awsv2 or awsv4",
190 config_path.c_str());
194 if (options_manager.
GetValue(
"CVMFS_S3_PEEK_BEFORE_PUT", ¶meter)) {
197 if (options_manager.
GetValue(
"CVMFS_S3_X_AMZ_ACL", ¶meter)) {
198 bool isAllowed =
false;
201 for (
size_t i = 0; i < len; i++) {
209 "%s is not an allowed value for CVMFS_S3_X_AMZ_ACL",
216 if (options_manager.
GetValue(
"CVMFS_S3_USE_HTTPS", ¶meter)) {
220 if (options_manager.
GetValue(
"CVMFS_S3_PORT", ¶meter)) {
226 if (options_manager.
IsDefined(
"CVMFS_S3_PROXY")) {
245 std::string request_content;
248 std::string(
"<CreateBucketConfiguration xmlns="
249 "\"http://s3.amazonaws.com/doc/2006-03-01/\">"
250 "<LocationConstraint>") +
region_ +
"</LocationConstraint>"
251 "</CreateBucketConfiguration>";
252 info->
origin->Append(request_content.data(), request_content.length());
291 "Upload job for '%s' failed. (error code: %d - %s)",
331 const std::string &remote_path,
335 bool rvb = source->
Open();
352 if (nbytes > 0) origin->
Append(buffer, nbytes);
366 static_cast<void const*>(callback)),
369 if (
HasPrefix(remote_path,
".cvmfs",
false )) {
371 }
else if (
HasSuffix(remote_path,
".html",
false)) {
431 std::string final_path;
438 s3_handle->
buffer->Commit();
440 size_t bytes_uploaded = s3_handle->
buffer->GetSize();
445 static_cast<void const*>(
AuthzMethods authz_method
static FileBackedBuffer * Create(uint64_t in_memory_threshold, const std::string &tmp_dir="/tmp/")
DriverType driver_type
the type of the spooler driver
virtual void DoUpload(const std::string &remote_path, IngestionSource *source, const CallbackTN *callback)
void Respond(const CallbackTN *callback, const UploaderResults &result) const
virtual void FinalizeStreamedUpload(UploadStreamHandle *handle, const shash::Any &content_hash)
bool IsOn(const std::string ¶m_value) const
int64_t DoGetObjectSize(const std::string &file_name)
S3Uploader(const SpoolerDefinition &spooler_definition)
void DecUploadedChunks() const
const std::string object_key
assert((mem||(size==0))&&"Out Of Memory")
const SpoolerDefinition & spooler_definition() const
virtual bool GetSize(uint64_t *size)=0
static const unsigned kDefaultNumRetries
s3fanout::JobInfo * CreateJobInfo(const std::string &path) const
std::string hostname_port
void CountUploadedChunks() const
static bool WillHandle(const SpoolerDefinition &spooler_definition)
s3fanout::AuthzMethods authz_method_
void MakePipe(int pipe_fd[2])
static const unsigned kDefaultTimeoutSec
static void * MainCollectResults(void *data)
std::string repository_alias_
void OnReqComplete(const upload::UploaderResults &results, RequestCtrl *ctrl)
bool FileExists(const std::string &path)
const CallbackTN * commit_callback
UniquePtr< s3fanout::S3FanoutManager > s3fanout_mgr_
virtual void DoRemoveAsync(const std::string &file_to_delete)
std::string original_path
vector< string > SplitString(const string &str, char delim)
const char kSuffixPartial
virtual bool Peek(const std::string &path)
bool HasSuffix(const std::string &str, const std::string &suffix, const bool ignore_case)
const char kSuffixCatalog
virtual UploadStreamHandle * InitStreamedUpload(const CallbackTN *callback=NULL)
int num_parallel_uploads_
unsigned opt_backoff_max_ms
UniquePtr< FileBackedBuffer > origin
virtual bool Mkdir(const std::string &path)
void CountUploadedBytes(int64_t bytes_written) const
uint32_t pool_max_handles
UniquePtr< FileBackedBuffer > buffer
pthread_t thread_collect_results_
static const unsigned kDefaultBackoffMaxMs
static const unsigned kDefaultNumParallelUploads
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
bool GetValue(const std::string &key, std::string *value) const
unsigned opt_backoff_init_ms
void CountDuplicates() const
virtual std::string GetPath() const =0
void ParsePath(const std::string &config_file, const bool external)
void UploadJobInfo(s3fanout::JobInfo *info)
void Append(const void *source, uint64_t len)
void CountUploadedCatalogBytes(int64_t bytes_written) const
static const char * x_amz_acl_allowed_values_[8]
uint64_t String2Uint64(const string &value)
virtual bool PlaceBootstrappingShortcut(const shash::Any &object)
const CallbackTN * callback_forward
static const unsigned kDefaultBackoffInitMs
void CountUploadedCatalogs() const
virtual ssize_t Read(void *buffer, size_t nbyte)=0
bool ParseSpoolerDefinition(const SpoolerDefinition &spooler_definition)
std::string MakePath() const
virtual unsigned int GetNumberOfErrors() const
static const unsigned kInMemoryObjectThreshold
virtual void StreamedUpload(UploadStreamHandle *handle, UploadBuffer buffer, const CallbackTN *callback)
const std::string temporary_path_
bool IsDefined(const std::string &key)
void WritePipe(int fd, const void *buf, size_t nbyte)
void ReadPipe(int fd, void *buf, size_t nbyte)
std::string host_name_port_
void ClosePipe(int pipe_fd[2])
std::string spooler_configuration
const char * Code2Ascii(const Failures error)
static CallbackTN * MakeClosure(typename BoundClosure< UploaderResults, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, const ClosureDataT &closure_data)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)