CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
download.cc
Go to the documentation of this file.
1 
27 // TODO(jblomer): MS for time summing
28 // NOLINTNEXTLINE
29 #define __STDC_FORMAT_MACROS
30 
31 
32 #include "download.h"
33 
34 #include <alloca.h>
35 #include <errno.h>
36 #include <inttypes.h>
37 #include <poll.h>
38 #include <pthread.h>
39 #include <signal.h>
40 #include <stdint.h>
41 #include <sys/time.h>
42 #include <unistd.h>
43 
44 #include <algorithm>
45 #include <cassert>
46 #include <cstdio>
47 #include <cstdlib>
48 #include <cstring>
49 #include <map>
50 #include <set>
51 #include <utility>
52 
54 #include "crypto/hash.h"
55 #include "duplex_curl.h"
56 #include "interrupt.h"
57 #include "sanitizer.h"
58 #include "ssl.h"
59 #include "util/algorithm.h"
60 #include "util/atomic.h"
61 #include "util/concurrency.h"
62 #include "util/exception.h"
63 #include "util/logging.h"
64 #include "util/posix.h"
65 #include "util/prng.h"
66 #include "util/smalloc.h"
67 #include "util/string.h"
68 
69 using namespace std; // NOLINT
70 
71 namespace download {
72 
// Reports whether the current download of job `info` should be interrupted.
//
// Returns true right away when the job allows failure
// (info->allow_failure()). Otherwise, if `fqrn` is non-empty, checks for the
// marker file "/var/run/cvmfs/interrupt.<fqrn>": if it exists it is removed
// (a failed unlink() is only logged) and true is returned. In every other
// case the function returns false.
85 bool Interrupted(const std::string &fqrn, JobInfo *info) {
86  if (info->allow_failure()) {
87  return true;
88  }
89 
90  if (!fqrn.empty()) {
91  // It is up to the user to create this sentinel file ("pause_file") if
92  // CVMFS_FAILOVER_INDEFINITELY is used. It must be created during
93  // "cvmfs_config reload" and "cvmfs_config reload $fqrn"
94  std::string pause_file = std::string("/var/run/cvmfs/interrupt.") + fqrn;
95 
97  "(id %" PRId64 ") Interrupted(): checking for existence of %s",
98  info->id(), pause_file.c_str());
99  if (FileExists(pause_file)) {
101  "(id %" PRId64 ") Interrupt marker found - "
102  "Interrupting current download, this will EIO outstanding IO.",
103  info->id());
// The marker is consumed; removal is best effort and a failure is only logged.
104  if (0 != unlink(pause_file.c_str())) {
106  "(id %" PRId64 ") Couldn't delete interrupt marker: errno=%d",
107  info->id(), errno);
108  }
109  return true;
110  }
111  }
112  return false;
113 }
114 
116  if (info->sink() != NULL && !info->sink()->IsValid()) {
117  cvmfs::PathSink *psink = dynamic_cast<cvmfs::PathSink *>(info->sink());
118  if (psink != NULL) {
120  "(id %" PRId64 ") Failed to open path %s: %s (errno=%d).",
121  info->id(), psink->path().c_str(), strerror(errno), errno);
122  return kFailLocalIO;
123  } else {
125  "(id %" PRId64 ") "
126  "Failed to create a valid sink: \n %s",
127  info->id(), info->sink()->Describe().c_str());
128  return kFailOther;
129  }
130  }
131 
132  return kFailOk;
133 }
134 
135 
// libcurl header callback, installed with CURLOPT_HEADERFUNCTION in
// DownloadManager::AcquireCurlHandle(); `info_link` is the JobInfo registered
// with CURLOPT_WRITEHEADER in InitializeRequest(). It is invoked once per
// received header line (size * nmemb bytes at `ptr`).
//
// - Status line ("HTTP/1." prefix): the code is stored with SetHttpCode().
//   2xx, and 301/302/303/307 when follow_redirects() is set, let the transfer
//   continue; all other codes return 0.
// - "CONTENT-LENGTH:": reserves that many bytes in the sink if the sink
//   requires a reservation; fails with kFailTooBig if that is not possible.
// - "LOCATION:" is logged, "LINK:" values are accumulated with SetLink(), and
//   "X-SQUID-ERROR:" / "PROXY-STATUS:" may reinterpret kFailHostHttp.
// Per the libcurl CURLOPT_HEADERFUNCTION contract, returning anything other
// than num_bytes aborts the transfer.
139 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
140  void *info_link) {
141  const size_t num_bytes = size * nmemb;
142  const string header_line(static_cast<const char *>(ptr), num_bytes);
143  JobInfo *info = static_cast<JobInfo *>(info_link);
144 
145  // LogCvmfs(kLogDownload, kLogDebug, "REMOVE-ME: Header callback with %s",
146  // header_line.c_str());
147 
148  // Check http status codes
// NOTE(review): only "HTTP/1." status lines are recognized; a status line of
// another protocol version (e.g. "HTTP/2 200") does not match this prefix.
149  if (HasPrefix(header_line, "HTTP/1.", false)) {
150  if (header_line.length() < 10) {
151  return 0;
152  }
153 
// Skip the blanks that follow the 8-character "HTTP/1.x" token.
154  unsigned i;
155  for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {
156  }
157 
158  // Code is initialized to -1
// ParseHttpCode() reads the three characters at positions i..i+2.
159  if (header_line.length() > i + 2) {
160  info->SetHttpCode(DownloadManager::ParseHttpCode(&header_line[i]));
161  }
162 
163  if ((info->http_code() / 100) == 2) {
164  return num_bytes;
165  } else if ((info->http_code() == 301) || (info->http_code() == 302)
166  || (info->http_code() == 303) || (info->http_code() == 307)) {
167  if (!info->follow_redirects()) {
169  "(id %" PRId64 ") redirect support not enabled: %s",
170  info->id(), header_line.c_str());
172  return 0;
173  }
174  LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") http redirect: %s",
175  info->id(), header_line.c_str());
176  // libcurl will handle this because of CURLOPT_FOLLOWLOCATION
177  return num_bytes;
178  } else {
180  "(id %" PRId64 ") http status error code: %s [%d]", info->id(),
181  header_line.c_str(), info->http_code());
182  if (((info->http_code() / 100) == 5) || (info->http_code() == 400)
183  || (info->http_code() == 404)) {
184  // 5XX returned by host
185  // 400: error from the GeoAPI module
186  // 404: the stratum 1 does not have the newest files
188  } else if (info->http_code() == 429) {
189  // 429: rate throttling (we ignore the backoff hint for the time being)
191  } else {
192  info->SetErrorCode((info->proxy() == "DIRECT") ? kFailHostHttp
193  : kFailProxyHttp);
194  }
195  return 0;
196  }
197  }
198 
199  // If needed: allocate space in sink
200  if (info->sink() != NULL && info->sink()->RequiresReserve()
201  && HasPrefix(header_line, "CONTENT-LENGTH:", true)) {
// tmp only receives the header name; it is as large as the whole line, so the
// %s conversion cannot overflow it.
202  char *tmp = reinterpret_cast<char *>(alloca(num_bytes + 1));
203  uint64_t length = 0;
204  sscanf(header_line.c_str(), "%s %" PRIu64, tmp, &length);
205  if (length > 0) {
206  if (!info->sink()->Reserve(length)) {
208  "(id %" PRId64 ") "
209  "resource %s too large to store in memory (%" PRIu64 ")",
210  info->id(), info->url()->c_str(), length);
211  info->SetErrorCode(kFailTooBig);
212  return 0;
213  }
214  } else {
215  // Empty resource
216  info->sink()->Reserve(0);
217  }
218  } else if (HasPrefix(header_line, "LOCATION:", true)) {
219  // This comes along with redirects
220  LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") %s", info->id(),
221  header_line.c_str());
222  } else if (HasPrefix(header_line, "LINK:", true)) {
223  // This is metalink info
224  LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") %s", info->id(),
225  header_line.c_str());
226  std::string link = info->link();
// substr(5) strips the 5-character "LINK:" header name.
227  if (link.size() != 0) {
228  // multiple LINK headers are allowed
229  link = link + ", " + header_line.substr(5);
230  } else {
231  link = header_line.substr(5);
232  }
233  info->SetLink(link);
234  } else if (HasPrefix(header_line, "X-SQUID-ERROR:", true)) {
235  // Reinterpret host error as proxy error
236  if (info->error_code() == kFailHostHttp) {
238  }
239  } else if (HasPrefix(header_line, "PROXY-STATUS:", true)) {
240  // Reinterpret host error as proxy error if applicable
241  if ((info->error_code() == kFailHostHttp)
242  && (header_line.find("error=") != string::npos)) {
244  }
245  }
246 
247  return num_bytes;
248 }
249 
250 
// libcurl write callback, installed with CURLOPT_WRITEFUNCTION in
// DownloadManager::AcquireCurlHandle(); `info_link` is the JobInfo registered
// with CURLOPT_WRITEDATA in InitializeRequest(). The received bytes are fed
// into the job's hash context (when a hash is expected), then into the sink:
// through the decompressing stream for compressed jobs, directly otherwise.
// Returns num_bytes to continue. Returns 0, which aborts the transfer, for an
// empty chunk or after setting kFailBadData / kFailLocalIO on a
// decompression error.
254 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
255  void *info_link) {
256  const size_t num_bytes = size * nmemb;
257  JobInfo *info = static_cast<JobInfo *>(info_link);
258 
259  // TODO(heretherebedragons) remove if no error comes up
260  // as this means only jobinfo data request (and not header only)
261  // come here
262  assert(info->sink() != NULL);
263 
264  // LogCvmfs(kLogDownload, kLogDebug, "Data callback, %d bytes", num_bytes);
265 
266  if (num_bytes == 0)
267  return 0;
268 
// The hash covers the bytes as received, i.e. before any decompression.
269  if (info->expected_hash()) {
270  shash::Update(reinterpret_cast<unsigned char *>(ptr), num_bytes,
271  info->hash_context());
272  }
273 
274  if (info->compressed()) {
276  ptr, static_cast<int64_t>(num_bytes), info->GetZstreamPtr(),
277  info->sink());
278  if (retval == zlib::kStreamDataError) {
280  "(id %" PRId64 ") failed to decompress %s", info->id(),
281  info->url()->c_str());
282  info->SetErrorCode(kFailBadData);
283  return 0;
284  } else if (retval == zlib::kStreamIOError) {
286  "(id %" PRId64 ") decompressing %s, local IO error", info->id(),
287  info->url()->c_str());
288  info->SetErrorCode(kFailLocalIO);
289  return 0;
290  }
291  } else {
292  int64_t written = info->sink()->Write(ptr, num_bytes);
// NOTE(review): a failed or short sink write is only logged. No error code is
// set and num_bytes is still returned, so the transfer carries on. The message
// also prints `written` under the label "errno" (with %ld for an int64_t).
// Confirm whether this is intended.
293  if (written < 0 || static_cast<uint64_t>(written) != num_bytes) {
295  "(id %" PRId64 ") "
296  "Failed to perform write of %zu bytes to sink %s with errno %ld",
297  info->id(), num_bytes, info->sink()->Describe().c_str(),
298  written);
299  }
300  }
301 
302  return num_bytes;
303 }
304 
#ifdef DEBUGMSG
/**
 * libcurl debug hook, installed with CURLOPT_DEBUGFUNCTION when DEBUGMSG is
 * defined. Every message is logged to kLogCurl with the job id and a tag for
 * the message type. Data payloads of 50 bytes or more are reduced to a
 * "<snip>" line. A message containing non-printable characters (other than
 * LF/CR) is logged as "<Non-plaintext sequence>". Always returns 0.
 */
static int CallbackCurlDebug(CURL *handle,
                             curl_infotype type,
                             char *data,
                             size_t size,
                             void * /* clientp */) {
  JobInfo *info;
  curl_easy_getinfo(handle, CURLINFO_PRIVATE, &info);
  std::string prefix = "(id " + StringifyInt(info->id()) + ") ";

  switch (type) {
    case CURLINFO_TEXT:
      prefix += "{info} ";
      break;
    case CURLINFO_HEADER_IN:
      prefix += "{header/recv} ";
      break;
    case CURLINFO_HEADER_OUT:
      prefix += "{header/sent} ";
      break;
    case CURLINFO_DATA_IN:
      if (size >= 50) {
        LogCvmfs(kLogCurl, kLogDebug, "%s{data/recv} <snip>", prefix.c_str());
        return 0;
      }
      prefix += "{data/recv} ";
      break;
    case CURLINFO_DATA_OUT:
      if (size >= 50) {
        LogCvmfs(kLogCurl, kLogDebug, "%s{data/sent} <snip>", prefix.c_str());
        return 0;
      }
      prefix += "{data/sent} ";
      break;
    case CURLINFO_SSL_DATA_IN:
      if (size >= 50) {
        LogCvmfs(kLogCurl, kLogDebug, "%s{ssldata/recv} <snip>",
                 prefix.c_str());
        return 0;
      }
      prefix += "{ssldata/recv} ";
      break;
    case CURLINFO_SSL_DATA_OUT:
      if (size >= 50) {
        LogCvmfs(kLogCurl, kLogDebug, "%s{ssldata/sent} <snip>",
                 prefix.c_str());
        return 0;
      }
      prefix += "{ssldata/sent} ";
      break;
    default:
      // Unknown type: log the message with the bare id prefix
      break;
  }

  // NUL bytes are shown as '~'. Any other byte outside printable ASCII,
  // except line feed and carriage return, makes the message unprintable.
  std::string msg(data, size);
  bool printable = true;
  for (std::string::iterator c = msg.begin(); c != msg.end(); ++c) {
    if (*c == '\0')
      *c = '~';
    const bool is_line_break = (*c == 10) || (*c == 13);
    if (!is_line_break && ((*c < ' ') || (*c > '~')))
      printable = false;
  }
  if (!printable)
    msg = "<Non-plaintext sequence>";

  LogCvmfs(kLogCurl, kLogDebug, "%s%s", prefix.c_str(),
           Trim(msg, true /* trim_newline */).c_str());
  return 0;
}
#endif
388 
389 //------------------------------------------------------------------------------
390 
391 
392 const int DownloadManager::kProbeUnprobed = -1;
393 const int DownloadManager::kProbeDown = -2;
394 const int DownloadManager::kProbeGeo = -3;
395 
396 bool DownloadManager::EscapeUrlChar(unsigned char input, char output[3]) {
397  if (((input >= '0') && (input <= '9')) || ((input >= 'A') && (input <= 'Z'))
398  || ((input >= 'a') && (input <= 'z')) || (input == '/') || (input == ':')
399  || (input == '.') || (input == '@') || (input == '+') || (input == '-')
400  || (input == '_') || (input == '~') || (input == '[') || (input == ']')
401  || (input == ',')) {
402  output[0] = static_cast<char>(input);
403  return false;
404  }
405 
406  output[0] = '%';
407  output[1] = static_cast<char>((input / 16)
408  + ((input / 16 <= 9) ? '0' : 'A' - 10));
409  output[2] = static_cast<char>((input % 16)
410  + ((input % 16 <= 9) ? '0' : 'A' - 10));
411  return true;
412 }
413 
414 
419 string DownloadManager::EscapeUrl(const int64_t jobinfo_id, const string &url) {
420  string escaped;
421  escaped.reserve(url.length());
422 
423  char escaped_char[3];
424  for (unsigned i = 0, s = url.length(); i < s; ++i) {
425  if (EscapeUrlChar(url[i], escaped_char)) {
426  escaped.append(escaped_char, 3);
427  } else {
428  escaped.push_back(escaped_char[0]);
429  }
430  }
431  LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") escaped %s to %s",
432  jobinfo_id, url.c_str(), escaped.c_str());
433 
434  return escaped;
435 }
436 
441 unsigned DownloadManager::EscapeHeader(const string &header,
442  char *escaped_buf,
443  size_t buf_size) {
444  unsigned esc_pos = 0;
445  char escaped_char[3];
446  for (unsigned i = 0, s = header.size(); i < s; ++i) {
447  if (EscapeUrlChar(header[i], escaped_char)) {
448  for (unsigned j = 0; j < 3; ++j) {
449  if (escaped_buf) {
450  if (esc_pos >= buf_size)
451  return esc_pos;
452  escaped_buf[esc_pos] = escaped_char[j];
453  }
454  esc_pos++;
455  }
456  } else {
457  if (escaped_buf) {
458  if (esc_pos >= buf_size)
459  return esc_pos;
460  escaped_buf[esc_pos] = escaped_char[0];
461  }
462  esc_pos++;
463  }
464  }
465 
466  return esc_pos;
467 }
468 
472 int DownloadManager::ParseHttpCode(const char digits[3]) {
473  int result = 0;
474  int factor = 100;
475  for (int i = 0; i < 3; ++i) {
476  if ((digits[i] < '0') || (digits[i] > '9'))
477  return -1;
478  result += (digits[i] - '0') * factor;
479  factor /= 10;
480  }
481  return result;
482 }
483 
484 
488 int DownloadManager::CallbackCurlSocket(CURL * /* easy */,
489  curl_socket_t s,
490  int action,
491  void *userp,
492  void * /* socketp */) {
493  // LogCvmfs(kLogDownload, kLogDebug, "CallbackCurlSocket called with easy "
494  // "handle %p, socket %d, action %d", easy, s, action);
495  DownloadManager *download_mgr = static_cast<DownloadManager *>(userp);
496  if (action == CURL_POLL_NONE)
497  return 0;
498 
499  // Find s in watch_fds_
500  unsigned index;
501 
502  // TODO(heretherebedragons) why start at index = 0 and not 2?
503  // fd[0] and fd[1] are fixed?
504  for (index = 0; index < download_mgr->watch_fds_inuse_; ++index) {
505  if (download_mgr->watch_fds_[index].fd == s)
506  break;
507  }
508  // Or create newly
509  if (index == download_mgr->watch_fds_inuse_) {
510  // Extend array if necessary
511  if (download_mgr->watch_fds_inuse_ == download_mgr->watch_fds_size_) {
512  assert(download_mgr->watch_fds_size_ > 0);
513  download_mgr->watch_fds_size_ *= 2;
514  download_mgr->watch_fds_ = static_cast<struct pollfd *>(
515  srealloc(download_mgr->watch_fds_,
516  download_mgr->watch_fds_size_ * sizeof(struct pollfd)));
517  }
518  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].fd = s;
519  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].events = 0;
520  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].revents = 0;
521  download_mgr->watch_fds_inuse_++;
522  }
523 
524  switch (action) {
525  case CURL_POLL_IN:
526  download_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
527  break;
528  case CURL_POLL_OUT:
529  download_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
530  break;
531  case CURL_POLL_INOUT:
532  download_mgr->watch_fds_[index].events = POLLIN | POLLPRI | POLLOUT
533  | POLLWRBAND;
534  break;
535  case CURL_POLL_REMOVE:
536  if (index < download_mgr->watch_fds_inuse_ - 1) {
537  download_mgr
538  ->watch_fds_[index] = download_mgr->watch_fds_
539  [download_mgr->watch_fds_inuse_ - 1];
540  }
541  download_mgr->watch_fds_inuse_--;
542  // Shrink array if necessary
543  if ((download_mgr->watch_fds_inuse_ > download_mgr->watch_fds_max_)
544  && (download_mgr->watch_fds_inuse_
545  < download_mgr->watch_fds_size_ / 2)) {
546  download_mgr->watch_fds_size_ /= 2;
547  // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ (%d)",
548  // watch_fds_size_);
549  download_mgr->watch_fds_ = static_cast<struct pollfd *>(
550  srealloc(download_mgr->watch_fds_,
551  download_mgr->watch_fds_size_ * sizeof(struct pollfd)));
552  // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ done",
553  // watch_fds_size_);
554  }
555  break;
556  default:
557  break;
558  }
559 
560  return 0;
561 }
562 
563 
// Body of the download I/O thread; `data` is the owning DownloadManager.
//
// Polls the terminate pipe (slot 0), the job pipe (slot 1) and every socket
// libcurl asked to watch (slots 2.., maintained by CallbackCurlSocket()).
// Each JobInfo pointer read from the job pipe gets an easy handle, which is
// initialized and added to the multi handle. When a transfer is done, the
// handle is either re-added to the multi handle (VerifyAndFinalize() returned
// true) or released back to the pool, and an element is enqueued on the
// job's data tube. Any activity on the terminate pipe ends the loop. All
// in-use handles are then removed and cleaned up, watch_fds_ is freed, and
// NULL is returned.
567 void *DownloadManager::MainDownload(void *data) {
568  DownloadManager *download_mgr = static_cast<DownloadManager *>(data);
570  "download I/O thread of DownloadManager '%s' started",
571  download_mgr->name_.c_str());
572 
573  const int kIdxPipeTerminate = 0;
574  const int kIdxPipeJobs = 1;
575 
576  download_mgr->watch_fds_ = static_cast<struct pollfd *>(
577  smalloc(2 * sizeof(struct pollfd)));
578  download_mgr->watch_fds_size_ = 2;
579  download_mgr->watch_fds_[kIdxPipeTerminate].fd = download_mgr->pipe_terminate_
580  ->GetReadFd();
581  download_mgr->watch_fds_[kIdxPipeTerminate].events = POLLIN | POLLPRI;
582  download_mgr->watch_fds_[kIdxPipeTerminate].revents = 0;
583  download_mgr->watch_fds_[kIdxPipeJobs].fd = download_mgr->pipe_jobs_
584  ->GetReadFd();
585  download_mgr->watch_fds_[kIdxPipeJobs].events = POLLIN | POLLPRI;
586  download_mgr->watch_fds_[kIdxPipeJobs].revents = 0;
587  download_mgr->watch_fds_inuse_ = 2;
588 
589  int still_running = 0;
590  struct timeval timeval_start, timeval_stop;
591  gettimeofday(&timeval_start, NULL);
592  while (true) {
// While transfers are running, poll with a 1 ms timeout. When idle, block
// indefinitely and add the time since timeval_start to sz_transfer_time.
// NOTE(review): timeval_start is only reset when a job arrives while idle, so
// every idle iteration (e.g. a poll() error or activity on a socket that is
// still watched) adds the same interval again. Confirm.
593  int timeout;
594  if (still_running) {
595  /* NOTE: The following might degrade the performance for many small files
596  * use case. TODO(jblomer): look into it.
597  // Specify a timeout for polling in ms; this allows us to return
598  // to libcurl once a second so it can look for internal operations
599  // which timed out. libcurl has a more elaborate mechanism
600  // (CURLMOPT_TIMERFUNCTION) that would inform us of the next potential
601  // timeout. TODO(bbockelm) we should switch to that in the future.
602  timeout = 100;
603  */
604  timeout = 1;
605  } else {
606  timeout = -1;
607  gettimeofday(&timeval_stop, NULL);
608  int64_t delta = static_cast<int64_t>(
609  1000 * DiffTimeSeconds(timeval_start, timeval_stop));
610  perf::Xadd(download_mgr->counters_->sz_transfer_time, delta);
611  }
612  int retval = poll(download_mgr->watch_fds_, download_mgr->watch_fds_inuse_,
613  timeout);
// poll() failed (presumably EINTR in practice): simply poll again
614  if (retval < 0) {
615  continue;
616  }
617 
618  // Handle timeout
619  if (retval == 0) {
620  curl_multi_socket_action(
621  download_mgr->curl_multi_, CURL_SOCKET_TIMEOUT, 0, &still_running);
622  }
623 
624  // Terminate I/O thread
625  if (download_mgr->watch_fds_[kIdxPipeTerminate].revents)
626  break;
627 
628  // New job arrives
// One JobInfo pointer is read per iteration; if more are queued, the pipe
// stays readable and the next poll() returns immediately.
629  if (download_mgr->watch_fds_[kIdxPipeJobs].revents) {
630  download_mgr->watch_fds_[kIdxPipeJobs].revents = 0;
631  JobInfo *info;
632  download_mgr->pipe_jobs_->Read<JobInfo *>(&info);
633  if (!still_running) {
634  gettimeofday(&timeval_start, NULL);
635  }
636  CURL *handle = download_mgr->AcquireCurlHandle();
637  download_mgr->InitializeRequest(info, handle);
638  download_mgr->SetUrlOptions(info);
639  curl_multi_add_handle(download_mgr->curl_multi_, handle);
640  curl_multi_socket_action(
641  download_mgr->curl_multi_, CURL_SOCKET_TIMEOUT, 0, &still_running);
642  }
643 
644  // Activity on curl sockets
645  // Within this loop the curl_multi_socket_action() may cause socket(s)
646  // to be removed from watch_fds_. If a socket is removed it is replaced
647  // by the socket at the end of the array and the inuse count is decreased.
648  // Therefore loop over the array in reverse order.
649  for (int64_t i = download_mgr->watch_fds_inuse_ - 1; i >= 2; --i) {
650  if (i >= download_mgr->watch_fds_inuse_) {
651  continue;
652  }
653  if (download_mgr->watch_fds_[i].revents) {
654  int ev_bitmask = 0;
655  if (download_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
656  ev_bitmask |= CURL_CSELECT_IN;
657  if (download_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
658  ev_bitmask |= CURL_CSELECT_OUT;
659  if (download_mgr->watch_fds_[i].revents
660  & (POLLERR | POLLHUP | POLLNVAL)) {
661  ev_bitmask |= CURL_CSELECT_ERR;
662  }
663  download_mgr->watch_fds_[i].revents = 0;
664 
665  curl_multi_socket_action(download_mgr->curl_multi_,
666  download_mgr->watch_fds_[i].fd,
667  ev_bitmask,
668  &still_running);
669  }
670  }
671 
672  // Check if transfers are completed
673  CURLMsg *curl_msg;
674  int msgs_in_queue;
675  while ((curl_msg = curl_multi_info_read(download_mgr->curl_multi_,
676  &msgs_in_queue))) {
677  if (curl_msg->msg == CURLMSG_DONE) {
678  perf::Inc(download_mgr->counters_->n_requests);
679  JobInfo *info;
680  CURL *easy_handle = curl_msg->easy_handle;
681  int curl_error = curl_msg->data.result;
682  curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
683 
// NOTE(review): libcurl documents CURLINFO_REDIRECT_COUNT as writing a long;
// int64_t only has the same size on LP64 platforms.
684  int64_t redir_count;
685  curl_easy_getinfo(easy_handle, CURLINFO_REDIRECT_COUNT, &redir_count);
687  "(manager '%s' - id %" PRId64 ") "
688  "Number of CURL redirects %" PRId64,
689  download_mgr->name_.c_str(), info->id(), redir_count);
690 
691  curl_multi_remove_handle(download_mgr->curl_multi_, easy_handle);
692  if (download_mgr->VerifyAndFinalize(curl_error, info)) {
693  curl_multi_add_handle(download_mgr->curl_multi_, easy_handle);
694  curl_multi_socket_action(download_mgr->curl_multi_,
695  CURL_SOCKET_TIMEOUT,
696  0,
697  &still_running);
698  } else {
699  // Return easy handle into pool and write result back
700  download_mgr->ReleaseCurlHandle(easy_handle);
701 
703  info->GetDataTubePtr()->EnqueueBack(ele);
705  info->error_code());
706  }
707  }
708  }
709  }
710 
// Shutdown: detach and destroy every handle that is still in use.
// NOTE(review): nothing is enqueued on the data tubes of jobs that are still
// in flight here; confirm that no caller can still be waiting at shutdown.
711  for (set<CURL *>::iterator i = download_mgr->pool_handles_inuse_->begin(),
712  iEnd = download_mgr->pool_handles_inuse_->end();
713  i != iEnd;
714  ++i) {
715  curl_multi_remove_handle(download_mgr->curl_multi_, *i);
716  curl_easy_cleanup(*i);
717  }
718  download_mgr->pool_handles_inuse_->clear();
719  free(download_mgr->watch_fds_);
720 
722  "download I/O thread of DownloadManager '%s' terminated",
723  download_mgr->name_.c_str());
724  return NULL;
725 }
726 
727 
728 //------------------------------------------------------------------------------
729 
730 
731 HeaderLists::~HeaderLists() {
732  for (unsigned i = 0; i < blocks_.size(); ++i) {
733  delete[] blocks_[i];
734  }
735  blocks_.clear();
736 }
737 
738 
739 curl_slist *HeaderLists::GetList(const char *header) { return Get(header); }
740 
741 
742 curl_slist *HeaderLists::DuplicateList(curl_slist *slist) {
743  assert(slist);
744  curl_slist *copy = GetList(slist->data);
745  copy->next = slist->next;
746  curl_slist *prev = copy;
747  slist = slist->next;
748  while (slist) {
749  curl_slist *new_link = Get(slist->data);
750  new_link->next = slist->next;
751  prev->next = new_link;
752  prev = new_link;
753  slist = slist->next;
754  }
755  return copy;
756 }
757 
758 
759 void HeaderLists::AppendHeader(curl_slist *slist, const char *header) {
760  assert(slist);
761  curl_slist *new_link = Get(header);
762  new_link->next = NULL;
763 
764  while (slist->next)
765  slist = slist->next;
766  slist->next = new_link;
767 }
768 
769 
775 void HeaderLists::CutHeader(const char *header, curl_slist **slist) {
776  assert(slist);
777  curl_slist head;
778  head.next = *slist;
779  curl_slist *prev = &head;
780  curl_slist *rover = *slist;
781  while (rover) {
782  if (strcmp(rover->data, header) == 0) {
783  prev->next = rover->next;
784  Put(rover);
785  rover = prev;
786  }
787  prev = rover;
788  rover = rover->next;
789  }
790  *slist = head.next;
791 }
792 
793 
794 void HeaderLists::PutList(curl_slist *slist) {
795  while (slist) {
796  curl_slist *next = slist->next;
797  Put(slist);
798  slist = next;
799  }
800 }
801 
802 
803 string HeaderLists::Print(curl_slist *slist) {
804  string verbose;
805  while (slist) {
806  verbose += string(slist->data) + "\n";
807  slist = slist->next;
808  }
809  return verbose;
810 }
811 
812 
813 curl_slist *HeaderLists::Get(const char *header) {
814  for (unsigned i = 0; i < blocks_.size(); ++i) {
815  for (unsigned j = 0; j < kBlockSize; ++j) {
816  if (!IsUsed(&(blocks_[i][j]))) {
817  blocks_[i][j].data = const_cast<char *>(header);
818  return &(blocks_[i][j]);
819  }
820  }
821  }
822 
823  // All used, new block
824  AddBlock();
825  blocks_[blocks_.size() - 1][0].data = const_cast<char *>(header);
826  return &(blocks_[blocks_.size() - 1][0]);
827 }
828 
829 
830 void HeaderLists::Put(curl_slist *slist) {
831  slist->data = NULL;
832  slist->next = NULL;
833 }
834 
835 
836 void HeaderLists::AddBlock() {
837  curl_slist *new_block = new curl_slist[kBlockSize];
838  for (unsigned i = 0; i < kBlockSize; ++i) {
839  Put(&new_block[i]);
840  }
841  blocks_.push_back(new_block);
842 }
843 
844 
845 //------------------------------------------------------------------------------
846 
847 
848 string DownloadManager::ProxyInfo::Print() {
849  if (url == "DIRECT")
850  return url;
851 
852  string result = url;
853  int remaining = static_cast<int>(host.deadline())
854  - static_cast<int>(time(NULL));
855  string expinfo = (remaining >= 0) ? "+" : "";
856  if (abs(remaining) >= 3600) {
857  expinfo += StringifyInt(remaining / 3600) + "h";
858  } else if (abs(remaining) >= 60) {
859  expinfo += StringifyInt(remaining / 60) + "m";
860  } else {
861  expinfo += StringifyInt(remaining) + "s";
862  }
863  if (host.status() == dns::kFailOk) {
864  result += " (" + host.name() + ", " + expinfo + ")";
865  } else {
866  result += " (:unresolved:, " + expinfo + ")";
867  }
868  return result;
869 }
870 
871 
876 CURL *DownloadManager::AcquireCurlHandle() {
877  CURL *handle;
878 
879  if (pool_handles_idle_->empty()) {
880  // Create a new handle
881  handle = curl_easy_init();
882  assert(handle != NULL);
883 
884  curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
885  // curl_easy_setopt(curl_default, CURLOPT_FAILONERROR, 1);
886  curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, CallbackCurlHeader);
887  curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlData);
888  } else {
889  handle = *(pool_handles_idle_->begin());
890  pool_handles_idle_->erase(pool_handles_idle_->begin());
891  }
892 
893  pool_handles_inuse_->insert(handle);
894 
895  return handle;
896 }
897 
898 
899 void DownloadManager::ReleaseCurlHandle(CURL *handle) {
900  set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
901  assert(elem != pool_handles_inuse_->end());
902 
903  if (pool_handles_idle_->size() > pool_max_handles_) {
904  curl_easy_cleanup(*elem);
905  } else {
906  pool_handles_idle_->insert(*elem);
907  }
908 
909  pool_handles_inuse_->erase(elem);
910 }
911 
912 
// Prepares `handle` for the download described by `info`. The function:
//
// - resets the per-request job state (error and http code, proxy / metalink /
//   host counters, retries, backoff);
// - builds the request headers: the default headers, the job's info header
//   and, if HTTP tracing is enabled, the tracing headers;
// - applies the no-cache setting and handles compressed jobs (the statement in
//   that branch is not visible in this view);
// - initializes the hash context when a hash is expected;
// - sets the byte range and the curl options that route `info` to the
//   callbacks.
917 void DownloadManager::InitializeRequest(JobInfo *info, CURL *handle) {
918  // Initialize internal download state
919  info->SetCurlHandle(handle);
920  info->SetErrorCode(kFailOk);
921  info->SetHttpCode(-1);
922  info->SetFollowRedirects(follow_redirects_);
923  info->SetNumUsedProxies(1);
924  info->SetNumUsedMetalinks(1);
925  info->SetNumUsedHosts(1);
926  info->SetNumRetries(0);
927  info->SetBackoffMs(0);
928  info->SetHeaders(header_lists_->DuplicateList(default_headers_));
929  if (info->info_header()) {
930  header_lists_->AppendHeader(info->headers(), info->info_header());
931  }
932  if (enable_http_tracing_) {
933  for (unsigned int i = 0; i < http_tracing_headers_.size(); i++) {
934  header_lists_->AppendHeader(info->headers(),
935  (http_tracing_headers_)[i].c_str());
936  }
937 
938  header_lists_->AppendHeader(info->headers(), info->tracing_header_pid());
939  header_lists_->AppendHeader(info->headers(), info->tracing_header_gid());
940  header_lists_->AppendHeader(info->headers(), info->tracing_header_uid());
941 
943  "(manager '%s' - id %" PRId64 ") "
944  "CURL Header for URL: %s is:\n %s",
945  name_.c_str(), info->id(), info->url()->c_str(),
946  header_lists_->Print(info->headers()).c_str());
947  }
948 
949  if (info->force_nocache()) {
950  SetNocache(info);
951  } else {
952  info->SetNocache(false);
953  }
954  if (info->compressed()) {
956  }
957  if (info->expected_hash()) {
958  assert(info->hash_context().buffer != NULL);
959  shash::Init(info->hash_context());
960  }
961 
// CURLOPT_RANGE takes an inclusive "first-last" byte range.
962  if ((info->range_offset() != -1) && (info->range_size())) {
963  char byte_range_array[100];
964  const int64_t range_lower = static_cast<int64_t>(info->range_offset());
965  const int64_t range_upper = static_cast<int64_t>(info->range_offset()
966  + info->range_size() - 1);
// NOTE(review): snprintf() returns the length it would have written, so any
// result >= sizeof(byte_range_array) means truncation; "== 100" catches only
// one such value. Two int64 values need at most 41 characters, so this cannot
// trigger in practice.
967  if (snprintf(byte_range_array, sizeof(byte_range_array),
968  "%" PRId64 "-%" PRId64, range_lower, range_upper)
969  == 100) {
970  PANIC(NULL); // Should be impossible given limits on offset size.
971  }
972  curl_easy_setopt(handle, CURLOPT_RANGE, byte_range_array);
973  } else {
974  curl_easy_setopt(handle, CURLOPT_RANGE, NULL);
975  }
976 
977  // Set curl parameters
978  curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
979  curl_easy_setopt(handle, CURLOPT_WRITEHEADER, static_cast<void *>(info));
980  curl_easy_setopt(handle, CURLOPT_WRITEDATA, static_cast<void *>(info));
981  curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->headers());
982  if (info->head_request()) {
983  curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
984  } else {
985  curl_easy_setopt(handle, CURLOPT_HTTPGET, 1);
986  }
// NOTE(review): IPRESOLVE and FOLLOWLOCATION/MAXREDIRS are only set here, never
// reset, on possibly pooled handles. That is fine as long as opt_ipv4_only_
// and follow_redirects_ do not change at runtime. Confirm.
987  if (opt_ipv4_only_) {
988  curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
989  }
990  if (follow_redirects_) {
991  curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1);
992  curl_easy_setopt(handle, CURLOPT_MAXREDIRS, 4);
993  }
994 #ifdef DEBUGMSG
995  curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
996  curl_easy_setopt(handle, CURLOPT_DEBUGFUNCTION, CallbackCurlDebug);
997 #endif
998 }
999 
// Checks whether `info` should be reset to the first entry of its chain.
// The reset happens if info.timestamp_backup is set (> 0) and the current
// time is past timestamp_backup + reset_after (both in seconds, compared
// against time(NULL)). A reset logs the switch, sets info.current to 0 and
// clears timestamp_backup.
//
// `now` is a lazily filled cache of time(NULL): it is set here only if it is
// still 0, so the caller can reuse it. `typ` and `jobinfo` are only used for
// logging.
// NOTE(review): the visible caller in SetUrlOptions passes opt_metalink_ for
// both the "metalink" and the "host" check; the "host" call presumably should
// pass opt_host_.
1000 void DownloadManager::CheckHostInfoReset(const std::string &typ,
1001  HostInfo &info,
1002  JobInfo *jobinfo,
1003  time_t &now) {
1004  if (info.timestamp_backup > 0) {
1005  if (now == 0)
1006  now = time(NULL);
1007  if (static_cast<int64_t>(now)
1008  > static_cast<int64_t>(info.timestamp_backup + info.reset_after)) {
1010  "(manager %s - id %" PRId64 ") "
1011  "switching %s from %s to %s (reset %s)",
1012  name_.c_str(), jobinfo->id(), typ.c_str(),
1013  (*info.chain)[info.current].c_str(), (*info.chain)[0].c_str(),
1014  typ.c_str());
1015  info.current = 0;
1016  info.timestamp_backup = 0;
1017  }
1018  }
1019 }
1020 
1021 
1026 void DownloadManager::SetUrlOptions(JobInfo *info) {
1027  CURL *curl_handle = info->curl_handle();
1028  string url_prefix;
1029  time_t now = 0;
1030 
1031  MutexLockGuard m(lock_options_);
1032 
1033  // sharding policy
1034  if (sharding_policy_.UseCount() > 0) {
1035  if (info->proxy() != "") {
1036  // proxy already set, so this is a failover event
1037  perf::Inc(counters_->n_proxy_failover);
1038  }
1039  info->SetProxy(sharding_policy_->GetNextProxy(
1040  info->url(), info->proxy(),
1041  info->range_offset() == -1 ? 0 : info->range_offset()));
1042 
1043  curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, info->proxy().c_str());
1044  } else { // no sharding policy
1045  // Check if proxy group needs to be reset from backup to primary
1046  if (opt_timestamp_backup_proxies_ > 0) {
1047  now = time(NULL);
1048  if (static_cast<int64_t>(now) > static_cast<int64_t>(
1049  opt_timestamp_backup_proxies_ + opt_proxy_groups_reset_after_)) {
1050  opt_proxy_groups_current_ = 0;
1051  opt_timestamp_backup_proxies_ = 0;
1052  RebalanceProxiesUnlocked("Reset proxy group from backup to primary");
1053  }
1054  }
1055  // Check if load-balanced proxies within the group need to be reset
1056  if (opt_timestamp_failover_proxies_ > 0) {
1057  if (now == 0)
1058  now = time(NULL);
1059  if (static_cast<int64_t>(now)
1060  > static_cast<int64_t>(opt_timestamp_failover_proxies_
1061  + opt_proxy_groups_reset_after_)) {
1062  RebalanceProxiesUnlocked(
1063  "Reset load-balanced proxies within the active group");
1064  }
1065  }
1066 
1067  ProxyInfo *proxy = ChooseProxyUnlocked(info->expected_hash());
1068  if (!proxy || (proxy->url == "DIRECT")) {
1069  info->SetProxy("DIRECT");
1070  curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, "");
1071  } else {
1072  // Note: inside ValidateProxyIpsUnlocked() we may change the proxy data
1073  // structure, so we must not pass proxy->... (== current_proxy())
1074  // parameters directly
1075  std::string purl = proxy->url;
1076  dns::Host phost = proxy->host;
1077  const bool changed = ValidateProxyIpsUnlocked(purl, phost);
1078  // Current proxy may have changed
1079  if (changed) {
1080  proxy = ChooseProxyUnlocked(info->expected_hash());
1081  }
1082  info->SetProxy(proxy->url);
1083  if (proxy->host.status() == dns::kFailOk) {
1084  curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY,
1085  info->proxy().c_str());
1086  } else {
1087  // We know it can't work, don't even try to download
1088  curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, "0.0.0.0");
1089  }
1090  }
1091  } // end !sharding
1092 
1093  // Check if metalink and host chains need to be reset
1094  CheckHostInfoReset("metalink", opt_metalink_, info, now);
1095  CheckHostInfoReset("host", opt_metalink_, info, now);
1096 
1097  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, opt_low_speed_limit_);
1098  if (info->proxy() != "DIRECT") {
1099  curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_proxy_);
1100  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_proxy_);
1101  } else {
1102  curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_direct_);
1103  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_direct_);
1104  }
1105  if (!opt_dns_server_.empty())
1106  curl_easy_setopt(curl_handle, CURLOPT_DNS_SERVERS, opt_dns_server_.c_str());
1107 
1108  if (info->probe_hosts()) {
1109  if (CheckMetalinkChain(now)) {
1110  url_prefix = (*opt_metalink_.chain)[opt_metalink_.current];
1111  info->SetCurrentMetalinkChainIndex(opt_metalink_.current);
1113  "(manager %s - id %" PRId64 ") "
1114  "reading from metalink %d",
1115  name_.c_str(), info->id(), opt_metalink_.current);
1116  } else if (opt_host_.chain) {
1117  url_prefix = (*opt_host_.chain)[opt_host_.current];
1118  info->SetCurrentHostChainIndex(opt_host_.current);
1120  "(manager %s - id %" PRId64 ") "
1121  "reading from host %d",
1122  name_.c_str(), info->id(), opt_host_.current);
1123  }
1124  }
1125 
1126  string url = url_prefix + *(info->url());
1127 
1128  curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 1L);
1129  if (url.substr(0, 5) == "https") {
1130  bool rvb = ssl_certificate_store_.ApplySslCertificatePath(curl_handle);
1131  if (!rvb) {
1133  "(manager %s - id %" PRId64 ") "
1134  "Failed to set SSL certificate path %s",
1135  name_.c_str(), info->id(),
1136  ssl_certificate_store_.GetCaPath().c_str());
1137  }
1138  if (info->pid() != -1) {
1139  if (credentials_attachment_ == NULL) {
1141  "(manager %s - id %" PRId64 ") "
1142  "uses secure downloads but no credentials attachment set",
1143  name_.c_str(), info->id());
1144  } else {
1145  bool retval = credentials_attachment_->ConfigureCurlHandle(
1146  curl_handle, info->pid(), info->GetCredDataPtr());
1147  if (!retval) {
1149  "(manager %s - id %" PRId64 ") "
1150  "failed attaching credentials",
1151  name_.c_str(), info->id());
1152  }
1153  }
1154  }
1155  // The download manager disables signal handling in the curl library;
1156  // as OpenSSL's implementation of TLS will generate a sigpipe in some
1157  // error paths, we must explicitly disable SIGPIPE here.
1158  // TODO(jblomer): it should be enough to do this once
1159  signal(SIGPIPE, SIG_IGN);
1160  }
1161 
1162  if (url.find("@proxy@") != string::npos) {
1163  // This is used in Geo-API requests (only), to replace a portion of the
1164  // URL with the current proxy name for the sake of caching the result.
1165  // Replace the @proxy@ either with a passed in "forced" template (which
1166  // is set from $CVMFS_PROXY_TEMPLATE) if there is one, or a "direct"
1167  // template (which is the uuid) if there's no proxy, or the name of the
1168  // proxy.
1169  string replacement;
1170  if (proxy_template_forced_ != "") {
1171  replacement = proxy_template_forced_;
1172  } else if (info->proxy() == "DIRECT") {
1173  replacement = proxy_template_direct_;
1174  } else {
1175  if (opt_proxy_groups_current_ >= opt_proxy_groups_fallback_) {
1176  // It doesn't make sense to use the fallback proxies in Geo-API requests
1177  // since the fallback proxies are supposed to get sorted, too.
1178  info->SetProxy("DIRECT");
1179  curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, "");
1180  replacement = proxy_template_direct_;
1181  } else {
1182  replacement = ChooseProxyUnlocked(info->expected_hash())->host.name();
1183  }
1184  }
1185  replacement = (replacement == "") ? proxy_template_direct_ : replacement;
1187  "(manager %s - id %" PRId64 ") "
1188  "replacing @proxy@ by %s",
1189  name_.c_str(), info->id(), replacement.c_str());
1190  url = ReplaceAll(url, "@proxy@", replacement);
1191  }
1192 
1193  // TODO(heretherebedragons) before removing
1194  // static_cast<cvmfs::MemSink*>(info->sink)->size() == 0
1195  // and just always call info->sink->Reserve()
1196  // we should do a speed check
1197  if ((info->sink() != NULL) && info->sink()->RequiresReserve()
1198  && (static_cast<cvmfs::MemSink *>(info->sink())->size() == 0)
1199  && HasPrefix(url, "file://", false)) {
1200  platform_stat64 stat_buf;
1201  int retval = platform_stat(url.c_str(), &stat_buf);
1202  if (retval != 0) {
1203  // this is an error: file does not exist or out of memory
1204  // error is caught in other code section.
1205  info->sink()->Reserve(64ul * 1024ul);
1206  } else {
1207  info->sink()->Reserve(stat_buf.st_size);
1208  }
1209  }
1210 
1211  curl_easy_setopt(curl_handle, CURLOPT_URL,
1212  EscapeUrl(info->id(), url).c_str());
1213 }
1214 
1215 
1225 bool DownloadManager::ValidateProxyIpsUnlocked(const string &url,
1226  const dns::Host &host) {
1227  if (!host.IsExpired())
1228  return false;
1229  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s') validate DNS entry for %s",
1230  name_.c_str(), host.name().c_str());
1231 
1232  unsigned group_idx = opt_proxy_groups_current_;
1233  dns::Host new_host = resolver_->Resolve(host.name());
1234 
1235  bool update_only = true; // No changes to the list of IP addresses.
1236  if (new_host.status() != dns::kFailOk) {
1237  // Try again later in case resolving fails.
1239  "(manager '%s') failed to resolve IP addresses for %s (%d - %s)",
1240  name_.c_str(), host.name().c_str(), new_host.status(),
1241  dns::Code2Ascii(new_host.status()));
1242  new_host = dns::Host::ExtendDeadline(host, resolver_->min_ttl());
1243  } else if (!host.IsEquivalent(new_host)) {
1244  update_only = false;
1245  }
1246 
1247  if (update_only) {
1248  for (unsigned i = 0; i < (*opt_proxy_groups_)[group_idx].size(); ++i) {
1249  if ((*opt_proxy_groups_)[group_idx][i].host.id() == host.id())
1250  (*opt_proxy_groups_)[group_idx][i].host = new_host;
1251  }
1252  return false;
1253  }
1254 
1255  assert(new_host.status() == dns::kFailOk);
1256 
1257  // Remove old host objects, insert new objects, and rebalance.
1259  "(manager '%s') DNS entries for proxy %s changed, adjusting",
1260  name_.c_str(), host.name().c_str());
1261  vector<ProxyInfo> *group = current_proxy_group();
1262  opt_num_proxies_ -= group->size();
1263  for (unsigned i = 0; i < group->size();) {
1264  if ((*group)[i].host.id() == host.id()) {
1265  group->erase(group->begin() + i);
1266  } else {
1267  i++;
1268  }
1269  }
1270  vector<ProxyInfo> new_infos;
1271  set<string> best_addresses = new_host.ViewBestAddresses(opt_ip_preference_);
1272  set<string>::const_iterator iter_ips = best_addresses.begin();
1273  for (; iter_ips != best_addresses.end(); ++iter_ips) {
1274  string url_ip = dns::RewriteUrl(url, *iter_ips);
1275  new_infos.push_back(ProxyInfo(new_host, url_ip));
1276  }
1277  group->insert(group->end(), new_infos.begin(), new_infos.end());
1278  opt_num_proxies_ += new_infos.size();
1279 
1280  std::string msg = "DNS entries for proxy " + host.name() + " changed";
1281 
1282  RebalanceProxiesUnlocked(msg);
1283  return true;
1284 }
1285 
1286 
1290 void DownloadManager::UpdateStatistics(CURL *handle) {
1291  double val;
1292  int retval;
1293  int64_t sum = 0;
1294 
1295  retval = curl_easy_getinfo(handle, CURLINFO_SIZE_DOWNLOAD, &val);
1296  assert(retval == CURLE_OK);
1297  sum += static_cast<int64_t>(val);
1298  /*retval = curl_easy_getinfo(handle, CURLINFO_HEADER_SIZE, &val);
1299  assert(retval == CURLE_OK);
1300  sum += static_cast<int64_t>(val);*/
1301  perf::Xadd(counters_->sz_transferred_bytes, sum);
1302 }
1303 
1304 
1308 bool DownloadManager::CanRetry(const JobInfo *info) {
1309  MutexLockGuard m(lock_options_);
1310  unsigned max_retries = opt_max_retries_;
1311 
1312  return !(info->nocache()) && (info->num_retries() < max_retries)
1313  && (IsProxyTransferError(info->error_code())
1314  || IsHostTransferError(info->error_code()));
1315 }
1316 
1324 void DownloadManager::Backoff(JobInfo *info) {
1325  unsigned backoff_init_ms = 0;
1326  unsigned backoff_max_ms = 0;
1327  {
1328  MutexLockGuard m(lock_options_);
1329  backoff_init_ms = opt_backoff_init_ms_;
1330  backoff_max_ms = opt_backoff_max_ms_;
1331  }
1332 
1333  info->SetNumRetries(info->num_retries() + 1);
1334  perf::Inc(counters_->n_retries);
1335  if (info->backoff_ms() == 0) {
1336  info->SetBackoffMs(prng_.Next(backoff_init_ms + 1)); // Must be != 0
1337  } else {
1338  info->SetBackoffMs(info->backoff_ms() * 2);
1339  }
1340  if (info->backoff_ms() > backoff_max_ms) {
1341  info->SetBackoffMs(backoff_max_ms);
1342  }
1343 
1345  "(manager '%s' - id %" PRId64 ") backing off for %d ms",
1346  name_.c_str(), info->id(), info->backoff_ms());
1347  SafeSleepMs(info->backoff_ms());
1348 }
1349 
1350 void DownloadManager::SetNocache(JobInfo *info) {
1351  if (info->nocache())
1352  return;
1353  header_lists_->AppendHeader(info->headers(), "Pragma: no-cache");
1354  header_lists_->AppendHeader(info->headers(), "Cache-Control: no-cache");
1355  curl_easy_setopt(info->curl_handle(), CURLOPT_HTTPHEADER, info->headers());
1356  info->SetNocache(true);
1357 }
1358 
1359 
1364 void DownloadManager::SetRegularCache(JobInfo *info) {
1365  if (info->nocache() == false)
1366  return;
1367  header_lists_->CutHeader("Pragma: no-cache", info->GetHeadersPtr());
1368  header_lists_->CutHeader("Cache-Control: no-cache", info->GetHeadersPtr());
1369  curl_easy_setopt(info->curl_handle(), CURLOPT_HTTPHEADER, info->headers());
1370  info->SetNocache(false);
1371 }
1372 
1373 
1377 void DownloadManager::ReleaseCredential(JobInfo *info) {
1378  if (info->cred_data()) {
1379  assert(credentials_attachment_ != NULL); // Someone must have set it
1380  credentials_attachment_->ReleaseCurlHandle(info->curl_handle(),
1381  info->cred_data());
1382  info->SetCredData(NULL);
1383  }
1384 }
1385 
1386 
/**
 * Extracts the integer value of the "; pri=" parameter from a single entry of
 * a Link header (metalink mirror priority, lower values are preferred).
 *
 * @param link      one comma-separated entry of the Link header
 * @param priority  receives the parsed value on success; untouched otherwise
 * @return true if the entry has a "; pri=" parameter whose value parses as a
 *         base-10 integer that fits into an int, false otherwise
 */
static bool ParseLinkPriority(const std::string &link, int *priority) {
  static const char kPriKey[] = "; pri=";
  const size_t pos = link.find(kPriKey);
  if (pos == std::string::npos)
    return false;

  const char *digits = link.c_str() + pos + (sizeof(kPriKey) - 1);
  char *end = NULL;
  // strtol, unlike sscanf("%d"), has defined behavior on overflow
  const long value = std::strtol(digits, &end, 10);
  if (end == digits)
    return false;  // no number after "pri="
  const int narrowed = static_cast<int>(value);
  if (static_cast<long>(narrowed) != value)
    return false;  // does not fit into an int
  *priority = narrowed;
  return true;
}

/**
 * Comparator for std::sort that orders Link header entries by ascending
 * "pri=" value.
 *
 * Entries without a parsable priority are placed after all entries that carry
 * one, and are equivalent to each other.  This keeps the relation a strict
 * weak ordering, as std::sort requires.  Returning false whenever either side
 * lacks a priority would make equivalence non-transitive for mixed lists,
 * which is undefined behavior in std::sort.
 */
static bool sortlinks(const std::string &s1, const std::string &s2) {
  int pri1 = 0;
  int pri2 = 0;
  const bool has_pri1 = ParseLinkPriority(s1, &pri1);
  const bool has_pri2 = ParseLinkPriority(s2, &pri2);
  if (has_pri1 && has_pri2)
    return pri1 < pri2;
  return has_pri1 && !has_pri2;
}
1399 
1404 void DownloadManager::ProcessLink(JobInfo *info) {
1405  std::vector<std::string> links = SplitString(info->link(), ',');
1406  if (info->link().find("; pri=") != std::string::npos)
1407  std::sort(links.begin(), links.end(), sortlinks);
1408 
1409  std::vector<std::string> host_list;
1410 
1411  std::vector<std::string>::const_iterator il = links.begin();
1412  for (; il != links.end(); ++il) {
1413  const std::string &link = *il;
1414  if ((link.find("; rel=duplicate") == std::string::npos)
1415  && (link.find("; rel=\"duplicate\"") == std::string::npos)) {
1417  "skipping link '%s' because it does not contain rel=duplicate",
1418  link.c_str());
1419  continue;
1420  }
1421  // ignore depth= field since there's nothing useful we can do with it
1422 
1423  size_t start = link.find('<');
1424  if (start == std::string::npos) {
1425  LogCvmfs(
1427  "skipping link '%s' because it does not have a left angle bracket",
1428  link.c_str());
1429  continue;
1430  }
1431 
1432  start++;
1433  if ((link.substr(start, 7) != "http://")
1434  && (link.substr(start, 8) != "https://")) {
1436  "skipping link '%s' of unrecognized url protocol", link.c_str());
1437  continue;
1438  }
1439 
1440  size_t end = link.find('/', start + 8);
1441  if (end == std::string::npos)
1442  end = link.find('>');
1443  if (end == std::string::npos) {
1445  "skipping link '%s' because no slash in url and no right angle "
1446  "bracket",
1447  link.c_str());
1448  continue;
1449  }
1450  const std::string host = link.substr(start, end - start);
1451  LogCvmfs(kLogDownload, kLogDebug, "adding linked host '%s'", host.c_str());
1452  host_list.push_back(host);
1453  }
1454 
1455  if (host_list.size() > 0) {
1456  SetHostChain(host_list);
1457  opt_metalink_timestamp_link_ = time(NULL);
1458  }
1459 }
1460 
1461 
1468 bool DownloadManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1470  "(manager '%s' - id %" PRId64 ") "
1471  "Verify downloaded url %s, proxy %s (curl error %d)",
1472  name_.c_str(), info->id(), info->url()->c_str(),
1473  info->proxy().c_str(), curl_error);
1474  UpdateStatistics(info->curl_handle());
1475 
1476  bool was_metalink;
1477  std::string typ;
1478  if (info->current_metalink_chain_index() >= 0) {
1479  was_metalink = true;
1480  typ = "metalink";
1481  if (info->link() != "") {
1482  // process Link header whether or not the redirected URL got an error
1483  ProcessLink(info);
1484  }
1485  } else {
1486  was_metalink = false;
1487  typ = "host";
1488  }
1489 
1490 
1491  // Verification and error classification
1492  switch (curl_error) {
1493  case CURLE_OK:
1494  // Verify content hash
1495  if (info->expected_hash()) {
1496  shash::Any match_hash;
1497  shash::Final(info->hash_context(), &match_hash);
1498  if (match_hash != *(info->expected_hash())) {
1499  if (ignore_signature_failures_) {
1500  LogCvmfs(
1502  "(manager '%s' - id %" PRId64 ") "
1503  "ignoring failed hash verification of %s (expected %s, got %s)",
1504  name_.c_str(), info->id(), info->url()->c_str(),
1505  info->expected_hash()->ToString().c_str(),
1506  match_hash.ToString().c_str());
1507  } else {
1509  "(manager '%s' - id %" PRId64 ") "
1510  "hash verification of %s failed (expected %s, got %s)",
1511  name_.c_str(), info->id(), info->url()->c_str(),
1512  info->expected_hash()->ToString().c_str(),
1513  match_hash.ToString().c_str());
1514  info->SetErrorCode(kFailBadData);
1515  break;
1516  }
1517  }
1518  }
1519 
1520  info->SetErrorCode(kFailOk);
1521  break;
1522  case CURLE_UNSUPPORTED_PROTOCOL:
1524  break;
1525  case CURLE_URL_MALFORMAT:
1526  info->SetErrorCode(kFailBadUrl);
1527  break;
1528  case CURLE_COULDNT_RESOLVE_PROXY:
1530  break;
1531  case CURLE_COULDNT_RESOLVE_HOST:
1533  break;
1534  case CURLE_OPERATION_TIMEDOUT:
1535  info->SetErrorCode((info->proxy() == "DIRECT") ? kFailHostTooSlow
1536  : kFailProxyTooSlow);
1537  break;
1538  case CURLE_PARTIAL_FILE:
1539  case CURLE_GOT_NOTHING:
1540  case CURLE_RECV_ERROR:
1541  info->SetErrorCode((info->proxy() == "DIRECT") ? kFailHostShortTransfer
1543  break;
1544  case CURLE_FILE_COULDNT_READ_FILE:
1545  case CURLE_COULDNT_CONNECT:
1546  if (info->proxy() != "DIRECT") {
1547  // This is a guess. Fail-over can still change to switching host
1549  } else {
1551  }
1552  break;
1553  case CURLE_TOO_MANY_REDIRECTS:
1555  break;
1556  case CURLE_SSL_CACERT_BADFILE:
1558  "(manager '%s' -id %" PRId64 ") "
1559  "Failed to load certificate bundle. "
1560  "X509_CERT_BUNDLE might point to the wrong location.",
1561  name_.c_str(), info->id());
1563  break;
1564  // As of curl 7.62.0, CURLE_SSL_CACERT is the same as
1565  // CURLE_PEER_FAILED_VERIFICATION
1566  case CURLE_PEER_FAILED_VERIFICATION:
1568  "(manager '%s' - id %" PRId64 ") "
1569  "invalid SSL certificate of remote host. "
1570  "X509_CERT_DIR and/or X509_CERT_BUNDLE might point to the wrong "
1571  "location.",
1572  name_.c_str(), info->id());
1574  break;
1575  case CURLE_ABORTED_BY_CALLBACK:
1576  case CURLE_WRITE_ERROR:
1577  // Error set by callback
1578  break;
1579  case CURLE_SEND_ERROR:
1580  // The curl error CURLE_SEND_ERROR can be seen when a cache is misbehaving
1581  // and closing connections before the http request send is completed.
1582  // Handle this error, treating it as a short transfer error.
1583  info->SetErrorCode((info->proxy() == "DIRECT") ? kFailHostShortTransfer
1584  : kFailProxyShortTransfer);
1585  break;
1586  default:
1588  "(manager '%s' - id %" PRId64 ") "
1589  "unexpected curl error (%d) while trying to fetch %s",
1590  name_.c_str(), info->id(), curl_error, info->url()->c_str());
1591  info->SetErrorCode(kFailOther);
1592  break;
1593  }
1594 
1595  std::vector<std::string> *host_chain;
1596  unsigned char num_used_hosts;
1597  if (was_metalink) {
1598  host_chain = opt_metalink_.chain;
1599  num_used_hosts = info->num_used_metalinks();
1600  } else {
1601  host_chain = opt_host_.chain;
1602  num_used_hosts = info->num_used_hosts();
1603  }
1604 
1605  // Determination if download should be repeated
1606  bool try_again = false;
1607  bool same_url_retry = CanRetry(info);
1608  if (info->error_code() != kFailOk) {
1609  MutexLockGuard m(lock_options_);
1610  if (info->error_code() == kFailBadData) {
1611  if (!info->nocache()) {
1612  try_again = true;
1613  } else {
1614  // Make it a host failure
1616  "(manager '%s' - id %" PRId64 ") "
1617  "data corruption with no-cache header, try another %s",
1618  name_.c_str(), info->id(), typ.c_str());
1619 
1620  info->SetErrorCode(kFailHostHttp);
1621  }
1622  }
1623  if (same_url_retry
1624  || (((info->error_code() == kFailHostResolve)
1625  || IsHostTransferError(info->error_code())
1626  || (info->error_code() == kFailHostHttp))
1627  && info->probe_hosts() && host_chain
1628  && (num_used_hosts < host_chain->size()))) {
1629  try_again = true;
1630  }
1631  if (same_url_retry
1632  || (((info->error_code() == kFailProxyResolve)
1633  || IsProxyTransferError(info->error_code())
1634  || (info->error_code() == kFailProxyHttp)))) {
1635  if (sharding_policy_.UseCount() > 0) { // sharding policy
1636  try_again = true;
1637  same_url_retry = false;
1638  } else { // no sharding
1639  try_again = true;
1640  // If all proxies failed, do a next round with the next host
1641  if (!same_url_retry && (info->num_used_proxies() >= opt_num_proxies_)) {
1642  // Check if this can be made a host fail-over
1643  if (info->probe_hosts() && host_chain
1644  && (num_used_hosts < host_chain->size())) {
1645  // reset proxy group if not already performed by other handle
1646  if (opt_proxy_groups_) {
1647  if ((opt_proxy_groups_current_ > 0)
1648  || (opt_proxy_groups_current_burned_ > 0)) {
1649  opt_proxy_groups_current_ = 0;
1650  opt_timestamp_backup_proxies_ = 0;
1651  const std::string msg = "reset proxies for " + typ
1652  + " failover";
1653  RebalanceProxiesUnlocked(msg);
1654  }
1655  }
1656 
1657  // Make it a host failure
1659  "(manager '%s' - id %" PRId64 ") make it a %s failure",
1660  name_.c_str(), info->id(), typ.c_str());
1661  info->SetNumUsedProxies(1);
1663  } else {
1664  if (failover_indefinitely_) {
1665  // Instead of giving up, reset the num_used_proxies counter,
1666  // switch proxy and try again
1668  "(manager '%s' - id %" PRId64 ") "
1669  "VerifyAndFinalize() would fail the download here. "
1670  "Instead switch proxy and retry download. "
1671  "typ=%s "
1672  "info->probe_hosts=%d host_chain=%p num_used_hosts=%d "
1673  "host_chain->size()=%lu same_url_retry=%d "
1674  "info->num_used_proxies=%d opt_num_proxies_=%d",
1675  name_.c_str(), info->id(), typ.c_str(),
1676  static_cast<int>(info->probe_hosts()), host_chain,
1677  num_used_hosts, host_chain ? host_chain->size() : -1,
1678  static_cast<int>(same_url_retry),
1679  info->num_used_proxies(), opt_num_proxies_);
1680  info->SetNumUsedProxies(1);
1681  RebalanceProxiesUnlocked(
1682  "download failed - failover indefinitely");
1683  try_again = !Interrupted(fqrn_, info);
1684  } else {
1685  try_again = false;
1686  }
1687  }
1688  } // Make a proxy failure a host failure
1689  } // Proxy failure assumed
1690  } // end !sharding
1691  }
1692 
1693  if (try_again) {
1695  "(manager '%s' - id %" PRId64 ") "
1696  "Trying again on same curl handle, same url: %d, "
1697  "error code %d no-cache %d",
1698  name_.c_str(), info->id(), same_url_retry, info->error_code(),
1699  info->nocache());
1700  // Reset internal state and destination
1701  if (info->sink() != NULL && info->sink()->Reset() != 0) {
1702  info->SetErrorCode(kFailLocalIO);
1703  goto verify_and_finalize_stop;
1704  }
1705  if (info->interrupt_cue() && info->interrupt_cue()->IsCanceled()) {
1706  info->SetErrorCode(kFailCanceled);
1707  goto verify_and_finalize_stop;
1708  }
1709 
1710  if (info->expected_hash()) {
1711  shash::Init(info->hash_context());
1712  }
1713  if (info->compressed()) {
1715  }
1716 
1717  if (sharding_policy_.UseCount() > 0) { // sharding policy
1718  ReleaseCredential(info);
1719  SetUrlOptions(info);
1720  } else { // no sharding policy
1721  SetRegularCache(info);
1722 
1723  // Failure handling
1724  bool switch_proxy = false;
1725  bool switch_host = false;
1726  switch (info->error_code()) {
1727  case kFailBadData:
1728  SetNocache(info);
1729  break;
1730  case kFailProxyResolve:
1731  case kFailProxyHttp:
1732  switch_proxy = true;
1733  break;
1734  case kFailHostResolve:
1735  case kFailHostHttp:
1736  case kFailHostAfterProxy:
1737  switch_host = true;
1738  break;
1739  default:
1740  if (IsProxyTransferError(info->error_code())) {
1741  if (same_url_retry) {
1742  Backoff(info);
1743  } else {
1744  switch_proxy = true;
1745  }
1746  } else if (IsHostTransferError(info->error_code())) {
1747  if (same_url_retry) {
1748  Backoff(info);
1749  } else {
1750  switch_host = true;
1751  }
1752  } else {
1753  // No other errors expected when retrying
1754  PANIC(NULL);
1755  }
1756  }
1757  if (switch_proxy) {
1758  ReleaseCredential(info);
1759  SwitchProxy(info);
1760  info->SetNumUsedProxies(info->num_used_proxies() + 1);
1761  SetUrlOptions(info);
1762  }
1763  if (switch_host) {
1764  ReleaseCredential(info);
1765  if (was_metalink) {
1766  SwitchMetalink(info);
1767  info->SetNumUsedMetalinks(num_used_hosts + 1);
1768  } else {
1769  SwitchHost(info);
1770  info->SetNumUsedHosts(num_used_hosts + 1);
1771  }
1772  SetUrlOptions(info);
1773  }
1774  } // end !sharding
1775 
1776  if (failover_indefinitely_) {
1777  // try again, breaking if there's a cvmfs reload happening and we are in a
1778  // proxy failover. This will EIO the call application.
1779  return !Interrupted(fqrn_, info);
1780  }
1781  return true; // try again
1782  }
1783 
1784 verify_and_finalize_stop:
1785  // Finalize, flush destination file
1786  ReleaseCredential(info);
1787  if (info->sink() != NULL && info->sink()->Flush() != 0) {
1788  info->SetErrorCode(kFailLocalIO);
1789  }
1790 
1791  if (info->compressed())
1793 
1794  if (info->headers()) {
1795  header_lists_->PutList(info->headers());
1796  info->SetHeaders(NULL);
1797  }
1798 
1799  return false; // stop transfer and return to Fetch()
1800 }
1801 
1802 DownloadManager::~DownloadManager() {
1803  // cleaned up fini
1804  if (sharding_policy_.UseCount() > 0) {
1805  sharding_policy_.Reset();
1806  }
1807  if (health_check_.UseCount() > 0) {
1808  if (health_check_.Unique()) {
1810  "(manager '%s') Stopping healthcheck thread", name_.c_str());
1811  health_check_->StopHealthcheck();
1812  }
1813  health_check_.Reset();
1814  }
1815 
1816  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1817  // Shutdown I/O thread
1818  pipe_terminate_->Write(kPipeTerminateSignal);
1819  pthread_join(thread_download_, NULL);
1820  // All handles are removed from the multi stack
1821  pipe_terminate_.Destroy();
1822  pipe_jobs_.Destroy();
1823  }
1824 
1825  for (set<CURL *>::iterator i = pool_handles_idle_->begin(),
1826  iEnd = pool_handles_idle_->end();
1827  i != iEnd;
1828  ++i) {
1829  curl_easy_cleanup(*i);
1830  }
1831 
1832  delete pool_handles_idle_;
1833  delete pool_handles_inuse_;
1834  curl_multi_cleanup(curl_multi_);
1835 
1836  delete header_lists_;
1837  if (user_agent_)
1838  free(user_agent_);
1839 
1840  delete counters_;
1841  delete opt_host_.chain;
1842  delete opt_host_chain_rtt_;
1843  delete opt_proxy_groups_;
1844 
1845  curl_global_cleanup();
1846  delete resolver_;
1847 
1848  // old destructor
1849  pthread_mutex_destroy(lock_options_);
1850  pthread_mutex_destroy(lock_synchronous_mode_);
1851  free(lock_options_);
1852  free(lock_synchronous_mode_);
1853 }
1854 
1855 void DownloadManager::InitHeaders() {
1856  // User-Agent
1857  string cernvm_id = "User-Agent: cvmfs ";
1858 #ifdef CVMFS_LIBCVMFS
1859  cernvm_id += "libcvmfs ";
1860 #else
1861  cernvm_id += "Fuse ";
1862 #endif
1863  cernvm_id += string(CVMFS_VERSION);
1864  if (getenv("CERNVM_UUID") != NULL) {
1865  cernvm_id += " "
1866  + sanitizer::InputSanitizer("az AZ 09 -")
1867  .Filter(getenv("CERNVM_UUID"));
1868  }
1869  user_agent_ = strdup(cernvm_id.c_str());
1870 
1871  header_lists_ = new HeaderLists();
1872 
1873  default_headers_ = header_lists_->GetList("Connection: Keep-Alive");
1874  header_lists_->AppendHeader(default_headers_, "Pragma:");
1875  header_lists_->AppendHeader(default_headers_, user_agent_);
1876 }
1877 
1878 DownloadManager::DownloadManager(const unsigned max_pool_handles,
1879  const perf::StatisticsTemplate &statistics,
1880  const std::string &name)
1881  : prng_(Prng())
1882  , pool_handles_idle_(new set<CURL *>)
1883  , pool_handles_inuse_(new set<CURL *>)
1884  , pool_max_handles_(max_pool_handles)
1885  , pipe_terminate_(NULL)
1886  , pipe_jobs_(NULL)
1887  , watch_fds_(NULL)
1888  , watch_fds_size_(0)
1889  , watch_fds_inuse_(0)
1890  , watch_fds_max_(4 * max_pool_handles)
1891  , opt_timeout_proxy_(5)
1892  , opt_timeout_direct_(10)
1893  , opt_low_speed_limit_(1024)
1894  , opt_max_retries_(0)
1895  , opt_backoff_init_ms_(0)
1896  , opt_backoff_max_ms_(0)
1897  , enable_info_header_(false)
1898  , opt_ipv4_only_(false)
1899  , follow_redirects_(false)
1900  , ignore_signature_failures_(false)
1901  , enable_http_tracing_(false)
1902  , opt_metalink_(NULL, 0, 0, 0)
1903  , opt_metalink_timestamp_link_(0)
1904  , opt_host_(NULL, 0, 0, 0)
1905  , opt_host_chain_rtt_(NULL)
1906  , opt_proxy_groups_(NULL)
1907  , opt_proxy_groups_current_(0)
1908  , opt_proxy_groups_current_burned_(0)
1909  , opt_proxy_groups_fallback_(0)
1910  , opt_num_proxies_(0)
1911  , opt_proxy_shard_(false)
1912  , failover_indefinitely_(false)
1913  , name_(name)
1914  , opt_ip_preference_(dns::kIpPreferSystem)
1915  , opt_timestamp_backup_proxies_(0)
1916  , opt_timestamp_failover_proxies_(0)
1917  , opt_proxy_groups_reset_after_(0)
1918  , credentials_attachment_(NULL)
1919  , counters_(new Counters(statistics)) {
1920  atomic_init32(&multi_threaded_);
1921 
1922  lock_options_ = reinterpret_cast<pthread_mutex_t *>(
1923  smalloc(sizeof(pthread_mutex_t)));
1924  int retval = pthread_mutex_init(lock_options_, NULL);
1925  assert(retval == 0);
1926  lock_synchronous_mode_ = reinterpret_cast<pthread_mutex_t *>(
1927  smalloc(sizeof(pthread_mutex_t)));
1928  retval = pthread_mutex_init(lock_synchronous_mode_, NULL);
1929  assert(retval == 0);
1930 
1931  retval = curl_global_init(CURL_GLOBAL_ALL);
1932  assert(retval == CURLE_OK);
1933 
1934  InitHeaders();
1935 
1936  curl_multi_ = curl_multi_init();
1937  assert(curl_multi_ != NULL);
1938  curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION, CallbackCurlSocket);
1939  curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1940  static_cast<void *>(this));
1941  curl_multi_setopt(curl_multi_, CURLMOPT_MAXCONNECTS, watch_fds_max_);
1942  curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1944 
1945  prng_.InitLocaltime();
1946 
1947  // Name resolving
1948  if ((getenv("CVMFS_IPV4_ONLY") != NULL)
1949  && (strlen(getenv("CVMFS_IPV4_ONLY")) > 0)) {
1950  opt_ipv4_only_ = true;
1951  }
1954  assert(resolver_);
1955 }
1956 
1964 
1965  int retval = pthread_create(&thread_download_, NULL, MainDownload,
1966  static_cast<void *>(this));
1967  assert(retval == 0);
1968 
1969  atomic_inc32(&multi_threaded_);
1970 
1971  if (health_check_.UseCount() > 0) {
1973  "(manager '%s') Starting healthcheck thread", name_.c_str());
1974  health_check_->StartHealthcheck();
1975  }
1976 }
1977 
1978 
1983  assert(info != NULL);
1984  assert(info->url() != NULL);
1985 
1986  Failures result;
1987  result = PrepareDownloadDestination(info);
1988  if (result != kFailOk)
1989  return result;
1990 
1991  if (info->expected_hash()) {
1994  info->GetHashContextPtr()->size = shash::GetContextSize(algorithm);
1995  info->GetHashContextPtr()->buffer = alloca(info->hash_context().size);
1996  }
1997 
1998  // In case JobInfo object is being reused
1999  info->SetLink("");
2000 
2001  // Prepare cvmfs-info: header, allocate string on the stack
2002  info->SetInfoHeader(NULL);
2003  if (enable_info_header_ && info->extra_info()) {
2004  const char *header_name = "cvmfs-info: ";
2005  const size_t header_name_len = strlen(header_name);
2006  const unsigned header_size = 1 + header_name_len
2007  + EscapeHeader(*(info->extra_info()), NULL, 0);
2008  info->SetInfoHeader(static_cast<char *>(alloca(header_size)));
2009  memcpy(info->info_header(), header_name, header_name_len);
2010  EscapeHeader(*(info->extra_info()), info->info_header() + header_name_len,
2011  header_size - header_name_len);
2012  info->info_header()[header_size - 1] = '\0';
2013  }
2014 
2015  if (enable_http_tracing_) {
2016  const std::string str_pid = "X-CVMFS-PID: " + StringifyInt(info->pid());
2017  const std::string str_gid = "X-CVMFS-GID: " + StringifyUint(info->gid());
2018  const std::string str_uid = "X-CVMFS-UID: " + StringifyUint(info->uid());
2019 
2020  // will be auto freed at the end of this function Fetch(JobInfo *info)
2021  info->SetTracingHeaderPid(static_cast<char *>(alloca(str_pid.size() + 1)));
2022  info->SetTracingHeaderGid(static_cast<char *>(alloca(str_gid.size() + 1)));
2023  info->SetTracingHeaderUid(static_cast<char *>(alloca(str_uid.size() + 1)));
2024 
2025  memcpy(info->tracing_header_pid(), str_pid.c_str(), str_pid.size() + 1);
2026  memcpy(info->tracing_header_gid(), str_gid.c_str(), str_gid.size() + 1);
2027  memcpy(info->tracing_header_uid(), str_uid.c_str(), str_uid.size() + 1);
2028  }
2029 
2030  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
2031  if (!info->IsValidPipeJobResults()) {
2032  info->CreatePipeJobResults();
2033  }
2034  if (!info->IsValidDataTube()) {
2035  info->CreateDataTube();
2036  }
2037 
2038  // LogCvmfs(kLogDownload, kLogDebug, "send job to thread, pipe %d %d",
2039  // info->wait_at[0], info->wait_at[1]);
2040  pipe_jobs_->Write<JobInfo *>(info);
2041 
2042  do {
2043  DataTubeElement *ele = info->GetDataTubePtr()->PopFront();
2044 
2045  if (ele->action == kActionStop) {
2046  delete ele;
2047  break;
2048  }
2049  // TODO(heretherebedragons) add compression
2050  } while (true);
2051 
2052  info->GetPipeJobResultPtr()->Read<download::Failures>(&result);
2053  // LogCvmfs(kLogDownload, kLogDebug, "got result %d", result);
2054  } else {
2056  CURL *handle = AcquireCurlHandle();
2057  InitializeRequest(info, handle);
2058  SetUrlOptions(info);
2059  // curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
2060  int retval;
2061  do {
2062  retval = curl_easy_perform(handle);
2064  double elapsed;
2065  if (curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &elapsed)
2066  == CURLE_OK) {
2068  static_cast<int64_t>(elapsed * 1000));
2069  }
2070  } while (VerifyAndFinalize(retval, info));
2071  result = info->error_code();
2072  ReleaseCurlHandle(info->curl_handle());
2073  }
2074 
2075  if (result != kFailOk) {
2077  "(manager '%s' - id %" PRId64 ") "
2078  "download failed (error %d - %s)",
2079  name_.c_str(), info->id(), result, Code2Ascii(result));
2080 
2081  if (info->sink() != NULL) {
2082  info->sink()->Purge();
2083  }
2084  }
2085 
2086  return result;
2087 }
2088 
2089 
2097 }
2098 
2102 std::string DownloadManager::GetDnsServer() const { return opt_dns_server_; }
2103 
2108 void DownloadManager::SetDnsServer(const string &address) {
2109  if (!address.empty()) {
2111  opt_dns_server_ = address;
2112  assert(!opt_dns_server_.empty());
2113 
2114  vector<string> servers;
2115  servers.push_back(address);
2116  bool retval = resolver_->SetResolvers(servers);
2117  assert(retval);
2118  }
2119  LogCvmfs(kLogDownload, kLogSyslog, "(manager '%s') set nameserver to %s",
2120  name_.c_str(), address.c_str());
2121 }
2122 
2123 
2128  const unsigned timeout_ms) {
2130  if ((resolver_->retries() == retries)
2131  && (resolver_->timeout_ms() == timeout_ms)) {
2132  return;
2133  }
2134  delete resolver_;
2135  resolver_ = NULL;
2136  resolver_ = dns::NormalResolver::Create(opt_ipv4_only_, retries, timeout_ms);
2137  assert(resolver_);
2138 }
2139 
2140 
2141 void DownloadManager::SetDnsTtlLimits(const unsigned min_seconds,
2142  const unsigned max_seconds) {
2144  resolver_->set_min_ttl(min_seconds);
2145  resolver_->set_max_ttl(max_seconds);
2146 }
2147 
2148 
2151  opt_ip_preference_ = preference;
2152 }
2153 
2154 
2160 void DownloadManager::SetTimeout(const unsigned seconds_proxy,
2161  const unsigned seconds_direct) {
2163  opt_timeout_proxy_ = seconds_proxy;
2164  opt_timeout_direct_ = seconds_direct;
2165 }
2166 
2167 
2173 void DownloadManager::SetLowSpeedLimit(const unsigned low_speed_limit) {
2175  opt_low_speed_limit_ = low_speed_limit;
2176 }
2177 
2178 
2182 void DownloadManager::GetTimeout(unsigned *seconds_proxy,
2183  unsigned *seconds_direct) {
2185  *seconds_proxy = opt_timeout_proxy_;
2186  *seconds_direct = opt_timeout_direct_;
2187 }
2188 
2189 
2194 void DownloadManager::SetMetalinkChain(const string &metalink_list) {
2195  SetMetalinkChain(SplitString(metalink_list, ';'));
2196 }
2197 
2198 
2200  const std::vector<std::string> &metalink_list) {
2201  const MutexLockGuard m(lock_options_);
2203  delete opt_metalink_.chain;
2204  opt_metalink_.current = 0;
2205 
2206  if (metalink_list.empty()) {
2207  opt_metalink_.chain = NULL;
2208  return;
2209  }
2210 
2211  opt_metalink_.chain = new vector<string>(metalink_list);
2212 }
2213 
2214 
2219 void DownloadManager::GetMetalinkInfo(vector<string> *metalink_chain,
2220  unsigned *current_metalink) {
2221  const MutexLockGuard m(lock_options_);
2222  if (opt_metalink_.chain) {
2223  if (current_metalink) {
2224  *current_metalink = opt_metalink_.current;
2225  }
2226  if (metalink_chain) {
2227  *metalink_chain = *opt_metalink_.chain;
2228  }
2229  }
2230 }
2231 
2232 
2237 void DownloadManager::SetHostChain(const string &host_list) {
2238  SetHostChain(SplitString(host_list, ';'));
2239 }
2240 
2241 
2242 void DownloadManager::SetHostChain(const std::vector<std::string> &host_list) {
2245  delete opt_host_.chain;
2246  delete opt_host_chain_rtt_;
2247  opt_host_.current = 0;
2248 
2249  if (host_list.empty()) {
2250  opt_host_.chain = NULL;
2251  opt_host_chain_rtt_ = NULL;
2252  return;
2253  }
2254 
2255  opt_host_.chain = new vector<string>(host_list);
2256  opt_host_chain_rtt_ = new vector<int>(opt_host_.chain->size(),
2257  kProbeUnprobed);
2258  // LogCvmfs(kLogDownload, kLogSyslog, "using host %s",
2259  // (*opt_host_.chain)[0].c_str());
2260 }
2261 
2262 
2267 void DownloadManager::GetHostInfo(vector<string> *host_chain, vector<int> *rtt,
2268  unsigned *current_host) {
2270  if (opt_host_.chain) {
2271  if (current_host) {
2272  *current_host = opt_host_.current;
2273  }
2274  if (host_chain) {
2275  *host_chain = *opt_host_.chain;
2276  }
2277  if (rtt) {
2278  *rtt = *opt_host_chain_rtt_;
2279  }
2280  }
2281 }
2282 
2283 
2294 
2295  if (!opt_proxy_groups_) {
2296  return;
2297  }
2298 
2299  // Fail any matching proxies within the current load-balancing group
2300  vector<ProxyInfo> *group = current_proxy_group();
2301  const unsigned group_size = group->size();
2302  unsigned failed = 0;
2303  for (unsigned i = 0; i < group_size - opt_proxy_groups_current_burned_; ++i) {
2304  if (info && (info->proxy() == (*group)[i].url)) {
2305  // Move to list of failed proxies
2306  opt_proxy_groups_current_burned_++;
2307  swap((*group)[i],
2308  (*group)[group_size - opt_proxy_groups_current_burned_]);
2310  failed++;
2311  }
2312  }
2313 
2314  // Do nothing more unless at least one proxy was marked as failed
2315  if (!failed)
2316  return;
2317 
2318  // If all proxies from the current load-balancing group are burned, switch to
2319  // another group
2320  if (opt_proxy_groups_current_burned_ == group->size()) {
2321  opt_proxy_groups_current_burned_ = 0;
2322  if (opt_proxy_groups_->size() > 1) {
2324  % opt_proxy_groups_->size();
2325  // Remember the timestamp of switching to backup proxies
2327  if (opt_proxy_groups_current_ > 0) {
2329  opt_timestamp_backup_proxies_ = time(NULL);
2330  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2331  // "switched to (another) backup proxy group");
2332  } else {
2334  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2335  // "switched back to primary proxy group");
2336  }
2338  }
2339  }
2340  } else {
2341  // Record failover time
2344  opt_timestamp_failover_proxies_ = time(NULL);
2345  }
2346  }
2347 
2348  UpdateProxiesUnlocked("failed proxy");
2350  "(manager '%s' - id %" PRId64 ") "
2351  "%lu proxies remain in group",
2352  name_.c_str(), info->id(),
2354 }
2355 
2356 
// Advances info.current to the next entry of info.chain (wrapping around to
// the first entry) and logs the switch.
//
// typ:     "host" compares against jobinfo->current_host_chain_index(); any
//          other value compares against
//          jobinfo->current_metalink_chain_index().  It is also used in the
//          log messages.
// info:    the chain to switch; info.current and info.timestamp_backup are
//          modified in place.
// jobinfo: optional.  If given and the chain index it last used differs from
//          info.current, the function returns without switching (presumably
//          another job already switched away from the failing entry).
2362 void DownloadManager::SwitchHostInfo(const std::string &typ,
2363  HostInfo &info,
2364  JobInfo *jobinfo) {
2366 
// Nothing to switch to without a chain or with a single-entry chain
2367  if (!info.chain || (info.chain->size() == 1)) {
2368  return;
2369  }
2370 
// Only switch if this job's failure refers to the entry that is still current
2371  if (jobinfo) {
2372  int lastused;
2373  if (typ == "host") {
2374  lastused = jobinfo->current_host_chain_index();
2375  } else {
2376  lastused = jobinfo->current_metalink_chain_index();
2377  }
2378  if (lastused != info.current) {
// NOTE(review): the two literals below concatenate without a space, so the
// message reads "...)don't switch ..."
2380  "(manager '%s' - id %" PRId64 ")"
2381  "don't switch %s, "
2382  "last used %s: %s, current %s: %s",
2383  name_.c_str(), jobinfo->id(), typ.c_str(), typ.c_str(),
2384  (*info.chain)[lastused].c_str(), typ.c_str(),
2385  (*info.chain)[info.current].c_str());
2386  return;
2387  }
2388  }
2389 
// Log context: reason is "manually triggered" without a job, otherwise the
// textual form of the job's error code
2390  string reason = "manually triggered";
2391  string info_id = "(manager " + name_;
2392  if (jobinfo) {
2393  reason = download::Code2Ascii(jobinfo->error_code());
// NOTE(review): plain assignment discards the "(manager <name>" prefix built
// above, so the logged id becomes " - id N)" with unbalanced parentheses;
// `+=` looks intended.
2394  info_id = " - id " + StringifyInt(jobinfo->id());
2395  }
2396  info_id += ")";
2397 
// Round-robin to the next entry of the chain
2398  const std::string old_host = (*info.chain)[info.current];
2399  info.current = (info.current + 1) % static_cast<int>(info.chain->size());
2400  if (typ == "host") {
2402  } else {
2404  }
2406  "%s switching %s from %s to %s (%s)", info_id.c_str(), typ.c_str(),
2407  old_host.c_str(), (*info.chain)[info.current].c_str(),
2408  reason.c_str());
2409 
2410  // Remember the timestamp of switching to backup host
// Only when reset_after > 0: stamp the first switch away from entry 0 and
// clear the stamp once the chain is back at entry 0
2411  if (info.reset_after > 0) {
2412  if (info.current != 0) {
2413  if (info.timestamp_backup == 0)
2414  info.timestamp_backup = time(NULL);
2415  } else {
2416  info.timestamp_backup = 0;
2417  }
2418  }
2419 }
2420 
2422  SwitchHostInfo("host", opt_host_, info);
2423 }
2424 
2426 
2427 
2429  SwitchHostInfo("metalink", opt_metalink_, info);
2430 }
2431 
2432 
2434 
2436  return (opt_metalink_.chain
2437  && ((opt_metalink_timestamp_link_ == 0)
2438  || (static_cast<int64_t>((now == 0) ? time(NULL) : now)
2439  > static_cast<int64_t>(opt_metalink_timestamp_link_
2440  + opt_metalink_.reset_after))));
2441 }
2442 
2443 
2451  vector<string> host_chain;
2452  vector<int> host_rtt;
2453  unsigned current_host;
2454 
2455  GetHostInfo(&host_chain, &host_rtt, &current_host);
2456 
2457  // Stopwatch, two times to fill caches first
2458  unsigned i, retries;
2459  string url;
2460 
2461  cvmfs::MemSink memsink;
2462  JobInfo info(&url, false, false, NULL, &memsink);
2463  for (retries = 0; retries < 2; ++retries) {
2464  for (i = 0; i < host_chain.size(); ++i) {
2465  url = host_chain[i] + "/.cvmfspublished";
2466 
2467  struct timeval tv_start, tv_end;
2468  gettimeofday(&tv_start, NULL);
2469  Failures result = Fetch(&info);
2470  gettimeofday(&tv_end, NULL);
2471  memsink.Reset();
2472  if (result == kFailOk) {
2473  host_rtt[i] = static_cast<int>(DiffTimeSeconds(tv_start, tv_end)
2474  * 1000);
2476  "(manager '%s' - id %" PRId64 ") "
2477  "probing host %s had %dms rtt",
2478  name_.c_str(), info.id(), url.c_str(), host_rtt[i]);
2479  } else {
2481  "(manager '%s' - id %" PRId64 ") "
2482  "error while probing host %s: %d %s",
2483  name_.c_str(), info.id(), url.c_str(), result,
2484  Code2Ascii(result));
2485  host_rtt[i] = INT_MAX;
2486  }
2487  }
2488  }
2489 
2490  SortTeam(&host_rtt, &host_chain);
2491  for (i = 0; i < host_chain.size(); ++i) {
2492  if (host_rtt[i] == INT_MAX)
2493  host_rtt[i] = kProbeDown;
2494  }
2495 
2497  delete opt_host_.chain;
2498  delete opt_host_chain_rtt_;
2499  opt_host_.chain = new vector<string>(host_chain);
2500  opt_host_chain_rtt_ = new vector<int>(host_rtt);
2501  opt_host_.current = 0;
2502 }
2503 
2504 bool DownloadManager::GeoSortServers(std::vector<std::string> *servers,
2505  std::vector<uint64_t> *output_order) {
2506  if (!servers) {
2507  return false;
2508  }
2509  if (servers->size() == 1) {
2510  if (output_order) {
2511  output_order->clear();
2512  output_order->push_back(0);
2513  }
2514  return true;
2515  }
2516 
2517  std::vector<std::string> host_chain;
2518  GetHostInfo(&host_chain, NULL, NULL);
2519 
2520  std::vector<std::string> server_dns_names;
2521  server_dns_names.reserve(servers->size());
2522  for (unsigned i = 0; i < servers->size(); ++i) {
2523  std::string host = dns::ExtractHost((*servers)[i]);
2524  server_dns_names.push_back(host.empty() ? (*servers)[i] : host);
2525  }
2526  std::string host_list = JoinStrings(server_dns_names, ",");
2527 
2528  vector<string> host_chain_shuffled;
2529  {
2530  // Protect against concurrent access to prng_
2532  // Determine random hosts for the Geo-API query
2533  host_chain_shuffled = Shuffle(host_chain, &prng_);
2534  }
2535  // Request ordered list via Geo-API
2536  bool success = false;
2537  unsigned max_attempts = std::min(host_chain_shuffled.size(), size_t(3));
2538  vector<uint64_t> geo_order(servers->size());
2539  for (unsigned i = 0; i < max_attempts; ++i) {
2540  string url = host_chain_shuffled[i] + "/api/v1.0/geo/@proxy@/" + host_list;
2542  "(manager '%s') requesting ordered server list from %s",
2543  name_.c_str(), url.c_str());
2544  cvmfs::MemSink memsink;
2545  JobInfo info(&url, false, false, NULL, &memsink);
2546  Failures result = Fetch(&info);
2547  if (result == kFailOk) {
2548  string order(reinterpret_cast<char *>(memsink.data()), memsink.pos());
2549  memsink.Reset();
2550  bool retval = ValidateGeoReply(order, servers->size(), &geo_order);
2551  if (!retval) {
2553  "(manager '%s') retrieved invalid GeoAPI reply from %s [%s]",
2554  name_.c_str(), url.c_str(), order.c_str());
2555  } else {
2557  "(manager '%s') "
2558  "geographic order of servers retrieved from %s",
2559  name_.c_str(),
2560  dns::ExtractHost(host_chain_shuffled[i]).c_str());
2561  // remove new line at end of "order"
2562  LogCvmfs(kLogDownload, kLogDebug, "order is %s",
2563  Trim(order, true /* trim_newline */).c_str());
2564  success = true;
2565  break;
2566  }
2567  } else {
2569  "(manager '%s') GeoAPI request for %s failed with error %d [%s]",
2570  name_.c_str(), url.c_str(), result, Code2Ascii(result));
2571  }
2572  }
2573  if (!success) {
2575  "(manager '%s') "
2576  "failed to retrieve geographic order from stratum 1 servers",
2577  name_.c_str());
2578  return false;
2579  }
2580 
2581  if (output_order) {
2582  output_order->swap(geo_order);
2583  } else {
2584  std::vector<std::string> sorted_servers;
2585  sorted_servers.reserve(geo_order.size());
2586  for (unsigned i = 0; i < geo_order.size(); ++i) {
2587  uint64_t orderval = geo_order[i];
2588  sorted_servers.push_back((*servers)[orderval]);
2589  }
2590  servers->swap(sorted_servers);
2591  }
2592  return true;
2593 }
2594 
2595 
2604  vector<string> host_chain;
2605  vector<int> host_rtt;
2606  unsigned current_host;
2607  vector<vector<ProxyInfo> > proxy_chain;
2608  unsigned fallback_group;
2609 
2610  GetHostInfo(&host_chain, &host_rtt, &current_host);
2611  GetProxyInfo(&proxy_chain, NULL, &fallback_group);
2612  if ((host_chain.size() < 2) && ((proxy_chain.size() - fallback_group) < 2))
2613  return true;
2614 
2615  vector<string> host_names;
2616  for (unsigned i = 0; i < host_chain.size(); ++i)
2617  host_names.push_back(dns::ExtractHost(host_chain[i]));
2618  SortTeam(&host_names, &host_chain);
2619  unsigned last_geo_host = host_names.size();
2620 
2621  if ((fallback_group == 0) && (last_geo_host > 1)) {
2622  // There are no non-fallback proxies, which means that the client
2623  // will always use the fallback proxies. Add a keyword separator
2624  // between the hosts and fallback proxies so the geosorting service
2625  // will know to sort the hosts based on the distance from the
2626  // closest fallback proxy rather than the distance from the client.
2627  host_names.push_back("+PXYSEP+");
2628  }
2629 
2630  // Add fallback proxy names to the end of the host list
2631  unsigned first_geo_fallback = host_names.size();
2632  for (unsigned i = fallback_group; i < proxy_chain.size(); ++i) {
2633  // We only take the first fallback proxy name from every group under the
2634  // assumption that load-balanced servers are at the same location
2635  host_names.push_back(proxy_chain[i][0].host.name());
2636  }
2637 
2638  std::vector<uint64_t> geo_order;
2639  bool success = GeoSortServers(&host_names, &geo_order);
2640  if (!success) {
2641  // GeoSortServers already logged a failure message.
2642  return false;
2643  }
2644 
2645  // Re-install host chain and proxy chain
2647  delete opt_host_.chain;
2648  opt_num_proxies_ = 0;
2649  opt_host_.chain = new vector<string>(host_chain.size());
2650 
2651  // It's possible that opt_proxy_groups_fallback_ might have changed while
2652  // the lock wasn't held
2653  vector<vector<ProxyInfo> > *proxy_groups = new vector<vector<ProxyInfo> >(
2654  opt_proxy_groups_fallback_ + proxy_chain.size() - fallback_group);
2655  // First copy the non-fallback part of the current proxy chain
2656  for (unsigned i = 0; i < opt_proxy_groups_fallback_; ++i) {
2657  (*proxy_groups)[i] = (*opt_proxy_groups_)[i];
2658  opt_num_proxies_ += (*opt_proxy_groups_)[i].size();
2659  }
2660 
2661  // Copy the host chain and fallback proxies by geo order. Array indices
2662  // in geo_order that are smaller than last_geo_host refer to a stratum 1,
2663  // and those indices greater than or equal to first_geo_fallback refer to
2664  // a fallback proxy.
2665  unsigned hosti = 0;
2666  unsigned proxyi = opt_proxy_groups_fallback_;
2667  for (unsigned i = 0; i < geo_order.size(); ++i) {
2668  uint64_t orderval = geo_order[i];
2669  if (orderval < static_cast<uint64_t>(last_geo_host)) {
2670  // LogCvmfs(kLogCvmfs, kLogSyslog, "this is orderval %u at host index
2671  // %u", orderval, hosti);
2672  (*opt_host_.chain)[hosti++] = host_chain[orderval];
2673  } else if (orderval >= static_cast<uint64_t>(first_geo_fallback)) {
2674  // LogCvmfs(kLogCvmfs, kLogSyslog,
2675  // "this is orderval %u at proxy index %u, using proxy_chain index %u",
2676  // orderval, proxyi, fallback_group + orderval - first_geo_fallback);
2677  (*proxy_groups)[proxyi] = proxy_chain[fallback_group + orderval
2678  - first_geo_fallback];
2679  opt_num_proxies_ += (*proxy_groups)[proxyi].size();
2680  proxyi++;
2681  }
2682  }
2683 
2684  opt_proxy_map_.clear();
2685  delete opt_proxy_groups_;
2686  opt_proxy_groups_ = proxy_groups;
2687  // In pathological cases, opt_proxy_groups_current_ can be larger now when
2688  // proxies changed in-between.
2690  if (opt_proxy_groups_->size() == 0) {
2692  } else {
2694  }
2696  }
2697 
2698  UpdateProxiesUnlocked("geosort");
2699 
2700  delete opt_host_chain_rtt_;
2701  opt_host_chain_rtt_ = new vector<int>(host_chain.size(), kProbeGeo);
2702  opt_host_.current = 0;
2703 
2704  return true;
2705 }
2706 
2707 
2715 bool DownloadManager::ValidateGeoReply(const string &reply_order,
2716  const unsigned expected_size,
2717  vector<uint64_t> *reply_vals) {
2718  if (reply_order.empty())
2719  return false;
2720  sanitizer::InputSanitizer sanitizer("09 , \n");
2721  if (!sanitizer.IsValid(reply_order))
2722  return false;
2723  sanitizer::InputSanitizer strip_newline("09 ,");
2724  vector<string> reply_strings = SplitString(strip_newline.Filter(reply_order),
2725  ',');
2726  vector<uint64_t> tmp_vals;
2727  for (unsigned i = 0; i < reply_strings.size(); ++i) {
2728  if (reply_strings[i].empty())
2729  return false;
2730  tmp_vals.push_back(String2Uint64(reply_strings[i]));
2731  }
2732  if (tmp_vals.size() != expected_size)
2733  return false;
2734 
2735  // Check if tmp_vals contains the number 1..n
2736  set<uint64_t> coverage(tmp_vals.begin(), tmp_vals.end());
2737  if (coverage.size() != tmp_vals.size())
2738  return false;
2739  if ((*coverage.begin() != 1) || (*coverage.rbegin() != coverage.size()))
2740  return false;
2741 
2742  for (unsigned i = 0; i < expected_size; ++i) {
2743  (*reply_vals)[i] = tmp_vals[i] - 1;
2744  }
2745  return true;
2746 }
2747 
2748 
2753 bool DownloadManager::StripDirect(const string &proxy_list,
2754  string *cleaned_list) {
2755  assert(cleaned_list);
2756  if (proxy_list == "") {
2757  *cleaned_list = "";
2758  return false;
2759  }
2760  bool result = false;
2761 
2762  vector<string> proxy_groups = SplitString(proxy_list, ';');
2763  vector<string> cleaned_groups;
2764  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2765  vector<string> group = SplitString(proxy_groups[i], '|');
2766  vector<string> cleaned;
2767  for (unsigned j = 0; j < group.size(); ++j) {
2768  if ((group[j] == "DIRECT") || (group[j] == "")) {
2769  result = true;
2770  } else {
2771  cleaned.push_back(group[j]);
2772  }
2773  }
2774  if (!cleaned.empty())
2775  cleaned_groups.push_back(JoinStrings(cleaned, "|"));
2776  }
2777 
2778  *cleaned_list = JoinStrings(cleaned_groups, ";");
2779  return result;
2780 }
2781 
2782 
2791 void DownloadManager::SetProxyChain(const string &proxy_list,
2792  const string &fallback_proxy_list,
2793  const ProxySetModes set_mode) {
2795 
2798  string set_proxy_list = opt_proxy_list_;
2799  string set_proxy_fallback_list = opt_proxy_fallback_list_;
2800  bool contains_direct;
2801  if ((set_mode == kSetProxyFallback) || (set_mode == kSetProxyBoth)) {
2802  opt_proxy_fallback_list_ = fallback_proxy_list;
2803  }
2804  if ((set_mode == kSetProxyRegular) || (set_mode == kSetProxyBoth)) {
2805  opt_proxy_list_ = proxy_list;
2806  }
2807  contains_direct = StripDirect(opt_proxy_fallback_list_,
2808  &set_proxy_fallback_list);
2809  if (contains_direct) {
2811  "(manager '%s') fallback proxies do not support DIRECT, removing",
2812  name_.c_str());
2813  }
2814  if (set_proxy_fallback_list == "") {
2815  set_proxy_list = opt_proxy_list_;
2816  } else {
2817  bool contains_direct = StripDirect(opt_proxy_list_, &set_proxy_list);
2818  if (contains_direct) {
2820  "(manager '%s') skipping DIRECT proxy to use fallback proxy",
2821  name_.c_str());
2822  }
2823  }
2824 
2825  // From this point on, use set_proxy_list and set_fallback_proxy_list as
2826  // effective proxy lists!
2827 
2828  opt_proxy_map_.clear();
2829  delete opt_proxy_groups_;
2830  if ((set_proxy_list == "") && (set_proxy_fallback_list == "")) {
2831  opt_proxy_groups_ = NULL;
2835  opt_num_proxies_ = 0;
2836  return;
2837  }
2838 
2839  // Determine number of regular proxy groups (== first fallback proxy group)
2841  if (set_proxy_list != "") {
2842  opt_proxy_groups_fallback_ = SplitString(set_proxy_list, ';').size();
2843  }
2845  "(manager '%s') "
2846  "first fallback proxy group %u",
2848 
2849  // Concatenate regular proxies and fallback proxies, both of which can be
2850  // empty.
2851  string all_proxy_list = set_proxy_list;
2852  if (set_proxy_fallback_list != "") {
2853  if (all_proxy_list != "")
2854  all_proxy_list += ";";
2855  all_proxy_list += set_proxy_fallback_list;
2856  }
2857  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s') full proxy list %s",
2858  name_.c_str(), all_proxy_list.c_str());
2859 
2860  // Resolve server names in provided urls
2861  vector<string> hostnames; // All encountered hostnames
2862  vector<string> proxy_groups;
2863  if (all_proxy_list != "")
2864  proxy_groups = SplitString(all_proxy_list, ';');
2865  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2866  vector<string> this_group = SplitString(proxy_groups[i], '|');
2867  for (unsigned j = 0; j < this_group.size(); ++j) {
2868  this_group[j] = dns::AddDefaultScheme(this_group[j]);
2869  // Note: DIRECT strings will be "extracted" to an empty string.
2870  string hostname = dns::ExtractHost(this_group[j]);
2871  // Save the hostname. Leave empty (DIRECT) names so indexes will
2872  // match later.
2873  hostnames.push_back(hostname);
2874  }
2875  }
2876  vector<dns::Host> hosts;
2878  "(manager '%s') "
2879  "resolving %lu proxy addresses",
2880  name_.c_str(), hostnames.size());
2881  resolver_->ResolveMany(hostnames, &hosts);
2882 
2883  // Construct opt_proxy_groups_: traverse proxy list in same order and expand
2884  // names to resolved IP addresses.
2885  opt_proxy_groups_ = new vector<vector<ProxyInfo> >();
2886  opt_num_proxies_ = 0;
2887  unsigned num_proxy = 0; // Combined i, j counter
2888  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2889  vector<string> this_group = SplitString(proxy_groups[i], '|');
2890  // Construct ProxyInfo objects from proxy string and DNS resolver result for
2891  // every proxy in this_group. One URL can result in multiple ProxyInfo
2892  // objects, one for each IP address.
2893  vector<ProxyInfo> infos;
2894  for (unsigned j = 0; j < this_group.size(); ++j, ++num_proxy) {
2895  this_group[j] = dns::AddDefaultScheme(this_group[j]);
2896  if (this_group[j] == "DIRECT") {
2897  infos.push_back(ProxyInfo("DIRECT"));
2898  continue;
2899  }
2900 
2901  if (hosts[num_proxy].status() != dns::kFailOk) {
2903  "(manager '%s') "
2904  "failed to resolve IP addresses for %s (%d - %s)",
2905  name_.c_str(), hosts[num_proxy].name().c_str(),
2906  hosts[num_proxy].status(),
2907  dns::Code2Ascii(hosts[num_proxy].status()));
2908  dns::Host failed_host = dns::Host::ExtendDeadline(hosts[num_proxy],
2909  resolver_->min_ttl());
2910  infos.push_back(ProxyInfo(failed_host, this_group[j]));
2911  continue;
2912  }
2913 
2914  // IPv4 addresses have precedence
2915  set<string> best_addresses = hosts[num_proxy].ViewBestAddresses(
2917  set<string>::const_iterator iter_ips = best_addresses.begin();
2918  for (; iter_ips != best_addresses.end(); ++iter_ips) {
2919  string url_ip = dns::RewriteUrl(this_group[j], *iter_ips);
2920  infos.push_back(ProxyInfo(hosts[num_proxy], url_ip));
2921 
2922  if (sharding_policy_.UseCount() > 0) {
2923  sharding_policy_->AddProxy(url_ip);
2924  }
2925  }
2926  }
2927  opt_proxy_groups_->push_back(infos);
2928  opt_num_proxies_ += infos.size();
2929  }
2931  "(manager '%s') installed %u proxies in %lu load-balance groups",
2932  name_.c_str(), opt_num_proxies_, opt_proxy_groups_->size());
2935 
2936  // Select random start proxy from the first group.
2937  if (opt_proxy_groups_->size() > 0) {
2938  // Select random start proxy from the first group.
2939  UpdateProxiesUnlocked("set random start proxy from the first proxy group");
2940  }
2941 }
2942 
2943 
2950 void DownloadManager::GetProxyInfo(vector<vector<ProxyInfo> > *proxy_chain,
2951  unsigned *current_group,
2952  unsigned *fallback_group) {
2953  assert(proxy_chain != NULL);
2955 
2956 
2957  if (!opt_proxy_groups_) {
2958  vector<vector<ProxyInfo> > empty_chain;
2959  *proxy_chain = empty_chain;
2960  if (current_group != NULL)
2961  *current_group = 0;
2962  if (fallback_group != NULL)
2963  *fallback_group = 0;
2964  return;
2965  }
2966 
2967  *proxy_chain = *opt_proxy_groups_;
2968  if (current_group != NULL)
2969  *current_group = opt_proxy_groups_current_;
2970  if (fallback_group != NULL)
2971  *fallback_group = opt_proxy_groups_fallback_;
2972 }
2973 
2975 
2977  return opt_proxy_fallback_list_;
2978 }
2979 
2984  const shash::Any *hash) {
2985  if (!opt_proxy_groups_)
2986  return NULL;
2987 
2988  uint32_t key = (hash ? hash->Partial32() : 0);
2989  map<uint32_t, ProxyInfo *>::iterator it = opt_proxy_map_.lower_bound(key);
2990  ProxyInfo *proxy = it->second;
2991 
2992  return proxy;
2993 }
2994 
2998 void DownloadManager::UpdateProxiesUnlocked(const string &reason) {
2999  if (!opt_proxy_groups_)
3000  return;
3001 
3002  // Identify number of non-burned proxies within the current group
3003  vector<ProxyInfo> *group = current_proxy_group();
3004  unsigned num_alive = (group->size() - opt_proxy_groups_current_burned_);
3005  string old_proxy = JoinStrings(opt_proxies_, "|");
3006 
3007  // Rebuild proxy map and URL list
3008  opt_proxy_map_.clear();
3009  opt_proxies_.clear();
3010  const uint32_t max_key = 0xffffffffUL;
3011  if (opt_proxy_shard_) {
3012  // Build a consistent map with multiple entries for each proxy
3013  for (unsigned i = 0; i < num_alive; ++i) {
3014  ProxyInfo *proxy = &(*group)[i];
3015  shash::Any proxy_hash(shash::kSha1);
3016  HashString(proxy->url, &proxy_hash);
3017  Prng prng;
3018  prng.InitSeed(proxy_hash.Partial32());
3019  for (unsigned j = 0; j < kProxyMapScale; ++j) {
3020  const std::pair<uint32_t, ProxyInfo *> entry(prng.Next(max_key), proxy);
3021  opt_proxy_map_.insert(entry);
3022  }
3023  std::string proxy_name = proxy->host.name().empty()
3024  ? ""
3025  : " (" + proxy->host.name() + ")";
3026  opt_proxies_.push_back(proxy->url + proxy_name);
3027  }
3028  // Ensure lower_bound() finds a value for all keys
3029  ProxyInfo *first_proxy = opt_proxy_map_.begin()->second;
3030  const std::pair<uint32_t, ProxyInfo *> last_entry(max_key, first_proxy);
3031  opt_proxy_map_.insert(last_entry);
3032  } else {
3033  // Build a map with a single entry for one randomly selected proxy
3034  unsigned select = prng_.Next(num_alive);
3035  ProxyInfo *proxy = &(*group)[select];
3036  const std::pair<uint32_t, ProxyInfo *> entry(max_key, proxy);
3037  opt_proxy_map_.insert(entry);
3038  std::string proxy_name = proxy->host.name().empty()
3039  ? ""
3040  : " (" + proxy->host.name() + ")";
3041  opt_proxies_.push_back(proxy->url + proxy_name);
3042  }
3043  sort(opt_proxies_.begin(), opt_proxies_.end());
3044 
3045  // Report any change in proxy usage
3046  string new_proxy = JoinStrings(opt_proxies_, "|");
3047  const string curr_host = "Current host: "
3048  + (opt_host_.chain
3050  : "");
3051  if (new_proxy != old_proxy) {
3053  "(manager '%s') switching proxy from %s to %s. Reason: %s [%s]",
3054  name_.c_str(), (old_proxy.empty() ? "(none)" : old_proxy.c_str()),
3055  (new_proxy.empty() ? "(none)" : new_proxy.c_str()), reason.c_str(),
3056  curr_host.c_str());
3057  }
3058 }
3059 
3064  opt_proxy_shard_ = true;
3065  RebalanceProxiesUnlocked("enable sharding");
3066 }
3067 
3072 void DownloadManager::RebalanceProxiesUnlocked(const string &reason) {
3073  if (!opt_proxy_groups_)
3074  return;
3075 
3078  UpdateProxiesUnlocked(reason);
3079 }
3080 
3081 
3084  RebalanceProxiesUnlocked("rebalance invoked manually");
3085 }
3086 
3087 
3093 
3094  if (!opt_proxy_groups_ || (opt_proxy_groups_->size() < 2)) {
3095  return;
3096  }
3097 
3099  % opt_proxy_groups_->size();
3100  opt_timestamp_backup_proxies_ = time(NULL);
3101 
3102  std::string msg = "switch to proxy group "
3105 }
3106 
3107 
3108 void DownloadManager::SetProxyGroupResetDelay(const unsigned seconds) {
3111  if (opt_proxy_groups_reset_after_ == 0) {
3114  }
3115 }
3116 
3117 
3118 void DownloadManager::SetMetalinkResetDelay(const unsigned seconds) {
3119  const MutexLockGuard m(lock_options_);
3120  opt_metalink_.reset_after = seconds;
3121  if (opt_metalink_.reset_after == 0)
3123 }
3124 
3125 
3126 void DownloadManager::SetHostResetDelay(const unsigned seconds) {
3128  opt_host_.reset_after = seconds;
3129  if (opt_host_.reset_after == 0)
3131 }
3132 
3133 
3134 void DownloadManager::SetRetryParameters(const unsigned max_retries,
3135  const unsigned backoff_init_ms,
3136  const unsigned backoff_max_ms) {
3138  opt_max_retries_ = max_retries;
3139  opt_backoff_init_ms_ = backoff_init_ms;
3140  opt_backoff_max_ms_ = backoff_max_ms;
3141 }
3142 
3143 
3146  resolver_->set_throttle(limit);
3147 }
3148 
3149 
3150 void DownloadManager::SetProxyTemplates(const std::string &direct,
3151  const std::string &forced) {
3153  proxy_template_direct_ = direct;
3154  proxy_template_forced_ = forced;
3155 }
3156 
3157 
3159 
3160 
3162 
3165 }
3166 
3168 
3169 void DownloadManager::AddHTTPTracingHeader(const std::string &header) {
3170  http_tracing_headers_.push_back(header);
3171 }
3172 
3175 }
3176 
3178  bool success = false;
3179  switch (type) {
3180  default:
3181  LogCvmfs(
3183  "(manager '%s') "
3184  "Proposed sharding policy does not exist. Falling back to default",
3185  name_.c_str());
3186  }
3187  return success;
3188 }
3189 
3191  failover_indefinitely_ = true;
3192 }
3193 
3199  const perf::StatisticsTemplate &statistics,
3200  const std::string &cloned_name) {
3201  DownloadManager *clone = new DownloadManager(pool_max_handles_, statistics,
3202  cloned_name);
3203 
3207 
3208  if (!opt_dns_server_.empty())
3209  clone->SetDnsServer(opt_dns_server_);
3221  if (opt_host_.chain) {
3222  clone->opt_host_.chain = new vector<string>(*opt_host_.chain);
3223  clone->opt_host_chain_rtt_ = new vector<int>(*opt_host_chain_rtt_);
3224  }
3225 
3226  CloneProxyConfig(clone);
3235 
3236  clone->health_check_ = health_check_;
3239  clone->fqrn_ = fqrn_;
3240 
3241  return clone;
3242 }
3243 
3244 
3253  if (opt_proxy_groups_ == NULL)
3254  return;
3255 
3256  clone->opt_proxy_groups_ = new vector<vector<ProxyInfo> >(*opt_proxy_groups_);
3257  clone->UpdateProxiesUnlocked("cloned");
3258 }
3259 
3260 } // namespace download
unsigned opt_timeout_direct_
Definition: download.h:328
std::vector< std::string > http_tracing_headers_
Definition: download.h:344
bool StripDirect(const std::string &proxy_list, std::string *cleaned_list)
Definition: download.cc:2753
unsigned opt_low_speed_limit_
Definition: download.h:329
void HashString(const std::string &content, Any *any_digest)
Definition: hash.cc:269
Definition: prng.h:27
static const unsigned kDnsDefaultTimeoutMs
Definition: download.h:173
std::vector< T > Shuffle(const std::vector< T > &input, Prng *prng)
Definition: algorithm.h:50
unsigned throttle() const
Definition: dns.h:206
z_stream * GetZstreamPtr()
Definition: jobinfo.h:160
unsigned opt_backoff_init_ms_
Definition: download.h:331
void SetInfoHeader(char *info_header)
Definition: jobinfo.h:242
shash::ContextPtr * GetHashContextPtr()
Definition: jobinfo.h:165
uid_t uid() const
Definition: jobinfo.h:178
int64_t Xadd(class Counter *counter, const int64_t delta)
Definition: statistics.h:51
struct stat64 platform_stat64
const char * Code2Ascii(const Failures error)
Definition: dns.h:53
bool Write(const T &data)
Definition: pipe.h:123
int64_t id() const
Definition: dns.h:108
unsigned opt_proxy_groups_current_burned_
Definition: download.h:368
double DiffTimeSeconds(struct timeval start, struct timeval end)
Definition: algorithm.cc:31
unsigned opt_proxy_groups_reset_after_
Definition: download.h:462
bool probe_hosts() const
Definition: jobinfo.h:173
virtual bool IsCanceled()
Definition: interrupt.h:16
void SetUrlOptions(JobInfo *info)
Definition: download.cc:1026
SharedPtr< ShardingPolicy > sharding_policy_
Definition: download.h:407
StreamStates DecompressZStream2Sink(const void *buf, const int64_t size, z_stream *strm, cvmfs::Sink *sink)
Definition: compression.cc:233
void ResolveMany(const std::vector< std::string > &names, std::vector< Host > *hosts)
Definition: dns.cc:355
std::string opt_proxy_fallback_list_
Definition: download.h:385
void SetHostChain(const std::string &host_list)
bool CheckMetalinkChain(const time_t now)
Definition: download.cc:2435
static NormalResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
Definition: dns.cc:1227
int64_t id() const
Definition: jobinfo.h:213
void SetMetalinkChain(const std::string &metalink_list)
void SetLowSpeedLimit(const unsigned low_speed_limit)
Definition: download.cc:2173
virtual int Purge()=0
DownloadManager(const unsigned max_pool_handles, const perf::StatisticsTemplate &statistics, const std::string &name="standard")
Definition: download.cc:1878
std::string proxy_template_direct_
Definition: download.h:446
std::vector< std::string > opt_proxies_
Definition: download.h:393
curl_slist ** GetHeadersPtr()
Definition: jobinfo.h:163
static const int kProbeGeo
Definition: download.h:170
#define PANIC(...)
Definition: exception.h:29
string Trim(const string &raw, bool trim_newline)
Definition: string.cc:466
string ReplaceAll(const string &haystack, const string &needle, const string &replace_by)
Definition: string.cc:516
void set_min_ttl(unsigned seconds)
Definition: dns.h:207
string JoinStrings(const vector< string > &strings, const string &joint)
Definition: string.cc:356
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:241
unsigned opt_proxy_groups_current_
Definition: download.h:363
void SetCurrentHostChainIndex(int current_host_chain_index)
Definition: jobinfo.h:275
virtual bool RequiresReserve()=0
bool ValidateGeoReply(const std::string &reply_order, const unsigned expected_size, std::vector< uint64_t > *reply_vals)
Definition: download.cc:2715
std::vector< ProxyInfo > * current_proxy_group() const
Definition: download.h:299
void DecompressInit(z_stream *strm)
Definition: compression.cc:190
const std::string * url() const
Definition: jobinfo.h:171
void * cred_data() const
Definition: jobinfo.h:180
time_t opt_timestamp_backup_proxies_
Definition: download.h:460
void SetProxyChain(const std::string &proxy_list, const std::string &fallback_proxy_list, const ProxySetModes set_mode)
Definition: download.cc:2791
std::string GetProxyList()
Definition: download.cc:2974
unsigned min_ttl() const
Definition: dns.h:208
void InitLocaltime()
Definition: prng.h:34
bool allow_failure() const
Definition: jobinfo.h:212
void GetMetalinkInfo(std::vector< std::string > *metalink_chain, unsigned *current_metalink)
Definition: download.cc:2219
std::set< CURL * > * pool_handles_inuse_
Definition: download.h:307
CURL * curl_handle() const
Definition: jobinfo.h:189
pthread_mutex_t * lock_options_
Definition: download.h:324
ProxyInfo * ChooseProxyUnlocked(const shash::Any *hash)
Definition: download.cc:2983
pthread_t thread_download_
Definition: download.h:314
std::string opt_proxy_list_
Definition: download.h:381
const std::string & name() const
Definition: dns.h:118
perf::Counter * sz_transfer_time
Definition: download.h:43
void SetTracingHeaderGid(char *tracing_header_gid)
Definition: jobinfo.h:246
assert((mem||(size==0))&&"Out Of Memory")
void SetNocache(bool nocache)
Definition: jobinfo.h:258
unsigned opt_proxy_groups_fallback_
Definition: download.h:373
void ReleaseCurlHandle(CURL *handle)
Definition: download.cc:899
bool IsValidDataTube()
Definition: jobinfo.h:148
static bool sortlinks(const std::string &s1, const std::string &s2)
Definition: download.cc:1388
StreamStates
Definition: compression.h:36
void SetTracingHeaderPid(char *tracing_header_pid)
Definition: jobinfo.h:243
void set_max_ttl(unsigned seconds)
Definition: dns.h:209
Tube< DataTubeElement > * GetDataTubePtr()
Definition: jobinfo.h:169
Algorithms algorithm
Definition: hash.h:122
char * tracing_header_gid() const
Definition: jobinfo.h:193
const std::string path()
Definition: sink_path.h:98
const std::set< std::string > & ViewBestAddresses(IpPreference preference) const
Definition: dns.cc:198
char * info_header() const
Definition: jobinfo.h:191
void SetDnsServer(const std::string &address)
Definition: download.cc:2108
int platform_stat(const char *path, platform_stat64 *buf)
DownloadManager * Clone(const perf::StatisticsTemplate &statistics, const std::string &cloned_name)
Definition: download.cc:3198
bool force_nocache() const
Definition: jobinfo.h:176
std::string StringifyUint(const uint64_t value)
Definition: string.cc:83
void DecompressFini(z_stream *strm)
Definition: compression.cc:204
virtual std::string Describe()=0
static void * MainDownload(void *data)
Definition: download.cc:567
void InitSeed(const uint64_t seed)
Definition: prng.h:32
void SetTimeout(const unsigned seconds_proxy, const unsigned seconds_direct)
Definition: download.cc:2160
void SetLink(const std::string &link)
Definition: jobinfo.h:257
std::string opt_dns_server_
Definition: download.h:326
void SwitchHostInfo(const std::string &typ, HostInfo &info, JobInfo *jobinfo)
Definition: download.cc:2362
off_t range_offset() const
Definition: jobinfo.h:186
bool nocache() const
Definition: jobinfo.h:199
char algorithm
unsigned char num_used_hosts() const
Definition: jobinfo.h:204
virtual bool IsValid()=0
bool follow_redirects() const
Definition: jobinfo.h:175
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:139
void Init(ContextPtr context)
Definition: hash.cc:166
int http_code() const
Definition: jobinfo.h:201
bool IsValid(const std::string &input) const
Definition: sanitizer.cc:112
Algorithms
Definition: hash.h:41
void CreateDataTube()
Definition: jobinfo.h:143
void SetNumUsedMetalinks(unsigned char num_used_metalinks)
Definition: jobinfo.h:264
bool FileExists(const std::string &path)
Definition: posix.cc:803
unsigned max_ttl() const
Definition: dns.h:210
Pipe< kPipeDownloadJobsResults > * GetPipeJobResultPtr()
Definition: jobinfo.h:166
void SetDnsTtlLimits(const unsigned min_seconds, const unsigned max_seconds)
Definition: download.cc:2141
static Failures PrepareDownloadDestination(JobInfo *info)
Definition: download.cc:115
perf::Counter * n_metalink_failover
Definition: download.h:46
void SetHttpCode(int http_code)
Definition: jobinfo.h:260
DataTubeAction action
Definition: jobinfo.h:50
const char * Code2Ascii(const Failures error)
bool head_request() const
Definition: jobinfo.h:174
unsigned char num_retries() const
Definition: jobinfo.h:205
std::vector< std::string > * chain
Definition: download.h:136
bool IsValidPipeJobResults()
Definition: jobinfo.h:141
std::vector< std::vector< ProxyInfo > > * opt_proxy_groups_
Definition: download.h:359
off_t range_size() const
Definition: jobinfo.h:187
void GetProxyInfo(std::vector< std::vector< ProxyInfo > > *proxy_chain, unsigned *current_group, unsigned *fallback_group)
Definition: download.cc:2950
void SetProxyGroupResetDelay(const unsigned seconds)
Definition: download.cc:3108
void SetCurrentMetalinkChainIndex(int current_metalink_chain_index)
Definition: jobinfo.h:272
atomic_int32 multi_threaded_
Definition: download.h:315
bool compressed() const
Definition: jobinfo.h:172
std::string AddDefaultScheme(const std::string &proxy)
Definition: dns.cc:182
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:306
cvmfs::Sink * sink() const
Definition: jobinfo.h:182
gid_t gid() const
Definition: jobinfo.h:179
dns::NormalResolver * resolver_
Definition: download.h:434
bool Interrupted(const std::string &fqrn, JobInfo *info)
Definition: download.cc:85
std::string proxy() const
Definition: jobinfo.h:197
dns::IpPreference opt_ip_preference_
Definition: download.h:439
Algorithms algorithm
Definition: hash.h:488
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:254
void UseSystemCertificatePath()
Definition: ssl.cc:71
Definition: dns.h:90
bool SetShardingPolicy(const ShardingPolicySelector type)
Definition: download.cc:3177
perf::Counter * n_host_failover
Definition: download.h:47
void UpdateProxiesUnlocked(const std::string &reason)
Definition: download.cc:2998
time_t opt_metalink_timestamp_link_
Definition: download.h:348
bool IsEquivalent(const Host &other) const
Definition: dns.cc:257
std::string link() const
Definition: jobinfo.h:198
bool IsProxyTransferError(const Failures error)
void SetNumRetries(unsigned char num_retries)
Definition: jobinfo.h:270
bool IsExpired() const
Definition: dns.cc:267
void set_throttle(const unsigned throttle)
Definition: dns.h:205
InterruptCue * interrupt_cue() const
Definition: jobinfo.h:181
void SetIpPreference(const dns::IpPreference preference)
Definition: download.cc:2149
virtual int Reset()=0
shash::ContextPtr hash_context() const
Definition: jobinfo.h:196
perf::Counter * n_requests
Definition: download.h:44
bool Read(T *data)
Definition: pipe.h:164
void Final(ContextPtr context, Any *any_digest)
Definition: hash.cc:223
void SetRetryParameters(const unsigned max_retries, const unsigned backoff_init_ms, const unsigned backoff_max_ms)
Definition: download.cc:3134
string StringifyInt(const int64_t value)
Definition: string.cc:77
char * tracing_header_uid() const
Definition: jobinfo.h:194
Failures error_code() const
Definition: jobinfo.h:200
void CloneProxyConfig(DownloadManager *clone)
Definition: download.cc:3245
void SetMaxIpaddrPerProxy(unsigned limit)
Definition: download.cc:3144
void EnableIgnoreSignatureFailures()
Definition: download.cc:3163
void Inc(class Counter *counter)
Definition: statistics.h:50
void * buffer
Definition: hash.h:489
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:279
std::string ExtractHost(const std::string &url)
Definition: dns.cc:108
void ** GetCredDataPtr()
Definition: jobinfo.h:162
std::vector< int > * opt_host_chain_rtt_
Definition: download.h:356
SslCertificateStore ssl_certificate_store_
Definition: download.h:475
std::string GetFallbackProxyList()
Definition: download.cc:2976
uint32_t Partial32() const
Definition: hash.h:382
void SetProxyTemplates(const std::string &direct, const std::string &forced)
Definition: download.cc:3150
IpPreference
Definition: dns.h:46
unsigned retries() const
Definition: dns.h:203
unsigned opt_backoff_max_ms_
Definition: download.h:332
void SetErrorCode(Failures error_code)
Definition: jobinfo.h:259
void SetNumUsedProxies(unsigned char num_used_proxies)
Definition: jobinfo.h:261
unsigned GetContextSize(const Algorithms algorithm)
Definition: hash.cc:150
std::string GetDnsServer() const
Definition: download.cc:2102
int current_metalink_chain_index() const
Definition: jobinfo.h:207
CredentialsAttachment * credentials_attachment_
Definition: download.h:464
struct pollfd * watch_fds_
Definition: download.h:319
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
Definition: hash.cc:192
std::map< uint32_t, ProxyInfo * > opt_proxy_map_
Definition: download.h:389
uint64_t String2Uint64(const string &value)
Definition: string.cc:240
UniquePtr< Pipe< kPipeDownloadJobs > > pipe_jobs_
Definition: download.h:318
void CreatePipeJobResults()
Definition: jobinfo.h:137
Failures Fetch(JobInfo *info)
Definition: download.cc:1982
unsigned char num_used_metalinks() const
Definition: jobinfo.h:203
void SetCurlHandle(CURL *curl_handle)
Definition: jobinfo.h:240
void SetTracingHeaderUid(char *tracing_header_uid)
Definition: jobinfo.h:249
Definition: mutex.h:42
curl_slist * headers() const
Definition: jobinfo.h:190
const shash::Any * expected_hash() const
Definition: jobinfo.h:183
perf::Counter * n_proxy_failover
Definition: download.h:48
virtual int Flush()=0
void GetTimeout(unsigned *seconds_proxy, unsigned *seconds_direct)
Definition: download.cc:2182
void SetCredData(void *cred_data)
Definition: jobinfo.h:227
std::string Filter(const std::string &input) const
Definition: sanitizer.cc:105
std::string proxy_template_forced_
Definition: download.h:452
time_t opt_timestamp_failover_proxies_
Definition: download.h:461
void SetDnsParameters(const unsigned retries, const unsigned timeout_ms)
Definition: download.cc:2127
unsigned EscapeHeader(const std::string &header, char *escaped_buf, size_t buf_size)
Definition: download.cc:441
const std::string * extra_info() const
Definition: jobinfo.h:184
static Host ExtendDeadline(const Host &original, unsigned seconds_from_now)
Definition: dns.cc:223
UniquePtr< Pipe< kPipeThreadTerminator > > pipe_terminate_
Definition: download.h:316
void SetProxy(const std::string &proxy)
Definition: jobinfo.h:256
static const int kProbeUnprobed
Definition: download.h:161
unsigned backoff_ms() const
Definition: jobinfo.h:206
virtual int Reset()
Definition: sink_mem.cc:52
pid_t pid() const
Definition: jobinfo.h:177
unsigned char num_used_proxies() const
Definition: jobinfo.h:202
int current_host_chain_index() const
Definition: jobinfo.h:210
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:2024
void SetMetalinkResetDelay(const unsigned seconds)
Definition: download.cc:3118
bool IsHostTransferError(const Failures error)
static const unsigned kDnsDefaultRetries
Definition: download.h:172
void SortTeam(std::vector< T > *tractor, std::vector< U > *towed)
Definition: algorithm.h:68
bool GeoSortServers(std::vector< std::string > *servers, std::vector< uint64_t > *output_order=NULL)
Definition: download.cc:2504
virtual bool Reserve(size_t size)=0
static const int kProbeDown
Definition: download.h:166
void SetNumUsedHosts(unsigned char num_used_hosts)
Definition: jobinfo.h:267
unsigned size
Definition: hash.h:490
void SetHeaders(curl_slist *headers)
Definition: jobinfo.h:241
Failures status() const
Definition: dns.h:119
static const unsigned kProxyMapScale
Definition: download.h:174
void GetHostInfo(std::vector< std::string > *host_chain, std::vector< int > *rtt, unsigned *current_host)
Definition: download.cc:2267
static void size_t size
Definition: smalloc.h:54
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
Definition: download.cc:1468
char * tracing_header_pid() const
Definition: jobinfo.h:192
void SetBackoffMs(unsigned backoff_ms)
Definition: jobinfo.h:271
SharedPtr< HealthCheck > health_check_
Definition: download.h:415
void SwitchProxy(JobInfo *info)
Definition: download.cc:2292
void AddHTTPTracingHeader(const std::string &header)
Definition: download.cc:3169
void SetCredentialsAttachment(CredentialsAttachment *ca)
Definition: download.cc:2094
void SetFollowRedirects(bool follow_redirects)
Definition: jobinfo.h:220
void RebalanceProxiesUnlocked(const std::string &reason)
Definition: download.cc:3072
pthread_mutex_t * lock_synchronous_mode_
Definition: download.h:325
virtual bool SetResolvers(const std::vector< std::string > &resolvers)
Definition: dns.cc:1256
void InitializeRequest(JobInfo *info, CURL *handle)
Definition: download.cc:917
static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp)
Definition: download.cc:488
string RewriteUrl(const string &url, const string &ip)
Definition: dns.cc:152
void SetHostResetDelay(const unsigned seconds)
Definition: download.cc:3126
uint32_t Next(const uint64_t boundary)
Definition: prng.h:44
virtual int64_t Write(const void *buf, uint64_t sz)=0
unsigned timeout_ms() const
Definition: dns.h:204
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545