GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/network/s3fanout.cc
Date: 2025-07-13 02:35:07
Exec Total Coverage
Lines: 560 789 71.0%
Branches: 403 1149 35.1%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 *
4 * Runs a thread using libcurls asynchronous I/O mode to push data to S3
5 */
6
7 #include "s3fanout.h"
8
9 #include <pthread.h>
10
11 #include <algorithm>
12 #include <cassert>
13 #include <cerrno>
14 #include <utility>
15
16 #include "upload_facility.h"
17 #include "util/concurrency.h"
18 #include "util/exception.h"
19 #include "util/platform.h"
20 #include "util/posix.h"
21 #include "util/string.h"
22
23 using namespace std; // NOLINT
24
25 namespace s3fanout {
26
27 const char *S3FanoutManager::kCacheControlCas = "Cache-Control: max-age=259200";
28 const char
29 *S3FanoutManager::kCacheControlDotCvmfs = "Cache-Control: max-age=61";
30 const unsigned S3FanoutManager::kDefault429ThrottleMs = 250;
31 const unsigned S3FanoutManager::kMax429ThrottleMs = 10000;
32 const unsigned S3FanoutManager::kThrottleReportIntervalSec = 10;
33 const unsigned S3FanoutManager::kDefaultHTTPPort = 80;
34 const unsigned S3FanoutManager::kDefaultHTTPSPort = 443;
35
36
37 /**
38 * Parses Retry-After and X-Retry-In headers attached to HTTP 429 responses
39 */
40 214 void S3FanoutManager::DetectThrottleIndicator(const std::string &header,
41 JobInfo *info) {
42 214 std::string value_str;
43
4/6
✓ Branch 2 taken 214 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 214 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 97 times.
✓ Branch 10 taken 117 times.
214 if (HasPrefix(header, "retry-after:", true))
44
1/2
✓ Branch 1 taken 97 times.
✗ Branch 2 not taken.
97 value_str = header.substr(12);
45
4/6
✓ Branch 2 taken 214 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 214 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 55 times.
✓ Branch 10 taken 159 times.
214 if (HasPrefix(header, "x-retry-in:", true))
46
1/2
✓ Branch 1 taken 55 times.
✗ Branch 2 not taken.
55 value_str = header.substr(11);
47
48
1/2
✓ Branch 1 taken 214 times.
✗ Branch 2 not taken.
214 value_str = Trim(value_str, true /* trim_newline */);
49
2/2
✓ Branch 1 taken 130 times.
✓ Branch 2 taken 84 times.
214 if (!value_str.empty()) {
50
1/2
✓ Branch 1 taken 130 times.
✗ Branch 2 not taken.
130 const unsigned value_numeric = String2Uint64(value_str);
51
2/4
✓ Branch 2 taken 130 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 130 times.
✗ Branch 6 not taken.
260 const unsigned value_ms = HasSuffix(value_str, "ms", true /* ignore_case */)
52
2/2
✓ Branch 0 taken 55 times.
✓ Branch 1 taken 75 times.
130 ? value_numeric
53 130 : (value_numeric * 1000);
54
2/2
✓ Branch 0 taken 119 times.
✓ Branch 1 taken 11 times.
130 if (value_ms > 0)
55 119 info->throttle_ms = std::min(value_ms, kMax429ThrottleMs);
56 }
57 214 }
58
59
60 /**
61 * Called by curl for every HTTP header. Not called for file:// transfers.
62 */
63 18410 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
64 void *info_link) {
65 18410 const size_t num_bytes = size * nmemb;
66
1/2
✓ Branch 2 taken 18410 times.
✗ Branch 3 not taken.
18410 const string header_line(static_cast<const char *>(ptr), num_bytes);
67 18410 JobInfo *info = static_cast<JobInfo *>(info_link);
68
69 // Check for http status code errors
70
4/6
✓ Branch 2 taken 18410 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 18410 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 6130 times.
✓ Branch 10 taken 12280 times.
18410 if (HasPrefix(header_line, "HTTP/1.", false)) {
71
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 6130 times.
6130 if (header_line.length() < 10)
72 return 0;
73
74 unsigned i;
75
5/6
✓ Branch 1 taken 12260 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 6130 times.
✓ Branch 5 taken 6130 times.
✓ Branch 6 taken 6130 times.
✓ Branch 7 taken 6130 times.
12260 for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {
76 }
77
78
2/2
✓ Branch 1 taken 3060 times.
✓ Branch 2 taken 3070 times.
6130 if (header_line[i] == '2') {
79 3060 return num_bytes;
80 } else {
81
1/2
✓ Branch 2 taken 3070 times.
✗ Branch 3 not taken.
3070 LogCvmfs(kLogS3Fanout, kLogDebug, "http status error code [info %p]: %s",
82 info, header_line.c_str());
83
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3070 times.
3070 if (header_line.length() < i + 3) {
84 LogCvmfs(kLogS3Fanout, kLogStderr, "S3: invalid HTTP response '%s'",
85 header_line.c_str());
86 info->error_code = kFailOther;
87 return 0;
88 }
89
2/4
✓ Branch 3 taken 3070 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 3070 times.
✗ Branch 7 not taken.
3070 info->http_error = String2Int64(string(&header_line[i], 3));
90
91
2/6
✓ Branch 0 taken 20 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 3050 times.
✗ Branch 5 not taken.
3070 switch (info->http_error) {
92 20 case 429:
93 20 info->error_code = kFailRetry;
94 20 info->throttle_ms = S3FanoutManager::kDefault429ThrottleMs;
95 20 info->throttle_timestamp = platform_monotonic_time();
96 20 return num_bytes;
97 case 503:
98 case 502: // Can happen if the S3 gateway-backend connection breaks
99 case 500: // sometimes see this as a transient error from S3
100 info->error_code = kFailServiceUnavailable;
101 break;
102 case 501:
103 case 400:
104 info->error_code = kFailBadRequest;
105 break;
106 case 403:
107 info->error_code = kFailForbidden;
108 break;
109 3050 case 404:
110 3050 info->error_code = kFailNotFound;
111 3050 return num_bytes;
112 default:
113 info->error_code = kFailOther;
114 }
115 return 0;
116 }
117 }
118
119
2/2
✓ Branch 0 taken 60 times.
✓ Branch 1 taken 12220 times.
12280 if (info->error_code == kFailRetry) {
120
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 S3FanoutManager::DetectThrottleIndicator(header_line, info);
121 }
122
123 12280 return num_bytes;
124 18410 }
125
126
127 /**
128 * Called by curl for every new chunk to upload.
129 */
130 78345 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
131 void *info_link) {
132 78345 const size_t num_bytes = size * nmemb;
133 78345 JobInfo *info = static_cast<JobInfo *>(info_link);
134
135 78345 LogCvmfs(kLogS3Fanout, kLogDebug, "Data callback with %zu bytes", num_bytes);
136
137
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 78345 times.
78345 if (num_bytes == 0)
138 return 0;
139
140 78345 const uint64_t read_bytes = info->origin->Read(ptr, num_bytes);
141
142 78345 LogCvmfs(kLogS3Fanout, kLogDebug, "source buffer pushed out %lu bytes",
143 read_bytes);
144
145 78345 return read_bytes;
146 }
147
148
149 /**
150 * For the time being, ignore all received information in the HTTP body
151 */
152 static size_t CallbackCurlBody(char * /*ptr*/, size_t size, size_t nmemb,
153 void * /*userdata*/) {
154 return size * nmemb;
155 }
156
157
158 /**
159 * Called when new curl sockets arrive or existing curl sockets depart.
160 */
161 13610 int S3FanoutManager::CallbackCurlSocket(CURL *easy, curl_socket_t s, int action,
162 void *userp, void *socketp) {
163 13610 S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(userp);
164 13610 LogCvmfs(kLogS3Fanout, kLogDebug,
165 "CallbackCurlSocket called with easy "
166 "handle %p, socket %d, action %d, up %p, "
167 "sp %p, fds_inuse %d, jobs %d",
168 easy, s, action, userp, socketp, s3fanout_mgr->watch_fds_inuse_,
169 13610 s3fanout_mgr->available_jobs_->Get());
170
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13610 times.
13610 if (action == CURL_POLL_NONE)
171 return 0;
172
173 // Find s in watch_fds_
174 // First 2 fds are job and terminate pipes (not curl related)
175 unsigned index;
176
2/2
✓ Branch 0 taken 21780 times.
✓ Branch 1 taken 6130 times.
27910 for (index = 2; index < s3fanout_mgr->watch_fds_inuse_; ++index) {
177
2/2
✓ Branch 0 taken 7480 times.
✓ Branch 1 taken 14300 times.
21780 if (s3fanout_mgr->watch_fds_[index].fd == s)
178 7480 break;
179 }
180 // Or create newly
181
2/2
✓ Branch 0 taken 6130 times.
✓ Branch 1 taken 7480 times.
13610 if (index == s3fanout_mgr->watch_fds_inuse_) {
182 // Extend array if necessary
183
2/2
✓ Branch 0 taken 10 times.
✓ Branch 1 taken 6120 times.
6130 if (s3fanout_mgr->watch_fds_inuse_ == s3fanout_mgr->watch_fds_size_) {
184 10 s3fanout_mgr->watch_fds_size_ *= 2;
185 10 s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
186 10 srealloc(s3fanout_mgr->watch_fds_,
187 10 s3fanout_mgr->watch_fds_size_ * sizeof(struct pollfd)));
188 }
189 6130 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].fd = s;
190 6130 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].events = 0;
191 6130 s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].revents = 0;
192 6130 s3fanout_mgr->watch_fds_inuse_++;
193 }
194
195
4/5
✓ Branch 0 taken 6130 times.
✓ Branch 1 taken 30 times.
✓ Branch 2 taken 1320 times.
✓ Branch 3 taken 6130 times.
✗ Branch 4 not taken.
13610 switch (action) {
196 6130 case CURL_POLL_IN:
197 6130 s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
198 6130 break;
199 30 case CURL_POLL_OUT:
200 30 s3fanout_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
201 30 break;
202 1320 case CURL_POLL_INOUT:
203 1320 s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI | POLLOUT
204 | POLLWRBAND;
205 1320 break;
206 6130 case CURL_POLL_REMOVE:
207
2/2
✓ Branch 0 taken 980 times.
✓ Branch 1 taken 5150 times.
6130 if (index < s3fanout_mgr->watch_fds_inuse_ - 1)
208 s3fanout_mgr
209 980 ->watch_fds_[index] = s3fanout_mgr->watch_fds_
210 980 [s3fanout_mgr->watch_fds_inuse_ - 1];
211 6130 s3fanout_mgr->watch_fds_inuse_--;
212 // Shrink array if necessary
213
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6130 times.
6130 if ((s3fanout_mgr->watch_fds_inuse_ > s3fanout_mgr->watch_fds_max_)
214 && (s3fanout_mgr->watch_fds_inuse_
215 < s3fanout_mgr->watch_fds_size_ / 2)) {
216 s3fanout_mgr->watch_fds_size_ /= 2;
217 s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
218 srealloc(s3fanout_mgr->watch_fds_,
219 s3fanout_mgr->watch_fds_size_ * sizeof(struct pollfd)));
220 }
221 6130 break;
222 default:
223 PANIC(NULL);
224 }
225
226 13610 return 0;
227 }
228
229
230 /**
231 * Worker thread event loop.
232 */
233 60 void *S3FanoutManager::MainUpload(void *data) {
234
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread started");
235 60 S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(data);
236
237 60 s3fanout_mgr->InitPipeWatchFds();
238
239 // Don't schedule more jobs into the multi handle than the maximum number of
240 // parallel connections. This should prevent starvation and thus a timeout
241 // of the authorization header (CVM-1339).
242 60 unsigned jobs_in_flight = 0;
243
244 while (true) {
245 // Check events with 100ms timeout
246 83725 const int timeout_ms = 100;
247
1/2
✓ Branch 1 taken 83725 times.
✗ Branch 2 not taken.
83725 int retval = poll(s3fanout_mgr->watch_fds_, s3fanout_mgr->watch_fds_inuse_,
248 timeout_ms);
249
2/2
✓ Branch 0 taken 505 times.
✓ Branch 1 taken 83220 times.
83725 if (retval == 0) {
250 // Handle timeout
251 505 int still_running = 0;
252
1/2
✓ Branch 1 taken 505 times.
✗ Branch 2 not taken.
505 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
253 CURL_SOCKET_TIMEOUT, 0, &still_running);
254
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 505 times.
505 if (retval != CURLM_OK) {
255 LogCvmfs(kLogS3Fanout, kLogStderr, "Error, timeout due to: %d", retval);
256 assert(retval == CURLM_OK);
257 }
258
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 83220 times.
83220 } else if (retval < 0) {
259 assert(errno == EINTR);
260 continue;
261 }
262
263 // Terminate I/O thread
264
2/2
✓ Branch 0 taken 60 times.
✓ Branch 1 taken 83665 times.
83725 if (s3fanout_mgr->watch_fds_[0].revents)
265 60 break;
266
267 // New job incoming
268
2/2
✓ Branch 0 taken 3070 times.
✓ Branch 1 taken 80595 times.
83665 if (s3fanout_mgr->watch_fds_[1].revents) {
269 3070 s3fanout_mgr->watch_fds_[1].revents = 0;
270 JobInfo *info;
271
1/2
✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
3070 ReadPipe(s3fanout_mgr->pipe_jobs_[0], &info, sizeof(info));
272
1/2
✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
3070 CURL *handle = s3fanout_mgr->AcquireCurlHandle();
273
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3070 times.
3070 if (handle == NULL) {
274 PANIC(kLogStderr, "Failed to acquire CURL handle.");
275 }
276
1/2
✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
3070 const s3fanout::Failures init_failure = s3fanout_mgr->InitializeRequest(
277 info, handle);
278
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3070 times.
3070 if (init_failure != s3fanout::kFailOk) {
279 PANIC(kLogStderr,
280 "Failed to initialize CURL handle (error: %d - %s | errno: %d)",
281 init_failure, Code2Ascii(init_failure), errno);
282 }
283
1/2
✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
3070 s3fanout_mgr->SetUrlOptions(info);
284
285
1/2
✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
3070 curl_multi_add_handle(s3fanout_mgr->curl_multi_, handle);
286
1/2
✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
3070 s3fanout_mgr->active_requests_->insert(info);
287 3070 jobs_in_flight++;
288 3070 int still_running = 0, retval = 0;
289
1/2
✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
3070 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
290 CURL_SOCKET_TIMEOUT, 0, &still_running);
291
292
1/2
✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
3070 LogCvmfs(kLogS3Fanout, kLogDebug, "curl_multi_socket_action: %d - %d",
293 retval, still_running);
294 }
295
296
297 // Activity on curl sockets
298 // Within this loop the curl_multi_socket_action() may cause socket(s)
299 // to be removed from watch_fds_. If a socket is removed it is replaced
300 // by the socket at the end of the array and the inuse count is decreased.
301 // Therefore loop over the array in reverse order.
302 // First 2 fds are job and terminate pipes (not curl related)
303
2/2
✓ Branch 0 taken 188600 times.
✓ Branch 1 taken 83665 times.
272265 for (int32_t i = s3fanout_mgr->watch_fds_inuse_ - 1; i >= 2; --i) {
304
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 188600 times.
188600 if (static_cast<uint32_t>(i) >= s3fanout_mgr->watch_fds_inuse_) {
305 continue;
306 }
307
2/2
✓ Branch 0 taken 85345 times.
✓ Branch 1 taken 103255 times.
188600 if (s3fanout_mgr->watch_fds_[i].revents) {
308 85345 int ev_bitmask = 0;
309
2/2
✓ Branch 0 taken 9170 times.
✓ Branch 1 taken 76175 times.
85345 if (s3fanout_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
310 9170 ev_bitmask |= CURL_CSELECT_IN;
311
2/2
✓ Branch 0 taken 76175 times.
✓ Branch 1 taken 9170 times.
85345 if (s3fanout_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
312 76175 ev_bitmask |= CURL_CSELECT_OUT;
313 85345 if (s3fanout_mgr->watch_fds_[i].revents
314
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 85345 times.
85345 & (POLLERR | POLLHUP | POLLNVAL))
315 ev_bitmask |= CURL_CSELECT_ERR;
316 85345 s3fanout_mgr->watch_fds_[i].revents = 0;
317
318 85345 int still_running = 0;
319 85345 retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
320
1/2
✓ Branch 1 taken 85345 times.
✗ Branch 2 not taken.
85345 s3fanout_mgr->watch_fds_[i].fd,
321 ev_bitmask,
322 &still_running);
323 }
324 }
325
326 // Check if transfers are completed
327 CURLMsg *curl_msg;
328 int msgs_in_queue;
329
3/4
✓ Branch 1 taken 89795 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 6130 times.
✓ Branch 4 taken 83665 times.
89795 while ((curl_msg = curl_multi_info_read(s3fanout_mgr->curl_multi_,
330 &msgs_in_queue))) {
331
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6130 times.
6130 assert(curl_msg->msg == CURLMSG_DONE);
332
333 6130 s3fanout_mgr->statistics_->num_requests++;
334 JobInfo *info;
335 6130 CURL *easy_handle = curl_msg->easy_handle;
336 6130 const int curl_error = curl_msg->data.result;
337
1/2
✓ Branch 1 taken 6130 times.
✗ Branch 2 not taken.
6130 curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
338
339
1/2
✓ Branch 1 taken 6130 times.
✗ Branch 2 not taken.
6130 curl_multi_remove_handle(s3fanout_mgr->curl_multi_, easy_handle);
340
3/4
✓ Branch 1 taken 6130 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 3060 times.
✓ Branch 4 taken 3070 times.
6130 if (s3fanout_mgr->VerifyAndFinalize(curl_error, info)) {
341
1/2
✓ Branch 1 taken 3060 times.
✗ Branch 2 not taken.
3060 curl_multi_add_handle(s3fanout_mgr->curl_multi_, easy_handle);
342 3060 int still_running = 0;
343
1/2
✓ Branch 1 taken 3060 times.
✗ Branch 2 not taken.
3060 curl_multi_socket_action(s3fanout_mgr->curl_multi_, CURL_SOCKET_TIMEOUT,
344 0, &still_running);
345 } else {
346 // Return easy handle into pool and write result back
347 3070 jobs_in_flight--;
348
1/2
✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
3070 s3fanout_mgr->active_requests_->erase(info);
349
1/2
✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
3070 s3fanout_mgr->ReleaseCurlHandle(info, easy_handle);
350
1/2
✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
3070 s3fanout_mgr->available_jobs_->Decrement();
351
352 // Add to list of completed jobs
353
1/2
✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
3070 s3fanout_mgr->PushCompletedJob(info);
354 }
355 }
356 83665 }
357
358 60 set<CURL *>::iterator i = s3fanout_mgr->pool_handles_inuse_->begin();
359 60 const set<CURL *>::const_iterator i_end = s3fanout_mgr->pool_handles_inuse_
360 60 ->end();
361
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 60 times.
60 for (; i != i_end; ++i) {
362 curl_multi_remove_handle(s3fanout_mgr->curl_multi_, *i);
363 curl_easy_cleanup(*i);
364 }
365 60 s3fanout_mgr->pool_handles_inuse_->clear();
366 60 free(s3fanout_mgr->watch_fds_);
367
368
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread terminated");
369 60 return NULL;
370 }
371
372
373 /**
374 * Gets an idle CURL handle from the pool. Creates a new one and adds it to
375 * the pool if necessary.
376 */
377 3070 CURL *S3FanoutManager::AcquireCurlHandle() const {
378 CURL *handle;
379
380 3070 const MutexLockGuard guard(curl_handle_lock_);
381
382
2/2
✓ Branch 1 taken 245 times.
✓ Branch 2 taken 2825 times.
3070 if (pool_handles_idle_->empty()) {
383 CURLcode retval;
384
385 // Create a new handle
386
1/2
✓ Branch 1 taken 245 times.
✗ Branch 2 not taken.
245 handle = curl_easy_init();
387
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 245 times.
245 assert(handle != NULL);
388
389 // Other settings
390
1/2
✓ Branch 1 taken 245 times.
✗ Branch 2 not taken.
245 retval = curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
391
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 245 times.
245 assert(retval == CURLE_OK);
392
1/2
✓ Branch 1 taken 245 times.
✗ Branch 2 not taken.
245 retval = curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION,
393 CallbackCurlHeader);
394
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 245 times.
245 assert(retval == CURLE_OK);
395
1/2
✓ Branch 1 taken 245 times.
✗ Branch 2 not taken.
245 retval = curl_easy_setopt(handle, CURLOPT_READFUNCTION, CallbackCurlData);
396
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 245 times.
245 assert(retval == CURLE_OK);
397
1/2
✓ Branch 1 taken 245 times.
✗ Branch 2 not taken.
245 retval = curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlBody);
398
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 245 times.
245 assert(retval == CURLE_OK);
399 } else {
400 2825 handle = *(pool_handles_idle_->begin());
401
1/2
✓ Branch 2 taken 2825 times.
✗ Branch 3 not taken.
2825 pool_handles_idle_->erase(pool_handles_idle_->begin());
402 }
403
404
1/2
✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
3070 pool_handles_inuse_->insert(handle);
405
406 3070 return handle;
407 3070 }
408
409
410 3070 void S3FanoutManager::ReleaseCurlHandle(JobInfo *info, CURL *handle) const {
411
1/2
✓ Branch 0 taken 3070 times.
✗ Branch 1 not taken.
3070 if (info->http_headers) {
412
1/2
✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
3070 curl_slist_free_all(info->http_headers);
413 3070 info->http_headers = NULL;
414 }
415
416 3070 const MutexLockGuard guard(curl_handle_lock_);
417
418
1/2
✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
3070 const set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
419
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 3070 times.
3070 assert(elem != pool_handles_inuse_->end());
420
421
2/2
✓ Branch 1 taken 145 times.
✓ Branch 2 taken 2925 times.
3070 if (pool_handles_idle_->size() > config_.pool_max_handles) {
422
1/2
✓ Branch 1 taken 145 times.
✗ Branch 2 not taken.
145 const CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, NULL);
423
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 145 times.
145 assert(retval == CURLE_OK);
424
1/2
✓ Branch 1 taken 145 times.
✗ Branch 2 not taken.
145 curl_easy_cleanup(handle);
425 const std::map<CURL *, S3FanOutDnsEntry *>::size_type
426
1/2
✓ Branch 1 taken 145 times.
✗ Branch 2 not taken.
145 retitems = curl_sharehandles_->erase(handle);
427
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 145 times.
145 assert(retitems == 1);
428 } else {
429
1/2
✓ Branch 1 taken 2925 times.
✗ Branch 2 not taken.
2925 pool_handles_idle_->insert(handle);
430 }
431
432
1/2
✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
3070 pool_handles_inuse_->erase(elem);
433 3070 }
434
435 60 void S3FanoutManager::InitPipeWatchFds() {
436
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
60 assert(watch_fds_inuse_ == 0);
437
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
60 assert(watch_fds_size_ >= 2);
438 60 watch_fds_[0].fd = pipe_terminate_[0];
439 60 watch_fds_[0].events = POLLIN | POLLPRI;
440 60 watch_fds_[0].revents = 0;
441 60 ++watch_fds_inuse_;
442 60 watch_fds_[1].fd = pipe_jobs_[0];
443 60 watch_fds_[1].events = POLLIN | POLLPRI;
444 60 watch_fds_[1].revents = 0;
445 60 ++watch_fds_inuse_;
446 60 }
447
448 /**
449 * The Amazon AWS 2 authorization header according to
450 * http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html#ConstructingTheAuthenticationHeader
451 */
452 6110 bool S3FanoutManager::MkV2Authz(const JobInfo &info,
453 vector<string> *headers) const {
454 6110 string payload_hash;
455
1/2
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
6110 const bool retval = MkPayloadHash(info, &payload_hash);
456
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
6110 if (!retval)
457 return false;
458
1/2
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
6110 const string content_type = GetContentType(info);
459
1/2
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
6110 const string request = GetRequestString(info);
460
461
1/2
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
6110 const string timestamp = RfcTimestamp();
462
5/10
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 6110 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 6110 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 6110 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 6110 times.
✗ Branch 14 not taken.
12220 string to_sign = request + "\n" + payload_hash + "\n" + content_type + "\n"
463
2/4
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 6110 times.
✗ Branch 5 not taken.
12220 + timestamp + "\n";
464
2/4
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 6110 times.
✗ Branch 4 not taken.
6110 if (config_.x_amz_acl != "") {
465
2/4
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 6110 times.
✗ Branch 5 not taken.
12220 to_sign += "x-amz-acl:" + config_.x_amz_acl + "\n" + // default ACL
466
5/10
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 6110 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 6110 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 6110 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 6110 times.
✗ Branch 14 not taken.
12220 "/" + config_.bucket + "/" + info.object_key;
467 }
468
1/2
✓ Branch 3 taken 6110 times.
✗ Branch 4 not taken.
6110 LogCvmfs(kLogS3Fanout, kLogDebug, "%s string to sign for: %s",
469 request.c_str(), info.object_key.c_str());
470
471
1/2
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
6110 shash::Any hmac;
472 6110 hmac.algorithm = shash::kSha1;
473
1/2
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
6110 shash::Hmac(config_.secret_key,
474 6110 reinterpret_cast<const unsigned char *>(to_sign.data()),
475 6110 to_sign.length(), &hmac);
476
477
3/6
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 6110 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 6110 times.
✗ Branch 8 not taken.
18330 headers->push_back("Authorization: AWS " + config_.access_key + ":"
478
3/6
✓ Branch 2 taken 6110 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 6110 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 6110 times.
✗ Branch 9 not taken.
30550 + Base64(string(reinterpret_cast<char *>(hmac.digest),
479 6110 hmac.GetDigestSize())));
480
2/4
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 6110 times.
✗ Branch 5 not taken.
6110 headers->push_back("Date: " + timestamp);
481
2/4
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 6110 times.
✗ Branch 5 not taken.
6110 headers->push_back("X-Amz-Acl: " + config_.x_amz_acl);
482
2/2
✓ Branch 1 taken 3040 times.
✓ Branch 2 taken 3070 times.
6110 if (!payload_hash.empty())
483
2/4
✓ Branch 1 taken 3040 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 3040 times.
✗ Branch 5 not taken.
3040 headers->push_back("Content-MD5: " + payload_hash);
484
2/2
✓ Branch 1 taken 3040 times.
✓ Branch 2 taken 3070 times.
6110 if (!content_type.empty())
485
2/4
✓ Branch 1 taken 3040 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 3040 times.
✗ Branch 5 not taken.
3040 headers->push_back("Content-Type: " + content_type);
486 6110 return true;
487 6110 }
488
489
490 string S3FanoutManager::GetUriEncode(const string &val,
491 bool encode_slash) const {
492 string result;
493 const unsigned len = val.length();
494 result.reserve(len);
495 for (unsigned i = 0; i < len; ++i) {
496 const char c = val[i];
497 if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
498 || (c >= '0' && c <= '9') || c == '_' || c == '-' || c == '~'
499 || c == '.') {
500 result.push_back(c);
501 } else if (c == '/') {
502 if (encode_slash) {
503 result += "%2F";
504 } else {
505 result.push_back(c);
506 }
507 } else {
508 result.push_back('%');
509 result.push_back((c / 16) + ((c / 16 <= 9) ? '0' : 'A' - 10));
510 result.push_back((c % 16) + ((c % 16 <= 9) ? '0' : 'A' - 10));
511 }
512 }
513 return result;
514 }
515
516
517 string S3FanoutManager::GetAwsV4SigningKey(const string &date) const {
518 if (last_signing_key_.first == date)
519 return last_signing_key_.second;
520
521 const string date_key = shash::Hmac256("AWS4" + config_.secret_key, date,
522 true);
523 const string date_region_key = shash::Hmac256(date_key, config_.region, true);
524 const string date_region_service_key = shash::Hmac256(date_region_key, "s3",
525 true);
526 string signing_key = shash::Hmac256(date_region_service_key, "aws4_request",
527 true);
528 last_signing_key_.first = date;
529 last_signing_key_.second = signing_key;
530 return signing_key;
531 }
532
533
534 /**
535 * The Amazon AWS4 authorization header according to
536 * http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-auth-using-authorization-header.html
537 */
538 bool S3FanoutManager::MkV4Authz(const JobInfo &info,
539 vector<string> *headers) const {
540 string payload_hash;
541 const bool retval = MkPayloadHash(info, &payload_hash);
542 if (!retval)
543 return false;
544 const string content_type = GetContentType(info);
545 const string timestamp = IsoTimestamp();
546 const string date = timestamp.substr(0, 8);
547 vector<string> tokens = SplitString(complete_hostname_, ':');
548 assert(tokens.size() <= 2);
549 string canonical_hostname = tokens[0];
550
551 // if we could split the hostname in two and if the port is *NOT* a default
552 // one
553 if (tokens.size() == 2
554 && !((String2Uint64(tokens[1]) == kDefaultHTTPPort)
555 || (String2Uint64(tokens[1]) == kDefaultHTTPSPort)))
556 canonical_hostname += ":" + tokens[1];
557
558 string signed_headers;
559 string canonical_headers;
560 if (!content_type.empty()) {
561 signed_headers += "content-type;";
562 headers->push_back("Content-Type: " + content_type);
563 canonical_headers += "content-type:" + content_type + "\n";
564 }
565 if (config_.x_amz_acl != "") {
566 signed_headers += "host;x-amz-acl;x-amz-content-sha256;x-amz-date";
567 } else {
568 signed_headers += "host;x-amz-content-sha256;x-amz-date";
569 }
570 canonical_headers += "host:" + canonical_hostname + "\n";
571 if (config_.x_amz_acl != "") {
572 canonical_headers += "x-amz-acl:" + config_.x_amz_acl + "\n";
573 }
574 canonical_headers += "x-amz-content-sha256:" + payload_hash + "\n"
575 + "x-amz-date:" + timestamp + "\n";
576
577 const string scope = date + "/" + config_.region + "/s3/aws4_request";
578 const string uri = config_.dns_buckets ? (string("/") + info.object_key)
579 : (string("/") + config_.bucket + "/"
580 + info.object_key);
581
582 const string canonical_request = GetRequestString(info) + "\n"
583 + GetUriEncode(uri, false) + "\n" + "\n"
584 + canonical_headers + "\n" + signed_headers
585 + "\n" + payload_hash;
586
587 const string hash_request = shash::Sha256String(canonical_request.c_str());
588
589 const string string_to_sign = "AWS4-HMAC-SHA256\n" + timestamp + "\n" + scope
590 + "\n" + hash_request;
591
592 const string signing_key = GetAwsV4SigningKey(date);
593 const string signature = shash::Hmac256(signing_key, string_to_sign);
594
595 headers->push_back("X-Amz-Acl: " + config_.x_amz_acl);
596 headers->push_back("X-Amz-Content-Sha256: " + payload_hash);
597 headers->push_back("X-Amz-Date: " + timestamp);
598 headers->push_back("Authorization: AWS4-HMAC-SHA256 "
599 "Credential="
600 + config_.access_key + "/" + scope
601 + ","
602 "SignedHeaders="
603 + signed_headers
604 + ","
605 "Signature="
606 + signature);
607 return true;
608 }
609
610 /**
611 * The Azure Blob authorization header according to
612 * https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key
613 */
614 bool S3FanoutManager::MkAzureAuthz(const JobInfo &info,
615 vector<string> *headers) const {
616 const string timestamp = RfcTimestamp();
617 const string canonical_headers = "x-ms-blob-type:BlockBlob\nx-ms-date:"
618 + timestamp + "\nx-ms-version:2011-08-18";
619 const string canonical_resource = "/" + config_.access_key + "/"
620 + config_.bucket + "/" + info.object_key;
621
622 string string_to_sign;
623 if ((info.request == JobInfo::kReqHeadOnly)
624 || (info.request == JobInfo::kReqHeadPut)
625 || (info.request == JobInfo::kReqDelete)) {
626 string_to_sign = GetRequestString(info) + string("\n\n\n")
627 + "\n\n\n\n\n\n\n\n\n" + canonical_headers + "\n"
628 + canonical_resource;
629 } else {
630 string_to_sign = GetRequestString(info) + string("\n\n\n")
631 + string(StringifyInt(info.origin->GetSize()))
632 + "\n\n\n\n\n\n\n\n\n" + canonical_headers + "\n"
633 + canonical_resource;
634 }
635
636 string signing_key;
637 const int retval = Debase64(config_.secret_key, &signing_key);
638 if (!retval)
639 return false;
640
641 const string signature = shash::Hmac256(signing_key, string_to_sign, true);
642
643 headers->push_back("x-ms-date: " + timestamp);
644 headers->push_back("x-ms-version: 2011-08-18");
645 headers->push_back("Authorization: SharedKey " + config_.access_key + ":"
646 + Base64(signature));
647 headers->push_back("x-ms-blob-type: BlockBlob");
648 return true;
649 }
650
651 6110 void S3FanoutManager::InitializeDnsSettingsCurl(CURL *handle,
652 CURLSH *sharehandle,
653 curl_slist *clist) const {
654 6110 CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, sharehandle);
655
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
6110 assert(retval == CURLE_OK);
656 6110 retval = curl_easy_setopt(handle, CURLOPT_RESOLVE, clist);
657
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
6110 assert(retval == CURLE_OK);
658 6110 }
659
660
661 6110 int S3FanoutManager::InitializeDnsSettings(CURL *handle,
662 std::string host_with_port) const {
663 // Use existing handle
664 const std::map<CURL *, S3FanOutDnsEntry *>::const_iterator
665
1/2
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
6110 it = curl_sharehandles_->find(handle);
666
2/2
✓ Branch 3 taken 5865 times.
✓ Branch 4 taken 245 times.
6110 if (it != curl_sharehandles_->end()) {
667
1/2
✓ Branch 1 taken 5865 times.
✗ Branch 2 not taken.
5865 InitializeDnsSettingsCurl(handle, it->second->sharehandle,
668 5865 it->second->clist);
669 5865 return 0;
670 }
671
672 // Add protocol information for extraction of fields for DNS
673
2/4
✓ Branch 1 taken 245 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 245 times.
✗ Branch 4 not taken.
245 if (!IsHttpUrl(host_with_port))
674
2/4
✓ Branch 1 taken 245 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 245 times.
✗ Branch 5 not taken.
245 host_with_port = config_.protocol + "://" + host_with_port;
675
1/2
✓ Branch 1 taken 245 times.
✗ Branch 2 not taken.
245 const std::string remote_host = dns::ExtractHost(host_with_port);
676
1/2
✓ Branch 1 taken 245 times.
✗ Branch 2 not taken.
245 const std::string remote_port = dns::ExtractPort(host_with_port);
677
678 // If we have the name already resolved, use the least used IP
679 245 S3FanOutDnsEntry *useme = NULL;
680 245 unsigned int usemin = UINT_MAX;
681 245 std::set<S3FanOutDnsEntry *>::iterator its3 = sharehandles_->begin();
682
2/2
✓ Branch 3 taken 195 times.
✓ Branch 4 taken 245 times.
440 for (; its3 != sharehandles_->end(); ++its3) {
683
1/2
✓ Branch 2 taken 195 times.
✗ Branch 3 not taken.
195 if ((*its3)->dns_name == remote_host) {
684
1/2
✓ Branch 1 taken 195 times.
✗ Branch 2 not taken.
195 if (usemin >= (*its3)->counter) {
685 195 usemin = (*its3)->counter;
686 195 useme = (*its3);
687 }
688 }
689 }
690
2/2
✓ Branch 0 taken 195 times.
✓ Branch 1 taken 50 times.
245 if (useme != NULL) {
691
1/2
✓ Branch 1 taken 195 times.
✗ Branch 2 not taken.
195 curl_sharehandles_->insert(
692 195 std::pair<CURL *, S3FanOutDnsEntry *>(handle, useme));
693 195 useme->counter++;
694
1/2
✓ Branch 1 taken 195 times.
✗ Branch 2 not taken.
195 InitializeDnsSettingsCurl(handle, useme->sharehandle, useme->clist);
695 195 return 0;
696 }
697
698 // We need to resolve the hostname
699 // TODO(ssheikki): support ipv6 also... if (opt_ipv4_only_)
700
1/2
✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
50 const dns::Host host = resolver_->Resolve(remote_host);
701
1/2
✓ Branch 2 taken 50 times.
✗ Branch 3 not taken.
50 set<string> ipv4_addresses = host.ipv4_addresses();
702 50 std::set<string>::iterator its = ipv4_addresses.begin();
703 50 S3FanOutDnsEntry *dnse = NULL;
704
2/2
✓ Branch 3 taken 50 times.
✓ Branch 4 taken 50 times.
100 for (; its != ipv4_addresses.end(); ++its) {
705
2/4
✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 50 times.
✗ Branch 5 not taken.
50 dnse = new S3FanOutDnsEntry();
706 50 dnse->counter = 0;
707
1/2
✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
50 dnse->dns_name = remote_host;
708
4/12
✗ Branch 1 not taken.
✓ Branch 2 taken 50 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 8 taken 50 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 50 times.
✗ Branch 12 not taken.
✗ Branch 14 not taken.
✓ Branch 15 taken 50 times.
✗ Branch 18 not taken.
✗ Branch 19 not taken.
50 dnse->port = remote_port.size() == 0 ? "80" : remote_port;
709
1/2
✓ Branch 2 taken 50 times.
✗ Branch 3 not taken.
50 dnse->ip = *its;
710 50 dnse->clist = NULL;
711
1/2
✓ Branch 2 taken 50 times.
✗ Branch 3 not taken.
50 dnse->clist = curl_slist_append(
712 dnse->clist,
713
4/8
✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 50 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 50 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 50 times.
✗ Branch 11 not taken.
100 (dnse->dns_name + ":" + dnse->port + ":" + dnse->ip).c_str());
714
1/2
✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
50 dnse->sharehandle = curl_share_init();
715
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50 times.
50 assert(dnse->sharehandle != NULL);
716
1/2
✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
50 const CURLSHcode share_retval = curl_share_setopt(
717 dnse->sharehandle, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS);
718
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50 times.
50 assert(share_retval == CURLSHE_OK);
719
1/2
✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
50 sharehandles_->insert(dnse);
720 }
721
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50 times.
50 if (dnse == NULL) {
722 LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr,
723 "Error: DNS resolve failed for address '%s'.",
724 remote_host.c_str());
725 assert(dnse != NULL);
726 return -1;
727 }
728
1/2
✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
50 curl_sharehandles_->insert(
729 50 std::pair<CURL *, S3FanOutDnsEntry *>(handle, dnse));
730 50 dnse->counter++;
731
1/2
✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
50 InitializeDnsSettingsCurl(handle, dnse->sharehandle, dnse->clist);
732
733 50 return 0;
734 245 }
735
736
737 6110 bool S3FanoutManager::MkPayloadHash(const JobInfo &info,
738 string *hex_hash) const {
739
2/2
✓ Branch 0 taken 6085 times.
✓ Branch 1 taken 25 times.
6110 if ((info.request == JobInfo::kReqHeadOnly)
740
2/2
✓ Branch 0 taken 3045 times.
✓ Branch 1 taken 3040 times.
6085 || (info.request == JobInfo::kReqHeadPut)
741
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 3040 times.
3045 || (info.request == JobInfo::kReqDelete)) {
742
1/4
✓ Branch 0 taken 3070 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
3070 switch (config_.authz_method) {
743 3070 case kAuthzAwsV2:
744 3070 hex_hash->clear();
745 3070 break;
746 case kAuthzAwsV4:
747 // Sha256 over empty string
748 *hex_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b78"
749 "52b855";
750 break;
751 case kAuthzAzure:
752 // no payload hash required for Azure signature
753 hex_hash->clear();
754 break;
755 default:
756 PANIC(NULL);
757 }
758 3070 return true;
759 }
760
761 // PUT, there is actually payload
762
1/2
✓ Branch 1 taken 3040 times.
✗ Branch 2 not taken.
3040 shash::Any payload_hash(shash::kMd5);
763
764 unsigned char *data;
765 3040 const unsigned int nbytes = info.origin->Data(
766
2/4
✓ Branch 2 taken 3040 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3040 times.
✗ Branch 6 not taken.
3040 reinterpret_cast<void **>(&data), info.origin->GetSize(), 0);
767
2/4
✓ Branch 2 taken 3040 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 3040 times.
3040 assert(nbytes == info.origin->GetSize());
768
769
1/4
✓ Branch 0 taken 3040 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
3040 switch (config_.authz_method) {
770 3040 case kAuthzAwsV2:
771
1/2
✓ Branch 1 taken 3040 times.
✗ Branch 2 not taken.
3040 shash::HashMem(data, nbytes, &payload_hash);
772
2/4
✓ Branch 2 taken 3040 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3040 times.
✗ Branch 6 not taken.
9120 *hex_hash = Base64(string(reinterpret_cast<char *>(payload_hash.digest),
773 6080 payload_hash.GetDigestSize()));
774 3040 return true;
775 case kAuthzAwsV4:
776 *hex_hash = shash::Sha256Mem(data, nbytes);
777 return true;
778 case kAuthzAzure:
779 // no payload hash required for Azure signature
780 hex_hash->clear();
781 return true;
782 default:
783 PANIC(NULL);
784 }
785 }
786
787 6115 string S3FanoutManager::GetRequestString(const JobInfo &info) const {
788
3/4
✓ Branch 0 taken 3065 times.
✓ Branch 1 taken 3040 times.
✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
6115 switch (info.request) {
789 3065 case JobInfo::kReqHeadOnly:
790 case JobInfo::kReqHeadPut:
791
1/2
✓ Branch 2 taken 3065 times.
✗ Branch 3 not taken.
3065 return "HEAD";
792 3040 case JobInfo::kReqPutCas:
793 case JobInfo::kReqPutDotCvmfs:
794 case JobInfo::kReqPutHtml:
795 case JobInfo::kReqPutBucket:
796
1/2
✓ Branch 2 taken 3040 times.
✗ Branch 3 not taken.
3040 return "PUT";
797 10 case JobInfo::kReqDelete:
798
1/2
✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
10 return "DELETE";
799 default:
800 PANIC(NULL);
801 }
802 }
803
804
805 6110 string S3FanoutManager::GetContentType(const JobInfo &info) const {
806
2/6
✓ Branch 0 taken 3070 times.
✓ Branch 1 taken 3040 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
6110 switch (info.request) {
807 3070 case JobInfo::kReqHeadOnly:
808 case JobInfo::kReqHeadPut:
809 case JobInfo::kReqDelete:
810
1/2
✓ Branch 2 taken 3070 times.
✗ Branch 3 not taken.
3070 return "";
811 3040 case JobInfo::kReqPutCas:
812
1/2
✓ Branch 2 taken 3040 times.
✗ Branch 3 not taken.
3040 return "application/octet-stream";
813 case JobInfo::kReqPutDotCvmfs:
814 return "application/x-cvmfs";
815 case JobInfo::kReqPutHtml:
816 return "text/html";
817 case JobInfo::kReqPutBucket:
818 return "text/xml";
819 default:
820 PANIC(NULL);
821 }
822 }
823
824
825 /**
826 * Request parameters set the URL and other options such as timeout and
827 * proxy.
828 */
829 6110 Failures S3FanoutManager::InitializeRequest(JobInfo *info, CURL *handle) const {
830 // Initialize internal download state
831 6110 info->curl_handle = handle;
832 6110 info->error_code = kFailOk;
833 6110 info->http_error = 0;
834 6110 info->num_retries = 0;
835 6110 info->backoff_ms = 0;
836 6110 info->throttle_ms = 0;
837 6110 info->throttle_timestamp = 0;
838 6110 info->http_headers = NULL;
839 // info->payload_size is needed in S3Uploader::MainCollectResults,
840 // where info->origin is already destroyed.
841
1/2
✓ Branch 2 taken 6110 times.
✗ Branch 3 not taken.
6110 info->payload_size = info->origin->GetSize();
842
843
2/4
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 6110 times.
✗ Branch 5 not taken.
6110 InitializeDnsSettings(handle, complete_hostname_);
844
845 CURLcode retval;
846
2/2
✓ Branch 0 taken 6085 times.
✓ Branch 1 taken 25 times.
6110 if ((info->request == JobInfo::kReqHeadOnly)
847
2/2
✓ Branch 0 taken 3045 times.
✓ Branch 1 taken 3040 times.
6085 || (info->request == JobInfo::kReqHeadPut)
848
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 3040 times.
3045 || (info->request == JobInfo::kReqDelete)) {
849
1/2
✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
3070 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 0);
850
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3070 times.
3070 assert(retval == CURLE_OK);
851
1/2
✓ Branch 1 taken 3070 times.
✗ Branch 2 not taken.
3070 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
852
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3070 times.
3070 assert(retval == CURLE_OK);
853
854
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 3065 times.
3070 if (info->request == JobInfo::kReqDelete) {
855
2/4
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 5 times.
✗ Branch 6 not taken.
5 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST,
856 GetRequestString(*info).c_str());
857
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5 times.
5 assert(retval == CURLE_OK);
858 } else {
859
1/2
✓ Branch 1 taken 3065 times.
✗ Branch 2 not taken.
3065 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
860
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3065 times.
3065 assert(retval == CURLE_OK);
861 }
862 } else {
863
1/2
✓ Branch 1 taken 3040 times.
✗ Branch 2 not taken.
3040 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
864
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3040 times.
3040 assert(retval == CURLE_OK);
865
1/2
✓ Branch 1 taken 3040 times.
✗ Branch 2 not taken.
3040 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 1);
866
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3040 times.
3040 assert(retval == CURLE_OK);
867
1/2
✓ Branch 1 taken 3040 times.
✗ Branch 2 not taken.
3040 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 0);
868
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3040 times.
3040 assert(retval == CURLE_OK);
869
2/4
✓ Branch 2 taken 3040 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3040 times.
✗ Branch 6 not taken.
3040 retval = curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE,
870 static_cast<curl_off_t>(info->origin->GetSize()));
871
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3040 times.
3040 assert(retval == CURLE_OK);
872
873
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3040 times.
3040 if (info->request == JobInfo::kReqPutDotCvmfs) {
874 info->http_headers = curl_slist_append(info->http_headers,
875 kCacheControlDotCvmfs);
876
1/2
✓ Branch 0 taken 3040 times.
✗ Branch 1 not taken.
3040 } else if (info->request == JobInfo::kReqPutCas) {
877
1/2
✓ Branch 1 taken 3040 times.
✗ Branch 2 not taken.
3040 info->http_headers = curl_slist_append(info->http_headers,
878 kCacheControlCas);
879 }
880 }
881
882 bool retval_b;
883
884 // Authorization
885 6110 vector<string> authz_headers;
886
1/4
✓ Branch 0 taken 6110 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
6110 switch (config_.authz_method) {
887 6110 case kAuthzAwsV2:
888
1/2
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
6110 retval_b = MkV2Authz(*info, &authz_headers);
889 6110 break;
890 case kAuthzAwsV4:
891 retval_b = MkV4Authz(*info, &authz_headers);
892 break;
893 case kAuthzAzure:
894 retval_b = MkAzureAuthz(*info, &authz_headers);
895 break;
896 default:
897 PANIC(NULL);
898 }
899
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
6110 if (!retval_b)
900 return kFailLocalIO;
901
2/2
✓ Branch 1 taken 24410 times.
✓ Branch 2 taken 6110 times.
30520 for (unsigned i = 0; i < authz_headers.size(); ++i) {
902
1/2
✓ Branch 2 taken 24410 times.
✗ Branch 3 not taken.
24410 info->http_headers = curl_slist_append(info->http_headers,
903 24410 authz_headers[i].c_str());
904 }
905
906 // Common headers
907
1/2
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
6110 info->http_headers = curl_slist_append(info->http_headers,
908 "Connection: Keep-Alive");
909
1/2
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
6110 info->http_headers = curl_slist_append(info->http_headers, "Pragma:");
910 // No 100-continue
911
1/2
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
6110 info->http_headers = curl_slist_append(info->http_headers, "Expect:");
912 // Strip unnecessary header
913
1/2
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
6110 info->http_headers = curl_slist_append(info->http_headers, "Accept:");
914
1/2
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
6110 info->http_headers = curl_slist_append(info->http_headers,
915 6110 user_agent_->c_str());
916
917 // Set curl parameters
918
1/2
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
6110 retval = curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
919
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
6110 assert(retval == CURLE_OK);
920
1/2
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
6110 retval = curl_easy_setopt(handle, CURLOPT_HEADERDATA,
921 static_cast<void *>(info));
922
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
6110 assert(retval == CURLE_OK);
923
1/2
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
6110 retval = curl_easy_setopt(handle, CURLOPT_READDATA,
924 static_cast<void *>(info));
925
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
6110 assert(retval == CURLE_OK);
926
1/2
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
6110 retval = curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->http_headers);
927
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
6110 assert(retval == CURLE_OK);
928
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
6110 if (opt_ipv4_only_) {
929 retval = curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
930 assert(retval == CURLE_OK);
931 }
932 // Follow HTTP redirects
933
1/2
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
6110 retval = curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
934
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
6110 assert(retval == CURLE_OK);
935
936
1/2
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
6110 retval = curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, info->errorbuffer);
937
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
6110 assert(retval == CURLE_OK);
938
939
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 6110 times.
6110 if (config_.protocol == "https") {
940 retval = curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 1L);
941 assert(retval == CURLE_OK);
942 retval = curl_easy_setopt(handle, CURLOPT_PROXY_SSL_VERIFYPEER, 1L);
943 assert(retval == CURLE_OK);
944 const bool add_cert = ssl_certificate_store_.ApplySslCertificatePath(
945 handle);
946 assert(add_cert);
947 }
948
949 6110 return kFailOk;
950 6110 }
951
952
953 /**
954 * Sets the URL specific options such as host to use and timeout.
955 */
956 6110 void S3FanoutManager::SetUrlOptions(JobInfo *info) const {
957 6110 CURL *curl_handle = info->curl_handle;
958 CURLcode retval;
959
960
1/2
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
6110 retval = curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT,
961 config_.opt_timeout_sec);
962
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
6110 assert(retval == CURLE_OK);
963
1/2
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
6110 retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT,
964 kLowSpeedLimit);
965
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
6110 assert(retval == CURLE_OK);
966
1/2
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
6110 retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME,
967 config_.opt_timeout_sec);
968
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
6110 assert(retval == CURLE_OK);
969
970
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
6110 if (is_curl_debug_) {
971 retval = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 1);
972 assert(retval == CURLE_OK);
973 }
974
975
1/2
✓ Branch 1 taken 6110 times.
✗ Branch 2 not taken.
6110 const string url = MkUrl(info->object_key);
976
1/2
✓ Branch 2 taken 6110 times.
✗ Branch 3 not taken.
6110 retval = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
977
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
6110 assert(retval == CURLE_OK);
978
979
1/2
✓ Branch 2 taken 6110 times.
✗ Branch 3 not taken.
6110 retval = curl_easy_setopt(curl_handle, CURLOPT_PROXY, config_.proxy.c_str());
980
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6110 times.
6110 assert(retval == CURLE_OK);
981 6110 }
982
983
984 /**
985 * Adds transfer time and uploaded bytes to the global counters.
986 */
987 6130 void S3FanoutManager::UpdateStatistics(CURL *handle) {
988 double val;
989
990
2/4
✓ Branch 1 taken 6130 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 6130 times.
✗ Branch 4 not taken.
6130 if (curl_easy_getinfo(handle, CURLINFO_SIZE_UPLOAD, &val) == CURLE_OK)
991 6130 statistics_->transferred_bytes += val;
992 6130 }
993
994
995 /**
996 * Retry if possible and if not already done too often.
997 */
998 30 bool S3FanoutManager::CanRetry(const JobInfo *info) {
999 30 return (info->error_code == kFailHostConnection
1000
1/2
✓ Branch 0 taken 30 times.
✗ Branch 1 not taken.
30 || info->error_code == kFailHostResolve
1001
1/2
✓ Branch 0 taken 30 times.
✗ Branch 1 not taken.
30 || info->error_code == kFailServiceUnavailable
1002
2/2
✓ Branch 0 taken 20 times.
✓ Branch 1 taken 10 times.
30 || info->error_code == kFailRetry)
1003
2/4
✓ Branch 0 taken 30 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 20 times.
✗ Branch 3 not taken.
60 && (info->num_retries < config_.opt_max_retries);
1004 }
1005
1006
1007 /**
1008 * Backoff for retry to introduce a jitter into a upload sequence.
1009 *
1010 * \return true if backoff has been performed, false otherwise
1011 */
1012 20 void S3FanoutManager::Backoff(JobInfo *info) {
1013
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20 times.
20 if (info->error_code != kFailRetry)
1014 info->num_retries++;
1015 20 statistics_->num_retries++;
1016
1017
1/2
✓ Branch 0 taken 20 times.
✗ Branch 1 not taken.
20 if (info->throttle_ms > 0) {
1018 20 LogCvmfs(kLogS3Fanout, kLogDebug, "throttling for %d ms",
1019 info->throttle_ms);
1020 20 const uint64_t now = platform_monotonic_time();
1021
1/2
✓ Branch 0 taken 20 times.
✗ Branch 1 not taken.
20 if ((info->throttle_timestamp + (info->throttle_ms / 1000)) >= now) {
1022
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 15 times.
20 if ((now - timestamp_last_throttle_report_)
1023 > kThrottleReportIntervalSec) {
1024 5 LogCvmfs(kLogS3Fanout, kLogStdout,
1025 "Warning: S3 backend throttling %ums "
1026 "(total backoff time so far %lums)",
1027 5 info->throttle_ms, statistics_->ms_throttled);
1028 5 timestamp_last_throttle_report_ = now;
1029 }
1030 20 statistics_->ms_throttled += info->throttle_ms;
1031 20 SafeSleepMs(info->throttle_ms);
1032 }
1033 } else {
1034 if (info->backoff_ms == 0) {
1035 // Must be != 0
1036 info->backoff_ms = prng_.Next(config_.opt_backoff_init_ms + 1);
1037 } else {
1038 info->backoff_ms *= 2;
1039 }
1040 if (info->backoff_ms > config_.opt_backoff_max_ms)
1041 info->backoff_ms = config_.opt_backoff_max_ms;
1042
1043 LogCvmfs(kLogS3Fanout, kLogDebug, "backing off for %d ms",
1044 info->backoff_ms);
1045 SafeSleepMs(info->backoff_ms);
1046 }
1047 20 }
1048
1049
1050 /**
1051 * Checks the result of a curl request and implements the failure logic
1052 * and takes care of cleanup.
1053 *
1054 * @return true if request should be repeated, false otherwise
1055 */
1056 6130 bool S3FanoutManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1057 6130 LogCvmfs(kLogS3Fanout, kLogDebug,
1058 "Verify uploaded/tested object %s "
1059 "(curl error %d, info error %d, info request %d)",
1060 6130 info->object_key.c_str(), curl_error, info->error_code,
1061 6130 info->request);
1062 6130 UpdateStatistics(info->curl_handle);
1063
1064 // Verification and error classification
1065
1/6
✓ Branch 0 taken 6130 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
6130 switch (curl_error) {
1066 6130 case CURLE_OK:
1067
2/2
✓ Branch 0 taken 6110 times.
✓ Branch 1 taken 20 times.
6130 if ((info->error_code != kFailRetry)
1068
2/2
✓ Branch 0 taken 3060 times.
✓ Branch 1 taken 3050 times.
6110 && (info->error_code != kFailNotFound)) {
1069 3060 info->error_code = kFailOk;
1070 }
1071 6130 break;
1072 case CURLE_UNSUPPORTED_PROTOCOL:
1073 case CURLE_URL_MALFORMAT:
1074 info->error_code = kFailBadRequest;
1075 break;
1076 case CURLE_COULDNT_RESOLVE_HOST:
1077 info->error_code = kFailHostResolve;
1078 break;
1079 case CURLE_COULDNT_CONNECT:
1080 case CURLE_OPERATION_TIMEDOUT:
1081 case CURLE_SEND_ERROR:
1082 case CURLE_RECV_ERROR:
1083 info->error_code = kFailHostConnection;
1084 break;
1085 case CURLE_ABORTED_BY_CALLBACK:
1086 case CURLE_WRITE_ERROR:
1087 // Error set by callback
1088 break;
1089 default:
1090 LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr,
1091 "unexpected curl error (%d) while trying to upload %s: %s",
1092 curl_error, info->object_key.c_str(), info->errorbuffer);
1093 info->error_code = kFailOther;
1094 break;
1095 }
1096
1097 // Transform HEAD to PUT request
1098
2/2
✓ Branch 0 taken 3050 times.
✓ Branch 1 taken 3080 times.
6130 if ((info->error_code == kFailNotFound)
1099
2/2
✓ Branch 0 taken 3040 times.
✓ Branch 1 taken 10 times.
3050 && (info->request == JobInfo::kReqHeadPut)) {
1100 3040 LogCvmfs(kLogS3Fanout, kLogDebug, "not found: %s, uploading",
1101 info->object_key.c_str());
1102 3040 info->request = JobInfo::kReqPutCas;
1103 3040 curl_slist_free_all(info->http_headers);
1104 3040 info->http_headers = NULL;
1105 3040 const s3fanout::Failures init_failure = InitializeRequest(
1106 info, info->curl_handle);
1107
1108
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3040 times.
3040 if (init_failure != s3fanout::kFailOk) {
1109 PANIC(kLogStderr,
1110 "Failed to initialize CURL handle "
1111 "(error: %d - %s | errno: %d)",
1112 init_failure, Code2Ascii(init_failure), errno);
1113 }
1114 3040 SetUrlOptions(info);
1115 // Reset origin
1116 3040 info->origin->Rewind();
1117 3040 return true; // Again, Put
1118 }
1119
1120 // Determination if failed request should be repeated
1121 3090 bool try_again = false;
1122
2/2
✓ Branch 0 taken 30 times.
✓ Branch 1 taken 3060 times.
3090 if (info->error_code != kFailOk) {
1123 30 try_again = CanRetry(info);
1124 }
1125
2/2
✓ Branch 0 taken 20 times.
✓ Branch 1 taken 3070 times.
3090 if (try_again) {
1126
1/2
✓ Branch 0 taken 20 times.
✗ Branch 1 not taken.
20 if (info->request == JobInfo::kReqPutCas
1127
1/2
✓ Branch 0 taken 20 times.
✗ Branch 1 not taken.
20 || info->request == JobInfo::kReqPutDotCvmfs
1128
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20 times.
20 || info->request == JobInfo::kReqPutHtml) {
1129 LogCvmfs(kLogS3Fanout, kLogDebug, "Trying again to upload %s",
1130 info->object_key.c_str());
1131 // Reset origin
1132 info->origin->Rewind();
1133 }
1134 20 Backoff(info);
1135 20 info->error_code = kFailOk;
1136 20 info->http_error = 0;
1137 20 info->throttle_ms = 0;
1138 20 info->backoff_ms = 0;
1139 20 info->throttle_timestamp = 0;
1140 20 return true; // try again
1141 }
1142
1143 // Cleanup opened resources
1144 3070 info->origin.Destroy();
1145
1146
3/4
✓ Branch 0 taken 10 times.
✓ Branch 1 taken 3060 times.
✓ Branch 2 taken 10 times.
✗ Branch 3 not taken.
3070 if ((info->error_code != kFailOk) && (info->http_error != 0)
1147
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
10 && (info->http_error != 404)) {
1148 LogCvmfs(kLogS3Fanout, kLogStderr, "S3: HTTP failure %d", info->http_error);
1149 }
1150 3070 return false; // stop transfer
1151 }
1152
1153
2/4
✓ Branch 3 taken 60 times.
✗ Branch 4 not taken.
✓ Branch 9 taken 60 times.
✗ Branch 10 not taken.
60 S3FanoutManager::S3FanoutManager(const S3Config &config) : config_(config) {
1154 60 atomic_init32(&multi_threaded_);
1155
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 MakePipe(pipe_terminate_);
1156
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 MakePipe(pipe_jobs_);
1157
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 MakePipe(pipe_completed_);
1158
1159 int retval;
1160 60 jobs_todo_lock_ = reinterpret_cast<pthread_mutex_t *>(
1161 60 smalloc(sizeof(pthread_mutex_t)));
1162 60 retval = pthread_mutex_init(jobs_todo_lock_, NULL);
1163
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
60 assert(retval == 0);
1164 60 curl_handle_lock_ = reinterpret_cast<pthread_mutex_t *>(
1165 60 smalloc(sizeof(pthread_mutex_t)));
1166 60 retval = pthread_mutex_init(curl_handle_lock_, NULL);
1167
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
60 assert(retval == 0);
1168
1169
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 active_requests_ = new set<JobInfo *>;
1170
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 pool_handles_idle_ = new set<CURL *>;
1171
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 pool_handles_inuse_ = new set<CURL *>;
1172
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 curl_sharehandles_ = new map<CURL *, S3FanOutDnsEntry *>;
1173
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 sharehandles_ = new set<S3FanOutDnsEntry *>;
1174 60 watch_fds_max_ = 4 * config_.pool_max_handles;
1175 60 max_available_jobs_ = 4 * config_.pool_max_handles;
1176
2/4
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 60 times.
✗ Branch 5 not taken.
60 available_jobs_ = new Semaphore(max_available_jobs_);
1177
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
60 assert(NULL != available_jobs_);
1178
1179
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 statistics_ = new Statistics();
1180
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 user_agent_ = new string();
1181
2/4
✓ Branch 2 taken 60 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 60 times.
✗ Branch 6 not taken.
60 *user_agent_ = "User-Agent: cvmfs " + string(CVMFS_VERSION);
1182
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 complete_hostname_ = MkCompleteHostname();
1183
1184
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 const CURLcode cretval = curl_global_init(CURL_GLOBAL_ALL);
1185
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
60 assert(cretval == CURLE_OK);
1186
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 curl_multi_ = curl_multi_init();
1187
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
60 assert(curl_multi_ != NULL);
1188 CURLMcode mretval;
1189
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION,
1190 CallbackCurlSocket);
1191
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
60 assert(mretval == CURLM_OK);
1192
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1193 static_cast<void *>(this));
1194
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
60 assert(mretval == CURLM_OK);
1195
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 mretval = curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1196 config_.pool_max_handles);
1197
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
60 assert(mretval == CURLM_OK);
1198
1199 60 prng_.InitLocaltime();
1200
1201 60 thread_upload_ = 0;
1202 60 timestamp_last_throttle_report_ = 0;
1203 60 is_curl_debug_ = (getenv("_CVMFS_CURL_DEBUG") != NULL);
1204
1205 // Parsing environment variables
1206 60 if ((getenv("CVMFS_IPV4_ONLY") != NULL)
1207
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 60 times.
60 && (strlen(getenv("CVMFS_IPV4_ONLY")) > 0)) {
1208 opt_ipv4_only_ = true;
1209 } else {
1210 60 opt_ipv4_only_ = false;
1211 }
1212
1213
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 resolver_ = dns::CaresResolver::Create(opt_ipv4_only_, 2, 2000);
1214
1215 60 watch_fds_ = static_cast<struct pollfd *>(smalloc(4 * sizeof(struct pollfd)));
1216 60 watch_fds_size_ = 4;
1217 60 watch_fds_inuse_ = 0;
1218
1219
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 ssl_certificate_store_.UseSystemCertificatePath();
1220 60 }
1221
1222 120 S3FanoutManager::~S3FanoutManager() {
1223 60 pthread_mutex_destroy(jobs_todo_lock_);
1224 60 free(jobs_todo_lock_);
1225 60 pthread_mutex_destroy(curl_handle_lock_);
1226 60 free(curl_handle_lock_);
1227
1228
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1229 // Shutdown I/O thread
1230 60 char buf = 'T';
1231 60 WritePipe(pipe_terminate_[1], &buf, 1);
1232 60 pthread_join(thread_upload_, NULL);
1233 }
1234 60 ClosePipe(pipe_terminate_);
1235 60 ClosePipe(pipe_jobs_);
1236 60 ClosePipe(pipe_completed_);
1237
1238 60 set<CURL *>::iterator i = pool_handles_idle_->begin();
1239 60 const set<CURL *>::const_iterator iEnd = pool_handles_idle_->end();
1240
2/2
✓ Branch 2 taken 100 times.
✓ Branch 3 taken 60 times.
160 for (; i != iEnd; ++i) {
1241 100 curl_easy_cleanup(*i);
1242 }
1243
1244 60 set<S3FanOutDnsEntry *>::iterator is = sharehandles_->begin();
1245 60 const set<S3FanOutDnsEntry *>::const_iterator isEnd = sharehandles_->end();
1246
2/2
✓ Branch 2 taken 50 times.
✓ Branch 3 taken 60 times.
110 for (; is != isEnd; ++is) {
1247 50 curl_share_cleanup((*is)->sharehandle);
1248 50 curl_slist_free_all((*is)->clist);
1249
1/2
✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
50 delete *is;
1250 }
1251 60 pool_handles_idle_->clear();
1252 60 curl_sharehandles_->clear();
1253 60 sharehandles_->clear();
1254
1/2
✓ Branch 0 taken 60 times.
✗ Branch 1 not taken.
60 delete active_requests_;
1255
1/2
✓ Branch 0 taken 60 times.
✗ Branch 1 not taken.
60 delete pool_handles_idle_;
1256
1/2
✓ Branch 0 taken 60 times.
✗ Branch 1 not taken.
60 delete pool_handles_inuse_;
1257
1/2
✓ Branch 0 taken 60 times.
✗ Branch 1 not taken.
60 delete curl_sharehandles_;
1258
1/2
✓ Branch 0 taken 60 times.
✗ Branch 1 not taken.
60 delete sharehandles_;
1259
1/2
✓ Branch 0 taken 60 times.
✗ Branch 1 not taken.
60 delete user_agent_;
1260 60 curl_multi_cleanup(curl_multi_);
1261
1262
1/2
✓ Branch 0 taken 60 times.
✗ Branch 1 not taken.
60 delete statistics_;
1263
1264
1/2
✓ Branch 0 taken 60 times.
✗ Branch 1 not taken.
60 delete available_jobs_;
1265
1266 60 curl_global_cleanup();
1267 60 }
1268
1269 /**
1270 * Spawns the I/O worker thread. No way back except ~S3FanoutManager.
1271 */
1272 60 void S3FanoutManager::Spawn() {
1273 60 LogCvmfs(kLogS3Fanout, kLogDebug, "S3FanoutManager spawned");
1274
1275 60 const int retval = pthread_create(&thread_upload_, NULL, MainUpload,
1276 static_cast<void *>(this));
1277
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 60 times.
60 assert(retval == 0);
1278
1279 60 atomic_inc32(&multi_threaded_);
1280 60 }
1281
1282 15 const Statistics &S3FanoutManager::GetStatistics() { return *statistics_; }
1283
1284 /**
1285 * Push new job to be uploaded to the S3 cloud storage.
1286 */
1287 3070 void S3FanoutManager::PushNewJob(JobInfo *info) {
1288 3070 available_jobs_->Increment();
1289 3070 WritePipe(pipe_jobs_[1], &info, sizeof(info));
1290 3070 }
1291
1292 /**
1293 * Push completed job to list of completed jobs
1294 */
1295 3130 void S3FanoutManager::PushCompletedJob(JobInfo *info) {
1296 3130 WritePipe(pipe_completed_[1], &info, sizeof(info));
1297 3130 }
1298
1299 /**
1300 * Pop completed job
1301 */
1302 3130 JobInfo *S3FanoutManager::PopCompletedJob() {
1303 JobInfo *info;
1304
1/2
✓ Branch 1 taken 3130 times.
✗ Branch 2 not taken.
3130 ReadPipe(pipe_completed_[0], &info, sizeof(info));
1305 3130 return info;
1306 }
1307
1308 //------------------------------------------------------------------------------
1309
1310
1311 string Statistics::Print() const {
1312 return "Transferred Bytes: " + StringifyInt(uint64_t(transferred_bytes))
1313 + "\n" + "Transfer duration: " + StringifyInt(uint64_t(transfer_time))
1314 + " s\n" + "Number of requests: " + StringifyInt(num_requests) + "\n"
1315 + "Number of retries: " + StringifyInt(num_retries) + "\n";
1316 }
1317
1318 } // namespace s3fanout
1319