Directory: | cvmfs/ |
---|---|
File: | cvmfs/upload_s3.cc |
Date: | 2025-06-22 02:36:02 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 202 | 285 | 70.9% |
Branches: | 179 | 436 | 41.1% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #include "upload_s3.h" | ||
6 | |||
7 | #include <errno.h> | ||
8 | #include <fcntl.h> | ||
9 | #include <inttypes.h> | ||
10 | #include <unistd.h> | ||
11 | |||
12 | #include <string> | ||
13 | #include <vector> | ||
14 | |||
15 | #include "compression/compression.h" | ||
16 | #include "network/s3fanout.h" | ||
17 | #include "options.h" | ||
18 | #include "util/exception.h" | ||
19 | #include "util/logging.h" | ||
20 | #include "util/posix.h" | ||
21 | #include "util/string.h" | ||
22 | |||
23 | namespace upload { | ||
24 | |||
25 | /* | ||
26 | * Allowed values of x-amz-acl according to S3 API | ||
27 | */ | ||
28 | static const char *x_amz_acl_allowed_values_[8] = {"private", | ||
29 | "public-read", | ||
30 | "public-write", | ||
31 | "authenticated-read", | ||
32 | "aws-exec-read", | ||
33 | "bucket-owner-read", | ||
34 | "bucket-owner-full-control", | ||
35 | ""}; | ||
36 | |||
37 | 7168 | void S3Uploader::RequestCtrl::WaitFor() { | |
38 | char c; | ||
39 |
1/2✓ Branch 1 taken 7168 times.
✗ Branch 2 not taken.
|
7168 | ReadPipe(pipe_wait[0], &c, 1); |
40 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7168 times.
|
7168 | assert(c == 'c'); |
41 |
1/2✓ Branch 1 taken 7168 times.
✗ Branch 2 not taken.
|
7168 | ClosePipe(pipe_wait); |
42 | 7168 | } | |
43 | |||
44 | |||
45 | 168 | S3Uploader::S3Uploader(const SpoolerDefinition &spooler_definition) | |
46 | : AbstractUploader(spooler_definition) | ||
47 | 168 | , dns_buckets_(true) | |
48 | 168 | , num_parallel_uploads_(kDefaultNumParallelUploads) | |
49 | 168 | , num_retries_(kDefaultNumRetries) | |
50 | 168 | , timeout_sec_(kDefaultTimeoutSec) | |
51 | 168 | , authz_method_(s3fanout::kAuthzAwsV2) | |
52 | 168 | , peek_before_put_(true) | |
53 | 168 | , use_https_(false) | |
54 |
1/2✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
|
168 | , proxy_("") |
55 |
1/2✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
|
168 | , temporary_path_(spooler_definition.temporary_path) |
56 |
2/4✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 14 taken 168 times.
✗ Branch 15 not taken.
|
336 | , x_amz_acl_("public-read") { |
57 |
2/4✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 168 times.
✗ Branch 4 not taken.
|
168 | assert(spooler_definition.IsValid() |
58 | && spooler_definition.driver_type == SpoolerDefinition::S3); | ||
59 | |||
60 | 168 | atomic_init32(&io_errors_); | |
61 | |||
62 |
2/4✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 168 times.
|
168 | if (!ParseSpoolerDefinition(spooler_definition)) { |
63 | ✗ | PANIC(kLogStderr, "Error in parsing the spooler definition"); | |
64 | } | ||
65 | |||
66 |
1/2✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
|
168 | s3fanout::S3FanoutManager::S3Config s3config; |
67 |
1/2✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
|
168 | s3config.access_key = access_key_; |
68 |
1/2✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
|
168 | s3config.secret_key = secret_key_; |
69 |
1/2✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
|
168 | s3config.hostname_port = host_name_port_; |
70 | 168 | s3config.authz_method = authz_method_; | |
71 |
1/2✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
|
168 | s3config.region = region_; |
72 |
1/2✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
|
168 | s3config.flavor = flavor_; |
73 |
1/2✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
|
168 | s3config.bucket = bucket_; |
74 | 168 | s3config.dns_buckets = dns_buckets_; | |
75 | 168 | s3config.pool_max_handles = num_parallel_uploads_; | |
76 | 168 | s3config.opt_timeout_sec = timeout_sec_; | |
77 | 168 | s3config.opt_max_retries = num_retries_; | |
78 | 168 | s3config.opt_backoff_init_ms = kDefaultBackoffInitMs; | |
79 | 168 | s3config.opt_backoff_max_ms = kDefaultBackoffMaxMs; | |
80 |
1/2✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
|
168 | s3config.x_amz_acl = x_amz_acl_; |
81 | |||
82 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 168 times.
|
168 | if (use_https_) { |
83 | ✗ | s3config.protocol = "https"; | |
84 | } else { | ||
85 |
1/2✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
|
168 | s3config.protocol = "http"; |
86 | } | ||
87 |
1/2✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
|
168 | s3config.proxy = proxy_; |
88 | |||
89 |
3/6✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 168 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 168 times.
✗ Branch 8 not taken.
|
168 | s3fanout_mgr_ = new s3fanout::S3FanoutManager(s3config); |
90 |
1/2✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
|
168 | s3fanout_mgr_->Spawn(); |
91 | |||
92 | const int retval = | ||
93 | 168 | pthread_create(&thread_collect_results_, NULL, MainCollectResults, this); | |
94 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 168 times.
|
168 | assert(retval == 0); |
95 | 168 | } | |
96 | |||
97 | |||
98 | 1008 | S3Uploader::~S3Uploader() { | |
99 | // Signal termination to our own worker thread | ||
100 | 336 | s3fanout_mgr_->PushCompletedJob(NULL); | |
101 | 336 | pthread_join(thread_collect_results_, NULL); | |
102 | 672 | } | |
103 | |||
104 | |||
105 | 168 | bool S3Uploader::ParseSpoolerDefinition( | |
106 | const SpoolerDefinition &spooler_definition) { | ||
107 | const std::vector<std::string> config = SplitString( | ||
108 |
1/2✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
|
168 | spooler_definition.spooler_configuration, '@'); |
109 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 168 times.
|
168 | if (config.size() != 2) { |
110 | ✗ | LogCvmfs(kLogUploadS3, kLogStderr, | |
111 | "Failed to parse spooler configuration string '%s'.\n" | ||
112 | "Provide: <repo_alias>@/path/to/s3.conf", | ||
113 | spooler_definition.spooler_configuration.c_str()); | ||
114 | ✗ | return false; | |
115 | } | ||
116 |
1/2✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
|
168 | repository_alias_ = config[0]; |
117 | 168 | const std::string &config_path = config[1]; | |
118 | |||
119 |
2/4✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 168 times.
|
168 | if (!FileExists(config_path)) { |
120 | ✗ | LogCvmfs(kLogUploadS3, kLogStderr, "Cannot find S3 config file at '%s'", | |
121 | config_path.c_str()); | ||
122 | ✗ | return false; | |
123 | } | ||
124 | |||
125 | // Parse S3 configuration | ||
126 | BashOptionsManager options_manager = BashOptionsManager( | ||
127 |
4/8✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 168 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 168 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 168 times.
✗ Branch 11 not taken.
|
168 | new DefaultOptionsTemplateManager(repository_alias_)); |
128 |
1/2✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
|
168 | options_manager.ParsePath(config_path, false); |
129 | 168 | std::string parameter; | |
130 | |||
131 |
3/6✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 168 times.
|
168 | if (!options_manager.GetValue("CVMFS_S3_HOST", &host_name_)) { |
132 | ✗ | LogCvmfs(kLogUploadS3, kLogStderr, | |
133 | "Failed to parse CVMFS_S3_HOST from '%s'", config_path.c_str()); | ||
134 | ✗ | return false; | |
135 | } | ||
136 |
3/6✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 168 times.
|
168 | if (!options_manager.GetValue("CVMFS_S3_ACCESS_KEY", &access_key_)) { |
137 | ✗ | LogCvmfs(kLogUploadS3, kLogStderr, | |
138 | "Failed to parse CVMFS_S3_ACCESS_KEY from '%s'.", | ||
139 | config_path.c_str()); | ||
140 | ✗ | return false; | |
141 | } | ||
142 |
3/6✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 168 times.
|
168 | if (!options_manager.GetValue("CVMFS_S3_SECRET_KEY", &secret_key_)) { |
143 | ✗ | LogCvmfs(kLogUploadS3, kLogStderr, | |
144 | "Failed to parse CVMFS_S3_SECRET_KEY from '%s'.", | ||
145 | config_path.c_str()); | ||
146 | ✗ | return false; | |
147 | } | ||
148 |
3/6✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 168 times.
|
168 | if (!options_manager.GetValue("CVMFS_S3_BUCKET", &bucket_)) { |
149 | ✗ | LogCvmfs(kLogUploadS3, kLogStderr, | |
150 | "Failed to parse CVMFS_S3_BUCKET from '%s'.", config_path.c_str()); | ||
151 | ✗ | return false; | |
152 | } | ||
153 |
3/6✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 168 times.
✗ Branch 10 not taken.
|
168 | if (options_manager.GetValue("CVMFS_S3_DNS_BUCKETS", &parameter)) { |
154 |
1/2✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
|
168 | if (parameter == "false") { |
155 | 168 | dns_buckets_ = false; | |
156 | } | ||
157 | } | ||
158 |
3/6✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 168 times.
✗ Branch 10 not taken.
|
168 | if (options_manager.GetValue("CVMFS_S3_MAX_NUMBER_OF_PARALLEL_CONNECTIONS", |
159 | &parameter)) { | ||
160 |
1/2✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
|
168 | num_parallel_uploads_ = String2Uint64(parameter); |
161 | } | ||
162 |
3/6✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 168 times.
|
168 | if (options_manager.GetValue("CVMFS_S3_MAX_RETRIES", &parameter)) { |
163 | ✗ | num_retries_ = String2Uint64(parameter); | |
164 | } | ||
165 |
3/6✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 168 times.
|
168 | if (options_manager.GetValue("CVMFS_S3_TIMEOUT", &parameter)) { |
166 | ✗ | timeout_sec_ = String2Uint64(parameter); | |
167 | } | ||
168 |
3/6✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 168 times.
|
168 | if (options_manager.GetValue("CVMFS_S3_REGION", &region_)) { |
169 | ✗ | authz_method_ = s3fanout::kAuthzAwsV4; | |
170 | } | ||
171 |
3/6✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 168 times.
|
168 | if (options_manager.GetValue("CVMFS_S3_FLAVOR", &flavor_)) { |
172 | ✗ | if (flavor_ == "azure") { | |
173 | ✗ | authz_method_ = s3fanout::kAuthzAzure; | |
174 | ✗ | } else if (flavor_ == "awsv2") { | |
175 | ✗ | authz_method_ = s3fanout::kAuthzAwsV2; | |
176 | ✗ | } else if (flavor_ == "awsv4") { | |
177 | ✗ | authz_method_ = s3fanout::kAuthzAwsV4; | |
178 | } else { | ||
179 | ✗ | LogCvmfs(kLogUploadS3, kLogStderr, | |
180 | "Failed to parse CVMFS_S3_FLAVOR from '%s', " | ||
181 | "valid options are azure, awsv2 or awsv4", | ||
182 | config_path.c_str()); | ||
183 | ✗ | return false; | |
184 | } | ||
185 | } | ||
186 |
3/6✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 168 times.
|
168 | if (options_manager.GetValue("CVMFS_S3_PEEK_BEFORE_PUT", &parameter)) { |
187 | ✗ | peek_before_put_ = options_manager.IsOn(parameter); | |
188 | } | ||
189 |
3/6✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 168 times.
|
168 | if (options_manager.GetValue("CVMFS_S3_X_AMZ_ACL", &parameter)) { |
190 | ✗ | bool isAllowed = false; | |
191 | ✗ | size_t const len = sizeof(x_amz_acl_allowed_values_) | |
192 | / sizeof(x_amz_acl_allowed_values_[0]); | ||
193 | ✗ | for (size_t i = 0; i < len; i++) { | |
194 | ✗ | if (x_amz_acl_allowed_values_[i] == parameter) { | |
195 | ✗ | isAllowed = true; | |
196 | ✗ | break; | |
197 | } | ||
198 | } | ||
199 | ✗ | if (!isAllowed) { | |
200 | ✗ | LogCvmfs(kLogUploadS3, kLogStderr, | |
201 | "%s is not an allowed value for CVMFS_S3_X_AMZ_ACL", | ||
202 | parameter.c_str()); | ||
203 | ✗ | return false; | |
204 | } | ||
205 | ✗ | x_amz_acl_ = parameter; | |
206 | } | ||
207 | |||
208 |
3/6✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 168 times.
|
168 | if (options_manager.GetValue("CVMFS_S3_USE_HTTPS", &parameter)) { |
209 | ✗ | use_https_ = options_manager.IsOn(parameter); | |
210 | } | ||
211 | |||
212 |
3/6✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 168 times.
✗ Branch 10 not taken.
|
168 | if (options_manager.GetValue("CVMFS_S3_PORT", &parameter)) { |
213 |
2/4✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 168 times.
✗ Branch 5 not taken.
|
168 | host_name_port_ = host_name_ + ":" + parameter; |
214 | } else { | ||
215 | ✗ | host_name_port_ = host_name_; | |
216 | } | ||
217 | |||
218 |
3/6✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 168 times.
|
168 | if (options_manager.IsDefined("CVMFS_S3_PROXY")) { |
219 | ✗ | options_manager.GetValue("CVMFS_S3_PROXY", &proxy_); | |
220 | } | ||
221 | |||
222 | 168 | return true; | |
223 | 168 | } | |
224 | |||
225 | |||
226 | 225 | bool S3Uploader::WillHandle(const SpoolerDefinition &spooler_definition) { | |
227 | 225 | return spooler_definition.driver_type == SpoolerDefinition::S3; | |
228 | } | ||
229 | |||
230 | |||
231 | ✗ | bool S3Uploader::Create() { | |
232 | ✗ | if (!dns_buckets_) | |
233 | ✗ | return false; | |
234 | |||
235 | ✗ | s3fanout::JobInfo *info = CreateJobInfo(""); | |
236 | ✗ | info->request = s3fanout::JobInfo::kReqPutBucket; | |
237 | ✗ | std::string request_content; | |
238 | ✗ | if (!region_.empty()) { | |
239 | ✗ | request_content = std::string("<CreateBucketConfiguration xmlns=" | |
240 | "\"http://s3.amazonaws.com/doc/2006-03-01/\">" | ||
241 | "<LocationConstraint>") | ||
242 | ✗ | + region_ | |
243 | ✗ | + "</LocationConstraint>" | |
244 | ✗ | "</CreateBucketConfiguration>"; | |
245 | ✗ | info->origin->Append(request_content.data(), request_content.length()); | |
246 | ✗ | info->origin->Commit(); | |
247 | } | ||
248 | |||
249 | ✗ | RequestCtrl req_ctrl; | |
250 | ✗ | MakePipe(req_ctrl.pipe_wait); | |
251 | ✗ | info->callback = const_cast<void *>(static_cast<void const *>( | |
252 | ✗ | MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl))); | |
253 | |||
254 | ✗ | IncJobsInFlight(); | |
255 | ✗ | UploadJobInfo(info); | |
256 | ✗ | req_ctrl.WaitFor(); | |
257 | |||
258 | ✗ | return req_ctrl.return_code == 0; | |
259 | } | ||
260 | |||
261 | |||
262 | 14 | unsigned int S3Uploader::GetNumberOfErrors() const { | |
263 | 14 | return atomic_read32(&io_errors_); | |
264 | } | ||
265 | |||
266 | |||
267 | /** | ||
268 | * Worker thread takes care of requesting new jobs and cleaning old ones. | ||
269 | */ | ||
270 | 168 | void *S3Uploader::MainCollectResults(void *data) { | |
271 | 168 | LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread started."); | |
272 | 168 | S3Uploader *uploader = reinterpret_cast<S3Uploader *>(data); | |
273 | |||
274 | while (true) { | ||
275 | 8764 | s3fanout::JobInfo *info = uploader->s3fanout_mgr_->PopCompletedJob(); | |
276 |
2/2✓ Branch 0 taken 168 times.
✓ Branch 1 taken 8596 times.
|
8764 | if (!info) |
277 | 168 | break; | |
278 | // Report completed job | ||
279 | 8596 | int reply_code = 0; | |
280 |
2/2✓ Branch 0 taken 28 times.
✓ Branch 1 taken 8568 times.
|
8596 | if (info->error_code != s3fanout::kFailOk) { |
281 |
1/2✓ Branch 0 taken 28 times.
✗ Branch 1 not taken.
|
28 | if ((info->request != s3fanout::JobInfo::kReqHeadOnly) |
282 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 28 times.
|
28 | || (info->error_code != s3fanout::kFailNotFound)) { |
283 | ✗ | LogCvmfs(kLogUploadS3, kLogStderr, | |
284 | "Upload job for '%s' failed. (error code: %d - %s)", | ||
285 | ✗ | info->object_key.c_str(), info->error_code, | |
286 | s3fanout::Code2Ascii(info->error_code)); | ||
287 | ✗ | reply_code = 99; | |
288 | ✗ | atomic_inc32(&uploader->io_errors_); | |
289 | } | ||
290 | } | ||
291 |
2/2✓ Branch 0 taken 14 times.
✓ Branch 1 taken 8582 times.
|
8596 | if (info->request == s3fanout::JobInfo::kReqDelete) { |
292 |
1/2✓ Branch 2 taken 14 times.
✗ Branch 3 not taken.
|
14 | uploader->Respond(NULL, UploaderResults()); |
293 |
2/2✓ Branch 0 taken 70 times.
✓ Branch 1 taken 8512 times.
|
8582 | } else if (info->request == s3fanout::JobInfo::kReqHeadOnly) { |
294 |
2/2✓ Branch 0 taken 28 times.
✓ Branch 1 taken 42 times.
|
70 | if (info->error_code == s3fanout::kFailNotFound) |
295 | 28 | reply_code = 1; | |
296 |
1/2✓ Branch 1 taken 70 times.
✗ Branch 2 not taken.
|
70 | uploader->Respond(static_cast<CallbackTN *>(info->callback), |
297 | 140 | UploaderResults(UploaderResults::kLookup, reply_code)); | |
298 | } else { | ||
299 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 8512 times.
|
8512 | if (info->request == s3fanout::JobInfo::kReqHeadPut) { |
300 | // The HEAD request was not transformed into a PUT request, thus this | ||
301 | // was a duplicate | ||
302 | // Uploaded catalogs are always unique -> | ||
303 | // assume this was a regular file and decrease appropriate counters | ||
304 | ✗ | uploader->CountDuplicates(); | |
305 | ✗ | uploader->DecUploadedChunks(); | |
306 | ✗ | uploader->CountUploadedBytes(-(info->payload_size)); | |
307 | } | ||
308 | 17024 | uploader->Respond( | |
309 |
1/2✓ Branch 1 taken 8512 times.
✗ Branch 2 not taken.
|
8512 | static_cast<CallbackTN *>(info->callback), |
310 | 17024 | UploaderResults(UploaderResults::kChunkCommit, reply_code)); | |
311 | |||
312 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 8512 times.
|
8512 | assert(!info->origin.IsValid()); |
313 | } | ||
314 |
1/2✓ Branch 0 taken 8596 times.
✗ Branch 1 not taken.
|
8596 | delete info; |
315 | 8596 | } | |
316 | |||
317 | 168 | LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread finished."); | |
318 | 168 | return NULL; | |
319 | } | ||
320 | |||
321 | |||
322 | 7098 | void S3Uploader::DoUpload(const std::string &remote_path, | |
323 | IngestionSource *source, | ||
324 | const CallbackTN *callback) { | ||
325 |
1/2✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
|
7098 | bool rvb = source->Open(); |
326 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7098 times.
|
7098 | if (!rvb) { |
327 | ✗ | Respond(callback, UploaderResults(100, source->GetPath())); | |
328 | ✗ | return; | |
329 | } | ||
330 | uint64_t size; | ||
331 |
1/2✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
|
7098 | rvb = source->GetSize(&size); |
332 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7098 times.
|
7098 | assert(rvb); |
333 | |||
334 | 7098 | FileBackedBuffer *origin = FileBackedBuffer::Create( | |
335 |
1/2✓ Branch 2 taken 7098 times.
✗ Branch 3 not taken.
|
7098 | kInMemoryObjectThreshold, spooler_definition().temporary_path); |
336 | |||
337 | unsigned char buffer[kPageSize]; | ||
338 | ssize_t nbytes; | ||
339 | do { | ||
340 |
1/2✓ Branch 1 taken 2788338 times.
✗ Branch 2 not taken.
|
2788338 | nbytes = source->Read(buffer, kPageSize); |
341 |
2/2✓ Branch 0 taken 2783634 times.
✓ Branch 1 taken 4704 times.
|
2788338 | if (nbytes > 0) |
342 |
1/2✓ Branch 1 taken 2783634 times.
✗ Branch 2 not taken.
|
2783634 | origin->Append(buffer, nbytes); |
343 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2788338 times.
|
2788338 | if (nbytes < 0) { |
344 | ✗ | source->Close(); | |
345 | ✗ | delete origin; | |
346 | ✗ | Respond(callback, UploaderResults(100, source->GetPath())); | |
347 | ✗ | return; | |
348 | } | ||
349 |
2/2✓ Branch 0 taken 2781240 times.
✓ Branch 1 taken 7098 times.
|
2788338 | } while (nbytes == kPageSize); |
350 |
1/2✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
|
7098 | source->Close(); |
351 |
1/2✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
|
7098 | origin->Commit(); |
352 | |||
353 | s3fanout::JobInfo *info = new s3fanout::JobInfo( | ||
354 |
2/4✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 7098 times.
✗ Branch 5 not taken.
|
14196 | repository_alias_ + "/" + remote_path, |
355 | const_cast<void *>(static_cast<void const *>(callback)), | ||
356 |
2/4✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 7098 times.
✗ Branch 5 not taken.
|
7098 | origin); |
357 | |||
358 |
3/6✓ Branch 2 taken 7098 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 7098 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 7098 times.
|
7098 | if (HasPrefix(remote_path, ".cvmfs", false /*ignore_case*/)) { |
359 | ✗ | info->request = s3fanout::JobInfo::kReqPutDotCvmfs; | |
360 |
3/6✓ Branch 2 taken 7098 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 7098 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 7098 times.
|
7098 | } else if (HasSuffix(remote_path, ".html", false)) { |
361 | ✗ | info->request = s3fanout::JobInfo::kReqPutHtml; | |
362 | } else { | ||
363 |
1/2✓ Branch 0 taken 7098 times.
✗ Branch 1 not taken.
|
7098 | if (peek_before_put_) |
364 | 7098 | info->request = s3fanout::JobInfo::kReqHeadPut; | |
365 | } | ||
366 | |||
367 | 7098 | RequestCtrl req_ctrl; | |
368 |
1/2✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
|
7098 | MakePipe(req_ctrl.pipe_wait); |
369 | 7098 | req_ctrl.callback_forward = callback; | |
370 |
1/2✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
|
7098 | req_ctrl.original_path = source->GetPath(); |
371 | 7098 | info->callback = const_cast<void *>(static_cast<void const *>( | |
372 |
1/2✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
|
7098 | MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl))); |
373 | |||
374 |
1/2✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
|
7098 | UploadJobInfo(info); |
375 |
1/2✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
|
7098 | req_ctrl.WaitFor(); |
376 |
1/2✓ Branch 2 taken 7098 times.
✗ Branch 3 not taken.
|
7098 | LogCvmfs(kLogUploadS3, kLogDebug, "Uploading from source finished: %s", |
377 |
1/2✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
|
14196 | source->GetPath().c_str()); |
378 | 7098 | } | |
379 | |||
380 | |||
381 | 8582 | void S3Uploader::UploadJobInfo(s3fanout::JobInfo *info) { | |
382 | 8582 | LogCvmfs(kLogUploadS3, kLogDebug, | |
383 | "Uploading:\n" | ||
384 | "--> Object: '%s'\n" | ||
385 | "--> Bucket: '%s'\n" | ||
386 | "--> Host: '%s'\n", | ||
387 | info->object_key.c_str(), bucket_.c_str(), host_name_port_.c_str()); | ||
388 | |||
389 | 8582 | s3fanout_mgr_->PushNewJob(info); | |
390 | 8582 | } | |
391 | |||
392 | |||
393 | 1414 | UploadStreamHandle *S3Uploader::InitStreamedUpload(const CallbackTN *callback) { | |
394 | return new S3StreamHandle(callback, kInMemoryObjectThreshold, | ||
395 |
1/2✓ Branch 3 taken 1414 times.
✗ Branch 4 not taken.
|
1414 | spooler_definition().temporary_path); |
396 | } | ||
397 | |||
398 | |||
399 | 11228 | void S3Uploader::StreamedUpload(UploadStreamHandle *handle, | |
400 | UploadBuffer buffer, | ||
401 | const CallbackTN *callback) { | ||
402 | 11228 | S3StreamHandle *s3_handle = static_cast<S3StreamHandle *>(handle); | |
403 | |||
404 | 11228 | s3_handle->buffer->Append(buffer.data, buffer.size); | |
405 |
1/2✓ Branch 2 taken 11228 times.
✗ Branch 3 not taken.
|
11228 | Respond(callback, UploaderResults(UploaderResults::kBufferUpload, 0)); |
406 | 11228 | } | |
407 | |||
408 | |||
409 | 1414 | void S3Uploader::FinalizeStreamedUpload(UploadStreamHandle *handle, | |
410 | const shash::Any &content_hash) { | ||
411 | 1414 | S3StreamHandle *s3_handle = static_cast<S3StreamHandle *>(handle); | |
412 | |||
413 | // New file name based on content hash or remote_path override | ||
414 | 1414 | std::string final_path; | |
415 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 1414 times.
|
1414 | if (s3_handle->remote_path != "") { |
416 | ✗ | final_path = repository_alias_ + "/" + s3_handle->remote_path; | |
417 | } else { | ||
418 |
3/6✓ Branch 1 taken 1414 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1414 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 1414 times.
✗ Branch 8 not taken.
|
1414 | final_path = repository_alias_ + "/data/" + content_hash.MakePath(); |
419 | } | ||
420 | |||
421 |
1/2✓ Branch 2 taken 1414 times.
✗ Branch 3 not taken.
|
1414 | s3_handle->buffer->Commit(); |
422 | |||
423 |
1/2✓ Branch 2 taken 1414 times.
✗ Branch 3 not taken.
|
1414 | const size_t bytes_uploaded = s3_handle->buffer->GetSize(); |
424 | |||
425 | s3fanout::JobInfo *info = new s3fanout::JobInfo( | ||
426 | final_path, | ||
427 | 1414 | const_cast<void *>(static_cast<void const *>(handle->commit_callback)), | |
428 |
2/4✓ Branch 2 taken 1414 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1414 times.
✗ Branch 6 not taken.
|
1414 | s3_handle->buffer.Release()); |
429 | |||
430 |
1/2✓ Branch 0 taken 1414 times.
✗ Branch 1 not taken.
|
1414 | if (peek_before_put_) |
431 | 1414 | info->request = s3fanout::JobInfo::kReqHeadPut; | |
432 |
1/2✓ Branch 1 taken 1414 times.
✗ Branch 2 not taken.
|
1414 | UploadJobInfo(info); |
433 | |||
434 | // Remove the temporary file | ||
435 |
1/2✓ Branch 0 taken 1414 times.
✗ Branch 1 not taken.
|
1414 | delete s3_handle; |
436 | |||
437 | // Update statistics counters | ||
438 | 1414 | if (!content_hash.HasSuffix() | |
439 |
5/6✓ Branch 0 taken 14 times.
✓ Branch 1 taken 1400 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 14 times.
✓ Branch 4 taken 1400 times.
✓ Branch 5 taken 14 times.
|
1414 | || content_hash.suffix == shash::kSuffixPartial) { |
440 |
1/2✓ Branch 1 taken 1400 times.
✗ Branch 2 not taken.
|
1400 | CountUploadedChunks(); |
441 |
1/2✓ Branch 1 taken 1400 times.
✗ Branch 2 not taken.
|
1400 | CountUploadedBytes(bytes_uploaded); |
442 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 14 times.
|
14 | } else if (content_hash.suffix == shash::kSuffixCatalog) { |
443 | ✗ | CountUploadedCatalogs(); | |
444 | ✗ | CountUploadedCatalogBytes(bytes_uploaded); | |
445 | } | ||
446 | 1414 | } | |
447 | |||
448 | |||
449 | 84 | s3fanout::JobInfo *S3Uploader::CreateJobInfo(const std::string &path) const { | |
450 |
2/4✓ Branch 2 taken 84 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 84 times.
✗ Branch 6 not taken.
|
84 | FileBackedBuffer *buf = FileBackedBuffer::Create(kInMemoryObjectThreshold); |
451 |
1/2✓ Branch 2 taken 84 times.
✗ Branch 3 not taken.
|
84 | return new s3fanout::JobInfo(path, NULL, buf); |
452 | } | ||
453 | |||
454 | |||
455 | 14 | void S3Uploader::DoRemoveAsync(const std::string &file_to_delete) { | |
456 |
2/4✓ Branch 1 taken 14 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 14 times.
✗ Branch 5 not taken.
|
14 | const std::string mangled_path = repository_alias_ + "/" + file_to_delete; |
457 |
1/2✓ Branch 1 taken 14 times.
✗ Branch 2 not taken.
|
14 | s3fanout::JobInfo *info = CreateJobInfo(mangled_path); |
458 | |||
459 | 14 | info->request = s3fanout::JobInfo::kReqDelete; | |
460 | |||
461 |
1/2✓ Branch 3 taken 14 times.
✗ Branch 4 not taken.
|
14 | LogCvmfs(kLogUploadS3, kLogDebug, "Asynchronously removing %s/%s", |
462 | bucket_.c_str(), info->object_key.c_str()); | ||
463 |
1/2✓ Branch 2 taken 14 times.
✗ Branch 3 not taken.
|
14 | s3fanout_mgr_->PushNewJob(info); |
464 | 14 | } | |
465 | |||
466 | |||
467 | 7168 | void S3Uploader::OnReqComplete(const upload::UploaderResults &results, | |
468 | RequestCtrl *ctrl) { | ||
469 | 7168 | ctrl->return_code = results.return_code; | |
470 |
2/2✓ Branch 0 taken 7098 times.
✓ Branch 1 taken 70 times.
|
7168 | if (ctrl->callback_forward != NULL) { |
471 | // We are already in Respond() so we must not call it again | ||
472 | 7098 | const upload::UploaderResults fix_path(results.return_code, | |
473 |
1/2✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
|
7098 | ctrl->original_path); |
474 |
1/2✓ Branch 1 taken 7098 times.
✗ Branch 2 not taken.
|
7098 | (*(ctrl->callback_forward))(fix_path); |
475 |
1/2✓ Branch 0 taken 7098 times.
✗ Branch 1 not taken.
|
7098 | delete ctrl->callback_forward; |
476 | 7098 | ctrl->callback_forward = NULL; | |
477 | 7098 | } | |
478 | 7168 | char c = 'c'; | |
479 |
1/2✓ Branch 1 taken 7168 times.
✗ Branch 2 not taken.
|
7168 | WritePipe(ctrl->pipe_wait[1], &c, 1); |
480 | 7168 | } | |
481 | |||
482 | |||
483 | 70 | bool S3Uploader::Peek(const std::string &path) { | |
484 |
2/4✓ Branch 1 taken 70 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 70 times.
✗ Branch 5 not taken.
|
70 | const std::string mangled_path = repository_alias_ + "/" + path; |
485 |
1/2✓ Branch 1 taken 70 times.
✗ Branch 2 not taken.
|
70 | s3fanout::JobInfo *info = CreateJobInfo(mangled_path); |
486 | |||
487 | 70 | RequestCtrl req_ctrl; | |
488 |
1/2✓ Branch 1 taken 70 times.
✗ Branch 2 not taken.
|
70 | MakePipe(req_ctrl.pipe_wait); |
489 | 70 | info->request = s3fanout::JobInfo::kReqHeadOnly; | |
490 | 70 | info->callback = const_cast<void *>(static_cast<void const *>( | |
491 |
1/2✓ Branch 1 taken 70 times.
✗ Branch 2 not taken.
|
70 | MakeClosure(&S3Uploader::OnReqComplete, this, &req_ctrl))); |
492 | |||
493 |
1/2✓ Branch 1 taken 70 times.
✗ Branch 2 not taken.
|
70 | IncJobsInFlight(); |
494 |
1/2✓ Branch 1 taken 70 times.
✗ Branch 2 not taken.
|
70 | UploadJobInfo(info); |
495 |
1/2✓ Branch 1 taken 70 times.
✗ Branch 2 not taken.
|
70 | req_ctrl.WaitFor(); |
496 | |||
497 | 70 | return req_ctrl.return_code == 0; | |
498 | 70 | } | |
499 | |||
500 | |||
501 | // noop: no mkdir needed in S3 storage | ||
502 | ✗ | bool S3Uploader::Mkdir(const std::string &path) { return true; } | |
503 | |||
504 | |||
505 | ✗ | bool S3Uploader::PlaceBootstrappingShortcut(const shash::Any &object) { | |
506 | ✗ | return false; // TODO(rmeusel): implement | |
507 | } | ||
508 | |||
509 | |||
510 | ✗ | int64_t S3Uploader::DoGetObjectSize(const std::string &file_name) { | |
511 | // TODO(dosarudaniel): use a head request for byte count | ||
512 | // Re-enable 661 integration test when done | ||
513 | ✗ | return -EOPNOTSUPP; | |
514 | } | ||
515 | |||
516 | } // namespace upload | ||
517 |