GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/network/download.cc
Date: 2026-06-21 02:37:04
Exec Total Coverage
Lines: 875 1754 49.9%
Branches: 666 2254 29.5%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 *
4 * The download module provides an interface for fetching files via HTTP
5 * and file. It is internally using libcurl and the asynchronous DNS resolver
6 * c-ares. The JobInfo struct describes a single file/url to download and
7 * keeps the state during the several phases of downloading.
8 *
9 * The module starts in single-threaded mode and can be switched to multi-
10 * threaded mode by Spawn(). In multi-threaded mode, the Fetch() function still
11 * blocks but there is a separate I/O thread using asynchronous I/O, which
12 * maintains all concurrent connections simultaneously. As there might be more
13 * than 1024 file descriptors for the CernVM-FS process, the I/O thread uses
14 * poll and the libcurl multi socket interface.
15 *
16 * While downloading, files can be decompressed and the secure hash can be
17 * calculated on the fly.
18 *
19 * The module also implements failure handling. If corrupted data has been
20 * downloaded, the transfer is restarted using HTTP "no-cache" pragma.
21 * A "host chain" can be configured. When a host fails, there is automatic
22 * fail-over to the next host in the chain until all hosts are probed.
23 * Similarly a chain of proxy sets can be configured. Inside a proxy set,
24 * proxies are selected randomly (load-balancing set).
25 */
26
27 // TODO(jblomer): MS for time summing
28
29 #include "download.h"
30
31 #include <alloca.h>
32 #include <errno.h>
33 #include <inttypes.h>
34 #include <poll.h>
35 #include <pthread.h>
36 #include <signal.h>
37 #include <stdint.h>
38 #include <sys/time.h>
39 #include <unistd.h>
40
41 #include <algorithm>
42 #include <cassert>
43 #include <cstdio>
44 #include <cstdlib>
45 #include <cstring>
46 #include <map>
47 #include <set>
48 #include <utility>
49
50 #include "compression/compression.h"
51 #include "crypto/hash.h"
52 #include "duplex_curl.h" // IWYU pragma: keep
53 #include "interrupt.h"
54 #include "network/sink_mem.h"
55 #include "network/sink_path.h"
56 #include "sanitizer.h"
57 #include "ssl.h"
58 #include "util/algorithm.h"
59 #include "util/atomic.h"
60 #include "util/exception.h"
61 #include "util/logging.h"
62 #include "util/posix.h"
63 #include "util/prng.h"
64 #include "util/smalloc.h"
65 #include "util/string.h"
66
67 using namespace std; // NOLINT
68
69 namespace download {
70
71 /**
72 * Returns the status if an interrupt happened for a given repository.
73 *
74 * Used only in case CVMFS_FAILOVER_INDEFINITELY (failover_indefinitely_) is set
75 * where failed downloads are retried indefinitely, unless an interrupt occurred
76 *
77 * @note If you use this functionality you need to change the source code of
78 * e.g. cvmfs_config reload to create a sentinel file. See comment below.
79 *
80 * @return true if an interrupt occurred
81 * false otherwise
82 */
83 bool Interrupted(const std::string &fqrn, JobInfo *info) {
84 if (info->allow_failure()) {
85 return true;
86 }
87
88 if (!fqrn.empty()) {
89 // it is up to the user the create this sentinel file ("pause_file") if
90 // CVMFS_FAILOVER_INDEFINITELY is used. It must be created during
91 // "cvmfs_config reload" and "cvmfs_config reload $fqrn"
92 const std::string pause_file = std::string("/var/run/cvmfs/interrupt.")
93 + fqrn;
94
95 LogCvmfs(kLogDownload, kLogDebug,
96 "(id %" PRId64 ") Interrupted(): checking for existence of %s",
97 info->id(), pause_file.c_str());
98 if (FileExists(pause_file)) {
99 LogCvmfs(kLogDownload, kLogDebug,
100 "(id %" PRId64 ") Interrupt marker found - "
101 "Interrupting current download, this will EIO outstanding IO.",
102 info->id());
103 if (0 != unlink(pause_file.c_str())) {
104 LogCvmfs(kLogDownload, kLogDebug,
105 "(id %" PRId64 ") Couldn't delete interrupt marker: errno=%d",
106 info->id(), errno);
107 }
108 return true;
109 }
110 }
111 return false;
112 }
113
114 2767 static Failures PrepareDownloadDestination(JobInfo *info) {
115
3/6
✓ Branch 1 taken 2767 times.
✗ Branch 2 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 2767 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 2767 times.
2767 if (info->sink() != NULL && !info->sink()->IsValid()) {
116 cvmfs::PathSink *psink = dynamic_cast<cvmfs::PathSink *>(info->sink());
117 if (psink != NULL) {
118 LogCvmfs(kLogDownload, kLogDebug,
119 "(id %" PRId64 ") Failed to open path %s: %s (errno=%d).",
120 info->id(), psink->path().c_str(), strerror(errno), errno);
121 return kFailLocalIO;
122 } else {
123 LogCvmfs(kLogDownload, kLogDebug,
124 "(id %" PRId64 ") "
125 "Failed to create a valid sink: \n %s",
126 info->id(), info->sink()->Describe().c_str());
127 return kFailOther;
128 }
129 }
130
131 2767 return kFailOk;
132 }
133
134
135 /**
136 * Called by curl for every HTTP header. Not called for file:// transfers.
137 */
138 8083 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
139 void *info_link) {
140 8083 const size_t num_bytes = size * nmemb;
141
1/2
✓ Branch 2 taken 8083 times.
✗ Branch 3 not taken.
8083 const string header_line(static_cast<const char *>(ptr), num_bytes);
142 8083 JobInfo *info = static_cast<JobInfo *>(info_link);
143
144 // LogCvmfs(kLogDownload, kLogDebug, "REMOVE-ME: Header callback with %s",
145 // header_line.c_str());
146
147 // Check http status codes
148
4/6
✓ Branch 2 taken 8083 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 8083 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 24 times.
✓ Branch 10 taken 8059 times.
8083 if (HasPrefix(header_line, "HTTP/1.", false)) {
149
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 24 times.
24 if (header_line.length() < 10) {
150 return 0;
151 }
152
153 unsigned i;
154
5/6
✓ Branch 1 taken 48 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 24 times.
✓ Branch 5 taken 24 times.
✓ Branch 6 taken 24 times.
✓ Branch 7 taken 24 times.
48 for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {
155 }
156
157 // Code is initialized to -1
158
1/2
✓ Branch 1 taken 24 times.
✗ Branch 2 not taken.
24 if (header_line.length() > i + 2) {
159 24 info->SetHttpCode(DownloadManager::ParseHttpCode(&header_line[i]));
160 }
161
162
2/2
✓ Branch 1 taken 18 times.
✓ Branch 2 taken 6 times.
24 if ((info->http_code() / 100) == 2) {
163 18 return num_bytes;
164
0/2
✗ Branch 2 not taken.
✗ Branch 3 not taken.
6 } else if ((info->http_code() == 301) || (info->http_code() == 302)
165
2/8
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 6 times.
✗ Branch 9 not taken.
6 || (info->http_code() == 303) || (info->http_code() == 307)) {
166
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 6 times.
6 if (!info->follow_redirects()) {
167 LogCvmfs(kLogDownload, kLogDebug,
168 "(id %" PRId64 ") redirect support not enabled: %s",
169 info->id(), header_line.c_str());
170 info->SetErrorCode(kFailHostHttp);
171 return 0;
172 }
173
1/2
✓ Branch 3 taken 6 times.
✗ Branch 4 not taken.
6 LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") http redirect: %s",
174 info->id(), header_line.c_str());
175 // libcurl will handle this because of CURLOPT_FOLLOWLOCATION
176 6 return num_bytes;
177 } else {
178 LogCvmfs(kLogDownload, kLogDebug,
179 "(id %" PRId64 ") http status error code: %s [%d]", info->id(),
180 header_line.c_str(), info->http_code());
181 if (((info->http_code() / 100) == 5) || (info->http_code() == 400)
182 || (info->http_code() == 404)) {
183 // 5XX returned by host
184 // 400: error from the GeoAPI module
185 // 404: the stratum 1 does not have the newest files
186 info->SetErrorCode(kFailHostHttp);
187 } else if (info->http_code() == 429) {
188 // 429: rate throttling (we ignore the backoff hint for the time being)
189 info->SetErrorCode(kFailHostConnection);
190 } else {
191 info->SetErrorCode((info->proxy() == "DIRECT") ? kFailHostHttp
192 : kFailProxyHttp);
193 }
194 return 0;
195 }
196 }
197
198 // If needed: allocate space in sink
199
3/4
✓ Branch 2 taken 8059 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 3307 times.
✓ Branch 5 taken 4752 times.
8059 if (info->sink() != NULL && info->sink()->RequiresReserve()
200
11/20
✓ Branch 1 taken 8059 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 3307 times.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 3307 times.
✗ Branch 9 not taken.
✓ Branch 10 taken 1095 times.
✓ Branch 11 taken 2212 times.
✓ Branch 12 taken 3307 times.
✓ Branch 13 taken 4752 times.
✗ Branch 14 not taken.
✓ Branch 15 taken 3307 times.
✓ Branch 16 taken 4752 times.
✓ Branch 18 taken 1095 times.
✓ Branch 19 taken 6964 times.
✗ Branch 20 not taken.
✗ Branch 21 not taken.
✗ Branch 23 not taken.
✗ Branch 24 not taken.
16118 && HasPrefix(header_line, "CONTENT-LENGTH:", true)) {
201 1095 char *tmp = reinterpret_cast<char *>(alloca(num_bytes + 1));
202 1095 uint64_t length = 0;
203 1095 sscanf(header_line.c_str(), "%s %" PRIu64, tmp, &length);
204
1/2
✓ Branch 0 taken 1095 times.
✗ Branch 1 not taken.
1095 if (length > 0) {
205
2/4
✓ Branch 2 taken 1095 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 1095 times.
1095 if (!info->sink()->Reserve(length)) {
206 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogErr,
207 "(id %" PRId64 ") "
208 "resource %s too large to store in memory (%" PRIu64 ")",
209 info->id(), info->url()->c_str(), length);
210 info->SetErrorCode(kFailTooBig);
211 return 0;
212 }
213 } else {
214 // Empty resource
215 info->sink()->Reserve(0);
216 }
217
4/6
✓ Branch 2 taken 6964 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 6964 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 6 times.
✓ Branch 10 taken 6958 times.
6964 } else if (HasPrefix(header_line, "LOCATION:", true)) {
218 // This comes along with redirects
219
1/2
✓ Branch 3 taken 6 times.
✗ Branch 4 not taken.
6 LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") %s", info->id(),
220 header_line.c_str());
221
3/6
✓ Branch 2 taken 6958 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 6958 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 6958 times.
6958 } else if (HasPrefix(header_line, "LINK:", true)) {
222 // This is metalink info
223 LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") %s", info->id(),
224 header_line.c_str());
225 std::string link = info->link();
226 if (link.size() != 0) {
227 // multiple LINK headers are allowed
228 link = link + ", " + header_line.substr(5);
229 } else {
230 link = header_line.substr(5);
231 }
232 info->SetLink(link);
233
3/6
✓ Branch 3 taken 6958 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 6958 times.
✗ Branch 7 not taken.
✗ Branch 10 not taken.
✓ Branch 11 taken 6958 times.
6958 } else if (HasPrefix(header_line, "X-SQUID-ERROR:", true)) {
234 // Reinterpret host error as proxy error
235 if (info->error_code() == kFailHostHttp) {
236 info->SetErrorCode(kFailProxyHttp);
237 }
238
3/6
✓ Branch 2 taken 6958 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 6958 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 6958 times.
6958 } else if (HasPrefix(header_line, "PROXY-STATUS:", true)) {
239 // Reinterpret host error as proxy error if applicable
240 if ((info->error_code() == kFailHostHttp)
241 && (header_line.find("error=") != string::npos)) {
242 info->SetErrorCode(kFailProxyHttp);
243 }
244 }
245
246 8059 return num_bytes;
247 8083 }
248
249
250 /**
251 * Called by curl for every received data chunk.
252 *
253 * In multi-threaded mode the chunk's hash is updated here (cheap) and the
254 * raw bytes are handed off to the calling Fetch() thread via JobInfo's
255 * data tube; the (relatively expensive) zlib decompression and sink->Write
256 * happen on that caller thread. This lets multiple callers — e.g. main
257 * thread plus N file-bundle prefetch workers — run their decompression in
258 * parallel instead of serializing on this single MainDownload thread.
259 *
260 * In single-threaded mode (no MainDownload thread, e.g. unit tests) the
261 * old inline path is kept.
262 */
263 2655 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
264 void *info_link) {
265 2655 const size_t num_bytes = size * nmemb;
266 2655 JobInfo *info = static_cast<JobInfo *>(info_link);
267
268 // TODO(heretherebedragons) remove if no error comes up
269 // as this means only jobinfo data request (and not header only)
270 // come here
271
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2655 times.
2655 assert(info->sink() != NULL);
272
273
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2655 times.
2655 if (num_bytes == 0)
274 return 0;
275
276
2/2
✓ Branch 1 taken 1746 times.
✓ Branch 2 taken 909 times.
2655 if (info->expected_hash()) {
277 1746 shash::Update(reinterpret_cast<unsigned char *>(ptr), num_bytes,
278 info->hash_context());
279 }
280
281
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2655 times.
2655 if (info->IsValidDataTube()) {
282 // Parallel-decompress path: copy bytes onto a buffer the caller will own
283 // and enqueue. The caller thread (DownloadManager::Fetch) pops these
284 // elements and runs DecompressZStream2Sink / sink->Write itself.
285 char *buf = new char[num_bytes];
286 memcpy(buf, ptr, num_bytes);
287 DataTubeElement *ele = new DataTubeElement(buf, num_bytes,
288 kActionDecompress);
289 info->GetDataTubePtr()->EnqueueBack(ele);
290 return num_bytes;
291 }
292
293
2/2
✓ Branch 1 taken 1732 times.
✓ Branch 2 taken 923 times.
2655 if (info->compressed()) {
294 1732 const zlib::StreamStates retval = zlib::DecompressZStream2Sink(
295 ptr, static_cast<int64_t>(num_bytes), info->GetZstreamPtr(),
296 info->sink());
297
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1732 times.
1732 if (retval == zlib::kStreamDataError) {
298 LogCvmfs(kLogDownload, kLogSyslogErr,
299 "(id %" PRId64 ") failed to decompress %s", info->id(),
300 info->url()->c_str());
301 info->SetErrorCode(kFailBadData);
302 return 0;
303
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1732 times.
1732 } else if (retval == zlib::kStreamIOError) {
304 LogCvmfs(kLogDownload, kLogSyslogErr,
305 "(id %" PRId64 ") decompressing %s, local IO error", info->id(),
306 info->url()->c_str());
307 info->SetErrorCode(kFailLocalIO);
308 return 0;
309 }
310 } else {
311 923 const int64_t written = info->sink()->Write(ptr, num_bytes);
312
2/4
✓ Branch 0 taken 923 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 923 times.
923 if (written < 0 || static_cast<uint64_t>(written) != num_bytes) {
313 LogCvmfs(kLogDownload, kLogDebug,
314 "(id %" PRId64 ") "
315 "Failed to perform write of %zu bytes to sink %s with errno %ld",
316 info->id(), num_bytes, info->sink()->Describe().c_str(),
317 written);
318 }
319 }
320
321 2655 return num_bytes;
322 }
323
324 #ifdef DEBUGMSG
325 406 static int CallbackCurlDebug(CURL *handle,
326 curl_infotype type,
327 char *data,
328 size_t size,
329 void * /* clientp */) {
330 JobInfo *info;
331
1/2
✓ Branch 1 taken 406 times.
✗ Branch 2 not taken.
406 curl_easy_getinfo(handle, CURLINFO_PRIVATE, &info);
332
333
3/6
✓ Branch 2 taken 406 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 406 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 406 times.
✗ Branch 9 not taken.
812 std::string prefix = "(id " + StringifyInt(info->id()) + ") ";
334
4/8
✓ Branch 0 taken 216 times.
✓ Branch 1 taken 94 times.
✓ Branch 2 taken 24 times.
✓ Branch 3 taken 72 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
406 switch (type) {
335 216 case CURLINFO_TEXT:
336
1/2
✓ Branch 1 taken 216 times.
✗ Branch 2 not taken.
216 prefix += "{info} ";
337 216 break;
338 94 case CURLINFO_HEADER_IN:
339
1/2
✓ Branch 1 taken 94 times.
✗ Branch 2 not taken.
94 prefix += "{header/recv} ";
340 94 break;
341 24 case CURLINFO_HEADER_OUT:
342
1/2
✓ Branch 1 taken 24 times.
✗ Branch 2 not taken.
24 prefix += "{header/sent} ";
343 24 break;
344 72 case CURLINFO_DATA_IN:
345
2/2
✓ Branch 0 taken 8 times.
✓ Branch 1 taken 64 times.
72 if (size < 50) {
346
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8 prefix += "{data/recv} ";
347 8 break;
348 } else {
349
1/2
✓ Branch 2 taken 64 times.
✗ Branch 3 not taken.
64 LogCvmfs(kLogCurl, kLogDebug, "%s{data/recv} <snip>", prefix.c_str());
350 64 return 0;
351 }
352 case CURLINFO_DATA_OUT:
353 if (size < 50) {
354 prefix += "{data/sent} ";
355 break;
356 } else {
357 LogCvmfs(kLogCurl, kLogDebug, "%s{data/sent} <snip>", prefix.c_str());
358 return 0;
359 }
360 case CURLINFO_SSL_DATA_IN:
361 if (size < 50) {
362 prefix += "{ssldata/recv} ";
363 break;
364 } else {
365 LogCvmfs(kLogCurl, kLogDebug, "%s{ssldata/recv} <snip>",
366 prefix.c_str());
367 return 0;
368 }
369 case CURLINFO_SSL_DATA_OUT:
370 if (size < 50) {
371 prefix += "{ssldata/sent} ";
372 break;
373 } else {
374 LogCvmfs(kLogCurl, kLogDebug, "%s{ssldata/sent} <snip>",
375 prefix.c_str());
376 return 0;
377 }
378 default:
379 // just log the message
380 break;
381 }
382
383 342 bool valid_char = true;
384
1/2
✓ Branch 2 taken 342 times.
✗ Branch 3 not taken.
342 std::string msg(data, size);
385
2/2
✓ Branch 1 taken 28788 times.
✓ Branch 2 taken 342 times.
29130 for (size_t i = 0; i < msg.length(); ++i) {
386
2/4
✓ Branch 1 taken 28788 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 28788 times.
28788 if (msg[i] == '\0') {
387 msg[i] = '~';
388 }
389
390 // verify that char is a valid printable char
391
3/6
✓ Branch 1 taken 28788 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 28080 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 28080 times.
56868 if ((msg[i] < ' ' || msg[i] > '~')
392
6/8
✓ Branch 0 taken 28080 times.
✓ Branch 1 taken 708 times.
✓ Branch 3 taken 708 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 246 times.
✓ Branch 6 taken 462 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 28788 times.
57114 && (msg[i] != 10 /*line feed*/
393
2/4
✓ Branch 1 taken 246 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 246 times.
246 && msg[i] != 13 /*carriage return*/)) {
394 valid_char = false;
395 }
396 }
397
398
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 342 times.
342 if (!valid_char) {
399 msg = "<Non-plaintext sequence>";
400 }
401
402
1/2
✓ Branch 3 taken 342 times.
✗ Branch 4 not taken.
342 LogCvmfs(kLogCurl, kLogDebug, "%s%s", prefix.c_str(),
403
1/2
✓ Branch 1 taken 342 times.
✗ Branch 2 not taken.
684 Trim(msg, true /* trim_newline */).c_str());
404 342 return 0;
405 406 }
406 #endif
407
408 //------------------------------------------------------------------------------
409
410
411 const int DownloadManager::kProbeUnprobed = -1;
412 const int DownloadManager::kProbeDown = -2;
413 const int DownloadManager::kProbeGeo = -3;
414
415 /**
416 * Escape special chars from the URL, except for ':' and '/',
417 * which should keep their meaning.
418 */
419 2775 string DownloadManager::EscapeUrl(const int64_t jobinfo_id, const string &url) {
420 2775 string escaped;
421
1/2
✓ Branch 2 taken 2775 times.
✗ Branch 3 not taken.
2775 escaped.reserve(url.length());
422
423 char escaped_char[3];
424
2/2
✓ Branch 1 taken 251880 times.
✓ Branch 2 taken 2775 times.
254655 for (unsigned i = 0, s = url.length(); i < s; ++i) {
425
3/4
✓ Branch 2 taken 251880 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 18 times.
✓ Branch 5 taken 251862 times.
251880 if (JobInfo::EscapeUrlChar(url[i], escaped_char)) {
426
1/2
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
18 escaped.append(escaped_char, 3);
427 } else {
428
1/2
✓ Branch 1 taken 251862 times.
✗ Branch 2 not taken.
251862 escaped.push_back(escaped_char[0]);
429 }
430 }
431
1/2
✓ Branch 3 taken 2775 times.
✗ Branch 4 not taken.
2775 LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") escaped %s to %s",
432 jobinfo_id, url.c_str(), escaped.c_str());
433
434 5550 return escaped;
435 }
436
437 /**
438 * -1 of digits is not a valid Http return code
439 */
440 34 int DownloadManager::ParseHttpCode(const char digits[3]) {
441 34 int result = 0;
442 34 int factor = 100;
443
2/2
✓ Branch 0 taken 102 times.
✓ Branch 1 taken 32 times.
134 for (int i = 0; i < 3; ++i) {
444
3/4
✓ Branch 0 taken 102 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
✓ Branch 3 taken 100 times.
102 if ((digits[i] < '0') || (digits[i] > '9'))
445 2 return -1;
446 100 result += (digits[i] - '0') * factor;
447 100 factor /= 10;
448 }
449 32 return result;
450 }
451
452
453 /**
454 * Called when new curl sockets arrive or existing curl sockets depart.
455 */
456 int DownloadManager::CallbackCurlSocket(CURL * /* easy */,
457 curl_socket_t s,
458 int action,
459 void *userp,
460 void * /* socketp */) {
461 // LogCvmfs(kLogDownload, kLogDebug, "CallbackCurlSocket called with easy "
462 // "handle %p, socket %d, action %d", easy, s, action);
463 DownloadManager *download_mgr = static_cast<DownloadManager *>(userp);
464 if (action == CURL_POLL_NONE)
465 return 0;
466
467 // Find s in watch_fds_
468 unsigned index;
469
470 // TODO(heretherebedragons) why start at index = 0 and not 2?
471 // fd[0] and fd[1] are fixed?
472 for (index = 0; index < download_mgr->watch_fds_inuse_; ++index) {
473 if (download_mgr->watch_fds_[index].fd == s)
474 break;
475 }
476 // Or create newly
477 if (index == download_mgr->watch_fds_inuse_) {
478 // Extend array if necessary
479 if (download_mgr->watch_fds_inuse_ == download_mgr->watch_fds_size_) {
480 assert(download_mgr->watch_fds_size_ > 0);
481 download_mgr->watch_fds_size_ *= 2;
482 download_mgr->watch_fds_ = static_cast<struct pollfd *>(
483 srealloc(download_mgr->watch_fds_,
484 download_mgr->watch_fds_size_ * sizeof(struct pollfd)));
485 }
486 download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].fd = s;
487 download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].events = 0;
488 download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].revents = 0;
489 download_mgr->watch_fds_inuse_++;
490 }
491
492 switch (action) {
493 case CURL_POLL_IN:
494 download_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
495 break;
496 case CURL_POLL_OUT:
497 download_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
498 break;
499 case CURL_POLL_INOUT:
500 download_mgr->watch_fds_[index].events = POLLIN | POLLPRI | POLLOUT
501 | POLLWRBAND;
502 break;
503 case CURL_POLL_REMOVE:
504 if (index < download_mgr->watch_fds_inuse_ - 1) {
505 download_mgr
506 ->watch_fds_[index] = download_mgr->watch_fds_
507 [download_mgr->watch_fds_inuse_ - 1];
508 }
509 download_mgr->watch_fds_inuse_--;
510 // Shrink array if necessary
511 if ((download_mgr->watch_fds_inuse_ > download_mgr->watch_fds_max_)
512 && (download_mgr->watch_fds_inuse_
513 < download_mgr->watch_fds_size_ / 2)) {
514 download_mgr->watch_fds_size_ /= 2;
515 // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ (%d)",
516 // watch_fds_size_);
517 download_mgr->watch_fds_ = static_cast<struct pollfd *>(
518 srealloc(download_mgr->watch_fds_,
519 download_mgr->watch_fds_size_ * sizeof(struct pollfd)));
520 // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ done",
521 // watch_fds_size_);
522 }
523 break;
524 default:
525 break;
526 }
527
528 return 0;
529 }
530
531
532 /**
533 * Worker thread event loop. Waits on new JobInfo structs on a pipe.
534 */
535 void *DownloadManager::MainDownload(void *data) {
536 DownloadManager *download_mgr = static_cast<DownloadManager *>(data);
537 LogCvmfs(kLogDownload, kLogDebug,
538 "download I/O thread of DownloadManager '%s' started",
539 download_mgr->name_.c_str());
540
541 const int kIdxPipeTerminate = 0;
542 const int kIdxPipeJobs = 1;
543
544 download_mgr->watch_fds_ = static_cast<struct pollfd *>(
545 smalloc(2 * sizeof(struct pollfd)));
546 download_mgr->watch_fds_size_ = 2;
547 download_mgr->watch_fds_[kIdxPipeTerminate].fd = download_mgr->pipe_terminate_
548 ->GetReadFd();
549 download_mgr->watch_fds_[kIdxPipeTerminate].events = POLLIN | POLLPRI;
550 download_mgr->watch_fds_[kIdxPipeTerminate].revents = 0;
551 download_mgr->watch_fds_[kIdxPipeJobs].fd = download_mgr->pipe_jobs_
552 ->GetReadFd();
553 download_mgr->watch_fds_[kIdxPipeJobs].events = POLLIN | POLLPRI;
554 download_mgr->watch_fds_[kIdxPipeJobs].revents = 0;
555 download_mgr->watch_fds_inuse_ = 2;
556
557 int still_running = 0;
558 struct timeval timeval_start, timeval_stop;
559 gettimeofday(&timeval_start, NULL);
560 while (true) {
561 int timeout;
562 if (still_running) {
563 /* NOTE: The following might degrade the performance for many small files
564 * use case. TODO(jblomer): look into it.
565 // Specify a timeout for polling in ms; this allows us to return
566 // to libcurl once a second so it can look for internal operations
567 // which timed out. libcurl has a more elaborate mechanism
568 // (CURLMOPT_TIMERFUNCTION) that would inform us of the next potential
569 // timeout. TODO(bbockelm) we should switch to that in the future.
570 timeout = 100;
571 */
572 timeout = 1;
573 } else {
574 timeout = -1;
575 gettimeofday(&timeval_stop, NULL);
576 const int64_t delta = static_cast<int64_t>(
577 1000 * DiffTimeSeconds(timeval_start, timeval_stop));
578 perf::Xadd(download_mgr->counters_->sz_transfer_time, delta);
579 }
580 const int retval = poll(download_mgr->watch_fds_,
581 download_mgr->watch_fds_inuse_, timeout);
582 if (retval < 0) {
583 continue;
584 }
585
586 // Handle timeout
587 if (retval == 0) {
588 curl_multi_socket_action(download_mgr->curl_multi_, CURL_SOCKET_TIMEOUT,
589 0, &still_running);
590 }
591
592 // Terminate I/O thread
593 if (download_mgr->watch_fds_[kIdxPipeTerminate].revents)
594 break;
595
596 // New job arrives
597 if (download_mgr->watch_fds_[kIdxPipeJobs].revents) {
598 download_mgr->watch_fds_[kIdxPipeJobs].revents = 0;
599 JobInfo *info;
600 download_mgr->pipe_jobs_->Read<JobInfo *>(&info);
601 if (!still_running) {
602 gettimeofday(&timeval_start, NULL);
603 }
604 CURL *handle = download_mgr->AcquireCurlHandle();
605 download_mgr->InitializeRequest(info, handle);
606 download_mgr->SetUrlOptions(info);
607 curl_multi_add_handle(download_mgr->curl_multi_, handle);
608 curl_multi_socket_action(download_mgr->curl_multi_, CURL_SOCKET_TIMEOUT,
609 0, &still_running);
610 }
611
612 // Activity on curl sockets
613 // Within this loop the curl_multi_socket_action() may cause socket(s)
614 // to be removed from watch_fds_. If a socket is removed it is replaced
615 // by the socket at the end of the array and the inuse count is decreased.
616 // Therefore loop over the array in reverse order.
617 for (int64_t i = download_mgr->watch_fds_inuse_ - 1; i >= 2; --i) {
618 if (i >= download_mgr->watch_fds_inuse_) {
619 continue;
620 }
621 if (download_mgr->watch_fds_[i].revents) {
622 int ev_bitmask = 0;
623 if (download_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
624 ev_bitmask |= CURL_CSELECT_IN;
625 if (download_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
626 ev_bitmask |= CURL_CSELECT_OUT;
627 if (download_mgr->watch_fds_[i].revents
628 & (POLLERR | POLLHUP | POLLNVAL)) {
629 ev_bitmask |= CURL_CSELECT_ERR;
630 }
631 download_mgr->watch_fds_[i].revents = 0;
632
633 curl_multi_socket_action(download_mgr->curl_multi_,
634 download_mgr->watch_fds_[i].fd,
635 ev_bitmask,
636 &still_running);
637 }
638 }
639
640 // Check if transfers are completed
641 CURLMsg *curl_msg;
642 int msgs_in_queue;
643 while ((curl_msg = curl_multi_info_read(download_mgr->curl_multi_,
644 &msgs_in_queue))) {
645 if (curl_msg->msg == CURLMSG_DONE) {
646 perf::Inc(download_mgr->counters_->n_requests);
647 JobInfo *info;
648 CURL *easy_handle = curl_msg->easy_handle;
649 const int curl_error = curl_msg->data.result;
650 curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
651
652 int64_t redir_count;
653 curl_easy_getinfo(easy_handle, CURLINFO_REDIRECT_COUNT, &redir_count);
654 LogCvmfs(kLogDownload, kLogDebug,
655 "(manager '%s' - id %" PRId64 ") "
656 "Number of CURL redirects %" PRId64,
657 download_mgr->name_.c_str(), info->id(), redir_count);
658
659 curl_multi_remove_handle(download_mgr->curl_multi_, easy_handle);
660 if (download_mgr->VerifyAndFinalize(curl_error, info)) {
661 curl_multi_add_handle(download_mgr->curl_multi_, easy_handle);
662 curl_multi_socket_action(download_mgr->curl_multi_,
663 CURL_SOCKET_TIMEOUT,
664 0,
665 &still_running);
666 } else {
667 // Return easy handle into pool and write result back
668 download_mgr->ReleaseCurlHandle(easy_handle, true /* allow_reuse */);
669
670 DataTubeElement *ele = new DataTubeElement(kActionStop);
671 info->GetDataTubePtr()->EnqueueBack(ele);
672 info->GetPipeJobResultPtr()->Write<download::Failures>(
673 info->error_code());
674 }
675 }
676 }
677 }
678
679 for (set<CURL *>::iterator i = download_mgr->pool_handles_inuse_->begin(),
680 iEnd = download_mgr->pool_handles_inuse_->end();
681 i != iEnd;
682 ++i) {
683 curl_multi_remove_handle(download_mgr->curl_multi_, *i);
684 curl_easy_cleanup(*i);
685 }
686 download_mgr->pool_handles_inuse_->clear();
687 free(download_mgr->watch_fds_);
688
689 LogCvmfs(kLogDownload, kLogDebug,
690 "download I/O thread of DownloadManager '%s' terminated",
691 download_mgr->name_.c_str());
692 return NULL;
693 }
694
695
696 //------------------------------------------------------------------------------
697
698
699 2018 HeaderLists::~HeaderLists() {
700
2/2
✓ Branch 1 taken 2044 times.
✓ Branch 2 taken 2018 times.
4062 for (unsigned i = 0; i < blocks_.size(); ++i) {
701
1/2
✓ Branch 1 taken 2044 times.
✗ Branch 2 not taken.
2044 delete[] blocks_[i];
702 }
703 2018 blocks_.clear();
704 2018 }
705
706
707 18228 curl_slist *HeaderLists::GetList(const char *header) { return Get(header); }
708
709
710 2767 curl_slist *HeaderLists::DuplicateList(curl_slist *slist) {
711
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2767 times.
2767 assert(slist);
712 2767 curl_slist *copy = GetList(slist->data);
713 2767 copy->next = slist->next;
714 2767 curl_slist *prev = copy;
715 2767 slist = slist->next;
716
2/2
✓ Branch 0 taken 5534 times.
✓ Branch 1 taken 2767 times.
8301 while (slist) {
717 5534 curl_slist *new_link = Get(slist->data);
718 5534 new_link->next = slist->next;
719 5534 prev->next = new_link;
720 5534 prev = new_link;
721 5534 slist = slist->next;
722 }
723 2767 return copy;
724 }
725
726
727 4378 void HeaderLists::AppendHeader(curl_slist *slist, const char *header) {
728
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4378 times.
4378 assert(slist);
729 4378 curl_slist *new_link = Get(header);
730 4378 new_link->next = NULL;
731
732
2/2
✓ Branch 0 taken 2843 times.
✓ Branch 1 taken 4378 times.
7221 while (slist->next)
733 2843 slist = slist->next;
734 4378 slist->next = new_link;
735 4378 }
736
737
738 /**
739 * Ensures that a certain header string is _not_ part of slist on return.
740 * Note that if the first header element matches, the returned slist points
741 * to a different value.
742 */
743 130 void HeaderLists::CutHeader(const char *header, curl_slist **slist) {
744
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 130 times.
130 assert(slist);
745 curl_slist head;
746 130 head.next = *slist;
747 130 curl_slist *prev = &head;
748 130 curl_slist *rover = *slist;
749
2/2
✓ Branch 0 taken 234 times.
✓ Branch 1 taken 130 times.
364 while (rover) {
750
2/2
✓ Branch 0 taken 104 times.
✓ Branch 1 taken 130 times.
234 if (strcmp(rover->data, header) == 0) {
751 104 prev->next = rover->next;
752 104 Put(rover);
753 104 rover = prev;
754 }
755 234 prev = rover;
756 234 rover = rover->next;
757 }
758 130 *slist = head.next;
759 130 }
760
761
762 9527 void HeaderLists::PutList(curl_slist *slist) {
763
2/2
✓ Branch 0 taken 15453 times.
✓ Branch 1 taken 9527 times.
24980 while (slist) {
764 15453 curl_slist *next = slist->next;
765 15453 Put(slist);
766 15453 slist = next;
767 }
768 9527 }
769
770
771 104 string HeaderLists::Print(curl_slist *slist) {
772 104 string verbose;
773
2/2
✓ Branch 0 taken 208 times.
✓ Branch 1 taken 104 times.
312 while (slist) {
774
3/6
✓ Branch 2 taken 208 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 208 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 208 times.
✗ Branch 9 not taken.
208 verbose += string(slist->data) + "\n";
775 208 slist = slist->next;
776 }
777 104 return verbose;
778 }
779
780
781 28140 curl_slist *HeaderLists::Get(const char *header) {
782
2/2
✓ Branch 1 taken 26095 times.
✓ Branch 2 taken 2071 times.
28166 for (unsigned i = 0; i < blocks_.size(); ++i) {
783
2/2
✓ Branch 0 taken 1777924 times.
✓ Branch 1 taken 26 times.
1777950 for (unsigned j = 0; j < kBlockSize; ++j) {
784
2/2
✓ Branch 2 taken 26069 times.
✓ Branch 3 taken 1751855 times.
1777924 if (!IsUsed(&(blocks_[i][j]))) {
785 26069 blocks_[i][j].data = const_cast<char *>(header);
786 26069 return &(blocks_[i][j]);
787 }
788 }
789 }
790
791 // All used, new block
792 2071 AddBlock();
793 2071 blocks_[blocks_.size() - 1][0].data = const_cast<char *>(header);
794 2071 return &(blocks_[blocks_.size() - 1][0]);
795 }
796
797
798 545733 void HeaderLists::Put(curl_slist *slist) {
799 545733 slist->data = NULL;
800 545733 slist->next = NULL;
801 545733 }
802
803
804 2071 void HeaderLists::AddBlock() {
805
1/2
✓ Branch 1 taken 2071 times.
✗ Branch 2 not taken.
2071 curl_slist *new_block = new curl_slist[kBlockSize];
806
2/2
✓ Branch 0 taken 530176 times.
✓ Branch 1 taken 2071 times.
532247 for (unsigned i = 0; i < kBlockSize; ++i) {
807 530176 Put(&new_block[i]);
808 }
809
1/2
✓ Branch 1 taken 2071 times.
✗ Branch 2 not taken.
2071 blocks_.push_back(new_block);
810 2071 }
811
812
813 //------------------------------------------------------------------------------
814
815
816 string DownloadManager::ProxyInfo::Print() {
817 if (url == "DIRECT")
818 return url;
819
820 string result = url;
821 const int remaining = static_cast<int>(host.deadline())
822 - static_cast<int>(time(NULL));
823 string expinfo = (remaining >= 0) ? "+" : "";
824 if (abs(remaining) >= 3600) {
825 expinfo += StringifyInt(remaining / 3600) + "h";
826 } else if (abs(remaining) >= 60) {
827 expinfo += StringifyInt(remaining / 60) + "m";
828 } else {
829 expinfo += StringifyInt(remaining) + "s";
830 }
831 if (host.status() == dns::kFailOk) {
832 result += " (" + host.name() + ", " + expinfo + ")";
833 } else {
834 result += " (:unresolved:, " + expinfo + ")";
835 }
836 return result;
837 }
838
839
840 /**
841 * Gets an idle CURL handle from the pool. Creates a new one and adds it to
842 * the pool if necessary.
843 */
844 2767 CURL *DownloadManager::AcquireCurlHandle() {
845 CURL *handle;
846
847
1/2
✓ Branch 1 taken 2767 times.
✗ Branch 2 not taken.
2767 if (pool_handles_idle_->empty()) {
848 // Create a new handle
849
1/2
✓ Branch 1 taken 2767 times.
✗ Branch 2 not taken.
2767 handle = curl_easy_init();
850
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2767 times.
2767 assert(handle != NULL);
851
852
1/2
✓ Branch 1 taken 2767 times.
✗ Branch 2 not taken.
2767 curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
853 // curl_easy_setopt(curl_default, CURLOPT_FAILONERROR, 1);
854
1/2
✓ Branch 1 taken 2767 times.
✗ Branch 2 not taken.
2767 curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, CallbackCurlHeader);
855
1/2
✓ Branch 1 taken 2767 times.
✗ Branch 2 not taken.
2767 curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlData);
856 } else {
857 handle = *(pool_handles_idle_->begin());
858 pool_handles_idle_->erase(pool_handles_idle_->begin());
859 }
860
861
1/2
✓ Branch 1 taken 2767 times.
✗ Branch 2 not taken.
2767 pool_handles_inuse_->insert(handle);
862
863 2767 return handle;
864 }
865
866
867 2767 void DownloadManager::ReleaseCurlHandle(CURL *handle, bool allow_reuse) {
868
1/2
✓ Branch 1 taken 2767 times.
✗ Branch 2 not taken.
2767 const set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
869
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 2767 times.
2767 assert(elem != pool_handles_inuse_->end());
870
871
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 2767 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 2767 times.
✗ Branch 6 not taken.
2767 if (!allow_reuse || pool_handles_idle_->size() > pool_max_handles_) {
872
1/2
✓ Branch 2 taken 2767 times.
✗ Branch 3 not taken.
2767 curl_easy_cleanup(*elem);
873 } else {
874 pool_handles_idle_->insert(*elem);
875 }
876
877
1/2
✓ Branch 1 taken 2767 times.
✗ Branch 2 not taken.
2767 pool_handles_inuse_->erase(elem);
878 2767 }
879
880
881 /**
882 * HTTP request options: set the URL and other options such as timeout and
883 * proxy.
884 */
885 2767 void DownloadManager::InitializeRequest(JobInfo *info, CURL *handle) {
886 // Initialize internal download state
887 2767 info->SetCurlHandle(handle);
888 2767 info->SetErrorCode(kFailOk);
889 2767 info->SetHttpCode(-1);
890 2767 info->SetFollowRedirects(follow_redirects_);
891 2767 info->SetNumUsedProxies(1);
892 2767 info->SetNumUsedMetalinks(1);
893 2767 info->SetNumUsedHosts(1);
894 2767 info->SetNumRetries(0);
895 2767 info->SetBackoffMs(0);
896 2767 info->SetHeaders(header_lists_->DuplicateList(default_headers_));
897
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2767 times.
2767 if (info->info_header()) {
898 header_lists_->AppendHeader(info->headers(), info->info_header());
899 }
900
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2767 times.
2767 if (enable_http_tracing_) {
901 for (unsigned int i = 0; i < http_tracing_headers_.size(); i++) {
902 header_lists_->AppendHeader(info->headers(),
903 (http_tracing_headers_)[i].c_str());
904 }
905
906 header_lists_->AppendHeader(info->headers(), info->tracing_header_pid());
907 header_lists_->AppendHeader(info->headers(), info->tracing_header_gid());
908 header_lists_->AppendHeader(info->headers(), info->tracing_header_uid());
909
910 LogCvmfs(kLogDownload, kLogDebug,
911 "(manager '%s' - id %" PRId64 ") "
912 "CURL Header for URL: %s is:\n %s",
913 name_.c_str(), info->id(), info->url()->c_str(),
914 header_lists_->Print(info->headers()).c_str());
915 }
916
917
2/2
✓ Branch 1 taken 78 times.
✓ Branch 2 taken 2689 times.
2767 if (info->force_nocache()) {
918 78 SetNocache(info);
919 } else {
920 2689 info->SetNocache(false);
921 }
922
2/2
✓ Branch 1 taken 1899 times.
✓ Branch 2 taken 868 times.
2767 if (info->compressed()) {
923 1899 zlib::DecompressInit(info->GetZstreamPtr());
924 }
925
2/2
✓ Branch 1 taken 1913 times.
✓ Branch 2 taken 854 times.
2767 if (info->expected_hash()) {
926
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1913 times.
1913 assert(info->hash_context().buffer != NULL);
927 1913 shash::Init(info->hash_context());
928 }
929
930
2/6
✗ Branch 1 not taken.
✓ Branch 2 taken 2767 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 2767 times.
2767 if ((info->range_offset() != -1) && (info->range_size())) {
931 char byte_range_array[100];
932 const int64_t range_lower = static_cast<int64_t>(info->range_offset());
933 const int64_t range_upper = static_cast<int64_t>(info->range_offset()
934 + info->range_size() - 1);
935 if (snprintf(byte_range_array, sizeof(byte_range_array),
936 "%" PRId64 "-%" PRId64, range_lower, range_upper)
937 == 100) {
938 PANIC(NULL); // Should be impossible given limits on offset size.
939 }
940 curl_easy_setopt(handle, CURLOPT_RANGE, byte_range_array);
941 } else {
942 2767 curl_easy_setopt(handle, CURLOPT_RANGE, NULL);
943 }
944
945 // Set curl parameters
946 2767 curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
947 2767 curl_easy_setopt(handle, CURLOPT_WRITEHEADER, static_cast<void *>(info));
948 2767 curl_easy_setopt(handle, CURLOPT_WRITEDATA, static_cast<void *>(info));
949 2767 curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->headers());
950
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2767 times.
2767 if (info->head_request()) {
951 curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
952 } else {
953 2767 curl_easy_setopt(handle, CURLOPT_HTTPGET, 1);
954 }
955
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2767 times.
2767 if (opt_ipv4_only_) {
956 curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
957 }
958
2/2
✓ Branch 0 taken 822 times.
✓ Branch 1 taken 1945 times.
2767 if (follow_redirects_) {
959 822 curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1);
960 822 curl_easy_setopt(handle, CURLOPT_MAXREDIRS, 4);
961 }
962 #ifdef DEBUGMSG
963 2767 curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
964 2767 curl_easy_setopt(handle, CURLOPT_DEBUGFUNCTION, CallbackCurlDebug);
965 #endif
966 2767 }
967
968 5546 void DownloadManager::CheckHostInfoReset(const std::string &typ,
969 HostInfo &info,
970 JobInfo *jobinfo,
971 time_t &now) {
972
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5546 times.
5546 if (info.timestamp_backup > 0) {
973 if (now == 0)
974 now = time(NULL);
975 if (static_cast<int64_t>(now)
976 > static_cast<int64_t>(info.timestamp_backup + info.reset_after)) {
977 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
978 "(manager %s - id %" PRId64 ") "
979 "switching %s from %s to %s (reset %s)",
980 name_.c_str(), jobinfo->id(), typ.c_str(),
981 (*info.chain)[info.current].c_str(), (*info.chain)[0].c_str(),
982 typ.c_str());
983 info.current = 0;
984 info.timestamp_backup = 0;
985 }
986 }
987 5546 }
988
989
990 /**
991 * Sets the URL specific options such as host to use and timeout. It might also
992 * set an error code, in which case the further processing should react on.
993 */
994 2773 void DownloadManager::SetUrlOptions(JobInfo *info) {
995 2773 CURL *curl_handle = info->curl_handle();
996 2773 string url_prefix;
997 2773 time_t now = 0;
998
999 2773 const MutexLockGuard m(lock_options_);
1000
1001 // sharding policy
1002
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2773 times.
2773 if (sharding_policy_.UseCount() > 0) {
1003 if (info->proxy() != "") {
1004 // proxy already set, so this is a failover event
1005 perf::Inc(counters_->n_proxy_failover);
1006 }
1007 info->SetProxy(sharding_policy_->GetNextProxy(
1008 info->url(), info->proxy(),
1009 info->range_offset() == -1 ? 0 : info->range_offset()));
1010
1011 curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, info->proxy().c_str());
1012 } else { // no sharding policy
1013 // Check if proxy group needs to be reset from backup to primary
1014
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2773 times.
2773 if (opt_timestamp_backup_proxies_ > 0) {
1015 now = time(NULL);
1016 if (static_cast<int64_t>(now) > static_cast<int64_t>(
1017 opt_timestamp_backup_proxies_ + opt_proxy_groups_reset_after_)) {
1018 opt_proxy_groups_current_ = 0;
1019 opt_timestamp_backup_proxies_ = 0;
1020 RebalanceProxiesUnlocked("Reset proxy group from backup to primary");
1021 }
1022 }
1023 // Check if load-balanced proxies within the group need to be reset
1024
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2773 times.
2773 if (opt_timestamp_failover_proxies_ > 0) {
1025 if (now == 0)
1026 now = time(NULL);
1027 if (static_cast<int64_t>(now)
1028 > static_cast<int64_t>(opt_timestamp_failover_proxies_
1029 + opt_proxy_groups_reset_after_)) {
1030 RebalanceProxiesUnlocked(
1031 "Reset load-balanced proxies within the active group");
1032 }
1033 }
1034
1035
1/2
✓ Branch 2 taken 2773 times.
✗ Branch 3 not taken.
2773 ProxyInfo *proxy = ChooseProxyUnlocked(info->expected_hash());
1036
6/6
✓ Branch 0 taken 764 times.
✓ Branch 1 taken 2009 times.
✓ Branch 3 taken 756 times.
✓ Branch 4 taken 8 times.
✓ Branch 5 taken 2765 times.
✓ Branch 6 taken 8 times.
2773 if (!proxy || (proxy->url == "DIRECT")) {
1037
2/4
✓ Branch 2 taken 2765 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2765 times.
✗ Branch 6 not taken.
2765 info->SetProxy("DIRECT");
1038
1/2
✓ Branch 2 taken 2765 times.
✗ Branch 3 not taken.
2765 curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, "");
1039 } else {
1040 // Note: inside ValidateProxyIpsUnlocked() we may change the proxy data
1041 // structure, so we must not pass proxy->... (== current_proxy())
1042 // parameters directly
1043
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8 const std::string purl = proxy->url;
1044
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8 const dns::Host phost = proxy->host;
1045
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8 const bool changed = ValidateProxyIpsUnlocked(purl, phost);
1046 // Current proxy may have changed
1047
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8 times.
8 if (changed) {
1048 proxy = ChooseProxyUnlocked(info->expected_hash());
1049 }
1050
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8 info->SetProxy(proxy->url);
1051
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8 if (proxy->host.status() == dns::kFailOk) {
1052
2/4
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
✓ Branch 6 taken 8 times.
✗ Branch 7 not taken.
8 curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY,
1053 info->proxy().c_str());
1054 } else {
1055 // We know it can't work, don't even try to download
1056 curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, "0.0.0.0");
1057 }
1058 8 }
1059 } // end !sharding
1060
1061 // Check if metalink and host chains need to be reset
1062
2/4
✓ Branch 2 taken 2773 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2773 times.
✗ Branch 6 not taken.
2773 CheckHostInfoReset("metalink", opt_metalink_, info, now);
1063
2/4
✓ Branch 2 taken 2773 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2773 times.
✗ Branch 6 not taken.
2773 CheckHostInfoReset("host", opt_host_, info, now);
1064
1065
1/2
✓ Branch 1 taken 2773 times.
✗ Branch 2 not taken.
2773 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, opt_low_speed_limit_);
1066
3/6
✓ Branch 1 taken 2773 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 8 times.
✓ Branch 6 taken 2765 times.
2773 if (info->proxy() != "DIRECT") {
1067
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8 curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_proxy_);
1068
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_proxy_);
1069 } else {
1070
1/2
✓ Branch 1 taken 2765 times.
✗ Branch 2 not taken.
2765 curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_direct_);
1071
1/2
✓ Branch 1 taken 2765 times.
✗ Branch 2 not taken.
2765 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_direct_);
1072 }
1073
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2773 times.
2773 if (!opt_dns_server_.empty())
1074 curl_easy_setopt(curl_handle, CURLOPT_DNS_SERVERS, opt_dns_server_.c_str());
1075
1076
2/2
✓ Branch 1 taken 965 times.
✓ Branch 2 taken 1808 times.
2773 if (info->probe_hosts()) {
1077
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 965 times.
965 if (CheckMetalinkChain(now)) {
1078 url_prefix = (*opt_metalink_.chain)[opt_metalink_.current];
1079 info->SetCurrentMetalinkChainIndex(opt_metalink_.current);
1080 LogCvmfs(kLogDownload, kLogDebug,
1081 "(manager %s - id %" PRId64 ") "
1082 "reading from metalink %d",
1083 name_.c_str(), info->id(), opt_metalink_.current);
1084
1/2
✓ Branch 0 taken 965 times.
✗ Branch 1 not taken.
965 } else if (opt_host_.chain) {
1085
1/2
✓ Branch 2 taken 965 times.
✗ Branch 3 not taken.
965 url_prefix = (*opt_host_.chain)[opt_host_.current];
1086 965 info->SetCurrentHostChainIndex(opt_host_.current);
1087
1/2
✓ Branch 3 taken 965 times.
✗ Branch 4 not taken.
965 LogCvmfs(kLogDownload, kLogDebug,
1088 "(manager %s - id %" PRId64 ") "
1089 "reading from host %d",
1090 name_.c_str(), info->id(), opt_host_.current);
1091 }
1092 }
1093
1094
1/2
✓ Branch 2 taken 2773 times.
✗ Branch 3 not taken.
2773 string url = url_prefix + *(info->url());
1095
1096
1/2
✓ Branch 1 taken 2773 times.
✗ Branch 2 not taken.
2773 curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 1L);
1097
2/6
✓ Branch 1 taken 2773 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 2773 times.
2773 if (url.substr(0, 5) == "https") {
1098 const bool rvb = ssl_certificate_store_.ApplySslCertificatePath(
1099 curl_handle);
1100 if (!rvb) {
1101 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
1102 "(manager %s - id %" PRId64 ") "
1103 "Failed to set SSL certificate path %s",
1104 name_.c_str(), info->id(),
1105 ssl_certificate_store_.GetCaPath().c_str());
1106 }
1107 if (info->pid() != -1) {
1108 if (credentials_attachment_ == NULL) {
1109 LogCvmfs(kLogDownload, kLogDebug,
1110 "(manager %s - id %" PRId64 ") "
1111 "uses secure downloads but no credentials attachment set",
1112 name_.c_str(), info->id());
1113 } else {
1114 const bool retval = credentials_attachment_->ConfigureCurlHandle(
1115 curl_handle, info->pid(), info->GetCredDataPtr());
1116 if (!retval) {
1117 LogCvmfs(kLogDownload, kLogDebug,
1118 "(manager %s - id %" PRId64 ") "
1119 "failed attaching credentials",
1120 name_.c_str(), info->id());
1121 }
1122 }
1123 }
1124 // The download manager disables signal handling in the curl library;
1125 // as OpenSSL's implementation of TLS will generate a sigpipe in some
1126 // error paths, we must explicitly disable SIGPIPE here.
1127 // TODO(jblomer): it should be enough to do this once
1128 signal(SIGPIPE, SIG_IGN);
1129 }
1130
1131
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2773 times.
2773 if (url.find("@proxy@") != string::npos) {
1132 // This is used in Geo-API requests (only), to replace a portion of the
1133 // URL with the current proxy name for the sake of caching the result.
1134 // Replace the @proxy@ either with a passed in "forced" template (which
1135 // is set from $CVMFS_PROXY_TEMPLATE) if there is one, or a "direct"
1136 // template (which is the uuid) if there's no proxy, or the name of the
1137 // proxy.
1138 string replacement;
1139 if (proxy_template_forced_ != "") {
1140 replacement = proxy_template_forced_;
1141 } else if (info->proxy() == "DIRECT") {
1142 replacement = proxy_template_direct_;
1143 } else {
1144 if (opt_proxy_groups_current_ >= opt_proxy_groups_fallback_) {
1145 // It doesn't make sense to use the fallback proxies in Geo-API requests
1146 // since the fallback proxies are supposed to get sorted, too.
1147 info->SetProxy("DIRECT");
1148 curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, "");
1149 replacement = proxy_template_direct_;
1150 } else {
1151 replacement = ChooseProxyUnlocked(info->expected_hash())->host.name();
1152 }
1153 }
1154 replacement = (replacement == "") ? proxy_template_direct_ : replacement;
1155 LogCvmfs(kLogDownload, kLogDebug,
1156 "(manager %s - id %" PRId64 ") "
1157 "replacing @proxy@ by %s",
1158 name_.c_str(), info->id(), replacement.c_str());
1159 url = ReplaceAll(url, "@proxy@", replacement);
1160 }
1161
1162 // TODO(heretherebedragons) before removing
1163 // static_cast<cvmfs::MemSink*>(info->sink)->size() == 0
1164 // and just always call info->sink->Reserve()
1165 // we should do a speed check
1166
3/4
✓ Branch 2 taken 2773 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 1118 times.
✓ Branch 5 taken 1655 times.
2773 if ((info->sink() != NULL) && info->sink()->RequiresReserve()
1167
1/2
✓ Branch 2 taken 1118 times.
✗ Branch 3 not taken.
1118 && (static_cast<cvmfs::MemSink *>(info->sink())->size() == 0)
1168
11/20
✓ Branch 1 taken 2773 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 1118 times.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 1118 times.
✗ Branch 9 not taken.
✓ Branch 10 taken 1088 times.
✓ Branch 11 taken 30 times.
✓ Branch 12 taken 1118 times.
✓ Branch 13 taken 1655 times.
✗ Branch 14 not taken.
✓ Branch 15 taken 1118 times.
✓ Branch 16 taken 1655 times.
✓ Branch 18 taken 1088 times.
✓ Branch 19 taken 1685 times.
✗ Branch 20 not taken.
✗ Branch 21 not taken.
✗ Branch 23 not taken.
✗ Branch 24 not taken.
5546 && HasPrefix(url, "file://", false)) {
1169 platform_stat64 stat_buf;
1170 1088 const int retval = platform_stat(url.c_str(), &stat_buf);
1171
1/2
✓ Branch 0 taken 1088 times.
✗ Branch 1 not taken.
1088 if (retval != 0) {
1172 // this is an error: file does not exist or out of memory
1173 // error is caught in other code section.
1174
1/2
✓ Branch 2 taken 1088 times.
✗ Branch 3 not taken.
1088 info->sink()->Reserve(64ul * 1024ul);
1175 } else {
1176 info->sink()->Reserve(stat_buf.st_size);
1177 }
1178 }
1179
1180
2/4
✓ Branch 2 taken 2773 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2773 times.
✗ Branch 7 not taken.
2773 curl_easy_setopt(curl_handle, CURLOPT_URL,
1181 EscapeUrl(info->id(), url).c_str());
1182 2773 }
1183
1184
1185 /**
1186 * Checks if the name resolving information is still up to date. The host
1187 * object should be one from the current load-balance group. If the information
1188 * changed, gather new set of resolved IPs and, if different, exchange them in
1189 * the load-balance group on the fly. In the latter case, also rebalance the
1190 * proxies. The options mutex needs to be open.
1191 *
1192 * Returns true if proxies may have changed.
1193 */
1194 8 bool DownloadManager::ValidateProxyIpsUnlocked(const string &url,
1195 const dns::Host &host) {
1196
2/4
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 8 times.
✗ Branch 4 not taken.
8 if (!host.IsExpired())
1197 8 return false;
1198 LogCvmfs(kLogDownload, kLogDebug, "(manager '%s') validate DNS entry for %s",
1199 name_.c_str(), host.name().c_str());
1200
1201 const unsigned group_idx = opt_proxy_groups_current_;
1202 dns::Host new_host = resolver_->Resolve(host.name());
1203
1204 bool update_only = true; // No changes to the list of IP addresses.
1205 if (new_host.status() != dns::kFailOk) {
1206 // Try again later in case resolving fails.
1207 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
1208 "(manager '%s') failed to resolve IP addresses for %s (%d - %s)",
1209 name_.c_str(), host.name().c_str(), new_host.status(),
1210 dns::Code2Ascii(new_host.status()));
1211 new_host = dns::Host::ExtendDeadline(host, resolver_->min_ttl());
1212 } else if (!host.IsEquivalent(new_host)) {
1213 update_only = false;
1214 }
1215
1216 if (update_only) {
1217 for (unsigned i = 0; i < (*opt_proxy_groups_)[group_idx].size(); ++i) {
1218 if ((*opt_proxy_groups_)[group_idx][i].host.id() == host.id())
1219 (*opt_proxy_groups_)[group_idx][i].host = new_host;
1220 }
1221 return false;
1222 }
1223
1224 assert(new_host.status() == dns::kFailOk);
1225
1226 // Remove old host objects, insert new objects, and rebalance.
1227 LogCvmfs(kLogDownload, kLogDebug | kLogSyslog,
1228 "(manager '%s') DNS entries for proxy %s changed, adjusting",
1229 name_.c_str(), host.name().c_str());
1230 vector<ProxyInfo> *group = current_proxy_group();
1231 opt_num_proxies_ -= group->size();
1232 for (unsigned i = 0; i < group->size();) {
1233 if ((*group)[i].host.id() == host.id()) {
1234 group->erase(group->begin() + i);
1235 } else {
1236 i++;
1237 }
1238 }
1239 vector<ProxyInfo> new_infos;
1240 set<string> const best_addresses = new_host.ViewBestAddresses(opt_ip_preference_);
1241 set<string>::const_iterator iter_ips = best_addresses.begin();
1242 for (; iter_ips != best_addresses.end(); ++iter_ips) {
1243 const string url_ip = dns::RewriteUrl(url, *iter_ips);
1244 new_infos.push_back(ProxyInfo(new_host, url_ip));
1245 }
1246 group->insert(group->end(), new_infos.begin(), new_infos.end());
1247 opt_num_proxies_ += new_infos.size();
1248
1249 const std::string msg = "DNS entries for proxy " + host.name() + " changed";
1250
1251 RebalanceProxiesUnlocked(msg);
1252 return true;
1253 }
1254
1255
1256 /**
1257 * Adds transfer time and downloaded bytes to the global counters.
1258 */
1259 2899 void DownloadManager::UpdateStatistics(CURL *handle) {
1260 double val;
1261 int retval;
1262 2899 int64_t sum = 0;
1263
1264
1/2
✓ Branch 1 taken 2899 times.
✗ Branch 2 not taken.
2899 retval = curl_easy_getinfo(handle, CURLINFO_SIZE_DOWNLOAD, &val);
1265
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2899 times.
2899 assert(retval == CURLE_OK);
1266 2899 sum += static_cast<int64_t>(val);
1267 /*retval = curl_easy_getinfo(handle, CURLINFO_HEADER_SIZE, &val);
1268 assert(retval == CURLE_OK);
1269 sum += static_cast<int64_t>(val);*/
1270 2899 perf::Xadd(counters_->sz_transferred_bytes, sum);
1271 2899 }
1272
1273
1274 /**
1275 * Retry if possible if not on no-cache and if not already done too often.
1276 */
1277 2899 bool DownloadManager::CanRetry(const JobInfo *info) {
1278 2899 const MutexLockGuard m(lock_options_);
1279 2899 const unsigned max_retries = opt_max_retries_;
1280
1281
2/2
✓ Branch 2 taken 931 times.
✓ Branch 3 taken 1824 times.
5654 return !(info->nocache()) && (info->num_retries() < max_retries)
1282
3/4
✓ Branch 0 taken 2755 times.
✓ Branch 1 taken 144 times.
✓ Branch 4 taken 931 times.
✗ Branch 5 not taken.
6585 && (IsProxyTransferError(info->error_code())
1283
2/2
✓ Branch 2 taken 60 times.
✓ Branch 3 taken 871 times.
931 || IsHostTransferError(info->error_code()));
1284 2899 }
1285
1286 /**
1287 * Backoff for retry to introduce a jitter into a cluster of requesting
1288 * cvmfs nodes.
1289 * Retry only when HTTP caching is on.
1290 *
1291 * \return true if backoff has been performed, false otherwise
1292 */
1293 60 void DownloadManager::Backoff(JobInfo *info) {
1294 60 unsigned backoff_init_ms = 0;
1295 60 unsigned backoff_max_ms = 0;
1296 {
1297 60 const MutexLockGuard m(lock_options_);
1298 60 backoff_init_ms = opt_backoff_init_ms_;
1299 60 backoff_max_ms = opt_backoff_max_ms_;
1300 60 }
1301
1302 60 info->SetNumRetries(info->num_retries() + 1);
1303 60 perf::Inc(counters_->n_retries);
1304
2/2
✓ Branch 1 taken 22 times.
✓ Branch 2 taken 38 times.
60 if (info->backoff_ms() == 0) {
1305 22 info->SetBackoffMs(prng_.Next(backoff_init_ms + 1)); // Must be != 0
1306 } else {
1307 38 info->SetBackoffMs(info->backoff_ms() * 2);
1308 }
1309
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 60 times.
60 if (info->backoff_ms() > backoff_max_ms) {
1310 info->SetBackoffMs(backoff_max_ms);
1311 }
1312
1313 60 LogCvmfs(kLogDownload, kLogDebug,
1314 "(manager '%s' - id %" PRId64 ") backing off for %d ms",
1315 name_.c_str(), info->id(), info->backoff_ms());
1316 60 SafeSleepMs(info->backoff_ms());
1317 60 }
1318
1319 144 void DownloadManager::SetNocache(JobInfo *info) {
1320
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 144 times.
144 if (info->nocache())
1321 return;
1322 144 header_lists_->AppendHeader(info->headers(), "Pragma: no-cache");
1323 144 header_lists_->AppendHeader(info->headers(), "Cache-Control: no-cache");
1324 144 curl_easy_setopt(info->curl_handle(), CURLOPT_HTTPHEADER, info->headers());
1325 144 info->SetNocache(true);
1326 }
1327
1328
1329 /**
1330 * Reverse operation of SetNocache. Makes sure that "no-cache" header
1331 * disappears from the list of headers to let proxies work normally.
1332 */
1333 132 void DownloadManager::SetRegularCache(JobInfo *info) {
1334
1/2
✓ Branch 1 taken 132 times.
✗ Branch 2 not taken.
132 if (info->nocache() == false)
1335 132 return;
1336 header_lists_->CutHeader("Pragma: no-cache", info->GetHeadersPtr());
1337 header_lists_->CutHeader("Cache-Control: no-cache", info->GetHeadersPtr());
1338 curl_easy_setopt(info->curl_handle(), CURLOPT_HTTPHEADER, info->headers());
1339 info->SetNocache(false);
1340 }
1341
1342
1343 /**
1344 * Frees the storage associated with the authz attachment from the job
1345 */
1346 2773 void DownloadManager::ReleaseCredential(JobInfo *info) {
1347
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2773 times.
2773 if (info->cred_data()) {
1348 assert(credentials_attachment_ != NULL); // Someone must have set it
1349 credentials_attachment_->ReleaseCurlHandle(info->curl_handle(),
1350 info->cred_data());
1351 info->SetCredData(NULL);
1352 }
1353 2773 }
1354
1355
1356 /* Sort links based on the "pri=" parameter */
1357 static bool sortlinks(const std::string &s1, const std::string &s2) {
1358 const size_t pos1 = s1.find("; pri=");
1359 const size_t pos2 = s2.find("; pri=");
1360 int pri1, pri2;
1361 if ((pos1 != std::string::npos) && (pos2 != std::string::npos)
1362 && (sscanf(s1.substr(pos1 + 6).c_str(), "%d", &pri1) == 1)
1363 && (sscanf(s2.substr(pos2 + 6).c_str(), "%d", &pri2) == 1)) {
1364 return pri1 < pri2;
1365 }
1366 return false;
1367 }
1368
1369 /**
1370 * Parses Link header and uses it to set a new host chain.
1371 * See rfc6249.
1372 */
1373 void DownloadManager::ProcessLink(JobInfo *info) {
1374 std::vector<std::string> links = SplitString(info->link(), ',');
1375 if (info->link().find("; pri=") != std::string::npos)
1376 std::sort(links.begin(), links.end(), sortlinks);
1377
1378 std::vector<std::string> host_list;
1379
1380 std::vector<std::string>::const_iterator il = links.begin();
1381 for (; il != links.end(); ++il) {
1382 const std::string &link = *il;
1383 if ((link.find("; rel=duplicate") == std::string::npos)
1384 && (link.find("; rel=\"duplicate\"") == std::string::npos)) {
1385 LogCvmfs(kLogDownload, kLogDebug,
1386 "skipping link '%s' because it does not contain rel=duplicate",
1387 link.c_str());
1388 continue;
1389 }
1390 // ignore depth= field since there's nothing useful we can do with it
1391
1392 size_t start = link.find('<');
1393 if (start == std::string::npos) {
1394 LogCvmfs(
1395 kLogDownload, kLogDebug,
1396 "skipping link '%s' because it does not have a left angle bracket",
1397 link.c_str());
1398 continue;
1399 }
1400
1401 start++;
1402 if ((link.substr(start, 7) != "http://")
1403 && (link.substr(start, 8) != "https://")) {
1404 LogCvmfs(kLogDownload, kLogDebug,
1405 "skipping link '%s' of unrecognized url protocol", link.c_str());
1406 continue;
1407 }
1408
1409 size_t end = link.find('/', start + 8);
1410 if (end == std::string::npos)
1411 end = link.find('>');
1412 if (end == std::string::npos) {
1413 LogCvmfs(kLogDownload, kLogDebug,
1414 "skipping link '%s' because no slash in url and no right angle "
1415 "bracket",
1416 link.c_str());
1417 continue;
1418 }
1419 const std::string host = link.substr(start, end - start);
1420 LogCvmfs(kLogDownload, kLogDebug, "adding linked host '%s'", host.c_str());
1421 host_list.push_back(host);
1422 }
1423
1424 if (host_list.size() > 0) {
1425 SetHostChain(host_list);
1426 opt_metalink_timestamp_link_ = time(NULL);
1427 }
1428 }
1429
1430
1431 /**
1432 * Checks the result of a curl download and implements the failure logic, such
1433 * as changing the proxy server. Takes care of cleanup.
1434 *
1435 * \return true if another download should be performed, false otherwise
1436 */
1437 2899 bool DownloadManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1438
1/2
✓ Branch 6 taken 2899 times.
✗ Branch 7 not taken.
2899 LogCvmfs(kLogDownload, kLogDebug,
1439 "(manager '%s' - id %" PRId64 ") "
1440 "Verify downloaded url %s, proxy %s (curl error %d)",
1441 name_.c_str(), info->id(), info->url()->c_str(),
1442
1/2
✓ Branch 1 taken 2899 times.
✗ Branch 2 not taken.
5798 info->proxy().c_str(), curl_error);
1443
1/2
✓ Branch 2 taken 2899 times.
✗ Branch 3 not taken.
2899 UpdateStatistics(info->curl_handle());
1444
1445 bool was_metalink;
1446 2899 std::string typ;
1447
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2899 times.
2899 if (info->current_metalink_chain_index() >= 0) {
1448 was_metalink = true;
1449 typ = "metalink";
1450 if (info->link() != "") {
1451 // process Link header whether or not the redirected URL got an error
1452 ProcessLink(info);
1453 }
1454 } else {
1455 2899 was_metalink = false;
1456
1/2
✓ Branch 1 taken 2899 times.
✗ Branch 2 not taken.
2899 typ = "host";
1457 }
1458
1459
1460 // Verification and error classification
1461
3/14
✓ Branch 0 taken 2681 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 216 times.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
✗ Branch 11 not taken.
✗ Branch 12 not taken.
✗ Branch 13 not taken.
2899 switch (curl_error) {
1462 2681 case CURLE_OK:
1463 // Verify content hash
1464
2/2
✓ Branch 1 taken 1842 times.
✓ Branch 2 taken 839 times.
2681 if (info->expected_hash()) {
1465
1/2
✓ Branch 1 taken 1842 times.
✗ Branch 2 not taken.
1842 shash::Any match_hash;
1466
1/2
✓ Branch 2 taken 1842 times.
✗ Branch 3 not taken.
1842 shash::Final(info->hash_context(), &match_hash);
1467
3/4
✓ Branch 2 taken 1842 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 132 times.
✓ Branch 5 taken 1710 times.
1842 if (match_hash != *(info->expected_hash())) {
1468
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 132 times.
132 if (ignore_signature_failures_) {
1469 LogCvmfs(
1470 kLogDownload, kLogDebug | kLogSyslogErr,
1471 "(manager '%s' - id %" PRId64 ") "
1472 "ignoring failed hash verification of %s (expected %s, got %s)",
1473 name_.c_str(), info->id(), info->url()->c_str(),
1474 info->expected_hash()->ToString().c_str(),
1475 match_hash.ToString().c_str());
1476 } else {
1477
1/2
✓ Branch 7 taken 132 times.
✗ Branch 8 not taken.
264 LogCvmfs(kLogDownload, kLogDebug,
1478 "(manager '%s' - id %" PRId64 ") "
1479 "hash verification of %s failed (expected %s, got %s)",
1480 name_.c_str(), info->id(), info->url()->c_str(),
1481
1/2
✓ Branch 2 taken 132 times.
✗ Branch 3 not taken.
264 info->expected_hash()->ToString().c_str(),
1482
1/2
✓ Branch 1 taken 132 times.
✗ Branch 2 not taken.
264 match_hash.ToString().c_str());
1483 132 info->SetErrorCode(kFailBadData);
1484 132 break;
1485 }
1486 }
1487 }
1488
1489 2549 info->SetErrorCode(kFailOk);
1490 2549 break;
1491 case CURLE_UNSUPPORTED_PROTOCOL:
1492 info->SetErrorCode(kFailUnsupportedProtocol);
1493 break;
1494 2 case CURLE_URL_MALFORMAT:
1495 2 info->SetErrorCode(kFailBadUrl);
1496 2 break;
1497 case CURLE_COULDNT_RESOLVE_PROXY:
1498 info->SetErrorCode(kFailProxyResolve);
1499 break;
1500 case CURLE_COULDNT_RESOLVE_HOST:
1501 info->SetErrorCode(kFailHostResolve);
1502 break;
1503 case CURLE_OPERATION_TIMEDOUT:
1504 info->SetErrorCode((info->proxy() == "DIRECT") ? kFailHostTooSlow
1505 : kFailProxyTooSlow);
1506 break;
1507 case CURLE_PARTIAL_FILE:
1508 case CURLE_GOT_NOTHING:
1509 case CURLE_RECV_ERROR:
1510 info->SetErrorCode((info->proxy() == "DIRECT") ? kFailHostShortTransfer
1511 : kFailProxyShortTransfer);
1512 break;
1513 216 case CURLE_FILE_COULDNT_READ_FILE:
1514 case CURLE_COULDNT_CONNECT:
1515
3/6
✓ Branch 1 taken 216 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 2 times.
✓ Branch 6 taken 214 times.
216 if (info->proxy() != "DIRECT") {
1516 // This is a guess. Fail-over can still change to switching host
1517 2 info->SetErrorCode(kFailProxyConnection);
1518 } else {
1519 214 info->SetErrorCode(kFailHostConnection);
1520 }
1521 216 break;
1522 case CURLE_TOO_MANY_REDIRECTS:
1523 info->SetErrorCode(kFailHostConnection);
1524 break;
1525 case CURLE_SSL_CACERT_BADFILE:
1526 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogErr,
1527 "(manager '%s' -id %" PRId64 ") "
1528 "Failed to load certificate bundle. "
1529 "X509_CERT_BUNDLE might point to the wrong location.",
1530 name_.c_str(), info->id());
1531 info->SetErrorCode(kFailHostConnection);
1532 break;
1533 // As of curl 7.62.0, CURLE_SSL_CACERT is the same as
1534 // CURLE_PEER_FAILED_VERIFICATION
1535 case CURLE_PEER_FAILED_VERIFICATION:
1536 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogErr,
1537 "(manager '%s' - id %" PRId64 ") "
1538 "invalid SSL certificate of remote host. "
1539 "X509_CERT_DIR and/or X509_CERT_BUNDLE might point to the wrong "
1540 "location.",
1541 name_.c_str(), info->id());
1542 info->SetErrorCode(kFailHostConnection);
1543 break;
1544 case CURLE_ABORTED_BY_CALLBACK:
1545 case CURLE_WRITE_ERROR:
1546 // Error set by callback
1547 break;
1548 case CURLE_SEND_ERROR:
1549 // The curl error CURLE_SEND_ERROR can be seen when a cache is misbehaving
1550 // and closing connections before the http request send is completed.
1551 // Handle this error, treating it as a short transfer error.
1552 info->SetErrorCode((info->proxy() == "DIRECT") ? kFailHostShortTransfer
1553 : kFailProxyShortTransfer);
1554 break;
1555 default:
1556 LogCvmfs(kLogDownload, kLogSyslogErr,
1557 "(manager '%s' - id %" PRId64 ") "
1558 "unexpected curl error (%d) while trying to fetch %s",
1559 name_.c_str(), info->id(), curl_error, info->url()->c_str());
1560 info->SetErrorCode(kFailOther);
1561 break;
1562 }
1563
1564 std::vector<std::string> *host_chain;
1565 unsigned char num_used_hosts;
1566
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2899 times.
2899 if (was_metalink) {
1567 host_chain = opt_metalink_.chain;
1568 num_used_hosts = info->num_used_metalinks();
1569 } else {
1570 2899 host_chain = opt_host_.chain;
1571 2899 num_used_hosts = info->num_used_hosts();
1572 }
1573
1574 // Determination if download should be repeated
1575 2899 bool try_again = false;
1576
1/2
✓ Branch 1 taken 2899 times.
✗ Branch 2 not taken.
2899 bool same_url_retry = CanRetry(info);
1577
2/2
✓ Branch 1 taken 350 times.
✓ Branch 2 taken 2549 times.
2899 if (info->error_code() != kFailOk) {
1578 350 const MutexLockGuard m(lock_options_);
1579
2/2
✓ Branch 1 taken 132 times.
✓ Branch 2 taken 218 times.
350 if (info->error_code() == kFailBadData) {
1580
2/2
✓ Branch 1 taken 66 times.
✓ Branch 2 taken 66 times.
132 if (!info->nocache()) {
1581 66 try_again = true;
1582 } else {
1583 // Make it a host failure
1584
1/2
✓ Branch 4 taken 66 times.
✗ Branch 5 not taken.
66 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
1585 "(manager '%s' - id %" PRId64 ") "
1586 "data corruption with no-cache header, try another %s",
1587 name_.c_str(), info->id(), typ.c_str());
1588
1589 66 info->SetErrorCode(kFailHostHttp);
1590 }
1591 }
1592 350 if (same_url_retry
1593
5/6
✓ Branch 0 taken 290 times.
✓ Branch 1 taken 60 times.
✓ Branch 3 taken 290 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 66 times.
✓ Branch 6 taken 284 times.
640 || (((info->error_code() == kFailHostResolve)
1594
2/2
✓ Branch 2 taken 136 times.
✓ Branch 3 taken 154 times.
290 || IsHostTransferError(info->error_code())
1595
2/2
✓ Branch 1 taken 66 times.
✓ Branch 2 taken 70 times.
136 || (info->error_code() == kFailHostHttp))
1596
3/4
✓ Branch 1 taken 74 times.
✓ Branch 2 taken 146 times.
✓ Branch 3 taken 74 times.
✗ Branch 4 not taken.
220 && info->probe_hosts() && host_chain
1597
2/2
✓ Branch 1 taken 6 times.
✓ Branch 2 taken 68 times.
74 && (num_used_hosts < host_chain->size()))) {
1598 66 try_again = true;
1599 }
1600 350 if (same_url_retry
1601
5/6
✓ Branch 0 taken 290 times.
✓ Branch 1 taken 60 times.
✓ Branch 3 taken 290 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 62 times.
✓ Branch 6 taken 288 times.
640 || (((info->error_code() == kFailProxyResolve)
1602
2/2
✓ Branch 2 taken 288 times.
✓ Branch 3 taken 2 times.
290 || IsProxyTransferError(info->error_code())
1603
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 288 times.
288 || (info->error_code() == kFailProxyHttp)))) {
1604
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 62 times.
62 if (sharding_policy_.UseCount() > 0) { // sharding policy
1605 try_again = true;
1606 same_url_retry = false;
1607 } else { // no sharding
1608 62 try_again = true;
1609 // If all proxies failed, do a next round with the next host
1610
4/6
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 60 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 62 times.
62 if (!same_url_retry && (info->num_used_proxies() >= opt_num_proxies_)) {
1611 // Check if this can be made a host fail-over
1612 if (info->probe_hosts() && host_chain
1613 && (num_used_hosts < host_chain->size())) {
1614 // reset proxy group if not already performed by other handle
1615 if (opt_proxy_groups_) {
1616 if ((opt_proxy_groups_current_ > 0)
1617 || (opt_proxy_groups_current_burned_ > 0)) {
1618 opt_proxy_groups_current_ = 0;
1619 opt_timestamp_backup_proxies_ = 0;
1620 const std::string msg = "reset proxies for " + typ
1621 + " failover";
1622 RebalanceProxiesUnlocked(msg);
1623 }
1624 }
1625
1626 // Make it a host failure
1627 LogCvmfs(kLogDownload, kLogDebug,
1628 "(manager '%s' - id %" PRId64 ") make it a %s failure",
1629 name_.c_str(), info->id(), typ.c_str());
1630 info->SetNumUsedProxies(1);
1631 info->SetErrorCode(kFailHostAfterProxy);
1632 } else {
1633 if (failover_indefinitely_) {
1634 // Instead of giving up, reset the num_used_proxies counter,
1635 // switch proxy and try again
1636 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
1637 "(manager '%s' - id %" PRId64 ") "
1638 "VerifyAndFinalize() would fail the download here. "
1639 "Instead switch proxy and retry download. "
1640 "typ=%s "
1641 "info->probe_hosts=%d host_chain=%p num_used_hosts=%d "
1642 "host_chain->size()=%lu same_url_retry=%d "
1643 "info->num_used_proxies=%d opt_num_proxies_=%d",
1644 name_.c_str(), info->id(), typ.c_str(),
1645 static_cast<int>(info->probe_hosts()), host_chain,
1646 num_used_hosts, host_chain ? host_chain->size() : -1,
1647 static_cast<int>(same_url_retry),
1648 info->num_used_proxies(), opt_num_proxies_);
1649 info->SetNumUsedProxies(1);
1650 RebalanceProxiesUnlocked(
1651 "download failed - failover indefinitely");
1652 try_again = !Interrupted(fqrn_, info);
1653 } else {
1654 try_again = false;
1655 }
1656 }
1657 } // Make a proxy failure a host failure
1658 } // Proxy failure assumed
1659 } // end !sharding
1660 350 }
1661
1662
2/2
✓ Branch 0 taken 134 times.
✓ Branch 1 taken 2765 times.
2899 if (try_again) {
1663
1/2
✓ Branch 3 taken 134 times.
✗ Branch 4 not taken.
134 LogCvmfs(kLogDownload, kLogDebug,
1664 "(manager '%s' - id %" PRId64 ") "
1665 "Trying again on same curl handle, same url: %d, "
1666 "error code %d no-cache %d",
1667 134 name_.c_str(), info->id(), same_url_retry, info->error_code(),
1668 134 info->nocache());
1669 // Reset internal state and destination
1670
4/8
✓ Branch 1 taken 134 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 134 times.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 134 times.
✗ Branch 9 not taken.
✓ Branch 10 taken 134 times.
134 if (info->sink() != NULL && info->sink()->Reset() != 0) {
1671 info->SetErrorCode(kFailLocalIO);
1672 goto verify_and_finalize_stop;
1673 }
1674
6/8
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 132 times.
✓ Branch 5 taken 2 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 2 times.
✗ Branch 8 not taken.
✓ Branch 9 taken 2 times.
✓ Branch 10 taken 132 times.
134 if (info->interrupt_cue() && info->interrupt_cue()->IsCanceled()) {
1675 2 info->SetErrorCode(kFailCanceled);
1676 2 goto verify_and_finalize_stop;
1677 }
1678
1679
2/2
✓ Branch 1 taken 114 times.
✓ Branch 2 taken 18 times.
132 if (info->expected_hash()) {
1680
1/2
✓ Branch 2 taken 114 times.
✗ Branch 3 not taken.
114 shash::Init(info->hash_context());
1681 }
1682
2/2
✓ Branch 1 taken 114 times.
✓ Branch 2 taken 18 times.
132 if (info->compressed()) {
1683
1/2
✓ Branch 2 taken 114 times.
✗ Branch 3 not taken.
114 zlib::DecompressInit(info->GetZstreamPtr());
1684 }
1685
1686
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 132 times.
132 if (sharding_policy_.UseCount() > 0) { // sharding policy
1687 ReleaseCredential(info);
1688 SetUrlOptions(info);
1689 } else { // no sharding policy
1690
1/2
✓ Branch 1 taken 132 times.
✗ Branch 2 not taken.
132 SetRegularCache(info);
1691
1692 // Failure handling
1693 132 bool switch_proxy = false;
1694 132 bool switch_host = false;
1695
2/4
✓ Branch 1 taken 66 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 66 times.
132 switch (info->error_code()) {
1696 66 case kFailBadData:
1697
1/2
✓ Branch 1 taken 66 times.
✗ Branch 2 not taken.
66 SetNocache(info);
1698 66 break;
1699 case kFailProxyResolve:
1700 case kFailProxyHttp:
1701 switch_proxy = true;
1702 break;
1703 case kFailHostResolve:
1704 case kFailHostHttp:
1705 case kFailHostAfterProxy:
1706 switch_host = true;
1707 break;
1708 66 default:
1709
2/2
✓ Branch 2 taken 2 times.
✓ Branch 3 taken 64 times.
66 if (IsProxyTransferError(info->error_code())) {
1710
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (same_url_retry) {
1711 Backoff(info);
1712 } else {
1713 2 switch_proxy = true;
1714 }
1715
1/2
✓ Branch 2 taken 64 times.
✗ Branch 3 not taken.
64 } else if (IsHostTransferError(info->error_code())) {
1716
2/2
✓ Branch 0 taken 60 times.
✓ Branch 1 taken 4 times.
64 if (same_url_retry) {
1717
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 Backoff(info);
1718 } else {
1719 4 switch_host = true;
1720 }
1721 } else {
1722 // No other errors expected when retrying
1723 PANIC(NULL);
1724 }
1725 }
1726
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 130 times.
132 if (switch_proxy) {
1727
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 ReleaseCredential(info);
1728
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 SwitchProxy(info);
1729 2 info->SetNumUsedProxies(info->num_used_proxies() + 1);
1730
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 SetUrlOptions(info);
1731 }
1732
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 128 times.
132 if (switch_host) {
1733
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 ReleaseCredential(info);
1734
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
4 if (was_metalink) {
1735 SwitchMetalink(info);
1736 info->SetNumUsedMetalinks(num_used_hosts + 1);
1737 } else {
1738
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 SwitchHost(info);
1739 4 info->SetNumUsedHosts(num_used_hosts + 1);
1740 }
1741
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 SetUrlOptions(info);
1742 }
1743 } // end !sharding
1744
1745
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 132 times.
132 if (failover_indefinitely_) {
1746 // try again, breaking if there's a cvmfs reload happening and we are in a
1747 // proxy failover. This will EIO the call application.
1748 return !Interrupted(fqrn_, info);
1749 }
1750 132 return true; // try again
1751 }
1752
1753 2765 verify_and_finalize_stop:
1754 // Finalize, flush destination file.
1755
1/2
✓ Branch 1 taken 2767 times.
✗ Branch 2 not taken.
2767 ReleaseCredential(info);
1756
1757 // In parallel-decompress mode (data_tube_ valid) the caller has not yet
1758 // pushed bytes through the sink / zstream — those operations happen on
1759 // the caller thread when it pops the queued chunks. Flushing the sink
1760 // and tearing down the zstream here would race the caller's decompress
1761 // and corrupt the output. Defer both to the caller.
1762
1/2
✓ Branch 1 taken 2767 times.
✗ Branch 2 not taken.
2767 const bool defer_finalize = info->IsValidDataTube();
1763
1/2
✓ Branch 0 taken 2767 times.
✗ Branch 1 not taken.
2767 if (!defer_finalize) {
1764
4/8
✓ Branch 1 taken 2767 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 2767 times.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 2767 times.
✗ Branch 9 not taken.
✓ Branch 10 taken 2767 times.
2767 if (info->sink() != NULL && info->sink()->Flush() != 0) {
1765 info->SetErrorCode(kFailLocalIO);
1766 }
1767
2/2
✓ Branch 1 taken 1899 times.
✓ Branch 2 taken 868 times.
2767 if (info->compressed())
1768
1/2
✓ Branch 2 taken 1899 times.
✗ Branch 3 not taken.
1899 zlib::DecompressFini(info->GetZstreamPtr());
1769 }
1770
1771
1/2
✓ Branch 1 taken 2767 times.
✗ Branch 2 not taken.
2767 if (info->headers()) {
1772
1/2
✓ Branch 2 taken 2767 times.
✗ Branch 3 not taken.
2767 header_lists_->PutList(info->headers());
1773 2767 info->SetHeaders(NULL);
1774 }
1775
1776 2767 return false; // stop transfer and return to Fetch()
1777 2899 }
1778
1779 1966 DownloadManager::~DownloadManager() {
1780 // cleaned up fini
1781
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1966 times.
1966 if (sharding_policy_.UseCount() > 0) {
1782 sharding_policy_.Reset();
1783 }
1784
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1966 times.
1966 if (health_check_.UseCount() > 0) {
1785 if (health_check_.Unique()) {
1786 LogCvmfs(kLogDownload, kLogDebug,
1787 "(manager '%s') Stopping healthcheck thread", name_.c_str());
1788 health_check_->StopHealthcheck();
1789 }
1790 health_check_.Reset();
1791 }
1792
1793
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1966 times.
1966 if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1794 // Shutdown I/O thread
1795 pipe_terminate_->Write(kPipeTerminateSignal);
1796 pthread_join(thread_download_, NULL);
1797 // All handles are removed from the multi stack
1798 pipe_terminate_.Destroy();
1799 pipe_jobs_.Destroy();
1800 }
1801
1802 3932 for (set<CURL *>::iterator i = pool_handles_idle_->begin(),
1803 1966 iEnd = pool_handles_idle_->end();
1804
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1966 times.
1966 i != iEnd;
1805 ++i) {
1806 curl_easy_cleanup(*i);
1807 }
1808
1809
1/2
✓ Branch 0 taken 1966 times.
✗ Branch 1 not taken.
1966 delete pool_handles_idle_;
1810
1/2
✓ Branch 0 taken 1966 times.
✗ Branch 1 not taken.
1966 delete pool_handles_inuse_;
1811 1966 curl_multi_cleanup(curl_multi_);
1812
1813
1/2
✓ Branch 0 taken 1966 times.
✗ Branch 1 not taken.
1966 delete header_lists_;
1814
1/2
✓ Branch 0 taken 1966 times.
✗ Branch 1 not taken.
1966 if (user_agent_)
1815 1966 free(user_agent_);
1816
1817
1/2
✓ Branch 0 taken 1966 times.
✗ Branch 1 not taken.
1966 delete counters_;
1818
2/2
✓ Branch 0 taken 880 times.
✓ Branch 1 taken 1086 times.
1966 delete opt_host_.chain;
1819
2/2
✓ Branch 0 taken 880 times.
✓ Branch 1 taken 1086 times.
1966 delete opt_host_chain_rtt_;
1820
2/2
✓ Branch 0 taken 750 times.
✓ Branch 1 taken 1216 times.
1966 delete opt_proxy_groups_;
1821
1822 1966 curl_global_cleanup();
1823
1/2
✓ Branch 0 taken 1966 times.
✗ Branch 1 not taken.
1966 delete resolver_;
1824
1825 // old destructor
1826 1966 pthread_mutex_destroy(lock_options_);
1827 1966 pthread_mutex_destroy(lock_synchronous_mode_);
1828 1966 free(lock_options_);
1829 1966 free(lock_synchronous_mode_);
1830 1966 }
1831
1832 1967 void DownloadManager::InitHeaders() {
1833 // User-Agent
1834
1/2
✓ Branch 2 taken 1967 times.
✗ Branch 3 not taken.
1967 string cernvm_id = "User-Agent: cvmfs ";
1835 #ifdef CVMFS_LIBCVMFS
1836
1/2
✓ Branch 1 taken 1967 times.
✗ Branch 2 not taken.
1967 cernvm_id += "libcvmfs ";
1837 #else
1838 cernvm_id += "Fuse ";
1839 #endif
1840
2/4
✓ Branch 2 taken 1967 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1967 times.
✗ Branch 6 not taken.
1967 cernvm_id += string(CVMFS_VERSION);
1841
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1967 times.
1967 if (getenv("CERNVM_UUID") != NULL) {
1842 cernvm_id += " "
1843 + sanitizer::InputSanitizer("az AZ 09 -")
1844 .Filter(getenv("CERNVM_UUID"));
1845 }
1846 1967 user_agent_ = strdup(cernvm_id.c_str());
1847
1848
1/2
✓ Branch 1 taken 1967 times.
✗ Branch 2 not taken.
1967 header_lists_ = new HeaderLists();
1849
1850
1/2
✓ Branch 1 taken 1967 times.
✗ Branch 2 not taken.
1967 default_headers_ = header_lists_->GetList("Connection: Keep-Alive");
1851
1/2
✓ Branch 1 taken 1967 times.
✗ Branch 2 not taken.
1967 header_lists_->AppendHeader(default_headers_, "Pragma:");
1852
1/2
✓ Branch 1 taken 1967 times.
✗ Branch 2 not taken.
1967 header_lists_->AppendHeader(default_headers_, user_agent_);
1853 1967 }
1854
1855 1967 DownloadManager::DownloadManager(const unsigned max_pool_handles,
1856 const perf::StatisticsTemplate &statistics,
1857 1967 const std::string &name)
1858 1967 : prng_(Prng())
1859 1967 , pool_handles_idle_(new set<CURL *>)
1860 1967 , pool_handles_inuse_(new set<CURL *>)
1861 1967 , pool_max_handles_(max_pool_handles)
1862 1967 , pipe_terminate_(NULL)
1863
1/2
✓ Branch 1 taken 1967 times.
✗ Branch 2 not taken.
1967 , pipe_jobs_(NULL)
1864 1967 , watch_fds_(NULL)
1865 1967 , watch_fds_size_(0)
1866 1967 , watch_fds_inuse_(0)
1867 1967 , watch_fds_max_(4 * max_pool_handles)
1868 1967 , opt_timeout_proxy_(5)
1869 1967 , opt_timeout_direct_(10)
1870 1967 , opt_low_speed_limit_(1024)
1871 1967 , opt_max_retries_(0)
1872 1967 , opt_backoff_init_ms_(0)
1873 1967 , opt_backoff_max_ms_(0)
1874 1967 , enable_info_header_(false)
1875 1967 , opt_ipv4_only_(false)
1876 1967 , follow_redirects_(false)
1877 1967 , ignore_signature_failures_(false)
1878 1967 , enable_http_tracing_(false)
1879 1967 , opt_metalink_(NULL, 0, 0, 0)
1880 1967 , opt_metalink_timestamp_link_(0)
1881 1967 , opt_host_(NULL, 0, 0, 0)
1882 1967 , opt_host_chain_rtt_(NULL)
1883 1967 , opt_proxy_groups_(NULL)
1884 1967 , opt_proxy_groups_current_(0)
1885 1967 , opt_proxy_groups_current_burned_(0)
1886 1967 , opt_proxy_groups_fallback_(0)
1887 1967 , opt_num_proxies_(0)
1888 1967 , opt_proxy_shard_(false)
1889 1967 , failover_indefinitely_(false)
1890
1/2
✓ Branch 1 taken 1967 times.
✗ Branch 2 not taken.
1967 , name_(name)
1891 1967 , opt_ip_preference_(dns::kIpPreferSystem)
1892 1967 , opt_timestamp_backup_proxies_(0)
1893 1967 , opt_timestamp_failover_proxies_(0)
1894 1967 , opt_proxy_groups_reset_after_(0)
1895 1967 , credentials_attachment_(NULL)
1896
4/8
✓ Branch 13 taken 1967 times.
✗ Branch 14 not taken.
✓ Branch 16 taken 1967 times.
✗ Branch 17 not taken.
✓ Branch 19 taken 1967 times.
✗ Branch 20 not taken.
✓ Branch 23 taken 1967 times.
✗ Branch 24 not taken.
5901 , counters_(new Counters(statistics)) {
1897 1967 atomic_init32(&multi_threaded_);
1898
1899 1967 lock_options_ = reinterpret_cast<pthread_mutex_t *>(
1900 1967 smalloc(sizeof(pthread_mutex_t)));
1901 1967 int retval = pthread_mutex_init(lock_options_, NULL);
1902
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1967 times.
1967 assert(retval == 0);
1903 1967 lock_synchronous_mode_ = reinterpret_cast<pthread_mutex_t *>(
1904 1967 smalloc(sizeof(pthread_mutex_t)));
1905 1967 retval = pthread_mutex_init(lock_synchronous_mode_, NULL);
1906
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1967 times.
1967 assert(retval == 0);
1907
1908
1/2
✓ Branch 1 taken 1967 times.
✗ Branch 2 not taken.
1967 retval = curl_global_init(CURL_GLOBAL_ALL);
1909
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1967 times.
1967 assert(retval == CURLE_OK);
1910
1911
1/2
✓ Branch 1 taken 1967 times.
✗ Branch 2 not taken.
1967 InitHeaders();
1912
1913
1/2
✓ Branch 1 taken 1967 times.
✗ Branch 2 not taken.
1967 curl_multi_ = curl_multi_init();
1914
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1967 times.
1967 assert(curl_multi_ != NULL);
1915
1/2
✓ Branch 1 taken 1967 times.
✗ Branch 2 not taken.
1967 curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION, CallbackCurlSocket);
1916
1/2
✓ Branch 1 taken 1967 times.
✗ Branch 2 not taken.
1967 curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1917 static_cast<void *>(this));
1918
1/2
✓ Branch 1 taken 1967 times.
✗ Branch 2 not taken.
1967 curl_multi_setopt(curl_multi_, CURLMOPT_MAXCONNECTS, watch_fds_max_);
1919
1/2
✓ Branch 1 taken 1967 times.
✗ Branch 2 not taken.
1967 curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1920 pool_max_handles_);
1921
1922 1967 prng_.InitLocaltime();
1923
1924 // Name resolving
1925 1967 if ((getenv("CVMFS_IPV4_ONLY") != NULL)
1926
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 1967 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 1967 times.
1967 && (strlen(getenv("CVMFS_IPV4_ONLY")) > 0)) {
1927 opt_ipv4_only_ = true;
1928 }
1929
1/2
✓ Branch 1 taken 1967 times.
✗ Branch 2 not taken.
1967 resolver_ = dns::NormalResolver::Create(opt_ipv4_only_, kDnsDefaultRetries,
1930 kDnsDefaultTimeoutMs);
1931
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1967 times.
1967 assert(resolver_);
1932 1967 }
1933
1934 /**
1935 * Spawns the I/O worker thread and switches the module in multi-threaded mode.
1936 * No way back except Fini(); Init();
1937 */
1938 void DownloadManager::Spawn() {
1939 pipe_terminate_ = new Pipe<kPipeThreadTerminator>();
1940 pipe_jobs_ = new Pipe<kPipeDownloadJobs>();
1941
1942 const int retval = pthread_create(&thread_download_, NULL, MainDownload,
1943 static_cast<void *>(this));
1944 assert(retval == 0);
1945
1946 atomic_inc32(&multi_threaded_);
1947
1948 if (health_check_.UseCount() > 0) {
1949 LogCvmfs(kLogDownload, kLogDebug,
1950 "(manager '%s') Starting healthcheck thread", name_.c_str());
1951 health_check_->StartHealthcheck();
1952 }
1953 }
1954
1955
/**
 * Downloads data from an insecure outside channel (currently HTTP or file).
 *
 * Prepares the destination, the hash context and the optional cvmfs-info and
 * HTTP tracing headers, then performs the transfer either
 *  - on the I/O thread (multi-threaded mode, after Spawn()), while this thread
 *    decompresses / copies the received chunks into the sink, or
 *  - synchronously on this thread, repeating curl_easy_perform() as long as
 *    VerifyAndFinalize() asks for another attempt.
 * On failure the sink (if any) is purged.
 *
 * \return kFailOk on success, otherwise the job's failure code
 */
Failures DownloadManager::Fetch(JobInfo *info) {
  assert(info != NULL);
  assert(info->url() != NULL);

  Failures result;
  result = PrepareDownloadDestination(info);
  if (result != kFailOk)
    return result;

  // The hash context buffer is alloca'ed: it lives in this stack frame and is
  // valid until Fetch() returns, i.e. for every retry of this job.
  if (info->expected_hash()) {
    const shash::Algorithms algorithm = info->expected_hash()->algorithm;
    info->GetHashContextPtr()->algorithm = algorithm;
    info->GetHashContextPtr()->size = shash::GetContextSize(algorithm);
    info->GetHashContextPtr()->buffer = alloca(info->hash_context().size);
  }

  // In case JobInfo object is being reused
  info->SetLink("");

  // Prepare cvmfs-info: header, allocate string on the stack
  info->SetInfoHeader(NULL);
  if (enable_info_header_) {
    const string header_info = info->GetInfoHeaderContents(info_header_template_);
    if (header_info != "") {
      const char * const header_name = "cvmfs-info: ";
      const size_t header_name_len = strlen(header_name);
      const size_t info_len = header_info.length();
      // alloca (not a local std::string) so the buffer outlives this block
      // and stays valid until Fetch() returns.
      char *buf = static_cast<char *>(alloca(header_name_len + info_len + 1));
      memcpy(buf, header_name, header_name_len);
      memcpy(buf + header_name_len, header_info.c_str(), info_len);
      buf[header_name_len + info_len] = '\0';
      info->SetInfoHeader(buf);
    }
  }

  if (enable_http_tracing_) {
    const std::string str_pid = "X-CVMFS-PID: " + StringifyInt(info->pid());
    const std::string str_gid = "X-CVMFS-GID: " + StringifyUint(info->gid());
    const std::string str_uid = "X-CVMFS-UID: " + StringifyUint(info->uid());

    // will be auto freed at the end of this function Fetch(JobInfo *info)
    info->SetTracingHeaderPid(static_cast<char *>(alloca(str_pid.size() + 1)));
    info->SetTracingHeaderGid(static_cast<char *>(alloca(str_gid.size() + 1)));
    info->SetTracingHeaderUid(static_cast<char *>(alloca(str_uid.size() + 1)));

    // Copy including the terminating '\0'
    memcpy(info->tracing_header_pid(), str_pid.c_str(), str_pid.size() + 1);
    memcpy(info->tracing_header_gid(), str_gid.c_str(), str_gid.size() + 1);
    memcpy(info->tracing_header_uid(), str_uid.c_str(), str_uid.size() + 1);
  }

  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
    // Multi-threaded mode: lazily create the per-job result pipe and data
    // tube, then hand the job to the I/O thread.
    if (!info->IsValidPipeJobResults()) {
      info->CreatePipeJobResults();
    }
    if (!info->IsValidDataTube()) {
      info->CreateDataTube();
    }

    // LogCvmfs(kLogDownload, kLogDebug, "send job to thread, pipe %d %d",
    //          info->wait_at[0], info->wait_at[1]);
    pipe_jobs_->Write<JobInfo *>(info);

    // Decompress / copy chunks on this caller thread (in parallel across
    // concurrent Fetch() callers) instead of on the single MainDownload
    // thread. Track decompression errors locally and apply them at the end
    // — VerifyAndFinalize already ran by the time kActionStop arrives.
    Failures decompress_err = kFailOk;
    do {
      DataTubeElement *ele = info->GetDataTubePtr()->PopFront();

      if (ele->action == kActionStop) {
        delete ele;
        break;
      }

      // After the first error, remaining chunks are still popped (to drain
      // the tube until kActionStop) but no longer written.
      if (decompress_err == kFailOk && ele->size > 0) {
        if (info->compressed()) {
          const zlib::StreamStates retval = zlib::DecompressZStream2Sink(
              ele->data, static_cast<int64_t>(ele->size),
              info->GetZstreamPtr(), info->sink());
          if (retval == zlib::kStreamDataError) {
            LogCvmfs(kLogDownload, kLogSyslogErr,
                     "(id %" PRId64 ") failed to decompress %s",
                     info->id(), info->url()->c_str());
            decompress_err = kFailBadData;
          } else if (retval == zlib::kStreamIOError) {
            LogCvmfs(kLogDownload, kLogSyslogErr,
                     "(id %" PRId64 ") decompressing %s, local IO error",
                     info->id(), info->url()->c_str());
            decompress_err = kFailLocalIO;
          }
        } else {
          // Uncompressed payload: copy the chunk verbatim; a short or failed
          // write is a local I/O error.
          const int64_t written = info->sink()->Write(ele->data, ele->size);
          if (written < 0 ||
              static_cast<uint64_t>(written) != ele->size) {
            LogCvmfs(kLogDownload, kLogDebug,
                     "(id %" PRId64 ") sink write failed for %s (%ld of %zu)",
                     info->id(), info->url()->c_str(),
                     static_cast<long>(written), ele->size);
            decompress_err = kFailLocalIO;
          }
        }
      }
      delete ele;
    } while (true);

    // The I/O thread's verdict for the transfer itself
    info->GetPipeJobResultPtr()->Read<download::Failures>(&result);

    // Caller-side finalize (deferred from VerifyAndFinalize). Must run after
    // all kActionDecompress chunks have been popped and decompressed.
    if (result == kFailOk && decompress_err == kFailOk) {
      if (info->sink() != NULL && info->sink()->Flush() != 0) {
        decompress_err = kFailLocalIO;
      }
    }
    if (info->compressed()) {
      zlib::DecompressFini(info->GetZstreamPtr());
    }
    // A transfer error takes precedence; a local decompress/write error only
    // replaces a successful transfer result.
    if (result == kFailOk && decompress_err != kFailOk) {
      result = decompress_err;
      info->SetErrorCode(decompress_err);
      if (info->sink() != NULL) {
        info->sink()->Purge();
      }
    }
    // LogCvmfs(kLogDownload, kLogDebug, "got result %d", result);
  } else {
    // Single-threaded mode: perform the transfer on this thread; the lock
    // serializes synchronous transfers of this manager.
    const MutexLockGuard l(lock_synchronous_mode_);
    CURL *handle = AcquireCurlHandle();
    InitializeRequest(info, handle);
    SetUrlOptions(info);
    // curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
    int retval;
    do {
      retval = curl_easy_perform(handle);
      perf::Inc(counters_->n_requests);
      double elapsed;
      if (curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &elapsed)
          == CURLE_OK) {
        // CURLINFO_TOTAL_TIME is in seconds; the counter accumulates ms
        perf::Xadd(counters_->sz_transfer_time,
                   static_cast<int64_t>(elapsed * 1000));
      }
    } while (VerifyAndFinalize(retval, info));
    result = info->error_code();
    // Prevent the handle from being added to the idle list, which
    // avoids that it is mistakenly picked up by the multi handle later
    // in multi-threaded context. This, in turn, would fail to wait for
    // for resolver threads that meanwhile vanished due to a fork()
    // in Daemonize()
    ReleaseCurlHandle(info->curl_handle(), false /* allow_reuse */);
  }

  if (result != kFailOk) {
    LogCvmfs(kLogDownload, kLogDebug,
             "(manager '%s' - id %" PRId64 ") "
             "download failed (error %d - %s)",
             name_.c_str(), info->id(), result, Code2Ascii(result));

    // Discard partially written data
    if (info->sink() != NULL) {
      info->sink()->Purge();
    }
  }

  return result;
}
2124
2125
2126 /**
2127 * Used by the client to connect the authz session manager to the download
2128 * manager.
2129 */
2130 622 void DownloadManager::SetCredentialsAttachment(CredentialsAttachment *ca) {
2131 622 const MutexLockGuard m(lock_options_);
2132 622 credentials_attachment_ = ca;
2133 622 }
2134
2135 /**
2136 * Gets the DNS sever.
2137 */
2138 std::string DownloadManager::GetDnsServer() const { return opt_dns_server_; }
2139
2140 /**
2141 * Sets a DNS server. Only for testing as it cannot be reverted to the system
2142 * default.
2143 */
2144 void DownloadManager::SetDnsServer(const string &address) {
2145 if (!address.empty()) {
2146 const MutexLockGuard m(lock_options_);
2147 opt_dns_server_ = address;
2148 assert(!opt_dns_server_.empty());
2149
2150 vector<string> servers;
2151 servers.push_back(address);
2152 const bool retval = resolver_->SetResolvers(servers);
2153 assert(retval);
2154 }
2155 LogCvmfs(kLogDownload, kLogSyslog, "(manager '%s') set nameserver to %s",
2156 name_.c_str(), address.c_str());
2157 }
2158
2159
2160 /**
2161 * Sets the DNS query timeout parameters.
2162 */
2163 998 void DownloadManager::SetDnsParameters(const unsigned retries,
2164 const unsigned timeout_ms) {
2165 998 const MutexLockGuard m(lock_options_);
2166 998 if ((resolver_->retries() == retries)
2167
3/6
✓ Branch 0 taken 998 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 998 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 998 times.
✗ Branch 6 not taken.
998 && (resolver_->timeout_ms() == timeout_ms)) {
2168 998 return;
2169 }
2170 delete resolver_;
2171 resolver_ = NULL;
2172 resolver_ = dns::NormalResolver::Create(opt_ipv4_only_, retries, timeout_ms);
2173 assert(resolver_);
2174
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 998 times.
998 }
2175
2176
2177 998 void DownloadManager::SetDnsTtlLimits(const unsigned min_seconds,
2178 const unsigned max_seconds) {
2179 998 const MutexLockGuard m(lock_options_);
2180 998 resolver_->set_min_ttl(min_seconds);
2181 998 resolver_->set_max_ttl(max_seconds);
2182 998 }
2183
2184
2185 void DownloadManager::SetIpPreference(dns::IpPreference preference) {
2186 const MutexLockGuard m(lock_options_);
2187 opt_ip_preference_ = preference;
2188 }
2189
2190
2191 /**
2192 * Sets two timeout values for proxied and for direct connections, respectively.
2193 * The timeout counts for all sorts of connection phases,
2194 * DNS, HTTP connect, etc.
2195 */
2196 1317 void DownloadManager::SetTimeout(const unsigned seconds_proxy,
2197 const unsigned seconds_direct) {
2198 1317 const MutexLockGuard m(lock_options_);
2199 1317 opt_timeout_proxy_ = seconds_proxy;
2200 1317 opt_timeout_direct_ = seconds_direct;
2201 1317 }
2202
2203
2204 /**
2205 * Sets contains the average transfer speed in bytes per second that the
2206 * transfer should be below during CURLOPT_LOW_SPEED_TIME seconds for libcurl to
2207 * consider it to be too slow and abort. Only effective for new connections.
2208 */
2209 void DownloadManager::SetLowSpeedLimit(const unsigned low_speed_limit) {
2210 const MutexLockGuard m(lock_options_);
2211 opt_low_speed_limit_ = low_speed_limit;
2212 }
2213
2214
2215 /**
2216 * Receives the currently active timeout values.
2217 */
2218 372 void DownloadManager::GetTimeout(unsigned *seconds_proxy,
2219 unsigned *seconds_direct) {
2220 372 const MutexLockGuard m(lock_options_);
2221 372 *seconds_proxy = opt_timeout_proxy_;
2222 372 *seconds_direct = opt_timeout_direct_;
2223 372 }
2224
2225
2226 /**
2227 * Parses a list of ';'-separated hosts for the metalink chain. The empty
2228 * string removes the metalink list.
2229 */
2230 void DownloadManager::SetMetalinkChain(const string &metalink_list) {
2231 SetMetalinkChain(SplitString(metalink_list, ';'));
2232 }
2233
2234
2235 void DownloadManager::SetMetalinkChain(
2236 const std::vector<std::string> &metalink_list) {
2237 const MutexLockGuard m(lock_options_);
2238 opt_metalink_.timestamp_backup = 0;
2239 delete opt_metalink_.chain;
2240 opt_metalink_.current = 0;
2241
2242 if (metalink_list.empty()) {
2243 opt_metalink_.chain = NULL;
2244 return;
2245 }
2246
2247 opt_metalink_.chain = new vector<string>(metalink_list);
2248 }
2249
2250
2251 /**
2252 * Retrieves the currently set chain of metalink hosts and the currently
2253 * used metalink host.
2254 */
2255 void DownloadManager::GetMetalinkInfo(vector<string> *metalink_chain,
2256 unsigned *current_metalink) {
2257 const MutexLockGuard m(lock_options_);
2258 if (opt_metalink_.chain) {
2259 if (current_metalink) {
2260 *current_metalink = opt_metalink_.current;
2261 }
2262 if (metalink_chain) {
2263 *metalink_chain = *opt_metalink_.chain;
2264 }
2265 }
2266 }
2267
2268
2269 /**
2270 * Parses a list of ';'-separated hosts for the host chain. The empty string
2271 * removes the host list.
2272 */
2273 881 void DownloadManager::SetHostChain(const string &host_list) {
2274
1/2
✓ Branch 2 taken 881 times.
✗ Branch 3 not taken.
881 SetHostChain(SplitString(host_list, ';'));
2275 881 }
2276
2277
2278 913 void DownloadManager::SetHostChain(const std::vector<std::string> &host_list) {
2279 913 const MutexLockGuard m(lock_options_);
2280 913 opt_host_.timestamp_backup = 0;
2281
2/2
✓ Branch 0 taken 405 times.
✓ Branch 1 taken 508 times.
913 delete opt_host_.chain;
2282
2/2
✓ Branch 0 taken 405 times.
✓ Branch 1 taken 508 times.
913 delete opt_host_chain_rtt_;
2283 913 opt_host_.current = 0;
2284
2285
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 913 times.
913 if (host_list.empty()) {
2286 opt_host_.chain = NULL;
2287 opt_host_chain_rtt_ = NULL;
2288 return;
2289 }
2290
2291
2/4
✓ Branch 1 taken 913 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 913 times.
✗ Branch 5 not taken.
913 opt_host_.chain = new vector<string>(host_list);
2292 1826 opt_host_chain_rtt_ = new vector<int>(opt_host_.chain->size(),
2293
2/4
✓ Branch 1 taken 913 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 913 times.
✗ Branch 5 not taken.
913 kProbeUnprobed);
2294 // LogCvmfs(kLogDownload, kLogSyslog, "using host %s",
2295 // (*opt_host_.chain)[0].c_str());
2296
1/2
✓ Branch 1 taken 913 times.
✗ Branch 2 not taken.
913 }
2297
2298
2299 /**
2300 * Retrieves the currently set chain of hosts, their round trip times, and the
2301 * currently used host.
2302 */
2303 224 void DownloadManager::GetHostInfo(vector<string> *host_chain, vector<int> *rtt,
2304 unsigned *current_host) {
2305 224 const MutexLockGuard m(lock_options_);
2306
1/2
✓ Branch 0 taken 224 times.
✗ Branch 1 not taken.
224 if (opt_host_.chain) {
2307
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 224 times.
224 if (current_host) {
2308 *current_host = opt_host_.current;
2309 }
2310
1/2
✓ Branch 0 taken 224 times.
✗ Branch 1 not taken.
224 if (host_chain) {
2311
1/2
✓ Branch 1 taken 224 times.
✗ Branch 2 not taken.
224 *host_chain = *opt_host_.chain;
2312 }
2313
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 224 times.
224 if (rtt) {
2314 *rtt = *opt_host_chain_rtt_;
2315 }
2316 }
2317 224 }
2318
2319
2320 /**
2321 * Jumps to the next proxy in the ring of forward proxy servers.
2322 * Selects one randomly from a load-balancing group.
2323 *
2324 * Allow for the fact that the proxy may have already been failed by
2325 * another transfer, or that the proxy may no longer be part of the
2326 * current load-balancing group.
2327 */
2328 2 void DownloadManager::SwitchProxy(JobInfo *info) {
2329 2 const MutexLockGuard m(lock_options_);
2330
2331
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (!opt_proxy_groups_) {
2332 return;
2333 }
2334
2335 // Fail any matching proxies within the current load-balancing group
2336 2 vector<ProxyInfo> *group = current_proxy_group();
2337 2 const unsigned group_size = group->size();
2338 2 unsigned failed = 0;
2339
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2 times.
4 for (unsigned i = 0; i < group_size - opt_proxy_groups_current_burned_; ++i) {
2340
5/14
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 2 times.
✗ Branch 8 not taken.
✓ Branch 9 taken 2 times.
✗ Branch 10 not taken.
✗ Branch 11 not taken.
✓ Branch 12 taken 2 times.
✗ Branch 13 not taken.
✗ Branch 14 not taken.
✗ Branch 15 not taken.
2 if (info && (info->proxy() == (*group)[i].url)) {
2341 // Move to list of failed proxies
2342 2 opt_proxy_groups_current_burned_++;
2343
1/2
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
2 swap((*group)[i],
2344 2 (*group)[group_size - opt_proxy_groups_current_burned_]);
2345 2 perf::Inc(counters_->n_proxy_failover);
2346 2 failed++;
2347 }
2348 }
2349
2350 // Do nothing more unless at least one proxy was marked as failed
2351
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (!failed)
2352 return;
2353
2354 // If all proxies from the current load-balancing group are burned, switch to
2355 // another group
2356
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (opt_proxy_groups_current_burned_ == group->size()) {
2357 2 opt_proxy_groups_current_burned_ = 0;
2358
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (opt_proxy_groups_->size() > 1) {
2359 4 opt_proxy_groups_current_ = (opt_proxy_groups_current_ + 1)
2360 2 % opt_proxy_groups_->size();
2361 // Remember the timestamp of switching to backup proxies
2362
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (opt_proxy_groups_reset_after_ > 0) {
2363 if (opt_proxy_groups_current_ > 0) {
2364 if (opt_timestamp_backup_proxies_ == 0)
2365 opt_timestamp_backup_proxies_ = time(NULL);
2366 // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2367 // "switched to (another) backup proxy group");
2368 } else {
2369 opt_timestamp_backup_proxies_ = 0;
2370 // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2371 // "switched back to primary proxy group");
2372 }
2373 opt_timestamp_failover_proxies_ = 0;
2374 }
2375 }
2376 } else {
2377 // Record failover time
2378 if (opt_proxy_groups_reset_after_ > 0) {
2379 if (opt_timestamp_failover_proxies_ == 0)
2380 opt_timestamp_failover_proxies_ = time(NULL);
2381 }
2382 }
2383
2384
2/4
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2 times.
✗ Branch 6 not taken.
2 UpdateProxiesUnlocked("failed proxy");
2385
1/2
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
2 LogCvmfs(kLogDownload, kLogDebug,
2386 "(manager '%s' - id %" PRId64 ") "
2387 "%lu proxies remain in group",
2388 name_.c_str(), info->id(),
2389 2 current_proxy_group()->size() - opt_proxy_groups_current_burned_);
2390
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 }
2391
2392
2393 /**
2394 * Switches to the next host in the chain. If jobinfo is set, switch only if
2395 * the current host is identical to the one used by jobinfo, otherwise another
2396 * transfer has already done the switch.
2397 */
2398 5 void DownloadManager::SwitchHostInfo(const std::string &typ,
2399 HostInfo &info,
2400 JobInfo *jobinfo) {
2401 5 const MutexLockGuard m(lock_options_);
2402
2403
5/6
✓ Branch 0 taken 5 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 1 times.
✓ Branch 4 taken 4 times.
✓ Branch 5 taken 1 times.
✓ Branch 6 taken 4 times.
5 if (!info.chain || (info.chain->size() == 1)) {
2404 1 return;
2405 }
2406
2407
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (jobinfo) {
2408 int lastused;
2409
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 if (typ == "host") {
2410 4 lastused = jobinfo->current_host_chain_index();
2411 } else {
2412 lastused = jobinfo->current_metalink_chain_index();
2413 }
2414
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
4 if (lastused != info.current) {
2415 LogCvmfs(kLogDownload, kLogDebug,
2416 "(manager '%s' - id %" PRId64 ")"
2417 "don't switch %s, "
2418 "last used %s: %s, current %s: %s",
2419 name_.c_str(), jobinfo->id(), typ.c_str(), typ.c_str(),
2420 (*info.chain)[lastused].c_str(), typ.c_str(),
2421 (*info.chain)[info.current].c_str());
2422 return;
2423 }
2424 }
2425
2426
1/2
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
4 string reason = "manually triggered";
2427
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 string info_id = "(manager " + name_;
2428
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (jobinfo) {
2429
1/2
✓ Branch 3 taken 4 times.
✗ Branch 4 not taken.
4 reason = download::Code2Ascii(jobinfo->error_code());
2430
2/4
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
4 info_id = " - id " + StringifyInt(jobinfo->id());
2431 }
2432
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 info_id += ")";
2433
2434
1/2
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
4 const std::string old_host = (*info.chain)[info.current];
2435 4 info.current = (info.current + 1) % static_cast<int>(info.chain->size());
2436
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 if (typ == "host") {
2437 4 perf::Inc(counters_->n_host_failover);
2438 } else {
2439 perf::Inc(counters_->n_metalink_failover);
2440 }
2441
1/3
✓ Branch 6 taken 4 times.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
8 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2442 "%s switching %s from %s to %s (%s)", info_id.c_str(), typ.c_str(),
2443 4 old_host.c_str(), (*info.chain)[info.current].c_str(),
2444 reason.c_str());
2445
2446 // Remember the timestamp of switching to backup host
2447
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
4 if (info.reset_after > 0) {
2448 if (info.current != 0) {
2449 if (info.timestamp_backup == 0)
2450 info.timestamp_backup = time(NULL);
2451 } else {
2452 info.timestamp_backup = 0;
2453 }
2454 }
2455
2/2
✓ Branch 4 taken 4 times.
✓ Branch 5 taken 1 times.
5 }
2456
2457 5 void DownloadManager::SwitchHost(JobInfo *info) {
2458
2/4
✓ Branch 2 taken 5 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 5 times.
✗ Branch 6 not taken.
5 SwitchHostInfo("host", opt_host_, info);
2459 5 }
2460
2461 1 void DownloadManager::SwitchHost() { SwitchHost(NULL); }
2462
2463
2464 void DownloadManager::SwitchMetalink(JobInfo *info) {
2465 SwitchHostInfo("metalink", opt_metalink_, info);
2466 }
2467
2468
2469 void DownloadManager::SwitchMetalink() { SwitchMetalink(NULL); }
2470
2471 965 bool DownloadManager::CheckMetalinkChain(time_t now) {
2472 965 return (opt_metalink_.chain
2473
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 965 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
965 && ((opt_metalink_timestamp_link_ == 0)
2474 || (static_cast<int64_t>((now == 0) ? time(NULL) : now)
2475 > static_cast<int64_t>(opt_metalink_timestamp_link_
2476
0/2
✗ Branch 0 not taken.
✗ Branch 1 not taken.
965 + opt_metalink_.reset_after))));
2477 }
2478
2479
2480 /**
2481 * Orders the hostlist according to RTT of downloading .cvmfschecksum.
2482 * Sets the current host to the best-responsive host.
2483 * If you change the host list in between by SetHostChain(), it will be
2484 * overwritten by this function.
2485 */
2486 void DownloadManager::ProbeHosts() {
2487 vector<string> host_chain;
2488 vector<int> host_rtt;
2489 unsigned current_host;
2490
2491 GetHostInfo(&host_chain, &host_rtt, &current_host);
2492
2493 // Stopwatch, two times to fill caches first
2494 unsigned i, retries;
2495 string url;
2496
2497 cvmfs::MemSink memsink;
2498 JobInfo info(&url, false, false, NULL, &memsink);
2499 for (retries = 0; retries < 2; ++retries) {
2500 for (i = 0; i < host_chain.size(); ++i) {
2501 url = host_chain[i] + "/.cvmfspublished";
2502
2503 struct timeval tv_start, tv_end;
2504 gettimeofday(&tv_start, NULL);
2505 const Failures result = Fetch(&info);
2506 gettimeofday(&tv_end, NULL);
2507 memsink.Reset();
2508 if (result == kFailOk) {
2509 host_rtt[i] = static_cast<int>(DiffTimeSeconds(tv_start, tv_end)
2510 * 1000);
2511 LogCvmfs(kLogDownload, kLogDebug,
2512 "(manager '%s' - id %" PRId64 ") "
2513 "probing host %s had %dms rtt",
2514 name_.c_str(), info.id(), url.c_str(), host_rtt[i]);
2515 } else {
2516 LogCvmfs(kLogDownload, kLogDebug,
2517 "(manager '%s' - id %" PRId64 ") "
2518 "error while probing host %s: %d %s",
2519 name_.c_str(), info.id(), url.c_str(), result,
2520 Code2Ascii(result));
2521 host_rtt[i] = INT_MAX;
2522 }
2523 }
2524 }
2525
2526 SortTeam(&host_rtt, &host_chain);
2527 for (i = 0; i < host_chain.size(); ++i) {
2528 if (host_rtt[i] == INT_MAX)
2529 host_rtt[i] = kProbeDown;
2530 }
2531
2532 const MutexLockGuard m(lock_options_);
2533 delete opt_host_.chain;
2534 delete opt_host_chain_rtt_;
2535 opt_host_.chain = new vector<string>(host_chain);
2536 opt_host_chain_rtt_ = new vector<int>(host_rtt);
2537 opt_host_.current = 0;
2538 }
2539
2540 bool DownloadManager::GeoSortServers(std::vector<std::string> *servers,
2541 std::vector<uint64_t> *output_order) {
2542 if (!servers) {
2543 return false;
2544 }
2545 if (servers->size() == 1) {
2546 if (output_order) {
2547 output_order->clear();
2548 output_order->push_back(0);
2549 }
2550 return true;
2551 }
2552
2553 std::vector<std::string> host_chain;
2554 GetHostInfo(&host_chain, NULL, NULL);
2555
2556 std::vector<std::string> server_dns_names;
2557 server_dns_names.reserve(servers->size());
2558 for (unsigned i = 0; i < servers->size(); ++i) {
2559 const std::string host = dns::ExtractHost((*servers)[i]);
2560 server_dns_names.push_back(host.empty() ? (*servers)[i] : host);
2561 }
2562 const std::string host_list = JoinStrings(server_dns_names, ",");
2563
2564 vector<string> host_chain_shuffled;
2565 {
2566 // Protect against concurrent access to prng_
2567 const MutexLockGuard m(lock_options_);
2568 // Determine random hosts for the Geo-API query
2569 host_chain_shuffled = Shuffle(host_chain, &prng_);
2570 }
2571 // Request ordered list via Geo-API
2572 bool success = false;
2573 const unsigned max_attempts = std::min(host_chain_shuffled.size(), size_t(3));
2574 vector<uint64_t> geo_order(servers->size());
2575 for (unsigned i = 0; i < max_attempts; ++i) {
2576 const string url = host_chain_shuffled[i] + "/api/v1.0/geo/@proxy@/"
2577 + host_list;
2578 LogCvmfs(kLogDownload, kLogDebug,
2579 "(manager '%s') requesting ordered server list from %s",
2580 name_.c_str(), url.c_str());
2581 cvmfs::MemSink memsink;
2582 JobInfo info(&url, false, false, NULL, &memsink);
2583 const Failures result = Fetch(&info);
2584 if (result == kFailOk) {
2585 const string order(reinterpret_cast<char *>(memsink.data()),
2586 memsink.pos());
2587 memsink.Reset();
2588 const bool retval = ValidateGeoReply(order, servers->size(), &geo_order);
2589 if (!retval) {
2590 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2591 "(manager '%s') retrieved invalid GeoAPI reply from %s [%s]",
2592 name_.c_str(), url.c_str(), order.c_str());
2593 } else {
2594 LogCvmfs(kLogDownload, kLogDebug | kLogSyslog,
2595 "(manager '%s') "
2596 "geographic order of servers retrieved from %s",
2597 name_.c_str(),
2598 dns::ExtractHost(host_chain_shuffled[i]).c_str());
2599 // remove new line at end of "order"
2600 LogCvmfs(kLogDownload, kLogDebug, "order is %s",
2601 Trim(order, true /* trim_newline */).c_str());
2602 success = true;
2603 break;
2604 }
2605 } else {
2606 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2607 "(manager '%s') GeoAPI request for %s failed with error %d [%s]",
2608 name_.c_str(), url.c_str(), result, Code2Ascii(result));
2609 }
2610 }
2611 if (!success) {
2612 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2613 "(manager '%s') "
2614 "failed to retrieve geographic order from stratum 1 servers",
2615 name_.c_str());
2616 return false;
2617 }
2618
2619 if (output_order) {
2620 output_order->swap(geo_order);
2621 } else {
2622 std::vector<std::string> sorted_servers;
2623 sorted_servers.reserve(geo_order.size());
2624 for (unsigned i = 0; i < geo_order.size(); ++i) {
2625 const uint64_t orderval = geo_order[i];
2626 sorted_servers.push_back((*servers)[orderval]);
2627 }
2628 servers->swap(sorted_servers);
2629 }
2630 return true;
2631 }
2632
2633
2634 /**
2635 * Uses the Geo-API of Stratum 1s to let any of them order the list of servers
2636 * and fallback proxies (if any).
2637 * Tries at most three random Stratum 1s before giving up.
2638 * If you change the host list in between by SetHostChain() or the fallback
2639 * proxy list by SetProxyChain(), they will be overwritten by this function.
2640 */
2641 bool DownloadManager::ProbeGeo() {
2642 vector<string> host_chain;
2643 vector<int> host_rtt;
2644 unsigned current_host;
2645 vector<vector<ProxyInfo> > proxy_chain;
2646 unsigned fallback_group;
2647
2648 GetHostInfo(&host_chain, &host_rtt, &current_host);
2649 GetProxyInfo(&proxy_chain, NULL, &fallback_group);
2650 if ((host_chain.size() < 2) && ((proxy_chain.size() - fallback_group) < 2))
2651 return true;
2652
2653 vector<string> host_names;
2654 for (unsigned i = 0; i < host_chain.size(); ++i)
2655 host_names.push_back(dns::ExtractHost(host_chain[i]));
2656 SortTeam(&host_names, &host_chain);
2657 const unsigned last_geo_host = host_names.size();
2658
2659 if ((fallback_group == 0) && (last_geo_host > 1)) {
2660 // There are no non-fallback proxies, which means that the client
2661 // will always use the fallback proxies. Add a keyword separator
2662 // between the hosts and fallback proxies so the geosorting service
2663 // will know to sort the hosts based on the distance from the
2664 // closest fallback proxy rather than the distance from the client.
2665 host_names.push_back("+PXYSEP+");
2666 }
2667
2668 // Add fallback proxy names to the end of the host list
2669 const unsigned first_geo_fallback = host_names.size();
2670 for (unsigned i = fallback_group; i < proxy_chain.size(); ++i) {
2671 // We only take the first fallback proxy name from every group under the
2672 // assumption that load-balanced servers are at the same location
2673 host_names.push_back(proxy_chain[i][0].host.name());
2674 }
2675
2676 std::vector<uint64_t> geo_order;
2677 const bool success = GeoSortServers(&host_names, &geo_order);
2678 if (!success) {
2679 // GeoSortServers already logged a failure message.
2680 return false;
2681 }
2682
2683 // Re-install host chain and proxy chain
2684 const MutexLockGuard m(lock_options_);
2685 delete opt_host_.chain;
2686 opt_num_proxies_ = 0;
2687 opt_host_.chain = new vector<string>(host_chain.size());
2688
2689 // It's possible that opt_proxy_groups_fallback_ might have changed while
2690 // the lock wasn't held
2691 vector<vector<ProxyInfo> > *proxy_groups = new vector<vector<ProxyInfo> >(
2692 opt_proxy_groups_fallback_ + proxy_chain.size() - fallback_group);
2693 // First copy the non-fallback part of the current proxy chain
2694 for (unsigned i = 0; i < opt_proxy_groups_fallback_; ++i) {
2695 (*proxy_groups)[i] = (*opt_proxy_groups_)[i];
2696 opt_num_proxies_ += (*opt_proxy_groups_)[i].size();
2697 }
2698
2699 // Copy the host chain and fallback proxies by geo order. Array indices
2700 // in geo_order that are smaller than last_geo_host refer to a stratum 1,
2701 // and those indices greater than or equal to first_geo_fallback refer to
2702 // a fallback proxy.
2703 unsigned hosti = 0;
2704 unsigned proxyi = opt_proxy_groups_fallback_;
2705 for (unsigned i = 0; i < geo_order.size(); ++i) {
2706 const uint64_t orderval = geo_order[i];
2707 if (orderval < static_cast<uint64_t>(last_geo_host)) {
2708 // LogCvmfs(kLogCvmfs, kLogSyslog, "this is orderval %u at host index
2709 // %u", orderval, hosti);
2710 (*opt_host_.chain)[hosti++] = host_chain[orderval];
2711 } else if (orderval >= static_cast<uint64_t>(first_geo_fallback)) {
2712 // LogCvmfs(kLogCvmfs, kLogSyslog,
2713 // "this is orderval %u at proxy index %u, using proxy_chain index %u",
2714 // orderval, proxyi, fallback_group + orderval - first_geo_fallback);
2715 (*proxy_groups)[proxyi] = proxy_chain[fallback_group + orderval
2716 - first_geo_fallback];
2717 opt_num_proxies_ += (*proxy_groups)[proxyi].size();
2718 proxyi++;
2719 }
2720 }
2721
2722 opt_proxy_map_.clear();
2723 delete opt_proxy_groups_;
2724 opt_proxy_groups_ = proxy_groups;
2725 // In pathological cases, opt_proxy_groups_current_ can be larger now when
2726 // proxies changed in-between.
2727 if (opt_proxy_groups_current_ > opt_proxy_groups_->size()) {
2728 if (opt_proxy_groups_->size() == 0) {
2729 opt_proxy_groups_current_ = 0;
2730 } else {
2731 opt_proxy_groups_current_ = opt_proxy_groups_->size() - 1;
2732 }
2733 opt_proxy_groups_current_burned_ = 0;
2734 }
2735
2736 UpdateProxiesUnlocked("geosort");
2737
2738 delete opt_host_chain_rtt_;
2739 opt_host_chain_rtt_ = new vector<int>(host_chain.size(), kProbeGeo);
2740 opt_host_.current = 0;
2741
2742 return true;
2743 }
2744
2745
2746 /**
2747 * Validates a string of the form "1,4,2,3" representing in which order the
2748 * the expected_size number of hosts should be put for optimal geographic
2749 * proximity. Returns false if the reply_order string is invalid, otherwise
2750 * fills in the reply_vals array with zero-based order indexes (e.g.
2751 * [0,3,1,2]) and returns true.
2752 */
2753 28 bool DownloadManager::ValidateGeoReply(const string &reply_order,
2754 const unsigned expected_size,
2755 vector<uint64_t> *reply_vals) {
2756
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 26 times.
28 if (reply_order.empty())
2757 2 return false;
2758
2/4
✓ Branch 2 taken 26 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 26 times.
✗ Branch 6 not taken.
52 const sanitizer::InputSanitizer sanitizer("09 , \n");
2759
3/4
✓ Branch 1 taken 26 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
✓ Branch 4 taken 24 times.
26 if (!sanitizer.IsValid(reply_order))
2760 2 return false;
2761
2/4
✓ Branch 2 taken 24 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 24 times.
✗ Branch 6 not taken.
48 const sanitizer::InputSanitizer strip_newline("09 ,");
2762
1/2
✓ Branch 1 taken 24 times.
✗ Branch 2 not taken.
24 vector<string> reply_strings = SplitString(strip_newline.Filter(reply_order),
2763
1/2
✓ Branch 1 taken 24 times.
✗ Branch 2 not taken.
24 ',');
2764 24 vector<uint64_t> tmp_vals;
2765
2/2
✓ Branch 1 taken 46 times.
✓ Branch 2 taken 20 times.
66 for (unsigned i = 0; i < reply_strings.size(); ++i) {
2766
2/2
✓ Branch 2 taken 4 times.
✓ Branch 3 taken 42 times.
46 if (reply_strings[i].empty())
2767 4 return false;
2768
2/4
✓ Branch 2 taken 42 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 42 times.
✗ Branch 6 not taken.
42 tmp_vals.push_back(String2Uint64(reply_strings[i]));
2769 }
2770
2/2
✓ Branch 1 taken 10 times.
✓ Branch 2 taken 10 times.
20 if (tmp_vals.size() != expected_size)
2771 10 return false;
2772
2773 // Check if tmp_vals contains the number 1..n
2774
1/2
✓ Branch 3 taken 10 times.
✗ Branch 4 not taken.
10 set<uint64_t> const coverage(tmp_vals.begin(), tmp_vals.end());
2775
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 10 times.
10 if (coverage.size() != tmp_vals.size())
2776 return false;
2777
5/6
✓ Branch 2 taken 8 times.
✓ Branch 3 taken 2 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 8 times.
✓ Branch 9 taken 2 times.
✓ Branch 10 taken 8 times.
10 if ((*coverage.begin() != 1) || (*coverage.rbegin() != coverage.size()))
2778 2 return false;
2779
2780
2/2
✓ Branch 0 taken 18 times.
✓ Branch 1 taken 8 times.
26 for (unsigned i = 0; i < expected_size; ++i) {
2781 18 (*reply_vals)[i] = tmp_vals[i] - 1;
2782 }
2783 8 return true;
2784 26 }
2785
2786
2787 /**
2788 * Removes DIRECT from a list of ';' and '|' separated proxies.
2789 * \return true if DIRECT was present, false otherwise
2790 */
2791 770 bool DownloadManager::StripDirect(const string &proxy_list,
2792 string *cleaned_list) {
2793
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 770 times.
770 assert(cleaned_list);
2794
2/2
✓ Branch 1 taken 752 times.
✓ Branch 2 taken 18 times.
770 if (proxy_list == "") {
2795
1/2
✓ Branch 1 taken 752 times.
✗ Branch 2 not taken.
752 *cleaned_list = "";
2796 752 return false;
2797 }
2798 18 bool result = false;
2799
2800
1/2
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
18 vector<string> proxy_groups = SplitString(proxy_list, ';');
2801 18 vector<string> cleaned_groups;
2802
2/2
✓ Branch 1 taken 44 times.
✓ Branch 2 taken 18 times.
62 for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2803
1/2
✓ Branch 2 taken 44 times.
✗ Branch 3 not taken.
44 vector<string> group = SplitString(proxy_groups[i], '|');
2804 44 vector<string> cleaned;
2805
2/2
✓ Branch 1 taken 74 times.
✓ Branch 2 taken 44 times.
118 for (unsigned j = 0; j < group.size(); ++j) {
2806
6/6
✓ Branch 2 taken 54 times.
✓ Branch 3 taken 20 times.
✓ Branch 6 taken 26 times.
✓ Branch 7 taken 28 times.
✓ Branch 8 taken 46 times.
✓ Branch 9 taken 28 times.
74 if ((group[j] == "DIRECT") || (group[j] == "")) {
2807 46 result = true;
2808 } else {
2809
1/2
✓ Branch 2 taken 28 times.
✗ Branch 3 not taken.
28 cleaned.push_back(group[j]);
2810 }
2811 }
2812
2/2
✓ Branch 1 taken 14 times.
✓ Branch 2 taken 30 times.
44 if (!cleaned.empty())
2813
3/6
✓ Branch 2 taken 14 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 14 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 14 times.
✗ Branch 9 not taken.
14 cleaned_groups.push_back(JoinStrings(cleaned, "|"));
2814 44 }
2815
2816
2/4
✓ Branch 2 taken 18 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 18 times.
✗ Branch 6 not taken.
18 *cleaned_list = JoinStrings(cleaned_groups, ";");
2817 18 return result;
2818 18 }
2819
2820
2821 /**
2822 * Parses a list of ';'- and '|'-separated proxy servers and fallback proxy
2823 * servers for the proxy groups.
2824 * The empty string for both removes the proxy chain.
2825 * The set_mode parameter can be used to set either proxies (leaving fallback
2826 * proxies unchanged) or fallback proxies (leaving regular proxies unchanged)
2827 * or both.
2828 */
2829 750 void DownloadManager::SetProxyChain(const string &proxy_list,
2830 const string &fallback_proxy_list,
2831 const ProxySetModes set_mode) {
2832 750 const MutexLockGuard m(lock_options_);
2833
2834 750 opt_timestamp_backup_proxies_ = 0;
2835 750 opt_timestamp_failover_proxies_ = 0;
2836
1/2
✓ Branch 1 taken 750 times.
✗ Branch 2 not taken.
750 string set_proxy_list = opt_proxy_list_;
2837
1/2
✓ Branch 1 taken 750 times.
✗ Branch 2 not taken.
750 string set_proxy_fallback_list = opt_proxy_fallback_list_;
2838 bool contains_direct;
2839
3/4
✓ Branch 0 taken 750 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 744 times.
✓ Branch 3 taken 6 times.
750 if ((set_mode == kSetProxyFallback) || (set_mode == kSetProxyBoth)) {
2840
1/2
✓ Branch 1 taken 744 times.
✗ Branch 2 not taken.
744 opt_proxy_fallback_list_ = fallback_proxy_list;
2841 }
2842
3/4
✓ Branch 0 taken 744 times.
✓ Branch 1 taken 6 times.
✓ Branch 2 taken 744 times.
✗ Branch 3 not taken.
750 if ((set_mode == kSetProxyRegular) || (set_mode == kSetProxyBoth)) {
2843
1/2
✓ Branch 1 taken 750 times.
✗ Branch 2 not taken.
750 opt_proxy_list_ = proxy_list;
2844 }
2845
1/2
✓ Branch 1 taken 750 times.
✗ Branch 2 not taken.
750 contains_direct = StripDirect(opt_proxy_fallback_list_,
2846 &set_proxy_fallback_list);
2847
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 750 times.
750 if (contains_direct) {
2848 LogCvmfs(kLogDownload, kLogSyslogWarn | kLogDebug,
2849 "(manager '%s') fallback proxies do not support DIRECT, removing",
2850 name_.c_str());
2851 }
2852
1/2
✓ Branch 1 taken 750 times.
✗ Branch 2 not taken.
750 if (set_proxy_fallback_list == "") {
2853
1/2
✓ Branch 1 taken 750 times.
✗ Branch 2 not taken.
750 set_proxy_list = opt_proxy_list_;
2854 } else {
2855 const bool contains_direct = StripDirect(opt_proxy_list_, &set_proxy_list);
2856 if (contains_direct) {
2857 LogCvmfs(kLogDownload, kLogSyslog | kLogDebug,
2858 "(manager '%s') skipping DIRECT proxy to use fallback proxy",
2859 name_.c_str());
2860 }
2861 }
2862
2863 // From this point on, use set_proxy_list and set_fallback_proxy_list as
2864 // effective proxy lists!
2865
2866 750 opt_proxy_map_.clear();
2867
2/2
✓ Branch 0 taken 372 times.
✓ Branch 1 taken 378 times.
750 delete opt_proxy_groups_;
2868
2/6
✗ Branch 1 not taken.
✓ Branch 2 taken 750 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 750 times.
750 if ((set_proxy_list == "") && (set_proxy_fallback_list == "")) {
2869 opt_proxy_groups_ = NULL;
2870 opt_proxy_groups_current_ = 0;
2871 opt_proxy_groups_current_burned_ = 0;
2872 opt_proxy_groups_fallback_ = 0;
2873 opt_num_proxies_ = 0;
2874 return;
2875 }
2876
2877 // Determine number of regular proxy groups (== first fallback proxy group)
2878 750 opt_proxy_groups_fallback_ = 0;
2879
1/2
✓ Branch 1 taken 750 times.
✗ Branch 2 not taken.
750 if (set_proxy_list != "") {
2880
1/2
✓ Branch 1 taken 750 times.
✗ Branch 2 not taken.
750 opt_proxy_groups_fallback_ = SplitString(set_proxy_list, ';').size();
2881 }
2882
1/2
✓ Branch 2 taken 750 times.
✗ Branch 3 not taken.
750 LogCvmfs(kLogDownload, kLogDebug,
2883 "(manager '%s') "
2884 "first fallback proxy group %u",
2885 name_.c_str(), opt_proxy_groups_fallback_);
2886
2887 // Concatenate regular proxies and fallback proxies, both of which can be
2888 // empty.
2889
1/2
✓ Branch 1 taken 750 times.
✗ Branch 2 not taken.
750 string all_proxy_list = set_proxy_list;
2890
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 750 times.
750 if (set_proxy_fallback_list != "") {
2891 if (all_proxy_list != "")
2892 all_proxy_list += ";";
2893 all_proxy_list += set_proxy_fallback_list;
2894 }
2895
1/2
✓ Branch 3 taken 750 times.
✗ Branch 4 not taken.
750 LogCvmfs(kLogDownload, kLogDebug, "(manager '%s') full proxy list %s",
2896 name_.c_str(), all_proxy_list.c_str());
2897
2898 // Resolve server names in provided urls
2899 750 vector<string> hostnames; // All encountered hostnames
2900 750 vector<string> proxy_groups;
2901
1/2
✓ Branch 1 taken 750 times.
✗ Branch 2 not taken.
750 if (all_proxy_list != "")
2902
1/2
✓ Branch 1 taken 750 times.
✗ Branch 2 not taken.
750 proxy_groups = SplitString(all_proxy_list, ';');
2903
2/2
✓ Branch 1 taken 752 times.
✓ Branch 2 taken 750 times.
1502 for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2904
1/2
✓ Branch 2 taken 752 times.
✗ Branch 3 not taken.
752 vector<string> this_group = SplitString(proxy_groups[i], '|');
2905
2/2
✓ Branch 1 taken 752 times.
✓ Branch 2 taken 752 times.
1504 for (unsigned j = 0; j < this_group.size(); ++j) {
2906
1/2
✓ Branch 2 taken 752 times.
✗ Branch 3 not taken.
752 this_group[j] = dns::AddDefaultScheme(this_group[j]);
2907 // Note: DIRECT strings will be "extracted" to an empty string.
2908
1/2
✓ Branch 2 taken 752 times.
✗ Branch 3 not taken.
752 const string hostname = dns::ExtractHost(this_group[j]);
2909 // Save the hostname. Leave empty (DIRECT) names so indexes will
2910 // match later.
2911
1/2
✓ Branch 1 taken 752 times.
✗ Branch 2 not taken.
752 hostnames.push_back(hostname);
2912 752 }
2913 752 }
2914 750 vector<dns::Host> hosts;
2915
1/2
✓ Branch 3 taken 750 times.
✗ Branch 4 not taken.
750 LogCvmfs(kLogDownload, kLogDebug,
2916 "(manager '%s') "
2917 "resolving %lu proxy addresses",
2918 name_.c_str(), hostnames.size());
2919
1/2
✓ Branch 1 taken 750 times.
✗ Branch 2 not taken.
750 resolver_->ResolveMany(hostnames, &hosts);
2920
2921 // Construct opt_proxy_groups_: traverse proxy list in same order and expand
2922 // names to resolved IP addresses.
2923
1/2
✓ Branch 1 taken 750 times.
✗ Branch 2 not taken.
750 opt_proxy_groups_ = new vector<vector<ProxyInfo> >();
2924 750 opt_num_proxies_ = 0;
2925 750 unsigned num_proxy = 0; // Combined i, j counter
2926
2/2
✓ Branch 1 taken 752 times.
✓ Branch 2 taken 750 times.
1502 for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2927
1/2
✓ Branch 2 taken 752 times.
✗ Branch 3 not taken.
752 vector<string> this_group = SplitString(proxy_groups[i], '|');
2928 // Construct ProxyInfo objects from proxy string and DNS resolver result for
2929 // every proxy in this_group. One URL can result in multiple ProxyInfo
2930 // objects, one for each IP address.
2931 752 vector<ProxyInfo> infos;
2932
2/2
✓ Branch 1 taken 752 times.
✓ Branch 2 taken 752 times.
1504 for (unsigned j = 0; j < this_group.size(); ++j, ++num_proxy) {
2933
1/2
✓ Branch 2 taken 752 times.
✗ Branch 3 not taken.
752 this_group[j] = dns::AddDefaultScheme(this_group[j]);
2934
2/2
✓ Branch 2 taken 744 times.
✓ Branch 3 taken 8 times.
752 if (this_group[j] == "DIRECT") {
2935
3/6
✓ Branch 2 taken 744 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 744 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 744 times.
✗ Branch 9 not taken.
744 infos.push_back(ProxyInfo("DIRECT"));
2936 744 continue;
2937 }
2938
2939
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 8 times.
8 if (hosts[num_proxy].status() != dns::kFailOk) {
2940 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2941 "(manager '%s') "
2942 "failed to resolve IP addresses for %s (%d - %s)",
2943 name_.c_str(), hosts[num_proxy].name().c_str(),
2944 hosts[num_proxy].status(),
2945 dns::Code2Ascii(hosts[num_proxy].status()));
2946 const dns::Host failed_host = dns::Host::ExtendDeadline(
2947 hosts[num_proxy], resolver_->min_ttl());
2948 infos.push_back(ProxyInfo(failed_host, this_group[j]));
2949 continue;
2950 }
2951
2952 // IPv4 addresses have precedence
2953 8 set<string> const best_addresses = hosts[num_proxy].ViewBestAddresses(
2954
2/4
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 8 times.
✗ Branch 5 not taken.
8 opt_ip_preference_);
2955 8 set<string>::const_iterator iter_ips = best_addresses.begin();
2956
2/2
✓ Branch 3 taken 8 times.
✓ Branch 4 taken 8 times.
16 for (; iter_ips != best_addresses.end(); ++iter_ips) {
2957
1/2
✓ Branch 3 taken 8 times.
✗ Branch 4 not taken.
8 const string url_ip = dns::RewriteUrl(this_group[j], *iter_ips);
2958
2/4
✓ Branch 2 taken 8 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 8 times.
✗ Branch 6 not taken.
8 infos.push_back(ProxyInfo(hosts[num_proxy], url_ip));
2959
2960
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 8 times.
8 if (sharding_policy_.UseCount() > 0) {
2961 sharding_policy_->AddProxy(url_ip);
2962 }
2963 8 }
2964 8 }
2965
1/2
✓ Branch 1 taken 752 times.
✗ Branch 2 not taken.
752 opt_proxy_groups_->push_back(infos);
2966 752 opt_num_proxies_ += infos.size();
2967 752 }
2968
1/2
✓ Branch 2 taken 750 times.
✗ Branch 3 not taken.
750 LogCvmfs(kLogDownload, kLogDebug,
2969 "(manager '%s') installed %u proxies in %lu load-balance groups",
2970 750 name_.c_str(), opt_num_proxies_, opt_proxy_groups_->size());
2971 750 opt_proxy_groups_current_ = 0;
2972 750 opt_proxy_groups_current_burned_ = 0;
2973
2974 // Select random start proxy from the first group.
2975
1/2
✓ Branch 1 taken 750 times.
✗ Branch 2 not taken.
750 if (opt_proxy_groups_->size() > 0) {
2976 // Select random start proxy from the first group.
2977
2/4
✓ Branch 2 taken 750 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 750 times.
✗ Branch 6 not taken.
750 UpdateProxiesUnlocked("set random start proxy from the first proxy group");
2978 }
2979
3/6
✓ Branch 5 taken 750 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 750 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 750 times.
✗ Branch 12 not taken.
750 }
2980
2981
2982 /**
2983 * Retrieves the proxy chain, optionally the currently active load-balancing
2984 * group, and optionally the index of the first fallback proxy group.
2985 * If there are no fallback proxies, the index will equal the size of
2986 * the proxy chain.
2987 */
2988 void DownloadManager::GetProxyInfo(vector<vector<ProxyInfo> > *proxy_chain,
2989 unsigned *current_group,
2990 unsigned *fallback_group) {
2991 assert(proxy_chain != NULL);
2992 const MutexLockGuard m(lock_options_);
2993
2994 if (!opt_proxy_groups_) {
2995 const vector<vector<ProxyInfo> > empty_chain;
2996 *proxy_chain = empty_chain;
2997 if (current_group != NULL)
2998 *current_group = 0;
2999 if (fallback_group != NULL)
3000 *fallback_group = 0;
3001 return;
3002 }
3003
3004 *proxy_chain = *opt_proxy_groups_;
3005 if (current_group != NULL)
3006 *current_group = opt_proxy_groups_current_;
3007 if (fallback_group != NULL)
3008 *fallback_group = opt_proxy_groups_fallback_;
3009 }
3010
3011 string DownloadManager::GetProxyList() { return opt_proxy_list_; }
3012
3013 string DownloadManager::GetFallbackProxyList() {
3014 return opt_proxy_fallback_list_;
3015 }
3016
3017 /**
3018 * Choose proxy
3019 */
3020 2773 DownloadManager::ProxyInfo *DownloadManager::ChooseProxyUnlocked(
3021 const shash::Any *hash) {
3022
2/2
✓ Branch 0 taken 2009 times.
✓ Branch 1 taken 764 times.
2773 if (!opt_proxy_groups_)
3023 2009 return NULL;
3024
3025
2/2
✓ Branch 0 taken 376 times.
✓ Branch 1 taken 388 times.
764 const uint32_t key = (hash ? hash->Partial32() : 0);
3026
1/2
✓ Branch 1 taken 764 times.
✗ Branch 2 not taken.
764 const map<uint32_t, ProxyInfo *>::iterator it = opt_proxy_map_.lower_bound(
3027 key);
3028 764 ProxyInfo *proxy = it->second;
3029
3030 764 return proxy;
3031 }
3032
3033 /**
3034 * Update currently selected proxy
3035 */
3036 1124 void DownloadManager::UpdateProxiesUnlocked(const string &reason) {
3037
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1124 times.
1124 if (!opt_proxy_groups_)
3038 return;
3039
3040 // Identify number of non-burned proxies within the current group
3041 1124 vector<ProxyInfo> *group = current_proxy_group();
3042 1124 const unsigned num_alive = (group->size() - opt_proxy_groups_current_burned_);
3043
2/4
✓ Branch 2 taken 1124 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1124 times.
✗ Branch 6 not taken.
2248 const string old_proxy = JoinStrings(opt_proxies_, "|");
3044
3045 // Rebuild proxy map and URL list
3046 1124 opt_proxy_map_.clear();
3047 1124 opt_proxies_.clear();
3048 1124 const uint32_t max_key = 0xffffffffUL;
3049
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1124 times.
1124 if (opt_proxy_shard_) {
3050 // Build a consistent map with multiple entries for each proxy
3051 for (unsigned i = 0; i < num_alive; ++i) {
3052 ProxyInfo *proxy = &(*group)[i];
3053 shash::Any proxy_hash(shash::kSha1);
3054 HashString(proxy->url, &proxy_hash);
3055 Prng prng;
3056 prng.InitSeed(proxy_hash.Partial32());
3057 for (unsigned j = 0; j < kProxyMapScale; ++j) {
3058 const std::pair<uint32_t, ProxyInfo *> entry(prng.Next(max_key), proxy);
3059 opt_proxy_map_.insert(entry);
3060 }
3061 const std::string proxy_name = proxy->host.name().empty()
3062 ? ""
3063 : " (" + proxy->host.name() + ")";
3064 opt_proxies_.push_back(proxy->url + proxy_name);
3065 }
3066 // Ensure lower_bound() finds a value for all keys
3067 ProxyInfo *first_proxy = opt_proxy_map_.begin()->second;
3068 const std::pair<uint32_t, ProxyInfo *> last_entry(max_key, first_proxy);
3069 opt_proxy_map_.insert(last_entry);
3070 } else {
3071 // Build a map with a single entry for one randomly selected proxy
3072 1124 const unsigned select = prng_.Next(num_alive);
3073 1124 ProxyInfo *proxy = &(*group)[select];
3074 1124 const std::pair<uint32_t, ProxyInfo *> entry(max_key, proxy);
3075
1/2
✓ Branch 1 taken 1124 times.
✗ Branch 2 not taken.
1124 opt_proxy_map_.insert(entry);
3076 1124 const std::string proxy_name = proxy->host.name().empty()
3077 ? ""
3078
9/18
✓ Branch 0 taken 1116 times.
✓ Branch 1 taken 8 times.
✓ Branch 4 taken 1116 times.
✗ Branch 5 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 8 times.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
✓ Branch 11 taken 8 times.
✗ Branch 12 not taken.
✓ Branch 13 taken 8 times.
✓ Branch 14 taken 1116 times.
✓ Branch 15 taken 1116 times.
✓ Branch 16 taken 8 times.
✗ Branch 17 not taken.
✗ Branch 18 not taken.
✗ Branch 19 not taken.
✗ Branch 20 not taken.
1132 : " (" + proxy->host.name() + ")";
3079
2/4
✓ Branch 1 taken 1124 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1124 times.
✗ Branch 5 not taken.
1124 opt_proxies_.push_back(proxy->url + proxy_name);
3080 1124 }
3081
1/2
✓ Branch 3 taken 1124 times.
✗ Branch 4 not taken.
1124 sort(opt_proxies_.begin(), opt_proxies_.end());
3082
3083 // Report any change in proxy usage
3084
2/4
✓ Branch 2 taken 1124 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1124 times.
✗ Branch 6 not taken.
2248 const string new_proxy = JoinStrings(opt_proxies_, "|");
3085 const string curr_host = "Current host: "
3086 1124 + (opt_host_.chain
3087
6/12
✓ Branch 0 taken 1116 times.
✓ Branch 1 taken 8 times.
✓ Branch 4 taken 1116 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 8 times.
✗ Branch 9 not taken.
✓ Branch 10 taken 8 times.
✓ Branch 11 taken 1116 times.
✗ Branch 12 not taken.
✗ Branch 13 not taken.
2248 ? (*opt_host_.chain)[opt_host_.current]
3088
1/2
✓ Branch 1 taken 1124 times.
✗ Branch 2 not taken.
1124 : "");
3089
2/2
✓ Branch 1 taken 752 times.
✓ Branch 2 taken 372 times.
1124 if (new_proxy != old_proxy) {
3090
4/9
✗ Branch 2 not taken.
✓ Branch 3 taken 752 times.
✓ Branch 4 taken 750 times.
✓ Branch 5 taken 2 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 752 times.
✗ Branch 9 not taken.
✗ Branch 12 not taken.
✗ Branch 13 not taken.
2258 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
3091 "(manager '%s') switching proxy from %s to %s. Reason: %s [%s]",
3092 754 name_.c_str(), (old_proxy.empty() ? "(none)" : old_proxy.c_str()),
3093 1504 (new_proxy.empty() ? "(none)" : new_proxy.c_str()), reason.c_str(),
3094 curr_host.c_str());
3095 }
3096 1124 }
3097
3098 /**
3099 * Enable proxy sharding
3100 */
3101 void DownloadManager::ShardProxies() {
3102 opt_proxy_shard_ = true;
3103 RebalanceProxiesUnlocked("enable sharding");
3104 }
3105
3106 /**
3107 * Selects a new random proxy in the current load-balancing group. Resets the
3108 * "burned" counter.
3109 */
3110 void DownloadManager::RebalanceProxiesUnlocked(const string &reason) {
3111 if (!opt_proxy_groups_)
3112 return;
3113
3114 opt_timestamp_failover_proxies_ = 0;
3115 opt_proxy_groups_current_burned_ = 0;
3116 UpdateProxiesUnlocked(reason);
3117 }
3118
3119
3120 void DownloadManager::RebalanceProxies() {
3121 const MutexLockGuard m(lock_options_);
3122 RebalanceProxiesUnlocked("rebalance invoked manually");
3123 }
3124
3125
3126 /**
3127 * Switches to the next load-balancing group of proxy servers.
3128 */
3129 void DownloadManager::SwitchProxyGroup() {
3130 const MutexLockGuard m(lock_options_);
3131
3132 if (!opt_proxy_groups_ || (opt_proxy_groups_->size() < 2)) {
3133 return;
3134 }
3135
3136 opt_proxy_groups_current_ = (opt_proxy_groups_current_ + 1)
3137 % opt_proxy_groups_->size();
3138 opt_timestamp_backup_proxies_ = time(NULL);
3139
3140 const std::string msg = "switch to proxy group "
3141 + StringifyUint(opt_proxy_groups_current_);
3142 RebalanceProxiesUnlocked(msg);
3143 }
3144
3145
3146 void DownloadManager::SetProxyGroupResetDelay(const unsigned seconds) {
3147 const MutexLockGuard m(lock_options_);
3148 opt_proxy_groups_reset_after_ = seconds;
3149 if (opt_proxy_groups_reset_after_ == 0) {
3150 opt_timestamp_backup_proxies_ = 0;
3151 opt_timestamp_failover_proxies_ = 0;
3152 }
3153 }
3154
3155
3156 void DownloadManager::SetMetalinkResetDelay(const unsigned seconds) {
3157 const MutexLockGuard m(lock_options_);
3158 opt_metalink_.reset_after = seconds;
3159 if (opt_metalink_.reset_after == 0)
3160 opt_metalink_.timestamp_backup = 0;
3161 }
3162
3163
3164 void DownloadManager::SetHostResetDelay(const unsigned seconds) {
3165 const MutexLockGuard m(lock_options_);
3166 opt_host_.reset_after = seconds;
3167 if (opt_host_.reset_after == 0)
3168 opt_host_.timestamp_backup = 0;
3169 }
3170
3171
3172 945 void DownloadManager::SetRetryParameters(const unsigned max_retries,
3173 const unsigned backoff_init_ms,
3174 const unsigned backoff_max_ms) {
3175 945 const MutexLockGuard m(lock_options_);
3176 945 opt_max_retries_ = max_retries;
3177 945 opt_backoff_init_ms_ = backoff_init_ms;
3178 945 opt_backoff_max_ms_ = backoff_max_ms;
3179 945 }
3180
3181
3182 376 void DownloadManager::SetMaxIpaddrPerProxy(unsigned limit) {
3183 376 const MutexLockGuard m(lock_options_);
3184 376 resolver_->set_throttle(limit);
3185 376 }
3186
3187
3188 622 void DownloadManager::SetProxyTemplates(const std::string &direct,
3189 const std::string &forced) {
3190 622 const MutexLockGuard m(lock_options_);
3191
1/2
✓ Branch 1 taken 622 times.
✗ Branch 2 not taken.
622 proxy_template_direct_ = direct;
3192
1/2
✓ Branch 1 taken 622 times.
✗ Branch 2 not taken.
622 proxy_template_forced_ = forced;
3193 622 }
3194
3195
3196 void DownloadManager::EnableInfoHeader() { enable_info_header_ = true; }
3197
3198
3199 329 void DownloadManager::EnableRedirects() { follow_redirects_ = true; }
3200
3201 void DownloadManager::EnableIgnoreSignatureFailures() {
3202 ignore_signature_failures_ = true;
3203 }
3204
3205 void DownloadManager::EnableHTTPTracing() { enable_http_tracing_ = true; }
3206
3207 void DownloadManager::AddHTTPTracingHeader(const std::string &header) {
3208 http_tracing_headers_.push_back(header);
3209 }
3210
3211 323 void DownloadManager::UseSystemCertificatePath() {
3212 323 ssl_certificate_store_.UseSystemCertificatePath();
3213 323 }
3214
3215 bool DownloadManager::SetShardingPolicy(const ShardingPolicySelector type) {
3216 const bool success = false;
3217 switch (type) {
3218 default:
3219 LogCvmfs(
3220 kLogDownload, kLogDebug | kLogSyslogErr,
3221 "(manager '%s') "
3222 "Proposed sharding policy does not exist. Falling back to default",
3223 name_.c_str());
3224 }
3225 return success;
3226 }
3227
3228 void DownloadManager::SetFailoverIndefinitely() {
3229 failover_indefinitely_ = true;
3230 }
3231
3232 /**
3233 * Creates a copy of the existing download manager. Must only be called in
3234 * single-threaded stage because it calls curl_global_init().
3235 */
3236 376 DownloadManager *DownloadManager::Clone(
3237 const perf::StatisticsTemplate &statistics,
3238 const std::string &cloned_name) {
3239 DownloadManager *clone = new DownloadManager(pool_max_handles_, statistics,
3240
1/2
✓ Branch 2 taken 376 times.
✗ Branch 3 not taken.
376 cloned_name);
3241
3242 376 clone->SetDnsParameters(resolver_->retries(), resolver_->timeout_ms());
3243 376 clone->SetDnsTtlLimits(resolver_->min_ttl(), resolver_->max_ttl());
3244 376 clone->SetMaxIpaddrPerProxy(resolver_->throttle());
3245
3246
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 376 times.
376 if (!opt_dns_server_.empty())
3247 clone->SetDnsServer(opt_dns_server_);
3248 376 clone->opt_timeout_proxy_ = opt_timeout_proxy_;
3249 376 clone->opt_timeout_direct_ = opt_timeout_direct_;
3250 376 clone->opt_low_speed_limit_ = opt_low_speed_limit_;
3251 376 clone->opt_max_retries_ = opt_max_retries_;
3252 376 clone->opt_backoff_init_ms_ = opt_backoff_init_ms_;
3253 376 clone->opt_backoff_max_ms_ = opt_backoff_max_ms_;
3254 376 clone->enable_info_header_ = enable_info_header_;
3255 376 clone->enable_http_tracing_ = enable_http_tracing_;
3256 376 clone->http_tracing_headers_ = http_tracing_headers_;
3257 376 clone->follow_redirects_ = follow_redirects_;
3258 376 clone->ignore_signature_failures_ = ignore_signature_failures_;
3259
2/2
✓ Branch 0 taken 372 times.
✓ Branch 1 taken 4 times.
376 if (opt_host_.chain) {
3260
1/2
✓ Branch 2 taken 372 times.
✗ Branch 3 not taken.
372 clone->opt_host_.chain = new vector<string>(*opt_host_.chain);
3261
1/2
✓ Branch 2 taken 372 times.
✗ Branch 3 not taken.
372 clone->opt_host_chain_rtt_ = new vector<int>(*opt_host_chain_rtt_);
3262 }
3263
3264 376 CloneProxyConfig(clone);
3265 376 clone->opt_ip_preference_ = opt_ip_preference_;
3266 376 clone->proxy_template_direct_ = proxy_template_direct_;
3267 376 clone->proxy_template_forced_ = proxy_template_forced_;
3268 376 clone->opt_proxy_groups_reset_after_ = opt_proxy_groups_reset_after_;
3269 376 clone->opt_metalink_.reset_after = opt_metalink_.reset_after;
3270 376 clone->opt_host_.reset_after = opt_host_.reset_after;
3271 376 clone->credentials_attachment_ = credentials_attachment_;
3272 376 clone->ssl_certificate_store_ = ssl_certificate_store_;
3273
3274 376 clone->health_check_ = health_check_;
3275 376 clone->sharding_policy_ = sharding_policy_;
3276 376 clone->failover_indefinitely_ = failover_indefinitely_;
3277 376 clone->fqrn_ = fqrn_;
3278
3279 376 return clone;
3280 }
3281
3282
3283 376 void DownloadManager::CloneProxyConfig(DownloadManager *clone) {
3284 376 clone->opt_proxy_groups_current_ = opt_proxy_groups_current_;
3285 376 clone->opt_proxy_groups_current_burned_ = opt_proxy_groups_current_burned_;
3286 376 clone->opt_proxy_groups_fallback_ = opt_proxy_groups_fallback_;
3287 376 clone->opt_num_proxies_ = opt_num_proxies_;
3288 376 clone->opt_proxy_shard_ = opt_proxy_shard_;
3289 376 clone->opt_proxy_list_ = opt_proxy_list_;
3290 376 clone->opt_proxy_fallback_list_ = opt_proxy_fallback_list_;
3291
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 372 times.
376 if (opt_proxy_groups_ == NULL)
3292 4 return;
3293
3294
1/2
✓ Branch 2 taken 372 times.
✗ Branch 3 not taken.
372 clone->opt_proxy_groups_ = new vector<vector<ProxyInfo> >(*opt_proxy_groups_);
3295
2/4
✓ Branch 2 taken 372 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 372 times.
✗ Branch 6 not taken.
372 clone->UpdateProxiesUnlocked("cloned");
3296 }
3297
3298 } // namespace download
3299