GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/upload_s3.cc
Date: 2024-04-28 02:33:07
Exec Total Coverage
Lines: 198 281 70.5%
Branches: 179 436 41.1%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "upload_s3.h"
6
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <inttypes.h>
10 #include <unistd.h>
11
12 #include <string>
13 #include <vector>
14
15 #include "compression.h"
16 #include "network/s3fanout.h"
17 #include "options.h"
18 #include "util/exception.h"
19 #include "util/logging.h"
20 #include "util/posix.h"
21 #include "util/string.h"
22
23 namespace upload {
24
25 /*
26 * Allowed values of x-amz-acl according to S3 API
27 */
28 static const char* x_amz_acl_allowed_values_[8] = {
29 "private",
30 "public-read",
31 "public-write",
32 "authenticated-read",
33 "aws-exec-read",
34 "bucket-owner-read",
35 "bucket-owner-full-control",
36 ""
37 };
38
39 512 void S3Uploader::RequestCtrl::WaitFor() {
40 char c;
41
1/2
✓ Branch 1 taken 512 times.
✗ Branch 2 not taken.
512 ReadPipe(pipe_wait[0], &c, 1);
42
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 512 times.
512 assert(c == 'c');
43
1/2
✓ Branch 1 taken 512 times.
✗ Branch 2 not taken.
512 ClosePipe(pipe_wait);
44 512 }
45
46
47 12 S3Uploader::S3Uploader(const SpoolerDefinition &spooler_definition)
48 : AbstractUploader(spooler_definition)
49 12 , dns_buckets_(true)
50 12 , num_parallel_uploads_(kDefaultNumParallelUploads)
51 12 , num_retries_(kDefaultNumRetries)
52 12 , timeout_sec_(kDefaultTimeoutSec)
53 12 , authz_method_(s3fanout::kAuthzAwsV2)
54 12 , peek_before_put_(true)
55 12 , use_https_(false)
56
1/2
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
12 , proxy_("")
57
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 , temporary_path_(spooler_definition.temporary_path)
58
2/4
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 14 taken 12 times.
✗ Branch 15 not taken.
24 , x_amz_acl_("public-read")
59 {
60
2/4
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 12 times.
✗ Branch 4 not taken.
12 assert(spooler_definition.IsValid() &&
61 spooler_definition.driver_type == SpoolerDefinition::S3);
62
63 12 atomic_init32(&io_errors_);
64
65
2/4
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 12 times.
12 if (!ParseSpoolerDefinition(spooler_definition)) {
66 PANIC(kLogStderr, "Error in parsing the spooler definition");
67 }
68
69
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 s3fanout::S3FanoutManager::S3Config s3config;
70
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 s3config.access_key = access_key_;
71
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 s3config.secret_key = secret_key_;
72
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 s3config.hostname_port = host_name_port_;
73 12 s3config.authz_method = authz_method_;
74
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 s3config.region = region_;
75
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 s3config.flavor = flavor_;
76
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 s3config.bucket = bucket_;
77 12 s3config.dns_buckets = dns_buckets_;
78 12 s3config.pool_max_handles = num_parallel_uploads_;
79 12 s3config.opt_timeout_sec = timeout_sec_;
80 12 s3config.opt_max_retries = num_retries_;
81 12 s3config.opt_backoff_init_ms = kDefaultBackoffInitMs;
82 12 s3config.opt_backoff_max_ms = kDefaultBackoffMaxMs;
83
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 s3config.x_amz_acl = x_amz_acl_;
84
85
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 if (use_https_) {
86 s3config.protocol = "https";
87 } else {
88
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 s3config.protocol = "http";
89 }
90
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 s3config.proxy = proxy_;
91
92
3/6
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 12 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 12 times.
✗ Branch 8 not taken.
12 s3fanout_mgr_ = new s3fanout::S3FanoutManager(s3config);
93
1/2
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
12 s3fanout_mgr_->Spawn();
94
95 12 int retval = pthread_create(
96 &thread_collect_results_, NULL, MainCollectResults, this);
97
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 assert(retval == 0);
98 12 }
99
100
101 72 S3Uploader::~S3Uploader() {
102 // Signal termination to our own worker thread
103 24 s3fanout_mgr_->PushCompletedJob(NULL);
104 24 pthread_join(thread_collect_results_, NULL);
105 48 }
106
107
108 12 bool S3Uploader::ParseSpoolerDefinition(
109 const SpoolerDefinition &spooler_definition)
110 {
111 const std::vector<std::string> config =
112
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 SplitString(spooler_definition.spooler_configuration, '@');
113
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 12 times.
12 if (config.size() != 2) {
114 LogCvmfs(kLogUploadS3, kLogStderr,
115 "Failed to parse spooler configuration string '%s'.\n"
116 "Provide: <repo_alias>@/path/to/s3.conf",
117 spooler_definition.spooler_configuration.c_str());
118 return false;
119 }
120
1/2
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
12 repository_alias_ = config[0];
121 12 const std::string &config_path = config[1];
122
123
2/4
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 12 times.
12 if (!FileExists(config_path)) {
124 LogCvmfs(kLogUploadS3, kLogStderr,
125 "Cannot find S3 config file at '%s'",
126 config_path.c_str());
127 return false;
128 }
129
130 // Parse S3 configuration
131 BashOptionsManager options_manager = BashOptionsManager(
132
4/8
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 12 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 12 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 12 times.
✗ Branch 11 not taken.
12 new DefaultOptionsTemplateManager(repository_alias_));
133
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 options_manager.ParsePath(config_path, false);
134 12 std::string parameter;
135
136
3/6
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 12 times.
12 if (!options_manager.GetValue("CVMFS_S3_HOST", &host_name_)) {
137 LogCvmfs(kLogUploadS3, kLogStderr,
138 "Failed to parse CVMFS_S3_HOST from '%s'",
139 config_path.c_str());
140 return false;
141 }
142
3/6
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 12 times.
12 if (!options_manager.GetValue("CVMFS_S3_ACCESS_KEY", &access_key_)) {
143 LogCvmfs(kLogUploadS3, kLogStderr,
144 "Failed to parse CVMFS_S3_ACCESS_KEY from '%s'.",
145 config_path.c_str());
146 return false;
147 }
148
3/6
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 12 times.
12 if (!options_manager.GetValue("CVMFS_S3_SECRET_KEY", &secret_key_)) {
149 LogCvmfs(kLogUploadS3, kLogStderr,
150 "Failed to parse CVMFS_S3_SECRET_KEY from '%s'.",
151 config_path.c_str());
152 return false;
153 }
154
3/6
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 12 times.
12 if (!options_manager.GetValue("CVMFS_S3_BUCKET", &bucket_)) {
155 LogCvmfs(kLogUploadS3, kLogStderr,
156 "Failed to parse CVMFS_S3_BUCKET from '%s'.",
157 config_path.c_str());
158 return false;
159 }
160
3/6
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 12 times.
✗ Branch 10 not taken.
12 if (options_manager.GetValue("CVMFS_S3_DNS_BUCKETS", &parameter)) {
161
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 if (parameter == "false") {
162 12 dns_buckets_ = false;
163 }
164 }
165
3/6
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 12 times.
✗ Branch 10 not taken.
12 if (options_manager.GetValue("CVMFS_S3_MAX_NUMBER_OF_PARALLEL_CONNECTIONS",
166 &parameter))
167 {
168
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 num_parallel_uploads_ = String2Uint64(parameter);
169 }
170
3/6
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 12 times.
12 if (options_manager.GetValue("CVMFS_S3_MAX_RETRIES", &parameter)) {
171 num_retries_ = String2Uint64(parameter);
172 }
173
3/6
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 12 times.
12 if (options_manager.GetValue("CVMFS_S3_TIMEOUT", &parameter)) {
174 timeout_sec_ = String2Uint64(parameter);
175 }
176
3/6
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 12 times.
12 if (options_manager.GetValue("CVMFS_S3_REGION", &region_)) {
177 authz_method_ = s3fanout::kAuthzAwsV4;
178 }
179
3/6
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 12 times.
12 if (options_manager.GetValue("CVMFS_S3_FLAVOR", &flavor_)) {
180 if (flavor_ == "azure") {
181 authz_method_ = s3fanout::kAuthzAzure;
182 } else if (flavor_ == "awsv2") {
183 authz_method_ = s3fanout::kAuthzAwsV2;
184 } else if (flavor_ == "awsv4") {
185 authz_method_ = s3fanout::kAuthzAwsV4;
186 } else {
187 LogCvmfs(kLogUploadS3, kLogStderr,
188 "Failed to parse CVMFS_S3_FLAVOR from '%s', "
189 "valid options are azure, awsv2 or awsv4",
190 config_path.c_str());
191 return false;
192 }
193 }
194
3/6
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 12 times.
12 if (options_manager.GetValue("CVMFS_S3_PEEK_BEFORE_PUT", &parameter)) {
195 peek_before_put_ = options_manager.IsOn(parameter);
196 }
197
3/6
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 12 times.
12 if (options_manager.GetValue("CVMFS_S3_X_AMZ_ACL", &parameter)) {
198 bool isAllowed = false;
199 size_t const len = sizeof(x_amz_acl_allowed_values_) /
200 sizeof(x_amz_acl_allowed_values_[0]);
201 for (size_t i = 0; i < len; i++) {
202 if (x_amz_acl_allowed_values_[i] == parameter) {
203 isAllowed = true;
204 break;
205 }
206 }
207 if (!isAllowed) {
208 LogCvmfs(kLogUploadS3, kLogStderr,
209 "%s is not an allowed value for CVMFS_S3_X_AMZ_ACL",
210 parameter.c_str());
211 return false;
212 }
213 x_amz_acl_ = parameter;
214 }
215
216
3/6
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 12 times.
12 if (options_manager.GetValue("CVMFS_S3_USE_HTTPS", &parameter)) {
217 use_https_ = options_manager.IsOn(parameter);
218 }
219
220
3/6
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 12 times.
✗ Branch 10 not taken.
12 if (options_manager.GetValue("CVMFS_S3_PORT", &parameter)) {
221
2/4
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 12 times.
✗ Branch 5 not taken.
12 host_name_port_ = host_name_ + ":" + parameter;
222 } else {
223 host_name_port_ = host_name_;
224 }
225
226
3/6
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 12 times.
12 if (options_manager.IsDefined("CVMFS_S3_PROXY")) {
227 options_manager.GetValue("CVMFS_S3_PROXY", &proxy_);
228 }
229
230 12 return true;
231 12 }
232
233
234 17 bool S3Uploader::WillHandle(const SpoolerDefinition &spooler_definition) {
235 17 return spooler_definition.driver_type == SpoolerDefinition::S3;
236 }
237
238
239 bool S3Uploader::Create() {
240 if (!dns_buckets_)
241 return false;
242
243 s3fanout::JobInfo *info = CreateJobInfo("");
244 info->request = s3fanout::JobInfo::kReqPutBucket;
245 std::string request_content;
246 if (!region_.empty()) {
247 request_content =
248 std::string("<CreateBucketConfiguration xmlns="
249 "\"http://s3.amazonaws.com/doc/2006-03-01/\">"
250 "<LocationConstraint>") + region_ + "</LocationConstraint>"
251 "</CreateBucketConfiguration>";
252 info->origin->Append(request_content.data(), request_content.length());
253 info->origin->Commit();
254 }
255
256 RequestCtrl req_ctrl;
257 MakePipe(req_ctrl.pipe_wait);
258 info->callback = const_cast<void*>(static_cast<void const*>(MakeClosure(
259 &S3Uploader::OnReqComplete, this, &req_ctrl)));
260
261 IncJobsInFlight();
262 UploadJobInfo(info);
263 req_ctrl.WaitFor();
264
265 return req_ctrl.return_code == 0;
266 }
267
268
269 1 unsigned int S3Uploader::GetNumberOfErrors() const {
270 1 return atomic_read32(&io_errors_);
271 }
272
273
274 /**
275 * Worker thread takes care of requesting new jobs and cleaning old ones.
276 */
277 12 void *S3Uploader::MainCollectResults(void *data) {
278 12 LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread started.");
279 12 S3Uploader *uploader = reinterpret_cast<S3Uploader *>(data);
280
281 while (true) {
282 626 s3fanout::JobInfo *info = uploader->s3fanout_mgr_->PopCompletedJob();
283
2/2
✓ Branch 0 taken 12 times.
✓ Branch 1 taken 614 times.
626 if (!info)
284 12 break;
285 // Report completed job
286 614 int reply_code = 0;
287
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 612 times.
614 if (info->error_code != s3fanout::kFailOk) {
288
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if ((info->request != s3fanout::JobInfo::kReqHeadOnly) ||
289
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 (info->error_code != s3fanout::kFailNotFound)) {
290 LogCvmfs(kLogUploadS3, kLogStderr,
291 "Upload job for '%s' failed. (error code: %d - %s)",
292 info->object_key.c_str(),
293 info->error_code,
294 s3fanout::Code2Ascii(info->error_code));
295 reply_code = 99;
296 atomic_inc32(&uploader->io_errors_);
297 }
298 }
299
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 613 times.
614 if (info->request == s3fanout::JobInfo::kReqDelete) {
300
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 uploader->Respond(NULL, UploaderResults());
301
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 608 times.
613 } else if (info->request == s3fanout::JobInfo::kReqHeadOnly) {
302
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 3 times.
5 if (info->error_code == s3fanout::kFailNotFound) reply_code = 1;
303
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
5 uploader->Respond(static_cast<CallbackTN*>(info->callback),
304 10 UploaderResults(UploaderResults::kLookup,
305 reply_code));
306 } else {
307
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
608 if (info->request == s3fanout::JobInfo::kReqHeadPut) {
308 // The HEAD request was not transformed into a PUT request, thus this
309 // was a duplicate
310 // Uploaded catalogs are always unique ->
311 // assume this was a regular file and decrease appropriate counters
312 uploader->CountDuplicates();
313 uploader->DecUploadedChunks();
314 uploader->CountUploadedBytes(-(info->payload_size));
315 }
316
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 uploader->Respond(static_cast<CallbackTN*>(info->callback),
317 1216 UploaderResults(UploaderResults::kChunkCommit,
318 reply_code));
319
320
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 608 times.
608 assert(!info->origin.IsValid());
321 }
322
1/2
✓ Branch 0 taken 614 times.
✗ Branch 1 not taken.
614 delete info;
323 614 }
324
325 12 LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread finished.");
326 12 return NULL;
327 }
328
329
330 507 void S3Uploader::DoUpload(
331 const std::string &remote_path,
332 IngestionSource *source,
333 const CallbackTN *callback
334 ) {
335
1/2
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
507 bool rvb = source->Open();
336
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 507 times.
507 if (!rvb) {
337 Respond(callback, UploaderResults(100, source->GetPath()));
338 return;
339 }
340 uint64_t size;
341
1/2
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
507 rvb = source->GetSize(&size);
342
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 507 times.
507 assert(rvb);
343
344 FileBackedBuffer *origin =
345 507 FileBackedBuffer::Create(kInMemoryObjectThreshold,
346
1/2
✓ Branch 2 taken 507 times.
✗ Branch 3 not taken.
507 spooler_definition().temporary_path);
347
348 unsigned char buffer[kPageSize];
349 ssize_t nbytes;
350 do {
351
1/2
✓ Branch 1 taken 199167 times.
✗ Branch 2 not taken.
199167 nbytes = source->Read(buffer, kPageSize);
352
3/4
✓ Branch 0 taken 198831 times.
✓ Branch 1 taken 336 times.
✓ Branch 3 taken 198831 times.
✗ Branch 4 not taken.
199167 if (nbytes > 0) origin->Append(buffer, nbytes);
353
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 199167 times.
199167 if (nbytes < 0) {
354 source->Close();
355 delete origin;
356 Respond(callback, UploaderResults(100, source->GetPath()));
357 return;
358 }
359
2/2
✓ Branch 0 taken 198660 times.
✓ Branch 1 taken 507 times.
199167 } while (nbytes == kPageSize);
360
1/2
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
507 source->Close();
361
1/2
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
507 origin->Commit();
362
363 s3fanout::JobInfo *info =
364
2/4
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 507 times.
✗ Branch 5 not taken.
1014 new s3fanout::JobInfo(repository_alias_ + "/" + remote_path,
365 const_cast<void*>(
366 static_cast<void const*>(callback)),
367
2/4
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 507 times.
✗ Branch 5 not taken.
507 origin);
368
369
3/6
✓ Branch 2 taken 507 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 507 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 507 times.
507 if (HasPrefix(remote_path, ".cvmfs", false /*ignore_case*/)) {
370 info->request = s3fanout::JobInfo::kReqPutDotCvmfs;
371
3/6
✓ Branch 2 taken 507 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 507 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 507 times.
507 } else if (HasSuffix(remote_path, ".html", false)) {
372 info->request = s3fanout::JobInfo::kReqPutHtml;
373 } else {
374
1/2
✓ Branch 0 taken 507 times.
✗ Branch 1 not taken.
507 if (peek_before_put_)
375 507 info->request = s3fanout::JobInfo::kReqHeadPut;
376 }
377
378 507 RequestCtrl req_ctrl;
379
1/2
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
507 MakePipe(req_ctrl.pipe_wait);
380 507 req_ctrl.callback_forward = callback;
381
1/2
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
507 req_ctrl.original_path = source->GetPath();
382 1014 info->callback = const_cast<void*>(static_cast<void const*>(MakeClosure(
383
1/2
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
507 &S3Uploader::OnReqComplete, this, &req_ctrl)));
384
385
1/2
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
507 UploadJobInfo(info);
386
1/2
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
507 req_ctrl.WaitFor();
387
1/2
✓ Branch 2 taken 507 times.
✗ Branch 3 not taken.
507 LogCvmfs(kLogUploadS3, kLogDebug, "Uploading from source finished: %s",
388
1/2
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
1014 source->GetPath().c_str());
389 507 }
390
391
392 613 void S3Uploader::UploadJobInfo(s3fanout::JobInfo *info) {
393 613 LogCvmfs(kLogUploadS3, kLogDebug,
394 "Uploading:\n"
395 "--> Object: '%s'\n"
396 "--> Bucket: '%s'\n"
397 "--> Host: '%s'\n",
398 info->object_key.c_str(),
399 bucket_.c_str(),
400 host_name_port_.c_str());
401
402 613 s3fanout_mgr_->PushNewJob(info);
403 613 }
404
405
406 101 UploadStreamHandle *S3Uploader::InitStreamedUpload(const CallbackTN *callback) {
407 return new S3StreamHandle(callback, kInMemoryObjectThreshold,
408
1/2
✓ Branch 3 taken 101 times.
✗ Branch 4 not taken.
101 spooler_definition().temporary_path);
409 }
410
411
412 802 void S3Uploader::StreamedUpload(
413 UploadStreamHandle *handle,
414 UploadBuffer buffer,
415 const CallbackTN *callback)
416 {
417 802 S3StreamHandle *s3_handle = static_cast<S3StreamHandle*>(handle);
418
419 802 s3_handle->buffer->Append(buffer.data, buffer.size);
420
1/2
✓ Branch 2 taken 802 times.
✗ Branch 3 not taken.
802 Respond(callback, UploaderResults(UploaderResults::kBufferUpload, 0));
421 802 }
422
423
424 101 void S3Uploader::FinalizeStreamedUpload(
425 UploadStreamHandle *handle,
426 const shash::Any &content_hash)
427 {
428 101 S3StreamHandle *s3_handle = static_cast<S3StreamHandle*>(handle);
429
430 // New file name based on content hash or remote_path override
431 101 std::string final_path;
432
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 101 times.
101 if (s3_handle->remote_path != "") {
433 final_path = repository_alias_ + "/" + s3_handle->remote_path;
434 } else {
435
3/6
✓ Branch 1 taken 101 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 101 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 101 times.
✗ Branch 8 not taken.
101 final_path = repository_alias_ + "/data/" + content_hash.MakePath();
436 }
437
438
1/2
✓ Branch 2 taken 101 times.
✗ Branch 3 not taken.
101 s3_handle->buffer->Commit();
439
440
1/2
✓ Branch 2 taken 101 times.
✗ Branch 3 not taken.
101 size_t bytes_uploaded = s3_handle->buffer->GetSize();
441
442 s3fanout::JobInfo *info =
443 new s3fanout::JobInfo(final_path,
444 const_cast<void*>(
445 static_cast<void const*>(
446 101 handle->commit_callback)),
447
2/4
✓ Branch 2 taken 101 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 101 times.
✗ Branch 6 not taken.
101 s3_handle->buffer.Release());
448
449
1/2
✓ Branch 0 taken 101 times.
✗ Branch 1 not taken.
101 if (peek_before_put_)
450 101 info->request = s3fanout::JobInfo::kReqHeadPut;
451
1/2
✓ Branch 1 taken 101 times.
✗ Branch 2 not taken.
101 UploadJobInfo(info);
452
453 // Remove the temporary file
454
1/2
✓ Branch 0 taken 101 times.
✗ Branch 1 not taken.
101 delete s3_handle;
455
456 // Update statistics counters
457
4/4
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 100 times.
✓ Branch 3 taken 100 times.
✓ Branch 4 taken 1 times.
102 if (!content_hash.HasSuffix() ||
458
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 content_hash.suffix == shash::kSuffixPartial) {
459
1/2
✓ Branch 1 taken 100 times.
✗ Branch 2 not taken.
100 CountUploadedChunks();
460
1/2
✓ Branch 1 taken 100 times.
✗ Branch 2 not taken.
100 CountUploadedBytes(bytes_uploaded);
461
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 } else if (content_hash.suffix == shash::kSuffixCatalog) {
462 CountUploadedCatalogs();
463 CountUploadedCatalogBytes(bytes_uploaded);
464 }
465 101 }
466
467
468 6 s3fanout::JobInfo *S3Uploader::CreateJobInfo(const std::string& path) const {
469
2/4
✓ Branch 2 taken 6 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 6 times.
✗ Branch 6 not taken.
6 FileBackedBuffer *buf = FileBackedBuffer::Create(kInMemoryObjectThreshold);
470
1/2
✓ Branch 2 taken 6 times.
✗ Branch 3 not taken.
6 return new s3fanout::JobInfo(path, NULL, buf);
471 }
472
473
474 1 void S3Uploader::DoRemoveAsync(const std::string& file_to_delete) {
475
2/4
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
1 const std::string mangled_path = repository_alias_ + "/" + file_to_delete;
476
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 s3fanout::JobInfo *info = CreateJobInfo(mangled_path);
477
478 1 info->request = s3fanout::JobInfo::kReqDelete;
479
480
1/2
✓ Branch 3 taken 1 times.
✗ Branch 4 not taken.
1 LogCvmfs(kLogUploadS3, kLogDebug, "Asynchronously removing %s/%s",
481 bucket_.c_str(), info->object_key.c_str());
482
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 s3fanout_mgr_->PushNewJob(info);
483 1 }
484
485
486 512 void S3Uploader::OnReqComplete(
487 const upload::UploaderResults &results,
488 RequestCtrl *ctrl)
489 {
490 512 ctrl->return_code = results.return_code;
491
2/2
✓ Branch 0 taken 507 times.
✓ Branch 1 taken 5 times.
512 if (ctrl->callback_forward != NULL) {
492 // We are already in Respond() so we must not call it again
493
1/2
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
507 upload::UploaderResults fix_path(results.return_code, ctrl->original_path);
494
1/2
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
507 (*(ctrl->callback_forward))(fix_path);
495
1/2
✓ Branch 0 taken 507 times.
✗ Branch 1 not taken.
507 delete ctrl->callback_forward;
496 507 ctrl->callback_forward = NULL;
497 507 }
498 512 char c = 'c';
499
1/2
✓ Branch 1 taken 512 times.
✗ Branch 2 not taken.
512 WritePipe(ctrl->pipe_wait[1], &c, 1);
500 512 }
501
502
503 5 bool S3Uploader::Peek(const std::string& path) {
504
2/4
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 5 times.
✗ Branch 5 not taken.
5 const std::string mangled_path = repository_alias_ + "/" + path;
505
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
5 s3fanout::JobInfo *info = CreateJobInfo(mangled_path);
506
507 5 RequestCtrl req_ctrl;
508
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
5 MakePipe(req_ctrl.pipe_wait);
509 5 info->request = s3fanout::JobInfo::kReqHeadOnly;
510 10 info->callback = const_cast<void*>(static_cast<void const*>(MakeClosure(
511
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
5 &S3Uploader::OnReqComplete, this, &req_ctrl)));
512
513
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
5 IncJobsInFlight();
514
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
5 UploadJobInfo(info);
515
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
5 req_ctrl.WaitFor();
516
517 5 return req_ctrl.return_code == 0;
518 5 }
519
520
521 // noop: no mkdir needed in S3 storage
522 bool S3Uploader::Mkdir(const std::string &path) {
523 return true;
524 }
525
526
527 bool S3Uploader::PlaceBootstrappingShortcut(const shash::Any &object) {
528 return false; // TODO(rmeusel): implement
529 }
530
531
532 int64_t S3Uploader::DoGetObjectSize(const std::string &file_name) {
533 // TODO(dosarudaniel): use a head request for byte count
534 // Re-enable 661 integration test when done
535 return -EOPNOTSUPP;
536 }
537
538 } // namespace upload
539