GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/upload_s3.cc
Date: 2026-04-12 02:41:08
Exec Total Coverage
Lines: 234 338 69.2%
Branches: 206 512 40.2%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "upload_s3.h"
6
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <inttypes.h>
10 #include <unistd.h>
11
12 #include <set>
13 #include <string>
14 #include <vector>
15
16 #include "compression/compression.h"
17 #include "network/s3fanout.h"
18 #include "options.h"
19 #include "util/exception.h"
20 #include "util/logging.h"
21 #include "util/mutex.h"
22 #include "util/posix.h"
23 #include "util/string.h"
24
25 namespace upload {
26
/**
 * Allowed values of x-amz-acl according to the S3 API.  The empty string is
 * part of the table and therefore accepted as well.
 *
 * The array size is deduced from the initializer so that it cannot drift from
 * the list (a too-large explicit size would leave NULL entries behind), and
 * the pointers themselves are const because the table is never modified.
 */
static const char *const x_amz_acl_allowed_values_[] = {
    "private",
    "public-read",
    "public-write",
    "authenticated-read",
    "aws-exec-read",
    "bucket-owner-read",
    "bucket-owner-full-control",
    ""};
38
39 23040 void S3Uploader::RequestCtrl::WaitFor() {
40 char c;
41
1/2
✓ Branch 1 taken 23040 times.
✗ Branch 2 not taken.
23040 ReadPipe(pipe_wait[0], &c, 1);
42
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 23040 times.
23040 assert(c == 'c');
43
1/2
✓ Branch 1 taken 23040 times.
✗ Branch 2 not taken.
23040 ClosePipe(pipe_wait);
44 23040 }
45
46
47 585 S3Uploader::S3Uploader(const SpoolerDefinition &spooler_definition)
48 : AbstractUploader(spooler_definition)
49 585 , dns_buckets_(true)
50 585 , num_parallel_uploads_(kDefaultNumParallelUploads)
51 585 , num_retries_(kDefaultNumRetries)
52 585 , timeout_sec_(kDefaultTimeoutSec)
53 585 , authz_method_(s3fanout::kAuthzAwsV2)
54 585 , peek_before_put_(true)
55 585 , use_https_(false)
56 585 , batch_delete_enabled_(true)
57
1/2
✓ Branch 2 taken 585 times.
✗ Branch 3 not taken.
585 , proxy_("")
58
1/2
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
585 , temporary_path_(spooler_definition.temporary_path)
59
2/4
✓ Branch 2 taken 585 times.
✗ Branch 3 not taken.
✓ Branch 14 taken 585 times.
✗ Branch 15 not taken.
1170 , x_amz_acl_("public-read") {
60
2/4
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 585 times.
✗ Branch 4 not taken.
585 assert(spooler_definition.IsValid()
61 && spooler_definition.driver_type == SpoolerDefinition::S3);
62
63 585 atomic_init32(&io_errors_);
64 585 const int mutex_ret = pthread_mutex_init(&delete_batch_mutex_, NULL);
65
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 585 times.
585 assert(mutex_ret == 0);
66
67
2/4
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 585 times.
585 if (!ParseSpoolerDefinition(spooler_definition)) {
68 PANIC(kLogStderr, "Error in parsing the spooler definition");
69 }
70
71 // Disable batch delete for Azure (not supported)
72
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 585 times.
585 if (authz_method_ == s3fanout::kAuthzAzure)
73 batch_delete_enabled_ = false;
74
75
1/2
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
585 s3fanout::S3FanoutManager::S3Config s3config;
76
1/2
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
585 s3config.access_key = access_key_;
77
1/2
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
585 s3config.secret_key = secret_key_;
78
1/2
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
585 s3config.hostname_port = host_name_port_;
79 585 s3config.authz_method = authz_method_;
80
1/2
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
585 s3config.region = region_;
81
1/2
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
585 s3config.flavor = flavor_;
82
1/2
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
585 s3config.bucket = bucket_;
83 585 s3config.dns_buckets = dns_buckets_;
84 585 s3config.pool_max_handles = num_parallel_uploads_;
85 585 s3config.opt_timeout_sec = timeout_sec_;
86 585 s3config.opt_max_retries = num_retries_;
87 585 s3config.opt_backoff_init_ms = kDefaultBackoffInitMs;
88 585 s3config.opt_backoff_max_ms = kDefaultBackoffMaxMs;
89
1/2
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
585 s3config.x_amz_acl = x_amz_acl_;
90
91
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 585 times.
585 if (use_https_) {
92 s3config.protocol = "https";
93 } else {
94
1/2
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
585 s3config.protocol = "http";
95 }
96
1/2
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
585 s3config.proxy = proxy_;
97
98
3/6
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 585 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 585 times.
✗ Branch 8 not taken.
585 s3fanout_mgr_ = new s3fanout::S3FanoutManager(s3config);
99
1/2
✓ Branch 2 taken 585 times.
✗ Branch 3 not taken.
585 s3fanout_mgr_->Spawn();
100
101 585 const int retval = pthread_create(&thread_collect_results_, NULL,
102 MainCollectResults, this);
103
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 585 times.
585 assert(retval == 0);
104 585 }
105
106
107 3510 S3Uploader::~S3Uploader() {
108 // Signal termination to our own worker thread
109 1170 s3fanout_mgr_->PushCompletedJob(NULL);
110 1170 pthread_join(thread_collect_results_, NULL);
111 1170 pthread_mutex_destroy(&delete_batch_mutex_);
112 2340 }
113
114
115 585 bool S3Uploader::ParseSpoolerDefinition(
116 const SpoolerDefinition &spooler_definition) {
117 const std::vector<std::string> config = SplitString(
118
1/2
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
585 spooler_definition.spooler_configuration, '@');
119
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 585 times.
585 if (config.size() != 2) {
120 LogCvmfs(kLogUploadS3, kLogStderr,
121 "Failed to parse spooler configuration string '%s'.\n"
122 "Provide: <repo_alias>@/path/to/s3.conf",
123 spooler_definition.spooler_configuration.c_str());
124 return false;
125 }
126
1/2
✓ Branch 2 taken 585 times.
✗ Branch 3 not taken.
585 repository_alias_ = config[0];
127 585 const std::string &config_path = config[1];
128
129
2/4
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 585 times.
585 if (!FileExists(config_path)) {
130 LogCvmfs(kLogUploadS3, kLogStderr, "Cannot find S3 config file at '%s'",
131 config_path.c_str());
132 return false;
133 }
134
135 // Parse S3 configuration
136 BashOptionsManager options_manager = BashOptionsManager(
137
4/8
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 585 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 585 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 585 times.
✗ Branch 11 not taken.
585 new DefaultOptionsTemplateManager(repository_alias_));
138
1/2
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
585 options_manager.ParsePath(config_path, false);
139 585 std::string parameter;
140
141
3/6
✓ Branch 2 taken 585 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 585 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 585 times.
585 if (!options_manager.GetValue("CVMFS_S3_HOST", &host_name_)) {
142 LogCvmfs(kLogUploadS3, kLogStderr,
143 "Failed to parse CVMFS_S3_HOST from '%s'", config_path.c_str());
144 return false;
145 }
146
3/6
✓ Branch 2 taken 585 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 585 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 585 times.
585 if (!options_manager.GetValue("CVMFS_S3_ACCESS_KEY", &access_key_)) {
147 LogCvmfs(kLogUploadS3, kLogStderr,
148 "Failed to parse CVMFS_S3_ACCESS_KEY from '%s'.",
149 config_path.c_str());
150 return false;
151 }
152
3/6
✓ Branch 2 taken 585 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 585 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 585 times.
585 if (!options_manager.GetValue("CVMFS_S3_SECRET_KEY", &secret_key_)) {
153 LogCvmfs(kLogUploadS3, kLogStderr,
154 "Failed to parse CVMFS_S3_SECRET_KEY from '%s'.",
155 config_path.c_str());
156 return false;
157 }
158
3/6
✓ Branch 2 taken 585 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 585 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 585 times.
585 if (!options_manager.GetValue("CVMFS_S3_BUCKET", &bucket_)) {
159 LogCvmfs(kLogUploadS3, kLogStderr,
160 "Failed to parse CVMFS_S3_BUCKET from '%s'.", config_path.c_str());
161 return false;
162 }
163
3/6
✓ Branch 2 taken 585 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 585 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 585 times.
✗ Branch 10 not taken.
585 if (options_manager.GetValue("CVMFS_S3_DNS_BUCKETS", &parameter)) {
164
1/2
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
585 if (parameter == "false") {
165 585 dns_buckets_ = false;
166 }
167 }
168
3/6
✓ Branch 2 taken 585 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 585 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 585 times.
✗ Branch 10 not taken.
585 if (options_manager.GetValue("CVMFS_S3_MAX_NUMBER_OF_PARALLEL_CONNECTIONS",
169 &parameter)) {
170
1/2
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
585 num_parallel_uploads_ = String2Uint64(parameter);
171 }
172
3/6
✓ Branch 2 taken 585 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 585 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 585 times.
585 if (options_manager.GetValue("CVMFS_S3_MAX_RETRIES", &parameter)) {
173 num_retries_ = String2Uint64(parameter);
174 }
175
3/6
✓ Branch 2 taken 585 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 585 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 585 times.
585 if (options_manager.GetValue("CVMFS_S3_TIMEOUT", &parameter)) {
176 timeout_sec_ = String2Uint64(parameter);
177 }
178
3/6
✓ Branch 2 taken 585 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 585 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 585 times.
585 if (options_manager.GetValue("CVMFS_S3_REGION", &region_)) {
179 authz_method_ = s3fanout::kAuthzAwsV4;
180 }
181
3/6
✓ Branch 2 taken 585 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 585 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 585 times.
585 if (options_manager.GetValue("CVMFS_S3_FLAVOR", &flavor_)) {
182 if (flavor_ == "azure") {
183 authz_method_ = s3fanout::kAuthzAzure;
184 } else if (flavor_ == "awsv2") {
185 authz_method_ = s3fanout::kAuthzAwsV2;
186 } else if (flavor_ == "awsv4") {
187 authz_method_ = s3fanout::kAuthzAwsV4;
188 } else {
189 LogCvmfs(kLogUploadS3, kLogStderr,
190 "Failed to parse CVMFS_S3_FLAVOR from '%s', "
191 "valid options are azure, awsv2 or awsv4",
192 config_path.c_str());
193 return false;
194 }
195 }
196
3/6
✓ Branch 2 taken 585 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 585 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 585 times.
585 if (options_manager.GetValue("CVMFS_S3_PEEK_BEFORE_PUT", &parameter)) {
197 peek_before_put_ = options_manager.IsOn(parameter);
198 }
199
3/6
✓ Branch 2 taken 585 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 585 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 585 times.
585 if (options_manager.GetValue("CVMFS_S3_X_AMZ_ACL", &parameter)) {
200 bool isAllowed = false;
201 size_t const len = sizeof(x_amz_acl_allowed_values_)
202 / sizeof(x_amz_acl_allowed_values_[0]);
203 for (size_t i = 0; i < len; i++) {
204 if (x_amz_acl_allowed_values_[i] == parameter) {
205 isAllowed = true;
206 break;
207 }
208 }
209 if (!isAllowed) {
210 LogCvmfs(kLogUploadS3, kLogStderr,
211 "%s is not an allowed value for CVMFS_S3_X_AMZ_ACL",
212 parameter.c_str());
213 return false;
214 }
215 x_amz_acl_ = parameter;
216 }
217
218
3/6
✓ Branch 2 taken 585 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 585 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 585 times.
585 if (options_manager.GetValue("CVMFS_S3_BATCH_DELETE", &parameter)) {
219 batch_delete_enabled_ = options_manager.IsOn(parameter);
220 }
221
222
3/6
✓ Branch 2 taken 585 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 585 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 585 times.
585 if (options_manager.GetValue("CVMFS_S3_USE_HTTPS", &parameter)) {
223 use_https_ = options_manager.IsOn(parameter);
224 }
225
226
3/6
✓ Branch 2 taken 585 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 585 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 585 times.
✗ Branch 10 not taken.
585 if (options_manager.GetValue("CVMFS_S3_PORT", &parameter)) {
227
2/4
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 585 times.
✗ Branch 5 not taken.
585 host_name_port_ = host_name_ + ":" + parameter;
228 } else {
229 host_name_port_ = host_name_;
230 }
231
232
3/6
✓ Branch 2 taken 585 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 585 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 585 times.
585 if (options_manager.IsDefined("CVMFS_S3_PROXY")) {
233 options_manager.GetValue("CVMFS_S3_PROXY", &proxy_);
234 }
235
236 585 return true;
237 585 }
238
239
240 687 bool S3Uploader::WillHandle(const SpoolerDefinition &spooler_definition) {
241 687 return spooler_definition.driver_type == SpoolerDefinition::S3;
242 }
243
244
245 bool S3Uploader::Create() {
246 if (!dns_buckets_)
247 return false;
248
249 s3fanout::JobInfo *info = CreateJobInfo("");
250 info->request = s3fanout::JobInfo::kReqPutBucket;
251 std::string request_content;
252 if (!region_.empty()) {
253 request_content = std::string("<CreateBucketConfiguration xmlns="
254 "\"http://s3.amazonaws.com/doc/2006-03-01/\">"
255 "<LocationConstraint>")
256 + region_
257 + "</LocationConstraint>"
258 "</CreateBucketConfiguration>";
259 info->origin->Append(request_content.data(), request_content.length());
260 info->origin->Commit();
261 }
262
263 RequestCtrl req_ctrl;
264 MakePipe(req_ctrl.pipe_wait);
265 info->callback = const_cast<void *>(static_cast<void const *>(
266 MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl)));
267
268 IncJobsInFlight();
269 UploadJobInfo(info);
270 req_ctrl.WaitFor();
271
272 return req_ctrl.return_code == 0;
273 }
274
275
276 90 unsigned int S3Uploader::GetNumberOfErrors() const {
277 90 return atomic_read32(&io_errors_);
278 }
279
280
281 /**
282 * Worker thread takes care of requesting new jobs and cleaning old ones.
283 */
284 585 void *S3Uploader::MainCollectResults(void *data) {
285 585 LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread started.");
286 585 S3Uploader *uploader = reinterpret_cast<S3Uploader *>(data);
287
288 while (true) {
289 28260 s3fanout::JobInfo *info = uploader->s3fanout_mgr_->PopCompletedJob();
290
2/2
✓ Branch 0 taken 585 times.
✓ Branch 1 taken 27675 times.
28260 if (!info)
291 585 break;
292 // Report completed job
293 27675 int reply_code = 0;
294
2/2
✓ Branch 0 taken 90 times.
✓ Branch 1 taken 27585 times.
27675 if (info->error_code != s3fanout::kFailOk) {
295
1/2
✓ Branch 0 taken 90 times.
✗ Branch 1 not taken.
90 if ((info->request != s3fanout::JobInfo::kReqHeadOnly)
296
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 90 times.
90 || (info->error_code != s3fanout::kFailNotFound)) {
297 if (info->request == s3fanout::JobInfo::kReqDeleteMulti) {
298 LogCvmfs(kLogUploadS3, kLogStderr,
299 "Batch delete of %lu objects failed. (error code: %d - %s)",
300 info->multi_delete_keys.size(), info->error_code,
301 s3fanout::Code2Ascii(info->error_code));
302 } else {
303 LogCvmfs(kLogUploadS3, kLogStderr,
304 "Upload job for '%s' failed. (error code: %d - %s)",
305 info->object_key.c_str(), info->error_code,
306 s3fanout::Code2Ascii(info->error_code));
307 }
308 reply_code = 99;
309 atomic_inc32(&uploader->io_errors_);
310 }
311 }
312
2/2
✓ Branch 0 taken 90 times.
✓ Branch 1 taken 27585 times.
27675 if (info->request == s3fanout::JobInfo::kReqDeleteMulti) {
313 // Parse response for per-key errors
314 90 std::set<std::string> failed_keys;
315 180 if (info->error_code == s3fanout::kFailOk
316
3/6
✓ Branch 0 taken 90 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 90 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 90 times.
90 && !info->response_body.empty()) {
317 std::vector<std::string> error_keys, error_codes, error_messages;
318 const unsigned num_errors = s3fanout::ParseDeleteMultiResponse(
319 info->response_body, &error_keys, &error_codes, &error_messages);
320 for (unsigned i = 0; i < num_errors; ++i) {
321 LogCvmfs(kLogUploadS3, kLogStderr,
322 "S3 multi-delete error for key '%s': %s - %s",
323 error_keys[i].c_str(), error_codes[i].c_str(),
324 error_messages[i].c_str());
325 atomic_inc32(&uploader->io_errors_);
326 failed_keys.insert(error_keys[i]);
327 }
328 }
329 // Decrement jobs_in_flight_ once for the entire batch.
330
2/4
✓ Branch 1 taken 90 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 90 times.
✗ Branch 5 not taken.
90 uploader->Respond(NULL, UploaderResults(UploaderResults::kRemove,
331
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 90 times.
90 (info->error_code != s3fanout::kFailOk) ? 99 : 0));
332
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 27585 times.
27675 } else if (info->request == s3fanout::JobInfo::kReqDelete) {
333 uploader->Respond(NULL, UploaderResults());
334
2/2
✓ Branch 0 taken 225 times.
✓ Branch 1 taken 27360 times.
27585 } else if (info->request == s3fanout::JobInfo::kReqHeadOnly) {
335
2/2
✓ Branch 0 taken 90 times.
✓ Branch 1 taken 135 times.
225 if (info->error_code == s3fanout::kFailNotFound)
336 90 reply_code = 1;
337
1/2
✓ Branch 1 taken 225 times.
✗ Branch 2 not taken.
225 uploader->Respond(static_cast<CallbackTN *>(info->callback),
338 450 UploaderResults(UploaderResults::kLookup, reply_code));
339 } else {
340
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 27360 times.
27360 if (info->request == s3fanout::JobInfo::kReqHeadPut) {
341 // The HEAD request was not transformed into a PUT request, thus this
342 // was a duplicate
343 // Uploaded catalogs are always unique ->
344 // assume this was a regular file and decrease appropriate counters
345 uploader->CountDuplicates();
346 uploader->DecUploadedChunks();
347 uploader->CountUploadedBytes(-(info->payload_size));
348 }
349 54720 uploader->Respond(
350
1/2
✓ Branch 1 taken 27360 times.
✗ Branch 2 not taken.
27360 static_cast<CallbackTN *>(info->callback),
351 54720 UploaderResults(UploaderResults::kChunkCommit, reply_code));
352
353
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 27360 times.
27360 assert(!info->origin.IsValid());
354 }
355
1/2
✓ Branch 0 taken 27675 times.
✗ Branch 1 not taken.
27675 delete info;
356 27675 }
357
358 585 LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread finished.");
359 585 return NULL;
360 }
361
362
363 22815 void S3Uploader::DoUpload(const std::string &remote_path,
364 IngestionSource *source,
365 const CallbackTN *callback) {
366
1/2
✓ Branch 1 taken 22815 times.
✗ Branch 2 not taken.
22815 bool rvb = source->Open();
367
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 22815 times.
22815 if (!rvb) {
368 Respond(callback, UploaderResults(100, source->GetPath()));
369 return;
370 }
371 uint64_t size;
372
1/2
✓ Branch 1 taken 22815 times.
✗ Branch 2 not taken.
22815 rvb = source->GetSize(&size);
373
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 22815 times.
22815 assert(rvb);
374
375 22815 FileBackedBuffer *origin = FileBackedBuffer::Create(
376
1/2
✓ Branch 2 taken 22815 times.
✗ Branch 3 not taken.
22815 kInMemoryObjectThreshold, spooler_definition().temporary_path);
377
378 unsigned char buffer[kPageSize];
379 ssize_t nbytes;
380 do {
381
1/2
✓ Branch 1 taken 8962515 times.
✗ Branch 2 not taken.
8962515 nbytes = source->Read(buffer, kPageSize);
382
2/2
✓ Branch 0 taken 8947395 times.
✓ Branch 1 taken 15120 times.
8962515 if (nbytes > 0)
383
1/2
✓ Branch 1 taken 8947395 times.
✗ Branch 2 not taken.
8947395 origin->Append(buffer, nbytes);
384
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8962515 times.
8962515 if (nbytes < 0) {
385 source->Close();
386 delete origin;
387 Respond(callback, UploaderResults(100, source->GetPath()));
388 return;
389 }
390
2/2
✓ Branch 0 taken 8939700 times.
✓ Branch 1 taken 22815 times.
8962515 } while (nbytes == kPageSize);
391
1/2
✓ Branch 1 taken 22815 times.
✗ Branch 2 not taken.
22815 source->Close();
392
1/2
✓ Branch 1 taken 22815 times.
✗ Branch 2 not taken.
22815 origin->Commit();
393
394 s3fanout::JobInfo *info = new s3fanout::JobInfo(
395
2/4
✓ Branch 1 taken 22815 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 22815 times.
✗ Branch 5 not taken.
45630 repository_alias_ + "/" + remote_path,
396 const_cast<void *>(static_cast<void const *>(callback)),
397
2/4
✓ Branch 1 taken 22815 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 22815 times.
✗ Branch 5 not taken.
22815 origin);
398
399
3/6
✓ Branch 2 taken 22815 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 22815 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 22815 times.
22815 if (HasPrefix(remote_path, ".cvmfs", false /*ignore_case*/)) {
400 info->request = s3fanout::JobInfo::kReqPutDotCvmfs;
401
3/6
✓ Branch 2 taken 22815 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 22815 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 22815 times.
22815 } else if (HasSuffix(remote_path, ".html", false)) {
402 info->request = s3fanout::JobInfo::kReqPutHtml;
403 } else {
404
1/2
✓ Branch 0 taken 22815 times.
✗ Branch 1 not taken.
22815 if (peek_before_put_)
405 22815 info->request = s3fanout::JobInfo::kReqHeadPut;
406 }
407
408 22815 RequestCtrl req_ctrl;
409
1/2
✓ Branch 1 taken 22815 times.
✗ Branch 2 not taken.
22815 MakePipe(req_ctrl.pipe_wait);
410 22815 req_ctrl.callback_forward = callback;
411
1/2
✓ Branch 1 taken 22815 times.
✗ Branch 2 not taken.
22815 req_ctrl.original_path = source->GetPath();
412 22815 info->callback = const_cast<void *>(static_cast<void const *>(
413
1/2
✓ Branch 1 taken 22815 times.
✗ Branch 2 not taken.
22815 MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl)));
414
415
1/2
✓ Branch 1 taken 22815 times.
✗ Branch 2 not taken.
22815 UploadJobInfo(info);
416
1/2
✓ Branch 1 taken 22815 times.
✗ Branch 2 not taken.
22815 req_ctrl.WaitFor();
417
1/2
✓ Branch 2 taken 22815 times.
✗ Branch 3 not taken.
22815 LogCvmfs(kLogUploadS3, kLogDebug, "Uploading from source finished: %s",
418
1/2
✓ Branch 1 taken 22815 times.
✗ Branch 2 not taken.
45630 source->GetPath().c_str());
419 22815 }
420
421
422 27585 void S3Uploader::UploadJobInfo(s3fanout::JobInfo *info) {
423 27585 LogCvmfs(kLogUploadS3, kLogDebug,
424 "Uploading:\n"
425 "--> Object: '%s'\n"
426 "--> Bucket: '%s'\n"
427 "--> Host: '%s'\n",
428 info->object_key.c_str(), bucket_.c_str(), host_name_port_.c_str());
429
430 27585 s3fanout_mgr_->PushNewJob(info);
431 27585 }
432
433
434 4545 UploadStreamHandle *S3Uploader::InitStreamedUpload(const CallbackTN *callback) {
435 return new S3StreamHandle(callback, kInMemoryObjectThreshold,
436
1/2
✓ Branch 3 taken 4545 times.
✗ Branch 4 not taken.
4545 spooler_definition().temporary_path);
437 }
438
439
440 36045 void S3Uploader::StreamedUpload(UploadStreamHandle *handle,
441 UploadBuffer buffer,
442 const CallbackTN *callback) {
443 36045 S3StreamHandle *s3_handle = static_cast<S3StreamHandle *>(handle);
444
445 36045 s3_handle->buffer->Append(buffer.data, buffer.size);
446
1/2
✓ Branch 2 taken 36090 times.
✗ Branch 3 not taken.
36090 Respond(callback, UploaderResults(UploaderResults::kBufferUpload, 0));
447 36090 }
448
449
450 4545 void S3Uploader::FinalizeStreamedUpload(UploadStreamHandle *handle,
451 const shash::Any &content_hash) {
452 4545 S3StreamHandle *s3_handle = static_cast<S3StreamHandle *>(handle);
453
454 // New file name based on content hash or remote_path override
455 4545 std::string final_path;
456
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4545 times.
4545 if (s3_handle->remote_path != "") {
457 final_path = repository_alias_ + "/" + s3_handle->remote_path;
458 } else {
459
3/6
✓ Branch 1 taken 4545 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 4545 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 4545 times.
✗ Branch 8 not taken.
4545 final_path = repository_alias_ + "/data/" + content_hash.MakePath();
460 }
461
462
1/2
✓ Branch 2 taken 4545 times.
✗ Branch 3 not taken.
4545 s3_handle->buffer->Commit();
463
464
1/2
✓ Branch 2 taken 4545 times.
✗ Branch 3 not taken.
4545 const size_t bytes_uploaded = s3_handle->buffer->GetSize();
465
466 s3fanout::JobInfo *info = new s3fanout::JobInfo(
467 final_path,
468 4545 const_cast<void *>(static_cast<void const *>(handle->commit_callback)),
469
2/4
✓ Branch 2 taken 4545 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4545 times.
✗ Branch 6 not taken.
4545 s3_handle->buffer.Release());
470
471
1/2
✓ Branch 0 taken 4545 times.
✗ Branch 1 not taken.
4545 if (peek_before_put_)
472 4545 info->request = s3fanout::JobInfo::kReqHeadPut;
473
1/2
✓ Branch 1 taken 4545 times.
✗ Branch 2 not taken.
4545 UploadJobInfo(info);
474
475 // Remove the temporary file
476
1/2
✓ Branch 0 taken 4545 times.
✗ Branch 1 not taken.
4545 delete s3_handle;
477
478 // Update statistics counters
479 4545 if (!content_hash.HasSuffix()
480
5/6
✓ Branch 0 taken 45 times.
✓ Branch 1 taken 4500 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 45 times.
✓ Branch 4 taken 4500 times.
✓ Branch 5 taken 45 times.
4545 || content_hash.suffix == shash::kSuffixPartial) {
481
1/2
✓ Branch 1 taken 4500 times.
✗ Branch 2 not taken.
4500 CountUploadedChunks();
482
1/2
✓ Branch 1 taken 4500 times.
✗ Branch 2 not taken.
4500 CountUploadedBytes(bytes_uploaded);
483
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 45 times.
45 } else if (content_hash.suffix == shash::kSuffixCatalog) {
484 CountUploadedCatalogs();
485 CountUploadedCatalogBytes(bytes_uploaded);
486 }
487 4545 }
488
489
490 225 s3fanout::JobInfo *S3Uploader::CreateJobInfo(const std::string &path) const {
491
2/4
✓ Branch 2 taken 225 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 225 times.
✗ Branch 6 not taken.
225 FileBackedBuffer *buf = FileBackedBuffer::Create(kInMemoryObjectThreshold);
492
1/2
✓ Branch 2 taken 225 times.
✗ Branch 3 not taken.
225 return new s3fanout::JobInfo(path, NULL, buf);
493 }
494
495
496 27045 void S3Uploader::DoRemoveAsync(const std::string &file_to_delete) {
497
2/4
✓ Branch 1 taken 27045 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 27045 times.
✗ Branch 5 not taken.
27045 const std::string mangled_path = repository_alias_ + "/" + file_to_delete;
498
499
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 27045 times.
27045 if (!batch_delete_enabled_) {
500 s3fanout::JobInfo *info = CreateJobInfo(mangled_path);
501 info->request = s3fanout::JobInfo::kReqDelete;
502 LogCvmfs(kLogUploadS3, kLogDebug, "Asynchronously removing %s/%s",
503 bucket_.c_str(), info->object_key.c_str());
504 s3fanout_mgr_->PushNewJob(info);
505 return;
506 }
507
508 // Batch delete: collect keys and flush when the S3 batch limit is reached.
509 //
510 // The caller (RemoveAsync) already incremented jobs_in_flight_ for this key.
511 // We undo that immediately: for batched deletes, jobs_in_flight_ is managed
512 // per-batch rather than per-key. FlushDeleteBatch() increments once when
513 // the batch is dispatched, and MainCollectResults decrements once when the
514 // batch response arrives. This avoids a deadlock where jobs_in_flight_
515 // saturates (e.g. at 512) before the batch threshold (1000) is reached,
516 // blocking forever on a flush that never happens.
517
1/2
✓ Branch 1 taken 27045 times.
✗ Branch 2 not taken.
27045 DecJobsInFlight();
518 27045 const MutexLockGuard guard(delete_batch_mutex_);
519
1/2
✓ Branch 1 taken 27045 times.
✗ Branch 2 not taken.
27045 pending_deletes_.push_back(mangled_path);
520
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 27045 times.
27045 if (pending_deletes_.size() >= kMaxBatchDeleteSize) {
521 FlushDeleteBatch();
522 }
523
1/2
✓ Branch 2 taken 27045 times.
✗ Branch 3 not taken.
27045 }
524
525
526 585 void S3Uploader::FlushDeleteBatch() const {
527 // Caller must hold delete_batch_mutex_
528
2/2
✓ Branch 1 taken 495 times.
✓ Branch 2 taken 90 times.
585 if (pending_deletes_.empty())
529 495 return;
530
531
1/2
✓ Branch 2 taken 90 times.
✗ Branch 3 not taken.
90 LogCvmfs(kLogUploadS3, kLogDebug,
532 "Flushing batch delete of %lu objects",
533 pending_deletes_.size());
534
535 // Build XML request body
536
1/2
✓ Branch 1 taken 90 times.
✗ Branch 2 not taken.
90 const std::string xml = s3fanout::ComposeDeleteMultiXml(pending_deletes_);
537
2/4
✓ Branch 2 taken 90 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 90 times.
✗ Branch 6 not taken.
90 FileBackedBuffer *buf = FileBackedBuffer::Create(kInMemoryObjectThreshold);
538
1/2
✓ Branch 3 taken 90 times.
✗ Branch 4 not taken.
90 buf->Append(xml.data(), xml.length());
539
1/2
✓ Branch 1 taken 90 times.
✗ Branch 2 not taken.
90 buf->Commit();
540
541 // The object_key for multi-delete is empty (URL is bucket root + ?delete)
542
3/6
✓ Branch 2 taken 90 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 90 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 90 times.
✗ Branch 9 not taken.
90 s3fanout::JobInfo *info = new s3fanout::JobInfo("", NULL, buf);
543 90 info->request = s3fanout::JobInfo::kReqDeleteMulti;
544 90 info->multi_delete_keys.swap(pending_deletes_);
545
546 // Track this batch as a single job in flight. MainCollectResults will
547 // call Respond() exactly once for kReqDeleteMulti to balance it.
548
1/2
✓ Branch 1 taken 90 times.
✗ Branch 2 not taken.
90 IncJobsInFlight();
549
1/2
✓ Branch 2 taken 90 times.
✗ Branch 3 not taken.
90 s3fanout_mgr_->PushNewJob(info);
550 90 }
551
552
553 585 void S3Uploader::WaitForUpload() const {
554 {
555 585 const MutexLockGuard guard(delete_batch_mutex_);
556
1/2
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
585 FlushDeleteBatch();
557 585 }
558 585 AbstractUploader::WaitForUpload();
559 585 }
560
561
562 23040 void S3Uploader::OnReqComplete(const upload::UploaderResults &results,
563 RequestCtrl *ctrl) {
564 23040 ctrl->return_code = results.return_code;
565
2/2
✓ Branch 0 taken 22815 times.
✓ Branch 1 taken 225 times.
23040 if (ctrl->callback_forward != NULL) {
566 // We are already in Respond() so we must not call it again
567 22815 const upload::UploaderResults fix_path(results.return_code,
568
1/2
✓ Branch 1 taken 22815 times.
✗ Branch 2 not taken.
22815 ctrl->original_path);
569
1/2
✓ Branch 1 taken 22815 times.
✗ Branch 2 not taken.
22815 (*(ctrl->callback_forward))(fix_path);
570
1/2
✓ Branch 0 taken 22815 times.
✗ Branch 1 not taken.
22815 delete ctrl->callback_forward;
571 22815 ctrl->callback_forward = NULL;
572 22815 }
573 23040 char c = 'c';
574
1/2
✓ Branch 1 taken 23040 times.
✗ Branch 2 not taken.
23040 WritePipe(ctrl->pipe_wait[1], &c, 1);
575 23040 }
576
577
578 225 bool S3Uploader::Peek(const std::string &path) {
579
2/4
✓ Branch 1 taken 225 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 225 times.
✗ Branch 5 not taken.
225 const std::string mangled_path = repository_alias_ + "/" + path;
580
1/2
✓ Branch 1 taken 225 times.
✗ Branch 2 not taken.
225 s3fanout::JobInfo *info = CreateJobInfo(mangled_path);
581
582 225 RequestCtrl req_ctrl;
583
1/2
✓ Branch 1 taken 225 times.
✗ Branch 2 not taken.
225 MakePipe(req_ctrl.pipe_wait);
584 225 info->request = s3fanout::JobInfo::kReqHeadOnly;
585 225 info->callback = const_cast<void *>(static_cast<void const *>(
586
1/2
✓ Branch 1 taken 225 times.
✗ Branch 2 not taken.
225 MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl)));
587
588
1/2
✓ Branch 1 taken 225 times.
✗ Branch 2 not taken.
225 IncJobsInFlight();
589
1/2
✓ Branch 1 taken 225 times.
✗ Branch 2 not taken.
225 UploadJobInfo(info);
590
1/2
✓ Branch 1 taken 225 times.
✗ Branch 2 not taken.
225 req_ctrl.WaitFor();
591
592 225 return req_ctrl.return_code == 0;
593 225 }
594
595
596 // noop: no mkdir needed in S3 storage
597 bool S3Uploader::Mkdir(const std::string &path) { return true; }
598
599
600 bool S3Uploader::PlaceBootstrappingShortcut(const shash::Any &object) {
601 return false; // TODO(rmeusel): implement
602 }
603
604
605 int64_t S3Uploader::DoGetObjectSize(const std::string &file_name) {
606 // TODO(dosarudaniel): use a head request for byte count
607 // Re-enable 661 integration test when done
608 return -EOPNOTSUPP;
609 }
610
611 } // namespace upload
612