GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/upload_s3.cc
Date: 2025-10-19 02:35:28
Exec Total Coverage
Lines: 202 285 70.9%
Branches: 179 436 41.1%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "upload_s3.h"
6
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <inttypes.h>
10 #include <unistd.h>
11
12 #include <string>
13 #include <vector>
14
15 #include "compression/compression.h"
16 #include "network/s3fanout.h"
17 #include "options.h"
18 #include "util/exception.h"
19 #include "util/logging.h"
20 #include "util/posix.h"
21 #include "util/string.h"
22
23 namespace upload {
24
25 /*
26 * Allowed values of x-amz-acl according to S3 API
27 */
28 static const char *x_amz_acl_allowed_values_[8] = {"private",
29 "public-read",
30 "public-write",
31 "authenticated-read",
32 "aws-exec-read",
33 "bucket-owner-read",
34 "bucket-owner-full-control",
35 ""};
36
37 15872 void S3Uploader::RequestCtrl::WaitFor() {
38 char c;
39
1/2
✓ Branch 1 taken 15872 times.
✗ Branch 2 not taken.
15872 ReadPipe(pipe_wait[0], &c, 1);
40
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15872 times.
15872 assert(c == 'c');
41
1/2
✓ Branch 1 taken 15872 times.
✗ Branch 2 not taken.
15872 ClosePipe(pipe_wait);
42 15872 }
43
44
45 372 S3Uploader::S3Uploader(const SpoolerDefinition &spooler_definition)
46 : AbstractUploader(spooler_definition)
47 372 , dns_buckets_(true)
48 372 , num_parallel_uploads_(kDefaultNumParallelUploads)
49 372 , num_retries_(kDefaultNumRetries)
50 372 , timeout_sec_(kDefaultTimeoutSec)
51 372 , authz_method_(s3fanout::kAuthzAwsV2)
52 372 , peek_before_put_(true)
53 372 , use_https_(false)
54
1/2
✓ Branch 2 taken 372 times.
✗ Branch 3 not taken.
372 , proxy_("")
55
1/2
✓ Branch 1 taken 372 times.
✗ Branch 2 not taken.
372 , temporary_path_(spooler_definition.temporary_path)
56
2/4
✓ Branch 2 taken 372 times.
✗ Branch 3 not taken.
✓ Branch 14 taken 372 times.
✗ Branch 15 not taken.
744 , x_amz_acl_("public-read") {
57
2/4
✓ Branch 1 taken 372 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 372 times.
✗ Branch 4 not taken.
372 assert(spooler_definition.IsValid()
58 && spooler_definition.driver_type == SpoolerDefinition::S3);
59
60 372 atomic_init32(&io_errors_);
61
62
2/4
✓ Branch 1 taken 372 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 372 times.
372 if (!ParseSpoolerDefinition(spooler_definition)) {
63 PANIC(kLogStderr, "Error in parsing the spooler definition");
64 }
65
66
1/2
✓ Branch 1 taken 372 times.
✗ Branch 2 not taken.
372 s3fanout::S3FanoutManager::S3Config s3config;
67
1/2
✓ Branch 1 taken 372 times.
✗ Branch 2 not taken.
372 s3config.access_key = access_key_;
68
1/2
✓ Branch 1 taken 372 times.
✗ Branch 2 not taken.
372 s3config.secret_key = secret_key_;
69
1/2
✓ Branch 1 taken 372 times.
✗ Branch 2 not taken.
372 s3config.hostname_port = host_name_port_;
70 372 s3config.authz_method = authz_method_;
71
1/2
✓ Branch 1 taken 372 times.
✗ Branch 2 not taken.
372 s3config.region = region_;
72
1/2
✓ Branch 1 taken 372 times.
✗ Branch 2 not taken.
372 s3config.flavor = flavor_;
73
1/2
✓ Branch 1 taken 372 times.
✗ Branch 2 not taken.
372 s3config.bucket = bucket_;
74 372 s3config.dns_buckets = dns_buckets_;
75 372 s3config.pool_max_handles = num_parallel_uploads_;
76 372 s3config.opt_timeout_sec = timeout_sec_;
77 372 s3config.opt_max_retries = num_retries_;
78 372 s3config.opt_backoff_init_ms = kDefaultBackoffInitMs;
79 372 s3config.opt_backoff_max_ms = kDefaultBackoffMaxMs;
80
1/2
✓ Branch 1 taken 372 times.
✗ Branch 2 not taken.
372 s3config.x_amz_acl = x_amz_acl_;
81
82
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 372 times.
372 if (use_https_) {
83 s3config.protocol = "https";
84 } else {
85
1/2
✓ Branch 1 taken 372 times.
✗ Branch 2 not taken.
372 s3config.protocol = "http";
86 }
87
1/2
✓ Branch 1 taken 372 times.
✗ Branch 2 not taken.
372 s3config.proxy = proxy_;
88
89
3/6
✓ Branch 1 taken 372 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 372 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 372 times.
✗ Branch 8 not taken.
372 s3fanout_mgr_ = new s3fanout::S3FanoutManager(s3config);
90
1/2
✓ Branch 2 taken 372 times.
✗ Branch 3 not taken.
372 s3fanout_mgr_->Spawn();
91
92 372 const int retval = pthread_create(&thread_collect_results_, NULL,
93 MainCollectResults, this);
94
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 372 times.
372 assert(retval == 0);
95 372 }
96
97
98 2232 S3Uploader::~S3Uploader() {
99 // Signal termination to our own worker thread
100 744 s3fanout_mgr_->PushCompletedJob(NULL);
101 744 pthread_join(thread_collect_results_, NULL);
102 1488 }
103
104
105 372 bool S3Uploader::ParseSpoolerDefinition(
106 const SpoolerDefinition &spooler_definition) {
107 const std::vector<std::string> config = SplitString(
108
1/2
✓ Branch 1 taken 372 times.
✗ Branch 2 not taken.
372 spooler_definition.spooler_configuration, '@');
109
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 372 times.
372 if (config.size() != 2) {
110 LogCvmfs(kLogUploadS3, kLogStderr,
111 "Failed to parse spooler configuration string '%s'.\n"
112 "Provide: <repo_alias>@/path/to/s3.conf",
113 spooler_definition.spooler_configuration.c_str());
114 return false;
115 }
116
1/2
✓ Branch 2 taken 372 times.
✗ Branch 3 not taken.
372 repository_alias_ = config[0];
117 372 const std::string &config_path = config[1];
118
119
2/4
✓ Branch 1 taken 372 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 372 times.
372 if (!FileExists(config_path)) {
120 LogCvmfs(kLogUploadS3, kLogStderr, "Cannot find S3 config file at '%s'",
121 config_path.c_str());
122 return false;
123 }
124
125 // Parse S3 configuration
126 BashOptionsManager options_manager = BashOptionsManager(
127
4/8
✓ Branch 1 taken 372 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 372 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 372 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 372 times.
✗ Branch 11 not taken.
372 new DefaultOptionsTemplateManager(repository_alias_));
128
1/2
✓ Branch 1 taken 372 times.
✗ Branch 2 not taken.
372 options_manager.ParsePath(config_path, false);
129 372 std::string parameter;
130
131
3/6
✓ Branch 2 taken 372 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 372 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 372 times.
372 if (!options_manager.GetValue("CVMFS_S3_HOST", &host_name_)) {
132 LogCvmfs(kLogUploadS3, kLogStderr,
133 "Failed to parse CVMFS_S3_HOST from '%s'", config_path.c_str());
134 return false;
135 }
136
3/6
✓ Branch 2 taken 372 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 372 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 372 times.
372 if (!options_manager.GetValue("CVMFS_S3_ACCESS_KEY", &access_key_)) {
137 LogCvmfs(kLogUploadS3, kLogStderr,
138 "Failed to parse CVMFS_S3_ACCESS_KEY from '%s'.",
139 config_path.c_str());
140 return false;
141 }
142
3/6
✓ Branch 2 taken 372 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 372 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 372 times.
372 if (!options_manager.GetValue("CVMFS_S3_SECRET_KEY", &secret_key_)) {
143 LogCvmfs(kLogUploadS3, kLogStderr,
144 "Failed to parse CVMFS_S3_SECRET_KEY from '%s'.",
145 config_path.c_str());
146 return false;
147 }
148
3/6
✓ Branch 2 taken 372 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 372 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 372 times.
372 if (!options_manager.GetValue("CVMFS_S3_BUCKET", &bucket_)) {
149 LogCvmfs(kLogUploadS3, kLogStderr,
150 "Failed to parse CVMFS_S3_BUCKET from '%s'.", config_path.c_str());
151 return false;
152 }
153
3/6
✓ Branch 2 taken 372 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 372 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 372 times.
✗ Branch 10 not taken.
372 if (options_manager.GetValue("CVMFS_S3_DNS_BUCKETS", &parameter)) {
154
1/2
✓ Branch 1 taken 372 times.
✗ Branch 2 not taken.
372 if (parameter == "false") {
155 372 dns_buckets_ = false;
156 }
157 }
158
3/6
✓ Branch 2 taken 372 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 372 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 372 times.
✗ Branch 10 not taken.
372 if (options_manager.GetValue("CVMFS_S3_MAX_NUMBER_OF_PARALLEL_CONNECTIONS",
159 &parameter)) {
160
1/2
✓ Branch 1 taken 372 times.
✗ Branch 2 not taken.
372 num_parallel_uploads_ = String2Uint64(parameter);
161 }
162
3/6
✓ Branch 2 taken 372 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 372 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 372 times.
372 if (options_manager.GetValue("CVMFS_S3_MAX_RETRIES", &parameter)) {
163 num_retries_ = String2Uint64(parameter);
164 }
165
3/6
✓ Branch 2 taken 372 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 372 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 372 times.
372 if (options_manager.GetValue("CVMFS_S3_TIMEOUT", &parameter)) {
166 timeout_sec_ = String2Uint64(parameter);
167 }
168
3/6
✓ Branch 2 taken 372 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 372 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 372 times.
372 if (options_manager.GetValue("CVMFS_S3_REGION", &region_)) {
169 authz_method_ = s3fanout::kAuthzAwsV4;
170 }
171
3/6
✓ Branch 2 taken 372 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 372 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 372 times.
372 if (options_manager.GetValue("CVMFS_S3_FLAVOR", &flavor_)) {
172 if (flavor_ == "azure") {
173 authz_method_ = s3fanout::kAuthzAzure;
174 } else if (flavor_ == "awsv2") {
175 authz_method_ = s3fanout::kAuthzAwsV2;
176 } else if (flavor_ == "awsv4") {
177 authz_method_ = s3fanout::kAuthzAwsV4;
178 } else {
179 LogCvmfs(kLogUploadS3, kLogStderr,
180 "Failed to parse CVMFS_S3_FLAVOR from '%s', "
181 "valid options are azure, awsv2 or awsv4",
182 config_path.c_str());
183 return false;
184 }
185 }
186
3/6
✓ Branch 2 taken 372 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 372 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 372 times.
372 if (options_manager.GetValue("CVMFS_S3_PEEK_BEFORE_PUT", &parameter)) {
187 peek_before_put_ = options_manager.IsOn(parameter);
188 }
189
3/6
✓ Branch 2 taken 372 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 372 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 372 times.
372 if (options_manager.GetValue("CVMFS_S3_X_AMZ_ACL", &parameter)) {
190 bool isAllowed = false;
191 size_t const len = sizeof(x_amz_acl_allowed_values_)
192 / sizeof(x_amz_acl_allowed_values_[0]);
193 for (size_t i = 0; i < len; i++) {
194 if (x_amz_acl_allowed_values_[i] == parameter) {
195 isAllowed = true;
196 break;
197 }
198 }
199 if (!isAllowed) {
200 LogCvmfs(kLogUploadS3, kLogStderr,
201 "%s is not an allowed value for CVMFS_S3_X_AMZ_ACL",
202 parameter.c_str());
203 return false;
204 }
205 x_amz_acl_ = parameter;
206 }
207
208
3/6
✓ Branch 2 taken 372 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 372 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 372 times.
372 if (options_manager.GetValue("CVMFS_S3_USE_HTTPS", &parameter)) {
209 use_https_ = options_manager.IsOn(parameter);
210 }
211
212
3/6
✓ Branch 2 taken 372 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 372 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 372 times.
✗ Branch 10 not taken.
372 if (options_manager.GetValue("CVMFS_S3_PORT", &parameter)) {
213
2/4
✓ Branch 1 taken 372 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 372 times.
✗ Branch 5 not taken.
372 host_name_port_ = host_name_ + ":" + parameter;
214 } else {
215 host_name_port_ = host_name_;
216 }
217
218
3/6
✓ Branch 2 taken 372 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 372 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 372 times.
372 if (options_manager.IsDefined("CVMFS_S3_PROXY")) {
219 options_manager.GetValue("CVMFS_S3_PROXY", &proxy_);
220 }
221
222 372 return true;
223 372 }
224
225
226 476 bool S3Uploader::WillHandle(const SpoolerDefinition &spooler_definition) {
227 476 return spooler_definition.driver_type == SpoolerDefinition::S3;
228 }
229
230
231 bool S3Uploader::Create() {
232 if (!dns_buckets_)
233 return false;
234
235 s3fanout::JobInfo *info = CreateJobInfo("");
236 info->request = s3fanout::JobInfo::kReqPutBucket;
237 std::string request_content;
238 if (!region_.empty()) {
239 request_content = std::string("<CreateBucketConfiguration xmlns="
240 "\"http://s3.amazonaws.com/doc/2006-03-01/\">"
241 "<LocationConstraint>")
242 + region_
243 + "</LocationConstraint>"
244 "</CreateBucketConfiguration>";
245 info->origin->Append(request_content.data(), request_content.length());
246 info->origin->Commit();
247 }
248
249 RequestCtrl req_ctrl;
250 MakePipe(req_ctrl.pipe_wait);
251 info->callback = const_cast<void *>(static_cast<void const *>(
252 MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl)));
253
254 IncJobsInFlight();
255 UploadJobInfo(info);
256 req_ctrl.WaitFor();
257
258 return req_ctrl.return_code == 0;
259 }
260
261
262 31 unsigned int S3Uploader::GetNumberOfErrors() const {
263 31 return atomic_read32(&io_errors_);
264 }
265
266
267 /**
268 * Worker thread takes care of requesting new jobs and cleaning old ones.
269 */
270 372 void *S3Uploader::MainCollectResults(void *data) {
271 372 LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread started.");
272 372 S3Uploader *uploader = reinterpret_cast<S3Uploader *>(data);
273
274 while (true) {
275 19406 s3fanout::JobInfo *info = uploader->s3fanout_mgr_->PopCompletedJob();
276
2/2
✓ Branch 0 taken 372 times.
✓ Branch 1 taken 19034 times.
19406 if (!info)
277 372 break;
278 // Report completed job
279 19034 int reply_code = 0;
280
2/2
✓ Branch 0 taken 62 times.
✓ Branch 1 taken 18972 times.
19034 if (info->error_code != s3fanout::kFailOk) {
281
1/2
✓ Branch 0 taken 62 times.
✗ Branch 1 not taken.
62 if ((info->request != s3fanout::JobInfo::kReqHeadOnly)
282
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 62 times.
62 || (info->error_code != s3fanout::kFailNotFound)) {
283 LogCvmfs(kLogUploadS3, kLogStderr,
284 "Upload job for '%s' failed. (error code: %d - %s)",
285 info->object_key.c_str(), info->error_code,
286 s3fanout::Code2Ascii(info->error_code));
287 reply_code = 99;
288 atomic_inc32(&uploader->io_errors_);
289 }
290 }
291
2/2
✓ Branch 0 taken 31 times.
✓ Branch 1 taken 19003 times.
19034 if (info->request == s3fanout::JobInfo::kReqDelete) {
292
1/2
✓ Branch 2 taken 31 times.
✗ Branch 3 not taken.
31 uploader->Respond(NULL, UploaderResults());
293
2/2
✓ Branch 0 taken 155 times.
✓ Branch 1 taken 18848 times.
19003 } else if (info->request == s3fanout::JobInfo::kReqHeadOnly) {
294
2/2
✓ Branch 0 taken 62 times.
✓ Branch 1 taken 93 times.
155 if (info->error_code == s3fanout::kFailNotFound)
295 62 reply_code = 1;
296
1/2
✓ Branch 1 taken 155 times.
✗ Branch 2 not taken.
155 uploader->Respond(static_cast<CallbackTN *>(info->callback),
297 310 UploaderResults(UploaderResults::kLookup, reply_code));
298 } else {
299
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18848 times.
18848 if (info->request == s3fanout::JobInfo::kReqHeadPut) {
300 // The HEAD request was not transformed into a PUT request, thus this
301 // was a duplicate
302 // Uploaded catalogs are always unique ->
303 // assume this was a regular file and decrease appropriate counters
304 uploader->CountDuplicates();
305 uploader->DecUploadedChunks();
306 uploader->CountUploadedBytes(-(info->payload_size));
307 }
308 37696 uploader->Respond(
309
1/2
✓ Branch 1 taken 18848 times.
✗ Branch 2 not taken.
18848 static_cast<CallbackTN *>(info->callback),
310 37696 UploaderResults(UploaderResults::kChunkCommit, reply_code));
311
312
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 18848 times.
18848 assert(!info->origin.IsValid());
313 }
314
1/2
✓ Branch 0 taken 19034 times.
✗ Branch 1 not taken.
19034 delete info;
315 19034 }
316
317 372 LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread finished.");
318 372 return NULL;
319 }
320
321
322 15717 void S3Uploader::DoUpload(const std::string &remote_path,
323 IngestionSource *source,
324 const CallbackTN *callback) {
325
1/2
✓ Branch 1 taken 15717 times.
✗ Branch 2 not taken.
15717 bool rvb = source->Open();
326
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15717 times.
15717 if (!rvb) {
327 Respond(callback, UploaderResults(100, source->GetPath()));
328 return;
329 }
330 uint64_t size;
331
1/2
✓ Branch 1 taken 15717 times.
✗ Branch 2 not taken.
15717 rvb = source->GetSize(&size);
332
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15717 times.
15717 assert(rvb);
333
334 15717 FileBackedBuffer *origin = FileBackedBuffer::Create(
335
1/2
✓ Branch 2 taken 15717 times.
✗ Branch 3 not taken.
15717 kInMemoryObjectThreshold, spooler_definition().temporary_path);
336
337 unsigned char buffer[kPageSize];
338 ssize_t nbytes;
339 do {
340
1/2
✓ Branch 1 taken 6174177 times.
✗ Branch 2 not taken.
6174177 nbytes = source->Read(buffer, kPageSize);
341
2/2
✓ Branch 0 taken 6163761 times.
✓ Branch 1 taken 10416 times.
6174177 if (nbytes > 0)
342
1/2
✓ Branch 1 taken 6163761 times.
✗ Branch 2 not taken.
6163761 origin->Append(buffer, nbytes);
343
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6174177 times.
6174177 if (nbytes < 0) {
344 source->Close();
345 delete origin;
346 Respond(callback, UploaderResults(100, source->GetPath()));
347 return;
348 }
349
2/2
✓ Branch 0 taken 6158460 times.
✓ Branch 1 taken 15717 times.
6174177 } while (nbytes == kPageSize);
350
1/2
✓ Branch 1 taken 15717 times.
✗ Branch 2 not taken.
15717 source->Close();
351
1/2
✓ Branch 1 taken 15717 times.
✗ Branch 2 not taken.
15717 origin->Commit();
352
353 s3fanout::JobInfo *info = new s3fanout::JobInfo(
354
2/4
✓ Branch 1 taken 15717 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 15717 times.
✗ Branch 5 not taken.
31434 repository_alias_ + "/" + remote_path,
355 const_cast<void *>(static_cast<void const *>(callback)),
356
2/4
✓ Branch 1 taken 15717 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 15717 times.
✗ Branch 5 not taken.
15717 origin);
357
358
3/6
✓ Branch 2 taken 15717 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 15717 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 15717 times.
15717 if (HasPrefix(remote_path, ".cvmfs", false /*ignore_case*/)) {
359 info->request = s3fanout::JobInfo::kReqPutDotCvmfs;
360
3/6
✓ Branch 2 taken 15717 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 15717 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 15717 times.
15717 } else if (HasSuffix(remote_path, ".html", false)) {
361 info->request = s3fanout::JobInfo::kReqPutHtml;
362 } else {
363
1/2
✓ Branch 0 taken 15717 times.
✗ Branch 1 not taken.
15717 if (peek_before_put_)
364 15717 info->request = s3fanout::JobInfo::kReqHeadPut;
365 }
366
367 15717 RequestCtrl req_ctrl;
368
1/2
✓ Branch 1 taken 15717 times.
✗ Branch 2 not taken.
15717 MakePipe(req_ctrl.pipe_wait);
369 15717 req_ctrl.callback_forward = callback;
370
1/2
✓ Branch 1 taken 15717 times.
✗ Branch 2 not taken.
15717 req_ctrl.original_path = source->GetPath();
371 15717 info->callback = const_cast<void *>(static_cast<void const *>(
372
1/2
✓ Branch 1 taken 15717 times.
✗ Branch 2 not taken.
15717 MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl)));
373
374
1/2
✓ Branch 1 taken 15717 times.
✗ Branch 2 not taken.
15717 UploadJobInfo(info);
375
1/2
✓ Branch 1 taken 15717 times.
✗ Branch 2 not taken.
15717 req_ctrl.WaitFor();
376
1/2
✓ Branch 2 taken 15717 times.
✗ Branch 3 not taken.
15717 LogCvmfs(kLogUploadS3, kLogDebug, "Uploading from source finished: %s",
377
1/2
✓ Branch 1 taken 15717 times.
✗ Branch 2 not taken.
31434 source->GetPath().c_str());
378 15717 }
379
380
381 19003 void S3Uploader::UploadJobInfo(s3fanout::JobInfo *info) {
382 19003 LogCvmfs(kLogUploadS3, kLogDebug,
383 "Uploading:\n"
384 "--> Object: '%s'\n"
385 "--> Bucket: '%s'\n"
386 "--> Host: '%s'\n",
387 info->object_key.c_str(), bucket_.c_str(), host_name_port_.c_str());
388
389 19003 s3fanout_mgr_->PushNewJob(info);
390 19003 }
391
392
393 3131 UploadStreamHandle *S3Uploader::InitStreamedUpload(const CallbackTN *callback) {
394 return new S3StreamHandle(callback, kInMemoryObjectThreshold,
395
1/2
✓ Branch 3 taken 3131 times.
✗ Branch 4 not taken.
3131 spooler_definition().temporary_path);
396 }
397
398
399 24862 void S3Uploader::StreamedUpload(UploadStreamHandle *handle,
400 UploadBuffer buffer,
401 const CallbackTN *callback) {
402 24862 S3StreamHandle *s3_handle = static_cast<S3StreamHandle *>(handle);
403
404 24862 s3_handle->buffer->Append(buffer.data, buffer.size);
405
1/2
✓ Branch 2 taken 24862 times.
✗ Branch 3 not taken.
24862 Respond(callback, UploaderResults(UploaderResults::kBufferUpload, 0));
406 24862 }
407
408
409 3131 void S3Uploader::FinalizeStreamedUpload(UploadStreamHandle *handle,
410 const shash::Any &content_hash) {
411 3131 S3StreamHandle *s3_handle = static_cast<S3StreamHandle *>(handle);
412
413 // New file name based on content hash or remote_path override
414 3131 std::string final_path;
415
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3131 times.
3131 if (s3_handle->remote_path != "") {
416 final_path = repository_alias_ + "/" + s3_handle->remote_path;
417 } else {
418
3/6
✓ Branch 1 taken 3131 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 3131 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 3131 times.
✗ Branch 8 not taken.
3131 final_path = repository_alias_ + "/data/" + content_hash.MakePath();
419 }
420
421
1/2
✓ Branch 2 taken 3131 times.
✗ Branch 3 not taken.
3131 s3_handle->buffer->Commit();
422
423
1/2
✓ Branch 2 taken 3131 times.
✗ Branch 3 not taken.
3131 const size_t bytes_uploaded = s3_handle->buffer->GetSize();
424
425 s3fanout::JobInfo *info = new s3fanout::JobInfo(
426 final_path,
427 3131 const_cast<void *>(static_cast<void const *>(handle->commit_callback)),
428
2/4
✓ Branch 2 taken 3131 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3131 times.
✗ Branch 6 not taken.
3131 s3_handle->buffer.Release());
429
430
1/2
✓ Branch 0 taken 3131 times.
✗ Branch 1 not taken.
3131 if (peek_before_put_)
431 3131 info->request = s3fanout::JobInfo::kReqHeadPut;
432
1/2
✓ Branch 1 taken 3131 times.
✗ Branch 2 not taken.
3131 UploadJobInfo(info);
433
434 // Remove the temporary file
435
1/2
✓ Branch 0 taken 3131 times.
✗ Branch 1 not taken.
3131 delete s3_handle;
436
437 // Update statistics counters
438 3131 if (!content_hash.HasSuffix()
439
5/6
✓ Branch 0 taken 31 times.
✓ Branch 1 taken 3100 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 31 times.
✓ Branch 4 taken 3100 times.
✓ Branch 5 taken 31 times.
3131 || content_hash.suffix == shash::kSuffixPartial) {
440
1/2
✓ Branch 1 taken 3100 times.
✗ Branch 2 not taken.
3100 CountUploadedChunks();
441
1/2
✓ Branch 1 taken 3100 times.
✗ Branch 2 not taken.
3100 CountUploadedBytes(bytes_uploaded);
442
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31 times.
31 } else if (content_hash.suffix == shash::kSuffixCatalog) {
443 CountUploadedCatalogs();
444 CountUploadedCatalogBytes(bytes_uploaded);
445 }
446 3131 }
447
448
449 186 s3fanout::JobInfo *S3Uploader::CreateJobInfo(const std::string &path) const {
450
2/4
✓ Branch 2 taken 186 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 186 times.
✗ Branch 6 not taken.
186 FileBackedBuffer *buf = FileBackedBuffer::Create(kInMemoryObjectThreshold);
451
1/2
✓ Branch 2 taken 186 times.
✗ Branch 3 not taken.
186 return new s3fanout::JobInfo(path, NULL, buf);
452 }
453
454
455 31 void S3Uploader::DoRemoveAsync(const std::string &file_to_delete) {
456
2/4
✓ Branch 1 taken 31 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 31 times.
✗ Branch 5 not taken.
31 const std::string mangled_path = repository_alias_ + "/" + file_to_delete;
457
1/2
✓ Branch 1 taken 31 times.
✗ Branch 2 not taken.
31 s3fanout::JobInfo *info = CreateJobInfo(mangled_path);
458
459 31 info->request = s3fanout::JobInfo::kReqDelete;
460
461
1/2
✓ Branch 3 taken 31 times.
✗ Branch 4 not taken.
31 LogCvmfs(kLogUploadS3, kLogDebug, "Asynchronously removing %s/%s",
462 bucket_.c_str(), info->object_key.c_str());
463
1/2
✓ Branch 2 taken 31 times.
✗ Branch 3 not taken.
31 s3fanout_mgr_->PushNewJob(info);
464 31 }
465
466
467 15872 void S3Uploader::OnReqComplete(const upload::UploaderResults &results,
468 RequestCtrl *ctrl) {
469 15872 ctrl->return_code = results.return_code;
470
2/2
✓ Branch 0 taken 15717 times.
✓ Branch 1 taken 155 times.
15872 if (ctrl->callback_forward != NULL) {
471 // We are already in Respond() so we must not call it again
472 15717 const upload::UploaderResults fix_path(results.return_code,
473
1/2
✓ Branch 1 taken 15717 times.
✗ Branch 2 not taken.
15717 ctrl->original_path);
474
1/2
✓ Branch 1 taken 15717 times.
✗ Branch 2 not taken.
15717 (*(ctrl->callback_forward))(fix_path);
475
1/2
✓ Branch 0 taken 15717 times.
✗ Branch 1 not taken.
15717 delete ctrl->callback_forward;
476 15717 ctrl->callback_forward = NULL;
477 15717 }
478 15872 char c = 'c';
479
1/2
✓ Branch 1 taken 15872 times.
✗ Branch 2 not taken.
15872 WritePipe(ctrl->pipe_wait[1], &c, 1);
480 15872 }
481
482
483 155 bool S3Uploader::Peek(const std::string &path) {
484
2/4
✓ Branch 1 taken 155 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 155 times.
✗ Branch 5 not taken.
155 const std::string mangled_path = repository_alias_ + "/" + path;
485
1/2
✓ Branch 1 taken 155 times.
✗ Branch 2 not taken.
155 s3fanout::JobInfo *info = CreateJobInfo(mangled_path);
486
487 155 RequestCtrl req_ctrl;
488
1/2
✓ Branch 1 taken 155 times.
✗ Branch 2 not taken.
155 MakePipe(req_ctrl.pipe_wait);
489 155 info->request = s3fanout::JobInfo::kReqHeadOnly;
490 155 info->callback = const_cast<void *>(static_cast<void const *>(
491
1/2
✓ Branch 1 taken 155 times.
✗ Branch 2 not taken.
155 MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl)));
492
493
1/2
✓ Branch 1 taken 155 times.
✗ Branch 2 not taken.
155 IncJobsInFlight();
494
1/2
✓ Branch 1 taken 155 times.
✗ Branch 2 not taken.
155 UploadJobInfo(info);
495
1/2
✓ Branch 1 taken 155 times.
✗ Branch 2 not taken.
155 req_ctrl.WaitFor();
496
497 155 return req_ctrl.return_code == 0;
498 155 }
499
500
501 // noop: no mkdir needed in S3 storage
502 bool S3Uploader::Mkdir(const std::string &path) { return true; }
503
504
505 bool S3Uploader::PlaceBootstrappingShortcut(const shash::Any &object) {
506 return false; // TODO(rmeusel): implement
507 }
508
509
510 int64_t S3Uploader::DoGetObjectSize(const std::string &file_name) {
511 // TODO(dosarudaniel): use a head request for byte count
512 // Re-enable 661 integration test when done
513 return -EOPNOTSUPP;
514 }
515
516 } // namespace upload
517