Directory: | cvmfs/ |
---|---|
File: | cvmfs/network/s3fanout.cc |
Date: | 2025-07-13 02:35:07 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 560 | 789 | 71.0% |
Branches: | 403 | 1149 | 35.1% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | * | ||
4 | * Runs a thread using libcurls asynchronous I/O mode to push data to S3 | ||
5 | */ | ||
6 | |||
7 | #include "s3fanout.h" | ||
8 | |||
9 | #include <pthread.h> | ||
10 | |||
11 | #include <algorithm> | ||
12 | #include <cassert> | ||
13 | #include <cerrno> | ||
14 | #include <utility> | ||
15 | |||
16 | #include "upload_facility.h" | ||
17 | #include "util/concurrency.h" | ||
18 | #include "util/exception.h" | ||
19 | #include "util/platform.h" | ||
20 | #include "util/posix.h" | ||
21 | #include "util/string.h" | ||
22 | |||
23 | using namespace std; // NOLINT | ||
24 | |||
25 | namespace s3fanout { | ||
26 | |||
27 | const char *S3FanoutManager::kCacheControlCas = "Cache-Control: max-age=259200"; | ||
28 | const char | ||
29 | *S3FanoutManager::kCacheControlDotCvmfs = "Cache-Control: max-age=61"; | ||
30 | const unsigned S3FanoutManager::kDefault429ThrottleMs = 250; | ||
31 | const unsigned S3FanoutManager::kMax429ThrottleMs = 10000; | ||
32 | const unsigned S3FanoutManager::kThrottleReportIntervalSec = 10; | ||
33 | const unsigned S3FanoutManager::kDefaultHTTPPort = 80; | ||
34 | const unsigned S3FanoutManager::kDefaultHTTPSPort = 443; | ||
35 | |||
36 | |||
37 | /** | ||
38 | * Parses Retry-After and X-Retry-In headers attached to HTTP 429 responses | ||
39 | */ | ||
40 | 214 | void S3FanoutManager::DetectThrottleIndicator(const std::string &header, | |
41 | JobInfo *info) { | ||
42 | 214 | std::string value_str; | |
43 |
4/6✓ Branch 2 taken 214 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 214 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 97 times.
✓ Branch 10 taken 117 times.
|
214 | if (HasPrefix(header, "retry-after:", true)) |
44 |
1/2✓ Branch 1 taken 97 times.
✗ Branch 2 not taken.
|
97 | value_str = header.substr(12); |
45 |
4/6✓ Branch 2 taken 214 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 214 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 55 times.
✓ Branch 10 taken 159 times.
|
214 | if (HasPrefix(header, "x-retry-in:", true)) |
46 |
1/2✓ Branch 1 taken 55 times.
✗ Branch 2 not taken.
|
55 | value_str = header.substr(11); |
47 | |||
48 |
1/2✓ Branch 1 taken 214 times.
✗ Branch 2 not taken.
|
214 | value_str = Trim(value_str, true /* trim_newline */); |
49 |
2/2✓ Branch 1 taken 130 times.
✓ Branch 2 taken 84 times.
|
214 | if (!value_str.empty()) { |
50 |
1/2✓ Branch 1 taken 130 times.
✗ Branch 2 not taken.
|
130 | const unsigned value_numeric = String2Uint64(value_str); |
51 |
2/4✓ Branch 2 taken 130 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 130 times.
✗ Branch 6 not taken.
|
260 | const unsigned value_ms = HasSuffix(value_str, "ms", true /* ignore_case */) |
52 |
2/2✓ Branch 0 taken 55 times.
✓ Branch 1 taken 75 times.
|
130 | ? value_numeric |
53 | 130 | : (value_numeric * 1000); | |
54 |
2/2✓ Branch 0 taken 119 times.
✓ Branch 1 taken 11 times.
|
130 | if (value_ms > 0) |
55 | 119 | info->throttle_ms = std::min(value_ms, kMax429ThrottleMs); | |
56 | } | ||
57 | 214 | } | |
58 | |||
59 | |||
60 | /** | ||
61 | * Called by curl for every HTTP header. Not called for file:// transfers. | ||
62 | */ | ||
63 | 18410 | static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, | |
64 | void *info_link) { | ||
65 | 18410 | const size_t num_bytes = size * nmemb; | |
66 |
1/2✓ Branch 2 taken 18410 times.
✗ Branch 3 not taken.
|
18410 | const string header_line(static_cast<const char *>(ptr), num_bytes); |
67 | 18410 | JobInfo *info = static_cast<JobInfo *>(info_link); | |
68 | |||
69 | // Check for http status code errors | ||
70 |
4/6✓ Branch 2 taken 18410 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 18410 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 6130 times.
✓ Branch 10 taken 12280 times.
|
18410 | if (HasPrefix(header_line, "HTTP/1.", false)) { |
71 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 6130 times.
|
6130 | if (header_line.length() < 10) |
72 | ✗ | return 0; | |
73 | |||
74 | unsigned i; | ||
75 |
5/6✓ Branch 1 taken 12260 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 6130 times.
✓ Branch 5 taken 6130 times.
✓ Branch 6 taken 6130 times.
✓ Branch 7 taken 6130 times.
|
12260 | for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) { |
76 | } | ||
77 | |||
78 |
2/2✓ Branch 1 taken 3060 times.
✓ Branch 2 taken 3070 times.
|
6130 | if (header_line[i] == '2') { |
79 | 3060 | return num_bytes; | |
80 | } else { | ||
81 |
1/2✓ Branch 2 taken 3070 times.
✗ Branch 3 not taken.
|
3070 | LogCvmfs(kLogS3Fanout, kLogDebug, "http status error code [info %p]: %s", |
82 | info, header_line.c_str()); | ||
83 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 3070 times.
|
3070 | if (header_line.length() < i + 3) { |
84 | ✗ | LogCvmfs(kLogS3Fanout, kLogStderr, "S3: invalid HTTP response '%s'", | |
85 | header_line.c_str()); | ||
86 | ✗ | info->error_code = kFailOther; | |
87 | ✗ | return 0; | |
88 | } | ||
89 |
2/4✓ Branch 3 taken 3070 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 3070 times.
✗ Branch 7 not taken.
|
3070 | info->http_error = String2Int64(string(&header_line[i], 3)); |
90 | |||
91 |
2/6✓ Branch 0 taken 20 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 3050 times.
✗ Branch 5 not taken.
|
3070 | switch (info->http_error) { |
92 | 20 | case 429: | |
93 | 20 | info->error_code = kFailRetry; | |
94 | 20 | info->throttle_ms = S3FanoutManager::kDefault429ThrottleMs; | |
95 | 20 | info->throttle_timestamp = platform_monotonic_time(); | |
96 | 20 | return num_bytes; | |
97 | ✗ | case 503: | |
98 | case 502: // Can happen if the S3 gateway-backend connection breaks | ||
99 | case 500: // sometimes see this as a transient error from S3 | ||
100 | ✗ | info->error_code = kFailServiceUnavailable; | |
101 | ✗ | break; | |
102 | ✗ | case 501: | |
103 | case 400: | ||
104 | ✗ | info->error_code = kFailBadRequest; | |
105 | ✗ | break; | |
106 | ✗ | case 403: | |
107 | ✗ | info->error_code = kFailForbidden; | |
108 | ✗ | break; | |
109 | 3050 | case 404: | |
110 | 3050 | info->error_code = kFailNotFound; | |
111 | 3050 | return num_bytes; | |
112 | ✗ | default: | |
113 | ✗ | info->error_code = kFailOther; | |
114 | } | ||
115 | ✗ | return 0; | |
116 | } | ||
117 | } | ||
118 | |||
119 |
2/2✓ Branch 0 taken 60 times.
✓ Branch 1 taken 12220 times.
|
12280 | if (info->error_code == kFailRetry) { |
120 |
1/2✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
|
60 | S3FanoutManager::DetectThrottleIndicator(header_line, info); |
121 | } | ||
122 | |||
123 | 12280 | return num_bytes; | |
124 | 18410 | } | |
125 | |||
126 | |||
127 | /** | ||
128 | * Called by curl for every new chunk to upload. | ||
129 | */ | ||
130 | 78345 | static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, | |
131 | void *info_link) { | ||
132 | 78345 | const size_t num_bytes = size * nmemb; | |
133 | 78345 | JobInfo *info = static_cast<JobInfo *>(info_link); | |
134 | |||
135 | 78345 | LogCvmfs(kLogS3Fanout, kLogDebug, "Data callback with %zu bytes", num_bytes); | |
136 | |||
137 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 78345 times.
|
78345 | if (num_bytes == 0) |
138 | ✗ | return 0; | |
139 | |||
140 | 78345 | const uint64_t read_bytes = info->origin->Read(ptr, num_bytes); | |
141 | |||
142 | 78345 | LogCvmfs(kLogS3Fanout, kLogDebug, "source buffer pushed out %lu bytes", | |
143 | read_bytes); | ||
144 | |||
145 | 78345 | return read_bytes; | |
146 | } | ||
147 | |||
148 | |||
149 | /** | ||
150 | * For the time being, ignore all received information in the HTTP body | ||
151 | */ | ||
152 | ✗ | static size_t CallbackCurlBody(char * /*ptr*/, size_t size, size_t nmemb, | |
153 | void * /*userdata*/) { | ||
154 | ✗ | return size * nmemb; | |
155 | } | ||
156 | |||
157 | |||
158 | /** | ||
159 | * Called when new curl sockets arrive or existing curl sockets depart. | ||
160 | */ | ||
161 | 13610 | int S3FanoutManager::CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, | |
162 | void *userp, void *socketp) { | ||
163 | 13610 | S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(userp); | |
164 | 13610 | LogCvmfs(kLogS3Fanout, kLogDebug, | |
165 | "CallbackCurlSocket called with easy " | ||
166 | "handle %p, socket %d, action %d, up %p, " | ||
167 | "sp %p, fds_inuse %d, jobs %d", | ||
168 | easy, s, action, userp, socketp, s3fanout_mgr->watch_fds_inuse_, | ||
169 | 13610 | s3fanout_mgr->available_jobs_->Get()); | |
170 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 13610 times.
|
13610 | if (action == CURL_POLL_NONE) |
171 | ✗ | return 0; | |
172 | |||
173 | // Find s in watch_fds_ | ||
174 | // First 2 fds are job and terminate pipes (not curl related) | ||
175 | unsigned index; | ||
176 |
2/2✓ Branch 0 taken 21780 times.
✓ Branch 1 taken 6130 times.
|
27910 | for (index = 2; index < s3fanout_mgr->watch_fds_inuse_; ++index) { |
177 |
2/2✓ Branch 0 taken 7480 times.
✓ Branch 1 taken 14300 times.
|
21780 | if (s3fanout_mgr->watch_fds_[index].fd == s) |
178 | 7480 | break; | |
179 | } | ||
180 | // Or create newly | ||
181 |
2/2✓ Branch 0 taken 6130 times.
✓ Branch 1 taken 7480 times.
|
13610 | if (index == s3fanout_mgr->watch_fds_inuse_) { |
182 | // Extend array if necessary | ||
183 |
2/2✓ Branch 0 taken 10 times.
✓ Branch 1 taken 6120 times.
|
6130 | if (s3fanout_mgr->watch_fds_inuse_ == s3fanout_mgr->watch_fds_size_) { |
184 | 10 | s3fanout_mgr->watch_fds_size_ *= 2; | |
185 | 10 | s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>( | |
186 | 10 | srealloc(s3fanout_mgr->watch_fds_, | |
187 | 10 | s3fanout_mgr->watch_fds_size_ * sizeof(struct pollfd))); | |
188 | } | ||
189 | 6130 | s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].fd = s; | |
190 | 6130 | s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].events = 0; | |
191 | 6130 | s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].revents = 0; | |
192 | 6130 | s3fanout_mgr->watch_fds_inuse_++; | |
193 | } | ||
194 | |||
195 |
4/5✓ Branch 0 taken 6130 times.
✓ Branch 1 taken 30 times.
✓ Branch 2 taken 1320 times.
✓ Branch 3 taken 6130 times.
✗ Branch 4 not taken.
|
13610 | switch (action) { |
196 | 6130 | case CURL_POLL_IN: | |
197 | 6130 | s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI; | |
198 | 6130 | break; | |
199 | 30 | case CURL_POLL_OUT: | |
200 | 30 | s3fanout_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND; | |
201 | 30 | break; | |
202 | 1320 | case CURL_POLL_INOUT: | |
203 | 1320 | s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI | POLLOUT | |
204 | | POLLWRBAND; | ||
205 | 1320 | break; | |
206 | 6130 | case CURL_POLL_REMOVE: | |
207 |
2/2✓ Branch 0 taken 980 times.
✓ Branch 1 taken 5150 times.
|
6130 | if (index < s3fanout_mgr->watch_fds_inuse_ - 1) |
208 | s3fanout_mgr | ||
209 | 980 | ->watch_fds_[index] = s3fanout_mgr->watch_fds_ | |
210 | 980 | [s3fanout_mgr->watch_fds_inuse_ - 1]; | |
211 | 6130 | s3fanout_mgr->watch_fds_inuse_--; | |
212 | // Shrink array if necessary | ||
213 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6130 times.
|
6130 | if ((s3fanout_mgr->watch_fds_inuse_ > s3fanout_mgr->watch_fds_max_) |
214 | ✗ | && (s3fanout_mgr->watch_fds_inuse_ | |
215 | ✗ | < s3fanout_mgr->watch_fds_size_ / 2)) { | |
216 | ✗ | s3fanout_mgr->watch_fds_size_ /= 2; | |
217 | ✗ | s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>( | |
218 | ✗ | srealloc(s3fanout_mgr->watch_fds_, | |
219 | ✗ | s3fanout_mgr->watch_fds_size_ * sizeof(struct pollfd))); | |
220 | } | ||
221 | 6130 | break; | |
222 | ✗ | default: | |
223 | ✗ | PANIC(NULL); | |
224 | } | ||
225 | |||
226 | 13610 | return 0; | |
227 | } | ||
228 | |||
229 | |||
230 | /** | ||
231 | * Worker thread event loop. | ||
232 | */ | ||
233 | 60 | void *S3FanoutManager::MainUpload(void *data) { | |
234 |
1/2✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
|
60 | LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread started"); |
235 | 60 | S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(data); | |
236 | |||
237 | 60 | s3fanout_mgr->InitPipeWatchFds(); | |
238 | |||
239 | // Don't schedule more jobs into the multi handle than the maximum number of | ||
240 | // parallel connections. This should prevent starvation and thus a timeout | ||
241 | // of the authorization header (CVM-1339). | ||
242 | 60 | unsigned jobs_in_flight = 0; | |
243 | |||
244 | while (true) { | ||
245 | // Check events with 100ms timeout | ||
246 | 83725 | const int timeout_ms = 100; | |
247 |
1/2✓ Branch 1 taken 83725 times.
✗ Branch 2 not taken.
|
83725 | int retval = poll(s3fanout_mgr->watch_fds_, s3fanout_mgr->watch_fds_inuse_, |
248 | timeout_ms); | ||
249 |
2/2✓ Branch 0 taken 505 times.
✓ Branch 1 taken 83220 times.
|
83725 | if (retval == 0) { |
250 | // Handle timeout | ||
251 | 505 | int still_running = 0; | |
252 |
1/2✓ Branch 1 taken 505 times.
✗ Branch 2 not taken.
|
505 | retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_, |
253 | CURL_SOCKET_TIMEOUT, 0, &still_running); | ||
254 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 505 times.
|
505 | if (retval != CURLM_OK) { |
255 | ✗ | LogCvmfs(kLogS3Fanout, kLogStderr, "Error, timeout due to: %d", retval); | |
256 | ✗ | assert(retval == CURLM_OK); | |
257 | } | ||
258 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 83220 times.
|
83220 | } else if (retval < 0) { |
259 | ✗ | assert(errno == EINTR); | |
260 | ✗ | continue; | |
261 | } | ||
262 | |||
263 | // Terminate I/O thread | ||
264 |
2/2✓ Branch 0 taken 60 times.
✓ Branch 1 taken 83665 times.
|
83725 | if (s3fanout_mgr->watch_fds_[0].revents) |
265 | 60 | break; | |
266 | |||
267 | // New job incoming | ||
268 |
2/2✓ Branch 0 taken 3070 times.
✓ Branch 1 taken 80595 times.
|
83665 | if (s3fanout_mgr->watch_fds_[1].revents) { |
269 | 3070 | s3fanout_mgr->watch_fds_[1].revents = 0; | |
270 | JobInfo *info; | ||
271 |
1/2✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
|
3070 | ReadPipe(s3fanout_mgr->pipe_jobs_[0], &info, sizeof(info)); |
272 |
1/2✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
|
3070 | CURL *handle = s3fanout_mgr->AcquireCurlHandle(); |
273 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3070 times.
|
3070 | if (handle == NULL) { |
274 | ✗ | PANIC(kLogStderr, "Failed to acquire CURL handle."); | |
275 | } | ||
276 |
1/2✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
|
3070 | const s3fanout::Failures init_failure = s3fanout_mgr->InitializeRequest( |
277 | info, handle); | ||
278 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3070 times.
|
3070 | if (init_failure != s3fanout::kFailOk) { |
279 | ✗ | PANIC(kLogStderr, | |
280 | "Failed to initialize CURL handle (error: %d - %s | errno: %d)", | ||
281 | init_failure, Code2Ascii(init_failure), errno); | ||
282 | } | ||
283 |
1/2✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
|
3070 | s3fanout_mgr->SetUrlOptions(info); |
284 | |||
285 |
1/2✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
|
3070 | curl_multi_add_handle(s3fanout_mgr->curl_multi_, handle); |
286 |
1/2✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
|
3070 | s3fanout_mgr->active_requests_->insert(info); |
287 | 3070 | jobs_in_flight++; | |
288 | 3070 | int still_running = 0, retval = 0; | |
289 |
1/2✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
|
3070 | retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_, |
290 | CURL_SOCKET_TIMEOUT, 0, &still_running); | ||
291 | |||
292 |
1/2✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
|
3070 | LogCvmfs(kLogS3Fanout, kLogDebug, "curl_multi_socket_action: %d - %d", |
293 | retval, still_running); | ||
294 | } | ||
295 | |||
296 | |||
297 | // Activity on curl sockets | ||
298 | // Within this loop the curl_multi_socket_action() may cause socket(s) | ||
299 | // to be removed from watch_fds_. If a socket is removed it is replaced | ||
300 | // by the socket at the end of the array and the inuse count is decreased. | ||
301 | // Therefore loop over the array in reverse order. | ||
302 | // First 2 fds are job and terminate pipes (not curl related) | ||
303 |
2/2✓ Branch 0 taken 188600 times.
✓ Branch 1 taken 83665 times.
|
272265 | for (int32_t i = s3fanout_mgr->watch_fds_inuse_ - 1; i >= 2; --i) { |
304 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 188600 times.
|
188600 | if (static_cast<uint32_t>(i) >= s3fanout_mgr->watch_fds_inuse_) { |
305 | ✗ | continue; | |
306 | } | ||
307 |
2/2✓ Branch 0 taken 85345 times.
✓ Branch 1 taken 103255 times.
|
188600 | if (s3fanout_mgr->watch_fds_[i].revents) { |
308 | 85345 | int ev_bitmask = 0; | |
309 |
2/2✓ Branch 0 taken 9170 times.
✓ Branch 1 taken 76175 times.
|
85345 | if (s3fanout_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI)) |
310 | 9170 | ev_bitmask |= CURL_CSELECT_IN; | |
311 |
2/2✓ Branch 0 taken 76175 times.
✓ Branch 1 taken 9170 times.
|
85345 | if (s3fanout_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND)) |
312 | 76175 | ev_bitmask |= CURL_CSELECT_OUT; | |
313 | 85345 | if (s3fanout_mgr->watch_fds_[i].revents | |
314 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 85345 times.
|
85345 | & (POLLERR | POLLHUP | POLLNVAL)) |
315 | ✗ | ev_bitmask |= CURL_CSELECT_ERR; | |
316 | 85345 | s3fanout_mgr->watch_fds_[i].revents = 0; | |
317 | |||
318 | 85345 | int still_running = 0; | |
319 | 85345 | retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_, | |
320 |
1/2✓ Branch 1 taken 85345 times.
✗ Branch 2 not taken.
|
85345 | s3fanout_mgr->watch_fds_[i].fd, |
321 | ev_bitmask, | ||
322 | &still_running); | ||
323 | } | ||
324 | } | ||
325 | |||
326 | // Check if transfers are completed | ||
327 | CURLMsg *curl_msg; | ||
328 | int msgs_in_queue; | ||
329 |
3/4✓ Branch 1 taken 89795 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 6130 times.
✓ Branch 4 taken 83665 times.
|
89795 | while ((curl_msg = curl_multi_info_read(s3fanout_mgr->curl_multi_, |
330 | &msgs_in_queue))) { | ||
331 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6130 times.
|
6130 | assert(curl_msg->msg == CURLMSG_DONE); |
332 | |||
333 | 6130 | s3fanout_mgr->statistics_->num_requests++; | |
334 | JobInfo *info; | ||
335 | 6130 | CURL *easy_handle = curl_msg->easy_handle; | |
336 | 6130 | const int curl_error = curl_msg->data.result; | |
337 |
1/2✓ Branch 1 taken 6130 times.
✗ Branch 2 not taken.
|
6130 | curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info); |
338 | |||
339 |
1/2✓ Branch 1 taken 6130 times.
✗ Branch 2 not taken.
|
6130 | curl_multi_remove_handle(s3fanout_mgr->curl_multi_, easy_handle); |
340 |
3/4✓ Branch 1 taken 6130 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 3060 times.
✓ Branch 4 taken 3070 times.
|
6130 | if (s3fanout_mgr->VerifyAndFinalize(curl_error, info)) { |
341 |
1/2✓ Branch 1 taken 3060 times.
✗ Branch 2 not taken.
|
3060 | curl_multi_add_handle(s3fanout_mgr->curl_multi_, easy_handle); |
342 | 3060 | int still_running = 0; | |
343 |
1/2✓ Branch 1 taken 3060 times.
✗ Branch 2 not taken.
|
3060 | curl_multi_socket_action(s3fanout_mgr->curl_multi_, CURL_SOCKET_TIMEOUT, |
344 | 0, &still_running); | ||
345 | } else { | ||
346 | // Return easy handle into pool and write result back | ||
347 | 3070 | jobs_in_flight--; | |
348 |
1/2✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
|
3070 | s3fanout_mgr->active_requests_->erase(info); |
349 |
1/2✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
|
3070 | s3fanout_mgr->ReleaseCurlHandle(info, easy_handle); |
350 |
1/2✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
|
3070 | s3fanout_mgr->available_jobs_->Decrement(); |
351 | |||
352 | // Add to list of completed jobs | ||
353 |
1/2✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
|
3070 | s3fanout_mgr->PushCompletedJob(info); |
354 | } | ||
355 | } | ||
356 | 83665 | } | |
357 | |||
358 | 60 | set<CURL *>::iterator i = s3fanout_mgr->pool_handles_inuse_->begin(); | |
359 | 60 | const set<CURL *>::const_iterator i_end = s3fanout_mgr->pool_handles_inuse_ | |
360 | 60 | ->end(); | |
361 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 60 times.
|
60 | for (; i != i_end; ++i) { |
362 | ✗ | curl_multi_remove_handle(s3fanout_mgr->curl_multi_, *i); | |
363 | ✗ | curl_easy_cleanup(*i); | |
364 | } | ||
365 | 60 | s3fanout_mgr->pool_handles_inuse_->clear(); | |
366 | 60 | free(s3fanout_mgr->watch_fds_); | |
367 | |||
368 |
1/2✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
|
60 | LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread terminated"); |
369 | 60 | return NULL; | |
370 | } | ||
371 | |||
372 | |||
373 | /** | ||
374 | * Gets an idle CURL handle from the pool. Creates a new one and adds it to | ||
375 | * the pool if necessary. | ||
376 | */ | ||
377 | 3070 | CURL *S3FanoutManager::AcquireCurlHandle() const { | |
378 | CURL *handle; | ||
379 | |||
380 | 3070 | const MutexLockGuard guard(curl_handle_lock_); | |
381 | |||
382 |
2/2✓ Branch 1 taken 245 times.
✓ Branch 2 taken 2825 times.
|
3070 | if (pool_handles_idle_->empty()) { |
383 | CURLcode retval; | ||
384 | |||
385 | // Create a new handle | ||
386 |
1/2✓ Branch 1 taken 245 times.
✗ Branch 2 not taken.
|
245 | handle = curl_easy_init(); |
387 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 245 times.
|
245 | assert(handle != NULL); |
388 | |||
389 | // Other settings | ||
390 |
1/2✓ Branch 1 taken 245 times.
✗ Branch 2 not taken.
|
245 | retval = curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1); |
391 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 245 times.
|
245 | assert(retval == CURLE_OK); |
392 |
1/2✓ Branch 1 taken 245 times.
✗ Branch 2 not taken.
|
245 | retval = curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, |
393 | CallbackCurlHeader); | ||
394 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 245 times.
|
245 | assert(retval == CURLE_OK); |
395 |
1/2✓ Branch 1 taken 245 times.
✗ Branch 2 not taken.
|
245 | retval = curl_easy_setopt(handle, CURLOPT_READFUNCTION, CallbackCurlData); |
396 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 245 times.
|
245 | assert(retval == CURLE_OK); |
397 |
1/2✓ Branch 1 taken 245 times.
✗ Branch 2 not taken.
|
245 | retval = curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlBody); |
398 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 245 times.
|
245 | assert(retval == CURLE_OK); |
399 | } else { | ||
400 | 2825 | handle = *(pool_handles_idle_->begin()); | |
401 |
1/2✓ Branch 2 taken 2825 times.
✗ Branch 3 not taken.
|
2825 | pool_handles_idle_->erase(pool_handles_idle_->begin()); |
402 | } | ||
403 | |||
404 |
1/2✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
|
3070 | pool_handles_inuse_->insert(handle); |
405 | |||
406 | 3070 | return handle; | |
407 | 3070 | } | |
408 | |||
409 | |||
410 | 3070 | void S3FanoutManager::ReleaseCurlHandle(JobInfo *info, CURL *handle) const { | |
411 |
1/2✓ Branch 0 taken 3070 times.
✗ Branch 1 not taken.
|
3070 | if (info->http_headers) { |
412 |
1/2✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
|
3070 | curl_slist_free_all(info->http_headers); |
413 | 3070 | info->http_headers = NULL; | |
414 | } | ||
415 | |||
416 | 3070 | const MutexLockGuard guard(curl_handle_lock_); | |
417 | |||
418 |
1/2✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
|
3070 | const set<CURL *>::iterator elem = pool_handles_inuse_->find(handle); |
419 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 3070 times.
|
3070 | assert(elem != pool_handles_inuse_->end()); |
420 | |||
421 |
2/2✓ Branch 1 taken 145 times.
✓ Branch 2 taken 2925 times.
|
3070 | if (pool_handles_idle_->size() > config_.pool_max_handles) { |
422 |
1/2✓ Branch 1 taken 145 times.
✗ Branch 2 not taken.
|
145 | const CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, NULL); |
423 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 145 times.
|
145 | assert(retval == CURLE_OK); |
424 |
1/2✓ Branch 1 taken 145 times.
✗ Branch 2 not taken.
|
145 | curl_easy_cleanup(handle); |
425 | const std::map<CURL *, S3FanOutDnsEntry *>::size_type | ||
426 |
1/2✓ Branch 1 taken 145 times.
✗ Branch 2 not taken.
|
145 | retitems = curl_sharehandles_->erase(handle); |
427 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 145 times.
|
145 | assert(retitems == 1); |
428 | } else { | ||
429 |
1/2✓ Branch 1 taken 2925 times.
✗ Branch 2 not taken.
|
2925 | pool_handles_idle_->insert(handle); |
430 | } | ||
431 | |||
432 |
1/2✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
|
3070 | pool_handles_inuse_->erase(elem); |
433 | 3070 | } | |
434 | |||
435 | 60 | void S3FanoutManager::InitPipeWatchFds() { | |
436 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
|
60 | assert(watch_fds_inuse_ == 0); |
437 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
|
60 | assert(watch_fds_size_ >= 2); |
438 | 60 | watch_fds_[0].fd = pipe_terminate_[0]; | |
439 | 60 | watch_fds_[0].events = POLLIN | POLLPRI; | |
440 | 60 | watch_fds_[0].revents = 0; | |
441 | 60 | ++watch_fds_inuse_; | |
442 | 60 | watch_fds_[1].fd = pipe_jobs_[0]; | |
443 | 60 | watch_fds_[1].events = POLLIN | POLLPRI; | |
444 | 60 | watch_fds_[1].revents = 0; | |
445 | 60 | ++watch_fds_inuse_; | |
446 | 60 | } | |
447 | |||
448 | /** | ||
449 | * The Amazon AWS 2 authorization header according to | ||
450 | * http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html#ConstructingTheAuthenticationHeader | ||
451 | */ | ||
452 | 6110 | bool S3FanoutManager::MkV2Authz(const JobInfo &info, | |
453 | vector<string> *headers) const { | ||
454 | 6110 | string payload_hash; | |
455 |
1/2✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
|
6110 | const bool retval = MkPayloadHash(info, &payload_hash); |
456 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
|
6110 | if (!retval) |
457 | ✗ | return false; | |
458 |
1/2✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
|
6110 | const string content_type = GetContentType(info); |
459 |
1/2✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
|
6110 | const string request = GetRequestString(info); |
460 | |||
461 |
1/2✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
|
6110 | const string timestamp = RfcTimestamp(); |
462 |
5/10✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 6110 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 6110 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 6110 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 6110 times.
✗ Branch 14 not taken.
|
12220 | string to_sign = request + "\n" + payload_hash + "\n" + content_type + "\n" |
463 |
2/4✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 6110 times.
✗ Branch 5 not taken.
|
12220 | + timestamp + "\n"; |
464 |
2/4✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 6110 times.
✗ Branch 4 not taken.
|
6110 | if (config_.x_amz_acl != "") { |
465 |
2/4✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 6110 times.
✗ Branch 5 not taken.
|
12220 | to_sign += "x-amz-acl:" + config_.x_amz_acl + "\n" + // default ACL |
466 |
5/10✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 6110 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 6110 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 6110 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 6110 times.
✗ Branch 14 not taken.
|
12220 | "/" + config_.bucket + "/" + info.object_key; |
467 | } | ||
468 |
1/2✓ Branch 3 taken 6110 times.
✗ Branch 4 not taken.
|
6110 | LogCvmfs(kLogS3Fanout, kLogDebug, "%s string to sign for: %s", |
469 | request.c_str(), info.object_key.c_str()); | ||
470 | |||
471 |
1/2✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
|
6110 | shash::Any hmac; |
472 | 6110 | hmac.algorithm = shash::kSha1; | |
473 |
1/2✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
|
6110 | shash::Hmac(config_.secret_key, |
474 | 6110 | reinterpret_cast<const unsigned char *>(to_sign.data()), | |
475 | 6110 | to_sign.length(), &hmac); | |
476 | |||
477 |
3/6✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 6110 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 6110 times.
✗ Branch 8 not taken.
|
18330 | headers->push_back("Authorization: AWS " + config_.access_key + ":" |
478 |
3/6✓ Branch 2 taken 6110 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 6110 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 6110 times.
✗ Branch 9 not taken.
|
30550 | + Base64(string(reinterpret_cast<char *>(hmac.digest), |
479 | 6110 | hmac.GetDigestSize()))); | |
480 |
2/4✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 6110 times.
✗ Branch 5 not taken.
|
6110 | headers->push_back("Date: " + timestamp); |
481 |
2/4✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 6110 times.
✗ Branch 5 not taken.
|
6110 | headers->push_back("X-Amz-Acl: " + config_.x_amz_acl); |
482 |
2/2✓ Branch 1 taken 3040 times.
✓ Branch 2 taken 3070 times.
|
6110 | if (!payload_hash.empty()) |
483 |
2/4✓ Branch 1 taken 3040 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 3040 times.
✗ Branch 5 not taken.
|
3040 | headers->push_back("Content-MD5: " + payload_hash); |
484 |
2/2✓ Branch 1 taken 3040 times.
✓ Branch 2 taken 3070 times.
|
6110 | if (!content_type.empty()) |
485 |
2/4✓ Branch 1 taken 3040 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 3040 times.
✗ Branch 5 not taken.
|
3040 | headers->push_back("Content-Type: " + content_type); |
486 | 6110 | return true; | |
487 | 6110 | } | |
488 | |||
489 | |||
490 | ✗ | string S3FanoutManager::GetUriEncode(const string &val, | |
491 | bool encode_slash) const { | ||
492 | ✗ | string result; | |
493 | ✗ | const unsigned len = val.length(); | |
494 | ✗ | result.reserve(len); | |
495 | ✗ | for (unsigned i = 0; i < len; ++i) { | |
496 | ✗ | const char c = val[i]; | |
497 | ✗ | if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') | |
498 | ✗ | || (c >= '0' && c <= '9') || c == '_' || c == '-' || c == '~' | |
499 | ✗ | || c == '.') { | |
500 | ✗ | result.push_back(c); | |
501 | ✗ | } else if (c == '/') { | |
502 | ✗ | if (encode_slash) { | |
503 | ✗ | result += "%2F"; | |
504 | } else { | ||
505 | ✗ | result.push_back(c); | |
506 | } | ||
507 | } else { | ||
508 | ✗ | result.push_back('%'); | |
509 | ✗ | result.push_back((c / 16) + ((c / 16 <= 9) ? '0' : 'A' - 10)); | |
510 | ✗ | result.push_back((c % 16) + ((c % 16 <= 9) ? '0' : 'A' - 10)); | |
511 | } | ||
512 | } | ||
513 | ✗ | return result; | |
514 | } | ||
515 | |||
516 | |||
517 | ✗ | string S3FanoutManager::GetAwsV4SigningKey(const string &date) const { | |
518 | ✗ | if (last_signing_key_.first == date) | |
519 | ✗ | return last_signing_key_.second; | |
520 | |||
521 | ✗ | const string date_key = shash::Hmac256("AWS4" + config_.secret_key, date, | |
522 | ✗ | true); | |
523 | ✗ | const string date_region_key = shash::Hmac256(date_key, config_.region, true); | |
524 | const string date_region_service_key = shash::Hmac256(date_region_key, "s3", | ||
525 | ✗ | true); | |
526 | string signing_key = shash::Hmac256(date_region_service_key, "aws4_request", | ||
527 | ✗ | true); | |
528 | ✗ | last_signing_key_.first = date; | |
529 | ✗ | last_signing_key_.second = signing_key; | |
530 | ✗ | return signing_key; | |
531 | } | ||
532 | |||
533 | |||
534 | /** | ||
535 | * The Amazon AWS4 authorization header according to | ||
536 | * http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-auth-using-authorization-header.html | ||
537 | */ | ||
538 | ✗ | bool S3FanoutManager::MkV4Authz(const JobInfo &info, | |
539 | vector<string> *headers) const { | ||
540 | ✗ | string payload_hash; | |
541 | ✗ | const bool retval = MkPayloadHash(info, &payload_hash); | |
542 | ✗ | if (!retval) | |
543 | ✗ | return false; | |
544 | ✗ | const string content_type = GetContentType(info); | |
545 | ✗ | const string timestamp = IsoTimestamp(); | |
546 | ✗ | const string date = timestamp.substr(0, 8); | |
547 | ✗ | vector<string> tokens = SplitString(complete_hostname_, ':'); | |
548 | ✗ | assert(tokens.size() <= 2); | |
549 | ✗ | string canonical_hostname = tokens[0]; | |
550 | |||
551 | // if we could split the hostname in two and if the port is *NOT* a default | ||
552 | // one | ||
553 | ✗ | if (tokens.size() == 2 | |
554 | ✗ | && !((String2Uint64(tokens[1]) == kDefaultHTTPPort) | |
555 | ✗ | || (String2Uint64(tokens[1]) == kDefaultHTTPSPort))) | |
556 | ✗ | canonical_hostname += ":" + tokens[1]; | |
557 | |||
558 | ✗ | string signed_headers; | |
559 | ✗ | string canonical_headers; | |
560 | ✗ | if (!content_type.empty()) { | |
561 | ✗ | signed_headers += "content-type;"; | |
562 | ✗ | headers->push_back("Content-Type: " + content_type); | |
563 | ✗ | canonical_headers += "content-type:" + content_type + "\n"; | |
564 | } | ||
565 | ✗ | if (config_.x_amz_acl != "") { | |
566 | ✗ | signed_headers += "host;x-amz-acl;x-amz-content-sha256;x-amz-date"; | |
567 | } else { | ||
568 | ✗ | signed_headers += "host;x-amz-content-sha256;x-amz-date"; | |
569 | } | ||
570 | ✗ | canonical_headers += "host:" + canonical_hostname + "\n"; | |
571 | ✗ | if (config_.x_amz_acl != "") { | |
572 | ✗ | canonical_headers += "x-amz-acl:" + config_.x_amz_acl + "\n"; | |
573 | } | ||
574 | ✗ | canonical_headers += "x-amz-content-sha256:" + payload_hash + "\n" | |
575 | ✗ | + "x-amz-date:" + timestamp + "\n"; | |
576 | |||
577 | ✗ | const string scope = date + "/" + config_.region + "/s3/aws4_request"; | |
578 | ✗ | const string uri = config_.dns_buckets ? (string("/") + info.object_key) | |
579 | ✗ | : (string("/") + config_.bucket + "/" | |
580 | ✗ | + info.object_key); | |
581 | |||
582 | ✗ | const string canonical_request = GetRequestString(info) + "\n" | |
583 | ✗ | + GetUriEncode(uri, false) + "\n" + "\n" | |
584 | ✗ | + canonical_headers + "\n" + signed_headers | |
585 | ✗ | + "\n" + payload_hash; | |
586 | |||
587 | ✗ | const string hash_request = shash::Sha256String(canonical_request.c_str()); | |
588 | |||
589 | ✗ | const string string_to_sign = "AWS4-HMAC-SHA256\n" + timestamp + "\n" + scope | |
590 | ✗ | + "\n" + hash_request; | |
591 | |||
592 | ✗ | const string signing_key = GetAwsV4SigningKey(date); | |
593 | ✗ | const string signature = shash::Hmac256(signing_key, string_to_sign); | |
594 | |||
595 | ✗ | headers->push_back("X-Amz-Acl: " + config_.x_amz_acl); | |
596 | ✗ | headers->push_back("X-Amz-Content-Sha256: " + payload_hash); | |
597 | ✗ | headers->push_back("X-Amz-Date: " + timestamp); | |
598 | ✗ | headers->push_back("Authorization: AWS4-HMAC-SHA256 " | |
599 | "Credential=" | ||
600 | ✗ | + config_.access_key + "/" + scope | |
601 | ✗ | + "," | |
602 | "SignedHeaders=" | ||
603 | ✗ | + signed_headers | |
604 | ✗ | + "," | |
605 | "Signature=" | ||
606 | ✗ | + signature); | |
607 | ✗ | return true; | |
608 | } | ||
609 | |||
610 | /** | ||
611 | * The Azure Blob authorization header according to | ||
612 | * https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key | ||
613 | */ | ||
614 | ✗ | bool S3FanoutManager::MkAzureAuthz(const JobInfo &info, | |
615 | vector<string> *headers) const { | ||
616 | ✗ | const string timestamp = RfcTimestamp(); | |
617 | const string canonical_headers = "x-ms-blob-type:BlockBlob\nx-ms-date:" | ||
618 | ✗ | + timestamp + "\nx-ms-version:2011-08-18"; | |
619 | ✗ | const string canonical_resource = "/" + config_.access_key + "/" | |
620 | ✗ | + config_.bucket + "/" + info.object_key; | |
621 | |||
622 | ✗ | string string_to_sign; | |
623 | ✗ | if ((info.request == JobInfo::kReqHeadOnly) | |
624 | ✗ | || (info.request == JobInfo::kReqHeadPut) | |
625 | ✗ | || (info.request == JobInfo::kReqDelete)) { | |
626 | ✗ | string_to_sign = GetRequestString(info) + string("\n\n\n") | |
627 | ✗ | + "\n\n\n\n\n\n\n\n\n" + canonical_headers + "\n" | |
628 | ✗ | + canonical_resource; | |
629 | } else { | ||
630 | ✗ | string_to_sign = GetRequestString(info) + string("\n\n\n") | |
631 | ✗ | + string(StringifyInt(info.origin->GetSize())) | |
632 | ✗ | + "\n\n\n\n\n\n\n\n\n" + canonical_headers + "\n" | |
633 | ✗ | + canonical_resource; | |
634 | } | ||
635 | |||
636 | ✗ | string signing_key; | |
637 | ✗ | const int retval = Debase64(config_.secret_key, &signing_key); | |
638 | ✗ | if (!retval) | |
639 | ✗ | return false; | |
640 | |||
641 | ✗ | const string signature = shash::Hmac256(signing_key, string_to_sign, true); | |
642 | |||
643 | ✗ | headers->push_back("x-ms-date: " + timestamp); | |
644 | ✗ | headers->push_back("x-ms-version: 2011-08-18"); | |
645 | ✗ | headers->push_back("Authorization: SharedKey " + config_.access_key + ":" | |
646 | ✗ | + Base64(signature)); | |
647 | ✗ | headers->push_back("x-ms-blob-type: BlockBlob"); | |
648 | ✗ | return true; | |
649 | } | ||
650 | |||
651 | 6110 | void S3FanoutManager::InitializeDnsSettingsCurl(CURL *handle, | |
652 | CURLSH *sharehandle, | ||
653 | curl_slist *clist) const { | ||
654 | 6110 | CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, sharehandle); | |
655 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
|
6110 | assert(retval == CURLE_OK); |
656 | 6110 | retval = curl_easy_setopt(handle, CURLOPT_RESOLVE, clist); | |
657 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
|
6110 | assert(retval == CURLE_OK); |
658 | 6110 | } | |
659 | |||
660 | |||
661 | 6110 | int S3FanoutManager::InitializeDnsSettings(CURL *handle, | |
662 | std::string host_with_port) const { | ||
663 | // Use existing handle | ||
664 | const std::map<CURL *, S3FanOutDnsEntry *>::const_iterator | ||
665 |
1/2✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
|
6110 | it = curl_sharehandles_->find(handle); |
666 |
2/2✓ Branch 3 taken 5865 times.
✓ Branch 4 taken 245 times.
|
6110 | if (it != curl_sharehandles_->end()) { |
667 |
1/2✓ Branch 1 taken 5865 times.
✗ Branch 2 not taken.
|
5865 | InitializeDnsSettingsCurl(handle, it->second->sharehandle, |
668 | 5865 | it->second->clist); | |
669 | 5865 | return 0; | |
670 | } | ||
671 | |||
672 | // Add protocol information for extraction of fields for DNS | ||
673 |
2/4✓ Branch 1 taken 245 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 245 times.
✗ Branch 4 not taken.
|
245 | if (!IsHttpUrl(host_with_port)) |
674 |
2/4✓ Branch 1 taken 245 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 245 times.
✗ Branch 5 not taken.
|
245 | host_with_port = config_.protocol + "://" + host_with_port; |
675 |
1/2✓ Branch 1 taken 245 times.
✗ Branch 2 not taken.
|
245 | const std::string remote_host = dns::ExtractHost(host_with_port); |
676 |
1/2✓ Branch 1 taken 245 times.
✗ Branch 2 not taken.
|
245 | const std::string remote_port = dns::ExtractPort(host_with_port); |
677 | |||
678 | // If we have the name already resolved, use the least used IP | ||
679 | 245 | S3FanOutDnsEntry *useme = NULL; | |
680 | 245 | unsigned int usemin = UINT_MAX; | |
681 | 245 | std::set<S3FanOutDnsEntry *>::iterator its3 = sharehandles_->begin(); | |
682 |
2/2✓ Branch 3 taken 195 times.
✓ Branch 4 taken 245 times.
|
440 | for (; its3 != sharehandles_->end(); ++its3) { |
683 |
1/2✓ Branch 2 taken 195 times.
✗ Branch 3 not taken.
|
195 | if ((*its3)->dns_name == remote_host) { |
684 |
1/2✓ Branch 1 taken 195 times.
✗ Branch 2 not taken.
|
195 | if (usemin >= (*its3)->counter) { |
685 | 195 | usemin = (*its3)->counter; | |
686 | 195 | useme = (*its3); | |
687 | } | ||
688 | } | ||
689 | } | ||
690 |
2/2✓ Branch 0 taken 195 times.
✓ Branch 1 taken 50 times.
|
245 | if (useme != NULL) { |
691 |
1/2✓ Branch 1 taken 195 times.
✗ Branch 2 not taken.
|
195 | curl_sharehandles_->insert( |
692 | 195 | std::pair<CURL *, S3FanOutDnsEntry *>(handle, useme)); | |
693 | 195 | useme->counter++; | |
694 |
1/2✓ Branch 1 taken 195 times.
✗ Branch 2 not taken.
|
195 | InitializeDnsSettingsCurl(handle, useme->sharehandle, useme->clist); |
695 | 195 | return 0; | |
696 | } | ||
697 | |||
698 | // We need to resolve the hostname | ||
699 | // TODO(ssheikki): support ipv6 also... if (opt_ipv4_only_) | ||
700 |
1/2✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
|
50 | const dns::Host host = resolver_->Resolve(remote_host); |
701 |
1/2✓ Branch 2 taken 50 times.
✗ Branch 3 not taken.
|
50 | set<string> ipv4_addresses = host.ipv4_addresses(); |
702 | 50 | std::set<string>::iterator its = ipv4_addresses.begin(); | |
703 | 50 | S3FanOutDnsEntry *dnse = NULL; | |
704 |
2/2✓ Branch 3 taken 50 times.
✓ Branch 4 taken 50 times.
|
100 | for (; its != ipv4_addresses.end(); ++its) { |
705 |
2/4✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 50 times.
✗ Branch 5 not taken.
|
50 | dnse = new S3FanOutDnsEntry(); |
706 | 50 | dnse->counter = 0; | |
707 |
1/2✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
|
50 | dnse->dns_name = remote_host; |
708 |
4/12✗ Branch 1 not taken.
✓ Branch 2 taken 50 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 8 taken 50 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 50 times.
✗ Branch 12 not taken.
✗ Branch 14 not taken.
✓ Branch 15 taken 50 times.
✗ Branch 18 not taken.
✗ Branch 19 not taken.
|
50 | dnse->port = remote_port.size() == 0 ? "80" : remote_port; |
709 |
1/2✓ Branch 2 taken 50 times.
✗ Branch 3 not taken.
|
50 | dnse->ip = *its; |
710 | 50 | dnse->clist = NULL; | |
711 |
1/2✓ Branch 2 taken 50 times.
✗ Branch 3 not taken.
|
50 | dnse->clist = curl_slist_append( |
712 | dnse->clist, | ||
713 |
4/8✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 50 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 50 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 50 times.
✗ Branch 11 not taken.
|
100 | (dnse->dns_name + ":" + dnse->port + ":" + dnse->ip).c_str()); |
714 |
1/2✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
|
50 | dnse->sharehandle = curl_share_init(); |
715 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 50 times.
|
50 | assert(dnse->sharehandle != NULL); |
716 |
1/2✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
|
50 | const CURLSHcode share_retval = curl_share_setopt( |
717 | dnse->sharehandle, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS); | ||
718 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 50 times.
|
50 | assert(share_retval == CURLSHE_OK); |
719 |
1/2✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
|
50 | sharehandles_->insert(dnse); |
720 | } | ||
721 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 50 times.
|
50 | if (dnse == NULL) { |
722 | ✗ | LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr, | |
723 | "Error: DNS resolve failed for address '%s'.", | ||
724 | remote_host.c_str()); | ||
725 | ✗ | assert(dnse != NULL); | |
726 | ✗ | return -1; | |
727 | } | ||
728 |
1/2✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
|
50 | curl_sharehandles_->insert( |
729 | 50 | std::pair<CURL *, S3FanOutDnsEntry *>(handle, dnse)); | |
730 | 50 | dnse->counter++; | |
731 |
1/2✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
|
50 | InitializeDnsSettingsCurl(handle, dnse->sharehandle, dnse->clist); |
732 | |||
733 | 50 | return 0; | |
734 | 245 | } | |
735 | |||
736 | |||
737 | 6110 | bool S3FanoutManager::MkPayloadHash(const JobInfo &info, | |
738 | string *hex_hash) const { | ||
739 |
2/2✓ Branch 0 taken 6085 times.
✓ Branch 1 taken 25 times.
|
6110 | if ((info.request == JobInfo::kReqHeadOnly) |
740 |
2/2✓ Branch 0 taken 3045 times.
✓ Branch 1 taken 3040 times.
|
6085 | || (info.request == JobInfo::kReqHeadPut) |
741 |
2/2✓ Branch 0 taken 5 times.
✓ Branch 1 taken 3040 times.
|
3045 | || (info.request == JobInfo::kReqDelete)) { |
742 |
1/4✓ Branch 0 taken 3070 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
|
3070 | switch (config_.authz_method) { |
743 | 3070 | case kAuthzAwsV2: | |
744 | 3070 | hex_hash->clear(); | |
745 | 3070 | break; | |
746 | ✗ | case kAuthzAwsV4: | |
747 | // Sha256 over empty string | ||
748 | *hex_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b78" | ||
749 | ✗ | "52b855"; | |
750 | ✗ | break; | |
751 | ✗ | case kAuthzAzure: | |
752 | // no payload hash required for Azure signature | ||
753 | ✗ | hex_hash->clear(); | |
754 | ✗ | break; | |
755 | ✗ | default: | |
756 | ✗ | PANIC(NULL); | |
757 | } | ||
758 | 3070 | return true; | |
759 | } | ||
760 | |||
761 | // PUT, there is actually payload | ||
762 |
1/2✓ Branch 1 taken 3040 times.
✗ Branch 2 not taken.
|
3040 | shash::Any payload_hash(shash::kMd5); |
763 | |||
764 | unsigned char *data; | ||
765 | 3040 | const unsigned int nbytes = info.origin->Data( | |
766 |
2/4✓ Branch 2 taken 3040 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3040 times.
✗ Branch 6 not taken.
|
3040 | reinterpret_cast<void **>(&data), info.origin->GetSize(), 0); |
767 |
2/4✓ Branch 2 taken 3040 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 3040 times.
|
3040 | assert(nbytes == info.origin->GetSize()); |
768 | |||
769 |
1/4✓ Branch 0 taken 3040 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
|
3040 | switch (config_.authz_method) { |
770 | 3040 | case kAuthzAwsV2: | |
771 |
1/2✓ Branch 1 taken 3040 times.
✗ Branch 2 not taken.
|
3040 | shash::HashMem(data, nbytes, &payload_hash); |
772 |
2/4✓ Branch 2 taken 3040 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3040 times.
✗ Branch 6 not taken.
|
9120 | *hex_hash = Base64(string(reinterpret_cast<char *>(payload_hash.digest), |
773 | 6080 | payload_hash.GetDigestSize())); | |
774 | 3040 | return true; | |
775 | ✗ | case kAuthzAwsV4: | |
776 | ✗ | *hex_hash = shash::Sha256Mem(data, nbytes); | |
777 | ✗ | return true; | |
778 | ✗ | case kAuthzAzure: | |
779 | // no payload hash required for Azure signature | ||
780 | ✗ | hex_hash->clear(); | |
781 | ✗ | return true; | |
782 | ✗ | default: | |
783 | ✗ | PANIC(NULL); | |
784 | } | ||
785 | } | ||
786 | |||
787 | 6115 | string S3FanoutManager::GetRequestString(const JobInfo &info) const { | |
788 |
3/4✓ Branch 0 taken 3065 times.
✓ Branch 1 taken 3040 times.
✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
|
6115 | switch (info.request) { |
789 | 3065 | case JobInfo::kReqHeadOnly: | |
790 | case JobInfo::kReqHeadPut: | ||
791 |
1/2✓ Branch 2 taken 3065 times.
✗ Branch 3 not taken.
|
3065 | return "HEAD"; |
792 | 3040 | case JobInfo::kReqPutCas: | |
793 | case JobInfo::kReqPutDotCvmfs: | ||
794 | case JobInfo::kReqPutHtml: | ||
795 | case JobInfo::kReqPutBucket: | ||
796 |
1/2✓ Branch 2 taken 3040 times.
✗ Branch 3 not taken.
|
3040 | return "PUT"; |
797 | 10 | case JobInfo::kReqDelete: | |
798 |
1/2✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
|
10 | return "DELETE"; |
799 | ✗ | default: | |
800 | ✗ | PANIC(NULL); | |
801 | } | ||
802 | } | ||
803 | |||
804 | |||
805 | 6110 | string S3FanoutManager::GetContentType(const JobInfo &info) const { | |
806 |
2/6✓ Branch 0 taken 3070 times.
✓ Branch 1 taken 3040 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
|
6110 | switch (info.request) { |
807 | 3070 | case JobInfo::kReqHeadOnly: | |
808 | case JobInfo::kReqHeadPut: | ||
809 | case JobInfo::kReqDelete: | ||
810 |
1/2✓ Branch 2 taken 3070 times.
✗ Branch 3 not taken.
|
3070 | return ""; |
811 | 3040 | case JobInfo::kReqPutCas: | |
812 |
1/2✓ Branch 2 taken 3040 times.
✗ Branch 3 not taken.
|
3040 | return "application/octet-stream"; |
813 | ✗ | case JobInfo::kReqPutDotCvmfs: | |
814 | ✗ | return "application/x-cvmfs"; | |
815 | ✗ | case JobInfo::kReqPutHtml: | |
816 | ✗ | return "text/html"; | |
817 | ✗ | case JobInfo::kReqPutBucket: | |
818 | ✗ | return "text/xml"; | |
819 | ✗ | default: | |
820 | ✗ | PANIC(NULL); | |
821 | } | ||
822 | } | ||
823 | |||
824 | |||
825 | /** | ||
826 | * Request parameters set the URL and other options such as timeout and | ||
827 | * proxy. | ||
828 | */ | ||
829 | 6110 | Failures S3FanoutManager::InitializeRequest(JobInfo *info, CURL *handle) const { | |
830 | // Initialize internal download state | ||
831 | 6110 | info->curl_handle = handle; | |
832 | 6110 | info->error_code = kFailOk; | |
833 | 6110 | info->http_error = 0; | |
834 | 6110 | info->num_retries = 0; | |
835 | 6110 | info->backoff_ms = 0; | |
836 | 6110 | info->throttle_ms = 0; | |
837 | 6110 | info->throttle_timestamp = 0; | |
838 | 6110 | info->http_headers = NULL; | |
839 | // info->payload_size is needed in S3Uploader::MainCollectResults, | ||
840 | // where info->origin is already destroyed. | ||
841 |
1/2✓ Branch 2 taken 6110 times.
✗ Branch 3 not taken.
|
6110 | info->payload_size = info->origin->GetSize(); |
842 | |||
843 |
2/4✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 6110 times.
✗ Branch 5 not taken.
|
6110 | InitializeDnsSettings(handle, complete_hostname_); |
844 | |||
845 | CURLcode retval; | ||
846 |
2/2✓ Branch 0 taken 6085 times.
✓ Branch 1 taken 25 times.
|
6110 | if ((info->request == JobInfo::kReqHeadOnly) |
847 |
2/2✓ Branch 0 taken 3045 times.
✓ Branch 1 taken 3040 times.
|
6085 | || (info->request == JobInfo::kReqHeadPut) |
848 |
2/2✓ Branch 0 taken 5 times.
✓ Branch 1 taken 3040 times.
|
3045 | || (info->request == JobInfo::kReqDelete)) { |
849 |
1/2✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
|
3070 | retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 0); |
850 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3070 times.
|
3070 | assert(retval == CURLE_OK); |
851 |
1/2✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
|
3070 | retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 1); |
852 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3070 times.
|
3070 | assert(retval == CURLE_OK); |
853 | |||
854 |
2/2✓ Branch 0 taken 5 times.
✓ Branch 1 taken 3065 times.
|
3070 | if (info->request == JobInfo::kReqDelete) { |
855 |
2/4✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 5 times.
✗ Branch 6 not taken.
|
5 | retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, |
856 | GetRequestString(*info).c_str()); | ||
857 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 5 times.
|
5 | assert(retval == CURLE_OK); |
858 | } else { | ||
859 |
1/2✓ Branch 1 taken 3065 times.
✗ Branch 2 not taken.
|
3065 | retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL); |
860 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3065 times.
|
3065 | assert(retval == CURLE_OK); |
861 | } | ||
862 | } else { | ||
863 |
1/2✓ Branch 1 taken 3040 times.
✗ Branch 2 not taken.
|
3040 | retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL); |
864 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3040 times.
|
3040 | assert(retval == CURLE_OK); |
865 |
1/2✓ Branch 1 taken 3040 times.
✗ Branch 2 not taken.
|
3040 | retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 1); |
866 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3040 times.
|
3040 | assert(retval == CURLE_OK); |
867 |
1/2✓ Branch 1 taken 3040 times.
✗ Branch 2 not taken.
|
3040 | retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 0); |
868 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3040 times.
|
3040 | assert(retval == CURLE_OK); |
869 |
2/4✓ Branch 2 taken 3040 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3040 times.
✗ Branch 6 not taken.
|
3040 | retval = curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE, |
870 | static_cast<curl_off_t>(info->origin->GetSize())); | ||
871 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3040 times.
|
3040 | assert(retval == CURLE_OK); |
872 | |||
873 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3040 times.
|
3040 | if (info->request == JobInfo::kReqPutDotCvmfs) { |
874 | ✗ | info->http_headers = curl_slist_append(info->http_headers, | |
875 | kCacheControlDotCvmfs); | ||
876 |
1/2✓ Branch 0 taken 3040 times.
✗ Branch 1 not taken.
|
3040 | } else if (info->request == JobInfo::kReqPutCas) { |
877 |
1/2✓ Branch 1 taken 3040 times.
✗ Branch 2 not taken.
|
3040 | info->http_headers = curl_slist_append(info->http_headers, |
878 | kCacheControlCas); | ||
879 | } | ||
880 | } | ||
881 | |||
882 | bool retval_b; | ||
883 | |||
884 | // Authorization | ||
885 | 6110 | vector<string> authz_headers; | |
886 |
1/4✓ Branch 0 taken 6110 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
|
6110 | switch (config_.authz_method) { |
887 | 6110 | case kAuthzAwsV2: | |
888 |
1/2✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
|
6110 | retval_b = MkV2Authz(*info, &authz_headers); |
889 | 6110 | break; | |
890 | ✗ | case kAuthzAwsV4: | |
891 | ✗ | retval_b = MkV4Authz(*info, &authz_headers); | |
892 | ✗ | break; | |
893 | ✗ | case kAuthzAzure: | |
894 | ✗ | retval_b = MkAzureAuthz(*info, &authz_headers); | |
895 | ✗ | break; | |
896 | ✗ | default: | |
897 | ✗ | PANIC(NULL); | |
898 | } | ||
899 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
|
6110 | if (!retval_b) |
900 | ✗ | return kFailLocalIO; | |
901 |
2/2✓ Branch 1 taken 24410 times.
✓ Branch 2 taken 6110 times.
|
30520 | for (unsigned i = 0; i < authz_headers.size(); ++i) { |
902 |
1/2✓ Branch 2 taken 24410 times.
✗ Branch 3 not taken.
|
24410 | info->http_headers = curl_slist_append(info->http_headers, |
903 | 24410 | authz_headers[i].c_str()); | |
904 | } | ||
905 | |||
906 | // Common headers | ||
907 |
1/2✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
|
6110 | info->http_headers = curl_slist_append(info->http_headers, |
908 | "Connection: Keep-Alive"); | ||
909 |
1/2✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
|
6110 | info->http_headers = curl_slist_append(info->http_headers, "Pragma:"); |
910 | // No 100-continue | ||
911 |
1/2✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
|
6110 | info->http_headers = curl_slist_append(info->http_headers, "Expect:"); |
912 | // Strip unnecessary header | ||
913 |
1/2✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
|
6110 | info->http_headers = curl_slist_append(info->http_headers, "Accept:"); |
914 |
1/2✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
|
6110 | info->http_headers = curl_slist_append(info->http_headers, |
915 | 6110 | user_agent_->c_str()); | |
916 | |||
917 | // Set curl parameters | ||
918 |
1/2✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
|
6110 | retval = curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info)); |
919 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
|
6110 | assert(retval == CURLE_OK); |
920 |
1/2✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
|
6110 | retval = curl_easy_setopt(handle, CURLOPT_HEADERDATA, |
921 | static_cast<void *>(info)); | ||
922 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
|
6110 | assert(retval == CURLE_OK); |
923 |
1/2✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
|
6110 | retval = curl_easy_setopt(handle, CURLOPT_READDATA, |
924 | static_cast<void *>(info)); | ||
925 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
|
6110 | assert(retval == CURLE_OK); |
926 |
1/2✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
|
6110 | retval = curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->http_headers); |
927 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
|
6110 | assert(retval == CURLE_OK); |
928 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
|
6110 | if (opt_ipv4_only_) { |
929 | ✗ | retval = curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4); | |
930 | ✗ | assert(retval == CURLE_OK); | |
931 | } | ||
932 | // Follow HTTP redirects | ||
933 |
1/2✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
|
6110 | retval = curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L); |
934 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
|
6110 | assert(retval == CURLE_OK); |
935 | |||
936 |
1/2✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
|
6110 | retval = curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, info->errorbuffer); |
937 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
|
6110 | assert(retval == CURLE_OK); |
938 | |||
939 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 6110 times.
|
6110 | if (config_.protocol == "https") { |
940 | ✗ | retval = curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 1L); | |
941 | ✗ | assert(retval == CURLE_OK); | |
942 | ✗ | retval = curl_easy_setopt(handle, CURLOPT_PROXY_SSL_VERIFYPEER, 1L); | |
943 | ✗ | assert(retval == CURLE_OK); | |
944 | ✗ | const bool add_cert = ssl_certificate_store_.ApplySslCertificatePath( | |
945 | handle); | ||
946 | ✗ | assert(add_cert); | |
947 | } | ||
948 | |||
949 | 6110 | return kFailOk; | |
950 | 6110 | } | |
951 | |||
952 | |||
953 | /** | ||
954 | * Sets the URL specific options such as host to use and timeout. | ||
955 | */ | ||
956 | 6110 | void S3FanoutManager::SetUrlOptions(JobInfo *info) const { | |
957 | 6110 | CURL *curl_handle = info->curl_handle; | |
958 | CURLcode retval; | ||
959 | |||
960 |
1/2✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
|
6110 | retval = curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, |
961 | config_.opt_timeout_sec); | ||
962 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
|
6110 | assert(retval == CURLE_OK); |
963 |
1/2✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
|
6110 | retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, |
964 | kLowSpeedLimit); | ||
965 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
|
6110 | assert(retval == CURLE_OK); |
966 |
1/2✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
|
6110 | retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, |
967 | config_.opt_timeout_sec); | ||
968 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
|
6110 | assert(retval == CURLE_OK); |
969 | |||
970 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
|
6110 | if (is_curl_debug_) { |
971 | ✗ | retval = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 1); | |
972 | ✗ | assert(retval == CURLE_OK); | |
973 | } | ||
974 | |||
975 |
1/2✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
|
6110 | const string url = MkUrl(info->object_key); |
976 |
1/2✓ Branch 2 taken 6110 times.
✗ Branch 3 not taken.
|
6110 | retval = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str()); |
977 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
|
6110 | assert(retval == CURLE_OK); |
978 | |||
979 |
1/2✓ Branch 2 taken 6110 times.
✗ Branch 3 not taken.
|
6110 | retval = curl_easy_setopt(curl_handle, CURLOPT_PROXY, config_.proxy.c_str()); |
980 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
|
6110 | assert(retval == CURLE_OK); |
981 | 6110 | } | |
982 | |||
983 | |||
984 | /** | ||
985 | * Adds transfer time and uploaded bytes to the global counters. | ||
986 | */ | ||
987 | 6130 | void S3FanoutManager::UpdateStatistics(CURL *handle) { | |
988 | double val; | ||
989 | |||
990 |
2/4✓ Branch 1 taken 6130 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 6130 times.
✗ Branch 4 not taken.
|
6130 | if (curl_easy_getinfo(handle, CURLINFO_SIZE_UPLOAD, &val) == CURLE_OK) |
991 | 6130 | statistics_->transferred_bytes += val; | |
992 | 6130 | } | |
993 | |||
994 | |||
995 | /** | ||
996 | * Retry if possible and if not already done too often. | ||
997 | */ | ||
998 | 30 | bool S3FanoutManager::CanRetry(const JobInfo *info) { | |
999 | 30 | return (info->error_code == kFailHostConnection | |
1000 |
1/2✓ Branch 0 taken 30 times.
✗ Branch 1 not taken.
|
30 | || info->error_code == kFailHostResolve |
1001 |
1/2✓ Branch 0 taken 30 times.
✗ Branch 1 not taken.
|
30 | || info->error_code == kFailServiceUnavailable |
1002 |
2/2✓ Branch 0 taken 20 times.
✓ Branch 1 taken 10 times.
|
30 | || info->error_code == kFailRetry) |
1003 |
2/4✓ Branch 0 taken 30 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 20 times.
✗ Branch 3 not taken.
|
60 | && (info->num_retries < config_.opt_max_retries); |
1004 | } | ||
1005 | |||
1006 | |||
1007 | /** | ||
1008 | * Backoff for retry to introduce a jitter into a upload sequence. | ||
1009 | * | ||
1010 | * \return true if backoff has been performed, false otherwise | ||
1011 | */ | ||
1012 | 20 | void S3FanoutManager::Backoff(JobInfo *info) { | |
1013 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 20 times.
|
20 | if (info->error_code != kFailRetry) |
1014 | ✗ | info->num_retries++; | |
1015 | 20 | statistics_->num_retries++; | |
1016 | |||
1017 |
1/2✓ Branch 0 taken 20 times.
✗ Branch 1 not taken.
|
20 | if (info->throttle_ms > 0) { |
1018 | 20 | LogCvmfs(kLogS3Fanout, kLogDebug, "throttling for %d ms", | |
1019 | info->throttle_ms); | ||
1020 | 20 | const uint64_t now = platform_monotonic_time(); | |
1021 |
1/2✓ Branch 0 taken 20 times.
✗ Branch 1 not taken.
|
20 | if ((info->throttle_timestamp + (info->throttle_ms / 1000)) >= now) { |
1022 |
2/2✓ Branch 0 taken 5 times.
✓ Branch 1 taken 15 times.
|
20 | if ((now - timestamp_last_throttle_report_) |
1023 | > kThrottleReportIntervalSec) { | ||
1024 | 5 | LogCvmfs(kLogS3Fanout, kLogStdout, | |
1025 | "Warning: S3 backend throttling %ums " | ||
1026 | "(total backoff time so far %lums)", | ||
1027 | 5 | info->throttle_ms, statistics_->ms_throttled); | |
1028 | 5 | timestamp_last_throttle_report_ = now; | |
1029 | } | ||
1030 | 20 | statistics_->ms_throttled += info->throttle_ms; | |
1031 | 20 | SafeSleepMs(info->throttle_ms); | |
1032 | } | ||
1033 | } else { | ||
1034 | ✗ | if (info->backoff_ms == 0) { | |
1035 | // Must be != 0 | ||
1036 | ✗ | info->backoff_ms = prng_.Next(config_.opt_backoff_init_ms + 1); | |
1037 | } else { | ||
1038 | ✗ | info->backoff_ms *= 2; | |
1039 | } | ||
1040 | ✗ | if (info->backoff_ms > config_.opt_backoff_max_ms) | |
1041 | ✗ | info->backoff_ms = config_.opt_backoff_max_ms; | |
1042 | |||
1043 | ✗ | LogCvmfs(kLogS3Fanout, kLogDebug, "backing off for %d ms", | |
1044 | info->backoff_ms); | ||
1045 | ✗ | SafeSleepMs(info->backoff_ms); | |
1046 | } | ||
1047 | 20 | } | |
1048 | |||
1049 | |||
1050 | /** | ||
1051 | * Checks the result of a curl request and implements the failure logic | ||
1052 | * and takes care of cleanup. | ||
1053 | * | ||
1054 | * @return true if request should be repeated, false otherwise | ||
1055 | */ | ||
1056 | 6130 | bool S3FanoutManager::VerifyAndFinalize(const int curl_error, JobInfo *info) { | |
1057 | 6130 | LogCvmfs(kLogS3Fanout, kLogDebug, | |
1058 | "Verify uploaded/tested object %s " | ||
1059 | "(curl error %d, info error %d, info request %d)", | ||
1060 | 6130 | info->object_key.c_str(), curl_error, info->error_code, | |
1061 | 6130 | info->request); | |
1062 | 6130 | UpdateStatistics(info->curl_handle); | |
1063 | |||
1064 | // Verification and error classification | ||
1065 |
1/6✓ Branch 0 taken 6130 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
|
6130 | switch (curl_error) { |
1066 | 6130 | case CURLE_OK: | |
1067 |
2/2✓ Branch 0 taken 6110 times.
✓ Branch 1 taken 20 times.
|
6130 | if ((info->error_code != kFailRetry) |
1068 |
2/2✓ Branch 0 taken 3060 times.
✓ Branch 1 taken 3050 times.
|
6110 | && (info->error_code != kFailNotFound)) { |
1069 | 3060 | info->error_code = kFailOk; | |
1070 | } | ||
1071 | 6130 | break; | |
1072 | ✗ | case CURLE_UNSUPPORTED_PROTOCOL: | |
1073 | case CURLE_URL_MALFORMAT: | ||
1074 | ✗ | info->error_code = kFailBadRequest; | |
1075 | ✗ | break; | |
1076 | ✗ | case CURLE_COULDNT_RESOLVE_HOST: | |
1077 | ✗ | info->error_code = kFailHostResolve; | |
1078 | ✗ | break; | |
1079 | ✗ | case CURLE_COULDNT_CONNECT: | |
1080 | case CURLE_OPERATION_TIMEDOUT: | ||
1081 | case CURLE_SEND_ERROR: | ||
1082 | case CURLE_RECV_ERROR: | ||
1083 | ✗ | info->error_code = kFailHostConnection; | |
1084 | ✗ | break; | |
1085 | ✗ | case CURLE_ABORTED_BY_CALLBACK: | |
1086 | case CURLE_WRITE_ERROR: | ||
1087 | // Error set by callback | ||
1088 | ✗ | break; | |
1089 | ✗ | default: | |
1090 | ✗ | LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr, | |
1091 | "unexpected curl error (%d) while trying to upload %s: %s", | ||
1092 | curl_error, info->object_key.c_str(), info->errorbuffer); | ||
1093 | ✗ | info->error_code = kFailOther; | |
1094 | ✗ | break; | |
1095 | } | ||
1096 | |||
1097 | // Transform HEAD to PUT request | ||
1098 |
2/2✓ Branch 0 taken 3050 times.
✓ Branch 1 taken 3080 times.
|
6130 | if ((info->error_code == kFailNotFound) |
1099 |
2/2✓ Branch 0 taken 3040 times.
✓ Branch 1 taken 10 times.
|
3050 | && (info->request == JobInfo::kReqHeadPut)) { |
1100 | 3040 | LogCvmfs(kLogS3Fanout, kLogDebug, "not found: %s, uploading", | |
1101 | info->object_key.c_str()); | ||
1102 | 3040 | info->request = JobInfo::kReqPutCas; | |
1103 | 3040 | curl_slist_free_all(info->http_headers); | |
1104 | 3040 | info->http_headers = NULL; | |
1105 | 3040 | const s3fanout::Failures init_failure = InitializeRequest( | |
1106 | info, info->curl_handle); | ||
1107 | |||
1108 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3040 times.
|
3040 | if (init_failure != s3fanout::kFailOk) { |
1109 | ✗ | PANIC(kLogStderr, | |
1110 | "Failed to initialize CURL handle " | ||
1111 | "(error: %d - %s | errno: %d)", | ||
1112 | init_failure, Code2Ascii(init_failure), errno); | ||
1113 | } | ||
1114 | 3040 | SetUrlOptions(info); | |
1115 | // Reset origin | ||
1116 | 3040 | info->origin->Rewind(); | |
1117 | 3040 | return true; // Again, Put | |
1118 | } | ||
1119 | |||
1120 | // Determination if failed request should be repeated | ||
1121 | 3090 | bool try_again = false; | |
1122 |
2/2✓ Branch 0 taken 30 times.
✓ Branch 1 taken 3060 times.
|
3090 | if (info->error_code != kFailOk) { |
1123 | 30 | try_again = CanRetry(info); | |
1124 | } | ||
1125 |
2/2✓ Branch 0 taken 20 times.
✓ Branch 1 taken 3070 times.
|
3090 | if (try_again) { |
1126 |
1/2✓ Branch 0 taken 20 times.
✗ Branch 1 not taken.
|
20 | if (info->request == JobInfo::kReqPutCas |
1127 |
1/2✓ Branch 0 taken 20 times.
✗ Branch 1 not taken.
|
20 | || info->request == JobInfo::kReqPutDotCvmfs |
1128 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 20 times.
|
20 | || info->request == JobInfo::kReqPutHtml) { |
1129 | ✗ | LogCvmfs(kLogS3Fanout, kLogDebug, "Trying again to upload %s", | |
1130 | info->object_key.c_str()); | ||
1131 | // Reset origin | ||
1132 | ✗ | info->origin->Rewind(); | |
1133 | } | ||
1134 | 20 | Backoff(info); | |
1135 | 20 | info->error_code = kFailOk; | |
1136 | 20 | info->http_error = 0; | |
1137 | 20 | info->throttle_ms = 0; | |
1138 | 20 | info->backoff_ms = 0; | |
1139 | 20 | info->throttle_timestamp = 0; | |
1140 | 20 | return true; // try again | |
1141 | } | ||
1142 | |||
1143 | // Cleanup opened resources | ||
1144 | 3070 | info->origin.Destroy(); | |
1145 | |||
1146 |
3/4✓ Branch 0 taken 10 times.
✓ Branch 1 taken 3060 times.
✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
|
3070 | if ((info->error_code != kFailOk) && (info->http_error != 0) |
1147 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
|
10 | && (info->http_error != 404)) { |
1148 | ✗ | LogCvmfs(kLogS3Fanout, kLogStderr, "S3: HTTP failure %d", info->http_error); | |
1149 | } | ||
1150 | 3070 | return false; // stop transfer | |
1151 | } | ||
1152 | |||
1153 |
2/4✓ Branch 3 taken 60 times.
✗ Branch 4 not taken.
✓ Branch 9 taken 60 times.
✗ Branch 10 not taken.
|
60 | S3FanoutManager::S3FanoutManager(const S3Config &config) : config_(config) { |
1154 | 60 | atomic_init32(&multi_threaded_); | |
1155 |
1/2✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
|
60 | MakePipe(pipe_terminate_); |
1156 |
1/2✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
|
60 | MakePipe(pipe_jobs_); |
1157 |
1/2✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
|
60 | MakePipe(pipe_completed_); |
1158 | |||
1159 | int retval; | ||
1160 | 60 | jobs_todo_lock_ = reinterpret_cast<pthread_mutex_t *>( | |
1161 | 60 | smalloc(sizeof(pthread_mutex_t))); | |
1162 | 60 | retval = pthread_mutex_init(jobs_todo_lock_, NULL); | |
1163 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
|
60 | assert(retval == 0); |
1164 | 60 | curl_handle_lock_ = reinterpret_cast<pthread_mutex_t *>( | |
1165 | 60 | smalloc(sizeof(pthread_mutex_t))); | |
1166 | 60 | retval = pthread_mutex_init(curl_handle_lock_, NULL); | |
1167 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
|
60 | assert(retval == 0); |
1168 | |||
1169 |
1/2✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
|
60 | active_requests_ = new set<JobInfo *>; |
1170 |
1/2✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
|
60 | pool_handles_idle_ = new set<CURL *>; |
1171 |
1/2✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
|
60 | pool_handles_inuse_ = new set<CURL *>; |
1172 |
1/2✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
|
60 | curl_sharehandles_ = new map<CURL *, S3FanOutDnsEntry *>; |
1173 |
1/2✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
|
60 | sharehandles_ = new set<S3FanOutDnsEntry *>; |
1174 | 60 | watch_fds_max_ = 4 * config_.pool_max_handles; | |
1175 | 60 | max_available_jobs_ = 4 * config_.pool_max_handles; | |
1176 |
2/4✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 60 times.
✗ Branch 5 not taken.
|
60 | available_jobs_ = new Semaphore(max_available_jobs_); |
1177 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
|
60 | assert(NULL != available_jobs_); |
1178 | |||
1179 |
1/2✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
|
60 | statistics_ = new Statistics(); |
1180 |
1/2✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
|
60 | user_agent_ = new string(); |
1181 |
2/4✓ Branch 2 taken 60 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 60 times.
✗ Branch 6 not taken.
|
60 | *user_agent_ = "User-Agent: cvmfs " + string(CVMFS_VERSION); |
1182 |
1/2✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
|
60 | complete_hostname_ = MkCompleteHostname(); |
1183 | |||
1184 |
1/2✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
|
60 | const CURLcode cretval = curl_global_init(CURL_GLOBAL_ALL); |
1185 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
|
60 | assert(cretval == CURLE_OK); |
1186 |
1/2✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
|
60 | curl_multi_ = curl_multi_init(); |
1187 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
|
60 | assert(curl_multi_ != NULL); |
1188 | CURLMcode mretval; | ||
1189 |
1/2✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
|
60 | mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION, |
1190 | CallbackCurlSocket); | ||
1191 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
|
60 | assert(mretval == CURLM_OK); |
1192 |
1/2✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
|
60 | mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA, |
1193 | static_cast<void *>(this)); | ||
1194 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
|
60 | assert(mretval == CURLM_OK); |
1195 |
1/2✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
|
60 | mretval = curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS, |
1196 | config_.pool_max_handles); | ||
1197 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
|
60 | assert(mretval == CURLM_OK); |
1198 | |||
1199 | 60 | prng_.InitLocaltime(); | |
1200 | |||
1201 | 60 | thread_upload_ = 0; | |
1202 | 60 | timestamp_last_throttle_report_ = 0; | |
1203 | 60 | is_curl_debug_ = (getenv("_CVMFS_CURL_DEBUG") != NULL); | |
1204 | |||
1205 | // Parsing environment variables | ||
1206 | 60 | if ((getenv("CVMFS_IPV4_ONLY") != NULL) | |
1207 |
2/6✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 60 times.
|
60 | && (strlen(getenv("CVMFS_IPV4_ONLY")) > 0)) { |
1208 | ✗ | opt_ipv4_only_ = true; | |
1209 | } else { | ||
1210 | 60 | opt_ipv4_only_ = false; | |
1211 | } | ||
1212 | |||
1213 |
1/2✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
|
60 | resolver_ = dns::CaresResolver::Create(opt_ipv4_only_, 2, 2000); |
1214 | |||
1215 | 60 | watch_fds_ = static_cast<struct pollfd *>(smalloc(4 * sizeof(struct pollfd))); | |
1216 | 60 | watch_fds_size_ = 4; | |
1217 | 60 | watch_fds_inuse_ = 0; | |
1218 | |||
1219 |
1/2✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
|
60 | ssl_certificate_store_.UseSystemCertificatePath(); |
1220 | 60 | } | |
1221 | |||
1222 | 120 | S3FanoutManager::~S3FanoutManager() { | |
1223 | 60 | pthread_mutex_destroy(jobs_todo_lock_); | |
1224 | 60 | free(jobs_todo_lock_); | |
1225 | 60 | pthread_mutex_destroy(curl_handle_lock_); | |
1226 | 60 | free(curl_handle_lock_); | |
1227 | |||
1228 |
1/2✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
|
60 | if (atomic_xadd32(&multi_threaded_, 0) == 1) { |
1229 | // Shutdown I/O thread | ||
1230 | 60 | char buf = 'T'; | |
1231 | 60 | WritePipe(pipe_terminate_[1], &buf, 1); | |
1232 | 60 | pthread_join(thread_upload_, NULL); | |
1233 | } | ||
1234 | 60 | ClosePipe(pipe_terminate_); | |
1235 | 60 | ClosePipe(pipe_jobs_); | |
1236 | 60 | ClosePipe(pipe_completed_); | |
1237 | |||
1238 | 60 | set<CURL *>::iterator i = pool_handles_idle_->begin(); | |
1239 | 60 | const set<CURL *>::const_iterator iEnd = pool_handles_idle_->end(); | |
1240 |
2/2✓ Branch 2 taken 100 times.
✓ Branch 3 taken 60 times.
|
160 | for (; i != iEnd; ++i) { |
1241 | 100 | curl_easy_cleanup(*i); | |
1242 | } | ||
1243 | |||
1244 | 60 | set<S3FanOutDnsEntry *>::iterator is = sharehandles_->begin(); | |
1245 | 60 | const set<S3FanOutDnsEntry *>::const_iterator isEnd = sharehandles_->end(); | |
1246 |
2/2✓ Branch 2 taken 50 times.
✓ Branch 3 taken 60 times.
|
110 | for (; is != isEnd; ++is) { |
1247 | 50 | curl_share_cleanup((*is)->sharehandle); | |
1248 | 50 | curl_slist_free_all((*is)->clist); | |
1249 |
1/2✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
|
50 | delete *is; |
1250 | } | ||
1251 | 60 | pool_handles_idle_->clear(); | |
1252 | 60 | curl_sharehandles_->clear(); | |
1253 | 60 | sharehandles_->clear(); | |
1254 |
1/2✓ Branch 0 taken 60 times.
✗ Branch 1 not taken.
|
60 | delete active_requests_; |
1255 |
1/2✓ Branch 0 taken 60 times.
✗ Branch 1 not taken.
|
60 | delete pool_handles_idle_; |
1256 |
1/2✓ Branch 0 taken 60 times.
✗ Branch 1 not taken.
|
60 | delete pool_handles_inuse_; |
1257 |
1/2✓ Branch 0 taken 60 times.
✗ Branch 1 not taken.
|
60 | delete curl_sharehandles_; |
1258 |
1/2✓ Branch 0 taken 60 times.
✗ Branch 1 not taken.
|
60 | delete sharehandles_; |
1259 |
1/2✓ Branch 0 taken 60 times.
✗ Branch 1 not taken.
|
60 | delete user_agent_; |
1260 | 60 | curl_multi_cleanup(curl_multi_); | |
1261 | |||
1262 |
1/2✓ Branch 0 taken 60 times.
✗ Branch 1 not taken.
|
60 | delete statistics_; |
1263 | |||
1264 |
1/2✓ Branch 0 taken 60 times.
✗ Branch 1 not taken.
|
60 | delete available_jobs_; |
1265 | |||
1266 | 60 | curl_global_cleanup(); | |
1267 | 60 | } | |
1268 | |||
1269 | /** | ||
1270 | * Spawns the I/O worker thread. No way back except ~S3FanoutManager. | ||
1271 | */ | ||
1272 | 60 | void S3FanoutManager::Spawn() { | |
1273 | 60 | LogCvmfs(kLogS3Fanout, kLogDebug, "S3FanoutManager spawned"); | |
1274 | |||
1275 | 60 | const int retval = pthread_create(&thread_upload_, NULL, MainUpload, | |
1276 | static_cast<void *>(this)); | ||
1277 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
|
60 | assert(retval == 0); |
1278 | |||
1279 | 60 | atomic_inc32(&multi_threaded_); | |
1280 | 60 | } | |
1281 | |||
1282 | 15 | const Statistics &S3FanoutManager::GetStatistics() { return *statistics_; } | |
1283 | |||
1284 | /** | ||
1285 | * Push new job to be uploaded to the S3 cloud storage. | ||
1286 | */ | ||
1287 | 3070 | void S3FanoutManager::PushNewJob(JobInfo *info) { | |
1288 | 3070 | available_jobs_->Increment(); | |
1289 | 3070 | WritePipe(pipe_jobs_[1], &info, sizeof(info)); | |
1290 | 3070 | } | |
1291 | |||
1292 | /** | ||
1293 | * Push completed job to list of completed jobs | ||
1294 | */ | ||
1295 | 3130 | void S3FanoutManager::PushCompletedJob(JobInfo *info) { | |
1296 | 3130 | WritePipe(pipe_completed_[1], &info, sizeof(info)); | |
1297 | 3130 | } | |
1298 | |||
1299 | /** | ||
1300 | * Pop completed job | ||
1301 | */ | ||
1302 | 3130 | JobInfo *S3FanoutManager::PopCompletedJob() { | |
1303 | JobInfo *info; | ||
1304 |
1/2✓ Branch 1 taken 3130 times.
✗ Branch 2 not taken.
|
3130 | ReadPipe(pipe_completed_[0], &info, sizeof(info)); |
1305 | 3130 | return info; | |
1306 | } | ||
1307 | |||
1308 | //------------------------------------------------------------------------------ | ||
1309 | |||
1310 | |||
1311 | ✗ | string Statistics::Print() const { | |
1312 | ✗ | return "Transferred Bytes: " + StringifyInt(uint64_t(transferred_bytes)) | |
1313 | ✗ | + "\n" + "Transfer duration: " + StringifyInt(uint64_t(transfer_time)) | |
1314 | ✗ | + " s\n" + "Number of requests: " + StringifyInt(num_requests) + "\n" | |
1315 | ✗ | + "Number of retries: " + StringifyInt(num_retries) + "\n"; | |
1316 | } | ||
1317 | |||
1318 | } // namespace s3fanout | ||
1319 |