GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/upload_s3.cc
Date: 2025-06-22 02:36:02
Exec Total Coverage
Lines: 202 285 70.9%
Branches: 179 436 41.1%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "upload_s3.h"
6
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <inttypes.h>
10 #include <unistd.h>
11
12 #include <string>
13 #include <vector>
14
15 #include "compression/compression.h"
16 #include "network/s3fanout.h"
17 #include "options.h"
18 #include "util/exception.h"
19 #include "util/logging.h"
20 #include "util/posix.h"
21 #include "util/string.h"
22
23 namespace upload {
24
25 /*
26 * Allowed values of x-amz-acl according to S3 API
27 */
28 static const char *x_amz_acl_allowed_values_[8] = {"private",
29 "public-read",
30 "public-write",
31 "authenticated-read",
32 "aws-exec-read",
33 "bucket-owner-read",
34 "bucket-owner-full-control",
35 ""};
36
37 7168 void S3Uploader::RequestCtrl::WaitFor() {
38 char c;
39
1/2
✓ Branch 1 taken 7168 times.
✗ Branch 2 not taken.
7168 ReadPipe(pipe_wait[0], &c, 1);
40
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7168 times.
7168 assert(c == 'c');
41
1/2
✓ Branch 1 taken 7168 times.
✗ Branch 2 not taken.
7168 ClosePipe(pipe_wait);
42 7168 }
43
44
45 168 S3Uploader::S3Uploader(const SpoolerDefinition &spooler_definition)
46 : AbstractUploader(spooler_definition)
47 168 , dns_buckets_(true)
48 168 , num_parallel_uploads_(kDefaultNumParallelUploads)
49 168 , num_retries_(kDefaultNumRetries)
50 168 , timeout_sec_(kDefaultTimeoutSec)
51 168 , authz_method_(s3fanout::kAuthzAwsV2)
52 168 , peek_before_put_(true)
53 168 , use_https_(false)
54
1/2
✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
168 , proxy_("")
55
1/2
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
168 , temporary_path_(spooler_definition.temporary_path)
56
2/4
✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 14 taken 168 times.
✗ Branch 15 not taken.
336 , x_amz_acl_("public-read") {
57
2/4
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 168 times.
✗ Branch 4 not taken.
168 assert(spooler_definition.IsValid()
58 && spooler_definition.driver_type == SpoolerDefinition::S3);
59
60 168 atomic_init32(&io_errors_);
61
62
2/4
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 168 times.
168 if (!ParseSpoolerDefinition(spooler_definition)) {
63 PANIC(kLogStderr, "Error in parsing the spooler definition");
64 }
65
66
1/2
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
168 s3fanout::S3FanoutManager::S3Config s3config;
67
1/2
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
168 s3config.access_key = access_key_;
68
1/2
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
168 s3config.secret_key = secret_key_;
69
1/2
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
168 s3config.hostname_port = host_name_port_;
70 168 s3config.authz_method = authz_method_;
71
1/2
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
168 s3config.region = region_;
72
1/2
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
168 s3config.flavor = flavor_;
73
1/2
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
168 s3config.bucket = bucket_;
74 168 s3config.dns_buckets = dns_buckets_;
75 168 s3config.pool_max_handles = num_parallel_uploads_;
76 168 s3config.opt_timeout_sec = timeout_sec_;
77 168 s3config.opt_max_retries = num_retries_;
78 168 s3config.opt_backoff_init_ms = kDefaultBackoffInitMs;
79 168 s3config.opt_backoff_max_ms = kDefaultBackoffMaxMs;
80
1/2
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
168 s3config.x_amz_acl = x_amz_acl_;
81
82
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 168 times.
168 if (use_https_) {
83 s3config.protocol = "https";
84 } else {
85
1/2
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
168 s3config.protocol = "http";
86 }
87
1/2
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
168 s3config.proxy = proxy_;
88
89
3/6
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 168 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 168 times.
✗ Branch 8 not taken.
168 s3fanout_mgr_ = new s3fanout::S3FanoutManager(s3config);
90
1/2
✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
168 s3fanout_mgr_->Spawn();
91
92 const int retval =
93 168 pthread_create(&thread_collect_results_, NULL, MainCollectResults, this);
94
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 168 times.
168 assert(retval == 0);
95 168 }
96
97
98 1008 S3Uploader::~S3Uploader() {
99 // Signal termination to our own worker thread
100 336 s3fanout_mgr_->PushCompletedJob(NULL);
101 336 pthread_join(thread_collect_results_, NULL);
102 672 }
103
104
105 168 bool S3Uploader::ParseSpoolerDefinition(
106 const SpoolerDefinition &spooler_definition) {
107 const std::vector<std::string> config = SplitString(
108
1/2
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
168 spooler_definition.spooler_configuration, '@');
109
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 168 times.
168 if (config.size() != 2) {
110 LogCvmfs(kLogUploadS3, kLogStderr,
111 "Failed to parse spooler configuration string '%s'.\n"
112 "Provide: <repo_alias>@/path/to/s3.conf",
113 spooler_definition.spooler_configuration.c_str());
114 return false;
115 }
116
1/2
✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
168 repository_alias_ = config[0];
117 168 const std::string &config_path = config[1];
118
119
2/4
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 168 times.
168 if (!FileExists(config_path)) {
120 LogCvmfs(kLogUploadS3, kLogStderr, "Cannot find S3 config file at '%s'",
121 config_path.c_str());
122 return false;
123 }
124
125 // Parse S3 configuration
126 BashOptionsManager options_manager = BashOptionsManager(
127
4/8
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 168 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 168 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 168 times.
✗ Branch 11 not taken.
168 new DefaultOptionsTemplateManager(repository_alias_));
128
1/2
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
168 options_manager.ParsePath(config_path, false);
129 168 std::string parameter;
130
131
3/6
✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 168 times.
168 if (!options_manager.GetValue("CVMFS_S3_HOST", &host_name_)) {
132 LogCvmfs(kLogUploadS3, kLogStderr,
133 "Failed to parse CVMFS_S3_HOST from '%s'", config_path.c_str());
134 return false;
135 }
136
3/6
✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 168 times.
168 if (!options_manager.GetValue("CVMFS_S3_ACCESS_KEY", &access_key_)) {
137 LogCvmfs(kLogUploadS3, kLogStderr,
138 "Failed to parse CVMFS_S3_ACCESS_KEY from '%s'.",
139 config_path.c_str());
140 return false;
141 }
142
3/6
✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 168 times.
168 if (!options_manager.GetValue("CVMFS_S3_SECRET_KEY", &secret_key_)) {
143 LogCvmfs(kLogUploadS3, kLogStderr,
144 "Failed to parse CVMFS_S3_SECRET_KEY from '%s'.",
145 config_path.c_str());
146 return false;
147 }
148
3/6
✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 168 times.
168 if (!options_manager.GetValue("CVMFS_S3_BUCKET", &bucket_)) {
149 LogCvmfs(kLogUploadS3, kLogStderr,
150 "Failed to parse CVMFS_S3_BUCKET from '%s'.", config_path.c_str());
151 return false;
152 }
153
3/6
✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 168 times.
✗ Branch 10 not taken.
168 if (options_manager.GetValue("CVMFS_S3_DNS_BUCKETS", &parameter)) {
154
1/2
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
168 if (parameter == "false") {
155 168 dns_buckets_ = false;
156 }
157 }
158
3/6
✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 168 times.
✗ Branch 10 not taken.
168 if (options_manager.GetValue("CVMFS_S3_MAX_NUMBER_OF_PARALLEL_CONNECTIONS",
159 &parameter)) {
160
1/2
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
168 num_parallel_uploads_ = String2Uint64(parameter);
161 }
162
3/6
✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 168 times.
168 if (options_manager.GetValue("CVMFS_S3_MAX_RETRIES", &parameter)) {
163 num_retries_ = String2Uint64(parameter);
164 }
165
3/6
✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 168 times.
168 if (options_manager.GetValue("CVMFS_S3_TIMEOUT", &parameter)) {
166 timeout_sec_ = String2Uint64(parameter);
167 }
168
3/6
✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 168 times.
168 if (options_manager.GetValue("CVMFS_S3_REGION", &region_)) {
169 authz_method_ = s3fanout::kAuthzAwsV4;
170 }
171
3/6
✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 168 times.
168 if (options_manager.GetValue("CVMFS_S3_FLAVOR", &flavor_)) {
172 if (flavor_ == "azure") {
173 authz_method_ = s3fanout::kAuthzAzure;
174 } else if (flavor_ == "awsv2") {
175 authz_method_ = s3fanout::kAuthzAwsV2;
176 } else if (flavor_ == "awsv4") {
177 authz_method_ = s3fanout::kAuthzAwsV4;
178 } else {
179 LogCvmfs(kLogUploadS3, kLogStderr,
180 "Failed to parse CVMFS_S3_FLAVOR from '%s', "
181 "valid options are azure, awsv2 or awsv4",
182 config_path.c_str());
183 return false;
184 }
185 }
186
3/6
✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 168 times.
168 if (options_manager.GetValue("CVMFS_S3_PEEK_BEFORE_PUT", &parameter)) {
187 peek_before_put_ = options_manager.IsOn(parameter);
188 }
189
3/6
✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 168 times.
168 if (options_manager.GetValue("CVMFS_S3_X_AMZ_ACL", &parameter)) {
190 bool isAllowed = false;
191 size_t const len = sizeof(x_amz_acl_allowed_values_)
192 / sizeof(x_amz_acl_allowed_values_[0]);
193 for (size_t i = 0; i < len; i++) {
194 if (x_amz_acl_allowed_values_[i] == parameter) {
195 isAllowed = true;
196 break;
197 }
198 }
199 if (!isAllowed) {
200 LogCvmfs(kLogUploadS3, kLogStderr,
201 "%s is not an allowed value for CVMFS_S3_X_AMZ_ACL",
202 parameter.c_str());
203 return false;
204 }
205 x_amz_acl_ = parameter;
206 }
207
208
3/6
✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 168 times.
168 if (options_manager.GetValue("CVMFS_S3_USE_HTTPS", &parameter)) {
209 use_https_ = options_manager.IsOn(parameter);
210 }
211
212
3/6
✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 168 times.
✗ Branch 10 not taken.
168 if (options_manager.GetValue("CVMFS_S3_PORT", &parameter)) {
213
2/4
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 168 times.
✗ Branch 5 not taken.
168 host_name_port_ = host_name_ + ":" + parameter;
214 } else {
215 host_name_port_ = host_name_;
216 }
217
218
3/6
✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 168 times.
168 if (options_manager.IsDefined("CVMFS_S3_PROXY")) {
219 options_manager.GetValue("CVMFS_S3_PROXY", &proxy_);
220 }
221
222 168 return true;
223 168 }
224
225
226 225 bool S3Uploader::WillHandle(const SpoolerDefinition &spooler_definition) {
227 225 return spooler_definition.driver_type == SpoolerDefinition::S3;
228 }
229
230
231 bool S3Uploader::Create() {
232 if (!dns_buckets_)
233 return false;
234
235 s3fanout::JobInfo *info = CreateJobInfo("");
236 info->request = s3fanout::JobInfo::kReqPutBucket;
237 std::string request_content;
238 if (!region_.empty()) {
239 request_content = std::string("<CreateBucketConfiguration xmlns="
240 "\"http://s3.amazonaws.com/doc/2006-03-01/\">"
241 "<LocationConstraint>")
242 + region_
243 + "</LocationConstraint>"
244 "</CreateBucketConfiguration>";
245 info->origin->Append(request_content.data(), request_content.length());
246 info->origin->Commit();
247 }
248
249 RequestCtrl req_ctrl;
250 MakePipe(req_ctrl.pipe_wait);
251 info->callback = const_cast<void *>(static_cast<void const *>(
252 MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl)));
253
254 IncJobsInFlight();
255 UploadJobInfo(info);
256 req_ctrl.WaitFor();
257
258 return req_ctrl.return_code == 0;
259 }
260
261
262 14 unsigned int S3Uploader::GetNumberOfErrors() const {
263 14 return atomic_read32(&io_errors_);
264 }
265
266
267 /**
268 * Worker thread takes care of requesting new jobs and cleaning old ones.
269 */
270 168 void *S3Uploader::MainCollectResults(void *data) {
271 168 LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread started.");
272 168 S3Uploader *uploader = reinterpret_cast<S3Uploader *>(data);
273
274 while (true) {
275 8764 s3fanout::JobInfo *info = uploader->s3fanout_mgr_->PopCompletedJob();
276
2/2
✓ Branch 0 taken 168 times.
✓ Branch 1 taken 8596 times.
8764 if (!info)
277 168 break;
278 // Report completed job
279 8596 int reply_code = 0;
280
2/2
✓ Branch 0 taken 28 times.
✓ Branch 1 taken 8568 times.
8596 if (info->error_code != s3fanout::kFailOk) {
281
1/2
✓ Branch 0 taken 28 times.
✗ Branch 1 not taken.
28 if ((info->request != s3fanout::JobInfo::kReqHeadOnly)
282
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 28 times.
28 || (info->error_code != s3fanout::kFailNotFound)) {
283 LogCvmfs(kLogUploadS3, kLogStderr,
284 "Upload job for '%s' failed. (error code: %d - %s)",
285 info->object_key.c_str(), info->error_code,
286 s3fanout::Code2Ascii(info->error_code));
287 reply_code = 99;
288 atomic_inc32(&uploader->io_errors_);
289 }
290 }
291
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 8582 times.
8596 if (info->request == s3fanout::JobInfo::kReqDelete) {
292
1/2
✓ Branch 2 taken 14 times.
✗ Branch 3 not taken.
14 uploader->Respond(NULL, UploaderResults());
293
2/2
✓ Branch 0 taken 70 times.
✓ Branch 1 taken 8512 times.
8582 } else if (info->request == s3fanout::JobInfo::kReqHeadOnly) {
294
2/2
✓ Branch 0 taken 28 times.
✓ Branch 1 taken 42 times.
70 if (info->error_code == s3fanout::kFailNotFound)
295 28 reply_code = 1;
296
1/2
✓ Branch 1 taken 70 times.
✗ Branch 2 not taken.
70 uploader->Respond(static_cast<CallbackTN *>(info->callback),
297 140 UploaderResults(UploaderResults::kLookup, reply_code));
298 } else {
299
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8512 times.
8512 if (info->request == s3fanout::JobInfo::kReqHeadPut) {
300 // The HEAD request was not transformed into a PUT request, thus this
301 // was a duplicate
302 // Uploaded catalogs are always unique ->
303 // assume this was a regular file and decrease appropriate counters
304 uploader->CountDuplicates();
305 uploader->DecUploadedChunks();
306 uploader->CountUploadedBytes(-(info->payload_size));
307 }
308 17024 uploader->Respond(
309
1/2
✓ Branch 1 taken 8512 times.
✗ Branch 2 not taken.
8512 static_cast<CallbackTN *>(info->callback),
310 17024 UploaderResults(UploaderResults::kChunkCommit, reply_code));
311
312
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 8512 times.
8512 assert(!info->origin.IsValid());
313 }
314
1/2
✓ Branch 0 taken 8596 times.
✗ Branch 1 not taken.
8596 delete info;
315 8596 }
316
317 168 LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread finished.");
318 168 return NULL;
319 }
320
321
322 7098 void S3Uploader::DoUpload(const std::string &remote_path,
323 IngestionSource *source,
324 const CallbackTN *callback) {
325
1/2
✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
7098 bool rvb = source->Open();
326
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7098 times.
7098 if (!rvb) {
327 Respond(callback, UploaderResults(100, source->GetPath()));
328 return;
329 }
330 uint64_t size;
331
1/2
✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
7098 rvb = source->GetSize(&size);
332
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7098 times.
7098 assert(rvb);
333
334 7098 FileBackedBuffer *origin = FileBackedBuffer::Create(
335
1/2
✓ Branch 2 taken 7098 times.
✗ Branch 3 not taken.
7098 kInMemoryObjectThreshold, spooler_definition().temporary_path);
336
337 unsigned char buffer[kPageSize];
338 ssize_t nbytes;
339 do {
340
1/2
✓ Branch 1 taken 2788338 times.
✗ Branch 2 not taken.
2788338 nbytes = source->Read(buffer, kPageSize);
341
2/2
✓ Branch 0 taken 2783634 times.
✓ Branch 1 taken 4704 times.
2788338 if (nbytes > 0)
342
1/2
✓ Branch 1 taken 2783634 times.
✗ Branch 2 not taken.
2783634 origin->Append(buffer, nbytes);
343
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2788338 times.
2788338 if (nbytes < 0) {
344 source->Close();
345 delete origin;
346 Respond(callback, UploaderResults(100, source->GetPath()));
347 return;
348 }
349
2/2
✓ Branch 0 taken 2781240 times.
✓ Branch 1 taken 7098 times.
2788338 } while (nbytes == kPageSize);
350
1/2
✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
7098 source->Close();
351
1/2
✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
7098 origin->Commit();
352
353 s3fanout::JobInfo *info = new s3fanout::JobInfo(
354
2/4
✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 7098 times.
✗ Branch 5 not taken.
14196 repository_alias_ + "/" + remote_path,
355 const_cast<void *>(static_cast<void const *>(callback)),
356
2/4
✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 7098 times.
✗ Branch 5 not taken.
7098 origin);
357
358
3/6
✓ Branch 2 taken 7098 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 7098 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 7098 times.
7098 if (HasPrefix(remote_path, ".cvmfs", false /*ignore_case*/)) {
359 info->request = s3fanout::JobInfo::kReqPutDotCvmfs;
360
3/6
✓ Branch 2 taken 7098 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 7098 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 7098 times.
7098 } else if (HasSuffix(remote_path, ".html", false)) {
361 info->request = s3fanout::JobInfo::kReqPutHtml;
362 } else {
363
1/2
✓ Branch 0 taken 7098 times.
✗ Branch 1 not taken.
7098 if (peek_before_put_)
364 7098 info->request = s3fanout::JobInfo::kReqHeadPut;
365 }
366
367 7098 RequestCtrl req_ctrl;
368
1/2
✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
7098 MakePipe(req_ctrl.pipe_wait);
369 7098 req_ctrl.callback_forward = callback;
370
1/2
✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
7098 req_ctrl.original_path = source->GetPath();
371 7098 info->callback = const_cast<void *>(static_cast<void const *>(
372
1/2
✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
7098 MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl)));
373
374
1/2
✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
7098 UploadJobInfo(info);
375
1/2
✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
7098 req_ctrl.WaitFor();
376
1/2
✓ Branch 2 taken 7098 times.
✗ Branch 3 not taken.
7098 LogCvmfs(kLogUploadS3, kLogDebug, "Uploading from source finished: %s",
377
1/2
✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
14196 source->GetPath().c_str());
378 7098 }
379
380
381 8582 void S3Uploader::UploadJobInfo(s3fanout::JobInfo *info) {
382 8582 LogCvmfs(kLogUploadS3, kLogDebug,
383 "Uploading:\n"
384 "--> Object: '%s'\n"
385 "--> Bucket: '%s'\n"
386 "--> Host: '%s'\n",
387 info->object_key.c_str(), bucket_.c_str(), host_name_port_.c_str());
388
389 8582 s3fanout_mgr_->PushNewJob(info);
390 8582 }
391
392
393 1414 UploadStreamHandle *S3Uploader::InitStreamedUpload(const CallbackTN *callback) {
394 return new S3StreamHandle(callback, kInMemoryObjectThreshold,
395
1/2
✓ Branch 3 taken 1414 times.
✗ Branch 4 not taken.
1414 spooler_definition().temporary_path);
396 }
397
398
399 11228 void S3Uploader::StreamedUpload(UploadStreamHandle *handle,
400 UploadBuffer buffer,
401 const CallbackTN *callback) {
402 11228 S3StreamHandle *s3_handle = static_cast<S3StreamHandle *>(handle);
403
404 11228 s3_handle->buffer->Append(buffer.data, buffer.size);
405
1/2
✓ Branch 2 taken 11228 times.
✗ Branch 3 not taken.
11228 Respond(callback, UploaderResults(UploaderResults::kBufferUpload, 0));
406 11228 }
407
408
409 1414 void S3Uploader::FinalizeStreamedUpload(UploadStreamHandle *handle,
410 const shash::Any &content_hash) {
411 1414 S3StreamHandle *s3_handle = static_cast<S3StreamHandle *>(handle);
412
413 // New file name based on content hash or remote_path override
414 1414 std::string final_path;
415
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1414 times.
1414 if (s3_handle->remote_path != "") {
416 final_path = repository_alias_ + "/" + s3_handle->remote_path;
417 } else {
418
3/6
✓ Branch 1 taken 1414 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1414 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 1414 times.
✗ Branch 8 not taken.
1414 final_path = repository_alias_ + "/data/" + content_hash.MakePath();
419 }
420
421
1/2
✓ Branch 2 taken 1414 times.
✗ Branch 3 not taken.
1414 s3_handle->buffer->Commit();
422
423
1/2
✓ Branch 2 taken 1414 times.
✗ Branch 3 not taken.
1414 const size_t bytes_uploaded = s3_handle->buffer->GetSize();
424
425 s3fanout::JobInfo *info = new s3fanout::JobInfo(
426 final_path,
427 1414 const_cast<void *>(static_cast<void const *>(handle->commit_callback)),
428
2/4
✓ Branch 2 taken 1414 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1414 times.
✗ Branch 6 not taken.
1414 s3_handle->buffer.Release());
429
430
1/2
✓ Branch 0 taken 1414 times.
✗ Branch 1 not taken.
1414 if (peek_before_put_)
431 1414 info->request = s3fanout::JobInfo::kReqHeadPut;
432
1/2
✓ Branch 1 taken 1414 times.
✗ Branch 2 not taken.
1414 UploadJobInfo(info);
433
434 // Remove the temporary file
435
1/2
✓ Branch 0 taken 1414 times.
✗ Branch 1 not taken.
1414 delete s3_handle;
436
437 // Update statistics counters
438 1414 if (!content_hash.HasSuffix()
439
5/6
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 1400 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 14 times.
✓ Branch 4 taken 1400 times.
✓ Branch 5 taken 14 times.
1414 || content_hash.suffix == shash::kSuffixPartial) {
440
1/2
✓ Branch 1 taken 1400 times.
✗ Branch 2 not taken.
1400 CountUploadedChunks();
441
1/2
✓ Branch 1 taken 1400 times.
✗ Branch 2 not taken.
1400 CountUploadedBytes(bytes_uploaded);
442
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 14 times.
14 } else if (content_hash.suffix == shash::kSuffixCatalog) {
443 CountUploadedCatalogs();
444 CountUploadedCatalogBytes(bytes_uploaded);
445 }
446 1414 }
447
448
449 84 s3fanout::JobInfo *S3Uploader::CreateJobInfo(const std::string &path) const {
450
2/4
✓ Branch 2 taken 84 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 84 times.
✗ Branch 6 not taken.
84 FileBackedBuffer *buf = FileBackedBuffer::Create(kInMemoryObjectThreshold);
451
1/2
✓ Branch 2 taken 84 times.
✗ Branch 3 not taken.
84 return new s3fanout::JobInfo(path, NULL, buf);
452 }
453
454
455 14 void S3Uploader::DoRemoveAsync(const std::string &file_to_delete) {
456
2/4
✓ Branch 1 taken 14 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 14 times.
✗ Branch 5 not taken.
14 const std::string mangled_path = repository_alias_ + "/" + file_to_delete;
457
1/2
✓ Branch 1 taken 14 times.
✗ Branch 2 not taken.
14 s3fanout::JobInfo *info = CreateJobInfo(mangled_path);
458
459 14 info->request = s3fanout::JobInfo::kReqDelete;
460
461
1/2
✓ Branch 3 taken 14 times.
✗ Branch 4 not taken.
14 LogCvmfs(kLogUploadS3, kLogDebug, "Asynchronously removing %s/%s",
462 bucket_.c_str(), info->object_key.c_str());
463
1/2
✓ Branch 2 taken 14 times.
✗ Branch 3 not taken.
14 s3fanout_mgr_->PushNewJob(info);
464 14 }
465
466
467 7168 void S3Uploader::OnReqComplete(const upload::UploaderResults &results,
468 RequestCtrl *ctrl) {
469 7168 ctrl->return_code = results.return_code;
470
2/2
✓ Branch 0 taken 7098 times.
✓ Branch 1 taken 70 times.
7168 if (ctrl->callback_forward != NULL) {
471 // We are already in Respond() so we must not call it again
472 7098 const upload::UploaderResults fix_path(results.return_code,
473
1/2
✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
7098 ctrl->original_path);
474
1/2
✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
7098 (*(ctrl->callback_forward))(fix_path);
475
1/2
✓ Branch 0 taken 7098 times.
✗ Branch 1 not taken.
7098 delete ctrl->callback_forward;
476 7098 ctrl->callback_forward = NULL;
477 7098 }
478 7168 char c = 'c';
479
1/2
✓ Branch 1 taken 7168 times.
✗ Branch 2 not taken.
7168 WritePipe(ctrl->pipe_wait[1], &c, 1);
480 7168 }
481
482
483 70 bool S3Uploader::Peek(const std::string &path) {
484
2/4
✓ Branch 1 taken 70 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 70 times.
✗ Branch 5 not taken.
70 const std::string mangled_path = repository_alias_ + "/" + path;
485
1/2
✓ Branch 1 taken 70 times.
✗ Branch 2 not taken.
70 s3fanout::JobInfo *info = CreateJobInfo(mangled_path);
486
487 70 RequestCtrl req_ctrl;
488
1/2
✓ Branch 1 taken 70 times.
✗ Branch 2 not taken.
70 MakePipe(req_ctrl.pipe_wait);
489 70 info->request = s3fanout::JobInfo::kReqHeadOnly;
490 70 info->callback = const_cast<void *>(static_cast<void const *>(
491
1/2
✓ Branch 1 taken 70 times.
✗ Branch 2 not taken.
70 MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl)));
492
493
1/2
✓ Branch 1 taken 70 times.
✗ Branch 2 not taken.
70 IncJobsInFlight();
494
1/2
✓ Branch 1 taken 70 times.
✗ Branch 2 not taken.
70 UploadJobInfo(info);
495
1/2
✓ Branch 1 taken 70 times.
✗ Branch 2 not taken.
70 req_ctrl.WaitFor();
496
497 70 return req_ctrl.return_code == 0;
498 70 }
499
500
501 // noop: no mkdir needed in S3 storage
502 bool S3Uploader::Mkdir(const std::string &path) { return true; }
503
504
505 bool S3Uploader::PlaceBootstrappingShortcut(const shash::Any &object) {
506 return false; // TODO(rmeusel): implement
507 }
508
509
510 int64_t S3Uploader::DoGetObjectSize(const std::string &file_name) {
511 // TODO(dosarudaniel): use a head request for byte count
512 // Re-enable 661 integration test when done
513 return -EOPNOTSUPP;
514 }
515
516 } // namespace upload
517