GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/s3fanout.cc
Date: 2023-02-05 02:36:10
Exec Total Coverage
Lines: 561 800 70.1%
Branches: 402 1149 35.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 *
4 * Runs a thread using libcurls asynchronous I/O mode to push data to S3
5 */
6
7 #include <pthread.h>
8
9 #include <algorithm>
10 #include <cassert>
11 #include <cerrno>
12 #include <utility>
13
14 #include "cvmfs_config.h"
15 #include "s3fanout.h"
16 #include "upload_facility.h"
17 #include "util/concurrency.h"
18 #include "util/exception.h"
19 #include "util/platform.h"
20 #include "util/posix.h"
21 #include "util/string.h"
22
23 using namespace std; // NOLINT
24
25 namespace s3fanout {
26
27 const char *S3FanoutManager::kCacheControlCas = "Cache-Control: max-age=259200";
28 const char *S3FanoutManager::kCacheControlDotCvmfs =
29 "Cache-Control: max-age=61";
30 const unsigned S3FanoutManager::kDefault429ThrottleMs = 250;
31 const unsigned S3FanoutManager::kMax429ThrottleMs = 10000;
32 const unsigned S3FanoutManager::kThrottleReportIntervalSec = 10;
33 const unsigned S3FanoutManager::kDefaultHTTPPort = 80;
34 const unsigned S3FanoutManager::kDefaultHTTPSPort = 443;
35
36
37 /**
38 * Parses Retry-After and X-Retry-In headers attached to HTTP 429 responses
39 */
40 26 void S3FanoutManager::DetectThrottleIndicator(
41 const std::string &header,
42 JobInfo *info)
43 {
44 26 std::string value_str;
45
4/6
✓ Branch 2 taken 26 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 26 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 11 times.
✓ Branch 10 taken 15 times.
26 if (HasPrefix(header, "retry-after:", true))
46
1/2
✓ Branch 1 taken 11 times.
✗ Branch 2 not taken.
11 value_str = header.substr(12);
47
4/6
✓ Branch 2 taken 26 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 26 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 5 times.
✓ Branch 10 taken 21 times.
26 if (HasPrefix(header, "x-retry-in:", true))
48
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
5 value_str = header.substr(11);
49
50
1/2
✓ Branch 1 taken 26 times.
✗ Branch 2 not taken.
26 value_str = Trim(value_str, true /* trim_newline */);
51
2/2
✓ Branch 1 taken 14 times.
✓ Branch 2 taken 12 times.
26 if (!value_str.empty()) {
52
1/2
✓ Branch 1 taken 14 times.
✗ Branch 2 not taken.
14 unsigned value_numeric = String2Uint64(value_str);
53 unsigned value_ms =
54
4/7
✓ Branch 2 taken 14 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 14 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 5 times.
✓ Branch 8 taken 9 times.
28 HasSuffix(value_str, "ms", true /* ignore_case */) ?
55 14 value_numeric : (value_numeric * 1000);
56
2/2
✓ Branch 0 taken 13 times.
✓ Branch 1 taken 1 times.
14 if (value_ms > 0)
57 13 info->throttle_ms = std::min(value_ms, kMax429ThrottleMs);
58 }
59 26 }
60
61
62 /**
63 * Called by curl for every HTTP header. Not called for file:// transfers.
64 */
65 3682 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
66 void *info_link) {
67 3682 const size_t num_bytes = size*nmemb;
68
1/2
✓ Branch 2 taken 3682 times.
✗ Branch 3 not taken.
3682 const string header_line(static_cast<const char *>(ptr), num_bytes);
69 3682 JobInfo *info = static_cast<JobInfo *>(info_link);
70
71 // Check for http status code errors
72
4/6
✓ Branch 2 taken 3682 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3682 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 1226 times.
✓ Branch 10 taken 2456 times.
3682 if (HasPrefix(header_line, "HTTP/1.", false)) {
73
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1226 times.
1226 if (header_line.length() < 10)
74 return 0;
75
76 unsigned i;
77
5/6
✓ Branch 1 taken 2452 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1226 times.
✓ Branch 5 taken 1226 times.
✓ Branch 6 taken 1226 times.
✓ Branch 7 taken 1226 times.
2452 for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {}
78
79
2/2
✓ Branch 1 taken 612 times.
✓ Branch 2 taken 614 times.
1226 if (header_line[i] == '2') {
80 612 return num_bytes;
81 } else {
82
1/2
✓ Branch 2 taken 614 times.
✗ Branch 3 not taken.
614 LogCvmfs(kLogS3Fanout, kLogDebug, "http status error code [info %p]: %s",
83 info, header_line.c_str());
84
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 614 times.
614 if (header_line.length() < i+3) {
85 LogCvmfs(kLogS3Fanout, kLogStderr, "S3: invalid HTTP response '%s'",
86 header_line.c_str());
87 info->error_code = kFailOther;
88 return 0;
89 }
90
2/4
✓ Branch 3 taken 614 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 614 times.
✗ Branch 7 not taken.
614 info->http_error = String2Int64(string(&header_line[i], 3));
91
92
2/6
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 610 times.
✗ Branch 5 not taken.
614 switch (info->http_error) {
93 4 case 429:
94 4 info->error_code = kFailRetry;
95 4 info->throttle_ms = S3FanoutManager::kDefault429ThrottleMs;
96 4 info->throttle_timestamp = platform_monotonic_time();
97 4 return num_bytes;
98 case 503:
99 case 502: // Can happen if the S3 gateway-backend connection breaks
100 case 500: // sometimes see this as a transient error from S3
101 info->error_code = kFailServiceUnavailable;
102 break;
103 case 501:
104 case 400:
105 info->error_code = kFailBadRequest;
106 break;
107 case 403:
108 info->error_code = kFailForbidden;
109 break;
110 610 case 404:
111 610 info->error_code = kFailNotFound;
112 610 return num_bytes;
113 default:
114 info->error_code = kFailOther;
115 }
116 return 0;
117 }
118 }
119
120
2/2
✓ Branch 0 taken 12 times.
✓ Branch 1 taken 2444 times.
2456 if (info->error_code == kFailRetry) {
121
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 S3FanoutManager::DetectThrottleIndicator(header_line, info);
122 }
123
124 2456 return num_bytes;
125 3682 }
126
127
128 /**
129 * Called by curl for every new chunk to upload.
130 */
131 15669 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
132 void *info_link) {
133 15669 const size_t num_bytes = size*nmemb;
134 15669 JobInfo *info = static_cast<JobInfo *>(info_link);
135
136 15669 LogCvmfs(kLogS3Fanout, kLogDebug, "Data callback with %d bytes", num_bytes);
137
138
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15669 times.
15669 if (num_bytes == 0)
139 return 0;
140
141 15669 uint64_t read_bytes = info->origin->Read(ptr, num_bytes);
142
143 15669 LogCvmfs(kLogS3Fanout, kLogDebug,
144 "source buffer pushed out %d bytes", read_bytes);
145
146 15669 return read_bytes;
147 }
148
149
150 /**
151 * For the time being, ignore all received information in the HTTP body
152 */
153 static size_t CallbackCurlBody(
154 char * /*ptr*/, size_t size, size_t nmemb, void * /*userdata*/)
155 {
156 return size * nmemb;
157 }
158
159
160 /**
161 * Called when new curl sockets arrive or existing curl sockets depart.
162 */
163 2719 int S3FanoutManager::CallbackCurlSocket(CURL *easy, curl_socket_t s, int action,
164 void *userp, void *socketp) {
165 2719 S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(userp);
166 2719 LogCvmfs(kLogS3Fanout, kLogDebug, "CallbackCurlSocket called with easy "
167 "handle %p, socket %d, action %d, up %d, "
168 "sp %d, fds_inuse %d, jobs %d",
169 easy, s, action, userp,
170 socketp, s3fanout_mgr->watch_fds_inuse_,
171 2719 s3fanout_mgr->available_jobs_->Get());
172
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2719 times.
2719 if (action == CURL_POLL_NONE)
173 return 0;
174
175 // Find s in watch_fds_
176 // First 2 fds are job and terminate pipes (not curl related)
177 unsigned index;
178
2/2
✓ Branch 0 taken 4385 times.
✓ Branch 1 taken 1226 times.
5611 for (index = 2; index < s3fanout_mgr->watch_fds_inuse_; ++index) {
179
2/2
✓ Branch 0 taken 1493 times.
✓ Branch 1 taken 2892 times.
4385 if (s3fanout_mgr->watch_fds_[index].fd == s)
180 1493 break;
181 }
182 // Or create newly
183
2/2
✓ Branch 0 taken 1226 times.
✓ Branch 1 taken 1493 times.
2719 if (index == s3fanout_mgr->watch_fds_inuse_) {
184 // Extend array if necessary
185
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1224 times.
1226 if (s3fanout_mgr->watch_fds_inuse_ == s3fanout_mgr->watch_fds_size_) {
186 2 s3fanout_mgr->watch_fds_size_ *= 2;
187 2 s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
188 2 srealloc(s3fanout_mgr->watch_fds_,
189 2 s3fanout_mgr->watch_fds_size_*sizeof(struct pollfd)));
190 }
191 1226 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].fd = s;
192 1226 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].events = 0;
193 1226 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].revents = 0;
194 1226 s3fanout_mgr->watch_fds_inuse_++;
195 }
196
197
4/5
✓ Branch 0 taken 1226 times.
✓ Branch 1 taken 3 times.
✓ Branch 2 taken 264 times.
✓ Branch 3 taken 1226 times.
✗ Branch 4 not taken.
2719 switch (action) {
198 1226 case CURL_POLL_IN:
199 1226 s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
200 1226 break;
201 3 case CURL_POLL_OUT:
202 3 s3fanout_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
203 3 break;
204 264 case CURL_POLL_INOUT:
205 264 s3fanout_mgr->watch_fds_[index].events =
206 POLLIN | POLLPRI | POLLOUT | POLLWRBAND;
207 264 break;
208 1226 case CURL_POLL_REMOVE:
209
2/2
✓ Branch 0 taken 195 times.
✓ Branch 1 taken 1031 times.
1226 if (index < s3fanout_mgr->watch_fds_inuse_-1)
210 195 s3fanout_mgr->watch_fds_[index] =
211 195 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_-1];
212 1226 s3fanout_mgr->watch_fds_inuse_--;
213 // Shrink array if necessary
214
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1226 times.
1226 if ((s3fanout_mgr->watch_fds_inuse_ > s3fanout_mgr->watch_fds_max_) &&
215 (s3fanout_mgr->watch_fds_inuse_ < s3fanout_mgr->watch_fds_size_/2)) {
216 s3fanout_mgr->watch_fds_size_ /= 2;
217 s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
218 srealloc(s3fanout_mgr->watch_fds_,
219 s3fanout_mgr->watch_fds_size_*sizeof(struct pollfd)));
220 }
221 1226 break;
222 default:
223 PANIC(NULL);
224 }
225
226 2719 return 0;
227 }
228
229
230 /**
231 * Worker thread event loop.
232 */
233 12 void *S3FanoutManager::MainUpload(void *data) {
234
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread started");
235 12 S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(data);
236
237 12 s3fanout_mgr->InitPipeWatchFds();
238
239 // Don't schedule more jobs into the multi handle than the maximum number of
240 // parallel connections. This should prevent starvation and thus a timeout
241 // of the authorization header (CVM-1339).
242 12 unsigned jobs_in_flight = 0;
243
244 while (true) {
245 // Check events with 100ms timeout
246 16653 int timeout_ms = 100;
247
1/2
✓ Branch 1 taken 16653 times.
✗ Branch 2 not taken.
16653 int retval = poll(s3fanout_mgr->watch_fds_, s3fanout_mgr->watch_fds_inuse_,
248 timeout_ms);
249
2/2
✓ Branch 0 taken 89 times.
✓ Branch 1 taken 16564 times.
16653 if (retval == 0) {
250 // Handle timeout
251 89 int still_running = 0;
252
1/2
✓ Branch 1 taken 89 times.
✗ Branch 2 not taken.
89 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
253 CURL_SOCKET_TIMEOUT,
254 0,
255 &still_running);
256
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 89 times.
89 if (retval != CURLM_OK) {
257 LogCvmfs(kLogS3Fanout, kLogStderr, "Error, timeout due to: %d", retval);
258 assert(retval == CURLM_OK);
259 }
260
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 16564 times.
16564 } else if (retval < 0) {
261 assert(errno == EINTR);
262 continue;
263 }
264
265 // Terminate I/O thread
266
2/2
✓ Branch 0 taken 12 times.
✓ Branch 1 taken 16641 times.
16653 if (s3fanout_mgr->watch_fds_[0].revents)
267 12 break;
268
269 // New job incoming
270
2/2
✓ Branch 0 taken 614 times.
✓ Branch 1 taken 16027 times.
16641 if (s3fanout_mgr->watch_fds_[1].revents) {
271 614 s3fanout_mgr->watch_fds_[1].revents = 0;
272 JobInfo *info;
273
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 ReadPipe(s3fanout_mgr->pipe_jobs_[0], &info, sizeof(info));
274
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 CURL *handle = s3fanout_mgr->AcquireCurlHandle();
275
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 614 times.
614 if (handle == NULL) {
276 PANIC(kLogStderr, "Failed to acquire CURL handle.");
277 }
278 s3fanout::Failures init_failure =
279
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 s3fanout_mgr->InitializeRequest(info, handle);
280
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 614 times.
614 if (init_failure != s3fanout::kFailOk) {
281 PANIC(kLogStderr,
282 "Failed to initialize CURL handle (error: %d - %s | errno: %d)",
283 init_failure, Code2Ascii(init_failure), errno);
284 }
285
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 s3fanout_mgr->SetUrlOptions(info);
286
287
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 curl_multi_add_handle(s3fanout_mgr->curl_multi_, handle);
288
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 s3fanout_mgr->active_requests_->insert(info);
289 614 jobs_in_flight++;
290 614 int still_running = 0, retval = 0;
291
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
292 CURL_SOCKET_TIMEOUT,
293 0,
294 &still_running);
295
296
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 LogCvmfs(kLogS3Fanout, kLogDebug,
297 "curl_multi_socket_action: %d - %d",
298 retval, still_running);
299 }
300
301
302 // Activity on curl sockets
303 // Within this loop the curl_multi_socket_action() may cause socket(s)
304 // to be removed from watch_fds_. If a socket is removed it is replaced
305 // by the socket at the end of the array and the inuse count is decreased.
306 // Therefore loop over the array in reverse order.
307 // First 2 fds are job and terminate pipes (not curl related)
308
2/2
✓ Branch 0 taken 37424 times.
✓ Branch 1 taken 16641 times.
54065 for (int32_t i = s3fanout_mgr->watch_fds_inuse_ - 1; i >= 2; --i) {
309
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 37424 times.
37424 if (static_cast<uint32_t>(i) >= s3fanout_mgr->watch_fds_inuse_) {
310 continue;
311 }
312
2/2
✓ Branch 0 taken 17066 times.
✓ Branch 1 taken 20358 times.
37424 if (s3fanout_mgr->watch_fds_[i].revents) {
313 17066 int ev_bitmask = 0;
314
2/2
✓ Branch 0 taken 1834 times.
✓ Branch 1 taken 15232 times.
17066 if (s3fanout_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
315 1834 ev_bitmask |= CURL_CSELECT_IN;
316
2/2
✓ Branch 0 taken 15232 times.
✓ Branch 1 taken 1834 times.
17066 if (s3fanout_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
317 15232 ev_bitmask |= CURL_CSELECT_OUT;
318
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 17066 times.
17066 if (s3fanout_mgr->watch_fds_[i].revents &
319 (POLLERR | POLLHUP | POLLNVAL))
320 ev_bitmask |= CURL_CSELECT_ERR;
321 17066 s3fanout_mgr->watch_fds_[i].revents = 0;
322
323 17066 int still_running = 0;
324 17066 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
325
1/2
✓ Branch 1 taken 17066 times.
✗ Branch 2 not taken.
17066 s3fanout_mgr->watch_fds_[i].fd,
326 ev_bitmask,
327 &still_running);
328 }
329 }
330
331 // Check if transfers are completed
332 CURLMsg *curl_msg;
333 int msgs_in_queue;
334
3/4
✓ Branch 1 taken 17867 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1226 times.
✓ Branch 4 taken 16641 times.
17867 while ((curl_msg = curl_multi_info_read(s3fanout_mgr->curl_multi_,
335 &msgs_in_queue)))
336 {
337
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1226 times.
1226 assert(curl_msg->msg == CURLMSG_DONE);
338
339 1226 s3fanout_mgr->statistics_->num_requests++;
340 JobInfo *info;
341 1226 CURL *easy_handle = curl_msg->easy_handle;
342 1226 int curl_error = curl_msg->data.result;
343
1/2
✓ Branch 1 taken 1226 times.
✗ Branch 2 not taken.
1226 curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
344
345
1/2
✓ Branch 1 taken 1226 times.
✗ Branch 2 not taken.
1226 curl_multi_remove_handle(s3fanout_mgr->curl_multi_, easy_handle);
346
3/4
✓ Branch 1 taken 1226 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 612 times.
✓ Branch 4 taken 614 times.
1226 if (s3fanout_mgr->VerifyAndFinalize(curl_error, info)) {
347
1/2
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
612 curl_multi_add_handle(s3fanout_mgr->curl_multi_, easy_handle);
348 612 int still_running = 0;
349
1/2
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
612 curl_multi_socket_action(s3fanout_mgr->curl_multi_,
350 CURL_SOCKET_TIMEOUT,
351 0,
352 &still_running);
353 } else {
354 // Return easy handle into pool and write result back
355 614 jobs_in_flight--;
356
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 s3fanout_mgr->active_requests_->erase(info);
357
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 s3fanout_mgr->ReleaseCurlHandle(info, easy_handle);
358
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 s3fanout_mgr->available_jobs_->Decrement();
359
360 // Add to list of completed jobs
361
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 s3fanout_mgr->PushCompletedJob(info);
362 }
363 }
364 16641 }
365
366 12 set<CURL *>::iterator i = s3fanout_mgr->pool_handles_inuse_->begin();
367 const set<CURL *>::const_iterator i_end =
368 12 s3fanout_mgr->pool_handles_inuse_->end();
369
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 12 times.
12 for (; i != i_end; ++i) {
370 curl_multi_remove_handle(s3fanout_mgr->curl_multi_, *i);
371 curl_easy_cleanup(*i);
372 }
373 12 s3fanout_mgr->pool_handles_inuse_->clear();
374 12 free(s3fanout_mgr->watch_fds_);
375
376
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread terminated");
377 12 return NULL;
378 }
379
380
381 /**
382 * Gets an idle CURL handle from the pool. Creates a new one and adds it to
383 * the pool if necessary.
384 */
385 614 CURL *S3FanoutManager::AcquireCurlHandle() const {
386 CURL *handle;
387
388 614 MutexLockGuard guard(curl_handle_lock_);
389
390
2/2
✓ Branch 1 taken 49 times.
✓ Branch 2 taken 565 times.
614 if (pool_handles_idle_->empty()) {
391 CURLcode retval;
392
393 // Create a new handle
394
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 handle = curl_easy_init();
395
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
49 assert(handle != NULL);
396
397 // Other settings
398
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 retval = curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
399
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
49 assert(retval == CURLE_OK);
400
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 retval = curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION,
401 CallbackCurlHeader);
402
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
49 assert(retval == CURLE_OK);
403
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 retval = curl_easy_setopt(handle, CURLOPT_READFUNCTION, CallbackCurlData);
404
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
49 assert(retval == CURLE_OK);
405
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 retval = curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlBody);
406
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
49 assert(retval == CURLE_OK);
407 } else {
408 565 handle = *(pool_handles_idle_->begin());
409
1/2
✓ Branch 2 taken 565 times.
✗ Branch 3 not taken.
565 pool_handles_idle_->erase(pool_handles_idle_->begin());
410 }
411
412
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 pool_handles_inuse_->insert(handle);
413
414 614 return handle;
415 614 }
416
417
418 614 void S3FanoutManager::ReleaseCurlHandle(JobInfo *info, CURL *handle) const {
419
1/2
✓ Branch 0 taken 614 times.
✗ Branch 1 not taken.
614 if (info->http_headers) {
420
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 curl_slist_free_all(info->http_headers);
421 614 info->http_headers = NULL;
422 }
423
424 614 MutexLockGuard guard(curl_handle_lock_);
425
426
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
427
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 614 times.
614 assert(elem != pool_handles_inuse_->end());
428
429
2/2
✓ Branch 1 taken 29 times.
✓ Branch 2 taken 585 times.
614 if (pool_handles_idle_->size() > config_.pool_max_handles) {
430
1/2
✓ Branch 1 taken 29 times.
✗ Branch 2 not taken.
29 CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, NULL);
431
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 assert(retval == CURLE_OK);
432
1/2
✓ Branch 1 taken 29 times.
✗ Branch 2 not taken.
29 curl_easy_cleanup(handle);
433 std::map<CURL *, S3FanOutDnsEntry *>::size_type retitems =
434
1/2
✓ Branch 1 taken 29 times.
✗ Branch 2 not taken.
29 curl_sharehandles_->erase(handle);
435
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 assert(retitems == 1);
436 } else {
437
1/2
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
585 pool_handles_idle_->insert(handle);
438 }
439
440
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 pool_handles_inuse_->erase(elem);
441 614 }
442
443 12 void S3FanoutManager::InitPipeWatchFds() {
444
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 assert(watch_fds_inuse_ == 0);
445
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 assert(watch_fds_size_ >= 2);
446 12 watch_fds_[0].fd = pipe_terminate_[0];
447 12 watch_fds_[0].events = POLLIN | POLLPRI;
448 12 watch_fds_[0].revents = 0;
449 12 ++watch_fds_inuse_;
450 12 watch_fds_[1].fd = pipe_jobs_[0];
451 12 watch_fds_[1].events = POLLIN | POLLPRI;
452 12 watch_fds_[1].revents = 0;
453 12 ++watch_fds_inuse_;
454 12 }
455
456 /**
457 * The Amazon AWS 2 authorization header according to
458 * http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html#ConstructingTheAuthenticationHeader
459 */
460 1222 bool S3FanoutManager::MkV2Authz(const JobInfo &info, vector<string> *headers)
461 const
462 {
463 1222 string payload_hash;
464
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 bool retval = MkPayloadHash(info, &payload_hash);
465
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 if (!retval)
466 return false;
467
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 string content_type = GetContentType(info);
468
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 string request = GetRequestString(info);
469
470
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 string timestamp = RfcTimestamp();
471
2/4
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
2444 string to_sign = request + "\n" +
472
2/4
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
2444 payload_hash + "\n" +
473
2/4
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
2444 content_type + "\n" +
474
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 timestamp + "\n";
475
2/4
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1222 times.
✗ Branch 4 not taken.
1222 if (config_.x_amz_acl != "") {
476
2/4
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
2444 to_sign += "x-amz-acl:" + config_.x_amz_acl + "\n" + // default ACL
477
5/10
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 1222 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 1222 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 1222 times.
✗ Branch 14 not taken.
2444 "/" + config_.bucket + "/" + info.object_key;
478 }
479
1/2
✓ Branch 3 taken 1222 times.
✗ Branch 4 not taken.
1222 LogCvmfs(kLogS3Fanout, kLogDebug, "%s string to sign for: %s",
480 request.c_str(), info.object_key.c_str());
481
482
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 shash::Any hmac;
483 1222 hmac.algorithm = shash::kSha1;
484
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 shash::Hmac(config_.secret_key,
485 1222 reinterpret_cast<const unsigned char *>(to_sign.data()),
486 1222 to_sign.length(), &hmac);
487
488
4/8
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 1222 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 1222 times.
✗ Branch 11 not taken.
1222 headers->push_back("Authorization: AWS " + config_.access_key + ":" +
489
2/4
✓ Branch 2 taken 1222 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1222 times.
✗ Branch 6 not taken.
3666 Base64(string(reinterpret_cast<char *>(hmac.digest),
490 1222 hmac.GetDigestSize())));
491
2/4
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
1222 headers->push_back("Date: " + timestamp);
492
2/4
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
1222 headers->push_back("X-Amz-Acl: " + config_.x_amz_acl);
493
2/2
✓ Branch 1 taken 608 times.
✓ Branch 2 taken 614 times.
1222 if (!payload_hash.empty())
494
2/4
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 608 times.
✗ Branch 5 not taken.
608 headers->push_back("Content-MD5: " + payload_hash);
495
2/2
✓ Branch 1 taken 608 times.
✓ Branch 2 taken 614 times.
1222 if (!content_type.empty())
496
2/4
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 608 times.
✗ Branch 5 not taken.
608 headers->push_back("Content-Type: " + content_type);
497 1222 return true;
498 1222 }
499
500
501 string S3FanoutManager::GetUriEncode(const string &val, bool encode_slash)
502 const
503 {
504 string result;
505 const unsigned len = val.length();
506 result.reserve(len);
507 for (unsigned i = 0; i < len; ++i) {
508 char c = val[i];
509 if ((c >= 'A' && c <= 'Z') ||
510 (c >= 'a' && c <= 'z') ||
511 (c >= '0' && c <= '9') ||
512 c == '_' || c == '-' || c == '~' || c == '.')
513 {
514 result.push_back(c);
515 } else if (c == '/') {
516 if (encode_slash) {
517 result += "%2F";
518 } else {
519 result.push_back(c);
520 }
521 } else {
522 result.push_back('%');
523 result.push_back((c / 16) + ((c / 16 <= 9) ? '0' : 'A'-10));
524 result.push_back((c % 16) + ((c % 16 <= 9) ? '0' : 'A'-10));
525 }
526 }
527 return result;
528 }
529
530
531 string S3FanoutManager::GetAwsV4SigningKey(const string &date) const
532 {
533 if (last_signing_key_.first == date)
534 return last_signing_key_.second;
535
536 string date_key = shash::Hmac256("AWS4" + config_.secret_key, date, true);
537 string date_region_key = shash::Hmac256(date_key, config_.region, true);
538 string date_region_service_key = shash::Hmac256(date_region_key, "s3", true);
539 string signing_key =
540 shash::Hmac256(date_region_service_key, "aws4_request", true);
541 last_signing_key_.first = date;
542 last_signing_key_.second = signing_key;
543 return signing_key;
544 }
545
546
547 /**
548 * The Amazon AWS4 authorization header according to
549 * http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-auth-using-authorization-header.html
550 */
551 bool S3FanoutManager::MkV4Authz(const JobInfo &info, vector<string> *headers)
552 const
553 {
554 string payload_hash;
555 bool retval = MkPayloadHash(info, &payload_hash);
556 if (!retval)
557 return false;
558 string content_type = GetContentType(info);
559 string timestamp = IsoTimestamp();
560 string date = timestamp.substr(0, 8);
561 vector<string> tokens = SplitString(complete_hostname_, ':');
562 assert(tokens.size() <= 2);
563 string canonical_hostname = tokens[0];
564
565 // if we could split the hostname in two and if the port is *NOT* a default
566 // one
567 if (tokens.size() == 2 && !((String2Uint64(tokens[1]) == kDefaultHTTPPort) ||
568 (String2Uint64(tokens[1]) == kDefaultHTTPSPort)))
569 canonical_hostname += ":" + tokens[1];
570
571 string signed_headers;
572 string canonical_headers;
573 if (!content_type.empty()) {
574 signed_headers += "content-type;";
575 headers->push_back("Content-Type: " + content_type);
576 canonical_headers += "content-type:" + content_type + "\n";
577 }
578 if (config_.x_amz_acl != "") {
579 signed_headers += "host;x-amz-acl;x-amz-content-sha256;x-amz-date";
580 } else {
581 signed_headers += "host;x-amz-content-sha256;x-amz-date";
582 }
583 canonical_headers +=
584 "host:" + canonical_hostname + "\n";
585 if (config_.x_amz_acl != "") {
586 canonical_headers += "x-amz-acl:" + config_.x_amz_acl +"\n";
587 }
588 canonical_headers += "x-amz-content-sha256:" + payload_hash + "\n" +
589 "x-amz-date:" + timestamp + "\n";
590
591 string scope = date + "/" + config_.region + "/s3/aws4_request";
592 string uri = config_.dns_buckets ?
593 (string("/") + info.object_key) :
594 (string("/") + config_.bucket + "/" + info.object_key);
595
596 string canonical_request =
597 GetRequestString(info) + "\n" +
598 GetUriEncode(uri, false) + "\n" +
599 "\n" +
600 canonical_headers + "\n" +
601 signed_headers + "\n" +
602 payload_hash;
603
604 string hash_request = shash::Sha256String(canonical_request.c_str());
605
606 string string_to_sign =
607 "AWS4-HMAC-SHA256\n" +
608 timestamp + "\n" +
609 scope + "\n" +
610 hash_request;
611
612 string signing_key = GetAwsV4SigningKey(date);
613 string signature = shash::Hmac256(signing_key, string_to_sign);
614
615 headers->push_back("X-Amz-Acl: " + config_.x_amz_acl);
616 headers->push_back("X-Amz-Content-Sha256: " + payload_hash);
617 headers->push_back("X-Amz-Date: " + timestamp);
618 headers->push_back(
619 "Authorization: AWS4-HMAC-SHA256 "
620 "Credential=" + config_.access_key + "/" + scope + ","
621 "SignedHeaders=" + signed_headers + ","
622 "Signature=" + signature);
623 return true;
624 }
625
626 /**
627 * The Azure Blob authorization header according to
628 * https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key
629 */
630 bool S3FanoutManager::MkAzureAuthz(const JobInfo &info, vector<string> *headers)
631 const
632 {
633 string timestamp = RfcTimestamp();
634 string canonical_headers =
635 "x-ms-blob-type:BlockBlob\nx-ms-date:" +
636 timestamp +
637 "\nx-ms-version:2011-08-18";
638 string canonical_resource =
639 "/" + config_.access_key + "/" + config_.bucket + "/" + info.object_key;
640
641 string string_to_sign;
642 if ((info.request == JobInfo::kReqHeadOnly) ||
643 (info.request == JobInfo::kReqHeadPut) ||
644 (info.request == JobInfo::kReqDelete)) {
645 string_to_sign =
646 GetRequestString(info) +
647 string("\n\n\n") +
648 "\n\n\n\n\n\n\n\n\n" +
649 canonical_headers + "\n" +
650 canonical_resource;
651 } else {
652 string_to_sign =
653 GetRequestString(info) +
654 string("\n\n\n") +
655 string(StringifyInt(info.origin->GetSize())) + "\n\n\n\n\n\n\n\n\n" +
656 canonical_headers + "\n" +
657 canonical_resource;
658 }
659
660 string signing_key;
661 int retval = Debase64(config_.secret_key, &signing_key);
662 if (!retval)
663 return false;
664
665 string signature = shash::Hmac256(signing_key, string_to_sign, true);
666
667 headers->push_back("x-ms-date: " + timestamp);
668 headers->push_back("x-ms-version: 2011-08-18");
669 headers->push_back(
670 "Authorization: SharedKey " + config_.access_key + ":" + Base64(signature));
671 headers->push_back("x-ms-blob-type: BlockBlob");
672 return true;
673 }
674
675 1222 void S3FanoutManager::InitializeDnsSettingsCurl(
676 CURL *handle,
677 CURLSH *sharehandle,
678 curl_slist *clist) const
679 {
680 1222 CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, sharehandle);
681
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
682 1222 retval = curl_easy_setopt(handle, CURLOPT_RESOLVE, clist);
683
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
684 1222 }
685
686
687 1222 int S3FanoutManager::InitializeDnsSettings(
688 CURL *handle,
689 std::string host_with_port) const
690 {
691 // Use existing handle
692 std::map<CURL *, S3FanOutDnsEntry *>::const_iterator it =
693
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 curl_sharehandles_->find(handle);
694
2/2
✓ Branch 3 taken 1173 times.
✓ Branch 4 taken 49 times.
1222 if (it != curl_sharehandles_->end()) {
695
1/2
✓ Branch 1 taken 1173 times.
✗ Branch 2 not taken.
1173 InitializeDnsSettingsCurl(handle, it->second->sharehandle,
696 1173 it->second->clist);
697 1173 return 0;
698 }
699
700 // Add protocol information for extraction of fields for DNS
701
2/4
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 49 times.
✗ Branch 4 not taken.
49 if (!IsHttpUrl(host_with_port))
702
2/4
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 49 times.
✗ Branch 5 not taken.
49 host_with_port = config_.protocol + "://" + host_with_port;
703
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 std::string remote_host = dns::ExtractHost(host_with_port);
704
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 std::string remote_port = dns::ExtractPort(host_with_port);
705
706 // If we have the name already resolved, use the least used IP
707 49 S3FanOutDnsEntry *useme = NULL;
708 49 unsigned int usemin = UINT_MAX;
709 49 std::set<S3FanOutDnsEntry *>::iterator its3 = sharehandles_->begin();
710
2/2
✓ Branch 3 taken 39 times.
✓ Branch 4 taken 49 times.
88 for (; its3 != sharehandles_->end(); ++its3) {
711
1/2
✓ Branch 2 taken 39 times.
✗ Branch 3 not taken.
39 if ((*its3)->dns_name == remote_host) {
712
1/2
✓ Branch 1 taken 39 times.
✗ Branch 2 not taken.
39 if (usemin >= (*its3)->counter) {
713 39 usemin = (*its3)->counter;
714 39 useme = (*its3);
715 }
716 }
717 }
718
2/2
✓ Branch 0 taken 39 times.
✓ Branch 1 taken 10 times.
49 if (useme != NULL) {
719
1/2
✓ Branch 2 taken 39 times.
✗ Branch 3 not taken.
39 curl_sharehandles_->insert(std::pair<CURL *,
720 S3FanOutDnsEntry *>(handle, useme));
721 39 useme->counter++;
722
1/2
✓ Branch 1 taken 39 times.
✗ Branch 2 not taken.
39 InitializeDnsSettingsCurl(handle, useme->sharehandle, useme->clist);
723 39 return 0;
724 }
725
726 // We need to resolve the hostname
727 // TODO(ssheikki): support ipv6 also... if (opt_ipv4_only_)
728
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 dns::Host host = resolver_->Resolve(remote_host);
729
1/2
✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
10 set<string> ipv4_addresses = host.ipv4_addresses();
730 10 std::set<string>::iterator its = ipv4_addresses.begin();
731 10 S3FanOutDnsEntry *dnse = NULL;
732
2/2
✓ Branch 3 taken 10 times.
✓ Branch 4 taken 10 times.
20 for ( ; its != ipv4_addresses.end(); ++its) {
733
2/4
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 10 times.
✗ Branch 5 not taken.
10 dnse = new S3FanOutDnsEntry();
734 10 dnse->counter = 0;
735
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 dnse->dns_name = remote_host;
736
3/10
✗ Branch 1 not taken.
✓ Branch 2 taken 10 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 8 taken 10 times.
✗ Branch 9 not taken.
✗ Branch 12 not taken.
✓ Branch 13 taken 10 times.
✗ Branch 15 not taken.
✗ Branch 16 not taken.
10 dnse->port = remote_port.size() == 0 ? "80" : remote_port;
737
1/2
✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
10 dnse->ip = *its;
738 10 dnse->clist = NULL;
739
1/2
✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
10 dnse->clist = curl_slist_append(dnse->clist,
740
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
20 (dnse->dns_name+":"+
741
2/4
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 10 times.
✗ Branch 5 not taken.
20 dnse->port+":"+
742
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 dnse->ip).c_str());
743
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 dnse->sharehandle = curl_share_init();
744
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
10 assert(dnse->sharehandle != NULL);
745
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 CURLSHcode share_retval = curl_share_setopt(dnse->sharehandle,
746 CURLSHOPT_SHARE,
747 CURL_LOCK_DATA_DNS);
748
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
10 assert(share_retval == CURLSHE_OK);
749
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 sharehandles_->insert(dnse);
750 }
751
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
10 if (dnse == NULL) {
752 LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr,
753 "Error: DNS resolve failed for address '%s'.",
754 remote_host.c_str());
755 assert(dnse != NULL);
756 return -1;
757 }
758
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 curl_sharehandles_->insert(
759 10 std::pair<CURL *, S3FanOutDnsEntry *>(handle, dnse));
760 10 dnse->counter++;
761
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 InitializeDnsSettingsCurl(handle, dnse->sharehandle, dnse->clist);
762
763 10 return 0;
764 49 }
765
766
767 1222 bool S3FanoutManager::MkPayloadHash(const JobInfo &info, string *hex_hash)
768 const
769 {
770
2/2
✓ Branch 0 taken 1217 times.
✓ Branch 1 taken 5 times.
1222 if ((info.request == JobInfo::kReqHeadOnly) ||
771
2/2
✓ Branch 0 taken 609 times.
✓ Branch 1 taken 608 times.
1217 (info.request == JobInfo::kReqHeadPut) ||
772
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 608 times.
609 (info.request == JobInfo::kReqDelete))
773 {
774
1/4
✓ Branch 0 taken 614 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
614 switch (config_.authz_method) {
775 614 case kAuthzAwsV2:
776 614 hex_hash->clear();
777 614 break;
778 case kAuthzAwsV4:
779 // Sha256 over empty string
780 *hex_hash =
781 "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
782 break;
783 case kAuthzAzure:
784 // no payload hash required for Azure signature
785 hex_hash->clear();
786 break;
787 default:
788 PANIC(NULL);
789 }
790 614 return true;
791 }
792
793 // PUT, there is actually payload
794
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 shash::Any payload_hash(shash::kMd5);
795
796 unsigned char *data;
797 unsigned int nbytes =
798 608 info.origin->Data(reinterpret_cast<void **>(&data),
799
2/4
✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 608 times.
✗ Branch 6 not taken.
608 info.origin->GetSize(), 0);
800
2/4
✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 608 times.
608 assert(nbytes == info.origin->GetSize());
801
802
1/4
✓ Branch 0 taken 608 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
608 switch (config_.authz_method) {
803 608 case kAuthzAwsV2:
804
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 shash::HashMem(data, nbytes, &payload_hash);
805 *hex_hash =
806
2/4
✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 608 times.
✗ Branch 6 not taken.
1824 Base64(string(reinterpret_cast<char *>(payload_hash.digest),
807 1216 payload_hash.GetDigestSize()));
808 608 return true;
809 case kAuthzAwsV4:
810 *hex_hash =
811 shash::Sha256Mem(data, nbytes);
812 return true;
813 case kAuthzAzure:
814 // no payload hash required for Azure signature
815 hex_hash->clear();
816 return true;
817 default:
818 PANIC(NULL);
819 }
820 }
821
822 1223 string S3FanoutManager::GetRequestString(const JobInfo &info) const {
823
3/4
✓ Branch 0 taken 613 times.
✓ Branch 1 taken 608 times.
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
1223 switch (info.request) {
824 613 case JobInfo::kReqHeadOnly:
825 case JobInfo::kReqHeadPut:
826
1/2
✓ Branch 2 taken 613 times.
✗ Branch 3 not taken.
613 return "HEAD";
827 608 case JobInfo::kReqPutCas:
828 case JobInfo::kReqPutDotCvmfs:
829 case JobInfo::kReqPutHtml:
830 case JobInfo::kReqPutBucket:
831
1/2
✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
608 return "PUT";
832 2 case JobInfo::kReqDelete:
833
1/2
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
2 return "DELETE";
834 default:
835 PANIC(NULL);
836 }
837 }
838
839
840 1222 string S3FanoutManager::GetContentType(const JobInfo &info) const {
841
2/6
✓ Branch 0 taken 614 times.
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
1222 switch (info.request) {
842 614 case JobInfo::kReqHeadOnly:
843 case JobInfo::kReqHeadPut:
844 case JobInfo::kReqDelete:
845
1/2
✓ Branch 2 taken 614 times.
✗ Branch 3 not taken.
614 return "";
846 608 case JobInfo::kReqPutCas:
847
1/2
✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
608 return "application/octet-stream";
848 case JobInfo::kReqPutDotCvmfs:
849 return "application/x-cvmfs";
850 case JobInfo::kReqPutHtml:
851 return "text/html";
852 case JobInfo::kReqPutBucket:
853 return "text/xml";
854 default:
855 PANIC(NULL);
856 }
857 }
858
859
860 /**
861 * Request parameters set the URL and other options such as timeout and
862 * proxy.
863 */
864 1222 Failures S3FanoutManager::InitializeRequest(JobInfo *info, CURL *handle) const {
865 // Initialize internal download state
866 1222 info->curl_handle = handle;
867 1222 info->error_code = kFailOk;
868 1222 info->http_error = 0;
869 1222 info->num_retries = 0;
870 1222 info->backoff_ms = 0;
871 1222 info->throttle_ms = 0;
872 1222 info->throttle_timestamp = 0;
873 1222 info->http_headers = NULL;
874 // info->payload_size is needed in S3Uploader::MainCollectResults,
875 // where info->origin is already destroyed.
876
1/2
✓ Branch 2 taken 1222 times.
✗ Branch 3 not taken.
1222 info->payload_size = info->origin->GetSize();
877
878
2/4
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
1222 InitializeDnsSettings(handle, complete_hostname_);
879
880 CURLcode retval;
881
2/2
✓ Branch 0 taken 1217 times.
✓ Branch 1 taken 5 times.
1222 if ((info->request == JobInfo::kReqHeadOnly) ||
882
2/2
✓ Branch 0 taken 609 times.
✓ Branch 1 taken 608 times.
1217 (info->request == JobInfo::kReqHeadPut) ||
883
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 608 times.
609 (info->request == JobInfo::kReqDelete))
884 {
885
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 0);
886
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 614 times.
614 assert(retval == CURLE_OK);
887
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
888
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 614 times.
614 assert(retval == CURLE_OK);
889
890
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 613 times.
614 if (info->request == JobInfo::kReqDelete)
891 {
892
2/4
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 1 times.
✗ Branch 6 not taken.
1 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST,
893 GetRequestString(*info).c_str());
894
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 assert(retval == CURLE_OK);
895 } else {
896
1/2
✓ Branch 1 taken 613 times.
✗ Branch 2 not taken.
613 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
897
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 613 times.
613 assert(retval == CURLE_OK);
898 }
899 } else {
900
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
901
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
608 assert(retval == CURLE_OK);
902
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 1);
903
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
608 assert(retval == CURLE_OK);
904
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 0);
905
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
608 assert(retval == CURLE_OK);
906
2/4
✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 608 times.
✗ Branch 6 not taken.
608 retval = curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE,
907 static_cast<curl_off_t>(info->origin->GetSize()));
908
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
608 assert(retval == CURLE_OK);
909
910
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
608 if (info->request == JobInfo::kReqPutDotCvmfs) {
911 info->http_headers =
912 curl_slist_append(info->http_headers, kCacheControlDotCvmfs);
913
1/2
✓ Branch 0 taken 608 times.
✗ Branch 1 not taken.
608 } else if (info->request == JobInfo::kReqPutCas) {
914 608 info->http_headers =
915
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 curl_slist_append(info->http_headers, kCacheControlCas);
916 }
917 }
918
919 bool retval_b;
920
921 // Authorization
922 1222 vector<string> authz_headers;
923
1/4
✓ Branch 0 taken 1222 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
1222 switch (config_.authz_method) {
924 1222 case kAuthzAwsV2:
925
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 retval_b = MkV2Authz(*info, &authz_headers);
926 1222 break;
927 case kAuthzAwsV4:
928 retval_b = MkV4Authz(*info, &authz_headers);
929 break;
930 case kAuthzAzure:
931 retval_b = MkAzureAuthz(*info, &authz_headers);
932 break;
933 default:
934 PANIC(NULL);
935 }
936
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 if (!retval_b)
937 return kFailLocalIO;
938
2/2
✓ Branch 1 taken 4882 times.
✓ Branch 2 taken 1222 times.
6104 for (unsigned i = 0; i < authz_headers.size(); ++i) {
939 4882 info->http_headers =
940
1/2
✓ Branch 3 taken 4882 times.
✗ Branch 4 not taken.
4882 curl_slist_append(info->http_headers, authz_headers[i].c_str());
941 }
942
943 // Common headers
944 1222 info->http_headers =
945
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 curl_slist_append(info->http_headers, "Connection: Keep-Alive");
946
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 info->http_headers = curl_slist_append(info->http_headers, "Pragma:");
947 // No 100-continue
948
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 info->http_headers = curl_slist_append(info->http_headers, "Expect:");
949 // Strip unnecessary header
950
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 info->http_headers = curl_slist_append(info->http_headers, "Accept:");
951
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 info->http_headers = curl_slist_append(info->http_headers,
952 1222 user_agent_->c_str());
953
954 // Set curl parameters
955
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 retval = curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
956
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
957
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 retval = curl_easy_setopt(handle, CURLOPT_HEADERDATA,
958 static_cast<void *>(info));
959
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
960
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 retval = curl_easy_setopt(handle, CURLOPT_READDATA,
961 static_cast<void *>(info));
962
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
963
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 retval = curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->http_headers);
964
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
965
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 if (opt_ipv4_only_) {
966 retval = curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
967 assert(retval == CURLE_OK);
968 }
969 // Follow HTTP redirects
970
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 retval = curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
971
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
972
973
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 retval = curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, info->errorbuffer);
974
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
975
976
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1222 times.
1222 if (config_.protocol == "https") {
977 retval = curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 1L);
978 assert(retval == CURLE_OK);
979 retval = curl_easy_setopt(handle, CURLOPT_PROXY_SSL_VERIFYPEER, 1L);
980 assert(retval == CURLE_OK);
981 bool add_cert = ssl_certificate_store_.ApplySslCertificatePath(handle);
982 assert(add_cert);
983 }
984
985 1222 return kFailOk;
986 1222 }
987
988
989 /**
990 * Sets the URL specific options such as host to use and timeout.
991 */
992 1222 void S3FanoutManager::SetUrlOptions(JobInfo *info) const {
993 1222 CURL *curl_handle = info->curl_handle;
994 CURLcode retval;
995
996
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 retval = curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT,
997 config_.opt_timeout_sec);
998
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
999
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT,
1000 kLowSpeedLimit);
1001
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
1002
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME,
1003 config_.opt_timeout_sec);
1004
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
1005
1006
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 if (is_curl_debug_) {
1007 retval = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 1);
1008 assert(retval == CURLE_OK);
1009 }
1010
1011
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 string url = MkUrl(info->object_key);
1012
1/2
✓ Branch 2 taken 1222 times.
✗ Branch 3 not taken.
1222 retval = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
1013
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
1014
1015
1/2
✓ Branch 2 taken 1222 times.
✗ Branch 3 not taken.
1222 retval = curl_easy_setopt(curl_handle, CURLOPT_PROXY, config_.proxy.c_str());
1016
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
1017 1222 }
1018
1019
1020 /**
1021 * Adds transfer time and uploaded bytes to the global counters.
1022 */
1023 1226 void S3FanoutManager::UpdateStatistics(CURL *handle) {
1024 double val;
1025
1026
2/4
✓ Branch 1 taken 1226 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1226 times.
✗ Branch 4 not taken.
1226 if (curl_easy_getinfo(handle, CURLINFO_SIZE_UPLOAD, &val) == CURLE_OK)
1027 1226 statistics_->transferred_bytes += val;
1028 1226 }
1029
1030
1031 /**
1032 * Retry if possible and if not already done too often.
1033 */
1034 6 bool S3FanoutManager::CanRetry(const JobInfo *info) {
1035 return
1036 12 (info->error_code == kFailHostConnection ||
1037
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 info->error_code == kFailHostResolve ||
1038
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 info->error_code == kFailServiceUnavailable ||
1039
3/4
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
✓ Branch 3 taken 2 times.
16 info->error_code == kFailRetry) &&
1040
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
10 (info->num_retries < config_.opt_max_retries);
1041 }
1042
1043
1044 /**
1045 * Backoff for retry to introduce a jitter into a upload sequence.
1046 *
1047 * \return true if backoff has been performed, false otherwise
1048 */
1049 4 void S3FanoutManager::Backoff(JobInfo *info) {
1050
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
4 if (info->error_code != kFailRetry)
1051 info->num_retries++;
1052 4 statistics_->num_retries++;
1053
1054
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (info->throttle_ms > 0) {
1055 4 LogCvmfs(kLogS3Fanout, kLogDebug, "throttling for %d ms",
1056 info->throttle_ms);
1057 4 uint64_t now = platform_monotonic_time();
1058
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if ((info->throttle_timestamp + (info->throttle_ms / 1000)) >= now) {
1059
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 3 times.
4 if ((now - timestamp_last_throttle_report_) > kThrottleReportIntervalSec)
1060 {
1061 1 LogCvmfs(kLogS3Fanout, kLogStdout,
1062 "Warning: S3 backend throttling %ums "
1063 "(total backoff time so far %ums)",
1064 info->throttle_ms,
1065 1 statistics_->ms_throttled);
1066 1 timestamp_last_throttle_report_ = now;
1067 }
1068 4 statistics_->ms_throttled += info->throttle_ms;
1069 4 SafeSleepMs(info->throttle_ms);
1070 }
1071 } else {
1072 if (info->backoff_ms == 0) {
1073 // Must be != 0
1074 info->backoff_ms = prng_.Next(config_.opt_backoff_init_ms + 1);
1075 } else {
1076 info->backoff_ms *= 2;
1077 }
1078 if (info->backoff_ms > config_.opt_backoff_max_ms)
1079 info->backoff_ms = config_.opt_backoff_max_ms;
1080
1081 LogCvmfs(kLogS3Fanout, kLogDebug, "backing off for %d ms",
1082 info->backoff_ms);
1083 SafeSleepMs(info->backoff_ms);
1084 }
1085 4 }
1086
1087
1088 /**
1089 * Checks the result of a curl request and implements the failure logic
1090 * and takes care of cleanup.
1091 *
1092 * @return true if request should be repeated, false otherwise
1093 */
1094 1226 bool S3FanoutManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1095 1226 LogCvmfs(kLogS3Fanout, kLogDebug, "Verify uploaded/tested object %s "
1096 "(curl error %d, info error %d, info request %d)",
1097 info->object_key.c_str(),
1098 1226 curl_error, info->error_code, info->request);
1099 1226 UpdateStatistics(info->curl_handle);
1100
1101 // Verification and error classification
1102
1/6
✓ Branch 0 taken 1226 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
1226 switch (curl_error) {
1103 1226 case CURLE_OK:
1104
2/2
✓ Branch 0 taken 1222 times.
✓ Branch 1 taken 4 times.
1226 if ((info->error_code != kFailRetry) &&
1105
2/2
✓ Branch 0 taken 612 times.
✓ Branch 1 taken 610 times.
1222 (info->error_code != kFailNotFound))
1106 {
1107 612 info->error_code = kFailOk;
1108 }
1109 1226 break;
1110 case CURLE_UNSUPPORTED_PROTOCOL:
1111 case CURLE_URL_MALFORMAT:
1112 info->error_code = kFailBadRequest;
1113 break;
1114 case CURLE_COULDNT_RESOLVE_HOST:
1115 info->error_code = kFailHostResolve;
1116 break;
1117 case CURLE_COULDNT_CONNECT:
1118 case CURLE_OPERATION_TIMEDOUT:
1119 case CURLE_SEND_ERROR:
1120 case CURLE_RECV_ERROR:
1121 info->error_code = kFailHostConnection;
1122 break;
1123 case CURLE_ABORTED_BY_CALLBACK:
1124 case CURLE_WRITE_ERROR:
1125 // Error set by callback
1126 break;
1127 default:
1128 LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr,
1129 "unexpected curl error (%d) while trying to upload %s: %s",
1130 curl_error, info->object_key.c_str(), info->errorbuffer);
1131 info->error_code = kFailOther;
1132 break;
1133 }
1134
1135 // Transform HEAD to PUT request
1136
2/2
✓ Branch 0 taken 610 times.
✓ Branch 1 taken 616 times.
1226 if ((info->error_code == kFailNotFound) &&
1137
2/2
✓ Branch 0 taken 608 times.
✓ Branch 1 taken 2 times.
610 (info->request == JobInfo::kReqHeadPut))
1138 {
1139 608 LogCvmfs(kLogS3Fanout, kLogDebug, "not found: %s, uploading",
1140 info->object_key.c_str());
1141 608 info->request = JobInfo::kReqPutCas;
1142 608 curl_slist_free_all(info->http_headers);
1143 608 info->http_headers = NULL;
1144 608 s3fanout::Failures init_failure = InitializeRequest(info,
1145 info->curl_handle);
1146
1147
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
608 if (init_failure != s3fanout::kFailOk) {
1148 PANIC(kLogStderr,
1149 "Failed to initialize CURL handle "
1150 "(error: %d - %s | errno: %d)",
1151 init_failure, Code2Ascii(init_failure), errno);
1152 }
1153 608 SetUrlOptions(info);
1154 // Reset origin
1155 608 info->origin->Rewind();
1156 608 return true; // Again, Put
1157 }
1158
1159 // Determination if failed request should be repeated
1160 618 bool try_again = false;
1161
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 612 times.
618 if (info->error_code != kFailOk) {
1162 6 try_again = CanRetry(info);
1163 }
1164
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 614 times.
618 if (try_again) {
1165
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (info->request == JobInfo::kReqPutCas ||
1166
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 info->request == JobInfo::kReqPutDotCvmfs ||
1167
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
4 info->request == JobInfo::kReqPutHtml) {
1168 LogCvmfs(kLogS3Fanout, kLogDebug, "Trying again to upload %s",
1169 info->object_key.c_str());
1170 // Reset origin
1171 info->origin->Rewind();
1172 }
1173 4 Backoff(info);
1174 4 info->error_code = kFailOk;
1175 4 info->http_error = 0;
1176 4 info->throttle_ms = 0;
1177 4 info->backoff_ms = 0;
1178 4 info->throttle_timestamp = 0;
1179 4 return true; // try again
1180 }
1181
1182 // Cleanup opened resources
1183 614 info->origin.Destroy();
1184
1185
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 612 times.
614 if ((info->error_code != kFailOk) &&
1186
2/4
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
2 (info->http_error != 0) && (info->http_error != 404))
1187 {
1188 LogCvmfs(kLogS3Fanout, kLogStderr, "S3: HTTP failure %d", info->http_error);
1189 }
1190 614 return false; // stop transfer
1191 }
1192
1193
2/4
✓ Branch 3 taken 12 times.
✗ Branch 4 not taken.
✓ Branch 9 taken 12 times.
✗ Branch 10 not taken.
12 S3FanoutManager::S3FanoutManager(const S3Config &config) : config_(config) {
1194 12 atomic_init32(&multi_threaded_);
1195
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 MakePipe(pipe_terminate_);
1196
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 MakePipe(pipe_jobs_);
1197
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 MakePipe(pipe_completed_);
1198
1199 int retval;
1200 12 jobs_todo_lock_ =
1201 12 reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1202 12 retval = pthread_mutex_init(jobs_todo_lock_, NULL);
1203
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 assert(retval == 0);
1204 12 curl_handle_lock_ =
1205 12 reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1206 12 retval = pthread_mutex_init(curl_handle_lock_, NULL);
1207
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 assert(retval == 0);
1208
1209
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 active_requests_ = new set<JobInfo *>;
1210
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 pool_handles_idle_ = new set<CURL *>;
1211
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 pool_handles_inuse_ = new set<CURL *>;
1212
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 curl_sharehandles_ = new map<CURL *, S3FanOutDnsEntry *>;
1213
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 sharehandles_ = new set<S3FanOutDnsEntry *>;
1214 12 watch_fds_max_ = 4 * config_.pool_max_handles;
1215 12 max_available_jobs_ = 4 * config_.pool_max_handles;
1216
2/4
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 12 times.
✗ Branch 5 not taken.
12 available_jobs_ = new Semaphore(max_available_jobs_);
1217
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 assert(NULL != available_jobs_);
1218
1219
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 statistics_ = new Statistics();
1220
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 user_agent_ = new string();
1221
2/4
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
12 *user_agent_ = "User-Agent: cvmfs " + string(VERSION);
1222
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 complete_hostname_ = MkCompleteHostname();
1223
1224
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 CURLcode cretval = curl_global_init(CURL_GLOBAL_ALL);
1225
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 assert(cretval == CURLE_OK);
1226
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 curl_multi_ = curl_multi_init();
1227
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 assert(curl_multi_ != NULL);
1228 CURLMcode mretval;
1229
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION,
1230 CallbackCurlSocket);
1231
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 assert(mretval == CURLM_OK);
1232
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1233 static_cast<void *>(this));
1234
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 assert(mretval == CURLM_OK);
1235
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1236 config_.pool_max_handles);
1237
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 assert(mretval == CURLM_OK);
1238
1239 12 prng_.InitLocaltime();
1240
1241 12 thread_upload_ = 0;
1242 12 timestamp_last_throttle_report_ = 0;
1243 12 is_curl_debug_ = (getenv("_CVMFS_CURL_DEBUG") != NULL);
1244
1245 // Parsing environment variables
1246
2/4
✗ Branch 1 not taken.
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 12 times.
12 if ((getenv("CVMFS_IPV4_ONLY") != NULL) &&
1247 (strlen(getenv("CVMFS_IPV4_ONLY")) > 0)) {
1248 opt_ipv4_only_ = true;
1249 } else {
1250 12 opt_ipv4_only_ = false;
1251 }
1252
1253
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 resolver_ = dns::CaresResolver::Create(opt_ipv4_only_, 2, 2000);
1254
1255 12 watch_fds_ = static_cast<struct pollfd *>(smalloc(4 * sizeof(struct pollfd)));
1256 12 watch_fds_size_ = 4;
1257 12 watch_fds_inuse_ = 0;
1258
1259
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 ssl_certificate_store_.UseSystemCertificatePath();
1260 12 }
1261
1262 24 S3FanoutManager::~S3FanoutManager() {
1263 12 pthread_mutex_destroy(jobs_todo_lock_);
1264 12 free(jobs_todo_lock_);
1265 12 pthread_mutex_destroy(curl_handle_lock_);
1266 12 free(curl_handle_lock_);
1267
1268
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1269 // Shutdown I/O thread
1270 12 char buf = 'T';
1271 12 WritePipe(pipe_terminate_[1], &buf, 1);
1272 12 pthread_join(thread_upload_, NULL);
1273 }
1274 12 ClosePipe(pipe_terminate_);
1275 12 ClosePipe(pipe_jobs_);
1276 12 ClosePipe(pipe_completed_);
1277
1278 12 set<CURL *>::iterator i = pool_handles_idle_->begin();
1279 12 const set<CURL *>::const_iterator iEnd = pool_handles_idle_->end();
1280
2/2
✓ Branch 2 taken 20 times.
✓ Branch 3 taken 12 times.
32 for (; i != iEnd; ++i) {
1281 20 curl_easy_cleanup(*i);
1282 }
1283
1284 12 set<S3FanOutDnsEntry *>::iterator is = sharehandles_->begin();
1285 12 const set<S3FanOutDnsEntry *>::const_iterator isEnd = sharehandles_->end();
1286
2/2
✓ Branch 2 taken 10 times.
✓ Branch 3 taken 12 times.
22 for (; is != isEnd; ++is) {
1287 10 curl_share_cleanup((*is)->sharehandle);
1288 10 curl_slist_free_all((*is)->clist);
1289
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 delete *is;
1290 }
1291 12 pool_handles_idle_->clear();
1292 12 curl_sharehandles_->clear();
1293 12 sharehandles_->clear();
1294
1/2
✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
12 delete active_requests_;
1295
1/2
✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
12 delete pool_handles_idle_;
1296
1/2
✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
12 delete pool_handles_inuse_;
1297
1/2
✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
12 delete curl_sharehandles_;
1298
1/2
✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
12 delete sharehandles_;
1299
1/2
✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
12 delete user_agent_;
1300 12 curl_multi_cleanup(curl_multi_);
1301
1302
1/2
✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
12 delete statistics_;
1303
1304
1/2
✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
12 delete available_jobs_;
1305
1306 12 curl_global_cleanup();
1307 12 }
1308
1309 /**
1310 * Spawns the I/O worker thread. No way back except ~S3FanoutManager.
1311 */
1312 12 void S3FanoutManager::Spawn() {
1313 12 LogCvmfs(kLogS3Fanout, kLogDebug, "S3FanoutManager spawned");
1314
1315 12 int retval = pthread_create(&thread_upload_, NULL, MainUpload,
1316 static_cast<void *>(this));
1317
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 assert(retval == 0);
1318
1319 12 atomic_inc32(&multi_threaded_);
1320 12 }
1321
1322 3 const Statistics &S3FanoutManager::GetStatistics() {
1323 3 return *statistics_;
1324 }
1325
1326 /**
1327 * Push new job to be uploaded to the S3 cloud storage.
1328 */
1329 614 void S3FanoutManager::PushNewJob(JobInfo *info) {
1330 614 available_jobs_->Increment();
1331 614 WritePipe(pipe_jobs_[1], &info, sizeof(info));
1332 614 }
1333
1334 /**
1335 * Push completed job to list of completed jobs
1336 */
1337 626 void S3FanoutManager::PushCompletedJob(JobInfo *info) {
1338 626 WritePipe(pipe_completed_[1], &info, sizeof(info));
1339 626 }
1340
1341 /**
1342 * Pop completed job
1343 */
1344 626 JobInfo *S3FanoutManager::PopCompletedJob() {
1345 JobInfo *info;
1346
1/2
✓ Branch 1 taken 626 times.
✗ Branch 2 not taken.
626 ReadPipe(pipe_completed_[0], &info, sizeof(info));
1347 626 return info;
1348 }
1349
1350 //------------------------------------------------------------------------------
1351
1352
1353 string Statistics::Print() const {
1354 return
1355 "Transferred Bytes: " +
1356 StringifyInt(uint64_t(transferred_bytes)) + "\n" +
1357 "Transfer duration: " +
1358 StringifyInt(uint64_t(transfer_time)) + " s\n" +
1359 "Number of requests: " +
1360 StringifyInt(num_requests) + "\n" +
1361 "Number of retries: " +
1362 StringifyInt(num_retries) + "\n";
1363 }
1364
1365 } // namespace s3fanout
1366