GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/network/s3fanout.cc
Date: 2025-07-21 10:50:29
Exec Total Coverage
Lines: 560 789 71.0%
Branches: 403 1149 35.1%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 *
4 * Runs a thread using libcurls asynchronous I/O mode to push data to S3
5 */
6
7 #include "s3fanout.h"
8
9 #include <pthread.h>
10
11 #include <algorithm>
12 #include <cassert>
13 #include <cerrno>
14 #include <utility>
15
16 #include "upload_facility.h"
17 #include "util/concurrency.h"
18 #include "util/exception.h"
19 #include "util/platform.h"
20 #include "util/posix.h"
21 #include "util/string.h"
22
23 using namespace std; // NOLINT
24
25 namespace s3fanout {
26
27 const char *S3FanoutManager::kCacheControlCas = "Cache-Control: max-age=259200";
28 const char
29 *S3FanoutManager::kCacheControlDotCvmfs = "Cache-Control: max-age=61";
30 const unsigned S3FanoutManager::kDefault429ThrottleMs = 250;
31 const unsigned S3FanoutManager::kMax429ThrottleMs = 10000;
32 const unsigned S3FanoutManager::kThrottleReportIntervalSec = 10;
33 const unsigned S3FanoutManager::kDefaultHTTPPort = 80;
34 const unsigned S3FanoutManager::kDefaultHTTPSPort = 443;
35
36
37 /**
38 * Parses Retry-After and X-Retry-In headers attached to HTTP 429 responses
39 */
40 696 void S3FanoutManager::DetectThrottleIndicator(const std::string &header,
41 JobInfo *info) {
42 696 std::string value_str;
43
4/6
✓ Branch 2 taken 696 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 696 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 302 times.
✓ Branch 10 taken 394 times.
696 if (HasPrefix(header, "retry-after:", true))
44
1/2
✓ Branch 1 taken 302 times.
✗ Branch 2 not taken.
302 value_str = header.substr(12);
45
4/6
✓ Branch 2 taken 696 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 696 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 150 times.
✓ Branch 10 taken 546 times.
696 if (HasPrefix(header, "x-retry-in:", true))
46
1/2
✓ Branch 1 taken 150 times.
✗ Branch 2 not taken.
150 value_str = header.substr(11);
47
48
1/2
✓ Branch 1 taken 696 times.
✗ Branch 2 not taken.
696 value_str = Trim(value_str, true /* trim_newline */);
49
2/2
✓ Branch 1 taken 392 times.
✓ Branch 2 taken 304 times.
696 if (!value_str.empty()) {
50
1/2
✓ Branch 1 taken 392 times.
✗ Branch 2 not taken.
392 const unsigned value_numeric = String2Uint64(value_str);
51
2/4
✓ Branch 2 taken 392 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 392 times.
✗ Branch 6 not taken.
784 const unsigned value_ms = HasSuffix(value_str, "ms", true /* ignore_case */)
52
2/2
✓ Branch 0 taken 150 times.
✓ Branch 1 taken 242 times.
392 ? value_numeric
53 392 : (value_numeric * 1000);
54
2/2
✓ Branch 0 taken 362 times.
✓ Branch 1 taken 30 times.
392 if (value_ms > 0)
55 362 info->throttle_ms = std::min(value_ms, kMax429ThrottleMs);
56 }
57 696 }
58
59
60 /**
61 * Called by curl for every HTTP header. Not called for file:// transfers.
62 */
63 84686 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
64 void *info_link) {
65 84686 const size_t num_bytes = size * nmemb;
66
1/2
✓ Branch 2 taken 84686 times.
✗ Branch 3 not taken.
84686 const string header_line(static_cast<const char *>(ptr), num_bytes);
67 84686 JobInfo *info = static_cast<JobInfo *>(info_link);
68
69 // Check for http status code errors
70
4/6
✓ Branch 2 taken 84686 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 84686 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 28198 times.
✓ Branch 10 taken 56488 times.
84686 if (HasPrefix(header_line, "HTTP/1.", false)) {
71
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 28198 times.
28198 if (header_line.length() < 10)
72 return 0;
73
74 unsigned i;
75
5/6
✓ Branch 1 taken 56396 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 28198 times.
✓ Branch 5 taken 28198 times.
✓ Branch 6 taken 28198 times.
✓ Branch 7 taken 28198 times.
56396 for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {
76 }
77
78
2/2
✓ Branch 1 taken 14076 times.
✓ Branch 2 taken 14122 times.
28198 if (header_line[i] == '2') {
79 14076 return num_bytes;
80 } else {
81
1/2
✓ Branch 2 taken 14122 times.
✗ Branch 3 not taken.
14122 LogCvmfs(kLogS3Fanout, kLogDebug, "http status error code [info %p]: %s",
82 info, header_line.c_str());
83
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 14122 times.
14122 if (header_line.length() < i + 3) {
84 LogCvmfs(kLogS3Fanout, kLogStderr, "S3: invalid HTTP response '%s'",
85 header_line.c_str());
86 info->error_code = kFailOther;
87 return 0;
88 }
89
2/4
✓ Branch 3 taken 14122 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 14122 times.
✗ Branch 7 not taken.
14122 info->http_error = String2Int64(string(&header_line[i], 3));
90
91
2/6
✓ Branch 0 taken 92 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 14030 times.
✗ Branch 5 not taken.
14122 switch (info->http_error) {
92 92 case 429:
93 92 info->error_code = kFailRetry;
94 92 info->throttle_ms = S3FanoutManager::kDefault429ThrottleMs;
95 92 info->throttle_timestamp = platform_monotonic_time();
96 92 return num_bytes;
97 case 503:
98 case 502: // Can happen if the S3 gateway-backend connection breaks
99 case 500: // sometimes see this as a transient error from S3
100 info->error_code = kFailServiceUnavailable;
101 break;
102 case 501:
103 case 400:
104 info->error_code = kFailBadRequest;
105 break;
106 case 403:
107 info->error_code = kFailForbidden;
108 break;
109 14030 case 404:
110 14030 info->error_code = kFailNotFound;
111 14030 return num_bytes;
112 default:
113 info->error_code = kFailOther;
114 }
115 return 0;
116 }
117 }
118
119
2/2
✓ Branch 0 taken 276 times.
✓ Branch 1 taken 56212 times.
56488 if (info->error_code == kFailRetry) {
120
1/2
✓ Branch 1 taken 276 times.
✗ Branch 2 not taken.
276 S3FanoutManager::DetectThrottleIndicator(header_line, info);
121 }
122
123 56488 return num_bytes;
124 84686 }
125
126
127 /**
128 * Called by curl for every new chunk to upload.
129 */
130 360387 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
131 void *info_link) {
132 360387 const size_t num_bytes = size * nmemb;
133 360387 JobInfo *info = static_cast<JobInfo *>(info_link);
134
135 360387 LogCvmfs(kLogS3Fanout, kLogDebug, "Data callback with %zu bytes", num_bytes);
136
137
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 360387 times.
360387 if (num_bytes == 0)
138 return 0;
139
140 360387 const uint64_t read_bytes = info->origin->Read(ptr, num_bytes);
141
142 360387 LogCvmfs(kLogS3Fanout, kLogDebug, "source buffer pushed out %lu bytes",
143 read_bytes);
144
145 360387 return read_bytes;
146 }
147
148
149 /**
150 * For the time being, ignore all received information in the HTTP body
151 */
152 static size_t CallbackCurlBody(char * /*ptr*/, size_t size, size_t nmemb,
153 void * /*userdata*/) {
154 return size * nmemb;
155 }
156
157
158 /**
159 * Called when new curl sockets arrive or existing curl sockets depart.
160 */
161 62445 int S3FanoutManager::CallbackCurlSocket(CURL *easy, curl_socket_t s, int action,
162 void *userp, void *socketp) {
163 62445 S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(userp);
164 62445 LogCvmfs(kLogS3Fanout, kLogDebug,
165 "CallbackCurlSocket called with easy "
166 "handle %p, socket %d, action %d, up %p, "
167 "sp %p, fds_inuse %d, jobs %d",
168 easy, s, action, userp, socketp, s3fanout_mgr->watch_fds_inuse_,
169 62445 s3fanout_mgr->available_jobs_->Get());
170
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 62445 times.
62445 if (action == CURL_POLL_NONE)
171 return 0;
172
173 // Find s in watch_fds_
174 // First 2 fds are job and terminate pipes (not curl related)
175 unsigned index;
176
2/2
✓ Branch 0 taken 98302 times.
✓ Branch 1 taken 28152 times.
126454 for (index = 2; index < s3fanout_mgr->watch_fds_inuse_; ++index) {
177
2/2
✓ Branch 0 taken 34293 times.
✓ Branch 1 taken 64009 times.
98302 if (s3fanout_mgr->watch_fds_[index].fd == s)
178 34293 break;
179 }
180 // Or create newly
181
2/2
✓ Branch 0 taken 28152 times.
✓ Branch 1 taken 34293 times.
62445 if (index == s3fanout_mgr->watch_fds_inuse_) {
182 // Extend array if necessary
183
2/2
✓ Branch 0 taken 46 times.
✓ Branch 1 taken 28106 times.
28152 if (s3fanout_mgr->watch_fds_inuse_ == s3fanout_mgr->watch_fds_size_) {
184 46 s3fanout_mgr->watch_fds_size_ *= 2;
185 46 s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
186 46 srealloc(s3fanout_mgr->watch_fds_,
187 46 s3fanout_mgr->watch_fds_size_ * sizeof(struct pollfd)));
188 }
189 28152 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].fd = s;
190 28152 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].events = 0;
191 28152 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].revents = 0;
192 28152 s3fanout_mgr->watch_fds_inuse_++;
193 }
194
195
4/5
✓ Branch 0 taken 28152 times.
✓ Branch 1 taken 69 times.
✓ Branch 2 taken 6072 times.
✓ Branch 3 taken 28152 times.
✗ Branch 4 not taken.
62445 switch (action) {
196 28152 case CURL_POLL_IN:
197 28152 s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
198 28152 break;
199 69 case CURL_POLL_OUT:
200 69 s3fanout_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
201 69 break;
202 6072 case CURL_POLL_INOUT:
203 6072 s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI | POLLOUT
204 | POLLWRBAND;
205 6072 break;
206 28152 case CURL_POLL_REMOVE:
207
2/2
✓ Branch 0 taken 4485 times.
✓ Branch 1 taken 23667 times.
28152 if (index < s3fanout_mgr->watch_fds_inuse_ - 1)
208 s3fanout_mgr
209 4485 ->watch_fds_[index] = s3fanout_mgr->watch_fds_
210 4485 [s3fanout_mgr->watch_fds_inuse_ - 1];
211 28152 s3fanout_mgr->watch_fds_inuse_--;
212 // Shrink array if necessary
213
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 28152 times.
28152 if ((s3fanout_mgr->watch_fds_inuse_ > s3fanout_mgr->watch_fds_max_)
214 && (s3fanout_mgr->watch_fds_inuse_
215 < s3fanout_mgr->watch_fds_size_ / 2)) {
216 s3fanout_mgr->watch_fds_size_ /= 2;
217 s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
218 srealloc(s3fanout_mgr->watch_fds_,
219 s3fanout_mgr->watch_fds_size_ * sizeof(struct pollfd)));
220 }
221 28152 break;
222 default:
223 PANIC(NULL);
224 }
225
226 62445 return 0;
227 }
228
229
230 /**
231 * Worker thread event loop.
232 */
233 276 void *S3FanoutManager::MainUpload(void *data) {
234
1/2
✓ Branch 1 taken 276 times.
✗ Branch 2 not taken.
276 LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread started");
235 276 S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(data);
236
237 276 s3fanout_mgr->InitPipeWatchFds();
238
239 // Don't schedule more jobs into the multi handle than the maximum number of
240 // parallel connections. This should prevent starvation and thus a timeout
241 // of the authorization header (CVM-1339).
242 276 unsigned jobs_in_flight = 0;
243
244 while (true) {
245 // Check events with 100ms timeout
246 384629 const int timeout_ms = 100;
247
1/2
✓ Branch 1 taken 384629 times.
✗ Branch 2 not taken.
384629 int retval = poll(s3fanout_mgr->watch_fds_, s3fanout_mgr->watch_fds_inuse_,
248 timeout_ms);
249
2/2
✓ Branch 0 taken 2139 times.
✓ Branch 1 taken 382490 times.
384629 if (retval == 0) {
250 // Handle timeout
251 2139 int still_running = 0;
252
1/2
✓ Branch 1 taken 2139 times.
✗ Branch 2 not taken.
2139 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
253 CURL_SOCKET_TIMEOUT, 0, &still_running);
254
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2139 times.
2139 if (retval != CURLM_OK) {
255 LogCvmfs(kLogS3Fanout, kLogStderr, "Error, timeout due to: %d", retval);
256 assert(retval == CURLM_OK);
257 }
258
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 382490 times.
382490 } else if (retval < 0) {
259 assert(errno == EINTR);
260 continue;
261 }
262
263 // Terminate I/O thread
264
2/2
✓ Branch 0 taken 276 times.
✓ Branch 1 taken 384353 times.
384629 if (s3fanout_mgr->watch_fds_[0].revents)
265 276 break;
266
267 // New job incoming
268
2/2
✓ Branch 0 taken 14122 times.
✓ Branch 1 taken 370231 times.
384353 if (s3fanout_mgr->watch_fds_[1].revents) {
269 14122 s3fanout_mgr->watch_fds_[1].revents = 0;
270 JobInfo *info;
271
1/2
✓ Branch 1 taken 14122 times.
✗ Branch 2 not taken.
14122 ReadPipe(s3fanout_mgr->pipe_jobs_[0], &info, sizeof(info));
272
1/2
✓ Branch 1 taken 14122 times.
✗ Branch 2 not taken.
14122 CURL *handle = s3fanout_mgr->AcquireCurlHandle();
273
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 14122 times.
14122 if (handle == NULL) {
274 PANIC(kLogStderr, "Failed to acquire CURL handle.");
275 }
276
1/2
✓ Branch 1 taken 14122 times.
✗ Branch 2 not taken.
14122 const s3fanout::Failures init_failure = s3fanout_mgr->InitializeRequest(
277 info, handle);
278
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 14122 times.
14122 if (init_failure != s3fanout::kFailOk) {
279 PANIC(kLogStderr,
280 "Failed to initialize CURL handle (error: %d - %s | errno: %d)",
281 init_failure, Code2Ascii(init_failure), errno);
282 }
283
1/2
✓ Branch 1 taken 14122 times.
✗ Branch 2 not taken.
14122 s3fanout_mgr->SetUrlOptions(info);
284
285
1/2
✓ Branch 1 taken 14122 times.
✗ Branch 2 not taken.
14122 curl_multi_add_handle(s3fanout_mgr->curl_multi_, handle);
286
1/2
✓ Branch 1 taken 14122 times.
✗ Branch 2 not taken.
14122 s3fanout_mgr->active_requests_->insert(info);
287 14122 jobs_in_flight++;
288 14122 int still_running = 0, retval = 0;
289
1/2
✓ Branch 1 taken 14122 times.
✗ Branch 2 not taken.
14122 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
290 CURL_SOCKET_TIMEOUT, 0, &still_running);
291
292
1/2
✓ Branch 1 taken 14122 times.
✗ Branch 2 not taken.
14122 LogCvmfs(kLogS3Fanout, kLogDebug, "curl_multi_socket_action: %d - %d",
293 retval, still_running);
294 }
295
296
297 // Activity on curl sockets
298 // Within this loop the curl_multi_socket_action() may cause socket(s)
299 // to be removed from watch_fds_. If a socket is removed it is replaced
300 // by the socket at the end of the array and the inuse count is decreased.
301 // Therefore loop over the array in reverse order.
302 // First 2 fds are job and terminate pipes (not curl related)
303
2/2
✓ Branch 0 taken 862592 times.
✓ Branch 1 taken 384353 times.
1246945 for (int32_t i = s3fanout_mgr->watch_fds_inuse_ - 1; i >= 2; --i) {
304
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 862592 times.
862592 if (static_cast<uint32_t>(i) >= s3fanout_mgr->watch_fds_inuse_) {
305 continue;
306 }
307
2/2
✓ Branch 0 taken 392472 times.
✓ Branch 1 taken 470120 times.
862592 if (s3fanout_mgr->watch_fds_[i].revents) {
308 392472 int ev_bitmask = 0;
309
2/2
✓ Branch 0 taken 42136 times.
✓ Branch 1 taken 350336 times.
392472 if (s3fanout_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
310 42136 ev_bitmask |= CURL_CSELECT_IN;
311
2/2
✓ Branch 0 taken 350336 times.
✓ Branch 1 taken 42136 times.
392472 if (s3fanout_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
312 350336 ev_bitmask |= CURL_CSELECT_OUT;
313 392472 if (s3fanout_mgr->watch_fds_[i].revents
314
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 392472 times.
392472 & (POLLERR | POLLHUP | POLLNVAL))
315 ev_bitmask |= CURL_CSELECT_ERR;
316 392472 s3fanout_mgr->watch_fds_[i].revents = 0;
317
318 392472 int still_running = 0;
319 392472 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
320
1/2
✓ Branch 1 taken 392472 times.
✗ Branch 2 not taken.
392472 s3fanout_mgr->watch_fds_[i].fd,
321 ev_bitmask,
322 &still_running);
323 }
324 }
325
326 // Check if transfers are completed
327 CURLMsg *curl_msg;
328 int msgs_in_queue;
329
3/4
✓ Branch 1 taken 412551 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 28198 times.
✓ Branch 4 taken 384353 times.
412551 while ((curl_msg = curl_multi_info_read(s3fanout_mgr->curl_multi_,
330 &msgs_in_queue))) {
331
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 28198 times.
28198 assert(curl_msg->msg == CURLMSG_DONE);
332
333 28198 s3fanout_mgr->statistics_->num_requests++;
334 JobInfo *info;
335 28198 CURL *easy_handle = curl_msg->easy_handle;
336 28198 const int curl_error = curl_msg->data.result;
337
1/2
✓ Branch 1 taken 28198 times.
✗ Branch 2 not taken.
28198 curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
338
339
1/2
✓ Branch 1 taken 28198 times.
✗ Branch 2 not taken.
28198 curl_multi_remove_handle(s3fanout_mgr->curl_multi_, easy_handle);
340
3/4
✓ Branch 1 taken 28198 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 14076 times.
✓ Branch 4 taken 14122 times.
28198 if (s3fanout_mgr->VerifyAndFinalize(curl_error, info)) {
341
1/2
✓ Branch 1 taken 14076 times.
✗ Branch 2 not taken.
14076 curl_multi_add_handle(s3fanout_mgr->curl_multi_, easy_handle);
342 14076 int still_running = 0;
343
1/2
✓ Branch 1 taken 14076 times.
✗ Branch 2 not taken.
14076 curl_multi_socket_action(s3fanout_mgr->curl_multi_, CURL_SOCKET_TIMEOUT,
344 0, &still_running);
345 } else {
346 // Return easy handle into pool and write result back
347 14122 jobs_in_flight--;
348
1/2
✓ Branch 1 taken 14122 times.
✗ Branch 2 not taken.
14122 s3fanout_mgr->active_requests_->erase(info);
349
1/2
✓ Branch 1 taken 14122 times.
✗ Branch 2 not taken.
14122 s3fanout_mgr->ReleaseCurlHandle(info, easy_handle);
350
1/2
✓ Branch 1 taken 14122 times.
✗ Branch 2 not taken.
14122 s3fanout_mgr->available_jobs_->Decrement();
351
352 // Add to list of completed jobs
353
1/2
✓ Branch 1 taken 14122 times.
✗ Branch 2 not taken.
14122 s3fanout_mgr->PushCompletedJob(info);
354 }
355 }
356 384353 }
357
358 276 set<CURL *>::iterator i = s3fanout_mgr->pool_handles_inuse_->begin();
359 276 const set<CURL *>::const_iterator i_end = s3fanout_mgr->pool_handles_inuse_
360 276 ->end();
361
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 276 times.
276 for (; i != i_end; ++i) {
362 curl_multi_remove_handle(s3fanout_mgr->curl_multi_, *i);
363 curl_easy_cleanup(*i);
364 }
365 276 s3fanout_mgr->pool_handles_inuse_->clear();
366 276 free(s3fanout_mgr->watch_fds_);
367
368
1/2
✓ Branch 1 taken 276 times.
✗ Branch 2 not taken.
276 LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread terminated");
369 276 return NULL;
370 }
371
372
373 /**
374 * Gets an idle CURL handle from the pool. Creates a new one and adds it to
375 * the pool if necessary.
376 */
377 14122 CURL *S3FanoutManager::AcquireCurlHandle() const {
378 CURL *handle;
379
380 14122 const MutexLockGuard guard(curl_handle_lock_);
381
382
2/2
✓ Branch 1 taken 1127 times.
✓ Branch 2 taken 12995 times.
14122 if (pool_handles_idle_->empty()) {
383 CURLcode retval;
384
385 // Create a new handle
386
1/2
✓ Branch 1 taken 1127 times.
✗ Branch 2 not taken.
1127 handle = curl_easy_init();
387
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1127 times.
1127 assert(handle != NULL);
388
389 // Other settings
390
1/2
✓ Branch 1 taken 1127 times.
✗ Branch 2 not taken.
1127 retval = curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
391
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1127 times.
1127 assert(retval == CURLE_OK);
392
1/2
✓ Branch 1 taken 1127 times.
✗ Branch 2 not taken.
1127 retval = curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION,
393 CallbackCurlHeader);
394
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1127 times.
1127 assert(retval == CURLE_OK);
395
1/2
✓ Branch 1 taken 1127 times.
✗ Branch 2 not taken.
1127 retval = curl_easy_setopt(handle, CURLOPT_READFUNCTION, CallbackCurlData);
396
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1127 times.
1127 assert(retval == CURLE_OK);
397
1/2
✓ Branch 1 taken 1127 times.
✗ Branch 2 not taken.
1127 retval = curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlBody);
398
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1127 times.
1127 assert(retval == CURLE_OK);
399 } else {
400 12995 handle = *(pool_handles_idle_->begin());
401
1/2
✓ Branch 2 taken 12995 times.
✗ Branch 3 not taken.
12995 pool_handles_idle_->erase(pool_handles_idle_->begin());
402 }
403
404
1/2
✓ Branch 1 taken 14122 times.
✗ Branch 2 not taken.
14122 pool_handles_inuse_->insert(handle);
405
406 14122 return handle;
407 14122 }
408
409
410 14122 void S3FanoutManager::ReleaseCurlHandle(JobInfo *info, CURL *handle) const {
411
1/2
✓ Branch 0 taken 14122 times.
✗ Branch 1 not taken.
14122 if (info->http_headers) {
412
1/2
✓ Branch 1 taken 14122 times.
✗ Branch 2 not taken.
14122 curl_slist_free_all(info->http_headers);
413 14122 info->http_headers = NULL;
414 }
415
416 14122 const MutexLockGuard guard(curl_handle_lock_);
417
418
1/2
✓ Branch 1 taken 14122 times.
✗ Branch 2 not taken.
14122 const set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
419
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 14122 times.
14122 assert(elem != pool_handles_inuse_->end());
420
421
2/2
✓ Branch 1 taken 667 times.
✓ Branch 2 taken 13455 times.
14122 if (pool_handles_idle_->size() > config_.pool_max_handles) {
422
1/2
✓ Branch 1 taken 667 times.
✗ Branch 2 not taken.
667 const CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, NULL);
423
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 667 times.
667 assert(retval == CURLE_OK);
424
1/2
✓ Branch 1 taken 667 times.
✗ Branch 2 not taken.
667 curl_easy_cleanup(handle);
425 const std::map<CURL *, S3FanOutDnsEntry *>::size_type
426
1/2
✓ Branch 1 taken 667 times.
✗ Branch 2 not taken.
667 retitems = curl_sharehandles_->erase(handle);
427
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 667 times.
667 assert(retitems == 1);
428 } else {
429
1/2
✓ Branch 1 taken 13455 times.
✗ Branch 2 not taken.
13455 pool_handles_idle_->insert(handle);
430 }
431
432
1/2
✓ Branch 1 taken 14122 times.
✗ Branch 2 not taken.
14122 pool_handles_inuse_->erase(elem);
433 14122 }
434
435 276 void S3FanoutManager::InitPipeWatchFds() {
436
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 276 times.
276 assert(watch_fds_inuse_ == 0);
437
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 276 times.
276 assert(watch_fds_size_ >= 2);
438 276 watch_fds_[0].fd = pipe_terminate_[0];
439 276 watch_fds_[0].events = POLLIN | POLLPRI;
440 276 watch_fds_[0].revents = 0;
441 276 ++watch_fds_inuse_;
442 276 watch_fds_[1].fd = pipe_jobs_[0];
443 276 watch_fds_[1].events = POLLIN | POLLPRI;
444 276 watch_fds_[1].revents = 0;
445 276 ++watch_fds_inuse_;
446 276 }
447
448 /**
449 * The Amazon AWS 2 authorization header according to
450 * http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html#ConstructingTheAuthenticationHeader
451 */
452 28106 bool S3FanoutManager::MkV2Authz(const JobInfo &info,
453 vector<string> *headers) const {
454 28106 string payload_hash;
455
1/2
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
28106 const bool retval = MkPayloadHash(info, &payload_hash);
456
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 28106 times.
28106 if (!retval)
457 return false;
458
1/2
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
28106 const string content_type = GetContentType(info);
459
1/2
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
28106 const string request = GetRequestString(info);
460
461
1/2
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
28106 const string timestamp = RfcTimestamp();
462
5/10
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 28106 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 28106 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 28106 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 28106 times.
✗ Branch 14 not taken.
56212 string to_sign = request + "\n" + payload_hash + "\n" + content_type + "\n"
463
2/4
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 28106 times.
✗ Branch 5 not taken.
56212 + timestamp + "\n";
464
2/4
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 28106 times.
✗ Branch 4 not taken.
28106 if (config_.x_amz_acl != "") {
465
2/4
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 28106 times.
✗ Branch 5 not taken.
56212 to_sign += "x-amz-acl:" + config_.x_amz_acl + "\n" + // default ACL
466
5/10
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 28106 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 28106 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 28106 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 28106 times.
✗ Branch 14 not taken.
56212 "/" + config_.bucket + "/" + info.object_key;
467 }
468
1/2
✓ Branch 3 taken 28106 times.
✗ Branch 4 not taken.
28106 LogCvmfs(kLogS3Fanout, kLogDebug, "%s string to sign for: %s",
469 request.c_str(), info.object_key.c_str());
470
471
1/2
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
28106 shash::Any hmac;
472 28106 hmac.algorithm = shash::kSha1;
473
1/2
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
28106 shash::Hmac(config_.secret_key,
474 28106 reinterpret_cast<const unsigned char *>(to_sign.data()),
475 28106 to_sign.length(), &hmac);
476
477
3/6
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 28106 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 28106 times.
✗ Branch 8 not taken.
84318 headers->push_back("Authorization: AWS " + config_.access_key + ":"
478
3/6
✓ Branch 2 taken 28106 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 28106 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 28106 times.
✗ Branch 9 not taken.
140530 + Base64(string(reinterpret_cast<char *>(hmac.digest),
479 28106 hmac.GetDigestSize())));
480
2/4
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 28106 times.
✗ Branch 5 not taken.
28106 headers->push_back("Date: " + timestamp);
481
2/4
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 28106 times.
✗ Branch 5 not taken.
28106 headers->push_back("X-Amz-Acl: " + config_.x_amz_acl);
482
2/2
✓ Branch 1 taken 13984 times.
✓ Branch 2 taken 14122 times.
28106 if (!payload_hash.empty())
483
2/4
✓ Branch 1 taken 13984 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 13984 times.
✗ Branch 5 not taken.
13984 headers->push_back("Content-MD5: " + payload_hash);
484
2/2
✓ Branch 1 taken 13984 times.
✓ Branch 2 taken 14122 times.
28106 if (!content_type.empty())
485
2/4
✓ Branch 1 taken 13984 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 13984 times.
✗ Branch 5 not taken.
13984 headers->push_back("Content-Type: " + content_type);
486 28106 return true;
487 28106 }
488
489
490 string S3FanoutManager::GetUriEncode(const string &val,
491 bool encode_slash) const {
492 string result;
493 const unsigned len = val.length();
494 result.reserve(len);
495 for (unsigned i = 0; i < len; ++i) {
496 const char c = val[i];
497 if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
498 || (c >= '0' && c <= '9') || c == '_' || c == '-' || c == '~'
499 || c == '.') {
500 result.push_back(c);
501 } else if (c == '/') {
502 if (encode_slash) {
503 result += "%2F";
504 } else {
505 result.push_back(c);
506 }
507 } else {
508 result.push_back('%');
509 result.push_back((c / 16) + ((c / 16 <= 9) ? '0' : 'A' - 10));
510 result.push_back((c % 16) + ((c % 16 <= 9) ? '0' : 'A' - 10));
511 }
512 }
513 return result;
514 }
515
516
517 string S3FanoutManager::GetAwsV4SigningKey(const string &date) const {
518 if (last_signing_key_.first == date)
519 return last_signing_key_.second;
520
521 const string date_key = shash::Hmac256("AWS4" + config_.secret_key, date,
522 true);
523 const string date_region_key = shash::Hmac256(date_key, config_.region, true);
524 const string date_region_service_key = shash::Hmac256(date_region_key, "s3",
525 true);
526 string signing_key = shash::Hmac256(date_region_service_key, "aws4_request",
527 true);
528 last_signing_key_.first = date;
529 last_signing_key_.second = signing_key;
530 return signing_key;
531 }
532
533
534 /**
535 * The Amazon AWS4 authorization header according to
536 * http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-auth-using-authorization-header.html
537 */
538 bool S3FanoutManager::MkV4Authz(const JobInfo &info,
539 vector<string> *headers) const {
540 string payload_hash;
541 const bool retval = MkPayloadHash(info, &payload_hash);
542 if (!retval)
543 return false;
544 const string content_type = GetContentType(info);
545 const string timestamp = IsoTimestamp();
546 const string date = timestamp.substr(0, 8);
547 vector<string> tokens = SplitString(complete_hostname_, ':');
548 assert(tokens.size() <= 2);
549 string canonical_hostname = tokens[0];
550
551 // if we could split the hostname in two and if the port is *NOT* a default
552 // one
553 if (tokens.size() == 2
554 && !((String2Uint64(tokens[1]) == kDefaultHTTPPort)
555 || (String2Uint64(tokens[1]) == kDefaultHTTPSPort)))
556 canonical_hostname += ":" + tokens[1];
557
558 string signed_headers;
559 string canonical_headers;
560 if (!content_type.empty()) {
561 signed_headers += "content-type;";
562 headers->push_back("Content-Type: " + content_type);
563 canonical_headers += "content-type:" + content_type + "\n";
564 }
565 if (config_.x_amz_acl != "") {
566 signed_headers += "host;x-amz-acl;x-amz-content-sha256;x-amz-date";
567 } else {
568 signed_headers += "host;x-amz-content-sha256;x-amz-date";
569 }
570 canonical_headers += "host:" + canonical_hostname + "\n";
571 if (config_.x_amz_acl != "") {
572 canonical_headers += "x-amz-acl:" + config_.x_amz_acl + "\n";
573 }
574 canonical_headers += "x-amz-content-sha256:" + payload_hash + "\n"
575 + "x-amz-date:" + timestamp + "\n";
576
577 const string scope = date + "/" + config_.region + "/s3/aws4_request";
578 const string uri = config_.dns_buckets ? (string("/") + info.object_key)
579 : (string("/") + config_.bucket + "/"
580 + info.object_key);
581
582 const string canonical_request = GetRequestString(info) + "\n"
583 + GetUriEncode(uri, false) + "\n" + "\n"
584 + canonical_headers + "\n" + signed_headers
585 + "\n" + payload_hash;
586
587 const string hash_request = shash::Sha256String(canonical_request.c_str());
588
589 const string string_to_sign = "AWS4-HMAC-SHA256\n" + timestamp + "\n" + scope
590 + "\n" + hash_request;
591
592 const string signing_key = GetAwsV4SigningKey(date);
593 const string signature = shash::Hmac256(signing_key, string_to_sign);
594
595 headers->push_back("X-Amz-Acl: " + config_.x_amz_acl);
596 headers->push_back("X-Amz-Content-Sha256: " + payload_hash);
597 headers->push_back("X-Amz-Date: " + timestamp);
598 headers->push_back("Authorization: AWS4-HMAC-SHA256 "
599 "Credential="
600 + config_.access_key + "/" + scope
601 + ","
602 "SignedHeaders="
603 + signed_headers
604 + ","
605 "Signature="
606 + signature);
607 return true;
608 }
609
610 /**
611 * The Azure Blob authorization header according to
612 * https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key
613 */
614 bool S3FanoutManager::MkAzureAuthz(const JobInfo &info,
615 vector<string> *headers) const {
616 const string timestamp = RfcTimestamp();
617 const string canonical_headers = "x-ms-blob-type:BlockBlob\nx-ms-date:"
618 + timestamp + "\nx-ms-version:2011-08-18";
619 const string canonical_resource = "/" + config_.access_key + "/"
620 + config_.bucket + "/" + info.object_key;
621
622 string string_to_sign;
623 if ((info.request == JobInfo::kReqHeadOnly)
624 || (info.request == JobInfo::kReqHeadPut)
625 || (info.request == JobInfo::kReqDelete)) {
626 string_to_sign = GetRequestString(info) + string("\n\n\n")
627 + "\n\n\n\n\n\n\n\n\n" + canonical_headers + "\n"
628 + canonical_resource;
629 } else {
630 string_to_sign = GetRequestString(info) + string("\n\n\n")
631 + string(StringifyInt(info.origin->GetSize()))
632 + "\n\n\n\n\n\n\n\n\n" + canonical_headers + "\n"
633 + canonical_resource;
634 }
635
636 string signing_key;
637 const int retval = Debase64(config_.secret_key, &signing_key);
638 if (!retval)
639 return false;
640
641 const string signature = shash::Hmac256(signing_key, string_to_sign, true);
642
643 headers->push_back("x-ms-date: " + timestamp);
644 headers->push_back("x-ms-version: 2011-08-18");
645 headers->push_back("Authorization: SharedKey " + config_.access_key + ":"
646 + Base64(signature));
647 headers->push_back("x-ms-blob-type: BlockBlob");
648 return true;
649 }
650
651 28106 void S3FanoutManager::InitializeDnsSettingsCurl(CURL *handle,
652 CURLSH *sharehandle,
653 curl_slist *clist) const {
654 28106 CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, sharehandle);
655
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 28106 times.
28106 assert(retval == CURLE_OK);
656 28106 retval = curl_easy_setopt(handle, CURLOPT_RESOLVE, clist);
657
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 28106 times.
28106 assert(retval == CURLE_OK);
658 28106 }
659
660
661 28106 int S3FanoutManager::InitializeDnsSettings(CURL *handle,
662 std::string host_with_port) const {
663 // Use existing handle
664 const std::map<CURL *, S3FanOutDnsEntry *>::const_iterator
665
1/2
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
28106 it = curl_sharehandles_->find(handle);
666
2/2
✓ Branch 3 taken 26979 times.
✓ Branch 4 taken 1127 times.
28106 if (it != curl_sharehandles_->end()) {
667
1/2
✓ Branch 1 taken 26979 times.
✗ Branch 2 not taken.
26979 InitializeDnsSettingsCurl(handle, it->second->sharehandle,
668 26979 it->second->clist);
669 26979 return 0;
670 }
671
672 // Add protocol information for extraction of fields for DNS
673
2/4
✓ Branch 1 taken 1127 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1127 times.
✗ Branch 4 not taken.
1127 if (!IsHttpUrl(host_with_port))
674
2/4
✓ Branch 1 taken 1127 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1127 times.
✗ Branch 5 not taken.
1127 host_with_port = config_.protocol + "://" + host_with_port;
675
1/2
✓ Branch 1 taken 1127 times.
✗ Branch 2 not taken.
1127 const std::string remote_host = dns::ExtractHost(host_with_port);
676
1/2
✓ Branch 1 taken 1127 times.
✗ Branch 2 not taken.
1127 const std::string remote_port = dns::ExtractPort(host_with_port);
677
678 // If we have the name already resolved, use the least used IP
679 1127 S3FanOutDnsEntry *useme = NULL;
680 1127 unsigned int usemin = UINT_MAX;
681 1127 std::set<S3FanOutDnsEntry *>::iterator its3 = sharehandles_->begin();
682
2/2
✓ Branch 3 taken 897 times.
✓ Branch 4 taken 1127 times.
2024 for (; its3 != sharehandles_->end(); ++its3) {
683
1/2
✓ Branch 2 taken 897 times.
✗ Branch 3 not taken.
897 if ((*its3)->dns_name == remote_host) {
684
1/2
✓ Branch 1 taken 897 times.
✗ Branch 2 not taken.
897 if (usemin >= (*its3)->counter) {
685 897 usemin = (*its3)->counter;
686 897 useme = (*its3);
687 }
688 }
689 }
690
2/2
✓ Branch 0 taken 897 times.
✓ Branch 1 taken 230 times.
1127 if (useme != NULL) {
691
1/2
✓ Branch 1 taken 897 times.
✗ Branch 2 not taken.
897 curl_sharehandles_->insert(
692 897 std::pair<CURL *, S3FanOutDnsEntry *>(handle, useme));
693 897 useme->counter++;
694
1/2
✓ Branch 1 taken 897 times.
✗ Branch 2 not taken.
897 InitializeDnsSettingsCurl(handle, useme->sharehandle, useme->clist);
695 897 return 0;
696 }
697
698 // We need to resolve the hostname
699 // TODO(ssheikki): support ipv6 also... if (opt_ipv4_only_)
700
1/2
✓ Branch 1 taken 230 times.
✗ Branch 2 not taken.
230 const dns::Host host = resolver_->Resolve(remote_host);
701
1/2
✓ Branch 2 taken 230 times.
✗ Branch 3 not taken.
230 set<string> ipv4_addresses = host.ipv4_addresses();
702 230 std::set<string>::iterator its = ipv4_addresses.begin();
703 230 S3FanOutDnsEntry *dnse = NULL;
704
2/2
✓ Branch 3 taken 230 times.
✓ Branch 4 taken 230 times.
460 for (; its != ipv4_addresses.end(); ++its) {
705
2/4
✓ Branch 1 taken 230 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 230 times.
✗ Branch 5 not taken.
230 dnse = new S3FanOutDnsEntry();
706 230 dnse->counter = 0;
707
1/2
✓ Branch 1 taken 230 times.
✗ Branch 2 not taken.
230 dnse->dns_name = remote_host;
708
4/12
✗ Branch 1 not taken.
✓ Branch 2 taken 230 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 8 taken 230 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 230 times.
✗ Branch 12 not taken.
✗ Branch 14 not taken.
✓ Branch 15 taken 230 times.
✗ Branch 18 not taken.
✗ Branch 19 not taken.
230 dnse->port = remote_port.size() == 0 ? "80" : remote_port;
709
1/2
✓ Branch 2 taken 230 times.
✗ Branch 3 not taken.
230 dnse->ip = *its;
710 230 dnse->clist = NULL;
711
1/2
✓ Branch 2 taken 230 times.
✗ Branch 3 not taken.
230 dnse->clist = curl_slist_append(
712 dnse->clist,
713
4/8
✓ Branch 1 taken 230 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 230 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 230 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 230 times.
✗ Branch 11 not taken.
460 (dnse->dns_name + ":" + dnse->port + ":" + dnse->ip).c_str());
714
1/2
✓ Branch 1 taken 230 times.
✗ Branch 2 not taken.
230 dnse->sharehandle = curl_share_init();
715
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 230 times.
230 assert(dnse->sharehandle != NULL);
716
1/2
✓ Branch 1 taken 230 times.
✗ Branch 2 not taken.
230 const CURLSHcode share_retval = curl_share_setopt(
717 dnse->sharehandle, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS);
718
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 230 times.
230 assert(share_retval == CURLSHE_OK);
719
1/2
✓ Branch 1 taken 230 times.
✗ Branch 2 not taken.
230 sharehandles_->insert(dnse);
720 }
721
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 230 times.
230 if (dnse == NULL) {
722 LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr,
723 "Error: DNS resolve failed for address '%s'.",
724 remote_host.c_str());
725 assert(dnse != NULL);
726 return -1;
727 }
728
1/2
✓ Branch 1 taken 230 times.
✗ Branch 2 not taken.
230 curl_sharehandles_->insert(
729 230 std::pair<CURL *, S3FanOutDnsEntry *>(handle, dnse));
730 230 dnse->counter++;
731
1/2
✓ Branch 1 taken 230 times.
✗ Branch 2 not taken.
230 InitializeDnsSettingsCurl(handle, dnse->sharehandle, dnse->clist);
732
733 230 return 0;
734 1127 }
735
736
737 28106 bool S3FanoutManager::MkPayloadHash(const JobInfo &info,
738 string *hex_hash) const {
739
2/2
✓ Branch 0 taken 27991 times.
✓ Branch 1 taken 115 times.
28106 if ((info.request == JobInfo::kReqHeadOnly)
740
2/2
✓ Branch 0 taken 14007 times.
✓ Branch 1 taken 13984 times.
27991 || (info.request == JobInfo::kReqHeadPut)
741
2/2
✓ Branch 0 taken 23 times.
✓ Branch 1 taken 13984 times.
14007 || (info.request == JobInfo::kReqDelete)) {
742
1/4
✓ Branch 0 taken 14122 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
14122 switch (config_.authz_method) {
743 14122 case kAuthzAwsV2:
744 14122 hex_hash->clear();
745 14122 break;
746 case kAuthzAwsV4:
747 // Sha256 over empty string
748 *hex_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b78"
749 "52b855";
750 break;
751 case kAuthzAzure:
752 // no payload hash required for Azure signature
753 hex_hash->clear();
754 break;
755 default:
756 PANIC(NULL);
757 }
758 14122 return true;
759 }
760
761 // PUT, there is actually payload
762
1/2
✓ Branch 1 taken 13984 times.
✗ Branch 2 not taken.
13984 shash::Any payload_hash(shash::kMd5);
763
764 unsigned char *data;
765 13984 const unsigned int nbytes = info.origin->Data(
766
2/4
✓ Branch 2 taken 13984 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 13984 times.
✗ Branch 6 not taken.
13984 reinterpret_cast<void **>(&data), info.origin->GetSize(), 0);
767
2/4
✓ Branch 2 taken 13984 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 13984 times.
13984 assert(nbytes == info.origin->GetSize());
768
769
1/4
✓ Branch 0 taken 13984 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
13984 switch (config_.authz_method) {
770 13984 case kAuthzAwsV2:
771
1/2
✓ Branch 1 taken 13984 times.
✗ Branch 2 not taken.
13984 shash::HashMem(data, nbytes, &payload_hash);
772
2/4
✓ Branch 2 taken 13984 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 13984 times.
✗ Branch 6 not taken.
41952 *hex_hash = Base64(string(reinterpret_cast<char *>(payload_hash.digest),
773 27968 payload_hash.GetDigestSize()));
774 13984 return true;
775 case kAuthzAwsV4:
776 *hex_hash = shash::Sha256Mem(data, nbytes);
777 return true;
778 case kAuthzAzure:
779 // no payload hash required for Azure signature
780 hex_hash->clear();
781 return true;
782 default:
783 PANIC(NULL);
784 }
785 }
786
787 28129 string S3FanoutManager::GetRequestString(const JobInfo &info) const {
788
3/4
✓ Branch 0 taken 14099 times.
✓ Branch 1 taken 13984 times.
✓ Branch 2 taken 46 times.
✗ Branch 3 not taken.
28129 switch (info.request) {
789 14099 case JobInfo::kReqHeadOnly:
790 case JobInfo::kReqHeadPut:
791
1/2
✓ Branch 2 taken 14099 times.
✗ Branch 3 not taken.
14099 return "HEAD";
792 13984 case JobInfo::kReqPutCas:
793 case JobInfo::kReqPutDotCvmfs:
794 case JobInfo::kReqPutHtml:
795 case JobInfo::kReqPutBucket:
796
1/2
✓ Branch 2 taken 13984 times.
✗ Branch 3 not taken.
13984 return "PUT";
797 46 case JobInfo::kReqDelete:
798
1/2
✓ Branch 2 taken 46 times.
✗ Branch 3 not taken.
46 return "DELETE";
799 default:
800 PANIC(NULL);
801 }
802 }
803
804
805 28106 string S3FanoutManager::GetContentType(const JobInfo &info) const {
806
2/6
✓ Branch 0 taken 14122 times.
✓ Branch 1 taken 13984 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
28106 switch (info.request) {
807 14122 case JobInfo::kReqHeadOnly:
808 case JobInfo::kReqHeadPut:
809 case JobInfo::kReqDelete:
810
1/2
✓ Branch 2 taken 14122 times.
✗ Branch 3 not taken.
14122 return "";
811 13984 case JobInfo::kReqPutCas:
812
1/2
✓ Branch 2 taken 13984 times.
✗ Branch 3 not taken.
13984 return "application/octet-stream";
813 case JobInfo::kReqPutDotCvmfs:
814 return "application/x-cvmfs";
815 case JobInfo::kReqPutHtml:
816 return "text/html";
817 case JobInfo::kReqPutBucket:
818 return "text/xml";
819 default:
820 PANIC(NULL);
821 }
822 }
823
824
825 /**
826 * Request parameters set the URL and other options such as timeout and
827 * proxy.
828 */
829 28106 Failures S3FanoutManager::InitializeRequest(JobInfo *info, CURL *handle) const {
830 // Initialize internal download state
831 28106 info->curl_handle = handle;
832 28106 info->error_code = kFailOk;
833 28106 info->http_error = 0;
834 28106 info->num_retries = 0;
835 28106 info->backoff_ms = 0;
836 28106 info->throttle_ms = 0;
837 28106 info->throttle_timestamp = 0;
838 28106 info->http_headers = NULL;
839 // info->payload_size is needed in S3Uploader::MainCollectResults,
840 // where info->origin is already destroyed.
841
1/2
✓ Branch 2 taken 28106 times.
✗ Branch 3 not taken.
28106 info->payload_size = info->origin->GetSize();
842
843
2/4
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 28106 times.
✗ Branch 5 not taken.
28106 InitializeDnsSettings(handle, complete_hostname_);
844
845 CURLcode retval;
846
2/2
✓ Branch 0 taken 27991 times.
✓ Branch 1 taken 115 times.
28106 if ((info->request == JobInfo::kReqHeadOnly)
847
2/2
✓ Branch 0 taken 14007 times.
✓ Branch 1 taken 13984 times.
27991 || (info->request == JobInfo::kReqHeadPut)
848
2/2
✓ Branch 0 taken 23 times.
✓ Branch 1 taken 13984 times.
14007 || (info->request == JobInfo::kReqDelete)) {
849
1/2
✓ Branch 1 taken 14122 times.
✗ Branch 2 not taken.
14122 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 0);
850
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 14122 times.
14122 assert(retval == CURLE_OK);
851
1/2
✓ Branch 1 taken 14122 times.
✗ Branch 2 not taken.
14122 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
852
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 14122 times.
14122 assert(retval == CURLE_OK);
853
854
2/2
✓ Branch 0 taken 23 times.
✓ Branch 1 taken 14099 times.
14122 if (info->request == JobInfo::kReqDelete) {
855
2/4
✓ Branch 1 taken 23 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 23 times.
✗ Branch 6 not taken.
23 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST,
856 GetRequestString(*info).c_str());
857
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 23 times.
23 assert(retval == CURLE_OK);
858 } else {
859
1/2
✓ Branch 1 taken 14099 times.
✗ Branch 2 not taken.
14099 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
860
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 14099 times.
14099 assert(retval == CURLE_OK);
861 }
862 } else {
863
1/2
✓ Branch 1 taken 13984 times.
✗ Branch 2 not taken.
13984 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
864
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13984 times.
13984 assert(retval == CURLE_OK);
865
1/2
✓ Branch 1 taken 13984 times.
✗ Branch 2 not taken.
13984 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 1);
866
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13984 times.
13984 assert(retval == CURLE_OK);
867
1/2
✓ Branch 1 taken 13984 times.
✗ Branch 2 not taken.
13984 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 0);
868
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13984 times.
13984 assert(retval == CURLE_OK);
869
2/4
✓ Branch 2 taken 13984 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 13984 times.
✗ Branch 6 not taken.
13984 retval = curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE,
870 static_cast<curl_off_t>(info->origin->GetSize()));
871
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13984 times.
13984 assert(retval == CURLE_OK);
872
873
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13984 times.
13984 if (info->request == JobInfo::kReqPutDotCvmfs) {
874 info->http_headers = curl_slist_append(info->http_headers,
875 kCacheControlDotCvmfs);
876
1/2
✓ Branch 0 taken 13984 times.
✗ Branch 1 not taken.
13984 } else if (info->request == JobInfo::kReqPutCas) {
877
1/2
✓ Branch 1 taken 13984 times.
✗ Branch 2 not taken.
13984 info->http_headers = curl_slist_append(info->http_headers,
878 kCacheControlCas);
879 }
880 }
881
882 bool retval_b;
883
884 // Authorization
885 28106 vector<string> authz_headers;
886
1/4
✓ Branch 0 taken 28106 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
28106 switch (config_.authz_method) {
887 28106 case kAuthzAwsV2:
888
1/2
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
28106 retval_b = MkV2Authz(*info, &authz_headers);
889 28106 break;
890 case kAuthzAwsV4:
891 retval_b = MkV4Authz(*info, &authz_headers);
892 break;
893 case kAuthzAzure:
894 retval_b = MkAzureAuthz(*info, &authz_headers);
895 break;
896 default:
897 PANIC(NULL);
898 }
899
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 28106 times.
28106 if (!retval_b)
900 return kFailLocalIO;
901
2/2
✓ Branch 1 taken 112286 times.
✓ Branch 2 taken 28106 times.
140392 for (unsigned i = 0; i < authz_headers.size(); ++i) {
902
1/2
✓ Branch 2 taken 112286 times.
✗ Branch 3 not taken.
112286 info->http_headers = curl_slist_append(info->http_headers,
903 112286 authz_headers[i].c_str());
904 }
905
906 // Common headers
907
1/2
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
28106 info->http_headers = curl_slist_append(info->http_headers,
908 "Connection: Keep-Alive");
909
1/2
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
28106 info->http_headers = curl_slist_append(info->http_headers, "Pragma:");
910 // No 100-continue
911
1/2
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
28106 info->http_headers = curl_slist_append(info->http_headers, "Expect:");
912 // Strip unnecessary header
913
1/2
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
28106 info->http_headers = curl_slist_append(info->http_headers, "Accept:");
914
1/2
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
28106 info->http_headers = curl_slist_append(info->http_headers,
915 28106 user_agent_->c_str());
916
917 // Set curl parameters
918
1/2
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
28106 retval = curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
919
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 28106 times.
28106 assert(retval == CURLE_OK);
920
1/2
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
28106 retval = curl_easy_setopt(handle, CURLOPT_HEADERDATA,
921 static_cast<void *>(info));
922
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 28106 times.
28106 assert(retval == CURLE_OK);
923
1/2
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
28106 retval = curl_easy_setopt(handle, CURLOPT_READDATA,
924 static_cast<void *>(info));
925
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 28106 times.
28106 assert(retval == CURLE_OK);
926
1/2
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
28106 retval = curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->http_headers);
927
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 28106 times.
28106 assert(retval == CURLE_OK);
928
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 28106 times.
28106 if (opt_ipv4_only_) {
929 retval = curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
930 assert(retval == CURLE_OK);
931 }
932 // Follow HTTP redirects
933
1/2
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
28106 retval = curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
934
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 28106 times.
28106 assert(retval == CURLE_OK);
935
936
1/2
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
28106 retval = curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, info->errorbuffer);
937
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 28106 times.
28106 assert(retval == CURLE_OK);
938
939
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 28106 times.
28106 if (config_.protocol == "https") {
940 retval = curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 1L);
941 assert(retval == CURLE_OK);
942 retval = curl_easy_setopt(handle, CURLOPT_PROXY_SSL_VERIFYPEER, 1L);
943 assert(retval == CURLE_OK);
944 const bool add_cert = ssl_certificate_store_.ApplySslCertificatePath(
945 handle);
946 assert(add_cert);
947 }
948
949 28106 return kFailOk;
950 28106 }
951
952
953 /**
954 * Sets the URL specific options such as host to use and timeout.
955 */
956 28106 void S3FanoutManager::SetUrlOptions(JobInfo *info) const {
957 28106 CURL *curl_handle = info->curl_handle;
958 CURLcode retval;
959
960
1/2
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
28106 retval = curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT,
961 config_.opt_timeout_sec);
962
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 28106 times.
28106 assert(retval == CURLE_OK);
963
1/2
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
28106 retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT,
964 kLowSpeedLimit);
965
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 28106 times.
28106 assert(retval == CURLE_OK);
966
1/2
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
28106 retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME,
967 config_.opt_timeout_sec);
968
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 28106 times.
28106 assert(retval == CURLE_OK);
969
970
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 28106 times.
28106 if (is_curl_debug_) {
971 retval = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 1);
972 assert(retval == CURLE_OK);
973 }
974
975
1/2
✓ Branch 1 taken 28106 times.
✗ Branch 2 not taken.
28106 const string url = MkUrl(info->object_key);
976
1/2
✓ Branch 2 taken 28106 times.
✗ Branch 3 not taken.
28106 retval = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
977
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 28106 times.
28106 assert(retval == CURLE_OK);
978
979
1/2
✓ Branch 2 taken 28106 times.
✗ Branch 3 not taken.
28106 retval = curl_easy_setopt(curl_handle, CURLOPT_PROXY, config_.proxy.c_str());
980
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 28106 times.
28106 assert(retval == CURLE_OK);
981 28106 }
982
983
984 /**
985 * Adds transfer time and uploaded bytes to the global counters.
986 */
987 28198 void S3FanoutManager::UpdateStatistics(CURL *handle) {
988 double val;
989
990
2/4
✓ Branch 1 taken 28198 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 28198 times.
✗ Branch 4 not taken.
28198 if (curl_easy_getinfo(handle, CURLINFO_SIZE_UPLOAD, &val) == CURLE_OK)
991 28198 statistics_->transferred_bytes += val;
992 28198 }
993
994
995 /**
996 * Retry if possible and if not already done too often.
997 */
998 138 bool S3FanoutManager::CanRetry(const JobInfo *info) {
999 138 return (info->error_code == kFailHostConnection
1000
1/2
✓ Branch 0 taken 138 times.
✗ Branch 1 not taken.
138 || info->error_code == kFailHostResolve
1001
1/2
✓ Branch 0 taken 138 times.
✗ Branch 1 not taken.
138 || info->error_code == kFailServiceUnavailable
1002
2/2
✓ Branch 0 taken 92 times.
✓ Branch 1 taken 46 times.
138 || info->error_code == kFailRetry)
1003
2/4
✓ Branch 0 taken 138 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 92 times.
✗ Branch 3 not taken.
276 && (info->num_retries < config_.opt_max_retries);
1004 }
1005
1006
1007 /**
1008 * Backoff for retry to introduce a jitter into a upload sequence.
1009 *
1010 * \return true if backoff has been performed, false otherwise
1011 */
1012 92 void S3FanoutManager::Backoff(JobInfo *info) {
1013
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 92 times.
92 if (info->error_code != kFailRetry)
1014 info->num_retries++;
1015 92 statistics_->num_retries++;
1016
1017
1/2
✓ Branch 0 taken 92 times.
✗ Branch 1 not taken.
92 if (info->throttle_ms > 0) {
1018 92 LogCvmfs(kLogS3Fanout, kLogDebug, "throttling for %d ms",
1019 info->throttle_ms);
1020 92 const uint64_t now = platform_monotonic_time();
1021
1/2
✓ Branch 0 taken 92 times.
✗ Branch 1 not taken.
92 if ((info->throttle_timestamp + (info->throttle_ms / 1000)) >= now) {
1022
2/2
✓ Branch 0 taken 23 times.
✓ Branch 1 taken 69 times.
92 if ((now - timestamp_last_throttle_report_)
1023 > kThrottleReportIntervalSec) {
1024 23 LogCvmfs(kLogS3Fanout, kLogStdout,
1025 "Warning: S3 backend throttling %ums "
1026 "(total backoff time so far %lums)",
1027 23 info->throttle_ms, statistics_->ms_throttled);
1028 23 timestamp_last_throttle_report_ = now;
1029 }
1030 92 statistics_->ms_throttled += info->throttle_ms;
1031 92 SafeSleepMs(info->throttle_ms);
1032 }
1033 } else {
1034 if (info->backoff_ms == 0) {
1035 // Must be != 0
1036 info->backoff_ms = prng_.Next(config_.opt_backoff_init_ms + 1);
1037 } else {
1038 info->backoff_ms *= 2;
1039 }
1040 if (info->backoff_ms > config_.opt_backoff_max_ms)
1041 info->backoff_ms = config_.opt_backoff_max_ms;
1042
1043 LogCvmfs(kLogS3Fanout, kLogDebug, "backing off for %d ms",
1044 info->backoff_ms);
1045 SafeSleepMs(info->backoff_ms);
1046 }
1047 92 }
1048
1049
1050 /**
1051 * Checks the result of a curl request and implements the failure logic
1052 * and takes care of cleanup.
1053 *
1054 * @return true if request should be repeated, false otherwise
1055 */
1056 28198 bool S3FanoutManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1057 28198 LogCvmfs(kLogS3Fanout, kLogDebug,
1058 "Verify uploaded/tested object %s "
1059 "(curl error %d, info error %d, info request %d)",
1060 28198 info->object_key.c_str(), curl_error, info->error_code,
1061 28198 info->request);
1062 28198 UpdateStatistics(info->curl_handle);
1063
1064 // Verification and error classification
1065
1/6
✓ Branch 0 taken 28198 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
28198 switch (curl_error) {
1066 28198 case CURLE_OK:
1067
2/2
✓ Branch 0 taken 28106 times.
✓ Branch 1 taken 92 times.
28198 if ((info->error_code != kFailRetry)
1068
2/2
✓ Branch 0 taken 14076 times.
✓ Branch 1 taken 14030 times.
28106 && (info->error_code != kFailNotFound)) {
1069 14076 info->error_code = kFailOk;
1070 }
1071 28198 break;
1072 case CURLE_UNSUPPORTED_PROTOCOL:
1073 case CURLE_URL_MALFORMAT:
1074 info->error_code = kFailBadRequest;
1075 break;
1076 case CURLE_COULDNT_RESOLVE_HOST:
1077 info->error_code = kFailHostResolve;
1078 break;
1079 case CURLE_COULDNT_CONNECT:
1080 case CURLE_OPERATION_TIMEDOUT:
1081 case CURLE_SEND_ERROR:
1082 case CURLE_RECV_ERROR:
1083 info->error_code = kFailHostConnection;
1084 break;
1085 case CURLE_ABORTED_BY_CALLBACK:
1086 case CURLE_WRITE_ERROR:
1087 // Error set by callback
1088 break;
1089 default:
1090 LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr,
1091 "unexpected curl error (%d) while trying to upload %s: %s",
1092 curl_error, info->object_key.c_str(), info->errorbuffer);
1093 info->error_code = kFailOther;
1094 break;
1095 }
1096
1097 // Transform HEAD to PUT request
1098
2/2
✓ Branch 0 taken 14030 times.
✓ Branch 1 taken 14168 times.
28198 if ((info->error_code == kFailNotFound)
1099
2/2
✓ Branch 0 taken 13984 times.
✓ Branch 1 taken 46 times.
14030 && (info->request == JobInfo::kReqHeadPut)) {
1100 13984 LogCvmfs(kLogS3Fanout, kLogDebug, "not found: %s, uploading",
1101 info->object_key.c_str());
1102 13984 info->request = JobInfo::kReqPutCas;
1103 13984 curl_slist_free_all(info->http_headers);
1104 13984 info->http_headers = NULL;
1105 13984 const s3fanout::Failures init_failure = InitializeRequest(
1106 info, info->curl_handle);
1107
1108
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13984 times.
13984 if (init_failure != s3fanout::kFailOk) {
1109 PANIC(kLogStderr,
1110 "Failed to initialize CURL handle "
1111 "(error: %d - %s | errno: %d)",
1112 init_failure, Code2Ascii(init_failure), errno);
1113 }
1114 13984 SetUrlOptions(info);
1115 // Reset origin
1116 13984 info->origin->Rewind();
1117 13984 return true; // Again, Put
1118 }
1119
1120 // Determination if failed request should be repeated
1121 14214 bool try_again = false;
1122
2/2
✓ Branch 0 taken 138 times.
✓ Branch 1 taken 14076 times.
14214 if (info->error_code != kFailOk) {
1123 138 try_again = CanRetry(info);
1124 }
1125
2/2
✓ Branch 0 taken 92 times.
✓ Branch 1 taken 14122 times.
14214 if (try_again) {
1126
1/2
✓ Branch 0 taken 92 times.
✗ Branch 1 not taken.
92 if (info->request == JobInfo::kReqPutCas
1127
1/2
✓ Branch 0 taken 92 times.
✗ Branch 1 not taken.
92 || info->request == JobInfo::kReqPutDotCvmfs
1128
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 92 times.
92 || info->request == JobInfo::kReqPutHtml) {
1129 LogCvmfs(kLogS3Fanout, kLogDebug, "Trying again to upload %s",
1130 info->object_key.c_str());
1131 // Reset origin
1132 info->origin->Rewind();
1133 }
1134 92 Backoff(info);
1135 92 info->error_code = kFailOk;
1136 92 info->http_error = 0;
1137 92 info->throttle_ms = 0;
1138 92 info->backoff_ms = 0;
1139 92 info->throttle_timestamp = 0;
1140 92 return true; // try again
1141 }
1142
1143 // Cleanup opened resources
1144 14122 info->origin.Destroy();
1145
1146
3/4
✓ Branch 0 taken 46 times.
✓ Branch 1 taken 14076 times.
✓ Branch 2 taken 46 times.
✗ Branch 3 not taken.
14122 if ((info->error_code != kFailOk) && (info->http_error != 0)
1147
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 46 times.
46 && (info->http_error != 404)) {
1148 LogCvmfs(kLogS3Fanout, kLogStderr, "S3: HTTP failure %d", info->http_error);
1149 }
1150 14122 return false; // stop transfer
1151 }
1152
1153
2/4
✓ Branch 3 taken 276 times.
✗ Branch 4 not taken.
✓ Branch 9 taken 276 times.
✗ Branch 10 not taken.
276 S3FanoutManager::S3FanoutManager(const S3Config &config) : config_(config) {
1154 276 atomic_init32(&multi_threaded_);
1155
1/2
✓ Branch 1 taken 276 times.
✗ Branch 2 not taken.
276 MakePipe(pipe_terminate_);
1156
1/2
✓ Branch 1 taken 276 times.
✗ Branch 2 not taken.
276 MakePipe(pipe_jobs_);
1157
1/2
✓ Branch 1 taken 276 times.
✗ Branch 2 not taken.
276 MakePipe(pipe_completed_);
1158
1159 int retval;
1160 276 jobs_todo_lock_ = reinterpret_cast<pthread_mutex_t *>(
1161 276 smalloc(sizeof(pthread_mutex_t)));
1162 276 retval = pthread_mutex_init(jobs_todo_lock_, NULL);
1163
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 276 times.
276 assert(retval == 0);
1164 276 curl_handle_lock_ = reinterpret_cast<pthread_mutex_t *>(
1165 276 smalloc(sizeof(pthread_mutex_t)));
1166 276 retval = pthread_mutex_init(curl_handle_lock_, NULL);
1167
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 276 times.
276 assert(retval == 0);
1168
1169
1/2
✓ Branch 1 taken 276 times.
✗ Branch 2 not taken.
276 active_requests_ = new set<JobInfo *>;
1170
1/2
✓ Branch 1 taken 276 times.
✗ Branch 2 not taken.
276 pool_handles_idle_ = new set<CURL *>;
1171
1/2
✓ Branch 1 taken 276 times.
✗ Branch 2 not taken.
276 pool_handles_inuse_ = new set<CURL *>;
1172
1/2
✓ Branch 1 taken 276 times.
✗ Branch 2 not taken.
276 curl_sharehandles_ = new map<CURL *, S3FanOutDnsEntry *>;
1173
1/2
✓ Branch 1 taken 276 times.
✗ Branch 2 not taken.
276 sharehandles_ = new set<S3FanOutDnsEntry *>;
1174 276 watch_fds_max_ = 4 * config_.pool_max_handles;
1175 276 max_available_jobs_ = 4 * config_.pool_max_handles;
1176
2/4
✓ Branch 1 taken 276 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 276 times.
✗ Branch 5 not taken.
276 available_jobs_ = new Semaphore(max_available_jobs_);
1177
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 276 times.
276 assert(NULL != available_jobs_);
1178
1179
1/2
✓ Branch 1 taken 276 times.
✗ Branch 2 not taken.
276 statistics_ = new Statistics();
1180
1/2
✓ Branch 1 taken 276 times.
✗ Branch 2 not taken.
276 user_agent_ = new string();
1181
2/4
✓ Branch 2 taken 276 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 276 times.
✗ Branch 6 not taken.
276 *user_agent_ = "User-Agent: cvmfs " + string(CVMFS_VERSION);
1182
1/2
✓ Branch 1 taken 276 times.
✗ Branch 2 not taken.
276 complete_hostname_ = MkCompleteHostname();
1183
1184
1/2
✓ Branch 1 taken 276 times.
✗ Branch 2 not taken.
276 const CURLcode cretval = curl_global_init(CURL_GLOBAL_ALL);
1185
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 276 times.
276 assert(cretval == CURLE_OK);
1186
1/2
✓ Branch 1 taken 276 times.
✗ Branch 2 not taken.
276 curl_multi_ = curl_multi_init();
1187
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 276 times.
276 assert(curl_multi_ != NULL);
1188 CURLMcode mretval;
1189
1/2
✓ Branch 1 taken 276 times.
✗ Branch 2 not taken.
276 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION,
1190 CallbackCurlSocket);
1191
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 276 times.
276 assert(mretval == CURLM_OK);
1192
1/2
✓ Branch 1 taken 276 times.
✗ Branch 2 not taken.
276 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1193 static_cast<void *>(this));
1194
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 276 times.
276 assert(mretval == CURLM_OK);
1195
1/2
✓ Branch 1 taken 276 times.
✗ Branch 2 not taken.
276 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1196 config_.pool_max_handles);
1197
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 276 times.
276 assert(mretval == CURLM_OK);
1198
1199 276 prng_.InitLocaltime();
1200
1201 276 thread_upload_ = 0;
1202 276 timestamp_last_throttle_report_ = 0;
1203 276 is_curl_debug_ = (getenv("_CVMFS_CURL_DEBUG") != NULL);
1204
1205 // Parsing environment variables
1206 276 if ((getenv("CVMFS_IPV4_ONLY") != NULL)
1207
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 276 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 276 times.
276 && (strlen(getenv("CVMFS_IPV4_ONLY")) > 0)) {
1208 opt_ipv4_only_ = true;
1209 } else {
1210 276 opt_ipv4_only_ = false;
1211 }
1212
1213
1/2
✓ Branch 1 taken 276 times.
✗ Branch 2 not taken.
276 resolver_ = dns::CaresResolver::Create(opt_ipv4_only_, 2, 2000);
1214
1215 276 watch_fds_ = static_cast<struct pollfd *>(smalloc(4 * sizeof(struct pollfd)));
1216 276 watch_fds_size_ = 4;
1217 276 watch_fds_inuse_ = 0;
1218
1219
1/2
✓ Branch 1 taken 276 times.
✗ Branch 2 not taken.
276 ssl_certificate_store_.UseSystemCertificatePath();
1220 276 }
1221
1222 552 S3FanoutManager::~S3FanoutManager() {
1223 276 pthread_mutex_destroy(jobs_todo_lock_);
1224 276 free(jobs_todo_lock_);
1225 276 pthread_mutex_destroy(curl_handle_lock_);
1226 276 free(curl_handle_lock_);
1227
1228
1/2
✓ Branch 1 taken 276 times.
✗ Branch 2 not taken.
276 if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1229 // Shutdown I/O thread
1230 276 char buf = 'T';
1231 276 WritePipe(pipe_terminate_[1], &buf, 1);
1232 276 pthread_join(thread_upload_, NULL);
1233 }
1234 276 ClosePipe(pipe_terminate_);
1235 276 ClosePipe(pipe_jobs_);
1236 276 ClosePipe(pipe_completed_);
1237
1238 276 set<CURL *>::iterator i = pool_handles_idle_->begin();
1239 276 const set<CURL *>::const_iterator iEnd = pool_handles_idle_->end();
1240
2/2
✓ Branch 2 taken 460 times.
✓ Branch 3 taken 276 times.
736 for (; i != iEnd; ++i) {
1241 460 curl_easy_cleanup(*i);
1242 }
1243
1244 276 set<S3FanOutDnsEntry *>::iterator is = sharehandles_->begin();
1245 276 const set<S3FanOutDnsEntry *>::const_iterator isEnd = sharehandles_->end();
1246
2/2
✓ Branch 2 taken 230 times.
✓ Branch 3 taken 276 times.
506 for (; is != isEnd; ++is) {
1247 230 curl_share_cleanup((*is)->sharehandle);
1248 230 curl_slist_free_all((*is)->clist);
1249
1/2
✓ Branch 1 taken 230 times.
✗ Branch 2 not taken.
230 delete *is;
1250 }
1251 276 pool_handles_idle_->clear();
1252 276 curl_sharehandles_->clear();
1253 276 sharehandles_->clear();
1254
1/2
✓ Branch 0 taken 276 times.
✗ Branch 1 not taken.
276 delete active_requests_;
1255
1/2
✓ Branch 0 taken 276 times.
✗ Branch 1 not taken.
276 delete pool_handles_idle_;
1256
1/2
✓ Branch 0 taken 276 times.
✗ Branch 1 not taken.
276 delete pool_handles_inuse_;
1257
1/2
✓ Branch 0 taken 276 times.
✗ Branch 1 not taken.
276 delete curl_sharehandles_;
1258
1/2
✓ Branch 0 taken 276 times.
✗ Branch 1 not taken.
276 delete sharehandles_;
1259
1/2
✓ Branch 0 taken 276 times.
✗ Branch 1 not taken.
276 delete user_agent_;
1260 276 curl_multi_cleanup(curl_multi_);
1261
1262
1/2
✓ Branch 0 taken 276 times.
✗ Branch 1 not taken.
276 delete statistics_;
1263
1264
1/2
✓ Branch 0 taken 276 times.
✗ Branch 1 not taken.
276 delete available_jobs_;
1265
1266 276 curl_global_cleanup();
1267 276 }
1268
1269 /**
1270 * Spawns the I/O worker thread. No way back except ~S3FanoutManager.
1271 */
1272 276 void S3FanoutManager::Spawn() {
1273 276 LogCvmfs(kLogS3Fanout, kLogDebug, "S3FanoutManager spawned");
1274
1275 276 const int retval = pthread_create(&thread_upload_, NULL, MainUpload,
1276 static_cast<void *>(this));
1277
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 276 times.
276 assert(retval == 0);
1278
1279 276 atomic_inc32(&multi_threaded_);
1280 276 }
1281
1282 69 const Statistics &S3FanoutManager::GetStatistics() { return *statistics_; }
1283
1284 /**
1285 * Push new job to be uploaded to the S3 cloud storage.
1286 */
1287 14122 void S3FanoutManager::PushNewJob(JobInfo *info) {
1288 14122 available_jobs_->Increment();
1289 14122 WritePipe(pipe_jobs_[1], &info, sizeof(info));
1290 14122 }
1291
1292 /**
1293 * Push completed job to list of completed jobs
1294 */
1295 14398 void S3FanoutManager::PushCompletedJob(JobInfo *info) {
1296 14398 WritePipe(pipe_completed_[1], &info, sizeof(info));
1297 14398 }
1298
1299 /**
1300 * Pop completed job
1301 */
1302 14398 JobInfo *S3FanoutManager::PopCompletedJob() {
1303 JobInfo *info;
1304
1/2
✓ Branch 1 taken 14398 times.
✗ Branch 2 not taken.
14398 ReadPipe(pipe_completed_[0], &info, sizeof(info));
1305 14398 return info;
1306 }
1307
1308 //------------------------------------------------------------------------------
1309
1310
1311 string Statistics::Print() const {
1312 return "Transferred Bytes: " + StringifyInt(uint64_t(transferred_bytes))
1313 + "\n" + "Transfer duration: " + StringifyInt(uint64_t(transfer_time))
1314 + " s\n" + "Number of requests: " + StringifyInt(num_requests) + "\n"
1315 + "Number of retries: " + StringifyInt(num_retries) + "\n";
1316 }
1317
1318 } // namespace s3fanout
1319