GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/upload_s3.cc
Date: 2026-06-14 02:36:34
Exec Total Coverage
Lines: 237 348 68.1%
Branches: 211 528 40.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "upload_s3.h"
6
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <inttypes.h>
10 #include <unistd.h>
11
12 #include <set>
13 #include <string>
14 #include <vector>
15
16 #include "compression/compression.h"
17 #include "network/s3fanout.h"
18 #include "options.h"
19 #include "util/exception.h"
20 #include "util/logging.h"
21 #include "util/mutex.h"
22 #include "util/posix.h"
23 #include "util/string.h"
24
25 namespace upload {
26
27 /*
28 * Allowed values of x-amz-acl according to S3 API
29 */
30 static const char *x_amz_acl_allowed_values_[8] = {"private",
31 "public-read",
32 "public-write",
33 "authenticated-read",
34 "aws-exec-read",
35 "bucket-owner-read",
36 "bucket-owner-full-control",
37 ""};
38
39 512 void S3Uploader::RequestCtrl::WaitFor() {
40 char c;
41
1/2
✓ Branch 1 taken 512 times.
✗ Branch 2 not taken.
512 ReadPipe(pipe_wait[0], &c, 1);
42
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 512 times.
512 assert(c == 'c');
43
1/2
✓ Branch 1 taken 512 times.
✗ Branch 2 not taken.
512 ClosePipe(pipe_wait);
44 512 }
45
46
47 13 S3Uploader::S3Uploader(const SpoolerDefinition &spooler_definition)
48 : AbstractUploader(spooler_definition)
49 13 , dns_buckets_(true)
50 13 , num_parallel_uploads_(kDefaultNumParallelUploads)
51 13 , num_retries_(kDefaultNumRetries)
52 13 , timeout_sec_(kDefaultTimeoutSec)
53 13 , authz_method_(s3fanout::kAuthzAwsV2)
54 13 , peek_before_put_(true)
55 13 , use_https_(false)
56 13 , batch_delete_enabled_(true)
57 13 , batch_delete_size_(kDefaultBatchDeleteSize)
58
1/2
✓ Branch 2 taken 13 times.
✗ Branch 3 not taken.
13 , proxy_("")
59
1/2
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
13 , temporary_path_(spooler_definition.temporary_path)
60
2/4
✓ Branch 2 taken 13 times.
✗ Branch 3 not taken.
✓ Branch 14 taken 13 times.
✗ Branch 15 not taken.
26 , x_amz_acl_("public-read") {
61
2/4
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 13 times.
✗ Branch 4 not taken.
13 assert(spooler_definition.IsValid()
62 && spooler_definition.driver_type == SpoolerDefinition::S3);
63
64 13 atomic_init32(&io_errors_);
65 13 const int mutex_ret = pthread_mutex_init(&delete_batch_mutex_, NULL);
66
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13 times.
13 assert(mutex_ret == 0);
67
68
2/4
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 13 times.
13 if (!ParseSpoolerDefinition(spooler_definition)) {
69 PANIC(kLogStderr, "Error in parsing the spooler definition");
70 }
71
72 // Disable batch delete for Azure (not supported)
73
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13 times.
13 if (authz_method_ == s3fanout::kAuthzAzure)
74 batch_delete_enabled_ = false;
75
76
1/2
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
13 s3fanout::S3FanoutManager::S3Config s3config;
77
1/2
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
13 s3config.access_key = access_key_;
78
1/2
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
13 s3config.secret_key = secret_key_;
79
1/2
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
13 s3config.hostname_port = host_name_port_;
80 13 s3config.authz_method = authz_method_;
81
1/2
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
13 s3config.region = region_;
82
1/2
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
13 s3config.flavor = flavor_;
83
1/2
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
13 s3config.bucket = bucket_;
84 13 s3config.dns_buckets = dns_buckets_;
85 13 s3config.pool_max_handles = num_parallel_uploads_;
86 13 s3config.opt_timeout_sec = timeout_sec_;
87 13 s3config.opt_max_retries = num_retries_;
88 13 s3config.opt_backoff_init_ms = kDefaultBackoffInitMs;
89 13 s3config.opt_backoff_max_ms = kDefaultBackoffMaxMs;
90
1/2
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
13 s3config.x_amz_acl = x_amz_acl_;
91
92
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13 times.
13 if (use_https_) {
93 s3config.protocol = "https";
94 } else {
95
1/2
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
13 s3config.protocol = "http";
96 }
97
1/2
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
13 s3config.proxy = proxy_;
98
99
3/6
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 13 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 13 times.
✗ Branch 8 not taken.
13 s3fanout_mgr_ = new s3fanout::S3FanoutManager(s3config);
100
1/2
✓ Branch 2 taken 13 times.
✗ Branch 3 not taken.
13 s3fanout_mgr_->Spawn();
101
102 13 const int retval = pthread_create(&thread_collect_results_, NULL,
103 MainCollectResults, this);
104
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13 times.
13 assert(retval == 0);
105 13 }
106
107
108 78 S3Uploader::~S3Uploader() {
109 // Signal termination to our own worker thread
110 26 s3fanout_mgr_->PushCompletedJob(NULL);
111 26 pthread_join(thread_collect_results_, NULL);
112 26 pthread_mutex_destroy(&delete_batch_mutex_);
113 52 }
114
115
116 13 bool S3Uploader::ParseSpoolerDefinition(
117 const SpoolerDefinition &spooler_definition) {
118 const std::vector<std::string> config = SplitString(
119
1/2
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
13 spooler_definition.spooler_configuration, '@');
120
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 13 times.
13 if (config.size() != 2) {
121 LogCvmfs(kLogUploadS3, kLogStderr,
122 "Failed to parse spooler configuration string '%s'.\n"
123 "Provide: <repo_alias>@/path/to/s3.conf",
124 spooler_definition.spooler_configuration.c_str());
125 return false;
126 }
127
1/2
✓ Branch 2 taken 13 times.
✗ Branch 3 not taken.
13 repository_alias_ = config[0];
128 13 const std::string &config_path = config[1];
129
130
2/4
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 13 times.
13 if (!FileExists(config_path)) {
131 LogCvmfs(kLogUploadS3, kLogStderr, "Cannot find S3 config file at '%s'",
132 config_path.c_str());
133 return false;
134 }
135
136 // Parse S3 configuration
137 BashOptionsManager options_manager = BashOptionsManager(
138
4/8
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 13 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 13 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 13 times.
✗ Branch 11 not taken.
13 new DefaultOptionsTemplateManager(repository_alias_));
139
1/2
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
13 options_manager.ParsePath(config_path, false);
140 13 std::string parameter;
141
142
3/6
✓ Branch 2 taken 13 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 13 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 13 times.
13 if (!options_manager.GetValue("CVMFS_S3_HOST", &host_name_)) {
143 LogCvmfs(kLogUploadS3, kLogStderr,
144 "Failed to parse CVMFS_S3_HOST from '%s'", config_path.c_str());
145 return false;
146 }
147
3/6
✓ Branch 2 taken 13 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 13 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 13 times.
13 if (!options_manager.GetValue("CVMFS_S3_ACCESS_KEY", &access_key_)) {
148 LogCvmfs(kLogUploadS3, kLogStderr,
149 "Failed to parse CVMFS_S3_ACCESS_KEY from '%s'.",
150 config_path.c_str());
151 return false;
152 }
153
3/6
✓ Branch 2 taken 13 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 13 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 13 times.
13 if (!options_manager.GetValue("CVMFS_S3_SECRET_KEY", &secret_key_)) {
154 LogCvmfs(kLogUploadS3, kLogStderr,
155 "Failed to parse CVMFS_S3_SECRET_KEY from '%s'.",
156 config_path.c_str());
157 return false;
158 }
159
3/6
✓ Branch 2 taken 13 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 13 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 13 times.
13 if (!options_manager.GetValue("CVMFS_S3_BUCKET", &bucket_)) {
160 LogCvmfs(kLogUploadS3, kLogStderr,
161 "Failed to parse CVMFS_S3_BUCKET from '%s'.", config_path.c_str());
162 return false;
163 }
164
3/6
✓ Branch 2 taken 13 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 13 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 13 times.
✗ Branch 10 not taken.
13 if (options_manager.GetValue("CVMFS_S3_DNS_BUCKETS", &parameter)) {
165
1/2
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
13 if (parameter == "false") {
166 13 dns_buckets_ = false;
167 }
168 }
169
3/6
✓ Branch 2 taken 13 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 13 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 13 times.
✗ Branch 10 not taken.
13 if (options_manager.GetValue("CVMFS_S3_MAX_NUMBER_OF_PARALLEL_CONNECTIONS",
170 &parameter)) {
171
1/2
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
13 num_parallel_uploads_ = String2Uint64(parameter);
172 }
173
3/6
✓ Branch 2 taken 13 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 13 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 13 times.
13 if (options_manager.GetValue("CVMFS_S3_MAX_RETRIES", &parameter)) {
174 num_retries_ = String2Uint64(parameter);
175 }
176
3/6
✓ Branch 2 taken 13 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 13 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 13 times.
13 if (options_manager.GetValue("CVMFS_S3_TIMEOUT", &parameter)) {
177 timeout_sec_ = String2Uint64(parameter);
178 }
179
3/6
✓ Branch 2 taken 13 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 13 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 13 times.
13 if (options_manager.GetValue("CVMFS_S3_REGION", &region_)) {
180 authz_method_ = s3fanout::kAuthzAwsV4;
181 }
182
3/6
✓ Branch 2 taken 13 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 13 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 13 times.
13 if (options_manager.GetValue("CVMFS_S3_FLAVOR", &flavor_)) {
183 if (flavor_ == "azure") {
184 authz_method_ = s3fanout::kAuthzAzure;
185 } else if (flavor_ == "awsv2") {
186 authz_method_ = s3fanout::kAuthzAwsV2;
187 } else if (flavor_ == "awsv4") {
188 authz_method_ = s3fanout::kAuthzAwsV4;
189 } else {
190 LogCvmfs(kLogUploadS3, kLogStderr,
191 "Failed to parse CVMFS_S3_FLAVOR from '%s', "
192 "valid options are azure, awsv2 or awsv4",
193 config_path.c_str());
194 return false;
195 }
196 }
197
3/6
✓ Branch 2 taken 13 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 13 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 13 times.
13 if (options_manager.GetValue("CVMFS_S3_PEEK_BEFORE_PUT", &parameter)) {
198 peek_before_put_ = options_manager.IsOn(parameter);
199 }
200
3/6
✓ Branch 2 taken 13 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 13 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 13 times.
13 if (options_manager.GetValue("CVMFS_S3_X_AMZ_ACL", &parameter)) {
201 bool isAllowed = false;
202 size_t const len = sizeof(x_amz_acl_allowed_values_)
203 / sizeof(x_amz_acl_allowed_values_[0]);
204 for (size_t i = 0; i < len; i++) {
205 if (x_amz_acl_allowed_values_[i] == parameter) {
206 isAllowed = true;
207 break;
208 }
209 }
210 if (!isAllowed) {
211 LogCvmfs(kLogUploadS3, kLogStderr,
212 "%s is not an allowed value for CVMFS_S3_X_AMZ_ACL",
213 parameter.c_str());
214 return false;
215 }
216 x_amz_acl_ = parameter;
217 }
218
219
3/6
✓ Branch 2 taken 13 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 13 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 13 times.
13 if (options_manager.GetValue("CVMFS_S3_BATCH_DELETE", &parameter)) {
220 batch_delete_enabled_ = options_manager.IsOn(parameter);
221 }
222
223
3/6
✓ Branch 2 taken 13 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 13 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 13 times.
13 if (options_manager.GetValue("CVMFS_S3_BATCH_DELETE_SIZE", &parameter)) {
224 const unsigned requested = String2Uint64(parameter);
225 if (requested == 0) {
226 LogCvmfs(kLogUploadS3, kLogStderr,
227 "CVMFS_S3_BATCH_DELETE_SIZE must be > 0, using default %u",
228 kDefaultBatchDeleteSize);
229 batch_delete_size_ = kDefaultBatchDeleteSize;
230 } else if (requested > kMaxBatchDeleteSize) {
231 LogCvmfs(kLogUploadS3, kLogStderr,
232 "Warning: CVMFS_S3_BATCH_DELETE_SIZE=%u exceeds the S3 "
233 "multi-object DELETE limit of %u, clamping to %u",
234 requested, kMaxBatchDeleteSize, kMaxBatchDeleteSize);
235 batch_delete_size_ = kMaxBatchDeleteSize;
236 } else {
237 batch_delete_size_ = requested;
238 }
239 }
240
241
3/6
✓ Branch 2 taken 13 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 13 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 13 times.
13 if (options_manager.GetValue("CVMFS_S3_USE_HTTPS", &parameter)) {
242 use_https_ = options_manager.IsOn(parameter);
243 }
244
245
3/6
✓ Branch 2 taken 13 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 13 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 13 times.
✗ Branch 10 not taken.
13 if (options_manager.GetValue("CVMFS_S3_PORT", &parameter)) {
246
2/4
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 13 times.
✗ Branch 5 not taken.
13 host_name_port_ = host_name_ + ":" + parameter;
247 } else {
248 host_name_port_ = host_name_;
249 }
250
251
3/6
✓ Branch 2 taken 13 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 13 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 13 times.
13 if (options_manager.IsDefined("CVMFS_S3_PROXY")) {
252 options_manager.GetValue("CVMFS_S3_PROXY", &proxy_);
253 }
254
255 13 return true;
256 13 }
257
258
259 15 bool S3Uploader::WillHandle(const SpoolerDefinition &spooler_definition) {
260 15 return spooler_definition.driver_type == SpoolerDefinition::S3;
261 }
262
263
264 bool S3Uploader::Create() {
265 if (!dns_buckets_)
266 return false;
267
268 s3fanout::JobInfo *info = CreateJobInfo("");
269 info->request = s3fanout::JobInfo::kReqPutBucket;
270 std::string request_content;
271 if (!region_.empty()) {
272 request_content = std::string("<CreateBucketConfiguration xmlns="
273 "\"http://s3.amazonaws.com/doc/2006-03-01/\">"
274 "<LocationConstraint>")
275 + region_
276 + "</LocationConstraint>"
277 "</CreateBucketConfiguration>";
278 info->origin->Append(request_content.data(), request_content.length());
279 info->origin->Commit();
280 }
281
282 RequestCtrl req_ctrl;
283 MakePipe(req_ctrl.pipe_wait);
284 info->callback = const_cast<void *>(static_cast<void const *>(
285 MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl)));
286
287 IncJobsInFlight();
288 UploadJobInfo(info);
289 req_ctrl.WaitFor();
290
291 return req_ctrl.return_code == 0;
292 }
293
294
295 2 unsigned int S3Uploader::GetNumberOfErrors() const {
296 2 return atomic_read32(&io_errors_);
297 }
298
299
300 /**
301 * Worker thread takes care of requesting new jobs and cleaning old ones.
302 */
303 13 void *S3Uploader::MainCollectResults(void *data) {
304 13 LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread started.");
305 13 S3Uploader *uploader = reinterpret_cast<S3Uploader *>(data);
306
307 while (true) {
308 633 s3fanout::JobInfo *info = uploader->s3fanout_mgr_->PopCompletedJob();
309
2/2
✓ Branch 0 taken 13 times.
✓ Branch 1 taken 620 times.
633 if (!info)
310 13 break;
311 // Report completed job
312 620 int reply_code = 0;
313
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 618 times.
620 if (info->error_code != s3fanout::kFailOk) {
314
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if ((info->request != s3fanout::JobInfo::kReqHeadOnly)
315
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 || (info->error_code != s3fanout::kFailNotFound)) {
316 if (info->request == s3fanout::JobInfo::kReqDeleteMulti) {
317 LogCvmfs(kLogUploadS3, kLogStderr,
318 "Batch delete of %lu objects failed. (error code: %d - %s)",
319 info->multi_delete_keys.size(), info->error_code,
320 s3fanout::Code2Ascii(info->error_code));
321 } else {
322 LogCvmfs(kLogUploadS3, kLogStderr,
323 "Upload job for '%s' failed. (error code: %d - %s)",
324 info->object_key.c_str(), info->error_code,
325 s3fanout::Code2Ascii(info->error_code));
326 }
327 reply_code = 99;
328 atomic_inc32(&uploader->io_errors_);
329 }
330 }
331
2/2
✓ Branch 0 taken 7 times.
✓ Branch 1 taken 613 times.
620 if (info->request == s3fanout::JobInfo::kReqDeleteMulti) {
332 // Parse response for per-key errors
333 7 std::set<std::string> failed_keys;
334 14 if (info->error_code == s3fanout::kFailOk
335
3/6
✓ Branch 0 taken 7 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 7 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 7 times.
7 && !info->response_body.empty()) {
336 std::vector<std::string> error_keys, error_codes, error_messages;
337 const unsigned num_errors = s3fanout::ParseDeleteMultiResponse(
338 info->response_body, &error_keys, &error_codes, &error_messages);
339 for (unsigned i = 0; i < num_errors; ++i) {
340 LogCvmfs(kLogUploadS3, kLogStderr,
341 "S3 multi-delete error for key '%s': %s - %s",
342 error_keys[i].c_str(), error_codes[i].c_str(),
343 error_messages[i].c_str());
344 atomic_inc32(&uploader->io_errors_);
345 failed_keys.insert(error_keys[i]);
346 }
347 }
348 // Decrement jobs_in_flight_ once for the entire batch.
349
2/4
✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 7 times.
✗ Branch 5 not taken.
7 uploader->Respond(NULL, UploaderResults(UploaderResults::kRemove,
350
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7 times.
7 (info->error_code != s3fanout::kFailOk) ? 99 : 0));
351
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 613 times.
620 } else if (info->request == s3fanout::JobInfo::kReqDelete) {
352 uploader->Respond(NULL, UploaderResults());
353
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 608 times.
613 } else if (info->request == s3fanout::JobInfo::kReqHeadOnly) {
354
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 3 times.
5 if (info->error_code == s3fanout::kFailNotFound)
355 2 reply_code = 1;
356
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
5 uploader->Respond(static_cast<CallbackTN *>(info->callback),
357 10 UploaderResults(UploaderResults::kLookup, reply_code));
358 } else {
359
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
608 if (info->request == s3fanout::JobInfo::kReqHeadPut) {
360 // The HEAD request was not transformed into a PUT request, thus this
361 // was a duplicate
362 // Uploaded catalogs are always unique ->
363 // assume this was a regular file and decrease appropriate counters
364 uploader->CountDuplicates();
365 uploader->DecUploadedChunks();
366 uploader->CountUploadedBytes(-(info->payload_size));
367 }
368 1216 uploader->Respond(
369
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 static_cast<CallbackTN *>(info->callback),
370 1216 UploaderResults(UploaderResults::kChunkCommit, reply_code));
371
372
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 608 times.
608 assert(!info->origin.IsValid());
373 }
374
1/2
✓ Branch 0 taken 620 times.
✗ Branch 1 not taken.
620 delete info;
375 620 }
376
377 13 LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread finished.");
378 13 return NULL;
379 }
380
381
382 507 void S3Uploader::DoUpload(const std::string &remote_path,
383 IngestionSource *source,
384 const CallbackTN *callback) {
385
1/2
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
507 bool rvb = source->Open();
386
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 507 times.
507 if (!rvb) {
387 Respond(callback, UploaderResults(100, source->GetPath()));
388 return;
389 }
390 uint64_t size;
391
1/2
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
507 rvb = source->GetSize(&size);
392
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 507 times.
507 assert(rvb);
393
394 507 FileBackedBuffer *origin = FileBackedBuffer::Create(
395
1/2
✓ Branch 2 taken 507 times.
✗ Branch 3 not taken.
507 kInMemoryObjectThreshold, spooler_definition().temporary_path);
396
397 unsigned char buffer[kPageSize];
398 ssize_t nbytes;
399 do {
400
1/2
✓ Branch 1 taken 199167 times.
✗ Branch 2 not taken.
199167 nbytes = source->Read(buffer, kPageSize);
401
2/2
✓ Branch 0 taken 198831 times.
✓ Branch 1 taken 336 times.
199167 if (nbytes > 0)
402
1/2
✓ Branch 1 taken 198831 times.
✗ Branch 2 not taken.
198831 origin->Append(buffer, nbytes);
403
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 199167 times.
199167 if (nbytes < 0) {
404 source->Close();
405 delete origin;
406 Respond(callback, UploaderResults(100, source->GetPath()));
407 return;
408 }
409
2/2
✓ Branch 0 taken 198660 times.
✓ Branch 1 taken 507 times.
199167 } while (nbytes == kPageSize);
410
1/2
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
507 source->Close();
411
1/2
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
507 origin->Commit();
412
413 s3fanout::JobInfo *info = new s3fanout::JobInfo(
414
2/4
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 507 times.
✗ Branch 5 not taken.
1014 repository_alias_ + "/" + remote_path,
415 const_cast<void *>(static_cast<void const *>(callback)),
416
2/4
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 507 times.
✗ Branch 5 not taken.
507 origin);
417
418
3/6
✓ Branch 2 taken 507 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 507 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 507 times.
507 if (HasPrefix(remote_path, ".cvmfs", false /*ignore_case*/)) {
419 info->request = s3fanout::JobInfo::kReqPutDotCvmfs;
420
3/6
✓ Branch 2 taken 507 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 507 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 507 times.
507 } else if (HasSuffix(remote_path, ".html", false)) {
421 info->request = s3fanout::JobInfo::kReqPutHtml;
422 } else {
423
1/2
✓ Branch 0 taken 507 times.
✗ Branch 1 not taken.
507 if (peek_before_put_)
424 507 info->request = s3fanout::JobInfo::kReqHeadPut;
425 }
426
427 507 RequestCtrl req_ctrl;
428
1/2
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
507 MakePipe(req_ctrl.pipe_wait);
429 507 req_ctrl.callback_forward = callback;
430
1/2
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
507 req_ctrl.original_path = source->GetPath();
431 507 info->callback = const_cast<void *>(static_cast<void const *>(
432
1/2
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
507 MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl)));
433
434
1/2
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
507 UploadJobInfo(info);
435
1/2
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
507 req_ctrl.WaitFor();
436
1/2
✓ Branch 2 taken 507 times.
✗ Branch 3 not taken.
507 LogCvmfs(kLogUploadS3, kLogDebug, "Uploading from source finished: %s",
437
1/2
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
1014 source->GetPath().c_str());
438 507 }
439
440
441 613 void S3Uploader::UploadJobInfo(s3fanout::JobInfo *info) {
442 613 LogCvmfs(kLogUploadS3, kLogDebug,
443 "Uploading:\n"
444 "--> Object: '%s'\n"
445 "--> Bucket: '%s'\n"
446 "--> Host: '%s'\n",
447 info->object_key.c_str(), bucket_.c_str(), host_name_port_.c_str());
448
449 613 s3fanout_mgr_->PushNewJob(info);
450 613 }
451
452
453 101 UploadStreamHandle *S3Uploader::InitStreamedUpload(const CallbackTN *callback) {
454 return new S3StreamHandle(callback, kInMemoryObjectThreshold,
455
1/2
✓ Branch 3 taken 101 times.
✗ Branch 4 not taken.
101 spooler_definition().temporary_path);
456 }
457
458
459 802 void S3Uploader::StreamedUpload(UploadStreamHandle *handle,
460 UploadBuffer buffer,
461 const CallbackTN *callback) {
462 802 S3StreamHandle *s3_handle = static_cast<S3StreamHandle *>(handle);
463
464 802 s3_handle->buffer->Append(buffer.data, buffer.size);
465
1/2
✓ Branch 2 taken 802 times.
✗ Branch 3 not taken.
802 Respond(callback, UploaderResults(UploaderResults::kBufferUpload, 0));
466 802 }
467
468
469 101 void S3Uploader::FinalizeStreamedUpload(UploadStreamHandle *handle,
470 const shash::Any &content_hash) {
471 101 S3StreamHandle *s3_handle = static_cast<S3StreamHandle *>(handle);
472
473 // New file name based on content hash or remote_path override
474 101 std::string final_path;
475
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 101 times.
101 if (s3_handle->remote_path != "") {
476 final_path = repository_alias_ + "/" + s3_handle->remote_path;
477 } else {
478
3/6
✓ Branch 1 taken 101 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 101 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 101 times.
✗ Branch 8 not taken.
101 final_path = repository_alias_ + "/data/" + content_hash.MakePath();
479 }
480
481
1/2
✓ Branch 2 taken 101 times.
✗ Branch 3 not taken.
101 s3_handle->buffer->Commit();
482
483
1/2
✓ Branch 2 taken 101 times.
✗ Branch 3 not taken.
101 const size_t bytes_uploaded = s3_handle->buffer->GetSize();
484
485 s3fanout::JobInfo *info = new s3fanout::JobInfo(
486 final_path,
487 101 const_cast<void *>(static_cast<void const *>(handle->commit_callback)),
488
2/4
✓ Branch 2 taken 101 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 101 times.
✗ Branch 6 not taken.
101 s3_handle->buffer.Release());
489
490
1/2
✓ Branch 0 taken 101 times.
✗ Branch 1 not taken.
101 if (peek_before_put_)
491 101 info->request = s3fanout::JobInfo::kReqHeadPut;
492
1/2
✓ Branch 1 taken 101 times.
✗ Branch 2 not taken.
101 UploadJobInfo(info);
493
494 // Remove the temporary file
495
1/2
✓ Branch 0 taken 101 times.
✗ Branch 1 not taken.
101 delete s3_handle;
496
497 // Update statistics counters
498 101 if (!content_hash.HasSuffix()
499
5/6
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 100 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 times.
✓ Branch 4 taken 100 times.
✓ Branch 5 taken 1 times.
101 || content_hash.suffix == shash::kSuffixPartial) {
500
1/2
✓ Branch 1 taken 100 times.
✗ Branch 2 not taken.
100 CountUploadedChunks();
501
1/2
✓ Branch 1 taken 100 times.
✗ Branch 2 not taken.
100 CountUploadedBytes(bytes_uploaded);
502
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 } else if (content_hash.suffix == shash::kSuffixCatalog) {
503 CountUploadedCatalogs();
504 CountUploadedCatalogBytes(bytes_uploaded);
505 }
506 101 }
507
508
509 5 s3fanout::JobInfo *S3Uploader::CreateJobInfo(const std::string &path) const {
510
2/4
✓ Branch 2 taken 5 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 5 times.
✗ Branch 6 not taken.
5 FileBackedBuffer *buf = FileBackedBuffer::Create(kInMemoryObjectThreshold);
511
1/2
✓ Branch 2 taken 5 times.
✗ Branch 3 not taken.
5 return new s3fanout::JobInfo(path, NULL, buf);
512 }
513
514
515 601 void S3Uploader::DoRemoveAsync(const std::string &file_to_delete) {
516
2/4
✓ Branch 1 taken 601 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 601 times.
✗ Branch 5 not taken.
601 const std::string mangled_path = repository_alias_ + "/" + file_to_delete;
517
518
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 601 times.
601 if (!batch_delete_enabled_) {
519 s3fanout::JobInfo *info = CreateJobInfo(mangled_path);
520 info->request = s3fanout::JobInfo::kReqDelete;
521 LogCvmfs(kLogUploadS3, kLogDebug, "Asynchronously removing %s/%s",
522 bucket_.c_str(), info->object_key.c_str());
523 s3fanout_mgr_->PushNewJob(info);
524 return;
525 }
526
527 // Batch delete: collect keys and flush when the S3 batch limit is reached.
528 //
529 // The caller (RemoveAsync) already incremented jobs_in_flight_ for this key.
530 // We undo that immediately: for batched deletes, jobs_in_flight_ is managed
531 // per-batch rather than per-key. FlushDeleteBatch() increments once when
532 // the batch is dispatched, and MainCollectResults decrements once when the
533 // batch response arrives. This avoids a deadlock where jobs_in_flight_
534 // saturates (e.g. at 512) before the batch threshold (1000) is reached,
535 // blocking forever on a flush that never happens.
536
1/2
✓ Branch 1 taken 601 times.
✗ Branch 2 not taken.
601 DecJobsInFlight();
537 601 const MutexLockGuard guard(delete_batch_mutex_);
538
1/2
✓ Branch 1 taken 601 times.
✗ Branch 2 not taken.
601 pending_deletes_.push_back(mangled_path);
539
2/2
✓ Branch 1 taken 6 times.
✓ Branch 2 taken 595 times.
601 if (pending_deletes_.size() >= batch_delete_size_) {
540
1/2
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
6 FlushDeleteBatch();
541 }
542
1/2
✓ Branch 2 taken 601 times.
✗ Branch 3 not taken.
601 }
543
544
545 19 void S3Uploader::FlushDeleteBatch() const {
546 // Caller must hold delete_batch_mutex_
547
2/2
✓ Branch 1 taken 12 times.
✓ Branch 2 taken 7 times.
19 if (pending_deletes_.empty())
548 12 return;
549
550
1/2
✓ Branch 2 taken 7 times.
✗ Branch 3 not taken.
7 LogCvmfs(kLogUploadS3, kLogDebug,
551 "Flushing batch delete of %lu objects",
552 pending_deletes_.size());
553
554 // Build XML request body
555
1/2
✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
7 const std::string xml = s3fanout::ComposeDeleteMultiXml(pending_deletes_);
556
2/4
✓ Branch 2 taken 7 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 7 times.
✗ Branch 6 not taken.
7 FileBackedBuffer *buf = FileBackedBuffer::Create(kInMemoryObjectThreshold);
557
1/2
✓ Branch 3 taken 7 times.
✗ Branch 4 not taken.
7 buf->Append(xml.data(), xml.length());
558
1/2
✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
7 buf->Commit();
559
560 // The object_key for multi-delete is empty (URL is bucket root + ?delete)
561
3/6
✓ Branch 2 taken 7 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 7 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 7 times.
✗ Branch 9 not taken.
7 s3fanout::JobInfo *info = new s3fanout::JobInfo("", NULL, buf);
562 7 info->request = s3fanout::JobInfo::kReqDeleteMulti;
563 7 info->multi_delete_keys.swap(pending_deletes_);
564
565 // Track this batch as a single job in flight. MainCollectResults will
566 // call Respond() exactly once for kReqDeleteMulti to balance it.
567
1/2
✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
7 IncJobsInFlight();
568
1/2
✓ Branch 2 taken 7 times.
✗ Branch 3 not taken.
7 s3fanout_mgr_->PushNewJob(info);
569 7 }
570
571
572 13 void S3Uploader::WaitForUpload() const {
573 {
574 13 const MutexLockGuard guard(delete_batch_mutex_);
575
1/2
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
13 FlushDeleteBatch();
576 13 }
577 13 AbstractUploader::WaitForUpload();
578 13 }
579
580
581 512 void S3Uploader::OnReqComplete(const upload::UploaderResults &results,
582 RequestCtrl *ctrl) {
583 512 ctrl->return_code = results.return_code;
584
2/2
✓ Branch 0 taken 507 times.
✓ Branch 1 taken 5 times.
512 if (ctrl->callback_forward != NULL) {
585 // We are already in Respond() so we must not call it again
586 507 const upload::UploaderResults fix_path(results.return_code,
587
1/2
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
507 ctrl->original_path);
588
1/2
✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
507 (*(ctrl->callback_forward))(fix_path);
589
1/2
✓ Branch 0 taken 507 times.
✗ Branch 1 not taken.
507 delete ctrl->callback_forward;
590 507 ctrl->callback_forward = NULL;
591 507 }
592 512 char c = 'c';
593
1/2
✓ Branch 1 taken 512 times.
✗ Branch 2 not taken.
512 WritePipe(ctrl->pipe_wait[1], &c, 1);
594 512 }
595
596
597 5 bool S3Uploader::Peek(const std::string &path) {
598
2/4
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 5 times.
✗ Branch 5 not taken.
5 const std::string mangled_path = repository_alias_ + "/" + path;
599
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
5 s3fanout::JobInfo *info = CreateJobInfo(mangled_path);
600
601 5 RequestCtrl req_ctrl;
602
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
5 MakePipe(req_ctrl.pipe_wait);
603 5 info->request = s3fanout::JobInfo::kReqHeadOnly;
604 5 info->callback = const_cast<void *>(static_cast<void const *>(
605
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
5 MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl)));
606
607
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
5 IncJobsInFlight();
608
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
5 UploadJobInfo(info);
609
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
5 req_ctrl.WaitFor();
610
611 5 return req_ctrl.return_code == 0;
612 5 }
613
614
615 // noop: no mkdir needed in S3 storage
616 bool S3Uploader::Mkdir(const std::string &path) { return true; }
617
618
619 bool S3Uploader::PlaceBootstrappingShortcut(const shash::Any &object) {
620 return false; // TODO(rmeusel): implement
621 }
622
623
624 int64_t S3Uploader::DoGetObjectSize(const std::string &file_name) {
625 // TODO(dosarudaniel): use a head request for byte count
626 // Re-enable 661 integration test when done
627 return -EOPNOTSUPP;
628 }
629
630 } // namespace upload
631