GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/network/s3fanout.cc
Date: 2025-09-28 02:35:26
Exec Total Coverage
Lines: 573 813 70.5%
Branches: 412 1179 34.9%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 *
4 * Runs a thread using libcurls asynchronous I/O mode to push data to S3
5 */
6
7 #include "s3fanout.h"
8
9 #include <pthread.h>
10
11 #include <algorithm>
12 #include <cassert>
13 #include <cerrno>
14 #include <utility>
15
16 #include "upload_facility.h"
17 #include "util/concurrency.h"
18 #include "util/exception.h"
19 #include "util/platform.h"
20 #include "util/posix.h"
21 #include "util/string.h"
22
23 using namespace std; // NOLINT
24
25 namespace s3fanout {
26
27 const char *S3FanoutManager::kCacheControlCas = "Cache-Control: max-age=259200";
28 const unsigned S3FanoutManager::kDefault429ThrottleMs = 250;
29 const unsigned S3FanoutManager::kMax429ThrottleMs = 10000;
30 const unsigned S3FanoutManager::kThrottleReportIntervalSec = 10;
31 const unsigned S3FanoutManager::kDefaultHTTPPort = 80;
32 const unsigned S3FanoutManager::kDefaultHTTPSPort = 443;
33
34 12 std::string S3FanoutManager::MkDotCvmfsCacheControlHeader(unsigned defaultMaxAge, int overrideMaxAge)
35 {
36 const char *var;
37 int max_age_sec;
38 char *at_null_terminator_if_number;
39 12 bool value_determined = false;
40
41
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 if (overrideMaxAge >= 0) {
42 max_age_sec = overrideMaxAge;
43 value_determined = true;
44 }
45
46
1/2
✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
12 if (!value_determined) {
47 12 var = getenv("CVMFS_MAX_TTL_SECS");
48
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
12 if (var && var[0]) {
49 max_age_sec = strtoll(var, &at_null_terminator_if_number, 10);
50 if (*at_null_terminator_if_number == '\0'
51 && max_age_sec >= 0) {
52 value_determined = true;
53 }
54 }
55 }
56
57
1/2
✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
12 if (!value_determined) {
58 12 var = getenv("CVMFS_MAX_TTL");
59
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
12 if (var && var[0]) {
60 max_age_sec = strtoll(var, &at_null_terminator_if_number, 10);
61 if (*at_null_terminator_if_number == '\0'
62 && max_age_sec >= 0) {
63 max_age_sec *= 60;
64 value_determined = true;
65 }
66 }
67 }
68
69
1/2
✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
12 if (!value_determined) {
70 12 max_age_sec = defaultMaxAge;
71 }
72
73
2/4
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 12 times.
✗ Branch 5 not taken.
24 return "Cache-Control: max-age=" + std::to_string(max_age_sec);
74 }
75
76 /**
77 * Parses Retry-After and X-Retry-In headers attached to HTTP 429 responses
78 */
79 558 void S3FanoutManager::DetectThrottleIndicator(const std::string &header,
80 JobInfo *info) {
81 558 std::string value_str;
82
4/6
✓ Branch 2 taken 558 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 558 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 277 times.
✓ Branch 10 taken 281 times.
558 if (HasPrefix(header, "retry-after:", true))
83
1/2
✓ Branch 1 taken 277 times.
✗ Branch 2 not taken.
277 value_str = header.substr(12);
84
4/6
✓ Branch 2 taken 558 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 558 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 195 times.
✓ Branch 10 taken 363 times.
558 if (HasPrefix(header, "x-retry-in:", true))
85
1/2
✓ Branch 1 taken 195 times.
✗ Branch 2 not taken.
195 value_str = header.substr(11);
86
87
1/2
✓ Branch 1 taken 558 times.
✗ Branch 2 not taken.
558 value_str = Trim(value_str, true /* trim_newline */);
88
2/2
✓ Branch 1 taken 394 times.
✓ Branch 2 taken 164 times.
558 if (!value_str.empty()) {
89
1/2
✓ Branch 1 taken 394 times.
✗ Branch 2 not taken.
394 const unsigned value_numeric = String2Uint64(value_str);
90
2/4
✓ Branch 2 taken 394 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 394 times.
✗ Branch 6 not taken.
788 const unsigned value_ms = HasSuffix(value_str, "ms", true /* ignore_case */)
91
2/2
✓ Branch 0 taken 195 times.
✓ Branch 1 taken 199 times.
394 ? value_numeric
92 394 : (value_numeric * 1000);
93
2/2
✓ Branch 0 taken 355 times.
✓ Branch 1 taken 39 times.
394 if (value_ms > 0)
94 355 info->throttle_ms = std::min(value_ms, kMax429ThrottleMs);
95 }
96 558 }
97
98
99 /**
100 * Called by curl for every HTTP header. Not called for file:// transfers.
101 */
102 3682 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
103 void *info_link) {
104 3682 const size_t num_bytes = size * nmemb;
105
1/2
✓ Branch 2 taken 3682 times.
✗ Branch 3 not taken.
3682 const string header_line(static_cast<const char *>(ptr), num_bytes);
106 3682 JobInfo *info = static_cast<JobInfo *>(info_link);
107
108 // Check for http status code errors
109
4/6
✓ Branch 2 taken 3682 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3682 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 1226 times.
✓ Branch 10 taken 2456 times.
3682 if (HasPrefix(header_line, "HTTP/1.", false)) {
110
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1226 times.
1226 if (header_line.length() < 10)
111 return 0;
112
113 unsigned i;
114
5/6
✓ Branch 1 taken 2452 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1226 times.
✓ Branch 5 taken 1226 times.
✓ Branch 6 taken 1226 times.
✓ Branch 7 taken 1226 times.
2452 for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {
115 }
116
117
2/2
✓ Branch 1 taken 612 times.
✓ Branch 2 taken 614 times.
1226 if (header_line[i] == '2') {
118 612 return num_bytes;
119 } else {
120
1/2
✓ Branch 2 taken 614 times.
✗ Branch 3 not taken.
614 LogCvmfs(kLogS3Fanout, kLogDebug, "http status error code [info %p]: %s",
121 info, header_line.c_str());
122
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 614 times.
614 if (header_line.length() < i + 3) {
123 LogCvmfs(kLogS3Fanout, kLogStderr, "S3: invalid HTTP response '%s'",
124 header_line.c_str());
125 info->error_code = kFailOther;
126 return 0;
127 }
128
2/4
✓ Branch 3 taken 614 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 614 times.
✗ Branch 7 not taken.
614 info->http_error = String2Int64(string(&header_line[i], 3));
129
130
2/6
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 610 times.
✗ Branch 5 not taken.
614 switch (info->http_error) {
131 4 case 429:
132 4 info->error_code = kFailRetry;
133 4 info->throttle_ms = S3FanoutManager::kDefault429ThrottleMs;
134 4 info->throttle_timestamp = platform_monotonic_time();
135 4 return num_bytes;
136 case 503:
137 case 502: // Can happen if the S3 gateway-backend connection breaks
138 case 500: // sometimes see this as a transient error from S3
139 info->error_code = kFailServiceUnavailable;
140 break;
141 case 501:
142 case 400:
143 info->error_code = kFailBadRequest;
144 break;
145 case 403:
146 info->error_code = kFailForbidden;
147 break;
148 610 case 404:
149 610 info->error_code = kFailNotFound;
150 610 return num_bytes;
151 default:
152 info->error_code = kFailOther;
153 }
154 return 0;
155 }
156 }
157
158
2/2
✓ Branch 0 taken 12 times.
✓ Branch 1 taken 2444 times.
2456 if (info->error_code == kFailRetry) {
159
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 S3FanoutManager::DetectThrottleIndicator(header_line, info);
160 }
161
162 2456 return num_bytes;
163 3682 }
164
165
166 /**
167 * Called by curl for every new chunk to upload.
168 */
169 15669 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
170 void *info_link) {
171 15669 const size_t num_bytes = size * nmemb;
172 15669 JobInfo *info = static_cast<JobInfo *>(info_link);
173
174 15669 LogCvmfs(kLogS3Fanout, kLogDebug, "Data callback with %zu bytes", num_bytes);
175
176
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15669 times.
15669 if (num_bytes == 0)
177 return 0;
178
179 15669 const uint64_t read_bytes = info->origin->Read(ptr, num_bytes);
180
181 15669 LogCvmfs(kLogS3Fanout, kLogDebug, "source buffer pushed out %lu bytes",
182 read_bytes);
183
184 15669 return read_bytes;
185 }
186
187
188 /**
189 * For the time being, ignore all received information in the HTTP body
190 */
191 static size_t CallbackCurlBody(char * /*ptr*/, size_t size, size_t nmemb,
192 void * /*userdata*/) {
193 return size * nmemb;
194 }
195
196
197 /**
198 * Called when new curl sockets arrive or existing curl sockets depart.
199 */
200 2720 int S3FanoutManager::CallbackCurlSocket(CURL *easy, curl_socket_t s, int action,
201 void *userp, void *socketp) {
202 2720 S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(userp);
203 2720 LogCvmfs(kLogS3Fanout, kLogDebug,
204 "CallbackCurlSocket called with easy "
205 "handle %p, socket %d, action %d, up %p, "
206 "sp %p, fds_inuse %d, jobs %d",
207 easy, s, action, userp, socketp, s3fanout_mgr->watch_fds_inuse_,
208 2720 s3fanout_mgr->available_jobs_->Get());
209
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2720 times.
2720 if (action == CURL_POLL_NONE)
210 return 0;
211
212 // Find s in watch_fds_
213 // First 2 fds are job and terminate pipes (not curl related)
214 unsigned index;
215
2/2
✓ Branch 0 taken 4504 times.
✓ Branch 1 taken 1225 times.
5729 for (index = 2; index < s3fanout_mgr->watch_fds_inuse_; ++index) {
216
2/2
✓ Branch 0 taken 1495 times.
✓ Branch 1 taken 3009 times.
4504 if (s3fanout_mgr->watch_fds_[index].fd == s)
217 1495 break;
218 }
219 // Or create newly
220
2/2
✓ Branch 0 taken 1225 times.
✓ Branch 1 taken 1495 times.
2720 if (index == s3fanout_mgr->watch_fds_inuse_) {
221 // Extend array if necessary
222
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1223 times.
1225 if (s3fanout_mgr->watch_fds_inuse_ == s3fanout_mgr->watch_fds_size_) {
223 2 s3fanout_mgr->watch_fds_size_ *= 2;
224 2 s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
225 2 srealloc(s3fanout_mgr->watch_fds_,
226 2 s3fanout_mgr->watch_fds_size_ * sizeof(struct pollfd)));
227 }
228 1225 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].fd = s;
229 1225 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].events = 0;
230 1225 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].revents = 0;
231 1225 s3fanout_mgr->watch_fds_inuse_++;
232 }
233
234
4/5
✓ Branch 0 taken 1225 times.
✓ Branch 1 taken 6 times.
✓ Branch 2 taken 264 times.
✓ Branch 3 taken 1225 times.
✗ Branch 4 not taken.
2720 switch (action) {
235 1225 case CURL_POLL_IN:
236 1225 s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
237 1225 break;
238 6 case CURL_POLL_OUT:
239 6 s3fanout_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
240 6 break;
241 264 case CURL_POLL_INOUT:
242 264 s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI | POLLOUT
243 | POLLWRBAND;
244 264 break;
245 1225 case CURL_POLL_REMOVE:
246
2/2
✓ Branch 0 taken 193 times.
✓ Branch 1 taken 1032 times.
1225 if (index < s3fanout_mgr->watch_fds_inuse_ - 1)
247 s3fanout_mgr
248 193 ->watch_fds_[index] = s3fanout_mgr->watch_fds_
249 193 [s3fanout_mgr->watch_fds_inuse_ - 1];
250 1225 s3fanout_mgr->watch_fds_inuse_--;
251 // Shrink array if necessary
252
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1225 times.
1225 if ((s3fanout_mgr->watch_fds_inuse_ > s3fanout_mgr->watch_fds_max_)
253 && (s3fanout_mgr->watch_fds_inuse_
254 < s3fanout_mgr->watch_fds_size_ / 2)) {
255 s3fanout_mgr->watch_fds_size_ /= 2;
256 s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
257 srealloc(s3fanout_mgr->watch_fds_,
258 s3fanout_mgr->watch_fds_size_ * sizeof(struct pollfd)));
259 }
260 1225 break;
261 default:
262 PANIC(NULL);
263 }
264
265 2720 return 0;
266 }
267
268
269 /**
270 * Worker thread event loop.
271 */
272 12 void *S3FanoutManager::MainUpload(void *data) {
273
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread started");
274 12 S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(data);
275
276 12 s3fanout_mgr->InitPipeWatchFds();
277
278 // Don't schedule more jobs into the multi handle than the maximum number of
279 // parallel connections. This should prevent starvation and thus a timeout
280 // of the authorization header (CVM-1339).
281 12 unsigned jobs_in_flight = 0;
282
283 while (true) {
284 // Check events with 100ms timeout
285 16755 const int timeout_ms = 100;
286
1/2
✓ Branch 1 taken 16755 times.
✗ Branch 2 not taken.
16755 int retval = poll(s3fanout_mgr->watch_fds_, s3fanout_mgr->watch_fds_inuse_,
287 timeout_ms);
288
2/2
✓ Branch 0 taken 102 times.
✓ Branch 1 taken 16653 times.
16755 if (retval == 0) {
289 // Handle timeout
290 102 int still_running = 0;
291
1/2
✓ Branch 1 taken 102 times.
✗ Branch 2 not taken.
102 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
292 CURL_SOCKET_TIMEOUT, 0, &still_running);
293
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 102 times.
102 if (retval != CURLM_OK) {
294 LogCvmfs(kLogS3Fanout, kLogStderr, "Error, timeout due to: %d", retval);
295 assert(retval == CURLM_OK);
296 }
297
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 16653 times.
16653 } else if (retval < 0) {
298 assert(errno == EINTR);
299 continue;
300 }
301
302 // Terminate I/O thread
303
2/2
✓ Branch 0 taken 12 times.
✓ Branch 1 taken 16743 times.
16755 if (s3fanout_mgr->watch_fds_[0].revents)
304 12 break;
305
306 // New job incoming
307
2/2
✓ Branch 0 taken 614 times.
✓ Branch 1 taken 16129 times.
16743 if (s3fanout_mgr->watch_fds_[1].revents) {
308 614 s3fanout_mgr->watch_fds_[1].revents = 0;
309 JobInfo *info;
310
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 ReadPipe(s3fanout_mgr->pipe_jobs_[0], &info, sizeof(info));
311
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 CURL *handle = s3fanout_mgr->AcquireCurlHandle();
312
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 614 times.
614 if (handle == NULL) {
313 PANIC(kLogStderr, "Failed to acquire CURL handle.");
314 }
315
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 const s3fanout::Failures init_failure = s3fanout_mgr->InitializeRequest(
316 info, handle);
317
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 614 times.
614 if (init_failure != s3fanout::kFailOk) {
318 PANIC(kLogStderr,
319 "Failed to initialize CURL handle (error: %d - %s | errno: %d)",
320 init_failure, Code2Ascii(init_failure), errno);
321 }
322
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 s3fanout_mgr->SetUrlOptions(info);
323
324
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 curl_multi_add_handle(s3fanout_mgr->curl_multi_, handle);
325
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 s3fanout_mgr->active_requests_->insert(info);
326 614 jobs_in_flight++;
327 614 int still_running = 0, retval = 0;
328
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
329 CURL_SOCKET_TIMEOUT, 0, &still_running);
330
331
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 LogCvmfs(kLogS3Fanout, kLogDebug, "curl_multi_socket_action: %d - %d",
332 retval, still_running);
333 }
334
335
336 // Activity on curl sockets
337 // Within this loop the curl_multi_socket_action() may cause socket(s)
338 // to be removed from watch_fds_. If a socket is removed it is replaced
339 // by the socket at the end of the array and the inuse count is decreased.
340 // Therefore loop over the array in reverse order.
341 // First 2 fds are job and terminate pipes (not curl related)
342
2/2
✓ Branch 0 taken 38228 times.
✓ Branch 1 taken 16743 times.
54971 for (int32_t i = s3fanout_mgr->watch_fds_inuse_ - 1; i >= 2; --i) {
343
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 38228 times.
38228 if (static_cast<uint32_t>(i) >= s3fanout_mgr->watch_fds_inuse_) {
344 continue;
345 }
346
2/2
✓ Branch 0 taken 17068 times.
✓ Branch 1 taken 21160 times.
38228 if (s3fanout_mgr->watch_fds_[i].revents) {
347 17068 int ev_bitmask = 0;
348
2/2
✓ Branch 0 taken 1833 times.
✓ Branch 1 taken 15235 times.
17068 if (s3fanout_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
349 1833 ev_bitmask |= CURL_CSELECT_IN;
350
2/2
✓ Branch 0 taken 15235 times.
✓ Branch 1 taken 1833 times.
17068 if (s3fanout_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
351 15235 ev_bitmask |= CURL_CSELECT_OUT;
352 17068 if (s3fanout_mgr->watch_fds_[i].revents
353
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 17068 times.
17068 & (POLLERR | POLLHUP | POLLNVAL))
354 ev_bitmask |= CURL_CSELECT_ERR;
355 17068 s3fanout_mgr->watch_fds_[i].revents = 0;
356
357 17068 int still_running = 0;
358 17068 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
359
1/2
✓ Branch 1 taken 17068 times.
✗ Branch 2 not taken.
17068 s3fanout_mgr->watch_fds_[i].fd,
360 ev_bitmask,
361 &still_running);
362 }
363 }
364
365 // Check if transfers are completed
366 CURLMsg *curl_msg;
367 int msgs_in_queue;
368
3/4
✓ Branch 1 taken 17969 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1226 times.
✓ Branch 4 taken 16743 times.
17969 while ((curl_msg = curl_multi_info_read(s3fanout_mgr->curl_multi_,
369 &msgs_in_queue))) {
370
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1226 times.
1226 assert(curl_msg->msg == CURLMSG_DONE);
371
372 1226 s3fanout_mgr->statistics_->num_requests++;
373 JobInfo *info;
374 1226 CURL *easy_handle = curl_msg->easy_handle;
375 1226 const int curl_error = curl_msg->data.result;
376
1/2
✓ Branch 1 taken 1226 times.
✗ Branch 2 not taken.
1226 curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
377
378
1/2
✓ Branch 1 taken 1226 times.
✗ Branch 2 not taken.
1226 curl_multi_remove_handle(s3fanout_mgr->curl_multi_, easy_handle);
379
3/4
✓ Branch 1 taken 1226 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 612 times.
✓ Branch 4 taken 614 times.
1226 if (s3fanout_mgr->VerifyAndFinalize(curl_error, info)) {
380
1/2
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
612 curl_multi_add_handle(s3fanout_mgr->curl_multi_, easy_handle);
381 612 int still_running = 0;
382
1/2
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
612 curl_multi_socket_action(s3fanout_mgr->curl_multi_, CURL_SOCKET_TIMEOUT,
383 0, &still_running);
384 } else {
385 // Return easy handle into pool and write result back
386 614 jobs_in_flight--;
387
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 s3fanout_mgr->active_requests_->erase(info);
388
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 s3fanout_mgr->ReleaseCurlHandle(info, easy_handle);
389
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 s3fanout_mgr->available_jobs_->Decrement();
390
391 // Add to list of completed jobs
392
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 s3fanout_mgr->PushCompletedJob(info);
393 }
394 }
395 16743 }
396
397 12 set<CURL *>::iterator i = s3fanout_mgr->pool_handles_inuse_->begin();
398 12 const set<CURL *>::const_iterator i_end = s3fanout_mgr->pool_handles_inuse_
399 12 ->end();
400
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 12 times.
12 for (; i != i_end; ++i) {
401 curl_multi_remove_handle(s3fanout_mgr->curl_multi_, *i);
402 curl_easy_cleanup(*i);
403 }
404 12 s3fanout_mgr->pool_handles_inuse_->clear();
405 12 free(s3fanout_mgr->watch_fds_);
406
407
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread terminated");
408 12 return NULL;
409 }
410
411
412 /**
413 * Gets an idle CURL handle from the pool. Creates a new one and adds it to
414 * the pool if necessary.
415 */
416 614 CURL *S3FanoutManager::AcquireCurlHandle() const {
417 CURL *handle;
418
419 614 const MutexLockGuard guard(curl_handle_lock_);
420
421
2/2
✓ Branch 1 taken 49 times.
✓ Branch 2 taken 565 times.
614 if (pool_handles_idle_->empty()) {
422 CURLcode retval;
423
424 // Create a new handle
425
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 handle = curl_easy_init();
426
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
49 assert(handle != NULL);
427
428 // Other settings
429
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 retval = curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
430
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
49 assert(retval == CURLE_OK);
431
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 retval = curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION,
432 CallbackCurlHeader);
433
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
49 assert(retval == CURLE_OK);
434
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 retval = curl_easy_setopt(handle, CURLOPT_READFUNCTION, CallbackCurlData);
435
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
49 assert(retval == CURLE_OK);
436
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 retval = curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlBody);
437
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
49 assert(retval == CURLE_OK);
438 } else {
439 565 handle = *(pool_handles_idle_->begin());
440
1/2
✓ Branch 2 taken 565 times.
✗ Branch 3 not taken.
565 pool_handles_idle_->erase(pool_handles_idle_->begin());
441 }
442
443
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 pool_handles_inuse_->insert(handle);
444
445 614 return handle;
446 614 }
447
448
449 614 void S3FanoutManager::ReleaseCurlHandle(JobInfo *info, CURL *handle) const {
450
1/2
✓ Branch 0 taken 614 times.
✗ Branch 1 not taken.
614 if (info->http_headers) {
451
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 curl_slist_free_all(info->http_headers);
452 614 info->http_headers = NULL;
453 }
454
455 614 const MutexLockGuard guard(curl_handle_lock_);
456
457
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 const set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
458
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 614 times.
614 assert(elem != pool_handles_inuse_->end());
459
460
2/2
✓ Branch 1 taken 29 times.
✓ Branch 2 taken 585 times.
614 if (pool_handles_idle_->size() > config_.pool_max_handles) {
461
1/2
✓ Branch 1 taken 29 times.
✗ Branch 2 not taken.
29 const CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, NULL);
462
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 assert(retval == CURLE_OK);
463
1/2
✓ Branch 1 taken 29 times.
✗ Branch 2 not taken.
29 curl_easy_cleanup(handle);
464 const std::map<CURL *, S3FanOutDnsEntry *>::size_type
465
1/2
✓ Branch 1 taken 29 times.
✗ Branch 2 not taken.
29 retitems = curl_sharehandles_->erase(handle);
466
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 29 times.
29 assert(retitems == 1);
467 } else {
468
1/2
✓ Branch 1 taken 585 times.
✗ Branch 2 not taken.
585 pool_handles_idle_->insert(handle);
469 }
470
471
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 pool_handles_inuse_->erase(elem);
472 614 }
473
474 12 void S3FanoutManager::InitPipeWatchFds() {
475
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 assert(watch_fds_inuse_ == 0);
476
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 assert(watch_fds_size_ >= 2);
477 12 watch_fds_[0].fd = pipe_terminate_[0];
478 12 watch_fds_[0].events = POLLIN | POLLPRI;
479 12 watch_fds_[0].revents = 0;
480 12 ++watch_fds_inuse_;
481 12 watch_fds_[1].fd = pipe_jobs_[0];
482 12 watch_fds_[1].events = POLLIN | POLLPRI;
483 12 watch_fds_[1].revents = 0;
484 12 ++watch_fds_inuse_;
485 12 }
486
487 /**
488 * The Amazon AWS 2 authorization header according to
489 * http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html#ConstructingTheAuthenticationHeader
490 */
491 1222 bool S3FanoutManager::MkV2Authz(const JobInfo &info,
492 vector<string> *headers) const {
493 1222 string payload_hash;
494
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 const bool retval = MkPayloadHash(info, &payload_hash);
495
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 if (!retval)
496 return false;
497
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 const string content_type = GetContentType(info);
498
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 const string request = GetRequestString(info);
499
500
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 const string timestamp = RfcTimestamp();
501
5/10
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 1222 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 1222 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 1222 times.
✗ Branch 14 not taken.
2444 string to_sign = request + "\n" + payload_hash + "\n" + content_type + "\n"
502
2/4
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
2444 + timestamp + "\n";
503
2/4
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1222 times.
✗ Branch 4 not taken.
1222 if (config_.x_amz_acl != "") {
504
2/4
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
2444 to_sign += "x-amz-acl:" + config_.x_amz_acl + "\n" + // default ACL
505
5/10
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 1222 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 1222 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 1222 times.
✗ Branch 14 not taken.
2444 "/" + config_.bucket + "/" + info.object_key;
506 }
507
1/2
✓ Branch 3 taken 1222 times.
✗ Branch 4 not taken.
1222 LogCvmfs(kLogS3Fanout, kLogDebug, "%s string to sign for: %s",
508 request.c_str(), info.object_key.c_str());
509
510
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 shash::Any hmac;
511 1222 hmac.algorithm = shash::kSha1;
512
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 shash::Hmac(config_.secret_key,
513 1222 reinterpret_cast<const unsigned char *>(to_sign.data()),
514 1222 to_sign.length(), &hmac);
515
516
3/6
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 1222 times.
✗ Branch 8 not taken.
3666 headers->push_back("Authorization: AWS " + config_.access_key + ":"
517
3/6
✓ Branch 2 taken 1222 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1222 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 1222 times.
✗ Branch 9 not taken.
6110 + Base64(string(reinterpret_cast<char *>(hmac.digest),
518 1222 hmac.GetDigestSize())));
519
2/4
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
1222 headers->push_back("Date: " + timestamp);
520
2/4
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
1222 headers->push_back("X-Amz-Acl: " + config_.x_amz_acl);
521
2/2
✓ Branch 1 taken 608 times.
✓ Branch 2 taken 614 times.
1222 if (!payload_hash.empty())
522
2/4
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 608 times.
✗ Branch 5 not taken.
608 headers->push_back("Content-MD5: " + payload_hash);
523
2/2
✓ Branch 1 taken 608 times.
✓ Branch 2 taken 614 times.
1222 if (!content_type.empty())
524
2/4
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 608 times.
✗ Branch 5 not taken.
608 headers->push_back("Content-Type: " + content_type);
525 1222 return true;
526 1222 }
527
528
529 string S3FanoutManager::GetUriEncode(const string &val,
530 bool encode_slash) const {
531 string result;
532 const unsigned len = val.length();
533 result.reserve(len);
534 for (unsigned i = 0; i < len; ++i) {
535 const char c = val[i];
536 if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
537 || (c >= '0' && c <= '9') || c == '_' || c == '-' || c == '~'
538 || c == '.') {
539 result.push_back(c);
540 } else if (c == '/') {
541 if (encode_slash) {
542 result += "%2F";
543 } else {
544 result.push_back(c);
545 }
546 } else {
547 result.push_back('%');
548 result.push_back((c / 16) + ((c / 16 <= 9) ? '0' : 'A' - 10));
549 result.push_back((c % 16) + ((c % 16 <= 9) ? '0' : 'A' - 10));
550 }
551 }
552 return result;
553 }
554
555
556 string S3FanoutManager::GetAwsV4SigningKey(const string &date) const {
557 if (last_signing_key_.first == date)
558 return last_signing_key_.second;
559
560 const string date_key = shash::Hmac256("AWS4" + config_.secret_key, date,
561 true);
562 const string date_region_key = shash::Hmac256(date_key, config_.region, true);
563 const string date_region_service_key = shash::Hmac256(date_region_key, "s3",
564 true);
565 string signing_key = shash::Hmac256(date_region_service_key, "aws4_request",
566 true);
567 last_signing_key_.first = date;
568 last_signing_key_.second = signing_key;
569 return signing_key;
570 }
571
572
573 /**
574 * The Amazon AWS4 authorization header according to
575 * http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-auth-using-authorization-header.html
576 */
577 bool S3FanoutManager::MkV4Authz(const JobInfo &info,
578 vector<string> *headers) const {
579 string payload_hash;
580 const bool retval = MkPayloadHash(info, &payload_hash);
581 if (!retval)
582 return false;
583 const string content_type = GetContentType(info);
584 const string timestamp = IsoTimestamp();
585 const string date = timestamp.substr(0, 8);
586 vector<string> tokens = SplitString(complete_hostname_, ':');
587 assert(tokens.size() <= 2);
588 string canonical_hostname = tokens[0];
589
590 // if we could split the hostname in two and if the port is *NOT* a default
591 // one
592 if (tokens.size() == 2
593 && !((String2Uint64(tokens[1]) == kDefaultHTTPPort)
594 || (String2Uint64(tokens[1]) == kDefaultHTTPSPort)))
595 canonical_hostname += ":" + tokens[1];
596
597 string signed_headers;
598 string canonical_headers;
599 if (!content_type.empty()) {
600 signed_headers += "content-type;";
601 headers->push_back("Content-Type: " + content_type);
602 canonical_headers += "content-type:" + content_type + "\n";
603 }
604 if (config_.x_amz_acl != "") {
605 signed_headers += "host;x-amz-acl;x-amz-content-sha256;x-amz-date";
606 } else {
607 signed_headers += "host;x-amz-content-sha256;x-amz-date";
608 }
609 canonical_headers += "host:" + canonical_hostname + "\n";
610 if (config_.x_amz_acl != "") {
611 canonical_headers += "x-amz-acl:" + config_.x_amz_acl + "\n";
612 }
613 canonical_headers += "x-amz-content-sha256:" + payload_hash + "\n"
614 + "x-amz-date:" + timestamp + "\n";
615
616 const string scope = date + "/" + config_.region + "/s3/aws4_request";
617 const string uri = config_.dns_buckets ? (string("/") + info.object_key)
618 : (string("/") + config_.bucket + "/"
619 + info.object_key);
620
621 const string canonical_request = GetRequestString(info) + "\n"
622 + GetUriEncode(uri, false) + "\n" + "\n"
623 + canonical_headers + "\n" + signed_headers
624 + "\n" + payload_hash;
625
626 const string hash_request = shash::Sha256String(canonical_request.c_str());
627
628 const string string_to_sign = "AWS4-HMAC-SHA256\n" + timestamp + "\n" + scope
629 + "\n" + hash_request;
630
631 const string signing_key = GetAwsV4SigningKey(date);
632 const string signature = shash::Hmac256(signing_key, string_to_sign);
633
634 headers->push_back("X-Amz-Acl: " + config_.x_amz_acl);
635 headers->push_back("X-Amz-Content-Sha256: " + payload_hash);
636 headers->push_back("X-Amz-Date: " + timestamp);
637 headers->push_back("Authorization: AWS4-HMAC-SHA256 "
638 "Credential="
639 + config_.access_key + "/" + scope
640 + ","
641 "SignedHeaders="
642 + signed_headers
643 + ","
644 "Signature="
645 + signature);
646 return true;
647 }
648
649 /**
650 * The Azure Blob authorization header according to
651 * https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key
652 */
653 bool S3FanoutManager::MkAzureAuthz(const JobInfo &info,
654 vector<string> *headers) const {
655 const string timestamp = RfcTimestamp();
656 const string canonical_headers = "x-ms-blob-type:BlockBlob\nx-ms-date:"
657 + timestamp + "\nx-ms-version:2011-08-18";
658 const string canonical_resource = "/" + config_.access_key + "/"
659 + config_.bucket + "/" + info.object_key;
660
661 string string_to_sign;
662 if ((info.request == JobInfo::kReqHeadOnly)
663 || (info.request == JobInfo::kReqHeadPut)
664 || (info.request == JobInfo::kReqDelete)) {
665 string_to_sign = GetRequestString(info) + string("\n\n\n")
666 + "\n\n\n\n\n\n\n\n\n" + canonical_headers + "\n"
667 + canonical_resource;
668 } else {
669 string_to_sign = GetRequestString(info) + string("\n\n\n")
670 + string(StringifyInt(info.origin->GetSize()))
671 + "\n\n\n\n\n\n\n\n\n" + canonical_headers + "\n"
672 + canonical_resource;
673 }
674
675 string signing_key;
676 const int retval = Debase64(config_.secret_key, &signing_key);
677 if (!retval)
678 return false;
679
680 const string signature = shash::Hmac256(signing_key, string_to_sign, true);
681
682 headers->push_back("x-ms-date: " + timestamp);
683 headers->push_back("x-ms-version: 2011-08-18");
684 headers->push_back("Authorization: SharedKey " + config_.access_key + ":"
685 + Base64(signature));
686 headers->push_back("x-ms-blob-type: BlockBlob");
687 return true;
688 }
689
690 1222 void S3FanoutManager::InitializeDnsSettingsCurl(CURL *handle,
691 CURLSH *sharehandle,
692 curl_slist *clist) const {
693 1222 CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, sharehandle);
694
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
695 1222 retval = curl_easy_setopt(handle, CURLOPT_RESOLVE, clist);
696
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
697 1222 }
698
699
700 1222 int S3FanoutManager::InitializeDnsSettings(CURL *handle,
701 std::string host_with_port) const {
702 // Use existing handle
703 const std::map<CURL *, S3FanOutDnsEntry *>::const_iterator
704
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 it = curl_sharehandles_->find(handle);
705
2/2
✓ Branch 3 taken 1173 times.
✓ Branch 4 taken 49 times.
1222 if (it != curl_sharehandles_->end()) {
706
1/2
✓ Branch 1 taken 1173 times.
✗ Branch 2 not taken.
1173 InitializeDnsSettingsCurl(handle, it->second->sharehandle,
707 1173 it->second->clist);
708 1173 return 0;
709 }
710
711 // Add protocol information for extraction of fields for DNS
712
2/4
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 49 times.
✗ Branch 4 not taken.
49 if (!IsHttpUrl(host_with_port))
713
2/4
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 49 times.
✗ Branch 5 not taken.
49 host_with_port = config_.protocol + "://" + host_with_port;
714
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 const std::string remote_host = dns::ExtractHost(host_with_port);
715
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 const std::string remote_port = dns::ExtractPort(host_with_port);
716
717 // If we have the name already resolved, use the least used IP
718 49 S3FanOutDnsEntry *useme = NULL;
719 49 unsigned int usemin = UINT_MAX;
720 49 std::set<S3FanOutDnsEntry *>::iterator its3 = sharehandles_->begin();
721
2/2
✓ Branch 3 taken 39 times.
✓ Branch 4 taken 49 times.
88 for (; its3 != sharehandles_->end(); ++its3) {
722
1/2
✓ Branch 2 taken 39 times.
✗ Branch 3 not taken.
39 if ((*its3)->dns_name == remote_host) {
723
1/2
✓ Branch 1 taken 39 times.
✗ Branch 2 not taken.
39 if (usemin >= (*its3)->counter) {
724 39 usemin = (*its3)->counter;
725 39 useme = (*its3);
726 }
727 }
728 }
729
2/2
✓ Branch 0 taken 39 times.
✓ Branch 1 taken 10 times.
49 if (useme != NULL) {
730
1/2
✓ Branch 1 taken 39 times.
✗ Branch 2 not taken.
39 curl_sharehandles_->insert(
731 39 std::pair<CURL *, S3FanOutDnsEntry *>(handle, useme));
732 39 useme->counter++;
733
1/2
✓ Branch 1 taken 39 times.
✗ Branch 2 not taken.
39 InitializeDnsSettingsCurl(handle, useme->sharehandle, useme->clist);
734 39 return 0;
735 }
736
737 // We need to resolve the hostname
738 // TODO(ssheikki): support ipv6 also... if (opt_ipv4_only_)
739
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 const dns::Host host = resolver_->Resolve(remote_host);
740
1/2
✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
10 set<string> ipv4_addresses = host.ipv4_addresses();
741 10 std::set<string>::iterator its = ipv4_addresses.begin();
742 10 S3FanOutDnsEntry *dnse = NULL;
743
2/2
✓ Branch 3 taken 10 times.
✓ Branch 4 taken 10 times.
20 for (; its != ipv4_addresses.end(); ++its) {
744
2/4
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 10 times.
✗ Branch 5 not taken.
10 dnse = new S3FanOutDnsEntry();
745 10 dnse->counter = 0;
746
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 dnse->dns_name = remote_host;
747
4/12
✗ Branch 1 not taken.
✓ Branch 2 taken 10 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 8 taken 10 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 10 times.
✗ Branch 12 not taken.
✗ Branch 14 not taken.
✓ Branch 15 taken 10 times.
✗ Branch 18 not taken.
✗ Branch 19 not taken.
10 dnse->port = remote_port.size() == 0 ? "80" : remote_port;
748
1/2
✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
10 dnse->ip = *its;
749 10 dnse->clist = NULL;
750
1/2
✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
10 dnse->clist = curl_slist_append(
751 dnse->clist,
752
4/8
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 10 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 10 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 10 times.
✗ Branch 11 not taken.
20 (dnse->dns_name + ":" + dnse->port + ":" + dnse->ip).c_str());
753
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 dnse->sharehandle = curl_share_init();
754
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
10 assert(dnse->sharehandle != NULL);
755
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 const CURLSHcode share_retval = curl_share_setopt(
756 dnse->sharehandle, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS);
757
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
10 assert(share_retval == CURLSHE_OK);
758
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 sharehandles_->insert(dnse);
759 }
760
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
10 if (dnse == NULL) {
761 LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr,
762 "Error: DNS resolve failed for address '%s'.",
763 remote_host.c_str());
764 assert(dnse != NULL);
765 return -1;
766 }
767
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 curl_sharehandles_->insert(
768 10 std::pair<CURL *, S3FanOutDnsEntry *>(handle, dnse));
769 10 dnse->counter++;
770
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 InitializeDnsSettingsCurl(handle, dnse->sharehandle, dnse->clist);
771
772 10 return 0;
773 49 }
774
775
776 1222 bool S3FanoutManager::MkPayloadHash(const JobInfo &info,
777 string *hex_hash) const {
778
2/2
✓ Branch 0 taken 1217 times.
✓ Branch 1 taken 5 times.
1222 if ((info.request == JobInfo::kReqHeadOnly)
779
2/2
✓ Branch 0 taken 609 times.
✓ Branch 1 taken 608 times.
1217 || (info.request == JobInfo::kReqHeadPut)
780
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 608 times.
609 || (info.request == JobInfo::kReqDelete)) {
781
1/4
✓ Branch 0 taken 614 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
614 switch (config_.authz_method) {
782 614 case kAuthzAwsV2:
783 614 hex_hash->clear();
784 614 break;
785 case kAuthzAwsV4:
786 // Sha256 over empty string
787 *hex_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b78"
788 "52b855";
789 break;
790 case kAuthzAzure:
791 // no payload hash required for Azure signature
792 hex_hash->clear();
793 break;
794 default:
795 PANIC(NULL);
796 }
797 614 return true;
798 }
799
800 // PUT, there is actually payload
801
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 shash::Any payload_hash(shash::kMd5);
802
803 unsigned char *data;
804 608 const unsigned int nbytes = info.origin->Data(
805
2/4
✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 608 times.
✗ Branch 6 not taken.
608 reinterpret_cast<void **>(&data), info.origin->GetSize(), 0);
806
2/4
✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 608 times.
608 assert(nbytes == info.origin->GetSize());
807
808
1/4
✓ Branch 0 taken 608 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
608 switch (config_.authz_method) {
809 608 case kAuthzAwsV2:
810
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 shash::HashMem(data, nbytes, &payload_hash);
811
2/4
✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 608 times.
✗ Branch 6 not taken.
1824 *hex_hash = Base64(string(reinterpret_cast<char *>(payload_hash.digest),
812 1216 payload_hash.GetDigestSize()));
813 608 return true;
814 case kAuthzAwsV4:
815 *hex_hash = shash::Sha256Mem(data, nbytes);
816 return true;
817 case kAuthzAzure:
818 // no payload hash required for Azure signature
819 hex_hash->clear();
820 return true;
821 default:
822 PANIC(NULL);
823 }
824 }
825
826 1223 string S3FanoutManager::GetRequestString(const JobInfo &info) const {
827
3/4
✓ Branch 0 taken 613 times.
✓ Branch 1 taken 608 times.
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
1223 switch (info.request) {
828 613 case JobInfo::kReqHeadOnly:
829 case JobInfo::kReqHeadPut:
830
1/2
✓ Branch 2 taken 613 times.
✗ Branch 3 not taken.
613 return "HEAD";
831 608 case JobInfo::kReqPutCas:
832 case JobInfo::kReqPutDotCvmfs:
833 case JobInfo::kReqPutHtml:
834 case JobInfo::kReqPutBucket:
835
1/2
✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
608 return "PUT";
836 2 case JobInfo::kReqDelete:
837
1/2
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
2 return "DELETE";
838 default:
839 PANIC(NULL);
840 }
841 }
842
843
844 1222 string S3FanoutManager::GetContentType(const JobInfo &info) const {
845
2/6
✓ Branch 0 taken 614 times.
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
1222 switch (info.request) {
846 614 case JobInfo::kReqHeadOnly:
847 case JobInfo::kReqHeadPut:
848 case JobInfo::kReqDelete:
849
1/2
✓ Branch 2 taken 614 times.
✗ Branch 3 not taken.
614 return "";
850 608 case JobInfo::kReqPutCas:
851
1/2
✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
608 return "application/octet-stream";
852 case JobInfo::kReqPutDotCvmfs:
853 return "application/x-cvmfs";
854 case JobInfo::kReqPutHtml:
855 return "text/html";
856 case JobInfo::kReqPutBucket:
857 return "text/xml";
858 default:
859 PANIC(NULL);
860 }
861 }
862
863
864 /**
865 * Request parameters set the URL and other options such as timeout and
866 * proxy.
867 */
868 1222 Failures S3FanoutManager::InitializeRequest(JobInfo *info, CURL *handle) const {
869 // Initialize internal download state
870 1222 info->curl_handle = handle;
871 1222 info->error_code = kFailOk;
872 1222 info->http_error = 0;
873 1222 info->num_retries = 0;
874 1222 info->backoff_ms = 0;
875 1222 info->throttle_ms = 0;
876 1222 info->throttle_timestamp = 0;
877 1222 info->http_headers = NULL;
878 // info->payload_size is needed in S3Uploader::MainCollectResults,
879 // where info->origin is already destroyed.
880
1/2
✓ Branch 2 taken 1222 times.
✗ Branch 3 not taken.
1222 info->payload_size = info->origin->GetSize();
881
882
2/4
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
1222 InitializeDnsSettings(handle, complete_hostname_);
883
884 CURLcode retval;
885
2/2
✓ Branch 0 taken 1217 times.
✓ Branch 1 taken 5 times.
1222 if ((info->request == JobInfo::kReqHeadOnly)
886
2/2
✓ Branch 0 taken 609 times.
✓ Branch 1 taken 608 times.
1217 || (info->request == JobInfo::kReqHeadPut)
887
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 608 times.
609 || (info->request == JobInfo::kReqDelete)) {
888
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 0);
889
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 614 times.
614 assert(retval == CURLE_OK);
890
1/2
✓ Branch 1 taken 614 times.
✗ Branch 2 not taken.
614 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
891
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 614 times.
614 assert(retval == CURLE_OK);
892
893
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 613 times.
614 if (info->request == JobInfo::kReqDelete) {
894
2/4
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 1 times.
✗ Branch 6 not taken.
1 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST,
895 GetRequestString(*info).c_str());
896
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 assert(retval == CURLE_OK);
897 } else {
898
1/2
✓ Branch 1 taken 613 times.
✗ Branch 2 not taken.
613 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
899
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 613 times.
613 assert(retval == CURLE_OK);
900 }
901 } else {
902
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
903
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
608 assert(retval == CURLE_OK);
904
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 1);
905
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
608 assert(retval == CURLE_OK);
906
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 0);
907
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
608 assert(retval == CURLE_OK);
908
2/4
✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 608 times.
✗ Branch 6 not taken.
608 retval = curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE,
909 static_cast<curl_off_t>(info->origin->GetSize()));
910
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
608 assert(retval == CURLE_OK);
911
912
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
608 if (info->request == JobInfo::kReqPutDotCvmfs) {
913 info->http_headers = curl_slist_append(
914 info->http_headers, dot_cvmfs_cache_control_header.c_str());
915
1/2
✓ Branch 0 taken 608 times.
✗ Branch 1 not taken.
608 } else if (info->request == JobInfo::kReqPutCas) {
916
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 info->http_headers = curl_slist_append(info->http_headers,
917 kCacheControlCas);
918 }
919 }
920
921 bool retval_b;
922
923 // Authorization
924 1222 vector<string> authz_headers;
925
1/4
✓ Branch 0 taken 1222 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
1222 switch (config_.authz_method) {
926 1222 case kAuthzAwsV2:
927
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 retval_b = MkV2Authz(*info, &authz_headers);
928 1222 break;
929 case kAuthzAwsV4:
930 retval_b = MkV4Authz(*info, &authz_headers);
931 break;
932 case kAuthzAzure:
933 retval_b = MkAzureAuthz(*info, &authz_headers);
934 break;
935 default:
936 PANIC(NULL);
937 }
938
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 if (!retval_b)
939 return kFailLocalIO;
940
2/2
✓ Branch 1 taken 4882 times.
✓ Branch 2 taken 1222 times.
6104 for (unsigned i = 0; i < authz_headers.size(); ++i) {
941
1/2
✓ Branch 2 taken 4882 times.
✗ Branch 3 not taken.
4882 info->http_headers = curl_slist_append(info->http_headers,
942 4882 authz_headers[i].c_str());
943 }
944
945 // Common headers
946
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 info->http_headers = curl_slist_append(info->http_headers,
947 "Connection: Keep-Alive");
948
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 info->http_headers = curl_slist_append(info->http_headers, "Pragma:");
949 // No 100-continue
950
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 info->http_headers = curl_slist_append(info->http_headers, "Expect:");
951 // Strip unnecessary header
952
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 info->http_headers = curl_slist_append(info->http_headers, "Accept:");
953
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 info->http_headers = curl_slist_append(info->http_headers,
954 1222 user_agent_->c_str());
955
956 // Set curl parameters
957
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 retval = curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
958
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
959
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 retval = curl_easy_setopt(handle, CURLOPT_HEADERDATA,
960 static_cast<void *>(info));
961
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
962
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 retval = curl_easy_setopt(handle, CURLOPT_READDATA,
963 static_cast<void *>(info));
964
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
965
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 retval = curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->http_headers);
966
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
967
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 if (opt_ipv4_only_) {
968 retval = curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
969 assert(retval == CURLE_OK);
970 }
971 // Follow HTTP redirects
972
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 retval = curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
973
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
974
975
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 retval = curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, info->errorbuffer);
976
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
977
978
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1222 times.
1222 if (config_.protocol == "https") {
979 retval = curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 1L);
980 assert(retval == CURLE_OK);
981 retval = curl_easy_setopt(handle, CURLOPT_PROXY_SSL_VERIFYPEER, 1L);
982 assert(retval == CURLE_OK);
983 const bool add_cert = ssl_certificate_store_.ApplySslCertificatePath(
984 handle);
985 assert(add_cert);
986 }
987
988 1222 return kFailOk;
989 1222 }
990
991
992 /**
993 * Sets the URL specific options such as host to use and timeout.
994 */
995 1222 void S3FanoutManager::SetUrlOptions(JobInfo *info) const {
996 1222 CURL *curl_handle = info->curl_handle;
997 CURLcode retval;
998
999
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 retval = curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT,
1000 config_.opt_timeout_sec);
1001
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
1002
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT,
1003 kLowSpeedLimit);
1004
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
1005
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME,
1006 config_.opt_timeout_sec);
1007
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
1008
1009
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 if (is_curl_debug_) {
1010 retval = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 1);
1011 assert(retval == CURLE_OK);
1012 }
1013
1014
1/2
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
1222 const string url = MkUrl(info->object_key);
1015
1/2
✓ Branch 2 taken 1222 times.
✗ Branch 3 not taken.
1222 retval = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
1016
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
1017
1018
1/2
✓ Branch 2 taken 1222 times.
✗ Branch 3 not taken.
1222 retval = curl_easy_setopt(curl_handle, CURLOPT_PROXY, config_.proxy.c_str());
1019
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 assert(retval == CURLE_OK);
1020 1222 }
1021
1022
1023 /**
1024 * Adds transfer time and uploaded bytes to the global counters.
1025 */
1026 1226 void S3FanoutManager::UpdateStatistics(CURL *handle) {
1027 double val;
1028
1029
2/4
✓ Branch 1 taken 1226 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1226 times.
✗ Branch 4 not taken.
1226 if (curl_easy_getinfo(handle, CURLINFO_SIZE_UPLOAD, &val) == CURLE_OK)
1030 1226 statistics_->transferred_bytes += val;
1031 1226 }
1032
1033
1034 /**
1035 * Retry if possible and if not already done too often.
1036 */
1037 6 bool S3FanoutManager::CanRetry(const JobInfo *info) {
1038 6 return (info->error_code == kFailHostConnection
1039
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 || info->error_code == kFailHostResolve
1040
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 || info->error_code == kFailServiceUnavailable
1041
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 2 times.
6 || info->error_code == kFailRetry)
1042
2/4
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
12 && (info->num_retries < config_.opt_max_retries);
1043 }
1044
1045
1046 /**
1047 * Backoff for retry to introduce a jitter into a upload sequence.
1048 *
1049 * \return true if backoff has been performed, false otherwise
1050 */
1051 4 void S3FanoutManager::Backoff(JobInfo *info) {
1052
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
4 if (info->error_code != kFailRetry)
1053 info->num_retries++;
1054 4 statistics_->num_retries++;
1055
1056
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (info->throttle_ms > 0) {
1057 4 LogCvmfs(kLogS3Fanout, kLogDebug, "throttling for %d ms",
1058 info->throttle_ms);
1059 4 const uint64_t now = platform_monotonic_time();
1060
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if ((info->throttle_timestamp + (info->throttle_ms / 1000)) >= now) {
1061
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 3 times.
4 if ((now - timestamp_last_throttle_report_)
1062 > kThrottleReportIntervalSec) {
1063 1 LogCvmfs(kLogS3Fanout, kLogStdout,
1064 "Warning: S3 backend throttling %ums "
1065 "(total backoff time so far %lums)",
1066 1 info->throttle_ms, statistics_->ms_throttled);
1067 1 timestamp_last_throttle_report_ = now;
1068 }
1069 4 statistics_->ms_throttled += info->throttle_ms;
1070 4 SafeSleepMs(info->throttle_ms);
1071 }
1072 } else {
1073 if (info->backoff_ms == 0) {
1074 // Must be != 0
1075 info->backoff_ms = prng_.Next(config_.opt_backoff_init_ms + 1);
1076 } else {
1077 info->backoff_ms *= 2;
1078 }
1079 if (info->backoff_ms > config_.opt_backoff_max_ms)
1080 info->backoff_ms = config_.opt_backoff_max_ms;
1081
1082 LogCvmfs(kLogS3Fanout, kLogDebug, "backing off for %d ms",
1083 info->backoff_ms);
1084 SafeSleepMs(info->backoff_ms);
1085 }
1086 4 }
1087
1088
1089 /**
1090 * Checks the result of a curl request and implements the failure logic
1091 * and takes care of cleanup.
1092 *
1093 * @return true if request should be repeated, false otherwise
1094 */
1095 1226 bool S3FanoutManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1096 1226 LogCvmfs(kLogS3Fanout, kLogDebug,
1097 "Verify uploaded/tested object %s "
1098 "(curl error %d, info error %d, info request %d)",
1099 1226 info->object_key.c_str(), curl_error, info->error_code,
1100 1226 info->request);
1101 1226 UpdateStatistics(info->curl_handle);
1102
1103 // Verification and error classification
1104
1/6
✓ Branch 0 taken 1226 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
1226 switch (curl_error) {
1105 1226 case CURLE_OK:
1106
2/2
✓ Branch 0 taken 1222 times.
✓ Branch 1 taken 4 times.
1226 if ((info->error_code != kFailRetry)
1107
2/2
✓ Branch 0 taken 612 times.
✓ Branch 1 taken 610 times.
1222 && (info->error_code != kFailNotFound)) {
1108 612 info->error_code = kFailOk;
1109 }
1110 1226 break;
1111 case CURLE_UNSUPPORTED_PROTOCOL:
1112 case CURLE_URL_MALFORMAT:
1113 info->error_code = kFailBadRequest;
1114 break;
1115 case CURLE_COULDNT_RESOLVE_HOST:
1116 info->error_code = kFailHostResolve;
1117 break;
1118 case CURLE_COULDNT_CONNECT:
1119 case CURLE_OPERATION_TIMEDOUT:
1120 case CURLE_SEND_ERROR:
1121 case CURLE_RECV_ERROR:
1122 info->error_code = kFailHostConnection;
1123 break;
1124 case CURLE_ABORTED_BY_CALLBACK:
1125 case CURLE_WRITE_ERROR:
1126 // Error set by callback
1127 break;
1128 default:
1129 LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr,
1130 "unexpected curl error (%d) while trying to upload %s: %s",
1131 curl_error, info->object_key.c_str(), info->errorbuffer);
1132 info->error_code = kFailOther;
1133 break;
1134 }
1135
1136 // Transform HEAD to PUT request
1137
2/2
✓ Branch 0 taken 610 times.
✓ Branch 1 taken 616 times.
1226 if ((info->error_code == kFailNotFound)
1138
2/2
✓ Branch 0 taken 608 times.
✓ Branch 1 taken 2 times.
610 && (info->request == JobInfo::kReqHeadPut)) {
1139 608 LogCvmfs(kLogS3Fanout, kLogDebug, "not found: %s, uploading",
1140 info->object_key.c_str());
1141 608 info->request = JobInfo::kReqPutCas;
1142 608 curl_slist_free_all(info->http_headers);
1143 608 info->http_headers = NULL;
1144 608 const s3fanout::Failures init_failure = InitializeRequest(
1145 info, info->curl_handle);
1146
1147
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
608 if (init_failure != s3fanout::kFailOk) {
1148 PANIC(kLogStderr,
1149 "Failed to initialize CURL handle "
1150 "(error: %d - %s | errno: %d)",
1151 init_failure, Code2Ascii(init_failure), errno);
1152 }
1153 608 SetUrlOptions(info);
1154 // Reset origin
1155 608 info->origin->Rewind();
1156 608 return true; // Again, Put
1157 }
1158
1159 // Determination if failed request should be repeated
1160 618 bool try_again = false;
1161
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 612 times.
618 if (info->error_code != kFailOk) {
1162 6 try_again = CanRetry(info);
1163 }
1164
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 614 times.
618 if (try_again) {
1165
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (info->request == JobInfo::kReqPutCas
1166
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 || info->request == JobInfo::kReqPutDotCvmfs
1167
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
4 || info->request == JobInfo::kReqPutHtml) {
1168 LogCvmfs(kLogS3Fanout, kLogDebug, "Trying again to upload %s",
1169 info->object_key.c_str());
1170 // Reset origin
1171 info->origin->Rewind();
1172 }
1173 4 Backoff(info);
1174 4 info->error_code = kFailOk;
1175 4 info->http_error = 0;
1176 4 info->throttle_ms = 0;
1177 4 info->backoff_ms = 0;
1178 4 info->throttle_timestamp = 0;
1179 4 return true; // try again
1180 }
1181
1182 // Cleanup opened resources
1183 614 info->origin.Destroy();
1184
1185
3/4
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 612 times.
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
614 if ((info->error_code != kFailOk) && (info->http_error != 0)
1186
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 && (info->http_error != 404)) {
1187 LogCvmfs(kLogS3Fanout, kLogStderr, "S3: HTTP failure %d", info->http_error);
1188 }
1189 614 return false; // stop transfer
1190 }
1191
1192
2/4
✓ Branch 3 taken 12 times.
✗ Branch 4 not taken.
✓ Branch 9 taken 12 times.
✗ Branch 10 not taken.
12 S3FanoutManager::S3FanoutManager(const S3Config &config) : config_(config) {
1193 12 atomic_init32(&multi_threaded_);
1194
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 MakePipe(pipe_terminate_);
1195
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 MakePipe(pipe_jobs_);
1196
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 MakePipe(pipe_completed_);
1197
1198 int retval;
1199 12 jobs_todo_lock_ = reinterpret_cast<pthread_mutex_t *>(
1200 12 smalloc(sizeof(pthread_mutex_t)));
1201 12 retval = pthread_mutex_init(jobs_todo_lock_, NULL);
1202
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 assert(retval == 0);
1203 12 curl_handle_lock_ = reinterpret_cast<pthread_mutex_t *>(
1204 12 smalloc(sizeof(pthread_mutex_t)));
1205 12 retval = pthread_mutex_init(curl_handle_lock_, NULL);
1206
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 assert(retval == 0);
1207
1208
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 active_requests_ = new set<JobInfo *>;
1209
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 pool_handles_idle_ = new set<CURL *>;
1210
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 pool_handles_inuse_ = new set<CURL *>;
1211
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 curl_sharehandles_ = new map<CURL *, S3FanOutDnsEntry *>;
1212
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 sharehandles_ = new set<S3FanOutDnsEntry *>;
1213 12 watch_fds_max_ = 4 * config_.pool_max_handles;
1214 12 max_available_jobs_ = 4 * config_.pool_max_handles;
1215
2/4
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 12 times.
✗ Branch 5 not taken.
12 available_jobs_ = new Semaphore(max_available_jobs_);
1216
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 assert(NULL != available_jobs_);
1217
1218
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 statistics_ = new Statistics();
1219
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 user_agent_ = new string();
1220
2/4
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12 times.
✗ Branch 6 not taken.
12 *user_agent_ = "User-Agent: cvmfs " + string(CVMFS_VERSION);
1221
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 complete_hostname_ = MkCompleteHostname();
1222
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 dot_cvmfs_cache_control_header = MkDotCvmfsCacheControlHeader();
1223
1224
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 const CURLcode cretval = curl_global_init(CURL_GLOBAL_ALL);
1225
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 assert(cretval == CURLE_OK);
1226
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 curl_multi_ = curl_multi_init();
1227
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 assert(curl_multi_ != NULL);
1228 CURLMcode mretval;
1229
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION,
1230 CallbackCurlSocket);
1231
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 assert(mretval == CURLM_OK);
1232
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1233 static_cast<void *>(this));
1234
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 assert(mretval == CURLM_OK);
1235
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1236 config_.pool_max_handles);
1237
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 assert(mretval == CURLM_OK);
1238
1239 12 prng_.InitLocaltime();
1240
1241 12 thread_upload_ = 0;
1242 12 timestamp_last_throttle_report_ = 0;
1243 12 is_curl_debug_ = (getenv("_CVMFS_CURL_DEBUG") != NULL);
1244
1245 // Parsing environment variables
1246 12 if ((getenv("CVMFS_IPV4_ONLY") != NULL)
1247
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 12 times.
12 && (strlen(getenv("CVMFS_IPV4_ONLY")) > 0)) {
1248 opt_ipv4_only_ = true;
1249 } else {
1250 12 opt_ipv4_only_ = false;
1251 }
1252
1253
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 resolver_ = dns::CaresResolver::Create(opt_ipv4_only_, 2, 2000);
1254
1255 12 watch_fds_ = static_cast<struct pollfd *>(smalloc(4 * sizeof(struct pollfd)));
1256 12 watch_fds_size_ = 4;
1257 12 watch_fds_inuse_ = 0;
1258
1259
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 ssl_certificate_store_.UseSystemCertificatePath();
1260 12 }
1261
1262 24 S3FanoutManager::~S3FanoutManager() {
1263 12 pthread_mutex_destroy(jobs_todo_lock_);
1264 12 free(jobs_todo_lock_);
1265 12 pthread_mutex_destroy(curl_handle_lock_);
1266 12 free(curl_handle_lock_);
1267
1268
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1269 // Shutdown I/O thread
1270 12 char buf = 'T';
1271 12 WritePipe(pipe_terminate_[1], &buf, 1);
1272 12 pthread_join(thread_upload_, NULL);
1273 }
1274 12 ClosePipe(pipe_terminate_);
1275 12 ClosePipe(pipe_jobs_);
1276 12 ClosePipe(pipe_completed_);
1277
1278 12 set<CURL *>::iterator i = pool_handles_idle_->begin();
1279 12 const set<CURL *>::const_iterator iEnd = pool_handles_idle_->end();
1280
2/2
✓ Branch 2 taken 20 times.
✓ Branch 3 taken 12 times.
32 for (; i != iEnd; ++i) {
1281 20 curl_easy_cleanup(*i);
1282 }
1283
1284 12 set<S3FanOutDnsEntry *>::iterator is = sharehandles_->begin();
1285 12 const set<S3FanOutDnsEntry *>::const_iterator isEnd = sharehandles_->end();
1286
2/2
✓ Branch 2 taken 10 times.
✓ Branch 3 taken 12 times.
22 for (; is != isEnd; ++is) {
1287 10 curl_share_cleanup((*is)->sharehandle);
1288 10 curl_slist_free_all((*is)->clist);
1289
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
10 delete *is;
1290 }
1291 12 pool_handles_idle_->clear();
1292 12 curl_sharehandles_->clear();
1293 12 sharehandles_->clear();
1294
1/2
✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
12 delete active_requests_;
1295
1/2
✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
12 delete pool_handles_idle_;
1296
1/2
✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
12 delete pool_handles_inuse_;
1297
1/2
✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
12 delete curl_sharehandles_;
1298
1/2
✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
12 delete sharehandles_;
1299
1/2
✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
12 delete user_agent_;
1300 12 curl_multi_cleanup(curl_multi_);
1301
1302
1/2
✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
12 delete statistics_;
1303
1304
1/2
✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
12 delete available_jobs_;
1305
1306 12 curl_global_cleanup();
1307 12 }
1308
1309 /**
1310 * Spawns the I/O worker thread. No way back except ~S3FanoutManager.
1311 */
1312 12 void S3FanoutManager::Spawn() {
1313 12 LogCvmfs(kLogS3Fanout, kLogDebug, "S3FanoutManager spawned");
1314
1315 12 const int retval = pthread_create(&thread_upload_, NULL, MainUpload,
1316 static_cast<void *>(this));
1317
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 assert(retval == 0);
1318
1319 12 atomic_inc32(&multi_threaded_);
1320 12 }
1321
1322 3 const Statistics &S3FanoutManager::GetStatistics() { return *statistics_; }
1323
1324 /**
1325 * Push new job to be uploaded to the S3 cloud storage.
1326 */
1327 614 void S3FanoutManager::PushNewJob(JobInfo *info) {
1328 614 available_jobs_->Increment();
1329 614 WritePipe(pipe_jobs_[1], &info, sizeof(info));
1330 614 }
1331
1332 /**
1333 * Push completed job to list of completed jobs
1334 */
1335 626 void S3FanoutManager::PushCompletedJob(JobInfo *info) {
1336 626 WritePipe(pipe_completed_[1], &info, sizeof(info));
1337 626 }
1338
1339 /**
1340 * Pop completed job
1341 */
1342 626 JobInfo *S3FanoutManager::PopCompletedJob() {
1343 JobInfo *info;
1344
1/2
✓ Branch 1 taken 626 times.
✗ Branch 2 not taken.
626 ReadPipe(pipe_completed_[0], &info, sizeof(info));
1345 626 return info;
1346 }
1347
1348 //------------------------------------------------------------------------------
1349
1350
1351 string Statistics::Print() const {
1352 return "Transferred Bytes: " + StringifyInt(uint64_t(transferred_bytes))
1353 + "\n" + "Transfer duration: " + StringifyInt(uint64_t(transfer_time))
1354 + " s\n" + "Number of requests: " + StringifyInt(num_requests) + "\n"
1355 + "Number of retries: " + StringifyInt(num_retries) + "\n";
1356 }
1357
1358 } // namespace s3fanout
1359