Directory: | cvmfs/ |
---|---|
File: | cvmfs/upload_s3.cc |
Date: | 2025-02-09 02:34:19 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 198 | 281 | 70.5% |
Branches: | 179 | 436 | 41.1% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #include "upload_s3.h" | ||
6 | |||
7 | #include <errno.h> | ||
8 | #include <fcntl.h> | ||
9 | #include <inttypes.h> | ||
10 | #include <unistd.h> | ||
11 | |||
12 | #include <string> | ||
13 | #include <vector> | ||
14 | |||
15 | #include "compression/compression.h" | ||
16 | #include "network/s3fanout.h" | ||
17 | #include "options.h" | ||
18 | #include "util/exception.h" | ||
19 | #include "util/logging.h" | ||
20 | #include "util/posix.h" | ||
21 | #include "util/string.h" | ||
22 | |||
23 | namespace upload { | ||
24 | |||
25 | /* | ||
26 | * Allowed values of x-amz-acl according to S3 API | ||
27 | */ | ||
28 | static const char* x_amz_acl_allowed_values_[8] = { | ||
29 | "private", | ||
30 | "public-read", | ||
31 | "public-write", | ||
32 | "authenticated-read", | ||
33 | "aws-exec-read", | ||
34 | "bucket-owner-read", | ||
35 | "bucket-owner-full-control", | ||
36 | "" | ||
37 | }; | ||
38 | |||
39 | 512 | void S3Uploader::RequestCtrl::WaitFor() { | |
40 | char c; | ||
41 |
1/2✓ Branch 1 taken 512 times.
✗ Branch 2 not taken.
|
512 | ReadPipe(pipe_wait[0], &c, 1); |
42 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 512 times.
|
512 | assert(c == 'c'); |
43 |
1/2✓ Branch 1 taken 512 times.
✗ Branch 2 not taken.
|
512 | ClosePipe(pipe_wait); |
44 | 512 | } | |
45 | |||
46 | |||
47 | 12 | S3Uploader::S3Uploader(const SpoolerDefinition &spooler_definition) | |
48 | : AbstractUploader(spooler_definition) | ||
49 | 12 | , dns_buckets_(true) | |
50 | 12 | , num_parallel_uploads_(kDefaultNumParallelUploads) | |
51 | 12 | , num_retries_(kDefaultNumRetries) | |
52 | 12 | , timeout_sec_(kDefaultTimeoutSec) | |
53 | 12 | , authz_method_(s3fanout::kAuthzAwsV2) | |
54 | 12 | , peek_before_put_(true) | |
55 | 12 | , use_https_(false) | |
56 |
1/2✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
|
12 | , proxy_("") |
57 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | , temporary_path_(spooler_definition.temporary_path) |
58 |
2/4✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 14 taken 12 times.
✗ Branch 15 not taken.
|
24 | , x_amz_acl_("public-read") |
59 | { | ||
60 |
2/4✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 12 times.
✗ Branch 4 not taken.
|
12 | assert(spooler_definition.IsValid() && |
61 | spooler_definition.driver_type == SpoolerDefinition::S3); | ||
62 | |||
63 | 12 | atomic_init32(&io_errors_); | |
64 | |||
65 |
2/4✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 12 times.
|
12 | if (!ParseSpoolerDefinition(spooler_definition)) { |
66 | ✗ | PANIC(kLogStderr, "Error in parsing the spooler definition"); | |
67 | } | ||
68 | |||
69 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | s3fanout::S3FanoutManager::S3Config s3config; |
70 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | s3config.access_key = access_key_; |
71 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | s3config.secret_key = secret_key_; |
72 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | s3config.hostname_port = host_name_port_; |
73 | 12 | s3config.authz_method = authz_method_; | |
74 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | s3config.region = region_; |
75 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | s3config.flavor = flavor_; |
76 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | s3config.bucket = bucket_; |
77 | 12 | s3config.dns_buckets = dns_buckets_; | |
78 | 12 | s3config.pool_max_handles = num_parallel_uploads_; | |
79 | 12 | s3config.opt_timeout_sec = timeout_sec_; | |
80 | 12 | s3config.opt_max_retries = num_retries_; | |
81 | 12 | s3config.opt_backoff_init_ms = kDefaultBackoffInitMs; | |
82 | 12 | s3config.opt_backoff_max_ms = kDefaultBackoffMaxMs; | |
83 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | s3config.x_amz_acl = x_amz_acl_; |
84 | |||
85 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | if (use_https_) { |
86 | ✗ | s3config.protocol = "https"; | |
87 | } else { | ||
88 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | s3config.protocol = "http"; |
89 | } | ||
90 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | s3config.proxy = proxy_; |
91 | |||
92 |
3/6✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 12 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 12 times.
✗ Branch 8 not taken.
|
12 | s3fanout_mgr_ = new s3fanout::S3FanoutManager(s3config); |
93 |
1/2✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
|
12 | s3fanout_mgr_->Spawn(); |
94 | |||
95 | 12 | int retval = pthread_create( | |
96 | &thread_collect_results_, NULL, MainCollectResults, this); | ||
97 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | assert(retval == 0); |
98 | 12 | } | |
99 | |||
100 | |||
101 | 72 | S3Uploader::~S3Uploader() { | |
102 | // Signal termination to our own worker thread | ||
103 | 24 | s3fanout_mgr_->PushCompletedJob(NULL); | |
104 | 24 | pthread_join(thread_collect_results_, NULL); | |
105 | 48 | } | |
106 | |||
107 | |||
108 | 12 | bool S3Uploader::ParseSpoolerDefinition( | |
109 | const SpoolerDefinition &spooler_definition) | ||
110 | { | ||
111 | const std::vector<std::string> config = | ||
112 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | SplitString(spooler_definition.spooler_configuration, '@'); |
113 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 12 times.
|
12 | if (config.size() != 2) { |
114 | ✗ | LogCvmfs(kLogUploadS3, kLogStderr, | |
115 | "Failed to parse spooler configuration string '%s'.\n" | ||
116 | "Provide: <repo_alias>@/path/to/s3.conf", | ||
117 | spooler_definition.spooler_configuration.c_str()); | ||
118 | ✗ | return false; | |
119 | } | ||
120 |
1/2✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
|
12 | repository_alias_ = config[0]; |
121 | 12 | const std::string &config_path = config[1]; | |
122 | |||
123 |
2/4✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 12 times.
|
12 | if (!FileExists(config_path)) { |
124 | ✗ | LogCvmfs(kLogUploadS3, kLogStderr, | |
125 | "Cannot find S3 config file at '%s'", | ||
126 | config_path.c_str()); | ||
127 | ✗ | return false; | |
128 | } | ||
129 | |||
130 | // Parse S3 configuration | ||
131 | BashOptionsManager options_manager = BashOptionsManager( | ||
132 |
4/8✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 12 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 12 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 12 times.
✗ Branch 11 not taken.
|
12 | new DefaultOptionsTemplateManager(repository_alias_)); |
133 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | options_manager.ParsePath(config_path, false); |
134 | 12 | std::string parameter; | |
135 | |||
136 |
3/6✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 12 times.
|
12 | if (!options_manager.GetValue("CVMFS_S3_HOST", &host_name_)) { |
137 | ✗ | LogCvmfs(kLogUploadS3, kLogStderr, | |
138 | "Failed to parse CVMFS_S3_HOST from '%s'", | ||
139 | config_path.c_str()); | ||
140 | ✗ | return false; | |
141 | } | ||
142 |
3/6✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 12 times.
|
12 | if (!options_manager.GetValue("CVMFS_S3_ACCESS_KEY", &access_key_)) { |
143 | ✗ | LogCvmfs(kLogUploadS3, kLogStderr, | |
144 | "Failed to parse CVMFS_S3_ACCESS_KEY from '%s'.", | ||
145 | config_path.c_str()); | ||
146 | ✗ | return false; | |
147 | } | ||
148 |
3/6✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 12 times.
|
12 | if (!options_manager.GetValue("CVMFS_S3_SECRET_KEY", &secret_key_)) { |
149 | ✗ | LogCvmfs(kLogUploadS3, kLogStderr, | |
150 | "Failed to parse CVMFS_S3_SECRET_KEY from '%s'.", | ||
151 | config_path.c_str()); | ||
152 | ✗ | return false; | |
153 | } | ||
154 |
3/6✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 12 times.
|
12 | if (!options_manager.GetValue("CVMFS_S3_BUCKET", &bucket_)) { |
155 | ✗ | LogCvmfs(kLogUploadS3, kLogStderr, | |
156 | "Failed to parse CVMFS_S3_BUCKET from '%s'.", | ||
157 | config_path.c_str()); | ||
158 | ✗ | return false; | |
159 | } | ||
160 |
3/6✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 12 times.
✗ Branch 10 not taken.
|
12 | if (options_manager.GetValue("CVMFS_S3_DNS_BUCKETS", &parameter)) { |
161 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | if (parameter == "false") { |
162 | 12 | dns_buckets_ = false; | |
163 | } | ||
164 | } | ||
165 |
3/6✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 12 times.
✗ Branch 10 not taken.
|
12 | if (options_manager.GetValue("CVMFS_S3_MAX_NUMBER_OF_PARALLEL_CONNECTIONS", |
166 | &parameter)) | ||
167 | { | ||
168 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | num_parallel_uploads_ = String2Uint64(parameter); |
169 | } | ||
170 |
3/6✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 12 times.
|
12 | if (options_manager.GetValue("CVMFS_S3_MAX_RETRIES", &parameter)) { |
171 | ✗ | num_retries_ = String2Uint64(parameter); | |
172 | } | ||
173 |
3/6✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 12 times.
|
12 | if (options_manager.GetValue("CVMFS_S3_TIMEOUT", &parameter)) { |
174 | ✗ | timeout_sec_ = String2Uint64(parameter); | |
175 | } | ||
176 |
3/6✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 12 times.
|
12 | if (options_manager.GetValue("CVMFS_S3_REGION", &region_)) { |
177 | ✗ | authz_method_ = s3fanout::kAuthzAwsV4; | |
178 | } | ||
179 |
3/6✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 12 times.
|
12 | if (options_manager.GetValue("CVMFS_S3_FLAVOR", &flavor_)) { |
180 | ✗ | if (flavor_ == "azure") { | |
181 | ✗ | authz_method_ = s3fanout::kAuthzAzure; | |
182 | ✗ | } else if (flavor_ == "awsv2") { | |
183 | ✗ | authz_method_ = s3fanout::kAuthzAwsV2; | |
184 | ✗ | } else if (flavor_ == "awsv4") { | |
185 | ✗ | authz_method_ = s3fanout::kAuthzAwsV4; | |
186 | } else { | ||
187 | ✗ | LogCvmfs(kLogUploadS3, kLogStderr, | |
188 | "Failed to parse CVMFS_S3_FLAVOR from '%s', " | ||
189 | "valid options are azure, awsv2 or awsv4", | ||
190 | config_path.c_str()); | ||
191 | ✗ | return false; | |
192 | } | ||
193 | } | ||
194 |
3/6✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 12 times.
|
12 | if (options_manager.GetValue("CVMFS_S3_PEEK_BEFORE_PUT", &parameter)) { |
195 | ✗ | peek_before_put_ = options_manager.IsOn(parameter); | |
196 | } | ||
197 |
3/6✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 12 times.
|
12 | if (options_manager.GetValue("CVMFS_S3_X_AMZ_ACL", &parameter)) { |
198 | ✗ | bool isAllowed = false; | |
199 | ✗ | size_t const len = sizeof(x_amz_acl_allowed_values_) / | |
200 | sizeof(x_amz_acl_allowed_values_[0]); | ||
201 | ✗ | for (size_t i = 0; i < len; i++) { | |
202 | ✗ | if (x_amz_acl_allowed_values_[i] == parameter) { | |
203 | ✗ | isAllowed = true; | |
204 | ✗ | break; | |
205 | } | ||
206 | } | ||
207 | ✗ | if (!isAllowed) { | |
208 | ✗ | LogCvmfs(kLogUploadS3, kLogStderr, | |
209 | "%s is not an allowed value for CVMFS_S3_X_AMZ_ACL", | ||
210 | parameter.c_str()); | ||
211 | ✗ | return false; | |
212 | } | ||
213 | ✗ | x_amz_acl_ = parameter; | |
214 | } | ||
215 | |||
216 |
3/6✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 12 times.
|
12 | if (options_manager.GetValue("CVMFS_S3_USE_HTTPS", &parameter)) { |
217 | ✗ | use_https_ = options_manager.IsOn(parameter); | |
218 | } | ||
219 | |||
220 |
3/6✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 12 times.
✗ Branch 10 not taken.
|
12 | if (options_manager.GetValue("CVMFS_S3_PORT", &parameter)) { |
221 |
2/4✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 12 times.
✗ Branch 5 not taken.
|
12 | host_name_port_ = host_name_ + ":" + parameter; |
222 | } else { | ||
223 | ✗ | host_name_port_ = host_name_; | |
224 | } | ||
225 | |||
226 |
3/6✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 12 times.
|
12 | if (options_manager.IsDefined("CVMFS_S3_PROXY")) { |
227 | ✗ | options_manager.GetValue("CVMFS_S3_PROXY", &proxy_); | |
228 | } | ||
229 | |||
230 | 12 | return true; | |
231 | 12 | } | |
232 | |||
233 | |||
234 | 13 | bool S3Uploader::WillHandle(const SpoolerDefinition &spooler_definition) { | |
235 | 13 | return spooler_definition.driver_type == SpoolerDefinition::S3; | |
236 | } | ||
237 | |||
238 | |||
239 | ✗ | bool S3Uploader::Create() { | |
240 | ✗ | if (!dns_buckets_) | |
241 | ✗ | return false; | |
242 | |||
243 | ✗ | s3fanout::JobInfo *info = CreateJobInfo(""); | |
244 | ✗ | info->request = s3fanout::JobInfo::kReqPutBucket; | |
245 | ✗ | std::string request_content; | |
246 | ✗ | if (!region_.empty()) { | |
247 | request_content = | ||
248 | ✗ | std::string("<CreateBucketConfiguration xmlns=" | |
249 | "\"http://s3.amazonaws.com/doc/2006-03-01/\">" | ||
250 | ✗ | "<LocationConstraint>") + region_ + "</LocationConstraint>" | |
251 | ✗ | "</CreateBucketConfiguration>"; | |
252 | ✗ | info->origin->Append(request_content.data(), request_content.length()); | |
253 | ✗ | info->origin->Commit(); | |
254 | } | ||
255 | |||
256 | ✗ | RequestCtrl req_ctrl; | |
257 | ✗ | MakePipe(req_ctrl.pipe_wait); | |
258 | ✗ | info->callback = const_cast<void*>(static_cast<void const*>(MakeClosure( | |
259 | ✗ | &S3Uploader::OnReqComplete, this, &req_ctrl))); | |
260 | |||
261 | ✗ | IncJobsInFlight(); | |
262 | ✗ | UploadJobInfo(info); | |
263 | ✗ | req_ctrl.WaitFor(); | |
264 | |||
265 | ✗ | return req_ctrl.return_code == 0; | |
266 | } | ||
267 | |||
268 | |||
269 | 1 | unsigned int S3Uploader::GetNumberOfErrors() const { | |
270 | 1 | return atomic_read32(&io_errors_); | |
271 | } | ||
272 | |||
273 | |||
274 | /** | ||
275 | * Worker thread takes care of requesting new jobs and cleaning old ones. | ||
276 | */ | ||
277 | 12 | void *S3Uploader::MainCollectResults(void *data) { | |
278 | 12 | LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread started."); | |
279 | 12 | S3Uploader *uploader = reinterpret_cast<S3Uploader *>(data); | |
280 | |||
281 | while (true) { | ||
282 | 626 | s3fanout::JobInfo *info = uploader->s3fanout_mgr_->PopCompletedJob(); | |
283 |
2/2✓ Branch 0 taken 12 times.
✓ Branch 1 taken 614 times.
|
626 | if (!info) |
284 | 12 | break; | |
285 | // Report completed job | ||
286 | 614 | int reply_code = 0; | |
287 |
2/2✓ Branch 0 taken 2 times.
✓ Branch 1 taken 612 times.
|
614 | if (info->error_code != s3fanout::kFailOk) { |
288 |
1/2✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
|
2 | if ((info->request != s3fanout::JobInfo::kReqHeadOnly) || |
289 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | (info->error_code != s3fanout::kFailNotFound)) { |
290 | ✗ | LogCvmfs(kLogUploadS3, kLogStderr, | |
291 | "Upload job for '%s' failed. (error code: %d - %s)", | ||
292 | info->object_key.c_str(), | ||
293 | ✗ | info->error_code, | |
294 | s3fanout::Code2Ascii(info->error_code)); | ||
295 | ✗ | reply_code = 99; | |
296 | ✗ | atomic_inc32(&uploader->io_errors_); | |
297 | } | ||
298 | } | ||
299 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 613 times.
|
614 | if (info->request == s3fanout::JobInfo::kReqDelete) { |
300 |
1/2✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
1 | uploader->Respond(NULL, UploaderResults()); |
301 |
2/2✓ Branch 0 taken 5 times.
✓ Branch 1 taken 608 times.
|
613 | } else if (info->request == s3fanout::JobInfo::kReqHeadOnly) { |
302 |
2/2✓ Branch 0 taken 2 times.
✓ Branch 1 taken 3 times.
|
5 | if (info->error_code == s3fanout::kFailNotFound) reply_code = 1; |
303 |
1/2✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
|
5 | uploader->Respond(static_cast<CallbackTN*>(info->callback), |
304 | 10 | UploaderResults(UploaderResults::kLookup, | |
305 | reply_code)); | ||
306 | } else { | ||
307 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
|
608 | if (info->request == s3fanout::JobInfo::kReqHeadPut) { |
308 | // The HEAD request was not transformed into a PUT request, thus this | ||
309 | // was a duplicate | ||
310 | // Uploaded catalogs are always unique -> | ||
311 | // assume this was a regular file and decrease appropriate counters | ||
312 | ✗ | uploader->CountDuplicates(); | |
313 | ✗ | uploader->DecUploadedChunks(); | |
314 | ✗ | uploader->CountUploadedBytes(-(info->payload_size)); | |
315 | } | ||
316 |
1/2✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
|
608 | uploader->Respond(static_cast<CallbackTN*>(info->callback), |
317 | 1216 | UploaderResults(UploaderResults::kChunkCommit, | |
318 | reply_code)); | ||
319 | |||
320 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 608 times.
|
608 | assert(!info->origin.IsValid()); |
321 | } | ||
322 |
1/2✓ Branch 0 taken 614 times.
✗ Branch 1 not taken.
|
614 | delete info; |
323 | 614 | } | |
324 | |||
325 | 12 | LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread finished."); | |
326 | 12 | return NULL; | |
327 | } | ||
328 | |||
329 | |||
330 | 507 | void S3Uploader::DoUpload( | |
331 | const std::string &remote_path, | ||
332 | IngestionSource *source, | ||
333 | const CallbackTN *callback | ||
334 | ) { | ||
335 |
1/2✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
|
507 | bool rvb = source->Open(); |
336 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 507 times.
|
507 | if (!rvb) { |
337 | ✗ | Respond(callback, UploaderResults(100, source->GetPath())); | |
338 | ✗ | return; | |
339 | } | ||
340 | uint64_t size; | ||
341 |
1/2✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
|
507 | rvb = source->GetSize(&size); |
342 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 507 times.
|
507 | assert(rvb); |
343 | |||
344 | FileBackedBuffer *origin = | ||
345 | 507 | FileBackedBuffer::Create(kInMemoryObjectThreshold, | |
346 |
1/2✓ Branch 2 taken 507 times.
✗ Branch 3 not taken.
|
507 | spooler_definition().temporary_path); |
347 | |||
348 | unsigned char buffer[kPageSize]; | ||
349 | ssize_t nbytes; | ||
350 | do { | ||
351 |
1/2✓ Branch 1 taken 199167 times.
✗ Branch 2 not taken.
|
199167 | nbytes = source->Read(buffer, kPageSize); |
352 |
3/4✓ Branch 0 taken 198831 times.
✓ Branch 1 taken 336 times.
✓ Branch 3 taken 198831 times.
✗ Branch 4 not taken.
|
199167 | if (nbytes > 0) origin->Append(buffer, nbytes); |
353 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 199167 times.
|
199167 | if (nbytes < 0) { |
354 | ✗ | source->Close(); | |
355 | ✗ | delete origin; | |
356 | ✗ | Respond(callback, UploaderResults(100, source->GetPath())); | |
357 | ✗ | return; | |
358 | } | ||
359 |
2/2✓ Branch 0 taken 198660 times.
✓ Branch 1 taken 507 times.
|
199167 | } while (nbytes == kPageSize); |
360 |
1/2✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
|
507 | source->Close(); |
361 |
1/2✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
|
507 | origin->Commit(); |
362 | |||
363 | s3fanout::JobInfo *info = | ||
364 |
2/4✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 507 times.
✗ Branch 5 not taken.
|
1014 | new s3fanout::JobInfo(repository_alias_ + "/" + remote_path, |
365 | const_cast<void*>( | ||
366 | static_cast<void const*>(callback)), | ||
367 |
2/4✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 507 times.
✗ Branch 5 not taken.
|
507 | origin); |
368 | |||
369 |
3/6✓ Branch 2 taken 507 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 507 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 507 times.
|
507 | if (HasPrefix(remote_path, ".cvmfs", false /*ignore_case*/)) { |
370 | ✗ | info->request = s3fanout::JobInfo::kReqPutDotCvmfs; | |
371 |
3/6✓ Branch 2 taken 507 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 507 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 507 times.
|
507 | } else if (HasSuffix(remote_path, ".html", false)) { |
372 | ✗ | info->request = s3fanout::JobInfo::kReqPutHtml; | |
373 | } else { | ||
374 |
1/2✓ Branch 0 taken 507 times.
✗ Branch 1 not taken.
|
507 | if (peek_before_put_) |
375 | 507 | info->request = s3fanout::JobInfo::kReqHeadPut; | |
376 | } | ||
377 | |||
378 | 507 | RequestCtrl req_ctrl; | |
379 |
1/2✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
|
507 | MakePipe(req_ctrl.pipe_wait); |
380 | 507 | req_ctrl.callback_forward = callback; | |
381 |
1/2✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
|
507 | req_ctrl.original_path = source->GetPath(); |
382 | 1014 | info->callback = const_cast<void*>(static_cast<void const*>(MakeClosure( | |
383 |
1/2✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
|
507 | &S3Uploader::OnReqComplete, this, &req_ctrl))); |
384 | |||
385 |
1/2✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
|
507 | UploadJobInfo(info); |
386 |
1/2✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
|
507 | req_ctrl.WaitFor(); |
387 |
1/2✓ Branch 2 taken 507 times.
✗ Branch 3 not taken.
|
507 | LogCvmfs(kLogUploadS3, kLogDebug, "Uploading from source finished: %s", |
388 |
1/2✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
|
1014 | source->GetPath().c_str()); |
389 | 507 | } | |
390 | |||
391 | |||
392 | 613 | void S3Uploader::UploadJobInfo(s3fanout::JobInfo *info) { | |
393 | 613 | LogCvmfs(kLogUploadS3, kLogDebug, | |
394 | "Uploading:\n" | ||
395 | "--> Object: '%s'\n" | ||
396 | "--> Bucket: '%s'\n" | ||
397 | "--> Host: '%s'\n", | ||
398 | info->object_key.c_str(), | ||
399 | bucket_.c_str(), | ||
400 | host_name_port_.c_str()); | ||
401 | |||
402 | 613 | s3fanout_mgr_->PushNewJob(info); | |
403 | 613 | } | |
404 | |||
405 | |||
406 | 101 | UploadStreamHandle *S3Uploader::InitStreamedUpload(const CallbackTN *callback) { | |
407 | return new S3StreamHandle(callback, kInMemoryObjectThreshold, | ||
408 |
1/2✓ Branch 3 taken 101 times.
✗ Branch 4 not taken.
|
101 | spooler_definition().temporary_path); |
409 | } | ||
410 | |||
411 | |||
412 | 802 | void S3Uploader::StreamedUpload( | |
413 | UploadStreamHandle *handle, | ||
414 | UploadBuffer buffer, | ||
415 | const CallbackTN *callback) | ||
416 | { | ||
417 | 802 | S3StreamHandle *s3_handle = static_cast<S3StreamHandle*>(handle); | |
418 | |||
419 | 802 | s3_handle->buffer->Append(buffer.data, buffer.size); | |
420 |
1/2✓ Branch 2 taken 802 times.
✗ Branch 3 not taken.
|
802 | Respond(callback, UploaderResults(UploaderResults::kBufferUpload, 0)); |
421 | 802 | } | |
422 | |||
423 | |||
424 | 101 | void S3Uploader::FinalizeStreamedUpload( | |
425 | UploadStreamHandle *handle, | ||
426 | const shash::Any &content_hash) | ||
427 | { | ||
428 | 101 | S3StreamHandle *s3_handle = static_cast<S3StreamHandle*>(handle); | |
429 | |||
430 | // New file name based on content hash or remote_path override | ||
431 | 101 | std::string final_path; | |
432 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 101 times.
|
101 | if (s3_handle->remote_path != "") { |
433 | ✗ | final_path = repository_alias_ + "/" + s3_handle->remote_path; | |
434 | } else { | ||
435 |
3/6✓ Branch 1 taken 101 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 101 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 101 times.
✗ Branch 8 not taken.
|
101 | final_path = repository_alias_ + "/data/" + content_hash.MakePath(); |
436 | } | ||
437 | |||
438 |
1/2✓ Branch 2 taken 101 times.
✗ Branch 3 not taken.
|
101 | s3_handle->buffer->Commit(); |
439 | |||
440 |
1/2✓ Branch 2 taken 101 times.
✗ Branch 3 not taken.
|
101 | size_t bytes_uploaded = s3_handle->buffer->GetSize(); |
441 | |||
442 | s3fanout::JobInfo *info = | ||
443 | new s3fanout::JobInfo(final_path, | ||
444 | const_cast<void*>( | ||
445 | static_cast<void const*>( | ||
446 | 101 | handle->commit_callback)), | |
447 |
2/4✓ Branch 2 taken 101 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 101 times.
✗ Branch 6 not taken.
|
101 | s3_handle->buffer.Release()); |
448 | |||
449 |
1/2✓ Branch 0 taken 101 times.
✗ Branch 1 not taken.
|
101 | if (peek_before_put_) |
450 | 101 | info->request = s3fanout::JobInfo::kReqHeadPut; | |
451 |
1/2✓ Branch 1 taken 101 times.
✗ Branch 2 not taken.
|
101 | UploadJobInfo(info); |
452 | |||
453 | // Remove the temporary file | ||
454 |
1/2✓ Branch 0 taken 101 times.
✗ Branch 1 not taken.
|
101 | delete s3_handle; |
455 | |||
456 | // Update statistics counters | ||
457 |
4/4✓ Branch 1 taken 1 times.
✓ Branch 2 taken 100 times.
✓ Branch 3 taken 100 times.
✓ Branch 4 taken 1 times.
|
102 | if (!content_hash.HasSuffix() || |
458 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | content_hash.suffix == shash::kSuffixPartial) { |
459 |
1/2✓ Branch 1 taken 100 times.
✗ Branch 2 not taken.
|
100 | CountUploadedChunks(); |
460 |
1/2✓ Branch 1 taken 100 times.
✗ Branch 2 not taken.
|
100 | CountUploadedBytes(bytes_uploaded); |
461 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | } else if (content_hash.suffix == shash::kSuffixCatalog) { |
462 | ✗ | CountUploadedCatalogs(); | |
463 | ✗ | CountUploadedCatalogBytes(bytes_uploaded); | |
464 | } | ||
465 | 101 | } | |
466 | |||
467 | |||
468 | 6 | s3fanout::JobInfo *S3Uploader::CreateJobInfo(const std::string& path) const { | |
469 |
2/4✓ Branch 2 taken 6 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 6 times.
✗ Branch 6 not taken.
|
6 | FileBackedBuffer *buf = FileBackedBuffer::Create(kInMemoryObjectThreshold); |
470 |
1/2✓ Branch 2 taken 6 times.
✗ Branch 3 not taken.
|
6 | return new s3fanout::JobInfo(path, NULL, buf); |
471 | } | ||
472 | |||
473 | |||
474 | 1 | void S3Uploader::DoRemoveAsync(const std::string& file_to_delete) { | |
475 |
2/4✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
|
1 | const std::string mangled_path = repository_alias_ + "/" + file_to_delete; |
476 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | s3fanout::JobInfo *info = CreateJobInfo(mangled_path); |
477 | |||
478 | 1 | info->request = s3fanout::JobInfo::kReqDelete; | |
479 | |||
480 |
1/2✓ Branch 3 taken 1 times.
✗ Branch 4 not taken.
|
1 | LogCvmfs(kLogUploadS3, kLogDebug, "Asynchronously removing %s/%s", |
481 | bucket_.c_str(), info->object_key.c_str()); | ||
482 |
1/2✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
1 | s3fanout_mgr_->PushNewJob(info); |
483 | 1 | } | |
484 | |||
485 | |||
486 | 512 | void S3Uploader::OnReqComplete( | |
487 | const upload::UploaderResults &results, | ||
488 | RequestCtrl *ctrl) | ||
489 | { | ||
490 | 512 | ctrl->return_code = results.return_code; | |
491 |
2/2✓ Branch 0 taken 507 times.
✓ Branch 1 taken 5 times.
|
512 | if (ctrl->callback_forward != NULL) { |
492 | // We are already in Respond() so we must not call it again | ||
493 |
1/2✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
|
507 | upload::UploaderResults fix_path(results.return_code, ctrl->original_path); |
494 |
1/2✓ Branch 1 taken 507 times.
✗ Branch 2 not taken.
|
507 | (*(ctrl->callback_forward))(fix_path); |
495 |
1/2✓ Branch 0 taken 507 times.
✗ Branch 1 not taken.
|
507 | delete ctrl->callback_forward; |
496 | 507 | ctrl->callback_forward = NULL; | |
497 | 507 | } | |
498 | 512 | char c = 'c'; | |
499 |
1/2✓ Branch 1 taken 512 times.
✗ Branch 2 not taken.
|
512 | WritePipe(ctrl->pipe_wait[1], &c, 1); |
500 | 512 | } | |
501 | |||
502 | |||
503 | 5 | bool S3Uploader::Peek(const std::string& path) { | |
504 |
2/4✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 5 times.
✗ Branch 5 not taken.
|
5 | const std::string mangled_path = repository_alias_ + "/" + path; |
505 |
1/2✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
|
5 | s3fanout::JobInfo *info = CreateJobInfo(mangled_path); |
506 | |||
507 | 5 | RequestCtrl req_ctrl; | |
508 |
1/2✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
|
5 | MakePipe(req_ctrl.pipe_wait); |
509 | 5 | info->request = s3fanout::JobInfo::kReqHeadOnly; | |
510 | 10 | info->callback = const_cast<void*>(static_cast<void const*>(MakeClosure( | |
511 |
1/2✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
|
5 | &S3Uploader::OnReqComplete, this, &req_ctrl))); |
512 | |||
513 |
1/2✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
|
5 | IncJobsInFlight(); |
514 |
1/2✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
|
5 | UploadJobInfo(info); |
515 |
1/2✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
|
5 | req_ctrl.WaitFor(); |
516 | |||
517 | 5 | return req_ctrl.return_code == 0; | |
518 | 5 | } | |
519 | |||
520 | |||
521 | // noop: no mkdir needed in S3 storage | ||
522 | ✗ | bool S3Uploader::Mkdir(const std::string &path) { | |
523 | ✗ | return true; | |
524 | } | ||
525 | |||
526 | |||
527 | ✗ | bool S3Uploader::PlaceBootstrappingShortcut(const shash::Any &object) { | |
528 | ✗ | return false; // TODO(rmeusel): implement | |
529 | } | ||
530 | |||
531 | |||
532 | ✗ | int64_t S3Uploader::DoGetObjectSize(const std::string &file_name) { | |
533 | // TODO(dosarudaniel): use a head request for byte count | ||
534 | // Re-enable 661 integration test when done | ||
535 | ✗ | return -EOPNOTSUPP; | |
536 | } | ||
537 | |||
538 | } // namespace upload | ||
539 |