GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/network/s3fanout.cc
Date: 2026-04-05 02:35:23
Exec Total Coverage
Lines: 636 903 70.4%
Branches: 464 1332 34.8%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 *
4 * Runs a thread using libcurls asynchronous I/O mode to push data to S3
5 */
6
7 #include "s3fanout.h"
8
9 #include <pthread.h>
10
11 #include <algorithm>
12 #include <cassert>
13 #include <cerrno>
14 #include <utility>
15
16 #include "upload_facility.h"
17 #include "util/concurrency.h"
18 #include "util/exception.h"
19 #include "util/platform.h"
20 #include "util/posix.h"
21 #include "util/string.h"
22
23 using namespace std; // NOLINT
24
25 namespace s3fanout {
26
27 /**
28 * Escapes characters that are not allowed in XML text content.
29 * Only & and < need escaping; >, ', " are safe in text nodes.
30 */
31 97 static string XmlEscape(const string &input) {
32 97 string result;
33
1/2
✓ Branch 2 taken 97 times.
✗ Branch 3 not taken.
97 result.reserve(input.size());
34
2/2
✓ Branch 1 taken 1778 times.
✓ Branch 2 taken 97 times.
1875 for (unsigned i = 0; i < input.size(); ++i) {
35
3/3
✓ Branch 1 taken 11 times.
✓ Branch 2 taken 11 times.
✓ Branch 3 taken 1756 times.
1778 switch (input[i]) {
36
1/2
✓ Branch 1 taken 11 times.
✗ Branch 2 not taken.
11 case '&': result += "&amp;"; break;
37
1/2
✓ Branch 1 taken 11 times.
✗ Branch 2 not taken.
11 case '<': result += "&lt;"; break;
38
1/2
✓ Branch 2 taken 1756 times.
✗ Branch 3 not taken.
1756 default: result += input[i]; break;
39 }
40 }
41 97 return result;
42 }
43
44
45 /**
46 * Builds an S3 DeleteObjects XML request body for multi-object delete.
47 * Uses Quiet mode so the response only contains errors, not successes.
48 */
49 64 string ComposeDeleteMultiXml(const vector<string> &keys) {
50 string xml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
51
1/2
✓ Branch 2 taken 64 times.
✗ Branch 3 not taken.
64 "<Delete><Quiet>true</Quiet>\n";
52 // ~70 bytes per <Object><Key>...</Key></Object> entry
53
1/2
✓ Branch 3 taken 64 times.
✗ Branch 4 not taken.
64 xml.reserve(xml.size() + keys.size() * 70 + 10);
54
2/2
✓ Branch 1 taken 97 times.
✓ Branch 2 taken 64 times.
161 for (unsigned i = 0; i < keys.size(); ++i) {
55
4/8
✓ Branch 2 taken 97 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 97 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 97 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 97 times.
✗ Branch 12 not taken.
97 xml += "<Object><Key>" + XmlEscape(keys[i]) + "</Key></Object>\n";
56 }
57
1/2
✓ Branch 1 taken 64 times.
✗ Branch 2 not taken.
64 xml += "</Delete>";
58 64 return xml;
59 }
60
61
62 /**
63 * Parses the S3 DeleteObjects error response XML.
64 * In Quiet mode, the response only contains <Error> elements for failed keys.
65 * Returns the number of errors found.
66 */
67 44 unsigned ParseDeleteMultiResponse(const string &response,
68 vector<string> *error_keys,
69 vector<string> *error_codes,
70 vector<string> *error_messages) {
71 44 unsigned num_errors = 0;
72 44 string::size_type pos = 0;
73
74 while (true) {
75 77 const string::size_type err_start = response.find("<Error>", pos);
76
2/2
✓ Branch 0 taken 33 times.
✓ Branch 1 taken 44 times.
77 if (err_start == string::npos)
77 33 break;
78 44 const string::size_type err_end = response.find("</Error>", err_start);
79
2/2
✓ Branch 0 taken 11 times.
✓ Branch 1 taken 33 times.
44 if (err_end == string::npos)
80 11 break;
81
82 const string error_block = response.substr(err_start,
83
1/2
✓ Branch 1 taken 33 times.
✗ Branch 2 not taken.
33 err_end - err_start);
84 33 num_errors++;
85
86 // Extract <Key>...</Key>
87 33 const string::size_type key_start = error_block.find("<Key>");
88 33 const string::size_type key_end = error_block.find("</Key>");
89
2/4
✓ Branch 0 taken 33 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 33 times.
✗ Branch 3 not taken.
33 if (key_start != string::npos && key_end != string::npos) {
90
1/2
✓ Branch 1 taken 33 times.
✗ Branch 2 not taken.
33 error_keys->push_back(
91
1/2
✓ Branch 1 taken 33 times.
✗ Branch 2 not taken.
66 error_block.substr(key_start + 5, key_end - key_start - 5));
92 } else {
93 error_keys->push_back("");
94 }
95
96 // Extract <Code>...</Code>
97 33 const string::size_type code_start = error_block.find("<Code>");
98 33 const string::size_type code_end = error_block.find("</Code>");
99
2/4
✓ Branch 0 taken 33 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 33 times.
✗ Branch 3 not taken.
33 if (code_start != string::npos && code_end != string::npos) {
100
1/2
✓ Branch 1 taken 33 times.
✗ Branch 2 not taken.
33 error_codes->push_back(
101
1/2
✓ Branch 1 taken 33 times.
✗ Branch 2 not taken.
66 error_block.substr(code_start + 6, code_end - code_start - 6));
102 } else {
103 error_codes->push_back("");
104 }
105
106 // Extract <Message>...</Message>
107 33 const string::size_type msg_start = error_block.find("<Message>");
108 33 const string::size_type msg_end = error_block.find("</Message>");
109
2/4
✓ Branch 0 taken 33 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 33 times.
✗ Branch 3 not taken.
33 if (msg_start != string::npos && msg_end != string::npos) {
110
1/2
✓ Branch 1 taken 33 times.
✗ Branch 2 not taken.
33 error_messages->push_back(
111
1/2
✓ Branch 1 taken 33 times.
✗ Branch 2 not taken.
66 error_block.substr(msg_start + 9, msg_end - msg_start - 9));
112 } else {
113 error_messages->push_back("");
114 }
115
116 33 pos = err_end + 8; // length of "</Error>"
117 33 }
118
119 44 return num_errors;
120 }
121
122 const char *S3FanoutManager::kCacheControlCas = "Cache-Control: max-age=259200";
123 const unsigned S3FanoutManager::kDefault429ThrottleMs = 250;
124 const unsigned S3FanoutManager::kMax429ThrottleMs = 10000;
125 const unsigned S3FanoutManager::kThrottleReportIntervalSec = 10;
126 const unsigned S3FanoutManager::kDefaultHTTPPort = 80;
127 const unsigned S3FanoutManager::kDefaultHTTPSPort = 443;
128
129 240 std::string S3FanoutManager::MkDotCvmfsCacheControlHeader(unsigned defaultMaxAge, int overrideMaxAge)
130 {
131 const char *var;
132 int max_age_sec;
133 char *at_null_terminator_if_number;
134 240 bool value_determined = false;
135
136
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 240 times.
240 if (overrideMaxAge >= 0) {
137 max_age_sec = overrideMaxAge;
138 value_determined = true;
139 }
140
141
1/2
✓ Branch 0 taken 240 times.
✗ Branch 1 not taken.
240 if (!value_determined) {
142 240 var = getenv("CVMFS_MAX_TTL_SECS");
143
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
240 if (var && var[0]) {
144 max_age_sec = strtoll(var, &at_null_terminator_if_number, 10);
145 if (*at_null_terminator_if_number == '\0'
146 && max_age_sec >= 0) {
147 value_determined = true;
148 }
149 }
150 }
151
152
1/2
✓ Branch 0 taken 240 times.
✗ Branch 1 not taken.
240 if (!value_determined) {
153 240 var = getenv("CVMFS_MAX_TTL");
154
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
240 if (var && var[0]) {
155 max_age_sec = strtoll(var, &at_null_terminator_if_number, 10);
156 if (*at_null_terminator_if_number == '\0'
157 && max_age_sec >= 0) {
158 max_age_sec *= 60;
159 value_determined = true;
160 }
161 }
162 }
163
164
1/2
✓ Branch 0 taken 240 times.
✗ Branch 1 not taken.
240 if (!value_determined) {
165 240 max_age_sec = defaultMaxAge;
166 }
167
168
2/4
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 240 times.
✗ Branch 5 not taken.
480 return "Cache-Control: max-age=" + std::to_string(max_age_sec);
169 }
170
171 /**
172 * Parses Retry-After and X-Retry-In headers attached to HTTP 429 responses
173 */
174 394 void S3FanoutManager::DetectThrottleIndicator(const std::string &header,
175 JobInfo *info) {
176 394 std::string value_str;
177
4/6
✓ Branch 2 taken 394 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 394 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 157 times.
✓ Branch 10 taken 237 times.
394 if (HasPrefix(header, "retry-after:", true))
178
1/2
✓ Branch 1 taken 157 times.
✗ Branch 2 not taken.
157 value_str = header.substr(12);
179
4/6
✓ Branch 2 taken 394 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 394 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 55 times.
✓ Branch 10 taken 339 times.
394 if (HasPrefix(header, "x-retry-in:", true))
180
1/2
✓ Branch 1 taken 55 times.
✗ Branch 2 not taken.
55 value_str = header.substr(11);
181
182
1/2
✓ Branch 1 taken 394 times.
✗ Branch 2 not taken.
394 value_str = Trim(value_str, true /* trim_newline */);
183
2/2
✓ Branch 1 taken 190 times.
✓ Branch 2 taken 204 times.
394 if (!value_str.empty()) {
184
1/2
✓ Branch 1 taken 190 times.
✗ Branch 2 not taken.
190 const unsigned value_numeric = String2Uint64(value_str);
185
2/4
✓ Branch 2 taken 190 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 190 times.
✗ Branch 6 not taken.
380 const unsigned value_ms = HasSuffix(value_str, "ms", true /* ignore_case */)
186
2/2
✓ Branch 0 taken 55 times.
✓ Branch 1 taken 135 times.
190 ? value_numeric
187 190 : (value_numeric * 1000);
188
2/2
✓ Branch 0 taken 179 times.
✓ Branch 1 taken 11 times.
190 if (value_ms > 0)
189 179 info->throttle_ms = std::min(value_ms, kMax429ThrottleMs);
190 }
191 394 }
192
193
194 /**
195 * Called by curl for every HTTP header. Not called for file:// transfers.
196 */
197 73640 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
198 void *info_link) {
199 73640 const size_t num_bytes = size * nmemb;
200
1/2
✓ Branch 2 taken 73640 times.
✗ Branch 3 not taken.
73640 const string header_line(static_cast<const char *>(ptr), num_bytes);
201 73640 JobInfo *info = static_cast<JobInfo *>(info_link);
202
203 // Check for http status code errors
204
4/6
✓ Branch 2 taken 73640 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 73640 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 24520 times.
✓ Branch 10 taken 49120 times.
73640 if (HasPrefix(header_line, "HTTP/1.", false)) {
205
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 24520 times.
24520 if (header_line.length() < 10)
206 return 0;
207
208 unsigned i;
209
5/6
✓ Branch 1 taken 49040 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 24520 times.
✓ Branch 5 taken 24520 times.
✓ Branch 6 taken 24520 times.
✓ Branch 7 taken 24520 times.
49040 for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {
210 }
211
212
2/2
✓ Branch 1 taken 12240 times.
✓ Branch 2 taken 12280 times.
24520 if (header_line[i] == '2') {
213 12240 return num_bytes;
214 } else {
215
1/2
✓ Branch 2 taken 12280 times.
✗ Branch 3 not taken.
12280 LogCvmfs(kLogS3Fanout, kLogDebug, "http status error code [info %p]: %s",
216 info, header_line.c_str());
217
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 12280 times.
12280 if (header_line.length() < i + 3) {
218 LogCvmfs(kLogS3Fanout, kLogStderr, "S3: invalid HTTP response '%s'",
219 header_line.c_str());
220 info->error_code = kFailOther;
221 return 0;
222 }
223
2/4
✓ Branch 3 taken 12280 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 12280 times.
✗ Branch 7 not taken.
12280 info->http_error = String2Int64(string(&header_line[i], 3));
224
225
2/7
✓ Branch 0 taken 80 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 12200 times.
✗ Branch 6 not taken.
12280 switch (info->http_error) {
226 80 case 429:
227 80 info->error_code = kFailRetry;
228 80 info->throttle_ms = S3FanoutManager::kDefault429ThrottleMs;
229 80 info->throttle_timestamp = platform_monotonic_time();
230 80 return num_bytes;
231 case 507: // Insufficient Storage
232 info->error_code = kFailInsufficientStorage;
233 break;
234 case 503:
235 case 502: // Can happen if the S3 gateway-backend connection breaks
236 case 500: // sometimes see this as a transient error from S3
237 info->error_code = kFailServiceUnavailable;
238 break;
239 case 501:
240 case 400:
241 info->error_code = kFailBadRequest;
242 break;
243 case 403:
244 info->error_code = kFailForbidden;
245 break;
246 12200 case 404:
247 12200 info->error_code = kFailNotFound;
248 12200 return num_bytes;
249 default:
250 info->error_code = kFailOther;
251 }
252 return 0;
253 }
254 }
255
256
2/2
✓ Branch 0 taken 240 times.
✓ Branch 1 taken 48880 times.
49120 if (info->error_code == kFailRetry) {
257
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 S3FanoutManager::DetectThrottleIndicator(header_line, info);
258 }
259
260 49120 return num_bytes;
261 73640 }
262
263
264 /**
265 * Called by curl for every new chunk to upload.
266 */
267 313400 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
268 void *info_link) {
269 313400 const size_t num_bytes = size * nmemb;
270 313400 JobInfo *info = static_cast<JobInfo *>(info_link);
271
272 313400 LogCvmfs(kLogS3Fanout, kLogDebug, "Data callback with %zu bytes", num_bytes);
273
274
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 313400 times.
313400 if (num_bytes == 0)
275 return 0;
276
277 313400 const uint64_t read_bytes = info->origin->Read(ptr, num_bytes);
278
279 313400 LogCvmfs(kLogS3Fanout, kLogDebug, "source buffer pushed out %lu bytes",
280 read_bytes);
281
282 313400 return read_bytes;
283 }
284
285
286 /**
287 * Captures the HTTP response body. For kReqDeleteMulti, the response contains
288 * XML error information. For other requests, the body is ignored.
289 */
290 static size_t CallbackCurlBody(char *ptr, size_t size, size_t nmemb,
291 void *userdata) {
292 const size_t num_bytes = size * nmemb;
293 JobInfo *info = static_cast<JobInfo *>(userdata);
294 if (info != NULL && info->request == JobInfo::kReqDeleteMulti) {
295 info->response_body.append(ptr, num_bytes);
296 }
297 return num_bytes;
298 }
299
300
301 /**
302 * Called when new curl sockets arrive or existing curl sockets depart.
303 */
304 54380 int S3FanoutManager::CallbackCurlSocket(CURL *easy, curl_socket_t s, int action,
305 void *userp, void *socketp) {
306 54380 S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(userp);
307 54380 LogCvmfs(kLogS3Fanout, kLogDebug,
308 "CallbackCurlSocket called with easy "
309 "handle %p, socket %d, action %d, up %p, "
310 "sp %p, fds_inuse %d, jobs %d",
311 easy, s, action, userp, socketp, s3fanout_mgr->watch_fds_inuse_,
312 54380 s3fanout_mgr->available_jobs_->Get());
313
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 54380 times.
54380 if (action == CURL_POLL_NONE)
314 return 0;
315
316 // Find s in watch_fds_
317 // First 2 fds are job and terminate pipes (not curl related)
318 unsigned index;
319
2/2
✓ Branch 0 taken 86580 times.
✓ Branch 1 taken 24520 times.
111100 for (index = 2; index < s3fanout_mgr->watch_fds_inuse_; ++index) {
320
2/2
✓ Branch 0 taken 29860 times.
✓ Branch 1 taken 56720 times.
86580 if (s3fanout_mgr->watch_fds_[index].fd == s)
321 29860 break;
322 }
323 // Or create newly
324
2/2
✓ Branch 0 taken 24520 times.
✓ Branch 1 taken 29860 times.
54380 if (index == s3fanout_mgr->watch_fds_inuse_) {
325 // Extend array if necessary
326
2/2
✓ Branch 0 taken 40 times.
✓ Branch 1 taken 24480 times.
24520 if (s3fanout_mgr->watch_fds_inuse_ == s3fanout_mgr->watch_fds_size_) {
327 40 s3fanout_mgr->watch_fds_size_ *= 2;
328 40 s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
329 40 srealloc(s3fanout_mgr->watch_fds_,
330 40 s3fanout_mgr->watch_fds_size_ * sizeof(struct pollfd)));
331 }
332 24520 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].fd = s;
333 24520 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].events = 0;
334 24520 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].revents = 0;
335 24520 s3fanout_mgr->watch_fds_inuse_++;
336 }
337
338
4/5
✓ Branch 0 taken 24520 times.
✓ Branch 1 taken 60 times.
✓ Branch 2 taken 5280 times.
✓ Branch 3 taken 24520 times.
✗ Branch 4 not taken.
54380 switch (action) {
339 24520 case CURL_POLL_IN:
340 24520 s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
341 24520 break;
342 60 case CURL_POLL_OUT:
343 60 s3fanout_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
344 60 break;
345 5280 case CURL_POLL_INOUT:
346 5280 s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI | POLLOUT
347 | POLLWRBAND;
348 5280 break;
349 24520 case CURL_POLL_REMOVE:
350
2/2
✓ Branch 0 taken 3860 times.
✓ Branch 1 taken 20660 times.
24520 if (index < s3fanout_mgr->watch_fds_inuse_ - 1)
351 s3fanout_mgr
352 3860 ->watch_fds_[index] = s3fanout_mgr->watch_fds_
353 3860 [s3fanout_mgr->watch_fds_inuse_ - 1];
354 24520 s3fanout_mgr->watch_fds_inuse_--;
355 // Shrink array if necessary
356
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24520 times.
24520 if ((s3fanout_mgr->watch_fds_inuse_ > s3fanout_mgr->watch_fds_max_)
357 && (s3fanout_mgr->watch_fds_inuse_
358 < s3fanout_mgr->watch_fds_size_ / 2)) {
359 s3fanout_mgr->watch_fds_size_ /= 2;
360 s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
361 srealloc(s3fanout_mgr->watch_fds_,
362 s3fanout_mgr->watch_fds_size_ * sizeof(struct pollfd)));
363 }
364 24520 break;
365 default:
366 PANIC(NULL);
367 }
368
369 54380 return 0;
370 }
371
372
373 /**
374 * Worker thread event loop.
375 */
376 240 void *S3FanoutManager::MainUpload(void *data) {
377
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread started");
378 240 S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(data);
379
380 240 s3fanout_mgr->InitPipeWatchFds();
381
382 // Don't schedule more jobs into the multi handle than the maximum number of
383 // parallel connections. This should prevent starvation and thus a timeout
384 // of the authorization header (CVM-1339).
385 240 unsigned jobs_in_flight = 0;
386
387 while (true) {
388 // Check events with 100ms timeout
389 334580 const int timeout_ms = 100;
390
1/2
✓ Branch 1 taken 334580 times.
✗ Branch 2 not taken.
334580 int retval = poll(s3fanout_mgr->watch_fds_, s3fanout_mgr->watch_fds_inuse_,
391 timeout_ms);
392
2/2
✓ Branch 0 taken 1940 times.
✓ Branch 1 taken 332640 times.
334580 if (retval == 0) {
393 // Handle timeout
394 1940 int still_running = 0;
395
1/2
✓ Branch 1 taken 1940 times.
✗ Branch 2 not taken.
1940 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
396 CURL_SOCKET_TIMEOUT, 0, &still_running);
397
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1940 times.
1940 if (retval != CURLM_OK) {
398 LogCvmfs(kLogS3Fanout, kLogStderr, "Error, timeout due to: %d", retval);
399 assert(retval == CURLM_OK);
400 }
401
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 332640 times.
332640 } else if (retval < 0) {
402 assert(errno == EINTR);
403 continue;
404 }
405
406 // Terminate I/O thread
407
2/2
✓ Branch 0 taken 240 times.
✓ Branch 1 taken 334340 times.
334580 if (s3fanout_mgr->watch_fds_[0].revents)
408 240 break;
409
410 // New job incoming
411
2/2
✓ Branch 0 taken 12280 times.
✓ Branch 1 taken 322060 times.
334340 if (s3fanout_mgr->watch_fds_[1].revents) {
412 12280 s3fanout_mgr->watch_fds_[1].revents = 0;
413 JobInfo *info;
414
1/2
✓ Branch 1 taken 12280 times.
✗ Branch 2 not taken.
12280 ReadPipe(s3fanout_mgr->pipe_jobs_[0], &info, sizeof(info));
415
1/2
✓ Branch 1 taken 12280 times.
✗ Branch 2 not taken.
12280 CURL *handle = s3fanout_mgr->AcquireCurlHandle();
416
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12280 times.
12280 if (handle == NULL) {
417 PANIC(kLogStderr, "Failed to acquire CURL handle.");
418 }
419
1/2
✓ Branch 1 taken 12280 times.
✗ Branch 2 not taken.
12280 const s3fanout::Failures init_failure = s3fanout_mgr->InitializeRequest(
420 info, handle);
421
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12280 times.
12280 if (init_failure != s3fanout::kFailOk) {
422 PANIC(kLogStderr,
423 "Failed to initialize CURL handle (error: %d - %s | errno: %d)",
424 init_failure, Code2Ascii(init_failure), errno);
425 }
426
1/2
✓ Branch 1 taken 12280 times.
✗ Branch 2 not taken.
12280 s3fanout_mgr->SetUrlOptions(info);
427
428
1/2
✓ Branch 1 taken 12280 times.
✗ Branch 2 not taken.
12280 curl_multi_add_handle(s3fanout_mgr->curl_multi_, handle);
429
1/2
✓ Branch 1 taken 12280 times.
✗ Branch 2 not taken.
12280 s3fanout_mgr->active_requests_->insert(info);
430 12280 jobs_in_flight++;
431 12280 int still_running = 0, retval = 0;
432
1/2
✓ Branch 1 taken 12280 times.
✗ Branch 2 not taken.
12280 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
433 CURL_SOCKET_TIMEOUT, 0, &still_running);
434
435
1/2
✓ Branch 1 taken 12280 times.
✗ Branch 2 not taken.
12280 LogCvmfs(kLogS3Fanout, kLogDebug, "curl_multi_socket_action: %d - %d",
436 retval, still_running);
437 }
438
439
440 // Activity on curl sockets
441 // Within this loop the curl_multi_socket_action() may cause socket(s)
442 // to be removed from watch_fds_. If a socket is removed it is replaced
443 // by the socket at the end of the array and the inuse count is decreased.
444 // Therefore loop over the array in reverse order.
445 // First 2 fds are job and terminate pipes (not curl related)
446
2/2
✓ Branch 0 taken 757820 times.
✓ Branch 1 taken 334340 times.
1092160 for (int32_t i = s3fanout_mgr->watch_fds_inuse_ - 1; i >= 2; --i) {
447
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 757820 times.
757820 if (static_cast<uint32_t>(i) >= s3fanout_mgr->watch_fds_inuse_) {
448 continue;
449 }
450
2/2
✓ Branch 0 taken 341340 times.
✓ Branch 1 taken 416480 times.
757820 if (s3fanout_mgr->watch_fds_[i].revents) {
451 341340 int ev_bitmask = 0;
452
2/2
✓ Branch 0 taken 36700 times.
✓ Branch 1 taken 304640 times.
341340 if (s3fanout_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
453 36700 ev_bitmask |= CURL_CSELECT_IN;
454
2/2
✓ Branch 0 taken 304640 times.
✓ Branch 1 taken 36700 times.
341340 if (s3fanout_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
455 304640 ev_bitmask |= CURL_CSELECT_OUT;
456 341340 if (s3fanout_mgr->watch_fds_[i].revents
457
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 341340 times.
341340 & (POLLERR | POLLHUP | POLLNVAL))
458 ev_bitmask |= CURL_CSELECT_ERR;
459 341340 s3fanout_mgr->watch_fds_[i].revents = 0;
460
461 341340 int still_running = 0;
462 341340 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
463
1/2
✓ Branch 1 taken 341340 times.
✗ Branch 2 not taken.
341340 s3fanout_mgr->watch_fds_[i].fd,
464 ev_bitmask,
465 &still_running);
466 }
467 }
468
469 // Check if transfers are completed
470 CURLMsg *curl_msg;
471 int msgs_in_queue;
472
3/4
✓ Branch 1 taken 358860 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 24520 times.
✓ Branch 4 taken 334340 times.
358860 while ((curl_msg = curl_multi_info_read(s3fanout_mgr->curl_multi_,
473 &msgs_in_queue))) {
474
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24520 times.
24520 assert(curl_msg->msg == CURLMSG_DONE);
475
476 24520 s3fanout_mgr->statistics_->num_requests++;
477 JobInfo *info;
478 24520 CURL *easy_handle = curl_msg->easy_handle;
479 24520 const int curl_error = curl_msg->data.result;
480
1/2
✓ Branch 1 taken 24520 times.
✗ Branch 2 not taken.
24520 curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
481
482
1/2
✓ Branch 1 taken 24520 times.
✗ Branch 2 not taken.
24520 curl_multi_remove_handle(s3fanout_mgr->curl_multi_, easy_handle);
483
3/4
✓ Branch 1 taken 24520 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 12240 times.
✓ Branch 4 taken 12280 times.
24520 if (s3fanout_mgr->VerifyAndFinalize(curl_error, info)) {
484
1/2
✓ Branch 1 taken 12240 times.
✗ Branch 2 not taken.
12240 curl_multi_add_handle(s3fanout_mgr->curl_multi_, easy_handle);
485 12240 int still_running = 0;
486
1/2
✓ Branch 1 taken 12240 times.
✗ Branch 2 not taken.
12240 curl_multi_socket_action(s3fanout_mgr->curl_multi_, CURL_SOCKET_TIMEOUT,
487 0, &still_running);
488 } else {
489 // Return easy handle into pool and write result back
490 12280 jobs_in_flight--;
491
1/2
✓ Branch 1 taken 12280 times.
✗ Branch 2 not taken.
12280 s3fanout_mgr->active_requests_->erase(info);
492
1/2
✓ Branch 1 taken 12280 times.
✗ Branch 2 not taken.
12280 s3fanout_mgr->ReleaseCurlHandle(info, easy_handle);
493
1/2
✓ Branch 1 taken 12280 times.
✗ Branch 2 not taken.
12280 s3fanout_mgr->available_jobs_->Decrement();
494
495 // Add to list of completed jobs
496
1/2
✓ Branch 1 taken 12280 times.
✗ Branch 2 not taken.
12280 s3fanout_mgr->PushCompletedJob(info);
497 }
498 }
499 334340 }
500
501 240 set<CURL *>::iterator i = s3fanout_mgr->pool_handles_inuse_->begin();
502 240 const set<CURL *>::const_iterator i_end = s3fanout_mgr->pool_handles_inuse_
503 240 ->end();
504
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 240 times.
240 for (; i != i_end; ++i) {
505 curl_multi_remove_handle(s3fanout_mgr->curl_multi_, *i);
506 curl_easy_cleanup(*i);
507 }
508 240 s3fanout_mgr->pool_handles_inuse_->clear();
509 240 free(s3fanout_mgr->watch_fds_);
510
511
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread terminated");
512 240 return NULL;
513 }
514
515
516 /**
517 * Gets an idle CURL handle from the pool. Creates a new one and adds it to
518 * the pool if necessary.
519 */
520 12280 CURL *S3FanoutManager::AcquireCurlHandle() const {
521 CURL *handle;
522
523 12280 const MutexLockGuard guard(curl_handle_lock_);
524
525
2/2
✓ Branch 1 taken 980 times.
✓ Branch 2 taken 11300 times.
12280 if (pool_handles_idle_->empty()) {
526 CURLcode retval;
527
528 // Create a new handle
529
1/2
✓ Branch 1 taken 980 times.
✗ Branch 2 not taken.
980 handle = curl_easy_init();
530
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 980 times.
980 assert(handle != NULL);
531
532 // Other settings
533
1/2
✓ Branch 1 taken 980 times.
✗ Branch 2 not taken.
980 retval = curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
534
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 980 times.
980 assert(retval == CURLE_OK);
535
1/2
✓ Branch 1 taken 980 times.
✗ Branch 2 not taken.
980 retval = curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION,
536 CallbackCurlHeader);
537
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 980 times.
980 assert(retval == CURLE_OK);
538
1/2
✓ Branch 1 taken 980 times.
✗ Branch 2 not taken.
980 retval = curl_easy_setopt(handle, CURLOPT_READFUNCTION, CallbackCurlData);
539
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 980 times.
980 assert(retval == CURLE_OK);
540
1/2
✓ Branch 1 taken 980 times.
✗ Branch 2 not taken.
980 retval = curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlBody);
541
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 980 times.
980 assert(retval == CURLE_OK);
542 // WRITEDATA is set per-request in InitializeRequest
543 } else {
544 11300 handle = *(pool_handles_idle_->begin());
545
1/2
✓ Branch 2 taken 11300 times.
✗ Branch 3 not taken.
11300 pool_handles_idle_->erase(pool_handles_idle_->begin());
546 }
547
548
1/2
✓ Branch 1 taken 12280 times.
✗ Branch 2 not taken.
12280 pool_handles_inuse_->insert(handle);
549
550 12280 return handle;
551 12280 }
552
553
554 12280 void S3FanoutManager::ReleaseCurlHandle(JobInfo *info, CURL *handle) const {
555
1/2
✓ Branch 0 taken 12280 times.
✗ Branch 1 not taken.
12280 if (info->http_headers) {
556
1/2
✓ Branch 1 taken 12280 times.
✗ Branch 2 not taken.
12280 curl_slist_free_all(info->http_headers);
557 12280 info->http_headers = NULL;
558 }
559
560 12280 const MutexLockGuard guard(curl_handle_lock_);
561
562
1/2
✓ Branch 1 taken 12280 times.
✗ Branch 2 not taken.
12280 const set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
563
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 12280 times.
12280 assert(elem != pool_handles_inuse_->end());
564
565
2/2
✓ Branch 1 taken 580 times.
✓ Branch 2 taken 11700 times.
12280 if (pool_handles_idle_->size() > config_.pool_max_handles) {
566
1/2
✓ Branch 1 taken 580 times.
✗ Branch 2 not taken.
580 const CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, NULL);
567
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 580 times.
580 assert(retval == CURLE_OK);
568
1/2
✓ Branch 1 taken 580 times.
✗ Branch 2 not taken.
580 curl_easy_cleanup(handle);
569 const std::map<CURL *, S3FanOutDnsEntry *>::size_type
570
1/2
✓ Branch 1 taken 580 times.
✗ Branch 2 not taken.
580 retitems = curl_sharehandles_->erase(handle);
571
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 580 times.
580 assert(retitems == 1);
572 } else {
573
1/2
✓ Branch 1 taken 11700 times.
✗ Branch 2 not taken.
11700 pool_handles_idle_->insert(handle);
574 }
575
576
1/2
✓ Branch 1 taken 12280 times.
✗ Branch 2 not taken.
12280 pool_handles_inuse_->erase(elem);
577 12280 }
578
579 240 void S3FanoutManager::InitPipeWatchFds() {
580
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 240 times.
240 assert(watch_fds_inuse_ == 0);
581
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 240 times.
240 assert(watch_fds_size_ >= 2);
582 240 watch_fds_[0].fd = pipe_terminate_[0];
583 240 watch_fds_[0].events = POLLIN | POLLPRI;
584 240 watch_fds_[0].revents = 0;
585 240 ++watch_fds_inuse_;
586 240 watch_fds_[1].fd = pipe_jobs_[0];
587 240 watch_fds_[1].events = POLLIN | POLLPRI;
588 240 watch_fds_[1].revents = 0;
589 240 ++watch_fds_inuse_;
590 240 }
591
592 /**
593 * The Amazon AWS 2 authorization header according to
594 * http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html#ConstructingTheAuthenticationHeader
595 */
596 24440 bool S3FanoutManager::MkV2Authz(const JobInfo &info,
597 vector<string> *headers) const {
598 24440 string payload_hash;
599
1/2
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
24440 const bool retval = MkPayloadHash(info, &payload_hash);
600
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24440 times.
24440 if (!retval)
601 return false;
602
1/2
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
24440 const string content_type = GetContentType(info);
603
1/2
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
24440 const string request = GetRequestString(info);
604
605
1/2
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
24440 const string timestamp = RfcTimestamp();
606
5/10
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 24440 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 24440 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 24440 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 24440 times.
✗ Branch 14 not taken.
48880 string to_sign = request + "\n" + payload_hash + "\n" + content_type + "\n"
607
2/4
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 24440 times.
✗ Branch 5 not taken.
48880 + timestamp + "\n";
608
2/4
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 24440 times.
✗ Branch 4 not taken.
24440 if (config_.x_amz_acl != "") {
609
2/4
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 24440 times.
✗ Branch 5 not taken.
48880 to_sign += "x-amz-acl:" + config_.x_amz_acl + "\n" + // default ACL
610
5/10
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 24440 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 24440 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 24440 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 24440 times.
✗ Branch 14 not taken.
48880 "/" + config_.bucket + "/" + info.object_key;
611 }
612 // S3 V2 requires subresources to be included in the string to sign.
613 // For kReqDeleteMulti, object_key is intentionally empty: the canonical
614 // resource becomes "/<bucket>/?delete", matching the POST URL.
615
2/2
✓ Branch 0 taken 20 times.
✓ Branch 1 taken 24420 times.
24440 if (info.request == JobInfo::kReqDeleteMulti) {
616
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 20 times.
20 if (config_.x_amz_acl == "")
617 to_sign += "/" + config_.bucket + "/" + info.object_key;
618
1/2
✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
20 to_sign += "?delete";
619 }
620
1/2
✓ Branch 3 taken 24440 times.
✗ Branch 4 not taken.
24440 LogCvmfs(kLogS3Fanout, kLogDebug, "%s string to sign for: %s",
621 request.c_str(), info.object_key.c_str());
622
623
1/2
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
24440 shash::Any hmac;
624 24440 hmac.algorithm = shash::kSha1;
625
1/2
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
24440 shash::Hmac(config_.secret_key,
626 24440 reinterpret_cast<const unsigned char *>(to_sign.data()),
627 24440 to_sign.length(), &hmac);
628
629
3/6
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 24440 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 24440 times.
✗ Branch 8 not taken.
73320 headers->push_back("Authorization: AWS " + config_.access_key + ":"
630
3/6
✓ Branch 2 taken 24440 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 24440 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 24440 times.
✗ Branch 9 not taken.
122200 + Base64(string(reinterpret_cast<char *>(hmac.digest),
631 24440 hmac.GetDigestSize())));
632
2/4
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 24440 times.
✗ Branch 5 not taken.
24440 headers->push_back("Date: " + timestamp);
633
2/4
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 24440 times.
✗ Branch 5 not taken.
24440 headers->push_back("X-Amz-Acl: " + config_.x_amz_acl);
634
2/2
✓ Branch 1 taken 12180 times.
✓ Branch 2 taken 12260 times.
24440 if (!payload_hash.empty())
635
2/4
✓ Branch 1 taken 12180 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 12180 times.
✗ Branch 5 not taken.
12180 headers->push_back("Content-MD5: " + payload_hash);
636
2/2
✓ Branch 1 taken 12180 times.
✓ Branch 2 taken 12260 times.
24440 if (!content_type.empty())
637
2/4
✓ Branch 1 taken 12180 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 12180 times.
✗ Branch 5 not taken.
12180 headers->push_back("Content-Type: " + content_type);
638 24440 return true;
639 24440 }
640
641
642 string S3FanoutManager::GetUriEncode(const string &val,
643 bool encode_slash) const {
644 string result;
645 const unsigned len = val.length();
646 result.reserve(len);
647 for (unsigned i = 0; i < len; ++i) {
648 const char c = val[i];
649 if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
650 || (c >= '0' && c <= '9') || c == '_' || c == '-' || c == '~'
651 || c == '.') {
652 result.push_back(c);
653 } else if (c == '/') {
654 if (encode_slash) {
655 result += "%2F";
656 } else {
657 result.push_back(c);
658 }
659 } else {
660 result.push_back('%');
661 result.push_back((c / 16) + ((c / 16 <= 9) ? '0' : 'A' - 10));
662 result.push_back((c % 16) + ((c % 16 <= 9) ? '0' : 'A' - 10));
663 }
664 }
665 return result;
666 }
667
668
669 string S3FanoutManager::GetAwsV4SigningKey(const string &date) const {
670 if (last_signing_key_.first == date)
671 return last_signing_key_.second;
672
673 const string date_key = shash::Hmac256("AWS4" + config_.secret_key, date,
674 true);
675 const string date_region_key = shash::Hmac256(date_key, config_.region, true);
676 const string date_region_service_key = shash::Hmac256(date_region_key, "s3",
677 true);
678 string signing_key = shash::Hmac256(date_region_service_key, "aws4_request",
679 true);
680 last_signing_key_.first = date;
681 last_signing_key_.second = signing_key;
682 return signing_key;
683 }
684
685
686 /**
687 * The Amazon AWS4 authorization header according to
688 * http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-auth-using-authorization-header.html
689 */
690 bool S3FanoutManager::MkV4Authz(const JobInfo &info,
691 vector<string> *headers) const {
692 string payload_hash;
693 const bool retval = MkPayloadHash(info, &payload_hash);
694 if (!retval)
695 return false;
696 const string content_type = GetContentType(info);
697 const string timestamp = IsoTimestamp();
698 const string date = timestamp.substr(0, 8);
699 vector<string> tokens = SplitString(complete_hostname_, ':');
700 assert(tokens.size() <= 2);
701 string canonical_hostname = tokens[0];
702
703 // if we could split the hostname in two and if the port is *NOT* a default
704 // one
705 if (tokens.size() == 2
706 && !((String2Uint64(tokens[1]) == kDefaultHTTPPort)
707 || (String2Uint64(tokens[1]) == kDefaultHTTPSPort)))
708 canonical_hostname += ":" + tokens[1];
709
710 string signed_headers;
711 string canonical_headers;
712 if (!content_type.empty()) {
713 signed_headers += "content-type;";
714 headers->push_back("Content-Type: " + content_type);
715 canonical_headers += "content-type:" + content_type + "\n";
716 }
717 if (config_.x_amz_acl != "") {
718 signed_headers += "host;x-amz-acl;x-amz-content-sha256;x-amz-date";
719 } else {
720 signed_headers += "host;x-amz-content-sha256;x-amz-date";
721 }
722 canonical_headers += "host:" + canonical_hostname + "\n";
723 if (config_.x_amz_acl != "") {
724 canonical_headers += "x-amz-acl:" + config_.x_amz_acl + "\n";
725 }
726 canonical_headers += "x-amz-content-sha256:" + payload_hash + "\n"
727 + "x-amz-date:" + timestamp + "\n";
728
729 const string scope = date + "/" + config_.region + "/s3/aws4_request";
730 const string uri = config_.dns_buckets ? (string("/") + info.object_key)
731 : (string("/") + config_.bucket + "/"
732 + info.object_key);
733
734 // V4 canonical query string: empty for most requests, "delete=" for
735 // multi-object delete (the S3 ?delete parameter has no value)
736 const string canonical_query = (info.request == JobInfo::kReqDeleteMulti)
737 ? "delete="
738 : "";
739
740 const string canonical_request = GetRequestString(info) + "\n"
741 + GetUriEncode(uri, false) + "\n"
742 + canonical_query + "\n"
743 + canonical_headers + "\n" + signed_headers
744 + "\n" + payload_hash;
745
746 const string hash_request = shash::Sha256String(canonical_request.c_str());
747
748 const string string_to_sign = "AWS4-HMAC-SHA256\n" + timestamp + "\n" + scope
749 + "\n" + hash_request;
750
751 const string signing_key = GetAwsV4SigningKey(date);
752 const string signature = shash::Hmac256(signing_key, string_to_sign);
753
754 headers->push_back("X-Amz-Acl: " + config_.x_amz_acl);
755 headers->push_back("X-Amz-Content-Sha256: " + payload_hash);
756 headers->push_back("X-Amz-Date: " + timestamp);
757 headers->push_back("Authorization: AWS4-HMAC-SHA256 "
758 "Credential="
759 + config_.access_key + "/" + scope
760 + ","
761 "SignedHeaders="
762 + signed_headers
763 + ","
764 "Signature="
765 + signature);
766
767 // S3 requires Content-MD5 for multi-object delete
768 if (info.request == JobInfo::kReqDeleteMulti) {
769 shash::Any md5_hash(shash::kMd5);
770 unsigned char *data;
771 const unsigned int nbytes = info.origin->Data(
772 reinterpret_cast<void **>(&data), info.origin->GetSize(), 0);
773 assert(nbytes == info.origin->GetSize());
774 shash::HashMem(data, nbytes, &md5_hash);
775 headers->push_back(
776 "Content-MD5: "
777 + Base64(string(reinterpret_cast<char *>(md5_hash.digest),
778 md5_hash.GetDigestSize())));
779 }
780 return true;
781 }
782
783 /**
784 * The Azure Blob authorization header according to
785 * https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key
786 */
787 bool S3FanoutManager::MkAzureAuthz(const JobInfo &info,
788 vector<string> *headers) const {
789 const string timestamp = RfcTimestamp();
790 const string canonical_headers = "x-ms-blob-type:BlockBlob\nx-ms-date:"
791 + timestamp + "\nx-ms-version:2011-08-18";
792 const string canonical_resource = "/" + config_.access_key + "/"
793 + config_.bucket + "/" + info.object_key;
794
795 string string_to_sign;
796 if ((info.request == JobInfo::kReqHeadOnly)
797 || (info.request == JobInfo::kReqHeadPut)
798 || (info.request == JobInfo::kReqDelete)) {
799 string_to_sign = GetRequestString(info) + string("\n\n\n")
800 + "\n\n\n\n\n\n\n\n\n" + canonical_headers + "\n"
801 + canonical_resource;
802 } else {
803 string_to_sign = GetRequestString(info) + string("\n\n\n")
804 + string(StringifyInt(info.origin->GetSize()))
805 + "\n\n\n\n\n\n\n\n\n" + canonical_headers + "\n"
806 + canonical_resource;
807 }
808
809 string signing_key;
810 const int retval = Debase64(config_.secret_key, &signing_key);
811 if (!retval)
812 return false;
813
814 const string signature = shash::Hmac256(signing_key, string_to_sign, true);
815
816 headers->push_back("x-ms-date: " + timestamp);
817 headers->push_back("x-ms-version: 2011-08-18");
818 headers->push_back("Authorization: SharedKey " + config_.access_key + ":"
819 + Base64(signature));
820 headers->push_back("x-ms-blob-type: BlockBlob");
821 return true;
822 }
823
824 24440 void S3FanoutManager::InitializeDnsSettingsCurl(CURL *handle,
825 CURLSH *sharehandle,
826 curl_slist *clist) const {
827 24440 CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, sharehandle);
828
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24440 times.
24440 assert(retval == CURLE_OK);
829 24440 retval = curl_easy_setopt(handle, CURLOPT_RESOLVE, clist);
830
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24440 times.
24440 assert(retval == CURLE_OK);
831 24440 }
832
833
834 24440 int S3FanoutManager::InitializeDnsSettings(CURL *handle,
835 std::string host_with_port) const {
836 // Use existing handle
837 const std::map<CURL *, S3FanOutDnsEntry *>::const_iterator
838
1/2
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
24440 it = curl_sharehandles_->find(handle);
839
2/2
✓ Branch 3 taken 23460 times.
✓ Branch 4 taken 980 times.
24440 if (it != curl_sharehandles_->end()) {
840
1/2
✓ Branch 1 taken 23460 times.
✗ Branch 2 not taken.
23460 InitializeDnsSettingsCurl(handle, it->second->sharehandle,
841 23460 it->second->clist);
842 23460 return 0;
843 }
844
845 // Add protocol information for extraction of fields for DNS
846
2/4
✓ Branch 1 taken 980 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 980 times.
✗ Branch 4 not taken.
980 if (!IsHttpUrl(host_with_port))
847
2/4
✓ Branch 1 taken 980 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 980 times.
✗ Branch 5 not taken.
980 host_with_port = config_.protocol + "://" + host_with_port;
848
1/2
✓ Branch 1 taken 980 times.
✗ Branch 2 not taken.
980 const std::string remote_host = dns::ExtractHost(host_with_port);
849
1/2
✓ Branch 1 taken 980 times.
✗ Branch 2 not taken.
980 const std::string remote_port = dns::ExtractPort(host_with_port);
850
851 // If we have the name already resolved, use the least used IP
852 980 S3FanOutDnsEntry *useme = NULL;
853 980 unsigned int usemin = UINT_MAX;
854 980 std::set<S3FanOutDnsEntry *>::iterator its3 = sharehandles_->begin();
855
2/2
✓ Branch 3 taken 780 times.
✓ Branch 4 taken 980 times.
1760 for (; its3 != sharehandles_->end(); ++its3) {
856
1/2
✓ Branch 2 taken 780 times.
✗ Branch 3 not taken.
780 if ((*its3)->dns_name == remote_host) {
857
1/2
✓ Branch 1 taken 780 times.
✗ Branch 2 not taken.
780 if (usemin >= (*its3)->counter) {
858 780 usemin = (*its3)->counter;
859 780 useme = (*its3);
860 }
861 }
862 }
863
2/2
✓ Branch 0 taken 780 times.
✓ Branch 1 taken 200 times.
980 if (useme != NULL) {
864
1/2
✓ Branch 1 taken 780 times.
✗ Branch 2 not taken.
780 curl_sharehandles_->insert(
865 780 std::pair<CURL *, S3FanOutDnsEntry *>(handle, useme));
866 780 useme->counter++;
867
1/2
✓ Branch 1 taken 780 times.
✗ Branch 2 not taken.
780 InitializeDnsSettingsCurl(handle, useme->sharehandle, useme->clist);
868 780 return 0;
869 }
870
871 // We need to resolve the hostname
872 // TODO(ssheikki): support ipv6 also... if (opt_ipv4_only_)
873
1/2
✓ Branch 1 taken 200 times.
✗ Branch 2 not taken.
200 const dns::Host host = resolver_->Resolve(remote_host);
874
1/2
✓ Branch 2 taken 200 times.
✗ Branch 3 not taken.
200 set<string> ipv4_addresses = host.ipv4_addresses();
875 200 std::set<string>::iterator its = ipv4_addresses.begin();
876 200 S3FanOutDnsEntry *dnse = NULL;
877
2/2
✓ Branch 3 taken 200 times.
✓ Branch 4 taken 200 times.
400 for (; its != ipv4_addresses.end(); ++its) {
878
2/4
✓ Branch 1 taken 200 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 200 times.
✗ Branch 5 not taken.
200 dnse = new S3FanOutDnsEntry();
879 200 dnse->counter = 0;
880
1/2
✓ Branch 1 taken 200 times.
✗ Branch 2 not taken.
200 dnse->dns_name = remote_host;
881
4/12
✗ Branch 1 not taken.
✓ Branch 2 taken 200 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 8 taken 200 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 200 times.
✗ Branch 12 not taken.
✗ Branch 14 not taken.
✓ Branch 15 taken 200 times.
✗ Branch 18 not taken.
✗ Branch 19 not taken.
200 dnse->port = remote_port.size() == 0 ? "80" : remote_port;
882
1/2
✓ Branch 2 taken 200 times.
✗ Branch 3 not taken.
200 dnse->ip = *its;
883 200 dnse->clist = NULL;
884
1/2
✓ Branch 2 taken 200 times.
✗ Branch 3 not taken.
200 dnse->clist = curl_slist_append(
885 dnse->clist,
886
4/8
✓ Branch 1 taken 200 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 200 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 200 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 200 times.
✗ Branch 11 not taken.
400 (dnse->dns_name + ":" + dnse->port + ":" + dnse->ip).c_str());
887
1/2
✓ Branch 1 taken 200 times.
✗ Branch 2 not taken.
200 dnse->sharehandle = curl_share_init();
888
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 200 times.
200 assert(dnse->sharehandle != NULL);
889
1/2
✓ Branch 1 taken 200 times.
✗ Branch 2 not taken.
200 const CURLSHcode share_retval = curl_share_setopt(
890 dnse->sharehandle, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS);
891
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 200 times.
200 assert(share_retval == CURLSHE_OK);
892
1/2
✓ Branch 1 taken 200 times.
✗ Branch 2 not taken.
200 sharehandles_->insert(dnse);
893 }
894
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 200 times.
200 if (dnse == NULL) {
895 LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr,
896 "Error: DNS resolve failed for address '%s'.",
897 remote_host.c_str());
898 assert(dnse != NULL);
899 return -1;
900 }
901
1/2
✓ Branch 1 taken 200 times.
✗ Branch 2 not taken.
200 curl_sharehandles_->insert(
902 200 std::pair<CURL *, S3FanOutDnsEntry *>(handle, dnse));
903 200 dnse->counter++;
904
1/2
✓ Branch 1 taken 200 times.
✗ Branch 2 not taken.
200 InitializeDnsSettingsCurl(handle, dnse->sharehandle, dnse->clist);
905
906 200 return 0;
907 980 }
908
909
910 24440 bool S3FanoutManager::MkPayloadHash(const JobInfo &info,
911 string *hex_hash) const {
912
2/2
✓ Branch 0 taken 24340 times.
✓ Branch 1 taken 100 times.
24440 if (info.request == JobInfo::kReqHeadOnly
913
2/2
✓ Branch 0 taken 12180 times.
✓ Branch 1 taken 12160 times.
24340 || info.request == JobInfo::kReqHeadPut
914
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12180 times.
12180 || info.request == JobInfo::kReqDelete) {
915
1/4
✓ Branch 0 taken 12260 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
12260 switch (config_.authz_method) {
916 12260 case kAuthzAwsV2:
917 12260 hex_hash->clear();
918 12260 break;
919 case kAuthzAwsV4:
920 // Sha256 over empty string
921 *hex_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b78"
922 "52b855";
923 break;
924 case kAuthzAzure:
925 // no payload hash required for Azure signature
926 hex_hash->clear();
927 break;
928 default:
929 PANIC(NULL);
930 }
931 12260 return true;
932 }
933
934 // kReqDeleteMulti is a POST with a body (XML payload), same hashing as PUT
935 // falls through intentionally.
936
937 // PUT or POST with payload
938
1/2
✓ Branch 1 taken 12180 times.
✗ Branch 2 not taken.
12180 shash::Any payload_hash(shash::kMd5);
939
940 unsigned char *data;
941 12180 const unsigned int nbytes = info.origin->Data(
942
2/4
✓ Branch 2 taken 12180 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12180 times.
✗ Branch 6 not taken.
12180 reinterpret_cast<void **>(&data), info.origin->GetSize(), 0);
943
2/4
✓ Branch 2 taken 12180 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 12180 times.
12180 assert(nbytes == info.origin->GetSize());
944
945
1/4
✓ Branch 0 taken 12180 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
12180 switch (config_.authz_method) {
946 12180 case kAuthzAwsV2:
947
1/2
✓ Branch 1 taken 12180 times.
✗ Branch 2 not taken.
12180 shash::HashMem(data, nbytes, &payload_hash);
948
2/4
✓ Branch 2 taken 12180 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12180 times.
✗ Branch 6 not taken.
36540 *hex_hash = Base64(string(reinterpret_cast<char *>(payload_hash.digest),
949 24360 payload_hash.GetDigestSize()));
950 12180 return true;
951 case kAuthzAwsV4:
952 *hex_hash = shash::Sha256Mem(data, nbytes);
953 return true;
954 case kAuthzAzure:
955 // no payload hash required for Azure signature
956 hex_hash->clear();
957 return true;
958 default:
959 PANIC(NULL);
960 }
961 }
962
963 24440 string S3FanoutManager::GetRequestString(const JobInfo &info) const {
964
3/5
✓ Branch 0 taken 12260 times.
✓ Branch 1 taken 12160 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 20 times.
✗ Branch 4 not taken.
24440 switch (info.request) {
965 12260 case JobInfo::kReqHeadOnly:
966 case JobInfo::kReqHeadPut:
967
1/2
✓ Branch 2 taken 12260 times.
✗ Branch 3 not taken.
12260 return "HEAD";
968 12160 case JobInfo::kReqPutCas:
969 case JobInfo::kReqPutDotCvmfs:
970 case JobInfo::kReqPutHtml:
971 case JobInfo::kReqPutBucket:
972
1/2
✓ Branch 2 taken 12160 times.
✗ Branch 3 not taken.
12160 return "PUT";
973 case JobInfo::kReqDelete:
974 return "DELETE";
975 20 case JobInfo::kReqDeleteMulti:
976
1/2
✓ Branch 2 taken 20 times.
✗ Branch 3 not taken.
20 return "POST";
977 default:
978 PANIC(NULL);
979 }
980 }
981
982
983 24440 string S3FanoutManager::GetContentType(const JobInfo &info) const {
984
3/6
✓ Branch 0 taken 12260 times.
✓ Branch 1 taken 12160 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 20 times.
✗ Branch 5 not taken.
24440 switch (info.request) {
985 12260 case JobInfo::kReqHeadOnly:
986 case JobInfo::kReqHeadPut:
987 case JobInfo::kReqDelete:
988
1/2
✓ Branch 2 taken 12260 times.
✗ Branch 3 not taken.
12260 return "";
989 12160 case JobInfo::kReqPutCas:
990
1/2
✓ Branch 2 taken 12160 times.
✗ Branch 3 not taken.
12160 return "application/octet-stream";
991 case JobInfo::kReqPutDotCvmfs:
992 return "application/x-cvmfs";
993 case JobInfo::kReqPutHtml:
994 return "text/html";
995 20 case JobInfo::kReqPutBucket:
996 case JobInfo::kReqDeleteMulti:
997
1/2
✓ Branch 2 taken 20 times.
✗ Branch 3 not taken.
20 return "text/xml";
998 default:
999 PANIC(NULL);
1000 }
1001 }
1002
1003
1004 /**
1005 * Request parameters set the URL and other options such as timeout and
1006 * proxy.
1007 */
1008 24440 Failures S3FanoutManager::InitializeRequest(JobInfo *info, CURL *handle) const {
1009 // Initialize internal download state
1010 24440 info->curl_handle = handle;
1011 24440 info->error_code = kFailOk;
1012 24440 info->http_error = 0;
1013 24440 info->num_retries = 0;
1014 24440 info->backoff_ms = 0;
1015 24440 info->throttle_ms = 0;
1016 24440 info->throttle_timestamp = 0;
1017 24440 info->http_headers = NULL;
1018 // info->payload_size is needed in S3Uploader::MainCollectResults,
1019 // where info->origin is already destroyed.
1020
1/2
✓ Branch 2 taken 24440 times.
✗ Branch 3 not taken.
24440 info->payload_size = info->origin->GetSize();
1021
1022
2/4
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 24440 times.
✗ Branch 5 not taken.
24440 InitializeDnsSettings(handle, complete_hostname_);
1023
1024 CURLcode retval;
1025
2/2
✓ Branch 0 taken 24340 times.
✓ Branch 1 taken 100 times.
24440 if (info->request == JobInfo::kReqHeadOnly
1026
2/2
✓ Branch 0 taken 12180 times.
✓ Branch 1 taken 12160 times.
24340 || info->request == JobInfo::kReqHeadPut
1027
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12180 times.
12180 || info->request == JobInfo::kReqDelete) {
1028
1/2
✓ Branch 1 taken 12260 times.
✗ Branch 2 not taken.
12260 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 0);
1029
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12260 times.
12260 assert(retval == CURLE_OK);
1030
1/2
✓ Branch 1 taken 12260 times.
✗ Branch 2 not taken.
12260 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
1031
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12260 times.
12260 assert(retval == CURLE_OK);
1032
1033
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12260 times.
12260 if (info->request == JobInfo::kReqDelete) {
1034 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST,
1035 GetRequestString(*info).c_str());
1036 assert(retval == CURLE_OK);
1037 } else {
1038
1/2
✓ Branch 1 taken 12260 times.
✗ Branch 2 not taken.
12260 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
1039
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12260 times.
12260 assert(retval == CURLE_OK);
1040 }
1041
2/2
✓ Branch 0 taken 20 times.
✓ Branch 1 taken 12160 times.
12180 } else if (info->request == JobInfo::kReqDeleteMulti) {
1042 // POST request with XML body read from origin buffer
1043
1/2
✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
20 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 1);
1044
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20 times.
20 assert(retval == CURLE_OK);
1045
1/2
✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
20 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 0);
1046
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20 times.
20 assert(retval == CURLE_OK);
1047
1/2
✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
20 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, "POST");
1048
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20 times.
20 assert(retval == CURLE_OK);
1049
2/4
✓ Branch 2 taken 20 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 20 times.
✗ Branch 6 not taken.
20 retval = curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE,
1050 static_cast<curl_off_t>(info->origin->GetSize()));
1051
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20 times.
20 assert(retval == CURLE_OK);
1052 20 info->response_body.clear();
1053 } else {
1054
1/2
✓ Branch 1 taken 12160 times.
✗ Branch 2 not taken.
12160 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
1055
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12160 times.
12160 assert(retval == CURLE_OK);
1056
1/2
✓ Branch 1 taken 12160 times.
✗ Branch 2 not taken.
12160 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 1);
1057
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12160 times.
12160 assert(retval == CURLE_OK);
1058
1/2
✓ Branch 1 taken 12160 times.
✗ Branch 2 not taken.
12160 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 0);
1059
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12160 times.
12160 assert(retval == CURLE_OK);
1060
2/4
✓ Branch 2 taken 12160 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12160 times.
✗ Branch 6 not taken.
12160 retval = curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE,
1061 static_cast<curl_off_t>(info->origin->GetSize()));
1062
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12160 times.
12160 assert(retval == CURLE_OK);
1063
1064
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12160 times.
12160 if (info->request == JobInfo::kReqPutDotCvmfs) {
1065 info->http_headers = curl_slist_append(
1066 info->http_headers, dot_cvmfs_cache_control_header.c_str());
1067
1/2
✓ Branch 0 taken 12160 times.
✗ Branch 1 not taken.
12160 } else if (info->request == JobInfo::kReqPutCas) {
1068
1/2
✓ Branch 1 taken 12160 times.
✗ Branch 2 not taken.
12160 info->http_headers = curl_slist_append(info->http_headers,
1069 kCacheControlCas);
1070 }
1071 }
1072
1073 bool retval_b;
1074
1075 // Authorization
1076 24440 vector<string> authz_headers;
1077
1/4
✓ Branch 0 taken 24440 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
24440 switch (config_.authz_method) {
1078 24440 case kAuthzAwsV2:
1079
1/2
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
24440 retval_b = MkV2Authz(*info, &authz_headers);
1080 24440 break;
1081 case kAuthzAwsV4:
1082 retval_b = MkV4Authz(*info, &authz_headers);
1083 break;
1084 case kAuthzAzure:
1085 retval_b = MkAzureAuthz(*info, &authz_headers);
1086 break;
1087 default:
1088 PANIC(NULL);
1089 }
1090
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24440 times.
24440 if (!retval_b)
1091 return kFailLocalIO;
1092
2/2
✓ Branch 1 taken 97680 times.
✓ Branch 2 taken 24440 times.
122120 for (unsigned i = 0; i < authz_headers.size(); ++i) {
1093
1/2
✓ Branch 2 taken 97680 times.
✗ Branch 3 not taken.
97680 info->http_headers = curl_slist_append(info->http_headers,
1094 97680 authz_headers[i].c_str());
1095 }
1096
1097 // Common headers
1098
1/2
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
24440 info->http_headers = curl_slist_append(info->http_headers,
1099 "Connection: Keep-Alive");
1100
1/2
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
24440 info->http_headers = curl_slist_append(info->http_headers, "Pragma:");
1101 // No 100-continue
1102
1/2
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
24440 info->http_headers = curl_slist_append(info->http_headers, "Expect:");
1103 // Strip unnecessary header
1104
1/2
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
24440 info->http_headers = curl_slist_append(info->http_headers, "Accept:");
1105
1/2
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
24440 info->http_headers = curl_slist_append(info->http_headers,
1106 24440 user_agent_->c_str());
1107
1108 // Set curl parameters
1109
1/2
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
24440 retval = curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
1110
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24440 times.
24440 assert(retval == CURLE_OK);
1111
1/2
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
24440 retval = curl_easy_setopt(handle, CURLOPT_HEADERDATA,
1112 static_cast<void *>(info));
1113
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24440 times.
24440 assert(retval == CURLE_OK);
1114
1/2
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
24440 retval = curl_easy_setopt(handle, CURLOPT_READDATA,
1115 static_cast<void *>(info));
1116
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24440 times.
24440 assert(retval == CURLE_OK);
1117
1/2
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
24440 retval = curl_easy_setopt(handle, CURLOPT_WRITEDATA,
1118 static_cast<void *>(info));
1119
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24440 times.
24440 assert(retval == CURLE_OK);
1120
1/2
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
24440 retval = curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->http_headers);
1121
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24440 times.
24440 assert(retval == CURLE_OK);
1122
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24440 times.
24440 if (opt_ipv4_only_) {
1123 retval = curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
1124 assert(retval == CURLE_OK);
1125 }
1126 // Follow HTTP redirects
1127
1/2
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
24440 retval = curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
1128
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24440 times.
24440 assert(retval == CURLE_OK);
1129
1130
1/2
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
24440 retval = curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, info->errorbuffer);
1131
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24440 times.
24440 assert(retval == CURLE_OK);
1132
1133
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 24440 times.
24440 if (config_.protocol == "https") {
1134 retval = curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 1L);
1135 assert(retval == CURLE_OK);
1136 retval = curl_easy_setopt(handle, CURLOPT_PROXY_SSL_VERIFYPEER, 1L);
1137 assert(retval == CURLE_OK);
1138 const bool add_cert = ssl_certificate_store_.ApplySslCertificatePath(
1139 handle);
1140 assert(add_cert);
1141 }
1142
1143 24440 return kFailOk;
1144 24440 }
1145
1146
1147 /**
1148 * Sets the URL specific options such as host to use and timeout.
1149 */
1150 24440 void S3FanoutManager::SetUrlOptions(JobInfo *info) const {
1151 24440 CURL *curl_handle = info->curl_handle;
1152 CURLcode retval;
1153
1154
1/2
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
24440 retval = curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT,
1155 config_.opt_timeout_sec);
1156
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24440 times.
24440 assert(retval == CURLE_OK);
1157
1/2
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
24440 retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT,
1158 kLowSpeedLimit);
1159
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24440 times.
24440 assert(retval == CURLE_OK);
1160
1/2
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
24440 retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME,
1161 config_.opt_timeout_sec);
1162
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24440 times.
24440 assert(retval == CURLE_OK);
1163
1164
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24440 times.
24440 if (is_curl_debug_) {
1165 retval = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 1);
1166 assert(retval == CURLE_OK);
1167 }
1168
1169
1/2
✓ Branch 1 taken 24440 times.
✗ Branch 2 not taken.
24440 string url = MkUrl(info->object_key);
1170
2/2
✓ Branch 0 taken 20 times.
✓ Branch 1 taken 24420 times.
24440 if (info->request == JobInfo::kReqDeleteMulti)
1171
1/2
✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
20 url += "?delete";
1172
1/2
✓ Branch 2 taken 24440 times.
✗ Branch 3 not taken.
24440 retval = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
1173
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24440 times.
24440 assert(retval == CURLE_OK);
1174
1175
1/2
✓ Branch 2 taken 24440 times.
✗ Branch 3 not taken.
24440 retval = curl_easy_setopt(curl_handle, CURLOPT_PROXY, config_.proxy.c_str());
1176
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24440 times.
24440 assert(retval == CURLE_OK);
1177 24440 }
1178
1179
1180 /**
1181 * Adds transfer time and uploaded bytes to the global counters.
1182 */
1183 24520 void S3FanoutManager::UpdateStatistics(CURL *handle) {
1184 double val;
1185
1186
2/4
✓ Branch 1 taken 24520 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 24520 times.
✗ Branch 4 not taken.
24520 if (curl_easy_getinfo(handle, CURLINFO_SIZE_UPLOAD, &val) == CURLE_OK)
1187 24520 statistics_->transferred_bytes += val;
1188 24520 }
1189
1190
1191 /**
1192 * Retry if possible and if not already done too often.
1193 */
1194 120 bool S3FanoutManager::CanRetry(const JobInfo *info) {
1195 120 return (info->error_code == kFailHostConnection
1196
1/2
✓ Branch 0 taken 120 times.
✗ Branch 1 not taken.
120 || info->error_code == kFailHostResolve
1197
1/2
✓ Branch 0 taken 120 times.
✗ Branch 1 not taken.
120 || info->error_code == kFailServiceUnavailable
1198
2/2
✓ Branch 0 taken 80 times.
✓ Branch 1 taken 40 times.
120 || info->error_code == kFailRetry)
1199
2/4
✓ Branch 0 taken 120 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 80 times.
✗ Branch 3 not taken.
240 && (info->num_retries < config_.opt_max_retries);
1200 }
1201
1202
1203 /**
1204 * Backoff for retry to introduce a jitter into a upload sequence.
1205 *
1206 * \return true if backoff has been performed, false otherwise
1207 */
1208 80 void S3FanoutManager::Backoff(JobInfo *info) {
1209
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 80 times.
80 if (info->error_code != kFailRetry)
1210 info->num_retries++;
1211 80 statistics_->num_retries++;
1212
1213
1/2
✓ Branch 0 taken 80 times.
✗ Branch 1 not taken.
80 if (info->throttle_ms > 0) {
1214 80 LogCvmfs(kLogS3Fanout, kLogDebug, "throttling for %d ms",
1215 info->throttle_ms);
1216 80 const uint64_t now = platform_monotonic_time();
1217
1/2
✓ Branch 0 taken 80 times.
✗ Branch 1 not taken.
80 if ((info->throttle_timestamp + (info->throttle_ms / 1000)) >= now) {
1218
2/2
✓ Branch 0 taken 20 times.
✓ Branch 1 taken 60 times.
80 if ((now - timestamp_last_throttle_report_)
1219 > kThrottleReportIntervalSec) {
1220 20 LogCvmfs(kLogS3Fanout, kLogStdout,
1221 "Warning: S3 backend throttling %ums "
1222 "(total backoff time so far %lums)",
1223 20 info->throttle_ms, statistics_->ms_throttled);
1224 20 timestamp_last_throttle_report_ = now;
1225 }
1226 80 statistics_->ms_throttled += info->throttle_ms;
1227 80 SafeSleepMs(info->throttle_ms);
1228 }
1229 } else {
1230 if (info->backoff_ms == 0) {
1231 // Must be != 0
1232 info->backoff_ms = prng_.Next(config_.opt_backoff_init_ms + 1);
1233 } else {
1234 info->backoff_ms *= 2;
1235 }
1236 if (info->backoff_ms > config_.opt_backoff_max_ms)
1237 info->backoff_ms = config_.opt_backoff_max_ms;
1238
1239 LogCvmfs(kLogS3Fanout, kLogDebug, "backing off for %d ms",
1240 info->backoff_ms);
1241 SafeSleepMs(info->backoff_ms);
1242 }
1243 80 }
1244
1245
1246 /**
1247 * Checks the result of a curl request and implements the failure logic
1248 * and takes care of cleanup.
1249 *
1250 * @return true if request should be repeated, false otherwise
1251 */
1252 24520 bool S3FanoutManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1253 24520 LogCvmfs(kLogS3Fanout, kLogDebug,
1254 "Verify uploaded/tested object %s "
1255 "(curl error %d, info error %d, info request %d)",
1256 24520 info->object_key.c_str(), curl_error, info->error_code,
1257 24520 info->request);
1258 24520 UpdateStatistics(info->curl_handle);
1259
1260 // Verification and error classification
1261
1/6
✓ Branch 0 taken 24520 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
24520 switch (curl_error) {
1262 24520 case CURLE_OK:
1263
2/2
✓ Branch 0 taken 24440 times.
✓ Branch 1 taken 80 times.
24520 if ((info->error_code != kFailRetry)
1264
2/2
✓ Branch 0 taken 12240 times.
✓ Branch 1 taken 12200 times.
24440 && (info->error_code != kFailNotFound)) {
1265 12240 info->error_code = kFailOk;
1266 }
1267 24520 break;
1268 case CURLE_UNSUPPORTED_PROTOCOL:
1269 case CURLE_URL_MALFORMAT:
1270 info->error_code = kFailBadRequest;
1271 break;
1272 case CURLE_COULDNT_RESOLVE_HOST:
1273 info->error_code = kFailHostResolve;
1274 break;
1275 case CURLE_COULDNT_CONNECT:
1276 case CURLE_OPERATION_TIMEDOUT:
1277 case CURLE_SEND_ERROR:
1278 case CURLE_RECV_ERROR:
1279 info->error_code = kFailHostConnection;
1280 break;
1281 case CURLE_ABORTED_BY_CALLBACK:
1282 case CURLE_WRITE_ERROR:
1283 // Error set by callback
1284 break;
1285 default:
1286 LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr,
1287 "unexpected curl error (%d) while trying to upload %s: %s",
1288 curl_error, info->object_key.c_str(), info->errorbuffer);
1289 info->error_code = kFailOther;
1290 break;
1291 }
1292
1293 // Transform HEAD to PUT request
1294
2/2
✓ Branch 0 taken 12200 times.
✓ Branch 1 taken 12320 times.
24520 if ((info->error_code == kFailNotFound)
1295
2/2
✓ Branch 0 taken 12160 times.
✓ Branch 1 taken 40 times.
12200 && (info->request == JobInfo::kReqHeadPut)) {
1296 12160 LogCvmfs(kLogS3Fanout, kLogDebug, "not found: %s, uploading",
1297 info->object_key.c_str());
1298 12160 info->request = JobInfo::kReqPutCas;
1299 12160 curl_slist_free_all(info->http_headers);
1300 12160 info->http_headers = NULL;
1301 12160 const s3fanout::Failures init_failure = InitializeRequest(
1302 info, info->curl_handle);
1303
1304
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12160 times.
12160 if (init_failure != s3fanout::kFailOk) {
1305 PANIC(kLogStderr,
1306 "Failed to initialize CURL handle "
1307 "(error: %d - %s | errno: %d)",
1308 init_failure, Code2Ascii(init_failure), errno);
1309 }
1310 12160 SetUrlOptions(info);
1311 // Reset origin
1312 12160 info->origin->Rewind();
1313 12160 return true; // Again, Put
1314 }
1315
1316 // Determination if failed request should be repeated
1317 12360 bool try_again = false;
1318
2/2
✓ Branch 0 taken 120 times.
✓ Branch 1 taken 12240 times.
12360 if (info->error_code != kFailOk) {
1319 120 try_again = CanRetry(info);
1320 }
1321
2/2
✓ Branch 0 taken 80 times.
✓ Branch 1 taken 12280 times.
12360 if (try_again) {
1322
1/2
✓ Branch 0 taken 80 times.
✗ Branch 1 not taken.
80 if (info->request == JobInfo::kReqPutCas
1323
1/2
✓ Branch 0 taken 80 times.
✗ Branch 1 not taken.
80 || info->request == JobInfo::kReqPutDotCvmfs
1324
1/2
✓ Branch 0 taken 80 times.
✗ Branch 1 not taken.
80 || info->request == JobInfo::kReqPutHtml
1325
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 80 times.
80 || info->request == JobInfo::kReqDeleteMulti) {
1326 LogCvmfs(kLogS3Fanout, kLogDebug, "Trying again to upload %s",
1327 info->object_key.c_str());
1328 // Reset origin
1329 info->origin->Rewind();
1330 if (info->request == JobInfo::kReqDeleteMulti)
1331 info->response_body.clear();
1332 }
1333 80 Backoff(info);
1334 80 info->error_code = kFailOk;
1335 80 info->http_error = 0;
1336 80 info->throttle_ms = 0;
1337 80 info->backoff_ms = 0;
1338 80 info->throttle_timestamp = 0;
1339 80 return true; // try again
1340 }
1341
1342 // Cleanup opened resources
1343 12280 info->origin.Destroy();
1344
1345
3/4
✓ Branch 0 taken 40 times.
✓ Branch 1 taken 12240 times.
✓ Branch 2 taken 40 times.
✗ Branch 3 not taken.
12280 if ((info->error_code != kFailOk) && (info->http_error != 0)
1346
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 40 times.
40 && (info->http_error != 404)) {
1347 LogCvmfs(kLogS3Fanout, kLogStderr, "S3: HTTP failure %d", info->http_error);
1348 }
1349 12280 return false; // stop transfer
1350 }
1351
1352
2/4
✓ Branch 3 taken 240 times.
✗ Branch 4 not taken.
✓ Branch 9 taken 240 times.
✗ Branch 10 not taken.
240 S3FanoutManager::S3FanoutManager(const S3Config &config) : config_(config) {
1353 240 atomic_init32(&multi_threaded_);
1354
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 MakePipe(pipe_terminate_);
1355
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 MakePipe(pipe_jobs_);
1356
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 MakePipe(pipe_completed_);
1357
1358 int retval;
1359 240 jobs_todo_lock_ = reinterpret_cast<pthread_mutex_t *>(
1360 240 smalloc(sizeof(pthread_mutex_t)));
1361 240 retval = pthread_mutex_init(jobs_todo_lock_, NULL);
1362
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 240 times.
240 assert(retval == 0);
1363 240 curl_handle_lock_ = reinterpret_cast<pthread_mutex_t *>(
1364 240 smalloc(sizeof(pthread_mutex_t)));
1365 240 retval = pthread_mutex_init(curl_handle_lock_, NULL);
1366
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 240 times.
240 assert(retval == 0);
1367
1368
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 active_requests_ = new set<JobInfo *>;
1369
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 pool_handles_idle_ = new set<CURL *>;
1370
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 pool_handles_inuse_ = new set<CURL *>;
1371
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 curl_sharehandles_ = new map<CURL *, S3FanOutDnsEntry *>;
1372
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 sharehandles_ = new set<S3FanOutDnsEntry *>;
1373 240 watch_fds_max_ = 4 * config_.pool_max_handles;
1374 240 max_available_jobs_ = 4 * config_.pool_max_handles;
1375
2/4
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 240 times.
✗ Branch 5 not taken.
240 available_jobs_ = new Semaphore(max_available_jobs_);
1376
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 240 times.
240 assert(NULL != available_jobs_);
1377
1378
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 statistics_ = new Statistics();
1379
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 user_agent_ = new string();
1380
2/4
✓ Branch 2 taken 240 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 240 times.
✗ Branch 6 not taken.
240 *user_agent_ = "User-Agent: cvmfs " + string(CVMFS_VERSION);
1381
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 complete_hostname_ = MkCompleteHostname();
1382
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 dot_cvmfs_cache_control_header = MkDotCvmfsCacheControlHeader();
1383
1384
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 const CURLcode cretval = curl_global_init(CURL_GLOBAL_ALL);
1385
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 240 times.
240 assert(cretval == CURLE_OK);
1386
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 curl_multi_ = curl_multi_init();
1387
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 240 times.
240 assert(curl_multi_ != NULL);
1388 CURLMcode mretval;
1389
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION,
1390 CallbackCurlSocket);
1391
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 240 times.
240 assert(mretval == CURLM_OK);
1392
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1393 static_cast<void *>(this));
1394
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 240 times.
240 assert(mretval == CURLM_OK);
1395
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1396 config_.pool_max_handles);
1397
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 240 times.
240 assert(mretval == CURLM_OK);
1398
1399 240 prng_.InitLocaltime();
1400
1401 240 thread_upload_ = 0;
1402 240 timestamp_last_throttle_report_ = 0;
1403 240 is_curl_debug_ = (getenv("_CVMFS_CURL_DEBUG") != NULL);
1404
1405 // Parsing environment variables
1406 240 if ((getenv("CVMFS_IPV4_ONLY") != NULL)
1407
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 240 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 240 times.
240 && (strlen(getenv("CVMFS_IPV4_ONLY")) > 0)) {
1408 opt_ipv4_only_ = true;
1409 } else {
1410 240 opt_ipv4_only_ = false;
1411 }
1412
1413
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 resolver_ = dns::CaresResolver::Create(opt_ipv4_only_, 2, 2000);
1414
1415 240 watch_fds_ = static_cast<struct pollfd *>(smalloc(4 * sizeof(struct pollfd)));
1416 240 watch_fds_size_ = 4;
1417 240 watch_fds_inuse_ = 0;
1418
1419
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 ssl_certificate_store_.UseSystemCertificatePath();
1420 240 }
1421
1422 480 S3FanoutManager::~S3FanoutManager() {
1423 240 pthread_mutex_destroy(jobs_todo_lock_);
1424 240 free(jobs_todo_lock_);
1425 240 pthread_mutex_destroy(curl_handle_lock_);
1426 240 free(curl_handle_lock_);
1427
1428
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1429 // Shutdown I/O thread
1430 240 char buf = 'T';
1431 240 WritePipe(pipe_terminate_[1], &buf, 1);
1432 240 pthread_join(thread_upload_, NULL);
1433 }
1434 240 ClosePipe(pipe_terminate_);
1435 240 ClosePipe(pipe_jobs_);
1436 240 ClosePipe(pipe_completed_);
1437
1438 240 set<CURL *>::iterator i = pool_handles_idle_->begin();
1439 240 const set<CURL *>::const_iterator iEnd = pool_handles_idle_->end();
1440
2/2
✓ Branch 2 taken 400 times.
✓ Branch 3 taken 240 times.
640 for (; i != iEnd; ++i) {
1441 400 curl_easy_cleanup(*i);
1442 }
1443
1444 240 set<S3FanOutDnsEntry *>::iterator is = sharehandles_->begin();
1445 240 const set<S3FanOutDnsEntry *>::const_iterator isEnd = sharehandles_->end();
1446
2/2
✓ Branch 2 taken 200 times.
✓ Branch 3 taken 240 times.
440 for (; is != isEnd; ++is) {
1447 200 curl_share_cleanup((*is)->sharehandle);
1448 200 curl_slist_free_all((*is)->clist);
1449
1/2
✓ Branch 1 taken 200 times.
✗ Branch 2 not taken.
200 delete *is;
1450 }
1451 240 pool_handles_idle_->clear();
1452 240 curl_sharehandles_->clear();
1453 240 sharehandles_->clear();
1454
1/2
✓ Branch 0 taken 240 times.
✗ Branch 1 not taken.
240 delete active_requests_;
1455
1/2
✓ Branch 0 taken 240 times.
✗ Branch 1 not taken.
240 delete pool_handles_idle_;
1456
1/2
✓ Branch 0 taken 240 times.
✗ Branch 1 not taken.
240 delete pool_handles_inuse_;
1457
1/2
✓ Branch 0 taken 240 times.
✗ Branch 1 not taken.
240 delete curl_sharehandles_;
1458
1/2
✓ Branch 0 taken 240 times.
✗ Branch 1 not taken.
240 delete sharehandles_;
1459
1/2
✓ Branch 0 taken 240 times.
✗ Branch 1 not taken.
240 delete user_agent_;
1460 240 curl_multi_cleanup(curl_multi_);
1461
1462
1/2
✓ Branch 0 taken 240 times.
✗ Branch 1 not taken.
240 delete statistics_;
1463
1464
1/2
✓ Branch 0 taken 240 times.
✗ Branch 1 not taken.
240 delete available_jobs_;
1465
1466 240 curl_global_cleanup();
1467 240 }
1468
1469 /**
1470 * Spawns the I/O worker thread. No way back except ~S3FanoutManager.
1471 */
1472 240 void S3FanoutManager::Spawn() {
1473 240 LogCvmfs(kLogS3Fanout, kLogDebug, "S3FanoutManager spawned");
1474
1475 240 const int retval = pthread_create(&thread_upload_, NULL, MainUpload,
1476 static_cast<void *>(this));
1477
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 240 times.
240 assert(retval == 0);
1478
1479 240 atomic_inc32(&multi_threaded_);
1480 240 }
1481
1482 60 const Statistics &S3FanoutManager::GetStatistics() { return *statistics_; }
1483
1484 /**
1485 * Push new job to be uploaded to the S3 cloud storage.
1486 */
1487 12280 void S3FanoutManager::PushNewJob(JobInfo *info) {
1488 12280 available_jobs_->Increment();
1489 12280 WritePipe(pipe_jobs_[1], &info, sizeof(info));
1490 12280 }
1491
1492 /**
1493 * Push completed job to list of completed jobs
1494 */
1495 12520 void S3FanoutManager::PushCompletedJob(JobInfo *info) {
1496 12520 WritePipe(pipe_completed_[1], &info, sizeof(info));
1497 12520 }
1498
1499 /**
1500 * Pop completed job
1501 */
1502 12520 JobInfo *S3FanoutManager::PopCompletedJob() {
1503 JobInfo *info;
1504
1/2
✓ Branch 1 taken 12520 times.
✗ Branch 2 not taken.
12520 ReadPipe(pipe_completed_[0], &info, sizeof(info));
1505 12520 return info;
1506 }
1507
1508 //------------------------------------------------------------------------------
1509
1510
1511 string Statistics::Print() const {
1512 return "Transferred Bytes: " + StringifyInt(uint64_t(transferred_bytes))
1513 + "\n" + "Transfer duration: " + StringifyInt(uint64_t(transfer_time))
1514 + " s\n" + "Number of requests: " + StringifyInt(num_requests) + "\n"
1515 + "Number of retries: " + StringifyInt(num_retries) + "\n";
1516 }
1517
1518 } // namespace s3fanout
1519