GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/download.cc Lines: 700 1404 49.9 %
Date: 2019-02-03 02:48:13 Branches: 347 901 38.5 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 *
4
 * The download module provides an interface for fetching files via HTTP
5
 * and file.  It is internally using libcurl and the asynchronous DNS resolver
6
 * c-ares.  The JobInfo struct describes a single file/url to download and
7
 * keeps the state during the several phases of downloading.
8
 *
9
 * The module starts in single-threaded mode and can be switched to multi-
10
 * threaded mode by Spawn().  In multi-threaded mode, the Fetch() function still
11
 * blocks but there is a separate I/O thread using asynchronous I/O, which
12
 * maintains all concurrent connections simultaneously.  As there might be more
13
 * than 1024 file descriptors for the CernVM-FS process, the I/O thread uses
14
 * poll and the libcurl multi socket interface.
15
 *
16
 * While downloading, files can be decompressed and the secure hash can be
17
 * calculated on the fly.
18
 *
19
 * The module also implements failure handling.  If corrupted data has been
20
 * downloaded, the transfer is restarted using HTTP "no-cache" pragma.
21
 * A "host chain" can be configured.  When a host fails, there is automatic
22
 * fail-over to the next host in the chain until all hosts are probed.
23
 * Similarly a chain of proxy sets can be configured.  Inside a proxy set,
24
 * proxies are selected randomly (load-balancing set).
25
 */
26
27
// TODO(jblomer): MS for time summing
28
#define __STDC_FORMAT_MACROS
29
30
#include "cvmfs_config.h"
31
#include "download.h"
32
33
#include <alloca.h>
34
#include <errno.h>
35
#include <inttypes.h>
36
#include <poll.h>
37
#include <pthread.h>
38
#include <signal.h>
39
#include <stdint.h>
40
#include <sys/time.h>
41
#include <unistd.h>
42
43
#include <algorithm>
44
#include <cassert>
45
#include <cstdio>
46
#include <cstdlib>
47
#include <cstring>
48
#include <set>
49
50
#include "atomic.h"
51
#include "compression.h"
52
#include "duplex_curl.h"
53
#include "hash.h"
54
#include "logging.h"
55
#include "prng.h"
56
#include "sanitizer.h"
57
#include "smalloc.h"
58
#include "util/algorithm.h"
59
#include "util/posix.h"
60
#include "util/string.h"
61
62
using namespace std;  // NOLINT
63
64
namespace download {
65
66
20054
/**
 * Percent-encodes a single character for use in a URL.
 *
 * Characters that may appear verbatim (ASCII alphanumerics and the set
 * "/:.+-_~[],") are copied to output[0] and false is returned; output[1] and
 * output[2] are left untouched in that case.  Every other byte is written to
 * output[0..2] as '%' followed by two upper-case hexadecimal digits and true
 * is returned.
 */
static inline bool EscapeUrlChar(char input, char output[3]) {
  if (((input >= '0') && (input <= '9')) ||
      ((input >= 'A') && (input <= 'Z')) ||
      ((input >= 'a') && (input <= 'z')) ||
      (input == '/') || (input == ':') || (input == '.') ||
      (input == '+') || (input == '-') ||
      (input == '_') || (input == '~') ||
      (input == '[') || (input == ']') || (input == ','))
  {
    output[0] = input;
    return false;
  }

  // Compute the hex digits from the unsigned byte value.  Where plain char is
  // signed, bytes >= 0x80 are negative and signed division/modulo would yield
  // negative "digits" and thus characters that are not hexadecimal.
  const unsigned byte_value = static_cast<unsigned char>(input);
  const unsigned high_nibble = byte_value / 16;
  const unsigned low_nibble = byte_value % 16;

  output[0] = '%';
  output[1] = static_cast<char>(
    high_nibble + ((high_nibble <= 9) ? '0' : 'A' - 10));
  output[2] = static_cast<char>(
    low_nibble + ((low_nibble <= 9) ? '0' : 'A' - 10));
  return true;
}
84
85
86
/**
87
 * Escape special chars from the URL, except for ':' and '/',
88
 * which should keep their meaning.
89
 */
90
242
static string EscapeUrl(const string &url) {
91
242
  string escaped;
92
242
  escaped.reserve(url.length());
93
94
  char escaped_char[3];
95
20296
  for (unsigned i = 0, s = url.length(); i < s; ++i) {
96
20054
    if (EscapeUrlChar(url[i], escaped_char))
97
      escaped.append(escaped_char, 3);
98
    else
99
20054
      escaped.push_back(escaped_char[0]);
100
  }
101
  LogCvmfs(kLogDownload, kLogDebug, "escaped %s to %s",
102
242
           url.c_str(), escaped.c_str());
103
104
242
  return escaped;
105
}
106
107
108
/**
109
 * escaped array needs to be sufficiently large.  It's size is calculated by
110
 * passing NULL to EscapeHeader.
111
 */
112
static unsigned EscapeHeader(const string &header,
113
                             char *escaped_buf,
114
                             size_t buf_size)
115
{
116
  unsigned esc_pos = 0;
117
  char escaped_char[3];
118
  for (unsigned i = 0, s = header.size(); i < s; ++i) {
119
    if (EscapeUrlChar(header[i], escaped_char)) {
120
      for (unsigned j = 0; j < 3; ++j) {
121
        if (escaped_buf) {
122
          if (esc_pos >= buf_size)
123
            return esc_pos;
124
          escaped_buf[esc_pos] = escaped_char[j];
125
        }
126
        esc_pos++;
127
      }
128
    } else {
129
      if (escaped_buf) {
130
        if (esc_pos >= buf_size)
131
          return esc_pos;
132
        escaped_buf[esc_pos] = escaped_char[0];
133
      }
134
      esc_pos++;
135
    }
136
  }
137
138
  return esc_pos;
139
}
140
141
142
242
/**
 * Resets the in-memory destination bookkeeping of a job and validates (or,
 * for kDestinationPath, opens) the configured download destination.
 *
 * Returns kFailLocalIO if the destination path cannot be opened for writing,
 * kFailOk otherwise.
 */
static Failures PrepareDownloadDestination(JobInfo *info) {
  info->destination_mem.data = NULL;
  info->destination_mem.pos = 0;
  info->destination_mem.size = 0;

  if (info->destination == kDestinationFile) {
    assert(info->destination_file != NULL);
  } else if (info->destination == kDestinationPath) {
    assert(info->destination_path != NULL);
    // Opening with "w" truncates an existing file
    info->destination_file = fopen(info->destination_path->c_str(), "w");
    if (info->destination_file == NULL) {
      LogCvmfs(kLogDownload, kLogDebug, "Failed to open path %s: %s"
               " (errno=%d).",
               info->destination_path->c_str(), strerror(errno), errno);
      return kFailLocalIO;
    }
  } else if (info->destination == kDestinationSink) {
    assert(info->destination_sink != NULL);
  }

  return kFailOk;
}
166
167
168
/**
169
 * Called by curl for every HTTP header. Not called for file:// transfers.
170
 */
171
633
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
172
                                 void *info_link)
173
{
174
633
  const size_t num_bytes = size*nmemb;
175
633
  const string header_line(static_cast<const char *>(ptr), num_bytes);
176
633
  JobInfo *info = static_cast<JobInfo *>(info_link);
177
178
  // LogCvmfs(kLogDownload, kLogDebug, "REMOVE-ME: Header callback with %s",
179
  //          header_line.c_str());
180
181
  // Check http status codes
182

633
  if (HasPrefix(header_line, "HTTP/1.", false)) {
183
    if (header_line.length() < 10)
184
      return 0;
185
186
    unsigned i;
187
    for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {}
188
189
    // Code is initialized to -1
190
    if (header_line.length() > i+2) {
191
      info->http_code = DownloadManager::ParseHttpCode(&header_line[i]);
192
    }
193
194
    if ((info->http_code / 100) == 2) {
195
      return num_bytes;
196
    } else if ((info->http_code == 301) ||
197
               (info->http_code == 302) ||
198
               (info->http_code == 303) ||
199
               (info->http_code == 307))
200
    {
201
      if (!info->follow_redirects) {
202
        LogCvmfs(kLogDownload, kLogDebug, "redirect support not enabled: %s",
203
                 header_line.c_str());
204
        info->error_code = kFailHostHttp;
205
        return 0;
206
      }
207
      LogCvmfs(kLogDownload, kLogDebug, "http redirect: %s",
208
               header_line.c_str());
209
      // libcurl will handle this because of CURLOPT_FOLLOWLOCATION
210
      return num_bytes;
211
    } else {
212
      LogCvmfs(kLogDownload, kLogDebug, "http status error code: %s",
213
               header_line.c_str());
214
      if ((info->http_code / 100) == 5) {
215
        // 5XX returned by host
216
        info->error_code = kFailHostHttp;
217
      } else if ((info->http_code == 400) || (info->http_code == 404)) {
218
        // 400: error from the GeoAPI module
219
        // 404: the stratum 1 does not have the newest files
220
        info->error_code = kFailHostHttp;
221
      } else {
222
        info->error_code = (info->proxy == "DIRECT") ? kFailHostHttp :
223
                                                       kFailProxyHttp;
224
      }
225
      return 0;
226
    }
227
  }
228
229
  // Allocate memory for kDestinationMemory
230




633
  if ((info->destination == kDestinationMem) &&
231
      HasPrefix(header_line, "CONTENT-LENGTH:", true))
232
  {
233
102
    char *tmp = reinterpret_cast<char *>(alloca(num_bytes+1));
234
102
    uint64_t length = 0;
235
102
    sscanf(header_line.c_str(), "%s %" PRIu64, tmp, &length);
236
102
    if (length > 0) {
237
102
      if (length > DownloadManager::kMaxMemSize) {
238
        LogCvmfs(kLogDownload, kLogDebug | kLogSyslogErr,
239
                 "resource %s too large to store in memory (%" PRIu64 ")",
240
                 info->url->c_str(), length);
241
        info->error_code = kFailTooBig;
242
        return 0;
243
      }
244
102
      info->destination_mem.data = static_cast<char *>(smalloc(length));
245
    } else {
246
      // Empty resource
247
      info->destination_mem.data = NULL;
248
    }
249
102
    info->destination_mem.size = length;
250
531
  } else if (HasPrefix(header_line, "LOCATION:", true)) {
251
    // This comes along with redirects
252
    LogCvmfs(kLogDownload, kLogDebug, "%s", header_line.c_str());
253
  }
254
255
633
  return num_bytes;
256
}
257
258
259
/**
260
 * Called by curl for every received data chunk.
261
 */
262
208
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
263
                               void *info_link)
264
{
265
208
  const size_t num_bytes = size*nmemb;
266
208
  JobInfo *info = static_cast<JobInfo *>(info_link);
267
268
  // LogCvmfs(kLogDownload, kLogDebug, "Data callback,  %d bytes", num_bytes);
269
270
208
  if (num_bytes == 0)
271
    return 0;
272
273
208
  if (info->expected_hash)
274
134
    shash::Update((unsigned char *)ptr, num_bytes, info->hash_context);
275
276
208
  if (info->destination == kDestinationSink) {
277
77
    if (info->compressed) {
278
      zlib::StreamStates retval =
279
        zlib::DecompressZStream2Sink(ptr, num_bytes,
280
72
                                     &info->zstream, info->destination_sink);
281
72
      if (retval == zlib::kStreamDataError) {
282
        LogCvmfs(kLogDownload, kLogDebug, "failed to decompress %s",
283
                 info->url->c_str());
284
        info->error_code = kFailBadData;
285
        return 0;
286
72
      } else if (retval == zlib::kStreamIOError) {
287
        LogCvmfs(kLogDownload, kLogSyslogErr,
288
                 "decompressing %s, local IO error", info->url->c_str());
289
        info->error_code = kFailLocalIO;
290
        return 0;
291
      }
292
    } else {
293
5
      int64_t written = info->destination_sink->Write(ptr, num_bytes);
294

5
      if ((written < 0) || (static_cast<uint64_t>(written) != num_bytes)) {
295
        LogCvmfs(kLogDownload, kLogDebug, "Failed to perform write on %s (%"
296
                 PRId64 ")", info->url->c_str(), written);
297
        info->error_code = kFailLocalIO;
298
        return 0;
299
      }
300
    }
301
131
  } else if (info->destination == kDestinationMem) {
302
    // Write to memory
303
102
    if (info->destination_mem.pos + num_bytes > info->destination_mem.size) {
304
      if (info->destination_mem.size == 0) {
305
        LogCvmfs(kLogDownload, kLogDebug,
306
                 "Content-Length was missing or zero, but %zu bytes received",
307
                 info->destination_mem.pos + num_bytes);
308
      } else {
309
        LogCvmfs(kLogDownload, kLogDebug, "Callback had too much data: "
310
                 "start %zu, bytes %zu, expected %zu",
311
                 info->destination_mem.pos,
312
                 num_bytes,
313
                 info->destination_mem.size);
314
      }
315
      info->error_code = kFailBadData;
316
      return 0;
317
    }
318
    memcpy(info->destination_mem.data + info->destination_mem.pos,
319
102
           ptr, num_bytes);
320
102
    info->destination_mem.pos += num_bytes;
321
  } else {
322
    // Write to file
323
29
    if (info->compressed) {
324
      // LogCvmfs(kLogDownload, kLogDebug, "REMOVE-ME: writing %d bytes for %s",
325
      //          num_bytes, info->url->c_str());
326
      zlib::StreamStates retval =
327
        zlib::DecompressZStream2File(ptr, num_bytes,
328
26
                                     &info->zstream, info->destination_file);
329
26
      if (retval == zlib::kStreamDataError) {
330
        LogCvmfs(kLogDownload, kLogDebug, "failed to decompress %s",
331
                 info->url->c_str());
332
        info->error_code = kFailBadData;
333
        return 0;
334
26
      } else if (retval == zlib::kStreamIOError) {
335
        LogCvmfs(kLogDownload, kLogSyslogErr,
336
                 "decompressing %s, local IO error", info->url->c_str());
337
        info->error_code = kFailLocalIO;
338
        return 0;
339
      }
340
    } else {
341
3
      if (fwrite(ptr, 1, num_bytes, info->destination_file) != num_bytes) {
342
       LogCvmfs(kLogDownload, kLogDebug,
343
                 "downloading %s, IO failure: %s (errno=%d)",
344
                 info->url->c_str(), strerror(errno), errno);
345
        info->error_code = kFailLocalIO;
346
        return 0;
347
      }
348
    }
349
  }
350
351
208
  return num_bytes;
352
}
353
354
355
//------------------------------------------------------------------------------
356
357
358
// Negative marker values.
// NOTE(review): presumably used in place of a measured value when probing
// hosts -- confirm against the declaration in download.h.
const int DownloadManager::kProbeUnprobed = -1;
const int DownloadManager::kProbeDown     = -2;
const int DownloadManager::kProbeGeo      = -3;

// Upper limit (in bytes, 1 MiB) for the Content-Length of a resource that is
// downloaded into memory; see CallbackCurlHeader().
const unsigned DownloadManager::kMaxMemSize = 1024*1024;
362
363
364
/**
365
 * -1 of digits is not a valid Http return code
366
 */
367
5
int DownloadManager::ParseHttpCode(const char digits[3]) {
368
5
  int result = 0;
369
5
  int factor = 100;
370
19
  for (int i = 0; i < 3; ++i) {
371

15
    if ((digits[i] < '0') || (digits[i] > '9'))
372
1
      return -1;
373
14
    result += (digits[i] - '0') * factor;
374
14
    factor /= 10;
375
  }
376
4
  return result;
377
}
378
379
380
/**
381
 * Called when new curl sockets arrive or existing curl sockets depart.
382
 */
383
int DownloadManager::CallbackCurlSocket(CURL *easy,
384
                                        curl_socket_t s,
385
                                        int action,
386
                                        void *userp,
387
                                        void *socketp)
388
{
389
  // LogCvmfs(kLogDownload, kLogDebug, "CallbackCurlSocket called with easy "
390
  //          "handle %p, socket %d, action %d", easy, s, action);
391
  DownloadManager *download_mgr = static_cast<DownloadManager *>(userp);
392
  if (action == CURL_POLL_NONE)
393
    return 0;
394
395
  // Find s in watch_fds_
396
  unsigned index;
397
  for (index = 0; index < download_mgr->watch_fds_inuse_; ++index) {
398
    if (download_mgr->watch_fds_[index].fd == s)
399
      break;
400
  }
401
  // Or create newly
402
  if (index == download_mgr->watch_fds_inuse_) {
403
    // Extend array if necessary
404
    if (download_mgr->watch_fds_inuse_ == download_mgr->watch_fds_size_)
405
    {
406
      download_mgr->watch_fds_size_ *= 2;
407
      download_mgr->watch_fds_ = static_cast<struct pollfd *>(
408
        srealloc(download_mgr->watch_fds_,
409
                 download_mgr->watch_fds_size_*sizeof(struct pollfd)));
410
    }
411
    download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].fd = s;
412
    download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].events = 0;
413
    download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].revents = 0;
414
    download_mgr->watch_fds_inuse_++;
415
  }
416
417
  switch (action) {
418
    case CURL_POLL_IN:
419
      download_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
420
      break;
421
    case CURL_POLL_OUT:
422
      download_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
423
      break;
424
    case CURL_POLL_INOUT:
425
      download_mgr->watch_fds_[index].events =
426
        POLLIN | POLLPRI | POLLOUT | POLLWRBAND;
427
      break;
428
    case CURL_POLL_REMOVE:
429
      if (index < download_mgr->watch_fds_inuse_-1)
430
        download_mgr->watch_fds_[index] =
431
          download_mgr->watch_fds_[download_mgr->watch_fds_inuse_-1];
432
      download_mgr->watch_fds_inuse_--;
433
      // Shrink array if necessary
434
      if ((download_mgr->watch_fds_inuse_ > download_mgr->watch_fds_max_) &&
435
          (download_mgr->watch_fds_inuse_ < download_mgr->watch_fds_size_/2))
436
      {
437
        download_mgr->watch_fds_size_ /= 2;
438
        // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ (%d)",
439
        //          watch_fds_size_);
440
        download_mgr->watch_fds_ = static_cast<struct pollfd *>(
441
          srealloc(download_mgr->watch_fds_,
442
                   download_mgr->watch_fds_size_*sizeof(struct pollfd)));
443
        // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ done",
444
        //          watch_fds_size_);
445
      }
446
      break;
447
    default:
448
      break;
449
  }
450
451
  return 0;
452
}
453
454
455
/**
456
 * Worker thread event loop.  Waits on new JobInfo structs on a pipe.
457
 */
458
void *DownloadManager::MainDownload(void *data) {
459
  LogCvmfs(kLogDownload, kLogDebug, "download I/O thread started");
460
  DownloadManager *download_mgr = static_cast<DownloadManager *>(data);
461
462
  download_mgr->watch_fds_ =
463
    static_cast<struct pollfd *>(smalloc(2 * sizeof(struct pollfd)));
464
  download_mgr->watch_fds_size_ = 2;
465
  download_mgr->watch_fds_[0].fd = download_mgr->pipe_terminate_[0];
466
  download_mgr->watch_fds_[0].events = POLLIN | POLLPRI;
467
  download_mgr->watch_fds_[0].revents = 0;
468
  download_mgr->watch_fds_[1].fd = download_mgr->pipe_jobs_[0];
469
  download_mgr->watch_fds_[1].events = POLLIN | POLLPRI;
470
  download_mgr->watch_fds_[1].revents = 0;
471
  download_mgr->watch_fds_inuse_ = 2;
472
473
  int still_running = 0;
474
  struct timeval timeval_start, timeval_stop;
475
  gettimeofday(&timeval_start, NULL);
476
  while (true) {
477
    int timeout;
478
    if (still_running) {
479
      /* NOTE: The following might degrade the performance for many small files
480
       * use case. TODO(jblomer): look into it.
481
      // Specify a timeout for polling in ms; this allows us to return
482
      // to libcurl once a second so it can look for internal operations
483
      // which timed out.  libcurl has a more elaborate mechanism
484
      // (CURLMOPT_TIMERFUNCTION) that would inform us of the next potential
485
      // timeout.  TODO(bbockelm) we should switch to that in the future.
486
      timeout = 100;
487
      */
488
      timeout = 1;
489
    } else {
490
      timeout = -1;
491
      gettimeofday(&timeval_stop, NULL);
492
      int64_t delta = static_cast<int64_t>(
493
        1000 * DiffTimeSeconds(timeval_start, timeval_stop));
494
      perf::Xadd(download_mgr->counters_->sz_transfer_time, delta);
495
    }
496
    int retval = poll(download_mgr->watch_fds_, download_mgr->watch_fds_inuse_,
497
                      timeout);
498
    if (retval < 0) {
499
      continue;
500
    }
501
502
    // Handle timeout
503
    if (retval == 0) {
504
      retval = curl_multi_socket_action(download_mgr->curl_multi_,
505
                                        CURL_SOCKET_TIMEOUT,
506
                                        0,
507
                                        &still_running);
508
    }
509
510
    // Terminate I/O thread
511
    if (download_mgr->watch_fds_[0].revents)
512
      break;
513
514
    // New job arrives
515
    if (download_mgr->watch_fds_[1].revents) {
516
      download_mgr->watch_fds_[1].revents = 0;
517
      JobInfo *info;
518
      ReadPipe(download_mgr->pipe_jobs_[0], &info, sizeof(info));
519
      if (!still_running)
520
        gettimeofday(&timeval_start, NULL);
521
      CURL *handle = download_mgr->AcquireCurlHandle();
522
      download_mgr->InitializeRequest(info, handle);
523
      download_mgr->SetUrlOptions(info);
524
      curl_multi_add_handle(download_mgr->curl_multi_, handle);
525
      retval = curl_multi_socket_action(download_mgr->curl_multi_,
526
                                        CURL_SOCKET_TIMEOUT,
527
                                        0,
528
                                        &still_running);
529
    }
530
531
    // Activity on curl sockets
532
    // Within this loop the curl_multi_socket_action() may cause socket(s)
533
    // to be removed from watch_fds_. If a socket is removed it is replaced
534
    // by the socket at the end of the array and the inuse count is decreased.
535
    // Therefore loop over the array in reverse order.
536
    for (int64_t i = download_mgr->watch_fds_inuse_-1; i >= 2; --i) {
537
      if (i >= download_mgr->watch_fds_inuse_) {
538
        continue;
539
      }
540
      if (download_mgr->watch_fds_[i].revents) {
541
        int ev_bitmask = 0;
542
        if (download_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
543
          ev_bitmask |= CURL_CSELECT_IN;
544
        if (download_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
545
          ev_bitmask |= CURL_CSELECT_OUT;
546
        if (download_mgr->watch_fds_[i].revents &
547
            (POLLERR | POLLHUP | POLLNVAL))
548
        {
549
          ev_bitmask |= CURL_CSELECT_ERR;
550
        }
551
        download_mgr->watch_fds_[i].revents = 0;
552
553
        retval = curl_multi_socket_action(download_mgr->curl_multi_,
554
                                          download_mgr->watch_fds_[i].fd,
555
                                          ev_bitmask,
556
                                          &still_running);
557
      }
558
    }
559
560
    // Check if transfers are completed
561
    CURLMsg *curl_msg;
562
    int msgs_in_queue;
563
    while ((curl_msg = curl_multi_info_read(download_mgr->curl_multi_,
564
                                            &msgs_in_queue)))
565
    {
566
      if (curl_msg->msg == CURLMSG_DONE) {
567
        perf::Inc(download_mgr->counters_->n_requests);
568
        JobInfo *info;
569
        CURL *easy_handle = curl_msg->easy_handle;
570
        int curl_error = curl_msg->data.result;
571
        curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
572
573
        curl_multi_remove_handle(download_mgr->curl_multi_, easy_handle);
574
        if (download_mgr->VerifyAndFinalize(curl_error, info)) {
575
          curl_multi_add_handle(download_mgr->curl_multi_, easy_handle);
576
          retval = curl_multi_socket_action(download_mgr->curl_multi_,
577
                                            CURL_SOCKET_TIMEOUT,
578
                                            0,
579
                                            &still_running);
580
        } else {
581
          // Return easy handle into pool and write result back
582
          download_mgr->ReleaseCurlHandle(easy_handle);
583
584
          WritePipe(info->wait_at[1], &info->error_code,
585
                    sizeof(info->error_code));
586
        }
587
      }
588
    }
589
  }
590
591
  for (set<CURL *>::iterator i = download_mgr->pool_handles_inuse_->begin(),
592
       iEnd = download_mgr->pool_handles_inuse_->end(); i != iEnd; ++i)
593
  {
594
    curl_multi_remove_handle(download_mgr->curl_multi_, *i);
595
    curl_easy_cleanup(*i);
596
  }
597
  download_mgr->pool_handles_inuse_->clear();
598
  free(download_mgr->watch_fds_);
599
600
  LogCvmfs(kLogDownload, kLogDebug, "download I/O thread terminated");
601
  return NULL;
602
}
603
604
605
//------------------------------------------------------------------------------
606
607
608
207
/**
 * Releases all blocks of list elements that were allocated by AddBlock().
 */
HeaderLists::~HeaderLists() {
  const unsigned num_blocks = blocks_.size();
  for (unsigned idx = 0; idx < num_blocks; ++idx)
    delete[] blocks_[idx];
  blocks_.clear();
}
614
615
616
2518
/**
 * Starts a new list: returns an element obtained from Get() whose data
 * pointer is set to header.
 */
curl_slist *HeaderLists::GetList(const char *header) {
  curl_slist *list_head = Get(header);
  return list_head;
}
619
620
621
242
/**
 * Creates a new list with one element for each element of slist and returns
 * its head.  The elements of the copy point to the same header strings as the
 * originals; the strings themselves are not duplicated.  slist must not be
 * NULL.
 */
curl_slist *HeaderLists::DuplicateList(curl_slist *slist) {
  assert(slist);

  curl_slist *head = GetList(slist->data);
  head->next = slist->next;

  curl_slist *tail = head;
  for (curl_slist *src = slist->next; src != NULL; src = src->next) {
    curl_slist *link = Get(src->data);
    // Temporarily points into the source list; fixed up in the next round.
    // For the last element, this is already the terminating NULL.
    link->next = src->next;
    tail->next = link;
    tail = link;
  }
  return head;
}
636
637
638
434
/**
 * Adds an element for header to the end of slist.  slist must not be NULL.
 * The header string is referenced, not copied.
 */
void HeaderLists::AppendHeader(curl_slist *slist, const char *header) {
  assert(slist);

  curl_slist *appendix = Get(header);
  appendix->next = NULL;

  // Walk to the current last element
  curl_slist *tail = slist;
  for (; tail->next != NULL; tail = tail->next) { }
  tail->next = appendix;
}
647
648
649
/**
650
 * Ensures that a certain header string is _not_ part of slist on return.
651
 * Note that if the first header element matches, the returned slist points
652
 * to a different value.
653
 */
654
20
void HeaderLists::CutHeader(const char *header, curl_slist **slist) {
655
20
  assert(slist);
656
  curl_slist head;
657
20
  head.next = *slist;
658
20
  curl_slist *prev = &head;
659
20
  curl_slist *rover = *slist;
660
76
  while (rover) {
661
36
    if (strcmp(rover->data, header) == 0) {
662
16
      prev->next = rover->next;
663
16
      Put(rover);
664
16
      rover = prev;
665
    }
666
36
    prev = rover;
667
36
    rover = rover->next;
668
  }
669
20
  *slist = head.next;
670
20
}
671
672
673
1282
/**
 * Hands every element of slist back via Put().  A NULL slist is a no-op.
 */
void HeaderLists::PutList(curl_slist *slist) {
  for (curl_slist *successor = NULL; slist != NULL; slist = successor) {
    // Remember the successor first, Put() clears the next pointer
    successor = slist->next;
    Put(slist);
  }
}
680
681
682
16
/**
 * Returns the headers of slist as a single string, one header per line (each
 * header is followed by a newline character).
 */
string HeaderLists::Print(curl_slist *slist) {
  string listing;
  for (; slist != NULL; slist = slist->next) {
    listing.append(slist->data);
    listing.push_back('\n');
  }
  return listing;
}
690
691
692
3436
/**
 * Returns an element that is not in use, with its data pointer set to header
 * (the string is referenced, not copied).  The existing blocks are searched
 * linearly for the first free element; if all elements are taken, a new block
 * is allocated and its first element is returned.
 */
curl_slist *HeaderLists::Get(const char *header) {
  for (unsigned block_idx = 0; block_idx < blocks_.size(); ++block_idx) {
    for (unsigned elem_idx = 0; elem_idx < kBlockSize; ++elem_idx) {
      curl_slist *slot = &(blocks_[block_idx][elem_idx]);
      if (IsUsed(slot))
        continue;
      slot->data = const_cast<char *>(header);
      return slot;
    }
  }

  // All used, new block
  AddBlock();
  curl_slist *first_of_new_block = &(blocks_[blocks_.size()-1][0]);
  first_of_new_block->data = const_cast<char *>(header);
  return first_of_new_block;
}
707
708
709
57104
/**
 * Resets a list element by clearing its data and next pointers.  The header
 * string that data pointed to is not freed.
 */
void HeaderLists::Put(curl_slist *slist) {
  slist->next = NULL;
  slist->data = NULL;
}
713
714
715
216
void HeaderLists::AddBlock() {
716
216
  curl_slist *new_block = new curl_slist[kBlockSize];
717
55512
  for (unsigned i = 0; i < kBlockSize; ++i) {
718
55296
    Put(&new_block[i]);
719
  }
720
216
  blocks_.push_back(new_block);
721
216
}
722
723
724
//------------------------------------------------------------------------------
725
726
727
string DownloadManager::ProxyInfo::Print() {
728
  if (url == "DIRECT")
729
    return url;
730
731
  string result = url;
732
  int remaining =
733
    static_cast<int>(host.deadline()) - static_cast<int>(time(NULL));
734
  string expinfo = (remaining >= 0) ? "+" : "";
735
  if (abs(remaining) >= 3600) {
736
    expinfo += StringifyInt(remaining/3600) + "h";
737
  } else if (abs(remaining) >= 60) {
738
    expinfo += StringifyInt(remaining/60) + "m";
739
  } else {
740
    expinfo += StringifyInt(remaining) + "s";
741
  }
742
  if (host.status() == dns::kFailOk) {
743
    result += " (" + host.name() + ", " + expinfo + ")";
744
  } else {
745
    result += " (:unresolved:, " + expinfo + ")";
746
  }
747
  return result;
748
}
749
750
751
/**
752
 * Gets an idle CURL handle from the pool. Creates a new one and adds it to
753
 * the pool if necessary.
754
 */
755
242
CURL *DownloadManager::AcquireCurlHandle() {
756
  CURL *handle;
757
758
242
  if (pool_handles_idle_->empty()) {
759
    // Create a new handle
760
90
    handle = curl_easy_init();
761
90
    assert(handle != NULL);
762
763
90
    curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
764
    // curl_easy_setopt(curl_default, CURLOPT_FAILONERROR, 1);
765
90
    curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, CallbackCurlHeader);
766
90
    curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlData);
767
  } else {
768
152
    handle = *(pool_handles_idle_->begin());
769
152
    pool_handles_idle_->erase(pool_handles_idle_->begin());
770
  }
771
772
242
  pool_handles_inuse_->insert(handle);
773
774
242
  return handle;
775
}
776
777
778
242
/**
 * Takes a handle out of the set of handles in use.  The handle is moved to
 * the idle pool unless the idle pool already holds more than
 * pool_max_handles_ handles, in which case the handle is destroyed.  The
 * handle must have been obtained from AcquireCurlHandle().
 */
void DownloadManager::ReleaseCurlHandle(CURL *handle) {
  set<CURL *>::iterator pos_inuse = pool_handles_inuse_->find(handle);
  assert(pos_inuse != pool_handles_inuse_->end());

  const bool idle_pool_full = pool_handles_idle_->size() > pool_max_handles_;
  if (!idle_pool_full) {
    pool_handles_idle_->insert(handle);
  } else {
    curl_easy_cleanup(handle);
  }

  pool_handles_inuse_->erase(pos_inuse);
}
789
790
791
/**
792
 * HTTP request options: set the URL and other options such as timeout and
793
 * proxy.
794
 */
795
242
void DownloadManager::InitializeRequest(JobInfo *info, CURL *handle) {
796
  // Initialize internal download state
797
242
  info->curl_handle = handle;
798
242
  info->error_code = kFailOk;
799
242
  info->http_code = -1;
800
242
  info->follow_redirects = follow_redirects_;
801
242
  info->num_used_proxies = 1;
802
242
  info->num_used_hosts = 1;
803
242
  info->num_retries = 0;
804
242
  info->backoff_ms = 0;
805
242
  info->headers = header_lists_->DuplicateList(default_headers_);
806
242
  if (info->info_header) {
807
    header_lists_->AppendHeader(info->headers, info->info_header);
808
  }
809
242
  if (info->force_nocache) {
810
3
    SetNocache(info);
811
  } else {
812
239
    info->nocache = false;
813
  }
814
242
  if (info->compressed) {
815
158
    zlib::DecompressInit(&(info->zstream));
816
  }
817
242
  if (info->expected_hash) {
818
162
    assert(info->hash_context.buffer != NULL);
819
162
    shash::Init(info->hash_context);
820
  }
821
822

242
  if ((info->range_offset != -1) && (info->range_size)) {
823
    char byte_range_array[100];
824
    const int64_t range_lower = static_cast<int64_t>(info->range_offset);
825
    const int64_t range_upper = static_cast<int64_t>(
826
      info->range_offset + info->range_size - 1);
827
    if (snprintf(byte_range_array, sizeof(byte_range_array),
828
                 "%" PRId64 "-%" PRId64,
829
                 range_lower, range_upper) == 100)
830
    {
831
      abort();  // Should be impossible given limits on offset size.
832
    }
833
    curl_easy_setopt(handle, CURLOPT_RANGE, byte_range_array);
834
  } else {
835
242
    curl_easy_setopt(handle, CURLOPT_RANGE, NULL);
836
  }
837
838
  // Set curl parameters
839
242
  curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
840
  curl_easy_setopt(handle, CURLOPT_WRITEHEADER,
841
242
                   static_cast<void *>(info));
842
242
  curl_easy_setopt(handle, CURLOPT_WRITEDATA, static_cast<void *>(info));
843
242
  curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->headers);
844
242
  if (info->head_request)
845
    curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
846
  else
847
242
    curl_easy_setopt(handle, CURLOPT_HTTPGET, 1);
848
242
  if (opt_ipv4_only_)
849
    curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
850
242
  if (follow_redirects_) {
851
18
    curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1);
852
18
    curl_easy_setopt(handle, CURLOPT_MAXREDIRS, 4);
853
  }
854
242
}
855
856
857
/**
858
 * Sets the URL specific options such as host to use and timeout.  It might also
859
 * set an error code, in which case the further processing should react on.
860
 */
861
242
void DownloadManager::SetUrlOptions(JobInfo *info) {
  // Everything that reads the opt_* members happens between the lock and the
  // unlock of lock_options_ below; the final URL is handed to curl afterwards.
  CURL *curl_handle = info->curl_handle;
  string url_prefix;

  pthread_mutex_lock(lock_options_);
  // Check if proxy group needs to be reset from backup to primary
  if (opt_timestamp_backup_proxies_ > 0) {
    const time_t now = time(NULL);
    if (static_cast<int64_t>(now) >
        static_cast<int64_t>(opt_timestamp_backup_proxies_ +
                             opt_proxy_groups_reset_after_))
    {
      string old_proxy;
      if (opt_proxy_groups_)
        old_proxy = (*opt_proxy_groups_)[opt_proxy_groups_current_][0].url;

      opt_proxy_groups_current_ = 0;
      RebalanceProxiesUnlocked();
      opt_timestamp_backup_proxies_ = 0;

      if (opt_proxy_groups_) {
        LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
                 "switching proxy from %s to %s (reset proxy group)",
                 old_proxy.c_str(), (*opt_proxy_groups_)[0][0].url.c_str());
      }
    }
  }
  // Check if load-balanced proxies within the group need to be reset
  if (opt_timestamp_failover_proxies_ > 0) {
    const time_t now = time(NULL);
    if (static_cast<int64_t>(now) >
        static_cast<int64_t>(opt_timestamp_failover_proxies_ +
                             opt_proxy_groups_reset_after_))
    {
      string old_proxy;
      if (opt_proxy_groups_)
        old_proxy = (*opt_proxy_groups_)[opt_proxy_groups_current_][0].url;
      RebalanceProxiesUnlocked();
      if (opt_proxy_groups_) {
        // The group index is not touched in this branch, so the proxy that is
        // in use afterwards is the head of the *current* group.  Compare and
        // report against that one, not against the head of group 0.
        const string &new_proxy =
          (*opt_proxy_groups_)[opt_proxy_groups_current_][0].url;
        if (old_proxy != new_proxy) {
          LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
                   "switching proxy from %s to %s "
                   "(reset load-balanced proxies)",
                   old_proxy.c_str(), new_proxy.c_str());
        }
      }
    }
  }
  // Check if host needs to be reset
  if (opt_timestamp_backup_host_ > 0) {
    const time_t now = time(NULL);
    if (static_cast<int64_t>(now) >
        static_cast<int64_t>(opt_timestamp_backup_host_ +
                             opt_host_reset_after_))
    {
      LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
               "switching host from %s to %s (reset host)",
               (*opt_host_chain_)[opt_host_chain_current_].c_str(),
               (*opt_host_chain_)[0].c_str());
      opt_host_chain_current_ = 0;
      opt_timestamp_backup_host_ = 0;
    }
  }

  // Select the proxy: none configured or an explicit "DIRECT" entry means no
  // proxy at all; otherwise the head of the current group is used.
  if (!opt_proxy_groups_ ||
      ((*opt_proxy_groups_)[opt_proxy_groups_current_][0].url == "DIRECT"))
  {
    info->proxy = "DIRECT";
    curl_easy_setopt(curl_handle, CURLOPT_PROXY, "");
  } else {
    // Work on a copy: the validation below may rewrite the group's entries.
    ProxyInfo proxy = (*opt_proxy_groups_)[opt_proxy_groups_current_][0];
    ValidateProxyIpsUnlocked(proxy.url, proxy.host);
    // Re-read the head of the group after validation
    ProxyInfo *proxy_ptr =
      &((*opt_proxy_groups_)[opt_proxy_groups_current_][0]);
    info->proxy = proxy_ptr->url;
    if (proxy_ptr->host.status() == dns::kFailOk) {
      curl_easy_setopt(curl_handle, CURLOPT_PROXY, info->proxy.c_str());
    } else {
      // We know it can't work, don't even try to download
      curl_easy_setopt(curl_handle, CURLOPT_PROXY, "0.0.0.0");
    }
  }

  // Timeouts depend on whether a proxy is in use
  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, opt_low_speed_limit_);
  if (info->proxy != "DIRECT") {
    curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_proxy_);
    curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_proxy_);
  } else {
    curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_direct_);
    curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_direct_);
  }
  if (!opt_dns_server_.empty())
    curl_easy_setopt(curl_handle, CURLOPT_DNS_SERVERS, opt_dns_server_.c_str());

  // Only jobs that probe hosts get the current host prepended to their URL
  if (info->probe_hosts && opt_host_chain_)
    url_prefix = (*opt_host_chain_)[opt_host_chain_current_];

  string url = url_prefix + *(info->url);

  curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 1L);
  if (url.substr(0, 5) == "https") {
    // CA locations come from the environment, with a default for the CA dir
    const char *cadir = getenv("X509_CERT_DIR");
    if (!cadir || !*cadir) {cadir = "/etc/grid-security/certificates";}
    curl_easy_setopt(curl_handle, CURLOPT_CAPATH, cadir);
    const char *cabundle = getenv("X509_CERT_BUNDLE");
    if (cabundle && *cabundle) {
      curl_easy_setopt(curl_handle, CURLOPT_CAINFO, cabundle);
    }
    if (info->pid != -1) {
      if (credentials_attachment_ == NULL) {
        LogCvmfs(kLogDownload, kLogDebug,
                 "uses secure downloads but no credentials attachment set");
      } else {
        bool retval = credentials_attachment_->ConfigureCurlHandle(
          curl_handle, info->pid, &info->cred_data);
        if (!retval) {
          LogCvmfs(kLogDownload, kLogDebug, "failed attaching credentials");
        }
      }
    }
    // The download manager disables signal handling in the curl library;
    // as OpenSSL's implementation of TLS will generate a sigpipe in some
    // error paths, we must explicitly disable SIGPIPE here.
    // TODO(jblomer): it should be enough to do this once
    signal(SIGPIPE, SIG_IGN);
  }

  if (url.find("@proxy@") != string::npos) {
    // This is used in Geo-API requests (only), to replace a portion of the
    // URL with the current proxy name for the sake of caching the result.
    // Replace the @proxy@ either with a passed in "forced" template (which
    // is set from $CVMFS_PROXY_TEMPLATE) if there is one, or a "direct"
    // template (which is the uuid) if there's no proxy, or the name of the
    // proxy.
    string replacement;
    if (proxy_template_forced_ != "") {
      replacement = proxy_template_forced_;
    } else if (info->proxy == "DIRECT") {
      replacement = proxy_template_direct_;
    } else {
      if (opt_proxy_groups_current_ >= opt_proxy_groups_fallback_) {
        // It doesn't make sense to use the fallback proxies in Geo-API requests
        // since the fallback proxies are supposed to get sorted, too.
        info->proxy = "DIRECT";
        curl_easy_setopt(curl_handle, CURLOPT_PROXY, "");
        replacement = proxy_template_direct_;
      } else {
        replacement =
          (*opt_proxy_groups_)[opt_proxy_groups_current_][0].host.name();
      }
    }
    // An empty replacement falls back to the "direct" template
    replacement = (replacement == "") ? proxy_template_direct_ : replacement;
    LogCvmfs(kLogDownload, kLogDebug, "replacing @proxy@ by %s",
             replacement.c_str());
    url = ReplaceAll(url, "@proxy@", replacement);
  }
  pthread_mutex_unlock(lock_options_);

  // In-memory downloads of file:// URLs that have no buffer yet start out
  // with a 64kB buffer.
  if ((info->destination == kDestinationMem) &&
      (info->destination_mem.size == 0) &&
      HasPrefix(url, "file://", false))
  {
    info->destination_mem.size = 64*1024;
    info->destination_mem.data = static_cast<char *>(smalloc(64*1024));
  }

  curl_easy_setopt(curl_handle, CURLOPT_URL, EscapeUrl(url).c_str());
}
1025
1026
1027
/**
1028
 * Checks if the name resolving information is still up to date.  The host
1029
 * object should be one from the current load-balance group.  If the information
1030
 * changed, gather new set of resolved IPs and, if different, exchange them in
1031
 * the load-balance group on the fly.  In the latter case, also rebalance the
1032
 * proxies.  The caller must already hold the options mutex; it is not taken here.
1033
 */
1034
void DownloadManager::ValidateProxyIpsUnlocked(
1035
  const string &url,
1036
  const dns::Host &host)
1037
{
1038
  if (!host.IsExpired())
1039
    return;
1040
  LogCvmfs(kLogDownload, kLogDebug, "validate DNS entry for %s",
1041
           host.name().c_str());
1042
1043
  unsigned group_idx = opt_proxy_groups_current_;
1044
  dns::Host new_host = resolver_->Resolve(host.name());
1045
1046
  bool update_only = true;  // No changes to the list of IP addresses.
1047
  if (new_host.status() != dns::kFailOk) {
1048
    // Try again later in case resolving fails.
1049
    LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
1050
             "failed to resolve IP addresses for %s (%d - %s)",
1051
             host.name().c_str(), new_host.status(),
1052
             dns::Code2Ascii(new_host.status()));
1053
    new_host = dns::Host::ExtendDeadline(host, resolver_->min_ttl());
1054
  } else if (!host.IsEquivalent(new_host)) {
1055
    update_only = false;
1056
  }
1057
1058
  if (update_only) {
1059
    for (unsigned i = 0; i < (*opt_proxy_groups_)[group_idx].size(); ++i) {
1060
      if ((*opt_proxy_groups_)[group_idx][i].host.id() == host.id())
1061
        (*opt_proxy_groups_)[group_idx][i].host = new_host;
1062
    }
1063
    return;
1064
  }
1065
1066
  assert(new_host.status() == dns::kFailOk);
1067
1068
  // Remove old host objects, insert new objects, and rebalance.
1069
  LogCvmfs(kLogDownload, kLogDebug | kLogSyslog,
1070
           "DNS entries for proxy %s changed, adjusting", host.name().c_str());
1071
  vector<ProxyInfo> *group = &((*opt_proxy_groups_)[opt_proxy_groups_current_]);
1072
  opt_num_proxies_ -= group->size();
1073
  for (unsigned i = 0; i < group->size(); ) {
1074
    if ((*group)[i].host.id() == host.id()) {
1075
      group->erase(group->begin() + i);
1076
    } else {
1077
      i++;
1078
    }
1079
  }
1080
  vector<ProxyInfo> new_infos;
1081
  set<string> best_addresses = new_host.ViewBestAddresses(opt_ip_preference_);
1082
  set<string>::const_iterator iter_ips = best_addresses.begin();
1083
  for (; iter_ips != best_addresses.end(); ++iter_ips) {
1084
    string url_ip = dns::RewriteUrl(url, *iter_ips);
1085
    new_infos.push_back(ProxyInfo(new_host, url_ip));
1086
  }
1087
  group->insert(group->end(), new_infos.begin(), new_infos.end());
1088
  opt_num_proxies_ += new_infos.size();
1089
1090
  RebalanceProxiesUnlocked();
1091
}
1092
1093
1094
/**
1095
 * Adds transfer time and downloaded bytes to the global counters.
1096
 */
1097
248
void DownloadManager::UpdateStatistics(CURL *handle) {
  // Only the downloaded payload size is accounted for; the header size is
  // not queried.
  double downloaded;
  const int rc = curl_easy_getinfo(handle, CURLINFO_SIZE_DOWNLOAD,
                                   &downloaded);
  assert(rc == CURLE_OK);

  const int64_t transferred = static_cast<int64_t>(downloaded);
  perf::Xadd(counters_->sz_transferred_bytes, transferred);
}
1110
1111
1112
/**
1113
 * A retry is possible only if the job is not on no-cache, has retries left, and failed with a proxy or host transfer error.
1114
 */
1115
248
bool DownloadManager::CanRetry(const JobInfo *info) {
1116
248
  pthread_mutex_lock(lock_options_);
1117
248
  unsigned max_retries = opt_max_retries_;
1118
248
  pthread_mutex_unlock(lock_options_);
1119
1120
  return !info->nocache && (info->num_retries < max_retries) &&
1121
    (IsProxyTransferError(info->error_code) ||
1122


248
     IsHostTransferError(info->error_code));
1123
}
1124
1125
1126
/**
1127
 * Backoff for retry to introduce a jitter into a cluster of requesting
1128
 * cvmfs nodes.
1129
 * Retry only when HTTP caching is on.
1130
 *
1131
 * Does not return a value; the retry counters are updated and the sleep is always performed.
1132
 */
1133
4
void DownloadManager::Backoff(JobInfo *info) {
  // Copy the backoff bounds while holding the options lock
  pthread_mutex_lock(lock_options_);
  const unsigned init_ms = opt_backoff_init_ms_;
  const unsigned max_ms = opt_backoff_max_ms_;
  pthread_mutex_unlock(lock_options_);

  // Account for the retry on the job and in the global counters
  info->num_retries++;
  perf::Inc(counters_->n_retries);

  if (info->backoff_ms != 0) {
    // Subsequent retries: double the previous delay
    info->backoff_ms *= 2;
  } else {
    // First retry: random initial delay
    info->backoff_ms = prng_.Next(init_ms + 1);  // Must be != 0
  }
  // Cap the delay at the configured maximum
  if (info->backoff_ms > max_ms)
    info->backoff_ms = max_ms;

  LogCvmfs(kLogDownload, kLogDebug, "backing off for %d ms", info->backoff_ms);
  SafeSleepMs(info->backoff_ms);
}
1152
1153
1154
5
/**
 * Appends the "no-cache" headers to the job's header list and marks the job
 * accordingly.  Does nothing if the job is already marked as no-cache.
 */
void DownloadManager::SetNocache(JobInfo *info) {
  if (!info->nocache) {
    header_lists_->AppendHeader(info->headers, "Pragma: no-cache");
    header_lists_->AppendHeader(info->headers, "Cache-Control: no-cache");
    // Hand the extended list to the curl handle
    curl_easy_setopt(info->curl_handle, CURLOPT_HTTPHEADER, info->headers);
    info->nocache = true;
  }
}
1162
1163
1164
/**
1165
 * Reverse operation of SetNocache. Makes sure that "no-cache" header
1166
 * disappears from the list of headers to let proxies work normally.
1167
 */
1168
6
void DownloadManager::SetRegularCache(JobInfo *info) {
  if (info->nocache) {
    // Cut both headers that SetNocache() appended
    header_lists_->CutHeader("Pragma: no-cache", &(info->headers));
    header_lists_->CutHeader("Cache-Control: no-cache", &(info->headers));
    // Hand the shortened list to the curl handle
    curl_easy_setopt(info->curl_handle, CURLOPT_HTTPHEADER, info->headers);
    info->nocache = false;
  }
}
1176
1177
1178
/**
1179
 * Frees the storage associated with the authz attachment from the job
1180
 */
1181
242
void DownloadManager::ReleaseCredential(JobInfo *info) {
  // No credential data attached to this job: nothing to release
  if (!info->cred_data)
    return;

  assert(credentials_attachment_ != NULL);  // Someone must have set it
  credentials_attachment_->ReleaseCurlHandle(info->curl_handle,
                                             info->cred_data);
  info->cred_data = NULL;
}
1189
1190
1191
/**
1192
 * Checks the result of a curl download and implements the failure logic, such
1193
 * as changing the proxy server.  Takes care of cleanup.
1194
 *
1195
 * \return true if another download should be performed, false otherwise
1196
 */
1197
248
bool DownloadManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1198
  LogCvmfs(kLogDownload, kLogDebug,
1199
           "Verify downloaded url %s, proxy %s (curl error %d)",
1200
248
           info->url->c_str(), info->proxy.c_str(), curl_error);
1201
248
  UpdateStatistics(info->curl_handle);
1202
1203
  // Verification and error classification
1204



248
  switch (curl_error) {
1205
    case CURLE_OK:
1206
      // Verify content hash
1207
211
      if (info->expected_hash) {
1208
134
        shash::Any match_hash;
1209
134
        shash::Final(info->hash_context, &match_hash);
1210
134
        if (match_hash != *(info->expected_hash)) {
1211
          LogCvmfs(kLogDownload, kLogDebug,
1212
                   "hash verification of %s failed (expected %s, got %s)",
1213
                   info->url->c_str(), info->expected_hash->ToString().c_str(),
1214
4
                   match_hash.ToString().c_str());
1215
4
          info->error_code = kFailBadData;
1216
4
          break;
1217
        }
1218
      }
1219
1220
      // Decompress memory in a single run
1221

207
      if ((info->destination == kDestinationMem) && info->compressed) {
1222
        void *buf;
1223
        uint64_t size;
1224
        bool retval = zlib::DecompressMem2Mem(info->destination_mem.data,
1225
                                              info->destination_mem.pos,
1226
32
                                              &buf, &size);
1227
32
        if (retval) {
1228
32
          free(info->destination_mem.data);
1229
32
          info->destination_mem.data = static_cast<char *>(buf);
1230
32
          info->destination_mem.pos = info->destination_mem.size = size;
1231
        } else {
1232
          LogCvmfs(kLogDownload, kLogDebug,
1233
                   "decompression (memory) of url %s failed",
1234
                   info->url->c_str());
1235
          info->error_code = kFailBadData;
1236
          break;
1237
        }
1238
      }
1239
1240
207
      info->error_code = kFailOk;
1241
207
      break;
1242
    case CURLE_UNSUPPORTED_PROTOCOL:
1243
      info->error_code = kFailUnsupportedProtocol;
1244
      break;
1245
    case CURLE_URL_MALFORMAT:
1246
      info->error_code = kFailBadUrl;
1247
      break;
1248
    case CURLE_COULDNT_RESOLVE_PROXY:
1249
      info->error_code = kFailProxyResolve;
1250
      break;
1251
    case CURLE_COULDNT_RESOLVE_HOST:
1252
      info->error_code = kFailHostResolve;
1253
      break;
1254
    case CURLE_OPERATION_TIMEDOUT:
1255
      info->error_code = (info->proxy == "DIRECT") ?
1256
                         kFailHostTooSlow : kFailProxyTooSlow;
1257
      break;
1258
    case CURLE_PARTIAL_FILE:
1259
    case CURLE_GOT_NOTHING:
1260
    case CURLE_RECV_ERROR:
1261
      info->error_code = (info->proxy == "DIRECT") ?
1262
                         kFailHostShortTransfer : kFailProxyShortTransfer;
1263
      break;
1264
    case CURLE_FILE_COULDNT_READ_FILE:
1265
    case CURLE_COULDNT_CONNECT:
1266
37
      if (info->proxy != "DIRECT")
1267
        // This is a guess.  Fail-over can still change to switching host
1268
        info->error_code = kFailProxyConnection;
1269
      else
1270
37
        info->error_code = kFailHostConnection;
1271
37
      break;
1272
    case CURLE_TOO_MANY_REDIRECTS:
1273
      info->error_code = kFailHostConnection;
1274
      break;
1275
    case CURLE_SSL_CACERT_BADFILE:
1276
      LogCvmfs(kLogDownload, kLogDebug | kLogSyslogErr,
1277
               "Failed to load certificate bundle. "
1278
               "X509_CERT_BUNDLE might point to the wrong location.");
1279
      info->error_code = kFailHostConnection;
1280
      break;
1281
    // As of curl 7.62.0, CURLE_SSL_CACERT is the same as
1282
    // CURLE_PEER_FAILED_VERIFICATION
1283
    case CURLE_PEER_FAILED_VERIFICATION:
1284
      LogCvmfs(kLogDownload, kLogDebug | kLogSyslogErr,
1285
               "invalid SSL certificate of remote host. "
1286
               "X509_CERT_DIR and/or X509_CERT_BUNDLE might point to the wrong "
1287
               "location.");
1288
      info->error_code = kFailHostConnection;
1289
      break;
1290
    case CURLE_ABORTED_BY_CALLBACK:
1291
    case CURLE_WRITE_ERROR:
1292
      // Error set by callback
1293
      break;
1294
    default:
1295
      LogCvmfs(kLogDownload, kLogSyslogErr, "unexpected curl error (%d) while "
1296
               "trying to fetch %s", curl_error, info->url->c_str());
1297
      info->error_code = kFailOther;
1298
      break;
1299
  }
1300
1301
248
  std::vector<std::string> *host_chain = opt_host_chain_;
1302
1303
  // Determination if download should be repeated
1304
248
  bool try_again = false;
1305
248
  bool same_url_retry = CanRetry(info);
1306
248
  if (info->error_code != kFailOk) {
1307
41
    pthread_mutex_lock(lock_options_);
1308
41
    if (info->error_code == kFailBadData) {
1309
4
      if (!info->nocache) {
1310
2
        try_again = true;
1311
      } else {
1312
        // Make it a host failure
1313
        LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
1314
2
                 "data corruption with no-cache header, try another host");
1315
1316
2
        info->error_code = kFailHostHttp;
1317
      }
1318
    }
1319




41
    if ( same_url_retry || (
1320
         ( (info->error_code == kFailHostResolve) ||
1321
           IsHostTransferError(info->error_code) ||
1322
           (info->error_code == kFailHostHttp)) &&
1323
         info->probe_hosts &&
1324
         host_chain && (info->num_used_hosts < host_chain->size()))
1325
       )
1326
    {
1327
4
      try_again = true;
1328
    }
1329


41
    if ( same_url_retry || (
1330
         ( (info->error_code == kFailProxyResolve) ||
1331
           IsProxyTransferError(info->error_code) ||
1332
           (info->error_code == kFailProxyHttp)) )
1333
       )
1334
    {
1335
4
      try_again = true;
1336
      // If all proxies failed, do a next round with the next host
1337

4
      if (!same_url_retry && (info->num_used_proxies >= opt_num_proxies_)) {
1338
        // Check if this can be made a host fail-over
1339
        if (info->probe_hosts &&
1340
            host_chain &&
1341
            (info->num_used_hosts < host_chain->size()))
1342
        {
1343
          // reset proxy group if not already performed by other handle
1344
          if (opt_proxy_groups_) {
1345
            if ((opt_proxy_groups_current_ > 0) ||
1346
                (opt_proxy_groups_current_burned_ > 1))
1347
            {
1348
              string old_proxy;
1349
              old_proxy =
1350
                (*opt_proxy_groups_)[opt_proxy_groups_current_][0].url;
1351
              opt_proxy_groups_current_ = 0;
1352
              RebalanceProxiesUnlocked();
1353
              opt_timestamp_backup_proxies_ = 0;
1354
              if (opt_proxy_groups_) {
1355
                LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
1356
                         "switching proxy from %s to %s "
1357
                         "(reset proxies for host failover)",
1358
                         old_proxy.c_str(),
1359
                         (*opt_proxy_groups_)[0][0].url.c_str());
1360
              }
1361
            }
1362
          }
1363
1364
          // Make it a host failure
1365
          LogCvmfs(kLogDownload, kLogDebug, "make it a host failure");
1366
          info->num_used_proxies = 1;
1367
          info->error_code = kFailHostAfterProxy;
1368
        } else {
1369
          try_again = false;
1370
        }
1371
      }  // Make a proxy failure a host failure
1372
    }  // Proxy failure assumed
1373
41
    pthread_mutex_unlock(lock_options_);
1374
  }
1375
1376
248
  if (try_again) {
1377
    LogCvmfs(kLogDownload, kLogDebug, "Trying again on same curl handle, "
1378
6
             "same url: %d, error code %d", same_url_retry, info->error_code);
1379
    // Reset internal state and destination
1380

6
    if ((info->destination == kDestinationMem) && info->destination_mem.data) {
1381
      free(info->destination_mem.data);
1382
      info->destination_mem.data = NULL;
1383
      info->destination_mem.size = 0;
1384
      info->destination_mem.pos = 0;
1385
    }
1386

6
    if ((info->destination == kDestinationFile) ||
1387
        (info->destination == kDestinationPath))
1388
    {
1389

2
      if ((fflush(info->destination_file) != 0) ||
1390
          (ftruncate(fileno(info->destination_file), 0) != 0))
1391
      {
1392
        info->error_code = kFailLocalIO;
1393
        goto verify_and_finalize_stop;
1394
      }
1395
2
      rewind(info->destination_file);
1396
    }
1397
6
    if (info->destination == kDestinationSink) {
1398
2
      if (info->destination_sink->Reset() != 0) {
1399
        info->error_code = kFailLocalIO;
1400
        goto verify_and_finalize_stop;
1401
      }
1402
    }
1403
6
    if (info->expected_hash)
1404
4
      shash::Init(info->hash_context);
1405
6
    if (info->compressed)
1406
4
      zlib::DecompressInit(&info->zstream);
1407
6
    SetRegularCache(info);
1408
1409
    // Failure handling
1410
6
    bool switch_proxy = false;
1411
6
    bool switch_host = false;
1412

6
    switch (info->error_code) {
1413
      case kFailBadData:
1414
2
        SetNocache(info);
1415
2
        break;
1416
      case kFailProxyResolve:
1417
      case kFailProxyHttp:
1418
        switch_proxy = true;
1419
        break;
1420
      case kFailHostResolve:
1421
      case kFailHostHttp:
1422
      case kFailHostAfterProxy:
1423
        switch_host = true;
1424
        break;
1425
      default:
1426
4
        if (IsProxyTransferError(info->error_code)) {
1427
          if (same_url_retry)
1428
            Backoff(info);
1429
          else
1430
            switch_proxy = true;
1431
4
        } else if (IsHostTransferError(info->error_code)) {
1432
4
          if (same_url_retry)
1433
4
            Backoff(info);
1434
          else
1435
            switch_host = true;
1436
        } else {
1437
          // No other errors expected when retrying
1438
          abort();
1439
        }
1440
    }
1441
6
    if (switch_proxy) {
1442
      ReleaseCredential(info);
1443
      SwitchProxy(info);
1444
      info->num_used_proxies++;
1445
      SetUrlOptions(info);
1446
    }
1447
6
    if (switch_host) {
1448
      ReleaseCredential(info);
1449
      SwitchHost(info);
1450
      info->num_used_hosts++;
1451
      SetUrlOptions(info);
1452
    }
1453
1454
6
    return true;  // try again
1455
  }
1456
1457
 verify_and_finalize_stop:
1458
  // Finalize, flush destination file
1459
242
  ReleaseCredential(info);
1460

242
  if ((info->destination == kDestinationFile) &&
1461
      fflush(info->destination_file) != 0)
1462
  {
1463
    info->error_code = kFailLocalIO;
1464
242
  } else if (info->destination == kDestinationPath) {
1465
    if (fclose(info->destination_file) != 0)
1466
      info->error_code = kFailLocalIO;
1467
    info->destination_file = NULL;
1468
  }
1469
1470
242
  if (info->compressed)
1471
158
    zlib::DecompressFini(&info->zstream);
1472
1473
242
  if (info->headers) {
1474
242
    header_lists_->PutList(info->headers);
1475
242
    info->headers = NULL;
1476
  }
1477
1478
242
  return false;  // stop transfer and return to Fetch()
1479
}
1480
1481
1482
211
/**
 * Puts the members assigned below into a defined, inactive state: pointers
 * are NULL, numeric options are zero, flags are false and the pipe ends are
 * -1.  The two mutexes are allocated and initialized here.
 */
DownloadManager::DownloadManager() {
  // Mutexes
  lock_options_ =
    static_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
  int rc = pthread_mutex_init(lock_options_, NULL);
  assert(rc == 0);
  lock_synchronous_mode_ =
    static_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
  rc = pthread_mutex_init(lock_synchronous_mode_, NULL);
  assert(rc == 0);

  // Curl handles and headers
  pool_handles_idle_ = NULL;
  pool_handles_inuse_ = NULL;
  pool_max_handles_ = 0;
  curl_multi_ = NULL;
  default_headers_ = NULL;

  // Threading state, pipes and watched file descriptors
  atomic_init32(&multi_threaded_);
  pipe_terminate_[0] = -1;
  pipe_terminate_[1] = -1;
  pipe_jobs_[0] = -1;
  pipe_jobs_[1] = -1;
  watch_fds_ = NULL;
  watch_fds_size_ = 0;
  watch_fds_inuse_ = 0;
  watch_fds_max_ = 0;

  // Name resolving
  resolver_ = NULL;
  opt_dns_server_ = "";
  opt_ip_preference_ = dns::kIpPreferSystem;
  opt_ipv4_only_ = false;

  // Timeouts and retry policy
  opt_timeout_proxy_ = 0;
  opt_timeout_direct_ = 0;
  opt_low_speed_limit_ = 0;
  opt_max_retries_ = 0;
  opt_backoff_init_ms_ = 0;
  opt_backoff_max_ms_ = 0;

  // Host chain
  opt_host_chain_ = NULL;
  opt_host_chain_rtt_ = NULL;
  opt_host_chain_current_ = 0;
  opt_timestamp_backup_host_ = 0;
  opt_host_reset_after_ = 0;

  // Proxy groups
  opt_proxy_groups_ = NULL;
  opt_proxy_groups_current_ = 0;
  opt_proxy_groups_current_burned_ = 0;
  opt_num_proxies_ = 0;
  opt_timestamp_backup_proxies_ = 0;
  opt_timestamp_failover_proxies_ = 0;
  opt_proxy_groups_reset_after_ = 0;

  // Remaining switches and attachments
  enable_info_header_ = false;
  follow_redirects_ = false;
  use_system_proxy_ = false;
  credentials_attachment_ = NULL;
  counters_ = NULL;
}
1539
1540
1541
211
/**
 * Destroys the two mutexes created by the constructor and frees their memory.
 */
DownloadManager::~DownloadManager() {
  pthread_mutex_destroy(lock_options_);
  free(lock_options_);

  pthread_mutex_destroy(lock_synchronous_mode_);
  free(lock_synchronous_mode_);
}
1547
1548
200
void DownloadManager::InitHeaders() {
1549
  // User-Agent
1550
200
  string cernvm_id = "User-Agent: cvmfs ";
1551
#ifdef CVMFS_LIBCVMFS
1552
200
  cernvm_id += "libcvmfs ";
1553
#else
1554
  cernvm_id += "Fuse ";
1555
#endif
1556
200
  cernvm_id += string(VERSION);
1557
200
  if (getenv("CERNVM_UUID") != NULL) {
1558
    cernvm_id += " " +
1559
    sanitizer::InputSanitizer("az AZ 09 -").Filter(getenv("CERNVM_UUID"));
1560
  }
1561
200
  user_agent_ = strdup(cernvm_id.c_str());
1562
1563
200
  header_lists_ = new HeaderLists();
1564
1565
200
  default_headers_ = header_lists_->GetList("Connection: Keep-Alive");
1566
200
  header_lists_->AppendHeader(default_headers_, "Pragma:");
1567
200
  header_lists_->AppendHeader(default_headers_, user_agent_);
1568
200
}
1569
1570
1571
199
void DownloadManager::FiniHeaders() {
1572
199
  delete header_lists_;
1573
199
  header_lists_ = NULL;
1574
199
  default_headers_ = NULL;
1575
199
}
1576
1577
1578
200
void DownloadManager::Init(const unsigned max_pool_handles,
1579
                           const bool use_system_proxy,
1580
                           perf::StatisticsTemplate statistics)
1581
{
1582
200
  atomic_init32(&multi_threaded_);
1583
200
  int retval = curl_global_init(CURL_GLOBAL_ALL);
1584
200
  assert(retval == CURLE_OK);
1585
200
  pool_handles_idle_ = new set<CURL *>;
1586
200
  pool_handles_inuse_ = new set<CURL *>;
1587
200
  pool_max_handles_ = max_pool_handles;
1588
200
  watch_fds_max_ = 4*pool_max_handles_;
1589
1590
200
  opt_timeout_proxy_ = 5;
1591
200
  opt_timeout_direct_ = 10;
1592
200
  opt_low_speed_limit_ = 1024;
1593
200
  opt_proxy_groups_current_ = 0;
1594
200
  opt_proxy_groups_current_burned_ = 0;
1595
200
  opt_num_proxies_ = 0;
1596
200
  opt_host_chain_current_ = 0;
1597
200
  opt_ip_preference_ = dns::kIpPreferSystem;
1598
1599
200
  counters_ = new Counters(statistics);
1600
1601
200
  user_agent_ = NULL;
1602
200
  InitHeaders();
1603
1604
200
  curl_multi_ = curl_multi_init();
1605
200
  assert(curl_multi_ != NULL);
1606
200
  curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION, CallbackCurlSocket);
1607
  curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1608
200
                    static_cast<void *>(this));
1609
200
  curl_multi_setopt(curl_multi_, CURLMOPT_MAXCONNECTS, watch_fds_max_);
1610
  curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1611
200
                    pool_max_handles_);
1612
1613
200
  prng_.InitLocaltime();
1614
1615
  // Name resolving
1616

200
  if ((getenv("CVMFS_IPV4_ONLY") != NULL) &&
1617
      (strlen(getenv("CVMFS_IPV4_ONLY")) > 0))
1618
  {
1619
    opt_ipv4_only_ = true;
1620
  }
1621
  resolver_ = dns::NormalResolver::Create(opt_ipv4_only_,
1622
200
    kDnsDefaultRetries, kDnsDefaultTimeoutMs);
1623
200
  assert(resolver_);
1624
1625
  // Parsing environment variables
1626
200
  if (use_system_proxy) {
1627
41
    use_system_proxy_ = true;
1628
41
    if (getenv("http_proxy") == NULL) {
1629
41
      SetProxyChain("", "", kSetProxyRegular);
1630
    } else {
1631
      SetProxyChain(string(getenv("http_proxy")), "", kSetProxyRegular);
1632
    }
1633
  }
1634
200
}
1635
1636
1637
199
void DownloadManager::Fini() {
1638
199
  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1639
    // Shutdown I/O thread
1640
    char buf = 'T';
1641
    WritePipe(pipe_terminate_[1], &buf, 1);
1642
    pthread_join(thread_download_, NULL);
1643
    // All handles are removed from the multi stack
1644
    close(pipe_terminate_[1]);
1645
    close(pipe_terminate_[0]);
1646
    close(pipe_jobs_[1]);
1647
    close(pipe_jobs_[0]);
1648
  }
1649
1650
488
  for (set<CURL *>::iterator i = pool_handles_idle_->begin(),
1651
199
       iEnd = pool_handles_idle_->end(); i != iEnd; ++i)
1652
  {
1653
90
    curl_easy_cleanup(*i);
1654
  }
1655
199
  delete pool_handles_idle_;
1656
199
  delete pool_handles_inuse_;
1657
199
  curl_multi_cleanup(curl_multi_);
1658
199
  pool_handles_idle_ = NULL;
1659
199
  pool_handles_inuse_ = NULL;
1660
199
  curl_multi_ = NULL;
1661
1662
199
  FiniHeaders();
1663
199
  if (user_agent_)
1664
199
    free(user_agent_);
1665
199
  user_agent_ = NULL;
1666
1667
199
  delete counters_;
1668
199
  counters_ = NULL;
1669
1670
199
  delete opt_host_chain_;
1671
199
  delete opt_host_chain_rtt_;
1672
199
  delete opt_proxy_groups_;
1673
199
  opt_host_chain_ = NULL;
1674
199
  opt_host_chain_rtt_ = NULL;
1675
199
  opt_proxy_groups_ = NULL;
1676
1677
199
  curl_global_cleanup();
1678
1679
199
  delete resolver_;
1680
199
  resolver_ = NULL;
1681
199
}
1682
1683
1684
/**
1685
 * Spawns the I/O worker thread and switches the module in multi-threaded mode.
1686
 * No way back except Fini(); Init();
1687
 */
1688
void DownloadManager::Spawn() {
1689
  MakePipe(pipe_terminate_);
1690
  MakePipe(pipe_jobs_);
1691
1692
  int retval = pthread_create(&thread_download_, NULL, MainDownload,
1693
                              static_cast<void *>(this));
1694
  assert(retval == 0);
1695
1696
  atomic_inc32(&multi_threaded_);
1697
}
1698
1699
1700
/**
1701
 * Downloads data from an unsecure outside channel (currently HTTP or file).
1702
 */
1703
242
Failures DownloadManager::Fetch(JobInfo *info) {
1704
242
  assert(info != NULL);
1705
242
  assert(info->url != NULL);
1706
1707
  Failures result;
1708
242
  result = PrepareDownloadDestination(info);
1709
242
  if (result != kFailOk)
1710
    return result;
1711
1712
242
  if (info->expected_hash) {
1713
162
    const shash::Algorithms algorithm = info->expected_hash->algorithm;
1714
162
    info->hash_context.algorithm = algorithm;
1715
162
    info->hash_context.size = shash::GetContextSize(algorithm);
1716
162
    info->hash_context.buffer = alloca(info->hash_context.size);
1717
  }
1718
1719
  // Prepare cvmfs-info: header, allocate string on the stack
1720
242
  info->info_header = NULL;
1721

242
  if (enable_info_header_ && info->extra_info) {
1722
    const char *header_name = "cvmfs-info: ";
1723
    const size_t header_name_len = strlen(header_name);
1724
    const unsigned header_size = 1 + header_name_len +
1725
      EscapeHeader(*(info->extra_info), NULL, 0);
1726
    info->info_header = static_cast<char *>(alloca(header_size));
1727
    memcpy(info->info_header, header_name, header_name_len);
1728
    EscapeHeader(*(info->extra_info), info->info_header + header_name_len,
1729
                 header_size - header_name_len);
1730
    info->info_header[header_size-1] = '\0';
1731
  }
1732
1733
242
  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1734
    if (info->wait_at[0] == -1) {
1735
      MakePipe(info->wait_at);
1736
    }
1737
1738
    // LogCvmfs(kLogDownload, kLogDebug, "send job to thread, pipe %d %d",
1739
    //          info->wait_at[0], info->wait_at[1]);
1740
    WritePipe(pipe_jobs_[1], &info, sizeof(info));
1741
    ReadPipe(info->wait_at[0], &result, sizeof(result));
1742
    // LogCvmfs(kLogDownload, kLogDebug, "got result %d", result);
1743
  } else {
1744
242
    pthread_mutex_lock(lock_synchronous_mode_);
1745
242
    CURL *handle = AcquireCurlHandle();
1746
242
    InitializeRequest(info, handle);
1747
242
    SetUrlOptions(info);
1748
    // curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
1749
    int retval;
1750
248
    do {
1751
248
      retval = curl_easy_perform(handle);
1752
248
      perf::Inc(counters_->n_requests);
1753
      double elapsed;
1754
248
      if (curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &elapsed) == CURLE_OK)
1755
248
        perf::Xadd(counters_->sz_transfer_time, (int64_t)(elapsed * 1000));
1756
    } while (VerifyAndFinalize(retval, info));
1757
242
    result = info->error_code;
1758
242
    ReleaseCurlHandle(info->curl_handle);
1759
242
    pthread_mutex_unlock(lock_synchronous_mode_);
1760
  }
1761
1762
242
  if (result != kFailOk) {
1763
    LogCvmfs(kLogDownload, kLogDebug, "download failed (error %d - %s)", result,
1764
35
             Code2Ascii(result));
1765
1766
35
    if (info->destination == kDestinationPath)
1767
      unlink(info->destination_path->c_str());
1768
1769
35
    if (info->destination_mem.data) {
1770
2
      free(info->destination_mem.data);
1771
2
      info->destination_mem.data = NULL;
1772
2
      info->destination_mem.size = 0;
1773
    }
1774
  }
1775
1776
242
  return result;
1777
}
1778
1779
1780
/**
1781
 * Used by the client to connect the authz session manager to the download
1782
 * manager.
1783
 */
1784
59
void DownloadManager::SetCredentialsAttachment(CredentialsAttachment *ca) {
1785
59
  pthread_mutex_lock(lock_options_);
1786
59
  credentials_attachment_ = ca;
1787
59
  pthread_mutex_unlock(lock_options_);
1788
59
}
1789
1790
/**
1791
 * Gets the DNS sever.
1792
 */
1793
std::string DownloadManager::GetDnsServer() const {
1794
  return opt_dns_server_;
1795
}
1796
1797
/**
1798
 * Sets a DNS server.  Only for testing as it cannot be reverted to the system
1799
 * default.
1800
 */
1801
void DownloadManager::SetDnsServer(const string &address) {
1802
  pthread_mutex_lock(lock_options_);
1803
  if (!address.empty()) {
1804
    opt_dns_server_ = address;
1805
    assert(!opt_dns_server_.empty());
1806
1807
    vector<string> servers;
1808
    servers.push_back(address);
1809
    bool retval = resolver_->SetResolvers(servers);
1810
    assert(retval);
1811
  }
1812
  pthread_mutex_unlock(lock_options_);
1813
  LogCvmfs(kLogDownload, kLogSyslog, "set nameserver to %s", address.c_str());
1814
}
1815
1816
1817
/**
1818
 * Sets the DNS query timeout parameters.
1819
 */
1820
117
void DownloadManager::SetDnsParameters(
1821
  const unsigned retries,
1822
  const unsigned timeout_ms)
1823
{
1824
117
  pthread_mutex_lock(lock_options_);
1825

117
  if ((resolver_->retries() == retries) &&
1826
      (resolver_->timeout_ms() == timeout_ms))
1827
  {
1828
117
    pthread_mutex_unlock(lock_options_);
1829
117
    return;
1830
  }
1831
  delete resolver_;
1832
  resolver_ = NULL;
1833
  resolver_ =
1834
    dns::NormalResolver::Create(opt_ipv4_only_, retries, timeout_ms);
1835
  assert(resolver_);
1836
  pthread_mutex_unlock(lock_options_);
1837
}
1838
1839
1840
117
void DownloadManager::SetDnsTtlLimits(
1841
  const unsigned min_seconds,
1842
  const unsigned max_seconds)
1843
{
1844
117
  pthread_mutex_lock(lock_options_);
1845
117
  resolver_->set_min_ttl(min_seconds);
1846
117
  resolver_->set_max_ttl(max_seconds);
1847
117
  pthread_mutex_unlock(lock_options_);
1848
117
}
1849
1850
1851
/**
 * Stores the IP preference in opt_ip_preference_ under lock_options_.  The
 * value is passed to ViewBestAddresses() when a proxy chain is installed by
 * SetProxyChain().
 */
void DownloadManager::SetIpPreference(dns::IpPreference preference) {
  pthread_mutex_lock(lock_options_);
  {
    opt_ip_preference_ = preference;
  }
  pthread_mutex_unlock(lock_options_);
}
1856
1857
1858
/**
1859
 * Sets two timeout values for proxied and for direct conections, respectively.
1860
 * The timeout counts for all sorts of connection phases,
1861
 * DNS, HTTP connect, etc.
1862
 */
1863
137
void DownloadManager::SetTimeout(const unsigned seconds_proxy,
1864
                                 const unsigned seconds_direct)
1865
{
1866
137
  pthread_mutex_lock(lock_options_);
1867
137
  opt_timeout_proxy_ = seconds_proxy;
1868
137
  opt_timeout_direct_ = seconds_direct;
1869
137
  pthread_mutex_unlock(lock_options_);
1870
137
}
1871
1872
1873
/**
1874
 * Sets contains the average transfer speed in bytes per second that the
1875
 * transfer should be below during CURLOPT_LOW_SPEED_TIME seconds for libcurl to
1876
 * consider it to be too slow and abort.  Only effective for new connections.
1877
 */
1878
void DownloadManager::SetLowSpeedLimit(const unsigned low_speed_limit) {
1879
  pthread_mutex_lock(lock_options_);
1880
  opt_low_speed_limit_ = low_speed_limit;
1881
  pthread_mutex_unlock(lock_options_);
1882
}
1883
1884
1885
/**
1886
 * Receives the currently active timeout values.
1887
 */
1888
57
void DownloadManager::GetTimeout(unsigned *seconds_proxy,
1889
                                 unsigned *seconds_direct)
1890
{
1891
57
  pthread_mutex_lock(lock_options_);
1892
57
  *seconds_proxy = opt_timeout_proxy_;
1893
57
  *seconds_direct = opt_timeout_direct_;
1894
57
  pthread_mutex_unlock(lock_options_);
1895
57
}
1896
1897
1898
/**
1899
 * Parses a list of ';'-separated hosts for the host chain.  The empty string
1900
 * removes the host list.
1901
 */
1902
148
void DownloadManager::SetHostChain(const string &host_list) {
1903
148
  SetHostChain(SplitString(host_list, ';'));
1904
148
}
1905
1906
1907
154
void DownloadManager::SetHostChain(const std::vector<std::string> &host_list) {
1908
154
  pthread_mutex_lock(lock_options_);
1909
154
  opt_timestamp_backup_host_ = 0;
1910
154
  delete opt_host_chain_;
1911
154
  delete opt_host_chain_rtt_;
1912
154
  opt_host_chain_current_ = 0;
1913
1914
154
  if (host_list.empty()) {
1915
    opt_host_chain_ = NULL;
1916
    opt_host_chain_rtt_ = NULL;
1917
    pthread_mutex_unlock(lock_options_);
1918
    return;
1919
  }
1920
1921
154
  opt_host_chain_ = new vector<string>(host_list);
1922
  opt_host_chain_rtt_ =
1923
154
    new vector<int>(opt_host_chain_->size(), kProbeUnprobed);
1924
  // LogCvmfs(kLogDownload, kLogSyslog, "using host %s",
1925
  //          (*opt_host_chain_)[0].c_str());
1926
154
  pthread_mutex_unlock(lock_options_);
1927
}
1928
1929
1930
1931
/**
1932
 * Retrieves the currently set chain of hosts, their round trip times, and the
1933
 * currently used host.
1934
 */
1935
42
void DownloadManager::GetHostInfo(vector<string> *host_chain, vector<int> *rtt,
1936
                                  unsigned *current_host)
1937
{
1938
42
  pthread_mutex_lock(lock_options_);
1939
42
  if (opt_host_chain_) {
1940
42
    if (current_host) {*current_host = opt_host_chain_current_;}
1941
42
    if (host_chain) {*host_chain = *opt_host_chain_;}
1942
42
    if (rtt) {*rtt = *opt_host_chain_rtt_;}
1943
  }
1944
42
  pthread_mutex_unlock(lock_options_);
1945
42
}
1946
1947
1948
/**
1949
 * Jumps to the next proxy in the ring of forward proxy servers.
1950
 * Selects one randomly from a load-balancing group.
1951
 *
1952
 * If info is set, switch only if the current proxy is identical to the one used
1953
 * by info, otherwise another transfer has already done the switch.
1954
 */
1955
void DownloadManager::SwitchProxy(JobInfo *info) {
1956
  pthread_mutex_lock(lock_options_);
1957
1958
  if (!opt_proxy_groups_) {
1959
    pthread_mutex_unlock(lock_options_);
1960
    return;
1961
  }
1962
  if (info &&
1963
      ((*opt_proxy_groups_)[opt_proxy_groups_current_][0].url != info->proxy))
1964
  {
1965
    pthread_mutex_unlock(lock_options_);
1966
    return;
1967
  }
1968
1969
  perf::Inc(counters_->n_proxy_failover);
1970
  string old_proxy = (*opt_proxy_groups_)[opt_proxy_groups_current_][0].url;
1971
1972
  // If all proxies from the current load-balancing group are burned, switch to
1973
  // another group
1974
  if (opt_proxy_groups_current_burned_ ==
1975
      (*opt_proxy_groups_)[opt_proxy_groups_current_].size())
1976
  {
1977
    opt_proxy_groups_current_burned_ = 0;
1978
    if (opt_proxy_groups_->size() > 1) {
1979
      opt_proxy_groups_current_ = (opt_proxy_groups_current_ + 1) %
1980
      opt_proxy_groups_->size();
1981
      // Remeber the timestamp of switching to backup proxies
1982
      if (opt_proxy_groups_reset_after_ > 0) {
1983
        if (opt_proxy_groups_current_ > 0) {
1984
          if (opt_timestamp_backup_proxies_ == 0)
1985
            opt_timestamp_backup_proxies_ = time(NULL);
1986
          // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
1987
          //          "switched to (another) backup proxy group");
1988
        } else {
1989
          opt_timestamp_backup_proxies_ = 0;
1990
          // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
1991
          //          "switched back to primary proxy group");
1992
        }
1993
        opt_timestamp_failover_proxies_ = 0;
1994
      }
1995
    }
1996
  } else {
1997
    // failover within the same group
1998
    if (opt_proxy_groups_reset_after_ > 0) {
1999
      if (opt_timestamp_failover_proxies_ == 0)
2000
        opt_timestamp_failover_proxies_ = time(NULL);
2001
    }
2002
  }
2003
2004
  vector<ProxyInfo> *group = &((*opt_proxy_groups_)[opt_proxy_groups_current_]);
2005
  const unsigned group_size = group->size();
2006
2007
  // Move active proxy to the back
2008
  if (opt_proxy_groups_current_burned_) {
2009
    const ProxyInfo swap = (*group)[0];
2010
    (*group)[0] = (*group)[group_size - opt_proxy_groups_current_burned_];
2011
    (*group)[group_size - opt_proxy_groups_current_burned_] = swap;
2012
  }
2013
  opt_proxy_groups_current_burned_++;
2014
2015
  // Select new one
2016
  if ((group_size - opt_proxy_groups_current_burned_) > 0) {
2017
    int select = prng_.Next(group_size - opt_proxy_groups_current_burned_ + 1);
2018
2019
    // Move selected proxy to front
2020
    const ProxyInfo swap = (*group)[select];
2021
    (*group)[select] = (*group)[0];
2022
    (*group)[0] = swap;
2023
  }
2024
2025
  LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2026
           "switching proxy from %s to %s",
2027
           old_proxy.c_str(), (*group)[0].url.c_str());
2028
  LogCvmfs(kLogDownload, kLogDebug, "%d proxies remain in group",
2029
           group_size - opt_proxy_groups_current_burned_);
2030
2031
  pthread_mutex_unlock(lock_options_);
2032
}
2033
2034
2035
/**
2036
 * Switches to the next host in the chain.  If info is set, switch only if the
2037
 * current host is identical to the one used by info, otherwise another transfer
2038
 * has already done the switch.
2039
 */
2040
void DownloadManager::SwitchHost(JobInfo *info) {
2041
  bool do_switch = true;
2042
2043
  pthread_mutex_lock(lock_options_);
2044
  if (!opt_host_chain_ || (opt_host_chain_->size() == 1)) {
2045
    pthread_mutex_unlock(lock_options_);
2046
    return;
2047
  }
2048
2049
  if (info) {
2050
    char *effective_url;
2051
    curl_easy_getinfo(info->curl_handle, CURLINFO_EFFECTIVE_URL,
2052
                      &effective_url);
2053
    if (!HasPrefix(string(effective_url) + "/",
2054
                   (*opt_host_chain_)[opt_host_chain_current_] + "/",
2055
                   true))
2056
    {
2057
      do_switch = false;
2058
      LogCvmfs(kLogDownload, kLogDebug, "don't switch host, "
2059
               "effective url: %s, current host: %s", effective_url,
2060
               (*opt_host_chain_)[opt_host_chain_current_].c_str());
2061
    }
2062
  }
2063
2064
  if (do_switch) {
2065
    string old_host = (*opt_host_chain_)[opt_host_chain_current_];
2066
    opt_host_chain_current_ = (opt_host_chain_current_+1) %
2067
    opt_host_chain_->size();
2068
    perf::Inc(counters_->n_host_failover);
2069
    LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2070
             "switching host from %s to %s", old_host.c_str(),
2071
             (*opt_host_chain_)[opt_host_chain_current_].c_str());
2072
2073
    // Remeber the timestamp of switching to backup host
2074
    if (opt_host_reset_after_ > 0) {
2075
      if (opt_host_chain_current_ != 0) {
2076
        if (opt_timestamp_backup_host_ == 0)
2077
          opt_timestamp_backup_host_ = time(NULL);
2078
      } else {
2079
        opt_timestamp_backup_host_ = 0;
2080
      }
2081
    }
2082
  }
2083
  pthread_mutex_unlock(lock_options_);
2084
}
2085
2086
2087
void DownloadManager::SwitchHost() {
2088
  SwitchHost(NULL);
2089
}
2090
2091
2092
/**
2093
 * Orders the hostlist according to RTT of downloading .cvmfschecksum.
2094
 * Sets the current host to the best-responsive host.
2095
 * If you change the host list in between by SetHostChain(), it will be
2096
 * overwritten by this function.
2097
 */
2098
void DownloadManager::ProbeHosts() {
2099
  vector<string> host_chain;
2100
  vector<int> host_rtt;
2101
  unsigned current_host;
2102
2103
  GetHostInfo(&host_chain, &host_rtt, &current_host);
2104
2105
  // Stopwatch, two times to fill caches first
2106
  unsigned i, retries;
2107
  string url;
2108
  JobInfo info(&url, false, false, NULL);
2109
  for (retries = 0; retries < 2; ++retries) {
2110
    for (i = 0; i < host_chain.size(); ++i) {
2111
      url = host_chain[i] + "/.cvmfspublished";
2112
2113
      struct timeval tv_start, tv_end;
2114
      gettimeofday(&tv_start, NULL);
2115
      Failures result = Fetch(&info);
2116
      gettimeofday(&tv_end, NULL);
2117
      if (info.destination_mem.data)
2118
        free(info.destination_mem.data);
2119
      if (result == kFailOk) {
2120
        host_rtt[i] = static_cast<int>(
2121
          DiffTimeSeconds(tv_start, tv_end) * 1000);
2122
        LogCvmfs(kLogDownload, kLogDebug, "probing host %s had %dms rtt",
2123
                 url.c_str(), host_rtt[i]);
2124
      } else {
2125
        LogCvmfs(kLogDownload, kLogDebug, "error while probing host %s: %d %s",
2126
                 url.c_str(), result, Code2Ascii(result));
2127
        host_rtt[i] = INT_MAX;
2128
      }
2129
    }
2130
  }
2131
2132
  SortTeam(&host_rtt, &host_chain);
2133
  for (i = 0; i < host_chain.size(); ++i) {
2134
    if (host_rtt[i] == INT_MAX) host_rtt[i] = kProbeDown;
2135
  }
2136
2137
  pthread_mutex_lock(lock_options_);
2138
  delete opt_host_chain_;
2139
  delete opt_host_chain_rtt_;
2140
  opt_host_chain_ = new vector<string>(host_chain);
2141
  opt_host_chain_rtt_ = new vector<int>(host_rtt);
2142
  opt_host_chain_current_ = 0;
2143
  pthread_mutex_unlock(lock_options_);
2144
}
2145
2146
2147
bool DownloadManager::GeoSortServers(std::vector<std::string> *servers,
2148
                    std::vector<uint64_t> *output_order) {
2149
  if (!servers) {return false;}
2150
  if (servers->size() == 1) {
2151
    if (output_order) {
2152
      output_order->clear();
2153
      output_order->push_back(0);
2154
    }
2155
    return true;
2156
  }
2157
2158
  std::vector<std::string> host_chain;
2159
  GetHostInfo(&host_chain, NULL, NULL);
2160
2161
  std::vector<std::string> server_dns_names;
2162
  server_dns_names.reserve(servers->size());
2163
  for (unsigned i = 0; i < servers->size(); ++i) {
2164
    std::string host = dns::ExtractHost((*servers)[i]);
2165
    server_dns_names.push_back(host.empty() ? (*servers)[i] : host);
2166
  }
2167
  std::string host_list = JoinStrings(server_dns_names, ",");
2168
2169
  // Protect against concurrent access to prng_
2170
  pthread_mutex_lock(lock_options_);
2171
  // Determine random hosts for the Geo-API query
2172
  vector<string> host_chain_shuffled = Shuffle(host_chain, &prng_);
2173
  pthread_mutex_unlock(lock_options_);
2174
2175
  // Request ordered list via Geo-API
2176
  bool success = false;
2177
  unsigned max_attempts = std::min(host_chain_shuffled.size(), size_t(3));
2178
  vector<uint64_t> geo_order(servers->size());
2179
  for (unsigned i = 0; i < max_attempts; ++i) {
2180
    string url = host_chain_shuffled[i] + "/api/v1.0/geo/@proxy@/" + host_list;
2181
    LogCvmfs(kLogDownload, kLogDebug,
2182
             "requesting ordered server list from %s", url.c_str());
2183
    JobInfo info(&url, false, false, NULL);
2184
    Failures result = Fetch(&info);
2185
    if (result == kFailOk) {
2186
      string order(info.destination_mem.data, info.destination_mem.size);
2187
      free(info.destination_mem.data);
2188
      bool retval = ValidateGeoReply(order, servers->size(), &geo_order);
2189
      if (!retval) {
2190
        LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2191
                 "retrieved invalid GeoAPI reply from %s [%s]",
2192
                 url.c_str(), order.c_str());
2193
      } else {
2194
        LogCvmfs(kLogDownload, kLogDebug | kLogSyslog,
2195
                 "geographic order of servers retrieved from %s",
2196
                 dns::ExtractHost(host_chain_shuffled[i]).c_str());
2197
        LogCvmfs(kLogDownload, kLogDebug, "order is %s", order.c_str());
2198
        success = true;
2199
        break;
2200
      }
2201
    } else {
2202
      LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2203
               "GeoAPI request %s failed with error %d [%s]",
2204
               url.c_str(), result, Code2Ascii(result));
2205
    }
2206
  }
2207
  if (!success) {
2208
    LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2209
             "failed to retrieve geographic order from stratum 1 servers");
2210
    return false;
2211
  }
2212
2213
  if (output_order) {
2214
    output_order->swap(geo_order);
2215
  } else {
2216
    std::vector<std::string> sorted_servers;
2217
    sorted_servers.reserve(geo_order.size());
2218
    for (unsigned i = 0; i < geo_order.size(); ++i) {
2219
      uint64_t orderval = geo_order[i];
2220
      sorted_servers.push_back((*servers)[orderval]);
2221
    }
2222
    servers->swap(sorted_servers);
2223
  }
2224
  return true;
2225
}
2226
2227
2228
/**
2229
 * Uses the Geo-API of Stratum 1s to let any of them order the list of servers
2230
 *   and fallback proxies (if any).
2231
 * Tries at most three random Stratum 1s before giving up.
2232
 * If you change the host list in between by SetHostChain() or the fallback
2233
 *   proxy list by SetProxyChain(), they will be overwritten by this function.
2234
 */
2235
bool DownloadManager::ProbeGeo() {
2236
  vector<string> host_chain;
2237
  vector<int> host_rtt;
2238
  unsigned current_host;
2239
  vector< vector<ProxyInfo> > proxy_chain;
2240
  unsigned fallback_group;
2241
2242
  GetHostInfo(&host_chain, &host_rtt, &current_host);
2243
  GetProxyInfo(&proxy_chain, NULL, &fallback_group);
2244
  if ((host_chain.size() < 2) && ((proxy_chain.size() - fallback_group) < 2))
2245
    return true;
2246
2247
  vector<string> host_names;
2248
  for (unsigned i = 0; i < host_chain.size(); ++i)
2249
    host_names.push_back(dns::ExtractHost(host_chain[i]));
2250
  SortTeam(&host_names, &host_chain);
2251
  unsigned last_geo_host = host_names.size();
2252
2253
  if ((fallback_group == 0) && (last_geo_host > 1)) {
2254
    // There are no non-fallback proxies, which means that the client
2255
    // will always use the fallback proxies.  Add a keyword separator
2256
    // between the hosts and fallback proxies so the geosorting service
2257
    // will know to sort the hosts based on the distance from the
2258
    // closest fallback proxy rather than the distance from the client.
2259
    host_names.push_back("+PXYSEP+");
2260
  }
2261
2262
  // Add fallback proxy names to the end of the host list
2263
  unsigned first_geo_fallback = host_names.size();
2264
  for (unsigned i = fallback_group; i < proxy_chain.size(); ++i) {
2265
    // We only take the first fallback proxy name from every group under the
2266
    // assumption that load-balanced servers are at the same location
2267
    host_names.push_back(proxy_chain[i][0].host.name());
2268
  }
2269
  // TODO(dwd): fallback proxies should be sorted too for maximum
2270
  // cache reuse.  For WLCG there's no reason to presort fallbacks
2271
  // because they're set by a widely shared config so they're already
2272
  // in a fixed order, but that can change in a different context.
2273
2274
  std::vector<uint64_t> geo_order;
2275
  bool success = GeoSortServers(&host_names, &geo_order);
2276
  if (!success) {
2277
    // GeoSortServers already logged a failure message.
2278
    return false;
2279
  }
2280
2281
  // Re-install host chain and proxy chain
2282
  pthread_mutex_lock(lock_options_);
2283
  delete opt_host_chain_;
2284
  opt_num_proxies_ = 0;
2285
  opt_host_chain_ = new vector<string>(host_chain.size());
2286
2287
  // It's possible that opt_proxy_groups_fallback_ might have changed while the
2288
  // lock wasn't held
2289
  vector< vector< ProxyInfo> > *proxy_groups =
2290
        new vector< vector<ProxyInfo> > (
2291
            opt_proxy_groups_fallback_ + proxy_chain.size() - fallback_group);
2292
  // First copy the non-fallback part of the current proxy chain
2293
  for (unsigned i = 0; i < opt_proxy_groups_fallback_; ++i) {
2294
    (*proxy_groups)[i] = (*opt_proxy_groups_)[i];
2295
    opt_num_proxies_ += (*opt_proxy_groups_)[i].size();
2296
  }
2297
2298
  // Copy the host chain and fallback proxies by geo order.  Array indices
2299
  // in geo_order that are smaller than last_geo_host refer to a stratum 1,
2300
  // and those indices greater than or equal to first_geo_fallback refer to
2301
  // a fallback proxy.
2302
  unsigned hosti = 0;
2303
  unsigned proxyi = opt_proxy_groups_fallback_;
2304
  for (unsigned i = 0; i < geo_order.size(); ++i) {
2305
    uint64_t orderval = geo_order[i];
2306
    if (orderval < (uint64_t) last_geo_host) {
2307
      // LogCvmfs(kLogCvmfs, kLogSyslog, "this is orderval %u at host index %u",
2308
      //          orderval, hosti);
2309
      (*opt_host_chain_)[hosti++] = host_chain[orderval];
2310
    } else if (orderval >= (uint64_t) first_geo_fallback) {
2311
      // LogCvmfs(kLogCvmfs, kLogSyslog,
2312
      //   "this is orderval %u at proxy index %u, using proxy_chain index %u",
2313
      //   orderval, proxyi, fallback_group + orderval - first_geo_fallback);
2314
      (*proxy_groups)[proxyi] = proxy_chain[
2315
            fallback_group + orderval - first_geo_fallback];
2316
      opt_num_proxies_ += (*proxy_groups)[proxyi].size();
2317
      proxyi++;
2318
    }
2319
  }
2320
2321
  delete opt_proxy_groups_;
2322
  opt_proxy_groups_ = proxy_groups;
2323
  // In pathological cases, opt_proxy_groups_current_ can be larger now when
2324
  // proxies changed in-between.
2325
  if (opt_proxy_groups_current_ > opt_proxy_groups_->size()) {
2326
    if (opt_proxy_groups_->size() == 0) {
2327
      opt_proxy_groups_current_ = 0;
2328
    } else {
2329
      opt_proxy_groups_current_ = opt_proxy_groups_->size() - 1;
2330
    }
2331
    opt_proxy_groups_current_burned_ = 0;
2332
  }
2333
2334
  delete opt_host_chain_rtt_;
2335
  opt_host_chain_rtt_ = new vector<int>(host_chain.size(), kProbeGeo);
2336
  opt_host_chain_current_ = 0;
2337
  pthread_mutex_unlock(lock_options_);
2338
2339
  return true;
2340
}
2341
2342
2343
/**
2344
 * Validates a string of the form "1,4,2,3" representing in which order the
2345
 * the expected_size number of hosts should be put for optimal geographic
2346
 * proximity.  Returns false if the reply_order string is invalid, otherwise
2347
 * fills in the reply_vals array with zero-based order indexes (e.g.
2348
 * [0,3,1,2]) and returns true.
2349
 */
2350
14
bool DownloadManager::ValidateGeoReply(
2351
  const string &reply_order,
2352
  const unsigned expected_size,
2353
  vector<uint64_t> *reply_vals)
2354
{
2355
14
  if (reply_order.empty())
2356
1
    return false;
2357
13
  sanitizer::InputSanitizer sanitizer("09 , \n");
2358
13
  if (!sanitizer.IsValid(reply_order))
2359
1
    return false;
2360
12
  sanitizer::InputSanitizer strip_newline("09 ,");
2361
  vector<string> reply_strings =
2362
12
    SplitString(strip_newline.Filter(reply_order), ',');
2363
12
  vector<uint64_t> tmp_vals;
2364
33
  for (unsigned i = 0; i < reply_strings.size(); ++i) {
2365
23
    if (reply_strings[i].empty())
2366
2
      return false;
2367
21
    tmp_vals.push_back(String2Uint64(reply_strings[i]));
2368
  }
2369
10
  if (tmp_vals.size() != expected_size)
2370
5
    return false;
2371
2372
  // Check if tmp_vals contains the number 1..n
2373
5
  set<uint64_t> coverage(tmp_vals.begin(), tmp_vals.end());
2374
5
  if (coverage.size() != tmp_vals.size())
2375
    return false;
2376

5
  if ((*coverage.begin() != 1) || (*coverage.rbegin() != coverage.size()))
2377
1
    return false;
2378
2379
13
  for (unsigned i = 0; i < expected_size; ++i) {
2380
9
    (*reply_vals)[i] = tmp_vals[i] - 1;
2381
  }
2382
4
  return true;
2383
}
2384
2385
2386
/**
2387
 * Removes DIRECT from a list of ';' and '|' separated proxies.
2388
 * \return true if DIRECT was present, false otherwise
2389
 */
2390
165
bool DownloadManager::StripDirect(
2391
  const string &proxy_list,
2392
  string *cleaned_list)
2393
{
2394
165
  assert(cleaned_list);
2395
165
  if (proxy_list == "") {
2396
156
    *cleaned_list = "";
2397
156
    return false;
2398
  }
2399
9
  bool result = false;
2400
2401
9
  vector<string> proxy_groups = SplitString(proxy_list, ';');
2402
9
  vector<string> cleaned_groups;
2403

9
  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2404
22
    vector<string> group = SplitString(proxy_groups[i], '|');
2405
22
    vector<string> cleaned;
2406
59
    for (unsigned j = 0; j < group.size(); ++j) {
2407

37
      if ((group[j] == "DIRECT") || (group[j] == "")) {
2408
23
        result = true;
2409
      } else {
2410
14
        cleaned.push_back(group[j]);
2411
      }
2412
    }
2413
22
    if (!cleaned.empty())
2414
7
      cleaned_groups.push_back(JoinStrings(cleaned, "|"));
2415
  }
2416
2417
9
  *cleaned_list = JoinStrings(cleaned_groups, ";");
2418
9
  return result;
2419
}
2420
2421
2422
/**
2423
 * Parses a list of ';'- and '|'-separated proxy servers and fallback proxy
2424
 *   servers for the proxy groups.
2425
 * The empty string for both removes the proxy chain.
2426
 * The set_mode parameter can be used to set either proxies (leaving fallback
2427
 *   proxies unchanged) or fallback proxies (leaving regular proxies unchanged)
2428
 *   or both.
2429
 */
2430
155
void DownloadManager::SetProxyChain(
2431
  const string &proxy_list,
2432
  const string &fallback_proxy_list,
2433
  const ProxySetModes set_mode)
2434
{
2435
155
  pthread_mutex_lock(lock_options_);
2436
2437
155
  opt_timestamp_backup_proxies_ = 0;
2438
155
  opt_timestamp_failover_proxies_ = 0;
2439
155
  string set_proxy_list = opt_proxy_list_;
2440
155
  string set_proxy_fallback_list = opt_proxy_fallback_list_;
2441
  bool contains_direct;
2442

155
  if ((set_mode == kSetProxyFallback) || (set_mode == kSetProxyBoth)) {
2443
114
    opt_proxy_fallback_list_ = fallback_proxy_list;
2444
  }
2445

155
  if ((set_mode == kSetProxyRegular) || (set_mode == kSetProxyBoth)) {
2446
155
    opt_proxy_list_ = proxy_list;
2447
  }
2448
  contains_direct =
2449
155
    StripDirect(opt_proxy_fallback_list_, &set_proxy_fallback_list);
2450
155
  if (contains_direct) {
2451
    LogCvmfs(kLogDownload, kLogSyslogWarn | kLogDebug,
2452
             "fallback proxies do not support DIRECT, removing");
2453
  }
2454
155
  if (set_proxy_fallback_list == "") {
2455
155
    set_proxy_list = opt_proxy_list_;
2456
  } else {
2457
    bool contains_direct = StripDirect(opt_proxy_list_, &set_proxy_list);
2458
    if (contains_direct) {
2459
      LogCvmfs(kLogDownload, kLogSyslog | kLogDebug,
2460
               "skipping DIRECT proxy to use fallback proxy");
2461
    }
2462
  }
2463
2464
  // From this point on, use set_proxy_list and set_fallback_proxy_list as
2465
  // effective proxy lists!
2466
2467
155
  delete opt_proxy_groups_;
2468

155
  if ((set_proxy_list == "") && (set_proxy_fallback_list == "")) {
2469
41
    opt_proxy_groups_ = NULL;
2470
41
    opt_proxy_groups_current_ = 0;
2471
41
    opt_proxy_groups_current_burned_ = 0;
2472
41
    opt_proxy_groups_fallback_ = 0;
2473
41
    opt_num_proxies_ = 0;
2474
41
    pthread_mutex_unlock(lock_options_);
2475
    return;
2476
  }
2477
2478
  // Determine number of regular proxy groups (== first fallback proxy group)
2479
114
  opt_proxy_groups_fallback_ = 0;
2480
114
  if (set_proxy_list != "") {
2481
114
    opt_proxy_groups_fallback_ = SplitString(set_proxy_list, ';').size();
2482
  }
2483
  LogCvmfs(kLogDownload, kLogDebug, "first fallback proxy group %u",
2484
114
           opt_proxy_groups_fallback_);
2485
2486
  // Concatenate regular proxies and fallback proxies, both of which can be
2487
  // empty.
2488
114
  string all_proxy_list = set_proxy_list;
2489
114
  if (set_proxy_fallback_list != "") {
2490
    if (all_proxy_list != "")
2491
      all_proxy_list += ";";
2492
    all_proxy_list += set_proxy_fallback_list;
2493
  }
2494
  LogCvmfs(kLogDownload, kLogDebug, "full proxy list %s",
2495
114
           all_proxy_list.c_str());
2496
2497
  // Resolve server names in provided urls
2498
114
  vector<string> hostnames;  // All encountered hostnames
2499
114
  vector<string> proxy_groups;
2500
114
  if (all_proxy_list != "")
2501
114
    proxy_groups = SplitString(all_proxy_list, ';');
2502
114
  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2503
114
    vector<string> this_group = SplitString(proxy_groups[i], '|');
2504
114
    for (unsigned j = 0; j < this_group.size(); ++j) {
2505
114
      this_group[j] = dns::AddDefaultScheme(this_group[j]);
2506
      // Note: DIRECT strings will be "extracted" to an empty string.
2507
114
      string hostname = dns::ExtractHost(this_group[j]);
2508
      // Save the hostname.  Leave empty (DIRECT) names so indexes will
2509
      // match later.
2510
114
      hostnames.push_back(hostname);
2511
    }
2512
  }
2513
114
  vector<dns::Host> hosts;
2514
  LogCvmfs(kLogDownload, kLogDebug, "resolving %u proxy addresses",
2515
114
           hostnames.size());
2516
114
  resolver_->ResolveMany(hostnames, &hosts);
2517
2518
  // Construct opt_proxy_groups_: traverse proxy list in same order and expand
2519
  // names to resolved IP addresses.
2520
114
  opt_proxy_groups_ = new vector< vector<ProxyInfo> >();
2521
114
  opt_num_proxies_ = 0;
2522
114
  unsigned num_proxy = 0;  // Combined i, j counter
2523

228
  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2524
114
    vector<string> this_group = SplitString(proxy_groups[i], '|');
2525
    // Construct ProxyInfo objects from proxy string and DNS resolver result for
2526
    // every proxy in this_group.  One URL can result in multiple ProxyInfo
2527
    // objects, one for each IP address.
2528
114
    vector<ProxyInfo> infos;
2529
114
    for (unsigned j = 0; j < this_group.size(); ++j, ++num_proxy) {
2530
114
      this_group[j] = dns::AddDefaultScheme(this_group[j]);
2531
114
      if (this_group[j] == "DIRECT") {
2532
114
        infos.push_back(ProxyInfo("DIRECT"));
2533
114
        continue;
2534
      }
2535
2536
      if (hosts[num_proxy].status() != dns::kFailOk) {
2537
        LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2538
                 "failed to resolve IP addresses for %s (%d - %s)",
2539
                 hosts[num_proxy].name().c_str(), hosts[num_proxy].status(),
2540
                 dns::Code2Ascii(hosts[num_proxy].status()));
2541
        dns::Host failed_host =
2542
          dns::Host::ExtendDeadline(hosts[num_proxy], resolver_->min_ttl());
2543
        infos.push_back(ProxyInfo(failed_host, this_group[j]));
2544
        continue;
2545
      }
2546
2547
      // IPv4 addresses have precedence
2548
      set<string> best_addresses =
2549
        hosts[num_proxy].ViewBestAddresses(opt_ip_preference_);
2550
      set<string>::const_iterator iter_ips = best_addresses.begin();
2551
      for (; iter_ips != best_addresses.end(); ++iter_ips) {
2552
        string url_ip = dns::RewriteUrl(this_group[j], *iter_ips);
2553
        infos.push_back(ProxyInfo(hosts[num_proxy], url_ip));
2554
      }
2555
    }
2556
114
    opt_proxy_groups_->push_back(infos);
2557
114
    opt_num_proxies_ += infos.size();
2558
  }
2559
  LogCvmfs(kLogDownload, kLogDebug,
2560
           "installed %u proxies in %u load-balance groups",
2561
114
           opt_num_proxies_, opt_proxy_groups_->size());
2562
114
  opt_proxy_groups_current_ = 0;
2563
114
  opt_proxy_groups_current_burned_ = 1;
2564
2565
  // Select random start proxy from the first group.
2566
114
  if (opt_proxy_groups_->size() > 0) {
2567
    // Select random start proxy from the first group.
2568
114
    if ((*opt_proxy_groups_)[0].size() > 1) {
2569
      int random_index = prng_.Next((*opt_proxy_groups_)[0].size());
2570
      swap((*opt_proxy_groups_)[0][0], (*opt_proxy_groups_)[0][random_index]);
2571
    }
2572
    // LogCvmfs(kLogDownload, kLogSyslog, "using proxy %s",
2573
    //          (*opt_proxy_groups_)[0][0].c_str());
2574
  }
2575
2576

114
  pthread_mutex_unlock(lock_options_);
2577
}
2578
2579
2580
/**
2581
 * Retrieves the proxy chain, optionally the currently active load-balancing
2582
 *   group, and optionally the index of the first fallback proxy group.
2583
 *   If there are no fallback proxies, the index will equal the size of
2584
 *   the proxy chain.
2585
 */
2586
void DownloadManager::GetProxyInfo(vector< vector<ProxyInfo> > *proxy_chain,
2587
                                   unsigned *current_group,
2588
                                   unsigned *fallback_group)
2589
{
2590
  assert(proxy_chain != NULL);
2591
2592
  pthread_mutex_lock(lock_options_);
2593
2594
  if (!opt_proxy_groups_) {
2595
    pthread_mutex_unlock(lock_options_);
2596
    vector< vector<ProxyInfo> > empty_chain;
2597
    *proxy_chain = empty_chain;
2598
    if (current_group != NULL)
2599
      *current_group = 0;
2600
    if (fallback_group != NULL)
2601
      *fallback_group = 0;
2602
    return;
2603
  }
2604
2605
  *proxy_chain = *opt_proxy_groups_;
2606
  if (current_group != NULL)
2607
    *current_group = opt_proxy_groups_current_;
2608
  if (fallback_group != NULL)
2609
    *fallback_group = opt_proxy_groups_fallback_;
2610
2611
  pthread_mutex_unlock(lock_options_);
2612
}
2613
2614
/**
 * Returns a copy of opt_proxy_list_.  The member is read without taking
 * lock_options_.
 */
string DownloadManager::GetProxyList() {
  const string &proxy_list = opt_proxy_list_;
  return proxy_list;
}
2617
2618
/**
 * Returns a copy of opt_proxy_fallback_list_.  The member is read without
 * taking lock_options_.
 */
string DownloadManager::GetFallbackProxyList() {
  const string &fallback_list = opt_proxy_fallback_list_;
  return fallback_list;
}
2621
2622
/**
2623
 * Selects a new random proxy in the current load-balancing group.  Resets the
2624
 * "burned" counter.
2625
 */
2626
void DownloadManager::RebalanceProxiesUnlocked() {
2627
  if (!opt_proxy_groups_)
2628
    return;
2629
2630
  opt_timestamp_failover_proxies_ = 0;
2631
  opt_proxy_groups_current_burned_ = 1;
2632
  vector<ProxyInfo> *group = &((*opt_proxy_groups_)[opt_proxy_groups_current_]);
2633
  int select = prng_.Next(group->size());
2634
  swap((*group)[select], (*group)[0]);
2635
  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslog,
2636
  //          "switching proxy from %s to %s (rebalance)",
2637
  //          (*group)[select].c_str(), swap.c_str());
2638
}
2639
2640
2641
void DownloadManager::RebalanceProxies() {
2642
  pthread_mutex_lock(lock_options_);
2643
  RebalanceProxiesUnlocked();
2644
  pthread_mutex_unlock(lock_options_);
2645
}
2646
2647
2648
/**
2649
 * Switches to the next load-balancing group of proxy servers.
2650
 */
2651
void DownloadManager::SwitchProxyGroup() {
2652
  pthread_mutex_lock(lock_options_);
2653
2654
  if (!opt_proxy_groups_ || (opt_proxy_groups_->size() < 2)) {
2655
    pthread_mutex_unlock(lock_options_);
2656
    return;
2657
  }
2658
2659
  // string old_proxy = (*opt_proxy_groups_)[opt_proxy_groups_current_][0];
2660
  opt_proxy_groups_current_ = (opt_proxy_groups_current_ + 1) %
2661
  opt_proxy_groups_->size();
2662
  opt_proxy_groups_current_burned_ = 1;
2663
  opt_timestamp_backup_proxies_ = time(NULL);
2664
  opt_timestamp_failover_proxies_ = 0;
2665
  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslog,
2666
  //          "switching proxy from %s to %s (manual group change)",
2667
  //          old_proxy.c_str(),
2668
  //          (*opt_proxy_groups_)[opt_proxy_groups_current_][0].c_str());
2669
2670
  pthread_mutex_unlock(lock_options_);
2671
}
2672
2673
2674
void DownloadManager::SetProxyGroupResetDelay(const unsigned seconds) {
2675
  pthread_mutex_lock(lock_options_);
2676
  opt_proxy_groups_reset_after_ = seconds;
2677
  if (opt_proxy_groups_reset_after_ == 0) {
2678
    opt_timestamp_backup_proxies_ = 0;
2679
    opt_timestamp_failover_proxies_ = 0;
2680
  }
2681
  pthread_mutex_unlock(lock_options_);
2682
}
2683
2684
2685
void DownloadManager::SetHostResetDelay(const unsigned seconds)
2686
{
2687
  pthread_mutex_lock(lock_options_);
2688
  opt_host_reset_after_ = seconds;
2689
  if (opt_host_reset_after_ == 0)
2690
    opt_timestamp_backup_host_ = 0;
2691
  pthread_mutex_unlock(lock_options_);
2692
}
2693
2694
2695
80
void DownloadManager::SetRetryParameters(const unsigned max_retries,
2696
                                         const unsigned backoff_init_ms,
2697
                                         const unsigned backoff_max_ms)
2698
{
2699
80
  pthread_mutex_lock(lock_options_);
2700
80
  opt_max_retries_ = max_retries;
2701
80
  opt_backoff_init_ms_ = backoff_init_ms;
2702
80
  opt_backoff_max_ms_ = backoff_max_ms;
2703
80
  pthread_mutex_unlock(lock_options_);
2704
80
}
2705
2706
2707
58
void DownloadManager::SetMaxIpaddrPerProxy(unsigned limit) {
2708
58
  pthread_mutex_lock(lock_options_);
2709
58
  resolver_->set_throttle(limit);
2710
58
  pthread_mutex_unlock(lock_options_);
2711
58
}
2712
2713
2714
59
void DownloadManager::SetProxyTemplates(
2715
  const std::string &direct,
2716
  const std::string &forced)
2717
{
2718
59
  pthread_mutex_lock(lock_options_);
2719
59
  proxy_template_direct_ = direct;
2720
59
  proxy_template_forced_ = forced;
2721
59
  pthread_mutex_unlock(lock_options_);
2722
59
}
2723
2724
2725
void DownloadManager::EnableInfoHeader() {
2726
  enable_info_header_ = true;
2727
}
2728
2729
2730
21
void DownloadManager::EnableRedirects() {
2731
21
  follow_redirects_ = true;
2732
21
}
2733
2734
2735
/**
2736
 * Creates a copy of the existing download manager.  Must only be called in
2737
 * single-threaded stage because it calls curl_global_init().
2738
 */
2739
59
DownloadManager *DownloadManager::Clone(perf::StatisticsTemplate statistics) {
2740
59
  DownloadManager *clone = new DownloadManager();
2741
59
  clone->Init(pool_max_handles_, use_system_proxy_, statistics);
2742
59
  if (resolver_) {
2743
58
    clone->SetDnsParameters(resolver_->retries(), resolver_->timeout_ms());
2744
58
    clone->SetDnsTtlLimits(resolver_->min_ttl(), resolver_->max_ttl());
2745
58
    clone->SetMaxIpaddrPerProxy(resolver_->throttle());
2746
  }
2747
59
  if (!opt_dns_server_.empty())
2748
    clone->SetDnsServer(opt_dns_server_);
2749
59
  clone->opt_timeout_proxy_ = opt_timeout_proxy_;
2750
59
  clone->opt_timeout_direct_ = opt_timeout_direct_;
2751
59
  clone->opt_low_speed_limit_ = opt_low_speed_limit_;
2752
59
  clone->opt_max_retries_ = opt_max_retries_;
2753
59
  clone->opt_backoff_init_ms_ = opt_backoff_init_ms_;
2754
59
  clone->opt_backoff_max_ms_ = opt_backoff_max_ms_;
2755
59
  clone->enable_info_header_ = enable_info_header_;
2756
59
  clone->follow_redirects_ = follow_redirects_;
2757
59
  if (opt_host_chain_) {
2758
57
    clone->opt_host_chain_ = new vector<string>(*opt_host_chain_);
2759
57
    clone->opt_host_chain_rtt_ = new vector<int>(*opt_host_chain_rtt_);
2760
  }
2761
59
  CloneProxyConfig(clone);
2762
59
  clone->opt_ip_preference_ = opt_ip_preference_;
2763
59
  clone->proxy_template_direct_ = proxy_template_direct_;
2764
59
  clone->proxy_template_forced_ = proxy_template_forced_;
2765
59
  clone->opt_proxy_groups_reset_after_ = opt_proxy_groups_reset_after_;
2766
59
  clone->opt_host_reset_after_ = opt_host_reset_after_;
2767
59
  clone->credentials_attachment_ = credentials_attachment_;
2768
2769
59
  return clone;
2770
}
2771
2772
2773
59
/**
 * Transfers the proxy configuration to clone: the group indices and counters,
 * the proxy list strings and, if this manager has proxy groups, a deep copy of
 * them.  Without proxy groups, clone's opt_proxy_groups_ is left untouched.
 */
void DownloadManager::CloneProxyConfig(DownloadManager *clone)
{
  // Plain values and strings
  clone->opt_proxy_list_ = opt_proxy_list_;
  clone->opt_proxy_fallback_list_ = opt_proxy_fallback_list_;
  clone->opt_num_proxies_ = opt_num_proxies_;
  clone->opt_proxy_groups_current_ = opt_proxy_groups_current_;
  clone->opt_proxy_groups_current_burned_ = opt_proxy_groups_current_burned_;
  clone->opt_proxy_groups_fallback_ = opt_proxy_groups_fallback_;

  // The groups themselves live on the heap and need their own copy
  if (opt_proxy_groups_ != NULL) {
    clone->opt_proxy_groups_ =
      new vector< vector<ProxyInfo> >(*opt_proxy_groups_);
  }
}
2786
2787
}  // namespace download