GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/upload_s3.cc
Date: 2026-01-25 02:35:50
Exec Total Coverage
Lines: 202 285 70.9%
Branches: 179 436 41.1%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "upload_s3.h"
6
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <inttypes.h>
10 #include <unistd.h>
11
12 #include <string>
13 #include <vector>
14
15 #include "compression/compression.h"
16 #include "network/s3fanout.h"
17 #include "options.h"
18 #include "util/exception.h"
19 #include "util/logging.h"
20 #include "util/posix.h"
21 #include "util/string.h"
22
23 namespace upload {
24
25 /*
26 * Allowed values of x-amz-acl according to S3 API
27 */
28 static const char *x_amz_acl_allowed_values_[8] = {"private",
29 "public-read",
30 "public-write",
31 "authenticated-read",
32 "aws-exec-read",
33 "bucket-owner-read",
34 "bucket-owner-full-control",
35 ""};
36
37 4608 void S3Uploader::RequestCtrl::WaitFor() {
38 char c;
39
1/2
✓ Branch 1 taken 4608 times.
✗ Branch 2 not taken.
4608 ReadPipe(pipe_wait[0], &c, 1);
40
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4608 times.
4608 assert(c == 'c');
41
1/2
✓ Branch 1 taken 4608 times.
✗ Branch 2 not taken.
4608 ClosePipe(pipe_wait);
42 4608 }
43
44
45 108 S3Uploader::S3Uploader(const SpoolerDefinition &spooler_definition)
46 : AbstractUploader(spooler_definition)
47 108 , dns_buckets_(true)
48 108 , num_parallel_uploads_(kDefaultNumParallelUploads)
49 108 , num_retries_(kDefaultNumRetries)
50 108 , timeout_sec_(kDefaultTimeoutSec)
51 108 , authz_method_(s3fanout::kAuthzAwsV2)
52 108 , peek_before_put_(true)
53 108 , use_https_(false)
54
1/2
✓ Branch 2 taken 108 times.
✗ Branch 3 not taken.
108 , proxy_("")
55
1/2
✓ Branch 1 taken 108 times.
✗ Branch 2 not taken.
108 , temporary_path_(spooler_definition.temporary_path)
56
2/4
✓ Branch 2 taken 108 times.
✗ Branch 3 not taken.
✓ Branch 14 taken 108 times.
✗ Branch 15 not taken.
216 , x_amz_acl_("public-read") {
57
2/4
✓ Branch 1 taken 108 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 108 times.
✗ Branch 4 not taken.
108 assert(spooler_definition.IsValid()
58 && spooler_definition.driver_type == SpoolerDefinition::S3);
59
60 108 atomic_init32(&io_errors_);
61
62
2/4
✓ Branch 1 taken 108 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 108 times.
108 if (!ParseSpoolerDefinition(spooler_definition)) {
63 PANIC(kLogStderr, "Error in parsing the spooler definition");
64 }
65
66
1/2
✓ Branch 1 taken 108 times.
✗ Branch 2 not taken.
108 s3fanout::S3FanoutManager::S3Config s3config;
67
1/2
✓ Branch 1 taken 108 times.
✗ Branch 2 not taken.
108 s3config.access_key = access_key_;
68
1/2
✓ Branch 1 taken 108 times.
✗ Branch 2 not taken.
108 s3config.secret_key = secret_key_;
69
1/2
✓ Branch 1 taken 108 times.
✗ Branch 2 not taken.
108 s3config.hostname_port = host_name_port_;
70 108 s3config.authz_method = authz_method_;
71
1/2
✓ Branch 1 taken 108 times.
✗ Branch 2 not taken.
108 s3config.region = region_;
72
1/2
✓ Branch 1 taken 108 times.
✗ Branch 2 not taken.
108 s3config.flavor = flavor_;
73
1/2
✓ Branch 1 taken 108 times.
✗ Branch 2 not taken.
108 s3config.bucket = bucket_;
74 108 s3config.dns_buckets = dns_buckets_;
75 108 s3config.pool_max_handles = num_parallel_uploads_;
76 108 s3config.opt_timeout_sec = timeout_sec_;
77 108 s3config.opt_max_retries = num_retries_;
78 108 s3config.opt_backoff_init_ms = kDefaultBackoffInitMs;
79 108 s3config.opt_backoff_max_ms = kDefaultBackoffMaxMs;
80
1/2
✓ Branch 1 taken 108 times.
✗ Branch 2 not taken.
108 s3config.x_amz_acl = x_amz_acl_;
81
82
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 108 times.
108 if (use_https_) {
83 s3config.protocol = "https";
84 } else {
85
1/2
✓ Branch 1 taken 108 times.
✗ Branch 2 not taken.
108 s3config.protocol = "http";
86 }
87
1/2
✓ Branch 1 taken 108 times.
✗ Branch 2 not taken.
108 s3config.proxy = proxy_;
88
89
3/6
✓ Branch 1 taken 108 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 108 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 108 times.
✗ Branch 8 not taken.
108 s3fanout_mgr_ = new s3fanout::S3FanoutManager(s3config);
90
1/2
✓ Branch 2 taken 108 times.
✗ Branch 3 not taken.
108 s3fanout_mgr_->Spawn();
91
92 108 const int retval = pthread_create(&thread_collect_results_, NULL,
93 MainCollectResults, this);
94
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 108 times.
108 assert(retval == 0);
95 108 }
96
97
98 648 S3Uploader::~S3Uploader() {
99 // Signal termination to our own worker thread
100 216 s3fanout_mgr_->PushCompletedJob(NULL);
101 216 pthread_join(thread_collect_results_, NULL);
102 432 }
103
104
105 108 bool S3Uploader::ParseSpoolerDefinition(
106 const SpoolerDefinition &spooler_definition) {
107 const std::vector<std::string> config = SplitString(
108
1/2
✓ Branch 1 taken 108 times.
✗ Branch 2 not taken.
108 spooler_definition.spooler_configuration, '@');
109
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 108 times.
108 if (config.size() != 2) {
110 LogCvmfs(kLogUploadS3, kLogStderr,
111 "Failed to parse spooler configuration string '%s'.\n"
112 "Provide: <repo_alias>@/path/to/s3.conf",
113 spooler_definition.spooler_configuration.c_str());
114 return false;
115 }
116
1/2
✓ Branch 2 taken 108 times.
✗ Branch 3 not taken.
108 repository_alias_ = config[0];
117 108 const std::string &config_path = config[1];
118
119
2/4
✓ Branch 1 taken 108 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 108 times.
108 if (!FileExists(config_path)) {
120 LogCvmfs(kLogUploadS3, kLogStderr, "Cannot find S3 config file at '%s'",
121 config_path.c_str());
122 return false;
123 }
124
125 // Parse S3 configuration
126 BashOptionsManager options_manager = BashOptionsManager(
127
4/8
✓ Branch 1 taken 108 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 108 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 108 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 108 times.
✗ Branch 11 not taken.
108 new DefaultOptionsTemplateManager(repository_alias_));
128
1/2
✓ Branch 1 taken 108 times.
✗ Branch 2 not taken.
108 options_manager.ParsePath(config_path, false);
129 108 std::string parameter;
130
131
3/6
✓ Branch 2 taken 108 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 108 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 108 times.
108 if (!options_manager.GetValue("CVMFS_S3_HOST", &host_name_)) {
132 LogCvmfs(kLogUploadS3, kLogStderr,
133 "Failed to parse CVMFS_S3_HOST from '%s'", config_path.c_str());
134 return false;
135 }
136
3/6
✓ Branch 2 taken 108 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 108 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 108 times.
108 if (!options_manager.GetValue("CVMFS_S3_ACCESS_KEY", &access_key_)) {
137 LogCvmfs(kLogUploadS3, kLogStderr,
138 "Failed to parse CVMFS_S3_ACCESS_KEY from '%s'.",
139 config_path.c_str());
140 return false;
141 }
142
3/6
✓ Branch 2 taken 108 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 108 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 108 times.
108 if (!options_manager.GetValue("CVMFS_S3_SECRET_KEY", &secret_key_)) {
143 LogCvmfs(kLogUploadS3, kLogStderr,
144 "Failed to parse CVMFS_S3_SECRET_KEY from '%s'.",
145 config_path.c_str());
146 return false;
147 }
148
3/6
✓ Branch 2 taken 108 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 108 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 108 times.
108 if (!options_manager.GetValue("CVMFS_S3_BUCKET", &bucket_)) {
149 LogCvmfs(kLogUploadS3, kLogStderr,
150 "Failed to parse CVMFS_S3_BUCKET from '%s'.", config_path.c_str());
151 return false;
152 }
153
3/6
✓ Branch 2 taken 108 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 108 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 108 times.
✗ Branch 10 not taken.
108 if (options_manager.GetValue("CVMFS_S3_DNS_BUCKETS", &parameter)) {
154
1/2
✓ Branch 1 taken 108 times.
✗ Branch 2 not taken.
108 if (parameter == "false") {
155 108 dns_buckets_ = false;
156 }
157 }
158
3/6
✓ Branch 2 taken 108 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 108 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 108 times.
✗ Branch 10 not taken.
108 if (options_manager.GetValue("CVMFS_S3_MAX_NUMBER_OF_PARALLEL_CONNECTIONS",
159 &parameter)) {
160
1/2
✓ Branch 1 taken 108 times.
✗ Branch 2 not taken.
108 num_parallel_uploads_ = String2Uint64(parameter);
161 }
162
3/6
✓ Branch 2 taken 108 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 108 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 108 times.
108 if (options_manager.GetValue("CVMFS_S3_MAX_RETRIES", &parameter)) {
163 num_retries_ = String2Uint64(parameter);
164 }
165
3/6
✓ Branch 2 taken 108 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 108 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 108 times.
108 if (options_manager.GetValue("CVMFS_S3_TIMEOUT", &parameter)) {
166 timeout_sec_ = String2Uint64(parameter);
167 }
168
3/6
✓ Branch 2 taken 108 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 108 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 108 times.
108 if (options_manager.GetValue("CVMFS_S3_REGION", &region_)) {
169 authz_method_ = s3fanout::kAuthzAwsV4;
170 }
171
3/6
✓ Branch 2 taken 108 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 108 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 108 times.
108 if (options_manager.GetValue("CVMFS_S3_FLAVOR", &flavor_)) {
172 if (flavor_ == "azure") {
173 authz_method_ = s3fanout::kAuthzAzure;
174 } else if (flavor_ == "awsv2") {
175 authz_method_ = s3fanout::kAuthzAwsV2;
176 } else if (flavor_ == "awsv4") {
177 authz_method_ = s3fanout::kAuthzAwsV4;
178 } else {
179 LogCvmfs(kLogUploadS3, kLogStderr,
180 "Failed to parse CVMFS_S3_FLAVOR from '%s', "
181 "valid options are azure, awsv2 or awsv4",
182 config_path.c_str());
183 return false;
184 }
185 }
186
3/6
✓ Branch 2 taken 108 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 108 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 108 times.
108 if (options_manager.GetValue("CVMFS_S3_PEEK_BEFORE_PUT", &parameter)) {
187 peek_before_put_ = options_manager.IsOn(parameter);
188 }
189
3/6
✓ Branch 2 taken 108 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 108 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 108 times.
108 if (options_manager.GetValue("CVMFS_S3_X_AMZ_ACL", &parameter)) {
190 bool isAllowed = false;
191 size_t const len = sizeof(x_amz_acl_allowed_values_)
192 / sizeof(x_amz_acl_allowed_values_[0]);
193 for (size_t i = 0; i < len; i++) {
194 if (x_amz_acl_allowed_values_[i] == parameter) {
195 isAllowed = true;
196 break;
197 }
198 }
199 if (!isAllowed) {
200 LogCvmfs(kLogUploadS3, kLogStderr,
201 "%s is not an allowed value for CVMFS_S3_X_AMZ_ACL",
202 parameter.c_str());
203 return false;
204 }
205 x_amz_acl_ = parameter;
206 }
207
208
3/6
✓ Branch 2 taken 108 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 108 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 108 times.
108 if (options_manager.GetValue("CVMFS_S3_USE_HTTPS", &parameter)) {
209 use_https_ = options_manager.IsOn(parameter);
210 }
211
212
3/6
✓ Branch 2 taken 108 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 108 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 108 times.
✗ Branch 10 not taken.
108 if (options_manager.GetValue("CVMFS_S3_PORT", &parameter)) {
213
2/4
✓ Branch 1 taken 108 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 108 times.
✗ Branch 5 not taken.
108 host_name_port_ = host_name_ + ":" + parameter;
214 } else {
215 host_name_port_ = host_name_;
216 }
217
218
3/6
✓ Branch 2 taken 108 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 108 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 108 times.
108 if (options_manager.IsDefined("CVMFS_S3_PROXY")) {
219 options_manager.GetValue("CVMFS_S3_PROXY", &proxy_);
220 }
221
222 108 return true;
223 108 }
224
225
226 163 bool S3Uploader::WillHandle(const SpoolerDefinition &spooler_definition) {
227 163 return spooler_definition.driver_type == SpoolerDefinition::S3;
228 }
229
230
231 bool S3Uploader::Create() {
232 if (!dns_buckets_)
233 return false;
234
235 s3fanout::JobInfo *info = CreateJobInfo("");
236 info->request = s3fanout::JobInfo::kReqPutBucket;
237 std::string request_content;
238 if (!region_.empty()) {
239 request_content = std::string("<CreateBucketConfiguration xmlns="
240 "\"http://s3.amazonaws.com/doc/2006-03-01/\">"
241 "<LocationConstraint>")
242 + region_
243 + "</LocationConstraint>"
244 "</CreateBucketConfiguration>";
245 info->origin->Append(request_content.data(), request_content.length());
246 info->origin->Commit();
247 }
248
249 RequestCtrl req_ctrl;
250 MakePipe(req_ctrl.pipe_wait);
251 info->callback = const_cast<void *>(static_cast<void const *>(
252 MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl)));
253
254 IncJobsInFlight();
255 UploadJobInfo(info);
256 req_ctrl.WaitFor();
257
258 return req_ctrl.return_code == 0;
259 }
260
261
262 9 unsigned int S3Uploader::GetNumberOfErrors() const {
263 9 return atomic_read32(&io_errors_);
264 }
265
266
267 /**
268 * Worker thread takes care of requesting new jobs and cleaning old ones.
269 */
270 108 void *S3Uploader::MainCollectResults(void *data) {
271 108 LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread started.");
272 108 S3Uploader *uploader = reinterpret_cast<S3Uploader *>(data);
273
274 while (true) {
275 5634 s3fanout::JobInfo *info = uploader->s3fanout_mgr_->PopCompletedJob();
276
2/2
✓ Branch 0 taken 108 times.
✓ Branch 1 taken 5526 times.
5634 if (!info)
277 108 break;
278 // Report completed job
279 5526 int reply_code = 0;
280
2/2
✓ Branch 0 taken 18 times.
✓ Branch 1 taken 5508 times.
5526 if (info->error_code != s3fanout::kFailOk) {
281
1/2
✓ Branch 0 taken 18 times.
✗ Branch 1 not taken.
18 if ((info->request != s3fanout::JobInfo::kReqHeadOnly)
282
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18 times.
18 || (info->error_code != s3fanout::kFailNotFound)) {
283 LogCvmfs(kLogUploadS3, kLogStderr,
284 "Upload job for '%s' failed. (error code: %d - %s)",
285 info->object_key.c_str(), info->error_code,
286 s3fanout::Code2Ascii(info->error_code));
287 reply_code = 99;
288 atomic_inc32(&uploader->io_errors_);
289 }
290 }
291
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 5517 times.
5526 if (info->request == s3fanout::JobInfo::kReqDelete) {
292
1/2
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
9 uploader->Respond(NULL, UploaderResults());
293
2/2
✓ Branch 0 taken 45 times.
✓ Branch 1 taken 5472 times.
5517 } else if (info->request == s3fanout::JobInfo::kReqHeadOnly) {
294
2/2
✓ Branch 0 taken 18 times.
✓ Branch 1 taken 27 times.
45 if (info->error_code == s3fanout::kFailNotFound)
295 18 reply_code = 1;
296
1/2
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
45 uploader->Respond(static_cast<CallbackTN *>(info->callback),
297 90 UploaderResults(UploaderResults::kLookup, reply_code));
298 } else {
299
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5472 times.
5472 if (info->request == s3fanout::JobInfo::kReqHeadPut) {
300 // The HEAD request was not transformed into a PUT request, thus this
301 // was a duplicate
302 // Uploaded catalogs are always unique ->
303 // assume this was a regular file and decrease appropriate counters
304 uploader->CountDuplicates();
305 uploader->DecUploadedChunks();
306 uploader->CountUploadedBytes(-(info->payload_size));
307 }
308 10944 uploader->Respond(
309
1/2
✓ Branch 1 taken 5472 times.
✗ Branch 2 not taken.
5472 static_cast<CallbackTN *>(info->callback),
310 10944 UploaderResults(UploaderResults::kChunkCommit, reply_code));
311
312
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5472 times.
5472 assert(!info->origin.IsValid());
313 }
314
1/2
✓ Branch 0 taken 5526 times.
✗ Branch 1 not taken.
5526 delete info;
315 5526 }
316
317 108 LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread finished.");
318 108 return NULL;
319 }
320
321
322 4563 void S3Uploader::DoUpload(const std::string &remote_path,
323 IngestionSource *source,
324 const CallbackTN *callback) {
325
1/2
✓ Branch 1 taken 4563 times.
✗ Branch 2 not taken.
4563 bool rvb = source->Open();
326
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4563 times.
4563 if (!rvb) {
327 Respond(callback, UploaderResults(100, source->GetPath()));
328 return;
329 }
330 uint64_t size;
331
1/2
✓ Branch 1 taken 4563 times.
✗ Branch 2 not taken.
4563 rvb = source->GetSize(&size);
332
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4563 times.
4563 assert(rvb);
333
334 4563 FileBackedBuffer *origin = FileBackedBuffer::Create(
335
1/2
✓ Branch 2 taken 4563 times.
✗ Branch 3 not taken.
4563 kInMemoryObjectThreshold, spooler_definition().temporary_path);
336
337 unsigned char buffer[kPageSize];
338 ssize_t nbytes;
339 do {
340
1/2
✓ Branch 1 taken 1792503 times.
✗ Branch 2 not taken.
1792503 nbytes = source->Read(buffer, kPageSize);
341
2/2
✓ Branch 0 taken 1789479 times.
✓ Branch 1 taken 3024 times.
1792503 if (nbytes > 0)
342
1/2
✓ Branch 1 taken 1789479 times.
✗ Branch 2 not taken.
1789479 origin->Append(buffer, nbytes);
343
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1792503 times.
1792503 if (nbytes < 0) {
344 source->Close();
345 delete origin;
346 Respond(callback, UploaderResults(100, source->GetPath()));
347 return;
348 }
349
2/2
✓ Branch 0 taken 1787940 times.
✓ Branch 1 taken 4563 times.
1792503 } while (nbytes == kPageSize);
350
1/2
✓ Branch 1 taken 4563 times.
✗ Branch 2 not taken.
4563 source->Close();
351
1/2
✓ Branch 1 taken 4563 times.
✗ Branch 2 not taken.
4563 origin->Commit();
352
353 s3fanout::JobInfo *info = new s3fanout::JobInfo(
354
2/4
✓ Branch 1 taken 4563 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 4563 times.
✗ Branch 5 not taken.
9126 repository_alias_ + "/" + remote_path,
355 const_cast<void *>(static_cast<void const *>(callback)),
356
2/4
✓ Branch 1 taken 4563 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 4563 times.
✗ Branch 5 not taken.
4563 origin);
357
358
3/6
✓ Branch 2 taken 4563 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4563 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 4563 times.
4563 if (HasPrefix(remote_path, ".cvmfs", false /*ignore_case*/)) {
359 info->request = s3fanout::JobInfo::kReqPutDotCvmfs;
360
3/6
✓ Branch 2 taken 4563 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4563 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 4563 times.
4563 } else if (HasSuffix(remote_path, ".html", false)) {
361 info->request = s3fanout::JobInfo::kReqPutHtml;
362 } else {
363
1/2
✓ Branch 0 taken 4563 times.
✗ Branch 1 not taken.
4563 if (peek_before_put_)
364 4563 info->request = s3fanout::JobInfo::kReqHeadPut;
365 }
366
367 4563 RequestCtrl req_ctrl;
368
1/2
✓ Branch 1 taken 4563 times.
✗ Branch 2 not taken.
4563 MakePipe(req_ctrl.pipe_wait);
369 4563 req_ctrl.callback_forward = callback;
370
1/2
✓ Branch 1 taken 4563 times.
✗ Branch 2 not taken.
4563 req_ctrl.original_path = source->GetPath();
371 4563 info->callback = const_cast<void *>(static_cast<void const *>(
372
1/2
✓ Branch 1 taken 4563 times.
✗ Branch 2 not taken.
4563 MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl)));
373
374
1/2
✓ Branch 1 taken 4563 times.
✗ Branch 2 not taken.
4563 UploadJobInfo(info);
375
1/2
✓ Branch 1 taken 4563 times.
✗ Branch 2 not taken.
4563 req_ctrl.WaitFor();
376
1/2
✓ Branch 2 taken 4563 times.
✗ Branch 3 not taken.
4563 LogCvmfs(kLogUploadS3, kLogDebug, "Uploading from source finished: %s",
377
1/2
✓ Branch 1 taken 4563 times.
✗ Branch 2 not taken.
9126 source->GetPath().c_str());
378 4563 }
379
380
381 5517 void S3Uploader::UploadJobInfo(s3fanout::JobInfo *info) {
382 5517 LogCvmfs(kLogUploadS3, kLogDebug,
383 "Uploading:\n"
384 "--> Object: '%s'\n"
385 "--> Bucket: '%s'\n"
386 "--> Host: '%s'\n",
387 info->object_key.c_str(), bucket_.c_str(), host_name_port_.c_str());
388
389 5517 s3fanout_mgr_->PushNewJob(info);
390 5517 }
391
392
393 909 UploadStreamHandle *S3Uploader::InitStreamedUpload(const CallbackTN *callback) {
394 return new S3StreamHandle(callback, kInMemoryObjectThreshold,
395
1/2
✓ Branch 3 taken 909 times.
✗ Branch 4 not taken.
909 spooler_definition().temporary_path);
396 }
397
398
399 7218 void S3Uploader::StreamedUpload(UploadStreamHandle *handle,
400 UploadBuffer buffer,
401 const CallbackTN *callback) {
402 7218 S3StreamHandle *s3_handle = static_cast<S3StreamHandle *>(handle);
403
404 7218 s3_handle->buffer->Append(buffer.data, buffer.size);
405
1/2
✓ Branch 2 taken 7218 times.
✗ Branch 3 not taken.
7218 Respond(callback, UploaderResults(UploaderResults::kBufferUpload, 0));
406 7218 }
407
408
409 909 void S3Uploader::FinalizeStreamedUpload(UploadStreamHandle *handle,
410 const shash::Any &content_hash) {
411 909 S3StreamHandle *s3_handle = static_cast<S3StreamHandle *>(handle);
412
413 // New file name based on content hash or remote_path override
414 909 std::string final_path;
415
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 909 times.
909 if (s3_handle->remote_path != "") {
416 final_path = repository_alias_ + "/" + s3_handle->remote_path;
417 } else {
418
3/6
✓ Branch 1 taken 909 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 909 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 909 times.
✗ Branch 8 not taken.
909 final_path = repository_alias_ + "/data/" + content_hash.MakePath();
419 }
420
421
1/2
✓ Branch 2 taken 909 times.
✗ Branch 3 not taken.
909 s3_handle->buffer->Commit();
422
423
1/2
✓ Branch 2 taken 909 times.
✗ Branch 3 not taken.
909 const size_t bytes_uploaded = s3_handle->buffer->GetSize();
424
425 s3fanout::JobInfo *info = new s3fanout::JobInfo(
426 final_path,
427 909 const_cast<void *>(static_cast<void const *>(handle->commit_callback)),
428
2/4
✓ Branch 2 taken 909 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 909 times.
✗ Branch 6 not taken.
909 s3_handle->buffer.Release());
429
430
1/2
✓ Branch 0 taken 909 times.
✗ Branch 1 not taken.
909 if (peek_before_put_)
431 909 info->request = s3fanout::JobInfo::kReqHeadPut;
432
1/2
✓ Branch 1 taken 909 times.
✗ Branch 2 not taken.
909 UploadJobInfo(info);
433
434 // Remove the temporary file
435
1/2
✓ Branch 0 taken 909 times.
✗ Branch 1 not taken.
909 delete s3_handle;
436
437 // Update statistics counters
438 909 if (!content_hash.HasSuffix()
439
5/6
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 900 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 9 times.
✓ Branch 4 taken 900 times.
✓ Branch 5 taken 9 times.
909 || content_hash.suffix == shash::kSuffixPartial) {
440
1/2
✓ Branch 1 taken 900 times.
✗ Branch 2 not taken.
900 CountUploadedChunks();
441
1/2
✓ Branch 1 taken 900 times.
✗ Branch 2 not taken.
900 CountUploadedBytes(bytes_uploaded);
442
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
9 } else if (content_hash.suffix == shash::kSuffixCatalog) {
443 CountUploadedCatalogs();
444 CountUploadedCatalogBytes(bytes_uploaded);
445 }
446 909 }
447
448
449 54 s3fanout::JobInfo *S3Uploader::CreateJobInfo(const std::string &path) const {
450
2/4
✓ Branch 2 taken 54 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 54 times.
✗ Branch 6 not taken.
54 FileBackedBuffer *buf = FileBackedBuffer::Create(kInMemoryObjectThreshold);
451
1/2
✓ Branch 2 taken 54 times.
✗ Branch 3 not taken.
54 return new s3fanout::JobInfo(path, NULL, buf);
452 }
453
454
455 9 void S3Uploader::DoRemoveAsync(const std::string &file_to_delete) {
456
2/4
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 9 times.
✗ Branch 5 not taken.
9 const std::string mangled_path = repository_alias_ + "/" + file_to_delete;
457
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 s3fanout::JobInfo *info = CreateJobInfo(mangled_path);
458
459 9 info->request = s3fanout::JobInfo::kReqDelete;
460
461
1/2
✓ Branch 3 taken 9 times.
✗ Branch 4 not taken.
9 LogCvmfs(kLogUploadS3, kLogDebug, "Asynchronously removing %s/%s",
462 bucket_.c_str(), info->object_key.c_str());
463
1/2
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
9 s3fanout_mgr_->PushNewJob(info);
464 9 }
465
466
467 4608 void S3Uploader::OnReqComplete(const upload::UploaderResults &results,
468 RequestCtrl *ctrl) {
469 4608 ctrl->return_code = results.return_code;
470
2/2
✓ Branch 0 taken 4563 times.
✓ Branch 1 taken 45 times.
4608 if (ctrl->callback_forward != NULL) {
471 // We are already in Respond() so we must not call it again
472 4563 const upload::UploaderResults fix_path(results.return_code,
473
1/2
✓ Branch 1 taken 4563 times.
✗ Branch 2 not taken.
4563 ctrl->original_path);
474
1/2
✓ Branch 1 taken 4563 times.
✗ Branch 2 not taken.
4563 (*(ctrl->callback_forward))(fix_path);
475
1/2
✓ Branch 0 taken 4563 times.
✗ Branch 1 not taken.
4563 delete ctrl->callback_forward;
476 4563 ctrl->callback_forward = NULL;
477 4563 }
478 4608 char c = 'c';
479
1/2
✓ Branch 1 taken 4608 times.
✗ Branch 2 not taken.
4608 WritePipe(ctrl->pipe_wait[1], &c, 1);
480 4608 }
481
482
483 45 bool S3Uploader::Peek(const std::string &path) {
484
2/4
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 45 times.
✗ Branch 5 not taken.
45 const std::string mangled_path = repository_alias_ + "/" + path;
485
1/2
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
45 s3fanout::JobInfo *info = CreateJobInfo(mangled_path);
486
487 45 RequestCtrl req_ctrl;
488
1/2
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
45 MakePipe(req_ctrl.pipe_wait);
489 45 info->request = s3fanout::JobInfo::kReqHeadOnly;
490 45 info->callback = const_cast<void *>(static_cast<void const *>(
491
1/2
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
45 MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl)));
492
493
1/2
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
45 IncJobsInFlight();
494
1/2
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
45 UploadJobInfo(info);
495
1/2
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
45 req_ctrl.WaitFor();
496
497 45 return req_ctrl.return_code == 0;
498 45 }
499
500
501 // noop: no mkdir needed in S3 storage
502 bool S3Uploader::Mkdir(const std::string &path) { return true; }
503
504
505 bool S3Uploader::PlaceBootstrappingShortcut(const shash::Any &object) {
506 return false; // TODO(rmeusel): implement
507 }
508
509
510 int64_t S3Uploader::DoGetObjectSize(const std::string &file_name) {
511 // TODO(dosarudaniel): use a head request for byte count
512 // Re-enable 661 integration test when done
513 return -EOPNOTSUPP;
514 }
515
516 } // namespace upload
517