Directory: | cvmfs/ |
---|---|
File: | cvmfs/s3fanout.cc |
Date: | 2023-02-05 02:36:10 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 561 | 800 | 70.1% |
Branches: | 402 | 1149 | 35.0% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | * | ||
4 | * Runs a thread using libcurl's asynchronous I/O mode to push data to S3 | ||
5 | */ | ||
6 | |||
7 | #include <pthread.h> | ||
8 | |||
9 | #include <algorithm> | ||
10 | #include <cassert> | ||
11 | #include <cerrno> | ||
12 | #include <utility> | ||
13 | |||
14 | #include "cvmfs_config.h" | ||
15 | #include "s3fanout.h" | ||
16 | #include "upload_facility.h" | ||
17 | #include "util/concurrency.h" | ||
18 | #include "util/exception.h" | ||
19 | #include "util/platform.h" | ||
20 | #include "util/posix.h" | ||
21 | #include "util/string.h" | ||
22 | |||
23 | using namespace std; // NOLINT | ||
24 | |||
25 | namespace s3fanout { | ||
26 | |||
27 | const char *S3FanoutManager::kCacheControlCas = "Cache-Control: max-age=259200"; | ||
28 | const char *S3FanoutManager::kCacheControlDotCvmfs = | ||
29 | "Cache-Control: max-age=61"; | ||
30 | const unsigned S3FanoutManager::kDefault429ThrottleMs = 250; | ||
31 | const unsigned S3FanoutManager::kMax429ThrottleMs = 10000; | ||
32 | const unsigned S3FanoutManager::kThrottleReportIntervalSec = 10; | ||
33 | const unsigned S3FanoutManager::kDefaultHTTPPort = 80; | ||
34 | const unsigned S3FanoutManager::kDefaultHTTPSPort = 443; | ||
35 | |||
36 | |||
37 | /** | ||
38 | * Parses Retry-After and X-Retry-In headers attached to HTTP 429 responses | ||
39 | */ | ||
40 | 26 | void S3FanoutManager::DetectThrottleIndicator( | |
41 | const std::string &header, | ||
42 | JobInfo *info) | ||
43 | { | ||
44 | 26 | std::string value_str; | |
45 |
4/6✓ Branch 2 taken 26 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 26 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 11 times.
✓ Branch 10 taken 15 times.
|
26 | if (HasPrefix(header, "retry-after:", true)) |
46 |
1/2✓ Branch 1 taken 11 times.
✗ Branch 2 not taken.
|
11 | value_str = header.substr(12); |
47 |
4/6✓ Branch 2 taken 26 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 26 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 5 times.
✓ Branch 10 taken 21 times.
|
26 | if (HasPrefix(header, "x-retry-in:", true)) |
48 |
1/2✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
|
5 | value_str = header.substr(11); |
49 | |||
50 |
1/2✓ Branch 1 taken 26 times.
✗ Branch 2 not taken.
|
26 | value_str = Trim(value_str, true /* trim_newline */); |
51 |
2/2✓ Branch 1 taken 14 times.
✓ Branch 2 taken 12 times.
|
26 | if (!value_str.empty()) { |
52 |
1/2✓ Branch 1 taken 14 times.
✗ Branch 2 not taken.
|
14 | unsigned value_numeric = String2Uint64(value_str); |
53 | unsigned value_ms = | ||
54 |
4/7✓ Branch 2 taken 14 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 14 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 5 times.
✓ Branch 8 taken 9 times.
|
28 | HasSuffix(value_str, "ms", true /* ignore_case */) ? |
55 | 14 | value_numeric : (value_numeric * 1000); | |
56 |
2/2✓ Branch 0 taken 13 times.
✓ Branch 1 taken 1 times.
|
14 | if (value_ms > 0) |
57 | 13 | info->throttle_ms = std::min(value_ms, kMax429ThrottleMs); | |
58 | } | ||
59 | 26 | } | |
60 | |||
61 | |||
62 | /** | ||
63 | * Called by curl for every HTTP header. Not called for file:// transfers. | ||
64 | */ | ||
65 | 3682 | static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, | |
66 | void *info_link) { | ||
67 | 3682 | const size_t num_bytes = size*nmemb; | |
68 |
1/2✓ Branch 2 taken 3682 times.
✗ Branch 3 not taken.
|
3682 | const string header_line(static_cast<const char *>(ptr), num_bytes); |
69 | 3682 | JobInfo *info = static_cast<JobInfo *>(info_link); | |
70 | |||
71 | // Check for http status code errors | ||
72 |
4/6✓ Branch 2 taken 3682 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3682 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 1226 times.
✓ Branch 10 taken 2456 times.
|
3682 | if (HasPrefix(header_line, "HTTP/1.", false)) { |
73 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 1226 times.
|
1226 | if (header_line.length() < 10) |
74 | ✗ | return 0; | |
75 | |||
76 | unsigned i; | ||
77 |
5/6✓ Branch 1 taken 2452 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1226 times.
✓ Branch 5 taken 1226 times.
✓ Branch 6 taken 1226 times.
✓ Branch 7 taken 1226 times.
|
2452 | for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {} |
78 | |||
79 |
2/2✓ Branch 1 taken 612 times.
✓ Branch 2 taken 614 times.
|
1226 | if (header_line[i] == '2') { |
80 | 612 | return num_bytes; | |
81 | } else { | ||
82 |
1/2✓ Branch 2 taken 614 times.
✗ Branch 3 not taken.
|
614 | LogCvmfs(kLogS3Fanout, kLogDebug, "http status error code [info %p]: %s", |
83 | info, header_line.c_str()); | ||
84 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 614 times.
|
614 | if (header_line.length() < i+3) { |
85 | ✗ | LogCvmfs(kLogS3Fanout, kLogStderr, "S3: invalid HTTP response '%s'", | |
86 | header_line.c_str()); | ||
87 | ✗ | info->error_code = kFailOther; | |
88 | ✗ | return 0; | |
89 | } | ||
90 |
2/4✓ Branch 3 taken 614 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 614 times.
✗ Branch 7 not taken.
|
614 | info->http_error = String2Int64(string(&header_line[i], 3)); |
91 | |||
92 |
2/6✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 610 times.
✗ Branch 5 not taken.
|
614 | switch (info->http_error) { |
93 | 4 | case 429: | |
94 | 4 | info->error_code = kFailRetry; | |
95 | 4 | info->throttle_ms = S3FanoutManager::kDefault429ThrottleMs; | |
96 | 4 | info->throttle_timestamp = platform_monotonic_time(); | |
97 | 4 | return num_bytes; | |
98 | ✗ | case 503: | |
99 | case 502: // Can happen if the S3 gateway-backend connection breaks | ||
100 | case 500: // sometimes see this as a transient error from S3 | ||
101 | ✗ | info->error_code = kFailServiceUnavailable; | |
102 | ✗ | break; | |
103 | ✗ | case 501: | |
104 | case 400: | ||
105 | ✗ | info->error_code = kFailBadRequest; | |
106 | ✗ | break; | |
107 | ✗ | case 403: | |
108 | ✗ | info->error_code = kFailForbidden; | |
109 | ✗ | break; | |
110 | 610 | case 404: | |
111 | 610 | info->error_code = kFailNotFound; | |
112 | 610 | return num_bytes; | |
113 | ✗ | default: | |
114 | ✗ | info->error_code = kFailOther; | |
115 | } | ||
116 | ✗ | return 0; | |
117 | } | ||
118 | } | ||
119 | |||
120 |
2/2✓ Branch 0 taken 12 times.
✓ Branch 1 taken 2444 times.
|
2456 | if (info->error_code == kFailRetry) { |
121 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | S3FanoutManager::DetectThrottleIndicator(header_line, info); |
122 | } | ||
123 | |||
124 | 2456 | return num_bytes; | |
125 | 3682 | } | |
126 | |||
127 | |||
128 | /** | ||
129 | * Called by curl for every new chunk to upload. | ||
130 | */ | ||
131 | 15669 | static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, | |
132 | void *info_link) { | ||
133 | 15669 | const size_t num_bytes = size*nmemb; | |
134 | 15669 | JobInfo *info = static_cast<JobInfo *>(info_link); | |
135 | |||
136 | 15669 | LogCvmfs(kLogS3Fanout, kLogDebug, "Data callback with %d bytes", num_bytes); | |
137 | |||
138 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 15669 times.
|
15669 | if (num_bytes == 0) |
139 | ✗ | return 0; | |
140 | |||
141 | 15669 | uint64_t read_bytes = info->origin->Read(ptr, num_bytes); | |
142 | |||
143 | 15669 | LogCvmfs(kLogS3Fanout, kLogDebug, | |
144 | "source buffer pushed out %d bytes", read_bytes); | ||
145 | |||
146 | 15669 | return read_bytes; | |
147 | } | ||
148 | |||
149 | |||
/**
 * Body callback that discards whatever the server sends: all size*nmemb bytes
 * are reported as consumed without being looked at.
 */
static size_t CallbackCurlBody(
  char * /*ptr*/, size_t size, size_t nmemb, void * /*userdata*/)
{
  const size_t num_bytes = size * nmemb;
  return num_bytes;
}
158 | |||
159 | |||
160 | /** | ||
161 | * Called when new curl sockets arrive or existing curl sockets depart. | ||
162 | */ | ||
163 | 2719 | int S3FanoutManager::CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, | |
164 | void *userp, void *socketp) { | ||
165 | 2719 | S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(userp); | |
166 | 2719 | LogCvmfs(kLogS3Fanout, kLogDebug, "CallbackCurlSocket called with easy " | |
167 | "handle %p, socket %d, action %d, up %d, " | ||
168 | "sp %d, fds_inuse %d, jobs %d", | ||
169 | easy, s, action, userp, | ||
170 | socketp, s3fanout_mgr->watch_fds_inuse_, | ||
171 | 2719 | s3fanout_mgr->available_jobs_->Get()); | |
172 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2719 times.
|
2719 | if (action == CURL_POLL_NONE) |
173 | ✗ | return 0; | |
174 | |||
175 | // Find s in watch_fds_ | ||
176 | // First 2 fds are job and terminate pipes (not curl related) | ||
177 | unsigned index; | ||
178 |
2/2✓ Branch 0 taken 4385 times.
✓ Branch 1 taken 1226 times.
|
5611 | for (index = 2; index < s3fanout_mgr->watch_fds_inuse_; ++index) { |
179 |
2/2✓ Branch 0 taken 1493 times.
✓ Branch 1 taken 2892 times.
|
4385 | if (s3fanout_mgr->watch_fds_[index].fd == s) |
180 | 1493 | break; | |
181 | } | ||
182 | // Or create newly | ||
183 |
2/2✓ Branch 0 taken 1226 times.
✓ Branch 1 taken 1493 times.
|
2719 | if (index == s3fanout_mgr->watch_fds_inuse_) { |
184 | // Extend array if necessary | ||
185 |
2/2✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1224 times.
|
1226 | if (s3fanout_mgr->watch_fds_inuse_ == s3fanout_mgr->watch_fds_size_) { |
186 | 2 | s3fanout_mgr->watch_fds_size_ *= 2; | |
187 | 2 | s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>( | |
188 | 2 | srealloc(s3fanout_mgr->watch_fds_, | |
189 | 2 | s3fanout_mgr->watch_fds_size_*sizeof(struct pollfd))); | |
190 | } | ||
191 | 1226 | s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].fd = s; | |
192 | 1226 | s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].events = 0; | |
193 | 1226 | s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].revents = 0; | |
194 | 1226 | s3fanout_mgr->watch_fds_inuse_++; | |
195 | } | ||
196 | |||
197 |
4/5✓ Branch 0 taken 1226 times.
✓ Branch 1 taken 3 times.
✓ Branch 2 taken 264 times.
✓ Branch 3 taken 1226 times.
✗ Branch 4 not taken.
|
2719 | switch (action) { |
198 | 1226 | case CURL_POLL_IN: | |
199 | 1226 | s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI; | |
200 | 1226 | break; | |
201 | 3 | case CURL_POLL_OUT: | |
202 | 3 | s3fanout_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND; | |
203 | 3 | break; | |
204 | 264 | case CURL_POLL_INOUT: | |
205 | 264 | s3fanout_mgr->watch_fds_[index].events = | |
206 | POLLIN | POLLPRI | POLLOUT | POLLWRBAND; | ||
207 | 264 | break; | |
208 | 1226 | case CURL_POLL_REMOVE: | |
209 |
2/2✓ Branch 0 taken 195 times.
✓ Branch 1 taken 1031 times.
|
1226 | if (index < s3fanout_mgr->watch_fds_inuse_-1) |
210 | 195 | s3fanout_mgr->watch_fds_[index] = | |
211 | 195 | s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_-1]; | |
212 | 1226 | s3fanout_mgr->watch_fds_inuse_--; | |
213 | // Shrink array if necessary | ||
214 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1226 times.
|
1226 | if ((s3fanout_mgr->watch_fds_inuse_ > s3fanout_mgr->watch_fds_max_) && |
215 | ✗ | (s3fanout_mgr->watch_fds_inuse_ < s3fanout_mgr->watch_fds_size_/2)) { | |
216 | ✗ | s3fanout_mgr->watch_fds_size_ /= 2; | |
217 | ✗ | s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>( | |
218 | ✗ | srealloc(s3fanout_mgr->watch_fds_, | |
219 | ✗ | s3fanout_mgr->watch_fds_size_*sizeof(struct pollfd))); | |
220 | } | ||
221 | 1226 | break; | |
222 | ✗ | default: | |
223 | ✗ | PANIC(NULL); | |
224 | } | ||
225 | |||
226 | 2719 | return 0; | |
227 | } | ||
228 | |||
229 | |||
230 | /** | ||
231 | * Worker thread event loop. | ||
232 | */ | ||
233 | 12 | void *S3FanoutManager::MainUpload(void *data) { | |
234 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread started"); |
235 | 12 | S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(data); | |
236 | |||
237 | 12 | s3fanout_mgr->InitPipeWatchFds(); | |
238 | |||
239 | // Don't schedule more jobs into the multi handle than the maximum number of | ||
240 | // parallel connections. This should prevent starvation and thus a timeout | ||
241 | // of the authorization header (CVM-1339). | ||
242 | 12 | unsigned jobs_in_flight = 0; | |
243 | |||
244 | while (true) { | ||
245 | // Check events with 100ms timeout | ||
246 | 16653 | int timeout_ms = 100; | |
247 |
1/2✓ Branch 1 taken 16653 times.
✗ Branch 2 not taken.
|
16653 | int retval = poll(s3fanout_mgr->watch_fds_, s3fanout_mgr->watch_fds_inuse_, |
248 | timeout_ms); | ||
249 |
2/2✓ Branch 0 taken 89 times.
✓ Branch 1 taken 16564 times.
|
16653 | if (retval == 0) { |
250 | // Handle timeout | ||
251 | 89 | int still_running = 0; | |
252 |
1/2✓ Branch 1 taken 89 times.
✗ Branch 2 not taken.
|
89 | retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_, |
253 | CURL_SOCKET_TIMEOUT, | ||
254 | 0, | ||
255 | &still_running); | ||
256 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 89 times.
|
89 | if (retval != CURLM_OK) { |
257 | ✗ | LogCvmfs(kLogS3Fanout, kLogStderr, "Error, timeout due to: %d", retval); | |
258 | ✗ | assert(retval == CURLM_OK); | |
259 | } | ||
260 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 16564 times.
|
16564 | } else if (retval < 0) { |
261 | ✗ | assert(errno == EINTR); | |
262 | ✗ | continue; | |
263 | } | ||
264 | |||
265 | // Terminate I/O thread | ||
266 |
2/2✓ Branch 0 taken 12 times.
✓ Branch 1 taken 16641 times.
|
16653 | if (s3fanout_mgr->watch_fds_[0].revents) |
267 | 12 | break; | |
268 | |||
269 | // New job incoming | ||
270 |
2/2✓ Branch 0 taken 614 times.
✓ Branch 1 taken 16027 times.
|
16641 | if (s3fanout_mgr->watch_fds_[1].revents) { |
271 | 614 | s3fanout_mgr->watch_fds_[1].revents = 0; | |
272 | JobInfo *info; | ||
273 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | ReadPipe(s3fanout_mgr->pipe_jobs_[0], &info, sizeof(info)); |
274 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | CURL *handle = s3fanout_mgr->AcquireCurlHandle(); |
275 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 614 times.
|
614 | if (handle == NULL) { |
276 | ✗ | PANIC(kLogStderr, "Failed to acquire CURL handle."); | |
277 | } | ||
278 | s3fanout::Failures init_failure = | ||
279 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | s3fanout_mgr->InitializeRequest(info, handle); |
280 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 614 times.
|
614 | if (init_failure != s3fanout::kFailOk) { |
281 | ✗ | PANIC(kLogStderr, | |
282 | "Failed to initialize CURL handle (error: %d - %s | errno: %d)", | ||
283 | init_failure, Code2Ascii(init_failure), errno); | ||
284 | } | ||
285 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | s3fanout_mgr->SetUrlOptions(info); |
286 | |||
287 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | curl_multi_add_handle(s3fanout_mgr->curl_multi_, handle); |
288 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | s3fanout_mgr->active_requests_->insert(info); |
289 | 614 | jobs_in_flight++; | |
290 | 614 | int still_running = 0, retval = 0; | |
291 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_, |
292 | CURL_SOCKET_TIMEOUT, | ||
293 | 0, | ||
294 | &still_running); | ||
295 | |||
296 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | LogCvmfs(kLogS3Fanout, kLogDebug, |
297 | "curl_multi_socket_action: %d - %d", | ||
298 | retval, still_running); | ||
299 | } | ||
300 | |||
301 | |||
302 | // Activity on curl sockets | ||
303 | // Within this loop the curl_multi_socket_action() may cause socket(s) | ||
304 | // to be removed from watch_fds_. If a socket is removed it is replaced | ||
305 | // by the socket at the end of the array and the inuse count is decreased. | ||
306 | // Therefore loop over the array in reverse order. | ||
307 | // First 2 fds are job and terminate pipes (not curl related) | ||
308 |
2/2✓ Branch 0 taken 37424 times.
✓ Branch 1 taken 16641 times.
|
54065 | for (int32_t i = s3fanout_mgr->watch_fds_inuse_ - 1; i >= 2; --i) { |
309 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 37424 times.
|
37424 | if (static_cast<uint32_t>(i) >= s3fanout_mgr->watch_fds_inuse_) { |
310 | ✗ | continue; | |
311 | } | ||
312 |
2/2✓ Branch 0 taken 17066 times.
✓ Branch 1 taken 20358 times.
|
37424 | if (s3fanout_mgr->watch_fds_[i].revents) { |
313 | 17066 | int ev_bitmask = 0; | |
314 |
2/2✓ Branch 0 taken 1834 times.
✓ Branch 1 taken 15232 times.
|
17066 | if (s3fanout_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI)) |
315 | 1834 | ev_bitmask |= CURL_CSELECT_IN; | |
316 |
2/2✓ Branch 0 taken 15232 times.
✓ Branch 1 taken 1834 times.
|
17066 | if (s3fanout_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND)) |
317 | 15232 | ev_bitmask |= CURL_CSELECT_OUT; | |
318 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 17066 times.
|
17066 | if (s3fanout_mgr->watch_fds_[i].revents & |
319 | (POLLERR | POLLHUP | POLLNVAL)) | ||
320 | ✗ | ev_bitmask |= CURL_CSELECT_ERR; | |
321 | 17066 | s3fanout_mgr->watch_fds_[i].revents = 0; | |
322 | |||
323 | 17066 | int still_running = 0; | |
324 | 17066 | retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_, | |
325 |
1/2✓ Branch 1 taken 17066 times.
✗ Branch 2 not taken.
|
17066 | s3fanout_mgr->watch_fds_[i].fd, |
326 | ev_bitmask, | ||
327 | &still_running); | ||
328 | } | ||
329 | } | ||
330 | |||
331 | // Check if transfers are completed | ||
332 | CURLMsg *curl_msg; | ||
333 | int msgs_in_queue; | ||
334 |
3/4✓ Branch 1 taken 17867 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1226 times.
✓ Branch 4 taken 16641 times.
|
17867 | while ((curl_msg = curl_multi_info_read(s3fanout_mgr->curl_multi_, |
335 | &msgs_in_queue))) | ||
336 | { | ||
337 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1226 times.
|
1226 | assert(curl_msg->msg == CURLMSG_DONE); |
338 | |||
339 | 1226 | s3fanout_mgr->statistics_->num_requests++; | |
340 | JobInfo *info; | ||
341 | 1226 | CURL *easy_handle = curl_msg->easy_handle; | |
342 | 1226 | int curl_error = curl_msg->data.result; | |
343 |
1/2✓ Branch 1 taken 1226 times.
✗ Branch 2 not taken.
|
1226 | curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info); |
344 | |||
345 |
1/2✓ Branch 1 taken 1226 times.
✗ Branch 2 not taken.
|
1226 | curl_multi_remove_handle(s3fanout_mgr->curl_multi_, easy_handle); |
346 |
3/4✓ Branch 1 taken 1226 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 612 times.
✓ Branch 4 taken 614 times.
|
1226 | if (s3fanout_mgr->VerifyAndFinalize(curl_error, info)) { |
347 |
1/2✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
|
612 | curl_multi_add_handle(s3fanout_mgr->curl_multi_, easy_handle); |
348 | 612 | int still_running = 0; | |
349 |
1/2✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
|
612 | curl_multi_socket_action(s3fanout_mgr->curl_multi_, |
350 | CURL_SOCKET_TIMEOUT, | ||
351 | 0, | ||
352 | &still_running); | ||
353 | } else { | ||
354 | // Return easy handle into pool and write result back | ||
355 | 614 | jobs_in_flight--; | |
356 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | s3fanout_mgr->active_requests_->erase(info); |
357 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | s3fanout_mgr->ReleaseCurlHandle(info, easy_handle); |
358 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | s3fanout_mgr->available_jobs_->Decrement(); |
359 | |||
360 | // Add to list of completed jobs | ||
361 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | s3fanout_mgr->PushCompletedJob(info); |
362 | } | ||
363 | } | ||
364 | 16641 | } | |
365 | |||
366 | 12 | set<CURL *>::iterator i = s3fanout_mgr->pool_handles_inuse_->begin(); | |
367 | const set<CURL *>::const_iterator i_end = | ||
368 | 12 | s3fanout_mgr->pool_handles_inuse_->end(); | |
369 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 12 times.
|
12 | for (; i != i_end; ++i) { |
370 | ✗ | curl_multi_remove_handle(s3fanout_mgr->curl_multi_, *i); | |
371 | ✗ | curl_easy_cleanup(*i); | |
372 | } | ||
373 | 12 | s3fanout_mgr->pool_handles_inuse_->clear(); | |
374 | 12 | free(s3fanout_mgr->watch_fds_); | |
375 | |||
376 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread terminated"); |
377 | 12 | return NULL; | |
378 | } | ||
379 | |||
380 | |||
381 | /** | ||
382 | * Gets an idle CURL handle from the pool. Creates a new one and adds it to | ||
383 | * the pool if necessary. | ||
384 | */ | ||
385 | 614 | CURL *S3FanoutManager::AcquireCurlHandle() const { | |
386 | CURL *handle; | ||
387 | |||
388 | 614 | MutexLockGuard guard(curl_handle_lock_); | |
389 | |||
390 |
2/2✓ Branch 1 taken 49 times.
✓ Branch 2 taken 565 times.
|
614 | if (pool_handles_idle_->empty()) { |
391 | CURLcode retval; | ||
392 | |||
393 | // Create a new handle | ||
394 |
1/2✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
|
49 | handle = curl_easy_init(); |
395 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
|
49 | assert(handle != NULL); |
396 | |||
397 | // Other settings | ||
398 |
1/2✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
|
49 | retval = curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1); |
399 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
|
49 | assert(retval == CURLE_OK); |
400 |
1/2✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
|
49 | retval = curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, |
401 | CallbackCurlHeader); | ||
402 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
|
49 | assert(retval == CURLE_OK); |
403 |
1/2✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
|
49 | retval = curl_easy_setopt(handle, CURLOPT_READFUNCTION, CallbackCurlData); |
404 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
|
49 | assert(retval == CURLE_OK); |
405 |
1/2✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
|
49 | retval = curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlBody); |
406 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
|
49 | assert(retval == CURLE_OK); |
407 | } else { | ||
408 | 565 | handle = *(pool_handles_idle_->begin()); | |
409 |
1/2✓ Branch 2 taken 565 times.
✗ Branch 3 not taken.
|
565 | pool_handles_idle_->erase(pool_handles_idle_->begin()); |
410 | } | ||
411 | |||
412 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | pool_handles_inuse_->insert(handle); |
413 | |||
414 | 614 | return handle; | |
415 | 614 | } | |
416 | |||
417 | |||
418 | 614 | void S3FanoutManager::ReleaseCurlHandle(JobInfo *info, CURL *handle) const { | |
419 |
1/2✓ Branch 0 taken 614 times.
✗ Branch 1 not taken.
|
614 | if (info->http_headers) { |
420 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | curl_slist_free_all(info->http_headers); |
421 | 614 | info->http_headers = NULL; | |
422 | } | ||
423 | |||
424 | 614 | MutexLockGuard guard(curl_handle_lock_); | |
425 | |||
426 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | set<CURL *>::iterator elem = pool_handles_inuse_->find(handle); |
427 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 614 times.
|
614 | assert(elem != pool_handles_inuse_->end()); |
428 | |||
429 |
2/2✓ Branch 1 taken 29 times.
✓ Branch 2 taken 585 times.
|
614 | if (pool_handles_idle_->size() > config_.pool_max_handles) { |
430 |
1/2✓ Branch 1 taken 29 times.
✗ Branch 2 not taken.
|
29 | CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, NULL); |
431 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
|
29 | assert(retval == CURLE_OK); |
432 |
1/2✓ Branch 1 taken 29 times.
✗ Branch 2 not taken.
|
29 | curl_easy_cleanup(handle); |
433 | std::map<CURL *, S3FanOutDnsEntry *>::size_type retitems = | ||
434 |
1/2✓ Branch 1 taken 29 times.
✗ Branch 2 not taken.
|
29 | curl_sharehandles_->erase(handle); |
435 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
|
29 | assert(retitems == 1); |
436 | } else { | ||
437 |
1/2✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
|
585 | pool_handles_idle_->insert(handle); |
438 | } | ||
439 | |||
440 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | pool_handles_inuse_->erase(elem); |
441 | 614 | } | |
442 | |||
443 | 12 | void S3FanoutManager::InitPipeWatchFds() { | |
444 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | assert(watch_fds_inuse_ == 0); |
445 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | assert(watch_fds_size_ >= 2); |
446 | 12 | watch_fds_[0].fd = pipe_terminate_[0]; | |
447 | 12 | watch_fds_[0].events = POLLIN | POLLPRI; | |
448 | 12 | watch_fds_[0].revents = 0; | |
449 | 12 | ++watch_fds_inuse_; | |
450 | 12 | watch_fds_[1].fd = pipe_jobs_[0]; | |
451 | 12 | watch_fds_[1].events = POLLIN | POLLPRI; | |
452 | 12 | watch_fds_[1].revents = 0; | |
453 | 12 | ++watch_fds_inuse_; | |
454 | 12 | } | |
455 | |||
456 | /** | ||
457 | * The Amazon AWS 2 authorization header according to | ||
458 | * http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html#ConstructingTheAuthenticationHeader | ||
459 | */ | ||
460 | 1222 | bool S3FanoutManager::MkV2Authz(const JobInfo &info, vector<string> *headers) | |
461 | const | ||
462 | { | ||
463 | 1222 | string payload_hash; | |
464 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | bool retval = MkPayloadHash(info, &payload_hash); |
465 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | if (!retval) |
466 | ✗ | return false; | |
467 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | string content_type = GetContentType(info); |
468 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | string request = GetRequestString(info); |
469 | |||
470 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | string timestamp = RfcTimestamp(); |
471 |
2/4✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
|
2444 | string to_sign = request + "\n" + |
472 |
2/4✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
|
2444 | payload_hash + "\n" + |
473 |
2/4✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
|
2444 | content_type + "\n" + |
474 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | timestamp + "\n"; |
475 |
2/4✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1222 times.
✗ Branch 4 not taken.
|
1222 | if (config_.x_amz_acl != "") { |
476 |
2/4✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
|
2444 | to_sign += "x-amz-acl:" + config_.x_amz_acl + "\n" + // default ACL |
477 |
5/10✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 1222 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 1222 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 1222 times.
✗ Branch 14 not taken.
|
2444 | "/" + config_.bucket + "/" + info.object_key; |
478 | } | ||
479 |
1/2✓ Branch 3 taken 1222 times.
✗ Branch 4 not taken.
|
1222 | LogCvmfs(kLogS3Fanout, kLogDebug, "%s string to sign for: %s", |
480 | request.c_str(), info.object_key.c_str()); | ||
481 | |||
482 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | shash::Any hmac; |
483 | 1222 | hmac.algorithm = shash::kSha1; | |
484 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | shash::Hmac(config_.secret_key, |
485 | 1222 | reinterpret_cast<const unsigned char *>(to_sign.data()), | |
486 | 1222 | to_sign.length(), &hmac); | |
487 | |||
488 |
4/8✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 1222 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 1222 times.
✗ Branch 11 not taken.
|
1222 | headers->push_back("Authorization: AWS " + config_.access_key + ":" + |
489 |
2/4✓ Branch 2 taken 1222 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1222 times.
✗ Branch 6 not taken.
|
3666 | Base64(string(reinterpret_cast<char *>(hmac.digest), |
490 | 1222 | hmac.GetDigestSize()))); | |
491 |
2/4✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
|
1222 | headers->push_back("Date: " + timestamp); |
492 |
2/4✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
|
1222 | headers->push_back("X-Amz-Acl: " + config_.x_amz_acl); |
493 |
2/2✓ Branch 1 taken 608 times.
✓ Branch 2 taken 614 times.
|
1222 | if (!payload_hash.empty()) |
494 |
2/4✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 608 times.
✗ Branch 5 not taken.
|
608 | headers->push_back("Content-MD5: " + payload_hash); |
495 |
2/2✓ Branch 1 taken 608 times.
✓ Branch 2 taken 614 times.
|
1222 | if (!content_type.empty()) |
496 |
2/4✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 608 times.
✗ Branch 5 not taken.
|
608 | headers->push_back("Content-Type: " + content_type); |
497 | 1222 | return true; | |
498 | 1222 | } | |
499 | |||
500 | |||
501 | ✗ | string S3FanoutManager::GetUriEncode(const string &val, bool encode_slash) | |
502 | const | ||
503 | { | ||
504 | ✗ | string result; | |
505 | ✗ | const unsigned len = val.length(); | |
506 | ✗ | result.reserve(len); | |
507 | ✗ | for (unsigned i = 0; i < len; ++i) { | |
508 | ✗ | char c = val[i]; | |
509 | ✗ | if ((c >= 'A' && c <= 'Z') || | |
510 | ✗ | (c >= 'a' && c <= 'z') || | |
511 | ✗ | (c >= '0' && c <= '9') || | |
512 | ✗ | c == '_' || c == '-' || c == '~' || c == '.') | |
513 | { | ||
514 | ✗ | result.push_back(c); | |
515 | ✗ | } else if (c == '/') { | |
516 | ✗ | if (encode_slash) { | |
517 | ✗ | result += "%2F"; | |
518 | } else { | ||
519 | ✗ | result.push_back(c); | |
520 | } | ||
521 | } else { | ||
522 | ✗ | result.push_back('%'); | |
523 | ✗ | result.push_back((c / 16) + ((c / 16 <= 9) ? '0' : 'A'-10)); | |
524 | ✗ | result.push_back((c % 16) + ((c % 16 <= 9) ? '0' : 'A'-10)); | |
525 | } | ||
526 | } | ||
527 | ✗ | return result; | |
528 | } | ||
529 | |||
530 | |||
531 | ✗ | string S3FanoutManager::GetAwsV4SigningKey(const string &date) const | |
532 | { | ||
533 | ✗ | if (last_signing_key_.first == date) | |
534 | ✗ | return last_signing_key_.second; | |
535 | |||
536 | ✗ | string date_key = shash::Hmac256("AWS4" + config_.secret_key, date, true); | |
537 | ✗ | string date_region_key = shash::Hmac256(date_key, config_.region, true); | |
538 | ✗ | string date_region_service_key = shash::Hmac256(date_region_key, "s3", true); | |
539 | string signing_key = | ||
540 | ✗ | shash::Hmac256(date_region_service_key, "aws4_request", true); | |
541 | ✗ | last_signing_key_.first = date; | |
542 | ✗ | last_signing_key_.second = signing_key; | |
543 | ✗ | return signing_key; | |
544 | } | ||
545 | |||
546 | |||
547 | /** | ||
548 | * The Amazon AWS4 authorization header according to | ||
549 | * http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-auth-using-authorization-header.html | ||
550 | */ | ||
551 | ✗ | bool S3FanoutManager::MkV4Authz(const JobInfo &info, vector<string> *headers) | |
552 | const | ||
553 | { | ||
554 | ✗ | string payload_hash; | |
555 | ✗ | bool retval = MkPayloadHash(info, &payload_hash); | |
556 | ✗ | if (!retval) | |
557 | ✗ | return false; | |
558 | ✗ | string content_type = GetContentType(info); | |
559 | ✗ | string timestamp = IsoTimestamp(); | |
560 | ✗ | string date = timestamp.substr(0, 8); | |
561 | ✗ | vector<string> tokens = SplitString(complete_hostname_, ':'); | |
562 | ✗ | assert(tokens.size() <= 2); | |
563 | ✗ | string canonical_hostname = tokens[0]; | |
564 | |||
565 | // if we could split the hostname in two and if the port is *NOT* a default | ||
566 | // one | ||
567 | ✗ | if (tokens.size() == 2 && !((String2Uint64(tokens[1]) == kDefaultHTTPPort) || | |
568 | ✗ | (String2Uint64(tokens[1]) == kDefaultHTTPSPort))) | |
569 | ✗ | canonical_hostname += ":" + tokens[1]; | |
570 | |||
571 | ✗ | string signed_headers; | |
572 | ✗ | string canonical_headers; | |
573 | ✗ | if (!content_type.empty()) { | |
574 | ✗ | signed_headers += "content-type;"; | |
575 | ✗ | headers->push_back("Content-Type: " + content_type); | |
576 | ✗ | canonical_headers += "content-type:" + content_type + "\n"; | |
577 | } | ||
578 | ✗ | if (config_.x_amz_acl != "") { | |
579 | ✗ | signed_headers += "host;x-amz-acl;x-amz-content-sha256;x-amz-date"; | |
580 | } else { | ||
581 | ✗ | signed_headers += "host;x-amz-content-sha256;x-amz-date"; | |
582 | } | ||
583 | canonical_headers += | ||
584 | ✗ | "host:" + canonical_hostname + "\n"; | |
585 | ✗ | if (config_.x_amz_acl != "") { | |
586 | ✗ | canonical_headers += "x-amz-acl:" + config_.x_amz_acl +"\n"; | |
587 | } | ||
588 | ✗ | canonical_headers += "x-amz-content-sha256:" + payload_hash + "\n" + | |
589 | ✗ | "x-amz-date:" + timestamp + "\n"; | |
590 | |||
591 | ✗ | string scope = date + "/" + config_.region + "/s3/aws4_request"; | |
592 | ✗ | string uri = config_.dns_buckets ? | |
593 | ✗ | (string("/") + info.object_key) : | |
594 | ✗ | (string("/") + config_.bucket + "/" + info.object_key); | |
595 | |||
596 | string canonical_request = | ||
597 | ✗ | GetRequestString(info) + "\n" + | |
598 | ✗ | GetUriEncode(uri, false) + "\n" + | |
599 | ✗ | "\n" + | |
600 | ✗ | canonical_headers + "\n" + | |
601 | ✗ | signed_headers + "\n" + | |
602 | ✗ | payload_hash; | |
603 | |||
604 | ✗ | string hash_request = shash::Sha256String(canonical_request.c_str()); | |
605 | |||
606 | string string_to_sign = | ||
607 | ✗ | "AWS4-HMAC-SHA256\n" + | |
608 | ✗ | timestamp + "\n" + | |
609 | ✗ | scope + "\n" + | |
610 | ✗ | hash_request; | |
611 | |||
612 | ✗ | string signing_key = GetAwsV4SigningKey(date); | |
613 | ✗ | string signature = shash::Hmac256(signing_key, string_to_sign); | |
614 | |||
615 | ✗ | headers->push_back("X-Amz-Acl: " + config_.x_amz_acl); | |
616 | ✗ | headers->push_back("X-Amz-Content-Sha256: " + payload_hash); | |
617 | ✗ | headers->push_back("X-Amz-Date: " + timestamp); | |
618 | ✗ | headers->push_back( | |
619 | "Authorization: AWS4-HMAC-SHA256 " | ||
620 | ✗ | "Credential=" + config_.access_key + "/" + scope + "," | |
621 | ✗ | "SignedHeaders=" + signed_headers + "," | |
622 | ✗ | "Signature=" + signature); | |
623 | ✗ | return true; | |
624 | } | ||
625 | |||
626 | /** | ||
627 | * The Azure Blob authorization header according to | ||
628 | * https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key | ||
629 | */ | ||
630 | ✗ | bool S3FanoutManager::MkAzureAuthz(const JobInfo &info, vector<string> *headers) | |
631 | const | ||
632 | { | ||
633 | ✗ | string timestamp = RfcTimestamp(); | |
634 | string canonical_headers = | ||
635 | ✗ | "x-ms-blob-type:BlockBlob\nx-ms-date:" + | |
636 | timestamp + | ||
637 | ✗ | "\nx-ms-version:2011-08-18"; | |
638 | string canonical_resource = | ||
639 | ✗ | "/" + config_.access_key + "/" + config_.bucket + "/" + info.object_key; | |
640 | |||
641 | ✗ | string string_to_sign; | |
642 | ✗ | if ((info.request == JobInfo::kReqHeadOnly) || | |
643 | ✗ | (info.request == JobInfo::kReqHeadPut) || | |
644 | ✗ | (info.request == JobInfo::kReqDelete)) { | |
645 | ✗ | string_to_sign = | |
646 | ✗ | GetRequestString(info) + | |
647 | ✗ | string("\n\n\n") + | |
648 | ✗ | "\n\n\n\n\n\n\n\n\n" + | |
649 | ✗ | canonical_headers + "\n" + | |
650 | ✗ | canonical_resource; | |
651 | } else { | ||
652 | string_to_sign = | ||
653 | ✗ | GetRequestString(info) + | |
654 | ✗ | string("\n\n\n") + | |
655 | ✗ | string(StringifyInt(info.origin->GetSize())) + "\n\n\n\n\n\n\n\n\n" + | |
656 | ✗ | canonical_headers + "\n" + | |
657 | ✗ | canonical_resource; | |
658 | } | ||
659 | |||
660 | ✗ | string signing_key; | |
661 | ✗ | int retval = Debase64(config_.secret_key, &signing_key); | |
662 | ✗ | if (!retval) | |
663 | ✗ | return false; | |
664 | |||
665 | ✗ | string signature = shash::Hmac256(signing_key, string_to_sign, true); | |
666 | |||
667 | ✗ | headers->push_back("x-ms-date: " + timestamp); | |
668 | ✗ | headers->push_back("x-ms-version: 2011-08-18"); | |
669 | ✗ | headers->push_back( | |
670 | ✗ | "Authorization: SharedKey " + config_.access_key + ":" + Base64(signature)); | |
671 | ✗ | headers->push_back("x-ms-blob-type: BlockBlob"); | |
672 | ✗ | return true; | |
673 | } | ||
674 | |||
675 | 1222 | void S3FanoutManager::InitializeDnsSettingsCurl( | |
676 | CURL *handle, | ||
677 | CURLSH *sharehandle, | ||
678 | curl_slist *clist) const | ||
679 | { | ||
680 | 1222 | CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, sharehandle); | |
681 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
682 | 1222 | retval = curl_easy_setopt(handle, CURLOPT_RESOLVE, clist); | |
683 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
684 | 1222 | } | |
685 | |||
686 | |||
687 | 1222 | int S3FanoutManager::InitializeDnsSettings( | |
688 | CURL *handle, | ||
689 | std::string host_with_port) const | ||
690 | { | ||
691 | // Use existing handle | ||
692 | std::map<CURL *, S3FanOutDnsEntry *>::const_iterator it = | ||
693 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | curl_sharehandles_->find(handle); |
694 |
2/2✓ Branch 3 taken 1173 times.
✓ Branch 4 taken 49 times.
|
1222 | if (it != curl_sharehandles_->end()) { |
695 |
1/2✓ Branch 1 taken 1173 times.
✗ Branch 2 not taken.
|
1173 | InitializeDnsSettingsCurl(handle, it->second->sharehandle, |
696 | 1173 | it->second->clist); | |
697 | 1173 | return 0; | |
698 | } | ||
699 | |||
700 | // Add protocol information for extraction of fields for DNS | ||
701 |
2/4✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 49 times.
✗ Branch 4 not taken.
|
49 | if (!IsHttpUrl(host_with_port)) |
702 |
2/4✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 49 times.
✗ Branch 5 not taken.
|
49 | host_with_port = config_.protocol + "://" + host_with_port; |
703 |
1/2✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
|
49 | std::string remote_host = dns::ExtractHost(host_with_port); |
704 |
1/2✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
|
49 | std::string remote_port = dns::ExtractPort(host_with_port); |
705 | |||
706 | // If we have the name already resolved, use the least used IP | ||
707 | 49 | S3FanOutDnsEntry *useme = NULL; | |
708 | 49 | unsigned int usemin = UINT_MAX; | |
709 | 49 | std::set<S3FanOutDnsEntry *>::iterator its3 = sharehandles_->begin(); | |
710 |
2/2✓ Branch 3 taken 39 times.
✓ Branch 4 taken 49 times.
|
88 | for (; its3 != sharehandles_->end(); ++its3) { |
711 |
1/2✓ Branch 2 taken 39 times.
✗ Branch 3 not taken.
|
39 | if ((*its3)->dns_name == remote_host) { |
712 |
1/2✓ Branch 1 taken 39 times.
✗ Branch 2 not taken.
|
39 | if (usemin >= (*its3)->counter) { |
713 | 39 | usemin = (*its3)->counter; | |
714 | 39 | useme = (*its3); | |
715 | } | ||
716 | } | ||
717 | } | ||
718 |
2/2✓ Branch 0 taken 39 times.
✓ Branch 1 taken 10 times.
|
49 | if (useme != NULL) { |
719 |
1/2✓ Branch 2 taken 39 times.
✗ Branch 3 not taken.
|
39 | curl_sharehandles_->insert(std::pair<CURL *, |
720 | S3FanOutDnsEntry *>(handle, useme)); | ||
721 | 39 | useme->counter++; | |
722 |
1/2✓ Branch 1 taken 39 times.
✗ Branch 2 not taken.
|
39 | InitializeDnsSettingsCurl(handle, useme->sharehandle, useme->clist); |
723 | 39 | return 0; | |
724 | } | ||
725 | |||
726 | // We need to resolve the hostname | ||
727 | // TODO(ssheikki): support ipv6 also... if (opt_ipv4_only_) | ||
728 |
1/2✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
|
10 | dns::Host host = resolver_->Resolve(remote_host); |
729 |
1/2✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
|
10 | set<string> ipv4_addresses = host.ipv4_addresses(); |
730 | 10 | std::set<string>::iterator its = ipv4_addresses.begin(); | |
731 | 10 | S3FanOutDnsEntry *dnse = NULL; | |
732 |
2/2✓ Branch 3 taken 10 times.
✓ Branch 4 taken 10 times.
|
20 | for ( ; its != ipv4_addresses.end(); ++its) { |
733 |
2/4✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 10 times.
✗ Branch 5 not taken.
|
10 | dnse = new S3FanOutDnsEntry(); |
734 | 10 | dnse->counter = 0; | |
735 |
1/2✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
|
10 | dnse->dns_name = remote_host; |
736 |
3/10✗ Branch 1 not taken.
✓ Branch 2 taken 10 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 8 taken 10 times.
✗ Branch 9 not taken.
✗ Branch 12 not taken.
✓ Branch 13 taken 10 times.
✗ Branch 15 not taken.
✗ Branch 16 not taken.
|
10 | dnse->port = remote_port.size() == 0 ? "80" : remote_port; |
737 |
1/2✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
|
10 | dnse->ip = *its; |
738 | 10 | dnse->clist = NULL; | |
739 |
1/2✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
|
10 | dnse->clist = curl_slist_append(dnse->clist, |
740 |
1/2✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
|
20 | (dnse->dns_name+":"+ |
741 |
2/4✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 10 times.
✗ Branch 5 not taken.
|
20 | dnse->port+":"+ |
742 |
1/2✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
|
10 | dnse->ip).c_str()); |
743 |
1/2✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
|
10 | dnse->sharehandle = curl_share_init(); |
744 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
|
10 | assert(dnse->sharehandle != NULL); |
745 |
1/2✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
|
10 | CURLSHcode share_retval = curl_share_setopt(dnse->sharehandle, |
746 | CURLSHOPT_SHARE, | ||
747 | CURL_LOCK_DATA_DNS); | ||
748 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
|
10 | assert(share_retval == CURLSHE_OK); |
749 |
1/2✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
|
10 | sharehandles_->insert(dnse); |
750 | } | ||
751 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
|
10 | if (dnse == NULL) { |
752 | ✗ | LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr, | |
753 | "Error: DNS resolve failed for address '%s'.", | ||
754 | remote_host.c_str()); | ||
755 | ✗ | assert(dnse != NULL); | |
756 | ✗ | return -1; | |
757 | } | ||
758 |
1/2✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
|
10 | curl_sharehandles_->insert( |
759 | 10 | std::pair<CURL *, S3FanOutDnsEntry *>(handle, dnse)); | |
760 | 10 | dnse->counter++; | |
761 |
1/2✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
|
10 | InitializeDnsSettingsCurl(handle, dnse->sharehandle, dnse->clist); |
762 | |||
763 | 10 | return 0; | |
764 | 49 | } | |
765 | |||
766 | |||
767 | 1222 | bool S3FanoutManager::MkPayloadHash(const JobInfo &info, string *hex_hash) | |
768 | const | ||
769 | { | ||
770 |
2/2✓ Branch 0 taken 1217 times.
✓ Branch 1 taken 5 times.
|
1222 | if ((info.request == JobInfo::kReqHeadOnly) || |
771 |
2/2✓ Branch 0 taken 609 times.
✓ Branch 1 taken 608 times.
|
1217 | (info.request == JobInfo::kReqHeadPut) || |
772 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 608 times.
|
609 | (info.request == JobInfo::kReqDelete)) |
773 | { | ||
774 |
1/4✓ Branch 0 taken 614 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
|
614 | switch (config_.authz_method) { |
775 | 614 | case kAuthzAwsV2: | |
776 | 614 | hex_hash->clear(); | |
777 | 614 | break; | |
778 | ✗ | case kAuthzAwsV4: | |
779 | // Sha256 over empty string | ||
780 | *hex_hash = | ||
781 | ✗ | "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; | |
782 | ✗ | break; | |
783 | ✗ | case kAuthzAzure: | |
784 | // no payload hash required for Azure signature | ||
785 | ✗ | hex_hash->clear(); | |
786 | ✗ | break; | |
787 | ✗ | default: | |
788 | ✗ | PANIC(NULL); | |
789 | } | ||
790 | 614 | return true; | |
791 | } | ||
792 | |||
793 | // PUT, there is actually payload | ||
794 |
1/2✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
|
608 | shash::Any payload_hash(shash::kMd5); |
795 | |||
796 | unsigned char *data; | ||
797 | unsigned int nbytes = | ||
798 | 608 | info.origin->Data(reinterpret_cast<void **>(&data), | |
799 |
2/4✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 608 times.
✗ Branch 6 not taken.
|
608 | info.origin->GetSize(), 0); |
800 |
2/4✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 608 times.
|
608 | assert(nbytes == info.origin->GetSize()); |
801 | |||
802 |
1/4✓ Branch 0 taken 608 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
|
608 | switch (config_.authz_method) { |
803 | 608 | case kAuthzAwsV2: | |
804 |
1/2✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
|
608 | shash::HashMem(data, nbytes, &payload_hash); |
805 | *hex_hash = | ||
806 |
2/4✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 608 times.
✗ Branch 6 not taken.
|
1824 | Base64(string(reinterpret_cast<char *>(payload_hash.digest), |
807 | 1216 | payload_hash.GetDigestSize())); | |
808 | 608 | return true; | |
809 | ✗ | case kAuthzAwsV4: | |
810 | *hex_hash = | ||
811 | ✗ | shash::Sha256Mem(data, nbytes); | |
812 | ✗ | return true; | |
813 | ✗ | case kAuthzAzure: | |
814 | // no payload hash required for Azure signature | ||
815 | ✗ | hex_hash->clear(); | |
816 | ✗ | return true; | |
817 | ✗ | default: | |
818 | ✗ | PANIC(NULL); | |
819 | } | ||
820 | } | ||
821 | |||
822 | 1223 | string S3FanoutManager::GetRequestString(const JobInfo &info) const { | |
823 |
3/4✓ Branch 0 taken 613 times.
✓ Branch 1 taken 608 times.
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
|
1223 | switch (info.request) { |
824 | 613 | case JobInfo::kReqHeadOnly: | |
825 | case JobInfo::kReqHeadPut: | ||
826 |
1/2✓ Branch 2 taken 613 times.
✗ Branch 3 not taken.
|
613 | return "HEAD"; |
827 | 608 | case JobInfo::kReqPutCas: | |
828 | case JobInfo::kReqPutDotCvmfs: | ||
829 | case JobInfo::kReqPutHtml: | ||
830 | case JobInfo::kReqPutBucket: | ||
831 |
1/2✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
|
608 | return "PUT"; |
832 | 2 | case JobInfo::kReqDelete: | |
833 |
1/2✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
|
2 | return "DELETE"; |
834 | ✗ | default: | |
835 | ✗ | PANIC(NULL); | |
836 | } | ||
837 | } | ||
838 | |||
839 | |||
840 | 1222 | string S3FanoutManager::GetContentType(const JobInfo &info) const { | |
841 |
2/6✓ Branch 0 taken 614 times.
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
|
1222 | switch (info.request) { |
842 | 614 | case JobInfo::kReqHeadOnly: | |
843 | case JobInfo::kReqHeadPut: | ||
844 | case JobInfo::kReqDelete: | ||
845 |
1/2✓ Branch 2 taken 614 times.
✗ Branch 3 not taken.
|
614 | return ""; |
846 | 608 | case JobInfo::kReqPutCas: | |
847 |
1/2✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
|
608 | return "application/octet-stream"; |
848 | ✗ | case JobInfo::kReqPutDotCvmfs: | |
849 | ✗ | return "application/x-cvmfs"; | |
850 | ✗ | case JobInfo::kReqPutHtml: | |
851 | ✗ | return "text/html"; | |
852 | ✗ | case JobInfo::kReqPutBucket: | |
853 | ✗ | return "text/xml"; | |
854 | ✗ | default: | |
855 | ✗ | PANIC(NULL); | |
856 | } | ||
857 | } | ||
858 | |||
859 | |||
860 | /** | ||
861 | * Request parameters set the URL and other options such as timeout and | ||
862 | * proxy. | ||
863 | */ | ||
864 | 1222 | Failures S3FanoutManager::InitializeRequest(JobInfo *info, CURL *handle) const { | |
865 | // Initialize internal download state | ||
866 | 1222 | info->curl_handle = handle; | |
867 | 1222 | info->error_code = kFailOk; | |
868 | 1222 | info->http_error = 0; | |
869 | 1222 | info->num_retries = 0; | |
870 | 1222 | info->backoff_ms = 0; | |
871 | 1222 | info->throttle_ms = 0; | |
872 | 1222 | info->throttle_timestamp = 0; | |
873 | 1222 | info->http_headers = NULL; | |
874 | // info->payload_size is needed in S3Uploader::MainCollectResults, | ||
875 | // where info->origin is already destroyed. | ||
876 |
1/2✓ Branch 2 taken 1222 times.
✗ Branch 3 not taken.
|
1222 | info->payload_size = info->origin->GetSize(); |
877 | |||
878 |
2/4✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
|
1222 | InitializeDnsSettings(handle, complete_hostname_); |
879 | |||
880 | CURLcode retval; | ||
881 |
2/2✓ Branch 0 taken 1217 times.
✓ Branch 1 taken 5 times.
|
1222 | if ((info->request == JobInfo::kReqHeadOnly) || |
882 |
2/2✓ Branch 0 taken 609 times.
✓ Branch 1 taken 608 times.
|
1217 | (info->request == JobInfo::kReqHeadPut) || |
883 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 608 times.
|
609 | (info->request == JobInfo::kReqDelete)) |
884 | { | ||
885 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 0); |
886 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 614 times.
|
614 | assert(retval == CURLE_OK); |
887 |
1/2✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
|
614 | retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 1); |
888 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 614 times.
|
614 | assert(retval == CURLE_OK); |
889 | |||
890 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 613 times.
|
614 | if (info->request == JobInfo::kReqDelete) |
891 | { | ||
892 |
2/4✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 1 times.
✗ Branch 6 not taken.
|
1 | retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, |
893 | GetRequestString(*info).c_str()); | ||
894 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | assert(retval == CURLE_OK); |
895 | } else { | ||
896 |
1/2✓ Branch 1 taken 613 times.
✗ Branch 2 not taken.
|
613 | retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL); |
897 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 613 times.
|
613 | assert(retval == CURLE_OK); |
898 | } | ||
899 | } else { | ||
900 |
1/2✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
|
608 | retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL); |
901 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
|
608 | assert(retval == CURLE_OK); |
902 |
1/2✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
|
608 | retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 1); |
903 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
|
608 | assert(retval == CURLE_OK); |
904 |
1/2✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
|
608 | retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 0); |
905 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
|
608 | assert(retval == CURLE_OK); |
906 |
2/4✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 608 times.
✗ Branch 6 not taken.
|
608 | retval = curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE, |
907 | static_cast<curl_off_t>(info->origin->GetSize())); | ||
908 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
|
608 | assert(retval == CURLE_OK); |
909 | |||
910 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
|
608 | if (info->request == JobInfo::kReqPutDotCvmfs) { |
911 | ✗ | info->http_headers = | |
912 | ✗ | curl_slist_append(info->http_headers, kCacheControlDotCvmfs); | |
913 |
1/2✓ Branch 0 taken 608 times.
✗ Branch 1 not taken.
|
608 | } else if (info->request == JobInfo::kReqPutCas) { |
914 | 608 | info->http_headers = | |
915 |
1/2✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
|
608 | curl_slist_append(info->http_headers, kCacheControlCas); |
916 | } | ||
917 | } | ||
918 | |||
919 | bool retval_b; | ||
920 | |||
921 | // Authorization | ||
922 | 1222 | vector<string> authz_headers; | |
923 |
1/4✓ Branch 0 taken 1222 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
|
1222 | switch (config_.authz_method) { |
924 | 1222 | case kAuthzAwsV2: | |
925 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | retval_b = MkV2Authz(*info, &authz_headers); |
926 | 1222 | break; | |
927 | ✗ | case kAuthzAwsV4: | |
928 | ✗ | retval_b = MkV4Authz(*info, &authz_headers); | |
929 | ✗ | break; | |
930 | ✗ | case kAuthzAzure: | |
931 | ✗ | retval_b = MkAzureAuthz(*info, &authz_headers); | |
932 | ✗ | break; | |
933 | ✗ | default: | |
934 | ✗ | PANIC(NULL); | |
935 | } | ||
936 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | if (!retval_b) |
937 | ✗ | return kFailLocalIO; | |
938 |
2/2✓ Branch 1 taken 4882 times.
✓ Branch 2 taken 1222 times.
|
6104 | for (unsigned i = 0; i < authz_headers.size(); ++i) { |
939 | 4882 | info->http_headers = | |
940 |
1/2✓ Branch 3 taken 4882 times.
✗ Branch 4 not taken.
|
4882 | curl_slist_append(info->http_headers, authz_headers[i].c_str()); |
941 | } | ||
942 | |||
943 | // Common headers | ||
944 | 1222 | info->http_headers = | |
945 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | curl_slist_append(info->http_headers, "Connection: Keep-Alive"); |
946 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | info->http_headers = curl_slist_append(info->http_headers, "Pragma:"); |
947 | // No 100-continue | ||
948 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | info->http_headers = curl_slist_append(info->http_headers, "Expect:"); |
949 | // Strip unnecessary header | ||
950 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | info->http_headers = curl_slist_append(info->http_headers, "Accept:"); |
951 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | info->http_headers = curl_slist_append(info->http_headers, |
952 | 1222 | user_agent_->c_str()); | |
953 | |||
954 | // Set curl parameters | ||
955 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | retval = curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info)); |
956 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
957 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | retval = curl_easy_setopt(handle, CURLOPT_HEADERDATA, |
958 | static_cast<void *>(info)); | ||
959 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
960 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | retval = curl_easy_setopt(handle, CURLOPT_READDATA, |
961 | static_cast<void *>(info)); | ||
962 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
963 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | retval = curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->http_headers); |
964 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
965 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | if (opt_ipv4_only_) { |
966 | ✗ | retval = curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4); | |
967 | ✗ | assert(retval == CURLE_OK); | |
968 | } | ||
969 | // Follow HTTP redirects | ||
970 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | retval = curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L); |
971 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
972 | |||
973 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | retval = curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, info->errorbuffer); |
974 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
975 | |||
976 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 1222 times.
|
1222 | if (config_.protocol == "https") { |
977 | ✗ | retval = curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 1L); | |
978 | ✗ | assert(retval == CURLE_OK); | |
979 | ✗ | retval = curl_easy_setopt(handle, CURLOPT_PROXY_SSL_VERIFYPEER, 1L); | |
980 | ✗ | assert(retval == CURLE_OK); | |
981 | ✗ | bool add_cert = ssl_certificate_store_.ApplySslCertificatePath(handle); | |
982 | ✗ | assert(add_cert); | |
983 | } | ||
984 | |||
985 | 1222 | return kFailOk; | |
986 | 1222 | } | |
987 | |||
988 | |||
989 | /** | ||
990 | * Sets the URL specific options such as host to use and timeout. | ||
991 | */ | ||
992 | 1222 | void S3FanoutManager::SetUrlOptions(JobInfo *info) const { | |
993 | 1222 | CURL *curl_handle = info->curl_handle; | |
994 | CURLcode retval; | ||
995 | |||
996 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | retval = curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, |
997 | config_.opt_timeout_sec); | ||
998 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
999 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, |
1000 | kLowSpeedLimit); | ||
1001 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
1002 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, |
1003 | config_.opt_timeout_sec); | ||
1004 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
1005 | |||
1006 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | if (is_curl_debug_) { |
1007 | ✗ | retval = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 1); | |
1008 | ✗ | assert(retval == CURLE_OK); | |
1009 | } | ||
1010 | |||
1011 |
1/2✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
|
1222 | string url = MkUrl(info->object_key); |
1012 |
1/2✓ Branch 2 taken 1222 times.
✗ Branch 3 not taken.
|
1222 | retval = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str()); |
1013 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
1014 | |||
1015 |
1/2✓ Branch 2 taken 1222 times.
✗ Branch 3 not taken.
|
1222 | retval = curl_easy_setopt(curl_handle, CURLOPT_PROXY, config_.proxy.c_str()); |
1016 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | assert(retval == CURLE_OK); |
1017 | 1222 | } | |
1018 | |||
1019 | |||
1020 | /** | ||
1021 | * Adds transfer time and uploaded bytes to the global counters. | ||
1022 | */ | ||
1023 | 1226 | void S3FanoutManager::UpdateStatistics(CURL *handle) { | |
1024 | double val; | ||
1025 | |||
1026 |
2/4✓ Branch 1 taken 1226 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1226 times.
✗ Branch 4 not taken.
|
1226 | if (curl_easy_getinfo(handle, CURLINFO_SIZE_UPLOAD, &val) == CURLE_OK) |
1027 | 1226 | statistics_->transferred_bytes += val; | |
1028 | 1226 | } | |
1029 | |||
1030 | |||
1031 | /** | ||
1032 | * Retry if possible and if not already done too often. | ||
1033 | */ | ||
1034 | 6 | bool S3FanoutManager::CanRetry(const JobInfo *info) { | |
1035 | return | ||
1036 | 12 | (info->error_code == kFailHostConnection || | |
1037 |
1/2✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
|
6 | info->error_code == kFailHostResolve || |
1038 |
1/2✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
|
6 | info->error_code == kFailServiceUnavailable || |
1039 |
3/4✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
✓ Branch 3 taken 2 times.
|
16 | info->error_code == kFailRetry) && |
1040 |
1/2✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
|
10 | (info->num_retries < config_.opt_max_retries); |
1041 | } | ||
1042 | |||
1043 | |||
1044 | /** | ||
1045 | * Backoff for retry to introduce a jitter into a upload sequence. | ||
1046 | * | ||
1047 | * \return true if backoff has been performed, false otherwise | ||
1048 | */ | ||
1049 | 4 | void S3FanoutManager::Backoff(JobInfo *info) { | |
1050 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
|
4 | if (info->error_code != kFailRetry) |
1051 | ✗ | info->num_retries++; | |
1052 | 4 | statistics_->num_retries++; | |
1053 | |||
1054 |
1/2✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
|
4 | if (info->throttle_ms > 0) { |
1055 | 4 | LogCvmfs(kLogS3Fanout, kLogDebug, "throttling for %d ms", | |
1056 | info->throttle_ms); | ||
1057 | 4 | uint64_t now = platform_monotonic_time(); | |
1058 |
1/2✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
|
4 | if ((info->throttle_timestamp + (info->throttle_ms / 1000)) >= now) { |
1059 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 3 times.
|
4 | if ((now - timestamp_last_throttle_report_) > kThrottleReportIntervalSec) |
1060 | { | ||
1061 | 1 | LogCvmfs(kLogS3Fanout, kLogStdout, | |
1062 | "Warning: S3 backend throttling %ums " | ||
1063 | "(total backoff time so far %ums)", | ||
1064 | info->throttle_ms, | ||
1065 | 1 | statistics_->ms_throttled); | |
1066 | 1 | timestamp_last_throttle_report_ = now; | |
1067 | } | ||
1068 | 4 | statistics_->ms_throttled += info->throttle_ms; | |
1069 | 4 | SafeSleepMs(info->throttle_ms); | |
1070 | } | ||
1071 | } else { | ||
1072 | ✗ | if (info->backoff_ms == 0) { | |
1073 | // Must be != 0 | ||
1074 | ✗ | info->backoff_ms = prng_.Next(config_.opt_backoff_init_ms + 1); | |
1075 | } else { | ||
1076 | ✗ | info->backoff_ms *= 2; | |
1077 | } | ||
1078 | ✗ | if (info->backoff_ms > config_.opt_backoff_max_ms) | |
1079 | ✗ | info->backoff_ms = config_.opt_backoff_max_ms; | |
1080 | |||
1081 | ✗ | LogCvmfs(kLogS3Fanout, kLogDebug, "backing off for %d ms", | |
1082 | info->backoff_ms); | ||
1083 | ✗ | SafeSleepMs(info->backoff_ms); | |
1084 | } | ||
1085 | 4 | } | |
1086 | |||
1087 | |||
1088 | /** | ||
1089 | * Checks the result of a curl request and implements the failure logic | ||
1090 | * and takes care of cleanup. | ||
1091 | * | ||
1092 | * @return true if request should be repeated, false otherwise | ||
1093 | */ | ||
1094 | 1226 | bool S3FanoutManager::VerifyAndFinalize(const int curl_error, JobInfo *info) { | |
1095 | 1226 | LogCvmfs(kLogS3Fanout, kLogDebug, "Verify uploaded/tested object %s " | |
1096 | "(curl error %d, info error %d, info request %d)", | ||
1097 | info->object_key.c_str(), | ||
1098 | 1226 | curl_error, info->error_code, info->request); | |
1099 | 1226 | UpdateStatistics(info->curl_handle); | |
1100 | |||
1101 | // Verification and error classification | ||
1102 |
1/6✓ Branch 0 taken 1226 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
|
1226 | switch (curl_error) { |
1103 | 1226 | case CURLE_OK: | |
1104 |
2/2✓ Branch 0 taken 1222 times.
✓ Branch 1 taken 4 times.
|
1226 | if ((info->error_code != kFailRetry) && |
1105 |
2/2✓ Branch 0 taken 612 times.
✓ Branch 1 taken 610 times.
|
1222 | (info->error_code != kFailNotFound)) |
1106 | { | ||
1107 | 612 | info->error_code = kFailOk; | |
1108 | } | ||
1109 | 1226 | break; | |
1110 | ✗ | case CURLE_UNSUPPORTED_PROTOCOL: | |
1111 | case CURLE_URL_MALFORMAT: | ||
1112 | ✗ | info->error_code = kFailBadRequest; | |
1113 | ✗ | break; | |
1114 | ✗ | case CURLE_COULDNT_RESOLVE_HOST: | |
1115 | ✗ | info->error_code = kFailHostResolve; | |
1116 | ✗ | break; | |
1117 | ✗ | case CURLE_COULDNT_CONNECT: | |
1118 | case CURLE_OPERATION_TIMEDOUT: | ||
1119 | case CURLE_SEND_ERROR: | ||
1120 | case CURLE_RECV_ERROR: | ||
1121 | ✗ | info->error_code = kFailHostConnection; | |
1122 | ✗ | break; | |
1123 | ✗ | case CURLE_ABORTED_BY_CALLBACK: | |
1124 | case CURLE_WRITE_ERROR: | ||
1125 | // Error set by callback | ||
1126 | ✗ | break; | |
1127 | ✗ | default: | |
1128 | ✗ | LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr, | |
1129 | "unexpected curl error (%d) while trying to upload %s: %s", | ||
1130 | curl_error, info->object_key.c_str(), info->errorbuffer); | ||
1131 | ✗ | info->error_code = kFailOther; | |
1132 | ✗ | break; | |
1133 | } | ||
1134 | |||
1135 | // Transform HEAD to PUT request | ||
1136 |
2/2✓ Branch 0 taken 610 times.
✓ Branch 1 taken 616 times.
|
1226 | if ((info->error_code == kFailNotFound) && |
1137 |
2/2✓ Branch 0 taken 608 times.
✓ Branch 1 taken 2 times.
|
610 | (info->request == JobInfo::kReqHeadPut)) |
1138 | { | ||
1139 | 608 | LogCvmfs(kLogS3Fanout, kLogDebug, "not found: %s, uploading", | |
1140 | info->object_key.c_str()); | ||
1141 | 608 | info->request = JobInfo::kReqPutCas; | |
1142 | 608 | curl_slist_free_all(info->http_headers); | |
1143 | 608 | info->http_headers = NULL; | |
1144 | 608 | s3fanout::Failures init_failure = InitializeRequest(info, | |
1145 | info->curl_handle); | ||
1146 | |||
1147 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
|
608 | if (init_failure != s3fanout::kFailOk) { |
1148 | ✗ | PANIC(kLogStderr, | |
1149 | "Failed to initialize CURL handle " | ||
1150 | "(error: %d - %s | errno: %d)", | ||
1151 | init_failure, Code2Ascii(init_failure), errno); | ||
1152 | } | ||
1153 | 608 | SetUrlOptions(info); | |
1154 | // Reset origin | ||
1155 | 608 | info->origin->Rewind(); | |
1156 | 608 | return true; // Again, Put | |
1157 | } | ||
1158 | |||
1159 | // Determination if failed request should be repeated | ||
1160 | 618 | bool try_again = false; | |
1161 |
2/2✓ Branch 0 taken 6 times.
✓ Branch 1 taken 612 times.
|
618 | if (info->error_code != kFailOk) { |
1162 | 6 | try_again = CanRetry(info); | |
1163 | } | ||
1164 |
2/2✓ Branch 0 taken 4 times.
✓ Branch 1 taken 614 times.
|
618 | if (try_again) { |
1165 |
1/2✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
|
4 | if (info->request == JobInfo::kReqPutCas || |
1166 |
1/2✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
|
4 | info->request == JobInfo::kReqPutDotCvmfs || |
1167 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
|
4 | info->request == JobInfo::kReqPutHtml) { |
1168 | ✗ | LogCvmfs(kLogS3Fanout, kLogDebug, "Trying again to upload %s", | |
1169 | info->object_key.c_str()); | ||
1170 | // Reset origin | ||
1171 | ✗ | info->origin->Rewind(); | |
1172 | } | ||
1173 | 4 | Backoff(info); | |
1174 | 4 | info->error_code = kFailOk; | |
1175 | 4 | info->http_error = 0; | |
1176 | 4 | info->throttle_ms = 0; | |
1177 | 4 | info->backoff_ms = 0; | |
1178 | 4 | info->throttle_timestamp = 0; | |
1179 | 4 | return true; // try again | |
1180 | } | ||
1181 | |||
1182 | // Cleanup opened resources | ||
1183 | 614 | info->origin.Destroy(); | |
1184 | |||
1185 |
2/2✓ Branch 0 taken 2 times.
✓ Branch 1 taken 612 times.
|
614 | if ((info->error_code != kFailOk) && |
1186 |
2/4✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
|
2 | (info->http_error != 0) && (info->http_error != 404)) |
1187 | { | ||
1188 | ✗ | LogCvmfs(kLogS3Fanout, kLogStderr, "S3: HTTP failure %d", info->http_error); | |
1189 | } | ||
1190 | 614 | return false; // stop transfer | |
1191 | } | ||
1192 | |||
1193 |
2/4✓ Branch 3 taken 12 times.
✗ Branch 4 not taken.
✓ Branch 9 taken 12 times.
✗ Branch 10 not taken.
|
12 | S3FanoutManager::S3FanoutManager(const S3Config &config) : config_(config) { |
1194 | 12 | atomic_init32(&multi_threaded_); | |
1195 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | MakePipe(pipe_terminate_); |
1196 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | MakePipe(pipe_jobs_); |
1197 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | MakePipe(pipe_completed_); |
1198 | |||
1199 | int retval; | ||
1200 | 12 | jobs_todo_lock_ = | |
1201 | 12 | reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t))); | |
1202 | 12 | retval = pthread_mutex_init(jobs_todo_lock_, NULL); | |
1203 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | assert(retval == 0); |
1204 | 12 | curl_handle_lock_ = | |
1205 | 12 | reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t))); | |
1206 | 12 | retval = pthread_mutex_init(curl_handle_lock_, NULL); | |
1207 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | assert(retval == 0); |
1208 | |||
1209 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | active_requests_ = new set<JobInfo *>; |
1210 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | pool_handles_idle_ = new set<CURL *>; |
1211 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | pool_handles_inuse_ = new set<CURL *>; |
1212 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | curl_sharehandles_ = new map<CURL *, S3FanOutDnsEntry *>; |
1213 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | sharehandles_ = new set<S3FanOutDnsEntry *>; |
1214 | 12 | watch_fds_max_ = 4 * config_.pool_max_handles; | |
1215 | 12 | max_available_jobs_ = 4 * config_.pool_max_handles; | |
1216 |
2/4✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 12 times.
✗ Branch 5 not taken.
|
12 | available_jobs_ = new Semaphore(max_available_jobs_); |
1217 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | assert(NULL != available_jobs_); |
1218 | |||
1219 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | statistics_ = new Statistics(); |
1220 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | user_agent_ = new string(); |
1221 |
2/4✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
|
12 | *user_agent_ = "User-Agent: cvmfs " + string(VERSION); |
1222 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | complete_hostname_ = MkCompleteHostname(); |
1223 | |||
1224 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | CURLcode cretval = curl_global_init(CURL_GLOBAL_ALL); |
1225 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | assert(cretval == CURLE_OK); |
1226 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | curl_multi_ = curl_multi_init(); |
1227 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | assert(curl_multi_ != NULL); |
1228 | CURLMcode mretval; | ||
1229 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION, |
1230 | CallbackCurlSocket); | ||
1231 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | assert(mretval == CURLM_OK); |
1232 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA, |
1233 | static_cast<void *>(this)); | ||
1234 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | assert(mretval == CURLM_OK); |
1235 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | mretval = curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS, |
1236 | config_.pool_max_handles); | ||
1237 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | assert(mretval == CURLM_OK); |
1238 | |||
1239 | 12 | prng_.InitLocaltime(); | |
1240 | |||
1241 | 12 | thread_upload_ = 0; | |
1242 | 12 | timestamp_last_throttle_report_ = 0; | |
1243 | 12 | is_curl_debug_ = (getenv("_CVMFS_CURL_DEBUG") != NULL); | |
1244 | |||
1245 | // Parsing environment variables | ||
1246 |
2/4✗ Branch 1 not taken.
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 12 times.
|
12 | if ((getenv("CVMFS_IPV4_ONLY") != NULL) && |
1247 | ✗ | (strlen(getenv("CVMFS_IPV4_ONLY")) > 0)) { | |
1248 | ✗ | opt_ipv4_only_ = true; | |
1249 | } else { | ||
1250 | 12 | opt_ipv4_only_ = false; | |
1251 | } | ||
1252 | |||
1253 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | resolver_ = dns::CaresResolver::Create(opt_ipv4_only_, 2, 2000); |
1254 | |||
1255 | 12 | watch_fds_ = static_cast<struct pollfd *>(smalloc(4 * sizeof(struct pollfd))); | |
1256 | 12 | watch_fds_size_ = 4; | |
1257 | 12 | watch_fds_inuse_ = 0; | |
1258 | |||
1259 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | ssl_certificate_store_.UseSystemCertificatePath(); |
1260 | 12 | } | |
1261 | |||
1262 | 24 | S3FanoutManager::~S3FanoutManager() { | |
1263 | 12 | pthread_mutex_destroy(jobs_todo_lock_); | |
1264 | 12 | free(jobs_todo_lock_); | |
1265 | 12 | pthread_mutex_destroy(curl_handle_lock_); | |
1266 | 12 | free(curl_handle_lock_); | |
1267 | |||
1268 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | if (atomic_xadd32(&multi_threaded_, 0) == 1) { |
1269 | // Shutdown I/O thread | ||
1270 | 12 | char buf = 'T'; | |
1271 | 12 | WritePipe(pipe_terminate_[1], &buf, 1); | |
1272 | 12 | pthread_join(thread_upload_, NULL); | |
1273 | } | ||
1274 | 12 | ClosePipe(pipe_terminate_); | |
1275 | 12 | ClosePipe(pipe_jobs_); | |
1276 | 12 | ClosePipe(pipe_completed_); | |
1277 | |||
1278 | 12 | set<CURL *>::iterator i = pool_handles_idle_->begin(); | |
1279 | 12 | const set<CURL *>::const_iterator iEnd = pool_handles_idle_->end(); | |
1280 |
2/2✓ Branch 2 taken 20 times.
✓ Branch 3 taken 12 times.
|
32 | for (; i != iEnd; ++i) { |
1281 | 20 | curl_easy_cleanup(*i); | |
1282 | } | ||
1283 | |||
1284 | 12 | set<S3FanOutDnsEntry *>::iterator is = sharehandles_->begin(); | |
1285 | 12 | const set<S3FanOutDnsEntry *>::const_iterator isEnd = sharehandles_->end(); | |
1286 |
2/2✓ Branch 2 taken 10 times.
✓ Branch 3 taken 12 times.
|
22 | for (; is != isEnd; ++is) { |
1287 | 10 | curl_share_cleanup((*is)->sharehandle); | |
1288 | 10 | curl_slist_free_all((*is)->clist); | |
1289 |
1/2✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
|
10 | delete *is; |
1290 | } | ||
1291 | 12 | pool_handles_idle_->clear(); | |
1292 | 12 | curl_sharehandles_->clear(); | |
1293 | 12 | sharehandles_->clear(); | |
1294 |
1/2✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
|
12 | delete active_requests_; |
1295 |
1/2✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
|
12 | delete pool_handles_idle_; |
1296 |
1/2✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
|
12 | delete pool_handles_inuse_; |
1297 |
1/2✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
|
12 | delete curl_sharehandles_; |
1298 |
1/2✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
|
12 | delete sharehandles_; |
1299 |
1/2✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
|
12 | delete user_agent_; |
1300 | 12 | curl_multi_cleanup(curl_multi_); | |
1301 | |||
1302 |
1/2✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
|
12 | delete statistics_; |
1303 | |||
1304 |
1/2✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
|
12 | delete available_jobs_; |
1305 | |||
1306 | 12 | curl_global_cleanup(); | |
1307 | 12 | } | |
1308 | |||
1309 | /** | ||
1310 | * Spawns the I/O worker thread. No way back except ~S3FanoutManager. | ||
1311 | */ | ||
1312 | 12 | void S3FanoutManager::Spawn() { | |
1313 | 12 | LogCvmfs(kLogS3Fanout, kLogDebug, "S3FanoutManager spawned"); | |
1314 | |||
1315 | 12 | int retval = pthread_create(&thread_upload_, NULL, MainUpload, | |
1316 | static_cast<void *>(this)); | ||
1317 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | assert(retval == 0); |
1318 | |||
1319 | 12 | atomic_inc32(&multi_threaded_); | |
1320 | 12 | } | |
1321 | |||
1322 | 3 | const Statistics &S3FanoutManager::GetStatistics() { | |
1323 | 3 | return *statistics_; | |
1324 | } | ||
1325 | |||
1326 | /** | ||
1327 | * Push new job to be uploaded to the S3 cloud storage. | ||
1328 | */ | ||
1329 | 614 | void S3FanoutManager::PushNewJob(JobInfo *info) { | |
1330 | 614 | available_jobs_->Increment(); | |
1331 | 614 | WritePipe(pipe_jobs_[1], &info, sizeof(info)); | |
1332 | 614 | } | |
1333 | |||
1334 | /** | ||
1335 | * Push completed job to list of completed jobs | ||
1336 | */ | ||
1337 | 626 | void S3FanoutManager::PushCompletedJob(JobInfo *info) { | |
1338 | 626 | WritePipe(pipe_completed_[1], &info, sizeof(info)); | |
1339 | 626 | } | |
1340 | |||
1341 | /** | ||
1342 | * Pop completed job | ||
1343 | */ | ||
1344 | 626 | JobInfo *S3FanoutManager::PopCompletedJob() { | |
1345 | JobInfo *info; | ||
1346 |
1/2✓ Branch 1 taken 626 times.
✗ Branch 2 not taken.
|
626 | ReadPipe(pipe_completed_[0], &info, sizeof(info)); |
1347 | 626 | return info; | |
1348 | } | ||
1349 | |||
1350 | //------------------------------------------------------------------------------ | ||
1351 | |||
1352 | |||
1353 | ✗ | string Statistics::Print() const { | |
1354 | return | ||
1355 | ✗ | "Transferred Bytes: " + | |
1356 | ✗ | StringifyInt(uint64_t(transferred_bytes)) + "\n" + | |
1357 | ✗ | "Transfer duration: " + | |
1358 | ✗ | StringifyInt(uint64_t(transfer_time)) + " s\n" + | |
1359 | ✗ | "Number of requests: " + | |
1360 | ✗ | StringifyInt(num_requests) + "\n" + | |
1361 | ✗ | "Number of retries: " + | |
1362 | ✗ | StringifyInt(num_retries) + "\n"; | |
1363 | } | ||
1364 | |||
1365 | } // namespace s3fanout | ||
1366 |