GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/upload_s3.cc
Date: 2025-06-29 02:35:41
Exec Total Coverage
Lines: 202 285 70.9%
Branches: 179 436 41.1%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "upload_s3.h"
6
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <inttypes.h>
10 #include <unistd.h>
11
12 #include <string>
13 #include <vector>
14
15 #include "compression/compression.h"
16 #include "network/s3fanout.h"
17 #include "options.h"
18 #include "util/exception.h"
19 #include "util/logging.h"
20 #include "util/posix.h"
21 #include "util/string.h"
22
23 namespace upload {
24
25 /*
26 * Allowed values of x-amz-acl according to S3 API
27 */
28 static const char *x_amz_acl_allowed_values_[8] = {"private",
29 "public-read",
30 "public-write",
31 "authenticated-read",
32 "aws-exec-read",
33 "bucket-owner-read",
34 "bucket-owner-full-control",
35 ""};
36
37 8192 void S3Uploader::RequestCtrl::WaitFor() {
38 char c;
39
1/2
✓ Branch 1 taken 8192 times.
✗ Branch 2 not taken.
8192 ReadPipe(pipe_wait[0], &c, 1);
40
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8192 times.
8192 assert(c == 'c');
41
1/2
✓ Branch 1 taken 8192 times.
✗ Branch 2 not taken.
8192 ClosePipe(pipe_wait);
42 8192 }
43
44
45 192 S3Uploader::S3Uploader(const SpoolerDefinition &spooler_definition)
46 : AbstractUploader(spooler_definition)
47 192 , dns_buckets_(true)
48 192 , num_parallel_uploads_(kDefaultNumParallelUploads)
49 192 , num_retries_(kDefaultNumRetries)
50 192 , timeout_sec_(kDefaultTimeoutSec)
51 192 , authz_method_(s3fanout::kAuthzAwsV2)
52 192 , peek_before_put_(true)
53 192 , use_https_(false)
54
1/2
✓ Branch 2 taken 192 times.
✗ Branch 3 not taken.
192 , proxy_("")
55
1/2
✓ Branch 1 taken 192 times.
✗ Branch 2 not taken.
192 , temporary_path_(spooler_definition.temporary_path)
56
2/4
✓ Branch 2 taken 192 times.
✗ Branch 3 not taken.
✓ Branch 14 taken 192 times.
✗ Branch 15 not taken.
384 , x_amz_acl_("public-read") {
57
2/4
✓ Branch 1 taken 192 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 192 times.
✗ Branch 4 not taken.
192 assert(spooler_definition.IsValid()
58 && spooler_definition.driver_type == SpoolerDefinition::S3);
59
60 192 atomic_init32(&io_errors_);
61
62
2/4
✓ Branch 1 taken 192 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 192 times.
192 if (!ParseSpoolerDefinition(spooler_definition)) {
63 PANIC(kLogStderr, "Error in parsing the spooler definition");
64 }
65
66
1/2
✓ Branch 1 taken 192 times.
✗ Branch 2 not taken.
192 s3fanout::S3FanoutManager::S3Config s3config;
67
1/2
✓ Branch 1 taken 192 times.
✗ Branch 2 not taken.
192 s3config.access_key = access_key_;
68
1/2
✓ Branch 1 taken 192 times.
✗ Branch 2 not taken.
192 s3config.secret_key = secret_key_;
69
1/2
✓ Branch 1 taken 192 times.
✗ Branch 2 not taken.
192 s3config.hostname_port = host_name_port_;
70 192 s3config.authz_method = authz_method_;
71
1/2
✓ Branch 1 taken 192 times.
✗ Branch 2 not taken.
192 s3config.region = region_;
72
1/2
✓ Branch 1 taken 192 times.
✗ Branch 2 not taken.
192 s3config.flavor = flavor_;
73
1/2
✓ Branch 1 taken 192 times.
✗ Branch 2 not taken.
192 s3config.bucket = bucket_;
74 192 s3config.dns_buckets = dns_buckets_;
75 192 s3config.pool_max_handles = num_parallel_uploads_;
76 192 s3config.opt_timeout_sec = timeout_sec_;
77 192 s3config.opt_max_retries = num_retries_;
78 192 s3config.opt_backoff_init_ms = kDefaultBackoffInitMs;
79 192 s3config.opt_backoff_max_ms = kDefaultBackoffMaxMs;
80
1/2
✓ Branch 1 taken 192 times.
✗ Branch 2 not taken.
192 s3config.x_amz_acl = x_amz_acl_;
81
82
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 192 times.
192 if (use_https_) {
83 s3config.protocol = "https";
84 } else {
85
1/2
✓ Branch 1 taken 192 times.
✗ Branch 2 not taken.
192 s3config.protocol = "http";
86 }
87
1/2
✓ Branch 1 taken 192 times.
✗ Branch 2 not taken.
192 s3config.proxy = proxy_;
88
89
3/6
✓ Branch 1 taken 192 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 192 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 192 times.
✗ Branch 8 not taken.
192 s3fanout_mgr_ = new s3fanout::S3FanoutManager(s3config);
90
1/2
✓ Branch 2 taken 192 times.
✗ Branch 3 not taken.
192 s3fanout_mgr_->Spawn();
91
92 192 const int retval = pthread_create(&thread_collect_results_, NULL,
93 MainCollectResults, this);
94
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 192 times.
192 assert(retval == 0);
95 192 }
96
97
98 1152 S3Uploader::~S3Uploader() {
99 // Signal termination to our own worker thread
100 384 s3fanout_mgr_->PushCompletedJob(NULL);
101 384 pthread_join(thread_collect_results_, NULL);
102 768 }
103
104
105 192 bool S3Uploader::ParseSpoolerDefinition(
106 const SpoolerDefinition &spooler_definition) {
107 const std::vector<std::string> config = SplitString(
108
1/2
✓ Branch 1 taken 192 times.
✗ Branch 2 not taken.
192 spooler_definition.spooler_configuration, '@');
109
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 192 times.
192 if (config.size() != 2) {
110 LogCvmfs(kLogUploadS3, kLogStderr,
111 "Failed to parse spooler configuration string '%s'.\n"
112 "Provide: <repo_alias>@/path/to/s3.conf",
113 spooler_definition.spooler_configuration.c_str());
114 return false;
115 }
116
1/2
✓ Branch 2 taken 192 times.
✗ Branch 3 not taken.
192 repository_alias_ = config[0];
117 192 const std::string &config_path = config[1];
118
119
2/4
✓ Branch 1 taken 192 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 192 times.
192 if (!FileExists(config_path)) {
120 LogCvmfs(kLogUploadS3, kLogStderr, "Cannot find S3 config file at '%s'",
121 config_path.c_str());
122 return false;
123 }
124
125 // Parse S3 configuration
126 BashOptionsManager options_manager = BashOptionsManager(
127
4/8
✓ Branch 1 taken 192 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 192 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 192 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 192 times.
✗ Branch 11 not taken.
192 new DefaultOptionsTemplateManager(repository_alias_));
128
1/2
✓ Branch 1 taken 192 times.
✗ Branch 2 not taken.
192 options_manager.ParsePath(config_path, false);
129 192 std::string parameter;
130
131
3/6
✓ Branch 2 taken 192 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 192 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 192 times.
192 if (!options_manager.GetValue("CVMFS_S3_HOST", &host_name_)) {
132 LogCvmfs(kLogUploadS3, kLogStderr,
133 "Failed to parse CVMFS_S3_HOST from '%s'", config_path.c_str());
134 return false;
135 }
136
3/6
✓ Branch 2 taken 192 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 192 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 192 times.
192 if (!options_manager.GetValue("CVMFS_S3_ACCESS_KEY", &access_key_)) {
137 LogCvmfs(kLogUploadS3, kLogStderr,
138 "Failed to parse CVMFS_S3_ACCESS_KEY from '%s'.",
139 config_path.c_str());
140 return false;
141 }
142
3/6
✓ Branch 2 taken 192 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 192 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 192 times.
192 if (!options_manager.GetValue("CVMFS_S3_SECRET_KEY", &secret_key_)) {
143 LogCvmfs(kLogUploadS3, kLogStderr,
144 "Failed to parse CVMFS_S3_SECRET_KEY from '%s'.",
145 config_path.c_str());
146 return false;
147 }
148
3/6
✓ Branch 2 taken 192 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 192 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 192 times.
192 if (!options_manager.GetValue("CVMFS_S3_BUCKET", &bucket_)) {
149 LogCvmfs(kLogUploadS3, kLogStderr,
150 "Failed to parse CVMFS_S3_BUCKET from '%s'.", config_path.c_str());
151 return false;
152 }
153
3/6
✓ Branch 2 taken 192 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 192 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 192 times.
✗ Branch 10 not taken.
192 if (options_manager.GetValue("CVMFS_S3_DNS_BUCKETS", &parameter)) {
154
1/2
✓ Branch 1 taken 192 times.
✗ Branch 2 not taken.
192 if (parameter == "false") {
155 192 dns_buckets_ = false;
156 }
157 }
158
3/6
✓ Branch 2 taken 192 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 192 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 192 times.
✗ Branch 10 not taken.
192 if (options_manager.GetValue("CVMFS_S3_MAX_NUMBER_OF_PARALLEL_CONNECTIONS",
159 &parameter)) {
160
1/2
✓ Branch 1 taken 192 times.
✗ Branch 2 not taken.
192 num_parallel_uploads_ = String2Uint64(parameter);
161 }
162
3/6
✓ Branch 2 taken 192 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 192 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 192 times.
192 if (options_manager.GetValue("CVMFS_S3_MAX_RETRIES", &parameter)) {
163 num_retries_ = String2Uint64(parameter);
164 }
165
3/6
✓ Branch 2 taken 192 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 192 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 192 times.
192 if (options_manager.GetValue("CVMFS_S3_TIMEOUT", &parameter)) {
166 timeout_sec_ = String2Uint64(parameter);
167 }
168
3/6
✓ Branch 2 taken 192 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 192 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 192 times.
192 if (options_manager.GetValue("CVMFS_S3_REGION", &region_)) {
169 authz_method_ = s3fanout::kAuthzAwsV4;
170 }
171
3/6
✓ Branch 2 taken 192 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 192 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 192 times.
192 if (options_manager.GetValue("CVMFS_S3_FLAVOR", &flavor_)) {
172 if (flavor_ == "azure") {
173 authz_method_ = s3fanout::kAuthzAzure;
174 } else if (flavor_ == "awsv2") {
175 authz_method_ = s3fanout::kAuthzAwsV2;
176 } else if (flavor_ == "awsv4") {
177 authz_method_ = s3fanout::kAuthzAwsV4;
178 } else {
179 LogCvmfs(kLogUploadS3, kLogStderr,
180 "Failed to parse CVMFS_S3_FLAVOR from '%s', "
181 "valid options are azure, awsv2 or awsv4",
182 config_path.c_str());
183 return false;
184 }
185 }
186
3/6
✓ Branch 2 taken 192 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 192 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 192 times.
192 if (options_manager.GetValue("CVMFS_S3_PEEK_BEFORE_PUT", &parameter)) {
187 peek_before_put_ = options_manager.IsOn(parameter);
188 }
189
3/6
✓ Branch 2 taken 192 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 192 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 192 times.
192 if (options_manager.GetValue("CVMFS_S3_X_AMZ_ACL", &parameter)) {
190 bool isAllowed = false;
191 size_t const len = sizeof(x_amz_acl_allowed_values_)
192 / sizeof(x_amz_acl_allowed_values_[0]);
193 for (size_t i = 0; i < len; i++) {
194 if (x_amz_acl_allowed_values_[i] == parameter) {
195 isAllowed = true;
196 break;
197 }
198 }
199 if (!isAllowed) {
200 LogCvmfs(kLogUploadS3, kLogStderr,
201 "%s is not an allowed value for CVMFS_S3_X_AMZ_ACL",
202 parameter.c_str());
203 return false;
204 }
205 x_amz_acl_ = parameter;
206 }
207
208
3/6
✓ Branch 2 taken 192 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 192 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 192 times.
192 if (options_manager.GetValue("CVMFS_S3_USE_HTTPS", &parameter)) {
209 use_https_ = options_manager.IsOn(parameter);
210 }
211
212
3/6
✓ Branch 2 taken 192 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 192 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 192 times.
✗ Branch 10 not taken.
192 if (options_manager.GetValue("CVMFS_S3_PORT", &parameter)) {
213
2/4
✓ Branch 1 taken 192 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 192 times.
✗ Branch 5 not taken.
192 host_name_port_ = host_name_ + ":" + parameter;
214 } else {
215 host_name_port_ = host_name_;
216 }
217
218
3/6
✓ Branch 2 taken 192 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 192 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 192 times.
192 if (options_manager.IsDefined("CVMFS_S3_PROXY")) {
219 options_manager.GetValue("CVMFS_S3_PROXY", &proxy_);
220 }
221
222 192 return true;
223 192 }
224
225
226 303 bool S3Uploader::WillHandle(const SpoolerDefinition &spooler_definition) {
227 303 return spooler_definition.driver_type == SpoolerDefinition::S3;
228 }
229
230
231 bool S3Uploader::Create() {
232 if (!dns_buckets_)
233 return false;
234
235 s3fanout::JobInfo *info = CreateJobInfo("");
236 info->request = s3fanout::JobInfo::kReqPutBucket;
237 std::string request_content;
238 if (!region_.empty()) {
239 request_content = std::string("<CreateBucketConfiguration xmlns="
240 "\"http://s3.amazonaws.com/doc/2006-03-01/\">"
241 "<LocationConstraint>")
242 + region_
243 + "</LocationConstraint>"
244 "</CreateBucketConfiguration>";
245 info->origin->Append(request_content.data(), request_content.length());
246 info->origin->Commit();
247 }
248
249 RequestCtrl req_ctrl;
250 MakePipe(req_ctrl.pipe_wait);
251 info->callback = const_cast<void *>(static_cast<void const *>(
252 MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl)));
253
254 IncJobsInFlight();
255 UploadJobInfo(info);
256 req_ctrl.WaitFor();
257
258 return req_ctrl.return_code == 0;
259 }
260
261
262 16 unsigned int S3Uploader::GetNumberOfErrors() const {
263 16 return atomic_read32(&io_errors_);
264 }
265
266
267 /**
268 * Worker thread takes care of requesting new jobs and cleaning old ones.
269 */
270 192 void *S3Uploader::MainCollectResults(void *data) {
271 192 LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread started.");
272 192 S3Uploader *uploader = reinterpret_cast<S3Uploader *>(data);
273
274 while (true) {
275 10016 s3fanout::JobInfo *info = uploader->s3fanout_mgr_->PopCompletedJob();
276
2/2
✓ Branch 0 taken 192 times.
✓ Branch 1 taken 9824 times.
10016 if (!info)
277 192 break;
278 // Report completed job
279 9824 int reply_code = 0;
280
2/2
✓ Branch 0 taken 32 times.
✓ Branch 1 taken 9792 times.
9824 if (info->error_code != s3fanout::kFailOk) {
281
1/2
✓ Branch 0 taken 32 times.
✗ Branch 1 not taken.
32 if ((info->request != s3fanout::JobInfo::kReqHeadOnly)
282
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 32 times.
32 || (info->error_code != s3fanout::kFailNotFound)) {
283 LogCvmfs(kLogUploadS3, kLogStderr,
284 "Upload job for '%s' failed. (error code: %d - %s)",
285 info->object_key.c_str(), info->error_code,
286 s3fanout::Code2Ascii(info->error_code));
287 reply_code = 99;
288 atomic_inc32(&uploader->io_errors_);
289 }
290 }
291
2/2
✓ Branch 0 taken 16 times.
✓ Branch 1 taken 9808 times.
9824 if (info->request == s3fanout::JobInfo::kReqDelete) {
292
1/2
✓ Branch 2 taken 16 times.
✗ Branch 3 not taken.
16 uploader->Respond(NULL, UploaderResults());
293
2/2
✓ Branch 0 taken 80 times.
✓ Branch 1 taken 9728 times.
9808 } else if (info->request == s3fanout::JobInfo::kReqHeadOnly) {
294
2/2
✓ Branch 0 taken 32 times.
✓ Branch 1 taken 48 times.
80 if (info->error_code == s3fanout::kFailNotFound)
295 32 reply_code = 1;
296
1/2
✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
80 uploader->Respond(static_cast<CallbackTN *>(info->callback),
297 160 UploaderResults(UploaderResults::kLookup, reply_code));
298 } else {
299
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9728 times.
9728 if (info->request == s3fanout::JobInfo::kReqHeadPut) {
300 // The HEAD request was not transformed into a PUT request, thus this
301 // was a duplicate
302 // Uploaded catalogs are always unique ->
303 // assume this was a regular file and decrease appropriate counters
304 uploader->CountDuplicates();
305 uploader->DecUploadedChunks();
306 uploader->CountUploadedBytes(-(info->payload_size));
307 }
308 19456 uploader->Respond(
309
1/2
✓ Branch 1 taken 9728 times.
✗ Branch 2 not taken.
9728 static_cast<CallbackTN *>(info->callback),
310 19456 UploaderResults(UploaderResults::kChunkCommit, reply_code));
311
312
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9728 times.
9728 assert(!info->origin.IsValid());
313 }
314
1/2
✓ Branch 0 taken 9824 times.
✗ Branch 1 not taken.
9824 delete info;
315 9824 }
316
317 192 LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread finished.");
318 192 return NULL;
319 }
320
321
322 8112 void S3Uploader::DoUpload(const std::string &remote_path,
323 IngestionSource *source,
324 const CallbackTN *callback) {
325
1/2
✓ Branch 1 taken 8112 times.
✗ Branch 2 not taken.
8112 bool rvb = source->Open();
326
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8112 times.
8112 if (!rvb) {
327 Respond(callback, UploaderResults(100, source->GetPath()));
328 return;
329 }
330 uint64_t size;
331
1/2
✓ Branch 1 taken 8112 times.
✗ Branch 2 not taken.
8112 rvb = source->GetSize(&size);
332
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8112 times.
8112 assert(rvb);
333
334 8112 FileBackedBuffer *origin = FileBackedBuffer::Create(
335
1/2
✓ Branch 2 taken 8112 times.
✗ Branch 3 not taken.
8112 kInMemoryObjectThreshold, spooler_definition().temporary_path);
336
337 unsigned char buffer[kPageSize];
338 ssize_t nbytes;
339 do {
340
1/2
✓ Branch 1 taken 3186672 times.
✗ Branch 2 not taken.
3186672 nbytes = source->Read(buffer, kPageSize);
341
2/2
✓ Branch 0 taken 3181296 times.
✓ Branch 1 taken 5376 times.
3186672 if (nbytes > 0)
342
1/2
✓ Branch 1 taken 3181296 times.
✗ Branch 2 not taken.
3181296 origin->Append(buffer, nbytes);
343
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3186672 times.
3186672 if (nbytes < 0) {
344 source->Close();
345 delete origin;
346 Respond(callback, UploaderResults(100, source->GetPath()));
347 return;
348 }
349
2/2
✓ Branch 0 taken 3178560 times.
✓ Branch 1 taken 8112 times.
3186672 } while (nbytes == kPageSize);
350
1/2
✓ Branch 1 taken 8112 times.
✗ Branch 2 not taken.
8112 source->Close();
351
1/2
✓ Branch 1 taken 8112 times.
✗ Branch 2 not taken.
8112 origin->Commit();
352
353 s3fanout::JobInfo *info = new s3fanout::JobInfo(
354
2/4
✓ Branch 1 taken 8112 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 8112 times.
✗ Branch 5 not taken.
16224 repository_alias_ + "/" + remote_path,
355 const_cast<void *>(static_cast<void const *>(callback)),
356
2/4
✓ Branch 1 taken 8112 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 8112 times.
✗ Branch 5 not taken.
8112 origin);
357
358
3/6
✓ Branch 2 taken 8112 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 8112 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 8112 times.
8112 if (HasPrefix(remote_path, ".cvmfs", false /*ignore_case*/)) {
359 info->request = s3fanout::JobInfo::kReqPutDotCvmfs;
360
3/6
✓ Branch 2 taken 8112 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 8112 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 8112 times.
8112 } else if (HasSuffix(remote_path, ".html", false)) {
361 info->request = s3fanout::JobInfo::kReqPutHtml;
362 } else {
363
1/2
✓ Branch 0 taken 8112 times.
✗ Branch 1 not taken.
8112 if (peek_before_put_)
364 8112 info->request = s3fanout::JobInfo::kReqHeadPut;
365 }
366
367 8112 RequestCtrl req_ctrl;
368
1/2
✓ Branch 1 taken 8112 times.
✗ Branch 2 not taken.
8112 MakePipe(req_ctrl.pipe_wait);
369 8112 req_ctrl.callback_forward = callback;
370
1/2
✓ Branch 1 taken 8112 times.
✗ Branch 2 not taken.
8112 req_ctrl.original_path = source->GetPath();
371 8112 info->callback = const_cast<void *>(static_cast<void const *>(
372
1/2
✓ Branch 1 taken 8112 times.
✗ Branch 2 not taken.
8112 MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl)));
373
374
1/2
✓ Branch 1 taken 8112 times.
✗ Branch 2 not taken.
8112 UploadJobInfo(info);
375
1/2
✓ Branch 1 taken 8112 times.
✗ Branch 2 not taken.
8112 req_ctrl.WaitFor();
376
1/2
✓ Branch 2 taken 8112 times.
✗ Branch 3 not taken.
8112 LogCvmfs(kLogUploadS3, kLogDebug, "Uploading from source finished: %s",
377
1/2
✓ Branch 1 taken 8112 times.
✗ Branch 2 not taken.
16224 source->GetPath().c_str());
378 8112 }
379
380
381 9808 void S3Uploader::UploadJobInfo(s3fanout::JobInfo *info) {
382 9808 LogCvmfs(kLogUploadS3, kLogDebug,
383 "Uploading:\n"
384 "--> Object: '%s'\n"
385 "--> Bucket: '%s'\n"
386 "--> Host: '%s'\n",
387 info->object_key.c_str(), bucket_.c_str(), host_name_port_.c_str());
388
389 9808 s3fanout_mgr_->PushNewJob(info);
390 9808 }
391
392
393 1616 UploadStreamHandle *S3Uploader::InitStreamedUpload(const CallbackTN *callback) {
394 return new S3StreamHandle(callback, kInMemoryObjectThreshold,
395
1/2
✓ Branch 3 taken 1616 times.
✗ Branch 4 not taken.
1616 spooler_definition().temporary_path);
396 }
397
398
399 12832 void S3Uploader::StreamedUpload(UploadStreamHandle *handle,
400 UploadBuffer buffer,
401 const CallbackTN *callback) {
402 12832 S3StreamHandle *s3_handle = static_cast<S3StreamHandle *>(handle);
403
404 12832 s3_handle->buffer->Append(buffer.data, buffer.size);
405
1/2
✓ Branch 2 taken 12832 times.
✗ Branch 3 not taken.
12832 Respond(callback, UploaderResults(UploaderResults::kBufferUpload, 0));
406 12832 }
407
408
409 1616 void S3Uploader::FinalizeStreamedUpload(UploadStreamHandle *handle,
410 const shash::Any &content_hash) {
411 1616 S3StreamHandle *s3_handle = static_cast<S3StreamHandle *>(handle);
412
413 // New file name based on content hash or remote_path override
414 1616 std::string final_path;
415
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1616 times.
1616 if (s3_handle->remote_path != "") {
416 final_path = repository_alias_ + "/" + s3_handle->remote_path;
417 } else {
418
3/6
✓ Branch 1 taken 1616 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1616 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 1616 times.
✗ Branch 8 not taken.
1616 final_path = repository_alias_ + "/data/" + content_hash.MakePath();
419 }
420
421
1/2
✓ Branch 2 taken 1616 times.
✗ Branch 3 not taken.
1616 s3_handle->buffer->Commit();
422
423
1/2
✓ Branch 2 taken 1616 times.
✗ Branch 3 not taken.
1616 const size_t bytes_uploaded = s3_handle->buffer->GetSize();
424
425 s3fanout::JobInfo *info = new s3fanout::JobInfo(
426 final_path,
427 1616 const_cast<void *>(static_cast<void const *>(handle->commit_callback)),
428
2/4
✓ Branch 2 taken 1616 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1616 times.
✗ Branch 6 not taken.
1616 s3_handle->buffer.Release());
429
430
1/2
✓ Branch 0 taken 1616 times.
✗ Branch 1 not taken.
1616 if (peek_before_put_)
431 1616 info->request = s3fanout::JobInfo::kReqHeadPut;
432
1/2
✓ Branch 1 taken 1616 times.
✗ Branch 2 not taken.
1616 UploadJobInfo(info);
433
434 // Remove the temporary file
435
1/2
✓ Branch 0 taken 1616 times.
✗ Branch 1 not taken.
1616 delete s3_handle;
436
437 // Update statistics counters
438 1616 if (!content_hash.HasSuffix()
439
5/6
✓ Branch 0 taken 16 times.
✓ Branch 1 taken 1600 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 16 times.
✓ Branch 4 taken 1600 times.
✓ Branch 5 taken 16 times.
1616 || content_hash.suffix == shash::kSuffixPartial) {
440
1/2
✓ Branch 1 taken 1600 times.
✗ Branch 2 not taken.
1600 CountUploadedChunks();
441
1/2
✓ Branch 1 taken 1600 times.
✗ Branch 2 not taken.
1600 CountUploadedBytes(bytes_uploaded);
442
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 16 times.
16 } else if (content_hash.suffix == shash::kSuffixCatalog) {
443 CountUploadedCatalogs();
444 CountUploadedCatalogBytes(bytes_uploaded);
445 }
446 1616 }
447
448
449 96 s3fanout::JobInfo *S3Uploader::CreateJobInfo(const std::string &path) const {
450
2/4
✓ Branch 2 taken 96 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 96 times.
✗ Branch 6 not taken.
96 FileBackedBuffer *buf = FileBackedBuffer::Create(kInMemoryObjectThreshold);
451
1/2
✓ Branch 2 taken 96 times.
✗ Branch 3 not taken.
96 return new s3fanout::JobInfo(path, NULL, buf);
452 }
453
454
455 16 void S3Uploader::DoRemoveAsync(const std::string &file_to_delete) {
456
2/4
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 16 times.
✗ Branch 5 not taken.
16 const std::string mangled_path = repository_alias_ + "/" + file_to_delete;
457
1/2
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
16 s3fanout::JobInfo *info = CreateJobInfo(mangled_path);
458
459 16 info->request = s3fanout::JobInfo::kReqDelete;
460
461
1/2
✓ Branch 3 taken 16 times.
✗ Branch 4 not taken.
16 LogCvmfs(kLogUploadS3, kLogDebug, "Asynchronously removing %s/%s",
462 bucket_.c_str(), info->object_key.c_str());
463
1/2
✓ Branch 2 taken 16 times.
✗ Branch 3 not taken.
16 s3fanout_mgr_->PushNewJob(info);
464 16 }
465
466
467 8192 void S3Uploader::OnReqComplete(const upload::UploaderResults &results,
468 RequestCtrl *ctrl) {
469 8192 ctrl->return_code = results.return_code;
470
2/2
✓ Branch 0 taken 8112 times.
✓ Branch 1 taken 80 times.
8192 if (ctrl->callback_forward != NULL) {
471 // We are already in Respond() so we must not call it again
472 8112 const upload::UploaderResults fix_path(results.return_code,
473
1/2
✓ Branch 1 taken 8112 times.
✗ Branch 2 not taken.
8112 ctrl->original_path);
474
1/2
✓ Branch 1 taken 8112 times.
✗ Branch 2 not taken.
8112 (*(ctrl->callback_forward))(fix_path);
475
1/2
✓ Branch 0 taken 8112 times.
✗ Branch 1 not taken.
8112 delete ctrl->callback_forward;
476 8112 ctrl->callback_forward = NULL;
477 8112 }
478 8192 char c = 'c';
479
1/2
✓ Branch 1 taken 8192 times.
✗ Branch 2 not taken.
8192 WritePipe(ctrl->pipe_wait[1], &c, 1);
480 8192 }
481
482
483 80 bool S3Uploader::Peek(const std::string &path) {
484
2/4
✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 80 times.
✗ Branch 5 not taken.
80 const std::string mangled_path = repository_alias_ + "/" + path;
485
1/2
✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
80 s3fanout::JobInfo *info = CreateJobInfo(mangled_path);
486
487 80 RequestCtrl req_ctrl;
488
1/2
✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
80 MakePipe(req_ctrl.pipe_wait);
489 80 info->request = s3fanout::JobInfo::kReqHeadOnly;
490 80 info->callback = const_cast<void *>(static_cast<void const *>(
491
1/2
✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
80 MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl)));
492
493
1/2
✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
80 IncJobsInFlight();
494
1/2
✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
80 UploadJobInfo(info);
495
1/2
✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
80 req_ctrl.WaitFor();
496
497 80 return req_ctrl.return_code == 0;
498 80 }
499
500
501 // noop: no mkdir needed in S3 storage
502 bool S3Uploader::Mkdir(const std::string &path) { return true; }
503
504
505 bool S3Uploader::PlaceBootstrappingShortcut(const shash::Any &object) {
506 return false; // TODO(rmeusel): implement
507 }
508
509
510 int64_t S3Uploader::DoGetObjectSize(const std::string &file_name) {
511 // TODO(dosarudaniel): use a head request for byte count
512 // Re-enable 661 integration test when done
513 return -EOPNOTSUPP;
514 }
515
516 } // namespace upload
517