GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/upload_s3.cc
Date: 2026-03-15 02:35:27
Exec Total Coverage
Lines: 236 340 69.4%
Branches: 209 516 40.5%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "upload_s3.h"
6
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <inttypes.h>
10 #include <unistd.h>
11
12 #include <set>
13 #include <string>
14 #include <vector>
15
16 #include "compression/compression.h"
17 #include "network/s3fanout.h"
18 #include "options.h"
19 #include "util/exception.h"
20 #include "util/logging.h"
21 #include "util/mutex.h"
22 #include "util/posix.h"
23 #include "util/string.h"
24
25 namespace upload {
26
27 /*
28 * Allowed values of x-amz-acl according to S3 API
29 */
30 static const char *x_amz_acl_allowed_values_[8] = {"private",
31 "public-read",
32 "public-write",
33 "authenticated-read",
34 "aws-exec-read",
35 "bucket-owner-read",
36 "bucket-owner-full-control",
37 ""};
38
39 20992 void S3Uploader::RequestCtrl::WaitFor() {
40 char c;
41
1/2
✓ Branch 1 taken 20992 times.
✗ Branch 2 not taken.
20992 ReadPipe(pipe_wait[0], &c, 1);
42
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20992 times.
20992 assert(c == 'c');
43
1/2
✓ Branch 1 taken 20992 times.
✗ Branch 2 not taken.
20992 ClosePipe(pipe_wait);
44 20992 }
45
46
47 492 S3Uploader::S3Uploader(const SpoolerDefinition &spooler_definition)
48 : AbstractUploader(spooler_definition)
49 492 , dns_buckets_(true)
50 492 , num_parallel_uploads_(kDefaultNumParallelUploads)
51 492 , num_retries_(kDefaultNumRetries)
52 492 , timeout_sec_(kDefaultTimeoutSec)
53 492 , authz_method_(s3fanout::kAuthzAwsV2)
54 492 , peek_before_put_(true)
55 492 , use_https_(false)
56 492 , batch_delete_enabled_(true)
57
1/2
✓ Branch 2 taken 492 times.
✗ Branch 3 not taken.
492 , proxy_("")
58
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 , temporary_path_(spooler_definition.temporary_path)
59
2/4
✓ Branch 2 taken 492 times.
✗ Branch 3 not taken.
✓ Branch 14 taken 492 times.
✗ Branch 15 not taken.
984 , x_amz_acl_("public-read") {
60
2/4
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 492 times.
✗ Branch 4 not taken.
492 assert(spooler_definition.IsValid()
61 && spooler_definition.driver_type == SpoolerDefinition::S3);
62
63 492 atomic_init32(&io_errors_);
64 492 const int mutex_ret = pthread_mutex_init(&delete_batch_mutex_, NULL);
65
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 492 times.
492 assert(mutex_ret == 0);
66
67
2/4
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 492 times.
492 if (!ParseSpoolerDefinition(spooler_definition)) {
68 PANIC(kLogStderr, "Error in parsing the spooler definition");
69 }
70
71 // Disable batch delete for Azure (not supported)
72
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 492 times.
492 if (authz_method_ == s3fanout::kAuthzAzure)
73 batch_delete_enabled_ = false;
74
75
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 s3fanout::S3FanoutManager::S3Config s3config;
76
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 s3config.access_key = access_key_;
77
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 s3config.secret_key = secret_key_;
78
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 s3config.hostname_port = host_name_port_;
79 492 s3config.authz_method = authz_method_;
80
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 s3config.region = region_;
81
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 s3config.flavor = flavor_;
82
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 s3config.bucket = bucket_;
83 492 s3config.dns_buckets = dns_buckets_;
84 492 s3config.pool_max_handles = num_parallel_uploads_;
85 492 s3config.opt_timeout_sec = timeout_sec_;
86 492 s3config.opt_max_retries = num_retries_;
87 492 s3config.opt_backoff_init_ms = kDefaultBackoffInitMs;
88 492 s3config.opt_backoff_max_ms = kDefaultBackoffMaxMs;
89
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 s3config.x_amz_acl = x_amz_acl_;
90
91
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 492 times.
492 if (use_https_) {
92 s3config.protocol = "https";
93 } else {
94
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 s3config.protocol = "http";
95 }
96
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 s3config.proxy = proxy_;
97
98
3/6
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 492 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 492 times.
✗ Branch 8 not taken.
492 s3fanout_mgr_ = new s3fanout::S3FanoutManager(s3config);
99
1/2
✓ Branch 2 taken 492 times.
✗ Branch 3 not taken.
492 s3fanout_mgr_->Spawn();
100
101 492 const int retval = pthread_create(&thread_collect_results_, NULL,
102 MainCollectResults, this);
103
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 492 times.
492 assert(retval == 0);
104 492 }
105
106
107 2952 S3Uploader::~S3Uploader() {
108 // Signal termination to our own worker thread
109 984 s3fanout_mgr_->PushCompletedJob(NULL);
110 984 pthread_join(thread_collect_results_, NULL);
111 984 pthread_mutex_destroy(&delete_batch_mutex_);
112 1968 }
113
114
115 492 bool S3Uploader::ParseSpoolerDefinition(
116 const SpoolerDefinition &spooler_definition) {
117 const std::vector<std::string> config = SplitString(
118
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 spooler_definition.spooler_configuration, '@');
119
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 492 times.
492 if (config.size() != 2) {
120 LogCvmfs(kLogUploadS3, kLogStderr,
121 "Failed to parse spooler configuration string '%s'.\n"
122 "Provide: <repo_alias>@/path/to/s3.conf",
123 spooler_definition.spooler_configuration.c_str());
124 return false;
125 }
126
1/2
✓ Branch 2 taken 492 times.
✗ Branch 3 not taken.
492 repository_alias_ = config[0];
127 492 const std::string &config_path = config[1];
128
129
2/4
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 492 times.
492 if (!FileExists(config_path)) {
130 LogCvmfs(kLogUploadS3, kLogStderr, "Cannot find S3 config file at '%s'",
131 config_path.c_str());
132 return false;
133 }
134
135 // Parse S3 configuration
136 BashOptionsManager options_manager = BashOptionsManager(
137
4/8
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 492 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 492 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 492 times.
✗ Branch 11 not taken.
492 new DefaultOptionsTemplateManager(repository_alias_));
138
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 options_manager.ParsePath(config_path, false);
139 492 std::string parameter;
140
141
3/6
✓ Branch 2 taken 492 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 492 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 492 times.
492 if (!options_manager.GetValue("CVMFS_S3_HOST", &host_name_)) {
142 LogCvmfs(kLogUploadS3, kLogStderr,
143 "Failed to parse CVMFS_S3_HOST from '%s'", config_path.c_str());
144 return false;
145 }
146
3/6
✓ Branch 2 taken 492 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 492 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 492 times.
492 if (!options_manager.GetValue("CVMFS_S3_ACCESS_KEY", &access_key_)) {
147 LogCvmfs(kLogUploadS3, kLogStderr,
148 "Failed to parse CVMFS_S3_ACCESS_KEY from '%s'.",
149 config_path.c_str());
150 return false;
151 }
152
3/6
✓ Branch 2 taken 492 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 492 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 492 times.
492 if (!options_manager.GetValue("CVMFS_S3_SECRET_KEY", &secret_key_)) {
153 LogCvmfs(kLogUploadS3, kLogStderr,
154 "Failed to parse CVMFS_S3_SECRET_KEY from '%s'.",
155 config_path.c_str());
156 return false;
157 }
158
3/6
✓ Branch 2 taken 492 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 492 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 492 times.
492 if (!options_manager.GetValue("CVMFS_S3_BUCKET", &bucket_)) {
159 LogCvmfs(kLogUploadS3, kLogStderr,
160 "Failed to parse CVMFS_S3_BUCKET from '%s'.", config_path.c_str());
161 return false;
162 }
163
3/6
✓ Branch 2 taken 492 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 492 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 492 times.
✗ Branch 10 not taken.
492 if (options_manager.GetValue("CVMFS_S3_DNS_BUCKETS", &parameter)) {
164
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 if (parameter == "false") {
165 492 dns_buckets_ = false;
166 }
167 }
168
3/6
✓ Branch 2 taken 492 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 492 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 492 times.
✗ Branch 10 not taken.
492 if (options_manager.GetValue("CVMFS_S3_MAX_NUMBER_OF_PARALLEL_CONNECTIONS",
169 &parameter)) {
170
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 num_parallel_uploads_ = String2Uint64(parameter);
171 }
172
3/6
✓ Branch 2 taken 492 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 492 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 492 times.
492 if (options_manager.GetValue("CVMFS_S3_MAX_RETRIES", &parameter)) {
173 num_retries_ = String2Uint64(parameter);
174 }
175
3/6
✓ Branch 2 taken 492 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 492 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 492 times.
492 if (options_manager.GetValue("CVMFS_S3_TIMEOUT", &parameter)) {
176 timeout_sec_ = String2Uint64(parameter);
177 }
178
3/6
✓ Branch 2 taken 492 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 492 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 492 times.
492 if (options_manager.GetValue("CVMFS_S3_REGION", &region_)) {
179 authz_method_ = s3fanout::kAuthzAwsV4;
180 }
181
3/6
✓ Branch 2 taken 492 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 492 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 492 times.
492 if (options_manager.GetValue("CVMFS_S3_FLAVOR", &flavor_)) {
182 if (flavor_ == "azure") {
183 authz_method_ = s3fanout::kAuthzAzure;
184 } else if (flavor_ == "awsv2") {
185 authz_method_ = s3fanout::kAuthzAwsV2;
186 } else if (flavor_ == "awsv4") {
187 authz_method_ = s3fanout::kAuthzAwsV4;
188 } else {
189 LogCvmfs(kLogUploadS3, kLogStderr,
190 "Failed to parse CVMFS_S3_FLAVOR from '%s', "
191 "valid options are azure, awsv2 or awsv4",
192 config_path.c_str());
193 return false;
194 }
195 }
196
3/6
✓ Branch 2 taken 492 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 492 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 492 times.
492 if (options_manager.GetValue("CVMFS_S3_PEEK_BEFORE_PUT", &parameter)) {
197 peek_before_put_ = options_manager.IsOn(parameter);
198 }
199
3/6
✓ Branch 2 taken 492 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 492 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 492 times.
492 if (options_manager.GetValue("CVMFS_S3_X_AMZ_ACL", &parameter)) {
200 bool isAllowed = false;
201 size_t const len = sizeof(x_amz_acl_allowed_values_)
202 / sizeof(x_amz_acl_allowed_values_[0]);
203 for (size_t i = 0; i < len; i++) {
204 if (x_amz_acl_allowed_values_[i] == parameter) {
205 isAllowed = true;
206 break;
207 }
208 }
209 if (!isAllowed) {
210 LogCvmfs(kLogUploadS3, kLogStderr,
211 "%s is not an allowed value for CVMFS_S3_X_AMZ_ACL",
212 parameter.c_str());
213 return false;
214 }
215 x_amz_acl_ = parameter;
216 }
217
218
3/6
✓ Branch 2 taken 492 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 492 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 492 times.
492 if (options_manager.GetValue("CVMFS_S3_BATCH_DELETE", &parameter)) {
219 batch_delete_enabled_ = options_manager.IsOn(parameter);
220 }
221
222
3/6
✓ Branch 2 taken 492 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 492 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 492 times.
492 if (options_manager.GetValue("CVMFS_S3_USE_HTTPS", &parameter)) {
223 use_https_ = options_manager.IsOn(parameter);
224 }
225
226
3/6
✓ Branch 2 taken 492 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 492 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 492 times.
✗ Branch 10 not taken.
492 if (options_manager.GetValue("CVMFS_S3_PORT", &parameter)) {
227
2/4
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 492 times.
✗ Branch 5 not taken.
492 host_name_port_ = host_name_ + ":" + parameter;
228 } else {
229 host_name_port_ = host_name_;
230 }
231
232
3/6
✓ Branch 2 taken 492 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 492 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 492 times.
492 if (options_manager.IsDefined("CVMFS_S3_PROXY")) {
233 options_manager.GetValue("CVMFS_S3_PROXY", &proxy_);
234 }
235
236 492 return true;
237 492 }
238
239
240 614 bool S3Uploader::WillHandle(const SpoolerDefinition &spooler_definition) {
241 614 return spooler_definition.driver_type == SpoolerDefinition::S3;
242 }
243
244
245 bool S3Uploader::Create() {
246 if (!dns_buckets_)
247 return false;
248
249 s3fanout::JobInfo *info = CreateJobInfo("");
250 info->request = s3fanout::JobInfo::kReqPutBucket;
251 std::string request_content;
252 if (!region_.empty()) {
253 request_content = std::string("<CreateBucketConfiguration xmlns="
254 "\"http://s3.amazonaws.com/doc/2006-03-01/\">"
255 "<LocationConstraint>")
256 + region_
257 + "</LocationConstraint>"
258 "</CreateBucketConfiguration>";
259 info->origin->Append(request_content.data(), request_content.length());
260 info->origin->Commit();
261 }
262
263 RequestCtrl req_ctrl;
264 MakePipe(req_ctrl.pipe_wait);
265 info->callback = const_cast<void *>(static_cast<void const *>(
266 MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl)));
267
268 IncJobsInFlight();
269 UploadJobInfo(info);
270 req_ctrl.WaitFor();
271
272 return req_ctrl.return_code == 0;
273 }
274
275
276 41 unsigned int S3Uploader::GetNumberOfErrors() const {
277 41 return atomic_read32(&io_errors_);
278 }
279
280
281 /**
282 * Worker thread takes care of requesting new jobs and cleaning old ones.
283 */
284 492 void *S3Uploader::MainCollectResults(void *data) {
285 492 LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread started.");
286 492 S3Uploader *uploader = reinterpret_cast<S3Uploader *>(data);
287
288 while (true) {
289 25666 s3fanout::JobInfo *info = uploader->s3fanout_mgr_->PopCompletedJob();
290
2/2
✓ Branch 0 taken 492 times.
✓ Branch 1 taken 25174 times.
25666 if (!info)
291 492 break;
292 // Report completed job
293 25174 int reply_code = 0;
294
2/2
✓ Branch 0 taken 82 times.
✓ Branch 1 taken 25092 times.
25174 if (info->error_code != s3fanout::kFailOk) {
295
1/2
✓ Branch 0 taken 82 times.
✗ Branch 1 not taken.
82 if ((info->request != s3fanout::JobInfo::kReqHeadOnly)
296
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 82 times.
82 || (info->error_code != s3fanout::kFailNotFound)) {
297 if (info->request == s3fanout::JobInfo::kReqDeleteMulti) {
298 LogCvmfs(kLogUploadS3, kLogStderr,
299 "Batch delete of %lu objects failed. (error code: %d - %s)",
300 info->multi_delete_keys.size(), info->error_code,
301 s3fanout::Code2Ascii(info->error_code));
302 } else {
303 LogCvmfs(kLogUploadS3, kLogStderr,
304 "Upload job for '%s' failed. (error code: %d - %s)",
305 info->object_key.c_str(), info->error_code,
306 s3fanout::Code2Ascii(info->error_code));
307 }
308 reply_code = 99;
309 atomic_inc32(&uploader->io_errors_);
310 }
311 }
312
2/2
✓ Branch 0 taken 41 times.
✓ Branch 1 taken 25133 times.
25174 if (info->request == s3fanout::JobInfo::kReqDeleteMulti) {
313 // Parse response for per-key errors
314 41 std::set<std::string> failed_keys;
315 82 if (info->error_code == s3fanout::kFailOk
316
3/6
✓ Branch 0 taken 41 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 41 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 41 times.
41 && !info->response_body.empty()) {
317 std::vector<std::string> error_keys, error_codes, error_messages;
318 const unsigned num_errors = s3fanout::ParseDeleteMultiResponse(
319 info->response_body, &error_keys, &error_codes, &error_messages);
320 for (unsigned i = 0; i < num_errors; ++i) {
321 LogCvmfs(kLogUploadS3, kLogStderr,
322 "S3 multi-delete error for key '%s': %s - %s",
323 error_keys[i].c_str(), error_codes[i].c_str(),
324 error_messages[i].c_str());
325 atomic_inc32(&uploader->io_errors_);
326 failed_keys.insert(error_keys[i]);
327 }
328 }
329 // Decrement jobs_in_flight_ once per key in the batch.
330 // Report per-key error code so callers can distinguish failures.
331 41 const unsigned batch_size = info->multi_delete_keys.size();
332
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41 times.
41 const int batch_error = (info->error_code != s3fanout::kFailOk) ? 99 : 0;
333
2/2
✓ Branch 0 taken 41 times.
✓ Branch 1 taken 41 times.
82 for (unsigned i = 0; i < batch_size; ++i) {
334 const int key_error = batch_error
335
1/2
✓ Branch 0 taken 41 times.
✗ Branch 1 not taken.
82 ? batch_error
336
2/4
✓ Branch 2 taken 41 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 41 times.
41 : (failed_keys.count(info->multi_delete_keys[i]) ? 99 : 0);
337
2/4
✓ Branch 1 taken 41 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 41 times.
✗ Branch 5 not taken.
41 uploader->Respond(NULL, UploaderResults(UploaderResults::kRemove,
338 key_error));
339 }
340
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 25133 times.
25174 } else if (info->request == s3fanout::JobInfo::kReqDelete) {
341 uploader->Respond(NULL, UploaderResults());
342
2/2
✓ Branch 0 taken 205 times.
✓ Branch 1 taken 24928 times.
25133 } else if (info->request == s3fanout::JobInfo::kReqHeadOnly) {
343
2/2
✓ Branch 0 taken 82 times.
✓ Branch 1 taken 123 times.
205 if (info->error_code == s3fanout::kFailNotFound)
344 82 reply_code = 1;
345
1/2
✓ Branch 1 taken 205 times.
✗ Branch 2 not taken.
205 uploader->Respond(static_cast<CallbackTN *>(info->callback),
346 410 UploaderResults(UploaderResults::kLookup, reply_code));
347 } else {
348
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24928 times.
24928 if (info->request == s3fanout::JobInfo::kReqHeadPut) {
349 // The HEAD request was not transformed into a PUT request, thus this
350 // was a duplicate
351 // Uploaded catalogs are always unique ->
352 // assume this was a regular file and decrease appropriate counters
353 uploader->CountDuplicates();
354 uploader->DecUploadedChunks();
355 uploader->CountUploadedBytes(-(info->payload_size));
356 }
357 49856 uploader->Respond(
358
1/2
✓ Branch 1 taken 24928 times.
✗ Branch 2 not taken.
24928 static_cast<CallbackTN *>(info->callback),
359 49856 UploaderResults(UploaderResults::kChunkCommit, reply_code));
360
361
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 24928 times.
24928 assert(!info->origin.IsValid());
362 }
363
1/2
✓ Branch 0 taken 25174 times.
✗ Branch 1 not taken.
25174 delete info;
364 25174 }
365
366 492 LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread finished.");
367 492 return NULL;
368 }
369
370
371 20787 void S3Uploader::DoUpload(const std::string &remote_path,
372 IngestionSource *source,
373 const CallbackTN *callback) {
374
1/2
✓ Branch 1 taken 20787 times.
✗ Branch 2 not taken.
20787 bool rvb = source->Open();
375
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20787 times.
20787 if (!rvb) {
376 Respond(callback, UploaderResults(100, source->GetPath()));
377 return;
378 }
379 uint64_t size;
380
1/2
✓ Branch 1 taken 20787 times.
✗ Branch 2 not taken.
20787 rvb = source->GetSize(&size);
381
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20787 times.
20787 assert(rvb);
382
383 20787 FileBackedBuffer *origin = FileBackedBuffer::Create(
384
1/2
✓ Branch 2 taken 20787 times.
✗ Branch 3 not taken.
20787 kInMemoryObjectThreshold, spooler_definition().temporary_path);
385
386 unsigned char buffer[kPageSize];
387 ssize_t nbytes;
388 do {
389
1/2
✓ Branch 1 taken 8165847 times.
✗ Branch 2 not taken.
8165847 nbytes = source->Read(buffer, kPageSize);
390
2/2
✓ Branch 0 taken 8152071 times.
✓ Branch 1 taken 13776 times.
8165847 if (nbytes > 0)
391
1/2
✓ Branch 1 taken 8152071 times.
✗ Branch 2 not taken.
8152071 origin->Append(buffer, nbytes);
392
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8165847 times.
8165847 if (nbytes < 0) {
393 source->Close();
394 delete origin;
395 Respond(callback, UploaderResults(100, source->GetPath()));
396 return;
397 }
398
2/2
✓ Branch 0 taken 8145060 times.
✓ Branch 1 taken 20787 times.
8165847 } while (nbytes == kPageSize);
399
1/2
✓ Branch 1 taken 20787 times.
✗ Branch 2 not taken.
20787 source->Close();
400
1/2
✓ Branch 1 taken 20787 times.
✗ Branch 2 not taken.
20787 origin->Commit();
401
402 s3fanout::JobInfo *info = new s3fanout::JobInfo(
403
2/4
✓ Branch 1 taken 20787 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 20787 times.
✗ Branch 5 not taken.
41574 repository_alias_ + "/" + remote_path,
404 const_cast<void *>(static_cast<void const *>(callback)),
405
2/4
✓ Branch 1 taken 20787 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 20787 times.
✗ Branch 5 not taken.
20787 origin);
406
407
3/6
✓ Branch 2 taken 20787 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 20787 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 20787 times.
20787 if (HasPrefix(remote_path, ".cvmfs", false /*ignore_case*/)) {
408 info->request = s3fanout::JobInfo::kReqPutDotCvmfs;
409
3/6
✓ Branch 2 taken 20787 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 20787 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 20787 times.
20787 } else if (HasSuffix(remote_path, ".html", false)) {
410 info->request = s3fanout::JobInfo::kReqPutHtml;
411 } else {
412
1/2
✓ Branch 0 taken 20787 times.
✗ Branch 1 not taken.
20787 if (peek_before_put_)
413 20787 info->request = s3fanout::JobInfo::kReqHeadPut;
414 }
415
416 20787 RequestCtrl req_ctrl;
417
1/2
✓ Branch 1 taken 20787 times.
✗ Branch 2 not taken.
20787 MakePipe(req_ctrl.pipe_wait);
418 20787 req_ctrl.callback_forward = callback;
419
1/2
✓ Branch 1 taken 20787 times.
✗ Branch 2 not taken.
20787 req_ctrl.original_path = source->GetPath();
420 20787 info->callback = const_cast<void *>(static_cast<void const *>(
421
1/2
✓ Branch 1 taken 20787 times.
✗ Branch 2 not taken.
20787 MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl)));
422
423
1/2
✓ Branch 1 taken 20787 times.
✗ Branch 2 not taken.
20787 UploadJobInfo(info);
424
1/2
✓ Branch 1 taken 20787 times.
✗ Branch 2 not taken.
20787 req_ctrl.WaitFor();
425
1/2
✓ Branch 2 taken 20787 times.
✗ Branch 3 not taken.
20787 LogCvmfs(kLogUploadS3, kLogDebug, "Uploading from source finished: %s",
426
1/2
✓ Branch 1 taken 20787 times.
✗ Branch 2 not taken.
41574 source->GetPath().c_str());
427 20787 }
428
429
430 25133 void S3Uploader::UploadJobInfo(s3fanout::JobInfo *info) {
431 25133 LogCvmfs(kLogUploadS3, kLogDebug,
432 "Uploading:\n"
433 "--> Object: '%s'\n"
434 "--> Bucket: '%s'\n"
435 "--> Host: '%s'\n",
436 info->object_key.c_str(), bucket_.c_str(), host_name_port_.c_str());
437
438 25133 s3fanout_mgr_->PushNewJob(info);
439 25133 }
440
441
442 4141 UploadStreamHandle *S3Uploader::InitStreamedUpload(const CallbackTN *callback) {
443 return new S3StreamHandle(callback, kInMemoryObjectThreshold,
444
1/2
✓ Branch 3 taken 4141 times.
✗ Branch 4 not taken.
4141 spooler_definition().temporary_path);
445 }
446
447
448 32882 void S3Uploader::StreamedUpload(UploadStreamHandle *handle,
449 UploadBuffer buffer,
450 const CallbackTN *callback) {
451 32882 S3StreamHandle *s3_handle = static_cast<S3StreamHandle *>(handle);
452
453 32882 s3_handle->buffer->Append(buffer.data, buffer.size);
454
1/2
✓ Branch 2 taken 32882 times.
✗ Branch 3 not taken.
32882 Respond(callback, UploaderResults(UploaderResults::kBufferUpload, 0));
455 32882 }
456
457
458 4141 void S3Uploader::FinalizeStreamedUpload(UploadStreamHandle *handle,
459 const shash::Any &content_hash) {
460 4141 S3StreamHandle *s3_handle = static_cast<S3StreamHandle *>(handle);
461
462 // New file name based on content hash or remote_path override
463 4141 std::string final_path;
464
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4141 times.
4141 if (s3_handle->remote_path != "") {
465 final_path = repository_alias_ + "/" + s3_handle->remote_path;
466 } else {
467
3/6
✓ Branch 1 taken 4141 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 4141 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 4141 times.
✗ Branch 8 not taken.
4141 final_path = repository_alias_ + "/data/" + content_hash.MakePath();
468 }
469
470
1/2
✓ Branch 2 taken 4141 times.
✗ Branch 3 not taken.
4141 s3_handle->buffer->Commit();
471
472
1/2
✓ Branch 2 taken 4141 times.
✗ Branch 3 not taken.
4141 const size_t bytes_uploaded = s3_handle->buffer->GetSize();
473
474 s3fanout::JobInfo *info = new s3fanout::JobInfo(
475 final_path,
476 4141 const_cast<void *>(static_cast<void const *>(handle->commit_callback)),
477
2/4
✓ Branch 2 taken 4141 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4141 times.
✗ Branch 6 not taken.
4141 s3_handle->buffer.Release());
478
479
1/2
✓ Branch 0 taken 4141 times.
✗ Branch 1 not taken.
4141 if (peek_before_put_)
480 4141 info->request = s3fanout::JobInfo::kReqHeadPut;
481
1/2
✓ Branch 1 taken 4141 times.
✗ Branch 2 not taken.
4141 UploadJobInfo(info);
482
483 // Remove the temporary file
484
1/2
✓ Branch 0 taken 4141 times.
✗ Branch 1 not taken.
4141 delete s3_handle;
485
486 // Update statistics counters
487 4141 if (!content_hash.HasSuffix()
488
5/6
✓ Branch 0 taken 41 times.
✓ Branch 1 taken 4100 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 41 times.
✓ Branch 4 taken 4100 times.
✓ Branch 5 taken 41 times.
4141 || content_hash.suffix == shash::kSuffixPartial) {
489
1/2
✓ Branch 1 taken 4100 times.
✗ Branch 2 not taken.
4100 CountUploadedChunks();
490
1/2
✓ Branch 1 taken 4100 times.
✗ Branch 2 not taken.
4100 CountUploadedBytes(bytes_uploaded);
491
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41 times.
41 } else if (content_hash.suffix == shash::kSuffixCatalog) {
492 CountUploadedCatalogs();
493 CountUploadedCatalogBytes(bytes_uploaded);
494 }
495 4141 }
496
497
498 205 s3fanout::JobInfo *S3Uploader::CreateJobInfo(const std::string &path) const {
499
2/4
✓ Branch 2 taken 205 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 205 times.
✗ Branch 6 not taken.
205 FileBackedBuffer *buf = FileBackedBuffer::Create(kInMemoryObjectThreshold);
500
1/2
✓ Branch 2 taken 205 times.
✗ Branch 3 not taken.
205 return new s3fanout::JobInfo(path, NULL, buf);
501 }
502
503
504 41 void S3Uploader::DoRemoveAsync(const std::string &file_to_delete) {
505
2/4
✓ Branch 1 taken 41 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 41 times.
✗ Branch 5 not taken.
41 const std::string mangled_path = repository_alias_ + "/" + file_to_delete;
506
507
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41 times.
41 if (!batch_delete_enabled_) {
508 s3fanout::JobInfo *info = CreateJobInfo(mangled_path);
509 info->request = s3fanout::JobInfo::kReqDelete;
510 LogCvmfs(kLogUploadS3, kLogDebug, "Asynchronously removing %s/%s",
511 bucket_.c_str(), info->object_key.c_str());
512 s3fanout_mgr_->PushNewJob(info);
513 return;
514 }
515
516 // Batch delete: collect keys and flush when batch is full
517 41 const MutexLockGuard guard(delete_batch_mutex_);
518
1/2
✓ Branch 1 taken 41 times.
✗ Branch 2 not taken.
41 pending_deletes_.push_back(mangled_path);
519
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 41 times.
41 if (pending_deletes_.size() >= kMaxBatchDeleteSize) {
520 FlushDeleteBatch();
521 }
522
1/2
✓ Branch 2 taken 41 times.
✗ Branch 3 not taken.
41 }
523
524
525 492 void S3Uploader::FlushDeleteBatch() const {
526 // Caller must hold delete_batch_mutex_
527
2/2
✓ Branch 1 taken 451 times.
✓ Branch 2 taken 41 times.
492 if (pending_deletes_.empty())
528 451 return;
529
530
1/2
✓ Branch 2 taken 41 times.
✗ Branch 3 not taken.
41 LogCvmfs(kLogUploadS3, kLogDebug,
531 "Flushing batch delete of %lu objects",
532 pending_deletes_.size());
533
534 // Build XML request body
535
1/2
✓ Branch 1 taken 41 times.
✗ Branch 2 not taken.
41 const std::string xml = s3fanout::ComposeDeleteMultiXml(pending_deletes_);
536
2/4
✓ Branch 2 taken 41 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 41 times.
✗ Branch 6 not taken.
41 FileBackedBuffer *buf = FileBackedBuffer::Create(kInMemoryObjectThreshold);
537
1/2
✓ Branch 3 taken 41 times.
✗ Branch 4 not taken.
41 buf->Append(xml.data(), xml.length());
538
1/2
✓ Branch 1 taken 41 times.
✗ Branch 2 not taken.
41 buf->Commit();
539
540 // The object_key for multi-delete is empty (URL is bucket root + ?delete)
541
3/6
✓ Branch 2 taken 41 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 41 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 41 times.
✗ Branch 9 not taken.
41 s3fanout::JobInfo *info = new s3fanout::JobInfo("", NULL, buf);
542 41 info->request = s3fanout::JobInfo::kReqDeleteMulti;
543 41 info->multi_delete_keys.swap(pending_deletes_);
544
545 // Each key was already counted in jobs_in_flight_ by DoRemoveAsync().
546 // MainCollectResults will call Respond() once per key to balance them.
547
1/2
✓ Branch 2 taken 41 times.
✗ Branch 3 not taken.
41 s3fanout_mgr_->PushNewJob(info);
548 41 }
549
550
551 492 void S3Uploader::WaitForUpload() const {
552 {
553 492 const MutexLockGuard guard(delete_batch_mutex_);
554
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 FlushDeleteBatch();
555 492 }
556 492 AbstractUploader::WaitForUpload();
557 492 }
558
559
560 20992 void S3Uploader::OnReqComplete(const upload::UploaderResults &results,
561 RequestCtrl *ctrl) {
562 20992 ctrl->return_code = results.return_code;
563
2/2
✓ Branch 0 taken 20787 times.
✓ Branch 1 taken 205 times.
20992 if (ctrl->callback_forward != NULL) {
564 // We are already in Respond() so we must not call it again
565 20787 const upload::UploaderResults fix_path(results.return_code,
566
1/2
✓ Branch 1 taken 20787 times.
✗ Branch 2 not taken.
20787 ctrl->original_path);
567
1/2
✓ Branch 1 taken 20787 times.
✗ Branch 2 not taken.
20787 (*(ctrl->callback_forward))(fix_path);
568
1/2
✓ Branch 0 taken 20787 times.
✗ Branch 1 not taken.
20787 delete ctrl->callback_forward;
569 20787 ctrl->callback_forward = NULL;
570 20787 }
571 20992 char c = 'c';
572
1/2
✓ Branch 1 taken 20992 times.
✗ Branch 2 not taken.
20992 WritePipe(ctrl->pipe_wait[1], &c, 1);
573 20992 }
574
575
576 205 bool S3Uploader::Peek(const std::string &path) {
577
2/4
✓ Branch 1 taken 205 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 205 times.
✗ Branch 5 not taken.
205 const std::string mangled_path = repository_alias_ + "/" + path;
578
1/2
✓ Branch 1 taken 205 times.
✗ Branch 2 not taken.
205 s3fanout::JobInfo *info = CreateJobInfo(mangled_path);
579
580 205 RequestCtrl req_ctrl;
581
1/2
✓ Branch 1 taken 205 times.
✗ Branch 2 not taken.
205 MakePipe(req_ctrl.pipe_wait);
582 205 info->request = s3fanout::JobInfo::kReqHeadOnly;
583 205 info->callback = const_cast<void *>(static_cast<void const *>(
584
1/2
✓ Branch 1 taken 205 times.
✗ Branch 2 not taken.
205 MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl)));
585
586
1/2
✓ Branch 1 taken 205 times.
✗ Branch 2 not taken.
205 IncJobsInFlight();
587
1/2
✓ Branch 1 taken 205 times.
✗ Branch 2 not taken.
205 UploadJobInfo(info);
588
1/2
✓ Branch 1 taken 205 times.
✗ Branch 2 not taken.
205 req_ctrl.WaitFor();
589
590 205 return req_ctrl.return_code == 0;
591 205 }
592
593
594 // noop: no mkdir needed in S3 storage
595 bool S3Uploader::Mkdir(const std::string &path) { return true; }
596
597
598 bool S3Uploader::PlaceBootstrappingShortcut(const shash::Any &object) {
599 return false; // TODO(rmeusel): implement
600 }
601
602
603 int64_t S3Uploader::DoGetObjectSize(const std::string &file_name) {
604 // TODO(dosarudaniel): use a head request for byte count
605 // Re-enable 661 integration test when done
606 return -EOPNOTSUPP;
607 }
608
609 } // namespace upload
610