GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/network/s3fanout.cc
Date: 2025-11-30 02:35:17
Exec Total Coverage
Lines: 573 816 70.2%
Branches: 412 1180 34.9%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 *
4 * Runs a thread using libcurls asynchronous I/O mode to push data to S3
5 */
6
7 #include "s3fanout.h"
8
9 #include <pthread.h>
10
11 #include <algorithm>
12 #include <cassert>
13 #include <cerrno>
14 #include <utility>
15
16 #include "upload_facility.h"
17 #include "util/concurrency.h"
18 #include "util/exception.h"
19 #include "util/platform.h"
20 #include "util/posix.h"
21 #include "util/string.h"
22
23 using namespace std; // NOLINT
24
25 namespace s3fanout {
26
27 const char *S3FanoutManager::kCacheControlCas = "Cache-Control: max-age=259200";
28 const unsigned S3FanoutManager::kDefault429ThrottleMs = 250;
29 const unsigned S3FanoutManager::kMax429ThrottleMs = 10000;
30 const unsigned S3FanoutManager::kThrottleReportIntervalSec = 10;
31 const unsigned S3FanoutManager::kDefaultHTTPPort = 80;
32 const unsigned S3FanoutManager::kDefaultHTTPSPort = 443;
33
34 180 std::string S3FanoutManager::MkDotCvmfsCacheControlHeader(unsigned defaultMaxAge, int overrideMaxAge)
35 {
36 const char *var;
37 int max_age_sec;
38 char *at_null_terminator_if_number;
39 180 bool value_determined = false;
40
41
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 180 times.
180 if (overrideMaxAge >= 0) {
42 max_age_sec = overrideMaxAge;
43 value_determined = true;
44 }
45
46
1/2
✓ Branch 0 taken 180 times.
✗ Branch 1 not taken.
180 if (!value_determined) {
47 180 var = getenv("CVMFS_MAX_TTL_SECS");
48
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
180 if (var && var[0]) {
49 max_age_sec = strtoll(var, &at_null_terminator_if_number, 10);
50 if (*at_null_terminator_if_number == '\0'
51 && max_age_sec >= 0) {
52 value_determined = true;
53 }
54 }
55 }
56
57
1/2
✓ Branch 0 taken 180 times.
✗ Branch 1 not taken.
180 if (!value_determined) {
58 180 var = getenv("CVMFS_MAX_TTL");
59
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
180 if (var && var[0]) {
60 max_age_sec = strtoll(var, &at_null_terminator_if_number, 10);
61 if (*at_null_terminator_if_number == '\0'
62 && max_age_sec >= 0) {
63 max_age_sec *= 60;
64 value_determined = true;
65 }
66 }
67 }
68
69
1/2
✓ Branch 0 taken 180 times.
✗ Branch 1 not taken.
180 if (!value_determined) {
70 180 max_age_sec = defaultMaxAge;
71 }
72
73
2/4
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 180 times.
✗ Branch 5 not taken.
360 return "Cache-Control: max-age=" + std::to_string(max_age_sec);
74 }
75
76 /**
77 * Parses Retry-After and X-Retry-In headers attached to HTTP 429 responses
78 */
79 390 void S3FanoutManager::DetectThrottleIndicator(const std::string &header,
80 JobInfo *info) {
81 390 std::string value_str;
82
4/6
✓ Branch 2 taken 390 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 390 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 165 times.
✓ Branch 10 taken 225 times.
390 if (HasPrefix(header, "retry-after:", true))
83
1/2
✓ Branch 1 taken 165 times.
✗ Branch 2 not taken.
165 value_str = header.substr(12);
84
4/6
✓ Branch 2 taken 390 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 390 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 75 times.
✓ Branch 10 taken 315 times.
390 if (HasPrefix(header, "x-retry-in:", true))
85
1/2
✓ Branch 1 taken 75 times.
✗ Branch 2 not taken.
75 value_str = header.substr(11);
86
87
1/2
✓ Branch 1 taken 390 times.
✗ Branch 2 not taken.
390 value_str = Trim(value_str, true /* trim_newline */);
88
2/2
✓ Branch 1 taken 210 times.
✓ Branch 2 taken 180 times.
390 if (!value_str.empty()) {
89
1/2
✓ Branch 1 taken 210 times.
✗ Branch 2 not taken.
210 const unsigned value_numeric = String2Uint64(value_str);
90
2/4
✓ Branch 2 taken 210 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 210 times.
✗ Branch 6 not taken.
420 const unsigned value_ms = HasSuffix(value_str, "ms", true /* ignore_case */)
91
2/2
✓ Branch 0 taken 75 times.
✓ Branch 1 taken 135 times.
210 ? value_numeric
92 210 : (value_numeric * 1000);
93
2/2
✓ Branch 0 taken 195 times.
✓ Branch 1 taken 15 times.
210 if (value_ms > 0)
94 195 info->throttle_ms = std::min(value_ms, kMax429ThrottleMs);
95 }
96 390 }
97
98
99 /**
100 * Called by curl for every HTTP header. Not called for file:// transfers.
101 */
102 55230 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
103 void *info_link) {
104 55230 const size_t num_bytes = size * nmemb;
105
1/2
✓ Branch 2 taken 55230 times.
✗ Branch 3 not taken.
55230 const string header_line(static_cast<const char *>(ptr), num_bytes);
106 55230 JobInfo *info = static_cast<JobInfo *>(info_link);
107
108 // Check for http status code errors
109
4/6
✓ Branch 2 taken 55230 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 55230 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 18390 times.
✓ Branch 10 taken 36840 times.
55230 if (HasPrefix(header_line, "HTTP/1.", false)) {
110
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 18390 times.
18390 if (header_line.length() < 10)
111 return 0;
112
113 unsigned i;
114
5/6
✓ Branch 1 taken 36780 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 18390 times.
✓ Branch 5 taken 18390 times.
✓ Branch 6 taken 18390 times.
✓ Branch 7 taken 18390 times.
36780 for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {
115 }
116
117
2/2
✓ Branch 1 taken 9180 times.
✓ Branch 2 taken 9210 times.
18390 if (header_line[i] == '2') {
118 9180 return num_bytes;
119 } else {
120
1/2
✓ Branch 2 taken 9210 times.
✗ Branch 3 not taken.
9210 LogCvmfs(kLogS3Fanout, kLogDebug, "http status error code [info %p]: %s",
121 info, header_line.c_str());
122
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9210 times.
9210 if (header_line.length() < i + 3) {
123 LogCvmfs(kLogS3Fanout, kLogStderr, "S3: invalid HTTP response '%s'",
124 header_line.c_str());
125 info->error_code = kFailOther;
126 return 0;
127 }
128
2/4
✓ Branch 3 taken 9210 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 9210 times.
✗ Branch 7 not taken.
9210 info->http_error = String2Int64(string(&header_line[i], 3));
129
130
2/7
✓ Branch 0 taken 60 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 9150 times.
✗ Branch 6 not taken.
9210 switch (info->http_error) {
131 60 case 429:
132 60 info->error_code = kFailRetry;
133 60 info->throttle_ms = S3FanoutManager::kDefault429ThrottleMs;
134 60 info->throttle_timestamp = platform_monotonic_time();
135 60 return num_bytes;
136 case 507: // Insufficient Storage
137 info->error_code = kFailInsufficientStorage;
138 break;
139 case 503:
140 case 502: // Can happen if the S3 gateway-backend connection breaks
141 case 500: // sometimes see this as a transient error from S3
142 info->error_code = kFailServiceUnavailable;
143 break;
144 case 501:
145 case 400:
146 info->error_code = kFailBadRequest;
147 break;
148 case 403:
149 info->error_code = kFailForbidden;
150 break;
151 9150 case 404:
152 9150 info->error_code = kFailNotFound;
153 9150 return num_bytes;
154 default:
155 info->error_code = kFailOther;
156 }
157 return 0;
158 }
159 }
160
161
2/2
✓ Branch 0 taken 180 times.
✓ Branch 1 taken 36660 times.
36840 if (info->error_code == kFailRetry) {
162
1/2
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
180 S3FanoutManager::DetectThrottleIndicator(header_line, info);
163 }
164
165 36840 return num_bytes;
166 55230 }
167
168
169 /**
170 * Called by curl for every new chunk to upload.
171 */
172 235035 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
173 void *info_link) {
174 235035 const size_t num_bytes = size * nmemb;
175 235035 JobInfo *info = static_cast<JobInfo *>(info_link);
176
177 235035 LogCvmfs(kLogS3Fanout, kLogDebug, "Data callback with %zu bytes", num_bytes);
178
179
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 235035 times.
235035 if (num_bytes == 0)
180 return 0;
181
182 235035 const uint64_t read_bytes = info->origin->Read(ptr, num_bytes);
183
184 235035 LogCvmfs(kLogS3Fanout, kLogDebug, "source buffer pushed out %lu bytes",
185 read_bytes);
186
187 235035 return read_bytes;
188 }
189
190
/**
 * curl write callback for the response body.  The body is not used for now:
 * every chunk is reported as fully consumed so that the transfer goes on.
 */
static size_t CallbackCurlBody(char * /*ptr*/, size_t size, size_t nmemb,
                               void * /*userdata*/) {
  const size_t consumed = nmemb * size;
  return consumed;
}
198
199
200 /**
201 * Called when new curl sockets arrive or existing curl sockets depart.
202 */
203 40785 int S3FanoutManager::CallbackCurlSocket(CURL *easy, curl_socket_t s, int action,
204 void *userp, void *socketp) {
205 40785 S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(userp);
206 40785 LogCvmfs(kLogS3Fanout, kLogDebug,
207 "CallbackCurlSocket called with easy "
208 "handle %p, socket %d, action %d, up %p, "
209 "sp %p, fds_inuse %d, jobs %d",
210 easy, s, action, userp, socketp, s3fanout_mgr->watch_fds_inuse_,
211 40785 s3fanout_mgr->available_jobs_->Get());
212
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 40785 times.
40785 if (action == CURL_POLL_NONE)
213 return 0;
214
215 // Find s in watch_fds_
216 // First 2 fds are job and terminate pipes (not curl related)
217 unsigned index;
218
2/2
✓ Branch 0 taken 64080 times.
✓ Branch 1 taken 18390 times.
82470 for (index = 2; index < s3fanout_mgr->watch_fds_inuse_; ++index) {
219
2/2
✓ Branch 0 taken 22395 times.
✓ Branch 1 taken 41685 times.
64080 if (s3fanout_mgr->watch_fds_[index].fd == s)
220 22395 break;
221 }
222 // Or create newly
223
2/2
✓ Branch 0 taken 18390 times.
✓ Branch 1 taken 22395 times.
40785 if (index == s3fanout_mgr->watch_fds_inuse_) {
224 // Extend array if necessary
225
2/2
✓ Branch 0 taken 30 times.
✓ Branch 1 taken 18360 times.
18390 if (s3fanout_mgr->watch_fds_inuse_ == s3fanout_mgr->watch_fds_size_) {
226 30 s3fanout_mgr->watch_fds_size_ *= 2;
227 30 s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
228 30 srealloc(s3fanout_mgr->watch_fds_,
229 30 s3fanout_mgr->watch_fds_size_ * sizeof(struct pollfd)));
230 }
231 18390 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].fd = s;
232 18390 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].events = 0;
233 18390 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].revents = 0;
234 18390 s3fanout_mgr->watch_fds_inuse_++;
235 }
236
237
4/5
✓ Branch 0 taken 18390 times.
✓ Branch 1 taken 45 times.
✓ Branch 2 taken 3960 times.
✓ Branch 3 taken 18390 times.
✗ Branch 4 not taken.
40785 switch (action) {
238 18390 case CURL_POLL_IN:
239 18390 s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
240 18390 break;
241 45 case CURL_POLL_OUT:
242 45 s3fanout_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
243 45 break;
244 3960 case CURL_POLL_INOUT:
245 3960 s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI | POLLOUT
246 | POLLWRBAND;
247 3960 break;
248 18390 case CURL_POLL_REMOVE:
249
2/2
✓ Branch 0 taken 2910 times.
✓ Branch 1 taken 15480 times.
18390 if (index < s3fanout_mgr->watch_fds_inuse_ - 1)
250 s3fanout_mgr
251 2910 ->watch_fds_[index] = s3fanout_mgr->watch_fds_
252 2910 [s3fanout_mgr->watch_fds_inuse_ - 1];
253 18390 s3fanout_mgr->watch_fds_inuse_--;
254 // Shrink array if necessary
255
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18390 times.
18390 if ((s3fanout_mgr->watch_fds_inuse_ > s3fanout_mgr->watch_fds_max_)
256 && (s3fanout_mgr->watch_fds_inuse_
257 < s3fanout_mgr->watch_fds_size_ / 2)) {
258 s3fanout_mgr->watch_fds_size_ /= 2;
259 s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
260 srealloc(s3fanout_mgr->watch_fds_,
261 s3fanout_mgr->watch_fds_size_ * sizeof(struct pollfd)));
262 }
263 18390 break;
264 default:
265 PANIC(NULL);
266 }
267
268 40785 return 0;
269 }
270
271
272 /**
273 * Worker thread event loop.
274 */
275 180 void *S3FanoutManager::MainUpload(void *data) {
276
1/2
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
180 LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread started");
277 180 S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(data);
278
279 180 s3fanout_mgr->InitPipeWatchFds();
280
281 // Don't schedule more jobs into the multi handle than the maximum number of
282 // parallel connections. This should prevent starvation and thus a timeout
283 // of the authorization header (CVM-1339).
284 180 unsigned jobs_in_flight = 0;
285
286 while (true) {
287 // Check events with 100ms timeout
288 250350 const int timeout_ms = 100;
289
1/2
✓ Branch 1 taken 250350 times.
✗ Branch 2 not taken.
250350 int retval = poll(s3fanout_mgr->watch_fds_, s3fanout_mgr->watch_fds_inuse_,
290 timeout_ms);
291
2/2
✓ Branch 0 taken 1395 times.
✓ Branch 1 taken 248955 times.
250350 if (retval == 0) {
292 // Handle timeout
293 1395 int still_running = 0;
294
1/2
✓ Branch 1 taken 1395 times.
✗ Branch 2 not taken.
1395 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
295 CURL_SOCKET_TIMEOUT, 0, &still_running);
296
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1395 times.
1395 if (retval != CURLM_OK) {
297 LogCvmfs(kLogS3Fanout, kLogStderr, "Error, timeout due to: %d", retval);
298 assert(retval == CURLM_OK);
299 }
300
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 248955 times.
248955 } else if (retval < 0) {
301 assert(errno == EINTR);
302 continue;
303 }
304
305 // Terminate I/O thread
306
2/2
✓ Branch 0 taken 180 times.
✓ Branch 1 taken 250170 times.
250350 if (s3fanout_mgr->watch_fds_[0].revents)
307 180 break;
308
309 // New job incoming
310
2/2
✓ Branch 0 taken 9210 times.
✓ Branch 1 taken 240960 times.
250170 if (s3fanout_mgr->watch_fds_[1].revents) {
311 9210 s3fanout_mgr->watch_fds_[1].revents = 0;
312 JobInfo *info;
313
1/2
✓ Branch 1 taken 9210 times.
✗ Branch 2 not taken.
9210 ReadPipe(s3fanout_mgr->pipe_jobs_[0], &info, sizeof(info));
314
1/2
✓ Branch 1 taken 9210 times.
✗ Branch 2 not taken.
9210 CURL *handle = s3fanout_mgr->AcquireCurlHandle();
315
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9210 times.
9210 if (handle == NULL) {
316 PANIC(kLogStderr, "Failed to acquire CURL handle.");
317 }
318
1/2
✓ Branch 1 taken 9210 times.
✗ Branch 2 not taken.
9210 const s3fanout::Failures init_failure = s3fanout_mgr->InitializeRequest(
319 info, handle);
320
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9210 times.
9210 if (init_failure != s3fanout::kFailOk) {
321 PANIC(kLogStderr,
322 "Failed to initialize CURL handle (error: %d - %s | errno: %d)",
323 init_failure, Code2Ascii(init_failure), errno);
324 }
325
1/2
✓ Branch 1 taken 9210 times.
✗ Branch 2 not taken.
9210 s3fanout_mgr->SetUrlOptions(info);
326
327
1/2
✓ Branch 1 taken 9210 times.
✗ Branch 2 not taken.
9210 curl_multi_add_handle(s3fanout_mgr->curl_multi_, handle);
328
1/2
✓ Branch 1 taken 9210 times.
✗ Branch 2 not taken.
9210 s3fanout_mgr->active_requests_->insert(info);
329 9210 jobs_in_flight++;
330 9210 int still_running = 0, retval = 0;
331
1/2
✓ Branch 1 taken 9210 times.
✗ Branch 2 not taken.
9210 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
332 CURL_SOCKET_TIMEOUT, 0, &still_running);
333
334
1/2
✓ Branch 1 taken 9210 times.
✗ Branch 2 not taken.
9210 LogCvmfs(kLogS3Fanout, kLogDebug, "curl_multi_socket_action: %d - %d",
335 retval, still_running);
336 }
337
338
339 // Activity on curl sockets
340 // Within this loop the curl_multi_socket_action() may cause socket(s)
341 // to be removed from watch_fds_. If a socket is removed it is replaced
342 // by the socket at the end of the array and the inuse count is decreased.
343 // Therefore loop over the array in reverse order.
344 // First 2 fds are job and terminate pipes (not curl related)
345
2/2
✓ Branch 0 taken 562110 times.
✓ Branch 1 taken 250170 times.
812280 for (int32_t i = s3fanout_mgr->watch_fds_inuse_ - 1; i >= 2; --i) {
346
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 562110 times.
562110 if (static_cast<uint32_t>(i) >= s3fanout_mgr->watch_fds_inuse_) {
347 continue;
348 }
349
2/2
✓ Branch 0 taken 255990 times.
✓ Branch 1 taken 306120 times.
562110 if (s3fanout_mgr->watch_fds_[i].revents) {
350 255990 int ev_bitmask = 0;
351
2/2
✓ Branch 0 taken 27510 times.
✓ Branch 1 taken 228480 times.
255990 if (s3fanout_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
352 27510 ev_bitmask |= CURL_CSELECT_IN;
353
2/2
✓ Branch 0 taken 228480 times.
✓ Branch 1 taken 27510 times.
255990 if (s3fanout_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
354 228480 ev_bitmask |= CURL_CSELECT_OUT;
355 255990 if (s3fanout_mgr->watch_fds_[i].revents
356
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 255990 times.
255990 & (POLLERR | POLLHUP | POLLNVAL))
357 ev_bitmask |= CURL_CSELECT_ERR;
358 255990 s3fanout_mgr->watch_fds_[i].revents = 0;
359
360 255990 int still_running = 0;
361 255990 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
362
1/2
✓ Branch 1 taken 255990 times.
✗ Branch 2 not taken.
255990 s3fanout_mgr->watch_fds_[i].fd,
363 ev_bitmask,
364 &still_running);
365 }
366 }
367
368 // Check if transfers are completed
369 CURLMsg *curl_msg;
370 int msgs_in_queue;
371
3/4
✓ Branch 1 taken 268560 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 18390 times.
✓ Branch 4 taken 250170 times.
268560 while ((curl_msg = curl_multi_info_read(s3fanout_mgr->curl_multi_,
372 &msgs_in_queue))) {
373
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18390 times.
18390 assert(curl_msg->msg == CURLMSG_DONE);
374
375 18390 s3fanout_mgr->statistics_->num_requests++;
376 JobInfo *info;
377 18390 CURL *easy_handle = curl_msg->easy_handle;
378 18390 const int curl_error = curl_msg->data.result;
379
1/2
✓ Branch 1 taken 18390 times.
✗ Branch 2 not taken.
18390 curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
380
381
1/2
✓ Branch 1 taken 18390 times.
✗ Branch 2 not taken.
18390 curl_multi_remove_handle(s3fanout_mgr->curl_multi_, easy_handle);
382
3/4
✓ Branch 1 taken 18390 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 9180 times.
✓ Branch 4 taken 9210 times.
18390 if (s3fanout_mgr->VerifyAndFinalize(curl_error, info)) {
383
1/2
✓ Branch 1 taken 9180 times.
✗ Branch 2 not taken.
9180 curl_multi_add_handle(s3fanout_mgr->curl_multi_, easy_handle);
384 9180 int still_running = 0;
385
1/2
✓ Branch 1 taken 9180 times.
✗ Branch 2 not taken.
9180 curl_multi_socket_action(s3fanout_mgr->curl_multi_, CURL_SOCKET_TIMEOUT,
386 0, &still_running);
387 } else {
388 // Return easy handle into pool and write result back
389 9210 jobs_in_flight--;
390
1/2
✓ Branch 1 taken 9210 times.
✗ Branch 2 not taken.
9210 s3fanout_mgr->active_requests_->erase(info);
391
1/2
✓ Branch 1 taken 9210 times.
✗ Branch 2 not taken.
9210 s3fanout_mgr->ReleaseCurlHandle(info, easy_handle);
392
1/2
✓ Branch 1 taken 9210 times.
✗ Branch 2 not taken.
9210 s3fanout_mgr->available_jobs_->Decrement();
393
394 // Add to list of completed jobs
395
1/2
✓ Branch 1 taken 9210 times.
✗ Branch 2 not taken.
9210 s3fanout_mgr->PushCompletedJob(info);
396 }
397 }
398 250170 }
399
400 180 set<CURL *>::iterator i = s3fanout_mgr->pool_handles_inuse_->begin();
401 180 const set<CURL *>::const_iterator i_end = s3fanout_mgr->pool_handles_inuse_
402 180 ->end();
403
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 180 times.
180 for (; i != i_end; ++i) {
404 curl_multi_remove_handle(s3fanout_mgr->curl_multi_, *i);
405 curl_easy_cleanup(*i);
406 }
407 180 s3fanout_mgr->pool_handles_inuse_->clear();
408 180 free(s3fanout_mgr->watch_fds_);
409
410
1/2
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
180 LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread terminated");
411 180 return NULL;
412 }
413
414
415 /**
416 * Gets an idle CURL handle from the pool. Creates a new one and adds it to
417 * the pool if necessary.
418 */
419 9210 CURL *S3FanoutManager::AcquireCurlHandle() const {
420 CURL *handle;
421
422 9210 const MutexLockGuard guard(curl_handle_lock_);
423
424
2/2
✓ Branch 1 taken 735 times.
✓ Branch 2 taken 8475 times.
9210 if (pool_handles_idle_->empty()) {
425 CURLcode retval;
426
427 // Create a new handle
428
1/2
✓ Branch 1 taken 735 times.
✗ Branch 2 not taken.
735 handle = curl_easy_init();
429
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 735 times.
735 assert(handle != NULL);
430
431 // Other settings
432
1/2
✓ Branch 1 taken 735 times.
✗ Branch 2 not taken.
735 retval = curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
433
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 735 times.
735 assert(retval == CURLE_OK);
434
1/2
✓ Branch 1 taken 735 times.
✗ Branch 2 not taken.
735 retval = curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION,
435 CallbackCurlHeader);
436
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 735 times.
735 assert(retval == CURLE_OK);
437
1/2
✓ Branch 1 taken 735 times.
✗ Branch 2 not taken.
735 retval = curl_easy_setopt(handle, CURLOPT_READFUNCTION, CallbackCurlData);
438
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 735 times.
735 assert(retval == CURLE_OK);
439
1/2
✓ Branch 1 taken 735 times.
✗ Branch 2 not taken.
735 retval = curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlBody);
440
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 735 times.
735 assert(retval == CURLE_OK);
441 } else {
442 8475 handle = *(pool_handles_idle_->begin());
443
1/2
✓ Branch 2 taken 8475 times.
✗ Branch 3 not taken.
8475 pool_handles_idle_->erase(pool_handles_idle_->begin());
444 }
445
446
1/2
✓ Branch 1 taken 9210 times.
✗ Branch 2 not taken.
9210 pool_handles_inuse_->insert(handle);
447
448 9210 return handle;
449 9210 }
450
451
452 9210 void S3FanoutManager::ReleaseCurlHandle(JobInfo *info, CURL *handle) const {
453
1/2
✓ Branch 0 taken 9210 times.
✗ Branch 1 not taken.
9210 if (info->http_headers) {
454
1/2
✓ Branch 1 taken 9210 times.
✗ Branch 2 not taken.
9210 curl_slist_free_all(info->http_headers);
455 9210 info->http_headers = NULL;
456 }
457
458 9210 const MutexLockGuard guard(curl_handle_lock_);
459
460
1/2
✓ Branch 1 taken 9210 times.
✗ Branch 2 not taken.
9210 const set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
461
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 9210 times.
9210 assert(elem != pool_handles_inuse_->end());
462
463
2/2
✓ Branch 1 taken 435 times.
✓ Branch 2 taken 8775 times.
9210 if (pool_handles_idle_->size() > config_.pool_max_handles) {
464
1/2
✓ Branch 1 taken 435 times.
✗ Branch 2 not taken.
435 const CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, NULL);
465
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 435 times.
435 assert(retval == CURLE_OK);
466
1/2
✓ Branch 1 taken 435 times.
✗ Branch 2 not taken.
435 curl_easy_cleanup(handle);
467 const std::map<CURL *, S3FanOutDnsEntry *>::size_type
468
1/2
✓ Branch 1 taken 435 times.
✗ Branch 2 not taken.
435 retitems = curl_sharehandles_->erase(handle);
469
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 435 times.
435 assert(retitems == 1);
470 } else {
471
1/2
✓ Branch 1 taken 8775 times.
✗ Branch 2 not taken.
8775 pool_handles_idle_->insert(handle);
472 }
473
474
1/2
✓ Branch 1 taken 9210 times.
✗ Branch 2 not taken.
9210 pool_handles_inuse_->erase(elem);
475 9210 }
476
477 180 void S3FanoutManager::InitPipeWatchFds() {
478
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 180 times.
180 assert(watch_fds_inuse_ == 0);
479
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 180 times.
180 assert(watch_fds_size_ >= 2);
480 180 watch_fds_[0].fd = pipe_terminate_[0];
481 180 watch_fds_[0].events = POLLIN | POLLPRI;
482 180 watch_fds_[0].revents = 0;
483 180 ++watch_fds_inuse_;
484 180 watch_fds_[1].fd = pipe_jobs_[0];
485 180 watch_fds_[1].events = POLLIN | POLLPRI;
486 180 watch_fds_[1].revents = 0;
487 180 ++watch_fds_inuse_;
488 180 }
489
490 /**
491 * The Amazon AWS 2 authorization header according to
492 * http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html#ConstructingTheAuthenticationHeader
493 */
494 18330 bool S3FanoutManager::MkV2Authz(const JobInfo &info,
495 vector<string> *headers) const {
496 18330 string payload_hash;
497
1/2
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
18330 const bool retval = MkPayloadHash(info, &payload_hash);
498
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18330 times.
18330 if (!retval)
499 return false;
500
1/2
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
18330 const string content_type = GetContentType(info);
501
1/2
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
18330 const string request = GetRequestString(info);
502
503
1/2
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
18330 const string timestamp = RfcTimestamp();
504
5/10
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 18330 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 18330 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 18330 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 18330 times.
✗ Branch 14 not taken.
36660 string to_sign = request + "\n" + payload_hash + "\n" + content_type + "\n"
505
2/4
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 18330 times.
✗ Branch 5 not taken.
36660 + timestamp + "\n";
506
2/4
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 18330 times.
✗ Branch 4 not taken.
18330 if (config_.x_amz_acl != "") {
507
2/4
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 18330 times.
✗ Branch 5 not taken.
36660 to_sign += "x-amz-acl:" + config_.x_amz_acl + "\n" + // default ACL
508
5/10
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 18330 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 18330 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 18330 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 18330 times.
✗ Branch 14 not taken.
36660 "/" + config_.bucket + "/" + info.object_key;
509 }
510
1/2
✓ Branch 3 taken 18330 times.
✗ Branch 4 not taken.
18330 LogCvmfs(kLogS3Fanout, kLogDebug, "%s string to sign for: %s",
511 request.c_str(), info.object_key.c_str());
512
513
1/2
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
18330 shash::Any hmac;
514 18330 hmac.algorithm = shash::kSha1;
515
1/2
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
18330 shash::Hmac(config_.secret_key,
516 18330 reinterpret_cast<const unsigned char *>(to_sign.data()),
517 18330 to_sign.length(), &hmac);
518
519
3/6
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 18330 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 18330 times.
✗ Branch 8 not taken.
54990 headers->push_back("Authorization: AWS " + config_.access_key + ":"
520
3/6
✓ Branch 2 taken 18330 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 18330 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 18330 times.
✗ Branch 9 not taken.
91650 + Base64(string(reinterpret_cast<char *>(hmac.digest),
521 18330 hmac.GetDigestSize())));
522
2/4
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 18330 times.
✗ Branch 5 not taken.
18330 headers->push_back("Date: " + timestamp);
523
2/4
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 18330 times.
✗ Branch 5 not taken.
18330 headers->push_back("X-Amz-Acl: " + config_.x_amz_acl);
524
2/2
✓ Branch 1 taken 9120 times.
✓ Branch 2 taken 9210 times.
18330 if (!payload_hash.empty())
525
2/4
✓ Branch 1 taken 9120 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 9120 times.
✗ Branch 5 not taken.
9120 headers->push_back("Content-MD5: " + payload_hash);
526
2/2
✓ Branch 1 taken 9120 times.
✓ Branch 2 taken 9210 times.
18330 if (!content_type.empty())
527
2/4
✓ Branch 1 taken 9120 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 9120 times.
✗ Branch 5 not taken.
9120 headers->push_back("Content-Type: " + content_type);
528 18330 return true;
529 18330 }
530
531
532 string S3FanoutManager::GetUriEncode(const string &val,
533 bool encode_slash) const {
534 string result;
535 const unsigned len = val.length();
536 result.reserve(len);
537 for (unsigned i = 0; i < len; ++i) {
538 const char c = val[i];
539 if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
540 || (c >= '0' && c <= '9') || c == '_' || c == '-' || c == '~'
541 || c == '.') {
542 result.push_back(c);
543 } else if (c == '/') {
544 if (encode_slash) {
545 result += "%2F";
546 } else {
547 result.push_back(c);
548 }
549 } else {
550 result.push_back('%');
551 result.push_back((c / 16) + ((c / 16 <= 9) ? '0' : 'A' - 10));
552 result.push_back((c % 16) + ((c % 16 <= 9) ? '0' : 'A' - 10));
553 }
554 }
555 return result;
556 }
557
558
559 string S3FanoutManager::GetAwsV4SigningKey(const string &date) const {
560 if (last_signing_key_.first == date)
561 return last_signing_key_.second;
562
563 const string date_key = shash::Hmac256("AWS4" + config_.secret_key, date,
564 true);
565 const string date_region_key = shash::Hmac256(date_key, config_.region, true);
566 const string date_region_service_key = shash::Hmac256(date_region_key, "s3",
567 true);
568 string signing_key = shash::Hmac256(date_region_service_key, "aws4_request",
569 true);
570 last_signing_key_.first = date;
571 last_signing_key_.second = signing_key;
572 return signing_key;
573 }
574
575
576 /**
577 * The Amazon AWS4 authorization header according to
578 * http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-auth-using-authorization-header.html
579 */
580 bool S3FanoutManager::MkV4Authz(const JobInfo &info,
581 vector<string> *headers) const {
582 string payload_hash;
583 const bool retval = MkPayloadHash(info, &payload_hash);
584 if (!retval)
585 return false;
586 const string content_type = GetContentType(info);
587 const string timestamp = IsoTimestamp();
588 const string date = timestamp.substr(0, 8);
589 vector<string> tokens = SplitString(complete_hostname_, ':');
590 assert(tokens.size() <= 2);
591 string canonical_hostname = tokens[0];
592
593 // if we could split the hostname in two and if the port is *NOT* a default
594 // one
595 if (tokens.size() == 2
596 && !((String2Uint64(tokens[1]) == kDefaultHTTPPort)
597 || (String2Uint64(tokens[1]) == kDefaultHTTPSPort)))
598 canonical_hostname += ":" + tokens[1];
599
600 string signed_headers;
601 string canonical_headers;
602 if (!content_type.empty()) {
603 signed_headers += "content-type;";
604 headers->push_back("Content-Type: " + content_type);
605 canonical_headers += "content-type:" + content_type + "\n";
606 }
607 if (config_.x_amz_acl != "") {
608 signed_headers += "host;x-amz-acl;x-amz-content-sha256;x-amz-date";
609 } else {
610 signed_headers += "host;x-amz-content-sha256;x-amz-date";
611 }
612 canonical_headers += "host:" + canonical_hostname + "\n";
613 if (config_.x_amz_acl != "") {
614 canonical_headers += "x-amz-acl:" + config_.x_amz_acl + "\n";
615 }
616 canonical_headers += "x-amz-content-sha256:" + payload_hash + "\n"
617 + "x-amz-date:" + timestamp + "\n";
618
619 const string scope = date + "/" + config_.region + "/s3/aws4_request";
620 const string uri = config_.dns_buckets ? (string("/") + info.object_key)
621 : (string("/") + config_.bucket + "/"
622 + info.object_key);
623
624 const string canonical_request = GetRequestString(info) + "\n"
625 + GetUriEncode(uri, false) + "\n" + "\n"
626 + canonical_headers + "\n" + signed_headers
627 + "\n" + payload_hash;
628
629 const string hash_request = shash::Sha256String(canonical_request.c_str());
630
631 const string string_to_sign = "AWS4-HMAC-SHA256\n" + timestamp + "\n" + scope
632 + "\n" + hash_request;
633
634 const string signing_key = GetAwsV4SigningKey(date);
635 const string signature = shash::Hmac256(signing_key, string_to_sign);
636
637 headers->push_back("X-Amz-Acl: " + config_.x_amz_acl);
638 headers->push_back("X-Amz-Content-Sha256: " + payload_hash);
639 headers->push_back("X-Amz-Date: " + timestamp);
640 headers->push_back("Authorization: AWS4-HMAC-SHA256 "
641 "Credential="
642 + config_.access_key + "/" + scope
643 + ","
644 "SignedHeaders="
645 + signed_headers
646 + ","
647 "Signature="
648 + signature);
649 return true;
650 }
651
652 /**
653 * The Azure Blob authorization header according to
654 * https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key
655 */
656 bool S3FanoutManager::MkAzureAuthz(const JobInfo &info,
657 vector<string> *headers) const {
658 const string timestamp = RfcTimestamp();
659 const string canonical_headers = "x-ms-blob-type:BlockBlob\nx-ms-date:"
660 + timestamp + "\nx-ms-version:2011-08-18";
661 const string canonical_resource = "/" + config_.access_key + "/"
662 + config_.bucket + "/" + info.object_key;
663
664 string string_to_sign;
665 if ((info.request == JobInfo::kReqHeadOnly)
666 || (info.request == JobInfo::kReqHeadPut)
667 || (info.request == JobInfo::kReqDelete)) {
668 string_to_sign = GetRequestString(info) + string("\n\n\n")
669 + "\n\n\n\n\n\n\n\n\n" + canonical_headers + "\n"
670 + canonical_resource;
671 } else {
672 string_to_sign = GetRequestString(info) + string("\n\n\n")
673 + string(StringifyInt(info.origin->GetSize()))
674 + "\n\n\n\n\n\n\n\n\n" + canonical_headers + "\n"
675 + canonical_resource;
676 }
677
678 string signing_key;
679 const int retval = Debase64(config_.secret_key, &signing_key);
680 if (!retval)
681 return false;
682
683 const string signature = shash::Hmac256(signing_key, string_to_sign, true);
684
685 headers->push_back("x-ms-date: " + timestamp);
686 headers->push_back("x-ms-version: 2011-08-18");
687 headers->push_back("Authorization: SharedKey " + config_.access_key + ":"
688 + Base64(signature));
689 headers->push_back("x-ms-blob-type: BlockBlob");
690 return true;
691 }
692
693 18330 void S3FanoutManager::InitializeDnsSettingsCurl(CURL *handle,
694 CURLSH *sharehandle,
695 curl_slist *clist) const {
696 18330 CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, sharehandle);
697
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18330 times.
18330 assert(retval == CURLE_OK);
698 18330 retval = curl_easy_setopt(handle, CURLOPT_RESOLVE, clist);
699
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18330 times.
18330 assert(retval == CURLE_OK);
700 18330 }
701
702
703 18330 int S3FanoutManager::InitializeDnsSettings(CURL *handle,
704 std::string host_with_port) const {
705 // Use existing handle
706 const std::map<CURL *, S3FanOutDnsEntry *>::const_iterator
707
1/2
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
18330 it = curl_sharehandles_->find(handle);
708
2/2
✓ Branch 3 taken 17595 times.
✓ Branch 4 taken 735 times.
18330 if (it != curl_sharehandles_->end()) {
709
1/2
✓ Branch 1 taken 17595 times.
✗ Branch 2 not taken.
17595 InitializeDnsSettingsCurl(handle, it->second->sharehandle,
710 17595 it->second->clist);
711 17595 return 0;
712 }
713
714 // Add protocol information for extraction of fields for DNS
715
2/4
✓ Branch 1 taken 735 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 735 times.
✗ Branch 4 not taken.
735 if (!IsHttpUrl(host_with_port))
716
2/4
✓ Branch 1 taken 735 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 735 times.
✗ Branch 5 not taken.
735 host_with_port = config_.protocol + "://" + host_with_port;
717
1/2
✓ Branch 1 taken 735 times.
✗ Branch 2 not taken.
735 const std::string remote_host = dns::ExtractHost(host_with_port);
718
1/2
✓ Branch 1 taken 735 times.
✗ Branch 2 not taken.
735 const std::string remote_port = dns::ExtractPort(host_with_port);
719
720 // If we have the name already resolved, use the least used IP
721 735 S3FanOutDnsEntry *useme = NULL;
722 735 unsigned int usemin = UINT_MAX;
723 735 std::set<S3FanOutDnsEntry *>::iterator its3 = sharehandles_->begin();
724
2/2
✓ Branch 3 taken 585 times.
✓ Branch 4 taken 735 times.
1320 for (; its3 != sharehandles_->end(); ++its3) {
725
1/2
✓ Branch 2 taken 585 times.
✗ Branch 3 not taken.
585 if ((*its3)->dns_name == remote_host) {
726
1/2
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
585 if (usemin >= (*its3)->counter) {
727 585 usemin = (*its3)->counter;
728 585 useme = (*its3);
729 }
730 }
731 }
732
2/2
✓ Branch 0 taken 585 times.
✓ Branch 1 taken 150 times.
735 if (useme != NULL) {
733
1/2
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
585 curl_sharehandles_->insert(
734 585 std::pair<CURL *, S3FanOutDnsEntry *>(handle, useme));
735 585 useme->counter++;
736
1/2
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
585 InitializeDnsSettingsCurl(handle, useme->sharehandle, useme->clist);
737 585 return 0;
738 }
739
740 // We need to resolve the hostname
741 // TODO(ssheikki): support ipv6 also... if (opt_ipv4_only_)
742
1/2
✓ Branch 1 taken 150 times.
✗ Branch 2 not taken.
150 const dns::Host host = resolver_->Resolve(remote_host);
743
1/2
✓ Branch 2 taken 150 times.
✗ Branch 3 not taken.
150 set<string> ipv4_addresses = host.ipv4_addresses();
744 150 std::set<string>::iterator its = ipv4_addresses.begin();
745 150 S3FanOutDnsEntry *dnse = NULL;
746
2/2
✓ Branch 3 taken 150 times.
✓ Branch 4 taken 150 times.
300 for (; its != ipv4_addresses.end(); ++its) {
747
2/4
✓ Branch 1 taken 150 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 150 times.
✗ Branch 5 not taken.
150 dnse = new S3FanOutDnsEntry();
748 150 dnse->counter = 0;
749
1/2
✓ Branch 1 taken 150 times.
✗ Branch 2 not taken.
150 dnse->dns_name = remote_host;
750
4/12
✗ Branch 1 not taken.
✓ Branch 2 taken 150 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 8 taken 150 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 150 times.
✗ Branch 12 not taken.
✗ Branch 14 not taken.
✓ Branch 15 taken 150 times.
✗ Branch 18 not taken.
✗ Branch 19 not taken.
150 dnse->port = remote_port.size() == 0 ? "80" : remote_port;
751
1/2
✓ Branch 2 taken 150 times.
✗ Branch 3 not taken.
150 dnse->ip = *its;
752 150 dnse->clist = NULL;
753
1/2
✓ Branch 2 taken 150 times.
✗ Branch 3 not taken.
150 dnse->clist = curl_slist_append(
754 dnse->clist,
755
4/8
✓ Branch 1 taken 150 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 150 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 150 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 150 times.
✗ Branch 11 not taken.
300 (dnse->dns_name + ":" + dnse->port + ":" + dnse->ip).c_str());
756
1/2
✓ Branch 1 taken 150 times.
✗ Branch 2 not taken.
150 dnse->sharehandle = curl_share_init();
757
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 150 times.
150 assert(dnse->sharehandle != NULL);
758
1/2
✓ Branch 1 taken 150 times.
✗ Branch 2 not taken.
150 const CURLSHcode share_retval = curl_share_setopt(
759 dnse->sharehandle, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS);
760
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 150 times.
150 assert(share_retval == CURLSHE_OK);
761
1/2
✓ Branch 1 taken 150 times.
✗ Branch 2 not taken.
150 sharehandles_->insert(dnse);
762 }
763
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 150 times.
150 if (dnse == NULL) {
764 LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr,
765 "Error: DNS resolve failed for address '%s'.",
766 remote_host.c_str());
767 assert(dnse != NULL);
768 return -1;
769 }
770
1/2
✓ Branch 1 taken 150 times.
✗ Branch 2 not taken.
150 curl_sharehandles_->insert(
771 150 std::pair<CURL *, S3FanOutDnsEntry *>(handle, dnse));
772 150 dnse->counter++;
773
1/2
✓ Branch 1 taken 150 times.
✗ Branch 2 not taken.
150 InitializeDnsSettingsCurl(handle, dnse->sharehandle, dnse->clist);
774
775 150 return 0;
776 735 }
777
778
779 18330 bool S3FanoutManager::MkPayloadHash(const JobInfo &info,
780 string *hex_hash) const {
781
2/2
✓ Branch 0 taken 18255 times.
✓ Branch 1 taken 75 times.
18330 if ((info.request == JobInfo::kReqHeadOnly)
782
2/2
✓ Branch 0 taken 9135 times.
✓ Branch 1 taken 9120 times.
18255 || (info.request == JobInfo::kReqHeadPut)
783
2/2
✓ Branch 0 taken 15 times.
✓ Branch 1 taken 9120 times.
9135 || (info.request == JobInfo::kReqDelete)) {
784
1/4
✓ Branch 0 taken 9210 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
9210 switch (config_.authz_method) {
785 9210 case kAuthzAwsV2:
786 9210 hex_hash->clear();
787 9210 break;
788 case kAuthzAwsV4:
789 // Sha256 over empty string
790 *hex_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b78"
791 "52b855";
792 break;
793 case kAuthzAzure:
794 // no payload hash required for Azure signature
795 hex_hash->clear();
796 break;
797 default:
798 PANIC(NULL);
799 }
800 9210 return true;
801 }
802
803 // PUT, there is actually payload
804
1/2
✓ Branch 1 taken 9120 times.
✗ Branch 2 not taken.
9120 shash::Any payload_hash(shash::kMd5);
805
806 unsigned char *data;
807 9120 const unsigned int nbytes = info.origin->Data(
808
2/4
✓ Branch 2 taken 9120 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 9120 times.
✗ Branch 6 not taken.
9120 reinterpret_cast<void **>(&data), info.origin->GetSize(), 0);
809
2/4
✓ Branch 2 taken 9120 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 9120 times.
9120 assert(nbytes == info.origin->GetSize());
810
811
1/4
✓ Branch 0 taken 9120 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
9120 switch (config_.authz_method) {
812 9120 case kAuthzAwsV2:
813
1/2
✓ Branch 1 taken 9120 times.
✗ Branch 2 not taken.
9120 shash::HashMem(data, nbytes, &payload_hash);
814
2/4
✓ Branch 2 taken 9120 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 9120 times.
✗ Branch 6 not taken.
27360 *hex_hash = Base64(string(reinterpret_cast<char *>(payload_hash.digest),
815 18240 payload_hash.GetDigestSize()));
816 9120 return true;
817 case kAuthzAwsV4:
818 *hex_hash = shash::Sha256Mem(data, nbytes);
819 return true;
820 case kAuthzAzure:
821 // no payload hash required for Azure signature
822 hex_hash->clear();
823 return true;
824 default:
825 PANIC(NULL);
826 }
827 }
828
829 18345 string S3FanoutManager::GetRequestString(const JobInfo &info) const {
830
3/4
✓ Branch 0 taken 9195 times.
✓ Branch 1 taken 9120 times.
✓ Branch 2 taken 30 times.
✗ Branch 3 not taken.
18345 switch (info.request) {
831 9195 case JobInfo::kReqHeadOnly:
832 case JobInfo::kReqHeadPut:
833
1/2
✓ Branch 2 taken 9195 times.
✗ Branch 3 not taken.
9195 return "HEAD";
834 9120 case JobInfo::kReqPutCas:
835 case JobInfo::kReqPutDotCvmfs:
836 case JobInfo::kReqPutHtml:
837 case JobInfo::kReqPutBucket:
838
1/2
✓ Branch 2 taken 9120 times.
✗ Branch 3 not taken.
9120 return "PUT";
839 30 case JobInfo::kReqDelete:
840
1/2
✓ Branch 2 taken 30 times.
✗ Branch 3 not taken.
30 return "DELETE";
841 default:
842 PANIC(NULL);
843 }
844 }
845
846
847 18330 string S3FanoutManager::GetContentType(const JobInfo &info) const {
848
2/6
✓ Branch 0 taken 9210 times.
✓ Branch 1 taken 9120 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
18330 switch (info.request) {
849 9210 case JobInfo::kReqHeadOnly:
850 case JobInfo::kReqHeadPut:
851 case JobInfo::kReqDelete:
852
1/2
✓ Branch 2 taken 9210 times.
✗ Branch 3 not taken.
9210 return "";
853 9120 case JobInfo::kReqPutCas:
854
1/2
✓ Branch 2 taken 9120 times.
✗ Branch 3 not taken.
9120 return "application/octet-stream";
855 case JobInfo::kReqPutDotCvmfs:
856 return "application/x-cvmfs";
857 case JobInfo::kReqPutHtml:
858 return "text/html";
859 case JobInfo::kReqPutBucket:
860 return "text/xml";
861 default:
862 PANIC(NULL);
863 }
864 }
865
866
867 /**
868 * Request parameters set the URL and other options such as timeout and
869 * proxy.
870 */
871 18330 Failures S3FanoutManager::InitializeRequest(JobInfo *info, CURL *handle) const {
872 // Initialize internal download state
873 18330 info->curl_handle = handle;
874 18330 info->error_code = kFailOk;
875 18330 info->http_error = 0;
876 18330 info->num_retries = 0;
877 18330 info->backoff_ms = 0;
878 18330 info->throttle_ms = 0;
879 18330 info->throttle_timestamp = 0;
880 18330 info->http_headers = NULL;
881 // info->payload_size is needed in S3Uploader::MainCollectResults,
882 // where info->origin is already destroyed.
883
1/2
✓ Branch 2 taken 18330 times.
✗ Branch 3 not taken.
18330 info->payload_size = info->origin->GetSize();
884
885
2/4
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 18330 times.
✗ Branch 5 not taken.
18330 InitializeDnsSettings(handle, complete_hostname_);
886
887 CURLcode retval;
888
2/2
✓ Branch 0 taken 18255 times.
✓ Branch 1 taken 75 times.
18330 if ((info->request == JobInfo::kReqHeadOnly)
889
2/2
✓ Branch 0 taken 9135 times.
✓ Branch 1 taken 9120 times.
18255 || (info->request == JobInfo::kReqHeadPut)
890
2/2
✓ Branch 0 taken 15 times.
✓ Branch 1 taken 9120 times.
9135 || (info->request == JobInfo::kReqDelete)) {
891
1/2
✓ Branch 1 taken 9210 times.
✗ Branch 2 not taken.
9210 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 0);
892
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9210 times.
9210 assert(retval == CURLE_OK);
893
1/2
✓ Branch 1 taken 9210 times.
✗ Branch 2 not taken.
9210 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
894
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9210 times.
9210 assert(retval == CURLE_OK);
895
896
2/2
✓ Branch 0 taken 15 times.
✓ Branch 1 taken 9195 times.
9210 if (info->request == JobInfo::kReqDelete) {
897
2/4
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 15 times.
✗ Branch 6 not taken.
15 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST,
898 GetRequestString(*info).c_str());
899
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15 times.
15 assert(retval == CURLE_OK);
900 } else {
901
1/2
✓ Branch 1 taken 9195 times.
✗ Branch 2 not taken.
9195 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
902
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9195 times.
9195 assert(retval == CURLE_OK);
903 }
904 } else {
905
1/2
✓ Branch 1 taken 9120 times.
✗ Branch 2 not taken.
9120 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
906
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9120 times.
9120 assert(retval == CURLE_OK);
907
1/2
✓ Branch 1 taken 9120 times.
✗ Branch 2 not taken.
9120 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 1);
908
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9120 times.
9120 assert(retval == CURLE_OK);
909
1/2
✓ Branch 1 taken 9120 times.
✗ Branch 2 not taken.
9120 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 0);
910
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9120 times.
9120 assert(retval == CURLE_OK);
911
2/4
✓ Branch 2 taken 9120 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 9120 times.
✗ Branch 6 not taken.
9120 retval = curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE,
912 static_cast<curl_off_t>(info->origin->GetSize()));
913
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9120 times.
9120 assert(retval == CURLE_OK);
914
915
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9120 times.
9120 if (info->request == JobInfo::kReqPutDotCvmfs) {
916 info->http_headers = curl_slist_append(
917 info->http_headers, dot_cvmfs_cache_control_header.c_str());
918
1/2
✓ Branch 0 taken 9120 times.
✗ Branch 1 not taken.
9120 } else if (info->request == JobInfo::kReqPutCas) {
919
1/2
✓ Branch 1 taken 9120 times.
✗ Branch 2 not taken.
9120 info->http_headers = curl_slist_append(info->http_headers,
920 kCacheControlCas);
921 }
922 }
923
924 bool retval_b;
925
926 // Authorization
927 18330 vector<string> authz_headers;
928
1/4
✓ Branch 0 taken 18330 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
18330 switch (config_.authz_method) {
929 18330 case kAuthzAwsV2:
930
1/2
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
18330 retval_b = MkV2Authz(*info, &authz_headers);
931 18330 break;
932 case kAuthzAwsV4:
933 retval_b = MkV4Authz(*info, &authz_headers);
934 break;
935 case kAuthzAzure:
936 retval_b = MkAzureAuthz(*info, &authz_headers);
937 break;
938 default:
939 PANIC(NULL);
940 }
941
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18330 times.
18330 if (!retval_b)
942 return kFailLocalIO;
943
2/2
✓ Branch 1 taken 73230 times.
✓ Branch 2 taken 18330 times.
91560 for (unsigned i = 0; i < authz_headers.size(); ++i) {
944
1/2
✓ Branch 2 taken 73230 times.
✗ Branch 3 not taken.
73230 info->http_headers = curl_slist_append(info->http_headers,
945 73230 authz_headers[i].c_str());
946 }
947
948 // Common headers
949
1/2
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
18330 info->http_headers = curl_slist_append(info->http_headers,
950 "Connection: Keep-Alive");
951
1/2
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
18330 info->http_headers = curl_slist_append(info->http_headers, "Pragma:");
952 // No 100-continue
953
1/2
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
18330 info->http_headers = curl_slist_append(info->http_headers, "Expect:");
954 // Strip unnecessary header
955
1/2
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
18330 info->http_headers = curl_slist_append(info->http_headers, "Accept:");
956
1/2
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
18330 info->http_headers = curl_slist_append(info->http_headers,
957 18330 user_agent_->c_str());
958
959 // Set curl parameters
960
1/2
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
18330 retval = curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
961
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18330 times.
18330 assert(retval == CURLE_OK);
962
1/2
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
18330 retval = curl_easy_setopt(handle, CURLOPT_HEADERDATA,
963 static_cast<void *>(info));
964
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18330 times.
18330 assert(retval == CURLE_OK);
965
1/2
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
18330 retval = curl_easy_setopt(handle, CURLOPT_READDATA,
966 static_cast<void *>(info));
967
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18330 times.
18330 assert(retval == CURLE_OK);
968
1/2
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
18330 retval = curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->http_headers);
969
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18330 times.
18330 assert(retval == CURLE_OK);
970
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18330 times.
18330 if (opt_ipv4_only_) {
971 retval = curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
972 assert(retval == CURLE_OK);
973 }
974 // Follow HTTP redirects
975
1/2
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
18330 retval = curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
976
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18330 times.
18330 assert(retval == CURLE_OK);
977
978
1/2
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
18330 retval = curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, info->errorbuffer);
979
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18330 times.
18330 assert(retval == CURLE_OK);
980
981
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 18330 times.
18330 if (config_.protocol == "https") {
982 retval = curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 1L);
983 assert(retval == CURLE_OK);
984 retval = curl_easy_setopt(handle, CURLOPT_PROXY_SSL_VERIFYPEER, 1L);
985 assert(retval == CURLE_OK);
986 const bool add_cert = ssl_certificate_store_.ApplySslCertificatePath(
987 handle);
988 assert(add_cert);
989 }
990
991 18330 return kFailOk;
992 18330 }
993
994
995 /**
996 * Sets the URL specific options such as host to use and timeout.
997 */
998 18330 void S3FanoutManager::SetUrlOptions(JobInfo *info) const {
999 18330 CURL *curl_handle = info->curl_handle;
1000 CURLcode retval;
1001
1002
1/2
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
18330 retval = curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT,
1003 config_.opt_timeout_sec);
1004
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18330 times.
18330 assert(retval == CURLE_OK);
1005
1/2
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
18330 retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT,
1006 kLowSpeedLimit);
1007
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18330 times.
18330 assert(retval == CURLE_OK);
1008
1/2
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
18330 retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME,
1009 config_.opt_timeout_sec);
1010
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18330 times.
18330 assert(retval == CURLE_OK);
1011
1012
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18330 times.
18330 if (is_curl_debug_) {
1013 retval = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 1);
1014 assert(retval == CURLE_OK);
1015 }
1016
1017
1/2
✓ Branch 1 taken 18330 times.
✗ Branch 2 not taken.
18330 const string url = MkUrl(info->object_key);
1018
1/2
✓ Branch 2 taken 18330 times.
✗ Branch 3 not taken.
18330 retval = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
1019
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18330 times.
18330 assert(retval == CURLE_OK);
1020
1021
1/2
✓ Branch 2 taken 18330 times.
✗ Branch 3 not taken.
18330 retval = curl_easy_setopt(curl_handle, CURLOPT_PROXY, config_.proxy.c_str());
1022
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18330 times.
18330 assert(retval == CURLE_OK);
1023 18330 }
1024
1025
1026 /**
1027 * Adds transfer time and uploaded bytes to the global counters.
1028 */
1029 18390 void S3FanoutManager::UpdateStatistics(CURL *handle) {
1030 double val;
1031
1032
2/4
✓ Branch 1 taken 18390 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 18390 times.
✗ Branch 4 not taken.
18390 if (curl_easy_getinfo(handle, CURLINFO_SIZE_UPLOAD, &val) == CURLE_OK)
1033 18390 statistics_->transferred_bytes += val;
1034 18390 }
1035
1036
1037 /**
1038 * Retry if possible and if not already done too often.
1039 */
1040 90 bool S3FanoutManager::CanRetry(const JobInfo *info) {
1041 90 return (info->error_code == kFailHostConnection
1042
1/2
✓ Branch 0 taken 90 times.
✗ Branch 1 not taken.
90 || info->error_code == kFailHostResolve
1043
1/2
✓ Branch 0 taken 90 times.
✗ Branch 1 not taken.
90 || info->error_code == kFailServiceUnavailable
1044
2/2
✓ Branch 0 taken 60 times.
✓ Branch 1 taken 30 times.
90 || info->error_code == kFailRetry)
1045
2/4
✓ Branch 0 taken 90 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 60 times.
✗ Branch 3 not taken.
180 && (info->num_retries < config_.opt_max_retries);
1046 }
1047
1048
1049 /**
1050 * Backoff for retry to introduce a jitter into a upload sequence.
1051 *
1052 * \return true if backoff has been performed, false otherwise
1053 */
1054 60 void S3FanoutManager::Backoff(JobInfo *info) {
1055
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
60 if (info->error_code != kFailRetry)
1056 info->num_retries++;
1057 60 statistics_->num_retries++;
1058
1059
1/2
✓ Branch 0 taken 60 times.
✗ Branch 1 not taken.
60 if (info->throttle_ms > 0) {
1060 60 LogCvmfs(kLogS3Fanout, kLogDebug, "throttling for %d ms",
1061 info->throttle_ms);
1062 60 const uint64_t now = platform_monotonic_time();
1063
1/2
✓ Branch 0 taken 60 times.
✗ Branch 1 not taken.
60 if ((info->throttle_timestamp + (info->throttle_ms / 1000)) >= now) {
1064
2/2
✓ Branch 0 taken 15 times.
✓ Branch 1 taken 45 times.
60 if ((now - timestamp_last_throttle_report_)
1065 > kThrottleReportIntervalSec) {
1066 15 LogCvmfs(kLogS3Fanout, kLogStdout,
1067 "Warning: S3 backend throttling %ums "
1068 "(total backoff time so far %lums)",
1069 15 info->throttle_ms, statistics_->ms_throttled);
1070 15 timestamp_last_throttle_report_ = now;
1071 }
1072 60 statistics_->ms_throttled += info->throttle_ms;
1073 60 SafeSleepMs(info->throttle_ms);
1074 }
1075 } else {
1076 if (info->backoff_ms == 0) {
1077 // Must be != 0
1078 info->backoff_ms = prng_.Next(config_.opt_backoff_init_ms + 1);
1079 } else {
1080 info->backoff_ms *= 2;
1081 }
1082 if (info->backoff_ms > config_.opt_backoff_max_ms)
1083 info->backoff_ms = config_.opt_backoff_max_ms;
1084
1085 LogCvmfs(kLogS3Fanout, kLogDebug, "backing off for %d ms",
1086 info->backoff_ms);
1087 SafeSleepMs(info->backoff_ms);
1088 }
1089 60 }
1090
1091
1092 /**
1093 * Checks the result of a curl request and implements the failure logic
1094 * and takes care of cleanup.
1095 *
1096 * @return true if request should be repeated, false otherwise
1097 */
1098 18390 bool S3FanoutManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1099 18390 LogCvmfs(kLogS3Fanout, kLogDebug,
1100 "Verify uploaded/tested object %s "
1101 "(curl error %d, info error %d, info request %d)",
1102 18390 info->object_key.c_str(), curl_error, info->error_code,
1103 18390 info->request);
1104 18390 UpdateStatistics(info->curl_handle);
1105
1106 // Verification and error classification
1107
1/6
✓ Branch 0 taken 18390 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
18390 switch (curl_error) {
1108 18390 case CURLE_OK:
1109
2/2
✓ Branch 0 taken 18330 times.
✓ Branch 1 taken 60 times.
18390 if ((info->error_code != kFailRetry)
1110
2/2
✓ Branch 0 taken 9180 times.
✓ Branch 1 taken 9150 times.
18330 && (info->error_code != kFailNotFound)) {
1111 9180 info->error_code = kFailOk;
1112 }
1113 18390 break;
1114 case CURLE_UNSUPPORTED_PROTOCOL:
1115 case CURLE_URL_MALFORMAT:
1116 info->error_code = kFailBadRequest;
1117 break;
1118 case CURLE_COULDNT_RESOLVE_HOST:
1119 info->error_code = kFailHostResolve;
1120 break;
1121 case CURLE_COULDNT_CONNECT:
1122 case CURLE_OPERATION_TIMEDOUT:
1123 case CURLE_SEND_ERROR:
1124 case CURLE_RECV_ERROR:
1125 info->error_code = kFailHostConnection;
1126 break;
1127 case CURLE_ABORTED_BY_CALLBACK:
1128 case CURLE_WRITE_ERROR:
1129 // Error set by callback
1130 break;
1131 default:
1132 LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr,
1133 "unexpected curl error (%d) while trying to upload %s: %s",
1134 curl_error, info->object_key.c_str(), info->errorbuffer);
1135 info->error_code = kFailOther;
1136 break;
1137 }
1138
1139 // Transform HEAD to PUT request
1140
2/2
✓ Branch 0 taken 9150 times.
✓ Branch 1 taken 9240 times.
18390 if ((info->error_code == kFailNotFound)
1141
2/2
✓ Branch 0 taken 9120 times.
✓ Branch 1 taken 30 times.
9150 && (info->request == JobInfo::kReqHeadPut)) {
1142 9120 LogCvmfs(kLogS3Fanout, kLogDebug, "not found: %s, uploading",
1143 info->object_key.c_str());
1144 9120 info->request = JobInfo::kReqPutCas;
1145 9120 curl_slist_free_all(info->http_headers);
1146 9120 info->http_headers = NULL;
1147 9120 const s3fanout::Failures init_failure = InitializeRequest(
1148 info, info->curl_handle);
1149
1150
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9120 times.
9120 if (init_failure != s3fanout::kFailOk) {
1151 PANIC(kLogStderr,
1152 "Failed to initialize CURL handle "
1153 "(error: %d - %s | errno: %d)",
1154 init_failure, Code2Ascii(init_failure), errno);
1155 }
1156 9120 SetUrlOptions(info);
1157 // Reset origin
1158 9120 info->origin->Rewind();
1159 9120 return true; // Again, Put
1160 }
1161
1162 // Determination if failed request should be repeated
1163 9270 bool try_again = false;
1164
2/2
✓ Branch 0 taken 90 times.
✓ Branch 1 taken 9180 times.
9270 if (info->error_code != kFailOk) {
1165 90 try_again = CanRetry(info);
1166 }
1167
2/2
✓ Branch 0 taken 60 times.
✓ Branch 1 taken 9210 times.
9270 if (try_again) {
1168
1/2
✓ Branch 0 taken 60 times.
✗ Branch 1 not taken.
60 if (info->request == JobInfo::kReqPutCas
1169
1/2
✓ Branch 0 taken 60 times.
✗ Branch 1 not taken.
60 || info->request == JobInfo::kReqPutDotCvmfs
1170
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
60 || info->request == JobInfo::kReqPutHtml) {
1171 LogCvmfs(kLogS3Fanout, kLogDebug, "Trying again to upload %s",
1172 info->object_key.c_str());
1173 // Reset origin
1174 info->origin->Rewind();
1175 }
1176 60 Backoff(info);
1177 60 info->error_code = kFailOk;
1178 60 info->http_error = 0;
1179 60 info->throttle_ms = 0;
1180 60 info->backoff_ms = 0;
1181 60 info->throttle_timestamp = 0;
1182 60 return true; // try again
1183 }
1184
1185 // Cleanup opened resources
1186 9210 info->origin.Destroy();
1187
1188
3/4
✓ Branch 0 taken 30 times.
✓ Branch 1 taken 9180 times.
✓ Branch 2 taken 30 times.
✗ Branch 3 not taken.
9210 if ((info->error_code != kFailOk) && (info->http_error != 0)
1189
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 30 times.
30 && (info->http_error != 404)) {
1190 LogCvmfs(kLogS3Fanout, kLogStderr, "S3: HTTP failure %d", info->http_error);
1191 }
1192 9210 return false; // stop transfer
1193 }
1194
1195
2/4
✓ Branch 3 taken 180 times.
✗ Branch 4 not taken.
✓ Branch 9 taken 180 times.
✗ Branch 10 not taken.
180 S3FanoutManager::S3FanoutManager(const S3Config &config) : config_(config) {
1196 180 atomic_init32(&multi_threaded_);
1197
1/2
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
180 MakePipe(pipe_terminate_);
1198
1/2
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
180 MakePipe(pipe_jobs_);
1199
1/2
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
180 MakePipe(pipe_completed_);
1200
1201 int retval;
1202 180 jobs_todo_lock_ = reinterpret_cast<pthread_mutex_t *>(
1203 180 smalloc(sizeof(pthread_mutex_t)));
1204 180 retval = pthread_mutex_init(jobs_todo_lock_, NULL);
1205
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 180 times.
180 assert(retval == 0);
1206 180 curl_handle_lock_ = reinterpret_cast<pthread_mutex_t *>(
1207 180 smalloc(sizeof(pthread_mutex_t)));
1208 180 retval = pthread_mutex_init(curl_handle_lock_, NULL);
1209
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 180 times.
180 assert(retval == 0);
1210
1211
1/2
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
180 active_requests_ = new set<JobInfo *>;
1212
1/2
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
180 pool_handles_idle_ = new set<CURL *>;
1213
1/2
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
180 pool_handles_inuse_ = new set<CURL *>;
1214
1/2
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
180 curl_sharehandles_ = new map<CURL *, S3FanOutDnsEntry *>;
1215
1/2
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
180 sharehandles_ = new set<S3FanOutDnsEntry *>;
1216 180 watch_fds_max_ = 4 * config_.pool_max_handles;
1217 180 max_available_jobs_ = 4 * config_.pool_max_handles;
1218
2/4
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 180 times.
✗ Branch 5 not taken.
180 available_jobs_ = new Semaphore(max_available_jobs_);
1219
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 180 times.
180 assert(NULL != available_jobs_);
1220
1221
1/2
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
180 statistics_ = new Statistics();
1222
1/2
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
180 user_agent_ = new string();
1223
2/4
✓ Branch 2 taken 180 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 180 times.
✗ Branch 6 not taken.
180 *user_agent_ = "User-Agent: cvmfs " + string(CVMFS_VERSION);
1224
1/2
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
180 complete_hostname_ = MkCompleteHostname();
1225
1/2
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
180 dot_cvmfs_cache_control_header = MkDotCvmfsCacheControlHeader();
1226
1227
1/2
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
180 const CURLcode cretval = curl_global_init(CURL_GLOBAL_ALL);
1228
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 180 times.
180 assert(cretval == CURLE_OK);
1229
1/2
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
180 curl_multi_ = curl_multi_init();
1230
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 180 times.
180 assert(curl_multi_ != NULL);
1231 CURLMcode mretval;
1232
1/2
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
180 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION,
1233 CallbackCurlSocket);
1234
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 180 times.
180 assert(mretval == CURLM_OK);
1235
1/2
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
180 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1236 static_cast<void *>(this));
1237
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 180 times.
180 assert(mretval == CURLM_OK);
1238
1/2
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
180 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1239 config_.pool_max_handles);
1240
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 180 times.
180 assert(mretval == CURLM_OK);
1241
1242 180 prng_.InitLocaltime();
1243
1244 180 thread_upload_ = 0;
1245 180 timestamp_last_throttle_report_ = 0;
1246 180 is_curl_debug_ = (getenv("_CVMFS_CURL_DEBUG") != NULL);
1247
1248 // Parsing environment variables
1249 180 if ((getenv("CVMFS_IPV4_ONLY") != NULL)
1250
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 180 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 180 times.
180 && (strlen(getenv("CVMFS_IPV4_ONLY")) > 0)) {
1251 opt_ipv4_only_ = true;
1252 } else {
1253 180 opt_ipv4_only_ = false;
1254 }
1255
1256
1/2
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
180 resolver_ = dns::CaresResolver::Create(opt_ipv4_only_, 2, 2000);
1257
1258 180 watch_fds_ = static_cast<struct pollfd *>(smalloc(4 * sizeof(struct pollfd)));
1259 180 watch_fds_size_ = 4;
1260 180 watch_fds_inuse_ = 0;
1261
1262
1/2
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
180 ssl_certificate_store_.UseSystemCertificatePath();
1263 180 }
1264
1265 360 S3FanoutManager::~S3FanoutManager() {
1266 180 pthread_mutex_destroy(jobs_todo_lock_);
1267 180 free(jobs_todo_lock_);
1268 180 pthread_mutex_destroy(curl_handle_lock_);
1269 180 free(curl_handle_lock_);
1270
1271
1/2
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
180 if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1272 // Shutdown I/O thread
1273 180 char buf = 'T';
1274 180 WritePipe(pipe_terminate_[1], &buf, 1);
1275 180 pthread_join(thread_upload_, NULL);
1276 }
1277 180 ClosePipe(pipe_terminate_);
1278 180 ClosePipe(pipe_jobs_);
1279 180 ClosePipe(pipe_completed_);
1280
1281 180 set<CURL *>::iterator i = pool_handles_idle_->begin();
1282 180 const set<CURL *>::const_iterator iEnd = pool_handles_idle_->end();
1283
2/2
✓ Branch 2 taken 300 times.
✓ Branch 3 taken 180 times.
480 for (; i != iEnd; ++i) {
1284 300 curl_easy_cleanup(*i);
1285 }
1286
1287 180 set<S3FanOutDnsEntry *>::iterator is = sharehandles_->begin();
1288 180 const set<S3FanOutDnsEntry *>::const_iterator isEnd = sharehandles_->end();
1289
2/2
✓ Branch 2 taken 150 times.
✓ Branch 3 taken 180 times.
330 for (; is != isEnd; ++is) {
1290 150 curl_share_cleanup((*is)->sharehandle);
1291 150 curl_slist_free_all((*is)->clist);
1292
1/2
✓ Branch 1 taken 150 times.
✗ Branch 2 not taken.
150 delete *is;
1293 }
1294 180 pool_handles_idle_->clear();
1295 180 curl_sharehandles_->clear();
1296 180 sharehandles_->clear();
1297
1/2
✓ Branch 0 taken 180 times.
✗ Branch 1 not taken.
180 delete active_requests_;
1298
1/2
✓ Branch 0 taken 180 times.
✗ Branch 1 not taken.
180 delete pool_handles_idle_;
1299
1/2
✓ Branch 0 taken 180 times.
✗ Branch 1 not taken.
180 delete pool_handles_inuse_;
1300
1/2
✓ Branch 0 taken 180 times.
✗ Branch 1 not taken.
180 delete curl_sharehandles_;
1301
1/2
✓ Branch 0 taken 180 times.
✗ Branch 1 not taken.
180 delete sharehandles_;
1302
1/2
✓ Branch 0 taken 180 times.
✗ Branch 1 not taken.
180 delete user_agent_;
1303 180 curl_multi_cleanup(curl_multi_);
1304
1305
1/2
✓ Branch 0 taken 180 times.
✗ Branch 1 not taken.
180 delete statistics_;
1306
1307
1/2
✓ Branch 0 taken 180 times.
✗ Branch 1 not taken.
180 delete available_jobs_;
1308
1309 180 curl_global_cleanup();
1310 180 }
1311
1312 /**
1313 * Spawns the I/O worker thread. No way back except ~S3FanoutManager.
1314 */
1315 180 void S3FanoutManager::Spawn() {
1316 180 LogCvmfs(kLogS3Fanout, kLogDebug, "S3FanoutManager spawned");
1317
1318 180 const int retval = pthread_create(&thread_upload_, NULL, MainUpload,
1319 static_cast<void *>(this));
1320
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 180 times.
180 assert(retval == 0);
1321
1322 180 atomic_inc32(&multi_threaded_);
1323 180 }
1324
1325 45 const Statistics &S3FanoutManager::GetStatistics() { return *statistics_; }
1326
1327 /**
1328 * Push new job to be uploaded to the S3 cloud storage.
1329 */
1330 9210 void S3FanoutManager::PushNewJob(JobInfo *info) {
1331 9210 available_jobs_->Increment();
1332 9210 WritePipe(pipe_jobs_[1], &info, sizeof(info));
1333 9210 }
1334
1335 /**
1336 * Push completed job to list of completed jobs
1337 */
1338 9390 void S3FanoutManager::PushCompletedJob(JobInfo *info) {
1339 9390 WritePipe(pipe_completed_[1], &info, sizeof(info));
1340 9390 }
1341
1342 /**
1343 * Pop completed job
1344 */
1345 9390 JobInfo *S3FanoutManager::PopCompletedJob() {
1346 JobInfo *info;
1347
1/2
✓ Branch 1 taken 9390 times.
✗ Branch 2 not taken.
9390 ReadPipe(pipe_completed_[0], &info, sizeof(info));
1348 9390 return info;
1349 }
1350
1351 //------------------------------------------------------------------------------
1352
1353
1354 string Statistics::Print() const {
1355 return "Transferred Bytes: " + StringifyInt(uint64_t(transferred_bytes))
1356 + "\n" + "Transfer duration: " + StringifyInt(uint64_t(transfer_time))
1357 + " s\n" + "Number of requests: " + StringifyInt(num_requests) + "\n"
1358 + "Number of retries: " + StringifyInt(num_retries) + "\n";
1359 }
1360
1361 } // namespace s3fanout
1362