GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/network/s3fanout.cc
Date: 2026-03-15 02:35:27
Exec Total Coverage
Lines: 636 903 70.4%
Branches: 464 1332 34.8%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 *
4 * Runs a thread using libcurls asynchronous I/O mode to push data to S3
5 */
6
7 #include "s3fanout.h"
8
9 #include <pthread.h>
10
11 #include <algorithm>
12 #include <cassert>
13 #include <cerrno>
14 #include <utility>
15
16 #include "upload_facility.h"
17 #include "util/concurrency.h"
18 #include "util/exception.h"
19 #include "util/platform.h"
20 #include "util/posix.h"
21 #include "util/string.h"
22
23 using namespace std; // NOLINT
24
25 namespace s3fanout {
26
27 /**
28 * Escapes characters that are not allowed in XML text content.
29 * Only & and < need escaping; >, ', " are safe in text nodes.
30 */
31 286 static string XmlEscape(const string &input) {
32 286 string result;
33
1/2
✓ Branch 2 taken 286 times.
✗ Branch 3 not taken.
286 result.reserve(input.size());
34
2/2
✓ Branch 1 taken 5114 times.
✓ Branch 2 taken 286 times.
5400 for (unsigned i = 0; i < input.size(); ++i) {
35
3/3
✓ Branch 1 taken 35 times.
✓ Branch 2 taken 35 times.
✓ Branch 3 taken 5044 times.
5114 switch (input[i]) {
36
1/2
✓ Branch 1 taken 35 times.
✗ Branch 2 not taken.
35 case '&': result += "&amp;"; break;
37
1/2
✓ Branch 1 taken 35 times.
✗ Branch 2 not taken.
35 case '<': result += "&lt;"; break;
38
1/2
✓ Branch 2 taken 5044 times.
✗ Branch 3 not taken.
5044 default: result += input[i]; break;
39 }
40 }
41 286 return result;
42 }
43
44
45 /**
46 * Builds an S3 DeleteObjects XML request body for multi-object delete.
47 * Uses Quiet mode so the response only contains errors, not successes.
48 */
49 181 string ComposeDeleteMultiXml(const vector<string> &keys) {
50 string xml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
51
1/2
✓ Branch 2 taken 181 times.
✗ Branch 3 not taken.
181 "<Delete><Quiet>true</Quiet>\n";
52 // ~70 bytes per <Object><Key>...</Key></Object> entry
53
1/2
✓ Branch 3 taken 181 times.
✗ Branch 4 not taken.
181 xml.reserve(xml.size() + keys.size() * 70 + 10);
54
2/2
✓ Branch 1 taken 286 times.
✓ Branch 2 taken 181 times.
467 for (unsigned i = 0; i < keys.size(); ++i) {
55
4/8
✓ Branch 2 taken 286 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 286 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 286 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 286 times.
✗ Branch 12 not taken.
286 xml += "<Object><Key>" + XmlEscape(keys[i]) + "</Key></Object>\n";
56 }
57
1/2
✓ Branch 1 taken 181 times.
✗ Branch 2 not taken.
181 xml += "</Delete>";
58 181 return xml;
59 }
60
61
62 /**
63 * Parses the S3 DeleteObjects error response XML.
64 * In Quiet mode, the response only contains <Error> elements for failed keys.
65 * Returns the number of errors found.
66 */
67 140 unsigned ParseDeleteMultiResponse(const string &response,
68 vector<string> *error_keys,
69 vector<string> *error_codes,
70 vector<string> *error_messages) {
71 140 unsigned num_errors = 0;
72 140 string::size_type pos = 0;
73
74 while (true) {
75 245 const string::size_type err_start = response.find("<Error>", pos);
76
2/2
✓ Branch 0 taken 105 times.
✓ Branch 1 taken 140 times.
245 if (err_start == string::npos)
77 105 break;
78 140 const string::size_type err_end = response.find("</Error>", err_start);
79
2/2
✓ Branch 0 taken 35 times.
✓ Branch 1 taken 105 times.
140 if (err_end == string::npos)
80 35 break;
81
82 const string error_block = response.substr(err_start,
83
1/2
✓ Branch 1 taken 105 times.
✗ Branch 2 not taken.
105 err_end - err_start);
84 105 num_errors++;
85
86 // Extract <Key>...</Key>
87 105 const string::size_type key_start = error_block.find("<Key>");
88 105 const string::size_type key_end = error_block.find("</Key>");
89
2/4
✓ Branch 0 taken 105 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 105 times.
✗ Branch 3 not taken.
105 if (key_start != string::npos && key_end != string::npos) {
90
1/2
✓ Branch 1 taken 105 times.
✗ Branch 2 not taken.
105 error_keys->push_back(
91
1/2
✓ Branch 1 taken 105 times.
✗ Branch 2 not taken.
210 error_block.substr(key_start + 5, key_end - key_start - 5));
92 } else {
93 error_keys->push_back("");
94 }
95
96 // Extract <Code>...</Code>
97 105 const string::size_type code_start = error_block.find("<Code>");
98 105 const string::size_type code_end = error_block.find("</Code>");
99
2/4
✓ Branch 0 taken 105 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 105 times.
✗ Branch 3 not taken.
105 if (code_start != string::npos && code_end != string::npos) {
100
1/2
✓ Branch 1 taken 105 times.
✗ Branch 2 not taken.
105 error_codes->push_back(
101
1/2
✓ Branch 1 taken 105 times.
✗ Branch 2 not taken.
210 error_block.substr(code_start + 6, code_end - code_start - 6));
102 } else {
103 error_codes->push_back("");
104 }
105
106 // Extract <Message>...</Message>
107 105 const string::size_type msg_start = error_block.find("<Message>");
108 105 const string::size_type msg_end = error_block.find("</Message>");
109
2/4
✓ Branch 0 taken 105 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 105 times.
✗ Branch 3 not taken.
105 if (msg_start != string::npos && msg_end != string::npos) {
110
1/2
✓ Branch 1 taken 105 times.
✗ Branch 2 not taken.
105 error_messages->push_back(
111
1/2
✓ Branch 1 taken 105 times.
✗ Branch 2 not taken.
210 error_block.substr(msg_start + 9, msg_end - msg_start - 9));
112 } else {
113 error_messages->push_back("");
114 }
115
116 105 pos = err_end + 8; // length of "</Error>"
117 105 }
118
119 140 return num_errors;
120 }
121
122 const char *S3FanoutManager::kCacheControlCas = "Cache-Control: max-age=259200";
123 const unsigned S3FanoutManager::kDefault429ThrottleMs = 250;
124 const unsigned S3FanoutManager::kMax429ThrottleMs = 10000;
125 const unsigned S3FanoutManager::kThrottleReportIntervalSec = 10;
126 const unsigned S3FanoutManager::kDefaultHTTPPort = 80;
127 const unsigned S3FanoutManager::kDefaultHTTPSPort = 443;
128
129 492 std::string S3FanoutManager::MkDotCvmfsCacheControlHeader(unsigned defaultMaxAge, int overrideMaxAge)
130 {
131 const char *var;
132 int max_age_sec;
133 char *at_null_terminator_if_number;
134 492 bool value_determined = false;
135
136
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 492 times.
492 if (overrideMaxAge >= 0) {
137 max_age_sec = overrideMaxAge;
138 value_determined = true;
139 }
140
141
1/2
✓ Branch 0 taken 492 times.
✗ Branch 1 not taken.
492 if (!value_determined) {
142 492 var = getenv("CVMFS_MAX_TTL_SECS");
143
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
492 if (var && var[0]) {
144 max_age_sec = strtoll(var, &at_null_terminator_if_number, 10);
145 if (*at_null_terminator_if_number == '\0'
146 && max_age_sec >= 0) {
147 value_determined = true;
148 }
149 }
150 }
151
152
1/2
✓ Branch 0 taken 492 times.
✗ Branch 1 not taken.
492 if (!value_determined) {
153 492 var = getenv("CVMFS_MAX_TTL");
154
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
492 if (var && var[0]) {
155 max_age_sec = strtoll(var, &at_null_terminator_if_number, 10);
156 if (*at_null_terminator_if_number == '\0'
157 && max_age_sec >= 0) {
158 max_age_sec *= 60;
159 value_determined = true;
160 }
161 }
162 }
163
164
1/2
✓ Branch 0 taken 492 times.
✗ Branch 1 not taken.
492 if (!value_determined) {
165 492 max_age_sec = defaultMaxAge;
166 }
167
168
2/4
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 492 times.
✗ Branch 5 not taken.
984 return "Cache-Control: max-age=" + std::to_string(max_age_sec);
169 }
170
171 /**
172 * Parses Retry-After and X-Retry-In headers attached to HTTP 429 responses
173 */
174 982 void S3FanoutManager::DetectThrottleIndicator(const std::string &header,
175 JobInfo *info) {
176 982 std::string value_str;
177
4/6
✓ Branch 2 taken 982 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 982 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 409 times.
✓ Branch 10 taken 573 times.
982 if (HasPrefix(header, "retry-after:", true))
178
1/2
✓ Branch 1 taken 409 times.
✗ Branch 2 not taken.
409 value_str = header.substr(12);
179
4/6
✓ Branch 2 taken 982 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 982 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 175 times.
✓ Branch 10 taken 807 times.
982 if (HasPrefix(header, "x-retry-in:", true))
180
1/2
✓ Branch 1 taken 175 times.
✗ Branch 2 not taken.
175 value_str = header.substr(11);
181
182
1/2
✓ Branch 1 taken 982 times.
✗ Branch 2 not taken.
982 value_str = Trim(value_str, true /* trim_newline */);
183
2/2
✓ Branch 1 taken 514 times.
✓ Branch 2 taken 468 times.
982 if (!value_str.empty()) {
184
1/2
✓ Branch 1 taken 514 times.
✗ Branch 2 not taken.
514 const unsigned value_numeric = String2Uint64(value_str);
185
2/4
✓ Branch 2 taken 514 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 514 times.
✗ Branch 6 not taken.
1028 const unsigned value_ms = HasSuffix(value_str, "ms", true /* ignore_case */)
186
2/2
✓ Branch 0 taken 175 times.
✓ Branch 1 taken 339 times.
514 ? value_numeric
187 514 : (value_numeric * 1000);
188
2/2
✓ Branch 0 taken 479 times.
✓ Branch 1 taken 35 times.
514 if (value_ms > 0)
189 479 info->throttle_ms = std::min(value_ms, kMax429ThrottleMs);
190 }
191 982 }
192
193
194 /**
195 * Called by curl for every HTTP header. Not called for file:// transfers.
196 */
197 150962 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
198 void *info_link) {
199 150962 const size_t num_bytes = size * nmemb;
200
1/2
✓ Branch 2 taken 150962 times.
✗ Branch 3 not taken.
150962 const string header_line(static_cast<const char *>(ptr), num_bytes);
201 150962 JobInfo *info = static_cast<JobInfo *>(info_link);
202
203 // Check for http status code errors
204
4/6
✓ Branch 2 taken 150962 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 150962 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 50266 times.
✓ Branch 10 taken 100696 times.
150962 if (HasPrefix(header_line, "HTTP/1.", false)) {
205
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 50266 times.
50266 if (header_line.length() < 10)
206 return 0;
207
208 unsigned i;
209
5/6
✓ Branch 1 taken 100532 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 50266 times.
✓ Branch 5 taken 50266 times.
✓ Branch 6 taken 50266 times.
✓ Branch 7 taken 50266 times.
100532 for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {
210 }
211
212
2/2
✓ Branch 1 taken 25092 times.
✓ Branch 2 taken 25174 times.
50266 if (header_line[i] == '2') {
213 25092 return num_bytes;
214 } else {
215
1/2
✓ Branch 2 taken 25174 times.
✗ Branch 3 not taken.
25174 LogCvmfs(kLogS3Fanout, kLogDebug, "http status error code [info %p]: %s",
216 info, header_line.c_str());
217
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 25174 times.
25174 if (header_line.length() < i + 3) {
218 LogCvmfs(kLogS3Fanout, kLogStderr, "S3: invalid HTTP response '%s'",
219 header_line.c_str());
220 info->error_code = kFailOther;
221 return 0;
222 }
223
2/4
✓ Branch 3 taken 25174 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 25174 times.
✗ Branch 7 not taken.
25174 info->http_error = String2Int64(string(&header_line[i], 3));
224
225
2/7
✓ Branch 0 taken 164 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 25010 times.
✗ Branch 6 not taken.
25174 switch (info->http_error) {
226 164 case 429:
227 164 info->error_code = kFailRetry;
228 164 info->throttle_ms = S3FanoutManager::kDefault429ThrottleMs;
229 164 info->throttle_timestamp = platform_monotonic_time();
230 164 return num_bytes;
231 case 507: // Insufficient Storage
232 info->error_code = kFailInsufficientStorage;
233 break;
234 case 503:
235 case 502: // Can happen if the S3 gateway-backend connection breaks
236 case 500: // sometimes see this as a transient error from S3
237 info->error_code = kFailServiceUnavailable;
238 break;
239 case 501:
240 case 400:
241 info->error_code = kFailBadRequest;
242 break;
243 case 403:
244 info->error_code = kFailForbidden;
245 break;
246 25010 case 404:
247 25010 info->error_code = kFailNotFound;
248 25010 return num_bytes;
249 default:
250 info->error_code = kFailOther;
251 }
252 return 0;
253 }
254 }
255
256
2/2
✓ Branch 0 taken 492 times.
✓ Branch 1 taken 100204 times.
100696 if (info->error_code == kFailRetry) {
257
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 S3FanoutManager::DetectThrottleIndicator(header_line, info);
258 }
259
260 100696 return num_bytes;
261 150962 }
262
263
264 /**
265 * Called by curl for every new chunk to upload.
266 */
267 642470 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
268 void *info_link) {
269 642470 const size_t num_bytes = size * nmemb;
270 642470 JobInfo *info = static_cast<JobInfo *>(info_link);
271
272 642470 LogCvmfs(kLogS3Fanout, kLogDebug, "Data callback with %zu bytes", num_bytes);
273
274
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 642470 times.
642470 if (num_bytes == 0)
275 return 0;
276
277 642470 const uint64_t read_bytes = info->origin->Read(ptr, num_bytes);
278
279 642470 LogCvmfs(kLogS3Fanout, kLogDebug, "source buffer pushed out %lu bytes",
280 read_bytes);
281
282 642470 return read_bytes;
283 }
284
285
286 /**
287 * Captures the HTTP response body. For kReqDeleteMulti, the response contains
288 * XML error information. For other requests, the body is ignored.
289 */
290 static size_t CallbackCurlBody(char *ptr, size_t size, size_t nmemb,
291 void *userdata) {
292 const size_t num_bytes = size * nmemb;
293 JobInfo *info = static_cast<JobInfo *>(userdata);
294 if (info != NULL && info->request == JobInfo::kReqDeleteMulti) {
295 info->response_body.append(ptr, num_bytes);
296 }
297 return num_bytes;
298 }
299
300
301 /**
302 * Called when new curl sockets arrive or existing curl sockets depart.
303 */
304 111479 int S3FanoutManager::CallbackCurlSocket(CURL *easy, curl_socket_t s, int action,
305 void *userp, void *socketp) {
306 111479 S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(userp);
307 111479 LogCvmfs(kLogS3Fanout, kLogDebug,
308 "CallbackCurlSocket called with easy "
309 "handle %p, socket %d, action %d, up %p, "
310 "sp %p, fds_inuse %d, jobs %d",
311 easy, s, action, userp, socketp, s3fanout_mgr->watch_fds_inuse_,
312 111479 s3fanout_mgr->available_jobs_->Get());
313
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 111479 times.
111479 if (action == CURL_POLL_NONE)
314 return 0;
315
316 // Find s in watch_fds_
317 // First 2 fds are job and terminate pipes (not curl related)
318 unsigned index;
319
2/2
✓ Branch 0 taken 175234 times.
✓ Branch 1 taken 50266 times.
225500 for (index = 2; index < s3fanout_mgr->watch_fds_inuse_; ++index) {
320
2/2
✓ Branch 0 taken 61213 times.
✓ Branch 1 taken 114021 times.
175234 if (s3fanout_mgr->watch_fds_[index].fd == s)
321 61213 break;
322 }
323 // Or create newly
324
2/2
✓ Branch 0 taken 50266 times.
✓ Branch 1 taken 61213 times.
111479 if (index == s3fanout_mgr->watch_fds_inuse_) {
325 // Extend array if necessary
326
2/2
✓ Branch 0 taken 82 times.
✓ Branch 1 taken 50184 times.
50266 if (s3fanout_mgr->watch_fds_inuse_ == s3fanout_mgr->watch_fds_size_) {
327 82 s3fanout_mgr->watch_fds_size_ *= 2;
328 82 s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
329 82 srealloc(s3fanout_mgr->watch_fds_,
330 82 s3fanout_mgr->watch_fds_size_ * sizeof(struct pollfd)));
331 }
332 50266 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].fd = s;
333 50266 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].events = 0;
334 50266 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].revents = 0;
335 50266 s3fanout_mgr->watch_fds_inuse_++;
336 }
337
338
4/5
✓ Branch 0 taken 50266 times.
✓ Branch 1 taken 123 times.
✓ Branch 2 taken 10824 times.
✓ Branch 3 taken 50266 times.
✗ Branch 4 not taken.
111479 switch (action) {
339 50266 case CURL_POLL_IN:
340 50266 s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
341 50266 break;
342 123 case CURL_POLL_OUT:
343 123 s3fanout_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
344 123 break;
345 10824 case CURL_POLL_INOUT:
346 10824 s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI | POLLOUT
347 | POLLWRBAND;
348 10824 break;
349 50266 case CURL_POLL_REMOVE:
350
2/2
✓ Branch 0 taken 7872 times.
✓ Branch 1 taken 42394 times.
50266 if (index < s3fanout_mgr->watch_fds_inuse_ - 1)
351 s3fanout_mgr
352 7872 ->watch_fds_[index] = s3fanout_mgr->watch_fds_
353 7872 [s3fanout_mgr->watch_fds_inuse_ - 1];
354 50266 s3fanout_mgr->watch_fds_inuse_--;
355 // Shrink array if necessary
356
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50266 times.
50266 if ((s3fanout_mgr->watch_fds_inuse_ > s3fanout_mgr->watch_fds_max_)
357 && (s3fanout_mgr->watch_fds_inuse_
358 < s3fanout_mgr->watch_fds_size_ / 2)) {
359 s3fanout_mgr->watch_fds_size_ /= 2;
360 s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
361 srealloc(s3fanout_mgr->watch_fds_,
362 s3fanout_mgr->watch_fds_size_ * sizeof(struct pollfd)));
363 }
364 50266 break;
365 default:
366 PANIC(NULL);
367 }
368
369 111479 return 0;
370 }
371
372
373 /**
374 * Worker thread event loop.
375 */
376 492 void *S3FanoutManager::MainUpload(void *data) {
377
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread started");
378 492 S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(data);
379
380 492 s3fanout_mgr->InitPipeWatchFds();
381
382 // Don't schedule more jobs into the multi handle than the maximum number of
383 // parallel connections. This should prevent starvation and thus a timeout
384 // of the authorization header (CVM-1339).
385 492 unsigned jobs_in_flight = 0;
386
387 while (true) {
388 // Check events with 100ms timeout
389 688062 const int timeout_ms = 100;
390
1/2
✓ Branch 1 taken 688062 times.
✗ Branch 2 not taken.
688062 int retval = poll(s3fanout_mgr->watch_fds_, s3fanout_mgr->watch_fds_inuse_,
391 timeout_ms);
392
2/2
✓ Branch 0 taken 3731 times.
✓ Branch 1 taken 684331 times.
688062 if (retval == 0) {
393 // Handle timeout
394 3731 int still_running = 0;
395
1/2
✓ Branch 1 taken 3731 times.
✗ Branch 2 not taken.
3731 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
396 CURL_SOCKET_TIMEOUT, 0, &still_running);
397
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3731 times.
3731 if (retval != CURLM_OK) {
398 LogCvmfs(kLogS3Fanout, kLogStderr, "Error, timeout due to: %d", retval);
399 assert(retval == CURLM_OK);
400 }
401
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 684331 times.
684331 } else if (retval < 0) {
402 assert(errno == EINTR);
403 continue;
404 }
405
406 // Terminate I/O thread
407
2/2
✓ Branch 0 taken 492 times.
✓ Branch 1 taken 687570 times.
688062 if (s3fanout_mgr->watch_fds_[0].revents)
408 492 break;
409
410 // New job incoming
411
2/2
✓ Branch 0 taken 25174 times.
✓ Branch 1 taken 662396 times.
687570 if (s3fanout_mgr->watch_fds_[1].revents) {
412 25174 s3fanout_mgr->watch_fds_[1].revents = 0;
413 JobInfo *info;
414
1/2
✓ Branch 1 taken 25174 times.
✗ Branch 2 not taken.
25174 ReadPipe(s3fanout_mgr->pipe_jobs_[0], &info, sizeof(info));
415
1/2
✓ Branch 1 taken 25174 times.
✗ Branch 2 not taken.
25174 CURL *handle = s3fanout_mgr->AcquireCurlHandle();
416
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 25174 times.
25174 if (handle == NULL) {
417 PANIC(kLogStderr, "Failed to acquire CURL handle.");
418 }
419
1/2
✓ Branch 1 taken 25174 times.
✗ Branch 2 not taken.
25174 const s3fanout::Failures init_failure = s3fanout_mgr->InitializeRequest(
420 info, handle);
421
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 25174 times.
25174 if (init_failure != s3fanout::kFailOk) {
422 PANIC(kLogStderr,
423 "Failed to initialize CURL handle (error: %d - %s | errno: %d)",
424 init_failure, Code2Ascii(init_failure), errno);
425 }
426
1/2
✓ Branch 1 taken 25174 times.
✗ Branch 2 not taken.
25174 s3fanout_mgr->SetUrlOptions(info);
427
428
1/2
✓ Branch 1 taken 25174 times.
✗ Branch 2 not taken.
25174 curl_multi_add_handle(s3fanout_mgr->curl_multi_, handle);
429
1/2
✓ Branch 1 taken 25174 times.
✗ Branch 2 not taken.
25174 s3fanout_mgr->active_requests_->insert(info);
430 25174 jobs_in_flight++;
431 25174 int still_running = 0, retval = 0;
432
1/2
✓ Branch 1 taken 25174 times.
✗ Branch 2 not taken.
25174 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
433 CURL_SOCKET_TIMEOUT, 0, &still_running);
434
435
1/2
✓ Branch 1 taken 25174 times.
✗ Branch 2 not taken.
25174 LogCvmfs(kLogS3Fanout, kLogDebug, "curl_multi_socket_action: %d - %d",
436 retval, still_running);
437 }
438
439
440 // Activity on curl sockets
441 // Within this loop the curl_multi_socket_action() may cause socket(s)
442 // to be removed from watch_fds_. If a socket is removed it is replaced
443 // by the socket at the end of the array and the inuse count is decreased.
444 // Therefore loop over the array in reverse order.
445 // First 2 fds are job and terminate pipes (not curl related)
446
2/2
✓ Branch 0 taken 1566487 times.
✓ Branch 1 taken 687570 times.
2254057 for (int32_t i = s3fanout_mgr->watch_fds_inuse_ - 1; i >= 2; --i) {
447
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1566487 times.
1566487 if (static_cast<uint32_t>(i) >= s3fanout_mgr->watch_fds_inuse_) {
448 continue;
449 }
450
2/2
✓ Branch 0 taken 699747 times.
✓ Branch 1 taken 866740 times.
1566487 if (s3fanout_mgr->watch_fds_[i].revents) {
451 699747 int ev_bitmask = 0;
452
2/2
✓ Branch 0 taken 75235 times.
✓ Branch 1 taken 624512 times.
699747 if (s3fanout_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
453 75235 ev_bitmask |= CURL_CSELECT_IN;
454
2/2
✓ Branch 0 taken 624512 times.
✓ Branch 1 taken 75235 times.
699747 if (s3fanout_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
455 624512 ev_bitmask |= CURL_CSELECT_OUT;
456 699747 if (s3fanout_mgr->watch_fds_[i].revents
457
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 699747 times.
699747 & (POLLERR | POLLHUP | POLLNVAL))
458 ev_bitmask |= CURL_CSELECT_ERR;
459 699747 s3fanout_mgr->watch_fds_[i].revents = 0;
460
461 699747 int still_running = 0;
462 699747 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
463
1/2
✓ Branch 1 taken 699747 times.
✗ Branch 2 not taken.
699747 s3fanout_mgr->watch_fds_[i].fd,
464 ev_bitmask,
465 &still_running);
466 }
467 }
468
469 // Check if transfers are completed
470 CURLMsg *curl_msg;
471 int msgs_in_queue;
472
3/4
✓ Branch 1 taken 737836 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 50266 times.
✓ Branch 4 taken 687570 times.
737836 while ((curl_msg = curl_multi_info_read(s3fanout_mgr->curl_multi_,
473 &msgs_in_queue))) {
474
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50266 times.
50266 assert(curl_msg->msg == CURLMSG_DONE);
475
476 50266 s3fanout_mgr->statistics_->num_requests++;
477 JobInfo *info;
478 50266 CURL *easy_handle = curl_msg->easy_handle;
479 50266 const int curl_error = curl_msg->data.result;
480
1/2
✓ Branch 1 taken 50266 times.
✗ Branch 2 not taken.
50266 curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
481
482
1/2
✓ Branch 1 taken 50266 times.
✗ Branch 2 not taken.
50266 curl_multi_remove_handle(s3fanout_mgr->curl_multi_, easy_handle);
483
3/4
✓ Branch 1 taken 50266 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 25092 times.
✓ Branch 4 taken 25174 times.
50266 if (s3fanout_mgr->VerifyAndFinalize(curl_error, info)) {
484
1/2
✓ Branch 1 taken 25092 times.
✗ Branch 2 not taken.
25092 curl_multi_add_handle(s3fanout_mgr->curl_multi_, easy_handle);
485 25092 int still_running = 0;
486
1/2
✓ Branch 1 taken 25092 times.
✗ Branch 2 not taken.
25092 curl_multi_socket_action(s3fanout_mgr->curl_multi_, CURL_SOCKET_TIMEOUT,
487 0, &still_running);
488 } else {
489 // Return easy handle into pool and write result back
490 25174 jobs_in_flight--;
491
1/2
✓ Branch 1 taken 25174 times.
✗ Branch 2 not taken.
25174 s3fanout_mgr->active_requests_->erase(info);
492
1/2
✓ Branch 1 taken 25174 times.
✗ Branch 2 not taken.
25174 s3fanout_mgr->ReleaseCurlHandle(info, easy_handle);
493
1/2
✓ Branch 1 taken 25174 times.
✗ Branch 2 not taken.
25174 s3fanout_mgr->available_jobs_->Decrement();
494
495 // Add to list of completed jobs
496
1/2
✓ Branch 1 taken 25174 times.
✗ Branch 2 not taken.
25174 s3fanout_mgr->PushCompletedJob(info);
497 }
498 }
499 687570 }
500
501 492 set<CURL *>::iterator i = s3fanout_mgr->pool_handles_inuse_->begin();
502 492 const set<CURL *>::const_iterator i_end = s3fanout_mgr->pool_handles_inuse_
503 492 ->end();
504
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 492 times.
492 for (; i != i_end; ++i) {
505 curl_multi_remove_handle(s3fanout_mgr->curl_multi_, *i);
506 curl_easy_cleanup(*i);
507 }
508 492 s3fanout_mgr->pool_handles_inuse_->clear();
509 492 free(s3fanout_mgr->watch_fds_);
510
511
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread terminated");
512 492 return NULL;
513 }
514
515
516 /**
517 * Gets an idle CURL handle from the pool. Creates a new one and adds it to
518 * the pool if necessary.
519 */
520 25174 CURL *S3FanoutManager::AcquireCurlHandle() const {
521 CURL *handle;
522
523 25174 const MutexLockGuard guard(curl_handle_lock_);
524
525
2/2
✓ Branch 1 taken 2009 times.
✓ Branch 2 taken 23165 times.
25174 if (pool_handles_idle_->empty()) {
526 CURLcode retval;
527
528 // Create a new handle
529
1/2
✓ Branch 1 taken 2009 times.
✗ Branch 2 not taken.
2009 handle = curl_easy_init();
530
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2009 times.
2009 assert(handle != NULL);
531
532 // Other settings
533
1/2
✓ Branch 1 taken 2009 times.
✗ Branch 2 not taken.
2009 retval = curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
534
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2009 times.
2009 assert(retval == CURLE_OK);
535
1/2
✓ Branch 1 taken 2009 times.
✗ Branch 2 not taken.
2009 retval = curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION,
536 CallbackCurlHeader);
537
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2009 times.
2009 assert(retval == CURLE_OK);
538
1/2
✓ Branch 1 taken 2009 times.
✗ Branch 2 not taken.
2009 retval = curl_easy_setopt(handle, CURLOPT_READFUNCTION, CallbackCurlData);
539
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2009 times.
2009 assert(retval == CURLE_OK);
540
1/2
✓ Branch 1 taken 2009 times.
✗ Branch 2 not taken.
2009 retval = curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlBody);
541
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2009 times.
2009 assert(retval == CURLE_OK);
542 // WRITEDATA is set per-request in InitializeRequest
543 } else {
544 23165 handle = *(pool_handles_idle_->begin());
545
1/2
✓ Branch 2 taken 23165 times.
✗ Branch 3 not taken.
23165 pool_handles_idle_->erase(pool_handles_idle_->begin());
546 }
547
548
1/2
✓ Branch 1 taken 25174 times.
✗ Branch 2 not taken.
25174 pool_handles_inuse_->insert(handle);
549
550 25174 return handle;
551 25174 }
552
553
554 25174 void S3FanoutManager::ReleaseCurlHandle(JobInfo *info, CURL *handle) const {
555
1/2
✓ Branch 0 taken 25174 times.
✗ Branch 1 not taken.
25174 if (info->http_headers) {
556
1/2
✓ Branch 1 taken 25174 times.
✗ Branch 2 not taken.
25174 curl_slist_free_all(info->http_headers);
557 25174 info->http_headers = NULL;
558 }
559
560 25174 const MutexLockGuard guard(curl_handle_lock_);
561
562
1/2
✓ Branch 1 taken 25174 times.
✗ Branch 2 not taken.
25174 const set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
563
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 25174 times.
25174 assert(elem != pool_handles_inuse_->end());
564
565
2/2
✓ Branch 1 taken 1189 times.
✓ Branch 2 taken 23985 times.
25174 if (pool_handles_idle_->size() > config_.pool_max_handles) {
566
1/2
✓ Branch 1 taken 1189 times.
✗ Branch 2 not taken.
1189 const CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, NULL);
567
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1189 times.
1189 assert(retval == CURLE_OK);
568
1/2
✓ Branch 1 taken 1189 times.
✗ Branch 2 not taken.
1189 curl_easy_cleanup(handle);
569 const std::map<CURL *, S3FanOutDnsEntry *>::size_type
570
1/2
✓ Branch 1 taken 1189 times.
✗ Branch 2 not taken.
1189 retitems = curl_sharehandles_->erase(handle);
571
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1189 times.
1189 assert(retitems == 1);
572 } else {
573
1/2
✓ Branch 1 taken 23985 times.
✗ Branch 2 not taken.
23985 pool_handles_idle_->insert(handle);
574 }
575
576
1/2
✓ Branch 1 taken 25174 times.
✗ Branch 2 not taken.
25174 pool_handles_inuse_->erase(elem);
577 25174 }
578
579 492 void S3FanoutManager::InitPipeWatchFds() {
580
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 492 times.
492 assert(watch_fds_inuse_ == 0);
581
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 492 times.
492 assert(watch_fds_size_ >= 2);
582 492 watch_fds_[0].fd = pipe_terminate_[0];
583 492 watch_fds_[0].events = POLLIN | POLLPRI;
584 492 watch_fds_[0].revents = 0;
585 492 ++watch_fds_inuse_;
586 492 watch_fds_[1].fd = pipe_jobs_[0];
587 492 watch_fds_[1].events = POLLIN | POLLPRI;
588 492 watch_fds_[1].revents = 0;
589 492 ++watch_fds_inuse_;
590 492 }
591
592 /**
593 * The Amazon AWS 2 authorization header according to
594 * http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html#ConstructingTheAuthenticationHeader
595 */
596 50102 bool S3FanoutManager::MkV2Authz(const JobInfo &info,
597 vector<string> *headers) const {
598 50102 string payload_hash;
599
1/2
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
50102 const bool retval = MkPayloadHash(info, &payload_hash);
600
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50102 times.
50102 if (!retval)
601 return false;
602
1/2
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
50102 const string content_type = GetContentType(info);
603
1/2
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
50102 const string request = GetRequestString(info);
604
605
1/2
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
50102 const string timestamp = RfcTimestamp();
606
5/10
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 50102 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 50102 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 50102 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 50102 times.
✗ Branch 14 not taken.
100204 string to_sign = request + "\n" + payload_hash + "\n" + content_type + "\n"
607
2/4
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 50102 times.
✗ Branch 5 not taken.
100204 + timestamp + "\n";
608
2/4
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 50102 times.
✗ Branch 4 not taken.
50102 if (config_.x_amz_acl != "") {
609
2/4
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 50102 times.
✗ Branch 5 not taken.
100204 to_sign += "x-amz-acl:" + config_.x_amz_acl + "\n" + // default ACL
610
5/10
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 50102 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 50102 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 50102 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 50102 times.
✗ Branch 14 not taken.
100204 "/" + config_.bucket + "/" + info.object_key;
611 }
612 // S3 V2 requires subresources to be included in the string to sign.
613 // For kReqDeleteMulti, object_key is intentionally empty: the canonical
614 // resource becomes "/<bucket>/?delete", matching the POST URL.
615
2/2
✓ Branch 0 taken 41 times.
✓ Branch 1 taken 50061 times.
50102 if (info.request == JobInfo::kReqDeleteMulti) {
616
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 41 times.
41 if (config_.x_amz_acl == "")
617 to_sign += "/" + config_.bucket + "/" + info.object_key;
618
1/2
✓ Branch 1 taken 41 times.
✗ Branch 2 not taken.
41 to_sign += "?delete";
619 }
620
1/2
✓ Branch 3 taken 50102 times.
✗ Branch 4 not taken.
50102 LogCvmfs(kLogS3Fanout, kLogDebug, "%s string to sign for: %s",
621 request.c_str(), info.object_key.c_str());
622
623
1/2
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
50102 shash::Any hmac;
624 50102 hmac.algorithm = shash::kSha1;
625
1/2
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
50102 shash::Hmac(config_.secret_key,
626 50102 reinterpret_cast<const unsigned char *>(to_sign.data()),
627 50102 to_sign.length(), &hmac);
628
629
3/6
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 50102 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 50102 times.
✗ Branch 8 not taken.
150306 headers->push_back("Authorization: AWS " + config_.access_key + ":"
630
3/6
✓ Branch 2 taken 50102 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 50102 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 50102 times.
✗ Branch 9 not taken.
250510 + Base64(string(reinterpret_cast<char *>(hmac.digest),
631 50102 hmac.GetDigestSize())));
632
2/4
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 50102 times.
✗ Branch 5 not taken.
50102 headers->push_back("Date: " + timestamp);
633
2/4
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 50102 times.
✗ Branch 5 not taken.
50102 headers->push_back("X-Amz-Acl: " + config_.x_amz_acl);
634
2/2
✓ Branch 1 taken 24969 times.
✓ Branch 2 taken 25133 times.
50102 if (!payload_hash.empty())
635
2/4
✓ Branch 1 taken 24969 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 24969 times.
✗ Branch 5 not taken.
24969 headers->push_back("Content-MD5: " + payload_hash);
636
2/2
✓ Branch 1 taken 24969 times.
✓ Branch 2 taken 25133 times.
50102 if (!content_type.empty())
637
2/4
✓ Branch 1 taken 24969 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 24969 times.
✗ Branch 5 not taken.
24969 headers->push_back("Content-Type: " + content_type);
638 50102 return true;
639 50102 }
640
641
642 string S3FanoutManager::GetUriEncode(const string &val,
643 bool encode_slash) const {
644 string result;
645 const unsigned len = val.length();
646 result.reserve(len);
647 for (unsigned i = 0; i < len; ++i) {
648 const char c = val[i];
649 if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
650 || (c >= '0' && c <= '9') || c == '_' || c == '-' || c == '~'
651 || c == '.') {
652 result.push_back(c);
653 } else if (c == '/') {
654 if (encode_slash) {
655 result += "%2F";
656 } else {
657 result.push_back(c);
658 }
659 } else {
660 result.push_back('%');
661 result.push_back((c / 16) + ((c / 16 <= 9) ? '0' : 'A' - 10));
662 result.push_back((c % 16) + ((c % 16 <= 9) ? '0' : 'A' - 10));
663 }
664 }
665 return result;
666 }
667
668
669 string S3FanoutManager::GetAwsV4SigningKey(const string &date) const {
670 if (last_signing_key_.first == date)
671 return last_signing_key_.second;
672
673 const string date_key = shash::Hmac256("AWS4" + config_.secret_key, date,
674 true);
675 const string date_region_key = shash::Hmac256(date_key, config_.region, true);
676 const string date_region_service_key = shash::Hmac256(date_region_key, "s3",
677 true);
678 string signing_key = shash::Hmac256(date_region_service_key, "aws4_request",
679 true);
680 last_signing_key_.first = date;
681 last_signing_key_.second = signing_key;
682 return signing_key;
683 }
684
685
686 /**
687 * The Amazon AWS4 authorization header according to
688 * http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-auth-using-authorization-header.html
689 */
690 bool S3FanoutManager::MkV4Authz(const JobInfo &info,
691 vector<string> *headers) const {
692 string payload_hash;
693 const bool retval = MkPayloadHash(info, &payload_hash);
694 if (!retval)
695 return false;
696 const string content_type = GetContentType(info);
697 const string timestamp = IsoTimestamp();
698 const string date = timestamp.substr(0, 8);
699 vector<string> tokens = SplitString(complete_hostname_, ':');
700 assert(tokens.size() <= 2);
701 string canonical_hostname = tokens[0];
702
703 // if we could split the hostname in two and if the port is *NOT* a default
704 // one
705 if (tokens.size() == 2
706 && !((String2Uint64(tokens[1]) == kDefaultHTTPPort)
707 || (String2Uint64(tokens[1]) == kDefaultHTTPSPort)))
708 canonical_hostname += ":" + tokens[1];
709
710 string signed_headers;
711 string canonical_headers;
712 if (!content_type.empty()) {
713 signed_headers += "content-type;";
714 headers->push_back("Content-Type: " + content_type);
715 canonical_headers += "content-type:" + content_type + "\n";
716 }
717 if (config_.x_amz_acl != "") {
718 signed_headers += "host;x-amz-acl;x-amz-content-sha256;x-amz-date";
719 } else {
720 signed_headers += "host;x-amz-content-sha256;x-amz-date";
721 }
722 canonical_headers += "host:" + canonical_hostname + "\n";
723 if (config_.x_amz_acl != "") {
724 canonical_headers += "x-amz-acl:" + config_.x_amz_acl + "\n";
725 }
726 canonical_headers += "x-amz-content-sha256:" + payload_hash + "\n"
727 + "x-amz-date:" + timestamp + "\n";
728
729 const string scope = date + "/" + config_.region + "/s3/aws4_request";
730 const string uri = config_.dns_buckets ? (string("/") + info.object_key)
731 : (string("/") + config_.bucket + "/"
732 + info.object_key);
733
734 // V4 canonical query string: empty for most requests, "delete=" for
735 // multi-object delete (the S3 ?delete parameter has no value)
736 const string canonical_query = (info.request == JobInfo::kReqDeleteMulti)
737 ? "delete="
738 : "";
739
740 const string canonical_request = GetRequestString(info) + "\n"
741 + GetUriEncode(uri, false) + "\n"
742 + canonical_query + "\n"
743 + canonical_headers + "\n" + signed_headers
744 + "\n" + payload_hash;
745
746 const string hash_request = shash::Sha256String(canonical_request.c_str());
747
748 const string string_to_sign = "AWS4-HMAC-SHA256\n" + timestamp + "\n" + scope
749 + "\n" + hash_request;
750
751 const string signing_key = GetAwsV4SigningKey(date);
752 const string signature = shash::Hmac256(signing_key, string_to_sign);
753
754 headers->push_back("X-Amz-Acl: " + config_.x_amz_acl);
755 headers->push_back("X-Amz-Content-Sha256: " + payload_hash);
756 headers->push_back("X-Amz-Date: " + timestamp);
757 headers->push_back("Authorization: AWS4-HMAC-SHA256 "
758 "Credential="
759 + config_.access_key + "/" + scope
760 + ","
761 "SignedHeaders="
762 + signed_headers
763 + ","
764 "Signature="
765 + signature);
766
767 // S3 requires Content-MD5 for multi-object delete
768 if (info.request == JobInfo::kReqDeleteMulti) {
769 shash::Any md5_hash(shash::kMd5);
770 unsigned char *data;
771 const unsigned int nbytes = info.origin->Data(
772 reinterpret_cast<void **>(&data), info.origin->GetSize(), 0);
773 assert(nbytes == info.origin->GetSize());
774 shash::HashMem(data, nbytes, &md5_hash);
775 headers->push_back(
776 "Content-MD5: "
777 + Base64(string(reinterpret_cast<char *>(md5_hash.digest),
778 md5_hash.GetDigestSize())));
779 }
780 return true;
781 }
782
783 /**
784 * The Azure Blob authorization header according to
785 * https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key
786 */
787 bool S3FanoutManager::MkAzureAuthz(const JobInfo &info,
788 vector<string> *headers) const {
789 const string timestamp = RfcTimestamp();
790 const string canonical_headers = "x-ms-blob-type:BlockBlob\nx-ms-date:"
791 + timestamp + "\nx-ms-version:2011-08-18";
792 const string canonical_resource = "/" + config_.access_key + "/"
793 + config_.bucket + "/" + info.object_key;
794
795 string string_to_sign;
796 if ((info.request == JobInfo::kReqHeadOnly)
797 || (info.request == JobInfo::kReqHeadPut)
798 || (info.request == JobInfo::kReqDelete)) {
799 string_to_sign = GetRequestString(info) + string("\n\n\n")
800 + "\n\n\n\n\n\n\n\n\n" + canonical_headers + "\n"
801 + canonical_resource;
802 } else {
803 string_to_sign = GetRequestString(info) + string("\n\n\n")
804 + string(StringifyInt(info.origin->GetSize()))
805 + "\n\n\n\n\n\n\n\n\n" + canonical_headers + "\n"
806 + canonical_resource;
807 }
808
809 string signing_key;
810 const int retval = Debase64(config_.secret_key, &signing_key);
811 if (!retval)
812 return false;
813
814 const string signature = shash::Hmac256(signing_key, string_to_sign, true);
815
816 headers->push_back("x-ms-date: " + timestamp);
817 headers->push_back("x-ms-version: 2011-08-18");
818 headers->push_back("Authorization: SharedKey " + config_.access_key + ":"
819 + Base64(signature));
820 headers->push_back("x-ms-blob-type: BlockBlob");
821 return true;
822 }
823
824 50102 void S3FanoutManager::InitializeDnsSettingsCurl(CURL *handle,
825 CURLSH *sharehandle,
826 curl_slist *clist) const {
827 50102 CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, sharehandle);
828
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50102 times.
50102 assert(retval == CURLE_OK);
829 50102 retval = curl_easy_setopt(handle, CURLOPT_RESOLVE, clist);
830
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50102 times.
50102 assert(retval == CURLE_OK);
831 50102 }
832
833
834 50102 int S3FanoutManager::InitializeDnsSettings(CURL *handle,
835 std::string host_with_port) const {
836 // Use existing handle
837 const std::map<CURL *, S3FanOutDnsEntry *>::const_iterator
838
1/2
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
50102 it = curl_sharehandles_->find(handle);
839
2/2
✓ Branch 3 taken 48093 times.
✓ Branch 4 taken 2009 times.
50102 if (it != curl_sharehandles_->end()) {
840
1/2
✓ Branch 1 taken 48093 times.
✗ Branch 2 not taken.
48093 InitializeDnsSettingsCurl(handle, it->second->sharehandle,
841 48093 it->second->clist);
842 48093 return 0;
843 }
844
845 // Add protocol information for extraction of fields for DNS
846
2/4
✓ Branch 1 taken 2009 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2009 times.
✗ Branch 4 not taken.
2009 if (!IsHttpUrl(host_with_port))
847
2/4
✓ Branch 1 taken 2009 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2009 times.
✗ Branch 5 not taken.
2009 host_with_port = config_.protocol + "://" + host_with_port;
848
1/2
✓ Branch 1 taken 2009 times.
✗ Branch 2 not taken.
2009 const std::string remote_host = dns::ExtractHost(host_with_port);
849
1/2
✓ Branch 1 taken 2009 times.
✗ Branch 2 not taken.
2009 const std::string remote_port = dns::ExtractPort(host_with_port);
850
851 // If we have the name already resolved, use the least used IP
852 2009 S3FanOutDnsEntry *useme = NULL;
853 2009 unsigned int usemin = UINT_MAX;
854 2009 std::set<S3FanOutDnsEntry *>::iterator its3 = sharehandles_->begin();
855
2/2
✓ Branch 3 taken 1599 times.
✓ Branch 4 taken 2009 times.
3608 for (; its3 != sharehandles_->end(); ++its3) {
856
1/2
✓ Branch 2 taken 1599 times.
✗ Branch 3 not taken.
1599 if ((*its3)->dns_name == remote_host) {
857
1/2
✓ Branch 1 taken 1599 times.
✗ Branch 2 not taken.
1599 if (usemin >= (*its3)->counter) {
858 1599 usemin = (*its3)->counter;
859 1599 useme = (*its3);
860 }
861 }
862 }
863
2/2
✓ Branch 0 taken 1599 times.
✓ Branch 1 taken 410 times.
2009 if (useme != NULL) {
864
1/2
✓ Branch 1 taken 1599 times.
✗ Branch 2 not taken.
1599 curl_sharehandles_->insert(
865 1599 std::pair<CURL *, S3FanOutDnsEntry *>(handle, useme));
866 1599 useme->counter++;
867
1/2
✓ Branch 1 taken 1599 times.
✗ Branch 2 not taken.
1599 InitializeDnsSettingsCurl(handle, useme->sharehandle, useme->clist);
868 1599 return 0;
869 }
870
871 // We need to resolve the hostname
872 // TODO(ssheikki): support ipv6 also... if (opt_ipv4_only_)
873
1/2
✓ Branch 1 taken 410 times.
✗ Branch 2 not taken.
410 const dns::Host host = resolver_->Resolve(remote_host);
874
1/2
✓ Branch 2 taken 410 times.
✗ Branch 3 not taken.
410 set<string> ipv4_addresses = host.ipv4_addresses();
875 410 std::set<string>::iterator its = ipv4_addresses.begin();
876 410 S3FanOutDnsEntry *dnse = NULL;
877
2/2
✓ Branch 3 taken 410 times.
✓ Branch 4 taken 410 times.
820 for (; its != ipv4_addresses.end(); ++its) {
878
2/4
✓ Branch 1 taken 410 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 410 times.
✗ Branch 5 not taken.
410 dnse = new S3FanOutDnsEntry();
879 410 dnse->counter = 0;
880
1/2
✓ Branch 1 taken 410 times.
✗ Branch 2 not taken.
410 dnse->dns_name = remote_host;
881
4/12
✗ Branch 1 not taken.
✓ Branch 2 taken 410 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 8 taken 410 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 410 times.
✗ Branch 12 not taken.
✗ Branch 14 not taken.
✓ Branch 15 taken 410 times.
✗ Branch 18 not taken.
✗ Branch 19 not taken.
410 dnse->port = remote_port.size() == 0 ? "80" : remote_port;
882
1/2
✓ Branch 2 taken 410 times.
✗ Branch 3 not taken.
410 dnse->ip = *its;
883 410 dnse->clist = NULL;
884
1/2
✓ Branch 2 taken 410 times.
✗ Branch 3 not taken.
410 dnse->clist = curl_slist_append(
885 dnse->clist,
886
4/8
✓ Branch 1 taken 410 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 410 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 410 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 410 times.
✗ Branch 11 not taken.
820 (dnse->dns_name + ":" + dnse->port + ":" + dnse->ip).c_str());
887
1/2
✓ Branch 1 taken 410 times.
✗ Branch 2 not taken.
410 dnse->sharehandle = curl_share_init();
888
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 410 times.
410 assert(dnse->sharehandle != NULL);
889
1/2
✓ Branch 1 taken 410 times.
✗ Branch 2 not taken.
410 const CURLSHcode share_retval = curl_share_setopt(
890 dnse->sharehandle, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS);
891
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 410 times.
410 assert(share_retval == CURLSHE_OK);
892
1/2
✓ Branch 1 taken 410 times.
✗ Branch 2 not taken.
410 sharehandles_->insert(dnse);
893 }
894
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 410 times.
410 if (dnse == NULL) {
895 LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr,
896 "Error: DNS resolve failed for address '%s'.",
897 remote_host.c_str());
898 assert(dnse != NULL);
899 return -1;
900 }
901
1/2
✓ Branch 1 taken 410 times.
✗ Branch 2 not taken.
410 curl_sharehandles_->insert(
902 410 std::pair<CURL *, S3FanOutDnsEntry *>(handle, dnse));
903 410 dnse->counter++;
904
1/2
✓ Branch 1 taken 410 times.
✗ Branch 2 not taken.
410 InitializeDnsSettingsCurl(handle, dnse->sharehandle, dnse->clist);
905
906 410 return 0;
907 2009 }
908
909
910 50102 bool S3FanoutManager::MkPayloadHash(const JobInfo &info,
911 string *hex_hash) const {
912
2/2
✓ Branch 0 taken 49897 times.
✓ Branch 1 taken 205 times.
50102 if (info.request == JobInfo::kReqHeadOnly
913
2/2
✓ Branch 0 taken 24969 times.
✓ Branch 1 taken 24928 times.
49897 || info.request == JobInfo::kReqHeadPut
914
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24969 times.
24969 || info.request == JobInfo::kReqDelete) {
915
1/4
✓ Branch 0 taken 25133 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
25133 switch (config_.authz_method) {
916 25133 case kAuthzAwsV2:
917 25133 hex_hash->clear();
918 25133 break;
919 case kAuthzAwsV4:
920 // Sha256 over empty string
921 *hex_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b78"
922 "52b855";
923 break;
924 case kAuthzAzure:
925 // no payload hash required for Azure signature
926 hex_hash->clear();
927 break;
928 default:
929 PANIC(NULL);
930 }
931 25133 return true;
932 }
933
934 // kReqDeleteMulti is a POST with a body (XML payload), same hashing as PUT
935 // falls through intentionally.
936
937 // PUT or POST with payload
938
1/2
✓ Branch 1 taken 24969 times.
✗ Branch 2 not taken.
24969 shash::Any payload_hash(shash::kMd5);
939
940 unsigned char *data;
941 24969 const unsigned int nbytes = info.origin->Data(
942
2/4
✓ Branch 2 taken 24969 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 24969 times.
✗ Branch 6 not taken.
24969 reinterpret_cast<void **>(&data), info.origin->GetSize(), 0);
943
2/4
✓ Branch 2 taken 24969 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 24969 times.
24969 assert(nbytes == info.origin->GetSize());
944
945
1/4
✓ Branch 0 taken 24969 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
24969 switch (config_.authz_method) {
946 24969 case kAuthzAwsV2:
947
1/2
✓ Branch 1 taken 24969 times.
✗ Branch 2 not taken.
24969 shash::HashMem(data, nbytes, &payload_hash);
948
2/4
✓ Branch 2 taken 24969 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 24969 times.
✗ Branch 6 not taken.
74907 *hex_hash = Base64(string(reinterpret_cast<char *>(payload_hash.digest),
949 49938 payload_hash.GetDigestSize()));
950 24969 return true;
951 case kAuthzAwsV4:
952 *hex_hash = shash::Sha256Mem(data, nbytes);
953 return true;
954 case kAuthzAzure:
955 // no payload hash required for Azure signature
956 hex_hash->clear();
957 return true;
958 default:
959 PANIC(NULL);
960 }
961 }
962
963 50102 string S3FanoutManager::GetRequestString(const JobInfo &info) const {
964
3/5
✓ Branch 0 taken 25133 times.
✓ Branch 1 taken 24928 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 41 times.
✗ Branch 4 not taken.
50102 switch (info.request) {
965 25133 case JobInfo::kReqHeadOnly:
966 case JobInfo::kReqHeadPut:
967
1/2
✓ Branch 2 taken 25133 times.
✗ Branch 3 not taken.
25133 return "HEAD";
968 24928 case JobInfo::kReqPutCas:
969 case JobInfo::kReqPutDotCvmfs:
970 case JobInfo::kReqPutHtml:
971 case JobInfo::kReqPutBucket:
972
1/2
✓ Branch 2 taken 24928 times.
✗ Branch 3 not taken.
24928 return "PUT";
973 case JobInfo::kReqDelete:
974 return "DELETE";
975 41 case JobInfo::kReqDeleteMulti:
976
1/2
✓ Branch 2 taken 41 times.
✗ Branch 3 not taken.
41 return "POST";
977 default:
978 PANIC(NULL);
979 }
980 }
981
982
983 50102 string S3FanoutManager::GetContentType(const JobInfo &info) const {
984
3/6
✓ Branch 0 taken 25133 times.
✓ Branch 1 taken 24928 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 41 times.
✗ Branch 5 not taken.
50102 switch (info.request) {
985 25133 case JobInfo::kReqHeadOnly:
986 case JobInfo::kReqHeadPut:
987 case JobInfo::kReqDelete:
988
1/2
✓ Branch 2 taken 25133 times.
✗ Branch 3 not taken.
25133 return "";
989 24928 case JobInfo::kReqPutCas:
990
1/2
✓ Branch 2 taken 24928 times.
✗ Branch 3 not taken.
24928 return "application/octet-stream";
991 case JobInfo::kReqPutDotCvmfs:
992 return "application/x-cvmfs";
993 case JobInfo::kReqPutHtml:
994 return "text/html";
995 41 case JobInfo::kReqPutBucket:
996 case JobInfo::kReqDeleteMulti:
997
1/2
✓ Branch 2 taken 41 times.
✗ Branch 3 not taken.
41 return "text/xml";
998 default:
999 PANIC(NULL);
1000 }
1001 }
1002
1003
1004 /**
1005 * Request parameters set the URL and other options such as timeout and
1006 * proxy.
1007 */
1008 50102 Failures S3FanoutManager::InitializeRequest(JobInfo *info, CURL *handle) const {
1009 // Initialize internal download state
1010 50102 info->curl_handle = handle;
1011 50102 info->error_code = kFailOk;
1012 50102 info->http_error = 0;
1013 50102 info->num_retries = 0;
1014 50102 info->backoff_ms = 0;
1015 50102 info->throttle_ms = 0;
1016 50102 info->throttle_timestamp = 0;
1017 50102 info->http_headers = NULL;
1018 // info->payload_size is needed in S3Uploader::MainCollectResults,
1019 // where info->origin is already destroyed.
1020
1/2
✓ Branch 2 taken 50102 times.
✗ Branch 3 not taken.
50102 info->payload_size = info->origin->GetSize();
1021
1022
2/4
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 50102 times.
✗ Branch 5 not taken.
50102 InitializeDnsSettings(handle, complete_hostname_);
1023
1024 CURLcode retval;
1025
2/2
✓ Branch 0 taken 49897 times.
✓ Branch 1 taken 205 times.
50102 if (info->request == JobInfo::kReqHeadOnly
1026
2/2
✓ Branch 0 taken 24969 times.
✓ Branch 1 taken 24928 times.
49897 || info->request == JobInfo::kReqHeadPut
1027
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24969 times.
24969 || info->request == JobInfo::kReqDelete) {
1028
1/2
✓ Branch 1 taken 25133 times.
✗ Branch 2 not taken.
25133 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 0);
1029
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 25133 times.
25133 assert(retval == CURLE_OK);
1030
1/2
✓ Branch 1 taken 25133 times.
✗ Branch 2 not taken.
25133 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
1031
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 25133 times.
25133 assert(retval == CURLE_OK);
1032
1033
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 25133 times.
25133 if (info->request == JobInfo::kReqDelete) {
1034 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST,
1035 GetRequestString(*info).c_str());
1036 assert(retval == CURLE_OK);
1037 } else {
1038
1/2
✓ Branch 1 taken 25133 times.
✗ Branch 2 not taken.
25133 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
1039
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 25133 times.
25133 assert(retval == CURLE_OK);
1040 }
1041
2/2
✓ Branch 0 taken 41 times.
✓ Branch 1 taken 24928 times.
24969 } else if (info->request == JobInfo::kReqDeleteMulti) {
1042 // POST request with XML body read from origin buffer
1043
1/2
✓ Branch 1 taken 41 times.
✗ Branch 2 not taken.
41 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 1);
1044
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41 times.
41 assert(retval == CURLE_OK);
1045
1/2
✓ Branch 1 taken 41 times.
✗ Branch 2 not taken.
41 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 0);
1046
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41 times.
41 assert(retval == CURLE_OK);
1047
1/2
✓ Branch 1 taken 41 times.
✗ Branch 2 not taken.
41 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, "POST");
1048
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41 times.
41 assert(retval == CURLE_OK);
1049
2/4
✓ Branch 2 taken 41 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 41 times.
✗ Branch 6 not taken.
41 retval = curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE,
1050 static_cast<curl_off_t>(info->origin->GetSize()));
1051
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41 times.
41 assert(retval == CURLE_OK);
1052 41 info->response_body.clear();
1053 } else {
1054
1/2
✓ Branch 1 taken 24928 times.
✗ Branch 2 not taken.
24928 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
1055
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24928 times.
24928 assert(retval == CURLE_OK);
1056
1/2
✓ Branch 1 taken 24928 times.
✗ Branch 2 not taken.
24928 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 1);
1057
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24928 times.
24928 assert(retval == CURLE_OK);
1058
1/2
✓ Branch 1 taken 24928 times.
✗ Branch 2 not taken.
24928 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 0);
1059
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24928 times.
24928 assert(retval == CURLE_OK);
1060
2/4
✓ Branch 2 taken 24928 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 24928 times.
✗ Branch 6 not taken.
24928 retval = curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE,
1061 static_cast<curl_off_t>(info->origin->GetSize()));
1062
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24928 times.
24928 assert(retval == CURLE_OK);
1063
1064
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24928 times.
24928 if (info->request == JobInfo::kReqPutDotCvmfs) {
1065 info->http_headers = curl_slist_append(
1066 info->http_headers, dot_cvmfs_cache_control_header.c_str());
1067
1/2
✓ Branch 0 taken 24928 times.
✗ Branch 1 not taken.
24928 } else if (info->request == JobInfo::kReqPutCas) {
1068
1/2
✓ Branch 1 taken 24928 times.
✗ Branch 2 not taken.
24928 info->http_headers = curl_slist_append(info->http_headers,
1069 kCacheControlCas);
1070 }
1071 }
1072
1073 bool retval_b;
1074
1075 // Authorization
1076 50102 vector<string> authz_headers;
1077
1/4
✓ Branch 0 taken 50102 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
50102 switch (config_.authz_method) {
1078 50102 case kAuthzAwsV2:
1079
1/2
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
50102 retval_b = MkV2Authz(*info, &authz_headers);
1080 50102 break;
1081 case kAuthzAwsV4:
1082 retval_b = MkV4Authz(*info, &authz_headers);
1083 break;
1084 case kAuthzAzure:
1085 retval_b = MkAzureAuthz(*info, &authz_headers);
1086 break;
1087 default:
1088 PANIC(NULL);
1089 }
1090
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50102 times.
50102 if (!retval_b)
1091 return kFailLocalIO;
1092
2/2
✓ Branch 1 taken 200244 times.
✓ Branch 2 taken 50102 times.
250346 for (unsigned i = 0; i < authz_headers.size(); ++i) {
1093
1/2
✓ Branch 2 taken 200244 times.
✗ Branch 3 not taken.
200244 info->http_headers = curl_slist_append(info->http_headers,
1094 200244 authz_headers[i].c_str());
1095 }
1096
1097 // Common headers
1098
1/2
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
50102 info->http_headers = curl_slist_append(info->http_headers,
1099 "Connection: Keep-Alive");
1100
1/2
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
50102 info->http_headers = curl_slist_append(info->http_headers, "Pragma:");
1101 // No 100-continue
1102
1/2
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
50102 info->http_headers = curl_slist_append(info->http_headers, "Expect:");
1103 // Strip unnecessary header
1104
1/2
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
50102 info->http_headers = curl_slist_append(info->http_headers, "Accept:");
1105
1/2
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
50102 info->http_headers = curl_slist_append(info->http_headers,
1106 50102 user_agent_->c_str());
1107
1108 // Set curl parameters
1109
1/2
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
50102 retval = curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
1110
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50102 times.
50102 assert(retval == CURLE_OK);
1111
1/2
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
50102 retval = curl_easy_setopt(handle, CURLOPT_HEADERDATA,
1112 static_cast<void *>(info));
1113
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50102 times.
50102 assert(retval == CURLE_OK);
1114
1/2
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
50102 retval = curl_easy_setopt(handle, CURLOPT_READDATA,
1115 static_cast<void *>(info));
1116
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50102 times.
50102 assert(retval == CURLE_OK);
1117
1/2
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
50102 retval = curl_easy_setopt(handle, CURLOPT_WRITEDATA,
1118 static_cast<void *>(info));
1119
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50102 times.
50102 assert(retval == CURLE_OK);
1120
1/2
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
50102 retval = curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->http_headers);
1121
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50102 times.
50102 assert(retval == CURLE_OK);
1122
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50102 times.
50102 if (opt_ipv4_only_) {
1123 retval = curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
1124 assert(retval == CURLE_OK);
1125 }
1126 // Follow HTTP redirects
1127
1/2
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
50102 retval = curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
1128
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50102 times.
50102 assert(retval == CURLE_OK);
1129
1130
1/2
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
50102 retval = curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, info->errorbuffer);
1131
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50102 times.
50102 assert(retval == CURLE_OK);
1132
1133
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 50102 times.
50102 if (config_.protocol == "https") {
1134 retval = curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 1L);
1135 assert(retval == CURLE_OK);
1136 retval = curl_easy_setopt(handle, CURLOPT_PROXY_SSL_VERIFYPEER, 1L);
1137 assert(retval == CURLE_OK);
1138 const bool add_cert = ssl_certificate_store_.ApplySslCertificatePath(
1139 handle);
1140 assert(add_cert);
1141 }
1142
1143 50102 return kFailOk;
1144 50102 }
1145
1146
1147 /**
1148 * Sets the URL specific options such as host to use and timeout.
1149 */
1150 50102 void S3FanoutManager::SetUrlOptions(JobInfo *info) const {
1151 50102 CURL *curl_handle = info->curl_handle;
1152 CURLcode retval;
1153
1154
1/2
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
50102 retval = curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT,
1155 config_.opt_timeout_sec);
1156
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50102 times.
50102 assert(retval == CURLE_OK);
1157
1/2
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
50102 retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT,
1158 kLowSpeedLimit);
1159
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50102 times.
50102 assert(retval == CURLE_OK);
1160
1/2
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
50102 retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME,
1161 config_.opt_timeout_sec);
1162
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50102 times.
50102 assert(retval == CURLE_OK);
1163
1164
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50102 times.
50102 if (is_curl_debug_) {
1165 retval = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 1);
1166 assert(retval == CURLE_OK);
1167 }
1168
1169
1/2
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
50102 string url = MkUrl(info->object_key);
1170
2/2
✓ Branch 0 taken 41 times.
✓ Branch 1 taken 50061 times.
50102 if (info->request == JobInfo::kReqDeleteMulti)
1171
1/2
✓ Branch 1 taken 41 times.
✗ Branch 2 not taken.
41 url += "?delete";
1172
1/2
✓ Branch 2 taken 50102 times.
✗ Branch 3 not taken.
50102 retval = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
1173
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50102 times.
50102 assert(retval == CURLE_OK);
1174
1175
1/2
✓ Branch 2 taken 50102 times.
✗ Branch 3 not taken.
50102 retval = curl_easy_setopt(curl_handle, CURLOPT_PROXY, config_.proxy.c_str());
1176
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50102 times.
50102 assert(retval == CURLE_OK);
1177 50102 }
1178
1179
1180 /**
1181 * Adds the number of uploaded bytes of a transfer to the global counters.
1182 */
1183 50266 void S3FanoutManager::UpdateStatistics(CURL *handle) {
1184 double val;
1185
1186
2/4
✓ Branch 1 taken 50266 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 50266 times.
✗ Branch 4 not taken.
50266 if (curl_easy_getinfo(handle, CURLINFO_SIZE_UPLOAD, &val) == CURLE_OK)
1187 50266 statistics_->transferred_bytes += val;
1188 50266 }
1189
1190
1191 /**
1192 * Retry if possible and if not already done too often.
1193 */
1194 246 bool S3FanoutManager::CanRetry(const JobInfo *info) {
1195 246 return (info->error_code == kFailHostConnection
1196
1/2
✓ Branch 0 taken 246 times.
✗ Branch 1 not taken.
246 || info->error_code == kFailHostResolve
1197
1/2
✓ Branch 0 taken 246 times.
✗ Branch 1 not taken.
246 || info->error_code == kFailServiceUnavailable
1198
2/2
✓ Branch 0 taken 164 times.
✓ Branch 1 taken 82 times.
246 || info->error_code == kFailRetry)
1199
2/4
✓ Branch 0 taken 246 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 164 times.
✗ Branch 3 not taken.
492 && (info->num_retries < config_.opt_max_retries);
1200 }
1201
1202
1203 /**
1204 * Backoff for retry to introduce jitter into an upload sequence.
1205 *
1206 * No return value; any throttle or backoff sleep happens inside this call.
1207 */
1208 164 void S3FanoutManager::Backoff(JobInfo *info) {
1209
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 164 times.
164 if (info->error_code != kFailRetry)
1210 info->num_retries++;
1211 164 statistics_->num_retries++;
1212
1213
1/2
✓ Branch 0 taken 164 times.
✗ Branch 1 not taken.
164 if (info->throttle_ms > 0) {
1214 164 LogCvmfs(kLogS3Fanout, kLogDebug, "throttling for %d ms",
1215 info->throttle_ms);
1216 164 const uint64_t now = platform_monotonic_time();
1217
1/2
✓ Branch 0 taken 164 times.
✗ Branch 1 not taken.
164 if ((info->throttle_timestamp + (info->throttle_ms / 1000)) >= now) {
1218
2/2
✓ Branch 0 taken 41 times.
✓ Branch 1 taken 123 times.
164 if ((now - timestamp_last_throttle_report_)
1219 > kThrottleReportIntervalSec) {
1220 41 LogCvmfs(kLogS3Fanout, kLogStdout,
1221 "Warning: S3 backend throttling %ums "
1222 "(total backoff time so far %lums)",
1223 41 info->throttle_ms, statistics_->ms_throttled);
1224 41 timestamp_last_throttle_report_ = now;
1225 }
1226 164 statistics_->ms_throttled += info->throttle_ms;
1227 164 SafeSleepMs(info->throttle_ms);
1228 }
1229 } else {
1230 if (info->backoff_ms == 0) {
1231 // Must be != 0
1232 info->backoff_ms = prng_.Next(config_.opt_backoff_init_ms + 1);
1233 } else {
1234 info->backoff_ms *= 2;
1235 }
1236 if (info->backoff_ms > config_.opt_backoff_max_ms)
1237 info->backoff_ms = config_.opt_backoff_max_ms;
1238
1239 LogCvmfs(kLogS3Fanout, kLogDebug, "backing off for %d ms",
1240 info->backoff_ms);
1241 SafeSleepMs(info->backoff_ms);
1242 }
1243 164 }
1244
1245
1246 /**
1247 * Checks the result of a curl request, implements the failure logic,
1248 * and takes care of cleanup.
1249 *
1250 * @return true if request should be repeated, false otherwise
1251 */
1252 50266 bool S3FanoutManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1253 50266 LogCvmfs(kLogS3Fanout, kLogDebug,
1254 "Verify uploaded/tested object %s "
1255 "(curl error %d, info error %d, info request %d)",
1256 50266 info->object_key.c_str(), curl_error, info->error_code,
1257 50266 info->request);
1258 50266 UpdateStatistics(info->curl_handle);
1259
1260 // Verification and error classification
1261
1/6
✓ Branch 0 taken 50266 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
50266 switch (curl_error) {
1262 50266 case CURLE_OK:
1263
2/2
✓ Branch 0 taken 50102 times.
✓ Branch 1 taken 164 times.
50266 if ((info->error_code != kFailRetry)
1264
2/2
✓ Branch 0 taken 25092 times.
✓ Branch 1 taken 25010 times.
50102 && (info->error_code != kFailNotFound)) {
1265 25092 info->error_code = kFailOk;
1266 }
1267 50266 break;
1268 case CURLE_UNSUPPORTED_PROTOCOL:
1269 case CURLE_URL_MALFORMAT:
1270 info->error_code = kFailBadRequest;
1271 break;
1272 case CURLE_COULDNT_RESOLVE_HOST:
1273 info->error_code = kFailHostResolve;
1274 break;
1275 case CURLE_COULDNT_CONNECT:
1276 case CURLE_OPERATION_TIMEDOUT:
1277 case CURLE_SEND_ERROR:
1278 case CURLE_RECV_ERROR:
1279 info->error_code = kFailHostConnection;
1280 break;
1281 case CURLE_ABORTED_BY_CALLBACK:
1282 case CURLE_WRITE_ERROR:
1283 // Error set by callback
1284 break;
1285 default:
1286 LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr,
1287 "unexpected curl error (%d) while trying to upload %s: %s",
1288 curl_error, info->object_key.c_str(), info->errorbuffer);
1289 info->error_code = kFailOther;
1290 break;
1291 }
1292
1293 // Transform HEAD to PUT request
1294
2/2
✓ Branch 0 taken 25010 times.
✓ Branch 1 taken 25256 times.
50266 if ((info->error_code == kFailNotFound)
1295
2/2
✓ Branch 0 taken 24928 times.
✓ Branch 1 taken 82 times.
25010 && (info->request == JobInfo::kReqHeadPut)) {
1296 24928 LogCvmfs(kLogS3Fanout, kLogDebug, "not found: %s, uploading",
1297 info->object_key.c_str());
1298 24928 info->request = JobInfo::kReqPutCas;
1299 24928 curl_slist_free_all(info->http_headers);
1300 24928 info->http_headers = NULL;
1301 24928 const s3fanout::Failures init_failure = InitializeRequest(
1302 info, info->curl_handle);
1303
1304
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 24928 times.
24928 if (init_failure != s3fanout::kFailOk) {
1305 PANIC(kLogStderr,
1306 "Failed to initialize CURL handle "
1307 "(error: %d - %s | errno: %d)",
1308 init_failure, Code2Ascii(init_failure), errno);
1309 }
1310 24928 SetUrlOptions(info);
1311 // Reset origin
1312 24928 info->origin->Rewind();
1313 24928 return true; // Again, Put
1314 }
1315
1316 // Determination if failed request should be repeated
1317 25338 bool try_again = false;
1318
2/2
✓ Branch 0 taken 246 times.
✓ Branch 1 taken 25092 times.
25338 if (info->error_code != kFailOk) {
1319 246 try_again = CanRetry(info);
1320 }
1321
2/2
✓ Branch 0 taken 164 times.
✓ Branch 1 taken 25174 times.
25338 if (try_again) {
1322
1/2
✓ Branch 0 taken 164 times.
✗ Branch 1 not taken.
164 if (info->request == JobInfo::kReqPutCas
1323
1/2
✓ Branch 0 taken 164 times.
✗ Branch 1 not taken.
164 || info->request == JobInfo::kReqPutDotCvmfs
1324
1/2
✓ Branch 0 taken 164 times.
✗ Branch 1 not taken.
164 || info->request == JobInfo::kReqPutHtml
1325
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 164 times.
164 || info->request == JobInfo::kReqDeleteMulti) {
1326 LogCvmfs(kLogS3Fanout, kLogDebug, "Trying again to upload %s",
1327 info->object_key.c_str());
1328 // Reset origin
1329 info->origin->Rewind();
1330 if (info->request == JobInfo::kReqDeleteMulti)
1331 info->response_body.clear();
1332 }
1333 164 Backoff(info);
1334 164 info->error_code = kFailOk;
1335 164 info->http_error = 0;
1336 164 info->throttle_ms = 0;
1337 164 info->backoff_ms = 0;
1338 164 info->throttle_timestamp = 0;
1339 164 return true; // try again
1340 }
1341
1342 // Cleanup opened resources
1343 25174 info->origin.Destroy();
1344
1345
3/4
✓ Branch 0 taken 82 times.
✓ Branch 1 taken 25092 times.
✓ Branch 2 taken 82 times.
✗ Branch 3 not taken.
25174 if ((info->error_code != kFailOk) && (info->http_error != 0)
1346
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 82 times.
82 && (info->http_error != 404)) {
1347 LogCvmfs(kLogS3Fanout, kLogStderr, "S3: HTTP failure %d", info->http_error);
1348 }
1349 25174 return false; // stop transfer
1350 }
1351
1352
2/4
✓ Branch 3 taken 492 times.
✗ Branch 4 not taken.
✓ Branch 9 taken 492 times.
✗ Branch 10 not taken.
492 S3FanoutManager::S3FanoutManager(const S3Config &config) : config_(config) {
1353 492 atomic_init32(&multi_threaded_);
1354
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 MakePipe(pipe_terminate_);
1355
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 MakePipe(pipe_jobs_);
1356
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 MakePipe(pipe_completed_);
1357
1358 int retval;
1359 492 jobs_todo_lock_ = reinterpret_cast<pthread_mutex_t *>(
1360 492 smalloc(sizeof(pthread_mutex_t)));
1361 492 retval = pthread_mutex_init(jobs_todo_lock_, NULL);
1362
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 492 times.
492 assert(retval == 0);
1363 492 curl_handle_lock_ = reinterpret_cast<pthread_mutex_t *>(
1364 492 smalloc(sizeof(pthread_mutex_t)));
1365 492 retval = pthread_mutex_init(curl_handle_lock_, NULL);
1366
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 492 times.
492 assert(retval == 0);
1367
1368
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 active_requests_ = new set<JobInfo *>;
1369
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 pool_handles_idle_ = new set<CURL *>;
1370
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 pool_handles_inuse_ = new set<CURL *>;
1371
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 curl_sharehandles_ = new map<CURL *, S3FanOutDnsEntry *>;
1372
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 sharehandles_ = new set<S3FanOutDnsEntry *>;
1373 492 watch_fds_max_ = 4 * config_.pool_max_handles;
1374 492 max_available_jobs_ = 4 * config_.pool_max_handles;
1375
2/4
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 492 times.
✗ Branch 5 not taken.
492 available_jobs_ = new Semaphore(max_available_jobs_);
1376
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 492 times.
492 assert(NULL != available_jobs_);
1377
1378
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 statistics_ = new Statistics();
1379
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 user_agent_ = new string();
1380
2/4
✓ Branch 2 taken 492 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 492 times.
✗ Branch 6 not taken.
492 *user_agent_ = "User-Agent: cvmfs " + string(CVMFS_VERSION);
1381
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 complete_hostname_ = MkCompleteHostname();
1382
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 dot_cvmfs_cache_control_header = MkDotCvmfsCacheControlHeader();
1383
1384
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 const CURLcode cretval = curl_global_init(CURL_GLOBAL_ALL);
1385
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 492 times.
492 assert(cretval == CURLE_OK);
1386
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 curl_multi_ = curl_multi_init();
1387
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 492 times.
492 assert(curl_multi_ != NULL);
1388 CURLMcode mretval;
1389
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION,
1390 CallbackCurlSocket);
1391
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 492 times.
492 assert(mretval == CURLM_OK);
1392
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1393 static_cast<void *>(this));
1394
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 492 times.
492 assert(mretval == CURLM_OK);
1395
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1396 config_.pool_max_handles);
1397
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 492 times.
492 assert(mretval == CURLM_OK);
1398
1399 492 prng_.InitLocaltime();
1400
1401 492 thread_upload_ = 0;
1402 492 timestamp_last_throttle_report_ = 0;
1403 492 is_curl_debug_ = (getenv("_CVMFS_CURL_DEBUG") != NULL);
1404
1405 // Parsing environment variables
1406 492 if ((getenv("CVMFS_IPV4_ONLY") != NULL)
1407
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 492 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 492 times.
492 && (strlen(getenv("CVMFS_IPV4_ONLY")) > 0)) {
1408 opt_ipv4_only_ = true;
1409 } else {
1410 492 opt_ipv4_only_ = false;
1411 }
1412
1413
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 resolver_ = dns::CaresResolver::Create(opt_ipv4_only_, 2, 2000);
1414
1415 492 watch_fds_ = static_cast<struct pollfd *>(smalloc(4 * sizeof(struct pollfd)));
1416 492 watch_fds_size_ = 4;
1417 492 watch_fds_inuse_ = 0;
1418
1419
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 ssl_certificate_store_.UseSystemCertificatePath();
1420 492 }
1421
1422 984 S3FanoutManager::~S3FanoutManager() {
1423 492 pthread_mutex_destroy(jobs_todo_lock_);
1424 492 free(jobs_todo_lock_);
1425 492 pthread_mutex_destroy(curl_handle_lock_);
1426 492 free(curl_handle_lock_);
1427
1428
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1429 // Shutdown I/O thread
1430 492 char buf = 'T';
1431 492 WritePipe(pipe_terminate_[1], &buf, 1);
1432 492 pthread_join(thread_upload_, NULL);
1433 }
1434 492 ClosePipe(pipe_terminate_);
1435 492 ClosePipe(pipe_jobs_);
1436 492 ClosePipe(pipe_completed_);
1437
1438 492 set<CURL *>::iterator i = pool_handles_idle_->begin();
1439 492 const set<CURL *>::const_iterator iEnd = pool_handles_idle_->end();
1440
2/2
✓ Branch 2 taken 820 times.
✓ Branch 3 taken 492 times.
1312 for (; i != iEnd; ++i) {
1441 820 curl_easy_cleanup(*i);
1442 }
1443
1444 492 set<S3FanOutDnsEntry *>::iterator is = sharehandles_->begin();
1445 492 const set<S3FanOutDnsEntry *>::const_iterator isEnd = sharehandles_->end();
1446
2/2
✓ Branch 2 taken 410 times.
✓ Branch 3 taken 492 times.
902 for (; is != isEnd; ++is) {
1447 410 curl_share_cleanup((*is)->sharehandle);
1448 410 curl_slist_free_all((*is)->clist);
1449
1/2
✓ Branch 1 taken 410 times.
✗ Branch 2 not taken.
410 delete *is;
1450 }
1451 492 pool_handles_idle_->clear();
1452 492 curl_sharehandles_->clear();
1453 492 sharehandles_->clear();
1454
1/2
✓ Branch 0 taken 492 times.
✗ Branch 1 not taken.
492 delete active_requests_;
1455
1/2
✓ Branch 0 taken 492 times.
✗ Branch 1 not taken.
492 delete pool_handles_idle_;
1456
1/2
✓ Branch 0 taken 492 times.
✗ Branch 1 not taken.
492 delete pool_handles_inuse_;
1457
1/2
✓ Branch 0 taken 492 times.
✗ Branch 1 not taken.
492 delete curl_sharehandles_;
1458
1/2
✓ Branch 0 taken 492 times.
✗ Branch 1 not taken.
492 delete sharehandles_;
1459
1/2
✓ Branch 0 taken 492 times.
✗ Branch 1 not taken.
492 delete user_agent_;
1460 492 curl_multi_cleanup(curl_multi_);
1461
1462
1/2
✓ Branch 0 taken 492 times.
✗ Branch 1 not taken.
492 delete statistics_;
1463
1464
1/2
✓ Branch 0 taken 492 times.
✗ Branch 1 not taken.
492 delete available_jobs_;
1465
1466 492 curl_global_cleanup();
1467 492 }
1468
1469 /**
1470 * Spawns the I/O worker thread. No way back except ~S3FanoutManager.
1471 */
1472 492 void S3FanoutManager::Spawn() {
1473 492 LogCvmfs(kLogS3Fanout, kLogDebug, "S3FanoutManager spawned");
1474
1475 492 const int retval = pthread_create(&thread_upload_, NULL, MainUpload,
1476 static_cast<void *>(this));
1477
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 492 times.
492 assert(retval == 0);
1478
1479 492 atomic_inc32(&multi_threaded_);
1480 492 }
1481
1482 123 const Statistics &S3FanoutManager::GetStatistics() { return *statistics_; }
1483
1484 /**
1485 * Push new job to be uploaded to the S3 cloud storage.
1486 */
1487 25174 void S3FanoutManager::PushNewJob(JobInfo *info) {
1488 25174 available_jobs_->Increment();
1489 25174 WritePipe(pipe_jobs_[1], &info, sizeof(info));
1490 25174 }
1491
1492 /**
1493 * Push completed job to list of completed jobs
1494 */
1495 25666 void S3FanoutManager::PushCompletedJob(JobInfo *info) {
1496 25666 WritePipe(pipe_completed_[1], &info, sizeof(info));
1497 25666 }
1498
1499 /**
1500 * Pop completed job
1501 */
1502 25666 JobInfo *S3FanoutManager::PopCompletedJob() {
1503 JobInfo *info;
1504
1/2
✓ Branch 1 taken 25666 times.
✗ Branch 2 not taken.
25666 ReadPipe(pipe_completed_[0], &info, sizeof(info));
1505 25666 return info;
1506 }
1507
1508 //------------------------------------------------------------------------------
1509
1510
1511 string Statistics::Print() const {
1512 return "Transferred Bytes: " + StringifyInt(uint64_t(transferred_bytes))
1513 + "\n" + "Transfer duration: " + StringifyInt(uint64_t(transfer_time))
1514 + " s\n" + "Number of requests: " + StringifyInt(num_requests) + "\n"
1515 + "Number of retries: " + StringifyInt(num_retries) + "\n";
1516 }
1517
1518 } // namespace s3fanout
1519