Directory: | cvmfs/ |
---|---|
File: | cvmfs/network/s3fanout.cc |
Date: | 2025-09-28 02:35:26 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 573 | 813 | 70.5% |
Branches: | 412 | 1179 | 34.9% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | * | ||
4 | * Runs a thread using libcurls asynchronous I/O mode to push data to S3 | ||
5 | */ | ||
6 | |||
7 | #include "s3fanout.h" | ||
8 | |||
9 | #include <pthread.h> | ||
10 | |||
11 | #include <algorithm> | ||
12 | #include <cassert> | ||
13 | #include <cerrno> | ||
14 | #include <utility> | ||
15 | |||
16 | #include "upload_facility.h" | ||
17 | #include "util/concurrency.h" | ||
18 | #include "util/exception.h" | ||
19 | #include "util/platform.h" | ||
20 | #include "util/posix.h" | ||
21 | #include "util/string.h" | ||
22 | |||
23 | using namespace std; // NOLINT | ||
24 | |||
25 | namespace s3fanout { | ||
26 | |||
27 | const char *S3FanoutManager::kCacheControlCas = "Cache-Control: max-age=259200"; | ||
28 | const unsigned S3FanoutManager::kDefault429ThrottleMs = 250; | ||
29 | const unsigned S3FanoutManager::kMax429ThrottleMs = 10000; | ||
30 | const unsigned S3FanoutManager::kThrottleReportIntervalSec = 10; | ||
31 | const unsigned S3FanoutManager::kDefaultHTTPPort = 80; | ||
32 | const unsigned S3FanoutManager::kDefaultHTTPSPort = 443; | ||
33 | |||
34 | 12 | std::string S3FanoutManager::MkDotCvmfsCacheControlHeader(unsigned defaultMaxAge, int overrideMaxAge) | |
35 | { | ||
36 | const char *var; | ||
37 | int max_age_sec; | ||
38 | char *at_null_terminator_if_number; | ||
39 | 12 | bool value_determined = false; | |
40 | |||
41 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | if (overrideMaxAge >= 0) { |
42 | ✗ | max_age_sec = overrideMaxAge; | |
43 | ✗ | value_determined = true; | |
44 | } | ||
45 | |||
46 |
1/2✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
|
12 | if (!value_determined) { |
47 | 12 | var = getenv("CVMFS_MAX_TTL_SECS"); | |
48 |
1/4✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
|
12 | if (var && var[0]) { |
49 | ✗ | max_age_sec = strtoll(var, &at_null_terminator_if_number, 10); | |
50 | ✗ | if (*at_null_terminator_if_number == '\0' | |
51 | ✗ | && max_age_sec >= 0) { | |
52 | ✗ | value_determined = true; | |
53 | } | ||
54 | } | ||
55 | } | ||
56 | |||
57 |
1/2✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
|
12 | if (!value_determined) { |
58 | 12 | var = getenv("CVMFS_MAX_TTL"); | |
59 |
1/4✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
|
12 | if (var && var[0]) { |
60 | ✗ | max_age_sec = strtoll(var, &at_null_terminator_if_number, 10); | |
61 | ✗ | if (*at_null_terminator_if_number == '\0' | |
62 | ✗ | && max_age_sec >= 0) { | |
63 | ✗ | max_age_sec *= 60; | |
64 | ✗ | value_determined = true; | |
65 | } | ||
66 | } | ||
67 | } | ||
68 | |||
69 |
1/2✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
|
12 | if (!value_determined) { |
70 | 12 | max_age_sec = defaultMaxAge; | |
71 | } | ||
72 | |||
73 |
2/4✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 12 times.
✗ Branch 5 not taken.
|
24 | return "Cache-Control: max-age=" + std::to_string(max_age_sec); |
74 | } | ||
75 | |||
76 | /** | ||
77 | * Parses Retry-After and X-Retry-In headers attached to HTTP 429 responses | ||
78 | */ | ||
79 | 558 | void S3FanoutManager::DetectThrottleIndicator(const std::string &header, | |
80 | JobInfo *info) { | ||
81 | 558 | std::string value_str; | |
82 |
4/6✓ Branch 2 taken 558 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 558 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 277 times.
✓ Branch 10 taken 281 times.
|
558 | if (HasPrefix(header, "retry-after:", true)) |
83 |
1/2✓ Branch 1 taken 277 times.
✗ Branch 2 not taken.
|
277 | value_str = header.substr(12); |
84 |
4/6✓ Branch 2 taken 558 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 558 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 195 times.
✓ Branch 10 taken 363 times.
|
558 | if (HasPrefix(header, "x-retry-in:", true)) |
85 |
1/2✓ Branch 1 taken 195 times.
✗ Branch 2 not taken.
|
195 | value_str = header.substr(11); |
86 | |||
87 |
1/2✓ Branch 1 taken 558 times.
✗ Branch 2 not taken.
|
558 | value_str = Trim(value_str, true /* trim_newline */); |
88 |
2/2✓ Branch 1 taken 394 times.
✓ Branch 2 taken 164 times.
|
558 | if (!value_str.empty()) { |
89 |
1/2✓ Branch 1 taken 394 times.
✗ Branch 2 not taken.
|
394 | const unsigned value_numeric = String2Uint64(value_str); |
90 |
2/4✓ Branch 2 taken 394 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 394 times.
✗ Branch 6 not taken.
|
788 | const unsigned value_ms = HasSuffix(value_str, "ms", true /* ignore_case */) |
91 |
2/2✓ Branch 0 taken 195 times.
✓ Branch 1 taken 199 times.
|
394 | ? value_numeric |
92 | 394 | : (value_numeric * 1000); | |
93 |
2/2✓ Branch 0 taken 355 times.
✓ Branch 1 taken 39 times.
|
394 | if (value_ms > 0) |
94 | 355 | info->throttle_ms = std::min(value_ms, kMax429ThrottleMs); | |
95 | } | ||
96 | 558 | } | |
97 | |||
98 | |||
99 | /** | ||
100 | * Called by curl for every HTTP header. Not called for file:// transfers. | ||
101 | */ | ||
102 | 3682 | static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, | |
103 | void *info_link) { | ||
104 | 3682 | const size_t num_bytes = size * nmemb; | |
105 |
1/2✓ Branch 2 taken 3682 times.
✗ Branch 3 not taken.
|
3682 | const string header_line(static_cast<const char *>(ptr), num_bytes); |
106 | 3682 | JobInfo *info = static_cast<JobInfo *>(info_link); | |
107 | |||
108 | // Check for http status code errors | ||
109 |
4/6✓ Branch 2 taken 3682 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3682 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 1226 times.
✓ Branch 10 taken 2456 times.
|
3682 | if (HasPrefix(header_line, "HTTP/1.", false)) { |
110 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 1226 times.
|
1226 | if (header_line.length() < 10) |
111 | ✗ | return 0; | |
112 | |||
113 | unsigned i; | ||
114 |
5/6✓ Branch 1 taken 2452 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1226 times.
✓ Branch 5 taken 1226 times.
✓ Branch 6 taken 1226 times.
✓ Branch 7 taken 1226 times.
|
2452 | for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) { |
115 | } | ||
116 | |||
117 |
2/2✓ Branch 1 taken 612 times.
✓ Branch 2 taken 614 times.
|
1226 | if (header_line[i] == '2') { |
118 | 612 | return num_bytes; | |
119 | } else { | ||
120 |
1/2✓ Branch 2 taken 614 times.
✗ Branch 3 not taken.
|
614 | LogCvmfs(kLogS3Fanout, kLogDebug, "http status error code [info %p]: %s", |
121 | info, header_line.c_str()); | ||
122 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 614 times.
|
614 | if (header_line.length() < i + 3) { |
123 | ✗ | LogCvmfs(kLogS3Fanout, kLogStderr, "S3: invalid HTTP response '%s'", | |
124 | header_line.c_str()); | ||
125 | ✗ | info->error_code = kFailOther; | |
126 | ✗ | return 0; | |
127 | } | ||
128 |
2/4✓ Branch 3 taken 614 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 614 times.
✗ Branch 7 not taken.
|
614 | info->http_error = String2Int64(string(&header_line[i], 3)); |
129 | |||
130 |
2/6✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 610 times.
✗ Branch 5 not taken.
|
614 | switch (info->http_error) { |
131 | 4 | case 429: | |
132 | 4 | info->error_code = kFailRetry; | |
133 | 4 | info->throttle_ms = S3FanoutManager::kDefault429ThrottleMs; | |
134 | 4 | info->throttle_timestamp = platform_monotonic_time(); | |
135 | 4 | return num_bytes; | |
136 | ✗ | case 503: | |
137 | case 502: // Can happen if the S3 gateway-backend connection breaks | ||
138 | case 500: // sometimes see this as a transient error from S3 | ||
139 | ✗ | info->error_code = kFailServiceUnavailable; | |
140 | ✗ | break; | |
141 | ✗ | case 501: | |
142 | case 400: | ||
143 | ✗ | info->error_code = kFailBadRequest; | |
144 | ✗ | break; | |
145 | ✗ | case 403: | |
146 | ✗ | info->error_code = kFailForbidden; | |
147 | ✗ | break; | |
148 | 610 | case 404: | |
149 | 610 | info->error_code = kFailNotFound; | |
150 | 610 | return num_bytes; | |
151 | ✗ | default: | |
152 | ✗ | info->error_code = kFailOther; | |
153 | } | ||
154 | ✗ | return 0; | |
155 | } | ||
156 | } | ||
157 | |||
158 |
2/2✓ Branch 0 taken 12 times.
✓ Branch 1 taken 2444 times.
|
2456 | if (info->error_code == kFailRetry) { |
159 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | S3FanoutManager::DetectThrottleIndicator(header_line, info); |
160 | } | ||
161 | |||
162 | 2456 | return num_bytes; | |
163 | 3682 | } | |
164 | |||
165 | |||
166 | /** | ||
167 | * Called by curl for every new chunk to upload. | ||
168 | */ | ||
169 | 15669 | static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, | |
170 | void *info_link) { | ||
171 | 15669 | const size_t num_bytes = size * nmemb; | |
172 | 15669 | JobInfo *info = static_cast<JobInfo *>(info_link); | |
173 | |||
174 | 15669 | LogCvmfs(kLogS3Fanout, kLogDebug, "Data callback with %zu bytes", num_bytes); | |
175 | |||
176 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 15669 times.
|
15669 | if (num_bytes == 0) |
177 | ✗ | return 0; | |
178 | |||
179 | 15669 | const uint64_t read_bytes = info->origin->Read(ptr, num_bytes); | |
180 | |||
181 | 15669 | LogCvmfs(kLogS3Fanout, kLogDebug, "source buffer pushed out %lu bytes", | |
182 | read_bytes); | ||
183 | |||
184 | 15669 | return read_bytes; | |
185 | } | ||
186 | |||
187 | |||
188 | /** | ||
189 | * For the time being, ignore all received information in the HTTP body | ||
190 | */ | ||
191 | ✗ | static size_t CallbackCurlBody(char * /*ptr*/, size_t size, size_t nmemb, | |
192 | void * /*userdata*/) { | ||
193 | ✗ | return size * nmemb; | |
194 | } | ||
195 | |||
196 | |||
197 | /** | ||
198 | * Called when new curl sockets arrive or existing curl sockets depart. | ||
199 | */ | ||
200 | 2720 | int S3FanoutManager::CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, | |
201 | void *userp, void *socketp) { | ||
202 | 2720 | S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(userp); | |
203 | 2720 | LogCvmfs(kLogS3Fanout, kLogDebug, | |
204 | "CallbackCurlSocket called with easy " | ||
205 | "handle %p, socket %d, action %d, up %p, " | ||
206 | "sp %p, fds_inuse %d, jobs %d", | ||
207 | easy, s, action, userp, socketp, s3fanout_mgr->watch_fds_inuse_, | ||
208 | 2720 | s3fanout_mgr->available_jobs_->Get()); | |
209 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2720 times.
|
2720 | if (action == CURL_POLL_NONE) |
210 | ✗ | return 0; | |
211 | |||
212 | // Find s in watch_fds_ | ||
213 | // First 2 fds are job and terminate pipes (not curl related) | ||
214 | unsigned index; | ||
215 |
2/2✓ Branch 0 taken 4504 times.
✓ Branch 1 taken 1225 times.
|
5729 | for (index = 2; index < s3fanout_mgr->watch_fds_inuse_; ++index) { |
216 |
2/2✓ Branch 0 taken 1495 times.
✓ Branch 1 taken 3009 times.
|
4504 | if (s3fanout_mgr->watch_fds_[index].fd == s) |
217 | 1495 | break; | |
218 | } | ||
219 | // Or create newly | ||
220 |
2/2✓ Branch 0 taken 1225 times.
✓ Branch 1 taken 1495 times.
|
2720 | if (index == s3fanout_mgr->watch_fds_inuse_) { |
221 | // Extend array if necessary | ||
222 |
2/2✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1223 times.
|
1225 | if (s3fanout_mgr->watch_fds_inuse_ == s3fanout_mgr->watch_fds_size_) { |
223 | 2 | s3fanout_mgr->watch_fds_size_ *= 2; | |
224 | 2 | s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>( | |
225 | 2 | srealloc(s3fanout_mgr->watch_fds_, | |
226 | 2 | s3fanout_mgr->watch_fds_size_ * sizeof(struct pollfd))); | |
227 | } | ||
228 | 1225 | s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].fd = s; | |
229 | 1225 | s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].events = 0; | |
230 | 1225 | s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].revents = 0; | |
231 | 1225 | s3fanout_mgr->watch_fds_inuse_++; | |
232 | } | ||
233 | |||
234 |
4/5✓ Branch 0 taken 1225 times.
✓ Branch 1 taken 6 times.
✓ Branch 2 taken 264 times.
✓ Branch 3 taken 1225 times.
✗ Branch 4 not taken.
|
2720 | switch (action) { |
235 | 1225 | case CURL_POLL_IN: | |
236 | 1225 | s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI; | |
237 | 1225 | break; | |
238 | 6 | case CURL_POLL_OUT: | |
239 | 6 | s3fanout_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND; | |
240 | 6 | break; | |
241 | 264 | case CURL_POLL_INOUT: | |
242 | 264 | s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI | POLLOUT | |
243 | | POLLWRBAND; | ||
244 | 264 | break; | |
245 | 1225 | case CURL_POLL_REMOVE: | |
246 |
2/2✓ Branch 0 taken 193 times.
✓ Branch 1 taken 1032 times.
|
1225 | if (index < s3fanout_mgr->watch_fds_inuse_ - 1) |
247 | s3fanout_mgr | ||
248 | 193 | ->watch_fds_[index] = s3fanout_mgr->watch_fds_ | |
249 | 193 | [s3fanout_mgr->watch_fds_inuse_ - 1]; | |
250 | 1225 | s3fanout_mgr->watch_fds_inuse_--; | |
251 | // Shrink array if necessary | ||
252 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1225 times.
|
1225 | if ((s3fanout_mgr->watch_fds_inuse_ > s3fanout_mgr->watch_fds_max_) |
253 | ✗ | && (s3fanout_mgr->watch_fds_inuse_ | |
254 | ✗ | < s3fanout_mgr->watch_fds_size_ / 2)) { | |
255 | ✗ | s3fanout_mgr->watch_fds_size_ /= 2; | |
256 | ✗ | s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>( | |
257 | ✗ | srealloc(s3fanout_mgr->watch_fds_, | |
258 | ✗ | s3fanout_mgr->watch_fds_size_ * sizeof(struct pollfd))); | |
259 | } | ||
260 | 1225 | break; | |
261 | ✗ | default: | |
262 | ✗ | PANIC(NULL); | |
263 | } | ||
264 | |||
265 | 2720 | return 0; | |
266 | } | ||
267 | |||
268 | |||
269 | /** | ||
270 | * Worker thread event loop. | ||
271 | */ | ||
272 | 12 | void *S3FanoutManager::MainUpload(void *data) { | |
273 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread started"); |
274 | 12 | S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(data); | |
275 | |||
276 | 12 | s3fanout_mgr->InitPipeWatchFds(); | |
277 | |||
278 | // Don't schedule more jobs into the multi handle than the maximum number of | ||
279 | // parallel connections. This should prevent starvation and thus a timeout | ||
280 | // of the authorization header (CVM-1339). | ||
281 | 12 | unsigned jobs_in_flight = 0; | |
282 | |||
283 | while (true) { | ||
284 | // Check events with 100ms timeout | ||
285 | 16755 | const int timeout_ms = 100; | |
286 |
1/2✓ Branch 1 taken 16755 times.
✗ Branch 2 not taken.
|
16755 | int retval = poll(s3fanout_mgr->watch_fds_, s3fanout_mgr->watch_fds_inuse_, |
287 | timeout_ms); | ||
288 |
2/2✓ Branch 0 taken 102 times.
✓ Branch 1 taken 16653 times.
|
16755 | if (retval == 0) { |
289 | // Handle timeout | ||
290 | 102 | int still_running = 0; | |
291 |
1/2✓ Branch 1 taken 102 times.
✗ Branch 2 not taken.
|
102 | retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_, |
292 | CURL_SOCKET_TIMEOUT, 0, &still_running); | ||
293 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 102 times.
|
102 | if (retval != CURLM_OK) { |
294 | ✗ | LogCvmfs(kLogS3Fanout, kLogStderr, "Error, timeout due to: %d", retval); | |
295 | ✗ | assert(retval == CURLM_OK); | |
296 | } | ||
297 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 16653 times.
|
16653 | } else if (retval < 0) { |
298 | ✗ | assert(errno == EINTR); | |
299 | ✗ | continue; | |
300 | } | ||
301 | |||
302 | // Terminate I/O thread | ||
303 |
2/2✓ Branch 0 taken 12 times.
✓ Branch 1 taken 16743 times.
|
16755 | if (s3fanout_mgr->watch_fds_[0].revents) |
304 | 12 | break; | |
305 | |||
306 | // New job incoming | ||
307 |
2/2✓ Branch 0 taken 614 times.
✓ Branch 1 taken 16129 times.
|
16743 | if (s3fanout_mgr->watch_fds_[1].revents) { |
308 | 614 | s3fanout_mgr->watch_fds_[1].revents = 0; | |
309 | JobInfo *info; | ||
310 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | ReadPipe(s3fanout_mgr->pipe_jobs_[0], &info, sizeof(info)); |
311 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | CURL *handle = s3fanout_mgr->AcquireCurlHandle(); |
312 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 614 times.
|
614 | if (handle == NULL) { |
313 | ✗ | PANIC(kLogStderr, "Failed to acquire CURL handle."); | |
314 | } | ||
315 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | const s3fanout::Failures init_failure = s3fanout_mgr->InitializeRequest( |
316 | info, handle); | ||
317 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 614 times.
|
614 | if (init_failure != s3fanout::kFailOk) { |
318 | ✗ | PANIC(kLogStderr, | |
319 | "Failed to initialize CURL handle (error: %d - %s | errno: %d)", | ||
320 | init_failure, Code2Ascii(init_failure), errno); | ||
321 | } | ||
322 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | s3fanout_mgr->SetUrlOptions(info); |
323 | |||
324 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | curl_multi_add_handle(s3fanout_mgr->curl_multi_, handle); |
325 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | s3fanout_mgr->active_requests_->insert(info); |
326 | 614 | jobs_in_flight++; | |
327 | 614 | int still_running = 0, retval = 0; | |
328 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_, |
329 | CURL_SOCKET_TIMEOUT, 0, &still_running); | ||
330 | |||
331 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | LogCvmfs(kLogS3Fanout, kLogDebug, "curl_multi_socket_action: %d - %d", |
332 | retval, still_running); | ||
333 | } | ||
334 | |||
335 | |||
336 | // Activity on curl sockets | ||
337 | // Within this loop the curl_multi_socket_action() may cause socket(s) | ||
338 | // to be removed from watch_fds_. If a socket is removed it is replaced | ||
339 | // by the socket at the end of the array and the inuse count is decreased. | ||
340 | // Therefore loop over the array in reverse order. | ||
341 | // First 2 fds are job and terminate pipes (not curl related) | ||
342 |
2/2✓ Branch 0 taken 38228 times.
✓ Branch 1 taken 16743 times.
|
54971 | for (int32_t i = s3fanout_mgr->watch_fds_inuse_ - 1; i >= 2; --i) { |
343 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 38228 times.
|
38228 | if (static_cast<uint32_t>(i) >= s3fanout_mgr->watch_fds_inuse_) { |
344 | ✗ | continue; | |
345 | } | ||
346 |
2/2✓ Branch 0 taken 17068 times.
✓ Branch 1 taken 21160 times.
|
38228 | if (s3fanout_mgr->watch_fds_[i].revents) { |
347 | 17068 | int ev_bitmask = 0; | |
348 |
2/2✓ Branch 0 taken 1833 times.
✓ Branch 1 taken 15235 times.
|
17068 | if (s3fanout_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI)) |
349 | 1833 | ev_bitmask |= CURL_CSELECT_IN; | |
350 |
2/2✓ Branch 0 taken 15235 times.
✓ Branch 1 taken 1833 times.
|
17068 | if (s3fanout_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND)) |
351 | 15235 | ev_bitmask |= CURL_CSELECT_OUT; | |
352 | 17068 | if (s3fanout_mgr->watch_fds_[i].revents | |
353 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 17068 times.
|
17068 | & (POLLERR | POLLHUP | POLLNVAL)) |
354 | ✗ | ev_bitmask |= CURL_CSELECT_ERR; | |
355 | 17068 | s3fanout_mgr->watch_fds_[i].revents = 0; | |
356 | |||
357 | 17068 | int still_running = 0; | |
358 | 17068 | retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_, | |
359 |
1/2✓ Branch 1 taken 17068 times.
✗ Branch 2 not taken.
|
17068 | s3fanout_mgr->watch_fds_[i].fd, |
360 | ev_bitmask, | ||
361 | &still_running); | ||
362 | } | ||
363 | } | ||
364 | |||
365 | // Check if transfers are completed | ||
366 | CURLMsg *curl_msg; | ||
367 | int msgs_in_queue; | ||
368 |
3/4✓ Branch 1 taken 17969 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1226 times.
✓ Branch 4 taken 16743 times.
|
17969 | while ((curl_msg = curl_multi_info_read(s3fanout_mgr->curl_multi_, |
369 | &msgs_in_queue))) { | ||
370 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1226 times.
|
1226 | assert(curl_msg->msg == CURLMSG_DONE); |
371 | |||
372 | 1226 | s3fanout_mgr->statistics_->num_requests++; | |
373 | JobInfo *info; | ||
374 | 1226 | CURL *easy_handle = curl_msg->easy_handle; | |
375 | 1226 | const int curl_error = curl_msg->data.result; | |
376 |
1/2✓ Branch 1 taken 1226 times.
✗ Branch 2 not taken.
|
1226 | curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info); |
377 | |||
378 |
1/2✓ Branch 1 taken 1226 times.
✗ Branch 2 not taken.
|
1226 | curl_multi_remove_handle(s3fanout_mgr->curl_multi_, easy_handle); |
379 |
3/4✓ Branch 1 taken 1226 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 612 times.
✓ Branch 4 taken 614 times.
|
1226 | if (s3fanout_mgr->VerifyAndFinalize(curl_error, info)) { |
380 |
1/2✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
|
612 | curl_multi_add_handle(s3fanout_mgr->curl_multi_, easy_handle); |
381 | 612 | int still_running = 0; | |
382 |
1/2✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
|
612 | curl_multi_socket_action(s3fanout_mgr->curl_multi_, CURL_SOCKET_TIMEOUT, |
383 | 0, &still_running); | ||
384 | } else { | ||
385 | // Return easy handle into pool and write result back | ||
386 | 614 | jobs_in_flight--; | |
387 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | s3fanout_mgr->active_requests_->erase(info); |
388 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | s3fanout_mgr->ReleaseCurlHandle(info, easy_handle); |
389 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | s3fanout_mgr->available_jobs_->Decrement(); |
390 | |||
391 | // Add to list of completed jobs | ||
392 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | s3fanout_mgr->PushCompletedJob(info); |
393 | } | ||
394 | } | ||
395 | 16743 | } | |
396 | |||
397 | 12 | set<CURL *>::iterator i = s3fanout_mgr->pool_handles_inuse_->begin(); | |
398 | 12 | const set<CURL *>::const_iterator i_end = s3fanout_mgr->pool_handles_inuse_ | |
399 | 12 | ->end(); | |
400 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 12 times.
|
12 | for (; i != i_end; ++i) { |
401 | ✗ | curl_multi_remove_handle(s3fanout_mgr->curl_multi_, *i); | |
402 | ✗ | curl_easy_cleanup(*i); | |
403 | } | ||
404 | 12 | s3fanout_mgr->pool_handles_inuse_->clear(); | |
405 | 12 | free(s3fanout_mgr->watch_fds_); | |
406 | |||
407 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread terminated"); |
408 | 12 | return NULL; | |
409 | } | ||
410 | |||
411 | |||
412 | /** | ||
413 | * Gets an idle CURL handle from the pool. Creates a new one and adds it to | ||
414 | * the pool if necessary. | ||
415 | */ | ||
416 | 614 | CURL *S3FanoutManager::AcquireCurlHandle() const { | |
417 | CURL *handle; | ||
418 | |||
419 | 614 | const MutexLockGuard guard(curl_handle_lock_); | |
420 | |||
421 |
2/2✓ Branch 1 taken 49 times.
✓ Branch 2 taken 565 times.
|
614 | if (pool_handles_idle_->empty()) { |
422 | CURLcode retval; | ||
423 | |||
424 | // Create a new handle | ||
425 |
1/2✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
|
49 | handle = curl_easy_init(); |
426 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
|
49 | assert(handle != NULL); |
427 | |||
428 | // Other settings | ||
429 |
1/2✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
|
49 | retval = curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1); |
430 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
|
49 | assert(retval == CURLE_OK); |
431 |
1/2✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
|
49 | retval = curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, |
432 | CallbackCurlHeader); | ||
433 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
|
49 | assert(retval == CURLE_OK); |
434 |
1/2✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
|
49 | retval = curl_easy_setopt(handle, CURLOPT_READFUNCTION, CallbackCurlData); |
435 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
|
49 | assert(retval == CURLE_OK); |
436 |
1/2✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
|
49 | retval = curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlBody); |
437 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
|
49 | assert(retval == CURLE_OK); |
438 | } else { | ||
439 | 565 | handle = *(pool_handles_idle_->begin()); | |
440 |
1/2✓ Branch 2 taken 565 times.
✗ Branch 3 not taken.
|
565 | pool_handles_idle_->erase(pool_handles_idle_->begin()); |
441 | } | ||
442 | |||
443 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | pool_handles_inuse_->insert(handle); |
444 | |||
445 | 614 | return handle; | |
446 | 614 | } | |
447 | |||
448 | |||
449 | 614 | void S3FanoutManager::ReleaseCurlHandle(JobInfo *info, CURL *handle) const { | |
450 |
1/2✓ Branch 0 taken 614 times.
✗ Branch 1 not taken.
|
614 | if (info->http_headers) { |
451 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | curl_slist_free_all(info->http_headers); |
452 | 614 | info->http_headers = NULL; | |
453 | } | ||
454 | |||
455 | 614 | const MutexLockGuard guard(curl_handle_lock_); | |
456 | |||
457 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | const set<CURL *>::iterator elem = pool_handles_inuse_->find(handle); |
458 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 614 times.
|
614 | assert(elem != pool_handles_inuse_->end()); |
459 | |||
460 |
2/2✓ Branch 1 taken 29 times.
✓ Branch 2 taken 585 times.
|
614 | if (pool_handles_idle_->size() > config_.pool_max_handles) { |
461 |
1/2✓ Branch 1 taken 29 times.
✗ Branch 2 not taken.
|
29 | const CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, NULL); |
462 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
|
29 | assert(retval == CURLE_OK); |
463 |
1/2✓ Branch 1 taken 29 times.
✗ Branch 2 not taken.
|
29 | curl_easy_cleanup(handle); |
464 | const std::map<CURL *, S3FanOutDnsEntry *>::size_type | ||
465 |
1/2✓ Branch 1 taken 29 times.
✗ Branch 2 not taken.
|
29 | retitems = curl_sharehandles_->erase(handle); |
466 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
|
29 | assert(retitems == 1); |
467 | } else { | ||
468 |
1/2✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
|
585 | pool_handles_idle_->insert(handle); |
469 | } | ||
470 | |||
471 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | pool_handles_inuse_->erase(elem); |
472 | 614 | } | |
473 | |||
474 | 12 | void S3FanoutManager::InitPipeWatchFds() { | |
475 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | assert(watch_fds_inuse_ == 0); |
476 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | assert(watch_fds_size_ >= 2); |
477 | 12 | watch_fds_[0].fd = pipe_terminate_[0]; | |
478 | 12 | watch_fds_[0].events = POLLIN | POLLPRI; | |
479 | 12 | watch_fds_[0].revents = 0; | |
480 | 12 | ++watch_fds_inuse_; | |
481 | 12 | watch_fds_[1].fd = pipe_jobs_[0]; | |
482 | 12 | watch_fds_[1].events = POLLIN | POLLPRI; | |
483 | 12 | watch_fds_[1].revents = 0; | |
484 | 12 | ++watch_fds_inuse_; | |
485 | 12 | } | |
486 | |||
487 | /** | ||
488 | * The Amazon AWS 2 authorization header according to | ||
489 | * http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html#ConstructingTheAuthenticationHeader | ||
490 | */ | ||
491 | 1222 | bool S3FanoutManager::MkV2Authz(const JobInfo &info, | |
492 | vector<string> *headers) const { | ||
493 | 1222 | string payload_hash; | |
494 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | const bool retval = MkPayloadHash(info, &payload_hash); |
495 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | if (!retval) |
496 | ✗ | return false; | |
497 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | const string content_type = GetContentType(info); |
498 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | const string request = GetRequestString(info); |
499 | |||
500 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | const string timestamp = RfcTimestamp(); |
501 |
5/10✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 1222 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 1222 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 1222 times.
✗ Branch 14 not taken.
|
2444 | string to_sign = request + "\n" + payload_hash + "\n" + content_type + "\n" |
502 |
2/4✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
|
2444 | + timestamp + "\n"; |
503 |
2/4✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1222 times.
✗ Branch 4 not taken.
|
1222 | if (config_.x_amz_acl != "") { |
504 |
2/4✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
|
2444 | to_sign += "x-amz-acl:" + config_.x_amz_acl + "\n" + // default ACL |
505 |
5/10✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 1222 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 1222 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 1222 times.
✗ Branch 14 not taken.
|
2444 | "/" + config_.bucket + "/" + info.object_key; |
506 | } | ||
507 |
1/2✓ Branch 3 taken 1222 times.
✗ Branch 4 not taken.
|
1222 | LogCvmfs(kLogS3Fanout, kLogDebug, "%s string to sign for: %s", |
508 | request.c_str(), info.object_key.c_str()); | ||
509 | |||
510 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | shash::Any hmac; |
511 | 1222 | hmac.algorithm = shash::kSha1; | |
512 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | shash::Hmac(config_.secret_key, |
513 | 1222 | reinterpret_cast<const unsigned char *>(to_sign.data()), | |
514 | 1222 | to_sign.length(), &hmac); | |
515 | |||
516 |
3/6✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 1222 times.
✗ Branch 8 not taken.
|
3666 | headers->push_back("Authorization: AWS " + config_.access_key + ":" |
517 |
3/6✓ Branch 2 taken 1222 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1222 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 1222 times.
✗ Branch 9 not taken.
|
6110 | + Base64(string(reinterpret_cast<char *>(hmac.digest), |
518 | 1222 | hmac.GetDigestSize()))); | |
519 |
2/4✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
|
1222 | headers->push_back("Date: " + timestamp); |
520 |
2/4✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
|
1222 | headers->push_back("X-Amz-Acl: " + config_.x_amz_acl); |
521 |
2/2✓ Branch 1 taken 608 times.
✓ Branch 2 taken 614 times.
|
1222 | if (!payload_hash.empty()) |
522 |
2/4✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 608 times.
✗ Branch 5 not taken.
|
608 | headers->push_back("Content-MD5: " + payload_hash); |
523 |
2/2✓ Branch 1 taken 608 times.
✓ Branch 2 taken 614 times.
|
1222 | if (!content_type.empty()) |
524 |
2/4✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 608 times.
✗ Branch 5 not taken.
|
608 | headers->push_back("Content-Type: " + content_type); |
525 | 1222 | return true; | |
526 | 1222 | } | |
527 | |||
528 | |||
529 | ✗ | string S3FanoutManager::GetUriEncode(const string &val, | |
530 | bool encode_slash) const { | ||
531 | ✗ | string result; | |
532 | ✗ | const unsigned len = val.length(); | |
533 | ✗ | result.reserve(len); | |
534 | ✗ | for (unsigned i = 0; i < len; ++i) { | |
535 | ✗ | const char c = val[i]; | |
536 | ✗ | if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') | |
537 | ✗ | || (c >= '0' && c <= '9') || c == '_' || c == '-' || c == '~' | |
538 | ✗ | || c == '.') { | |
539 | ✗ | result.push_back(c); | |
540 | ✗ | } else if (c == '/') { | |
541 | ✗ | if (encode_slash) { | |
542 | ✗ | result += "%2F"; | |
543 | } else { | ||
544 | ✗ | result.push_back(c); | |
545 | } | ||
546 | } else { | ||
547 | ✗ | result.push_back('%'); | |
548 | ✗ | result.push_back((c / 16) + ((c / 16 <= 9) ? '0' : 'A' - 10)); | |
549 | ✗ | result.push_back((c % 16) + ((c % 16 <= 9) ? '0' : 'A' - 10)); | |
550 | } | ||
551 | } | ||
552 | ✗ | return result; | |
553 | } | ||
554 | |||
555 | |||
556 | ✗ | string S3FanoutManager::GetAwsV4SigningKey(const string &date) const { | |
557 | ✗ | if (last_signing_key_.first == date) | |
558 | ✗ | return last_signing_key_.second; | |
559 | |||
560 | ✗ | const string date_key = shash::Hmac256("AWS4" + config_.secret_key, date, | |
561 | ✗ | true); | |
562 | ✗ | const string date_region_key = shash::Hmac256(date_key, config_.region, true); | |
563 | const string date_region_service_key = shash::Hmac256(date_region_key, "s3", | ||
564 | ✗ | true); | |
565 | string signing_key = shash::Hmac256(date_region_service_key, "aws4_request", | ||
566 | ✗ | true); | |
567 | ✗ | last_signing_key_.first = date; | |
568 | ✗ | last_signing_key_.second = signing_key; | |
569 | ✗ | return signing_key; | |
570 | } | ||
571 | |||
572 | |||
573 | /** | ||
574 | * The Amazon AWS4 authorization header according to | ||
575 | * http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-auth-using-authorization-header.html | ||
576 | */ | ||
577 | ✗ | bool S3FanoutManager::MkV4Authz(const JobInfo &info, | |
578 | vector<string> *headers) const { | ||
579 | ✗ | string payload_hash; | |
580 | ✗ | const bool retval = MkPayloadHash(info, &payload_hash); | |
581 | ✗ | if (!retval) | |
582 | ✗ | return false; | |
583 | ✗ | const string content_type = GetContentType(info); | |
584 | ✗ | const string timestamp = IsoTimestamp(); | |
585 | ✗ | const string date = timestamp.substr(0, 8); | |
586 | ✗ | vector<string> tokens = SplitString(complete_hostname_, ':'); | |
587 | ✗ | assert(tokens.size() <= 2); | |
588 | ✗ | string canonical_hostname = tokens[0]; | |
589 | |||
590 | // if we could split the hostname in two and if the port is *NOT* a default | ||
591 | // one | ||
592 | ✗ | if (tokens.size() == 2 | |
593 | ✗ | && !((String2Uint64(tokens[1]) == kDefaultHTTPPort) | |
594 | ✗ | || (String2Uint64(tokens[1]) == kDefaultHTTPSPort))) | |
595 | ✗ | canonical_hostname += ":" + tokens[1]; | |
596 | |||
597 | ✗ | string signed_headers; | |
598 | ✗ | string canonical_headers; | |
599 | ✗ | if (!content_type.empty()) { | |
600 | ✗ | signed_headers += "content-type;"; | |
601 | ✗ | headers->push_back("Content-Type: " + content_type); | |
602 | ✗ | canonical_headers += "content-type:" + content_type + "\n"; | |
603 | } | ||
604 | ✗ | if (config_.x_amz_acl != "") { | |
605 | ✗ | signed_headers += "host;x-amz-acl;x-amz-content-sha256;x-amz-date"; | |
606 | } else { | ||
607 | ✗ | signed_headers += "host;x-amz-content-sha256;x-amz-date"; | |
608 | } | ||
609 | ✗ | canonical_headers += "host:" + canonical_hostname + "\n"; | |
610 | ✗ | if (config_.x_amz_acl != "") { | |
611 | ✗ | canonical_headers += "x-amz-acl:" + config_.x_amz_acl + "\n"; | |
612 | } | ||
613 | ✗ | canonical_headers += "x-amz-content-sha256:" + payload_hash + "\n" | |
614 | ✗ | + "x-amz-date:" + timestamp + "\n"; | |
615 | |||
616 | ✗ | const string scope = date + "/" + config_.region + "/s3/aws4_request"; | |
617 | ✗ | const string uri = config_.dns_buckets ? (string("/") + info.object_key) | |
618 | ✗ | : (string("/") + config_.bucket + "/" | |
619 | ✗ | + info.object_key); | |
620 | |||
621 | ✗ | const string canonical_request = GetRequestString(info) + "\n" | |
622 | ✗ | + GetUriEncode(uri, false) + "\n" + "\n" | |
623 | ✗ | + canonical_headers + "\n" + signed_headers | |
624 | ✗ | + "\n" + payload_hash; | |
625 | |||
626 | ✗ | const string hash_request = shash::Sha256String(canonical_request.c_str()); | |
627 | |||
628 | ✗ | const string string_to_sign = "AWS4-HMAC-SHA256\n" + timestamp + "\n" + scope | |
629 | ✗ | + "\n" + hash_request; | |
630 | |||
631 | ✗ | const string signing_key = GetAwsV4SigningKey(date); | |
632 | ✗ | const string signature = shash::Hmac256(signing_key, string_to_sign); | |
633 | |||
634 | ✗ | headers->push_back("X-Amz-Acl: " + config_.x_amz_acl); | |
635 | ✗ | headers->push_back("X-Amz-Content-Sha256: " + payload_hash); | |
636 | ✗ | headers->push_back("X-Amz-Date: " + timestamp); | |
637 | ✗ | headers->push_back("Authorization: AWS4-HMAC-SHA256 " | |
638 | "Credential=" | ||
639 | ✗ | + config_.access_key + "/" + scope | |
640 | ✗ | + "," | |
641 | "SignedHeaders=" | ||
642 | ✗ | + signed_headers | |
643 | ✗ | + "," | |
644 | "Signature=" | ||
645 | ✗ | + signature); | |
646 | ✗ | return true; | |
647 | } | ||
648 | |||
649 | /** | ||
650 | * The Azure Blob authorization header according to | ||
651 | * https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key | ||
652 | */ | ||
653 | ✗ | bool S3FanoutManager::MkAzureAuthz(const JobInfo &info, | |
654 | vector<string> *headers) const { | ||
655 | ✗ | const string timestamp = RfcTimestamp(); | |
656 | const string canonical_headers = "x-ms-blob-type:BlockBlob\nx-ms-date:" | ||
657 | ✗ | + timestamp + "\nx-ms-version:2011-08-18"; | |
658 | ✗ | const string canonical_resource = "/" + config_.access_key + "/" | |
659 | ✗ | + config_.bucket + "/" + info.object_key; | |
660 | |||
661 | ✗ | string string_to_sign; | |
662 | ✗ | if ((info.request == JobInfo::kReqHeadOnly) | |
663 | ✗ | || (info.request == JobInfo::kReqHeadPut) | |
664 | ✗ | || (info.request == JobInfo::kReqDelete)) { | |
665 | ✗ | string_to_sign = GetRequestString(info) + string("\n\n\n") | |
666 | ✗ | + "\n\n\n\n\n\n\n\n\n" + canonical_headers + "\n" | |
667 | ✗ | + canonical_resource; | |
668 | } else { | ||
669 | ✗ | string_to_sign = GetRequestString(info) + string("\n\n\n") | |
670 | ✗ | + string(StringifyInt(info.origin->GetSize())) | |
671 | ✗ | + "\n\n\n\n\n\n\n\n\n" + canonical_headers + "\n" | |
672 | ✗ | + canonical_resource; | |
673 | } | ||
674 | |||
675 | ✗ | string signing_key; | |
676 | ✗ | const int retval = Debase64(config_.secret_key, &signing_key); | |
677 | ✗ | if (!retval) | |
678 | ✗ | return false; | |
679 | |||
680 | ✗ | const string signature = shash::Hmac256(signing_key, string_to_sign, true); | |
681 | |||
682 | ✗ | headers->push_back("x-ms-date: " + timestamp); | |
683 | ✗ | headers->push_back("x-ms-version: 2011-08-18"); | |
684 | ✗ | headers->push_back("Authorization: SharedKey " + config_.access_key + ":" | |
685 | ✗ | + Base64(signature)); | |
686 | ✗ | headers->push_back("x-ms-blob-type: BlockBlob"); | |
687 | ✗ | return true; | |
688 | } | ||
689 | |||
690 | 1222 | void S3FanoutManager::InitializeDnsSettingsCurl(CURL *handle, | |
691 | CURLSH *sharehandle, | ||
692 | curl_slist *clist) const { | ||
693 | 1222 | CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, sharehandle); | |
694 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
695 | 1222 | retval = curl_easy_setopt(handle, CURLOPT_RESOLVE, clist); | |
696 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
697 | 1222 | } | |
698 | |||
699 | |||
700 | 1222 | int S3FanoutManager::InitializeDnsSettings(CURL *handle, | |
701 | std::string host_with_port) const { | ||
702 | // Use existing handle | ||
703 | const std::map<CURL *, S3FanOutDnsEntry *>::const_iterator | ||
704 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | it = curl_sharehandles_->find(handle); |
705 |
2/2✓ Branch 3 taken 1173 times.
✓ Branch 4 taken 49 times.
|
1222 | if (it != curl_sharehandles_->end()) { |
706 |
1/2✓ Branch 1 taken 1173 times.
✗ Branch 2 not taken.
|
1173 | InitializeDnsSettingsCurl(handle, it->second->sharehandle, |
707 | 1173 | it->second->clist); | |
708 | 1173 | return 0; | |
709 | } | ||
710 | |||
711 | // Add protocol information for extraction of fields for DNS | ||
712 |
2/4✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 49 times.
✗ Branch 4 not taken.
|
49 | if (!IsHttpUrl(host_with_port)) |
713 |
2/4✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 49 times.
✗ Branch 5 not taken.
|
49 | host_with_port = config_.protocol + "://" + host_with_port; |
714 |
1/2✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
|
49 | const std::string remote_host = dns::ExtractHost(host_with_port); |
715 |
1/2✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
|
49 | const std::string remote_port = dns::ExtractPort(host_with_port); |
716 | |||
717 | // If we have the name already resolved, use the least used IP | ||
718 | 49 | S3FanOutDnsEntry *useme = NULL; | |
719 | 49 | unsigned int usemin = UINT_MAX; | |
720 | 49 | std::set<S3FanOutDnsEntry *>::iterator its3 = sharehandles_->begin(); | |
721 |
2/2✓ Branch 3 taken 39 times.
✓ Branch 4 taken 49 times.
|
88 | for (; its3 != sharehandles_->end(); ++its3) { |
722 |
1/2✓ Branch 2 taken 39 times.
✗ Branch 3 not taken.
|
39 | if ((*its3)->dns_name == remote_host) { |
723 |
1/2✓ Branch 1 taken 39 times.
✗ Branch 2 not taken.
|
39 | if (usemin >= (*its3)->counter) { |
724 | 39 | usemin = (*its3)->counter; | |
725 | 39 | useme = (*its3); | |
726 | } | ||
727 | } | ||
728 | } | ||
729 |
2/2✓ Branch 0 taken 39 times.
✓ Branch 1 taken 10 times.
|
49 | if (useme != NULL) { |
730 |
1/2✓ Branch 1 taken 39 times.
✗ Branch 2 not taken.
|
39 | curl_sharehandles_->insert( |
731 | 39 | std::pair<CURL *, S3FanOutDnsEntry *>(handle, useme)); | |
732 | 39 | useme->counter++; | |
733 |
1/2✓ Branch 1 taken 39 times.
✗ Branch 2 not taken.
|
39 | InitializeDnsSettingsCurl(handle, useme->sharehandle, useme->clist); |
734 | 39 | return 0; | |
735 | } | ||
736 | |||
737 | // We need to resolve the hostname | ||
738 | // TODO(ssheikki): support ipv6 also... if (opt_ipv4_only_) | ||
739 |
1/2✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
|
10 | const dns::Host host = resolver_->Resolve(remote_host); |
740 |
1/2✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
|
10 | set<string> ipv4_addresses = host.ipv4_addresses(); |
741 | 10 | std::set<string>::iterator its = ipv4_addresses.begin(); | |
742 | 10 | S3FanOutDnsEntry *dnse = NULL; | |
743 |
2/2✓ Branch 3 taken 10 times.
✓ Branch 4 taken 10 times.
|
20 | for (; its != ipv4_addresses.end(); ++its) { |
744 |
2/4✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 10 times.
✗ Branch 5 not taken.
|
10 | dnse = new S3FanOutDnsEntry(); |
745 | 10 | dnse->counter = 0; | |
746 |
1/2✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
|
10 | dnse->dns_name = remote_host; |
747 |
4/12✗ Branch 1 not taken.
✓ Branch 2 taken 10 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 8 taken 10 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 10 times.
✗ Branch 12 not taken.
✗ Branch 14 not taken.
✓ Branch 15 taken 10 times.
✗ Branch 18 not taken.
✗ Branch 19 not taken.
|
10 | dnse->port = remote_port.size() == 0 ? "80" : remote_port; |
748 |
1/2✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
|
10 | dnse->ip = *its; |
749 | 10 | dnse->clist = NULL; | |
750 |
1/2✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
|
10 | dnse->clist = curl_slist_append( |
751 | dnse->clist, | ||
752 |
4/8✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 10 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 10 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 10 times.
✗ Branch 11 not taken.
|
20 | (dnse->dns_name + ":" + dnse->port + ":" + dnse->ip).c_str()); |
753 |
1/2✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
|
10 | dnse->sharehandle = curl_share_init(); |
754 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
|
10 | assert(dnse->sharehandle != NULL); |
755 |
1/2✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
|
10 | const CURLSHcode share_retval = curl_share_setopt( |
756 | dnse->sharehandle, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS); | ||
757 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
|
10 | assert(share_retval == CURLSHE_OK); |
758 |
1/2✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
|
10 | sharehandles_->insert(dnse); |
759 | } | ||
760 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
|
10 | if (dnse == NULL) { |
761 | ✗ | LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr, | |
762 | "Error: DNS resolve failed for address '%s'.", | ||
763 | remote_host.c_str()); | ||
764 | ✗ | assert(dnse != NULL); | |
765 | ✗ | return -1; | |
766 | } | ||
767 |
1/2✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
|
10 | curl_sharehandles_->insert( |
768 | 10 | std::pair<CURL *, S3FanOutDnsEntry *>(handle, dnse)); | |
769 | 10 | dnse->counter++; | |
770 |
1/2✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
|
10 | InitializeDnsSettingsCurl(handle, dnse->sharehandle, dnse->clist); |
771 | |||
772 | 10 | return 0; | |
773 | 49 | } | |
774 | |||
775 | |||
776 | 1222 | bool S3FanoutManager::MkPayloadHash(const JobInfo &info, | |
777 | string *hex_hash) const { | ||
778 |
2/2✓ Branch 0 taken 1217 times.
✓ Branch 1 taken 5 times.
|
1222 | if ((info.request == JobInfo::kReqHeadOnly) |
779 |
2/2✓ Branch 0 taken 609 times.
✓ Branch 1 taken 608 times.
|
1217 | || (info.request == JobInfo::kReqHeadPut) |
780 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 608 times.
|
609 | || (info.request == JobInfo::kReqDelete)) { |
781 |
1/4✓ Branch 0 taken 614 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
|
614 | switch (config_.authz_method) { |
782 | 614 | case kAuthzAwsV2: | |
783 | 614 | hex_hash->clear(); | |
784 | 614 | break; | |
785 | ✗ | case kAuthzAwsV4: | |
786 | // Sha256 over empty string | ||
787 | *hex_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b78" | ||
788 | ✗ | "52b855"; | |
789 | ✗ | break; | |
790 | ✗ | case kAuthzAzure: | |
791 | // no payload hash required for Azure signature | ||
792 | ✗ | hex_hash->clear(); | |
793 | ✗ | break; | |
794 | ✗ | default: | |
795 | ✗ | PANIC(NULL); | |
796 | } | ||
797 | 614 | return true; | |
798 | } | ||
799 | |||
800 | // PUT, there is actually payload | ||
801 |
1/2✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
|
608 | shash::Any payload_hash(shash::kMd5); |
802 | |||
803 | unsigned char *data; | ||
804 | 608 | const unsigned int nbytes = info.origin->Data( | |
805 |
2/4✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 608 times.
✗ Branch 6 not taken.
|
608 | reinterpret_cast<void **>(&data), info.origin->GetSize(), 0); |
806 |
2/4✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 608 times.
|
608 | assert(nbytes == info.origin->GetSize()); |
807 | |||
808 |
1/4✓ Branch 0 taken 608 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
|
608 | switch (config_.authz_method) { |
809 | 608 | case kAuthzAwsV2: | |
810 |
1/2✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
|
608 | shash::HashMem(data, nbytes, &payload_hash); |
811 |
2/4✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 608 times.
✗ Branch 6 not taken.
|
1824 | *hex_hash = Base64(string(reinterpret_cast<char *>(payload_hash.digest), |
812 | 1216 | payload_hash.GetDigestSize())); | |
813 | 608 | return true; | |
814 | ✗ | case kAuthzAwsV4: | |
815 | ✗ | *hex_hash = shash::Sha256Mem(data, nbytes); | |
816 | ✗ | return true; | |
817 | ✗ | case kAuthzAzure: | |
818 | // no payload hash required for Azure signature | ||
819 | ✗ | hex_hash->clear(); | |
820 | ✗ | return true; | |
821 | ✗ | default: | |
822 | ✗ | PANIC(NULL); | |
823 | } | ||
824 | } | ||
825 | |||
826 | 1223 | string S3FanoutManager::GetRequestString(const JobInfo &info) const { | |
827 |
3/4✓ Branch 0 taken 613 times.
✓ Branch 1 taken 608 times.
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
|
1223 | switch (info.request) { |
828 | 613 | case JobInfo::kReqHeadOnly: | |
829 | case JobInfo::kReqHeadPut: | ||
830 |
1/2✓ Branch 2 taken 613 times.
✗ Branch 3 not taken.
|
613 | return "HEAD"; |
831 | 608 | case JobInfo::kReqPutCas: | |
832 | case JobInfo::kReqPutDotCvmfs: | ||
833 | case JobInfo::kReqPutHtml: | ||
834 | case JobInfo::kReqPutBucket: | ||
835 |
1/2✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
|
608 | return "PUT"; |
836 | 2 | case JobInfo::kReqDelete: | |
837 |
1/2✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
|
2 | return "DELETE"; |
838 | ✗ | default: | |
839 | ✗ | PANIC(NULL); | |
840 | } | ||
841 | } | ||
842 | |||
843 | |||
844 | 1222 | string S3FanoutManager::GetContentType(const JobInfo &info) const { | |
845 |
2/6✓ Branch 0 taken 614 times.
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
|
1222 | switch (info.request) { |
846 | 614 | case JobInfo::kReqHeadOnly: | |
847 | case JobInfo::kReqHeadPut: | ||
848 | case JobInfo::kReqDelete: | ||
849 |
1/2✓ Branch 2 taken 614 times.
✗ Branch 3 not taken.
|
614 | return ""; |
850 | 608 | case JobInfo::kReqPutCas: | |
851 |
1/2✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
|
608 | return "application/octet-stream"; |
852 | ✗ | case JobInfo::kReqPutDotCvmfs: | |
853 | ✗ | return "application/x-cvmfs"; | |
854 | ✗ | case JobInfo::kReqPutHtml: | |
855 | ✗ | return "text/html"; | |
856 | ✗ | case JobInfo::kReqPutBucket: | |
857 | ✗ | return "text/xml"; | |
858 | ✗ | default: | |
859 | ✗ | PANIC(NULL); | |
860 | } | ||
861 | } | ||
862 | |||
863 | |||
864 | /** | ||
865 | * Request parameters set the URL and other options such as timeout and | ||
866 | * proxy. | ||
867 | */ | ||
868 | 1222 | Failures S3FanoutManager::InitializeRequest(JobInfo *info, CURL *handle) const { | |
869 | // Initialize internal download state | ||
870 | 1222 | info->curl_handle = handle; | |
871 | 1222 | info->error_code = kFailOk; | |
872 | 1222 | info->http_error = 0; | |
873 | 1222 | info->num_retries = 0; | |
874 | 1222 | info->backoff_ms = 0; | |
875 | 1222 | info->throttle_ms = 0; | |
876 | 1222 | info->throttle_timestamp = 0; | |
877 | 1222 | info->http_headers = NULL; | |
878 | // info->payload_size is needed in S3Uploader::MainCollectResults, | ||
879 | // where info->origin is already destroyed. | ||
880 |
1/2✓ Branch 2 taken 1222 times.
✗ Branch 3 not taken.
|
1222 | info->payload_size = info->origin->GetSize(); |
881 | |||
882 |
2/4✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
|
1222 | InitializeDnsSettings(handle, complete_hostname_); |
883 | |||
884 | CURLcode retval; | ||
885 |
2/2✓ Branch 0 taken 1217 times.
✓ Branch 1 taken 5 times.
|
1222 | if ((info->request == JobInfo::kReqHeadOnly) |
886 |
2/2✓ Branch 0 taken 609 times.
✓ Branch 1 taken 608 times.
|
1217 | || (info->request == JobInfo::kReqHeadPut) |
887 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 608 times.
|
609 | || (info->request == JobInfo::kReqDelete)) { |
888 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 0); |
889 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 614 times.
|
614 | assert(retval == CURLE_OK); |
890 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 1); |
891 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 614 times.
|
614 | assert(retval == CURLE_OK); |
892 | |||
893 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 613 times.
|
614 | if (info->request == JobInfo::kReqDelete) { |
894 |
2/4✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 1 times.
✗ Branch 6 not taken.
|
1 | retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, |
895 | GetRequestString(*info).c_str()); | ||
896 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | assert(retval == CURLE_OK); |
897 | } else { | ||
898 |
1/2✓ Branch 1 taken 613 times.
✗ Branch 2 not taken.
|
613 | retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL); |
899 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 613 times.
|
613 | assert(retval == CURLE_OK); |
900 | } | ||
901 | } else { | ||
902 |
1/2✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
|
608 | retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL); |
903 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
|
608 | assert(retval == CURLE_OK); |
904 |
1/2✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
|
608 | retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 1); |
905 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
|
608 | assert(retval == CURLE_OK); |
906 |
1/2✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
|
608 | retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 0); |
907 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
|
608 | assert(retval == CURLE_OK); |
908 |
2/4✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 608 times.
✗ Branch 6 not taken.
|
608 | retval = curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE, |
909 | static_cast<curl_off_t>(info->origin->GetSize())); | ||
910 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
|
608 | assert(retval == CURLE_OK); |
911 | |||
912 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
|
608 | if (info->request == JobInfo::kReqPutDotCvmfs) { |
913 | ✗ | info->http_headers = curl_slist_append( | |
914 | info->http_headers, dot_cvmfs_cache_control_header.c_str()); | ||
915 |
1/2✓ Branch 0 taken 608 times.
✗ Branch 1 not taken.
|
608 | } else if (info->request == JobInfo::kReqPutCas) { |
916 |
1/2✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
|
608 | info->http_headers = curl_slist_append(info->http_headers, |
917 | kCacheControlCas); | ||
918 | } | ||
919 | } | ||
920 | |||
921 | bool retval_b; | ||
922 | |||
923 | // Authorization | ||
924 | 1222 | vector<string> authz_headers; | |
925 |
1/4✓ Branch 0 taken 1222 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
|
1222 | switch (config_.authz_method) { |
926 | 1222 | case kAuthzAwsV2: | |
927 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | retval_b = MkV2Authz(*info, &authz_headers); |
928 | 1222 | break; | |
929 | ✗ | case kAuthzAwsV4: | |
930 | ✗ | retval_b = MkV4Authz(*info, &authz_headers); | |
931 | ✗ | break; | |
932 | ✗ | case kAuthzAzure: | |
933 | ✗ | retval_b = MkAzureAuthz(*info, &authz_headers); | |
934 | ✗ | break; | |
935 | ✗ | default: | |
936 | ✗ | PANIC(NULL); | |
937 | } | ||
938 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | if (!retval_b) |
939 | ✗ | return kFailLocalIO; | |
940 |
2/2✓ Branch 1 taken 4882 times.
✓ Branch 2 taken 1222 times.
|
6104 | for (unsigned i = 0; i < authz_headers.size(); ++i) { |
941 |
1/2✓ Branch 2 taken 4882 times.
✗ Branch 3 not taken.
|
4882 | info->http_headers = curl_slist_append(info->http_headers, |
942 | 4882 | authz_headers[i].c_str()); | |
943 | } | ||
944 | |||
945 | // Common headers | ||
946 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | info->http_headers = curl_slist_append(info->http_headers, |
947 | "Connection: Keep-Alive"); | ||
948 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | info->http_headers = curl_slist_append(info->http_headers, "Pragma:"); |
949 | // No 100-continue | ||
950 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | info->http_headers = curl_slist_append(info->http_headers, "Expect:"); |
951 | // Strip unnecessary header | ||
952 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | info->http_headers = curl_slist_append(info->http_headers, "Accept:"); |
953 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | info->http_headers = curl_slist_append(info->http_headers, |
954 | 1222 | user_agent_->c_str()); | |
955 | |||
956 | // Set curl parameters | ||
957 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | retval = curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info)); |
958 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
959 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | retval = curl_easy_setopt(handle, CURLOPT_HEADERDATA, |
960 | static_cast<void *>(info)); | ||
961 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
962 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | retval = curl_easy_setopt(handle, CURLOPT_READDATA, |
963 | static_cast<void *>(info)); | ||
964 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
965 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | retval = curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->http_headers); |
966 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
967 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | if (opt_ipv4_only_) { |
968 | ✗ | retval = curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4); | |
969 | ✗ | assert(retval == CURLE_OK); | |
970 | } | ||
971 | // Follow HTTP redirects | ||
972 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | retval = curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L); |
973 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
974 | |||
975 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | retval = curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, info->errorbuffer); |
976 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
977 | |||
978 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 1222 times.
|
1222 | if (config_.protocol == "https") { |
979 | ✗ | retval = curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 1L); | |
980 | ✗ | assert(retval == CURLE_OK); | |
981 | ✗ | retval = curl_easy_setopt(handle, CURLOPT_PROXY_SSL_VERIFYPEER, 1L); | |
982 | ✗ | assert(retval == CURLE_OK); | |
983 | ✗ | const bool add_cert = ssl_certificate_store_.ApplySslCertificatePath( | |
984 | handle); | ||
985 | ✗ | assert(add_cert); | |
986 | } | ||
987 | |||
988 | 1222 | return kFailOk; | |
989 | 1222 | } | |
990 | |||
991 | |||
992 | /** | ||
993 | * Sets the URL specific options such as host to use and timeout. | ||
994 | */ | ||
995 | 1222 | void S3FanoutManager::SetUrlOptions(JobInfo *info) const { | |
996 | 1222 | CURL *curl_handle = info->curl_handle; | |
997 | CURLcode retval; | ||
998 | |||
999 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | retval = curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, |
1000 | config_.opt_timeout_sec); | ||
1001 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
1002 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, |
1003 | kLowSpeedLimit); | ||
1004 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
1005 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, |
1006 | config_.opt_timeout_sec); | ||
1007 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
1008 | |||
1009 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | if (is_curl_debug_) { |
1010 | ✗ | retval = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 1); | |
1011 | ✗ | assert(retval == CURLE_OK); | |
1012 | } | ||
1013 | |||
1014 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | const string url = MkUrl(info->object_key); |
1015 |
1/2✓ Branch 2 taken 1222 times.
✗ Branch 3 not taken.
|
1222 | retval = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str()); |
1016 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
1017 | |||
1018 |
1/2✓ Branch 2 taken 1222 times.
✗ Branch 3 not taken.
|
1222 | retval = curl_easy_setopt(curl_handle, CURLOPT_PROXY, config_.proxy.c_str()); |
1019 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
1020 | 1222 | } | |
1021 | |||
1022 | |||
1023 | /** | ||
1024 | * Adds transfer time and uploaded bytes to the global counters. | ||
1025 | */ | ||
1026 | 1226 | void S3FanoutManager::UpdateStatistics(CURL *handle) { | |
1027 | double val; | ||
1028 | |||
1029 |
2/4✓ Branch 1 taken 1226 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1226 times.
✗ Branch 4 not taken.
|
1226 | if (curl_easy_getinfo(handle, CURLINFO_SIZE_UPLOAD, &val) == CURLE_OK) |
1030 | 1226 | statistics_->transferred_bytes += val; | |
1031 | 1226 | } | |
1032 | |||
1033 | |||
1034 | /** | ||
1035 | * Retry if possible and if not already done too often. | ||
1036 | */ | ||
1037 | 6 | bool S3FanoutManager::CanRetry(const JobInfo *info) { | |
1038 | 6 | return (info->error_code == kFailHostConnection | |
1039 |
1/2✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
|
6 | || info->error_code == kFailHostResolve |
1040 |
1/2✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
|
6 | || info->error_code == kFailServiceUnavailable |
1041 |
2/2✓ Branch 0 taken 4 times.
✓ Branch 1 taken 2 times.
|
6 | || info->error_code == kFailRetry) |
1042 |
2/4✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
|
12 | && (info->num_retries < config_.opt_max_retries); |
1043 | } | ||
1044 | |||
1045 | |||
1046 | /** | ||
1047 | * Backoff for retry to introduce a jitter into a upload sequence. | ||
1048 | * | ||
1049 | * \return true if backoff has been performed, false otherwise | ||
1050 | */ | ||
1051 | 4 | void S3FanoutManager::Backoff(JobInfo *info) { | |
1052 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
|
4 | if (info->error_code != kFailRetry) |
1053 | ✗ | info->num_retries++; | |
1054 | 4 | statistics_->num_retries++; | |
1055 | |||
1056 |
1/2✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
|
4 | if (info->throttle_ms > 0) { |
1057 | 4 | LogCvmfs(kLogS3Fanout, kLogDebug, "throttling for %d ms", | |
1058 | info->throttle_ms); | ||
1059 | 4 | const uint64_t now = platform_monotonic_time(); | |
1060 |
1/2✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
|
4 | if ((info->throttle_timestamp + (info->throttle_ms / 1000)) >= now) { |
1061 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 3 times.
|
4 | if ((now - timestamp_last_throttle_report_) |
1062 | > kThrottleReportIntervalSec) { | ||
1063 | 1 | LogCvmfs(kLogS3Fanout, kLogStdout, | |
1064 | "Warning: S3 backend throttling %ums " | ||
1065 | "(total backoff time so far %lums)", | ||
1066 | 1 | info->throttle_ms, statistics_->ms_throttled); | |
1067 | 1 | timestamp_last_throttle_report_ = now; | |
1068 | } | ||
1069 | 4 | statistics_->ms_throttled += info->throttle_ms; | |
1070 | 4 | SafeSleepMs(info->throttle_ms); | |
1071 | } | ||
1072 | } else { | ||
1073 | ✗ | if (info->backoff_ms == 0) { | |
1074 | // Must be != 0 | ||
1075 | ✗ | info->backoff_ms = prng_.Next(config_.opt_backoff_init_ms + 1); | |
1076 | } else { | ||
1077 | ✗ | info->backoff_ms *= 2; | |
1078 | } | ||
1079 | ✗ | if (info->backoff_ms > config_.opt_backoff_max_ms) | |
1080 | ✗ | info->backoff_ms = config_.opt_backoff_max_ms; | |
1081 | |||
1082 | ✗ | LogCvmfs(kLogS3Fanout, kLogDebug, "backing off for %d ms", | |
1083 | info->backoff_ms); | ||
1084 | ✗ | SafeSleepMs(info->backoff_ms); | |
1085 | } | ||
1086 | 4 | } | |
1087 | |||
1088 | |||
1089 | /** | ||
1090 | * Checks the result of a curl request and implements the failure logic | ||
1091 | * and takes care of cleanup. | ||
1092 | * | ||
1093 | * @return true if request should be repeated, false otherwise | ||
1094 | */ | ||
1095 | 1226 | bool S3FanoutManager::VerifyAndFinalize(const int curl_error, JobInfo *info) { | |
1096 | 1226 | LogCvmfs(kLogS3Fanout, kLogDebug, | |
1097 | "Verify uploaded/tested object %s " | ||
1098 | "(curl error %d, info error %d, info request %d)", | ||
1099 | 1226 | info->object_key.c_str(), curl_error, info->error_code, | |
1100 | 1226 | info->request); | |
1101 | 1226 | UpdateStatistics(info->curl_handle); | |
1102 | |||
1103 | // Verification and error classification | ||
1104 |
1/6✓ Branch 0 taken 1226 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
|
1226 | switch (curl_error) { |
1105 | 1226 | case CURLE_OK: | |
1106 |
2/2✓ Branch 0 taken 1222 times.
✓ Branch 1 taken 4 times.
|
1226 | if ((info->error_code != kFailRetry) |
1107 |
2/2✓ Branch 0 taken 612 times.
✓ Branch 1 taken 610 times.
|
1222 | && (info->error_code != kFailNotFound)) { |
1108 | 612 | info->error_code = kFailOk; | |
1109 | } | ||
1110 | 1226 | break; | |
1111 | ✗ | case CURLE_UNSUPPORTED_PROTOCOL: | |
1112 | case CURLE_URL_MALFORMAT: | ||
1113 | ✗ | info->error_code = kFailBadRequest; | |
1114 | ✗ | break; | |
1115 | ✗ | case CURLE_COULDNT_RESOLVE_HOST: | |
1116 | ✗ | info->error_code = kFailHostResolve; | |
1117 | ✗ | break; | |
1118 | ✗ | case CURLE_COULDNT_CONNECT: | |
1119 | case CURLE_OPERATION_TIMEDOUT: | ||
1120 | case CURLE_SEND_ERROR: | ||
1121 | case CURLE_RECV_ERROR: | ||
1122 | ✗ | info->error_code = kFailHostConnection; | |
1123 | ✗ | break; | |
1124 | ✗ | case CURLE_ABORTED_BY_CALLBACK: | |
1125 | case CURLE_WRITE_ERROR: | ||
1126 | // Error set by callback | ||
1127 | ✗ | break; | |
1128 | ✗ | default: | |
1129 | ✗ | LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr, | |
1130 | "unexpected curl error (%d) while trying to upload %s: %s", | ||
1131 | curl_error, info->object_key.c_str(), info->errorbuffer); | ||
1132 | ✗ | info->error_code = kFailOther; | |
1133 | ✗ | break; | |
1134 | } | ||
1135 | |||
1136 | // Transform HEAD to PUT request | ||
1137 |
2/2✓ Branch 0 taken 610 times.
✓ Branch 1 taken 616 times.
|
1226 | if ((info->error_code == kFailNotFound) |
1138 |
2/2✓ Branch 0 taken 608 times.
✓ Branch 1 taken 2 times.
|
610 | && (info->request == JobInfo::kReqHeadPut)) { |
1139 | 608 | LogCvmfs(kLogS3Fanout, kLogDebug, "not found: %s, uploading", | |
1140 | info->object_key.c_str()); | ||
1141 | 608 | info->request = JobInfo::kReqPutCas; | |
1142 | 608 | curl_slist_free_all(info->http_headers); | |
1143 | 608 | info->http_headers = NULL; | |
1144 | 608 | const s3fanout::Failures init_failure = InitializeRequest( | |
1145 | info, info->curl_handle); | ||
1146 | |||
1147 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
|
608 | if (init_failure != s3fanout::kFailOk) { |
1148 | ✗ | PANIC(kLogStderr, | |
1149 | "Failed to initialize CURL handle " | ||
1150 | "(error: %d - %s | errno: %d)", | ||
1151 | init_failure, Code2Ascii(init_failure), errno); | ||
1152 | } | ||
1153 | 608 | SetUrlOptions(info); | |
1154 | // Reset origin | ||
1155 | 608 | info->origin->Rewind(); | |
1156 | 608 | return true; // Again, Put | |
1157 | } | ||
1158 | |||
1159 | // Determination if failed request should be repeated | ||
1160 | 618 | bool try_again = false; | |
1161 |
2/2✓ Branch 0 taken 6 times.
✓ Branch 1 taken 612 times.
|
618 | if (info->error_code != kFailOk) { |
1162 | 6 | try_again = CanRetry(info); | |
1163 | } | ||
1164 |
2/2✓ Branch 0 taken 4 times.
✓ Branch 1 taken 614 times.
|
618 | if (try_again) { |
1165 |
1/2✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
|
4 | if (info->request == JobInfo::kReqPutCas |
1166 |
1/2✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
|
4 | || info->request == JobInfo::kReqPutDotCvmfs |
1167 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
|
4 | || info->request == JobInfo::kReqPutHtml) { |
1168 | ✗ | LogCvmfs(kLogS3Fanout, kLogDebug, "Trying again to upload %s", | |
1169 | info->object_key.c_str()); | ||
1170 | // Reset origin | ||
1171 | ✗ | info->origin->Rewind(); | |
1172 | } | ||
1173 | 4 | Backoff(info); | |
1174 | 4 | info->error_code = kFailOk; | |
1175 | 4 | info->http_error = 0; | |
1176 | 4 | info->throttle_ms = 0; | |
1177 | 4 | info->backoff_ms = 0; | |
1178 | 4 | info->throttle_timestamp = 0; | |
1179 | 4 | return true; // try again | |
1180 | } | ||
1181 | |||
1182 | // Cleanup opened resources | ||
1183 | 614 | info->origin.Destroy(); | |
1184 | |||
1185 |
3/4✓ Branch 0 taken 2 times.
✓ Branch 1 taken 612 times.
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
|
614 | if ((info->error_code != kFailOk) && (info->http_error != 0) |
1186 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | && (info->http_error != 404)) { |
1187 | ✗ | LogCvmfs(kLogS3Fanout, kLogStderr, "S3: HTTP failure %d", info->http_error); | |
1188 | } | ||
1189 | 614 | return false; // stop transfer | |
1190 | } | ||
1191 | |||
1192 |
2/4✓ Branch 3 taken 12 times.
✗ Branch 4 not taken.
✓ Branch 9 taken 12 times.
✗ Branch 10 not taken.
|
12 | S3FanoutManager::S3FanoutManager(const S3Config &config) : config_(config) { |
1193 | 12 | atomic_init32(&multi_threaded_); | |
1194 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | MakePipe(pipe_terminate_); |
1195 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | MakePipe(pipe_jobs_); |
1196 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | MakePipe(pipe_completed_); |
1197 | |||
1198 | int retval; | ||
1199 | 12 | jobs_todo_lock_ = reinterpret_cast<pthread_mutex_t *>( | |
1200 | 12 | smalloc(sizeof(pthread_mutex_t))); | |
1201 | 12 | retval = pthread_mutex_init(jobs_todo_lock_, NULL); | |
1202 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | assert(retval == 0); |
1203 | 12 | curl_handle_lock_ = reinterpret_cast<pthread_mutex_t *>( | |
1204 | 12 | smalloc(sizeof(pthread_mutex_t))); | |
1205 | 12 | retval = pthread_mutex_init(curl_handle_lock_, NULL); | |
1206 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | assert(retval == 0); |
1207 | |||
1208 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | active_requests_ = new set<JobInfo *>; |
1209 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | pool_handles_idle_ = new set<CURL *>; |
1210 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | pool_handles_inuse_ = new set<CURL *>; |
1211 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | curl_sharehandles_ = new map<CURL *, S3FanOutDnsEntry *>; |
1212 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | sharehandles_ = new set<S3FanOutDnsEntry *>; |
1213 | 12 | watch_fds_max_ = 4 * config_.pool_max_handles; | |
1214 | 12 | max_available_jobs_ = 4 * config_.pool_max_handles; | |
1215 |
2/4✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 12 times.
✗ Branch 5 not taken.
|
12 | available_jobs_ = new Semaphore(max_available_jobs_); |
1216 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | assert(NULL != available_jobs_); |
1217 | |||
1218 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | statistics_ = new Statistics(); |
1219 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | user_agent_ = new string(); |
1220 |
2/4✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
|
12 | *user_agent_ = "User-Agent: cvmfs " + string(CVMFS_VERSION); |
1221 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | complete_hostname_ = MkCompleteHostname(); |
1222 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | dot_cvmfs_cache_control_header = MkDotCvmfsCacheControlHeader(); |
1223 | |||
1224 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | const CURLcode cretval = curl_global_init(CURL_GLOBAL_ALL); |
1225 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | assert(cretval == CURLE_OK); |
1226 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | curl_multi_ = curl_multi_init(); |
1227 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | assert(curl_multi_ != NULL); |
1228 | CURLMcode mretval; | ||
1229 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION, |
1230 | CallbackCurlSocket); | ||
1231 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | assert(mretval == CURLM_OK); |
1232 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA, |
1233 | static_cast<void *>(this)); | ||
1234 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | assert(mretval == CURLM_OK); |
1235 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | mretval = curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS, |
1236 | config_.pool_max_handles); | ||
1237 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | assert(mretval == CURLM_OK); |
1238 | |||
1239 | 12 | prng_.InitLocaltime(); | |
1240 | |||
1241 | 12 | thread_upload_ = 0; | |
1242 | 12 | timestamp_last_throttle_report_ = 0; | |
1243 | 12 | is_curl_debug_ = (getenv("_CVMFS_CURL_DEBUG") != NULL); | |
1244 | |||
1245 | // Parsing environment variables | ||
1246 | 12 | if ((getenv("CVMFS_IPV4_ONLY") != NULL) | |
1247 |
2/6✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 12 times.
|
12 | && (strlen(getenv("CVMFS_IPV4_ONLY")) > 0)) { |
1248 | ✗ | opt_ipv4_only_ = true; | |
1249 | } else { | ||
1250 | 12 | opt_ipv4_only_ = false; | |
1251 | } | ||
1252 | |||
1253 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | resolver_ = dns::CaresResolver::Create(opt_ipv4_only_, 2, 2000); |
1254 | |||
1255 | 12 | watch_fds_ = static_cast<struct pollfd *>(smalloc(4 * sizeof(struct pollfd))); | |
1256 | 12 | watch_fds_size_ = 4; | |
1257 | 12 | watch_fds_inuse_ = 0; | |
1258 | |||
1259 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | ssl_certificate_store_.UseSystemCertificatePath(); |
1260 | 12 | } | |
1261 | |||
1262 | 24 | S3FanoutManager::~S3FanoutManager() { | |
1263 | 12 | pthread_mutex_destroy(jobs_todo_lock_); | |
1264 | 12 | free(jobs_todo_lock_); | |
1265 | 12 | pthread_mutex_destroy(curl_handle_lock_); | |
1266 | 12 | free(curl_handle_lock_); | |
1267 | |||
1268 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | if (atomic_xadd32(&multi_threaded_, 0) == 1) { |
1269 | // Shutdown I/O thread | ||
1270 | 12 | char buf = 'T'; | |
1271 | 12 | WritePipe(pipe_terminate_[1], &buf, 1); | |
1272 | 12 | pthread_join(thread_upload_, NULL); | |
1273 | } | ||
1274 | 12 | ClosePipe(pipe_terminate_); | |
1275 | 12 | ClosePipe(pipe_jobs_); | |
1276 | 12 | ClosePipe(pipe_completed_); | |
1277 | |||
1278 | 12 | set<CURL *>::iterator i = pool_handles_idle_->begin(); | |
1279 | 12 | const set<CURL *>::const_iterator iEnd = pool_handles_idle_->end(); | |
1280 |
2/2✓ Branch 2 taken 20 times.
✓ Branch 3 taken 12 times.
|
32 | for (; i != iEnd; ++i) { |
1281 | 20 | curl_easy_cleanup(*i); | |
1282 | } | ||
1283 | |||
1284 | 12 | set<S3FanOutDnsEntry *>::iterator is = sharehandles_->begin(); | |
1285 | 12 | const set<S3FanOutDnsEntry *>::const_iterator isEnd = sharehandles_->end(); | |
1286 |
2/2✓ Branch 2 taken 10 times.
✓ Branch 3 taken 12 times.
|
22 | for (; is != isEnd; ++is) { |
1287 | 10 | curl_share_cleanup((*is)->sharehandle); | |
1288 | 10 | curl_slist_free_all((*is)->clist); | |
1289 |
1/2✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
|
10 | delete *is; |
1290 | } | ||
1291 | 12 | pool_handles_idle_->clear(); | |
1292 | 12 | curl_sharehandles_->clear(); | |
1293 | 12 | sharehandles_->clear(); | |
1294 |
1/2✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
|
12 | delete active_requests_; |
1295 |
1/2✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
|
12 | delete pool_handles_idle_; |
1296 |
1/2✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
|
12 | delete pool_handles_inuse_; |
1297 |
1/2✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
|
12 | delete curl_sharehandles_; |
1298 |
1/2✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
|
12 | delete sharehandles_; |
1299 |
1/2✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
|
12 | delete user_agent_; |
1300 | 12 | curl_multi_cleanup(curl_multi_); | |
1301 | |||
1302 |
1/2✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
|
12 | delete statistics_; |
1303 | |||
1304 |
1/2✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
|
12 | delete available_jobs_; |
1305 | |||
1306 | 12 | curl_global_cleanup(); | |
1307 | 12 | } | |
1308 | |||
1309 | /** | ||
1310 | * Spawns the I/O worker thread. No way back except ~S3FanoutManager. | ||
1311 | */ | ||
1312 | 12 | void S3FanoutManager::Spawn() { | |
1313 | 12 | LogCvmfs(kLogS3Fanout, kLogDebug, "S3FanoutManager spawned"); | |
1314 | |||
1315 | 12 | const int retval = pthread_create(&thread_upload_, NULL, MainUpload, | |
1316 | static_cast<void *>(this)); | ||
1317 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | assert(retval == 0); |
1318 | |||
1319 | 12 | atomic_inc32(&multi_threaded_); | |
1320 | 12 | } | |
1321 | |||
1322 | 3 | const Statistics &S3FanoutManager::GetStatistics() { return *statistics_; } | |
1323 | |||
1324 | /** | ||
1325 | * Push new job to be uploaded to the S3 cloud storage. | ||
1326 | */ | ||
1327 | 614 | void S3FanoutManager::PushNewJob(JobInfo *info) { | |
1328 | 614 | available_jobs_->Increment(); | |
1329 | 614 | WritePipe(pipe_jobs_[1], &info, sizeof(info)); | |
1330 | 614 | } | |
1331 | |||
1332 | /** | ||
1333 | * Push completed job to list of completed jobs | ||
1334 | */ | ||
1335 | 626 | void S3FanoutManager::PushCompletedJob(JobInfo *info) { | |
1336 | 626 | WritePipe(pipe_completed_[1], &info, sizeof(info)); | |
1337 | 626 | } | |
1338 | |||
1339 | /** | ||
1340 | * Pop completed job | ||
1341 | */ | ||
1342 | 626 | JobInfo *S3FanoutManager::PopCompletedJob() { | |
1343 | JobInfo *info; | ||
1344 |
1/2✓ Branch 1 taken 626 times.
✗ Branch 2 not taken.
|
626 | ReadPipe(pipe_completed_[0], &info, sizeof(info)); |
1345 | 626 | return info; | |
1346 | } | ||
1347 | |||
1348 | //------------------------------------------------------------------------------ | ||
1349 | |||
1350 | |||
1351 | ✗ | string Statistics::Print() const { | |
1352 | ✗ | return "Transferred Bytes: " + StringifyInt(uint64_t(transferred_bytes)) | |
1353 | ✗ | + "\n" + "Transfer duration: " + StringifyInt(uint64_t(transfer_time)) | |
1354 | ✗ | + " s\n" + "Number of requests: " + StringifyInt(num_requests) + "\n" | |
1355 | ✗ | + "Number of retries: " + StringifyInt(num_retries) + "\n"; | |
1356 | } | ||
1357 | |||
1358 | } // namespace s3fanout | ||
1359 |