GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/network/s3fanout.cc
Date: 2026-04-26 02:35:59
Exec Total Coverage
Lines: 639 909 70.3%
Branches: 471 1320 35.7%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 *
4 * Runs a thread using libcurls asynchronous I/O mode to push data to S3
5 */
6
7 #include "s3fanout.h"
8
9 #include <pthread.h>
10
11 #include <algorithm>
12 #include <cassert>
13 #include <cerrno>
14 #include <cstring>
15 #include <utility>
16
17 #include "crypto/hash.h"
18 #include "util/exception.h"
19 #include "util/posix.h"
20 #include "util/string.h"
21 #include "util/platform.h"
22
23 using namespace std; // NOLINT
24
25 namespace s3fanout {
26
27 /**
28 * Escapes characters that are not allowed in XML text content.
29 * Only & and < need escaping; >, ', " are safe in text nodes.
30 */
31 20770 static string XmlEscape(const string &input) {
32 20770 string result;
33
1/2
✓ Branch 2 taken 20770 times.
✗ Branch 3 not taken.
20770 result.reserve(input.size());
34
2/2
✓ Branch 1 taken 471940 times.
✓ Branch 2 taken 20770 times.
492710 for (unsigned i = 0; i < input.size(); ++i) {
35
3/3
✓ Branch 1 taken 48 times.
✓ Branch 2 taken 48 times.
✓ Branch 3 taken 471844 times.
471940 switch (input[i]) {
36
1/2
✓ Branch 1 taken 48 times.
✗ Branch 2 not taken.
48 case '&': result += "&amp;"; break;
37
1/2
✓ Branch 1 taken 48 times.
✗ Branch 2 not taken.
48 case '<': result += "&lt;"; break;
38
1/2
✓ Branch 2 taken 471844 times.
✗ Branch 3 not taken.
471844 default: result += input[i]; break;
39 }
40 }
41 20770 return result;
42 }
43
44
45 /**
46 * Builds an S3 DeleteObjects XML request body for multi-object delete.
47 * Uses Quiet mode so the response only contains errors, not successes.
48 */
49 260 string ComposeDeleteMultiXml(const vector<string> &keys) {
50
1/2
✓ Branch 2 taken 260 times.
✗ Branch 3 not taken.
260 string xml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><Delete><Quiet>true</Quiet>";
51 // ~70 bytes per <Object><Key>...</Key></Object> entry
52
1/2
✓ Branch 3 taken 260 times.
✗ Branch 4 not taken.
260 xml.reserve(xml.size() + keys.size() * 70 + 10);
53
2/2
✓ Branch 1 taken 20770 times.
✓ Branch 2 taken 260 times.
21030 for (unsigned i = 0; i < keys.size(); ++i) {
54
4/8
✓ Branch 2 taken 20770 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 20770 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 20770 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 20770 times.
✗ Branch 12 not taken.
20770 xml += "<Object><Key>" + XmlEscape(keys[i]) + "</Key></Object>";
55 }
56
1/2
✓ Branch 1 taken 260 times.
✗ Branch 2 not taken.
260 xml += "</Delete>";
57 260 return xml;
58 }
59
60
61 /**
62 * Parses the S3 DeleteObjects error response XML.
63 * In Quiet mode, the response only contains <Error> elements for failed keys.
64 * Returns the number of errors found.
65 */
66 192 unsigned ParseDeleteMultiResponse(const string &response,
67 vector<string> *error_keys,
68 vector<string> *error_codes,
69 vector<string> *error_messages) {
70 192 unsigned num_errors = 0;
71 192 string::size_type pos = 0;
72
73 while (true) {
74 336 const string::size_type err_start = response.find("<Error>", pos);
75
2/2
✓ Branch 0 taken 144 times.
✓ Branch 1 taken 192 times.
336 if (err_start == string::npos)
76 144 break;
77 192 const string::size_type err_end = response.find("</Error>", err_start);
78
2/2
✓ Branch 0 taken 48 times.
✓ Branch 1 taken 144 times.
192 if (err_end == string::npos)
79 48 break;
80
81 const string error_block = response.substr(err_start,
82
1/2
✓ Branch 1 taken 144 times.
✗ Branch 2 not taken.
144 err_end - err_start);
83 144 num_errors++;
84
85 // Extract <Key>...</Key>
86 144 const string::size_type key_start = error_block.find("<Key>");
87 144 const string::size_type key_end = error_block.find("</Key>");
88
2/4
✓ Branch 0 taken 144 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 144 times.
✗ Branch 3 not taken.
144 if (key_start != string::npos && key_end != string::npos) {
89
1/2
✓ Branch 1 taken 144 times.
✗ Branch 2 not taken.
144 error_keys->push_back(
90
1/2
✓ Branch 1 taken 144 times.
✗ Branch 2 not taken.
288 error_block.substr(key_start + 5, key_end - key_start - 5));
91 } else {
92 error_keys->push_back("");
93 }
94
95 // Extract <Code>...</Code>
96 144 const string::size_type code_start = error_block.find("<Code>");
97 144 const string::size_type code_end = error_block.find("</Code>");
98
2/4
✓ Branch 0 taken 144 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 144 times.
✗ Branch 3 not taken.
144 if (code_start != string::npos && code_end != string::npos) {
99
1/2
✓ Branch 1 taken 144 times.
✗ Branch 2 not taken.
144 error_codes->push_back(
100
1/2
✓ Branch 1 taken 144 times.
✗ Branch 2 not taken.
288 error_block.substr(code_start + 6, code_end - code_start - 6));
101 } else {
102 error_codes->push_back("");
103 }
104
105 // Extract <Message>...</Message>
106 144 const string::size_type msg_start = error_block.find("<Message>");
107 144 const string::size_type msg_end = error_block.find("</Message>");
108
2/4
✓ Branch 0 taken 144 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 144 times.
✗ Branch 3 not taken.
144 if (msg_start != string::npos && msg_end != string::npos) {
109
1/2
✓ Branch 1 taken 144 times.
✗ Branch 2 not taken.
144 error_messages->push_back(
110
1/2
✓ Branch 1 taken 144 times.
✗ Branch 2 not taken.
288 error_block.substr(msg_start + 9, msg_end - msg_start - 9));
111 } else {
112 error_messages->push_back("");
113 }
114
115 144 pos = err_end + 8; // length of "</Error>"
116 144 }
117
118 192 return num_errors;
119 }
120
121 const char *S3FanoutManager::kCacheControlCas = "Cache-Control: max-age=259200";
122 const unsigned S3FanoutManager::kDefault429ThrottleMs = 250;
123 const unsigned S3FanoutManager::kMax429ThrottleMs = 10000;
124 const unsigned S3FanoutManager::kThrottleReportIntervalSec = 10;
125 const unsigned S3FanoutManager::kDefaultHTTPPort = 80;
126 const unsigned S3FanoutManager::kDefaultHTTPSPort = 443;
127
128 442 std::string S3FanoutManager::MkDotCvmfsCacheControlHeader(unsigned defaultMaxAge, int overrideMaxAge)
129 {
130 const char *var;
131 int max_age_sec;
132 char *at_null_terminator_if_number;
133 442 bool value_determined = false;
134
135
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 442 times.
442 if (overrideMaxAge >= 0) {
136 max_age_sec = overrideMaxAge;
137 value_determined = true;
138 }
139
140
1/2
✓ Branch 0 taken 442 times.
✗ Branch 1 not taken.
442 if (!value_determined) {
141 442 var = getenv("CVMFS_MAX_TTL_SECS");
142
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
442 if (var && var[0]) {
143 max_age_sec = strtoll(var, &at_null_terminator_if_number, 10);
144 if (*at_null_terminator_if_number == '\0'
145 && max_age_sec >= 0) {
146 value_determined = true;
147 }
148 }
149 }
150
151
1/2
✓ Branch 0 taken 442 times.
✗ Branch 1 not taken.
442 if (!value_determined) {
152 442 var = getenv("CVMFS_MAX_TTL");
153
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
442 if (var && var[0]) {
154 max_age_sec = strtoll(var, &at_null_terminator_if_number, 10);
155 if (*at_null_terminator_if_number == '\0'
156 && max_age_sec >= 0) {
157 max_age_sec *= 60;
158 value_determined = true;
159 }
160 }
161 }
162
163
1/2
✓ Branch 0 taken 442 times.
✗ Branch 1 not taken.
442 if (!value_determined) {
164 442 max_age_sec = defaultMaxAge;
165 }
166
167
2/4
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 442 times.
✗ Branch 5 not taken.
884 return "Cache-Control: max-age=" + std::to_string(max_age_sec);
168 }
169
170 /**
171 * Parses Retry-After and X-Retry-In headers attached to HTTP 429 responses
172 */
173 1080 void S3FanoutManager::DetectThrottleIndicator(const std::string &header,
174 JobInfo *info) {
175 1080 std::string value_str;
176
4/6
✓ Branch 2 taken 1080 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1080 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 472 times.
✓ Branch 10 taken 608 times.
1080 if (HasPrefix(header, "retry-after:", true))
177
1/2
✓ Branch 1 taken 472 times.
✗ Branch 2 not taken.
472 value_str = header.substr(12);
178
4/6
✓ Branch 2 taken 1080 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1080 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 240 times.
✓ Branch 10 taken 840 times.
1080 if (HasPrefix(header, "x-retry-in:", true))
179
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 value_str = header.substr(11);
180
181
1/2
✓ Branch 1 taken 1080 times.
✗ Branch 2 not taken.
1080 value_str = Trim(value_str, true /* trim_newline */);
182
2/2
✓ Branch 1 taken 616 times.
✓ Branch 2 taken 464 times.
1080 if (!value_str.empty()) {
183
1/2
✓ Branch 1 taken 616 times.
✗ Branch 2 not taken.
616 const unsigned value_numeric = String2Uint64(value_str);
184
2/4
✓ Branch 2 taken 616 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 616 times.
✗ Branch 6 not taken.
1232 const unsigned value_ms = HasSuffix(value_str, "ms", true /* ignore_case */)
185
2/2
✓ Branch 0 taken 240 times.
✓ Branch 1 taken 376 times.
616 ? value_numeric
186 616 : (value_numeric * 1000);
187
2/2
✓ Branch 0 taken 568 times.
✓ Branch 1 taken 48 times.
616 if (value_ms > 0)
188 568 info->throttle_ms = std::min(value_ms, kMax429ThrottleMs);
189 }
190 1080 }
191
192
193 /**
194 * Called by curl for every HTTP header. Not called for file:// transfers.
195 */
196 125290 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
197 void *info_link) {
198 125290 const size_t num_bytes = size * nmemb;
199
1/2
✓ Branch 2 taken 125290 times.
✗ Branch 3 not taken.
125290 const string header_line(static_cast<const char *>(ptr), num_bytes);
200 125290 JobInfo *info = static_cast<JobInfo *>(info_link);
201
202 // Check for http status code errors
203
4/6
✓ Branch 2 taken 125290 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 125290 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 41718 times.
✓ Branch 10 taken 83572 times.
125290 if (HasPrefix(header_line, "HTTP/1.", false)) {
204
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 41718 times.
41718 if (header_line.length() < 10)
205 return 0;
206
207 unsigned i;
208
5/6
✓ Branch 1 taken 83436 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 41718 times.
✓ Branch 5 taken 41718 times.
✓ Branch 6 taken 41718 times.
✓ Branch 7 taken 41718 times.
83436 for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {
209 }
210
211
2/2
✓ Branch 1 taken 20842 times.
✓ Branch 2 taken 20876 times.
41718 if (header_line[i] == '2') {
212 20842 return num_bytes;
213 } else {
214
1/2
✓ Branch 2 taken 20876 times.
✗ Branch 3 not taken.
20876 LogCvmfs(kLogS3Fanout, kLogDebug, "http status error code [info %p]: %s",
215 info, header_line.c_str());
216
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 20876 times.
20876 if (header_line.length() < i + 3) {
217 LogCvmfs(kLogS3Fanout, kLogStderr, "S3: invalid HTTP response '%s'",
218 header_line.c_str());
219 info->error_code = kFailOther;
220 return 0;
221 }
222
2/4
✓ Branch 3 taken 20876 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 20876 times.
✗ Branch 7 not taken.
20876 info->http_error = String2Int64(string(&header_line[i], 3));
223
224
2/7
✓ Branch 0 taken 136 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 20740 times.
✗ Branch 6 not taken.
20876 switch (info->http_error) {
225 136 case 429:
226 136 info->error_code = kFailRetry;
227 136 info->throttle_ms = S3FanoutManager::kDefault429ThrottleMs;
228 136 info->throttle_timestamp = platform_monotonic_time();
229 136 return num_bytes;
230 case 507: // Insufficient Storage
231 info->error_code = kFailInsufficientStorage;
232 break;
233 case 503:
234 case 502: // Can happen if the S3 gateway-backend connection breaks
235 case 500: // sometimes see this as a transient error from S3
236 info->error_code = kFailServiceUnavailable;
237 break;
238 case 501:
239 case 400:
240 info->error_code = kFailBadRequest;
241 break;
242 case 403:
243 info->error_code = kFailForbidden;
244 break;
245 20740 case 404:
246 20740 info->error_code = kFailNotFound;
247 20740 return num_bytes;
248 default:
249 info->error_code = kFailOther;
250 }
251 return 0;
252 }
253 }
254
255
2/2
✓ Branch 0 taken 408 times.
✓ Branch 1 taken 83164 times.
83572 if (info->error_code == kFailRetry) {
256
1/2
✓ Branch 1 taken 408 times.
✗ Branch 2 not taken.
408 S3FanoutManager::DetectThrottleIndicator(header_line, info);
257 }
258
259 83572 return num_bytes;
260 125290 }
261
262
263 /**
264 * Called by curl for every new chunk to upload.
265 */
266 532814 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
267 void *info_link) {
268 532814 const size_t num_bytes = size * nmemb;
269 532814 JobInfo *info = static_cast<JobInfo *>(info_link);
270
271 532814 LogCvmfs(kLogS3Fanout, kLogDebug, "Data callback with %zu bytes", num_bytes);
272
273
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 532814 times.
532814 if (num_bytes == 0)
274 return 0;
275
276 532814 const uint64_t read_bytes = info->origin->Read(ptr, num_bytes);
277
278 532814 LogCvmfs(kLogS3Fanout, kLogDebug, "source buffer pushed out %lu bytes",
279 read_bytes);
280
281 532814 return read_bytes;
282 }
283
284
285 /**
286 * Captures the HTTP response body. For kReqDeleteMulti, the response contains
287 * XML error information. For other requests, the body is ignored.
288 */
289 static size_t CallbackCurlBody(char *ptr, size_t size, size_t nmemb,
290 void *userdata) {
291 const size_t num_bytes = size * nmemb;
292 JobInfo *info = static_cast<JobInfo *>(userdata);
293 if (info != NULL && info->request == JobInfo::kReqDeleteMulti) {
294 info->response_body.append(ptr, num_bytes);
295 }
296 return num_bytes;
297 }
298
299
300 /**
301 * Called when new curl sockets arrive or existing curl sockets depart.
302 */
303 92446 int S3FanoutManager::CallbackCurlSocket(CURL *easy, curl_socket_t s, int action,
304 void *userp, void *socketp) {
305 92446 S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(userp);
306 92446 LogCvmfs(kLogS3Fanout, kLogDebug,
307 "CallbackCurlSocket called with easy "
308 "handle %p, socket %d, action %d, up %p, "
309 "sp %p, fds_inuse %d, jobs %d",
310 easy, s, action, userp, socketp, s3fanout_mgr->watch_fds_inuse_,
311 92446 s3fanout_mgr->available_jobs_->Get());
312
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 92446 times.
92446 if (action == CURL_POLL_NONE)
313 return 0;
314
315 // Find s in watch_fds_
316 // First 2 fds are job and terminate pipes (not curl related)
317 unsigned index;
318
2/2
✓ Branch 0 taken 145758 times.
✓ Branch 1 taken 41684 times.
187442 for (index = 2; index < s3fanout_mgr->watch_fds_inuse_; ++index) {
319
2/2
✓ Branch 0 taken 50762 times.
✓ Branch 1 taken 94996 times.
145758 if (s3fanout_mgr->watch_fds_[index].fd == s)
320 50762 break;
321 }
322 // Or create newly
323
2/2
✓ Branch 0 taken 41684 times.
✓ Branch 1 taken 50762 times.
92446 if (index == s3fanout_mgr->watch_fds_inuse_) {
324 // Extend array if necessary
325
2/2
✓ Branch 0 taken 68 times.
✓ Branch 1 taken 41616 times.
41684 if (s3fanout_mgr->watch_fds_inuse_ == s3fanout_mgr->watch_fds_size_) {
326 68 s3fanout_mgr->watch_fds_size_ *= 2;
327 68 s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
328 68 srealloc(s3fanout_mgr->watch_fds_,
329 68 s3fanout_mgr->watch_fds_size_ * sizeof(struct pollfd)));
330 }
331 41684 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].fd = s;
332 41684 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].events = 0;
333 41684 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].revents = 0;
334 41684 s3fanout_mgr->watch_fds_inuse_++;
335 }
336
337
4/5
✓ Branch 0 taken 41684 times.
✓ Branch 1 taken 102 times.
✓ Branch 2 taken 8976 times.
✓ Branch 3 taken 41684 times.
✗ Branch 4 not taken.
92446 switch (action) {
338 41684 case CURL_POLL_IN:
339 41684 s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
340 41684 break;
341 102 case CURL_POLL_OUT:
342 102 s3fanout_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
343 102 break;
344 8976 case CURL_POLL_INOUT:
345 8976 s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI | POLLOUT
346 | POLLWRBAND;
347 8976 break;
348 41684 case CURL_POLL_REMOVE:
349
2/2
✓ Branch 0 taken 6562 times.
✓ Branch 1 taken 35122 times.
41684 if (index < s3fanout_mgr->watch_fds_inuse_ - 1)
350 s3fanout_mgr
351 6562 ->watch_fds_[index] = s3fanout_mgr->watch_fds_
352 6562 [s3fanout_mgr->watch_fds_inuse_ - 1];
353 41684 s3fanout_mgr->watch_fds_inuse_--;
354 // Shrink array if necessary
355
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41684 times.
41684 if ((s3fanout_mgr->watch_fds_inuse_ > s3fanout_mgr->watch_fds_max_)
356 && (s3fanout_mgr->watch_fds_inuse_
357 < s3fanout_mgr->watch_fds_size_ / 2)) {
358 s3fanout_mgr->watch_fds_size_ /= 2;
359 s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
360 srealloc(s3fanout_mgr->watch_fds_,
361 s3fanout_mgr->watch_fds_size_ * sizeof(struct pollfd)));
362 }
363 41684 break;
364 default:
365 PANIC(NULL);
366 }
367
368 92446 return 0;
369 }
370
371
372 /**
373 * Worker thread event loop.
374 */
375 442 void *S3FanoutManager::MainUpload(void *data) {
376
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread started");
377 442 S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(data);
378
379 442 s3fanout_mgr->InitPipeWatchFds();
380
381 // Don't schedule more jobs into the multi handle than the maximum number of
382 // parallel connections. This should prevent starvation and thus a timeout
383 // of the authorization header (CVM-1339).
384 442 unsigned jobs_in_flight = 0;
385
386 while (true) {
387 // Check events with 100ms timeout
388 568786 const int timeout_ms = 100;
389
1/2
✓ Branch 1 taken 568786 times.
✗ Branch 2 not taken.
568786 int retval = poll(s3fanout_mgr->watch_fds_, s3fanout_mgr->watch_fds_inuse_,
390 timeout_ms);
391
2/2
✓ Branch 0 taken 3128 times.
✓ Branch 1 taken 565658 times.
568786 if (retval == 0) {
392 // Handle timeout
393 3128 int still_running = 0;
394
1/2
✓ Branch 1 taken 3128 times.
✗ Branch 2 not taken.
3128 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
395 CURL_SOCKET_TIMEOUT, 0, &still_running);
396
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3128 times.
3128 if (retval != CURLM_OK) {
397 LogCvmfs(kLogS3Fanout, kLogStderr, "Error, timeout due to: %d", retval);
398 assert(retval == CURLM_OK);
399 }
400
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 565658 times.
565658 } else if (retval < 0) {
401 assert(errno == EINTR);
402 continue;
403 }
404
405 // Terminate I/O thread
406
2/2
✓ Branch 0 taken 442 times.
✓ Branch 1 taken 568344 times.
568786 if (s3fanout_mgr->watch_fds_[0].revents)
407 442 break;
408
409 // New job incoming
410
2/2
✓ Branch 0 taken 20910 times.
✓ Branch 1 taken 547434 times.
568344 if (s3fanout_mgr->watch_fds_[1].revents) {
411 20910 s3fanout_mgr->watch_fds_[1].revents = 0;
412 JobInfo *info;
413
1/2
✓ Branch 1 taken 20910 times.
✗ Branch 2 not taken.
20910 ReadPipe(s3fanout_mgr->pipe_jobs_[0], &info, sizeof(info));
414
1/2
✓ Branch 1 taken 20910 times.
✗ Branch 2 not taken.
20910 CURL *handle = s3fanout_mgr->AcquireCurlHandle();
415
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20910 times.
20910 if (handle == NULL) {
416 PANIC(kLogStderr, "Failed to acquire CURL handle.");
417 }
418
1/2
✓ Branch 1 taken 20910 times.
✗ Branch 2 not taken.
20910 const s3fanout::Failures init_failure = s3fanout_mgr->InitializeRequest(
419 info, handle);
420
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20910 times.
20910 if (init_failure != s3fanout::kFailOk) {
421 PANIC(kLogStderr,
422 "Failed to initialize CURL handle (error: %d - %s | errno: %d)",
423 init_failure, Code2Ascii(init_failure), errno);
424 }
425
1/2
✓ Branch 1 taken 20910 times.
✗ Branch 2 not taken.
20910 s3fanout_mgr->SetUrlOptions(info);
426
427
1/2
✓ Branch 1 taken 20910 times.
✗ Branch 2 not taken.
20910 curl_multi_add_handle(s3fanout_mgr->curl_multi_, handle);
428
1/2
✓ Branch 1 taken 20910 times.
✗ Branch 2 not taken.
20910 s3fanout_mgr->active_requests_->insert(info);
429 20910 jobs_in_flight++;
430 20910 int still_running = 0, retval = 0;
431
1/2
✓ Branch 1 taken 20910 times.
✗ Branch 2 not taken.
20910 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
432 CURL_SOCKET_TIMEOUT, 0, &still_running);
433
434
1/2
✓ Branch 1 taken 20910 times.
✗ Branch 2 not taken.
20910 LogCvmfs(kLogS3Fanout, kLogDebug, "curl_multi_socket_action: %d - %d",
435 retval, still_running);
436 }
437
438
439 // Activity on curl sockets
440 // Within this loop the curl_multi_socket_action() may cause socket(s)
441 // to be removed from watch_fds_. If a socket is removed it is replaced
442 // by the socket at the end of the array and the inuse count is decreased.
443 // Therefore loop over the array in reverse order.
444 // First 2 fds are job and terminate pipes (not curl related)
445
2/2
✓ Branch 0 taken 1280508 times.
✓ Branch 1 taken 568344 times.
1848852 for (int32_t i = s3fanout_mgr->watch_fds_inuse_ - 1; i >= 2; --i) {
446
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1280508 times.
1280508 if (static_cast<uint32_t>(i) >= s3fanout_mgr->watch_fds_inuse_) {
447 continue;
448 }
449
2/2
✓ Branch 0 taken 580312 times.
✓ Branch 1 taken 700196 times.
1280508 if (s3fanout_mgr->watch_fds_[i].revents) {
450 580312 int ev_bitmask = 0;
451
2/2
✓ Branch 0 taken 62424 times.
✓ Branch 1 taken 517888 times.
580312 if (s3fanout_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
452 62424 ev_bitmask |= CURL_CSELECT_IN;
453
2/2
✓ Branch 0 taken 517888 times.
✓ Branch 1 taken 62424 times.
580312 if (s3fanout_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
454 517888 ev_bitmask |= CURL_CSELECT_OUT;
455 580312 if (s3fanout_mgr->watch_fds_[i].revents
456
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 580312 times.
580312 & (POLLERR | POLLHUP | POLLNVAL))
457 ev_bitmask |= CURL_CSELECT_ERR;
458 580312 s3fanout_mgr->watch_fds_[i].revents = 0;
459
460 580312 int still_running = 0;
461 580312 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
462
1/2
✓ Branch 1 taken 580312 times.
✗ Branch 2 not taken.
580312 s3fanout_mgr->watch_fds_[i].fd,
463 ev_bitmask,
464 &still_running);
465 }
466 }
467
468 // Check if transfers are completed
469 CURLMsg *curl_msg;
470 int msgs_in_queue;
471
3/4
✓ Branch 1 taken 610062 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 41718 times.
✓ Branch 4 taken 568344 times.
610062 while ((curl_msg = curl_multi_info_read(s3fanout_mgr->curl_multi_,
472 &msgs_in_queue))) {
473
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41718 times.
41718 assert(curl_msg->msg == CURLMSG_DONE);
474
475 41718 s3fanout_mgr->statistics_->num_requests++;
476 JobInfo *info;
477 41718 CURL *easy_handle = curl_msg->easy_handle;
478 41718 const int curl_error = curl_msg->data.result;
479
1/2
✓ Branch 1 taken 41718 times.
✗ Branch 2 not taken.
41718 curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
480
481
1/2
✓ Branch 1 taken 41718 times.
✗ Branch 2 not taken.
41718 curl_multi_remove_handle(s3fanout_mgr->curl_multi_, easy_handle);
482
3/4
✓ Branch 1 taken 41718 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 20808 times.
✓ Branch 4 taken 20910 times.
41718 if (s3fanout_mgr->VerifyAndFinalize(curl_error, info)) {
483
1/2
✓ Branch 1 taken 20808 times.
✗ Branch 2 not taken.
20808 curl_multi_add_handle(s3fanout_mgr->curl_multi_, easy_handle);
484 20808 int still_running = 0;
485
1/2
✓ Branch 1 taken 20808 times.
✗ Branch 2 not taken.
20808 curl_multi_socket_action(s3fanout_mgr->curl_multi_, CURL_SOCKET_TIMEOUT,
486 0, &still_running);
487 } else {
488 // Return easy handle into pool and write result back
489 20910 jobs_in_flight--;
490
1/2
✓ Branch 1 taken 20910 times.
✗ Branch 2 not taken.
20910 s3fanout_mgr->active_requests_->erase(info);
491
1/2
✓ Branch 1 taken 20910 times.
✗ Branch 2 not taken.
20910 s3fanout_mgr->ReleaseCurlHandle(info, easy_handle);
492
1/2
✓ Branch 1 taken 20910 times.
✗ Branch 2 not taken.
20910 s3fanout_mgr->available_jobs_->Decrement();
493
494 // Add to list of completed jobs
495
1/2
✓ Branch 1 taken 20910 times.
✗ Branch 2 not taken.
20910 s3fanout_mgr->PushCompletedJob(info);
496 }
497 }
498 568344 }
499
500 442 set<CURL *>::iterator i = s3fanout_mgr->pool_handles_inuse_->begin();
501 442 const set<CURL *>::const_iterator i_end = s3fanout_mgr->pool_handles_inuse_
502 442 ->end();
503
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 442 times.
442 for (; i != i_end; ++i) {
504 curl_multi_remove_handle(s3fanout_mgr->curl_multi_, *i);
505 curl_easy_cleanup(*i);
506 }
507 442 s3fanout_mgr->pool_handles_inuse_->clear();
508 442 free(s3fanout_mgr->watch_fds_);
509
510
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread terminated");
511 442 return NULL;
512 }
513
514
515 /**
516 * Gets an idle CURL handle from the pool. Creates a new one and adds it to
517 * the pool if necessary.
518 */
519 20910 CURL *S3FanoutManager::AcquireCurlHandle() const {
520 CURL *handle;
521
522 20910 const MutexLockGuard guard(curl_handle_lock_);
523
524
2/2
✓ Branch 1 taken 1700 times.
✓ Branch 2 taken 19210 times.
20910 if (pool_handles_idle_->empty()) {
525 CURLcode retval;
526
527 // Create a new handle
528
1/2
✓ Branch 1 taken 1700 times.
✗ Branch 2 not taken.
1700 handle = curl_easy_init();
529
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1700 times.
1700 assert(handle != NULL);
530
531 // Other settings
532
1/2
✓ Branch 1 taken 1700 times.
✗ Branch 2 not taken.
1700 retval = curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
533
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1700 times.
1700 assert(retval == CURLE_OK);
534
1/2
✓ Branch 1 taken 1700 times.
✗ Branch 2 not taken.
1700 retval = curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION,
535 CallbackCurlHeader);
536
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1700 times.
1700 assert(retval == CURLE_OK);
537
1/2
✓ Branch 1 taken 1700 times.
✗ Branch 2 not taken.
1700 retval = curl_easy_setopt(handle, CURLOPT_READFUNCTION, CallbackCurlData);
538
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1700 times.
1700 assert(retval == CURLE_OK);
539
1/2
✓ Branch 1 taken 1700 times.
✗ Branch 2 not taken.
1700 retval = curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlBody);
540
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1700 times.
1700 assert(retval == CURLE_OK);
541 // WRITEDATA is set per-request in InitializeRequest
542 } else {
543 19210 handle = *(pool_handles_idle_->begin());
544
1/2
✓ Branch 2 taken 19210 times.
✗ Branch 3 not taken.
19210 pool_handles_idle_->erase(pool_handles_idle_->begin());
545 }
546
547
1/2
✓ Branch 1 taken 20910 times.
✗ Branch 2 not taken.
20910 pool_handles_inuse_->insert(handle);
548
549 20910 return handle;
550 20910 }
551
552
553 20910 void S3FanoutManager::ReleaseCurlHandle(JobInfo *info, CURL *handle) const {
554
1/2
✓ Branch 0 taken 20910 times.
✗ Branch 1 not taken.
20910 if (info->http_headers) {
555
1/2
✓ Branch 1 taken 20910 times.
✗ Branch 2 not taken.
20910 curl_slist_free_all(info->http_headers);
556 20910 info->http_headers = NULL;
557 }
558
559 20910 const MutexLockGuard guard(curl_handle_lock_);
560
561
1/2
✓ Branch 1 taken 20910 times.
✗ Branch 2 not taken.
20910 const set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
562
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 20910 times.
20910 assert(elem != pool_handles_inuse_->end());
563
564
2/2
✓ Branch 1 taken 986 times.
✓ Branch 2 taken 19924 times.
20910 if (pool_handles_idle_->size() > config_.pool_max_handles) {
565
1/2
✓ Branch 1 taken 986 times.
✗ Branch 2 not taken.
986 const CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, NULL);
566
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 986 times.
986 assert(retval == CURLE_OK);
567
1/2
✓ Branch 1 taken 986 times.
✗ Branch 2 not taken.
986 curl_easy_cleanup(handle);
568 const std::map<CURL *, S3FanOutDnsEntry *>::size_type
569
1/2
✓ Branch 1 taken 986 times.
✗ Branch 2 not taken.
986 retitems = curl_sharehandles_->erase(handle);
570
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 986 times.
986 assert(retitems == 1);
571 } else {
572
1/2
✓ Branch 1 taken 19924 times.
✗ Branch 2 not taken.
19924 pool_handles_idle_->insert(handle);
573 }
574
575
1/2
✓ Branch 1 taken 20910 times.
✗ Branch 2 not taken.
20910 pool_handles_inuse_->erase(elem);
576 20910 }
577
578 442 void S3FanoutManager::InitPipeWatchFds() {
579
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 442 times.
442 assert(watch_fds_inuse_ == 0);
580
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 442 times.
442 assert(watch_fds_size_ >= 2);
581 442 watch_fds_[0].fd = pipe_terminate_[0];
582 442 watch_fds_[0].events = POLLIN | POLLPRI;
583 442 watch_fds_[0].revents = 0;
584 442 ++watch_fds_inuse_;
585 442 watch_fds_[1].fd = pipe_jobs_[0];
586 442 watch_fds_[1].events = POLLIN | POLLPRI;
587 442 watch_fds_[1].revents = 0;
588 442 ++watch_fds_inuse_;
589 442 }
590
591 /**
592 * The Amazon AWS 2 authorization header according to
593 * http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html#ConstructingTheAuthenticationHeader
594 */
595 41582 bool S3FanoutManager::MkV2Authz(const JobInfo &info,
596 vector<string> *headers) const {
597 41582 string payload_hash;
598
1/2
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
41582 const bool retval = MkPayloadHash(info, &payload_hash);
599
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41582 times.
41582 if (!retval)
600 return false;
601
1/2
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
41582 const string content_type = GetContentType(info);
602
1/2
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
41582 const string request = GetRequestString(info);
603
604
1/2
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
41582 const string timestamp = RfcTimestamp();
605
5/10
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 41582 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 41582 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 41582 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 41582 times.
✗ Branch 14 not taken.
83164 string to_sign = request + "\n" + payload_hash + "\n" + content_type + "\n"
606
2/4
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 41582 times.
✗ Branch 5 not taken.
83164 + timestamp + "\n";
607
2/4
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 41582 times.
✗ Branch 4 not taken.
41582 if (config_.x_amz_acl != "") {
608
2/2
✓ Branch 1 taken 68 times.
✓ Branch 2 taken 41514 times.
41582 if (info.object_key.empty()) {
609
2/4
✓ Branch 1 taken 68 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 68 times.
✗ Branch 5 not taken.
136 to_sign += "x-amz-acl:" + config_.x_amz_acl + "\n" + // default ACL
610
3/6
✓ Branch 1 taken 68 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 68 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 68 times.
✗ Branch 8 not taken.
136 "/" + config_.bucket;
611 } else {
612
2/4
✓ Branch 1 taken 41514 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 41514 times.
✗ Branch 5 not taken.
83028 to_sign += "x-amz-acl:" + config_.x_amz_acl + "\n" + // default ACL
613
5/10
✓ Branch 1 taken 41514 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 41514 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 41514 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 41514 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 41514 times.
✗ Branch 14 not taken.
83028 "/" + config_.bucket + "/" + info.object_key;
614 }
615 }
616 // S3 V2 requires subresources to be included in the string to sign.
617 // For kReqDeleteMulti, object_key is intentionally empty: the canonical
618 // resource becomes "/<bucket>?delete", matching the POST URL.
619
2/2
✓ Branch 0 taken 68 times.
✓ Branch 1 taken 41514 times.
41582 if (info.request == JobInfo::kReqDeleteMulti) {
620
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 68 times.
68 if (config_.x_amz_acl == "")
621 to_sign += "/" + config_.bucket;
622
1/2
✓ Branch 1 taken 68 times.
✗ Branch 2 not taken.
68 to_sign += "?delete";
623 }
624
1/2
✓ Branch 3 taken 41582 times.
✗ Branch 4 not taken.
41582 LogCvmfs(kLogS3Fanout, kLogDebug, "%s string to sign for: %s",
625 request.c_str(), info.object_key.c_str());
626
627
1/2
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
41582 shash::Any hmac;
628 41582 hmac.algorithm = shash::kSha1;
629
1/2
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
41582 shash::Hmac(config_.secret_key,
630 41582 reinterpret_cast<const unsigned char *>(to_sign.data()),
631 41582 to_sign.length(), &hmac);
632
633
3/6
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 41582 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 41582 times.
✗ Branch 8 not taken.
124746 headers->push_back("Authorization: AWS " + config_.access_key + ":"
634
3/6
✓ Branch 2 taken 41582 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 41582 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 41582 times.
✗ Branch 9 not taken.
207910 + Base64(string(reinterpret_cast<char *>(hmac.digest),
635 41582 hmac.GetDigestSize())));
636
2/4
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 41582 times.
✗ Branch 5 not taken.
41582 headers->push_back("Date: " + timestamp);
637
2/4
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 41582 times.
✗ Branch 5 not taken.
41582 headers->push_back("X-Amz-Acl: " + config_.x_amz_acl);
638
2/2
✓ Branch 1 taken 20740 times.
✓ Branch 2 taken 20842 times.
41582 if (!payload_hash.empty())
639
2/4
✓ Branch 1 taken 20740 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 20740 times.
✗ Branch 5 not taken.
20740 headers->push_back("Content-MD5: " + payload_hash);
640
2/2
✓ Branch 1 taken 20740 times.
✓ Branch 2 taken 20842 times.
41582 if (!content_type.empty())
641
2/4
✓ Branch 1 taken 20740 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 20740 times.
✗ Branch 5 not taken.
20740 headers->push_back("Content-Type: " + content_type);
642 41582 return true;
643 41582 }
644
645
646 string S3FanoutManager::GetUriEncode(const string &val,
647 bool encode_slash) const {
648 string result;
649 const unsigned len = val.length();
650 result.reserve(len);
651 for (unsigned i = 0; i < len; ++i) {
652 const char c = val[i];
653 if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
654 || (c >= '0' && c <= '9') || c == '_' || c == '-' || c == '~'
655 || c == '.') {
656 result.push_back(c);
657 } else if (c == '/') {
658 if (encode_slash) {
659 result += "%2F";
660 } else {
661 result.push_back(c);
662 }
663 } else {
664 result.push_back('%');
665 result.push_back((c / 16) + ((c / 16 <= 9) ? '0' : 'A' - 10));
666 result.push_back((c % 16) + ((c % 16 <= 9) ? '0' : 'A' - 10));
667 }
668 }
669 return result;
670 }
671
672
673 string S3FanoutManager::GetAwsV4SigningKey(const string &date) const {
674 if (last_signing_key_.first == date)
675 return last_signing_key_.second;
676
677 const string date_key = shash::Hmac256("AWS4" + config_.secret_key, date,
678 true);
679 const string date_region_key = shash::Hmac256(date_key, config_.region, true);
680 const string date_region_service_key = shash::Hmac256(date_region_key, "s3",
681 true);
682 string signing_key = shash::Hmac256(date_region_service_key, "aws4_request",
683 true);
684 last_signing_key_.first = date;
685 last_signing_key_.second = signing_key;
686 return signing_key;
687 }
688
689
690 /**
691 * The Amazon AWS4 authorization header according to
692 * http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-auth-using-authorization-header.html
693 */
694 bool S3FanoutManager::MkV4Authz(const JobInfo &info,
695 vector<string> *headers) const {
696 string payload_hash;
697 const bool retval = MkPayloadHash(info, &payload_hash);
698 if (!retval)
699 return false;
700 const string content_type = GetContentType(info);
701 const string timestamp = IsoTimestamp();
702 const string date = timestamp.substr(0, 8);
703 vector<string> tokens = SplitString(complete_hostname_, ':');
704 assert(tokens.size() <= 2);
705 string canonical_hostname = tokens[0];
706
707 // if we could split the hostname in two and if the port is *NOT* a default
708 // one
709 if (tokens.size() == 2
710 && !((String2Uint64(tokens[1]) == kDefaultHTTPPort)
711 || (String2Uint64(tokens[1]) == kDefaultHTTPSPort)))
712 canonical_hostname += ":" + tokens[1];
713
714 string signed_headers;
715 string canonical_headers;
716 if (!content_type.empty()) {
717 signed_headers += "content-type;";
718 headers->push_back("Content-Type: " + content_type);
719 canonical_headers += "content-type:" + content_type + "\n";
720 }
721 if (config_.x_amz_acl != "") {
722 signed_headers += "host;x-amz-acl;x-amz-content-sha256;x-amz-date";
723 } else {
724 signed_headers += "host;x-amz-content-sha256;x-amz-date";
725 }
726 canonical_headers += "host:" + canonical_hostname + "\n";
727 if (config_.x_amz_acl != "") {
728 canonical_headers += "x-amz-acl:" + config_.x_amz_acl + "\n";
729 }
730 canonical_headers += "x-amz-content-sha256:" + payload_hash + "\n"
731 + "x-amz-date:" + timestamp + "\n";
732
733 const string scope = date + "/" + config_.region + "/s3/aws4_request";
734 string uri;
735 if (config_.dns_buckets) {
736 uri = string("/") + info.object_key;
737 } else if (info.object_key.empty()) {
738 uri = string("/") + config_.bucket;
739 } else {
740 uri = string("/") + config_.bucket + "/" + info.object_key;
741 }
742
743 // V4 canonical query string: empty for most requests, "delete=" for
744 // multi-object delete (the S3 ?delete parameter has no value)
745 const string canonical_query = (info.request == JobInfo::kReqDeleteMulti)
746 ? "delete="
747 : "";
748
749 const string canonical_request = GetRequestString(info) + "\n"
750 + GetUriEncode(uri, false) + "\n"
751 + canonical_query + "\n"
752 + canonical_headers + "\n" + signed_headers
753 + "\n" + payload_hash;
754
755 const string hash_request = shash::Sha256String(canonical_request.c_str());
756
757 const string string_to_sign = "AWS4-HMAC-SHA256\n" + timestamp + "\n" + scope
758 + "\n" + hash_request;
759
760 const string signing_key = GetAwsV4SigningKey(date);
761 const string signature = shash::Hmac256(signing_key, string_to_sign);
762
763 headers->push_back("X-Amz-Acl: " + config_.x_amz_acl);
764 headers->push_back("X-Amz-Content-Sha256: " + payload_hash);
765 headers->push_back("X-Amz-Date: " + timestamp);
766 headers->push_back("Authorization: AWS4-HMAC-SHA256 "
767 "Credential="
768 + config_.access_key + "/" + scope
769 + ","
770 "SignedHeaders="
771 + signed_headers
772 + ","
773 "Signature="
774 + signature);
775
776 // S3 requires Content-MD5 for multi-object delete
777 if (info.request == JobInfo::kReqDeleteMulti) {
778 shash::Any md5_hash(shash::kMd5);
779 unsigned char *data;
780 const unsigned int nbytes = info.origin->Data(
781 reinterpret_cast<void **>(&data), info.origin->GetSize(), 0);
782 assert(nbytes == info.origin->GetSize());
783 shash::HashMem(data, nbytes, &md5_hash);
784 headers->push_back(
785 "Content-MD5: "
786 + Base64(string(reinterpret_cast<char *>(md5_hash.digest),
787 md5_hash.GetDigestSize())));
788 }
789 return true;
790 }
791
792 /**
793 * The Azure Blob authorization header according to
794 * https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key
795 */
796 bool S3FanoutManager::MkAzureAuthz(const JobInfo &info,
797 vector<string> *headers) const {
798 const string timestamp = RfcTimestamp();
799 const string canonical_headers = "x-ms-blob-type:BlockBlob\nx-ms-date:"
800 + timestamp + "\nx-ms-version:2011-08-18";
801 const string canonical_resource = "/" + config_.access_key + "/"
802 + config_.bucket + "/" + info.object_key;
803
804 string string_to_sign;
805 if ((info.request == JobInfo::kReqHeadOnly)
806 || (info.request == JobInfo::kReqHeadPut)
807 || (info.request == JobInfo::kReqDelete)) {
808 string_to_sign = GetRequestString(info) + string("\n\n\n")
809 + "\n\n\n\n\n\n\n\n\n" + canonical_headers + "\n"
810 + canonical_resource;
811 } else {
812 string_to_sign = GetRequestString(info) + string("\n\n\n")
813 + string(StringifyInt(info.origin->GetSize()))
814 + "\n\n\n\n\n\n\n\n\n" + canonical_headers + "\n"
815 + canonical_resource;
816 }
817
818 string signing_key;
819 const int retval = Debase64(config_.secret_key, &signing_key);
820 if (!retval)
821 return false;
822
823 const string signature = shash::Hmac256(signing_key, string_to_sign, true);
824
825 headers->push_back("x-ms-date: " + timestamp);
826 headers->push_back("x-ms-version: 2011-08-18");
827 headers->push_back("Authorization: SharedKey " + config_.access_key + ":"
828 + Base64(signature));
829 headers->push_back("x-ms-blob-type: BlockBlob");
830 return true;
831 }
832
833 41582 void S3FanoutManager::InitializeDnsSettingsCurl(CURL *handle,
834 CURLSH *sharehandle,
835 curl_slist *clist) const {
836 41582 CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, sharehandle);
837
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41582 times.
41582 assert(retval == CURLE_OK);
838 41582 retval = curl_easy_setopt(handle, CURLOPT_RESOLVE, clist);
839
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41582 times.
41582 assert(retval == CURLE_OK);
840 41582 }
841
842
843 41582 int S3FanoutManager::InitializeDnsSettings(CURL *handle,
844 std::string host_with_port) const {
845 // Use existing handle
846 const std::map<CURL *, S3FanOutDnsEntry *>::const_iterator
847
1/2
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
41582 it = curl_sharehandles_->find(handle);
848
2/2
✓ Branch 3 taken 39882 times.
✓ Branch 4 taken 1700 times.
41582 if (it != curl_sharehandles_->end()) {
849
1/2
✓ Branch 1 taken 39882 times.
✗ Branch 2 not taken.
39882 InitializeDnsSettingsCurl(handle, it->second->sharehandle,
850 39882 it->second->clist);
851 39882 return 0;
852 }
853
854 // Add protocol information for extraction of fields for DNS
855
2/4
✓ Branch 1 taken 1700 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1700 times.
✗ Branch 4 not taken.
1700 if (!IsHttpUrl(host_with_port))
856
2/4
✓ Branch 1 taken 1700 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1700 times.
✗ Branch 5 not taken.
1700 host_with_port = config_.protocol + "://" + host_with_port;
857
1/2
✓ Branch 1 taken 1700 times.
✗ Branch 2 not taken.
1700 const std::string remote_host = dns::ExtractHost(host_with_port);
858
1/2
✓ Branch 1 taken 1700 times.
✗ Branch 2 not taken.
1700 const std::string remote_port = dns::ExtractPort(host_with_port);
859
860 // If we have the name already resolved, use the least used IP
861 1700 S3FanOutDnsEntry *useme = NULL;
862 1700 unsigned int usemin = UINT_MAX;
863 1700 std::set<S3FanOutDnsEntry *>::iterator its3 = sharehandles_->begin();
864
2/2
✓ Branch 3 taken 1326 times.
✓ Branch 4 taken 1700 times.
3026 for (; its3 != sharehandles_->end(); ++its3) {
865
1/2
✓ Branch 2 taken 1326 times.
✗ Branch 3 not taken.
1326 if ((*its3)->dns_name == remote_host) {
866
1/2
✓ Branch 1 taken 1326 times.
✗ Branch 2 not taken.
1326 if (usemin >= (*its3)->counter) {
867 1326 usemin = (*its3)->counter;
868 1326 useme = (*its3);
869 }
870 }
871 }
872
2/2
✓ Branch 0 taken 1326 times.
✓ Branch 1 taken 374 times.
1700 if (useme != NULL) {
873
1/2
✓ Branch 1 taken 1326 times.
✗ Branch 2 not taken.
1326 curl_sharehandles_->insert(
874 1326 std::pair<CURL *, S3FanOutDnsEntry *>(handle, useme));
875 1326 useme->counter++;
876
1/2
✓ Branch 1 taken 1326 times.
✗ Branch 2 not taken.
1326 InitializeDnsSettingsCurl(handle, useme->sharehandle, useme->clist);
877 1326 return 0;
878 }
879
880 // We need to resolve the hostname
881 // TODO(ssheikki): support ipv6 also... if (opt_ipv4_only_)
882
1/2
✓ Branch 1 taken 374 times.
✗ Branch 2 not taken.
374 const dns::Host host = resolver_->Resolve(remote_host);
883
1/2
✓ Branch 2 taken 374 times.
✗ Branch 3 not taken.
374 set<string> ipv4_addresses = host.ipv4_addresses();
884 374 std::set<string>::iterator its = ipv4_addresses.begin();
885 374 S3FanOutDnsEntry *dnse = NULL;
886
2/2
✓ Branch 3 taken 374 times.
✓ Branch 4 taken 374 times.
748 for (; its != ipv4_addresses.end(); ++its) {
887
2/4
✓ Branch 1 taken 374 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 374 times.
✗ Branch 5 not taken.
374 dnse = new S3FanOutDnsEntry();
888 374 dnse->counter = 0;
889
1/2
✓ Branch 1 taken 374 times.
✗ Branch 2 not taken.
374 dnse->dns_name = remote_host;
890
4/12
✗ Branch 1 not taken.
✓ Branch 2 taken 374 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 8 taken 374 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 374 times.
✗ Branch 12 not taken.
✗ Branch 14 not taken.
✓ Branch 15 taken 374 times.
✗ Branch 18 not taken.
✗ Branch 19 not taken.
374 dnse->port = remote_port.size() == 0 ? "80" : remote_port;
891
1/2
✓ Branch 2 taken 374 times.
✗ Branch 3 not taken.
374 dnse->ip = *its;
892 374 dnse->clist = NULL;
893
1/2
✓ Branch 2 taken 374 times.
✗ Branch 3 not taken.
374 dnse->clist = curl_slist_append(
894 dnse->clist,
895
4/8
✓ Branch 1 taken 374 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 374 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 374 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 374 times.
✗ Branch 11 not taken.
748 (dnse->dns_name + ":" + dnse->port + ":" + dnse->ip).c_str());
896
1/2
✓ Branch 1 taken 374 times.
✗ Branch 2 not taken.
374 dnse->sharehandle = curl_share_init();
897
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 374 times.
374 assert(dnse->sharehandle != NULL);
898
1/2
✓ Branch 1 taken 374 times.
✗ Branch 2 not taken.
374 const CURLSHcode share_retval = curl_share_setopt(
899 dnse->sharehandle, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS);
900
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 374 times.
374 assert(share_retval == CURLSHE_OK);
901
1/2
✓ Branch 1 taken 374 times.
✗ Branch 2 not taken.
374 sharehandles_->insert(dnse);
902 }
903
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 374 times.
374 if (dnse == NULL) {
904 LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr,
905 "Error: DNS resolve failed for address '%s'.",
906 remote_host.c_str());
907 assert(dnse != NULL);
908 return -1;
909 }
910
1/2
✓ Branch 1 taken 374 times.
✗ Branch 2 not taken.
374 curl_sharehandles_->insert(
911 374 std::pair<CURL *, S3FanOutDnsEntry *>(handle, dnse));
912 374 dnse->counter++;
913
1/2
✓ Branch 1 taken 374 times.
✗ Branch 2 not taken.
374 InitializeDnsSettingsCurl(handle, dnse->sharehandle, dnse->clist);
914
915 374 return 0;
916 1700 }
917
918
919 41582 bool S3FanoutManager::MkPayloadHash(const JobInfo &info,
920 string *hex_hash) const {
921
2/2
✓ Branch 0 taken 41412 times.
✓ Branch 1 taken 170 times.
41582 if (info.request == JobInfo::kReqHeadOnly
922
2/2
✓ Branch 0 taken 20740 times.
✓ Branch 1 taken 20672 times.
41412 || info.request == JobInfo::kReqHeadPut
923
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20740 times.
20740 || info.request == JobInfo::kReqDelete) {
924
1/4
✓ Branch 0 taken 20842 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
20842 switch (config_.authz_method) {
925 20842 case kAuthzAwsV2:
926 20842 hex_hash->clear();
927 20842 break;
928 case kAuthzAwsV4:
929 // Sha256 over empty string
930 *hex_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b78"
931 "52b855";
932 break;
933 case kAuthzAzure:
934 // no payload hash required for Azure signature
935 hex_hash->clear();
936 break;
937 default:
938 PANIC(NULL);
939 }
940 20842 return true;
941 }
942
943 // kReqDeleteMulti is a POST with a body (XML payload), same hashing as PUT
944 // falls through intentionally.
945
946 // PUT or POST with payload
947
1/2
✓ Branch 1 taken 20740 times.
✗ Branch 2 not taken.
20740 shash::Any payload_hash(shash::kMd5);
948
949 unsigned char *data;
950 20740 const unsigned int nbytes = info.origin->Data(
951
2/4
✓ Branch 2 taken 20740 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 20740 times.
✗ Branch 6 not taken.
20740 reinterpret_cast<void **>(&data), info.origin->GetSize(), 0);
952
2/4
✓ Branch 2 taken 20740 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 20740 times.
20740 assert(nbytes == info.origin->GetSize());
953
954
1/4
✓ Branch 0 taken 20740 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
20740 switch (config_.authz_method) {
955 20740 case kAuthzAwsV2:
956
1/2
✓ Branch 1 taken 20740 times.
✗ Branch 2 not taken.
20740 shash::HashMem(data, nbytes, &payload_hash);
957
2/4
✓ Branch 2 taken 20740 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 20740 times.
✗ Branch 6 not taken.
62220 *hex_hash = Base64(string(reinterpret_cast<char *>(payload_hash.digest),
958 41480 payload_hash.GetDigestSize()));
959 20740 return true;
960 case kAuthzAwsV4:
961 *hex_hash = shash::Sha256Mem(data, nbytes);
962 return true;
963 case kAuthzAzure:
964 // no payload hash required for Azure signature
965 hex_hash->clear();
966 return true;
967 default:
968 PANIC(NULL);
969 }
970 }
971
972 41582 string S3FanoutManager::GetRequestString(const JobInfo &info) const {
973
3/5
✓ Branch 0 taken 20842 times.
✓ Branch 1 taken 20672 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 68 times.
✗ Branch 4 not taken.
41582 switch (info.request) {
974 20842 case JobInfo::kReqHeadOnly:
975 case JobInfo::kReqHeadPut:
976
1/2
✓ Branch 2 taken 20842 times.
✗ Branch 3 not taken.
20842 return "HEAD";
977 20672 case JobInfo::kReqPutCas:
978 case JobInfo::kReqPutDotCvmfs:
979 case JobInfo::kReqPutHtml:
980 case JobInfo::kReqPutBucket:
981
1/2
✓ Branch 2 taken 20672 times.
✗ Branch 3 not taken.
20672 return "PUT";
982 case JobInfo::kReqDelete:
983 return "DELETE";
984 68 case JobInfo::kReqDeleteMulti:
985
1/2
✓ Branch 2 taken 68 times.
✗ Branch 3 not taken.
68 return "POST";
986 default:
987 PANIC(NULL);
988 }
989 }
990
991
992 41582 string S3FanoutManager::GetContentType(const JobInfo &info) const {
993
3/6
✓ Branch 0 taken 20842 times.
✓ Branch 1 taken 20672 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 68 times.
✗ Branch 5 not taken.
41582 switch (info.request) {
994 20842 case JobInfo::kReqHeadOnly:
995 case JobInfo::kReqHeadPut:
996 case JobInfo::kReqDelete:
997
1/2
✓ Branch 2 taken 20842 times.
✗ Branch 3 not taken.
20842 return "";
998 20672 case JobInfo::kReqPutCas:
999
1/2
✓ Branch 2 taken 20672 times.
✗ Branch 3 not taken.
20672 return "application/octet-stream";
1000 case JobInfo::kReqPutDotCvmfs:
1001 return "application/x-cvmfs";
1002 case JobInfo::kReqPutHtml:
1003 return "text/html";
1004 68 case JobInfo::kReqPutBucket:
1005 case JobInfo::kReqDeleteMulti:
1006
1/2
✓ Branch 2 taken 68 times.
✗ Branch 3 not taken.
68 return "text/xml";
1007 default:
1008 PANIC(NULL);
1009 }
1010 }
1011
1012
1013 /**
1014 * Request parameters set the URL and other options such as timeout and
1015 * proxy.
1016 */
1017 41582 Failures S3FanoutManager::InitializeRequest(JobInfo *info, CURL *handle) const {
1018 // Initialize internal download state
1019 41582 info->curl_handle = handle;
1020 41582 info->error_code = kFailOk;
1021 41582 info->http_error = 0;
1022 41582 info->num_retries = 0;
1023 41582 info->backoff_ms = 0;
1024 41582 info->throttle_ms = 0;
1025 41582 info->throttle_timestamp = 0;
1026 41582 info->http_headers = NULL;
1027 // info->payload_size is needed in S3Uploader::MainCollectResults,
1028 // where info->origin is already destroyed.
1029
1/2
✓ Branch 2 taken 41582 times.
✗ Branch 3 not taken.
41582 info->payload_size = info->origin->GetSize();
1030
1031
2/4
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 41582 times.
✗ Branch 5 not taken.
41582 InitializeDnsSettings(handle, complete_hostname_);
1032
1033 CURLcode retval;
1034
2/2
✓ Branch 0 taken 41412 times.
✓ Branch 1 taken 170 times.
41582 if (info->request == JobInfo::kReqHeadOnly
1035
2/2
✓ Branch 0 taken 20740 times.
✓ Branch 1 taken 20672 times.
41412 || info->request == JobInfo::kReqHeadPut
1036
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20740 times.
20740 || info->request == JobInfo::kReqDelete) {
1037
1/2
✓ Branch 1 taken 20842 times.
✗ Branch 2 not taken.
20842 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 0);
1038
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20842 times.
20842 assert(retval == CURLE_OK);
1039
1/2
✓ Branch 1 taken 20842 times.
✗ Branch 2 not taken.
20842 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
1040
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20842 times.
20842 assert(retval == CURLE_OK);
1041
1042
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20842 times.
20842 if (info->request == JobInfo::kReqDelete) {
1043 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST,
1044 GetRequestString(*info).c_str());
1045 assert(retval == CURLE_OK);
1046 } else {
1047
1/2
✓ Branch 1 taken 20842 times.
✗ Branch 2 not taken.
20842 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
1048
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20842 times.
20842 assert(retval == CURLE_OK);
1049 }
1050
2/2
✓ Branch 0 taken 68 times.
✓ Branch 1 taken 20672 times.
20740 } else if (info->request == JobInfo::kReqDeleteMulti) {
1051 // POST request with XML body read from origin buffer
1052
1/2
✓ Branch 1 taken 68 times.
✗ Branch 2 not taken.
68 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 1);
1053
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 68 times.
68 assert(retval == CURLE_OK);
1054
1/2
✓ Branch 1 taken 68 times.
✗ Branch 2 not taken.
68 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 0);
1055
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 68 times.
68 assert(retval == CURLE_OK);
1056
1/2
✓ Branch 1 taken 68 times.
✗ Branch 2 not taken.
68 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, "POST");
1057
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 68 times.
68 assert(retval == CURLE_OK);
1058
2/4
✓ Branch 2 taken 68 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 68 times.
✗ Branch 6 not taken.
68 retval = curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE,
1059 static_cast<curl_off_t>(info->origin->GetSize()));
1060
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 68 times.
68 assert(retval == CURLE_OK);
1061 68 info->response_body.clear();
1062 } else {
1063
1/2
✓ Branch 1 taken 20672 times.
✗ Branch 2 not taken.
20672 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
1064
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20672 times.
20672 assert(retval == CURLE_OK);
1065
1/2
✓ Branch 1 taken 20672 times.
✗ Branch 2 not taken.
20672 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 1);
1066
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20672 times.
20672 assert(retval == CURLE_OK);
1067
1/2
✓ Branch 1 taken 20672 times.
✗ Branch 2 not taken.
20672 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 0);
1068
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20672 times.
20672 assert(retval == CURLE_OK);
1069
2/4
✓ Branch 2 taken 20672 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 20672 times.
✗ Branch 6 not taken.
20672 retval = curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE,
1070 static_cast<curl_off_t>(info->origin->GetSize()));
1071
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20672 times.
20672 assert(retval == CURLE_OK);
1072
1073
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20672 times.
20672 if (info->request == JobInfo::kReqPutDotCvmfs) {
1074 info->http_headers = curl_slist_append(
1075 info->http_headers, dot_cvmfs_cache_control_header.c_str());
1076
1/2
✓ Branch 0 taken 20672 times.
✗ Branch 1 not taken.
20672 } else if (info->request == JobInfo::kReqPutCas) {
1077
1/2
✓ Branch 1 taken 20672 times.
✗ Branch 2 not taken.
20672 info->http_headers = curl_slist_append(info->http_headers,
1078 kCacheControlCas);
1079 }
1080 }
1081
1082 bool retval_b;
1083
1084 // Authorization
1085 41582 vector<string> authz_headers;
1086
1/4
✓ Branch 0 taken 41582 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
41582 switch (config_.authz_method) {
1087 41582 case kAuthzAwsV2:
1088
1/2
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
41582 retval_b = MkV2Authz(*info, &authz_headers);
1089 41582 break;
1090 case kAuthzAwsV4:
1091 retval_b = MkV4Authz(*info, &authz_headers);
1092 break;
1093 case kAuthzAzure:
1094 retval_b = MkAzureAuthz(*info, &authz_headers);
1095 break;
1096 default:
1097 PANIC(NULL);
1098 }
1099
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41582 times.
41582 if (!retval_b)
1100 return kFailLocalIO;
1101
2/2
✓ Branch 1 taken 166226 times.
✓ Branch 2 taken 41582 times.
207808 for (unsigned i = 0; i < authz_headers.size(); ++i) {
1102
1/2
✓ Branch 2 taken 166226 times.
✗ Branch 3 not taken.
166226 info->http_headers = curl_slist_append(info->http_headers,
1103 166226 authz_headers[i].c_str());
1104 }
1105
1106 // Common headers
1107
1/2
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
41582 info->http_headers = curl_slist_append(info->http_headers,
1108 "Connection: Keep-Alive");
1109
1/2
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
41582 info->http_headers = curl_slist_append(info->http_headers, "Pragma:");
1110 // No 100-continue
1111
1/2
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
41582 info->http_headers = curl_slist_append(info->http_headers, "Expect:");
1112 // Strip unnecessary header
1113
1/2
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
41582 info->http_headers = curl_slist_append(info->http_headers, "Accept:");
1114
1/2
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
41582 info->http_headers = curl_slist_append(info->http_headers,
1115 41582 user_agent_->c_str());
1116
1117 // Set curl parameters
1118
1/2
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
41582 retval = curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
1119
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41582 times.
41582 assert(retval == CURLE_OK);
1120
1/2
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
41582 retval = curl_easy_setopt(handle, CURLOPT_HEADERDATA,
1121 static_cast<void *>(info));
1122
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41582 times.
41582 assert(retval == CURLE_OK);
1123
1/2
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
41582 retval = curl_easy_setopt(handle, CURLOPT_READDATA,
1124 static_cast<void *>(info));
1125
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41582 times.
41582 assert(retval == CURLE_OK);
1126
1/2
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
41582 retval = curl_easy_setopt(handle, CURLOPT_WRITEDATA,
1127 static_cast<void *>(info));
1128
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41582 times.
41582 assert(retval == CURLE_OK);
1129
1/2
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
41582 retval = curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->http_headers);
1130
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41582 times.
41582 assert(retval == CURLE_OK);
1131
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41582 times.
41582 if (opt_ipv4_only_) {
1132 retval = curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
1133 assert(retval == CURLE_OK);
1134 }
1135 // Follow HTTP redirects
1136
1/2
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
41582 retval = curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
1137
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41582 times.
41582 assert(retval == CURLE_OK);
1138
1139
1/2
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
41582 retval = curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, info->errorbuffer);
1140
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41582 times.
41582 assert(retval == CURLE_OK);
1141
1142
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 41582 times.
41582 if (config_.protocol == "https") {
1143 retval = curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 1L);
1144 assert(retval == CURLE_OK);
1145 retval = curl_easy_setopt(handle, CURLOPT_PROXY_SSL_VERIFYPEER, 1L);
1146 assert(retval == CURLE_OK);
1147 const bool add_cert = ssl_certificate_store_.ApplySslCertificatePath(
1148 handle);
1149 assert(add_cert);
1150 }
1151
1152 41582 return kFailOk;
1153 41582 }
1154
1155
1156 /**
1157 * Sets the URL specific options such as host to use and timeout.
1158 */
1159 41582 void S3FanoutManager::SetUrlOptions(JobInfo *info) const {
1160 41582 CURL *curl_handle = info->curl_handle;
1161 CURLcode retval;
1162
1163
1/2
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
41582 retval = curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT,
1164 config_.opt_timeout_sec);
1165
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41582 times.
41582 assert(retval == CURLE_OK);
1166
1/2
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
41582 retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT,
1167 kLowSpeedLimit);
1168
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41582 times.
41582 assert(retval == CURLE_OK);
1169
1/2
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
41582 retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME,
1170 config_.opt_timeout_sec);
1171
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41582 times.
41582 assert(retval == CURLE_OK);
1172
1173
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41582 times.
41582 if (is_curl_debug_) {
1174 retval = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 1);
1175 assert(retval == CURLE_OK);
1176 }
1177
1178
1/2
✓ Branch 1 taken 41582 times.
✗ Branch 2 not taken.
41582 string url = MkUrl(info->object_key);
1179
2/2
✓ Branch 0 taken 68 times.
✓ Branch 1 taken 41514 times.
41582 if (info->request == JobInfo::kReqDeleteMulti)
1180
1/2
✓ Branch 1 taken 68 times.
✗ Branch 2 not taken.
68 url += "?delete";
1181
1/2
✓ Branch 2 taken 41582 times.
✗ Branch 3 not taken.
41582 retval = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
1182
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41582 times.
41582 assert(retval == CURLE_OK);
1183
1184
1/2
✓ Branch 2 taken 41582 times.
✗ Branch 3 not taken.
41582 retval = curl_easy_setopt(curl_handle, CURLOPT_PROXY, config_.proxy.c_str());
1185
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41582 times.
41582 assert(retval == CURLE_OK);
1186 41582 }
1187
1188
1189 /**
1190 * Adds transfer time and uploaded bytes to the global counters.
1191 */
1192 41718 void S3FanoutManager::UpdateStatistics(CURL *handle) {
1193 double val;
1194
1195
2/4
✓ Branch 1 taken 41718 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 41718 times.
✗ Branch 4 not taken.
41718 if (curl_easy_getinfo(handle, CURLINFO_SIZE_UPLOAD, &val) == CURLE_OK)
1196 41718 statistics_->transferred_bytes += val;
1197 41718 }
1198
1199
1200 /**
1201 * Retry if possible and if not already done too often.
1202 */
1203 204 bool S3FanoutManager::CanRetry(const JobInfo *info) {
1204 204 return (info->error_code == kFailHostConnection
1205
1/2
✓ Branch 0 taken 204 times.
✗ Branch 1 not taken.
204 || info->error_code == kFailHostResolve
1206
1/2
✓ Branch 0 taken 204 times.
✗ Branch 1 not taken.
204 || info->error_code == kFailServiceUnavailable
1207
2/2
✓ Branch 0 taken 136 times.
✓ Branch 1 taken 68 times.
204 || info->error_code == kFailRetry)
1208
2/4
✓ Branch 0 taken 204 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 136 times.
✗ Branch 3 not taken.
408 && (info->num_retries < config_.opt_max_retries);
1209 }
1210
1211
1212 /**
1213 * Backoff for retry to introduce a jitter into a upload sequence.
1214 *
1215 * \return true if backoff has been performed, false otherwise
1216 */
1217 136 void S3FanoutManager::Backoff(JobInfo *info) {
1218
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 136 times.
136 if (info->error_code != kFailRetry)
1219 info->num_retries++;
1220 136 statistics_->num_retries++;
1221
1222
1/2
✓ Branch 0 taken 136 times.
✗ Branch 1 not taken.
136 if (info->throttle_ms > 0) {
1223 136 LogCvmfs(kLogS3Fanout, kLogDebug, "throttling for %d ms",
1224 info->throttle_ms);
1225 136 const uint64_t now = platform_monotonic_time();
1226
1/2
✓ Branch 0 taken 136 times.
✗ Branch 1 not taken.
136 if ((info->throttle_timestamp + (info->throttle_ms / 1000)) >= now) {
1227
2/2
✓ Branch 0 taken 34 times.
✓ Branch 1 taken 102 times.
136 if ((now - timestamp_last_throttle_report_)
1228 > kThrottleReportIntervalSec) {
1229 34 LogCvmfs(kLogS3Fanout, kLogStdout,
1230 "Warning: S3 backend throttling %ums "
1231 "(total backoff time so far %lums)",
1232 34 info->throttle_ms, statistics_->ms_throttled);
1233 34 timestamp_last_throttle_report_ = now;
1234 }
1235 136 statistics_->ms_throttled += info->throttle_ms;
1236 136 SafeSleepMs(info->throttle_ms);
1237 }
1238 } else {
1239 if (info->backoff_ms == 0) {
1240 // Must be != 0
1241 info->backoff_ms = prng_.Next(config_.opt_backoff_init_ms + 1);
1242 } else {
1243 info->backoff_ms *= 2;
1244 }
1245 if (info->backoff_ms > config_.opt_backoff_max_ms)
1246 info->backoff_ms = config_.opt_backoff_max_ms;
1247
1248 LogCvmfs(kLogS3Fanout, kLogDebug, "backing off for %d ms",
1249 info->backoff_ms);
1250 SafeSleepMs(info->backoff_ms);
1251 }
1252 136 }
1253
1254
1255 /**
1256 * Checks the result of a curl request and implements the failure logic
1257 * and takes care of cleanup.
1258 *
1259 * @return true if request should be repeated, false otherwise
1260 */
1261 41718 bool S3FanoutManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1262 41718 LogCvmfs(kLogS3Fanout, kLogDebug,
1263 "Verify uploaded/tested object %s "
1264 "(curl error %d, info error %d, info request %d)",
1265 41718 info->object_key.c_str(), curl_error, info->error_code,
1266 41718 info->request);
1267 41718 UpdateStatistics(info->curl_handle);
1268
1269 // Verification and error classification
1270
1/6
✓ Branch 0 taken 41718 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
41718 switch (curl_error) {
1271 41718 case CURLE_OK:
1272
2/2
✓ Branch 0 taken 41582 times.
✓ Branch 1 taken 136 times.
41718 if ((info->error_code != kFailRetry)
1273
2/2
✓ Branch 0 taken 20842 times.
✓ Branch 1 taken 20740 times.
41582 && (info->error_code != kFailNotFound)) {
1274 20842 info->error_code = kFailOk;
1275 }
1276 41718 break;
1277 case CURLE_UNSUPPORTED_PROTOCOL:
1278 case CURLE_URL_MALFORMAT:
1279 info->error_code = kFailBadRequest;
1280 break;
1281 case CURLE_COULDNT_RESOLVE_HOST:
1282 info->error_code = kFailHostResolve;
1283 break;
1284 case CURLE_COULDNT_CONNECT:
1285 case CURLE_OPERATION_TIMEDOUT:
1286 case CURLE_SEND_ERROR:
1287 case CURLE_RECV_ERROR:
1288 info->error_code = kFailHostConnection;
1289 break;
1290 case CURLE_ABORTED_BY_CALLBACK:
1291 case CURLE_WRITE_ERROR:
1292 // Error set by callback
1293 break;
1294 default:
1295 LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr,
1296 "unexpected curl error (%d) while trying to upload %s: %s",
1297 curl_error, info->object_key.c_str(), info->errorbuffer);
1298 info->error_code = kFailOther;
1299 break;
1300 }
1301
1302 // Transform HEAD to PUT request
1303
2/2
✓ Branch 0 taken 20740 times.
✓ Branch 1 taken 20978 times.
41718 if ((info->error_code == kFailNotFound)
1304
2/2
✓ Branch 0 taken 20672 times.
✓ Branch 1 taken 68 times.
20740 && (info->request == JobInfo::kReqHeadPut)) {
1305 20672 LogCvmfs(kLogS3Fanout, kLogDebug, "not found: %s, uploading",
1306 info->object_key.c_str());
1307 20672 info->request = JobInfo::kReqPutCas;
1308 20672 curl_slist_free_all(info->http_headers);
1309 20672 info->http_headers = NULL;
1310 20672 const s3fanout::Failures init_failure = InitializeRequest(
1311 info, info->curl_handle);
1312
1313
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20672 times.
20672 if (init_failure != s3fanout::kFailOk) {
1314 PANIC(kLogStderr,
1315 "Failed to initialize CURL handle "
1316 "(error: %d - %s | errno: %d)",
1317 init_failure, Code2Ascii(init_failure), errno);
1318 }
1319 20672 SetUrlOptions(info);
1320 // Reset origin
1321 20672 info->origin->Rewind();
1322 20672 return true; // Again, Put
1323 }
1324
1325 // Determination if failed request should be repeated
1326 21046 bool try_again = false;
1327
2/2
✓ Branch 0 taken 204 times.
✓ Branch 1 taken 20842 times.
21046 if (info->error_code != kFailOk) {
1328 204 try_again = CanRetry(info);
1329 }
1330
2/2
✓ Branch 0 taken 136 times.
✓ Branch 1 taken 20910 times.
21046 if (try_again) {
1331
1/2
✓ Branch 0 taken 136 times.
✗ Branch 1 not taken.
136 if (info->request == JobInfo::kReqPutCas
1332
1/2
✓ Branch 0 taken 136 times.
✗ Branch 1 not taken.
136 || info->request == JobInfo::kReqPutDotCvmfs
1333
1/2
✓ Branch 0 taken 136 times.
✗ Branch 1 not taken.
136 || info->request == JobInfo::kReqPutHtml
1334
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 136 times.
136 || info->request == JobInfo::kReqDeleteMulti) {
1335 LogCvmfs(kLogS3Fanout, kLogDebug, "Trying again to upload %s",
1336 info->object_key.c_str());
1337 // Reset origin
1338 info->origin->Rewind();
1339 if (info->request == JobInfo::kReqDeleteMulti)
1340 info->response_body.clear();
1341 }
1342 136 Backoff(info);
1343 136 info->error_code = kFailOk;
1344 136 info->http_error = 0;
1345 136 info->throttle_ms = 0;
1346 136 info->backoff_ms = 0;
1347 136 info->throttle_timestamp = 0;
1348 136 return true; // try again
1349 }
1350
1351 // Cleanup opened resources
1352 20910 info->origin.Destroy();
1353
1354
3/4
✓ Branch 0 taken 68 times.
✓ Branch 1 taken 20842 times.
✓ Branch 2 taken 68 times.
✗ Branch 3 not taken.
20910 if ((info->error_code != kFailOk) && (info->http_error != 0)
1355
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 68 times.
68 && (info->http_error != 404)) {
1356 LogCvmfs(kLogS3Fanout, kLogStderr, "S3: HTTP failure %d", info->http_error);
1357 }
1358 20910 return false; // stop transfer
1359 }
1360
1361
2/4
✓ Branch 3 taken 442 times.
✗ Branch 4 not taken.
✓ Branch 9 taken 442 times.
✗ Branch 10 not taken.
442 S3FanoutManager::S3FanoutManager(const S3Config &config) : config_(config) {
1362 442 atomic_init32(&multi_threaded_);
1363
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 MakePipe(pipe_terminate_);
1364
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 MakePipe(pipe_jobs_);
1365
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 MakePipe(pipe_completed_);
1366
1367 int retval;
1368 442 jobs_todo_lock_ = reinterpret_cast<pthread_mutex_t *>(
1369 442 smalloc(sizeof(pthread_mutex_t)));
1370 442 retval = pthread_mutex_init(jobs_todo_lock_, NULL);
1371
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 442 times.
442 assert(retval == 0);
1372 442 curl_handle_lock_ = reinterpret_cast<pthread_mutex_t *>(
1373 442 smalloc(sizeof(pthread_mutex_t)));
1374 442 retval = pthread_mutex_init(curl_handle_lock_, NULL);
1375
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 442 times.
442 assert(retval == 0);
1376
1377
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 active_requests_ = new set<JobInfo *>;
1378
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 pool_handles_idle_ = new set<CURL *>;
1379
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 pool_handles_inuse_ = new set<CURL *>;
1380
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 curl_sharehandles_ = new map<CURL *, S3FanOutDnsEntry *>;
1381
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 sharehandles_ = new set<S3FanOutDnsEntry *>;
1382 442 watch_fds_max_ = 4 * config_.pool_max_handles;
1383 442 max_available_jobs_ = 4 * config_.pool_max_handles;
1384
2/4
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 442 times.
✗ Branch 5 not taken.
442 available_jobs_ = new Semaphore(max_available_jobs_);
1385
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 442 times.
442 assert(NULL != available_jobs_);
1386
1387
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 statistics_ = new Statistics();
1388
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 user_agent_ = new string();
1389
2/4
✓ Branch 2 taken 442 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 442 times.
✗ Branch 6 not taken.
442 *user_agent_ = "User-Agent: cvmfs " + string(CVMFS_VERSION);
1390
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 complete_hostname_ = MkCompleteHostname();
1391
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 dot_cvmfs_cache_control_header = MkDotCvmfsCacheControlHeader();
1392
1393
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 const CURLcode cretval = curl_global_init(CURL_GLOBAL_ALL);
1394
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 442 times.
442 assert(cretval == CURLE_OK);
1395
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 curl_multi_ = curl_multi_init();
1396
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 442 times.
442 assert(curl_multi_ != NULL);
1397 CURLMcode mretval;
1398
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION,
1399 CallbackCurlSocket);
1400
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 442 times.
442 assert(mretval == CURLM_OK);
1401
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1402 static_cast<void *>(this));
1403
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 442 times.
442 assert(mretval == CURLM_OK);
1404
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1405 config_.pool_max_handles);
1406
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 442 times.
442 assert(mretval == CURLM_OK);
1407
1408 442 prng_.InitLocaltime();
1409
1410 442 thread_upload_ = 0;
1411 442 timestamp_last_throttle_report_ = 0;
1412 442 is_curl_debug_ = (getenv("_CVMFS_CURL_DEBUG") != NULL);
1413
1414 // Parsing environment variables
1415 442 if ((getenv("CVMFS_IPV4_ONLY") != NULL)
1416
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 442 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 442 times.
442 && (strlen(getenv("CVMFS_IPV4_ONLY")) > 0)) {
1417 opt_ipv4_only_ = true;
1418 } else {
1419 442 opt_ipv4_only_ = false;
1420 }
1421
1422
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 resolver_ = dns::CaresResolver::Create(opt_ipv4_only_, 2, 2000);
1423
1424 442 watch_fds_ = static_cast<struct pollfd *>(smalloc(4 * sizeof(struct pollfd)));
1425 442 watch_fds_size_ = 4;
1426 442 watch_fds_inuse_ = 0;
1427
1428
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 ssl_certificate_store_.UseSystemCertificatePath();
1429 442 }
1430
1431 884 S3FanoutManager::~S3FanoutManager() {
1432 442 pthread_mutex_destroy(jobs_todo_lock_);
1433 442 free(jobs_todo_lock_);
1434 442 pthread_mutex_destroy(curl_handle_lock_);
1435 442 free(curl_handle_lock_);
1436
1437
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1438 // Shutdown I/O thread
1439 442 char buf = 'T';
1440 442 WritePipe(pipe_terminate_[1], &buf, 1);
1441 442 pthread_join(thread_upload_, NULL);
1442 }
1443 442 ClosePipe(pipe_terminate_);
1444 442 ClosePipe(pipe_jobs_);
1445 442 ClosePipe(pipe_completed_);
1446
1447 442 set<CURL *>::iterator i = pool_handles_idle_->begin();
1448 442 const set<CURL *>::const_iterator iEnd = pool_handles_idle_->end();
1449
2/2
✓ Branch 2 taken 714 times.
✓ Branch 3 taken 442 times.
1156 for (; i != iEnd; ++i) {
1450 714 curl_easy_cleanup(*i);
1451 }
1452
1453 442 set<S3FanOutDnsEntry *>::iterator is = sharehandles_->begin();
1454 442 const set<S3FanOutDnsEntry *>::const_iterator isEnd = sharehandles_->end();
1455
2/2
✓ Branch 2 taken 374 times.
✓ Branch 3 taken 442 times.
816 for (; is != isEnd; ++is) {
1456 374 curl_share_cleanup((*is)->sharehandle);
1457 374 curl_slist_free_all((*is)->clist);
1458
1/2
✓ Branch 1 taken 374 times.
✗ Branch 2 not taken.
374 delete *is;
1459 }
1460 442 pool_handles_idle_->clear();
1461 442 curl_sharehandles_->clear();
1462 442 sharehandles_->clear();
1463
1/2
✓ Branch 0 taken 442 times.
✗ Branch 1 not taken.
442 delete active_requests_;
1464
1/2
✓ Branch 0 taken 442 times.
✗ Branch 1 not taken.
442 delete pool_handles_idle_;
1465
1/2
✓ Branch 0 taken 442 times.
✗ Branch 1 not taken.
442 delete pool_handles_inuse_;
1466
1/2
✓ Branch 0 taken 442 times.
✗ Branch 1 not taken.
442 delete curl_sharehandles_;
1467
1/2
✓ Branch 0 taken 442 times.
✗ Branch 1 not taken.
442 delete sharehandles_;
1468
1/2
✓ Branch 0 taken 442 times.
✗ Branch 1 not taken.
442 delete user_agent_;
1469 442 curl_multi_cleanup(curl_multi_);
1470
1471
1/2
✓ Branch 0 taken 442 times.
✗ Branch 1 not taken.
442 delete statistics_;
1472
1473
1/2
✓ Branch 0 taken 442 times.
✗ Branch 1 not taken.
442 delete available_jobs_;
1474
1475 442 curl_global_cleanup();
1476 442 }
1477
1478 /**
1479 * Spawns the I/O worker thread. No way back except ~S3FanoutManager.
1480 */
1481 442 void S3FanoutManager::Spawn() {
1482 442 LogCvmfs(kLogS3Fanout, kLogDebug, "S3FanoutManager spawned");
1483
1484 442 const int retval = pthread_create(&thread_upload_, NULL, MainUpload,
1485 static_cast<void *>(this));
1486
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 442 times.
442 assert(retval == 0);
1487
1488 442 atomic_inc32(&multi_threaded_);
1489 442 }
1490
1491 102 const Statistics &S3FanoutManager::GetStatistics() { return *statistics_; }
1492
1493 /**
1494 * Push new job to be uploaded to the S3 cloud storage.
1495 */
1496 20910 void S3FanoutManager::PushNewJob(JobInfo *info) {
1497 20910 available_jobs_->Increment();
1498 20910 WritePipe(pipe_jobs_[1], &info, sizeof(info));
1499 20910 }
1500
1501 /**
1502 * Push completed job to list of completed jobs
1503 */
1504 21352 void S3FanoutManager::PushCompletedJob(JobInfo *info) {
1505 21352 WritePipe(pipe_completed_[1], &info, sizeof(info));
1506 21352 }
1507
1508 /**
1509 * Pop completed job
1510 */
1511 21352 JobInfo *S3FanoutManager::PopCompletedJob() {
1512 JobInfo *info;
1513
1/2
✓ Branch 1 taken 21352 times.
✗ Branch 2 not taken.
21352 ReadPipe(pipe_completed_[0], &info, sizeof(info));
1514 21352 return info;
1515 }
1516
1517 //------------------------------------------------------------------------------
1518
1519
1520 string Statistics::Print() const {
1521 return "Transferred Bytes: " + StringifyInt(uint64_t(transferred_bytes))
1522 + "\n" + "Transfer duration: " + StringifyInt(uint64_t(transfer_time))
1523 + " s\n" + "Number of requests: " + StringifyInt(num_requests) + "\n"
1524 + "Number of retries: " + StringifyInt(num_retries) + "\n";
1525 }
1526
1527 } // namespace s3fanout
1528