GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/network/download.cc
Date: 2026-06-28 02:36:10
Exec Total Coverage
Lines: 878 1767 49.7%
Branches: 672 2288 29.4%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 *
4 * The download module provides an interface for fetching files via HTTP
5 * and file:// URLs. Internally, it uses libcurl and the asynchronous DNS resolver
6 * c-ares. The JobInfo struct describes a single file/url to download and
7 * keeps the state during the several phases of downloading.
8 *
9 * The module starts in single-threaded mode and can be switched to multi-
10 * threaded mode by Spawn(). In multi-threaded mode, the Fetch() function still
11 * blocks but there is a separate I/O thread using asynchronous I/O, which
12 * maintains all concurrent connections simultaneously. As there might be more
13 * than 1024 file descriptors for the CernVM-FS process, the I/O thread uses
14 * poll and the libcurl multi socket interface.
15 *
16 * While downloading, files can be decompressed and the secure hash can be
17 * calculated on the fly.
18 *
19 * The module also implements failure handling. If corrupted data has been
20 * downloaded, the transfer is restarted using the HTTP "no-cache" pragma.
21 * A "host chain" can be configured. When a host fails, there is automatic
22 * fail-over to the next host in the chain until all hosts are probed.
23 * Similarly a chain of proxy sets can be configured. Inside a proxy set,
24 * proxies are selected randomly (load-balancing set).
25 */
26
27 // TODO(jblomer): MS for time summing
28
29 #include "download.h"
30
31 #include <alloca.h>
32 #include <errno.h>
33 #include <inttypes.h>
34 #include <poll.h>
35 #include <pthread.h>
36 #include <signal.h>
37 #include <stdint.h>
38 #include <sys/time.h>
39 #include <unistd.h>
40
41 #include <algorithm>
42 #include <cassert>
43 #include <cstdio>
44 #include <cstdlib>
45 #include <cstring>
46 #include <map>
47 #include <set>
48 #include <utility>
49
50 #include "compression/compression.h"
51 #include "crypto/hash.h"
52 #include "duplex_curl.h" // IWYU pragma: keep
53 #include "interrupt.h"
54 #include "network/sink_mem.h"
55 #include "network/sink_path.h"
56 #include "sanitizer.h"
57 #include "ssl.h"
58 #include "util/algorithm.h"
59 #include "util/atomic.h"
60 #include "util/exception.h"
61 #include "util/logging.h"
62 #include "util/posix.h"
63 #include "util/prng.h"
64 #include "util/smalloc.h"
65 #include "util/string.h"
66
67 using namespace std; // NOLINT
68
69 namespace download {
70
71 /**
72 * Returns the status if an interrupt happened for a given repository.
73 *
74 * Used only when CVMFS_FAILOVER_INDEFINITELY (failover_indefinitely_) is set,
75 * in which case failed downloads are retried indefinitely unless an interrupt occurs.
76 *
77 * @note If you use this functionality you need to change the source code of
78 * e.g. cvmfs_config reload to create a sentinel file. See comment below.
79 *
80 * @return true if an interrupt occurred
81 * false otherwise
82 */
83 bool Interrupted(const std::string &fqrn, JobInfo *info) {
84 if (info->allow_failure()) {
85 return true;
86 }
87
88 if (!fqrn.empty()) {
89 // it is up to the user to create this sentinel file ("pause_file") if
90 // CVMFS_FAILOVER_INDEFINITELY is used. It must be created during
91 // "cvmfs_config reload" and "cvmfs_config reload $fqrn"
92 const std::string pause_file = std::string("/var/run/cvmfs/interrupt.")
93 + fqrn;
94
95 LogCvmfs(kLogDownload, kLogDebug,
96 "(id %" PRId64 ") Interrupted(): checking for existence of %s",
97 info->id(), pause_file.c_str());
98 if (FileExists(pause_file)) {
99 LogCvmfs(kLogDownload, kLogDebug,
100 "(id %" PRId64 ") Interrupt marker found - "
101 "Interrupting current download, this will EIO outstanding IO.",
102 info->id());
103 if (0 != unlink(pause_file.c_str())) {
104 LogCvmfs(kLogDownload, kLogDebug,
105 "(id %" PRId64 ") Couldn't delete interrupt marker: errno=%d",
106 info->id(), errno);
107 }
108 return true;
109 }
110 }
111 return false;
112 }
113
114 4164 static Failures PrepareDownloadDestination(JobInfo *info) {
115
3/6
✓ Branch 1 taken 4164 times.
✗ Branch 2 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 4164 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 4164 times.
4164 if (info->sink() != NULL && !info->sink()->IsValid()) {
116 cvmfs::PathSink *psink = dynamic_cast<cvmfs::PathSink *>(info->sink());
117 if (psink != NULL) {
118 LogCvmfs(kLogDownload, kLogDebug,
119 "(id %" PRId64 ") Failed to open path %s: %s (errno=%d).",
120 info->id(), psink->path().c_str(), strerror(errno), errno);
121 return kFailLocalIO;
122 } else {
123 LogCvmfs(kLogDownload, kLogDebug,
124 "(id %" PRId64 ") "
125 "Failed to create a valid sink: \n %s",
126 info->id(), info->sink()->Describe().c_str());
127 return kFailOther;
128 }
129 }
130
131 4164 return kFailOk;
132 }
133
134
135 /**
136 * Called by curl for every HTTP header. Not called for file:// transfers.
137 */
138 12121 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
139 void *info_link) {
140 12121 const size_t num_bytes = size * nmemb;
141
1/2
✓ Branch 2 taken 12121 times.
✗ Branch 3 not taken.
12121 const string header_line(static_cast<const char *>(ptr), num_bytes);
142 12121 JobInfo *info = static_cast<JobInfo *>(info_link);
143
144 // LogCvmfs(kLogDownload, kLogDebug, "REMOVE-ME: Header callback with %s",
145 // header_line.c_str());
146
147 // Check http status codes
148
4/6
✓ Branch 2 taken 12121 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 12121 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 96 times.
✓ Branch 10 taken 12025 times.
12121 if (HasPrefix(header_line, "HTTP/1.", false)) {
149
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 96 times.
96 if (header_line.length() < 10) {
150 return 0;
151 }
152
153 unsigned i;
154
5/6
✓ Branch 1 taken 192 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 96 times.
✓ Branch 5 taken 96 times.
✓ Branch 6 taken 96 times.
✓ Branch 7 taken 96 times.
192 for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {
155 }
156
157 // Code is initialized to -1
158
1/2
✓ Branch 1 taken 96 times.
✗ Branch 2 not taken.
96 if (header_line.length() > i + 2) {
159 96 info->SetHttpCode(DownloadManager::ParseHttpCode(&header_line[i]));
160 }
161
162
2/2
✓ Branch 1 taken 72 times.
✓ Branch 2 taken 24 times.
96 if ((info->http_code() / 100) == 2) {
163 72 return num_bytes;
164
0/2
✗ Branch 2 not taken.
✗ Branch 3 not taken.
24 } else if ((info->http_code() == 301) || (info->http_code() == 302)
165
2/8
✗ Branch 0 not taken.
✓ Branch 1 taken 24 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 24 times.
✗ Branch 9 not taken.
24 || (info->http_code() == 303) || (info->http_code() == 307)) {
166
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 24 times.
24 if (!info->follow_redirects()) {
167 LogCvmfs(kLogDownload, kLogDebug,
168 "(id %" PRId64 ") redirect support not enabled: %s",
169 info->id(), header_line.c_str());
170 info->SetErrorCode(kFailHostHttp);
171 return 0;
172 }
173
1/2
✓ Branch 3 taken 24 times.
✗ Branch 4 not taken.
24 LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") http redirect: %s",
174 info->id(), header_line.c_str());
175 // libcurl will handle this because of CURLOPT_FOLLOWLOCATION
176 24 return num_bytes;
177 } else {
178 LogCvmfs(kLogDownload, kLogDebug,
179 "(id %" PRId64 ") http status error code: %s [%d]", info->id(),
180 header_line.c_str(), info->http_code());
181 if (((info->http_code() / 100) == 5) || (info->http_code() == 400)
182 || (info->http_code() == 404)) {
183 // 5XX returned by host
184 // 400: error from the GeoAPI module
185 // 404: the stratum 1 does not have the newest files
186 info->SetErrorCode(kFailHostHttp);
187 } else if (info->http_code() == 429) {
188 // 429: rate throttling (we ignore the backoff hint for the time being)
189 info->SetErrorCode(kFailHostConnection);
190 } else {
191 info->SetErrorCode((info->proxy() == "DIRECT") ? kFailHostHttp
192 : kFailProxyHttp);
193 }
194 return 0;
195 }
196 }
197
198 // If needed: allocate space in sink
199
3/4
✓ Branch 2 taken 12025 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 2062 times.
✓ Branch 5 taken 9963 times.
12025 if (info->sink() != NULL && info->sink()->RequiresReserve()
200
11/20
✓ Branch 1 taken 12025 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 2062 times.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 2062 times.
✗ Branch 9 not taken.
✓ Branch 10 taken 658 times.
✓ Branch 11 taken 1404 times.
✓ Branch 12 taken 2062 times.
✓ Branch 13 taken 9963 times.
✗ Branch 14 not taken.
✓ Branch 15 taken 2062 times.
✓ Branch 16 taken 9963 times.
✓ Branch 18 taken 658 times.
✓ Branch 19 taken 11367 times.
✗ Branch 20 not taken.
✗ Branch 21 not taken.
✗ Branch 23 not taken.
✗ Branch 24 not taken.
24050 && HasPrefix(header_line, "CONTENT-LENGTH:", true)) {
201 658 char *tmp = reinterpret_cast<char *>(alloca(num_bytes + 1));
202 658 uint64_t length = 0;
203 658 sscanf(header_line.c_str(), "%s %" PRIu64, tmp, &length);
204
1/2
✓ Branch 0 taken 658 times.
✗ Branch 1 not taken.
658 if (length > 0) {
205
2/4
✓ Branch 2 taken 658 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 658 times.
658 if (!info->sink()->Reserve(length)) {
206 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogErr,
207 "(id %" PRId64 ") "
208 "resource %s too large to store in memory (%" PRIu64 ")",
209 info->id(), info->url()->c_str(), length);
210 info->SetErrorCode(kFailTooBig);
211 return 0;
212 }
213 } else {
214 // Empty resource
215 info->sink()->Reserve(0);
216 }
217
4/6
✓ Branch 2 taken 11367 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 11367 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 24 times.
✓ Branch 10 taken 11343 times.
11367 } else if (HasPrefix(header_line, "LOCATION:", true)) {
218 // This comes along with redirects
219
1/2
✓ Branch 3 taken 24 times.
✗ Branch 4 not taken.
24 LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") %s", info->id(),
220 header_line.c_str());
221
3/6
✓ Branch 2 taken 11343 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 11343 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 11343 times.
11343 } else if (HasPrefix(header_line, "LINK:", true)) {
222 // This is metalink info
223 LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") %s", info->id(),
224 header_line.c_str());
225 std::string link = info->link();
226 if (link.size() != 0) {
227 // multiple LINK headers are allowed
228 link = link + ", " + header_line.substr(5);
229 } else {
230 link = header_line.substr(5);
231 }
232 info->SetLink(link);
233
3/6
✓ Branch 3 taken 11343 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 11343 times.
✗ Branch 7 not taken.
✗ Branch 10 not taken.
✓ Branch 11 taken 11343 times.
11343 } else if (HasPrefix(header_line, "X-SQUID-ERROR:", true)) {
234 // Reinterpret host error as proxy error
235 if (info->error_code() == kFailHostHttp) {
236 info->SetErrorCode(kFailProxyHttp);
237 }
238
3/6
✓ Branch 2 taken 11343 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 11343 times.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 11343 times.
11343 } else if (HasPrefix(header_line, "PROXY-STATUS:", true)) {
239 // Reinterpret host error as proxy error if applicable
240 if ((info->error_code() == kFailHostHttp)
241 && (header_line.find("error=") != string::npos)) {
242 info->SetErrorCode(kFailProxyHttp);
243 }
244 }
245
246 12025 return num_bytes;
247 12121 }
248
249
250 /**
251 * Called by curl for every received data chunk.
252 *
253 * In multi-threaded mode the chunk's hash is updated here (cheap) and the
254 * raw bytes are handed off to the calling Fetch() thread via JobInfo's
255 * data tube; the (relatively expensive) zlib decompression and sink->Write
256 * happen on that caller thread. This lets multiple callers — e.g. main
257 * thread plus N file-bundle prefetch workers — run their decompression in
258 * parallel instead of serializing on this single MainDownload thread.
259 *
260 * In single-threaded mode (no MainDownload thread, e.g. unit tests) the
261 * old inline path is kept.
262 */
263 4255 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
264 void *info_link) {
265 4255 const size_t num_bytes = size * nmemb;
266 4255 JobInfo *info = static_cast<JobInfo *>(info_link);
267
268 // TODO(heretherebedragons) remove if no error comes up,
269 // as this means only JobInfo data requests (and not header-only requests)
270 // come here
271
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4255 times.
4255 assert(info->sink() != NULL);
272
273
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4255 times.
4255 if (num_bytes == 0)
274 return 0;
275
276
2/2
✓ Branch 1 taken 3421 times.
✓ Branch 2 taken 834 times.
4255 if (info->expected_hash()) {
277 3421 shash::Update(reinterpret_cast<unsigned char *>(ptr), num_bytes,
278 info->hash_context());
279 }
280
281
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4255 times.
4255 if (info->IsValidDataTube()) {
282 // Parallel-decompress path: copy bytes onto a buffer the caller will own
283 // and enqueue. The caller thread (DownloadManager::Fetch) pops these
284 // elements and runs DecompressZStream2Sink / sink->Write itself.
285 char *buf = new char[num_bytes];
286 memcpy(buf, ptr, num_bytes);
287 DataTubeElement *ele = new DataTubeElement(buf, num_bytes,
288 kActionDecompress);
289 info->GetDataTubePtr()->EnqueueBack(ele);
290 return num_bytes;
291 }
292
293
2/2
✓ Branch 1 taken 3420 times.
✓ Branch 2 taken 835 times.
4255 if (info->compressed()) {
294 3420 const zlib::StreamStates retval = zlib::DecompressZStream2Sink(
295 ptr, static_cast<int64_t>(num_bytes), info->GetZstreamPtr(),
296 info->sink());
297
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3420 times.
3420 if (retval == zlib::kStreamDataError) {
298 LogCvmfs(kLogDownload, kLogSyslogErr,
299 "(id %" PRId64 ") failed to decompress %s", info->id(),
300 info->url()->c_str());
301 info->SetErrorCode(kFailBadData);
302 return 0;
303
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3420 times.
3420 } else if (retval == zlib::kStreamIOError) {
304 LogCvmfs(kLogDownload, kLogSyslogErr,
305 "(id %" PRId64 ") decompressing %s, local IO error", info->id(),
306 info->url()->c_str());
307 info->SetErrorCode(kFailLocalIO);
308 return 0;
309 }
310 } else {
311 835 const int64_t written = info->sink()->Write(ptr, num_bytes);
312
2/4
✓ Branch 0 taken 835 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 835 times.
835 if (written < 0 || static_cast<uint64_t>(written) != num_bytes) {
313 LogCvmfs(kLogDownload, kLogDebug,
314 "(id %" PRId64 ") "
315 "Failed to perform write of %zu bytes to sink %s with errno %ld",
316 info->id(), num_bytes, info->sink()->Describe().c_str(),
317 written);
318 }
319 }
320
321 4255 return num_bytes;
322 }
323
324 #ifdef DEBUGMSG
325 1191 static int CallbackCurlDebug(CURL *handle,
326 curl_infotype type,
327 char *data,
328 size_t size,
329 void * /* clientp */) {
330 JobInfo *info;
331
1/2
✓ Branch 1 taken 1191 times.
✗ Branch 2 not taken.
1191 curl_easy_getinfo(handle, CURLINFO_PRIVATE, &info);
332
333
3/6
✓ Branch 2 taken 1191 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1191 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 1191 times.
✗ Branch 9 not taken.
2382 std::string prefix = "(id " + StringifyInt(info->id()) + ") ";
334
4/8
✓ Branch 0 taken 431 times.
✓ Branch 1 taken 376 times.
✓ Branch 2 taken 96 times.
✓ Branch 3 taken 288 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
1191 switch (type) {
335 431 case CURLINFO_TEXT:
336
1/2
✓ Branch 1 taken 431 times.
✗ Branch 2 not taken.
431 prefix += "{info} ";
337 431 break;
338 376 case CURLINFO_HEADER_IN:
339
1/2
✓ Branch 1 taken 376 times.
✗ Branch 2 not taken.
376 prefix += "{header/recv} ";
340 376 break;
341 96 case CURLINFO_HEADER_OUT:
342
1/2
✓ Branch 1 taken 96 times.
✗ Branch 2 not taken.
96 prefix += "{header/sent} ";
343 96 break;
344 288 case CURLINFO_DATA_IN:
345
2/2
✓ Branch 0 taken 32 times.
✓ Branch 1 taken 256 times.
288 if (size < 50) {
346
1/2
✓ Branch 1 taken 32 times.
✗ Branch 2 not taken.
32 prefix += "{data/recv} ";
347 32 break;
348 } else {
349
1/2
✓ Branch 2 taken 256 times.
✗ Branch 3 not taken.
256 LogCvmfs(kLogCurl, kLogDebug, "%s{data/recv} <snip>", prefix.c_str());
350 256 return 0;
351 }
352 case CURLINFO_DATA_OUT:
353 if (size < 50) {
354 prefix += "{data/sent} ";
355 break;
356 } else {
357 LogCvmfs(kLogCurl, kLogDebug, "%s{data/sent} <snip>", prefix.c_str());
358 return 0;
359 }
360 case CURLINFO_SSL_DATA_IN:
361 if (size < 50) {
362 prefix += "{ssldata/recv} ";
363 break;
364 } else {
365 LogCvmfs(kLogCurl, kLogDebug, "%s{ssldata/recv} <snip>",
366 prefix.c_str());
367 return 0;
368 }
369 case CURLINFO_SSL_DATA_OUT:
370 if (size < 50) {
371 prefix += "{ssldata/sent} ";
372 break;
373 } else {
374 LogCvmfs(kLogCurl, kLogDebug, "%s{ssldata/sent} <snip>",
375 prefix.c_str());
376 return 0;
377 }
378 default:
379 // just log the message
380 break;
381 }
382
383 935 bool valid_char = true;
384
1/2
✓ Branch 2 taken 935 times.
✗ Branch 3 not taken.
935 std::string msg(data, size);
385
2/2
✓ Branch 1 taken 57737 times.
✓ Branch 2 taken 935 times.
58672 for (size_t i = 0; i < msg.length(); ++i) {
386
2/4
✓ Branch 1 taken 57737 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 57737 times.
57737 if (msg[i] == '\0') {
387 msg[i] = '~';
388 }
389
390 // verify that char is a valid printable char
391
3/6
✓ Branch 1 taken 57737 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 55338 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 55338 times.
113075 if ((msg[i] < ' ' || msg[i] > '~')
392
6/8
✓ Branch 0 taken 55338 times.
✓ Branch 1 taken 2399 times.
✓ Branch 3 taken 2399 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 984 times.
✓ Branch 6 taken 1415 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 57737 times.
114059 && (msg[i] != 10 /*line feed*/
393
2/4
✓ Branch 1 taken 984 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 984 times.
984 && msg[i] != 13 /*carriage return*/)) {
394 valid_char = false;
395 }
396 }
397
398
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 935 times.
935 if (!valid_char) {
399 msg = "<Non-plaintext sequence>";
400 }
401
402
1/2
✓ Branch 3 taken 935 times.
✗ Branch 4 not taken.
935 LogCvmfs(kLogCurl, kLogDebug, "%s%s", prefix.c_str(),
403
1/2
✓ Branch 1 taken 935 times.
✗ Branch 2 not taken.
1870 Trim(msg, true /* trim_newline */).c_str());
404 935 return 0;
405 1191 }
406 #endif
407
408 //------------------------------------------------------------------------------
409
410
411 const int DownloadManager::kProbeUnprobed = -1;
412 const int DownloadManager::kProbeDown = -2;
413 const int DownloadManager::kProbeGeo = -3;
414
415 /**
416 * Escape special chars from the URL, except for ':' and '/',
417 * which should keep their meaning.
418 */
419 4196 string DownloadManager::EscapeUrl(const int64_t jobinfo_id, const string &url) {
420 4196 string escaped;
421
1/2
✓ Branch 2 taken 4196 times.
✗ Branch 3 not taken.
4196 escaped.reserve(url.length());
422
423 char escaped_char[3];
424
2/2
✓ Branch 1 taken 379167 times.
✓ Branch 2 taken 4196 times.
383363 for (unsigned i = 0, s = url.length(); i < s; ++i) {
425
3/4
✓ Branch 2 taken 379167 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 72 times.
✓ Branch 5 taken 379095 times.
379167 if (JobInfo::EscapeUrlChar(url[i], escaped_char)) {
426
1/2
✓ Branch 1 taken 72 times.
✗ Branch 2 not taken.
72 escaped.append(escaped_char, 3);
427 } else {
428
1/2
✓ Branch 1 taken 379095 times.
✗ Branch 2 not taken.
379095 escaped.push_back(escaped_char[0]);
429 }
430 }
431
1/2
✓ Branch 3 taken 4196 times.
✗ Branch 4 not taken.
4196 LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") escaped %s to %s",
432 jobinfo_id, url.c_str(), escaped.c_str());
433
434 8392 return escaped;
435 }
436
437 /**
438 * Returns -1 if digits is not a valid HTTP return code
439 */
440 136 int DownloadManager::ParseHttpCode(const char digits[3]) {
441 136 int result = 0;
442 136 int factor = 100;
443
2/2
✓ Branch 0 taken 408 times.
✓ Branch 1 taken 128 times.
536 for (int i = 0; i < 3; ++i) {
444
3/4
✓ Branch 0 taken 408 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 8 times.
✓ Branch 3 taken 400 times.
408 if ((digits[i] < '0') || (digits[i] > '9'))
445 8 return -1;
446 400 result += (digits[i] - '0') * factor;
447 400 factor /= 10;
448 }
449 128 return result;
450 }
451
452
453 /**
454 * Called when new curl sockets arrive or existing curl sockets depart.
455 */
456 int DownloadManager::CallbackCurlSocket(CURL * /* easy */,
457 curl_socket_t s,
458 int action,
459 void *userp,
460 void * /* socketp */) {
461 // LogCvmfs(kLogDownload, kLogDebug, "CallbackCurlSocket called with easy "
462 // "handle %p, socket %d, action %d", easy, s, action);
463 DownloadManager *download_mgr = static_cast<DownloadManager *>(userp);
464 if (action == CURL_POLL_NONE)
465 return 0;
466
467 // Find s in watch_fds_
468 unsigned index;
469
470 // TODO(heretherebedragons) why start at index = 0 and not 2?
471 // fd[0] and fd[1] are fixed?
472 for (index = 0; index < download_mgr->watch_fds_inuse_; ++index) {
473 if (download_mgr->watch_fds_[index].fd == s)
474 break;
475 }
476 // Or create newly
477 if (index == download_mgr->watch_fds_inuse_) {
478 // Extend array if necessary
479 if (download_mgr->watch_fds_inuse_ == download_mgr->watch_fds_size_) {
480 assert(download_mgr->watch_fds_size_ > 0);
481 download_mgr->watch_fds_size_ *= 2;
482 download_mgr->watch_fds_ = static_cast<struct pollfd *>(
483 srealloc(download_mgr->watch_fds_,
484 download_mgr->watch_fds_size_ * sizeof(struct pollfd)));
485 }
486 download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].fd = s;
487 download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].events = 0;
488 download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].revents = 0;
489 download_mgr->watch_fds_inuse_++;
490 }
491
492 switch (action) {
493 case CURL_POLL_IN:
494 download_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
495 break;
496 case CURL_POLL_OUT:
497 download_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
498 break;
499 case CURL_POLL_INOUT:
500 download_mgr->watch_fds_[index].events = POLLIN | POLLPRI | POLLOUT
501 | POLLWRBAND;
502 break;
503 case CURL_POLL_REMOVE:
504 if (index < download_mgr->watch_fds_inuse_ - 1) {
505 download_mgr
506 ->watch_fds_[index] = download_mgr->watch_fds_
507 [download_mgr->watch_fds_inuse_ - 1];
508 }
509 download_mgr->watch_fds_inuse_--;
510 // Shrink array if necessary
511 if ((download_mgr->watch_fds_inuse_ > download_mgr->watch_fds_max_)
512 && (download_mgr->watch_fds_inuse_
513 < download_mgr->watch_fds_size_ / 2)) {
514 download_mgr->watch_fds_size_ /= 2;
515 // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ (%d)",
516 // watch_fds_size_);
517 download_mgr->watch_fds_ = static_cast<struct pollfd *>(
518 srealloc(download_mgr->watch_fds_,
519 download_mgr->watch_fds_size_ * sizeof(struct pollfd)));
520 // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ done",
521 // watch_fds_size_);
522 }
523 break;
524 default:
525 break;
526 }
527
528 return 0;
529 }
530
531
532 /**
533 * Worker thread event loop. Waits on new JobInfo structs on a pipe.
534 */
535 void *DownloadManager::MainDownload(void *data) {
536 DownloadManager *download_mgr = static_cast<DownloadManager *>(data);
537 LogCvmfs(kLogDownload, kLogDebug,
538 "download I/O thread of DownloadManager '%s' started",
539 download_mgr->name_.c_str());
540
541 const int kIdxPipeTerminate = 0;
542 const int kIdxPipeJobs = 1;
543
544 download_mgr->watch_fds_ = static_cast<struct pollfd *>(
545 smalloc(2 * sizeof(struct pollfd)));
546 download_mgr->watch_fds_size_ = 2;
547 download_mgr->watch_fds_[kIdxPipeTerminate].fd = download_mgr->pipe_terminate_
548 ->GetReadFd();
549 download_mgr->watch_fds_[kIdxPipeTerminate].events = POLLIN | POLLPRI;
550 download_mgr->watch_fds_[kIdxPipeTerminate].revents = 0;
551 download_mgr->watch_fds_[kIdxPipeJobs].fd = download_mgr->pipe_jobs_
552 ->GetReadFd();
553 download_mgr->watch_fds_[kIdxPipeJobs].events = POLLIN | POLLPRI;
554 download_mgr->watch_fds_[kIdxPipeJobs].revents = 0;
555 download_mgr->watch_fds_inuse_ = 2;
556
557 int still_running = 0;
558 struct timeval timeval_start, timeval_stop;
559 gettimeofday(&timeval_start, NULL);
560 while (true) {
561 int timeout;
562 if (still_running) {
563 /* NOTE: The following might degrade the performance for many small files
564 * use case. TODO(jblomer): look into it.
565 // Specify a timeout for polling in ms; this allows us to return
566 // to libcurl once a second so it can look for internal operations
567 // which timed out. libcurl has a more elaborate mechanism
568 // (CURLMOPT_TIMERFUNCTION) that would inform us of the next potential
569 // timeout. TODO(bbockelm) we should switch to that in the future.
570 timeout = 100;
571 */
572 timeout = 1;
573 } else {
574 timeout = -1;
575 gettimeofday(&timeval_stop, NULL);
576 const int64_t delta = static_cast<int64_t>(
577 1000 * DiffTimeSeconds(timeval_start, timeval_stop));
578 perf::Xadd(download_mgr->counters_->sz_transfer_time, delta);
579 }
580 const int retval = poll(download_mgr->watch_fds_,
581 download_mgr->watch_fds_inuse_, timeout);
582 if (retval < 0) {
583 continue;
584 }
585
586 // Handle timeout
587 if (retval == 0) {
588 curl_multi_socket_action(download_mgr->curl_multi_, CURL_SOCKET_TIMEOUT,
589 0, &still_running);
590 }
591
592 // Terminate I/O thread
593 if (download_mgr->watch_fds_[kIdxPipeTerminate].revents)
594 break;
595
596 // New job arrives
597 if (download_mgr->watch_fds_[kIdxPipeJobs].revents) {
598 download_mgr->watch_fds_[kIdxPipeJobs].revents = 0;
599 JobInfo *info;
600 download_mgr->pipe_jobs_->Read<JobInfo *>(&info);
601 if (!still_running) {
602 gettimeofday(&timeval_start, NULL);
603 }
604 CURL *handle = download_mgr->AcquireCurlHandle();
605 download_mgr->InitializeRequest(info, handle);
606 download_mgr->SetUrlOptions(info);
607 curl_multi_add_handle(download_mgr->curl_multi_, handle);
608 curl_multi_socket_action(download_mgr->curl_multi_, CURL_SOCKET_TIMEOUT,
609 0, &still_running);
610 }
611
612 // Activity on curl sockets
613 // Within this loop the curl_multi_socket_action() may cause socket(s)
614 // to be removed from watch_fds_. If a socket is removed it is replaced
615 // by the socket at the end of the array and the inuse count is decreased.
616 // Therefore loop over the array in reverse order.
617 for (int64_t i = download_mgr->watch_fds_inuse_ - 1; i >= 2; --i) {
618 if (i >= download_mgr->watch_fds_inuse_) {
619 continue;
620 }
621 if (download_mgr->watch_fds_[i].revents) {
622 int ev_bitmask = 0;
623 if (download_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
624 ev_bitmask |= CURL_CSELECT_IN;
625 if (download_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
626 ev_bitmask |= CURL_CSELECT_OUT;
627 if (download_mgr->watch_fds_[i].revents
628 & (POLLERR | POLLHUP | POLLNVAL)) {
629 ev_bitmask |= CURL_CSELECT_ERR;
630 }
631 download_mgr->watch_fds_[i].revents = 0;
632
633 curl_multi_socket_action(download_mgr->curl_multi_,
634 download_mgr->watch_fds_[i].fd,
635 ev_bitmask,
636 &still_running);
637 }
638 }
639
640 // Check if transfers are completed
641 CURLMsg *curl_msg;
642 int msgs_in_queue;
643 while ((curl_msg = curl_multi_info_read(download_mgr->curl_multi_,
644 &msgs_in_queue))) {
645 if (curl_msg->msg == CURLMSG_DONE) {
646 perf::Inc(download_mgr->counters_->n_requests);
647 JobInfo *info;
648 CURL *easy_handle = curl_msg->easy_handle;
649 const int curl_error = curl_msg->data.result;
650 curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
651
652 int64_t redir_count;
653 curl_easy_getinfo(easy_handle, CURLINFO_REDIRECT_COUNT, &redir_count);
654 LogCvmfs(kLogDownload, kLogDebug,
655 "(manager '%s' - id %" PRId64 ") "
656 "Number of CURL redirects %" PRId64,
657 download_mgr->name_.c_str(), info->id(), redir_count);
658
659 curl_multi_remove_handle(download_mgr->curl_multi_, easy_handle);
660 if (download_mgr->VerifyAndFinalize(curl_error, info)) {
661 curl_multi_add_handle(download_mgr->curl_multi_, easy_handle);
662 curl_multi_socket_action(download_mgr->curl_multi_,
663 CURL_SOCKET_TIMEOUT,
664 0,
665 &still_running);
666 } else {
667 // Return easy handle into pool and write result back
668 download_mgr->ReleaseCurlHandle(easy_handle, true /* allow_reuse */);
669
670 DataTubeElement *ele = new DataTubeElement(kActionStop);
671 info->GetDataTubePtr()->EnqueueBack(ele);
672 info->GetPipeJobResultPtr()->Write<download::Failures>(
673 info->error_code());
674 }
675 }
676 }
677 }
678
679 for (set<CURL *>::iterator i = download_mgr->pool_handles_inuse_->begin(),
680 iEnd = download_mgr->pool_handles_inuse_->end();
681 i != iEnd;
682 ++i) {
683 curl_multi_remove_handle(download_mgr->curl_multi_, *i);
684 curl_easy_cleanup(*i);
685 }
686 download_mgr->pool_handles_inuse_->clear();
687 free(download_mgr->watch_fds_);
688
689 LogCvmfs(kLogDownload, kLogDebug,
690 "download I/O thread of DownloadManager '%s' terminated",
691 download_mgr->name_.c_str());
692 return NULL;
693 }
694
695
696 //------------------------------------------------------------------------------
697
698
699 3076 HeaderLists::~HeaderLists() {
700
2/2
✓ Branch 1 taken 3117 times.
✓ Branch 2 taken 3076 times.
6193 for (unsigned i = 0; i < blocks_.size(); ++i) {
701
1/2
✓ Branch 1 taken 3117 times.
✗ Branch 2 not taken.
3117 delete[] blocks_[i];
702 }
703 3076 blocks_.clear();
704 3076 }
705
706
707 28438 curl_slist *HeaderLists::GetList(const char *header) { return Get(header); }
708
709
710 4164 curl_slist *HeaderLists::DuplicateList(curl_slist *slist) {
711
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4164 times.
4164 assert(slist);
712 4164 curl_slist *copy = GetList(slist->data);
713 4164 copy->next = slist->next;
714 4164 curl_slist *prev = copy;
715 4164 slist = slist->next;
716
2/2
✓ Branch 0 taken 8328 times.
✓ Branch 1 taken 4164 times.
12492 while (slist) {
717 8328 curl_slist *new_link = Get(slist->data);
718 8328 new_link->next = slist->next;
719 8328 prev->next = new_link;
720 8328 prev = new_link;
721 8328 slist = slist->next;
722 }
723 4164 return copy;
724 }
725
726
727 6348 void HeaderLists::AppendHeader(curl_slist *slist, const char *header) {
728
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6348 times.
6348 assert(slist);
729 6348 curl_slist *new_link = Get(header);
730 6348 new_link->next = NULL;
731
732
2/2
✓ Branch 0 taken 3521 times.
✓ Branch 1 taken 6348 times.
9869 while (slist->next)
733 3521 slist = slist->next;
734 6348 slist->next = new_link;
735 6348 }
736
737
738 /**
739 * Ensures that a certain header string is _not_ part of slist on return.
740 * Note that if the first header element matches, the returned slist points
741 * to a different value.
742 */
743 205 void HeaderLists::CutHeader(const char *header, curl_slist **slist) {
744
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 205 times.
205 assert(slist);
745 curl_slist head;
746 205 head.next = *slist;
747 205 curl_slist *prev = &head;
748 205 curl_slist *rover = *slist;
749
2/2
✓ Branch 0 taken 369 times.
✓ Branch 1 taken 205 times.
574 while (rover) {
750
2/2
✓ Branch 0 taken 164 times.
✓ Branch 1 taken 205 times.
369 if (strcmp(rover->data, header) == 0) {
751 164 prev->next = rover->next;
752 164 Put(rover);
753 164 rover = prev;
754 }
755 369 prev = rover;
756 369 rover = rover->next;
757 }
758 205 *slist = head.next;
759 205 }
760
761
762 14824 void HeaderLists::PutList(curl_slist *slist) {
763
2/2
✓ Branch 0 taken 23428 times.
✓ Branch 1 taken 14824 times.
38252 while (slist) {
764 23428 curl_slist *next = slist->next;
765 23428 Put(slist);
766 23428 slist = next;
767 }
768 14824 }
769
770
771 164 string HeaderLists::Print(curl_slist *slist) {
772 164 string verbose;
773
2/2
✓ Branch 0 taken 328 times.
✓ Branch 1 taken 164 times.
492 while (slist) {
774
3/6
✓ Branch 2 taken 328 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 328 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 328 times.
✗ Branch 9 not taken.
328 verbose += string(slist->data) + "\n";
775 328 slist = slist->next;
776 }
777 164 return verbose;
778 }
779
780
781 43114 curl_slist *HeaderLists::Get(const char *header) {
782
2/2
✓ Branch 1 taken 39996 times.
✓ Branch 2 taken 3159 times.
43155 for (unsigned i = 0; i < blocks_.size(); ++i) {
783
2/2
✓ Branch 0 taken 2797559 times.
✓ Branch 1 taken 41 times.
2797600 for (unsigned j = 0; j < kBlockSize; ++j) {
784
2/2
✓ Branch 2 taken 39955 times.
✓ Branch 3 taken 2757604 times.
2797559 if (!IsUsed(&(blocks_[i][j]))) {
785 39955 blocks_[i][j].data = const_cast<char *>(header);
786 39955 return &(blocks_[i][j]);
787 }
788 }
789 }
790
791 // All used, new block
792 3159 AddBlock();
793 3159 blocks_[blocks_.size() - 1][0].data = const_cast<char *>(header);
794 3159 return &(blocks_[blocks_.size() - 1][0]);
795 }
796
797
798 832296 void HeaderLists::Put(curl_slist *slist) {
799 832296 slist->data = NULL;
800 832296 slist->next = NULL;
801 832296 }
802
803
804 3159 void HeaderLists::AddBlock() {
805
1/2
✓ Branch 1 taken 3159 times.
✗ Branch 2 not taken.
3159 curl_slist *new_block = new curl_slist[kBlockSize];
806
2/2
✓ Branch 0 taken 808704 times.
✓ Branch 1 taken 3159 times.
811863 for (unsigned i = 0; i < kBlockSize; ++i) {
807 808704 Put(&new_block[i]);
808 }
809
1/2
✓ Branch 1 taken 3159 times.
✗ Branch 2 not taken.
3159 blocks_.push_back(new_block);
810 3159 }
811
812
813 //------------------------------------------------------------------------------
814
815
816 string DownloadManager::ProxyInfo::Print() {
817 if (url == "DIRECT")
818 return url;
819
820 string result = url;
821 const int remaining = static_cast<int>(host.deadline())
822 - static_cast<int>(time(NULL));
823 string expinfo = (remaining >= 0) ? "+" : "";
824 if (abs(remaining) >= 3600) {
825 expinfo += StringifyInt(remaining / 3600) + "h";
826 } else if (abs(remaining) >= 60) {
827 expinfo += StringifyInt(remaining / 60) + "m";
828 } else {
829 expinfo += StringifyInt(remaining) + "s";
830 }
831 if (host.status() == dns::kFailOk) {
832 result += " (" + host.name() + ", " + expinfo + ")";
833 } else {
834 result += " (:unresolved:, " + expinfo + ")";
835 }
836 return result;
837 }
838
839
840 /**
841 * Gets an idle CURL handle from the pool. Creates a new one and adds it to
842 * the pool if necessary.
843 */
844 4164 CURL *DownloadManager::AcquireCurlHandle() {
845 CURL *handle;
846
847
1/2
✓ Branch 1 taken 4164 times.
✗ Branch 2 not taken.
4164 if (pool_handles_idle_->empty()) {
848 // Create a new handle
849
1/2
✓ Branch 1 taken 4164 times.
✗ Branch 2 not taken.
4164 handle = curl_easy_init();
850
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4164 times.
4164 assert(handle != NULL);
851
852
1/2
✓ Branch 1 taken 4164 times.
✗ Branch 2 not taken.
4164 curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
853 // curl_easy_setopt(curl_default, CURLOPT_FAILONERROR, 1);
854
1/2
✓ Branch 1 taken 4164 times.
✗ Branch 2 not taken.
4164 curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, CallbackCurlHeader);
855
1/2
✓ Branch 1 taken 4164 times.
✗ Branch 2 not taken.
4164 curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlData);
856 } else {
857 handle = *(pool_handles_idle_->begin());
858 pool_handles_idle_->erase(pool_handles_idle_->begin());
859 }
860
861
1/2
✓ Branch 1 taken 4164 times.
✗ Branch 2 not taken.
4164 pool_handles_inuse_->insert(handle);
862
863 4164 return handle;
864 }
865
866
867 4164 void DownloadManager::ReleaseCurlHandle(CURL *handle, bool allow_reuse) {
868
1/2
✓ Branch 1 taken 4164 times.
✗ Branch 2 not taken.
4164 const set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
869
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 4164 times.
4164 assert(elem != pool_handles_inuse_->end());
870
871
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 4164 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 4164 times.
✗ Branch 6 not taken.
4164 if (!allow_reuse || pool_handles_idle_->size() > pool_max_handles_) {
872
1/2
✓ Branch 2 taken 4164 times.
✗ Branch 3 not taken.
4164 curl_easy_cleanup(*elem);
873 } else {
874 pool_handles_idle_->insert(*elem);
875 }
876
877
1/2
✓ Branch 1 taken 4164 times.
✗ Branch 2 not taken.
4164 pool_handles_inuse_->erase(elem);
878 4164 }
879
880
881 /**
882 * HTTP request options: set the URL and other options such as timeout and
883 * proxy.
884 */
885 4164 void DownloadManager::InitializeRequest(JobInfo *info, CURL *handle) {
886 // Initialize internal download state
887 4164 info->SetCurlHandle(handle);
888 4164 info->SetErrorCode(kFailOk);
889 4164 info->SetHttpCode(-1);
890 4164 info->SetFollowRedirects(follow_redirects_);
891 4164 info->SetNumUsedProxies(1);
892 4164 info->SetNumUsedMetalinks(1);
893 4164 info->SetNumUsedHosts(1);
894 4164 info->SetNumRetries(0);
895 4164 info->SetBackoffMs(0);
896 4164 info->SetHeaders(header_lists_->DuplicateList(default_headers_));
897
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4164 times.
4164 if (info->info_header()) {
898 header_lists_->AppendHeader(info->headers(), info->info_header());
899 }
900
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4164 times.
4164 if (enable_http_tracing_) {
901 for (unsigned int i = 0; i < http_tracing_headers_.size(); i++) {
902 header_lists_->AppendHeader(info->headers(),
903 (http_tracing_headers_)[i].c_str());
904 }
905
906 header_lists_->AppendHeader(info->headers(), info->tracing_header_pid());
907 header_lists_->AppendHeader(info->headers(), info->tracing_header_gid());
908 header_lists_->AppendHeader(info->headers(), info->tracing_header_uid());
909
910 LogCvmfs(kLogDownload, kLogDebug,
911 "(manager '%s' - id %" PRId64 ") "
912 "CURL Header for URL: %s is:\n %s",
913 name_.c_str(), info->id(), info->url()->c_str(),
914 header_lists_->Print(info->headers()).c_str());
915 }
916
917
2/2
✓ Branch 1 taken 33 times.
✓ Branch 2 taken 4131 times.
4164 if (info->force_nocache()) {
918 33 SetNocache(info);
919 } else {
920 4131 info->SetNocache(false);
921 }
922
2/2
✓ Branch 1 taken 3497 times.
✓ Branch 2 taken 667 times.
4164 if (info->compressed()) {
923 3497 zlib::DecompressInit(info->GetZstreamPtr());
924 }
925
2/2
✓ Branch 1 taken 3498 times.
✓ Branch 2 taken 666 times.
4164 if (info->expected_hash()) {
926
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3498 times.
3498 assert(info->hash_context().buffer != NULL);
927 3498 shash::Init(info->hash_context());
928 }
929
930
2/6
✗ Branch 1 not taken.
✓ Branch 2 taken 4164 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 4164 times.
4164 if ((info->range_offset() != -1) && (info->range_size())) {
931 char byte_range_array[100];
932 const int64_t range_lower = static_cast<int64_t>(info->range_offset());
933 const int64_t range_upper = static_cast<int64_t>(info->range_offset()
934 + info->range_size() - 1);
935 if (snprintf(byte_range_array, sizeof(byte_range_array),
936 "%" PRId64 "-%" PRId64, range_lower, range_upper)
937 == 100) {
938 PANIC(NULL); // Should be impossible given limits on offset size.
939 }
940 curl_easy_setopt(handle, CURLOPT_RANGE, byte_range_array);
941 } else {
942 4164 curl_easy_setopt(handle, CURLOPT_RANGE, NULL);
943 }
944
945 // Set curl parameters
946 4164 curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
947 4164 curl_easy_setopt(handle, CURLOPT_WRITEHEADER, static_cast<void *>(info));
948 4164 curl_easy_setopt(handle, CURLOPT_WRITEDATA, static_cast<void *>(info));
949 4164 curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->headers());
950
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4164 times.
4164 if (info->head_request()) {
951 curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
952 } else {
953 4164 curl_easy_setopt(handle, CURLOPT_HTTPGET, 1);
954 }
955
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4164 times.
4164 if (opt_ipv4_only_) {
956 curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
957 }
958
2/2
✓ Branch 0 taken 2430 times.
✓ Branch 1 taken 1734 times.
4164 if (follow_redirects_) {
959 2430 curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1);
960 2430 curl_easy_setopt(handle, CURLOPT_MAXREDIRS, 4);
961 }
962 #ifdef DEBUGMSG
963 4164 curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
964 4164 curl_easy_setopt(handle, CURLOPT_DEBUGFUNCTION, CallbackCurlDebug);
965 #endif
966 4164 }
967
968 8376 void DownloadManager::CheckHostInfoReset(const std::string &typ,
969 HostInfo &info,
970 JobInfo *jobinfo,
971 time_t &now) {
972
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8376 times.
8376 if (info.timestamp_backup > 0) {
973 if (now == 0)
974 now = time(NULL);
975 if (static_cast<int64_t>(now)
976 > static_cast<int64_t>(info.timestamp_backup + info.reset_after)) {
977 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
978 "(manager %s - id %" PRId64 ") "
979 "switching %s from %s to %s (reset %s)",
980 name_.c_str(), jobinfo->id(), typ.c_str(),
981 (*info.chain)[info.current].c_str(), (*info.chain)[0].c_str(),
982 typ.c_str());
983 info.current = 0;
984 info.timestamp_backup = 0;
985 }
986 }
987 8376 }
988
989
990 /**
991 * Sets the URL specific options such as host to use and timeout. It might also
992 * set an error code, in which case the further processing should react on.
993 */
994 4188 void DownloadManager::SetUrlOptions(JobInfo *info) {
995 4188 CURL *curl_handle = info->curl_handle();
996 4188 string url_prefix;
997 4188 time_t now = 0;
998
999 4188 const MutexLockGuard m(lock_options_);
1000
1001 // sharding policy
1002
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4188 times.
4188 if (sharding_policy_.UseCount() > 0) {
1003 if (info->proxy() != "") {
1004 // proxy already set, so this is a failover event
1005 perf::Inc(counters_->n_proxy_failover);
1006 }
1007 info->SetProxy(sharding_policy_->GetNextProxy(
1008 info->url(), info->proxy(),
1009 info->range_offset() == -1 ? 0 : info->range_offset()));
1010
1011 curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, info->proxy().c_str());
1012 } else { // no sharding policy
1013 // Check if proxy group needs to be reset from backup to primary
1014
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4188 times.
4188 if (opt_timestamp_backup_proxies_ > 0) {
1015 now = time(NULL);
1016 if (static_cast<int64_t>(now) > static_cast<int64_t>(
1017 opt_timestamp_backup_proxies_ + opt_proxy_groups_reset_after_)) {
1018 opt_proxy_groups_current_ = 0;
1019 opt_timestamp_backup_proxies_ = 0;
1020 RebalanceProxiesUnlocked("Reset proxy group from backup to primary");
1021 }
1022 }
1023 // Check if load-balanced proxies within the group need to be reset
1024
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4188 times.
4188 if (opt_timestamp_failover_proxies_ > 0) {
1025 if (now == 0)
1026 now = time(NULL);
1027 if (static_cast<int64_t>(now)
1028 > static_cast<int64_t>(opt_timestamp_failover_proxies_
1029 + opt_proxy_groups_reset_after_)) {
1030 RebalanceProxiesUnlocked(
1031 "Reset load-balanced proxies within the active group");
1032 }
1033 }
1034
1035
1/2
✓ Branch 2 taken 4188 times.
✗ Branch 3 not taken.
4188 ProxyInfo *proxy = ChooseProxyUnlocked(info->expected_hash());
1036
6/6
✓ Branch 0 taken 1089 times.
✓ Branch 1 taken 3099 times.
✓ Branch 3 taken 1057 times.
✓ Branch 4 taken 32 times.
✓ Branch 5 taken 4156 times.
✓ Branch 6 taken 32 times.
4188 if (!proxy || (proxy->url == "DIRECT")) {
1037
2/4
✓ Branch 2 taken 4156 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4156 times.
✗ Branch 6 not taken.
4156 info->SetProxy("DIRECT");
1038
1/2
✓ Branch 2 taken 4156 times.
✗ Branch 3 not taken.
4156 curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, "");
1039 } else {
1040 // Note: inside ValidateProxyIpsUnlocked() we may change the proxy data
1041 // structure, so we must not pass proxy->... (== current_proxy())
1042 // parameters directly
1043
1/2
✓ Branch 1 taken 32 times.
✗ Branch 2 not taken.
32 const std::string purl = proxy->url;
1044
1/2
✓ Branch 1 taken 32 times.
✗ Branch 2 not taken.
32 const dns::Host phost = proxy->host;
1045
1/2
✓ Branch 1 taken 32 times.
✗ Branch 2 not taken.
32 const bool changed = ValidateProxyIpsUnlocked(purl, phost);
1046 // Current proxy may have changed
1047
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 32 times.
32 if (changed) {
1048 proxy = ChooseProxyUnlocked(info->expected_hash());
1049 }
1050
1/2
✓ Branch 1 taken 32 times.
✗ Branch 2 not taken.
32 info->SetProxy(proxy->url);
1051
1/2
✓ Branch 1 taken 32 times.
✗ Branch 2 not taken.
32 if (proxy->host.status() == dns::kFailOk) {
1052
2/4
✓ Branch 1 taken 32 times.
✗ Branch 2 not taken.
✓ Branch 6 taken 32 times.
✗ Branch 7 not taken.
32 curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY,
1053 info->proxy().c_str());
1054 } else {
1055 // We know it can't work, don't even try to download
1056 curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, "0.0.0.0");
1057 }
1058 32 }
1059 } // end !sharding
1060
1061 // Check if metalink and host chains need to be reset
1062
2/4
✓ Branch 2 taken 4188 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4188 times.
✗ Branch 6 not taken.
4188 CheckHostInfoReset("metalink", opt_metalink_, info, now);
1063
2/4
✓ Branch 2 taken 4188 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4188 times.
✗ Branch 6 not taken.
4188 CheckHostInfoReset("host", opt_host_, info, now);
1064
1065
1/2
✓ Branch 1 taken 4188 times.
✗ Branch 2 not taken.
4188 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, opt_low_speed_limit_);
1066
3/6
✓ Branch 1 taken 4188 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 32 times.
✓ Branch 6 taken 4156 times.
4188 if (info->proxy() != "DIRECT") {
1067
1/2
✓ Branch 1 taken 32 times.
✗ Branch 2 not taken.
32 curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_proxy_);
1068
1/2
✓ Branch 1 taken 32 times.
✗ Branch 2 not taken.
32 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_proxy_);
1069 } else {
1070
1/2
✓ Branch 1 taken 4156 times.
✗ Branch 2 not taken.
4156 curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_direct_);
1071
1/2
✓ Branch 1 taken 4156 times.
✗ Branch 2 not taken.
4156 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_direct_);
1072 }
1073
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4188 times.
4188 if (!opt_dns_server_.empty())
1074 curl_easy_setopt(curl_handle, CURLOPT_DNS_SERVERS, opt_dns_server_.c_str());
1075
1076
2/2
✓ Branch 1 taken 1255 times.
✓ Branch 2 taken 2933 times.
4188 if (info->probe_hosts()) {
1077
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1255 times.
1255 if (CheckMetalinkChain(now)) {
1078 url_prefix = (*opt_metalink_.chain)[opt_metalink_.current];
1079 info->SetCurrentMetalinkChainIndex(opt_metalink_.current);
1080 LogCvmfs(kLogDownload, kLogDebug,
1081 "(manager %s - id %" PRId64 ") "
1082 "reading from metalink %d",
1083 name_.c_str(), info->id(), opt_metalink_.current);
1084
1/2
✓ Branch 0 taken 1255 times.
✗ Branch 1 not taken.
1255 } else if (opt_host_.chain) {
1085
1/2
✓ Branch 2 taken 1255 times.
✗ Branch 3 not taken.
1255 url_prefix = (*opt_host_.chain)[opt_host_.current];
1086 1255 info->SetCurrentHostChainIndex(opt_host_.current);
1087
1/2
✓ Branch 3 taken 1255 times.
✗ Branch 4 not taken.
1255 LogCvmfs(kLogDownload, kLogDebug,
1088 "(manager %s - id %" PRId64 ") "
1089 "reading from host %d",
1090 name_.c_str(), info->id(), opt_host_.current);
1091 }
1092 }
1093
1094
1/2
✓ Branch 2 taken 4188 times.
✗ Branch 3 not taken.
4188 string url = url_prefix + *(info->url());
1095
1096
1/2
✓ Branch 1 taken 4188 times.
✗ Branch 2 not taken.
4188 curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 1L);
1097
2/6
✓ Branch 1 taken 4188 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 4188 times.
4188 if (url.substr(0, 5) == "https") {
1098 const bool rvb = ssl_certificate_store_.ApplySslCertificatePath(
1099 curl_handle);
1100 if (!rvb) {
1101 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
1102 "(manager %s - id %" PRId64 ") "
1103 "Failed to set SSL certificate path %s",
1104 name_.c_str(), info->id(),
1105 ssl_certificate_store_.GetCaPath().c_str());
1106 }
1107 if (info->pid() != -1) {
1108 if (credentials_attachment_ == NULL) {
1109 LogCvmfs(kLogDownload, kLogDebug,
1110 "(manager %s - id %" PRId64 ") "
1111 "uses secure downloads but no credentials attachment set",
1112 name_.c_str(), info->id());
1113 } else {
1114 const bool retval = credentials_attachment_->ConfigureCurlHandle(
1115 curl_handle, info->pid(), info->GetCredDataPtr());
1116 if (!retval) {
1117 LogCvmfs(kLogDownload, kLogDebug,
1118 "(manager %s - id %" PRId64 ") "
1119 "failed attaching credentials",
1120 name_.c_str(), info->id());
1121 }
1122 }
1123 }
1124 // The download manager disables signal handling in the curl library;
1125 // as OpenSSL's implementation of TLS will generate a sigpipe in some
1126 // error paths, we must explicitly disable SIGPIPE here.
1127 // TODO(jblomer): it should be enough to do this once
1128 signal(SIGPIPE, SIG_IGN);
1129 }
1130
1131
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4188 times.
4188 if (url.find("@proxy@") != string::npos) {
1132 // This is used in Geo-API requests (only), to replace a portion of the
1133 // URL with the current proxy name for the sake of caching the result.
1134 // Replace the @proxy@ either with a passed in "forced" template (which
1135 // is set from $CVMFS_PROXY_TEMPLATE) if there is one, or a "direct"
1136 // template (which is the uuid) if there's no proxy, or the name of the
1137 // proxy.
1138 string replacement;
1139 if (proxy_template_forced_ != "") {
1140 replacement = proxy_template_forced_;
1141 } else if (info->proxy() == "DIRECT") {
1142 replacement = proxy_template_direct_;
1143 } else {
1144 if (opt_proxy_groups_current_ >= opt_proxy_groups_fallback_) {
1145 // It doesn't make sense to use the fallback proxies in Geo-API requests
1146 // since the fallback proxies are supposed to get sorted, too.
1147 info->SetProxy("DIRECT");
1148 curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, "");
1149 replacement = proxy_template_direct_;
1150 } else {
1151 replacement = ChooseProxyUnlocked(info->expected_hash())->host.name();
1152 }
1153 }
1154 replacement = (replacement == "") ? proxy_template_direct_ : replacement;
1155 LogCvmfs(kLogDownload, kLogDebug,
1156 "(manager %s - id %" PRId64 ") "
1157 "replacing @proxy@ by %s",
1158 name_.c_str(), info->id(), replacement.c_str());
1159 url = ReplaceAll(url, "@proxy@", replacement);
1160 }
1161
1162 // TODO(heretherebedragons) before removing
1163 // static_cast<cvmfs::MemSink*>(info->sink)->size() == 0
1164 // and just always call info->sink->Reserve()
1165 // we should do a speed check
1166
3/4
✓ Branch 2 taken 4188 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 802 times.
✓ Branch 5 taken 3386 times.
4188 if ((info->sink() != NULL) && info->sink()->RequiresReserve()
1167
1/2
✓ Branch 2 taken 802 times.
✗ Branch 3 not taken.
802 && (static_cast<cvmfs::MemSink *>(info->sink())->size() == 0)
1168
11/20
✓ Branch 1 taken 4188 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 802 times.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 802 times.
✗ Branch 9 not taken.
✓ Branch 10 taken 658 times.
✓ Branch 11 taken 144 times.
✓ Branch 12 taken 802 times.
✓ Branch 13 taken 3386 times.
✗ Branch 14 not taken.
✓ Branch 15 taken 802 times.
✓ Branch 16 taken 3386 times.
✓ Branch 18 taken 658 times.
✓ Branch 19 taken 3530 times.
✗ Branch 20 not taken.
✗ Branch 21 not taken.
✗ Branch 23 not taken.
✗ Branch 24 not taken.
8376 && HasPrefix(url, "file://", false)) {
1169 platform_stat64 stat_buf;
1170 658 const int retval = platform_stat(url.c_str(), &stat_buf);
1171
1/2
✓ Branch 0 taken 658 times.
✗ Branch 1 not taken.
658 if (retval != 0) {
1172 // this is an error: file does not exist or out of memory
1173 // error is caught in other code section.
1174
1/2
✓ Branch 2 taken 658 times.
✗ Branch 3 not taken.
658 info->sink()->Reserve(64ul * 1024ul);
1175 } else {
1176 info->sink()->Reserve(stat_buf.st_size);
1177 }
1178 }
1179
1180
2/4
✓ Branch 2 taken 4188 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 4188 times.
✗ Branch 7 not taken.
4188 curl_easy_setopt(curl_handle, CURLOPT_URL,
1181 EscapeUrl(info->id(), url).c_str());
1182 4188 }
1183
1184
1185 /**
1186 * Checks if the name resolving information is still up to date. The host
1187 * object should be one from the current load-balance group. If the information
1188 * changed, gather new set of resolved IPs and, if different, exchange them in
1189 * the load-balance group on the fly. In the latter case, also rebalance the
1190 * proxies. The options mutex needs to be open.
1191 *
1192 * Returns true if proxies may have changed.
1193 */
1194 32 bool DownloadManager::ValidateProxyIpsUnlocked(const string &url,
1195 const dns::Host &host) {
1196
2/4
✓ Branch 1 taken 32 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 32 times.
✗ Branch 4 not taken.
32 if (!host.IsExpired())
1197 32 return false;
1198 LogCvmfs(kLogDownload, kLogDebug, "(manager '%s') validate DNS entry for %s",
1199 name_.c_str(), host.name().c_str());
1200
1201 const unsigned group_idx = opt_proxy_groups_current_;
1202 dns::Host new_host = resolver_->Resolve(host.name());
1203
1204 bool update_only = true; // No changes to the list of IP addresses.
1205 if (new_host.status() != dns::kFailOk) {
1206 // Try again later in case resolving fails.
1207 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
1208 "(manager '%s') failed to resolve IP addresses for %s (%d - %s)",
1209 name_.c_str(), host.name().c_str(), new_host.status(),
1210 dns::Code2Ascii(new_host.status()));
1211 new_host = dns::Host::ExtendDeadline(host, resolver_->min_ttl());
1212 } else if (!host.IsEquivalent(new_host)) {
1213 update_only = false;
1214 }
1215
1216 if (update_only) {
1217 for (unsigned i = 0; i < (*opt_proxy_groups_)[group_idx].size(); ++i) {
1218 if ((*opt_proxy_groups_)[group_idx][i].host.id() == host.id())
1219 (*opt_proxy_groups_)[group_idx][i].host = new_host;
1220 }
1221 return false;
1222 }
1223
1224 assert(new_host.status() == dns::kFailOk);
1225
1226 // Remove old host objects, insert new objects, and rebalance.
1227 LogCvmfs(kLogDownload, kLogDebug | kLogSyslog,
1228 "(manager '%s') DNS entries for proxy %s changed, adjusting",
1229 name_.c_str(), host.name().c_str());
1230 vector<ProxyInfo> *group = current_proxy_group();
1231 opt_num_proxies_ -= group->size();
1232 for (unsigned i = 0; i < group->size();) {
1233 if ((*group)[i].host.id() == host.id()) {
1234 group->erase(group->begin() + i);
1235 } else {
1236 i++;
1237 }
1238 }
1239 vector<ProxyInfo> new_infos;
1240 set<string> const best_addresses = new_host.ViewBestAddresses(opt_ip_preference_);
1241 set<string>::const_iterator iter_ips = best_addresses.begin();
1242 for (; iter_ips != best_addresses.end(); ++iter_ips) {
1243 const string url_ip = dns::RewriteUrl(url, *iter_ips);
1244 new_infos.push_back(ProxyInfo(new_host, url_ip));
1245 }
1246 group->insert(group->end(), new_infos.begin(), new_infos.end());
1247 opt_num_proxies_ += new_infos.size();
1248
1249 const std::string msg = "DNS entries for proxy " + host.name() + " changed";
1250
1251 RebalanceProxiesUnlocked(msg);
1252 return true;
1253 }
1254
1255
1256 /**
1257 * Adds transfer time and downloaded bytes to the global counters.
1258 */
1259 4436 void DownloadManager::UpdateStatistics(CURL *handle) {
1260 double val;
1261 int retval;
1262 4436 int64_t sum = 0;
1263
1264
1/2
✓ Branch 1 taken 4436 times.
✗ Branch 2 not taken.
4436 retval = curl_easy_getinfo(handle, CURLINFO_SIZE_DOWNLOAD, &val);
1265
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4436 times.
4436 assert(retval == CURLE_OK);
1266 4436 sum += static_cast<int64_t>(val);
1267 /*retval = curl_easy_getinfo(handle, CURLINFO_HEADER_SIZE, &val);
1268 assert(retval == CURLE_OK);
1269 sum += static_cast<int64_t>(val);*/
1270 4436 perf::Xadd(counters_->sz_transferred_bytes, sum);
1271 4436 }
1272
1273
1274 /**
1275 * Retry if possible if not on no-cache and if not already done too often.
1276 */
1277 4436 bool DownloadManager::CanRetry(const JobInfo *info) {
1278 4436 const MutexLockGuard m(lock_options_);
1279 4436 const unsigned max_retries = opt_max_retries_;
1280
1281
2/2
✓ Branch 2 taken 3069 times.
✓ Branch 3 taken 1311 times.
8816 return !(info->nocache()) && (info->num_retries() < max_retries)
1282
3/4
✓ Branch 0 taken 4380 times.
✓ Branch 1 taken 56 times.
✓ Branch 4 taken 3069 times.
✗ Branch 5 not taken.
11885 && (IsProxyTransferError(info->error_code())
1283
2/2
✓ Branch 2 taken 225 times.
✓ Branch 3 taken 2844 times.
3069 || IsHostTransferError(info->error_code()));
1284 4436 }
1285
1286 /**
1287 * Backoff for retry to introduce a jitter into a cluster of requesting
1288 * cvmfs nodes.
1289 * Retry only when HTTP caching is on.
1290 *
1291 * \return true if backoff has been performed, false otherwise
1292 */
1293 225 void DownloadManager::Backoff(JobInfo *info) {
1294 225 unsigned backoff_init_ms = 0;
1295 225 unsigned backoff_max_ms = 0;
1296 {
1297 225 const MutexLockGuard m(lock_options_);
1298 225 backoff_init_ms = opt_backoff_init_ms_;
1299 225 backoff_max_ms = opt_backoff_max_ms_;
1300 225 }
1301
1302 225 info->SetNumRetries(info->num_retries() + 1);
1303 225 perf::Inc(counters_->n_retries);
1304
2/2
✓ Branch 1 taken 91 times.
✓ Branch 2 taken 134 times.
225 if (info->backoff_ms() == 0) {
1305 91 info->SetBackoffMs(prng_.Next(backoff_init_ms + 1)); // Must be != 0
1306 } else {
1307 134 info->SetBackoffMs(info->backoff_ms() * 2);
1308 }
1309
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 225 times.
225 if (info->backoff_ms() > backoff_max_ms) {
1310 info->SetBackoffMs(backoff_max_ms);
1311 }
1312
1313 225 LogCvmfs(kLogDownload, kLogDebug,
1314 "(manager '%s' - id %" PRId64 ") backing off for %d ms",
1315 name_.c_str(), info->id(), info->backoff_ms());
1316 225 SafeSleepMs(info->backoff_ms());
1317 225 }
1318
1319 56 void DownloadManager::SetNocache(JobInfo *info) {
1320
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 56 times.
56 if (info->nocache())
1321 return;
1322 56 header_lists_->AppendHeader(info->headers(), "Pragma: no-cache");
1323 56 header_lists_->AppendHeader(info->headers(), "Cache-Control: no-cache");
1324 56 curl_easy_setopt(info->curl_handle(), CURLOPT_HTTPHEADER, info->headers());
1325 56 info->SetNocache(true);
1326 }
1327
1328
1329 /**
1330 * Reverse operation of SetNocache. Makes sure that "no-cache" header
1331 * disappears from the list of headers to let proxies work normally.
1332 */
1333 272 void DownloadManager::SetRegularCache(JobInfo *info) {
1334
1/2
✓ Branch 1 taken 272 times.
✗ Branch 2 not taken.
272 if (info->nocache() == false)
1335 272 return;
1336 header_lists_->CutHeader("Pragma: no-cache", info->GetHeadersPtr());
1337 header_lists_->CutHeader("Cache-Control: no-cache", info->GetHeadersPtr());
1338 curl_easy_setopt(info->curl_handle(), CURLOPT_HTTPHEADER, info->headers());
1339 info->SetNocache(false);
1340 }
1341
1342
1343 /**
1344 * Frees the storage associated with the authz attachment from the job
1345 */
1346 4188 void DownloadManager::ReleaseCredential(JobInfo *info) {
1347
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4188 times.
4188 if (info->cred_data()) {
1348 assert(credentials_attachment_ != NULL); // Someone must have set it
1349 credentials_attachment_->ReleaseCurlHandle(info->curl_handle(),
1350 info->cred_data());
1351 info->SetCredData(NULL);
1352 }
1353 4188 }
1354
1355
1356 /* Sort links based on the "pri=" parameter */
1357 static bool sortlinks(const std::string &s1, const std::string &s2) {
1358 const size_t pos1 = s1.find("; pri=");
1359 const size_t pos2 = s2.find("; pri=");
1360 int pri1, pri2;
1361 if ((pos1 != std::string::npos) && (pos2 != std::string::npos)
1362 && (sscanf(s1.substr(pos1 + 6).c_str(), "%d", &pri1) == 1)
1363 && (sscanf(s2.substr(pos2 + 6).c_str(), "%d", &pri2) == 1)) {
1364 return pri1 < pri2;
1365 }
1366 return false;
1367 }
1368
1369 /**
1370 * Parses Link header and uses it to set a new host chain.
1371 * See rfc6249.
1372 */
1373 void DownloadManager::ProcessLink(JobInfo *info) {
1374 std::vector<std::string> links = SplitString(info->link(), ',');
1375 if (info->link().find("; pri=") != std::string::npos)
1376 std::sort(links.begin(), links.end(), sortlinks);
1377
1378 std::vector<std::string> host_list;
1379
1380 std::vector<std::string>::const_iterator il = links.begin();
1381 for (; il != links.end(); ++il) {
1382 const std::string &link = *il;
1383 if ((link.find("; rel=duplicate") == std::string::npos)
1384 && (link.find("; rel=\"duplicate\"") == std::string::npos)) {
1385 LogCvmfs(kLogDownload, kLogDebug,
1386 "skipping link '%s' because it does not contain rel=duplicate",
1387 link.c_str());
1388 continue;
1389 }
1390 // ignore depth= field since there's nothing useful we can do with it
1391
1392 size_t start = link.find('<');
1393 if (start == std::string::npos) {
1394 LogCvmfs(
1395 kLogDownload, kLogDebug,
1396 "skipping link '%s' because it does not have a left angle bracket",
1397 link.c_str());
1398 continue;
1399 }
1400
1401 start++;
1402 if ((link.substr(start, 7) != "http://")
1403 && (link.substr(start, 8) != "https://")) {
1404 LogCvmfs(kLogDownload, kLogDebug,
1405 "skipping link '%s' of unrecognized url protocol", link.c_str());
1406 continue;
1407 }
1408
1409 size_t end = link.find('/', start + 8);
1410 if (end == std::string::npos)
1411 end = link.find('>');
1412 if (end == std::string::npos) {
1413 LogCvmfs(kLogDownload, kLogDebug,
1414 "skipping link '%s' because no slash in url and no right angle "
1415 "bracket",
1416 link.c_str());
1417 continue;
1418 }
1419 const std::string host = link.substr(start, end - start);
1420 LogCvmfs(kLogDownload, kLogDebug, "adding linked host '%s'", host.c_str());
1421 host_list.push_back(host);
1422 }
1423
1424 if (host_list.size() > 0) {
1425 SetHostChain(host_list);
1426 opt_metalink_timestamp_link_ = time(NULL);
1427 }
1428 }
1429
1430
1431 /**
1432 * Checks the result of a curl download and implements the failure logic, such
1433 * as changing the proxy server. Takes care of cleanup.
1434 *
1435 * \return true if another download should be performed, false otherwise
1436 */
1437 4436 bool DownloadManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1438
1/2
✓ Branch 6 taken 4436 times.
✗ Branch 7 not taken.
4436 LogCvmfs(kLogDownload, kLogDebug,
1439 "(manager '%s' - id %" PRId64 ") "
1440 "Verify downloaded url %s, proxy %s (curl error %d)",
1441 name_.c_str(), info->id(), info->url()->c_str(),
1442
1/2
✓ Branch 1 taken 4436 times.
✗ Branch 2 not taken.
8872 info->proxy().c_str(), curl_error);
1443
1/2
✓ Branch 2 taken 4436 times.
✗ Branch 3 not taken.
4436 UpdateStatistics(info->curl_handle());
1444
1445 bool was_metalink;
1446 4436 std::string typ;
1447
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4436 times.
4436 if (info->current_metalink_chain_index() >= 0) {
1448 was_metalink = true;
1449 typ = "metalink";
1450 if (info->link() != "") {
1451 // process Link header whether or not the redirected URL got an error
1452 ProcessLink(info);
1453 }
1454 } else {
1455 4436 was_metalink = false;
1456
1/2
✓ Branch 1 taken 4436 times.
✗ Branch 2 not taken.
4436 typ = "host";
1457 }
1458
1459
1460 // Verification and error classification
1461
3/14
✓ Branch 0 taken 3987 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 18 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 431 times.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
✗ Branch 11 not taken.
✗ Branch 12 not taken.
✗ Branch 13 not taken.
4436 switch (curl_error) {
1462 3987 case CURLE_OK:
1463 // Verify content hash
1464
2/2
✓ Branch 1 taken 3433 times.
✓ Branch 2 taken 554 times.
3987 if (info->expected_hash()) {
1465
1/2
✓ Branch 1 taken 3433 times.
✗ Branch 2 not taken.
3433 shash::Any match_hash;
1466
1/2
✓ Branch 2 taken 3433 times.
✗ Branch 3 not taken.
3433 shash::Final(info->hash_context(), &match_hash);
1467
3/4
✓ Branch 2 taken 3433 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 46 times.
✓ Branch 5 taken 3387 times.
3433 if (match_hash != *(info->expected_hash())) {
1468
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 46 times.
46 if (ignore_signature_failures_) {
1469 LogCvmfs(
1470 kLogDownload, kLogDebug | kLogSyslogErr,
1471 "(manager '%s' - id %" PRId64 ") "
1472 "ignoring failed hash verification of %s (expected %s, got %s)",
1473 name_.c_str(), info->id(), info->url()->c_str(),
1474 info->expected_hash()->ToString().c_str(),
1475 match_hash.ToString().c_str());
1476 } else {
1477
1/2
✓ Branch 7 taken 46 times.
✗ Branch 8 not taken.
92 LogCvmfs(kLogDownload, kLogDebug,
1478 "(manager '%s' - id %" PRId64 ") "
1479 "hash verification of %s failed (expected %s, got %s)",
1480 name_.c_str(), info->id(), info->url()->c_str(),
1481
1/2
✓ Branch 2 taken 46 times.
✗ Branch 3 not taken.
92 info->expected_hash()->ToString().c_str(),
1482
1/2
✓ Branch 1 taken 46 times.
✗ Branch 2 not taken.
92 match_hash.ToString().c_str());
1483 46 info->SetErrorCode(kFailBadData);
1484 46 break;
1485 }
1486 }
1487 }
1488
1489 3941 info->SetErrorCode(kFailOk);
1490 3941 break;
1491 case CURLE_UNSUPPORTED_PROTOCOL:
1492 info->SetErrorCode(kFailUnsupportedProtocol);
1493 break;
1494 18 case CURLE_URL_MALFORMAT:
1495 18 info->SetErrorCode(kFailBadUrl);
1496 18 break;
1497 case CURLE_COULDNT_RESOLVE_PROXY:
1498 info->SetErrorCode(kFailProxyResolve);
1499 break;
1500 case CURLE_COULDNT_RESOLVE_HOST:
1501 info->SetErrorCode(kFailHostResolve);
1502 break;
1503 case CURLE_OPERATION_TIMEDOUT:
1504 info->SetErrorCode((info->proxy() == "DIRECT") ? kFailHostTooSlow
1505 : kFailProxyTooSlow);
1506 break;
1507 case CURLE_PARTIAL_FILE:
1508 case CURLE_GOT_NOTHING:
1509 case CURLE_RECV_ERROR:
1510 info->SetErrorCode((info->proxy() == "DIRECT") ? kFailHostShortTransfer
1511 : kFailProxyShortTransfer);
1512 break;
1513 431 case CURLE_FILE_COULDNT_READ_FILE:
1514 case CURLE_COULDNT_CONNECT:
1515
3/6
✓ Branch 1 taken 431 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 8 times.
✓ Branch 6 taken 423 times.
431 if (info->proxy() != "DIRECT") {
1516 // This is a guess. Fail-over can still change to switching host
1517 8 info->SetErrorCode(kFailProxyConnection);
1518 } else {
1519 423 info->SetErrorCode(kFailHostConnection);
1520 }
1521 431 break;
1522 case CURLE_TOO_MANY_REDIRECTS:
1523 info->SetErrorCode(kFailHostConnection);
1524 break;
1525 case CURLE_SSL_CACERT_BADFILE:
1526 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogErr,
1527 "(manager '%s' -id %" PRId64 ") "
1528 "Failed to load certificate bundle. "
1529 "X509_CERT_BUNDLE might point to the wrong location.",
1530 name_.c_str(), info->id());
1531 info->SetErrorCode(kFailHostConnection);
1532 break;
1533 // As of curl 7.62.0, CURLE_SSL_CACERT is the same as
1534 // CURLE_PEER_FAILED_VERIFICATION
1535 case CURLE_PEER_FAILED_VERIFICATION:
1536 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogErr,
1537 "(manager '%s' - id %" PRId64 ") "
1538 "invalid SSL certificate of remote host. "
1539 "X509_CERT_DIR and/or X509_CERT_BUNDLE might point to the wrong "
1540 "location.",
1541 name_.c_str(), info->id());
1542 info->SetErrorCode(kFailHostConnection);
1543 break;
1544 case CURLE_ABORTED_BY_CALLBACK:
1545 case CURLE_WRITE_ERROR:
1546 // Error set by callback
1547 break;
1548 case CURLE_SEND_ERROR:
1549 // The curl error CURLE_SEND_ERROR can be seen when a cache is misbehaving
1550 // and closing connections before the http request send is completed.
1551 // Handle this error, treating it as a short transfer error.
1552 info->SetErrorCode((info->proxy() == "DIRECT") ? kFailHostShortTransfer
1553 : kFailProxyShortTransfer);
1554 break;
1555 default:
1556 LogCvmfs(kLogDownload, kLogSyslogErr,
1557 "(manager '%s' - id %" PRId64 ") "
1558 "unexpected curl error (%d) while trying to fetch %s",
1559 name_.c_str(), info->id(), curl_error, info->url()->c_str());
1560 info->SetErrorCode(kFailOther);
1561 break;
1562 }
1563
1564 std::vector<std::string> *host_chain;
1565 unsigned char num_used_hosts;
1566
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4436 times.
4436 if (was_metalink) {
1567 host_chain = opt_metalink_.chain;
1568 num_used_hosts = info->num_used_metalinks();
1569 } else {
1570 4436 host_chain = opt_host_.chain;
1571 4436 num_used_hosts = info->num_used_hosts();
1572 }
1573
1574 // Determination if download should be repeated
1575 4436 bool try_again = false;
1576
1/2
✓ Branch 1 taken 4436 times.
✗ Branch 2 not taken.
4436 bool same_url_retry = CanRetry(info);
1577
2/2
✓ Branch 1 taken 495 times.
✓ Branch 2 taken 3941 times.
4436 if (info->error_code() != kFailOk) {
1578 495 const MutexLockGuard m(lock_options_);
1579
2/2
✓ Branch 1 taken 46 times.
✓ Branch 2 taken 449 times.
495 if (info->error_code() == kFailBadData) {
1580
2/2
✓ Branch 1 taken 23 times.
✓ Branch 2 taken 23 times.
46 if (!info->nocache()) {
1581 23 try_again = true;
1582 } else {
1583 // Make it a host failure
1584
1/2
✓ Branch 4 taken 23 times.
✗ Branch 5 not taken.
23 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
1585 "(manager '%s' - id %" PRId64 ") "
1586 "data corruption with no-cache header, try another %s",
1587 name_.c_str(), info->id(), typ.c_str());
1588
1589 23 info->SetErrorCode(kFailHostHttp);
1590 }
1591 }
1592 495 if (same_url_retry
1593
5/6
✓ Branch 0 taken 270 times.
✓ Branch 1 taken 225 times.
✓ Branch 3 taken 270 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 249 times.
✓ Branch 6 taken 246 times.
765 || (((info->error_code() == kFailHostResolve)
1594
2/2
✓ Branch 2 taken 72 times.
✓ Branch 3 taken 198 times.
270 || IsHostTransferError(info->error_code())
1595
2/2
✓ Branch 1 taken 23 times.
✓ Branch 2 taken 49 times.
72 || (info->error_code() == kFailHostHttp))
1596
3/4
✓ Branch 1 taken 123 times.
✓ Branch 2 taken 98 times.
✓ Branch 3 taken 123 times.
✗ Branch 4 not taken.
221 && info->probe_hosts() && host_chain
1597
2/2
✓ Branch 1 taken 24 times.
✓ Branch 2 taken 99 times.
123 && (num_used_hosts < host_chain->size()))) {
1598 249 try_again = true;
1599 }
1600 495 if (same_url_retry
1601
5/6
✓ Branch 0 taken 270 times.
✓ Branch 1 taken 225 times.
✓ Branch 3 taken 270 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 233 times.
✓ Branch 6 taken 262 times.
765 || (((info->error_code() == kFailProxyResolve)
1602
2/2
✓ Branch 2 taken 262 times.
✓ Branch 3 taken 8 times.
270 || IsProxyTransferError(info->error_code())
1603
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 262 times.
262 || (info->error_code() == kFailProxyHttp)))) {
1604
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 233 times.
233 if (sharding_policy_.UseCount() > 0) { // sharding policy
1605 try_again = true;
1606 same_url_retry = false;
1607 } else { // no sharding
1608 233 try_again = true;
1609 // If all proxies failed, do a next round with the next host
1610
4/6
✓ Branch 0 taken 8 times.
✓ Branch 1 taken 225 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 8 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 233 times.
233 if (!same_url_retry && (info->num_used_proxies() >= opt_num_proxies_)) {
1611 // Check if this can be made a host fail-over
1612 if (info->probe_hosts() && host_chain
1613 && (num_used_hosts < host_chain->size())) {
1614 // reset proxy group if not already performed by other handle
1615 if (opt_proxy_groups_) {
1616 if ((opt_proxy_groups_current_ > 0)
1617 || (opt_proxy_groups_current_burned_ > 0)) {
1618 opt_proxy_groups_current_ = 0;
1619 opt_timestamp_backup_proxies_ = 0;
1620 const std::string msg = "reset proxies for " + typ
1621 + " failover";
1622 RebalanceProxiesUnlocked(msg);
1623 }
1624 }
1625
1626 // Make it a host failure
1627 LogCvmfs(kLogDownload, kLogDebug,
1628 "(manager '%s' - id %" PRId64 ") make it a %s failure",
1629 name_.c_str(), info->id(), typ.c_str());
1630 info->SetNumUsedProxies(1);
1631 info->SetErrorCode(kFailHostAfterProxy);
1632 } else {
1633 if (failover_indefinitely_) {
1634 // Instead of giving up, reset the num_used_proxies counter,
1635 // switch proxy and try again
1636 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
1637 "(manager '%s' - id %" PRId64 ") "
1638 "VerifyAndFinalize() would fail the download here. "
1639 "Instead switch proxy and retry download. "
1640 "typ=%s "
1641 "info->probe_hosts=%d host_chain=%p num_used_hosts=%d "
1642 "host_chain->size()=%lu same_url_retry=%d "
1643 "info->num_used_proxies=%d opt_num_proxies_=%d",
1644 name_.c_str(), info->id(), typ.c_str(),
1645 static_cast<int>(info->probe_hosts()), host_chain,
1646 num_used_hosts, host_chain ? host_chain->size() : -1,
1647 static_cast<int>(same_url_retry),
1648 info->num_used_proxies(), opt_num_proxies_);
1649 info->SetNumUsedProxies(1);
1650 RebalanceProxiesUnlocked(
1651 "download failed - failover indefinitely");
1652 try_again = !Interrupted(fqrn_, info);
1653 } else {
1654 try_again = false;
1655 }
1656 }
1657 } // Make a proxy failure a host failure
1658 } // Proxy failure assumed
1659 } // end !sharding
1660 495 }
1661
1662
2/2
✓ Branch 0 taken 280 times.
✓ Branch 1 taken 4156 times.
4436 if (try_again) {
1663
1/2
✓ Branch 3 taken 280 times.
✗ Branch 4 not taken.
280 LogCvmfs(kLogDownload, kLogDebug,
1664 "(manager '%s' - id %" PRId64 ") "
1665 "Trying again on same curl handle, same url: %d, "
1666 "error code %d no-cache %d",
1667 280 name_.c_str(), info->id(), same_url_retry, info->error_code(),
1668 280 info->nocache());
1669 // Reset internal state and destination. In parallel-decompress mode the
1670 // sink and zstream are owned by the caller thread (it pops and decompresses
1671 // the queued chunks). Resetting them here would race the caller and, worse,
1672 // leave the bytes of this failed attempt in the tube to be decompressed
1673 // into the output. Defer the reset by enqueuing an ordered marker; the
1674 // caller discards this attempt's bytes when it pops it (see below).
1675
1/2
✓ Branch 1 taken 280 times.
✗ Branch 2 not taken.
280 const bool defer_reset = info->IsValidDataTube();
1676
1/2
✓ Branch 0 taken 280 times.
✗ Branch 1 not taken.
280 if (!defer_reset) {
1677
4/8
✓ Branch 1 taken 280 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 280 times.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 280 times.
✗ Branch 9 not taken.
✓ Branch 10 taken 280 times.
280 if (info->sink() != NULL && info->sink()->Reset() != 0) {
1678 info->SetErrorCode(kFailLocalIO);
1679 goto verify_and_finalize_stop;
1680 }
1681 }
1682
6/8
✓ Branch 1 taken 8 times.
✓ Branch 2 taken 272 times.
✓ Branch 5 taken 8 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 8 times.
✗ Branch 8 not taken.
✓ Branch 9 taken 8 times.
✓ Branch 10 taken 272 times.
280 if (info->interrupt_cue() && info->interrupt_cue()->IsCanceled()) {
1683 8 info->SetErrorCode(kFailCanceled);
1684 8 goto verify_and_finalize_stop;
1685 }
1686
1687 // The hash context is updated on this (MainDownload) thread in
1688 // CallbackCurlData(), so it is reset here regardless of the decompress mode.
1689
2/2
✓ Branch 1 taken 152 times.
✓ Branch 2 taken 120 times.
272 if (info->expected_hash()) {
1690
1/2
✓ Branch 2 taken 152 times.
✗ Branch 3 not taken.
152 shash::Init(info->hash_context());
1691 }
1692
5/6
✓ Branch 1 taken 152 times.
✓ Branch 2 taken 120 times.
✓ Branch 3 taken 152 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 152 times.
✓ Branch 6 taken 120 times.
272 if (info->compressed() && !defer_reset) {
1693
1/2
✓ Branch 2 taken 152 times.
✗ Branch 3 not taken.
152 zlib::DecompressInit(info->GetZstreamPtr());
1694 }
1695
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 272 times.
272 if (defer_reset) {
1696 info->GetDataTubePtr()->EnqueueBack(new DataTubeElement(kActionReset));
1697 }
1698
1699
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 272 times.
272 if (sharding_policy_.UseCount() > 0) { // sharding policy
1700 ReleaseCredential(info);
1701 SetUrlOptions(info);
1702 } else { // no sharding policy
1703
1/2
✓ Branch 1 taken 272 times.
✗ Branch 2 not taken.
272 SetRegularCache(info);
1704
1705 // Failure handling
1706 272 bool switch_proxy = false;
1707 272 bool switch_host = false;
1708
2/4
✓ Branch 1 taken 23 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 249 times.
272 switch (info->error_code()) {
1709 23 case kFailBadData:
1710
1/2
✓ Branch 1 taken 23 times.
✗ Branch 2 not taken.
23 SetNocache(info);
1711 23 break;
1712 case kFailProxyResolve:
1713 case kFailProxyHttp:
1714 switch_proxy = true;
1715 break;
1716 case kFailHostResolve:
1717 case kFailHostHttp:
1718 case kFailHostAfterProxy:
1719 switch_host = true;
1720 break;
1721 249 default:
1722
2/2
✓ Branch 2 taken 8 times.
✓ Branch 3 taken 241 times.
249 if (IsProxyTransferError(info->error_code())) {
1723
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8 times.
8 if (same_url_retry) {
1724 Backoff(info);
1725 } else {
1726 8 switch_proxy = true;
1727 }
1728
1/2
✓ Branch 2 taken 241 times.
✗ Branch 3 not taken.
241 } else if (IsHostTransferError(info->error_code())) {
1729
2/2
✓ Branch 0 taken 225 times.
✓ Branch 1 taken 16 times.
241 if (same_url_retry) {
1730
1/2
✓ Branch 1 taken 225 times.
✗ Branch 2 not taken.
225 Backoff(info);
1731 } else {
1732 16 switch_host = true;
1733 }
1734 } else {
1735 // No other errors expected when retrying
1736 PANIC(NULL);
1737 }
1738 }
1739
2/2
✓ Branch 0 taken 8 times.
✓ Branch 1 taken 264 times.
272 if (switch_proxy) {
1740
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8 ReleaseCredential(info);
1741
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8 SwitchProxy(info);
1742 8 info->SetNumUsedProxies(info->num_used_proxies() + 1);
1743
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8 SetUrlOptions(info);
1744 }
1745
2/2
✓ Branch 0 taken 16 times.
✓ Branch 1 taken 256 times.
272 if (switch_host) {
1746
1/2
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
16 ReleaseCredential(info);
1747
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 16 times.
16 if (was_metalink) {
1748 SwitchMetalink(info);
1749 info->SetNumUsedMetalinks(num_used_hosts + 1);
1750 } else {
1751
1/2
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
16 SwitchHost(info);
1752 16 info->SetNumUsedHosts(num_used_hosts + 1);
1753 }
1754
1/2
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
16 SetUrlOptions(info);
1755 }
1756 } // end !sharding
1757
1758
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 272 times.
272 if (failover_indefinitely_) {
1759 // try again, breaking if there's a cvmfs reload happening and we are in a
1760 // proxy failover. This will cause an EIO in the calling application.
1761 return !Interrupted(fqrn_, info);
1762 }
1763 272 return true; // try again
1764 }
1765
1766 4156 verify_and_finalize_stop:
1767 // Finalize, flush destination file.
1768
1/2
✓ Branch 1 taken 4164 times.
✗ Branch 2 not taken.
4164 ReleaseCredential(info);
1769
1770 // In parallel-decompress mode (data_tube_ valid) the caller has not yet
1771 // pushed bytes through the sink / zstream — those operations happen on
1772 // the caller thread when it pops the queued chunks. Flushing the sink
1773 // and tearing down the zstream here would race the caller's decompress
1774 // and corrupt the output. Defer both to the caller.
1775
1/2
✓ Branch 1 taken 4164 times.
✗ Branch 2 not taken.
4164 const bool defer_finalize = info->IsValidDataTube();
1776
1/2
✓ Branch 0 taken 4164 times.
✗ Branch 1 not taken.
4164 if (!defer_finalize) {
1777
4/8
✓ Branch 1 taken 4164 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 4164 times.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 4164 times.
✗ Branch 9 not taken.
✓ Branch 10 taken 4164 times.
4164 if (info->sink() != NULL && info->sink()->Flush() != 0) {
1778 info->SetErrorCode(kFailLocalIO);
1779 }
1780
2/2
✓ Branch 1 taken 3497 times.
✓ Branch 2 taken 667 times.
4164 if (info->compressed())
1781
1/2
✓ Branch 2 taken 3497 times.
✗ Branch 3 not taken.
3497 zlib::DecompressFini(info->GetZstreamPtr());
1782 }
1783
1784
1/2
✓ Branch 1 taken 4164 times.
✗ Branch 2 not taken.
4164 if (info->headers()) {
1785
1/2
✓ Branch 2 taken 4164 times.
✗ Branch 3 not taken.
4164 header_lists_->PutList(info->headers());
1786 4164 info->SetHeaders(NULL);
1787 }
1788
1789 4164 return false; // stop transfer and return to Fetch()
1790 4436 }
1791
1792 2994 DownloadManager::~DownloadManager() {
1793 // cleaned up fini
1794
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2994 times.
2994 if (sharding_policy_.UseCount() > 0) {
1795 sharding_policy_.Reset();
1796 }
1797
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2994 times.
2994 if (health_check_.UseCount() > 0) {
1798 if (health_check_.Unique()) {
1799 LogCvmfs(kLogDownload, kLogDebug,
1800 "(manager '%s') Stopping healthcheck thread", name_.c_str());
1801 health_check_->StopHealthcheck();
1802 }
1803 health_check_.Reset();
1804 }
1805
1806
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2994 times.
2994 if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1807 // Shutdown I/O thread
1808 pipe_terminate_->Write(kPipeTerminateSignal);
1809 pthread_join(thread_download_, NULL);
1810 // All handles are removed from the multi stack
1811 pipe_terminate_.Destroy();
1812 pipe_jobs_.Destroy();
1813 }
1814
1815 5988 for (set<CURL *>::iterator i = pool_handles_idle_->begin(),
1816 2994 iEnd = pool_handles_idle_->end();
1817
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2994 times.
2994 i != iEnd;
1818 ++i) {
1819 curl_easy_cleanup(*i);
1820 }
1821
1822
1/2
✓ Branch 0 taken 2994 times.
✗ Branch 1 not taken.
2994 delete pool_handles_idle_;
1823
1/2
✓ Branch 0 taken 2994 times.
✗ Branch 1 not taken.
2994 delete pool_handles_inuse_;
1824 2994 curl_multi_cleanup(curl_multi_);
1825
1826
1/2
✓ Branch 0 taken 2994 times.
✗ Branch 1 not taken.
2994 delete header_lists_;
1827
1/2
✓ Branch 0 taken 2994 times.
✗ Branch 1 not taken.
2994 if (user_agent_)
1828 2994 free(user_agent_);
1829
1830
1/2
✓ Branch 0 taken 2994 times.
✗ Branch 1 not taken.
2994 delete counters_;
1831
2/2
✓ Branch 0 taken 1266 times.
✓ Branch 1 taken 1728 times.
2994 delete opt_host_.chain;
1832
2/2
✓ Branch 0 taken 1266 times.
✓ Branch 1 taken 1728 times.
2994 delete opt_host_chain_rtt_;
1833
2/2
✓ Branch 0 taken 1154 times.
✓ Branch 1 taken 1840 times.
2994 delete opt_proxy_groups_;
1834
1835 2994 curl_global_cleanup();
1836
1/2
✓ Branch 0 taken 2994 times.
✗ Branch 1 not taken.
2994 delete resolver_;
1837
1838 // old destructor
1839 2994 pthread_mutex_destroy(lock_options_);
1840 2994 pthread_mutex_destroy(lock_synchronous_mode_);
1841 2994 free(lock_options_);
1842 2994 free(lock_synchronous_mode_);
1843 2994 }
1844
1845 2995 void DownloadManager::InitHeaders() {
1846 // User-Agent
1847
1/2
✓ Branch 2 taken 2995 times.
✗ Branch 3 not taken.
2995 string cernvm_id = "User-Agent: cvmfs ";
1848 #ifdef CVMFS_LIBCVMFS
1849
1/2
✓ Branch 1 taken 2995 times.
✗ Branch 2 not taken.
2995 cernvm_id += "libcvmfs ";
1850 #else
1851 cernvm_id += "Fuse ";
1852 #endif
1853
2/4
✓ Branch 2 taken 2995 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2995 times.
✗ Branch 6 not taken.
2995 cernvm_id += string(CVMFS_VERSION);
1854
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2995 times.
2995 if (getenv("CERNVM_UUID") != NULL) {
1855 cernvm_id += " "
1856 + sanitizer::InputSanitizer("az AZ 09 -")
1857 .Filter(getenv("CERNVM_UUID"));
1858 }
1859 2995 user_agent_ = strdup(cernvm_id.c_str());
1860
1861
1/2
✓ Branch 1 taken 2995 times.
✗ Branch 2 not taken.
2995 header_lists_ = new HeaderLists();
1862
1863
1/2
✓ Branch 1 taken 2995 times.
✗ Branch 2 not taken.
2995 default_headers_ = header_lists_->GetList("Connection: Keep-Alive");
1864
1/2
✓ Branch 1 taken 2995 times.
✗ Branch 2 not taken.
2995 header_lists_->AppendHeader(default_headers_, "Pragma:");
1865
1/2
✓ Branch 1 taken 2995 times.
✗ Branch 2 not taken.
2995 header_lists_->AppendHeader(default_headers_, user_agent_);
1866 2995 }
1867
1868 2995 DownloadManager::DownloadManager(const unsigned max_pool_handles,
1869 const perf::StatisticsTemplate &statistics,
1870 2995 const std::string &name)
1871 2995 : prng_(Prng())
1872 2995 , pool_handles_idle_(new set<CURL *>)
1873 2995 , pool_handles_inuse_(new set<CURL *>)
1874 2995 , pool_max_handles_(max_pool_handles)
1875 2995 , pipe_terminate_(NULL)
1876
1/2
✓ Branch 1 taken 2995 times.
✗ Branch 2 not taken.
2995 , pipe_jobs_(NULL)
1877 2995 , watch_fds_(NULL)
1878 2995 , watch_fds_size_(0)
1879 2995 , watch_fds_inuse_(0)
1880 2995 , watch_fds_max_(4 * max_pool_handles)
1881 2995 , opt_timeout_proxy_(5)
1882 2995 , opt_timeout_direct_(10)
1883 2995 , opt_low_speed_limit_(1024)
1884 2995 , opt_max_retries_(0)
1885 2995 , opt_backoff_init_ms_(0)
1886 2995 , opt_backoff_max_ms_(0)
1887 2995 , enable_info_header_(false)
1888 2995 , opt_ipv4_only_(false)
1889 2995 , follow_redirects_(false)
1890 2995 , ignore_signature_failures_(false)
1891 2995 , enable_http_tracing_(false)
1892 2995 , opt_metalink_(NULL, 0, 0, 0)
1893 2995 , opt_metalink_timestamp_link_(0)
1894 2995 , opt_host_(NULL, 0, 0, 0)
1895 2995 , opt_host_chain_rtt_(NULL)
1896 2995 , opt_proxy_groups_(NULL)
1897 2995 , opt_proxy_groups_current_(0)
1898 2995 , opt_proxy_groups_current_burned_(0)
1899 2995 , opt_proxy_groups_fallback_(0)
1900 2995 , opt_num_proxies_(0)
1901 2995 , opt_proxy_shard_(false)
1902 2995 , failover_indefinitely_(false)
1903
1/2
✓ Branch 1 taken 2995 times.
✗ Branch 2 not taken.
2995 , name_(name)
1904 2995 , opt_ip_preference_(dns::kIpPreferSystem)
1905 2995 , opt_timestamp_backup_proxies_(0)
1906 2995 , opt_timestamp_failover_proxies_(0)
1907 2995 , opt_proxy_groups_reset_after_(0)
1908 2995 , credentials_attachment_(NULL)
1909
4/8
✓ Branch 13 taken 2995 times.
✗ Branch 14 not taken.
✓ Branch 16 taken 2995 times.
✗ Branch 17 not taken.
✓ Branch 19 taken 2995 times.
✗ Branch 20 not taken.
✓ Branch 23 taken 2995 times.
✗ Branch 24 not taken.
8985 , counters_(new Counters(statistics)) {
1910 2995 atomic_init32(&multi_threaded_);
1911
1912 2995 lock_options_ = reinterpret_cast<pthread_mutex_t *>(
1913 2995 smalloc(sizeof(pthread_mutex_t)));
1914 2995 int retval = pthread_mutex_init(lock_options_, NULL);
1915
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2995 times.
2995 assert(retval == 0);
1916 2995 lock_synchronous_mode_ = reinterpret_cast<pthread_mutex_t *>(
1917 2995 smalloc(sizeof(pthread_mutex_t)));
1918 2995 retval = pthread_mutex_init(lock_synchronous_mode_, NULL);
1919
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2995 times.
2995 assert(retval == 0);
1920
1921
1/2
✓ Branch 1 taken 2995 times.
✗ Branch 2 not taken.
2995 retval = curl_global_init(CURL_GLOBAL_ALL);
1922
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2995 times.
2995 assert(retval == CURLE_OK);
1923
1924
1/2
✓ Branch 1 taken 2995 times.
✗ Branch 2 not taken.
2995 InitHeaders();
1925
1926
1/2
✓ Branch 1 taken 2995 times.
✗ Branch 2 not taken.
2995 curl_multi_ = curl_multi_init();
1927
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2995 times.
2995 assert(curl_multi_ != NULL);
1928
1/2
✓ Branch 1 taken 2995 times.
✗ Branch 2 not taken.
2995 curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION, CallbackCurlSocket);
1929
1/2
✓ Branch 1 taken 2995 times.
✗ Branch 2 not taken.
2995 curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1930 static_cast<void *>(this));
1931
1/2
✓ Branch 1 taken 2995 times.
✗ Branch 2 not taken.
2995 curl_multi_setopt(curl_multi_, CURLMOPT_MAXCONNECTS, watch_fds_max_);
1932
1/2
✓ Branch 1 taken 2995 times.
✗ Branch 2 not taken.
2995 curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1933 pool_max_handles_);
1934
1935 2995 prng_.InitLocaltime();
1936
1937 // Name resolving
1938 2995 if ((getenv("CVMFS_IPV4_ONLY") != NULL)
1939
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 2995 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 2995 times.
2995 && (strlen(getenv("CVMFS_IPV4_ONLY")) > 0)) {
1940 opt_ipv4_only_ = true;
1941 }
1942
1/2
✓ Branch 1 taken 2995 times.
✗ Branch 2 not taken.
2995 resolver_ = dns::NormalResolver::Create(opt_ipv4_only_, kDnsDefaultRetries,
1943 kDnsDefaultTimeoutMs);
1944
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2995 times.
2995 assert(resolver_);
1945 2995 }
1946
1947 /**
1948 * Spawns the I/O worker thread and switches the module into multi-threaded mode.
1949 * No way back except Fini(); Init();
1950 */
1951 void DownloadManager::Spawn() {
1952 pipe_terminate_ = new Pipe<kPipeThreadTerminator>();
1953 pipe_jobs_ = new Pipe<kPipeDownloadJobs>();
1954
1955 const int retval = pthread_create(&thread_download_, NULL, MainDownload,
1956 static_cast<void *>(this));
1957 assert(retval == 0);
1958
1959 atomic_inc32(&multi_threaded_);
1960
1961 if (health_check_.UseCount() > 0) {
1962 LogCvmfs(kLogDownload, kLogDebug,
1963 "(manager '%s') Starting healthcheck thread", name_.c_str());
1964 health_check_->StartHealthcheck();
1965 }
1966 }
1967
1968
1969 /**
1970 * Downloads data from an insecure outside channel (currently HTTP or file).
1971 */
1972 4164 Failures DownloadManager::Fetch(JobInfo *info) {
1973
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4164 times.
4164 assert(info != NULL);
1974
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4164 times.
4164 assert(info->url() != NULL);
1975
1976 Failures result;
1977
1/2
✓ Branch 1 taken 4164 times.
✗ Branch 2 not taken.
4164 result = PrepareDownloadDestination(info);
1978
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4164 times.
4164 if (result != kFailOk)
1979 return result;
1980
1981
2/2
✓ Branch 1 taken 3498 times.
✓ Branch 2 taken 666 times.
4164 if (info->expected_hash()) {
1982 3498 const shash::Algorithms algorithm = info->expected_hash()->algorithm;
1983 3498 info->GetHashContextPtr()->algorithm = algorithm;
1984
1/2
✓ Branch 1 taken 3498 times.
✗ Branch 2 not taken.
3498 info->GetHashContextPtr()->size = shash::GetContextSize(algorithm);
1985 3498 info->GetHashContextPtr()->buffer = alloca(info->hash_context().size);
1986 }
1987
1988 // In case JobInfo object is being reused
1989
2/4
✓ Branch 2 taken 4164 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4164 times.
✗ Branch 6 not taken.
4164 info->SetLink("");
1990
1991 // Prepare cvmfs-info: header, allocate string on the stack
1992 4164 info->SetInfoHeader(NULL);
1993
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4164 times.
4164 if (enable_info_header_) {
1994 const string header_info = info->GetInfoHeaderContents(info_header_template_);
1995 if (header_info != "") {
1996 const char * const header_name = "cvmfs-info: ";
1997 const size_t header_name_len = strlen(header_name);
1998 const size_t info_len = header_info.length();
1999 char *buf = static_cast<char *>(alloca(header_name_len + info_len + 1));
2000 memcpy(buf, header_name, header_name_len);
2001 memcpy(buf + header_name_len, header_info.c_str(), info_len);
2002 buf[header_name_len + info_len] = '\0';
2003 info->SetInfoHeader(buf);
2004 }
2005 }
2006
2007
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4164 times.
4164 if (enable_http_tracing_) {
2008 const std::string str_pid = "X-CVMFS-PID: " + StringifyInt(info->pid());
2009 const std::string str_gid = "X-CVMFS-GID: " + StringifyUint(info->gid());
2010 const std::string str_uid = "X-CVMFS-UID: " + StringifyUint(info->uid());
2011
2012 // will be auto freed at the end of this function Fetch(JobInfo *info)
2013 info->SetTracingHeaderPid(static_cast<char *>(alloca(str_pid.size() + 1)));
2014 info->SetTracingHeaderGid(static_cast<char *>(alloca(str_gid.size() + 1)));
2015 info->SetTracingHeaderUid(static_cast<char *>(alloca(str_uid.size() + 1)));
2016
2017 memcpy(info->tracing_header_pid(), str_pid.c_str(), str_pid.size() + 1);
2018 memcpy(info->tracing_header_gid(), str_gid.c_str(), str_gid.size() + 1);
2019 memcpy(info->tracing_header_uid(), str_uid.c_str(), str_uid.size() + 1);
2020 }
2021
2022
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4164 times.
4164 if (atomic_xadd32(&multi_threaded_, 0) == 1) {
2023 if (!info->IsValidPipeJobResults()) {
2024 info->CreatePipeJobResults();
2025 }
2026 if (!info->IsValidDataTube()) {
2027 info->CreateDataTube();
2028 }
2029
2030 // LogCvmfs(kLogDownload, kLogDebug, "send job to thread, pipe %d %d",
2031 // info->wait_at[0], info->wait_at[1]);
2032 pipe_jobs_->Write<JobInfo *>(info);
2033
2034 // Decompress / copy chunks on this caller thread (in parallel across
2035 // concurrent Fetch() callers) instead of on the single MainDownload
2036 // thread. Track decompression errors locally and apply them at the end
2037 // — VerifyAndFinalize already ran by the time kActionStop arrives.
2038 Failures decompress_err = kFailOk;
2039 do {
2040 DataTubeElement *ele = info->GetDataTubePtr()->PopFront();
2041
2042 if (ele->action == kActionStop) {
2043 delete ele;
2044 break;
2045 }
2046
2047 if (ele->action == kActionReset) {
2048 // The chunks popped so far belonged to a download attempt that was
2049 // superseded by a retry / host fail-over (VerifyAndFinalize enqueued
2050 // this marker). Discard their — possibly corrupt — decompressed output
2051 // and start fresh so the next attempt's bytes are processed cleanly.
2052 if (info->compressed()) {
2053 zlib::DecompressFini(info->GetZstreamPtr());
2054 zlib::DecompressInit(info->GetZstreamPtr());
2055 }
2056 if (info->sink() != NULL && info->sink()->Reset() != 0) {
2057 decompress_err = kFailLocalIO;
2058 } else {
2059 decompress_err = kFailOk;
2060 }
2061 delete ele;
2062 continue;
2063 }
2064
2065 if (decompress_err == kFailOk && ele->size > 0) {
2066 if (info->compressed()) {
2067 const zlib::StreamStates retval = zlib::DecompressZStream2Sink(
2068 ele->data, static_cast<int64_t>(ele->size),
2069 info->GetZstreamPtr(), info->sink());
2070 if (retval == zlib::kStreamDataError) {
2071 LogCvmfs(kLogDownload, kLogSyslogErr,
2072 "(id %" PRId64 ") failed to decompress %s",
2073 info->id(), info->url()->c_str());
2074 decompress_err = kFailBadData;
2075 } else if (retval == zlib::kStreamIOError) {
2076 LogCvmfs(kLogDownload, kLogSyslogErr,
2077 "(id %" PRId64 ") decompressing %s, local IO error",
2078 info->id(), info->url()->c_str());
2079 decompress_err = kFailLocalIO;
2080 }
2081 } else {
2082 const int64_t written = info->sink()->Write(ele->data, ele->size);
2083 if (written < 0 ||
2084 static_cast<uint64_t>(written) != ele->size) {
2085 LogCvmfs(kLogDownload, kLogDebug,
2086 "(id %" PRId64 ") sink write failed for %s (%ld of %zu)",
2087 info->id(), info->url()->c_str(),
2088 static_cast<long>(written), ele->size);
2089 decompress_err = kFailLocalIO;
2090 }
2091 }
2092 }
2093 delete ele;
2094 } while (true);
2095
2096 info->GetPipeJobResultPtr()->Read<download::Failures>(&result);
2097
2098 // Caller-side finalize (deferred from VerifyAndFinalize). Must run after
2099 // all kActionDecompress chunks have been popped and decompressed.
2100 if (result == kFailOk && decompress_err == kFailOk) {
2101 if (info->sink() != NULL && info->sink()->Flush() != 0) {
2102 decompress_err = kFailLocalIO;
2103 }
2104 }
2105 if (info->compressed()) {
2106 zlib::DecompressFini(info->GetZstreamPtr());
2107 }
2108 if (result == kFailOk && decompress_err != kFailOk) {
2109 result = decompress_err;
2110 info->SetErrorCode(decompress_err);
2111 if (info->sink() != NULL) {
2112 info->sink()->Purge();
2113 }
2114 }
2115 // LogCvmfs(kLogDownload, kLogDebug, "got result %d", result);
2116 } else {
2117 4164 const MutexLockGuard l(lock_synchronous_mode_);
2118
1/2
✓ Branch 1 taken 4164 times.
✗ Branch 2 not taken.
4164 CURL *handle = AcquireCurlHandle();
2119
1/2
✓ Branch 1 taken 4164 times.
✗ Branch 2 not taken.
4164 InitializeRequest(info, handle);
2120
1/2
✓ Branch 1 taken 4164 times.
✗ Branch 2 not taken.
4164 SetUrlOptions(info);
2121 // curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
2122 int retval;
2123 do {
2124
1/2
✓ Branch 1 taken 4436 times.
✗ Branch 2 not taken.
4436 retval = curl_easy_perform(handle);
2125 4436 perf::Inc(counters_->n_requests);
2126 double elapsed;
2127
1/2
✓ Branch 1 taken 4436 times.
✗ Branch 2 not taken.
4436 if (curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &elapsed)
2128
1/2
✓ Branch 0 taken 4436 times.
✗ Branch 1 not taken.
4436 == CURLE_OK) {
2129 4436 perf::Xadd(counters_->sz_transfer_time,
2130 4436 static_cast<int64_t>(elapsed * 1000));
2131 }
2132
3/4
✓ Branch 1 taken 4436 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 272 times.
✓ Branch 4 taken 4164 times.
4436 } while (VerifyAndFinalize(retval, info));
2133 4164 result = info->error_code();
2134 // Prevent the handle from being added to the idle list, which
2135 // avoids that it is mistakenly picked up by the multi handle later
2136 // in multi-threaded context. This, in turn, would fail to wait for
2137 // for resolver threads that meanwhile vanished due to a fork()
2138 // in Daemonize()
2139
1/2
✓ Branch 2 taken 4164 times.
✗ Branch 3 not taken.
4164 ReleaseCurlHandle(info->curl_handle(), false /* allow_reuse */);
2140 4164 }
2141
2142
2/2
✓ Branch 0 taken 223 times.
✓ Branch 1 taken 3941 times.
4164 if (result != kFailOk) {
2143
1/2
✓ Branch 4 taken 223 times.
✗ Branch 5 not taken.
223 LogCvmfs(kLogDownload, kLogDebug,
2144 "(manager '%s' - id %" PRId64 ") "
2145 "download failed (error %d - %s)",
2146 name_.c_str(), info->id(), result, Code2Ascii(result));
2147
2148
1/2
✓ Branch 1 taken 223 times.
✗ Branch 2 not taken.
223 if (info->sink() != NULL) {
2149
1/2
✓ Branch 2 taken 223 times.
✗ Branch 3 not taken.
223 info->sink()->Purge();
2150 }
2151 }
2152
2153 4164 return result;
2154 }
2155
2156
2157 /**
2158 * Used by the client to connect the authz session manager to the download
2159 * manager.
2160 */
2161 675 void DownloadManager::SetCredentialsAttachment(CredentialsAttachment *ca) {
2162 675 const MutexLockGuard m(lock_options_);
2163 675 credentials_attachment_ = ca;
2164 675 }
2165
2166 /**
2167 * Gets the DNS sever.
2168 */
2169 std::string DownloadManager::GetDnsServer() const { return opt_dns_server_; }
2170
2171 /**
2172 * Sets a DNS server. Only for testing as it cannot be reverted to the system
2173 * default.
2174 */
2175 void DownloadManager::SetDnsServer(const string &address) {
2176 if (!address.empty()) {
2177 const MutexLockGuard m(lock_options_);
2178 opt_dns_server_ = address;
2179 assert(!opt_dns_server_.empty());
2180
2181 vector<string> servers;
2182 servers.push_back(address);
2183 const bool retval = resolver_->SetResolvers(servers);
2184 assert(retval);
2185 }
2186 LogCvmfs(kLogDownload, kLogSyslog, "(manager '%s') set nameserver to %s",
2187 name_.c_str(), address.c_str());
2188 }
2189
2190
2191 /**
2192 * Sets the DNS query timeout parameters.
2193 */
2194 1256 void DownloadManager::SetDnsParameters(const unsigned retries,
2195 const unsigned timeout_ms) {
2196 1256 const MutexLockGuard m(lock_options_);
2197 1256 if ((resolver_->retries() == retries)
2198
3/6
✓ Branch 0 taken 1256 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 1256 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 1256 times.
✗ Branch 6 not taken.
1256 && (resolver_->timeout_ms() == timeout_ms)) {
2199 1256 return;
2200 }
2201 delete resolver_;
2202 resolver_ = NULL;
2203 resolver_ = dns::NormalResolver::Create(opt_ipv4_only_, retries, timeout_ms);
2204 assert(resolver_);
2205
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1256 times.
1256 }
2206
2207
2208 1256 void DownloadManager::SetDnsTtlLimits(const unsigned min_seconds,
2209 const unsigned max_seconds) {
2210 1256 const MutexLockGuard m(lock_options_);
2211 1256 resolver_->set_min_ttl(min_seconds);
2212 1256 resolver_->set_max_ttl(max_seconds);
2213 1256 }
2214
2215
2216 void DownloadManager::SetIpPreference(dns::IpPreference preference) {
2217 const MutexLockGuard m(lock_options_);
2218 opt_ip_preference_ = preference;
2219 }
2220
2221
2222 /**
2223 * Sets two timeout values for proxied and for direct connections, respectively.
2224 * The timeout counts for all sorts of connection phases,
2225 * DNS, HTTP connect, etc.
2226 */
2227 2077 void DownloadManager::SetTimeout(const unsigned seconds_proxy,
2228 const unsigned seconds_direct) {
2229 2077 const MutexLockGuard m(lock_options_);
2230 2077 opt_timeout_proxy_ = seconds_proxy;
2231 2077 opt_timeout_direct_ = seconds_direct;
2232 2077 }
2233
2234
2235 /**
2236 * Sets contains the average transfer speed in bytes per second that the
2237 * transfer should be below during CURLOPT_LOW_SPEED_TIME seconds for libcurl to
2238 * consider it to be too slow and abort. Only effective for new connections.
2239 */
2240 void DownloadManager::SetLowSpeedLimit(const unsigned low_speed_limit) {
2241 const MutexLockGuard m(lock_options_);
2242 opt_low_speed_limit_ = low_speed_limit;
2243 }
2244
2245
2246 /**
2247 * Receives the currently active timeout values.
2248 */
2249 565 void DownloadManager::GetTimeout(unsigned *seconds_proxy,
2250 unsigned *seconds_direct) {
2251 565 const MutexLockGuard m(lock_options_);
2252 565 *seconds_proxy = opt_timeout_proxy_;
2253 565 *seconds_direct = opt_timeout_direct_;
2254 565 }
2255
2256
2257 /**
2258 * Parses a list of ';'-separated hosts for the metalink chain. The empty
2259 * string removes the metalink list.
2260 */
2261 void DownloadManager::SetMetalinkChain(const string &metalink_list) {
2262 SetMetalinkChain(SplitString(metalink_list, ';'));
2263 }
2264
2265
2266 void DownloadManager::SetMetalinkChain(
2267 const std::vector<std::string> &metalink_list) {
2268 const MutexLockGuard m(lock_options_);
2269 opt_metalink_.timestamp_backup = 0;
2270 delete opt_metalink_.chain;
2271 opt_metalink_.current = 0;
2272
2273 if (metalink_list.empty()) {
2274 opt_metalink_.chain = NULL;
2275 return;
2276 }
2277
2278 opt_metalink_.chain = new vector<string>(metalink_list);
2279 }
2280
2281
2282 /**
2283 * Retrieves the currently set chain of metalink hosts and the currently
2284 * used metalink host.
2285 */
2286 void DownloadManager::GetMetalinkInfo(vector<string> *metalink_chain,
2287 unsigned *current_metalink) {
2288 const MutexLockGuard m(lock_options_);
2289 if (opt_metalink_.chain) {
2290 if (current_metalink) {
2291 *current_metalink = opt_metalink_.current;
2292 }
2293 if (metalink_chain) {
2294 *metalink_chain = *opt_metalink_.chain;
2295 }
2296 }
2297 }
2298
2299
2300 /**
2301 * Parses a list of ';'-separated hosts for the host chain. The empty string
2302 * removes the host list.
2303 */
2304 1275 void DownloadManager::SetHostChain(const string &host_list) {
2305
1/2
✓ Branch 2 taken 1275 times.
✗ Branch 3 not taken.
1275 SetHostChain(SplitString(host_list, ';'));
2306 1275 }
2307
2308
2309 1291 void DownloadManager::SetHostChain(const std::vector<std::string> &host_list) {
2310 1291 const MutexLockGuard m(lock_options_);
2311 1291 opt_host_.timestamp_backup = 0;
2312
2/2
✓ Branch 0 taken 590 times.
✓ Branch 1 taken 701 times.
1291 delete opt_host_.chain;
2313
2/2
✓ Branch 0 taken 590 times.
✓ Branch 1 taken 701 times.
1291 delete opt_host_chain_rtt_;
2314 1291 opt_host_.current = 0;
2315
2316
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1291 times.
1291 if (host_list.empty()) {
2317 opt_host_.chain = NULL;
2318 opt_host_chain_rtt_ = NULL;
2319 return;
2320 }
2321
2322
2/4
✓ Branch 1 taken 1291 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1291 times.
✗ Branch 5 not taken.
1291 opt_host_.chain = new vector<string>(host_list);
2323 2582 opt_host_chain_rtt_ = new vector<int>(opt_host_.chain->size(),
2324
2/4
✓ Branch 1 taken 1291 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1291 times.
✗ Branch 5 not taken.
1291 kProbeUnprobed);
2325 // LogCvmfs(kLogDownload, kLogSyslog, "using host %s",
2326 // (*opt_host_.chain)[0].c_str());
2327
1/2
✓ Branch 1 taken 1291 times.
✗ Branch 2 not taken.
1291 }
2328
2329
2330 /**
2331 * Retrieves the currently set chain of hosts, their round trip times, and the
2332 * currently used host.
2333 */
2334 112 void DownloadManager::GetHostInfo(vector<string> *host_chain, vector<int> *rtt,
2335 unsigned *current_host) {
2336 112 const MutexLockGuard m(lock_options_);
2337
1/2
✓ Branch 0 taken 112 times.
✗ Branch 1 not taken.
112 if (opt_host_.chain) {
2338
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 112 times.
112 if (current_host) {
2339 *current_host = opt_host_.current;
2340 }
2341
1/2
✓ Branch 0 taken 112 times.
✗ Branch 1 not taken.
112 if (host_chain) {
2342
1/2
✓ Branch 1 taken 112 times.
✗ Branch 2 not taken.
112 *host_chain = *opt_host_.chain;
2343 }
2344
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 112 times.
112 if (rtt) {
2345 *rtt = *opt_host_chain_rtt_;
2346 }
2347 }
2348 112 }
2349
2350
2351 /**
2352 * Jumps to the next proxy in the ring of forward proxy servers.
2353 * Selects one randomly from a load-balancing group.
2354 *
2355 * Allow for the fact that the proxy may have already been failed by
2356 * another transfer, or that the proxy may no longer be part of the
2357 * current load-balancing group.
2358 */
2359 8 void DownloadManager::SwitchProxy(JobInfo *info) {
2360 8 const MutexLockGuard m(lock_options_);
2361
2362
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8 times.
8 if (!opt_proxy_groups_) {
2363 return;
2364 }
2365
2366 // Fail any matching proxies within the current load-balancing group
2367 8 vector<ProxyInfo> *group = current_proxy_group();
2368 8 const unsigned group_size = group->size();
2369 8 unsigned failed = 0;
2370
2/2
✓ Branch 0 taken 8 times.
✓ Branch 1 taken 8 times.
16 for (unsigned i = 0; i < group_size - opt_proxy_groups_current_burned_; ++i) {
2371
5/14
✓ Branch 0 taken 8 times.
✗ Branch 1 not taken.
✓ Branch 4 taken 8 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 8 times.
✗ Branch 8 not taken.
✓ Branch 9 taken 8 times.
✗ Branch 10 not taken.
✗ Branch 11 not taken.
✓ Branch 12 taken 8 times.
✗ Branch 13 not taken.
✗ Branch 14 not taken.
✗ Branch 15 not taken.
8 if (info && (info->proxy() == (*group)[i].url)) {
2372 // Move to list of failed proxies
2373 8 opt_proxy_groups_current_burned_++;
2374
1/2
✓ Branch 2 taken 8 times.
✗ Branch 3 not taken.
8 swap((*group)[i],
2375 8 (*group)[group_size - opt_proxy_groups_current_burned_]);
2376 8 perf::Inc(counters_->n_proxy_failover);
2377 8 failed++;
2378 }
2379 }
2380
2381 // Do nothing more unless at least one proxy was marked as failed
2382
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8 times.
8 if (!failed)
2383 return;
2384
2385 // If all proxies from the current load-balancing group are burned, switch to
2386 // another group
2387
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8 if (opt_proxy_groups_current_burned_ == group->size()) {
2388 8 opt_proxy_groups_current_burned_ = 0;
2389
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8 if (opt_proxy_groups_->size() > 1) {
2390 16 opt_proxy_groups_current_ = (opt_proxy_groups_current_ + 1)
2391 8 % opt_proxy_groups_->size();
2392 // Remember the timestamp of switching to backup proxies
2393
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8 times.
8 if (opt_proxy_groups_reset_after_ > 0) {
2394 if (opt_proxy_groups_current_ > 0) {
2395 if (opt_timestamp_backup_proxies_ == 0)
2396 opt_timestamp_backup_proxies_ = time(NULL);
2397 // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2398 // "switched to (another) backup proxy group");
2399 } else {
2400 opt_timestamp_backup_proxies_ = 0;
2401 // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2402 // "switched back to primary proxy group");
2403 }
2404 opt_timestamp_failover_proxies_ = 0;
2405 }
2406 }
2407 } else {
2408 // Record failover time
2409 if (opt_proxy_groups_reset_after_ > 0) {
2410 if (opt_timestamp_failover_proxies_ == 0)
2411 opt_timestamp_failover_proxies_ = time(NULL);
2412 }
2413 }
2414
2415
2/4
✓ Branch 2 taken 8 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 8 times.
✗ Branch 6 not taken.
8 UpdateProxiesUnlocked("failed proxy");
2416
1/2
✓ Branch 2 taken 8 times.
✗ Branch 3 not taken.
8 LogCvmfs(kLogDownload, kLogDebug,
2417 "(manager '%s' - id %" PRId64 ") "
2418 "%lu proxies remain in group",
2419 name_.c_str(), info->id(),
2420 8 current_proxy_group()->size() - opt_proxy_groups_current_burned_);
2421
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8 }
2422
2423
2424 /**
2425 * Switches to the next host in the chain. If jobinfo is set, switch only if
2426 * the current host is identical to the one used by jobinfo, otherwise another
2427 * transfer has already done the switch.
2428 */
2429 25 void DownloadManager::SwitchHostInfo(const std::string &typ,
2430 HostInfo &info,
2431 JobInfo *jobinfo) {
2432 25 const MutexLockGuard m(lock_options_);
2433
2434
5/6
✓ Branch 0 taken 25 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 9 times.
✓ Branch 4 taken 16 times.
✓ Branch 5 taken 9 times.
✓ Branch 6 taken 16 times.
25 if (!info.chain || (info.chain->size() == 1)) {
2435 9 return;
2436 }
2437
2438
1/2
✓ Branch 0 taken 16 times.
✗ Branch 1 not taken.
16 if (jobinfo) {
2439 int lastused;
2440
1/2
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
16 if (typ == "host") {
2441 16 lastused = jobinfo->current_host_chain_index();
2442 } else {
2443 lastused = jobinfo->current_metalink_chain_index();
2444 }
2445
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 16 times.
16 if (lastused != info.current) {
2446 LogCvmfs(kLogDownload, kLogDebug,
2447 "(manager '%s' - id %" PRId64 ")"
2448 "don't switch %s, "
2449 "last used %s: %s, current %s: %s",
2450 name_.c_str(), jobinfo->id(), typ.c_str(), typ.c_str(),
2451 (*info.chain)[lastused].c_str(), typ.c_str(),
2452 (*info.chain)[info.current].c_str());
2453 return;
2454 }
2455 }
2456
2457
1/2
✓ Branch 2 taken 16 times.
✗ Branch 3 not taken.
16 string reason = "manually triggered";
2458
1/2
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
16 string info_id = "(manager " + name_;
2459
1/2
✓ Branch 0 taken 16 times.
✗ Branch 1 not taken.
16 if (jobinfo) {
2460
1/2
✓ Branch 3 taken 16 times.
✗ Branch 4 not taken.
16 reason = download::Code2Ascii(jobinfo->error_code());
2461
2/4
✓ Branch 2 taken 16 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 16 times.
✗ Branch 6 not taken.
16 info_id = " - id " + StringifyInt(jobinfo->id());
2462 }
2463
1/2
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
16 info_id += ")";
2464
2465
1/2
✓ Branch 2 taken 16 times.
✗ Branch 3 not taken.
16 const std::string old_host = (*info.chain)[info.current];
2466 16 info.current = (info.current + 1) % static_cast<int>(info.chain->size());
2467
1/2
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
16 if (typ == "host") {
2468 16 perf::Inc(counters_->n_host_failover);
2469 } else {
2470 perf::Inc(counters_->n_metalink_failover);
2471 }
2472
1/3
✓ Branch 6 taken 16 times.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
32 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2473 "%s switching %s from %s to %s (%s)", info_id.c_str(), typ.c_str(),
2474 16 old_host.c_str(), (*info.chain)[info.current].c_str(),
2475 reason.c_str());
2476
2477 // Remember the timestamp of switching to backup host
2478
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 16 times.
16 if (info.reset_after > 0) {
2479 if (info.current != 0) {
2480 if (info.timestamp_backup == 0)
2481 info.timestamp_backup = time(NULL);
2482 } else {
2483 info.timestamp_backup = 0;
2484 }
2485 }
2486
2/2
✓ Branch 4 taken 16 times.
✓ Branch 5 taken 9 times.
25 }
2487
2488 25 void DownloadManager::SwitchHost(JobInfo *info) {
2489
2/4
✓ Branch 2 taken 25 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 25 times.
✗ Branch 6 not taken.
25 SwitchHostInfo("host", opt_host_, info);
2490 25 }
2491
2492 9 void DownloadManager::SwitchHost() { SwitchHost(NULL); }
2493
2494
2495 void DownloadManager::SwitchMetalink(JobInfo *info) {
2496 SwitchHostInfo("metalink", opt_metalink_, info);
2497 }
2498
2499
2500 void DownloadManager::SwitchMetalink() { SwitchMetalink(NULL); }
2501
2502 1255 bool DownloadManager::CheckMetalinkChain(time_t now) {
2503 1255 return (opt_metalink_.chain
2504
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 1255 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
1255 && ((opt_metalink_timestamp_link_ == 0)
2505 || (static_cast<int64_t>((now == 0) ? time(NULL) : now)
2506 > static_cast<int64_t>(opt_metalink_timestamp_link_
2507
0/2
✗ Branch 0 not taken.
✗ Branch 1 not taken.
1255 + opt_metalink_.reset_after))));
2508 }
2509
2510
2511 /**
2512 * Orders the hostlist according to RTT of downloading .cvmfschecksum.
2513 * Sets the current host to the best-responsive host.
2514 * If you change the host list in between by SetHostChain(), it will be
2515 * overwritten by this function.
2516 */
2517 void DownloadManager::ProbeHosts() {
2518 vector<string> host_chain;
2519 vector<int> host_rtt;
2520 unsigned current_host;
2521
2522 GetHostInfo(&host_chain, &host_rtt, &current_host);
2523
2524 // Stopwatch, two times to fill caches first
2525 unsigned i, retries;
2526 string url;
2527
2528 cvmfs::MemSink memsink;
2529 JobInfo info(&url, false, false, NULL, &memsink);
2530 for (retries = 0; retries < 2; ++retries) {
2531 for (i = 0; i < host_chain.size(); ++i) {
2532 url = host_chain[i] + "/.cvmfspublished";
2533
2534 struct timeval tv_start, tv_end;
2535 gettimeofday(&tv_start, NULL);
2536 const Failures result = Fetch(&info);
2537 gettimeofday(&tv_end, NULL);
2538 memsink.Reset();
2539 if (result == kFailOk) {
2540 host_rtt[i] = static_cast<int>(DiffTimeSeconds(tv_start, tv_end)
2541 * 1000);
2542 LogCvmfs(kLogDownload, kLogDebug,
2543 "(manager '%s' - id %" PRId64 ") "
2544 "probing host %s had %dms rtt",
2545 name_.c_str(), info.id(), url.c_str(), host_rtt[i]);
2546 } else {
2547 LogCvmfs(kLogDownload, kLogDebug,
2548 "(manager '%s' - id %" PRId64 ") "
2549 "error while probing host %s: %d %s",
2550 name_.c_str(), info.id(), url.c_str(), result,
2551 Code2Ascii(result));
2552 host_rtt[i] = INT_MAX;
2553 }
2554 }
2555 }
2556
2557 SortTeam(&host_rtt, &host_chain);
2558 for (i = 0; i < host_chain.size(); ++i) {
2559 if (host_rtt[i] == INT_MAX)
2560 host_rtt[i] = kProbeDown;
2561 }
2562
2563 const MutexLockGuard m(lock_options_);
2564 delete opt_host_.chain;
2565 delete opt_host_chain_rtt_;
2566 opt_host_.chain = new vector<string>(host_chain);
2567 opt_host_chain_rtt_ = new vector<int>(host_rtt);
2568 opt_host_.current = 0;
2569 }
2570
2571 bool DownloadManager::GeoSortServers(std::vector<std::string> *servers,
2572 std::vector<uint64_t> *output_order) {
2573 if (!servers) {
2574 return false;
2575 }
2576 if (servers->size() == 1) {
2577 if (output_order) {
2578 output_order->clear();
2579 output_order->push_back(0);
2580 }
2581 return true;
2582 }
2583
2584 std::vector<std::string> host_chain;
2585 GetHostInfo(&host_chain, NULL, NULL);
2586
2587 std::vector<std::string> server_dns_names;
2588 server_dns_names.reserve(servers->size());
2589 for (unsigned i = 0; i < servers->size(); ++i) {
2590 const std::string host = dns::ExtractHost((*servers)[i]);
2591 server_dns_names.push_back(host.empty() ? (*servers)[i] : host);
2592 }
2593 const std::string host_list = JoinStrings(server_dns_names, ",");
2594
2595 vector<string> host_chain_shuffled;
2596 {
2597 // Protect against concurrent access to prng_
2598 const MutexLockGuard m(lock_options_);
2599 // Determine random hosts for the Geo-API query
2600 host_chain_shuffled = Shuffle(host_chain, &prng_);
2601 }
2602 // Request ordered list via Geo-API
2603 bool success = false;
2604 const unsigned max_attempts = std::min(host_chain_shuffled.size(), size_t(3));
2605 vector<uint64_t> geo_order(servers->size());
2606 for (unsigned i = 0; i < max_attempts; ++i) {
2607 const string url = host_chain_shuffled[i] + "/api/v1.0/geo/@proxy@/"
2608 + host_list;
2609 LogCvmfs(kLogDownload, kLogDebug,
2610 "(manager '%s') requesting ordered server list from %s",
2611 name_.c_str(), url.c_str());
2612 cvmfs::MemSink memsink;
2613 JobInfo info(&url, false, false, NULL, &memsink);
2614 const Failures result = Fetch(&info);
2615 if (result == kFailOk) {
2616 const string order(reinterpret_cast<char *>(memsink.data()),
2617 memsink.pos());
2618 memsink.Reset();
2619 const bool retval = ValidateGeoReply(order, servers->size(), &geo_order);
2620 if (!retval) {
2621 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2622 "(manager '%s') retrieved invalid GeoAPI reply from %s [%s]",
2623 name_.c_str(), url.c_str(), order.c_str());
2624 } else {
2625 LogCvmfs(kLogDownload, kLogDebug | kLogSyslog,
2626 "(manager '%s') "
2627 "geographic order of servers retrieved from %s",
2628 name_.c_str(),
2629 dns::ExtractHost(host_chain_shuffled[i]).c_str());
2630 // remove new line at end of "order"
2631 LogCvmfs(kLogDownload, kLogDebug, "order is %s",
2632 Trim(order, true /* trim_newline */).c_str());
2633 success = true;
2634 break;
2635 }
2636 } else {
2637 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2638 "(manager '%s') GeoAPI request for %s failed with error %d [%s]",
2639 name_.c_str(), url.c_str(), result, Code2Ascii(result));
2640 }
2641 }
2642 if (!success) {
2643 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2644 "(manager '%s') "
2645 "failed to retrieve geographic order from stratum 1 servers",
2646 name_.c_str());
2647 return false;
2648 }
2649
2650 if (output_order) {
2651 output_order->swap(geo_order);
2652 } else {
2653 std::vector<std::string> sorted_servers;
2654 sorted_servers.reserve(geo_order.size());
2655 for (unsigned i = 0; i < geo_order.size(); ++i) {
2656 const uint64_t orderval = geo_order[i];
2657 sorted_servers.push_back((*servers)[orderval]);
2658 }
2659 servers->swap(sorted_servers);
2660 }
2661 return true;
2662 }
2663
2664
2665 /**
2666 * Uses the Geo-API of Stratum 1s to let any of them order the list of servers
2667 * and fallback proxies (if any).
2668 * Tries at most three random Stratum 1s before giving up.
2669 * If you change the host list in between by SetHostChain() or the fallback
2670 * proxy list by SetProxyChain(), they will be overwritten by this function.
2671 */
2672 bool DownloadManager::ProbeGeo() {
2673 vector<string> host_chain;
2674 vector<int> host_rtt;
2675 unsigned current_host;
2676 vector<vector<ProxyInfo> > proxy_chain;
2677 unsigned fallback_group;
2678
2679 GetHostInfo(&host_chain, &host_rtt, &current_host);
2680 GetProxyInfo(&proxy_chain, NULL, &fallback_group);
2681 if ((host_chain.size() < 2) && ((proxy_chain.size() - fallback_group) < 2))
2682 return true;
2683
2684 vector<string> host_names;
2685 for (unsigned i = 0; i < host_chain.size(); ++i)
2686 host_names.push_back(dns::ExtractHost(host_chain[i]));
2687 SortTeam(&host_names, &host_chain);
2688 const unsigned last_geo_host = host_names.size();
2689
2690 if ((fallback_group == 0) && (last_geo_host > 1)) {
2691 // There are no non-fallback proxies, which means that the client
2692 // will always use the fallback proxies. Add a keyword separator
2693 // between the hosts and fallback proxies so the geosorting service
2694 // will know to sort the hosts based on the distance from the
2695 // closest fallback proxy rather than the distance from the client.
2696 host_names.push_back("+PXYSEP+");
2697 }
2698
2699 // Add fallback proxy names to the end of the host list
2700 const unsigned first_geo_fallback = host_names.size();
2701 for (unsigned i = fallback_group; i < proxy_chain.size(); ++i) {
2702 // We only take the first fallback proxy name from every group under the
2703 // assumption that load-balanced servers are at the same location
2704 host_names.push_back(proxy_chain[i][0].host.name());
2705 }
2706
2707 std::vector<uint64_t> geo_order;
2708 const bool success = GeoSortServers(&host_names, &geo_order);
2709 if (!success) {
2710 // GeoSortServers already logged a failure message.
2711 return false;
2712 }
2713
2714 // Re-install host chain and proxy chain
2715 const MutexLockGuard m(lock_options_);
2716 delete opt_host_.chain;
2717 opt_num_proxies_ = 0;
2718 opt_host_.chain = new vector<string>(host_chain.size());
2719
2720 // It's possible that opt_proxy_groups_fallback_ might have changed while
2721 // the lock wasn't held
2722 vector<vector<ProxyInfo> > *proxy_groups = new vector<vector<ProxyInfo> >(
2723 opt_proxy_groups_fallback_ + proxy_chain.size() - fallback_group);
2724 // First copy the non-fallback part of the current proxy chain
2725 for (unsigned i = 0; i < opt_proxy_groups_fallback_; ++i) {
2726 (*proxy_groups)[i] = (*opt_proxy_groups_)[i];
2727 opt_num_proxies_ += (*opt_proxy_groups_)[i].size();
2728 }
2729
2730 // Copy the host chain and fallback proxies by geo order. Array indices
2731 // in geo_order that are smaller than last_geo_host refer to a stratum 1,
2732 // and those indices greater than or equal to first_geo_fallback refer to
2733 // a fallback proxy.
2734 unsigned hosti = 0;
2735 unsigned proxyi = opt_proxy_groups_fallback_;
2736 for (unsigned i = 0; i < geo_order.size(); ++i) {
2737 const uint64_t orderval = geo_order[i];
2738 if (orderval < static_cast<uint64_t>(last_geo_host)) {
2739 // LogCvmfs(kLogCvmfs, kLogSyslog, "this is orderval %u at host index
2740 // %u", orderval, hosti);
2741 (*opt_host_.chain)[hosti++] = host_chain[orderval];
2742 } else if (orderval >= static_cast<uint64_t>(first_geo_fallback)) {
2743 // LogCvmfs(kLogCvmfs, kLogSyslog,
2744 // "this is orderval %u at proxy index %u, using proxy_chain index %u",
2745 // orderval, proxyi, fallback_group + orderval - first_geo_fallback);
2746 (*proxy_groups)[proxyi] = proxy_chain[fallback_group + orderval
2747 - first_geo_fallback];
2748 opt_num_proxies_ += (*proxy_groups)[proxyi].size();
2749 proxyi++;
2750 }
2751 }
2752
2753 opt_proxy_map_.clear();
2754 delete opt_proxy_groups_;
2755 opt_proxy_groups_ = proxy_groups;
2756 // In pathological cases, opt_proxy_groups_current_ can be larger now when
2757 // proxies changed in-between.
2758 if (opt_proxy_groups_current_ > opt_proxy_groups_->size()) {
2759 if (opt_proxy_groups_->size() == 0) {
2760 opt_proxy_groups_current_ = 0;
2761 } else {
2762 opt_proxy_groups_current_ = opt_proxy_groups_->size() - 1;
2763 }
2764 opt_proxy_groups_current_burned_ = 0;
2765 }
2766
2767 UpdateProxiesUnlocked("geosort");
2768
2769 delete opt_host_chain_rtt_;
2770 opt_host_chain_rtt_ = new vector<int>(host_chain.size(), kProbeGeo);
2771 opt_host_.current = 0;
2772
2773 return true;
2774 }
2775
2776
2777 /**
2778 * Validates a string of the form "1,4,2,3" representing in which order the
2779 * the expected_size number of hosts should be put for optimal geographic
2780 * proximity. Returns false if the reply_order string is invalid, otherwise
2781 * fills in the reply_vals array with zero-based order indexes (e.g.
2782 * [0,3,1,2]) and returns true.
2783 */
2784 112 bool DownloadManager::ValidateGeoReply(const string &reply_order,
2785 const unsigned expected_size,
2786 vector<uint64_t> *reply_vals) {
2787
2/2
✓ Branch 1 taken 8 times.
✓ Branch 2 taken 104 times.
112 if (reply_order.empty())
2788 8 return false;
2789
2/4
✓ Branch 2 taken 104 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 104 times.
✗ Branch 6 not taken.
208 const sanitizer::InputSanitizer sanitizer("09 , \n");
2790
3/4
✓ Branch 1 taken 104 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 8 times.
✓ Branch 4 taken 96 times.
104 if (!sanitizer.IsValid(reply_order))
2791 8 return false;
2792
2/4
✓ Branch 2 taken 96 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 96 times.
✗ Branch 6 not taken.
192 const sanitizer::InputSanitizer strip_newline("09 ,");
2793
1/2
✓ Branch 1 taken 96 times.
✗ Branch 2 not taken.
96 vector<string> reply_strings = SplitString(strip_newline.Filter(reply_order),
2794
1/2
✓ Branch 1 taken 96 times.
✗ Branch 2 not taken.
96 ',');
2795 96 vector<uint64_t> tmp_vals;
2796
2/2
✓ Branch 1 taken 184 times.
✓ Branch 2 taken 80 times.
264 for (unsigned i = 0; i < reply_strings.size(); ++i) {
2797
2/2
✓ Branch 2 taken 16 times.
✓ Branch 3 taken 168 times.
184 if (reply_strings[i].empty())
2798 16 return false;
2799
2/4
✓ Branch 2 taken 168 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 168 times.
✗ Branch 6 not taken.
168 tmp_vals.push_back(String2Uint64(reply_strings[i]));
2800 }
2801
2/2
✓ Branch 1 taken 40 times.
✓ Branch 2 taken 40 times.
80 if (tmp_vals.size() != expected_size)
2802 40 return false;
2803
2804 // Check if tmp_vals contains the number 1..n
2805
1/2
✓ Branch 3 taken 40 times.
✗ Branch 4 not taken.
40 set<uint64_t> const coverage(tmp_vals.begin(), tmp_vals.end());
2806
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 40 times.
40 if (coverage.size() != tmp_vals.size())
2807 return false;
2808
5/6
✓ Branch 2 taken 32 times.
✓ Branch 3 taken 8 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 32 times.
✓ Branch 9 taken 8 times.
✓ Branch 10 taken 32 times.
40 if ((*coverage.begin() != 1) || (*coverage.rbegin() != coverage.size()))
2809 8 return false;
2810
2811
2/2
✓ Branch 0 taken 72 times.
✓ Branch 1 taken 32 times.
104 for (unsigned i = 0; i < expected_size; ++i) {
2812 72 (*reply_vals)[i] = tmp_vals[i] - 1;
2813 }
2814 32 return true;
2815 104 }
2816
2817
2818 /**
2819 * Removes DIRECT from a list of ';' and '|' separated proxies.
2820 * \return true if DIRECT was present, false otherwise
2821 */
2822 1234 bool DownloadManager::StripDirect(const string &proxy_list,
2823 string *cleaned_list) {
2824
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1234 times.
1234 assert(cleaned_list);
2825
2/2
✓ Branch 1 taken 1162 times.
✓ Branch 2 taken 72 times.
1234 if (proxy_list == "") {
2826
1/2
✓ Branch 1 taken 1162 times.
✗ Branch 2 not taken.
1162 *cleaned_list = "";
2827 1162 return false;
2828 }
2829 72 bool result = false;
2830
2831
1/2
✓ Branch 1 taken 72 times.
✗ Branch 2 not taken.
72 vector<string> proxy_groups = SplitString(proxy_list, ';');
2832 72 vector<string> cleaned_groups;
2833
2/2
✓ Branch 1 taken 176 times.
✓ Branch 2 taken 72 times.
248 for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2834
1/2
✓ Branch 2 taken 176 times.
✗ Branch 3 not taken.
176 vector<string> group = SplitString(proxy_groups[i], '|');
2835 176 vector<string> cleaned;
2836
2/2
✓ Branch 1 taken 296 times.
✓ Branch 2 taken 176 times.
472 for (unsigned j = 0; j < group.size(); ++j) {
2837
6/6
✓ Branch 2 taken 216 times.
✓ Branch 3 taken 80 times.
✓ Branch 6 taken 104 times.
✓ Branch 7 taken 112 times.
✓ Branch 8 taken 184 times.
✓ Branch 9 taken 112 times.
296 if ((group[j] == "DIRECT") || (group[j] == "")) {
2838 184 result = true;
2839 } else {
2840
1/2
✓ Branch 2 taken 112 times.
✗ Branch 3 not taken.
112 cleaned.push_back(group[j]);
2841 }
2842 }
2843
2/2
✓ Branch 1 taken 56 times.
✓ Branch 2 taken 120 times.
176 if (!cleaned.empty())
2844
3/6
✓ Branch 2 taken 56 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 56 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 56 times.
✗ Branch 9 not taken.
56 cleaned_groups.push_back(JoinStrings(cleaned, "|"));
2845 176 }
2846
2847
2/4
✓ Branch 2 taken 72 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 72 times.
✗ Branch 6 not taken.
72 *cleaned_list = JoinStrings(cleaned_groups, ";");
2848 72 return result;
2849 72 }
2850
2851
2852 /**
2853 * Parses a list of ';'- and '|'-separated proxy servers and fallback proxy
2854 * servers for the proxy groups.
2855 * The empty string for both removes the proxy chain.
2856 * The set_mode parameter can be used to set either proxies (leaving fallback
2857 * proxies unchanged) or fallback proxies (leaving regular proxies unchanged)
2858 * or both.
2859 */
2860 1154 void DownloadManager::SetProxyChain(const string &proxy_list,
2861 const string &fallback_proxy_list,
2862 const ProxySetModes set_mode) {
2863 1154 const MutexLockGuard m(lock_options_);
2864
2865 1154 opt_timestamp_backup_proxies_ = 0;
2866 1154 opt_timestamp_failover_proxies_ = 0;
2867
1/2
✓ Branch 1 taken 1154 times.
✗ Branch 2 not taken.
1154 string set_proxy_list = opt_proxy_list_;
2868
1/2
✓ Branch 1 taken 1154 times.
✗ Branch 2 not taken.
1154 string set_proxy_fallback_list = opt_proxy_fallback_list_;
2869 bool contains_direct;
2870
3/4
✓ Branch 0 taken 1154 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1130 times.
✓ Branch 3 taken 24 times.
1154 if ((set_mode == kSetProxyFallback) || (set_mode == kSetProxyBoth)) {
2871
1/2
✓ Branch 1 taken 1130 times.
✗ Branch 2 not taken.
1130 opt_proxy_fallback_list_ = fallback_proxy_list;
2872 }
2873
3/4
✓ Branch 0 taken 1130 times.
✓ Branch 1 taken 24 times.
✓ Branch 2 taken 1130 times.
✗ Branch 3 not taken.
1154 if ((set_mode == kSetProxyRegular) || (set_mode == kSetProxyBoth)) {
2874
1/2
✓ Branch 1 taken 1154 times.
✗ Branch 2 not taken.
1154 opt_proxy_list_ = proxy_list;
2875 }
2876
1/2
✓ Branch 1 taken 1154 times.
✗ Branch 2 not taken.
1154 contains_direct = StripDirect(opt_proxy_fallback_list_,
2877 &set_proxy_fallback_list);
2878
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1154 times.
1154 if (contains_direct) {
2879 LogCvmfs(kLogDownload, kLogSyslogWarn | kLogDebug,
2880 "(manager '%s') fallback proxies do not support DIRECT, removing",
2881 name_.c_str());
2882 }
2883
1/2
✓ Branch 1 taken 1154 times.
✗ Branch 2 not taken.
1154 if (set_proxy_fallback_list == "") {
2884
1/2
✓ Branch 1 taken 1154 times.
✗ Branch 2 not taken.
1154 set_proxy_list = opt_proxy_list_;
2885 } else {
2886 const bool contains_direct = StripDirect(opt_proxy_list_, &set_proxy_list);
2887 if (contains_direct) {
2888 LogCvmfs(kLogDownload, kLogSyslog | kLogDebug,
2889 "(manager '%s') skipping DIRECT proxy to use fallback proxy",
2890 name_.c_str());
2891 }
2892 }
2893
2894 // From this point on, use set_proxy_list and set_fallback_proxy_list as
2895 // effective proxy lists!
2896
2897 1154 opt_proxy_map_.clear();
2898
2/2
✓ Branch 0 taken 565 times.
✓ Branch 1 taken 589 times.
1154 delete opt_proxy_groups_;
2899
2/6
✗ Branch 1 not taken.
✓ Branch 2 taken 1154 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 1154 times.
1154 if ((set_proxy_list == "") && (set_proxy_fallback_list == "")) {
2900 opt_proxy_groups_ = NULL;
2901 opt_proxy_groups_current_ = 0;
2902 opt_proxy_groups_current_burned_ = 0;
2903 opt_proxy_groups_fallback_ = 0;
2904 opt_num_proxies_ = 0;
2905 return;
2906 }
2907
2908 // Determine number of regular proxy groups (== first fallback proxy group)
2909 1154 opt_proxy_groups_fallback_ = 0;
2910
1/2
✓ Branch 1 taken 1154 times.
✗ Branch 2 not taken.
1154 if (set_proxy_list != "") {
2911
1/2
✓ Branch 1 taken 1154 times.
✗ Branch 2 not taken.
1154 opt_proxy_groups_fallback_ = SplitString(set_proxy_list, ';').size();
2912 }
2913
1/2
✓ Branch 2 taken 1154 times.
✗ Branch 3 not taken.
1154 LogCvmfs(kLogDownload, kLogDebug,
2914 "(manager '%s') "
2915 "first fallback proxy group %u",
2916 name_.c_str(), opt_proxy_groups_fallback_);
2917
2918 // Concatenate regular proxies and fallback proxies, both of which can be
2919 // empty.
2920
1/2
✓ Branch 1 taken 1154 times.
✗ Branch 2 not taken.
1154 string all_proxy_list = set_proxy_list;
2921
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1154 times.
1154 if (set_proxy_fallback_list != "") {
2922 if (all_proxy_list != "")
2923 all_proxy_list += ";";
2924 all_proxy_list += set_proxy_fallback_list;
2925 }
2926
1/2
✓ Branch 3 taken 1154 times.
✗ Branch 4 not taken.
1154 LogCvmfs(kLogDownload, kLogDebug, "(manager '%s') full proxy list %s",
2927 name_.c_str(), all_proxy_list.c_str());
2928
2929 // Resolve server names in provided urls
2930 1154 vector<string> hostnames; // All encountered hostnames
2931 1154 vector<string> proxy_groups;
2932
1/2
✓ Branch 1 taken 1154 times.
✗ Branch 2 not taken.
1154 if (all_proxy_list != "")
2933
1/2
✓ Branch 1 taken 1154 times.
✗ Branch 2 not taken.
1154 proxy_groups = SplitString(all_proxy_list, ';');
2934
2/2
✓ Branch 1 taken 1162 times.
✓ Branch 2 taken 1154 times.
2316 for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2935
1/2
✓ Branch 2 taken 1162 times.
✗ Branch 3 not taken.
1162 vector<string> this_group = SplitString(proxy_groups[i], '|');
2936
2/2
✓ Branch 1 taken 1162 times.
✓ Branch 2 taken 1162 times.
2324 for (unsigned j = 0; j < this_group.size(); ++j) {
2937
1/2
✓ Branch 2 taken 1162 times.
✗ Branch 3 not taken.
1162 this_group[j] = dns::AddDefaultScheme(this_group[j]);
2938 // Note: DIRECT strings will be "extracted" to an empty string.
2939
1/2
✓ Branch 2 taken 1162 times.
✗ Branch 3 not taken.
1162 const string hostname = dns::ExtractHost(this_group[j]);
2940 // Save the hostname. Leave empty (DIRECT) names so indexes will
2941 // match later.
2942
1/2
✓ Branch 1 taken 1162 times.
✗ Branch 2 not taken.
1162 hostnames.push_back(hostname);
2943 1162 }
2944 1162 }
2945 1154 vector<dns::Host> hosts;
2946
1/2
✓ Branch 3 taken 1154 times.
✗ Branch 4 not taken.
1154 LogCvmfs(kLogDownload, kLogDebug,
2947 "(manager '%s') "
2948 "resolving %lu proxy addresses",
2949 name_.c_str(), hostnames.size());
2950
1/2
✓ Branch 1 taken 1154 times.
✗ Branch 2 not taken.
1154 resolver_->ResolveMany(hostnames, &hosts);
2951
2952 // Construct opt_proxy_groups_: traverse proxy list in same order and expand
2953 // names to resolved IP addresses.
2954
1/2
✓ Branch 1 taken 1154 times.
✗ Branch 2 not taken.
1154 opt_proxy_groups_ = new vector<vector<ProxyInfo> >();
2955 1154 opt_num_proxies_ = 0;
2956 1154 unsigned num_proxy = 0; // Combined i, j counter
2957
2/2
✓ Branch 1 taken 1162 times.
✓ Branch 2 taken 1154 times.
2316 for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2958
1/2
✓ Branch 2 taken 1162 times.
✗ Branch 3 not taken.
1162 vector<string> this_group = SplitString(proxy_groups[i], '|');
2959 // Construct ProxyInfo objects from proxy string and DNS resolver result for
2960 // every proxy in this_group. One URL can result in multiple ProxyInfo
2961 // objects, one for each IP address.
2962 1162 vector<ProxyInfo> infos;
2963
2/2
✓ Branch 1 taken 1162 times.
✓ Branch 2 taken 1162 times.
2324 for (unsigned j = 0; j < this_group.size(); ++j, ++num_proxy) {
2964
1/2
✓ Branch 2 taken 1162 times.
✗ Branch 3 not taken.
1162 this_group[j] = dns::AddDefaultScheme(this_group[j]);
2965
2/2
✓ Branch 2 taken 1130 times.
✓ Branch 3 taken 32 times.
1162 if (this_group[j] == "DIRECT") {
2966
3/6
✓ Branch 2 taken 1130 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1130 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 1130 times.
✗ Branch 9 not taken.
1130 infos.push_back(ProxyInfo("DIRECT"));
2967 1130 continue;
2968 }
2969
2970
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 32 times.
32 if (hosts[num_proxy].status() != dns::kFailOk) {
2971 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2972 "(manager '%s') "
2973 "failed to resolve IP addresses for %s (%d - %s)",
2974 name_.c_str(), hosts[num_proxy].name().c_str(),
2975 hosts[num_proxy].status(),
2976 dns::Code2Ascii(hosts[num_proxy].status()));
2977 const dns::Host failed_host = dns::Host::ExtendDeadline(
2978 hosts[num_proxy], resolver_->min_ttl());
2979 infos.push_back(ProxyInfo(failed_host, this_group[j]));
2980 continue;
2981 }
2982
2983 // IPv4 addresses have precedence
2984 32 set<string> const best_addresses = hosts[num_proxy].ViewBestAddresses(
2985
2/4
✓ Branch 1 taken 32 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 32 times.
✗ Branch 5 not taken.
32 opt_ip_preference_);
2986 32 set<string>::const_iterator iter_ips = best_addresses.begin();
2987
2/2
✓ Branch 3 taken 32 times.
✓ Branch 4 taken 32 times.
64 for (; iter_ips != best_addresses.end(); ++iter_ips) {
2988
1/2
✓ Branch 3 taken 32 times.
✗ Branch 4 not taken.
32 const string url_ip = dns::RewriteUrl(this_group[j], *iter_ips);
2989
2/4
✓ Branch 2 taken 32 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 32 times.
✗ Branch 6 not taken.
32 infos.push_back(ProxyInfo(hosts[num_proxy], url_ip));
2990
2991
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 32 times.
32 if (sharding_policy_.UseCount() > 0) {
2992 sharding_policy_->AddProxy(url_ip);
2993 }
2994 32 }
2995 32 }
2996
1/2
✓ Branch 1 taken 1162 times.
✗ Branch 2 not taken.
1162 opt_proxy_groups_->push_back(infos);
2997 1162 opt_num_proxies_ += infos.size();
2998 1162 }
2999
1/2
✓ Branch 2 taken 1154 times.
✗ Branch 3 not taken.
1154 LogCvmfs(kLogDownload, kLogDebug,
3000 "(manager '%s') installed %u proxies in %lu load-balance groups",
3001 1154 name_.c_str(), opt_num_proxies_, opt_proxy_groups_->size());
3002 1154 opt_proxy_groups_current_ = 0;
3003 1154 opt_proxy_groups_current_burned_ = 0;
3004
3005 // Select random start proxy from the first group.
3006
1/2
✓ Branch 1 taken 1154 times.
✗ Branch 2 not taken.
1154 if (opt_proxy_groups_->size() > 0) {
3007 // Select random start proxy from the first group.
3008
2/4
✓ Branch 2 taken 1154 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1154 times.
✗ Branch 6 not taken.
1154 UpdateProxiesUnlocked("set random start proxy from the first proxy group");
3009 }
3010
3/6
✓ Branch 5 taken 1154 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 1154 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 1154 times.
✗ Branch 12 not taken.
1154 }
3011
3012
3013 /**
3014 * Retrieves the proxy chain, optionally the currently active load-balancing
3015 * group, and optionally the index of the first fallback proxy group.
3016 * If there are no fallback proxies, the index will equal the size of
3017 * the proxy chain.
3018 */
3019 void DownloadManager::GetProxyInfo(vector<vector<ProxyInfo> > *proxy_chain,
3020 unsigned *current_group,
3021 unsigned *fallback_group) {
3022 assert(proxy_chain != NULL);
3023 const MutexLockGuard m(lock_options_);
3024
3025 if (!opt_proxy_groups_) {
3026 const vector<vector<ProxyInfo> > empty_chain;
3027 *proxy_chain = empty_chain;
3028 if (current_group != NULL)
3029 *current_group = 0;
3030 if (fallback_group != NULL)
3031 *fallback_group = 0;
3032 return;
3033 }
3034
3035 *proxy_chain = *opt_proxy_groups_;
3036 if (current_group != NULL)
3037 *current_group = opt_proxy_groups_current_;
3038 if (fallback_group != NULL)
3039 *fallback_group = opt_proxy_groups_fallback_;
3040 }
3041
3042 string DownloadManager::GetProxyList() { return opt_proxy_list_; }
3043
3044 string DownloadManager::GetFallbackProxyList() {
3045 return opt_proxy_fallback_list_;
3046 }
3047
3048 /**
3049 * Choose proxy
3050 */
3051 4188 DownloadManager::ProxyInfo *DownloadManager::ChooseProxyUnlocked(
3052 const shash::Any *hash) {
3053
2/2
✓ Branch 0 taken 3099 times.
✓ Branch 1 taken 1089 times.
4188 if (!opt_proxy_groups_)
3054 3099 return NULL;
3055
3056
2/2
✓ Branch 0 taken 706 times.
✓ Branch 1 taken 383 times.
1089 const uint32_t key = (hash ? hash->Partial32() : 0);
3057
1/2
✓ Branch 1 taken 1089 times.
✗ Branch 2 not taken.
1089 const map<uint32_t, ProxyInfo *>::iterator it = opt_proxy_map_.lower_bound(
3058 key);
3059 1089 ProxyInfo *proxy = it->second;
3060
3061 1089 return proxy;
3062 }
3063
3064 /**
3065 * Update currently selected proxy
3066 */
3067 1727 void DownloadManager::UpdateProxiesUnlocked(const string &reason) {
3068
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1727 times.
1727 if (!opt_proxy_groups_)
3069 return;
3070
3071 // Identify number of non-burned proxies within the current group
3072 1727 vector<ProxyInfo> *group = current_proxy_group();
3073 1727 const unsigned num_alive = (group->size() - opt_proxy_groups_current_burned_);
3074
2/4
✓ Branch 2 taken 1727 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1727 times.
✗ Branch 6 not taken.
3454 const string old_proxy = JoinStrings(opt_proxies_, "|");
3075
3076 // Rebuild proxy map and URL list
3077 1727 opt_proxy_map_.clear();
3078 1727 opt_proxies_.clear();
3079 1727 const uint32_t max_key = 0xffffffffUL;
3080
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1727 times.
1727 if (opt_proxy_shard_) {
3081 // Build a consistent map with multiple entries for each proxy
3082 for (unsigned i = 0; i < num_alive; ++i) {
3083 ProxyInfo *proxy = &(*group)[i];
3084 shash::Any proxy_hash(shash::kSha1);
3085 HashString(proxy->url, &proxy_hash);
3086 Prng prng;
3087 prng.InitSeed(proxy_hash.Partial32());
3088 for (unsigned j = 0; j < kProxyMapScale; ++j) {
3089 const std::pair<uint32_t, ProxyInfo *> entry(prng.Next(max_key), proxy);
3090 opt_proxy_map_.insert(entry);
3091 }
3092 const std::string proxy_name = proxy->host.name().empty()
3093 ? ""
3094 : " (" + proxy->host.name() + ")";
3095 opt_proxies_.push_back(proxy->url + proxy_name);
3096 }
3097 // Ensure lower_bound() finds a value for all keys
3098 ProxyInfo *first_proxy = opt_proxy_map_.begin()->second;
3099 const std::pair<uint32_t, ProxyInfo *> last_entry(max_key, first_proxy);
3100 opt_proxy_map_.insert(last_entry);
3101 } else {
3102 // Build a map with a single entry for one randomly selected proxy
3103 1727 const unsigned select = prng_.Next(num_alive);
3104 1727 ProxyInfo *proxy = &(*group)[select];
3105 1727 const std::pair<uint32_t, ProxyInfo *> entry(max_key, proxy);
3106
1/2
✓ Branch 1 taken 1727 times.
✗ Branch 2 not taken.
1727 opt_proxy_map_.insert(entry);
3107 1727 const std::string proxy_name = proxy->host.name().empty()
3108 ? ""
3109
9/18
✓ Branch 0 taken 1695 times.
✓ Branch 1 taken 32 times.
✓ Branch 4 taken 1695 times.
✗ Branch 5 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 32 times.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
✓ Branch 11 taken 32 times.
✗ Branch 12 not taken.
✓ Branch 13 taken 32 times.
✓ Branch 14 taken 1695 times.
✓ Branch 15 taken 1695 times.
✓ Branch 16 taken 32 times.
✗ Branch 17 not taken.
✗ Branch 18 not taken.
✗ Branch 19 not taken.
✗ Branch 20 not taken.
1759 : " (" + proxy->host.name() + ")";
3110
2/4
✓ Branch 1 taken 1727 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1727 times.
✗ Branch 5 not taken.
1727 opt_proxies_.push_back(proxy->url + proxy_name);
3111 1727 }
3112
1/2
✓ Branch 3 taken 1727 times.
✗ Branch 4 not taken.
1727 sort(opt_proxies_.begin(), opt_proxies_.end());
3113
3114 // Report any change in proxy usage
3115
2/4
✓ Branch 2 taken 1727 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1727 times.
✗ Branch 6 not taken.
3454 const string new_proxy = JoinStrings(opt_proxies_, "|");
3116 const string curr_host = "Current host: "
3117 1727 + (opt_host_.chain
3118
6/12
✓ Branch 0 taken 1695 times.
✓ Branch 1 taken 32 times.
✓ Branch 4 taken 1695 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 32 times.
✗ Branch 9 not taken.
✓ Branch 10 taken 32 times.
✓ Branch 11 taken 1695 times.
✗ Branch 12 not taken.
✗ Branch 13 not taken.
3454 ? (*opt_host_.chain)[opt_host_.current]
3119
1/2
✓ Branch 1 taken 1727 times.
✗ Branch 2 not taken.
1727 : "");
3120
2/2
✓ Branch 1 taken 1162 times.
✓ Branch 2 taken 565 times.
1727 if (new_proxy != old_proxy) {
3121
4/9
✗ Branch 2 not taken.
✓ Branch 3 taken 1162 times.
✓ Branch 4 taken 1154 times.
✓ Branch 5 taken 8 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 1162 times.
✗ Branch 9 not taken.
✗ Branch 12 not taken.
✗ Branch 13 not taken.
3494 LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
3122 "(manager '%s') switching proxy from %s to %s. Reason: %s [%s]",
3123 1170 name_.c_str(), (old_proxy.empty() ? "(none)" : old_proxy.c_str()),
3124 2324 (new_proxy.empty() ? "(none)" : new_proxy.c_str()), reason.c_str(),
3125 curr_host.c_str());
3126 }
3127 1727 }
3128
3129 /**
3130 * Enable proxy sharding
3131 */
3132 void DownloadManager::ShardProxies() {
3133 opt_proxy_shard_ = true;
3134 RebalanceProxiesUnlocked("enable sharding");
3135 }
3136
3137 /**
3138 * Selects a new random proxy in the current load-balancing group. Resets the
3139 * "burned" counter.
3140 */
3141 void DownloadManager::RebalanceProxiesUnlocked(const string &reason) {
3142 if (!opt_proxy_groups_)
3143 return;
3144
3145 opt_timestamp_failover_proxies_ = 0;
3146 opt_proxy_groups_current_burned_ = 0;
3147 UpdateProxiesUnlocked(reason);
3148 }
3149
3150
3151 void DownloadManager::RebalanceProxies() {
3152 const MutexLockGuard m(lock_options_);
3153 RebalanceProxiesUnlocked("rebalance invoked manually");
3154 }
3155
3156
3157 /**
3158 * Switches to the next load-balancing group of proxy servers.
3159 */
3160 void DownloadManager::SwitchProxyGroup() {
3161 const MutexLockGuard m(lock_options_);
3162
3163 if (!opt_proxy_groups_ || (opt_proxy_groups_->size() < 2)) {
3164 return;
3165 }
3166
3167 opt_proxy_groups_current_ = (opt_proxy_groups_current_ + 1)
3168 % opt_proxy_groups_->size();
3169 opt_timestamp_backup_proxies_ = time(NULL);
3170
3171 const std::string msg = "switch to proxy group "
3172 + StringifyUint(opt_proxy_groups_current_);
3173 RebalanceProxiesUnlocked(msg);
3174 }
3175
3176
3177 void DownloadManager::SetProxyGroupResetDelay(const unsigned seconds) {
3178 const MutexLockGuard m(lock_options_);
3179 opt_proxy_groups_reset_after_ = seconds;
3180 if (opt_proxy_groups_reset_after_ == 0) {
3181 opt_timestamp_backup_proxies_ = 0;
3182 opt_timestamp_failover_proxies_ = 0;
3183 }
3184 }
3185
3186
3187 void DownloadManager::SetMetalinkResetDelay(const unsigned seconds) {
3188 const MutexLockGuard m(lock_options_);
3189 opt_metalink_.reset_after = seconds;
3190 if (opt_metalink_.reset_after == 0)
3191 opt_metalink_.timestamp_backup = 0;
3192 }
3193
3194
3195 void DownloadManager::SetHostResetDelay(const unsigned seconds) {
3196 const MutexLockGuard m(lock_options_);
3197 opt_host_.reset_after = seconds;
3198 if (opt_host_.reset_after == 0)
3199 opt_host_.timestamp_backup = 0;
3200 }
3201
3202
3203 1512 void DownloadManager::SetRetryParameters(const unsigned max_retries,
3204 const unsigned backoff_init_ms,
3205 const unsigned backoff_max_ms) {
3206 1512 const MutexLockGuard m(lock_options_);
3207 1512 opt_max_retries_ = max_retries;
3208 1512 opt_backoff_init_ms_ = backoff_init_ms;
3209 1512 opt_backoff_max_ms_ = backoff_max_ms;
3210 1512 }
3211
3212
3213 581 void DownloadManager::SetMaxIpaddrPerProxy(unsigned limit) {
3214 581 const MutexLockGuard m(lock_options_);
3215 581 resolver_->set_throttle(limit);
3216 581 }
3217
3218
3219 675 void DownloadManager::SetProxyTemplates(const std::string &direct,
3220 const std::string &forced) {
3221 675 const MutexLockGuard m(lock_options_);
3222
1/2
✓ Branch 1 taken 675 times.
✗ Branch 2 not taken.
675 proxy_template_direct_ = direct;
3223
1/2
✓ Branch 1 taken 675 times.
✗ Branch 2 not taken.
675 proxy_template_forced_ = forced;
3224 675 }
3225
3226
3227 void DownloadManager::EnableInfoHeader() { enable_info_header_ = true; }
3228
3229
3230 861 void DownloadManager::EnableRedirects() { follow_redirects_ = true; }
3231
3232 void DownloadManager::EnableIgnoreSignatureFailures() {
3233 ignore_signature_failures_ = true;
3234 }
3235
3236 void DownloadManager::EnableHTTPTracing() { enable_http_tracing_ = true; }
3237
3238 void DownloadManager::AddHTTPTracingHeader(const std::string &header) {
3239 http_tracing_headers_.push_back(header);
3240 }
3241
3242 837 void DownloadManager::UseSystemCertificatePath() {
3243 837 ssl_certificate_store_.UseSystemCertificatePath();
3244 837 }
3245
3246 bool DownloadManager::SetShardingPolicy(const ShardingPolicySelector type) {
3247 const bool success = false;
3248 switch (type) {
3249 default:
3250 LogCvmfs(
3251 kLogDownload, kLogDebug | kLogSyslogErr,
3252 "(manager '%s') "
3253 "Proposed sharding policy does not exist. Falling back to default",
3254 name_.c_str());
3255 }
3256 return success;
3257 }
3258
3259 void DownloadManager::SetFailoverIndefinitely() {
3260 failover_indefinitely_ = true;
3261 }
3262
3263 /**
3264 * Creates a copy of the existing download manager. Must only be called in
3265 * single-threaded stage because it calls curl_global_init().
3266 */
3267 581 DownloadManager *DownloadManager::Clone(
3268 const perf::StatisticsTemplate &statistics,
3269 const std::string &cloned_name) {
3270 DownloadManager *clone = new DownloadManager(pool_max_handles_, statistics,
3271
1/2
✓ Branch 2 taken 581 times.
✗ Branch 3 not taken.
581 cloned_name);
3272
3273 581 clone->SetDnsParameters(resolver_->retries(), resolver_->timeout_ms());
3274 581 clone->SetDnsTtlLimits(resolver_->min_ttl(), resolver_->max_ttl());
3275 581 clone->SetMaxIpaddrPerProxy(resolver_->throttle());
3276
3277
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 581 times.
581 if (!opt_dns_server_.empty())
3278 clone->SetDnsServer(opt_dns_server_);
3279 581 clone->opt_timeout_proxy_ = opt_timeout_proxy_;
3280 581 clone->opt_timeout_direct_ = opt_timeout_direct_;
3281 581 clone->opt_low_speed_limit_ = opt_low_speed_limit_;
3282 581 clone->opt_max_retries_ = opt_max_retries_;
3283 581 clone->opt_backoff_init_ms_ = opt_backoff_init_ms_;
3284 581 clone->opt_backoff_max_ms_ = opt_backoff_max_ms_;
3285 581 clone->enable_info_header_ = enable_info_header_;
3286 581 clone->enable_http_tracing_ = enable_http_tracing_;
3287 581 clone->http_tracing_headers_ = http_tracing_headers_;
3288 581 clone->follow_redirects_ = follow_redirects_;
3289 581 clone->ignore_signature_failures_ = ignore_signature_failures_;
3290
2/2
✓ Branch 0 taken 565 times.
✓ Branch 1 taken 16 times.
581 if (opt_host_.chain) {
3291
1/2
✓ Branch 2 taken 565 times.
✗ Branch 3 not taken.
565 clone->opt_host_.chain = new vector<string>(*opt_host_.chain);
3292
1/2
✓ Branch 2 taken 565 times.
✗ Branch 3 not taken.
565 clone->opt_host_chain_rtt_ = new vector<int>(*opt_host_chain_rtt_);
3293 }
3294
3295 581 CloneProxyConfig(clone);
3296 581 clone->opt_ip_preference_ = opt_ip_preference_;
3297 581 clone->proxy_template_direct_ = proxy_template_direct_;
3298 581 clone->proxy_template_forced_ = proxy_template_forced_;
3299 581 clone->opt_proxy_groups_reset_after_ = opt_proxy_groups_reset_after_;
3300 581 clone->opt_metalink_.reset_after = opt_metalink_.reset_after;
3301 581 clone->opt_host_.reset_after = opt_host_.reset_after;
3302 581 clone->credentials_attachment_ = credentials_attachment_;
3303 581 clone->ssl_certificate_store_ = ssl_certificate_store_;
3304
3305 581 clone->health_check_ = health_check_;
3306 581 clone->sharding_policy_ = sharding_policy_;
3307 581 clone->failover_indefinitely_ = failover_indefinitely_;
3308 581 clone->fqrn_ = fqrn_;
3309
3310 581 return clone;
3311 }
3312
3313
3314 581 void DownloadManager::CloneProxyConfig(DownloadManager *clone) {
3315 581 clone->opt_proxy_groups_current_ = opt_proxy_groups_current_;
3316 581 clone->opt_proxy_groups_current_burned_ = opt_proxy_groups_current_burned_;
3317 581 clone->opt_proxy_groups_fallback_ = opt_proxy_groups_fallback_;
3318 581 clone->opt_num_proxies_ = opt_num_proxies_;
3319 581 clone->opt_proxy_shard_ = opt_proxy_shard_;
3320 581 clone->opt_proxy_list_ = opt_proxy_list_;
3321 581 clone->opt_proxy_fallback_list_ = opt_proxy_fallback_list_;
3322
2/2
✓ Branch 0 taken 16 times.
✓ Branch 1 taken 565 times.
581 if (opt_proxy_groups_ == NULL)
3323 16 return;
3324
3325
1/2
✓ Branch 2 taken 565 times.
✗ Branch 3 not taken.
565 clone->opt_proxy_groups_ = new vector<vector<ProxyInfo> >(*opt_proxy_groups_);
3326
2/4
✓ Branch 2 taken 565 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 565 times.
✗ Branch 6 not taken.
565 clone->UpdateProxiesUnlocked("cloned");
3327 }
3328
3329 } // namespace download
3330