GCC Code Coverage Report | |||||||||||||||||||||
|
|||||||||||||||||||||
Line | Branch | Exec | Source |
1 |
/** |
||
2 |
* This file is part of the CernVM File System. |
||
3 |
* |
||
4 |
* Runs a thread using libcurl's asynchronous I/O mode to push data to S3 |
||
5 |
*/ |
||
6 |
|||
7 |
#include <pthread.h> |
||
8 |
|||
9 |
#include <algorithm> |
||
10 |
#include <cerrno> |
||
11 |
#include <utility> |
||
12 |
|||
13 |
#include "cvmfs_config.h" |
||
14 |
#include "platform.h" |
||
15 |
#include "s3fanout.h" |
||
16 |
#include "upload_facility.h" |
||
17 |
#include "util/posix.h" |
||
18 |
#include "util/string.h" |
||
19 |
#include "util_concurrency.h" |
||
20 |
|||
21 |
using namespace std; // NOLINT |
||
22 |
|||
23 |
namespace s3fanout { |
||
24 |
|||
25 |
// Complete "Cache-Control" header lines.  259200 seconds are three days.
// NOTE(review): judging by the names, the first one is meant for
// content-addressed objects and the second one for .cvmfs* files -- the code
// attaching them to requests is not part of this section, please confirm.
const char *S3FanoutManager::kCacheControlCas =
  "Cache-Control: max-age=259200";
const char *S3FanoutManager::kCacheControlDotCvmfs =
  "Cache-Control: max-age=61";

// Delay (milliseconds) applied to a request that received HTTP 429 unless the
// server names a delay itself, see CallbackCurlHeader()
const unsigned S3FanoutManager::kDefault429ThrottleMs = 250;
// Upper bound (milliseconds) for a server-provided delay, see
// DetectThrottleIndicator()
const unsigned S3FanoutManager::kMax429ThrottleMs = 10000;
// Seconds.  NOTE(review): presumably the minimum interval between two
// throttle log messages; not used within this section.
const unsigned S3FanoutManager::kThrottleReportIntervalSec = 60;
||
31 |
|||
32 |
|||
33 |
/** |
||
34 |
* Parses Retry-After and X-Retry-In headers attached to HTTP 429 responses |
||
35 |
*/ |
||
36 |
105 |
void S3FanoutManager::DetectThrottleIndicator( |
|
37 |
const std::string &header, |
||
38 |
JobInfo *info) |
||
39 |
{ |
||
40 |
105 |
std::string value_str; |
|
41 |
✓✗✗✓ |
105 |
if (HasPrefix(header, "retry-after:", true)) |
42 |
52 |
value_str = header.substr(12); |
|
43 |
✓✗✗✓ |
105 |
if (HasPrefix(header, "x-retry-in:", true)) |
44 |
3 |
value_str = header.substr(11); |
|
45 |
|||
46 |
105 |
value_str = Trim(value_str); |
|
47 |
✓✓ | 105 |
if (!value_str.empty()) { |
48 |
53 |
unsigned value_ms = String2Uint64(value_str) * 1000; |
|
49 |
✓✓ | 53 |
if (value_ms > 0) |
50 |
52 |
info->throttle_ms = std::min(value_ms, kMax429ThrottleMs); |
|
51 |
} |
||
52 |
105 |
} |
|
53 |
|||
54 |
|||
55 |
/** |
||
56 |
* Called by curl for every HTTP header. Not called for file:// transfers. |
||
57 |
*/ |
||
58 |
12063 |
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, |
|
59 |
void *info_link) { |
||
60 |
12063 |
const size_t num_bytes = size*nmemb; |
|
61 |
12063 |
const string header_line(static_cast<const char *>(ptr), num_bytes); |
|
62 |
12063 |
JobInfo *info = static_cast<JobInfo *>(info_link); |
|
63 |
|||
64 |
// Check for http status code errors |
||
65 |
✓✗✗✓ |
12063 |
if (HasPrefix(header_line, "HTTP/1.", false)) { |
66 |
✗✓ | 6005 |
if (header_line.length() < 10) |
67 |
return 0; |
||
68 |
|||
69 |
unsigned i; |
||
70 |
✓✗✓✓ ✓✓ |
6005 |
for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {} |
71 |
|||
72 |
✓✓ | 6005 |
if (header_line[i] == '2') { |
73 |
2981 |
return num_bytes; |
|
74 |
} else { |
||
75 |
LogCvmfs(kLogS3Fanout, kLogDebug, "http status error code: %s", |
||
76 |
3024 |
header_line.c_str()); |
|
77 |
✗✓ | 3024 |
if (header_line.length() < i+3) { |
78 |
LogCvmfs(kLogS3Fanout, kLogStderr, "S3: invalid HTTP response '%s'", |
||
79 |
header_line.c_str()); |
||
80 |
info->error_code = kFailOther; |
||
81 |
return 0; |
||
82 |
} |
||
83 |
3024 |
info->http_error = String2Int64(string(&header_line[i], 3)); |
|
84 |
|||
85 |
✓✗✗✗ ✓✗ |
3024 |
switch (info->http_error) { |
86 |
case 429: |
||
87 |
48 |
info->error_code = kFailRetry; |
|
88 |
48 |
info->throttle_ms = S3FanoutManager::kDefault429ThrottleMs; |
|
89 |
48 |
info->throttle_timestamp = platform_monotonic_time(); |
|
90 |
48 |
return num_bytes; |
|
91 |
case 503: |
||
92 |
case 502: // Can happen if the S3 gateway-backend connection breaks |
||
93 |
info->error_code = kFailServiceUnavailable; |
||
94 |
break; |
||
95 |
case 501: |
||
96 |
case 400: |
||
97 |
info->error_code = kFailBadRequest; |
||
98 |
break; |
||
99 |
case 403: |
||
100 |
info->error_code = kFailForbidden; |
||
101 |
break; |
||
102 |
case 404: |
||
103 |
2976 |
info->error_code = kFailNotFound; |
|
104 |
2976 |
break; |
|
105 |
default: |
||
106 |
info->error_code = kFailOther; |
||
107 |
} |
||
108 |
2976 |
return 0; |
|
109 |
} |
||
110 |
} |
||
111 |
|||
112 |
✓✓ | 6058 |
if (info->error_code == kFailRetry) { |
113 |
96 |
S3FanoutManager::DetectThrottleIndicator(header_line, info); |
|
114 |
} |
||
115 |
|||
116 |
6058 |
return num_bytes; |
|
117 |
} |
||
118 |
|||
119 |
|||
120 |
/** |
||
121 |
* Called by curl for every new chunk to upload. |
||
122 |
*/ |
||
123 |
93233 |
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, |
|
124 |
void *info_link) { |
||
125 |
93233 |
const size_t num_bytes = size*nmemb; |
|
126 |
93233 |
JobInfo *info = static_cast<JobInfo *>(info_link); |
|
127 |
|||
128 |
// In case of 429, we potentially need to read all the headers so we can |
||
129 |
// only abort the transfer in the (probably empty) data section |
||
130 |
✗✓ | 93233 |
if (info->error_code == kFailRetry) |
131 |
return 0; |
||
132 |
|||
133 |
93233 |
LogCvmfs(kLogS3Fanout, kLogDebug, "Data callback with %d bytes", num_bytes); |
|
134 |
|||
135 |
✗✓ | 93233 |
if (num_bytes == 0) |
136 |
return 0; |
||
137 |
|||
138 |
✓✓ | 93233 |
if (info->origin == kOriginMem) { |
139 |
28858 |
const size_t avail_bytes = info->origin_mem.size - info->origin_mem.pos; |
|
140 |
✓✓ | 28858 |
const size_t send_size = avail_bytes < num_bytes ? avail_bytes : num_bytes; |
141 |
28858 |
memcpy(ptr, info->origin_mem.data + info->origin_mem.pos, send_size); |
|
142 |
28858 |
info->origin_mem.pos += send_size; |
|
143 |
28858 |
LogCvmfs(kLogS3Fanout, kLogDebug, "mem pushed out %d bytes", send_size); |
|
144 |
28858 |
return send_size; |
|
145 |
✓✗ | 64375 |
} else if (info->origin == kOriginPath) { |
146 |
64375 |
size_t read_bytes = fread(ptr, 1, num_bytes, info->origin_file); |
|
147 |
✓✓ | 64375 |
if (read_bytes != num_bytes) { |
148 |
✗✓ | 695 |
if (ferror(info->origin_file) != 0) { |
149 |
LogCvmfs(kLogS3Fanout, kLogStderr, "local I/O error reading %s", |
||
150 |
info->origin_path.c_str()); |
||
151 |
return CURL_READFUNC_ABORT; |
||
152 |
} |
||
153 |
} |
||
154 |
64375 |
LogCvmfs(kLogS3Fanout, kLogDebug, "file pushed out %d bytes", read_bytes); |
|
155 |
64375 |
return read_bytes; |
|
156 |
} |
||
157 |
|||
158 |
return CURL_READFUNC_ABORT; |
||
159 |
} |
||
160 |
|||
161 |
|||
162 |
/** |
||
163 |
* Called when new curl sockets arrive or existing curl sockets depart. |
||
164 |
*/ |
||
165 |
13408 |
int S3FanoutManager::CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, |
|
166 |
void *userp, void *socketp) { |
||
167 |
13408 |
S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(userp); |
|
168 |
13408 |
const int ajobs = *s3fanout_mgr->available_jobs_; |
|
169 |
LogCvmfs(kLogS3Fanout, kLogDebug, "CallbackCurlSocket called with easy " |
||
170 |
"handle %p, socket %d, action %d, up %d, " |
||
171 |
"sp %d, fds_inuse %d, jobs %d", |
||
172 |
easy, s, action, userp, |
||
173 |
13408 |
socketp, s3fanout_mgr->watch_fds_inuse_, ajobs); |
|
174 |
✗✓ | 13408 |
if (action == CURL_POLL_NONE) |
175 |
return 0; |
||
176 |
|||
177 |
// Find s in watch_fds_ |
||
178 |
unsigned index; |
||
179 |
✓✓ | 90745 |
for (index = 0; index < s3fanout_mgr->watch_fds_inuse_; ++index) { |
180 |
✓✓ | 84824 |
if (s3fanout_mgr->watch_fds_[index].fd == s) |
181 |
7487 |
break; |
|
182 |
} |
||
183 |
// Or create newly |
||
184 |
✓✓ | 13408 |
if (index == s3fanout_mgr->watch_fds_inuse_) { |
185 |
// Extend array if necessary |
||
186 |
✓✓ | 5921 |
if (s3fanout_mgr->watch_fds_inuse_ == s3fanout_mgr->watch_fds_size_) { |
187 |
39 |
s3fanout_mgr->watch_fds_size_ *= 2; |
|
188 |
s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>( |
||
189 |
srealloc(s3fanout_mgr->watch_fds_, |
||
190 |
39 |
s3fanout_mgr->watch_fds_size_*sizeof(struct pollfd))); |
|
191 |
} |
||
192 |
5921 |
s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].fd = s; |
|
193 |
5921 |
s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].events = 0; |
|
194 |
5921 |
s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].revents = 0; |
|
195 |
5921 |
s3fanout_mgr->watch_fds_inuse_++; |
|
196 |
} |
||
197 |
|||
198 |
✓✓✓✓ ✗ |
13408 |
switch (action) { |
199 |
case CURL_POLL_IN: |
||
200 |
5921 |
s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI; |
|
201 |
5921 |
break; |
|
202 |
case CURL_POLL_OUT: |
||
203 |
21 |
s3fanout_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND; |
|
204 |
21 |
break; |
|
205 |
case CURL_POLL_INOUT: |
||
206 |
1545 |
s3fanout_mgr->watch_fds_[index].events = |
|
207 |
1545 |
POLLIN | POLLPRI | POLLOUT | POLLWRBAND; |
|
208 |
1545 |
break; |
|
209 |
case CURL_POLL_REMOVE: |
||
210 |
✓✓ | 5921 |
if (index < s3fanout_mgr->watch_fds_inuse_-1) |
211 |
s3fanout_mgr->watch_fds_[index] = |
||
212 |
5390 |
s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_-1]; |
|
213 |
5921 |
s3fanout_mgr->watch_fds_inuse_--; |
|
214 |
// Shrink array if necessary |
||
215 |
✗✓✗✗ |
5921 |
if ((s3fanout_mgr->watch_fds_inuse_ > s3fanout_mgr->watch_fds_max_) && |
216 |
(s3fanout_mgr->watch_fds_inuse_ < s3fanout_mgr->watch_fds_size_/2)) { |
||
217 |
s3fanout_mgr->watch_fds_size_ /= 2; |
||
218 |
s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>( |
||
219 |
srealloc(s3fanout_mgr->watch_fds_, |
||
220 |
s3fanout_mgr->watch_fds_size_*sizeof(struct pollfd))); |
||
221 |
} |
||
222 |
break; |
||
223 |
default: |
||
224 |
break; |
||
225 |
} |
||
226 |
|||
227 |
13408 |
return 0; |
|
228 |
} |
||
229 |
|||
230 |
|||
231 |
/** |
||
232 |
* Worker thread event loop. |
||
233 |
*/ |
||
234 |
99 |
void *S3FanoutManager::MainUpload(void *data) { |
|
235 |
99 |
LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread started"); |
|
236 |
99 |
S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(data); |
|
237 |
|||
238 |
// Don't schedule more jobs into the multi handle than the maximum number of |
||
239 |
// parallel connections. This should prevent starvation and thus a timeout |
||
240 |
// of the authorization header (CVM-1339). |
||
241 |
99 |
unsigned jobs_in_flight = 0; |
|
242 |
|||
243 |
✓✓ | 196024 |
while (s3fanout_mgr->thread_upload_run_) { |
244 |
195826 |
JobInfo *info = NULL; |
|
245 |
195826 |
pthread_mutex_lock(s3fanout_mgr->jobs_todo_lock_); |
|
246 |
✓✓✓✓ ✓✓ |
195826 |
if (!s3fanout_mgr->jobs_todo_.empty() && |
247 |
(jobs_in_flight < s3fanout_mgr->pool_max_handles_)) |
||
248 |
{ |
||
249 |
2966 |
info = s3fanout_mgr->jobs_todo_.back(); |
|
250 |
2966 |
s3fanout_mgr->jobs_todo_.pop_back(); |
|
251 |
} |
||
252 |
195826 |
pthread_mutex_unlock(s3fanout_mgr->jobs_todo_lock_); |
|
253 |
|||
254 |
✓✓ | 195826 |
if (info != NULL) { |
255 |
2966 |
CURL *handle = s3fanout_mgr->AcquireCurlHandle(); |
|
256 |
✗✓ | 2966 |
if (handle == NULL) { |
257 |
LogCvmfs(kLogS3Fanout, kLogStderr, "Failed to acquire CURL handle."); |
||
258 |
assert(handle != NULL); |
||
259 |
} |
||
260 |
|||
261 |
s3fanout::Failures init_failure = |
||
262 |
2966 |
s3fanout_mgr->InitializeRequest(info, handle); |
|
263 |
✗✓ | 2966 |
if (init_failure != s3fanout::kFailOk) { |
264 |
LogCvmfs(kLogS3Fanout, kLogStderr, |
||
265 |
"Failed to initialize CURL handle (error: %d - %s | errno: %d)", |
||
266 |
init_failure, Code2Ascii(init_failure), errno); |
||
267 |
abort(); |
||
268 |
} |
||
269 |
2966 |
s3fanout_mgr->SetUrlOptions(info); |
|
270 |
|||
271 |
2966 |
curl_multi_add_handle(s3fanout_mgr->curl_multi_, handle); |
|
272 |
2966 |
jobs_in_flight++; |
|
273 |
2966 |
int still_running = 0, retval = 0; |
|
274 |
retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_, |
||
275 |
CURL_SOCKET_TIMEOUT, |
||
276 |
0, |
||
277 |
2966 |
&still_running); |
|
278 |
|||
279 |
LogCvmfs(kLogS3Fanout, kLogDebug, |
||
280 |
"curl_multi_socket_action: %d - %d", |
||
281 |
2966 |
retval, still_running); |
|
282 |
} |
||
283 |
|||
284 |
// Check events with 1ms timeout |
||
285 |
195826 |
int timeout = 1; |
|
286 |
int retval = poll(s3fanout_mgr->watch_fds_, s3fanout_mgr->watch_fds_inuse_, |
||
287 |
195826 |
timeout); |
|
288 |
✓✓ | 195826 |
if (retval == 0) { |
289 |
// Handle timeout |
||
290 |
123279 |
int still_running = 0; |
|
291 |
retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_, |
||
292 |
CURL_SOCKET_TIMEOUT, |
||
293 |
0, |
||
294 |
123279 |
&still_running); |
|
295 |
✗✓ | 123279 |
if (retval != CURLM_OK) { |
296 |
LogCvmfs(kLogS3Fanout, kLogStderr, "Error, timeout due to: %d", retval); |
||
297 |
assert(retval == CURLM_OK); |
||
298 |
} |
||
299 |
✗✓ | 72547 |
} else if (retval < 0) { |
300 |
assert(errno == EINTR); |
||
301 |
continue; |
||
302 |
} |
||
303 |
|||
304 |
// Activity on curl sockets |
||
305 |
✓✓ | 1274900 |
for (unsigned i = 0; i < s3fanout_mgr->watch_fds_inuse_; ++i) { |
306 |
✓✓ | 1079074 |
if (s3fanout_mgr->watch_fds_[i].revents) { |
307 |
98183 |
int ev_bitmask = 0; |
|
308 |
✓✓ | 98183 |
if (s3fanout_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI)) |
309 |
8750 |
ev_bitmask |= CURL_CSELECT_IN; |
|
310 |
✓✓ | 98183 |
if (s3fanout_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND)) |
311 |
89433 |
ev_bitmask |= CURL_CSELECT_OUT; |
|
312 |
✓✓ | 98183 |
if (s3fanout_mgr->watch_fds_[i].revents & |
313 |
(POLLERR | POLLHUP | POLLNVAL)) |
||
314 |
4 |
ev_bitmask |= CURL_CSELECT_ERR; |
|
315 |
98183 |
s3fanout_mgr->watch_fds_[i].revents = 0; |
|
316 |
|||
317 |
98183 |
int still_running = 0; |
|
318 |
retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_, |
||
319 |
98183 |
s3fanout_mgr->watch_fds_[i].fd, |
|
320 |
ev_bitmask, |
||
321 |
98183 |
&still_running); |
|
322 |
} |
||
323 |
} |
||
324 |
|||
325 |
// Check if transfers are completed |
||
326 |
CURLMsg *curl_msg; |
||
327 |
int msgs_in_queue; |
||
328 |
✓✓ | 397631 |
while ((curl_msg = curl_multi_info_read(s3fanout_mgr->curl_multi_, |
329 |
&msgs_in_queue))) { |
||
330 |
✓✗ | 5979 |
if (curl_msg->msg == CURLMSG_DONE) { |
331 |
5979 |
s3fanout_mgr->statistics_->num_requests++; |
|
332 |
JobInfo *info; |
||
333 |
5979 |
CURL *easy_handle = curl_msg->easy_handle; |
|
334 |
5979 |
int curl_error = curl_msg->data.result; |
|
335 |
5979 |
curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info); |
|
336 |
|||
337 |
5979 |
curl_multi_remove_handle(s3fanout_mgr->curl_multi_, easy_handle); |
|
338 |
✓✓ | 5979 |
if (s3fanout_mgr->VerifyAndFinalize(curl_error, info)) { |
339 |
3013 |
curl_multi_add_handle(s3fanout_mgr->curl_multi_, easy_handle); |
|
340 |
3013 |
int still_running = 0; |
|
341 |
curl_multi_socket_action(s3fanout_mgr->curl_multi_, |
||
342 |
CURL_SOCKET_TIMEOUT, |
||
343 |
0, |
||
344 |
3013 |
&still_running); |
|
345 |
} else { |
||
346 |
// Return easy handle into pool and write result back |
||
347 |
2966 |
jobs_in_flight--; |
|
348 |
2966 |
s3fanout_mgr->ReleaseCurlHandle(info, easy_handle); |
|
349 |
2966 |
s3fanout_mgr->available_jobs_->Decrement(); |
|
350 |
|||
351 |
2966 |
pthread_mutex_lock(s3fanout_mgr->jobs_completed_lock_); |
|
352 |
2966 |
s3fanout_mgr->jobs_completed_.push_back(info); |
|
353 |
2966 |
pthread_mutex_unlock(s3fanout_mgr->jobs_completed_lock_); |
|
354 |
} |
||
355 |
} |
||
356 |
} |
||
357 |
} |
||
358 |
|||
359 |
99 |
set<CURL *>::iterator i = s3fanout_mgr->pool_handles_inuse_->begin(); |
|
360 |
const set<CURL *>::const_iterator i_end = |
||
361 |
99 |
s3fanout_mgr->pool_handles_inuse_->end(); |
|
362 |
✗✓ | 99 |
for (; i != i_end; ++i) { |
363 |
curl_multi_remove_handle(s3fanout_mgr->curl_multi_, *i); |
||
364 |
curl_easy_cleanup(*i); |
||
365 |
} |
||
366 |
99 |
s3fanout_mgr->pool_handles_inuse_->clear(); |
|
367 |
99 |
free(s3fanout_mgr->watch_fds_); |
|
368 |
|||
369 |
99 |
LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread terminated"); |
|
370 |
99 |
return NULL; |
|
371 |
} |
||
372 |
|||
373 |
|||
374 |
/** |
||
375 |
* Gets an idle CURL handle from the pool. Creates a new one and adds it to |
||
376 |
* the pool if necessary. |
||
377 |
*/ |
||
378 |
3095 |
CURL *S3FanoutManager::AcquireCurlHandle() const { |
|
379 |
CURL *handle; |
||
380 |
|||
381 |
3095 |
MutexLockGuard guard(curl_handle_lock_); |
|
382 |
|||
383 |
✓✓ | 3095 |
if (pool_handles_idle_->empty()) { |
384 |
CURLcode retval; |
||
385 |
|||
386 |
// Create a new handle |
||
387 |
216 |
handle = curl_easy_init(); |
|
388 |
✗✓ | 216 |
assert(handle != NULL); |
389 |
|||
390 |
// Other settings |
||
391 |
216 |
retval = curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1); |
|
392 |
✗✓ | 216 |
assert(retval == CURLE_OK); |
393 |
retval = curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, |
||
394 |
216 |
CallbackCurlHeader); |
|
395 |
✗✓ | 216 |
assert(retval == CURLE_OK); |
396 |
216 |
retval = curl_easy_setopt(handle, CURLOPT_READFUNCTION, CallbackCurlData); |
|
397 |
✗✓ | 216 |
assert(retval == CURLE_OK); |
398 |
} else { |
||
399 |
2879 |
handle = *(pool_handles_idle_->begin()); |
|
400 |
2879 |
pool_handles_idle_->erase(pool_handles_idle_->begin()); |
|
401 |
} |
||
402 |
|||
403 |
3095 |
pool_handles_inuse_->insert(handle); |
|
404 |
|||
405 |
3095 |
return handle; |
|
406 |
} |
||
407 |
|||
408 |
|||
409 |
3095 |
/**
 * Counterpart of AcquireCurlHandle(): frees the job's header list and moves
 * the easy handle from the set of used handles back to the set of idle
 * handles.  If more than pool_max_handles_ handles are idle already, the
 * handle is destroyed instead and its entry in curl_sharehandles_ is dropped.
 * The handle must be registered in pool_handles_inuse_.
 */
void S3FanoutManager::ReleaseCurlHandle(JobInfo *info, CURL *handle) const {
  if (info->http_headers != NULL) {
    curl_slist_free_all(info->http_headers);
    info->http_headers = NULL;
  }

  MutexLockGuard guard(curl_handle_lock_);

  set<CURL *>::iterator inuse_pos = pool_handles_inuse_->find(handle);
  assert(inuse_pos != pool_handles_inuse_->end());

  const bool keep_handle = !(pool_handles_idle_->size() > pool_max_handles_);
  if (keep_handle) {
    pool_handles_idle_->insert(handle);
  } else {
    // Detach the share handle before the easy handle goes away
    CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, NULL);
    assert(retval == CURLE_OK);
    curl_easy_cleanup(handle);
    std::map<CURL *, S3FanOutDnsEntry *>::size_type num_erased =
      curl_sharehandles_->erase(handle);
    assert(num_erased == 1);
  }

  pool_handles_inuse_->erase(inuse_pos);
}
|
433 |
|||
434 |
|||
435 |
/** |
||
436 |
* The Amazon AWS 2 authorization header according to |
||
437 |
* http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html#ConstructingTheAuthenticationHeader |
||
438 |
*/ |
||
439 |
6056 |
bool S3FanoutManager::MkV2Authz(const JobInfo &info, vector<string> *headers) |
|
440 |
const |
||
441 |
{ |
||
442 |
6056 |
string payload_hash; |
|
443 |
6056 |
bool retval = MkPayloadHash(info, &payload_hash); |
|
444 |
✗✓ | 6056 |
if (!retval) |
445 |
return false; |
||
446 |
6056 |
string content_type = GetContentType(info); |
|
447 |
6056 |
string request = GetRequestString(info); |
|
448 |
|||
449 |
6056 |
string timestamp = RfcTimestamp(); |
|
450 |
string to_sign = request + "\n" + |
||
451 |
payload_hash + "\n" + |
||
452 |
content_type + "\n" + |
||
453 |
timestamp + "\n" + |
||
454 |
"x-amz-acl:public-read" + "\n" + // default ACL |
||
455 |
6056 |
"/" + info.bucket + "/" + info.object_key; |
|
456 |
LogCvmfs(kLogS3Fanout, kLogDebug, "%s string to sign for: %s", |
||
457 |
6056 |
request.c_str(), info.object_key.c_str()); |
|
458 |
|||
459 |
6056 |
shash::Any hmac; |
|
460 |
6056 |
hmac.algorithm = shash::kSha1; |
|
461 |
shash::Hmac(info.secret_key, |
||
462 |
reinterpret_cast<const unsigned char *>(to_sign.data()), |
||
463 |
6056 |
to_sign.length(), &hmac); |
|
464 |
|||
465 |
headers->push_back("Authorization: AWS " + info.access_key + ":" + |
||
466 |
Base64(string(reinterpret_cast<char *>(hmac.digest), |
||
467 |
6056 |
hmac.GetDigestSize()))); |
|
468 |
6056 |
headers->push_back("Date: " + timestamp); |
|
469 |
6056 |
headers->push_back("x-amz-acl: public-read"); |
|
470 |
✓✓ | 6056 |
if (!payload_hash.empty()) |
471 |
2961 |
headers->push_back("Content-MD5: " + payload_hash); |
|
472 |
✓✓ | 6056 |
if (!content_type.empty()) |
473 |
2961 |
headers->push_back("Content-Type: " + content_type); |
|
474 |
6056 |
return true; |
|
475 |
} |
||
476 |
|||
477 |
|||
478 |
/**
 * Percent-encodes val.  Letters, digits, and the characters '_', '-', '~',
 * '.' are copied verbatim; '/' is copied verbatim unless encode_slash is set;
 * every other byte becomes %XX with upper-case hexadecimal digits.
 *
 * \param val           the string to encode
 * \param encode_slash  if true, '/' is encoded as %2F
 * \return the encoded string
 */
string S3FanoutManager::GetUriEncode(const string &val, bool encode_slash)
  const
{
  static const char kHexDigits[] = "0123456789ABCDEF";

  string result;
  const unsigned len = val.length();
  result.reserve(len);
  for (unsigned i = 0; i < len; ++i) {
    // Work on the unsigned byte value: where plain char is signed, bytes
    // >= 0x80 are negative, and dividing them by 16 does not yield valid
    // hexadecimal digits
    const unsigned char c = static_cast<unsigned char>(val[i]);
    if ((c >= 'A' && c <= 'Z') ||
        (c >= 'a' && c <= 'z') ||
        (c >= '0' && c <= '9') ||
        c == '_' || c == '-' || c == '~' || c == '.')
    {
      result.push_back(static_cast<char>(c));
    } else if (c == '/') {
      if (encode_slash) {
        result += "%2F";
      } else {
        result.push_back('/');
      }
    } else {
      result.push_back('%');
      result.push_back(kHexDigits[c >> 4]);
      result.push_back(kHexDigits[c & 0x0F]);
    }
  }
  return result;
}
||
506 |
|||
507 |
|||
508 |
/**
 * Returns the AWS4 signing key for the given job and date.  The key is
 * derived by a chain of four HMAC-256 computations over the date, the region,
 * "s3", and "aws4_request", starting from "AWS4" + secret key.  Results are
 * remembered in signing_keys_ under the concatenation of secret key, region,
 * and date, so the chain runs only once per such combination.
 */
string S3FanoutManager::GetAwsV4SigningKey(
  const JobInfo &info,
  const string &date) const
{
  const string cache_id = info.secret_key + info.region + date;
  const map<string, string>::const_iterator cached =
    signing_keys_.find(cache_id);
  if (cached != signing_keys_.end())
    return cached->second;

  // Every step uses the result of the previous step as HMAC key.
  // NOTE(review): the third parameter presumably requests the raw instead of
  // the hex encoded digest -- confirm with shash::Hmac256().
  const string k_date = shash::Hmac256("AWS4" + info.secret_key, date, true);
  const string k_region = shash::Hmac256(k_date, info.region, true);
  const string k_service = shash::Hmac256(k_region, "s3", true);
  const string k_signing = shash::Hmac256(k_service, "aws4_request", true);

  signing_keys_[cache_id] = k_signing;
  return k_signing;
}
||
525 |
|||
526 |
|||
527 |
/** |
||
528 |
* The Amazon AWS4 authorization header according to |
||
529 |
* http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-auth-using-authorization-header.html |
||
530 |
*/ |
||
531 |
bool S3FanoutManager::MkV4Authz(const JobInfo &info, vector<string> *headers) |
||
532 |
const |
||
533 |
{ |
||
534 |
string payload_hash; |
||
535 |
bool retval = MkPayloadHash(info, &payload_hash); |
||
536 |
if (!retval) |
||
537 |
return false; |
||
538 |
string content_type = GetContentType(info); |
||
539 |
string timestamp = IsoTimestamp(); |
||
540 |
string date = timestamp.substr(0, 8); |
||
541 |
vector<string> tokens = SplitString(info.hostname, ':'); |
||
542 |
string host_only = tokens[0]; |
||
543 |
|||
544 |
string signed_headers; |
||
545 |
string canonical_headers; |
||
546 |
if (!content_type.empty()) { |
||
547 |
signed_headers += "content-type;"; |
||
548 |
headers->push_back("Content-Type: " + content_type); |
||
549 |
canonical_headers += "content-type:" + content_type + "\n"; |
||
550 |
} |
||
551 |
signed_headers += "host;x-amz-acl;x-amz-content-sha256;x-amz-date"; |
||
552 |
canonical_headers += |
||
553 |
"host:" + host_only + "\n" + |
||
554 |
"x-amz-acl:public-read\n" |
||
555 |
"x-amz-content-sha256:" + payload_hash + "\n" + |
||
556 |
"x-amz-date:" + timestamp + "\n"; |
||
557 |
|||
558 |
string scope = date + "/" + info.region + "/s3/aws4_request"; |
||
559 |
|||
560 |
string canonical_request = |
||
561 |
GetRequestString(info) + "\n" + |
||
562 |
GetUriEncode("/" + info.bucket + "/" + info.object_key, false) + "\n" + |
||
563 |
"\n" + |
||
564 |
canonical_headers + "\n" + |
||
565 |
signed_headers + "\n" + |
||
566 |
payload_hash; |
||
567 |
|||
568 |
string hash_request = shash::Sha256String(canonical_request.c_str()); |
||
569 |
|||
570 |
string string_to_sign = |
||
571 |
"AWS4-HMAC-SHA256\n" + |
||
572 |
timestamp + "\n" + |
||
573 |
scope + "\n" + |
||
574 |
hash_request; |
||
575 |
|||
576 |
string signing_key = GetAwsV4SigningKey(info, date); |
||
577 |
string signature = shash::Hmac256(signing_key, string_to_sign); |
||
578 |
|||
579 |
headers->push_back("x-amz-acl: public-read"); |
||
580 |
headers->push_back("x-amz-content-sha256: " + payload_hash); |
||
581 |
headers->push_back("x-amz-date: " + timestamp); |
||
582 |
headers->push_back( |
||
583 |
"Authorization: AWS4-HMAC-SHA256 " |
||
584 |
"Credential=" + info.access_key + "/" + scope + "," |
||
585 |
"SignedHeaders=" + signed_headers + "," |
||
586 |
"Signature=" + signature); |
||
587 |
return true; |
||
588 |
} |
||
589 |
|||
590 |
|||
591 |
6056 |
/**
 * Attaches the share handle and the list of pre-resolved host entries to the
 * given easy handle.  Both options are expected to be accepted by curl.
 */
void S3FanoutManager::InitializeDnsSettingsCurl(
  CURL *handle,
  CURLSH *sharehandle,
  curl_slist *clist) const
{
  CURLcode share_result = curl_easy_setopt(handle, CURLOPT_SHARE, sharehandle);
  assert(share_result == CURLE_OK);

  CURLcode resolve_result = curl_easy_setopt(handle, CURLOPT_RESOLVE, clist);
  assert(resolve_result == CURLE_OK);
}
|
601 |
|||
602 |
|||
603 |
6056 |
/**
 * Binds the easy handle to a DNS entry, i.e. to a share handle together with
 * a pre-resolved "host:port:ip" list.
 *
 *   - A handle that is registered in curl_sharehandles_ keeps its entry.
 *   - Otherwise, if entries for the host exist in sharehandles_, the one
 *     with the smallest usage counter is taken.
 *   - Otherwise the host name is resolved and one new entry is created per
 *     IPv4 address; the handle is bound to the entry created last.
 *
 * \param handle          the easy handle to configure
 * \param host_with_port  host name, optionally with port; "http://" is
 *                        prepended if missing
 * \return 0 on success, -1 if the name has no IPv4 address
 */
int S3FanoutManager::InitializeDnsSettings(
  CURL *handle,
  std::string host_with_port) const
{
  // Use existing handle
  const std::map<CURL *, S3FanOutDnsEntry *>::const_iterator known_handle =
    curl_sharehandles_->find(handle);
  if (known_handle != curl_sharehandles_->end()) {
    S3FanOutDnsEntry *bound_entry = known_handle->second;
    InitializeDnsSettingsCurl(handle, bound_entry->sharehandle,
                              bound_entry->clist);
    return 0;
  }

  // Remove port number if such exists
  if (!HasPrefix(host_with_port, "http://", false /*ignore_case*/))
    host_with_port = "http://" + host_with_port;
  const std::string remote_host = dns::ExtractHost(host_with_port);
  const std::string remote_port = dns::ExtractPort(host_with_port);

  // If we have the name already resolved, use the least used IP
  S3FanOutDnsEntry *least_used = NULL;
  unsigned int min_counter = UINT_MAX;
  for (std::set<S3FanOutDnsEntry *>::iterator entry_iter =
         sharehandles_->begin();
       entry_iter != sharehandles_->end(); ++entry_iter)
  {
    S3FanOutDnsEntry *candidate = *entry_iter;
    if (!(candidate->dns_name == remote_host))
      continue;
    // On equal counters, the entry visited later wins
    if (min_counter >= candidate->counter) {
      min_counter = candidate->counter;
      least_used = candidate;
    }
  }
  if (least_used != NULL) {
    curl_sharehandles_->insert(
      std::pair<CURL *, S3FanOutDnsEntry *>(handle, least_used));
    least_used->counter++;
    InitializeDnsSettingsCurl(handle, least_used->sharehandle,
                              least_used->clist);
    return 0;
  }

  // We need to resolve the hostname
  // TODO(ssheikki): support ipv6 also...  if (opt_ipv4_only_)
  dns::Host host = resolver_->Resolve(remote_host);
  set<string> ipv4_addresses = host.ipv4_addresses();
  S3FanOutDnsEntry *new_entry = NULL;
  for (std::set<string>::iterator addr_iter = ipv4_addresses.begin();
       addr_iter != ipv4_addresses.end(); ++addr_iter)
  {
    new_entry = new S3FanOutDnsEntry();
    new_entry->counter = 0;
    new_entry->dns_name = remote_host;
    if (remote_port.size() == 0) {
      new_entry->port = "80";
    } else {
      new_entry->port = remote_port;
    }
    new_entry->ip = *addr_iter;

    // Single-element list in the host:port:address notation of
    // CURLOPT_RESOLVE
    new_entry->clist = NULL;
    new_entry->clist = curl_slist_append(new_entry->clist,
                                         (new_entry->dns_name+":"+
                                          new_entry->port+":"+
                                          new_entry->ip).c_str());

    new_entry->sharehandle = curl_share_init();
    assert(new_entry->sharehandle != NULL);
    CURLSHcode share_retval = curl_share_setopt(new_entry->sharehandle,
                                                CURLSHOPT_SHARE,
                                                CURL_LOCK_DATA_DNS);
    assert(share_retval == CURLSHE_OK);
    sharehandles_->insert(new_entry);
  }
  if (new_entry == NULL) {
    LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr,
             "Error: DNS resolve failed for address '%s'.",
             remote_host.c_str());
    assert(new_entry != NULL);
    return -1;
  }

  curl_sharehandles_->insert(
    std::pair<CURL *, S3FanOutDnsEntry *>(handle, new_entry));
  new_entry->counter++;
  InitializeDnsSettingsCurl(handle, new_entry->sharehandle, new_entry->clist);

  return 0;
}
||
681 |
|||
682 |
|||
683 |
6056 |
bool S3FanoutManager::MkPayloadHash(const JobInfo &info, string *hex_hash) |
|
684 |
const |
||
685 |
{ |
||
686 |
✓✓✓✓ |
6056 |
if ((info.request == JobInfo::kReqHead) || |
687 |
(info.request == JobInfo::kReqDelete)) |
||
688 |
{ |
||
689 |
✓✗✗ | 3095 |
switch (info.authz_method) { |
690 |
case kAuthzAwsV2: |
||
691 |
3095 |
hex_hash->clear(); |
|
692 |
3095 |
break; |
|
693 |
case kAuthzAwsV4: |
||
694 |
// Sha256 over empty string |
||
695 |
*hex_hash = |
||
696 |
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; |
||
697 |
break; |
||
698 |
default: |
||
699 |
abort(); |
||
700 |
} |
||
701 |
3095 |
return true; |
|
702 |
} |
||
703 |
|||
704 |
// PUT, there is actually payload |
||
705 |
2961 |
shash::Any payload_hash(shash::kMd5); |
|
706 |
bool retval; |
||
707 |
|||
708 |
✓✓✗ | 2961 |
switch (info.origin) { |
709 |
case kOriginMem: |
||
710 |
✓✗✗ | 907 |
switch (info.authz_method) { |
711 |
case kAuthzAwsV2: |
||
712 |
shash::HashMem(info.origin_mem.data, info.origin_mem.size, |
||
713 |
907 |
&payload_hash); |
|
714 |
*hex_hash = |
||
715 |
Base64(string(reinterpret_cast<char *>(payload_hash.digest), |
||
716 |
907 |
payload_hash.GetDigestSize())); |
|
717 |
907 |
return true; |
|
718 |
case kAuthzAwsV4: |
||
719 |
*hex_hash = |
||
720 |
shash::Sha256Mem(info.origin_mem.data, info.origin_mem.size); |
||
721 |
return true; |
||
722 |
default: |
||
723 |
abort(); |
||
724 |
} |
||
725 |
case kOriginPath: |
||
726 |
✓✗✗ | 2054 |
switch (info.authz_method) { |
727 |
case kAuthzAwsV2: |
||
728 |
2054 |
retval = shash::HashFile(info.origin_path, &payload_hash); |
|
729 |
✗✓ | 2054 |
if (!retval) { |
730 |
LogCvmfs(kLogS3Fanout, kLogStderr, |
||
731 |
"failed to hash file %s (errno: %d)", |
||
732 |
info.origin_path.c_str(), errno); |
||
733 |
return false; |
||
734 |
} |
||
735 |
*hex_hash = |
||
736 |
Base64(string(reinterpret_cast<char *>(payload_hash.digest), |
||
737 |
2054 |
payload_hash.GetDigestSize())); |
|
738 |
2054 |
return true; |
|
739 |
case kAuthzAwsV4: |
||
740 |
*hex_hash = shash::Sha256File(info.origin_path); |
||
741 |
if (hex_hash->empty()) { |
||
742 |
LogCvmfs(kLogS3Fanout, kLogStderr, |
||
743 |
"failed to hash file %s (errno: %d)", |
||
744 |
info.origin_path.c_str(), errno); |
||
745 |
return false; |
||
746 |
} |
||
747 |
return true; |
||
748 |
default: |
||
749 |
abort(); |
||
750 |
} |
||
751 |
default: |
||
752 |
abort(); |
||
753 |
} |
||
754 |
} |
||
755 |
|||
756 |
|||
757 |
6056 |
bool S3FanoutManager::MkPayloadSize(const JobInfo &info, uint64_t *size) const { |
|
758 |
✓✓✓✓ |
6056 |
if ((info.request == JobInfo::kReqHead) || |
759 |
(info.request == JobInfo::kReqDelete)) |
||
760 |
{ |
||
761 |
3095 |
*size = 0; |
|
762 |
3095 |
return true; |
|
763 |
} |
||
764 |
|||
765 |
int64_t file_size; |
||
766 |
✓✓✗ | 2961 |
switch (info.origin) { |
767 |
case kOriginMem: |
||
768 |
907 |
*size = info.origin_mem.size; |
|
769 |
907 |
return true; |
|
770 |
case kOriginPath: |
||
771 |
2054 |
file_size = GetFileSize(info.origin_path); |
|
772 |
✗✓ | 2054 |
if (file_size < 0) { |
773 |
LogCvmfs(kLogS3Fanout, kLogStderr, "failed to stat file %s (errno: %d)", |
||
774 |
info.origin_path.c_str(), errno); |
||
775 |
return false; |
||
776 |
} |
||
777 |
2054 |
*size = file_size; |
|
778 |
2054 |
return true; |
|
779 |
default: |
||
780 |
abort(); |
||
781 |
} |
||
782 |
} |
||
783 |
|||
784 |
|||
785 |
6061 |
/**
 * Maps the request type of the job to the HTTP verb: "HEAD", "PUT" (for both
 * kinds of put requests), or "DELETE".  Unknown request types abort the
 * process.
 */
string S3FanoutManager::GetRequestString(const JobInfo &info) const {
  if (info.request == JobInfo::kReqHead)
    return "HEAD";

  if ((info.request == JobInfo::kReqPutCas) ||
      (info.request == JobInfo::kReqPutDotCvmfs))
  {
    return "PUT";
  }

  if (info.request == JobInfo::kReqDelete)
    return "DELETE";

  abort();
}
||
799 |
|||
800 |
|||
801 |
6056 |
/**
 * Returns the MIME type that belongs to the job's request type.  Requests
 * without a body (HEAD, DELETE) yield an empty string.
 * Aborts on an unknown request type.
 */
string S3FanoutManager::GetContentType(const JobInfo &info) const {
  if (info.request == JobInfo::kReqPutCas)
    return "application/octet-stream";
  if (info.request == JobInfo::kReqPutDotCvmfs)
    return "application/x-cvmfs";
  if ((info.request == JobInfo::kReqHead) ||
      (info.request == JobInfo::kReqDelete))
  {
    return "";
  }
  abort();
}
||
815 |
|||
816 |
|||
817 |
/** |
||
818 |
* Request parameters set the URL and other options such as timeout and |
||
819 |
* proxy. |
||
820 |
*/ |
||
821 |
6056 |
Failures S3FanoutManager::InitializeRequest(JobInfo *info, CURL *handle) const { |
|
822 |
// Initialize internal download state |
||
823 |
6056 |
info->curl_handle = handle; |
|
824 |
6056 |
info->error_code = kFailOk; |
|
825 |
6056 |
info->http_error = 0; |
|
826 |
6056 |
info->num_retries = 0; |
|
827 |
6056 |
info->backoff_ms = 0; |
|
828 |
6056 |
info->throttle_ms = 0; |
|
829 |
6056 |
info->throttle_timestamp = 0; |
|
830 |
6056 |
info->http_headers = NULL; |
|
831 |
|||
832 |
6056 |
InitializeDnsSettings(handle, info->hostname); |
|
833 |
|||
834 |
bool retval_b; |
||
835 |
uint64_t payload_size; |
||
836 |
6056 |
retval_b = MkPayloadSize(*info, &payload_size); |
|
837 |
✗✓ | 6056 |
if (!retval_b) |
838 |
return kFailLocalIO; |
||
839 |
|||
840 |
CURLcode retval; |
||
841 |
✓✓✓✓ |
9151 |
if (info->request == JobInfo::kReqHead || |
842 |
info->request == JobInfo::kReqDelete) |
||
843 |
{ |
||
844 |
3095 |
retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 0); |
|
845 |
✗✓ | 3095 |
assert(retval == CURLE_OK); |
846 |
3095 |
retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 1); |
|
847 |
✗✓ | 3095 |
assert(retval == CURLE_OK); |
848 |
info->http_headers = |
||
849 |
3095 |
curl_slist_append(info->http_headers, "Content-Length: 0"); |
|
850 |
|||
851 |
✓✓ | 3095 |
if (info->request == JobInfo::kReqDelete) { |
852 |
retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, |
||
853 |
5 |
GetRequestString(*info).c_str()); |
|
854 |
✗✓ | 5 |
assert(retval == CURLE_OK); |
855 |
} else { |
||
856 |
3090 |
retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL); |
|
857 |
✗✓ | 3090 |
assert(retval == CURLE_OK); |
858 |
} |
||
859 |
} else { |
||
860 |
2961 |
retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL); |
|
861 |
✗✓ | 2961 |
assert(retval == CURLE_OK); |
862 |
2961 |
retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 1); |
|
863 |
✗✓ | 2961 |
assert(retval == CURLE_OK); |
864 |
2961 |
retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 0); |
|
865 |
✗✓ | 2961 |
assert(retval == CURLE_OK); |
866 |
retval = curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE, |
||
867 |
2961 |
static_cast<curl_off_t>(payload_size)); |
|
868 |
✗✓ | 2961 |
assert(retval == CURLE_OK); |
869 |
✓✓ | 2961 |
if (info->origin == kOriginPath) { |
870 |
✗✓ | 2054 |
assert(info->origin_file == NULL); |
871 |
2054 |
info->origin_file = fopen(info->origin_path.c_str(), "r"); |
|
872 |
✗✓ | 2054 |
if (info->origin_file == NULL) { |
873 |
LogCvmfs(kLogS3Fanout, kLogStderr, "failed to open file %s (errno: %d)", |
||
874 |
info->origin_path.c_str(), errno); |
||
875 |
return kFailLocalIO; |
||
876 |
} |
||
877 |
} |
||
878 |
|||
879 |
✗✓ | 2961 |
if (info->request == JobInfo::kReqPutDotCvmfs) { |
880 |
info->http_headers = |
||
881 |
curl_slist_append(info->http_headers, kCacheControlDotCvmfs); |
||
882 |
} else { |
||
883 |
info->http_headers = |
||
884 |
2961 |
curl_slist_append(info->http_headers, kCacheControlCas); |
|
885 |
} |
||
886 |
} |
||
887 |
|||
888 |
// Authorization |
||
889 |
6056 |
vector<string> authz_headers; |
|
890 |
✓✗✗ | 6056 |
switch (info->authz_method) { |
891 |
case kAuthzAwsV2: |
||
892 |
6056 |
retval_b = MkV2Authz(*info, &authz_headers); |
|
893 |
6056 |
break; |
|
894 |
case kAuthzAwsV4: |
||
895 |
retval_b = MkV4Authz(*info, &authz_headers); |
||
896 |
break; |
||
897 |
default: |
||
898 |
abort(); |
||
899 |
} |
||
900 |
✗✓ | 6056 |
if (!retval_b) |
901 |
return kFailLocalIO; |
||
902 |
✓✓ | 30146 |
for (unsigned i = 0; i < authz_headers.size(); ++i) { |
903 |
info->http_headers = |
||
904 |
24090 |
curl_slist_append(info->http_headers, authz_headers[i].c_str()); |
|
905 |
} |
||
906 |
|||
907 |
// Common headers |
||
908 |
info->http_headers = |
||
909 |
6056 |
curl_slist_append(info->http_headers, "Connection: Keep-Alive"); |
|
910 |
6056 |
info->http_headers = curl_slist_append(info->http_headers, "Pragma:"); |
|
911 |
// No 100-continue |
||
912 |
6056 |
info->http_headers = curl_slist_append(info->http_headers, "Expect:"); |
|
913 |
// Strip unnecessary header |
||
914 |
6056 |
info->http_headers = curl_slist_append(info->http_headers, "Accept:"); |
|
915 |
info->http_headers = curl_slist_append(info->http_headers, |
||
916 |
6056 |
user_agent_->c_str()); |
|
917 |
|||
918 |
// Set curl parameters |
||
919 |
6056 |
retval = curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info)); |
|
920 |
✗✓ | 6056 |
assert(retval == CURLE_OK); |
921 |
retval = curl_easy_setopt(handle, CURLOPT_WRITEHEADER, |
||
922 |
6056 |
static_cast<void *>(info)); |
|
923 |
✗✓ | 6056 |
assert(retval == CURLE_OK); |
924 |
retval = curl_easy_setopt(handle, CURLOPT_READDATA, |
||
925 |
6056 |
static_cast<void *>(info)); |
|
926 |
✗✓ | 6056 |
assert(retval == CURLE_OK); |
927 |
6056 |
retval = curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->http_headers); |
|
928 |
✗✓ | 6056 |
assert(retval == CURLE_OK); |
929 |
✗✓ | 6056 |
if (opt_ipv4_only_) { |
930 |
retval = curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4); |
||
931 |
assert(retval == CURLE_OK); |
||
932 |
} |
||
933 |
// Follow HTTP redirects |
||
934 |
6056 |
retval = curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L); |
|
935 |
✗✓ | 6056 |
assert(retval == CURLE_OK); |
936 |
|||
937 |
6056 |
return kFailOk; |
|
938 |
} |
||
939 |
|||
940 |
|||
941 |
/** |
||
942 |
* Sets the URL specific options such as host to use and timeout. |
||
943 |
*/ |
||
944 |
6056 |
void S3FanoutManager::SetUrlOptions(JobInfo *info) const { |
|
945 |
6056 |
CURL *curl_handle = info->curl_handle; |
|
946 |
CURLcode retval; |
||
947 |
|||
948 |
6056 |
pthread_mutex_lock(lock_options_); |
|
949 |
6056 |
retval = curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_); |
|
950 |
✗✓ | 6056 |
assert(retval == CURLE_OK); |
951 |
6056 |
pthread_mutex_unlock(lock_options_); |
|
952 |
|||
953 |
6056 |
string url = MkUrl(info->hostname, info->bucket, (info->object_key)); |
|
954 |
6056 |
retval = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str()); |
|
955 |
✗✓ | 6056 |
assert(retval == CURLE_OK); |
956 |
6056 |
} |
|
957 |
|||
958 |
|||
959 |
/** |
||
960 |
* Adds transfer time and uploaded bytes to the global counters. |
||
961 |
*/ |
||
962 |
5979 |
void S3FanoutManager::UpdateStatistics(CURL *handle) { |
|
963 |
double val; |
||
964 |
|||
965 |
✓✗ | 5979 |
if (curl_easy_getinfo(handle, CURLINFO_SIZE_UPLOAD, &val) == CURLE_OK) |
966 |
5979 |
statistics_->transferred_bytes += val; |
|
967 |
5979 |
} |
|
968 |
|||
969 |
|||
970 |
/** |
||
971 |
* Retry if possible and if not already done too often. |
||
972 |
*/ |
||
973 |
52 |
bool S3FanoutManager::CanRetry(const JobInfo *info) { |
|
974 |
52 |
pthread_mutex_lock(lock_options_); |
|
975 |
52 |
unsigned max_retries = opt_max_retries_; |
|
976 |
52 |
pthread_mutex_unlock(lock_options_); |
|
977 |
|||
978 |
return |
||
979 |
(info->error_code == kFailHostConnection || |
||
980 |
info->error_code == kFailHostResolve || |
||
981 |
info->error_code == kFailServiceUnavailable || |
||
982 |
info->error_code == kFailRetry) && |
||
983 |
✓✓✓✗ ✓✗✓✗ ✓✗ |
52 |
(info->num_retries < max_retries); |
984 |
} |
||
985 |
|||
986 |
|||
987 |
/** |
||
988 |
* Backoff for retry to introduce a jitter into a upload sequence. |
||
989 |
* |
||
990 |
* \return true if backoff has been performed, false otherwise |
||
991 |
*/ |
||
992 |
52 |
void S3FanoutManager::Backoff(JobInfo *info) { |
|
993 |
52 |
pthread_mutex_lock(lock_options_); |
|
994 |
52 |
unsigned backoff_init_ms = opt_backoff_init_ms_; |
|
995 |
52 |
unsigned backoff_max_ms = opt_backoff_max_ms_; |
|
996 |
52 |
pthread_mutex_unlock(lock_options_); |
|
997 |
|||
998 |
✓✓ | 52 |
if (info->error_code != kFailRetry) |
999 |
4 |
info->num_retries++; |
|
1000 |
52 |
statistics_->num_retries++; |
|
1001 |
|||
1002 |
✓✓ | 52 |
if (info->throttle_ms > 0) { |
1003 |
LogCvmfs(kLogS3Fanout, kLogDebug, "throttling for %d ms", |
||
1004 |
48 |
info->throttle_ms); |
|
1005 |
48 |
uint64_t now = platform_monotonic_time(); |
|
1006 |
✓✗ | 48 |
if ((info->throttle_timestamp + (info->throttle_ms / 1000)) > now) { |
1007 |
✓✗ | 48 |
if ((now - timestamp_last_throttle_report_) > kThrottleReportIntervalSec) |
1008 |
{ |
||
1009 |
LogCvmfs(kLogS3Fanout, kLogStdout, |
||
1010 |
48 |
"Warning: S3 backend throttling (%ums)", info->throttle_ms); |
|
1011 |
} |
||
1012 |
48 |
statistics_->ms_throttled += info->throttle_ms; |
|
1013 |
48 |
SafeSleepMs(info->throttle_ms); |
|
1014 |
} |
||
1015 |
} else { |
||
1016 |
✓✗ | 4 |
if (info->backoff_ms == 0) { |
1017 |
4 |
info->backoff_ms = prng_.Next(backoff_init_ms + 1); // Must be != 0 |
|
1018 |
} else { |
||
1019 |
info->backoff_ms *= 2; |
||
1020 |
} |
||
1021 |
✗✓ | 4 |
if (info->backoff_ms > backoff_max_ms) |
1022 |
info->backoff_ms = backoff_max_ms; |
||
1023 |
|||
1024 |
LogCvmfs(kLogS3Fanout, kLogDebug, "backing off for %d ms", |
||
1025 |
4 |
info->backoff_ms); |
|
1026 |
4 |
SafeSleepMs(info->backoff_ms); |
|
1027 |
} |
||
1028 |
52 |
} |
|
1029 |
|||
1030 |
|||
1031 |
/** |
||
1032 |
* Checks the result of a curl request and implements the failure logic |
||
1033 |
* and takes care of cleanup. |
||
1034 |
* |
||
1035 |
* @return true if request should be repeated, false otherwise |
||
1036 |
*/ |
||
1037 |
5979 |
bool S3FanoutManager::VerifyAndFinalize(const int curl_error, JobInfo *info) { |
|
1038 |
LogCvmfs(kLogS3Fanout, kLogDebug, "Verify uploaded/tested object %s " |
||
1039 |
"(curl error %d, info error %d, info request %d)", |
||
1040 |
info->object_key.c_str(), |
||
1041 |
5979 |
curl_error, info->error_code, info->request); |
|
1042 |
5979 |
UpdateStatistics(info->curl_handle); |
|
1043 |
|||
1044 |
// Verification and error classification |
||
1045 |
✓✗✗✓ ✓✗ |
5979 |
switch (curl_error) { |
1046 |
case CURLE_OK: |
||
1047 |
✓✓ | 3014 |
if (info->error_code != kFailRetry) |
1048 |
2966 |
info->error_code = kFailOk; |
|
1049 |
3014 |
break; |
|
1050 |
case CURLE_UNSUPPORTED_PROTOCOL: |
||
1051 |
case CURLE_URL_MALFORMAT: |
||
1052 |
info->error_code = kFailBadRequest; |
||
1053 |
break; |
||
1054 |
case CURLE_COULDNT_RESOLVE_HOST: |
||
1055 |
info->error_code = kFailHostResolve; |
||
1056 |
break; |
||
1057 |
case CURLE_COULDNT_CONNECT: |
||
1058 |
case CURLE_OPERATION_TIMEDOUT: |
||
1059 |
case CURLE_SEND_ERROR: |
||
1060 |
case CURLE_RECV_ERROR: |
||
1061 |
4 |
info->error_code = kFailHostConnection; |
|
1062 |
4 |
break; |
|
1063 |
case CURLE_ABORTED_BY_CALLBACK: |
||
1064 |
case CURLE_WRITE_ERROR: |
||
1065 |
// Error set by callback |
||
1066 |
2961 |
break; |
|
1067 |
default: |
||
1068 |
LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr, |
||
1069 |
"unexpected curl error (%d) while trying to upload %s", |
||
1070 |
curl_error, info->object_key.c_str()); |
||
1071 |
info->error_code = kFailOther; |
||
1072 |
break; |
||
1073 |
} |
||
1074 |
|||
1075 |
// Transform HEAD to PUT request |
||
1076 |
✓✓✓✗ |
5979 |
if ((info->error_code == kFailNotFound) && |
1077 |
(info->request == JobInfo::kReqHead)) { |
||
1078 |
LogCvmfs(kLogS3Fanout, kLogDebug, "not found: %s, uploading", |
||
1079 |
2961 |
info->object_key.c_str()); |
|
1080 |
2961 |
info->request = JobInfo::kReqPutCas; |
|
1081 |
2961 |
curl_slist_free_all(info->http_headers); |
|
1082 |
2961 |
info->http_headers = NULL; |
|
1083 |
s3fanout::Failures init_failure = InitializeRequest(info, |
||
1084 |
2961 |
info->curl_handle); |
|
1085 |
|||
1086 |
✗✓ | 2961 |
if (init_failure != s3fanout::kFailOk) { |
1087 |
LogCvmfs(kLogS3Fanout, kLogStderr, "Failed to initialize CURL handle " |
||
1088 |
"(error: %d - %s | errno: %d)", |
||
1089 |
init_failure, Code2Ascii(init_failure), errno); |
||
1090 |
abort(); |
||
1091 |
} |
||
1092 |
2961 |
SetUrlOptions(info); |
|
1093 |
// Reset origin |
||
1094 |
✓✓ | 2961 |
if (info->origin == kOriginMem) |
1095 |
907 |
info->origin_mem.pos = 0; |
|
1096 |
✓✓ | 2961 |
if (info->origin == kOriginPath) |
1097 |
2054 |
rewind(info->origin_file); |
|
1098 |
2961 |
return true; // Again, Put |
|
1099 |
} |
||
1100 |
|||
1101 |
// Determination if failed request should be repeated |
||
1102 |
3018 |
bool try_again = false; |
|
1103 |
✓✓ | 3018 |
if (info->error_code != kFailOk) { |
1104 |
52 |
try_again = CanRetry(info); |
|
1105 |
} |
||
1106 |
✓✓ | 3018 |
if (try_again) { |
1107 |
✓✗✗✓ |
52 |
if (info->request == JobInfo::kReqPutCas || |
1108 |
info->request == JobInfo::kReqPutDotCvmfs) { |
||
1109 |
LogCvmfs(kLogS3Fanout, kLogDebug, "Trying again to upload %s", |
||
1110 |
info->object_key.c_str()); |
||
1111 |
// Reset origin |
||
1112 |
if (info->origin == kOriginMem) |
||
1113 |
info->origin_mem.pos = 0; |
||
1114 |
if (info->origin == kOriginPath) { |
||
1115 |
assert(info->origin_file != NULL); |
||
1116 |
rewind(info->origin_file); |
||
1117 |
} |
||
1118 |
} |
||
1119 |
52 |
Backoff(info); |
|
1120 |
52 |
return true; // try again |
|
1121 |
} |
||
1122 |
|||
1123 |
// Cleanup opened resources |
||
1124 |
✓✓ | 2966 |
if (info->origin == kOriginPath) { |
1125 |
✗✓ | 2054 |
assert(info->mmf == NULL); |
1126 |
✓✗ | 2054 |
if (info->origin_file != NULL) { |
1127 |
✗✓ | 2054 |
if (fclose(info->origin_file) != 0) |
1128 |
info->error_code = kFailLocalIO; |
||
1129 |
2054 |
info->origin_file = NULL; |
|
1130 |
} |
||
1131 |
✓✗ | 912 |
} else if (info->origin == kOriginMem) { |
1132 |
✗✓ | 912 |
assert(info->origin_file == NULL); |
1133 |
✓✓ | 912 |
if (info->mmf != NULL) { |
1134 |
907 |
info->mmf->Unmap(); |
|
1135 |
✓✗ | 907 |
delete info->mmf; |
1136 |
907 |
info->mmf = NULL; |
|
1137 |
} |
||
1138 |
} |
||
1139 |
|||
1140 |
✗✓✗✗ ✗✗ |
2966 |
if ((info->error_code != kFailOk) && |
1141 |
(info->http_error != 0) && (info->http_error != 404)) |
||
1142 |
{ |
||
1143 |
LogCvmfs(kLogS3Fanout, kLogStderr, "S3: HTTP failure %d", info->http_error); |
||
1144 |
} |
||
1145 |
2966 |
return false; // stop transfer |
|
1146 |
} |
||
1147 |
|||
1148 |
99 |
/**
 * Puts all members into a defined, empty state and creates the mutexes.
 * The manager becomes usable only after Init() has been called.
 */
S3FanoutManager::S3FanoutManager() {
  // Curl handle bookkeeping
  pool_handles_idle_ = NULL;
  pool_handles_inuse_ = NULL;
  sharehandles_ = NULL;
  curl_sharehandles_ = NULL;
  pool_max_handles_ = 0;
  curl_multi_ = NULL;
  user_agent_ = NULL;

  dns_buckets_ = true;

  // State of the I/O thread's poll set
  atomic_init32(&multi_threaded_);
  watch_fds_ = NULL;
  watch_fds_size_ = 0;
  watch_fds_inuse_ = 0;
  watch_fds_max_ = 0;

  // All locks live on the heap; create them in declaration order
  pthread_mutex_t **all_locks[] = {
    &lock_options_,
    &jobs_completed_lock_,
    &jobs_todo_lock_,
    &curl_handle_lock_
  };
  const unsigned num_locks = sizeof(all_locks) / sizeof(all_locks[0]);
  for (unsigned i = 0; i < num_locks; ++i) {
    *all_locks[i] =
      reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
    int retval = pthread_mutex_init(*all_locks[i], NULL);
    assert(retval == 0);
  }

  // Options
  opt_timeout_ = 0;
  opt_max_retries_ = 0;
  opt_backoff_init_ms_ = 0;
  opt_backoff_max_ms_ = 0;
  opt_ipv4_only_ = false;

  // Job handling and reporting
  max_available_jobs_ = 0;
  thread_upload_ = 0;
  thread_upload_run_ = false;
  resolver_ = NULL;
  available_jobs_ = NULL;
  statistics_ = NULL;
  timestamp_last_throttle_report_ = 0;
}
|
1196 |
|||
1197 |
|||
1198 |
99 |
/**
 * Destroys the mutexes created by the constructor and frees their memory.
 */
S3FanoutManager::~S3FanoutManager() {
  pthread_mutex_t *all_locks[] = {
    lock_options_,
    jobs_completed_lock_,
    jobs_todo_lock_,
    curl_handle_lock_
  };
  const unsigned num_locks = sizeof(all_locks) / sizeof(all_locks[0]);
  for (unsigned i = 0; i < num_locks; ++i) {
    pthread_mutex_destroy(all_locks[i]);
    free(all_locks[i]);
  }
}
|
1208 |
|||
1209 |
99 |
void S3FanoutManager::Init(const unsigned int max_pool_handles, |
|
1210 |
bool dns_buckets) { |
||
1211 |
99 |
atomic_init32(&multi_threaded_); |
|
1212 |
99 |
CURLcode retval = curl_global_init(CURL_GLOBAL_ALL); |
|
1213 |
✗✓ | 99 |
assert(retval == CURLE_OK); |
1214 |
99 |
pool_handles_idle_ = new set<CURL *>; |
|
1215 |
99 |
pool_handles_inuse_ = new set<CURL *>; |
|
1216 |
99 |
curl_sharehandles_ = new map<CURL *, S3FanOutDnsEntry *>; |
|
1217 |
99 |
sharehandles_ = new set<S3FanOutDnsEntry *>; |
|
1218 |
99 |
pool_max_handles_ = max_pool_handles; |
|
1219 |
99 |
watch_fds_max_ = 4 * pool_max_handles_; |
|
1220 |
|||
1221 |
99 |
max_available_jobs_ = 4 * pool_max_handles_; |
|
1222 |
99 |
available_jobs_ = new Semaphore(max_available_jobs_); |
|
1223 |
✗✓ | 99 |
assert(NULL != available_jobs_); |
1224 |
|||
1225 |
99 |
opt_timeout_ = 20; |
|
1226 |
99 |
statistics_ = new Statistics(); |
|
1227 |
99 |
user_agent_ = new string(); |
|
1228 |
99 |
*user_agent_ = "User-Agent: cvmfs " + string(VERSION); |
|
1229 |
|||
1230 |
99 |
dns_buckets_ = dns_buckets; |
|
1231 |
|||
1232 |
99 |
curl_multi_ = curl_multi_init(); |
|
1233 |
✗✓ | 99 |
assert(curl_multi_ != NULL); |
1234 |
CURLMcode mretval; |
||
1235 |
mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION, |
||
1236 |
99 |
CallbackCurlSocket); |
|
1237 |
✗✓ | 99 |
assert(mretval == CURLM_OK); |
1238 |
mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA, |
||
1239 |
99 |
static_cast<void *>(this)); |
|
1240 |
✗✓ | 99 |
assert(mretval == CURLM_OK); |
1241 |
mretval = curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS, |
||
1242 |
99 |
pool_max_handles_); |
|
1243 |
✗✓ | 99 |
assert(mretval == CURLM_OK); |
1244 |
|||
1245 |
99 |
prng_.InitLocaltime(); |
|
1246 |
|||
1247 |
// Parsing environment variables |
||
1248 |
✗✓✗✗ ✗✓ |
99 |
if ((getenv("CVMFS_IPV4_ONLY") != NULL) && |
1249 |
(strlen(getenv("CVMFS_IPV4_ONLY")) > 0)) { |
||
1250 |
opt_ipv4_only_ = true; |
||
1251 |
} |
||
1252 |
|||
1253 |
99 |
watch_fds_ = static_cast<struct pollfd *>(smalloc(2 * sizeof(struct pollfd))); |
|
1254 |
99 |
watch_fds_size_ = 2; |
|
1255 |
99 |
watch_fds_inuse_ = 0; |
|
1256 |
|||
1257 |
99 |
SetRetryParameters(3, 100, 2000); |
|
1258 |
|||
1259 |
99 |
resolver_ = dns::CaresResolver::Create(opt_ipv4_only_, 2, 2000); |
|
1260 |
99 |
} |
|
1261 |
|||
1262 |
99 |
void S3FanoutManager::Fini() { |
|
1263 |
✓✗ | 99 |
if (atomic_xadd32(&multi_threaded_, 0) == 1) { |
1264 |
// Shutdown I/O thread |
||
1265 |
99 |
thread_upload_run_ = false; |
|
1266 |
99 |
pthread_join(thread_upload_, NULL); |
|
1267 |
} |
||
1268 |
|||
1269 |
99 |
set<CURL *>::iterator i = pool_handles_idle_->begin(); |
|
1270 |
99 |
const set<CURL *>::const_iterator iEnd = pool_handles_idle_->end(); |
|
1271 |
✓✓ | 315 |
for (; i != iEnd; ++i) { |
1272 |
216 |
curl_easy_cleanup(*i); |
|
1273 |
} |
||
1274 |
|||
1275 |
99 |
set<S3FanOutDnsEntry *>::iterator is = sharehandles_->begin(); |
|
1276 |
99 |
const set<S3FanOutDnsEntry *>::const_iterator isEnd = sharehandles_->end(); |
|
1277 |
✓✓ | 198 |
for (; is != isEnd; ++is) { |
1278 |
99 |
curl_share_cleanup((*is)->sharehandle); |
|
1279 |
99 |
curl_slist_free_all((*is)->clist); |
|
1280 |
✓✗ | 99 |
delete *is; |
1281 |
} |
||
1282 |
99 |
pool_handles_idle_->clear(); |
|
1283 |
99 |
curl_sharehandles_->clear(); |
|
1284 |
99 |
sharehandles_->clear(); |
|
1285 |
✓✗ | 99 |
delete pool_handles_idle_; |
1286 |
✓✗ | 99 |
delete pool_handles_inuse_; |
1287 |
✓✗ | 99 |
delete curl_sharehandles_; |
1288 |
✓✗ | 99 |
delete sharehandles_; |
1289 |
✓✗ | 99 |
delete user_agent_; |
1290 |
99 |
curl_multi_cleanup(curl_multi_); |
|
1291 |
99 |
pool_handles_idle_ = NULL; |
|
1292 |
99 |
pool_handles_inuse_ = NULL; |
|
1293 |
99 |
curl_sharehandles_ = NULL; |
|
1294 |
99 |
sharehandles_ = NULL; |
|
1295 |
99 |
user_agent_ = NULL; |
|
1296 |
99 |
curl_multi_ = NULL; |
|
1297 |
|||
1298 |
99 |
delete statistics_; |
|
1299 |
99 |
statistics_ = NULL; |
|
1300 |
|||
1301 |
✓✗ | 99 |
delete available_jobs_; |
1302 |
|||
1303 |
99 |
curl_global_cleanup(); |
|
1304 |
99 |
} |
|
1305 |
|||
1306 |
|||
1307 |
/** |
||
1308 |
* Spawns the I/O worker thread. No way back except Fini(); Init(); |
||
1309 |
*/ |
||
1310 |
99 |
void S3FanoutManager::Spawn() { |
|
1311 |
99 |
LogCvmfs(kLogS3Fanout, kLogDebug, "S3FanoutManager spawned"); |
|
1312 |
|||
1313 |
99 |
thread_upload_run_ = true; |
|
1314 |
int retval = pthread_create(&thread_upload_, NULL, MainUpload, |
||
1315 |
99 |
static_cast<void *>(this)); |
|
1316 |
✗✓ | 99 |
assert(retval == 0); |
1317 |
|||
1318 |
99 |
atomic_inc32(&multi_threaded_); |
|
1319 |
99 |
} |
|
1320 |
|||
1321 |
|||
1322 |
/** |
||
1323 |
* The timeout counts for all sorts of connection phases, |
||
1324 |
* DNS, HTTP connect, etc. |
||
1325 |
*/ |
||
1326 |
99 |
void S3FanoutManager::SetTimeout(const unsigned seconds) { |
|
1327 |
99 |
pthread_mutex_lock(lock_options_); |
|
1328 |
99 |
opt_timeout_ = seconds; |
|
1329 |
99 |
pthread_mutex_unlock(lock_options_); |
|
1330 |
99 |
} |
|
1331 |
|||
1332 |
|||
1333 |
/** |
||
1334 |
* Receives the currently active timeout value. |
||
1335 |
*/ |
||
1336 |
void S3FanoutManager::GetTimeout(unsigned *seconds) { |
||
1337 |
pthread_mutex_lock(lock_options_); |
||
1338 |
*seconds = opt_timeout_; |
||
1339 |
pthread_mutex_unlock(lock_options_); |
||
1340 |
} |
||
1341 |
|||
1342 |
|||
1343 |
36 |
/**
 * Gives read access to the manager's statistics object.  The reference is
 * bound to the object statistics_ points to; no copy is made.
 */
const Statistics &S3FanoutManager::GetStatistics() {
  const Statistics &current = *statistics_;
  return current;
}
||
1346 |
|||
1347 |
|||
1348 |
198 |
void S3FanoutManager::SetRetryParameters(const unsigned max_retries, |
|
1349 |
const unsigned backoff_init_ms, |
||
1350 |
const unsigned backoff_max_ms) { |
||
1351 |
198 |
pthread_mutex_lock(lock_options_); |
|
1352 |
198 |
opt_max_retries_ = max_retries; |
|
1353 |
198 |
opt_backoff_init_ms_ = backoff_init_ms; |
|
1354 |
198 |
opt_backoff_max_ms_ = backoff_max_ms; |
|
1355 |
198 |
pthread_mutex_unlock(lock_options_); |
|
1356 |
198 |
} |
|
1357 |
|||
1358 |
/** |
||
1359 |
* Get completed jobs, so they can be cleaned and deleted properly. |
||
1360 |
*/ |
||
1361 |
2330112653 |
int S3FanoutManager::PopCompletedJobs(std::vector<s3fanout::JobInfo*> *jobs) { |
|
1362 |
2330112653 |
pthread_mutex_lock(jobs_completed_lock_); |
|
1363 |
2330112653 |
std::vector<JobInfo*>::iterator it = jobs_completed_.begin(); |
|
1364 |
2330112653 |
const std::vector<JobInfo*>::const_iterator itend = jobs_completed_.end(); |
|
1365 |
✓✓ | 2330115619 |
for (; it != itend; ++it) { |
1366 |
2966 |
jobs->push_back(*it); |
|
1367 |
} |
||
1368 |
2330112653 |
jobs_completed_.clear(); |
|
1369 |
2330112653 |
pthread_mutex_unlock(jobs_completed_lock_); |
|
1370 |
|||
1371 |
2330112653 |
return 0; |
|
1372 |
} |
||
1373 |
|||
1374 |
/** |
||
1375 |
* Push new job to be uploaded to the S3 cloud storage. |
||
1376 |
*/ |
||
1377 |
2966 |
void S3FanoutManager::PushNewJob(JobInfo *info) { |
|
1378 |
2966 |
available_jobs_->Increment(); |
|
1379 |
|||
1380 |
2966 |
pthread_mutex_lock(jobs_todo_lock_); |
|
1381 |
2966 |
jobs_todo_.push_back(info); |
|
1382 |
2966 |
pthread_mutex_unlock(jobs_todo_lock_); |
|
1383 |
2966 |
} |
|
1384 |
|||
1385 |
|||
1386 |
/** |
||
1387 |
* Performs given job synchronously. |
||
1388 |
* |
||
1389 |
* @return true if exists, otherwise false |
||
1390 |
*/ |
||
1391 |
129 |
bool S3FanoutManager::DoSingleJob(JobInfo *info) const { |
|
1392 |
129 |
bool retme = false; |
|
1393 |
|||
1394 |
129 |
CURL *handle = AcquireCurlHandle(); |
|
1395 |
✗✓ | 129 |
if (handle == NULL) { |
1396 |
LogCvmfs(kLogS3Fanout, kLogStderr, "Failed to acquire CURL handle."); |
||
1397 |
assert(handle != NULL); |
||
1398 |
} |
||
1399 |
|||
1400 |
129 |
InitializeRequest(info, handle); |
|
1401 |
129 |
SetUrlOptions(info); |
|
1402 |
|||
1403 |
129 |
CURLcode resl = curl_easy_perform(handle); |
|
1404 |
✓✓✓✗ |
129 |
if (resl == CURLE_OK && info->error_code == kFailOk) { |
1405 |
15 |
retme = true; |
|
1406 |
} |
||
1407 |
|||
1408 |
129 |
ReleaseCurlHandle(info, handle); |
|
1409 |
|||
1410 |
129 |
return retme; |
|
1411 |
} |
||
1412 |
|||
1413 |
//------------------------------------------------------------------------------ |
||
1414 |
|||
1415 |
|||
1416 |
/**
 * Renders the transfer counters as a human readable, multi-line report:
 * transferred bytes, transfer duration, number of requests and number of
 * retries, one per line.
 */
string Statistics::Print() const {
  string report;
  report += "Transferred Bytes: " +
            StringifyInt(static_cast<uint64_t>(transferred_bytes)) + "\n";
  report += "Transfer duration: " +
            StringifyInt(static_cast<uint64_t>(transfer_time)) + " s\n";
  report += "Number of requests: " + StringifyInt(num_requests) + "\n";
  report += "Number of retries: " + StringifyInt(num_retries) + "\n";
  return report;
}
||
1427 |
|||
1428 |
} // namespace s3fanout |
Generated by: GCOVR (Version 4.1) |