GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/network/s3fanout.cc
Date: 2025-11-09 02:35:23
Exec Total Coverage
Lines: 573 816 70.2%
Branches: 412 1180 34.9%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 *
4 * Runs a thread using libcurls asynchronous I/O mode to push data to S3
5 */
6
7 #include "s3fanout.h"
8
9 #include <pthread.h>
10
11 #include <algorithm>
12 #include <cassert>
13 #include <cerrno>
14 #include <utility>
15
16 #include "upload_facility.h"
17 #include "util/concurrency.h"
18 #include "util/exception.h"
19 #include "util/platform.h"
20 #include "util/posix.h"
21 #include "util/string.h"
22
23 using namespace std; // NOLINT
24
25 namespace s3fanout {
26
27 const char *S3FanoutManager::kCacheControlCas = "Cache-Control: max-age=259200";
28 const unsigned S3FanoutManager::kDefault429ThrottleMs = 250;
29 const unsigned S3FanoutManager::kMax429ThrottleMs = 10000;
30 const unsigned S3FanoutManager::kThrottleReportIntervalSec = 10;
31 const unsigned S3FanoutManager::kDefaultHTTPPort = 80;
32 const unsigned S3FanoutManager::kDefaultHTTPSPort = 443;
33
34 264 std::string S3FanoutManager::MkDotCvmfsCacheControlHeader(unsigned defaultMaxAge, int overrideMaxAge)
35 {
36 const char *var;
37 int max_age_sec;
38 char *at_null_terminator_if_number;
39 264 bool value_determined = false;
40
41
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 264 times.
264 if (overrideMaxAge >= 0) {
42 max_age_sec = overrideMaxAge;
43 value_determined = true;
44 }
45
46
1/2
✓ Branch 0 taken 264 times.
✗ Branch 1 not taken.
264 if (!value_determined) {
47 264 var = getenv("CVMFS_MAX_TTL_SECS");
48
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
264 if (var && var[0]) {
49 max_age_sec = strtoll(var, &at_null_terminator_if_number, 10);
50 if (*at_null_terminator_if_number == '\0'
51 && max_age_sec >= 0) {
52 value_determined = true;
53 }
54 }
55 }
56
57
1/2
✓ Branch 0 taken 264 times.
✗ Branch 1 not taken.
264 if (!value_determined) {
58 264 var = getenv("CVMFS_MAX_TTL");
59
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
264 if (var && var[0]) {
60 max_age_sec = strtoll(var, &at_null_terminator_if_number, 10);
61 if (*at_null_terminator_if_number == '\0'
62 && max_age_sec >= 0) {
63 max_age_sec *= 60;
64 value_determined = true;
65 }
66 }
67 }
68
69
1/2
✓ Branch 0 taken 264 times.
✗ Branch 1 not taken.
264 if (!value_determined) {
70 264 max_age_sec = defaultMaxAge;
71 }
72
73
2/4
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 264 times.
✗ Branch 5 not taken.
528 return "Cache-Control: max-age=" + std::to_string(max_age_sec);
74 }
75
76 /**
77 * Parses Retry-After and X-Retry-In headers attached to HTTP 429 responses
78 */
79 670 void S3FanoutManager::DetectThrottleIndicator(const std::string &header,
80 JobInfo *info) {
81 670 std::string value_str;
82
4/6
✓ Branch 2 taken 670 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 670 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 291 times.
✓ Branch 10 taken 379 times.
670 if (HasPrefix(header, "retry-after:", true))
83
1/2
✓ Branch 1 taken 291 times.
✗ Branch 2 not taken.
291 value_str = header.substr(12);
84
4/6
✓ Branch 2 taken 670 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 670 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 145 times.
✓ Branch 10 taken 525 times.
670 if (HasPrefix(header, "x-retry-in:", true))
85
1/2
✓ Branch 1 taken 145 times.
✗ Branch 2 not taken.
145 value_str = header.substr(11);
86
87
1/2
✓ Branch 1 taken 670 times.
✗ Branch 2 not taken.
670 value_str = Trim(value_str, true /* trim_newline */);
88
2/2
✓ Branch 1 taken 378 times.
✓ Branch 2 taken 292 times.
670 if (!value_str.empty()) {
89
1/2
✓ Branch 1 taken 378 times.
✗ Branch 2 not taken.
378 const unsigned value_numeric = String2Uint64(value_str);
90
2/4
✓ Branch 2 taken 378 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 378 times.
✗ Branch 6 not taken.
756 const unsigned value_ms = HasSuffix(value_str, "ms", true /* ignore_case */)
91
2/2
✓ Branch 0 taken 145 times.
✓ Branch 1 taken 233 times.
378 ? value_numeric
92 378 : (value_numeric * 1000);
93
2/2
✓ Branch 0 taken 349 times.
✓ Branch 1 taken 29 times.
378 if (value_ms > 0)
94 349 info->throttle_ms = std::min(value_ms, kMax429ThrottleMs);
95 }
96 670 }
97
98
99 /**
100 * Called by curl for every HTTP header. Not called for file:// transfers.
101 */
102 81004 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
103 void *info_link) {
104 81004 const size_t num_bytes = size * nmemb;
105
1/2
✓ Branch 2 taken 81004 times.
✗ Branch 3 not taken.
81004 const string header_line(static_cast<const char *>(ptr), num_bytes);
106 81004 JobInfo *info = static_cast<JobInfo *>(info_link);
107
108 // Check for http status code errors
109
4/6
✓ Branch 2 taken 81004 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 81004 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 26972 times.
✓ Branch 10 taken 54032 times.
81004 if (HasPrefix(header_line, "HTTP/1.", false)) {
110
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 26972 times.
26972 if (header_line.length() < 10)
111 return 0;
112
113 unsigned i;
114
5/6
✓ Branch 1 taken 53944 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 26972 times.
✓ Branch 5 taken 26972 times.
✓ Branch 6 taken 26972 times.
✓ Branch 7 taken 26972 times.
53944 for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {
115 }
116
117
2/2
✓ Branch 1 taken 13464 times.
✓ Branch 2 taken 13508 times.
26972 if (header_line[i] == '2') {
118 13464 return num_bytes;
119 } else {
120
1/2
✓ Branch 2 taken 13508 times.
✗ Branch 3 not taken.
13508 LogCvmfs(kLogS3Fanout, kLogDebug, "http status error code [info %p]: %s",
121 info, header_line.c_str());
122
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 13508 times.
13508 if (header_line.length() < i + 3) {
123 LogCvmfs(kLogS3Fanout, kLogStderr, "S3: invalid HTTP response '%s'",
124 header_line.c_str());
125 info->error_code = kFailOther;
126 return 0;
127 }
128
2/4
✓ Branch 3 taken 13508 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 13508 times.
✗ Branch 7 not taken.
13508 info->http_error = String2Int64(string(&header_line[i], 3));
129
130
2/7
✓ Branch 0 taken 88 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 13420 times.
✗ Branch 6 not taken.
13508 switch (info->http_error) {
131 88 case 429:
132 88 info->error_code = kFailRetry;
133 88 info->throttle_ms = S3FanoutManager::kDefault429ThrottleMs;
134 88 info->throttle_timestamp = platform_monotonic_time();
135 88 return num_bytes;
136 case 507: // Insufficient Storage
137 info->error_code = kFailInsufficientStorage;
138 break;
139 case 503:
140 case 502: // Can happen if the S3 gateway-backend connection breaks
141 case 500: // sometimes see this as a transient error from S3
142 info->error_code = kFailServiceUnavailable;
143 break;
144 case 501:
145 case 400:
146 info->error_code = kFailBadRequest;
147 break;
148 case 403:
149 info->error_code = kFailForbidden;
150 break;
151 13420 case 404:
152 13420 info->error_code = kFailNotFound;
153 13420 return num_bytes;
154 default:
155 info->error_code = kFailOther;
156 }
157 return 0;
158 }
159 }
160
161
2/2
✓ Branch 0 taken 264 times.
✓ Branch 1 taken 53768 times.
54032 if (info->error_code == kFailRetry) {
162
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 S3FanoutManager::DetectThrottleIndicator(header_line, info);
163 }
164
165 54032 return num_bytes;
166 81004 }
167
168
169 /**
170 * Called by curl for every new chunk to upload.
171 */
172 344718 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
173 void *info_link) {
174 344718 const size_t num_bytes = size * nmemb;
175 344718 JobInfo *info = static_cast<JobInfo *>(info_link);
176
177 344718 LogCvmfs(kLogS3Fanout, kLogDebug, "Data callback with %zu bytes", num_bytes);
178
179
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 344718 times.
344718 if (num_bytes == 0)
180 return 0;
181
182 344718 const uint64_t read_bytes = info->origin->Read(ptr, num_bytes);
183
184 344718 LogCvmfs(kLogS3Fanout, kLogDebug, "source buffer pushed out %lu bytes",
185 read_bytes);
186
187 344718 return read_bytes;
188 }
189
190
191 /**
192 * For the time being, ignore all received information in the HTTP body
193 */
194 static size_t CallbackCurlBody(char * /*ptr*/, size_t size, size_t nmemb,
195 void * /*userdata*/) {
196 return size * nmemb;
197 }
198
199
200 /**
201 * Called when new curl sockets arrive or existing curl sockets depart.
202 */
203 59818 int S3FanoutManager::CallbackCurlSocket(CURL *easy, curl_socket_t s, int action,
204 void *userp, void *socketp) {
205 59818 S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(userp);
206 59818 LogCvmfs(kLogS3Fanout, kLogDebug,
207 "CallbackCurlSocket called with easy "
208 "handle %p, socket %d, action %d, up %p, "
209 "sp %p, fds_inuse %d, jobs %d",
210 easy, s, action, userp, socketp, s3fanout_mgr->watch_fds_inuse_,
211 59818 s3fanout_mgr->available_jobs_->Get());
212
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 59818 times.
59818 if (action == CURL_POLL_NONE)
213 return 0;
214
215 // Find s in watch_fds_
216 // First 2 fds are job and terminate pipes (not curl related)
217 unsigned index;
218
2/2
✓ Branch 0 taken 94182 times.
✓ Branch 1 taken 26972 times.
121154 for (index = 2; index < s3fanout_mgr->watch_fds_inuse_; ++index) {
219
2/2
✓ Branch 0 taken 32846 times.
✓ Branch 1 taken 61336 times.
94182 if (s3fanout_mgr->watch_fds_[index].fd == s)
220 32846 break;
221 }
222 // Or create newly
223
2/2
✓ Branch 0 taken 26972 times.
✓ Branch 1 taken 32846 times.
59818 if (index == s3fanout_mgr->watch_fds_inuse_) {
224 // Extend array if necessary
225
2/2
✓ Branch 0 taken 44 times.
✓ Branch 1 taken 26928 times.
26972 if (s3fanout_mgr->watch_fds_inuse_ == s3fanout_mgr->watch_fds_size_) {
226 44 s3fanout_mgr->watch_fds_size_ *= 2;
227 44 s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
228 44 srealloc(s3fanout_mgr->watch_fds_,
229 44 s3fanout_mgr->watch_fds_size_ * sizeof(struct pollfd)));
230 }
231 26972 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].fd = s;
232 26972 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].events = 0;
233 26972 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].revents = 0;
234 26972 s3fanout_mgr->watch_fds_inuse_++;
235 }
236
237
4/5
✓ Branch 0 taken 26972 times.
✓ Branch 1 taken 66 times.
✓ Branch 2 taken 5808 times.
✓ Branch 3 taken 26972 times.
✗ Branch 4 not taken.
59818 switch (action) {
238 26972 case CURL_POLL_IN:
239 26972 s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
240 26972 break;
241 66 case CURL_POLL_OUT:
242 66 s3fanout_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
243 66 break;
244 5808 case CURL_POLL_INOUT:
245 5808 s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI | POLLOUT
246 | POLLWRBAND;
247 5808 break;
248 26972 case CURL_POLL_REMOVE:
249
2/2
✓ Branch 0 taken 4224 times.
✓ Branch 1 taken 22748 times.
26972 if (index < s3fanout_mgr->watch_fds_inuse_ - 1)
250 s3fanout_mgr
251 4224 ->watch_fds_[index] = s3fanout_mgr->watch_fds_
252 4224 [s3fanout_mgr->watch_fds_inuse_ - 1];
253 26972 s3fanout_mgr->watch_fds_inuse_--;
254 // Shrink array if necessary
255
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26972 times.
26972 if ((s3fanout_mgr->watch_fds_inuse_ > s3fanout_mgr->watch_fds_max_)
256 && (s3fanout_mgr->watch_fds_inuse_
257 < s3fanout_mgr->watch_fds_size_ / 2)) {
258 s3fanout_mgr->watch_fds_size_ /= 2;
259 s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
260 srealloc(s3fanout_mgr->watch_fds_,
261 s3fanout_mgr->watch_fds_size_ * sizeof(struct pollfd)));
262 }
263 26972 break;
264 default:
265 PANIC(NULL);
266 }
267
268 59818 return 0;
269 }
270
271
272 /**
273 * Worker thread event loop.
274 */
275 264 void *S3FanoutManager::MainUpload(void *data) {
276
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread started");
277 264 S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(data);
278
279 264 s3fanout_mgr->InitPipeWatchFds();
280
281 // Don't schedule more jobs into the multi handle than the maximum number of
282 // parallel connections. This should prevent starvation and thus a timeout
283 // of the authorization header (CVM-1339).
284 264 unsigned jobs_in_flight = 0;
285
286 while (true) {
287 // Check events with 100ms timeout
288 369116 const int timeout_ms = 100;
289
1/2
✓ Branch 1 taken 369116 times.
✗ Branch 2 not taken.
369116 int retval = poll(s3fanout_mgr->watch_fds_, s3fanout_mgr->watch_fds_inuse_,
290 timeout_ms);
291
2/2
✓ Branch 0 taken 2046 times.
✓ Branch 1 taken 367070 times.
369116 if (retval == 0) {
292 // Handle timeout
293 2046 int still_running = 0;
294
1/2
✓ Branch 1 taken 2046 times.
✗ Branch 2 not taken.
2046 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
295 CURL_SOCKET_TIMEOUT, 0, &still_running);
296
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2046 times.
2046 if (retval != CURLM_OK) {
297 LogCvmfs(kLogS3Fanout, kLogStderr, "Error, timeout due to: %d", retval);
298 assert(retval == CURLM_OK);
299 }
300
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 367070 times.
367070 } else if (retval < 0) {
301 assert(errno == EINTR);
302 continue;
303 }
304
305 // Terminate I/O thread
306
2/2
✓ Branch 0 taken 264 times.
✓ Branch 1 taken 368852 times.
369116 if (s3fanout_mgr->watch_fds_[0].revents)
307 264 break;
308
309 // New job incoming
310
2/2
✓ Branch 0 taken 13508 times.
✓ Branch 1 taken 355344 times.
368852 if (s3fanout_mgr->watch_fds_[1].revents) {
311 13508 s3fanout_mgr->watch_fds_[1].revents = 0;
312 JobInfo *info;
313
1/2
✓ Branch 1 taken 13508 times.
✗ Branch 2 not taken.
13508 ReadPipe(s3fanout_mgr->pipe_jobs_[0], &info, sizeof(info));
314
1/2
✓ Branch 1 taken 13508 times.
✗ Branch 2 not taken.
13508 CURL *handle = s3fanout_mgr->AcquireCurlHandle();
315
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13508 times.
13508 if (handle == NULL) {
316 PANIC(kLogStderr, "Failed to acquire CURL handle.");
317 }
318
1/2
✓ Branch 1 taken 13508 times.
✗ Branch 2 not taken.
13508 const s3fanout::Failures init_failure = s3fanout_mgr->InitializeRequest(
319 info, handle);
320
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13508 times.
13508 if (init_failure != s3fanout::kFailOk) {
321 PANIC(kLogStderr,
322 "Failed to initialize CURL handle (error: %d - %s | errno: %d)",
323 init_failure, Code2Ascii(init_failure), errno);
324 }
325
1/2
✓ Branch 1 taken 13508 times.
✗ Branch 2 not taken.
13508 s3fanout_mgr->SetUrlOptions(info);
326
327
1/2
✓ Branch 1 taken 13508 times.
✗ Branch 2 not taken.
13508 curl_multi_add_handle(s3fanout_mgr->curl_multi_, handle);
328
1/2
✓ Branch 1 taken 13508 times.
✗ Branch 2 not taken.
13508 s3fanout_mgr->active_requests_->insert(info);
329 13508 jobs_in_flight++;
330 13508 int still_running = 0, retval = 0;
331
1/2
✓ Branch 1 taken 13508 times.
✗ Branch 2 not taken.
13508 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
332 CURL_SOCKET_TIMEOUT, 0, &still_running);
333
334
1/2
✓ Branch 1 taken 13508 times.
✗ Branch 2 not taken.
13508 LogCvmfs(kLogS3Fanout, kLogDebug, "curl_multi_socket_action: %d - %d",
335 retval, still_running);
336 }
337
338
339 // Activity on curl sockets
340 // Within this loop the curl_multi_socket_action() may cause socket(s)
341 // to be removed from watch_fds_. If a socket is removed it is replaced
342 // by the socket at the end of the array and the inuse count is decreased.
343 // Therefore loop over the array in reverse order.
344 // First 2 fds are job and terminate pipes (not curl related)
345
2/2
✓ Branch 0 taken 833360 times.
✓ Branch 1 taken 368852 times.
1202212 for (int32_t i = s3fanout_mgr->watch_fds_inuse_ - 1; i >= 2; --i) {
346
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 833360 times.
833360 if (static_cast<uint32_t>(i) >= s3fanout_mgr->watch_fds_inuse_) {
347 continue;
348 }
349
2/2
✓ Branch 0 taken 375452 times.
✓ Branch 1 taken 457908 times.
833360 if (s3fanout_mgr->watch_fds_[i].revents) {
350 375452 int ev_bitmask = 0;
351
2/2
✓ Branch 0 taken 40348 times.
✓ Branch 1 taken 335104 times.
375452 if (s3fanout_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
352 40348 ev_bitmask |= CURL_CSELECT_IN;
353
2/2
✓ Branch 0 taken 335104 times.
✓ Branch 1 taken 40348 times.
375452 if (s3fanout_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
354 335104 ev_bitmask |= CURL_CSELECT_OUT;
355 375452 if (s3fanout_mgr->watch_fds_[i].revents
356
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 375452 times.
375452 & (POLLERR | POLLHUP | POLLNVAL))
357 ev_bitmask |= CURL_CSELECT_ERR;
358 375452 s3fanout_mgr->watch_fds_[i].revents = 0;
359
360 375452 int still_running = 0;
361 375452 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
362
1/2
✓ Branch 1 taken 375452 times.
✗ Branch 2 not taken.
375452 s3fanout_mgr->watch_fds_[i].fd,
363 ev_bitmask,
364 &still_running);
365 }
366 }
367
368 // Check if transfers are completed
369 CURLMsg *curl_msg;
370 int msgs_in_queue;
371
3/4
✓ Branch 1 taken 395824 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 26972 times.
✓ Branch 4 taken 368852 times.
395824 while ((curl_msg = curl_multi_info_read(s3fanout_mgr->curl_multi_,
372 &msgs_in_queue))) {
373
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26972 times.
26972 assert(curl_msg->msg == CURLMSG_DONE);
374
375 26972 s3fanout_mgr->statistics_->num_requests++;
376 JobInfo *info;
377 26972 CURL *easy_handle = curl_msg->easy_handle;
378 26972 const int curl_error = curl_msg->data.result;
379
1/2
✓ Branch 1 taken 26972 times.
✗ Branch 2 not taken.
26972 curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
380
381
1/2
✓ Branch 1 taken 26972 times.
✗ Branch 2 not taken.
26972 curl_multi_remove_handle(s3fanout_mgr->curl_multi_, easy_handle);
382
3/4
✓ Branch 1 taken 26972 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 13464 times.
✓ Branch 4 taken 13508 times.
26972 if (s3fanout_mgr->VerifyAndFinalize(curl_error, info)) {
383
1/2
✓ Branch 1 taken 13464 times.
✗ Branch 2 not taken.
13464 curl_multi_add_handle(s3fanout_mgr->curl_multi_, easy_handle);
384 13464 int still_running = 0;
385
1/2
✓ Branch 1 taken 13464 times.
✗ Branch 2 not taken.
13464 curl_multi_socket_action(s3fanout_mgr->curl_multi_, CURL_SOCKET_TIMEOUT,
386 0, &still_running);
387 } else {
388 // Return easy handle into pool and write result back
389 13508 jobs_in_flight--;
390
1/2
✓ Branch 1 taken 13508 times.
✗ Branch 2 not taken.
13508 s3fanout_mgr->active_requests_->erase(info);
391
1/2
✓ Branch 1 taken 13508 times.
✗ Branch 2 not taken.
13508 s3fanout_mgr->ReleaseCurlHandle(info, easy_handle);
392
1/2
✓ Branch 1 taken 13508 times.
✗ Branch 2 not taken.
13508 s3fanout_mgr->available_jobs_->Decrement();
393
394 // Add to list of completed jobs
395
1/2
✓ Branch 1 taken 13508 times.
✗ Branch 2 not taken.
13508 s3fanout_mgr->PushCompletedJob(info);
396 }
397 }
398 368852 }
399
400 264 set<CURL *>::iterator i = s3fanout_mgr->pool_handles_inuse_->begin();
401 264 const set<CURL *>::const_iterator i_end = s3fanout_mgr->pool_handles_inuse_
402 264 ->end();
403
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 264 times.
264 for (; i != i_end; ++i) {
404 curl_multi_remove_handle(s3fanout_mgr->curl_multi_, *i);
405 curl_easy_cleanup(*i);
406 }
407 264 s3fanout_mgr->pool_handles_inuse_->clear();
408 264 free(s3fanout_mgr->watch_fds_);
409
410
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread terminated");
411 264 return NULL;
412 }
413
414
415 /**
416 * Gets an idle CURL handle from the pool. Creates a new one and adds it to
417 * the pool if necessary.
418 */
419 13508 CURL *S3FanoutManager::AcquireCurlHandle() const {
420 CURL *handle;
421
422 13508 const MutexLockGuard guard(curl_handle_lock_);
423
424
2/2
✓ Branch 1 taken 1078 times.
✓ Branch 2 taken 12430 times.
13508 if (pool_handles_idle_->empty()) {
425 CURLcode retval;
426
427 // Create a new handle
428
1/2
✓ Branch 1 taken 1078 times.
✗ Branch 2 not taken.
1078 handle = curl_easy_init();
429
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1078 times.
1078 assert(handle != NULL);
430
431 // Other settings
432
1/2
✓ Branch 1 taken 1078 times.
✗ Branch 2 not taken.
1078 retval = curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
433
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1078 times.
1078 assert(retval == CURLE_OK);
434
1/2
✓ Branch 1 taken 1078 times.
✗ Branch 2 not taken.
1078 retval = curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION,
435 CallbackCurlHeader);
436
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1078 times.
1078 assert(retval == CURLE_OK);
437
1/2
✓ Branch 1 taken 1078 times.
✗ Branch 2 not taken.
1078 retval = curl_easy_setopt(handle, CURLOPT_READFUNCTION, CallbackCurlData);
438
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1078 times.
1078 assert(retval == CURLE_OK);
439
1/2
✓ Branch 1 taken 1078 times.
✗ Branch 2 not taken.
1078 retval = curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlBody);
440
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1078 times.
1078 assert(retval == CURLE_OK);
441 } else {
442 12430 handle = *(pool_handles_idle_->begin());
443
1/2
✓ Branch 2 taken 12430 times.
✗ Branch 3 not taken.
12430 pool_handles_idle_->erase(pool_handles_idle_->begin());
444 }
445
446
1/2
✓ Branch 1 taken 13508 times.
✗ Branch 2 not taken.
13508 pool_handles_inuse_->insert(handle);
447
448 13508 return handle;
449 13508 }
450
451
452 13508 void S3FanoutManager::ReleaseCurlHandle(JobInfo *info, CURL *handle) const {
453
1/2
✓ Branch 0 taken 13508 times.
✗ Branch 1 not taken.
13508 if (info->http_headers) {
454
1/2
✓ Branch 1 taken 13508 times.
✗ Branch 2 not taken.
13508 curl_slist_free_all(info->http_headers);
455 13508 info->http_headers = NULL;
456 }
457
458 13508 const MutexLockGuard guard(curl_handle_lock_);
459
460
1/2
✓ Branch 1 taken 13508 times.
✗ Branch 2 not taken.
13508 const set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
461
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 13508 times.
13508 assert(elem != pool_handles_inuse_->end());
462
463
2/2
✓ Branch 1 taken 638 times.
✓ Branch 2 taken 12870 times.
13508 if (pool_handles_idle_->size() > config_.pool_max_handles) {
464
1/2
✓ Branch 1 taken 638 times.
✗ Branch 2 not taken.
638 const CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, NULL);
465
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 638 times.
638 assert(retval == CURLE_OK);
466
1/2
✓ Branch 1 taken 638 times.
✗ Branch 2 not taken.
638 curl_easy_cleanup(handle);
467 const std::map<CURL *, S3FanOutDnsEntry *>::size_type
468
1/2
✓ Branch 1 taken 638 times.
✗ Branch 2 not taken.
638 retitems = curl_sharehandles_->erase(handle);
469
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 638 times.
638 assert(retitems == 1);
470 } else {
471
1/2
✓ Branch 1 taken 12870 times.
✗ Branch 2 not taken.
12870 pool_handles_idle_->insert(handle);
472 }
473
474
1/2
✓ Branch 1 taken 13508 times.
✗ Branch 2 not taken.
13508 pool_handles_inuse_->erase(elem);
475 13508 }
476
477 264 void S3FanoutManager::InitPipeWatchFds() {
478
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 264 times.
264 assert(watch_fds_inuse_ == 0);
479
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 264 times.
264 assert(watch_fds_size_ >= 2);
480 264 watch_fds_[0].fd = pipe_terminate_[0];
481 264 watch_fds_[0].events = POLLIN | POLLPRI;
482 264 watch_fds_[0].revents = 0;
483 264 ++watch_fds_inuse_;
484 264 watch_fds_[1].fd = pipe_jobs_[0];
485 264 watch_fds_[1].events = POLLIN | POLLPRI;
486 264 watch_fds_[1].revents = 0;
487 264 ++watch_fds_inuse_;
488 264 }
489
490 /**
491 * The Amazon AWS 2 authorization header according to
492 * http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html#ConstructingTheAuthenticationHeader
493 */
494 26884 bool S3FanoutManager::MkV2Authz(const JobInfo &info,
495 vector<string> *headers) const {
496 26884 string payload_hash;
497
1/2
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
26884 const bool retval = MkPayloadHash(info, &payload_hash);
498
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26884 times.
26884 if (!retval)
499 return false;
500
1/2
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
26884 const string content_type = GetContentType(info);
501
1/2
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
26884 const string request = GetRequestString(info);
502
503
1/2
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
26884 const string timestamp = RfcTimestamp();
504
5/10
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 26884 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 26884 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 26884 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 26884 times.
✗ Branch 14 not taken.
53768 string to_sign = request + "\n" + payload_hash + "\n" + content_type + "\n"
505
2/4
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 26884 times.
✗ Branch 5 not taken.
53768 + timestamp + "\n";
506
2/4
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 26884 times.
✗ Branch 4 not taken.
26884 if (config_.x_amz_acl != "") {
507
2/4
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 26884 times.
✗ Branch 5 not taken.
53768 to_sign += "x-amz-acl:" + config_.x_amz_acl + "\n" + // default ACL
508
5/10
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 26884 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 26884 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 26884 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 26884 times.
✗ Branch 14 not taken.
53768 "/" + config_.bucket + "/" + info.object_key;
509 }
510
1/2
✓ Branch 3 taken 26884 times.
✗ Branch 4 not taken.
26884 LogCvmfs(kLogS3Fanout, kLogDebug, "%s string to sign for: %s",
511 request.c_str(), info.object_key.c_str());
512
513
1/2
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
26884 shash::Any hmac;
514 26884 hmac.algorithm = shash::kSha1;
515
1/2
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
26884 shash::Hmac(config_.secret_key,
516 26884 reinterpret_cast<const unsigned char *>(to_sign.data()),
517 26884 to_sign.length(), &hmac);
518
519
3/6
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 26884 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 26884 times.
✗ Branch 8 not taken.
80652 headers->push_back("Authorization: AWS " + config_.access_key + ":"
520
3/6
✓ Branch 2 taken 26884 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 26884 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 26884 times.
✗ Branch 9 not taken.
134420 + Base64(string(reinterpret_cast<char *>(hmac.digest),
521 26884 hmac.GetDigestSize())));
522
2/4
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 26884 times.
✗ Branch 5 not taken.
26884 headers->push_back("Date: " + timestamp);
523
2/4
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 26884 times.
✗ Branch 5 not taken.
26884 headers->push_back("X-Amz-Acl: " + config_.x_amz_acl);
524
2/2
✓ Branch 1 taken 13376 times.
✓ Branch 2 taken 13508 times.
26884 if (!payload_hash.empty())
525
2/4
✓ Branch 1 taken 13376 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 13376 times.
✗ Branch 5 not taken.
13376 headers->push_back("Content-MD5: " + payload_hash);
526
2/2
✓ Branch 1 taken 13376 times.
✓ Branch 2 taken 13508 times.
26884 if (!content_type.empty())
527
2/4
✓ Branch 1 taken 13376 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 13376 times.
✗ Branch 5 not taken.
13376 headers->push_back("Content-Type: " + content_type);
528 26884 return true;
529 26884 }
530
531
532 string S3FanoutManager::GetUriEncode(const string &val,
533 bool encode_slash) const {
534 string result;
535 const unsigned len = val.length();
536 result.reserve(len);
537 for (unsigned i = 0; i < len; ++i) {
538 const char c = val[i];
539 if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
540 || (c >= '0' && c <= '9') || c == '_' || c == '-' || c == '~'
541 || c == '.') {
542 result.push_back(c);
543 } else if (c == '/') {
544 if (encode_slash) {
545 result += "%2F";
546 } else {
547 result.push_back(c);
548 }
549 } else {
550 result.push_back('%');
551 result.push_back((c / 16) + ((c / 16 <= 9) ? '0' : 'A' - 10));
552 result.push_back((c % 16) + ((c % 16 <= 9) ? '0' : 'A' - 10));
553 }
554 }
555 return result;
556 }
557
558
559 string S3FanoutManager::GetAwsV4SigningKey(const string &date) const {
560 if (last_signing_key_.first == date)
561 return last_signing_key_.second;
562
563 const string date_key = shash::Hmac256("AWS4" + config_.secret_key, date,
564 true);
565 const string date_region_key = shash::Hmac256(date_key, config_.region, true);
566 const string date_region_service_key = shash::Hmac256(date_region_key, "s3",
567 true);
568 string signing_key = shash::Hmac256(date_region_service_key, "aws4_request",
569 true);
570 last_signing_key_.first = date;
571 last_signing_key_.second = signing_key;
572 return signing_key;
573 }
574
575
576 /**
577 * The Amazon AWS4 authorization header according to
578 * http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-auth-using-authorization-header.html
579 */
580 bool S3FanoutManager::MkV4Authz(const JobInfo &info,
581 vector<string> *headers) const {
582 string payload_hash;
583 const bool retval = MkPayloadHash(info, &payload_hash);
584 if (!retval)
585 return false;
586 const string content_type = GetContentType(info);
587 const string timestamp = IsoTimestamp();
588 const string date = timestamp.substr(0, 8);
589 vector<string> tokens = SplitString(complete_hostname_, ':');
590 assert(tokens.size() <= 2);
591 string canonical_hostname = tokens[0];
592
593 // if we could split the hostname in two and if the port is *NOT* a default
594 // one
595 if (tokens.size() == 2
596 && !((String2Uint64(tokens[1]) == kDefaultHTTPPort)
597 || (String2Uint64(tokens[1]) == kDefaultHTTPSPort)))
598 canonical_hostname += ":" + tokens[1];
599
600 string signed_headers;
601 string canonical_headers;
602 if (!content_type.empty()) {
603 signed_headers += "content-type;";
604 headers->push_back("Content-Type: " + content_type);
605 canonical_headers += "content-type:" + content_type + "\n";
606 }
607 if (config_.x_amz_acl != "") {
608 signed_headers += "host;x-amz-acl;x-amz-content-sha256;x-amz-date";
609 } else {
610 signed_headers += "host;x-amz-content-sha256;x-amz-date";
611 }
612 canonical_headers += "host:" + canonical_hostname + "\n";
613 if (config_.x_amz_acl != "") {
614 canonical_headers += "x-amz-acl:" + config_.x_amz_acl + "\n";
615 }
616 canonical_headers += "x-amz-content-sha256:" + payload_hash + "\n"
617 + "x-amz-date:" + timestamp + "\n";
618
619 const string scope = date + "/" + config_.region + "/s3/aws4_request";
620 const string uri = config_.dns_buckets ? (string("/") + info.object_key)
621 : (string("/") + config_.bucket + "/"
622 + info.object_key);
623
624 const string canonical_request = GetRequestString(info) + "\n"
625 + GetUriEncode(uri, false) + "\n" + "\n"
626 + canonical_headers + "\n" + signed_headers
627 + "\n" + payload_hash;
628
629 const string hash_request = shash::Sha256String(canonical_request.c_str());
630
631 const string string_to_sign = "AWS4-HMAC-SHA256\n" + timestamp + "\n" + scope
632 + "\n" + hash_request;
633
634 const string signing_key = GetAwsV4SigningKey(date);
635 const string signature = shash::Hmac256(signing_key, string_to_sign);
636
637 headers->push_back("X-Amz-Acl: " + config_.x_amz_acl);
638 headers->push_back("X-Amz-Content-Sha256: " + payload_hash);
639 headers->push_back("X-Amz-Date: " + timestamp);
640 headers->push_back("Authorization: AWS4-HMAC-SHA256 "
641 "Credential="
642 + config_.access_key + "/" + scope
643 + ","
644 "SignedHeaders="
645 + signed_headers
646 + ","
647 "Signature="
648 + signature);
649 return true;
650 }
651
652 /**
653 * The Azure Blob authorization header according to
654 * https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key
655 */
656 bool S3FanoutManager::MkAzureAuthz(const JobInfo &info,
657 vector<string> *headers) const {
658 const string timestamp = RfcTimestamp();
659 const string canonical_headers = "x-ms-blob-type:BlockBlob\nx-ms-date:"
660 + timestamp + "\nx-ms-version:2011-08-18";
661 const string canonical_resource = "/" + config_.access_key + "/"
662 + config_.bucket + "/" + info.object_key;
663
664 string string_to_sign;
665 if ((info.request == JobInfo::kReqHeadOnly)
666 || (info.request == JobInfo::kReqHeadPut)
667 || (info.request == JobInfo::kReqDelete)) {
668 string_to_sign = GetRequestString(info) + string("\n\n\n")
669 + "\n\n\n\n\n\n\n\n\n" + canonical_headers + "\n"
670 + canonical_resource;
671 } else {
672 string_to_sign = GetRequestString(info) + string("\n\n\n")
673 + string(StringifyInt(info.origin->GetSize()))
674 + "\n\n\n\n\n\n\n\n\n" + canonical_headers + "\n"
675 + canonical_resource;
676 }
677
678 string signing_key;
679 const int retval = Debase64(config_.secret_key, &signing_key);
680 if (!retval)
681 return false;
682
683 const string signature = shash::Hmac256(signing_key, string_to_sign, true);
684
685 headers->push_back("x-ms-date: " + timestamp);
686 headers->push_back("x-ms-version: 2011-08-18");
687 headers->push_back("Authorization: SharedKey " + config_.access_key + ":"
688 + Base64(signature));
689 headers->push_back("x-ms-blob-type: BlockBlob");
690 return true;
691 }
692
693 26884 void S3FanoutManager::InitializeDnsSettingsCurl(CURL *handle,
694 CURLSH *sharehandle,
695 curl_slist *clist) const {
696 26884 CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, sharehandle);
697
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26884 times.
26884 assert(retval == CURLE_OK);
698 26884 retval = curl_easy_setopt(handle, CURLOPT_RESOLVE, clist);
699
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26884 times.
26884 assert(retval == CURLE_OK);
700 26884 }
701
702
703 26884 int S3FanoutManager::InitializeDnsSettings(CURL *handle,
704 std::string host_with_port) const {
705 // Use existing handle
706 const std::map<CURL *, S3FanOutDnsEntry *>::const_iterator
707
1/2
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
26884 it = curl_sharehandles_->find(handle);
708
2/2
✓ Branch 3 taken 25806 times.
✓ Branch 4 taken 1078 times.
26884 if (it != curl_sharehandles_->end()) {
709
1/2
✓ Branch 1 taken 25806 times.
✗ Branch 2 not taken.
25806 InitializeDnsSettingsCurl(handle, it->second->sharehandle,
710 25806 it->second->clist);
711 25806 return 0;
712 }
713
714 // Add protocol information for extraction of fields for DNS
715
2/4
✓ Branch 1 taken 1078 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1078 times.
✗ Branch 4 not taken.
1078 if (!IsHttpUrl(host_with_port))
716
2/4
✓ Branch 1 taken 1078 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1078 times.
✗ Branch 5 not taken.
1078 host_with_port = config_.protocol + "://" + host_with_port;
717
1/2
✓ Branch 1 taken 1078 times.
✗ Branch 2 not taken.
1078 const std::string remote_host = dns::ExtractHost(host_with_port);
718
1/2
✓ Branch 1 taken 1078 times.
✗ Branch 2 not taken.
1078 const std::string remote_port = dns::ExtractPort(host_with_port);
719
720 // If we have the name already resolved, use the least used IP
721 1078 S3FanOutDnsEntry *useme = NULL;
722 1078 unsigned int usemin = UINT_MAX;
723 1078 std::set<S3FanOutDnsEntry *>::iterator its3 = sharehandles_->begin();
724
2/2
✓ Branch 3 taken 858 times.
✓ Branch 4 taken 1078 times.
1936 for (; its3 != sharehandles_->end(); ++its3) {
725
1/2
✓ Branch 2 taken 858 times.
✗ Branch 3 not taken.
858 if ((*its3)->dns_name == remote_host) {
726
1/2
✓ Branch 1 taken 858 times.
✗ Branch 2 not taken.
858 if (usemin >= (*its3)->counter) {
727 858 usemin = (*its3)->counter;
728 858 useme = (*its3);
729 }
730 }
731 }
732
2/2
✓ Branch 0 taken 858 times.
✓ Branch 1 taken 220 times.
1078 if (useme != NULL) {
733
1/2
✓ Branch 1 taken 858 times.
✗ Branch 2 not taken.
858 curl_sharehandles_->insert(
734 858 std::pair<CURL *, S3FanOutDnsEntry *>(handle, useme));
735 858 useme->counter++;
736
1/2
✓ Branch 1 taken 858 times.
✗ Branch 2 not taken.
858 InitializeDnsSettingsCurl(handle, useme->sharehandle, useme->clist);
737 858 return 0;
738 }
739
740 // We need to resolve the hostname
741 // TODO(ssheikki): support ipv6 also... if (opt_ipv4_only_)
742
1/2
✓ Branch 1 taken 220 times.
✗ Branch 2 not taken.
220 const dns::Host host = resolver_->Resolve(remote_host);
743
1/2
✓ Branch 2 taken 220 times.
✗ Branch 3 not taken.
220 set<string> ipv4_addresses = host.ipv4_addresses();
744 220 std::set<string>::iterator its = ipv4_addresses.begin();
745 220 S3FanOutDnsEntry *dnse = NULL;
746
2/2
✓ Branch 3 taken 220 times.
✓ Branch 4 taken 220 times.
440 for (; its != ipv4_addresses.end(); ++its) {
747
2/4
✓ Branch 1 taken 220 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 220 times.
✗ Branch 5 not taken.
220 dnse = new S3FanOutDnsEntry();
748 220 dnse->counter = 0;
749
1/2
✓ Branch 1 taken 220 times.
✗ Branch 2 not taken.
220 dnse->dns_name = remote_host;
750
4/12
✗ Branch 1 not taken.
✓ Branch 2 taken 220 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 8 taken 220 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 220 times.
✗ Branch 12 not taken.
✗ Branch 14 not taken.
✓ Branch 15 taken 220 times.
✗ Branch 18 not taken.
✗ Branch 19 not taken.
220 dnse->port = remote_port.size() == 0 ? "80" : remote_port;
751
1/2
✓ Branch 2 taken 220 times.
✗ Branch 3 not taken.
220 dnse->ip = *its;
752 220 dnse->clist = NULL;
753
1/2
✓ Branch 2 taken 220 times.
✗ Branch 3 not taken.
220 dnse->clist = curl_slist_append(
754 dnse->clist,
755
4/8
✓ Branch 1 taken 220 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 220 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 220 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 220 times.
✗ Branch 11 not taken.
440 (dnse->dns_name + ":" + dnse->port + ":" + dnse->ip).c_str());
756
1/2
✓ Branch 1 taken 220 times.
✗ Branch 2 not taken.
220 dnse->sharehandle = curl_share_init();
757
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 220 times.
220 assert(dnse->sharehandle != NULL);
758
1/2
✓ Branch 1 taken 220 times.
✗ Branch 2 not taken.
220 const CURLSHcode share_retval = curl_share_setopt(
759 dnse->sharehandle, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS);
760
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 220 times.
220 assert(share_retval == CURLSHE_OK);
761
1/2
✓ Branch 1 taken 220 times.
✗ Branch 2 not taken.
220 sharehandles_->insert(dnse);
762 }
763
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 220 times.
220 if (dnse == NULL) {
764 LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr,
765 "Error: DNS resolve failed for address '%s'.",
766 remote_host.c_str());
767 assert(dnse != NULL);
768 return -1;
769 }
770
1/2
✓ Branch 1 taken 220 times.
✗ Branch 2 not taken.
220 curl_sharehandles_->insert(
771 220 std::pair<CURL *, S3FanOutDnsEntry *>(handle, dnse));
772 220 dnse->counter++;
773
1/2
✓ Branch 1 taken 220 times.
✗ Branch 2 not taken.
220 InitializeDnsSettingsCurl(handle, dnse->sharehandle, dnse->clist);
774
775 220 return 0;
776 1078 }
777
778
779 26884 bool S3FanoutManager::MkPayloadHash(const JobInfo &info,
780 string *hex_hash) const {
781
2/2
✓ Branch 0 taken 26774 times.
✓ Branch 1 taken 110 times.
26884 if ((info.request == JobInfo::kReqHeadOnly)
782
2/2
✓ Branch 0 taken 13398 times.
✓ Branch 1 taken 13376 times.
26774 || (info.request == JobInfo::kReqHeadPut)
783
2/2
✓ Branch 0 taken 22 times.
✓ Branch 1 taken 13376 times.
13398 || (info.request == JobInfo::kReqDelete)) {
784
1/4
✓ Branch 0 taken 13508 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
13508 switch (config_.authz_method) {
785 13508 case kAuthzAwsV2:
786 13508 hex_hash->clear();
787 13508 break;
788 case kAuthzAwsV4:
789 // Sha256 over empty string
790 *hex_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b78"
791 "52b855";
792 break;
793 case kAuthzAzure:
794 // no payload hash required for Azure signature
795 hex_hash->clear();
796 break;
797 default:
798 PANIC(NULL);
799 }
800 13508 return true;
801 }
802
803 // PUT, there is actually payload
804
1/2
✓ Branch 1 taken 13376 times.
✗ Branch 2 not taken.
13376 shash::Any payload_hash(shash::kMd5);
805
806 unsigned char *data;
807 13376 const unsigned int nbytes = info.origin->Data(
808
2/4
✓ Branch 2 taken 13376 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 13376 times.
✗ Branch 6 not taken.
13376 reinterpret_cast<void **>(&data), info.origin->GetSize(), 0);
809
2/4
✓ Branch 2 taken 13376 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 13376 times.
13376 assert(nbytes == info.origin->GetSize());
810
811
1/4
✓ Branch 0 taken 13376 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
13376 switch (config_.authz_method) {
812 13376 case kAuthzAwsV2:
813
1/2
✓ Branch 1 taken 13376 times.
✗ Branch 2 not taken.
13376 shash::HashMem(data, nbytes, &payload_hash);
814
2/4
✓ Branch 2 taken 13376 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 13376 times.
✗ Branch 6 not taken.
40128 *hex_hash = Base64(string(reinterpret_cast<char *>(payload_hash.digest),
815 26752 payload_hash.GetDigestSize()));
816 13376 return true;
817 case kAuthzAwsV4:
818 *hex_hash = shash::Sha256Mem(data, nbytes);
819 return true;
820 case kAuthzAzure:
821 // no payload hash required for Azure signature
822 hex_hash->clear();
823 return true;
824 default:
825 PANIC(NULL);
826 }
827 }
828
829 26906 string S3FanoutManager::GetRequestString(const JobInfo &info) const {
830
3/4
✓ Branch 0 taken 13486 times.
✓ Branch 1 taken 13376 times.
✓ Branch 2 taken 44 times.
✗ Branch 3 not taken.
26906 switch (info.request) {
831 13486 case JobInfo::kReqHeadOnly:
832 case JobInfo::kReqHeadPut:
833
1/2
✓ Branch 2 taken 13486 times.
✗ Branch 3 not taken.
13486 return "HEAD";
834 13376 case JobInfo::kReqPutCas:
835 case JobInfo::kReqPutDotCvmfs:
836 case JobInfo::kReqPutHtml:
837 case JobInfo::kReqPutBucket:
838
1/2
✓ Branch 2 taken 13376 times.
✗ Branch 3 not taken.
13376 return "PUT";
839 44 case JobInfo::kReqDelete:
840
1/2
✓ Branch 2 taken 44 times.
✗ Branch 3 not taken.
44 return "DELETE";
841 default:
842 PANIC(NULL);
843 }
844 }
845
846
847 26884 string S3FanoutManager::GetContentType(const JobInfo &info) const {
848
2/6
✓ Branch 0 taken 13508 times.
✓ Branch 1 taken 13376 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
26884 switch (info.request) {
849 13508 case JobInfo::kReqHeadOnly:
850 case JobInfo::kReqHeadPut:
851 case JobInfo::kReqDelete:
852
1/2
✓ Branch 2 taken 13508 times.
✗ Branch 3 not taken.
13508 return "";
853 13376 case JobInfo::kReqPutCas:
854
1/2
✓ Branch 2 taken 13376 times.
✗ Branch 3 not taken.
13376 return "application/octet-stream";
855 case JobInfo::kReqPutDotCvmfs:
856 return "application/x-cvmfs";
857 case JobInfo::kReqPutHtml:
858 return "text/html";
859 case JobInfo::kReqPutBucket:
860 return "text/xml";
861 default:
862 PANIC(NULL);
863 }
864 }
865
866
867 /**
868 * Request parameters set the URL and other options such as timeout and
869 * proxy.
870 */
871 26884 Failures S3FanoutManager::InitializeRequest(JobInfo *info, CURL *handle) const {
872 // Initialize internal download state
873 26884 info->curl_handle = handle;
874 26884 info->error_code = kFailOk;
875 26884 info->http_error = 0;
876 26884 info->num_retries = 0;
877 26884 info->backoff_ms = 0;
878 26884 info->throttle_ms = 0;
879 26884 info->throttle_timestamp = 0;
880 26884 info->http_headers = NULL;
881 // info->payload_size is needed in S3Uploader::MainCollectResults,
882 // where info->origin is already destroyed.
883
1/2
✓ Branch 2 taken 26884 times.
✗ Branch 3 not taken.
26884 info->payload_size = info->origin->GetSize();
884
885
2/4
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 26884 times.
✗ Branch 5 not taken.
26884 InitializeDnsSettings(handle, complete_hostname_);
886
887 CURLcode retval;
888
2/2
✓ Branch 0 taken 26774 times.
✓ Branch 1 taken 110 times.
26884 if ((info->request == JobInfo::kReqHeadOnly)
889
2/2
✓ Branch 0 taken 13398 times.
✓ Branch 1 taken 13376 times.
26774 || (info->request == JobInfo::kReqHeadPut)
890
2/2
✓ Branch 0 taken 22 times.
✓ Branch 1 taken 13376 times.
13398 || (info->request == JobInfo::kReqDelete)) {
891
1/2
✓ Branch 1 taken 13508 times.
✗ Branch 2 not taken.
13508 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 0);
892
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13508 times.
13508 assert(retval == CURLE_OK);
893
1/2
✓ Branch 1 taken 13508 times.
✗ Branch 2 not taken.
13508 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
894
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13508 times.
13508 assert(retval == CURLE_OK);
895
896
2/2
✓ Branch 0 taken 22 times.
✓ Branch 1 taken 13486 times.
13508 if (info->request == JobInfo::kReqDelete) {
897
2/4
✓ Branch 1 taken 22 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 22 times.
✗ Branch 6 not taken.
22 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST,
898 GetRequestString(*info).c_str());
899
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 22 times.
22 assert(retval == CURLE_OK);
900 } else {
901
1/2
✓ Branch 1 taken 13486 times.
✗ Branch 2 not taken.
13486 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
902
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13486 times.
13486 assert(retval == CURLE_OK);
903 }
904 } else {
905
1/2
✓ Branch 1 taken 13376 times.
✗ Branch 2 not taken.
13376 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
906
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13376 times.
13376 assert(retval == CURLE_OK);
907
1/2
✓ Branch 1 taken 13376 times.
✗ Branch 2 not taken.
13376 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 1);
908
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13376 times.
13376 assert(retval == CURLE_OK);
909
1/2
✓ Branch 1 taken 13376 times.
✗ Branch 2 not taken.
13376 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 0);
910
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13376 times.
13376 assert(retval == CURLE_OK);
911
2/4
✓ Branch 2 taken 13376 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 13376 times.
✗ Branch 6 not taken.
13376 retval = curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE,
912 static_cast<curl_off_t>(info->origin->GetSize()));
913
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13376 times.
13376 assert(retval == CURLE_OK);
914
915
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13376 times.
13376 if (info->request == JobInfo::kReqPutDotCvmfs) {
916 info->http_headers = curl_slist_append(
917 info->http_headers, dot_cvmfs_cache_control_header.c_str());
918
1/2
✓ Branch 0 taken 13376 times.
✗ Branch 1 not taken.
13376 } else if (info->request == JobInfo::kReqPutCas) {
919
1/2
✓ Branch 1 taken 13376 times.
✗ Branch 2 not taken.
13376 info->http_headers = curl_slist_append(info->http_headers,
920 kCacheControlCas);
921 }
922 }
923
924 bool retval_b;
925
926 // Authorization
927 26884 vector<string> authz_headers;
928
1/4
✓ Branch 0 taken 26884 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
26884 switch (config_.authz_method) {
929 26884 case kAuthzAwsV2:
930
1/2
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
26884 retval_b = MkV2Authz(*info, &authz_headers);
931 26884 break;
932 case kAuthzAwsV4:
933 retval_b = MkV4Authz(*info, &authz_headers);
934 break;
935 case kAuthzAzure:
936 retval_b = MkAzureAuthz(*info, &authz_headers);
937 break;
938 default:
939 PANIC(NULL);
940 }
941
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26884 times.
26884 if (!retval_b)
942 return kFailLocalIO;
943
2/2
✓ Branch 1 taken 107404 times.
✓ Branch 2 taken 26884 times.
134288 for (unsigned i = 0; i < authz_headers.size(); ++i) {
944
1/2
✓ Branch 2 taken 107404 times.
✗ Branch 3 not taken.
107404 info->http_headers = curl_slist_append(info->http_headers,
945 107404 authz_headers[i].c_str());
946 }
947
948 // Common headers
949
1/2
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
26884 info->http_headers = curl_slist_append(info->http_headers,
950 "Connection: Keep-Alive");
951
1/2
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
26884 info->http_headers = curl_slist_append(info->http_headers, "Pragma:");
952 // No 100-continue
953
1/2
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
26884 info->http_headers = curl_slist_append(info->http_headers, "Expect:");
954 // Strip unnecessary header
955
1/2
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
26884 info->http_headers = curl_slist_append(info->http_headers, "Accept:");
956
1/2
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
26884 info->http_headers = curl_slist_append(info->http_headers,
957 26884 user_agent_->c_str());
958
959 // Set curl parameters
960
1/2
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
26884 retval = curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
961
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26884 times.
26884 assert(retval == CURLE_OK);
962
1/2
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
26884 retval = curl_easy_setopt(handle, CURLOPT_HEADERDATA,
963 static_cast<void *>(info));
964
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26884 times.
26884 assert(retval == CURLE_OK);
965
1/2
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
26884 retval = curl_easy_setopt(handle, CURLOPT_READDATA,
966 static_cast<void *>(info));
967
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26884 times.
26884 assert(retval == CURLE_OK);
968
1/2
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
26884 retval = curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->http_headers);
969
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26884 times.
26884 assert(retval == CURLE_OK);
970
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26884 times.
26884 if (opt_ipv4_only_) {
971 retval = curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
972 assert(retval == CURLE_OK);
973 }
974 // Follow HTTP redirects
975
1/2
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
26884 retval = curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
976
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26884 times.
26884 assert(retval == CURLE_OK);
977
978
1/2
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
26884 retval = curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, info->errorbuffer);
979
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26884 times.
26884 assert(retval == CURLE_OK);
980
981
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 26884 times.
26884 if (config_.protocol == "https") {
982 retval = curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 1L);
983 assert(retval == CURLE_OK);
984 retval = curl_easy_setopt(handle, CURLOPT_PROXY_SSL_VERIFYPEER, 1L);
985 assert(retval == CURLE_OK);
986 const bool add_cert = ssl_certificate_store_.ApplySslCertificatePath(
987 handle);
988 assert(add_cert);
989 }
990
991 26884 return kFailOk;
992 26884 }
993
994
995 /**
996 * Sets the URL specific options such as host to use and timeout.
997 */
998 26884 void S3FanoutManager::SetUrlOptions(JobInfo *info) const {
999 26884 CURL *curl_handle = info->curl_handle;
1000 CURLcode retval;
1001
1002
1/2
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
26884 retval = curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT,
1003 config_.opt_timeout_sec);
1004
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26884 times.
26884 assert(retval == CURLE_OK);
1005
1/2
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
26884 retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT,
1006 kLowSpeedLimit);
1007
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26884 times.
26884 assert(retval == CURLE_OK);
1008
1/2
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
26884 retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME,
1009 config_.opt_timeout_sec);
1010
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26884 times.
26884 assert(retval == CURLE_OK);
1011
1012
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26884 times.
26884 if (is_curl_debug_) {
1013 retval = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 1);
1014 assert(retval == CURLE_OK);
1015 }
1016
1017
1/2
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
26884 const string url = MkUrl(info->object_key);
1018
1/2
✓ Branch 2 taken 26884 times.
✗ Branch 3 not taken.
26884 retval = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
1019
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26884 times.
26884 assert(retval == CURLE_OK);
1020
1021
1/2
✓ Branch 2 taken 26884 times.
✗ Branch 3 not taken.
26884 retval = curl_easy_setopt(curl_handle, CURLOPT_PROXY, config_.proxy.c_str());
1022
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26884 times.
26884 assert(retval == CURLE_OK);
1023 26884 }
1024
1025
1026 /**
1027 * Adds transfer time and uploaded bytes to the global counters.
1028 */
1029 26972 void S3FanoutManager::UpdateStatistics(CURL *handle) {
1030 double val;
1031
1032
2/4
✓ Branch 1 taken 26972 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 26972 times.
✗ Branch 4 not taken.
26972 if (curl_easy_getinfo(handle, CURLINFO_SIZE_UPLOAD, &val) == CURLE_OK)
1033 26972 statistics_->transferred_bytes += val;
1034 26972 }
1035
1036
1037 /**
1038 * Retry if possible and if not already done too often.
1039 */
1040 132 bool S3FanoutManager::CanRetry(const JobInfo *info) {
1041 132 return (info->error_code == kFailHostConnection
1042
1/2
✓ Branch 0 taken 132 times.
✗ Branch 1 not taken.
132 || info->error_code == kFailHostResolve
1043
1/2
✓ Branch 0 taken 132 times.
✗ Branch 1 not taken.
132 || info->error_code == kFailServiceUnavailable
1044
2/2
✓ Branch 0 taken 88 times.
✓ Branch 1 taken 44 times.
132 || info->error_code == kFailRetry)
1045
2/4
✓ Branch 0 taken 132 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 88 times.
✗ Branch 3 not taken.
264 && (info->num_retries < config_.opt_max_retries);
1046 }
1047
1048
1049 /**
1050 * Backoff for retry to introduce a jitter into a upload sequence.
1051 *
1052 * \return true if backoff has been performed, false otherwise
1053 */
1054 88 void S3FanoutManager::Backoff(JobInfo *info) {
1055
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 88 times.
88 if (info->error_code != kFailRetry)
1056 info->num_retries++;
1057 88 statistics_->num_retries++;
1058
1059
1/2
✓ Branch 0 taken 88 times.
✗ Branch 1 not taken.
88 if (info->throttle_ms > 0) {
1060 88 LogCvmfs(kLogS3Fanout, kLogDebug, "throttling for %d ms",
1061 info->throttle_ms);
1062 88 const uint64_t now = platform_monotonic_time();
1063
1/2
✓ Branch 0 taken 88 times.
✗ Branch 1 not taken.
88 if ((info->throttle_timestamp + (info->throttle_ms / 1000)) >= now) {
1064
2/2
✓ Branch 0 taken 22 times.
✓ Branch 1 taken 66 times.
88 if ((now - timestamp_last_throttle_report_)
1065 > kThrottleReportIntervalSec) {
1066 22 LogCvmfs(kLogS3Fanout, kLogStdout,
1067 "Warning: S3 backend throttling %ums "
1068 "(total backoff time so far %lums)",
1069 22 info->throttle_ms, statistics_->ms_throttled);
1070 22 timestamp_last_throttle_report_ = now;
1071 }
1072 88 statistics_->ms_throttled += info->throttle_ms;
1073 88 SafeSleepMs(info->throttle_ms);
1074 }
1075 } else {
1076 if (info->backoff_ms == 0) {
1077 // Must be != 0
1078 info->backoff_ms = prng_.Next(config_.opt_backoff_init_ms + 1);
1079 } else {
1080 info->backoff_ms *= 2;
1081 }
1082 if (info->backoff_ms > config_.opt_backoff_max_ms)
1083 info->backoff_ms = config_.opt_backoff_max_ms;
1084
1085 LogCvmfs(kLogS3Fanout, kLogDebug, "backing off for %d ms",
1086 info->backoff_ms);
1087 SafeSleepMs(info->backoff_ms);
1088 }
1089 88 }
1090
1091
1092 /**
1093 * Checks the result of a curl request and implements the failure logic
1094 * and takes care of cleanup.
1095 *
1096 * @return true if request should be repeated, false otherwise
1097 */
1098 26972 bool S3FanoutManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1099 26972 LogCvmfs(kLogS3Fanout, kLogDebug,
1100 "Verify uploaded/tested object %s "
1101 "(curl error %d, info error %d, info request %d)",
1102 26972 info->object_key.c_str(), curl_error, info->error_code,
1103 26972 info->request);
1104 26972 UpdateStatistics(info->curl_handle);
1105
1106 // Verification and error classification
1107
1/6
✓ Branch 0 taken 26972 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
26972 switch (curl_error) {
1108 26972 case CURLE_OK:
1109
2/2
✓ Branch 0 taken 26884 times.
✓ Branch 1 taken 88 times.
26972 if ((info->error_code != kFailRetry)
1110
2/2
✓ Branch 0 taken 13464 times.
✓ Branch 1 taken 13420 times.
26884 && (info->error_code != kFailNotFound)) {
1111 13464 info->error_code = kFailOk;
1112 }
1113 26972 break;
1114 case CURLE_UNSUPPORTED_PROTOCOL:
1115 case CURLE_URL_MALFORMAT:
1116 info->error_code = kFailBadRequest;
1117 break;
1118 case CURLE_COULDNT_RESOLVE_HOST:
1119 info->error_code = kFailHostResolve;
1120 break;
1121 case CURLE_COULDNT_CONNECT:
1122 case CURLE_OPERATION_TIMEDOUT:
1123 case CURLE_SEND_ERROR:
1124 case CURLE_RECV_ERROR:
1125 info->error_code = kFailHostConnection;
1126 break;
1127 case CURLE_ABORTED_BY_CALLBACK:
1128 case CURLE_WRITE_ERROR:
1129 // Error set by callback
1130 break;
1131 default:
1132 LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr,
1133 "unexpected curl error (%d) while trying to upload %s: %s",
1134 curl_error, info->object_key.c_str(), info->errorbuffer);
1135 info->error_code = kFailOther;
1136 break;
1137 }
1138
1139 // Transform HEAD to PUT request
1140
2/2
✓ Branch 0 taken 13420 times.
✓ Branch 1 taken 13552 times.
26972 if ((info->error_code == kFailNotFound)
1141
2/2
✓ Branch 0 taken 13376 times.
✓ Branch 1 taken 44 times.
13420 && (info->request == JobInfo::kReqHeadPut)) {
1142 13376 LogCvmfs(kLogS3Fanout, kLogDebug, "not found: %s, uploading",
1143 info->object_key.c_str());
1144 13376 info->request = JobInfo::kReqPutCas;
1145 13376 curl_slist_free_all(info->http_headers);
1146 13376 info->http_headers = NULL;
1147 13376 const s3fanout::Failures init_failure = InitializeRequest(
1148 info, info->curl_handle);
1149
1150
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13376 times.
13376 if (init_failure != s3fanout::kFailOk) {
1151 PANIC(kLogStderr,
1152 "Failed to initialize CURL handle "
1153 "(error: %d - %s | errno: %d)",
1154 init_failure, Code2Ascii(init_failure), errno);
1155 }
1156 13376 SetUrlOptions(info);
1157 // Reset origin
1158 13376 info->origin->Rewind();
1159 13376 return true; // Again, Put
1160 }
1161
1162 // Determination if failed request should be repeated
1163 13596 bool try_again = false;
1164
2/2
✓ Branch 0 taken 132 times.
✓ Branch 1 taken 13464 times.
13596 if (info->error_code != kFailOk) {
1165 132 try_again = CanRetry(info);
1166 }
1167
2/2
✓ Branch 0 taken 88 times.
✓ Branch 1 taken 13508 times.
13596 if (try_again) {
1168
1/2
✓ Branch 0 taken 88 times.
✗ Branch 1 not taken.
88 if (info->request == JobInfo::kReqPutCas
1169
1/2
✓ Branch 0 taken 88 times.
✗ Branch 1 not taken.
88 || info->request == JobInfo::kReqPutDotCvmfs
1170
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 88 times.
88 || info->request == JobInfo::kReqPutHtml) {
1171 LogCvmfs(kLogS3Fanout, kLogDebug, "Trying again to upload %s",
1172 info->object_key.c_str());
1173 // Reset origin
1174 info->origin->Rewind();
1175 }
1176 88 Backoff(info);
1177 88 info->error_code = kFailOk;
1178 88 info->http_error = 0;
1179 88 info->throttle_ms = 0;
1180 88 info->backoff_ms = 0;
1181 88 info->throttle_timestamp = 0;
1182 88 return true; // try again
1183 }
1184
1185 // Cleanup opened resources
1186 13508 info->origin.Destroy();
1187
1188
3/4
✓ Branch 0 taken 44 times.
✓ Branch 1 taken 13464 times.
✓ Branch 2 taken 44 times.
✗ Branch 3 not taken.
13508 if ((info->error_code != kFailOk) && (info->http_error != 0)
1189
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 44 times.
44 && (info->http_error != 404)) {
1190 LogCvmfs(kLogS3Fanout, kLogStderr, "S3: HTTP failure %d", info->http_error);
1191 }
1192 13508 return false; // stop transfer
1193 }
1194
1195
2/4
✓ Branch 3 taken 264 times.
✗ Branch 4 not taken.
✓ Branch 9 taken 264 times.
✗ Branch 10 not taken.
264 S3FanoutManager::S3FanoutManager(const S3Config &config) : config_(config) {
1196 264 atomic_init32(&multi_threaded_);
1197
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 MakePipe(pipe_terminate_);
1198
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 MakePipe(pipe_jobs_);
1199
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 MakePipe(pipe_completed_);
1200
1201 int retval;
1202 264 jobs_todo_lock_ = reinterpret_cast<pthread_mutex_t *>(
1203 264 smalloc(sizeof(pthread_mutex_t)));
1204 264 retval = pthread_mutex_init(jobs_todo_lock_, NULL);
1205
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 264 times.
264 assert(retval == 0);
1206 264 curl_handle_lock_ = reinterpret_cast<pthread_mutex_t *>(
1207 264 smalloc(sizeof(pthread_mutex_t)));
1208 264 retval = pthread_mutex_init(curl_handle_lock_, NULL);
1209
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 264 times.
264 assert(retval == 0);
1210
1211
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 active_requests_ = new set<JobInfo *>;
1212
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 pool_handles_idle_ = new set<CURL *>;
1213
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 pool_handles_inuse_ = new set<CURL *>;
1214
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 curl_sharehandles_ = new map<CURL *, S3FanOutDnsEntry *>;
1215
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 sharehandles_ = new set<S3FanOutDnsEntry *>;
1216 264 watch_fds_max_ = 4 * config_.pool_max_handles;
1217 264 max_available_jobs_ = 4 * config_.pool_max_handles;
1218
2/4
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 264 times.
✗ Branch 5 not taken.
264 available_jobs_ = new Semaphore(max_available_jobs_);
1219
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 264 times.
264 assert(NULL != available_jobs_);
1220
1221
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 statistics_ = new Statistics();
1222
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 user_agent_ = new string();
1223
2/4
✓ Branch 2 taken 264 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 264 times.
✗ Branch 6 not taken.
264 *user_agent_ = "User-Agent: cvmfs " + string(CVMFS_VERSION);
1224
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 complete_hostname_ = MkCompleteHostname();
1225
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 dot_cvmfs_cache_control_header = MkDotCvmfsCacheControlHeader();
1226
1227
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 const CURLcode cretval = curl_global_init(CURL_GLOBAL_ALL);
1228
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 264 times.
264 assert(cretval == CURLE_OK);
1229
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 curl_multi_ = curl_multi_init();
1230
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 264 times.
264 assert(curl_multi_ != NULL);
1231 CURLMcode mretval;
1232
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION,
1233 CallbackCurlSocket);
1234
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 264 times.
264 assert(mretval == CURLM_OK);
1235
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1236 static_cast<void *>(this));
1237
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 264 times.
264 assert(mretval == CURLM_OK);
1238
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1239 config_.pool_max_handles);
1240
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 264 times.
264 assert(mretval == CURLM_OK);
1241
1242 264 prng_.InitLocaltime();
1243
1244 264 thread_upload_ = 0;
1245 264 timestamp_last_throttle_report_ = 0;
1246 264 is_curl_debug_ = (getenv("_CVMFS_CURL_DEBUG") != NULL);
1247
1248 // Parsing environment variables
1249 264 if ((getenv("CVMFS_IPV4_ONLY") != NULL)
1250
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 264 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 264 times.
264 && (strlen(getenv("CVMFS_IPV4_ONLY")) > 0)) {
1251 opt_ipv4_only_ = true;
1252 } else {
1253 264 opt_ipv4_only_ = false;
1254 }
1255
1256
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 resolver_ = dns::CaresResolver::Create(opt_ipv4_only_, 2, 2000);
1257
1258 264 watch_fds_ = static_cast<struct pollfd *>(smalloc(4 * sizeof(struct pollfd)));
1259 264 watch_fds_size_ = 4;
1260 264 watch_fds_inuse_ = 0;
1261
1262
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 ssl_certificate_store_.UseSystemCertificatePath();
1263 264 }
1264
1265 528 S3FanoutManager::~S3FanoutManager() {
1266 264 pthread_mutex_destroy(jobs_todo_lock_);
1267 264 free(jobs_todo_lock_);
1268 264 pthread_mutex_destroy(curl_handle_lock_);
1269 264 free(curl_handle_lock_);
1270
1271
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1272 // Shutdown I/O thread
1273 264 char buf = 'T';
1274 264 WritePipe(pipe_terminate_[1], &buf, 1);
1275 264 pthread_join(thread_upload_, NULL);
1276 }
1277 264 ClosePipe(pipe_terminate_);
1278 264 ClosePipe(pipe_jobs_);
1279 264 ClosePipe(pipe_completed_);
1280
1281 264 set<CURL *>::iterator i = pool_handles_idle_->begin();
1282 264 const set<CURL *>::const_iterator iEnd = pool_handles_idle_->end();
1283
2/2
✓ Branch 2 taken 440 times.
✓ Branch 3 taken 264 times.
704 for (; i != iEnd; ++i) {
1284 440 curl_easy_cleanup(*i);
1285 }
1286
1287 264 set<S3FanOutDnsEntry *>::iterator is = sharehandles_->begin();
1288 264 const set<S3FanOutDnsEntry *>::const_iterator isEnd = sharehandles_->end();
1289
2/2
✓ Branch 2 taken 220 times.
✓ Branch 3 taken 264 times.
484 for (; is != isEnd; ++is) {
1290 220 curl_share_cleanup((*is)->sharehandle);
1291 220 curl_slist_free_all((*is)->clist);
1292
1/2
✓ Branch 1 taken 220 times.
✗ Branch 2 not taken.
220 delete *is;
1293 }
1294 264 pool_handles_idle_->clear();
1295 264 curl_sharehandles_->clear();
1296 264 sharehandles_->clear();
1297
1/2
✓ Branch 0 taken 264 times.
✗ Branch 1 not taken.
264 delete active_requests_;
1298
1/2
✓ Branch 0 taken 264 times.
✗ Branch 1 not taken.
264 delete pool_handles_idle_;
1299
1/2
✓ Branch 0 taken 264 times.
✗ Branch 1 not taken.
264 delete pool_handles_inuse_;
1300
1/2
✓ Branch 0 taken 264 times.
✗ Branch 1 not taken.
264 delete curl_sharehandles_;
1301
1/2
✓ Branch 0 taken 264 times.
✗ Branch 1 not taken.
264 delete sharehandles_;
1302
1/2
✓ Branch 0 taken 264 times.
✗ Branch 1 not taken.
264 delete user_agent_;
1303 264 curl_multi_cleanup(curl_multi_);
1304
1305
1/2
✓ Branch 0 taken 264 times.
✗ Branch 1 not taken.
264 delete statistics_;
1306
1307
1/2
✓ Branch 0 taken 264 times.
✗ Branch 1 not taken.
264 delete available_jobs_;
1308
1309 264 curl_global_cleanup();
1310 264 }
1311
1312 /**
1313 * Spawns the I/O worker thread. No way back except ~S3FanoutManager.
1314 */
1315 264 void S3FanoutManager::Spawn() {
1316 264 LogCvmfs(kLogS3Fanout, kLogDebug, "S3FanoutManager spawned");
1317
1318 264 const int retval = pthread_create(&thread_upload_, NULL, MainUpload,
1319 static_cast<void *>(this));
1320
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 264 times.
264 assert(retval == 0);
1321
1322 264 atomic_inc32(&multi_threaded_);
1323 264 }
1324
1325 66 const Statistics &S3FanoutManager::GetStatistics() { return *statistics_; }
1326
1327 /**
1328 * Push new job to be uploaded to the S3 cloud storage.
1329 */
1330 13508 void S3FanoutManager::PushNewJob(JobInfo *info) {
1331 13508 available_jobs_->Increment();
1332 13508 WritePipe(pipe_jobs_[1], &info, sizeof(info));
1333 13508 }
1334
1335 /**
1336 * Push completed job to list of completed jobs
1337 */
1338 13772 void S3FanoutManager::PushCompletedJob(JobInfo *info) {
1339 13772 WritePipe(pipe_completed_[1], &info, sizeof(info));
1340 13772 }
1341
1342 /**
1343 * Pop completed job
1344 */
1345 13772 JobInfo *S3FanoutManager::PopCompletedJob() {
1346 JobInfo *info;
1347
1/2
✓ Branch 1 taken 13772 times.
✗ Branch 2 not taken.
13772 ReadPipe(pipe_completed_[0], &info, sizeof(info));
1348 13772 return info;
1349 }
1350
1351 //------------------------------------------------------------------------------
1352
1353
1354 string Statistics::Print() const {
1355 return "Transferred Bytes: " + StringifyInt(uint64_t(transferred_bytes))
1356 + "\n" + "Transfer duration: " + StringifyInt(uint64_t(transfer_time))
1357 + " s\n" + "Number of requests: " + StringifyInt(num_requests) + "\n"
1358 + "Number of retries: " + StringifyInt(num_retries) + "\n";
1359 }
1360
1361 } // namespace s3fanout
1362