CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
download.cc
Go to the documentation of this file.
1 
27 // TODO(jblomer): MS for time summing
28 // NOLINTNEXTLINE
29 #define __STDC_FORMAT_MACROS
30 
31 
32 #include "download.h"
33 
34 #include <alloca.h>
35 #include <errno.h>
36 #include <inttypes.h>
37 #include <poll.h>
38 #include <pthread.h>
39 #include <signal.h>
40 #include <stdint.h>
41 #include <sys/time.h>
42 #include <unistd.h>
43 
44 #include <algorithm>
45 #include <cassert>
46 #include <cstdio>
47 #include <cstdlib>
48 #include <cstring>
49 #include <map>
50 #include <set>
51 #include <utility>
52 
54 #include "crypto/hash.h"
55 #include "duplex_curl.h"
56 #include "interrupt.h"
57 #include "sanitizer.h"
58 #include "ssl.h"
59 #include "util/algorithm.h"
60 #include "util/atomic.h"
61 #include "util/concurrency.h"
62 #include "util/exception.h"
63 #include "util/logging.h"
64 #include "util/posix.h"
65 #include "util/prng.h"
66 #include "util/smalloc.h"
67 #include "util/string.h"
68 
69 using namespace std; // NOLINT
70 
71 namespace download {
72 
85 bool Interrupted(const std::string &fqrn, JobInfo *info) {
86  if (info->allow_failure()) {
87  return true;
88  }
89 
90  if (!fqrn.empty()) {
91  // it is up to the user the create this sentinel file ("pause_file") if
92  // CVMFS_FAILOVER_INDEFINITELY is used. It must be created during
93  // "cvmfs_config reload" and "cvmfs_config reload $fqrn"
94  std::string pause_file = std::string("/var/run/cvmfs/interrupt.") + fqrn;
95 
97  "(id %" PRId64 ") Interrupted(): checking for existence of %s",
98  info->id(), pause_file.c_str());
99  if (FileExists(pause_file)) {
101  "(id %" PRId64 ") Interrupt marker found - "
102  "Interrupting current download, this will EIO outstanding IO.",
103  info->id());
104  if (0 != unlink(pause_file.c_str())) {
106  "(id %" PRId64 ") Couldn't delete interrupt marker: errno=%d",
107  info->id(), errno);
108  }
109  return true;
110  }
111  }
112  return false;
113 }
114 
116  if (info->sink() != NULL && !info->sink()->IsValid()) {
117  cvmfs::PathSink* psink = dynamic_cast<cvmfs::PathSink*>(info->sink());
118  if (psink != NULL) {
120  "(id %" PRId64 ") Failed to open path %s: %s (errno=%d).",
121  info->id(), psink->path().c_str(), strerror(errno), errno);
122  return kFailLocalIO;
123  } else {
124  LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") "
125  "Failed to create a valid sink: \n %s",
126  info->id(), info->sink()->Describe().c_str());
127  return kFailOther;
128  }
129  }
130 
131  return kFailOk;
132 }
133 
134 
138 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
139  void *info_link)
140 {
141  const size_t num_bytes = size*nmemb;
142  const string header_line(static_cast<const char *>(ptr), num_bytes);
143  JobInfo *info = static_cast<JobInfo *>(info_link);
144 
145  // LogCvmfs(kLogDownload, kLogDebug, "REMOVE-ME: Header callback with %s",
146  // header_line.c_str());
147 
148  // Check http status codes
149  if (HasPrefix(header_line, "HTTP/1.", false)) {
150  if (header_line.length() < 10) {
151  return 0;
152  }
153 
154  unsigned i;
155  for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {}
156 
157  // Code is initialized to -1
158  if (header_line.length() > i+2) {
159  info->SetHttpCode(DownloadManager::ParseHttpCode(&header_line[i]));
160  }
161 
162  if ((info->http_code() / 100) == 2) {
163  return num_bytes;
164  } else if ((info->http_code() == 301) ||
165  (info->http_code() == 302) ||
166  (info->http_code() == 303) ||
167  (info->http_code() == 307))
168  {
169  if (!info->follow_redirects()) {
171  "(id %" PRId64 ") redirect support not enabled: %s",
172  info->id(), header_line.c_str());
174  return 0;
175  }
176  LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") http redirect: %s",
177  info->id(), header_line.c_str());
178  // libcurl will handle this because of CURLOPT_FOLLOWLOCATION
179  return num_bytes;
180  } else {
182  "(id %" PRId64 ") http status error code: %s [%d]",
183  info->id(), header_line.c_str(), info->http_code());
184  if (((info->http_code() / 100) == 5) ||
185  (info->http_code() == 400) || (info->http_code() == 404))
186  {
187  // 5XX returned by host
188  // 400: error from the GeoAPI module
189  // 404: the stratum 1 does not have the newest files
191  } else if (info->http_code() == 429) {
192  // 429: rate throttling (we ignore the backoff hint for the time being)
194  } else {
195  info->SetErrorCode((info->proxy() == "DIRECT") ? kFailHostHttp :
197  }
198  return 0;
199  }
200  }
201 
202  // If needed: allocate space in sink
203  if (info->sink() != NULL && info->sink()->RequiresReserve() &&
204  HasPrefix(header_line, "CONTENT-LENGTH:", true))
205  {
206  char *tmp = reinterpret_cast<char *>(alloca(num_bytes+1));
207  uint64_t length = 0;
208  sscanf(header_line.c_str(), "%s %" PRIu64, tmp, &length);
209  if (length > 0) {
210  if (!info->sink()->Reserve(length)) {
211  LogCvmfs(kLogDownload, kLogDebug | kLogSyslogErr, "(id %" PRId64 ") "
212  "resource %s too large to store in memory (%" PRIu64 ")",
213  info->id(), info->url()->c_str(), length);
214  info->SetErrorCode(kFailTooBig);
215  return 0;
216  }
217  } else {
218  // Empty resource
219  info->sink()->Reserve(0);
220  }
221  } else if (HasPrefix(header_line, "LOCATION:", true)) {
222  // This comes along with redirects
223  LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") %s",
224  info->id(), header_line.c_str());
225  } else if (HasPrefix(header_line, "LINK:", true)) {
226  // This is metalink info
227  LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") %s",
228  info->id(), header_line.c_str());
229  std::string link = info->link();
230  if (link.size() != 0) {
231  // multiple LINK headers are allowed
232  link = link + ", " + header_line.substr(5);
233  } else {
234  link = header_line.substr(5);
235  }
236  info->SetLink(link);
237  } else if (HasPrefix(header_line, "X-SQUID-ERROR:", true)) {
238  // Reinterpret host error as proxy error
239  if (info->error_code() == kFailHostHttp) {
241  }
242  } else if (HasPrefix(header_line, "PROXY-STATUS:", true)) {
243  // Reinterpret host error as proxy error if applicable
244  if ((info->error_code() == kFailHostHttp) &&
245  (header_line.find("error=") != string::npos)) {
247  }
248  }
249 
250  return num_bytes;
251 }
252 
253 
257 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
258  void *info_link)
259 {
260  const size_t num_bytes = size*nmemb;
261  JobInfo *info = static_cast<JobInfo *>(info_link);
262 
263  // TODO(heretherebedragons) remove if no error comes up
264  // as this means only jobinfo data request (and not header only)
265  // come here
266  assert(info->sink() != NULL);
267 
268  // LogCvmfs(kLogDownload, kLogDebug, "Data callback, %d bytes", num_bytes);
269 
270  if (num_bytes == 0)
271  return 0;
272 
273  if (info->expected_hash()) {
274  shash::Update(reinterpret_cast<unsigned char *>(ptr),
275  num_bytes, info->hash_context());
276  }
277 
278  if (info->compressed()) {
279  zlib::StreamStates retval =
280  zlib::DecompressZStream2Sink(ptr, static_cast<int64_t>(num_bytes),
281  info->GetZstreamPtr(), info->sink());
282  if (retval == zlib::kStreamDataError) {
284  "(id %" PRId64 ") failed to decompress %s",
285  info->id(), info->url()->c_str());
286  info->SetErrorCode(kFailBadData);
287  return 0;
288  } else if (retval == zlib::kStreamIOError) {
290  "(id %" PRId64 ") decompressing %s, local IO error",
291  info->id(), info->url()->c_str());
292  info->SetErrorCode(kFailLocalIO);
293  return 0;
294  }
295  } else {
296  int64_t written = info->sink()->Write(ptr, num_bytes);
297  if (written < 0 || static_cast<uint64_t>(written) != num_bytes) {
298  LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") "
299  "Failed to perform write of %zu bytes to sink %s with errno %ld",
300  info->id(), num_bytes, info->sink()->Describe().c_str(), written);
301  }
302  }
303 
304  return num_bytes;
305 }
306 
#ifdef DEBUGMSG
/**
 * libcurl debug callback, compiled only when DEBUGMSG is defined.
 * Logs each libcurl trace message to kLogCurl, prefixed with the job id and a
 * tag describing the curl_infotype.  Data / SSL-data chunks of 50 bytes or
 * more are not printed; a "<snip>" line is logged instead.  A message that
 * contains non-printable characters (other than CR/LF) is replaced by
 * "<Non-plaintext sequence>".  Always returns 0.
 */
static int CallbackCurlDebug(
  CURL * handle,
  curl_infotype type,
  char *data,
  size_t size,
  void * /* clientp */)
{
  JobInfo *info;
  curl_easy_getinfo(handle, CURLINFO_PRIVATE, &info);

  std::string prefix = "(id " + StringifyInt(info->id()) + ") ";
  // Payload chunks are only printed verbatim below this size (in bytes)
  const size_t kMaxPrintedPayload = 50;
  const bool payload_too_large = (size >= kMaxPrintedPayload);

  switch (type) {
    case CURLINFO_TEXT:
      prefix += "{info} ";
      break;
    case CURLINFO_HEADER_IN:
      prefix += "{header/recv} ";
      break;
    case CURLINFO_HEADER_OUT:
      prefix += "{header/sent} ";
      break;
    case CURLINFO_DATA_IN:
      if (payload_too_large) {
        LogCvmfs(kLogCurl, kLogDebug, "%s{data/recv} <snip>", prefix.c_str());
        return 0;
      }
      prefix += "{data/recv} ";
      break;
    case CURLINFO_DATA_OUT:
      if (payload_too_large) {
        LogCvmfs(kLogCurl, kLogDebug, "%s{data/sent} <snip>", prefix.c_str());
        return 0;
      }
      prefix += "{data/sent} ";
      break;
    case CURLINFO_SSL_DATA_IN:
      if (payload_too_large) {
        LogCvmfs(kLogCurl, kLogDebug, "%s{ssldata/recv} <snip>",
                 prefix.c_str());
        return 0;
      }
      prefix += "{ssldata/recv} ";
      break;
    case CURLINFO_SSL_DATA_OUT:
      if (payload_too_large) {
        LogCvmfs(kLogCurl, kLogDebug, "%s{ssldata/sent} <snip>",
                 prefix.c_str());
        return 0;
      }
      prefix += "{ssldata/sent} ";
      break;
    default:
      // Any other message type is logged with the bare id prefix
      break;
  }

  std::string msg(data, size);
  // A NUL byte would cut the "%s" output short, so show it as '~' instead
  std::replace(msg.begin(), msg.end(), '\0', '~');

  bool printable = true;
  for (std::string::const_iterator it = msg.begin(); it != msg.end(); ++it) {
    const char c = *it;
    const bool is_line_break = (c == 10 /*line feed*/) ||
                               (c == 13 /*carriage return*/);
    if (!is_line_break && ((c < ' ') || (c > '~'))) {
      printable = false;
      break;
    }
  }
  if (!printable)
    msg = "<Non-plaintext sequence>";

  LogCvmfs(kLogCurl, kLogDebug, "%s%s",
           prefix.c_str(), Trim(msg, true /* trim_newline */).c_str());
  return 0;
}
#endif
392 
393 //------------------------------------------------------------------------------
394 
395 
396 const int DownloadManager::kProbeUnprobed = -1;
397 const int DownloadManager::kProbeDown = -2;
398 const int DownloadManager::kProbeGeo = -3;
399 
400 bool DownloadManager::EscapeUrlChar(unsigned char input, char output[3]) {
401  if (((input >= '0') && (input <= '9')) ||
402  ((input >= 'A') && (input <= 'Z')) ||
403  ((input >= 'a') && (input <= 'z')) ||
404  (input == '/') || (input == ':') || (input == '.') ||
405  (input == '@') ||
406  (input == '+') || (input == '-') ||
407  (input == '_') || (input == '~') ||
408  (input == '[') || (input == ']') || (input == ','))
409  {
410  output[0] = static_cast<char>(input);
411  return false;
412  }
413 
414  output[0] = '%';
415  output[1] = static_cast<char>(
416  (input / 16) + ((input / 16 <= 9) ? '0' : 'A'-10));
417  output[2] = static_cast<char>(
418  (input % 16) + ((input % 16 <= 9) ? '0' : 'A'-10));
419  return true;
420 }
421 
422 
427 string DownloadManager::EscapeUrl(const int64_t jobinfo_id, const string &url) {
428  string escaped;
429  escaped.reserve(url.length());
430 
431  char escaped_char[3];
432  for (unsigned i = 0, s = url.length(); i < s; ++i) {
433  if (EscapeUrlChar(url[i], escaped_char)) {
434  escaped.append(escaped_char, 3);
435  } else {
436  escaped.push_back(escaped_char[0]);
437  }
438  }
439  LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") escaped %s to %s",
440  jobinfo_id, url.c_str(), escaped.c_str());
441 
442  return escaped;
443 }
444 
449 unsigned DownloadManager::EscapeHeader(const string &header,
450  char *escaped_buf,
451  size_t buf_size)
452 {
453  unsigned esc_pos = 0;
454  char escaped_char[3];
455  for (unsigned i = 0, s = header.size(); i < s; ++i) {
456  if (EscapeUrlChar(header[i], escaped_char)) {
457  for (unsigned j = 0; j < 3; ++j) {
458  if (escaped_buf) {
459  if (esc_pos >= buf_size)
460  return esc_pos;
461  escaped_buf[esc_pos] = escaped_char[j];
462  }
463  esc_pos++;
464  }
465  } else {
466  if (escaped_buf) {
467  if (esc_pos >= buf_size)
468  return esc_pos;
469  escaped_buf[esc_pos] = escaped_char[0];
470  }
471  esc_pos++;
472  }
473  }
474 
475  return esc_pos;
476 }
477 
481 int DownloadManager::ParseHttpCode(const char digits[3]) {
482  int result = 0;
483  int factor = 100;
484  for (int i = 0; i < 3; ++i) {
485  if ((digits[i] < '0') || (digits[i] > '9'))
486  return -1;
487  result += (digits[i] - '0') * factor;
488  factor /= 10;
489  }
490  return result;
491 }
492 
493 
497 int DownloadManager::CallbackCurlSocket(CURL * /* easy */,
498  curl_socket_t s,
499  int action,
500  void *userp,
501  void * /* socketp */)
502 {
503  // LogCvmfs(kLogDownload, kLogDebug, "CallbackCurlSocket called with easy "
504  // "handle %p, socket %d, action %d", easy, s, action);
505  DownloadManager *download_mgr = static_cast<DownloadManager *>(userp);
506  if (action == CURL_POLL_NONE)
507  return 0;
508 
509  // Find s in watch_fds_
510  unsigned index;
511 
512  // TODO(heretherebedragons) why start at index = 0 and not 2?
513  // fd[0] and fd[1] are fixed?
514  for (index = 0; index < download_mgr->watch_fds_inuse_; ++index) {
515  if (download_mgr->watch_fds_[index].fd == s)
516  break;
517  }
518  // Or create newly
519  if (index == download_mgr->watch_fds_inuse_) {
520  // Extend array if necessary
521  if (download_mgr->watch_fds_inuse_ == download_mgr->watch_fds_size_)
522  {
523  assert(download_mgr->watch_fds_size_ > 0);
524  download_mgr->watch_fds_size_ *= 2;
525  download_mgr->watch_fds_ = static_cast<struct pollfd *>(
526  srealloc(download_mgr->watch_fds_,
527  download_mgr->watch_fds_size_ * sizeof(struct pollfd)));
528  }
529  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].fd = s;
530  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].events = 0;
531  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].revents = 0;
532  download_mgr->watch_fds_inuse_++;
533  }
534 
535  switch (action) {
536  case CURL_POLL_IN:
537  download_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
538  break;
539  case CURL_POLL_OUT:
540  download_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
541  break;
542  case CURL_POLL_INOUT:
543  download_mgr->watch_fds_[index].events =
544  POLLIN | POLLPRI | POLLOUT | POLLWRBAND;
545  break;
546  case CURL_POLL_REMOVE:
547  if (index < download_mgr->watch_fds_inuse_-1) {
548  download_mgr->watch_fds_[index] =
549  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_-1];
550  }
551  download_mgr->watch_fds_inuse_--;
552  // Shrink array if necessary
553  if ((download_mgr->watch_fds_inuse_ > download_mgr->watch_fds_max_) &&
554  (download_mgr->watch_fds_inuse_ < download_mgr->watch_fds_size_/2))
555  {
556  download_mgr->watch_fds_size_ /= 2;
557  // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ (%d)",
558  // watch_fds_size_);
559  download_mgr->watch_fds_ = static_cast<struct pollfd *>(
560  srealloc(download_mgr->watch_fds_,
561  download_mgr->watch_fds_size_*sizeof(struct pollfd)));
562  // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ done",
563  // watch_fds_size_);
564  }
565  break;
566  default:
567  break;
568  }
569 
570  return 0;
571 }
572 
573 
577 void *DownloadManager::MainDownload(void *data) {
578  DownloadManager *download_mgr = static_cast<DownloadManager *>(data);
580  "download I/O thread of DownloadManager '%s' started",
581  download_mgr->name_.c_str());
582 
583  const int kIdxPipeTerminate = 0;
584  const int kIdxPipeJobs = 1;
585 
586  download_mgr->watch_fds_ =
587  static_cast<struct pollfd *>(smalloc(2 * sizeof(struct pollfd)));
588  download_mgr->watch_fds_size_ = 2;
589  download_mgr->watch_fds_[kIdxPipeTerminate].fd =
590  download_mgr->pipe_terminate_->GetReadFd();
591  download_mgr->watch_fds_[kIdxPipeTerminate].events = POLLIN | POLLPRI;
592  download_mgr->watch_fds_[kIdxPipeTerminate].revents = 0;
593  download_mgr->watch_fds_[kIdxPipeJobs].fd =
594  download_mgr->pipe_jobs_->GetReadFd();
595  download_mgr->watch_fds_[kIdxPipeJobs].events = POLLIN | POLLPRI;
596  download_mgr->watch_fds_[kIdxPipeJobs].revents = 0;
597  download_mgr->watch_fds_inuse_ = 2;
598 
599  int still_running = 0;
600  struct timeval timeval_start, timeval_stop;
601  gettimeofday(&timeval_start, NULL);
602  while (true) {
603  int timeout;
604  if (still_running) {
605  /* NOTE: The following might degrade the performance for many small files
606  * use case. TODO(jblomer): look into it.
607  // Specify a timeout for polling in ms; this allows us to return
608  // to libcurl once a second so it can look for internal operations
609  // which timed out. libcurl has a more elaborate mechanism
610  // (CURLMOPT_TIMERFUNCTION) that would inform us of the next potential
611  // timeout. TODO(bbockelm) we should switch to that in the future.
612  timeout = 100;
613  */
614  timeout = 1;
615  } else {
616  timeout = -1;
617  gettimeofday(&timeval_stop, NULL);
618  int64_t delta = static_cast<int64_t>(
619  1000 * DiffTimeSeconds(timeval_start, timeval_stop));
620  perf::Xadd(download_mgr->counters_->sz_transfer_time, delta);
621  }
622  int retval = poll(download_mgr->watch_fds_, download_mgr->watch_fds_inuse_,
623  timeout);
624  if (retval < 0) {
625  continue;
626  }
627 
628  // Handle timeout
629  if (retval == 0) {
630  curl_multi_socket_action(download_mgr->curl_multi_,
631  CURL_SOCKET_TIMEOUT,
632  0,
633  &still_running);
634  }
635 
636  // Terminate I/O thread
637  if (download_mgr->watch_fds_[kIdxPipeTerminate].revents)
638  break;
639 
640  // New job arrives
641  if (download_mgr->watch_fds_[kIdxPipeJobs].revents) {
642  download_mgr->watch_fds_[kIdxPipeJobs].revents = 0;
643  JobInfo *info;
644  download_mgr->pipe_jobs_->Read<JobInfo*>(&info);
645  if (!still_running) {
646  gettimeofday(&timeval_start, NULL);
647  }
648  CURL *handle = download_mgr->AcquireCurlHandle();
649  download_mgr->InitializeRequest(info, handle);
650  download_mgr->SetUrlOptions(info);
651  curl_multi_add_handle(download_mgr->curl_multi_, handle);
652  curl_multi_socket_action(download_mgr->curl_multi_,
653  CURL_SOCKET_TIMEOUT,
654  0,
655  &still_running);
656  }
657 
658  // Activity on curl sockets
659  // Within this loop the curl_multi_socket_action() may cause socket(s)
660  // to be removed from watch_fds_. If a socket is removed it is replaced
661  // by the socket at the end of the array and the inuse count is decreased.
662  // Therefore loop over the array in reverse order.
663  for (int64_t i = download_mgr->watch_fds_inuse_-1; i >= 2; --i) {
664  if (i >= download_mgr->watch_fds_inuse_) {
665  continue;
666  }
667  if (download_mgr->watch_fds_[i].revents) {
668  int ev_bitmask = 0;
669  if (download_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
670  ev_bitmask |= CURL_CSELECT_IN;
671  if (download_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
672  ev_bitmask |= CURL_CSELECT_OUT;
673  if (download_mgr->watch_fds_[i].revents &
674  (POLLERR | POLLHUP | POLLNVAL))
675  {
676  ev_bitmask |= CURL_CSELECT_ERR;
677  }
678  download_mgr->watch_fds_[i].revents = 0;
679 
680  curl_multi_socket_action(download_mgr->curl_multi_,
681  download_mgr->watch_fds_[i].fd,
682  ev_bitmask,
683  &still_running);
684  }
685  }
686 
687  // Check if transfers are completed
688  CURLMsg *curl_msg;
689  int msgs_in_queue;
690  while ((curl_msg = curl_multi_info_read(download_mgr->curl_multi_,
691  &msgs_in_queue)))
692  {
693  if (curl_msg->msg == CURLMSG_DONE) {
694  perf::Inc(download_mgr->counters_->n_requests);
695  JobInfo *info;
696  CURL *easy_handle = curl_msg->easy_handle;
697  int curl_error = curl_msg->data.result;
698  curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
699 
700  int64_t redir_count;
701  curl_easy_getinfo(easy_handle, CURLINFO_REDIRECT_COUNT, &redir_count);
702  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s' - id %" PRId64 ") "
703  "Number of CURL redirects %" PRId64 ,
704  download_mgr->name_.c_str(), info->id(),
705  redir_count);
706 
707  curl_multi_remove_handle(download_mgr->curl_multi_, easy_handle);
708  if (download_mgr->VerifyAndFinalize(curl_error, info)) {
709  curl_multi_add_handle(download_mgr->curl_multi_, easy_handle);
710  curl_multi_socket_action(download_mgr->curl_multi_,
711  CURL_SOCKET_TIMEOUT,
712  0,
713  &still_running);
714  } else {
715  // Return easy handle into pool and write result back
716  download_mgr->ReleaseCurlHandle(easy_handle);
717 
719  info->GetDataTubePtr()->EnqueueBack(ele);
720  info->GetPipeJobResultPtr()->
721  Write<download::Failures>(info->error_code());
722  }
723  }
724  }
725  }
726 
727  for (set<CURL *>::iterator i = download_mgr->pool_handles_inuse_->begin(),
728  iEnd = download_mgr->pool_handles_inuse_->end(); i != iEnd; ++i)
729  {
730  curl_multi_remove_handle(download_mgr->curl_multi_, *i);
731  curl_easy_cleanup(*i);
732  }
733  download_mgr->pool_handles_inuse_->clear();
734  free(download_mgr->watch_fds_);
735 
737  "download I/O thread of DownloadManager '%s' terminated",
738  download_mgr->name_.c_str());
739  return NULL;
740 }
741 
742 
743 //------------------------------------------------------------------------------
744 
745 
746 HeaderLists::~HeaderLists() {
747  for (unsigned i = 0; i < blocks_.size(); ++i) {
748  delete[] blocks_[i];
749  }
750  blocks_.clear();
751 }
752 
753 
754 curl_slist *HeaderLists::GetList(const char *header) {
755  return Get(header);
756 }
757 
758 
759 curl_slist *HeaderLists::DuplicateList(curl_slist *slist) {
760  assert(slist);
761  curl_slist *copy = GetList(slist->data);
762  copy->next = slist->next;
763  curl_slist *prev = copy;
764  slist = slist->next;
765  while (slist) {
766  curl_slist *new_link = Get(slist->data);
767  new_link->next = slist->next;
768  prev->next = new_link;
769  prev = new_link;
770  slist = slist->next;
771  }
772  return copy;
773 }
774 
775 
776 void HeaderLists::AppendHeader(curl_slist *slist, const char *header) {
777  assert(slist);
778  curl_slist *new_link = Get(header);
779  new_link->next = NULL;
780 
781  while (slist->next)
782  slist = slist->next;
783  slist->next = new_link;
784 }
785 
786 
792 void HeaderLists::CutHeader(const char *header, curl_slist **slist) {
793  assert(slist);
794  curl_slist head;
795  head.next = *slist;
796  curl_slist *prev = &head;
797  curl_slist *rover = *slist;
798  while (rover) {
799  if (strcmp(rover->data, header) == 0) {
800  prev->next = rover->next;
801  Put(rover);
802  rover = prev;
803  }
804  prev = rover;
805  rover = rover->next;
806  }
807  *slist = head.next;
808 }
809 
810 
811 void HeaderLists::PutList(curl_slist *slist) {
812  while (slist) {
813  curl_slist *next = slist->next;
814  Put(slist);
815  slist = next;
816  }
817 }
818 
819 
820 string HeaderLists::Print(curl_slist *slist) {
821  string verbose;
822  while (slist) {
823  verbose += string(slist->data) + "\n";
824  slist = slist->next;
825  }
826  return verbose;
827 }
828 
829 
830 curl_slist *HeaderLists::Get(const char *header) {
831  for (unsigned i = 0; i < blocks_.size(); ++i) {
832  for (unsigned j = 0; j < kBlockSize; ++j) {
833  if (!IsUsed(&(blocks_[i][j]))) {
834  blocks_[i][j].data = const_cast<char *>(header);
835  return &(blocks_[i][j]);
836  }
837  }
838  }
839 
840  // All used, new block
841  AddBlock();
842  blocks_[blocks_.size()-1][0].data = const_cast<char *>(header);
843  return &(blocks_[blocks_.size()-1][0]);
844 }
845 
846 
847 void HeaderLists::Put(curl_slist *slist) {
848  slist->data = NULL;
849  slist->next = NULL;
850 }
851 
852 
853 void HeaderLists::AddBlock() {
854  curl_slist *new_block = new curl_slist[kBlockSize];
855  for (unsigned i = 0; i < kBlockSize; ++i) {
856  Put(&new_block[i]);
857  }
858  blocks_.push_back(new_block);
859 }
860 
861 
862 //------------------------------------------------------------------------------
863 
864 
865 string DownloadManager::ProxyInfo::Print() {
866  if (url == "DIRECT")
867  return url;
868 
869  string result = url;
870  int remaining =
871  static_cast<int>(host.deadline()) - static_cast<int>(time(NULL));
872  string expinfo = (remaining >= 0) ? "+" : "";
873  if (abs(remaining) >= 3600) {
874  expinfo += StringifyInt(remaining/3600) + "h";
875  } else if (abs(remaining) >= 60) {
876  expinfo += StringifyInt(remaining/60) + "m";
877  } else {
878  expinfo += StringifyInt(remaining) + "s";
879  }
880  if (host.status() == dns::kFailOk) {
881  result += " (" + host.name() + ", " + expinfo + ")";
882  } else {
883  result += " (:unresolved:, " + expinfo + ")";
884  }
885  return result;
886 }
887 
888 
893 CURL *DownloadManager::AcquireCurlHandle() {
894  CURL *handle;
895 
896  if (pool_handles_idle_->empty()) {
897  // Create a new handle
898  handle = curl_easy_init();
899  assert(handle != NULL);
900 
901  curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
902  // curl_easy_setopt(curl_default, CURLOPT_FAILONERROR, 1);
903  curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, CallbackCurlHeader);
904  curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlData);
905  } else {
906  handle = *(pool_handles_idle_->begin());
907  pool_handles_idle_->erase(pool_handles_idle_->begin());
908  }
909 
910  pool_handles_inuse_->insert(handle);
911 
912  return handle;
913 }
914 
915 
916 void DownloadManager::ReleaseCurlHandle(CURL *handle) {
917  set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
918  assert(elem != pool_handles_inuse_->end());
919 
920  if (pool_handles_idle_->size() > pool_max_handles_) {
921  curl_easy_cleanup(*elem);
922  } else {
923  pool_handles_idle_->insert(*elem);
924  }
925 
926  pool_handles_inuse_->erase(elem);
927 }
928 
929 
934 void DownloadManager::InitializeRequest(JobInfo *info, CURL *handle) {
935  // Initialize internal download state
936  info->SetCurlHandle(handle);
937  info->SetErrorCode(kFailOk);
938  info->SetHttpCode(-1);
939  info->SetFollowRedirects(follow_redirects_);
940  info->SetNumUsedProxies(1);
941  info->SetNumUsedMetalinks(1);
942  info->SetNumUsedHosts(1);
943  info->SetNumRetries(0);
944  info->SetBackoffMs(0);
945  info->SetHeaders(header_lists_->DuplicateList(default_headers_));
946  if (info->info_header()) {
947  header_lists_->AppendHeader(info->headers(), info->info_header());
948  }
949  if (enable_http_tracing_) {
950  for (unsigned int i = 0; i < http_tracing_headers_.size(); i++) {
951  header_lists_->AppendHeader(info->headers(),
952  (http_tracing_headers_)[i].c_str());
953  }
954 
955  header_lists_->AppendHeader(info->headers(), info->tracing_header_pid());
956  header_lists_->AppendHeader(info->headers(), info->tracing_header_gid());
957  header_lists_->AppendHeader(info->headers(), info->tracing_header_uid());
958 
959  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s' - id %" PRId64 ") "
960  "CURL Header for URL: %s is:\n %s",
961  name_.c_str(), info->id(), info->url()->c_str(),
962  header_lists_->Print(info->headers()).c_str());
963  }
964 
965  if (info->force_nocache()) {
966  SetNocache(info);
967  } else {
968  info->SetNocache(false);
969  }
970  if (info->compressed()) {
972  }
973  if (info->expected_hash()) {
974  assert(info->hash_context().buffer != NULL);
975  shash::Init(info->hash_context());
976  }
977 
978  if ((info->range_offset() != -1) && (info->range_size())) {
979  char byte_range_array[100];
980  const int64_t range_lower = static_cast<int64_t>(info->range_offset());
981  const int64_t range_upper = static_cast<int64_t>(
982  info->range_offset() + info->range_size() - 1);
983  if (snprintf(byte_range_array, sizeof(byte_range_array),
984  "%" PRId64 "-%" PRId64,
985  range_lower, range_upper) == 100)
986  {
987  PANIC(NULL); // Should be impossible given limits on offset size.
988  }
989  curl_easy_setopt(handle, CURLOPT_RANGE, byte_range_array);
990  } else {
991  curl_easy_setopt(handle, CURLOPT_RANGE, NULL);
992  }
993 
994  // Set curl parameters
995  curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
996  curl_easy_setopt(handle, CURLOPT_WRITEHEADER,
997  static_cast<void *>(info));
998  curl_easy_setopt(handle, CURLOPT_WRITEDATA, static_cast<void *>(info));
999  curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->headers());
1000  if (info->head_request()) {
1001  curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
1002  } else {
1003  curl_easy_setopt(handle, CURLOPT_HTTPGET, 1);
1004  }
1005  if (opt_ipv4_only_) {
1006  curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
1007  }
1008  if (follow_redirects_) {
1009  curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1);
1010  curl_easy_setopt(handle, CURLOPT_MAXREDIRS, 4);
1011  }
1012 #ifdef DEBUGMSG
1013  curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
1014  curl_easy_setopt(handle, CURLOPT_DEBUGFUNCTION, CallbackCurlDebug);
1015 #endif
1016 }
1017 
1018 void DownloadManager::CheckHostInfoReset(
1019  const std::string &typ,
1020  HostInfo &info,
1021  JobInfo *jobinfo,
1022  time_t &now)
1023 {
1024  if (info.timestamp_backup > 0) {
1025  if (now == 0)
1026  now = time(NULL);
1027  if (static_cast<int64_t>(now) >
1028  static_cast<int64_t>(info.timestamp_backup + info.reset_after))
1029  {
1031  "(manager %s - id %" PRId64 ") "
1032  "switching %s from %s to %s (reset %s)", name_.c_str(),
1033  jobinfo->id(), typ.c_str(), (*info.chain)[info.current].c_str(),
1034  (*info.chain)[0].c_str(), typ.c_str());
1035  info.current = 0;
1036  info.timestamp_backup = 0;
1037  }
1038  }
1039 }
1040 
1041 
1046 void DownloadManager::SetUrlOptions(JobInfo *info) {
1047  CURL *curl_handle = info->curl_handle();
1048  string url_prefix;
1049  time_t now = 0;
1050 
1051  MutexLockGuard m(lock_options_);
1052 
1053  // sharding policy
1054  if (sharding_policy_.UseCount() > 0) {
1055  if (info->proxy() != "") {
1056  // proxy already set, so this is a failover event
1057  perf::Inc(counters_->n_proxy_failover);
1058  }
1059  info->SetProxy(sharding_policy_->GetNextProxy(info->url(), info->proxy(),
1060  info->range_offset() == -1 ? 0 : info->range_offset()));
1061 
1062  curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, info->proxy().c_str());
1063  } else { // no sharding policy
1064  // Check if proxy group needs to be reset from backup to primary
1065  if (opt_timestamp_backup_proxies_ > 0) {
1066  now = time(NULL);
1067  if (static_cast<int64_t>(now) >
1068  static_cast<int64_t>(opt_timestamp_backup_proxies_ +
1069  opt_proxy_groups_reset_after_))
1070  {
1071  opt_proxy_groups_current_ = 0;
1072  opt_timestamp_backup_proxies_ = 0;
1073  RebalanceProxiesUnlocked("Reset proxy group from backup to primary");
1074  }
1075  }
1076  // Check if load-balanced proxies within the group need to be reset
1077  if (opt_timestamp_failover_proxies_ > 0) {
1078  if (now == 0)
1079  now = time(NULL);
1080  if (static_cast<int64_t>(now) >
1081  static_cast<int64_t>(opt_timestamp_failover_proxies_ +
1082  opt_proxy_groups_reset_after_))
1083  {
1084  RebalanceProxiesUnlocked(
1085  "Reset load-balanced proxies within the active group");
1086  }
1087  }
1088 
1089  ProxyInfo *proxy = ChooseProxyUnlocked(info->expected_hash());
1090  if (!proxy || (proxy->url == "DIRECT")) {
1091  info->SetProxy("DIRECT");
1092  curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, "");
1093  } else {
1094  // Note: inside ValidateProxyIpsUnlocked() we may change the proxy data
1095  // structure, so we must not pass proxy->... (== current_proxy())
1096  // parameters directly
1097  std::string purl = proxy->url;
1098  dns::Host phost = proxy->host;
1099  const bool changed = ValidateProxyIpsUnlocked(purl, phost);
1100  // Current proxy may have changed
1101  if (changed) {
1102  proxy = ChooseProxyUnlocked(info->expected_hash());
1103  }
1104  info->SetProxy(proxy->url);
1105  if (proxy->host.status() == dns::kFailOk) {
1106  curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY,
1107  info->proxy().c_str());
1108  } else {
1109  // We know it can't work, don't even try to download
1110  curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, "0.0.0.0");
1111  }
1112  }
1113  } // end !sharding
1114 
1115  // Check if metalink and host chains need to be reset
1116  CheckHostInfoReset("metalink", opt_metalink_, info, now);
1117  CheckHostInfoReset("host", opt_metalink_, info, now);
1118 
1119  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, opt_low_speed_limit_);
1120  if (info->proxy() != "DIRECT") {
1121  curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_proxy_);
1122  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_proxy_);
1123  } else {
1124  curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_direct_);
1125  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_direct_);
1126  }
1127  if (!opt_dns_server_.empty())
1128  curl_easy_setopt(curl_handle, CURLOPT_DNS_SERVERS, opt_dns_server_.c_str());
1129 
1130  if (info->probe_hosts()) {
1131  if (CheckMetalinkChain(now)) {
1132  url_prefix = (*opt_metalink_.chain)[opt_metalink_.current];
1133  info->SetCurrentMetalinkChainIndex(opt_metalink_.current);
1134  LogCvmfs(kLogDownload, kLogDebug, "(manager %s - id %" PRId64 ") "
1135  "reading from metalink %d",
1136  name_.c_str(), info->id(), opt_metalink_.current);
1137  } else if (opt_host_.chain) {
1138  url_prefix = (*opt_host_.chain)[opt_host_.current];
1139  info->SetCurrentHostChainIndex(opt_host_.current);
1140  LogCvmfs(kLogDownload, kLogDebug, "(manager %s - id %" PRId64 ") "
1141  "reading from host %d",
1142  name_.c_str(), info->id(), opt_host_.current);
1143  }
1144  }
1145 
1146  string url = url_prefix + *(info->url());
1147 
1148  curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 1L);
1149  if (url.substr(0, 5) == "https") {
1150  bool rvb = ssl_certificate_store_.ApplySslCertificatePath(curl_handle);
1151  if (!rvb) {
1153  "(manager %s - id %" PRId64 ") "
1154  "Failed to set SSL certificate path %s", name_.c_str(),
1155  info->id(), ssl_certificate_store_.GetCaPath().c_str());
1156  }
1157  if (info->pid() != -1) {
1158  if (credentials_attachment_ == NULL) {
1159  LogCvmfs(kLogDownload, kLogDebug, "(manager %s - id %" PRId64 ") "
1160  "uses secure downloads but no credentials attachment set",
1161  name_.c_str(), info->id());
1162  } else {
1163  bool retval = credentials_attachment_->ConfigureCurlHandle(
1164  curl_handle, info->pid(), info->GetCredDataPtr());
1165  if (!retval) {
1166  LogCvmfs(kLogDownload, kLogDebug, "(manager %s - id %" PRId64 ") "
1167  "failed attaching credentials",
1168  name_.c_str(), info->id());
1169  }
1170  }
1171  }
1172  // The download manager disables signal handling in the curl library;
1173  // as OpenSSL's implementation of TLS will generate a sigpipe in some
1174  // error paths, we must explicitly disable SIGPIPE here.
1175  // TODO(jblomer): it should be enough to do this once
1176  signal(SIGPIPE, SIG_IGN);
1177  }
1178 
1179  if (url.find("@proxy@") != string::npos) {
1180  // This is used in Geo-API requests (only), to replace a portion of the
1181  // URL with the current proxy name for the sake of caching the result.
1182  // Replace the @proxy@ either with a passed in "forced" template (which
1183  // is set from $CVMFS_PROXY_TEMPLATE) if there is one, or a "direct"
1184  // template (which is the uuid) if there's no proxy, or the name of the
1185  // proxy.
1186  string replacement;
1187  if (proxy_template_forced_ != "") {
1188  replacement = proxy_template_forced_;
1189  } else if (info->proxy() == "DIRECT") {
1190  replacement = proxy_template_direct_;
1191  } else {
1192  if (opt_proxy_groups_current_ >= opt_proxy_groups_fallback_) {
1193  // It doesn't make sense to use the fallback proxies in Geo-API requests
1194  // since the fallback proxies are supposed to get sorted, too.
1195  info->SetProxy("DIRECT");
1196  curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, "");
1197  replacement = proxy_template_direct_;
1198  } else {
1199  replacement = ChooseProxyUnlocked(info->expected_hash())->host.name();
1200  }
1201  }
1202  replacement = (replacement == "") ? proxy_template_direct_ : replacement;
1203  LogCvmfs(kLogDownload, kLogDebug, "(manager %s - id %" PRId64 ") "
1204  "replacing @proxy@ by %s",
1205  name_.c_str(), info->id(), replacement.c_str());
1206  url = ReplaceAll(url, "@proxy@", replacement);
1207  }
1208 
1209  // TODO(heretherebedragons) before removing
1210  // static_cast<cvmfs::MemSink*>(info->sink)->size() == 0
1211  // and just always call info->sink->Reserve()
1212  // we should do a speed check
1213  if ((info->sink() != NULL) && info->sink()->RequiresReserve() &&
1214  (static_cast<cvmfs::MemSink*>(info->sink())->size() == 0) &&
1215  HasPrefix(url, "file://", false)) {
1216  platform_stat64 stat_buf;
1217  int retval = platform_stat(url.c_str(), &stat_buf);
1218  if (retval != 0) {
1219  // this is an error: file does not exist or out of memory
1220  // error is caught in other code section.
1221  info->sink()->Reserve(64ul * 1024ul);
1222  } else {
1223  info->sink()->Reserve(stat_buf.st_size);
1224  }
1225  }
1226 
1227  curl_easy_setopt(curl_handle, CURLOPT_URL,
1228  EscapeUrl(info->id(), url).c_str());
1229 }
1230 
1231 
1241 bool DownloadManager::ValidateProxyIpsUnlocked(
1242  const string &url,
1243  const dns::Host &host)
1244 {
1245  if (!host.IsExpired())
1246  return false;
1247  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s') validate DNS entry for %s",
1248  name_.c_str(), host.name().c_str());
1249 
1250  unsigned group_idx = opt_proxy_groups_current_;
1251  dns::Host new_host = resolver_->Resolve(host.name());
1252 
1253  bool update_only = true; // No changes to the list of IP addresses.
1254  if (new_host.status() != dns::kFailOk) {
1255  // Try again later in case resolving fails.
1257  "(manager '%s') failed to resolve IP addresses for %s (%d - %s)",
1258  name_.c_str(), host.name().c_str(), new_host.status(),
1259  dns::Code2Ascii(new_host.status()));
1260  new_host = dns::Host::ExtendDeadline(host, resolver_->min_ttl());
1261  } else if (!host.IsEquivalent(new_host)) {
1262  update_only = false;
1263  }
1264 
1265  if (update_only) {
1266  for (unsigned i = 0; i < (*opt_proxy_groups_)[group_idx].size(); ++i) {
1267  if ((*opt_proxy_groups_)[group_idx][i].host.id() == host.id())
1268  (*opt_proxy_groups_)[group_idx][i].host = new_host;
1269  }
1270  return false;
1271  }
1272 
1273  assert(new_host.status() == dns::kFailOk);
1274 
1275  // Remove old host objects, insert new objects, and rebalance.
1277  "(manager '%s') DNS entries for proxy %s changed, adjusting",
1278  name_.c_str(), host.name().c_str());
1279  vector<ProxyInfo> *group = current_proxy_group();
1280  opt_num_proxies_ -= group->size();
1281  for (unsigned i = 0; i < group->size(); ) {
1282  if ((*group)[i].host.id() == host.id()) {
1283  group->erase(group->begin() + i);
1284  } else {
1285  i++;
1286  }
1287  }
1288  vector<ProxyInfo> new_infos;
1289  set<string> best_addresses = new_host.ViewBestAddresses(opt_ip_preference_);
1290  set<string>::const_iterator iter_ips = best_addresses.begin();
1291  for (; iter_ips != best_addresses.end(); ++iter_ips) {
1292  string url_ip = dns::RewriteUrl(url, *iter_ips);
1293  new_infos.push_back(ProxyInfo(new_host, url_ip));
1294  }
1295  group->insert(group->end(), new_infos.begin(), new_infos.end());
1296  opt_num_proxies_ += new_infos.size();
1297 
1298  std::string msg = "DNS entries for proxy " + host.name() + " changed";
1299 
1300  RebalanceProxiesUnlocked(msg);
1301  return true;
1302 }
1303 
1304 
1308 void DownloadManager::UpdateStatistics(CURL *handle) {
1309  double val;
1310  int retval;
1311  int64_t sum = 0;
1312 
1313  retval = curl_easy_getinfo(handle, CURLINFO_SIZE_DOWNLOAD, &val);
1314  assert(retval == CURLE_OK);
1315  sum += static_cast<int64_t>(val);
1316  /*retval = curl_easy_getinfo(handle, CURLINFO_HEADER_SIZE, &val);
1317  assert(retval == CURLE_OK);
1318  sum += static_cast<int64_t>(val);*/
1319  perf::Xadd(counters_->sz_transferred_bytes, sum);
1320 }
1321 
1322 
1326 bool DownloadManager::CanRetry(const JobInfo *info) {
1327  MutexLockGuard m(lock_options_);
1328  unsigned max_retries = opt_max_retries_;
1329 
1330  return !(info->nocache()) && (info->num_retries() < max_retries) &&
1331  (IsProxyTransferError(info->error_code()) ||
1332  IsHostTransferError(info->error_code()));
1333 }
1334 
1342 void DownloadManager::Backoff(JobInfo *info) {
1343  unsigned backoff_init_ms = 0;
1344  unsigned backoff_max_ms = 0;
1345  {
1346  MutexLockGuard m(lock_options_);
1347  backoff_init_ms = opt_backoff_init_ms_;
1348  backoff_max_ms = opt_backoff_max_ms_;
1349  }
1350 
1351  info->SetNumRetries(info->num_retries() + 1);
1352  perf::Inc(counters_->n_retries);
1353  if (info->backoff_ms() == 0) {
1354  info->SetBackoffMs(prng_.Next(backoff_init_ms + 1)); // Must be != 0
1355  } else {
1356  info->SetBackoffMs(info->backoff_ms() * 2);
1357  }
1358  if (info->backoff_ms() > backoff_max_ms) {
1359  info->SetBackoffMs(backoff_max_ms);
1360  }
1361 
1363  "(manager '%s' - id %" PRId64 ") backing off for %d ms",
1364  name_.c_str(), info->id(), info->backoff_ms());
1365  SafeSleepMs(info->backoff_ms());
1366 }
1367 
1368 void DownloadManager::SetNocache(JobInfo *info) {
1369  if (info->nocache())
1370  return;
1371  header_lists_->AppendHeader(info->headers(), "Pragma: no-cache");
1372  header_lists_->AppendHeader(info->headers(), "Cache-Control: no-cache");
1373  curl_easy_setopt(info->curl_handle(), CURLOPT_HTTPHEADER, info->headers());
1374  info->SetNocache(true);
1375 }
1376 
1377 
1382 void DownloadManager::SetRegularCache(JobInfo *info) {
1383  if (info->nocache() == false)
1384  return;
1385  header_lists_->CutHeader("Pragma: no-cache", info->GetHeadersPtr());
1386  header_lists_->CutHeader("Cache-Control: no-cache", info->GetHeadersPtr());
1387  curl_easy_setopt(info->curl_handle(), CURLOPT_HTTPHEADER, info->headers());
1388  info->SetNocache(false);
1389 }
1390 
1391 
1395 void DownloadManager::ReleaseCredential(JobInfo *info) {
1396  if (info->cred_data()) {
1397  assert(credentials_attachment_ != NULL); // Someone must have set it
1398  credentials_attachment_->ReleaseCurlHandle(info->curl_handle(),
1399  info->cred_data());
1400  info->SetCredData(NULL);
1401  }
1402 }
1403 
1404 
1405 /* Sort links based on the "pri=" parameter */
1406 static bool sortlinks(const std::string &s1, const std::string &s2) {
1407  const size_t pos1 = s1.find("; pri=");
1408  const size_t pos2 = s2.find("; pri=");
1409  int pri1, pri2;
1410  if ((pos1 != std::string::npos) &&
1411  (pos2 != std::string::npos) &&
1412  (sscanf(s1.substr(pos1+6).c_str(), "%d", &pri1) == 1) &&
1413  (sscanf(s2.substr(pos2+6).c_str(), "%d", &pri2) == 1)) {
1414  return pri1 < pri2;
1415  }
1416  return false;
1417 }
1418 
1423 void DownloadManager::ProcessLink(JobInfo *info) {
1424 
1425  std::vector<std::string> links = SplitString(info->link(), ',');
1426  if (info->link().find("; pri=") != std::string::npos)
1427  std::sort(links.begin(), links.end(), sortlinks);
1428 
1429  std::vector<std::string> host_list;
1430 
1431  std::vector<std::string>::const_iterator il = links.begin();
1432  for (; il != links.end(); ++il) {
1433  const std::string &link = *il;
1434  if ((link.find("; rel=duplicate") == std::string::npos) &&
1435  (link.find("; rel=\"duplicate\"") == std::string::npos)) {
1437  "skipping link '%s' because it does not contain rel=duplicate",
1438  link.c_str());
1439  continue;
1440  }
1441  // ignore depth= field since there's nothing useful we can do with it
1442 
1443  size_t start = link.find('<');
1444  if (start == std::string::npos) {
1446  "skipping link '%s' because it does not have a left angle bracket",
1447  link.c_str());
1448  continue;
1449  }
1450 
1451  start++;
1452  if ((link.substr(start, 7) != "http://") &&
1453  (link.substr(start, 8) != "https://")) {
1455  "skipping link '%s' of unrecognized url protocol", link.c_str());
1456  continue;
1457  }
1458 
1459  size_t end = link.find('/', start+8);
1460  if (end == std::string::npos)
1461  end = link.find('>');
1462  if (end == std::string::npos) {
1464  "skipping link '%s' because no slash in url and no right angle bracket",
1465  link.c_str());
1466  continue;
1467  }
1468  const std::string host = link.substr(start, end-start);
1469  LogCvmfs(kLogDownload, kLogDebug, "adding linked host '%s'", host.c_str());
1470  host_list.push_back(host);
1471  }
1472 
1473  if (host_list.size() > 0) {
1474  SetHostChain(host_list);
1475  opt_metalink_timestamp_link_ = time(NULL);
1476  }
1477 }
1478 
1479 
1486 bool DownloadManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1487  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s' - id %" PRId64 ") "
1488  "Verify downloaded url %s, proxy %s (curl error %d)",
1489  name_.c_str(), info->id(), info->url()->c_str(),
1490  info->proxy().c_str(), curl_error);
1491  UpdateStatistics(info->curl_handle());
1492 
1493  bool was_metalink;
1494  std::string typ;
1495  if (info->current_metalink_chain_index() >= 0) {
1496  was_metalink = true;
1497  typ = "metalink";
1498  if (info->link() != "") {
1499  // process Link header whether or not the redirected URL got an error
1500  ProcessLink(info);
1501  }
1502  } else {
1503  was_metalink = false;
1504  typ = "host";
1505  }
1506 
1507 
1508  // Verification and error classification
1509  switch (curl_error) {
1510  case CURLE_OK:
1511  // Verify content hash
1512  if (info->expected_hash()) {
1513  shash::Any match_hash;
1514  shash::Final(info->hash_context(), &match_hash);
1515  if (match_hash != *(info->expected_hash())) {
1516  if (ignore_signature_failures_) {
1518  "(manager '%s' - id %" PRId64 ") "
1519  "ignoring failed hash verification of %s (expected %s, got %s)",
1520  name_.c_str(), info->id(), info->url()->c_str(),
1521  info->expected_hash()->ToString().c_str(),
1522  match_hash.ToString().c_str());
1523  } else {
1524  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s' - id %" PRId64 ") "
1525  "hash verification of %s failed (expected %s, got %s)",
1526  name_.c_str(), info->id(), info->url()->c_str(),
1527  info->expected_hash()->ToString().c_str(),
1528  match_hash.ToString().c_str());
1529  info->SetErrorCode(kFailBadData);
1530  break;
1531  }
1532  }
1533  }
1534 
1535  info->SetErrorCode(kFailOk);
1536  break;
1537  case CURLE_UNSUPPORTED_PROTOCOL:
1539  break;
1540  case CURLE_URL_MALFORMAT:
1541  info->SetErrorCode(kFailBadUrl);
1542  break;
1543  case CURLE_COULDNT_RESOLVE_PROXY:
1545  break;
1546  case CURLE_COULDNT_RESOLVE_HOST:
1548  break;
1549  case CURLE_OPERATION_TIMEDOUT:
1550  info->SetErrorCode((info->proxy() == "DIRECT") ?
1552  break;
1553  case CURLE_PARTIAL_FILE:
1554  case CURLE_GOT_NOTHING:
1555  case CURLE_RECV_ERROR:
1556  info->SetErrorCode((info->proxy() == "DIRECT") ?
1558  break;
1559  case CURLE_FILE_COULDNT_READ_FILE:
1560  case CURLE_COULDNT_CONNECT:
1561  if (info->proxy() != "DIRECT") {
1562  // This is a guess. Fail-over can still change to switching host
1564  } else {
1566  }
1567  break;
1568  case CURLE_TOO_MANY_REDIRECTS:
1570  break;
1571  case CURLE_SSL_CACERT_BADFILE:
1573  "(manager '%s' -id %" PRId64 ") "
1574  "Failed to load certificate bundle. "
1575  "X509_CERT_BUNDLE might point to the wrong location.",
1576  name_.c_str(), info->id());
1578  break;
1579  // As of curl 7.62.0, CURLE_SSL_CACERT is the same as
1580  // CURLE_PEER_FAILED_VERIFICATION
1581  case CURLE_PEER_FAILED_VERIFICATION:
1583  "(manager '%s' - id %" PRId64 ") "
1584  "invalid SSL certificate of remote host. "
1585  "X509_CERT_DIR and/or X509_CERT_BUNDLE might point to the wrong "
1586  "location.", name_.c_str(), info->id());
1588  break;
1589  case CURLE_ABORTED_BY_CALLBACK:
1590  case CURLE_WRITE_ERROR:
1591  // Error set by callback
1592  break;
1593  case CURLE_SEND_ERROR:
1594  // The curl error CURLE_SEND_ERROR can be seen when a cache is misbehaving
1595  // and closing connections before the http request send is completed.
1596  // Handle this error, treating it as a short transfer error.
1597  info->SetErrorCode((info->proxy() == "DIRECT") ?
1598  kFailHostShortTransfer : kFailProxyShortTransfer);
1599  break;
1600  default:
1601  LogCvmfs(kLogDownload, kLogSyslogErr, "(manager '%s' - id %" PRId64 ") "
1602  "unexpected curl error (%d) while trying to fetch %s",
1603  name_.c_str(), info->id(), curl_error, info->url()->c_str());
1604  info->SetErrorCode(kFailOther);
1605  break;
1606  }
1607 
1608  std::vector<std::string> *host_chain;
1609  unsigned char num_used_hosts;
1610  if (was_metalink) {
1611  host_chain = opt_metalink_.chain;
1612  num_used_hosts = info->num_used_metalinks();
1613  } else {
1614  host_chain = opt_host_.chain;
1615  num_used_hosts = info->num_used_hosts();
1616  }
1617 
1618  // Determination if download should be repeated
1619  bool try_again = false;
1620  bool same_url_retry = CanRetry(info);
1621  if (info->error_code() != kFailOk) {
1622  MutexLockGuard m(lock_options_);
1623  if (info->error_code() == kFailBadData) {
1624  if (!info->nocache()) {
1625  try_again = true;
1626  } else {
1627  // Make it a host failure
1629  "(manager '%s' - id %" PRId64 ") "
1630  "data corruption with no-cache header, try another %s",
1631  name_.c_str(), info->id(), typ.c_str());
1632 
1633  info->SetErrorCode(kFailHostHttp);
1634  }
1635  }
1636  if ( same_url_retry || (
1637  ( (info->error_code() == kFailHostResolve) ||
1638  IsHostTransferError(info->error_code()) ||
1639  (info->error_code() == kFailHostHttp)) &&
1640  info->probe_hosts() &&
1641  host_chain && (num_used_hosts < host_chain->size()))
1642  )
1643  {
1644  try_again = true;
1645  }
1646  if ( same_url_retry || (
1647  ( (info->error_code() == kFailProxyResolve) ||
1648  IsProxyTransferError(info->error_code()) ||
1649  (info->error_code() == kFailProxyHttp)) )
1650  )
1651  {
1652  if (sharding_policy_.UseCount() > 0) { // sharding policy
1653  try_again = true;
1654  same_url_retry = false;
1655  } else { // no sharding
1656  try_again = true;
1657  // If all proxies failed, do a next round with the next host
1658  if (!same_url_retry && (info->num_used_proxies() >= opt_num_proxies_)) {
1659  // Check if this can be made a host fail-over
1660  if (info->probe_hosts() &&
1661  host_chain && (num_used_hosts < host_chain->size()))
1662  {
1663  // reset proxy group if not already performed by other handle
1664  if (opt_proxy_groups_) {
1665  if ((opt_proxy_groups_current_ > 0) ||
1666  (opt_proxy_groups_current_burned_ > 0))
1667  {
1668  opt_proxy_groups_current_ = 0;
1669  opt_timestamp_backup_proxies_ = 0;
1670  const std::string msg =
1671  "reset proxies for " + typ + " failover";
1672  RebalanceProxiesUnlocked(msg);
1673  }
1674  }
1675 
1676  // Make it a host failure
1678  "(manager '%s' - id %" PRId64 ") make it a %s failure",
1679  name_.c_str(), info->id(), typ.c_str());
1680  info->SetNumUsedProxies(1);
1682  } else {
1683  if (failover_indefinitely_) {
1684  // Instead of giving up, reset the num_used_proxies counter,
1685  // switch proxy and try again
1687  "(manager '%s' - id %" PRId64 ") "
1688  "VerifyAndFinalize() would fail the download here. "
1689  "Instead switch proxy and retry download. "
1690  "typ=%s "
1691  "info->probe_hosts=%d host_chain=%p num_used_hosts=%d "
1692  "host_chain->size()=%lu same_url_retry=%d "
1693  "info->num_used_proxies=%d opt_num_proxies_=%d",
1694  name_.c_str(), info->id(), typ.c_str(),
1695  static_cast<int>(info->probe_hosts()),
1696  host_chain, num_used_hosts,
1697  host_chain ?
1698  host_chain->size() : -1, static_cast<int>(same_url_retry),
1699  info->num_used_proxies(), opt_num_proxies_);
1700  info->SetNumUsedProxies(1);
1701  RebalanceProxiesUnlocked(
1702  "download failed - failover indefinitely");
1703  try_again = !Interrupted(fqrn_, info);
1704  } else {
1705  try_again = false;
1706  }
1707  }
1708  } // Make a proxy failure a host failure
1709  } // Proxy failure assumed
1710  } // end !sharding
1711  }
1712 
1713  if (try_again) {
1714  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s' - id %" PRId64 ") "
1715  "Trying again on same curl handle, same url: %d, "
1716  "error code %d no-cache %d",
1717  name_.c_str(), info->id(), same_url_retry,
1718  info->error_code(), info->nocache());
1719  // Reset internal state and destination
1720  if (info->sink() != NULL && info->sink()->Reset() != 0) {
1721  info->SetErrorCode(kFailLocalIO);
1722  goto verify_and_finalize_stop;
1723  }
1724  if (info->interrupt_cue() && info->interrupt_cue()->IsCanceled()) {
1725  info->SetErrorCode(kFailCanceled);
1726  goto verify_and_finalize_stop;
1727  }
1728 
1729  if (info->expected_hash()) {
1730  shash::Init(info->hash_context());
1731  }
1732  if (info->compressed()) {
1734  }
1735 
1736  if (sharding_policy_.UseCount() > 0) { // sharding policy
1737  ReleaseCredential(info);
1738  SetUrlOptions(info);
1739  } else { // no sharding policy
1740  SetRegularCache(info);
1741 
1742  // Failure handling
1743  bool switch_proxy = false;
1744  bool switch_host = false;
1745  switch (info->error_code()) {
1746  case kFailBadData:
1747  SetNocache(info);
1748  break;
1749  case kFailProxyResolve:
1750  case kFailProxyHttp:
1751  switch_proxy = true;
1752  break;
1753  case kFailHostResolve:
1754  case kFailHostHttp:
1755  case kFailHostAfterProxy:
1756  switch_host = true;
1757  break;
1758  default:
1759  if (IsProxyTransferError(info->error_code())) {
1760  if (same_url_retry) {
1761  Backoff(info);
1762  } else {
1763  switch_proxy = true;
1764  }
1765  } else if (IsHostTransferError(info->error_code())) {
1766  if (same_url_retry) {
1767  Backoff(info);
1768  } else {
1769  switch_host = true;
1770  }
1771  } else {
1772  // No other errors expected when retrying
1773  PANIC(NULL);
1774  }
1775  }
1776  if (switch_proxy) {
1777  ReleaseCredential(info);
1778  SwitchProxy(info);
1779  info->SetNumUsedProxies(info->num_used_proxies() + 1);
1780  SetUrlOptions(info);
1781  }
1782  if (switch_host) {
1783  ReleaseCredential(info);
1784  if (was_metalink) {
1785  SwitchMetalink(info);
1786  info->SetNumUsedMetalinks(num_used_hosts + 1);
1787  } else {
1788  SwitchHost(info);
1789  info->SetNumUsedHosts(num_used_hosts + 1);
1790  }
1791  SetUrlOptions(info);
1792  }
1793  } // end !sharding
1794 
1795  if (failover_indefinitely_) {
1796  // try again, breaking if there's a cvmfs reload happening and we are in a
1797  // proxy failover. This will EIO the call application.
1798  return !Interrupted(fqrn_, info);
1799  }
1800  return true; // try again
1801  }
1802 
1803  verify_and_finalize_stop:
1804  // Finalize, flush destination file
1805  ReleaseCredential(info);
1806  if (info->sink() != NULL && info->sink()->Flush() != 0) {
1807  info->SetErrorCode(kFailLocalIO);
1808  }
1809 
1810  if (info->compressed())
1812 
1813  if (info->headers()) {
1814  header_lists_->PutList(info->headers());
1815  info->SetHeaders(NULL);
1816  }
1817 
1818  return false; // stop transfer and return to Fetch()
1819 }
1820 
1821 DownloadManager::~DownloadManager() {
1822  // cleaned up fini
1823  if (sharding_policy_.UseCount() > 0) {
1824  sharding_policy_.Reset();
1825  }
1826  if (health_check_.UseCount() > 0) {
1827  if (health_check_.Unique()) {
1829  "(manager '%s') Stopping healthcheck thread", name_.c_str());
1830  health_check_->StopHealthcheck();
1831  }
1832  health_check_.Reset();
1833  }
1834 
1835  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1836  // Shutdown I/O thread
1837  pipe_terminate_->Write(kPipeTerminateSignal);
1838  pthread_join(thread_download_, NULL);
1839  // All handles are removed from the multi stack
1840  pipe_terminate_.Destroy();
1841  pipe_jobs_.Destroy();
1842  }
1843 
1844  for (set<CURL *>::iterator i = pool_handles_idle_->begin(),
1845  iEnd = pool_handles_idle_->end(); i != iEnd; ++i)
1846  {
1847  curl_easy_cleanup(*i);
1848  }
1849 
1850  delete pool_handles_idle_;
1851  delete pool_handles_inuse_;
1852  curl_multi_cleanup(curl_multi_);
1853 
1854  delete header_lists_;
1855  if (user_agent_)
1856  free(user_agent_);
1857 
1858  delete counters_;
1859  delete opt_host_.chain;
1860  delete opt_host_chain_rtt_;
1861  delete opt_proxy_groups_;
1862 
1863  curl_global_cleanup();
1864  delete resolver_;
1865 
1866  // old destructor
1867  pthread_mutex_destroy(lock_options_);
1868  pthread_mutex_destroy(lock_synchronous_mode_);
1869  free(lock_options_);
1870  free(lock_synchronous_mode_);
1871 }
1872 
1873 void DownloadManager::InitHeaders() {
1874  // User-Agent
1875  string cernvm_id = "User-Agent: cvmfs ";
1876 #ifdef CVMFS_LIBCVMFS
1877  cernvm_id += "libcvmfs ";
1878 #else
1879  cernvm_id += "Fuse ";
1880 #endif
1881  cernvm_id += string(CVMFS_VERSION);
1882  if (getenv("CERNVM_UUID") != NULL) {
1883  cernvm_id += " " +
1884  sanitizer::InputSanitizer("az AZ 09 -").Filter(getenv("CERNVM_UUID"));
1885  }
1886  user_agent_ = strdup(cernvm_id.c_str());
1887 
1888  header_lists_ = new HeaderLists();
1889 
1890  default_headers_ = header_lists_->GetList("Connection: Keep-Alive");
1891  header_lists_->AppendHeader(default_headers_, "Pragma:");
1892  header_lists_->AppendHeader(default_headers_, user_agent_);
1893 }
1894 
1895 DownloadManager::DownloadManager(const unsigned max_pool_handles,
1896  const perf::StatisticsTemplate &statistics,
1897  const std::string &name) :
1898  prng_(Prng()),
1899  pool_handles_idle_(new set<CURL *>),
1900  pool_handles_inuse_(new set<CURL *>),
1901  pool_max_handles_(max_pool_handles),
1902  pipe_terminate_(NULL),
1903  pipe_jobs_(NULL),
1904  watch_fds_(NULL),
1905  watch_fds_size_(0),
1906  watch_fds_inuse_(0),
1907  watch_fds_max_(4 * max_pool_handles),
1908  opt_timeout_proxy_(5),
1909  opt_timeout_direct_(10),
1910  opt_low_speed_limit_(1024),
1911  opt_max_retries_(0),
1912  opt_backoff_init_ms_(0),
1913  opt_backoff_max_ms_(0),
1914  enable_info_header_(false),
1915  opt_ipv4_only_(false),
1916  follow_redirects_(false),
1917  ignore_signature_failures_(false),
1918  enable_http_tracing_(false),
1919  opt_metalink_(NULL, 0, 0, 0),
1920  opt_metalink_timestamp_link_(0),
1921  opt_host_(NULL, 0, 0, 0),
1922  opt_host_chain_rtt_(NULL),
1923  opt_proxy_groups_(NULL),
1924  opt_proxy_groups_current_(0),
1925  opt_proxy_groups_current_burned_(0),
1926  opt_proxy_groups_fallback_(0),
1927  opt_num_proxies_(0),
1928  opt_proxy_shard_(false),
1929  failover_indefinitely_(false),
1930  name_(name),
1931  opt_ip_preference_(dns::kIpPreferSystem),
1932  opt_timestamp_backup_proxies_(0),
1933  opt_timestamp_failover_proxies_(0),
1934  opt_proxy_groups_reset_after_(0),
1935  credentials_attachment_(NULL),
1936  counters_(new Counters(statistics))
1937 {
1938  atomic_init32(&multi_threaded_);
1939 
1940  lock_options_ =
1941  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1942  int retval = pthread_mutex_init(lock_options_, NULL);
1943  assert(retval == 0);
1945  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1946  retval = pthread_mutex_init(lock_synchronous_mode_, NULL);
1947  assert(retval == 0);
1948 
1949  retval = curl_global_init(CURL_GLOBAL_ALL);
1950  assert(retval == CURLE_OK);
1951 
1952  InitHeaders();
1953 
1954  curl_multi_ = curl_multi_init();
1955  assert(curl_multi_ != NULL);
1956  curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION, CallbackCurlSocket);
1957  curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1958  static_cast<void *>(this));
1959  curl_multi_setopt(curl_multi_, CURLMOPT_MAXCONNECTS, watch_fds_max_);
1960  curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1962 
1963  prng_.InitLocaltime();
1964 
1965  // Name resolving
1966  if ((getenv("CVMFS_IPV4_ONLY") != NULL) &&
1967  (strlen(getenv("CVMFS_IPV4_ONLY")) > 0))
1968  {
1969  opt_ipv4_only_ = true;
1970  }
1973  assert(resolver_);
1974 }
1975 
1983 
1984  int retval = pthread_create(&thread_download_, NULL, MainDownload,
1985  static_cast<void *>(this));
1986  assert(retval == 0);
1987 
1988  atomic_inc32(&multi_threaded_);
1989 
1990  if (health_check_.UseCount() > 0) {
1992  "(manager '%s') Starting healthcheck thread", name_.c_str());
1993  health_check_->StartHealthcheck();
1994  }
1995 }
1996 
1997 
2002  assert(info != NULL);
2003  assert(info->url() != NULL);
2004 
2005  Failures result;
2006  result = PrepareDownloadDestination(info);
2007  if (result != kFailOk)
2008  return result;
2009 
2010  if (info->expected_hash()) {
2013  info->GetHashContextPtr()->size = shash::GetContextSize(algorithm);
2014  info->GetHashContextPtr()->buffer = alloca(info->hash_context().size);
2015  }
2016 
2017  // In case JobInfo object is being reused
2018  info->SetLink("");
2019 
2020  // Prepare cvmfs-info: header, allocate string on the stack
2021  info->SetInfoHeader(NULL);
2022  if (enable_info_header_ && info->extra_info()) {
2023  const char *header_name = "cvmfs-info: ";
2024  const size_t header_name_len = strlen(header_name);
2025  const unsigned header_size = 1 + header_name_len +
2026  EscapeHeader(*(info->extra_info()), NULL, 0);
2027  info->SetInfoHeader(static_cast<char *>(alloca(header_size)));
2028  memcpy(info->info_header(), header_name, header_name_len);
2029  EscapeHeader(*(info->extra_info()), info->info_header() + header_name_len,
2030  header_size - header_name_len);
2031  info->info_header()[header_size-1] = '\0';
2032  }
2033 
2034  if (enable_http_tracing_) {
2035  const std::string str_pid = "X-CVMFS-PID: " + StringifyInt(info->pid());
2036  const std::string str_gid = "X-CVMFS-GID: " + StringifyUint(info->gid());
2037  const std::string str_uid = "X-CVMFS-UID: " + StringifyUint(info->uid());
2038 
2039  // will be auto freed at the end of this function Fetch(JobInfo *info)
2040  info->SetTracingHeaderPid(static_cast<char *>(alloca(str_pid.size() + 1)));
2041  info->SetTracingHeaderGid(static_cast<char *>(alloca(str_gid.size() + 1)));
2042  info->SetTracingHeaderUid(static_cast<char *>(alloca(str_uid.size() + 1)));
2043 
2044  memcpy(info->tracing_header_pid(), str_pid.c_str(), str_pid.size() + 1);
2045  memcpy(info->tracing_header_gid(), str_gid.c_str(), str_gid.size() + 1);
2046  memcpy(info->tracing_header_uid(), str_uid.c_str(), str_uid.size() + 1);
2047  }
2048 
2049  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
2050  if (!info->IsValidPipeJobResults()) {
2051  info->CreatePipeJobResults();
2052  }
2053  if (!info->IsValidDataTube()) {
2054  info->CreateDataTube();
2055  }
2056 
2057  // LogCvmfs(kLogDownload, kLogDebug, "send job to thread, pipe %d %d",
2058  // info->wait_at[0], info->wait_at[1]);
2059  pipe_jobs_->Write<JobInfo*>(info);
2060 
2061  do {
2062  DataTubeElement* ele = info->GetDataTubePtr()->PopFront();
2063 
2064  if (ele->action == kActionStop) {
2065  delete ele;
2066  break;
2067  }
2068  // TODO(heretherebedragons) add compression
2069  } while (true);
2070 
2071  info->GetPipeJobResultPtr()->Read<download::Failures>(&result);
2072  // LogCvmfs(kLogDownload, kLogDebug, "got result %d", result);
2073  } else {
2075  CURL *handle = AcquireCurlHandle();
2076  InitializeRequest(info, handle);
2077  SetUrlOptions(info);
2078  // curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
2079  int retval;
2080  do {
2081  retval = curl_easy_perform(handle);
2083  double elapsed;
2084  if (curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &elapsed) == CURLE_OK)
2085  {
2087  static_cast<int64_t>(elapsed * 1000));
2088  }
2089  } while (VerifyAndFinalize(retval, info));
2090  result = info->error_code();
2091  ReleaseCurlHandle(info->curl_handle());
2092  }
2093 
2094  if (result != kFailOk) {
2095  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s' - id %" PRId64 ") "
2096  "download failed (error %d - %s)",
2097  name_.c_str(),
2098  info->id(), result, Code2Ascii(result));
2099 
2100  if (info->sink() != NULL) {
2101  info->sink()->Purge();
2102  }
2103  }
2104 
2105  return result;
2106 }
2107 
2108 
2116 }
2117 
2121 std::string DownloadManager::GetDnsServer() const {
2122  return opt_dns_server_;
2123 }
2124 
2129 void DownloadManager::SetDnsServer(const string &address) {
2130  if (!address.empty()) {
2132  opt_dns_server_ = address;
2133  assert(!opt_dns_server_.empty());
2134 
2135  vector<string> servers;
2136  servers.push_back(address);
2137  bool retval = resolver_->SetResolvers(servers);
2138  assert(retval);
2139  }
2140  LogCvmfs(kLogDownload, kLogSyslog, "(manager '%s') set nameserver to %s",
2141  name_.c_str(), address.c_str());
2142 }
2143 
2144 
2149  const unsigned retries,
2150  const unsigned timeout_ms)
2151 {
2153  if ((resolver_->retries() == retries) &&
2154  (resolver_->timeout_ms() == timeout_ms))
2155  {
2156  return;
2157  }
2158  delete resolver_;
2159  resolver_ = NULL;
2160  resolver_ =
2161  dns::NormalResolver::Create(opt_ipv4_only_, retries, timeout_ms);
2162  assert(resolver_);
2163 }
2164 
2165 
2167  const unsigned min_seconds,
2168  const unsigned max_seconds)
2169 {
2171  resolver_->set_min_ttl(min_seconds);
2172  resolver_->set_max_ttl(max_seconds);
2173 }
2174 
2175 
2178  opt_ip_preference_ = preference;
2179 }
2180 
2181 
2187 void DownloadManager::SetTimeout(const unsigned seconds_proxy,
2188  const unsigned seconds_direct)
2189 {
2191  opt_timeout_proxy_ = seconds_proxy;
2192  opt_timeout_direct_ = seconds_direct;
2193 }
2194 
2195 
2201 void DownloadManager::SetLowSpeedLimit(const unsigned low_speed_limit) {
2203  opt_low_speed_limit_ = low_speed_limit;
2204 }
2205 
2206 
2210 void DownloadManager::GetTimeout(unsigned *seconds_proxy,
2211  unsigned *seconds_direct)
2212 {
2214  *seconds_proxy = opt_timeout_proxy_;
2215  *seconds_direct = opt_timeout_direct_;
2216 }
2217 
2218 
2223 void DownloadManager::SetMetalinkChain(const string &metalink_list) {
2224  SetMetalinkChain(SplitString(metalink_list, ';'));
2225 }
2226 
2227 
2229  const std::vector<std::string> &metalink_list) {
2230  const MutexLockGuard m(lock_options_);
2232  delete opt_metalink_.chain;
2233  opt_metalink_.current = 0;
2234 
2235  if (metalink_list.empty()) {
2236  opt_metalink_.chain = NULL;
2237  return;
2238  }
2239 
2240  opt_metalink_.chain = new vector<string>(metalink_list);
2241 }
2242 
2243 
2248 void DownloadManager::GetMetalinkInfo(vector<string> *metalink_chain,
2249  unsigned *current_metalink)
2250 {
2251  const MutexLockGuard m(lock_options_);
2252  if (opt_metalink_.chain) {
2253  if (current_metalink) {*current_metalink = opt_metalink_.current;}
2254  if (metalink_chain) {*metalink_chain = *opt_metalink_.chain;}
2255  }
2256 }
2257 
2258 
2263 void DownloadManager::SetHostChain(const string &host_list) {
2264  SetHostChain(SplitString(host_list, ';'));
2265 }
2266 
2267 
2268 void DownloadManager::SetHostChain(const std::vector<std::string> &host_list) {
2271  delete opt_host_.chain;
2272  delete opt_host_chain_rtt_;
2273  opt_host_.current = 0;
2274 
2275  if (host_list.empty()) {
2276  opt_host_.chain = NULL;
2277  opt_host_chain_rtt_ = NULL;
2278  return;
2279  }
2280 
2281  opt_host_.chain = new vector<string>(host_list);
2283  new vector<int>(opt_host_.chain->size(), kProbeUnprobed);
2284  // LogCvmfs(kLogDownload, kLogSyslog, "using host %s",
2285  // (*opt_host_.chain)[0].c_str());
2286 }
2287 
2288 
2293 void DownloadManager::GetHostInfo(vector<string> *host_chain, vector<int> *rtt,
2294  unsigned *current_host)
2295 {
2297  if (opt_host_.chain) {
2298  if (current_host) {*current_host = opt_host_.current;}
2299  if (host_chain) {*host_chain = *opt_host_.chain;}
2300  if (rtt) {*rtt = *opt_host_chain_rtt_;}
2301  }
2302 }
2303 
2304 
2315 
2316  if (!opt_proxy_groups_) {
2317  return;
2318  }
2319 
2320  // Fail any matching proxies within the current load-balancing group
2321  vector<ProxyInfo> *group = current_proxy_group();
2322  const unsigned group_size = group->size();
2323  unsigned failed = 0;
2324  for (unsigned i = 0; i < group_size - opt_proxy_groups_current_burned_; ++i) {
2325  if (info && (info->proxy() == (*group)[i].url)) {
2326  // Move to list of failed proxies
2327  opt_proxy_groups_current_burned_++;
2328  swap((*group)[i],
2329  (*group)[group_size - opt_proxy_groups_current_burned_]);
2331  failed++;
2332  }
2333  }
2334 
2335  // Do nothing more unless at least one proxy was marked as failed
2336  if (!failed)
2337  return;
2338 
2339  // If all proxies from the current load-balancing group are burned, switch to
2340  // another group
2341  if (opt_proxy_groups_current_burned_ == group->size()) {
2342  opt_proxy_groups_current_burned_ = 0;
2343  if (opt_proxy_groups_->size() > 1) {
2345  opt_proxy_groups_->size();
2346  // Remember the timestamp of switching to backup proxies
2348  if (opt_proxy_groups_current_ > 0) {
2350  opt_timestamp_backup_proxies_ = time(NULL);
2351  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2352  // "switched to (another) backup proxy group");
2353  } else {
2355  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2356  // "switched back to primary proxy group");
2357  }
2359  }
2360  }
2361  } else {
2362  // Record failover time
2365  opt_timestamp_failover_proxies_ = time(NULL);
2366  }
2367  }
2368 
2369  UpdateProxiesUnlocked("failed proxy");
2370  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s' - id %" PRId64 ") "
2371  "%lu proxies remain in group", name_.c_str(), info->id(),
2373 }
2374 
2375 
// Advances the chain described by `info` to its next entry, wrapping around
// at the end.  `typ` == "host" selects the job's host-chain index; any other
// value (presumably "metalink") selects the metalink-chain index.  `typ` is
// also used in the log text.
//
// @param typ      "host" or (presumably) "metalink"
// @param info     chain state to modify (current index, backup timestamp)
// @param jobinfo  job that triggered the switch, or NULL for a manual switch
//
// Returns without switching in these cases:
// - no chain is set;
// - the chain has exactly one entry;
// - `jobinfo` last used an entry other than info.current (presumably
//   another job already switched away from it).
2381 void DownloadManager::SwitchHostInfo(const std::string &typ,
2382  HostInfo &info,
2383  JobInfo *jobinfo) {
2385 
2386  if (!info.chain || (info.chain->size() == 1)) {
2387  return;
2388  }
2389 
2390  if (jobinfo) {
 // Index of the chain entry this job used for its last attempt
2391  int lastused;
2392  if (typ == "host") {
2393  lastused = jobinfo->current_host_chain_index();
2394  } else {
2395  lastused = jobinfo->current_metalink_chain_index();
2396  }
 // The chain already moved past the entry this job failed on: don't switch
 // again (see the "don't switch" message).
2397  if (lastused != info.current) {
2399  "(manager '%s' - id %" PRId64 ")"
2400  "don't switch %s, "
2401  "last used %s: %s, current %s: %s",
2402  name_.c_str(), jobinfo->id(), typ.c_str(),
2403  typ.c_str(), (*info.chain)[lastused].c_str(),
2404  typ.c_str(), (*info.chain)[info.current].c_str());
2405  return;
2406  }
2407  }
2408 
2409  string reason = "manually triggered";
2410  string info_id = "(manager " + name_;
2411  if (jobinfo) {
2412  reason = download::Code2Ascii(jobinfo->error_code());
 // NOTE(review): plain assignment discards the "(manager <name>" prefix
 // built above, so the switch log line shows only " - id N)".  This was
 // likely meant to be `info_id +=`.  The prefix also lacks the quotes used
 // by the other "(manager '%s' ..." messages in this file.
2413  info_id = " - id " + StringifyInt(jobinfo->id());
2414  }
2415  info_id += ")";
2416 
 // Round-robin to the next entry; old_host is kept for the log message.
2417  const std::string old_host = (*info.chain)[info.current];
2418  info.current = (info.current + 1) % static_cast<int>(info.chain->size());
2419  if (typ == "host") {
2421  } else {
2423  }
2425  "%s switching %s from %s to %s (%s)", info_id.c_str(), typ.c_str(),
2426  old_host.c_str(), (*info.chain)[info.current].c_str(),
2427  reason.c_str());
2428 
2429  // Remember the timestamp of switching to backup host
 // Only when a reset delay is configured.  timestamp_backup holds the time
 // the chain first moved off entry 0, and it is cleared once the chain wraps
 // back to entry 0.
2430  if (info.reset_after > 0) {
2431  if (info.current != 0) {
2432  if (info.timestamp_backup == 0)
2433  info.timestamp_backup = time(NULL);
2434  } else {
2435  info.timestamp_backup = 0;
2436  }
2437  }
2438 }
2439 
2441  SwitchHostInfo("host", opt_host_, info);
2442 }
2443 
2445  SwitchHost(NULL);
2446 }
2447 
2448 
2450  SwitchHostInfo("metalink", opt_metalink_, info);
2451 }
2452 
2453 
2455  SwitchMetalink(NULL);
2456 }
2457 
2459  return (opt_metalink_.chain &&
2460  ((opt_metalink_timestamp_link_ == 0) ||
2461  (static_cast<int64_t>((now == 0) ? time(NULL) : now) >
2462  static_cast<int64_t>(opt_metalink_timestamp_link_ +
2464 }
2465 
2466 
2474  vector<string> host_chain;
2475  vector<int> host_rtt;
2476  unsigned current_host;
2477 
2478  GetHostInfo(&host_chain, &host_rtt, &current_host);
2479 
2480  // Stopwatch, two times to fill caches first
2481  unsigned i, retries;
2482  string url;
2483 
2484  cvmfs::MemSink memsink;
2485  JobInfo info(&url, false, false, NULL, &memsink);
2486  for (retries = 0; retries < 2; ++retries) {
2487  for (i = 0; i < host_chain.size(); ++i) {
2488  url = host_chain[i] + "/.cvmfspublished";
2489 
2490  struct timeval tv_start, tv_end;
2491  gettimeofday(&tv_start, NULL);
2492  Failures result = Fetch(&info);
2493  gettimeofday(&tv_end, NULL);
2494  memsink.Reset();
2495  if (result == kFailOk) {
2496  host_rtt[i] = static_cast<int>(
2497  DiffTimeSeconds(tv_start, tv_end) * 1000);
2498  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s' - id %" PRId64 ") "
2499  "probing host %s had %dms rtt",
2500  name_.c_str(), info.id(),
2501  url.c_str(), host_rtt[i]);
2502  } else {
2503  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s' - id %" PRId64 ") "
2504  "error while probing host %s: %d %s",
2505  name_.c_str(), info.id(),
2506  url.c_str(), result, Code2Ascii(result));
2507  host_rtt[i] = INT_MAX;
2508  }
2509  }
2510  }
2511 
2512  SortTeam(&host_rtt, &host_chain);
2513  for (i = 0; i < host_chain.size(); ++i) {
2514  if (host_rtt[i] == INT_MAX) host_rtt[i] = kProbeDown;
2515  }
2516 
2518  delete opt_host_.chain;
2519  delete opt_host_chain_rtt_;
2520  opt_host_.chain = new vector<string>(host_chain);
2521  opt_host_chain_rtt_ = new vector<int>(host_rtt);
2522  opt_host_.current = 0;
2523 }
2524 
2525 bool DownloadManager::GeoSortServers(std::vector<std::string> *servers,
2526  std::vector<uint64_t> *output_order) {
2527  if (!servers) {return false;}
2528  if (servers->size() == 1) {
2529  if (output_order) {
2530  output_order->clear();
2531  output_order->push_back(0);
2532  }
2533  return true;
2534  }
2535 
2536  std::vector<std::string> host_chain;
2537  GetHostInfo(&host_chain, NULL, NULL);
2538 
2539  std::vector<std::string> server_dns_names;
2540  server_dns_names.reserve(servers->size());
2541  for (unsigned i = 0; i < servers->size(); ++i) {
2542  std::string host = dns::ExtractHost((*servers)[i]);
2543  server_dns_names.push_back(host.empty() ? (*servers)[i] : host);
2544  }
2545  std::string host_list = JoinStrings(server_dns_names, ",");
2546 
2547  vector<string> host_chain_shuffled;
2548  {
2549  // Protect against concurrent access to prng_
2551  // Determine random hosts for the Geo-API query
2552  host_chain_shuffled = Shuffle(host_chain, &prng_);
2553  }
2554  // Request ordered list via Geo-API
2555  bool success = false;
2556  unsigned max_attempts = std::min(host_chain_shuffled.size(), size_t(3));
2557  vector<uint64_t> geo_order(servers->size());
2558  for (unsigned i = 0; i < max_attempts; ++i) {
2559  string url = host_chain_shuffled[i] + "/api/v1.0/geo/@proxy@/" + host_list;
2561  "(manager '%s') requesting ordered server list from %s",
2562  name_.c_str(), url.c_str());
2563  cvmfs::MemSink memsink;
2564  JobInfo info(&url, false, false, NULL, &memsink);
2565  Failures result = Fetch(&info);
2566  if (result == kFailOk) {
2567  string order(reinterpret_cast<char*>(memsink.data()), memsink.pos());
2568  memsink.Reset();
2569  bool retval = ValidateGeoReply(order, servers->size(), &geo_order);
2570  if (!retval) {
2572  "(manager '%s') retrieved invalid GeoAPI reply from %s [%s]",
2573  name_.c_str(), url.c_str(), order.c_str());
2574  } else {
2575  LogCvmfs(kLogDownload, kLogDebug | kLogSyslog, "(manager '%s') "
2576  "geographic order of servers retrieved from %s",
2577  name_.c_str(),
2578  dns::ExtractHost(host_chain_shuffled[i]).c_str());
2579  // remove new line at end of "order"
2580  LogCvmfs(kLogDownload, kLogDebug, "order is %s",
2581  Trim(order, true /* trim_newline */).c_str());
2582  success = true;
2583  break;
2584  }
2585  } else {
2587  "(manager '%s') GeoAPI request for %s failed with error %d [%s]",
2588  name_.c_str(), url.c_str(), result, Code2Ascii(result));
2589  }
2590  }
2591  if (!success) {
2592  LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn, "(manager '%s') "
2593  "failed to retrieve geographic order from stratum 1 servers",
2594  name_.c_str());
2595  return false;
2596  }
2597 
2598  if (output_order) {
2599  output_order->swap(geo_order);
2600  } else {
2601  std::vector<std::string> sorted_servers;
2602  sorted_servers.reserve(geo_order.size());
2603  for (unsigned i = 0; i < geo_order.size(); ++i) {
2604  uint64_t orderval = geo_order[i];
2605  sorted_servers.push_back((*servers)[orderval]);
2606  }
2607  servers->swap(sorted_servers);
2608  }
2609  return true;
2610 }
2611 
2612 
2621  vector<string> host_chain;
2622  vector<int> host_rtt;
2623  unsigned current_host;
2624  vector< vector<ProxyInfo> > proxy_chain;
2625  unsigned fallback_group;
2626 
2627  GetHostInfo(&host_chain, &host_rtt, &current_host);
2628  GetProxyInfo(&proxy_chain, NULL, &fallback_group);
2629  if ((host_chain.size() < 2) && ((proxy_chain.size() - fallback_group) < 2))
2630  return true;
2631 
2632  vector<string> host_names;
2633  for (unsigned i = 0; i < host_chain.size(); ++i)
2634  host_names.push_back(dns::ExtractHost(host_chain[i]));
2635  SortTeam(&host_names, &host_chain);
2636  unsigned last_geo_host = host_names.size();
2637 
2638  if ((fallback_group == 0) && (last_geo_host > 1)) {
2639  // There are no non-fallback proxies, which means that the client
2640  // will always use the fallback proxies. Add a keyword separator
2641  // between the hosts and fallback proxies so the geosorting service
2642  // will know to sort the hosts based on the distance from the
2643  // closest fallback proxy rather than the distance from the client.
2644  host_names.push_back("+PXYSEP+");
2645  }
2646 
2647  // Add fallback proxy names to the end of the host list
2648  unsigned first_geo_fallback = host_names.size();
2649  for (unsigned i = fallback_group; i < proxy_chain.size(); ++i) {
2650  // We only take the first fallback proxy name from every group under the
2651  // assumption that load-balanced servers are at the same location
2652  host_names.push_back(proxy_chain[i][0].host.name());
2653  }
2654 
2655  std::vector<uint64_t> geo_order;
2656  bool success = GeoSortServers(&host_names, &geo_order);
2657  if (!success) {
2658  // GeoSortServers already logged a failure message.
2659  return false;
2660  }
2661 
2662  // Re-install host chain and proxy chain
2664  delete opt_host_.chain;
2665  opt_num_proxies_ = 0;
2666  opt_host_.chain = new vector<string>(host_chain.size());
2667 
2668  // It's possible that opt_proxy_groups_fallback_ might have changed while
2669  // the lock wasn't held
2670  vector<vector<ProxyInfo> > *proxy_groups = new vector<vector<ProxyInfo> >(
2671  opt_proxy_groups_fallback_ + proxy_chain.size() - fallback_group);
2672  // First copy the non-fallback part of the current proxy chain
2673  for (unsigned i = 0; i < opt_proxy_groups_fallback_; ++i) {
2674  (*proxy_groups)[i] = (*opt_proxy_groups_)[i];
2675  opt_num_proxies_ += (*opt_proxy_groups_)[i].size();
2676  }
2677 
2678  // Copy the host chain and fallback proxies by geo order. Array indices
2679  // in geo_order that are smaller than last_geo_host refer to a stratum 1,
2680  // and those indices greater than or equal to first_geo_fallback refer to
2681  // a fallback proxy.
2682  unsigned hosti = 0;
2683  unsigned proxyi = opt_proxy_groups_fallback_;
2684  for (unsigned i = 0; i < geo_order.size(); ++i) {
2685  uint64_t orderval = geo_order[i];
2686  if (orderval < static_cast<uint64_t>(last_geo_host)) {
2687  // LogCvmfs(kLogCvmfs, kLogSyslog, "this is orderval %u at host index
2688  // %u", orderval, hosti);
2689  (*opt_host_.chain)[hosti++] = host_chain[orderval];
2690  } else if (orderval >= static_cast<uint64_t>(first_geo_fallback)) {
2691  // LogCvmfs(kLogCvmfs, kLogSyslog,
2692  // "this is orderval %u at proxy index %u, using proxy_chain index %u",
2693  // orderval, proxyi, fallback_group + orderval - first_geo_fallback);
2694  (*proxy_groups)[proxyi] =
2695  proxy_chain[fallback_group + orderval - first_geo_fallback];
2696  opt_num_proxies_ += (*proxy_groups)[proxyi].size();
2697  proxyi++;
2698  }
2699  }
2700 
2701  opt_proxy_map_.clear();
2702  delete opt_proxy_groups_;
2703  opt_proxy_groups_ = proxy_groups;
2704  // In pathological cases, opt_proxy_groups_current_ can be larger now when
2705  // proxies changed in-between.
2707  if (opt_proxy_groups_->size() == 0) {
2709  } else {
2711  }
2713  }
2714 
2715  UpdateProxiesUnlocked("geosort");
2716 
2717  delete opt_host_chain_rtt_;
2718  opt_host_chain_rtt_ = new vector<int>(host_chain.size(), kProbeGeo);
2719  opt_host_.current = 0;
2720 
2721  return true;
2722 }
2723 
2724 
2733  const string &reply_order,
2734  const unsigned expected_size,
2735  vector<uint64_t> *reply_vals)
2736 {
2737  if (reply_order.empty())
2738  return false;
2739  sanitizer::InputSanitizer sanitizer("09 , \n");
2740  if (!sanitizer.IsValid(reply_order))
2741  return false;
2742  sanitizer::InputSanitizer strip_newline("09 ,");
2743  vector<string> reply_strings =
2744  SplitString(strip_newline.Filter(reply_order), ',');
2745  vector<uint64_t> tmp_vals;
2746  for (unsigned i = 0; i < reply_strings.size(); ++i) {
2747  if (reply_strings[i].empty())
2748  return false;
2749  tmp_vals.push_back(String2Uint64(reply_strings[i]));
2750  }
2751  if (tmp_vals.size() != expected_size)
2752  return false;
2753 
2754  // Check if tmp_vals contains the number 1..n
2755  set<uint64_t> coverage(tmp_vals.begin(), tmp_vals.end());
2756  if (coverage.size() != tmp_vals.size())
2757  return false;
2758  if ((*coverage.begin() != 1) || (*coverage.rbegin() != coverage.size()))
2759  return false;
2760 
2761  for (unsigned i = 0; i < expected_size; ++i) {
2762  (*reply_vals)[i] = tmp_vals[i] - 1;
2763  }
2764  return true;
2765 }
2766 
2767 
2773  const string &proxy_list,
2774  string *cleaned_list)
2775 {
2776  assert(cleaned_list);
2777  if (proxy_list == "") {
2778  *cleaned_list = "";
2779  return false;
2780  }
2781  bool result = false;
2782 
2783  vector<string> proxy_groups = SplitString(proxy_list, ';');
2784  vector<string> cleaned_groups;
2785  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2786  vector<string> group = SplitString(proxy_groups[i], '|');
2787  vector<string> cleaned;
2788  for (unsigned j = 0; j < group.size(); ++j) {
2789  if ((group[j] == "DIRECT") || (group[j] == "")) {
2790  result = true;
2791  } else {
2792  cleaned.push_back(group[j]);
2793  }
2794  }
2795  if (!cleaned.empty())
2796  cleaned_groups.push_back(JoinStrings(cleaned, "|"));
2797  }
2798 
2799  *cleaned_list = JoinStrings(cleaned_groups, ";");
2800  return result;
2801 }
2802 
2803 
2813  const string &proxy_list,
2814  const string &fallback_proxy_list,
2815  const ProxySetModes set_mode)
2816 {
2818 
2821  string set_proxy_list = opt_proxy_list_;
2822  string set_proxy_fallback_list = opt_proxy_fallback_list_;
2823  bool contains_direct;
2824  if ((set_mode == kSetProxyFallback) || (set_mode == kSetProxyBoth)) {
2825  opt_proxy_fallback_list_ = fallback_proxy_list;
2826  }
2827  if ((set_mode == kSetProxyRegular) || (set_mode == kSetProxyBoth)) {
2828  opt_proxy_list_ = proxy_list;
2829  }
2830  contains_direct =
2831  StripDirect(opt_proxy_fallback_list_, &set_proxy_fallback_list);
2832  if (contains_direct) {
2834  "(manager '%s') fallback proxies do not support DIRECT, removing",
2835  name_.c_str());
2836  }
2837  if (set_proxy_fallback_list == "") {
2838  set_proxy_list = opt_proxy_list_;
2839  } else {
2840  bool contains_direct = StripDirect(opt_proxy_list_, &set_proxy_list);
2841  if (contains_direct) {
2843  "(manager '%s') skipping DIRECT proxy to use fallback proxy",
2844  name_.c_str());
2845  }
2846  }
2847 
2848  // From this point on, use set_proxy_list and set_fallback_proxy_list as
2849  // effective proxy lists!
2850 
2851  opt_proxy_map_.clear();
2852  delete opt_proxy_groups_;
2853  if ((set_proxy_list == "") && (set_proxy_fallback_list == "")) {
2854  opt_proxy_groups_ = NULL;
2858  opt_num_proxies_ = 0;
2859  return;
2860  }
2861 
2862  // Determine number of regular proxy groups (== first fallback proxy group)
2864  if (set_proxy_list != "") {
2865  opt_proxy_groups_fallback_ = SplitString(set_proxy_list, ';').size();
2866  }
2867  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s') "
2868  "first fallback proxy group %u",
2870 
2871  // Concatenate regular proxies and fallback proxies, both of which can be
2872  // empty.
2873  string all_proxy_list = set_proxy_list;
2874  if (set_proxy_fallback_list != "") {
2875  if (all_proxy_list != "")
2876  all_proxy_list += ";";
2877  all_proxy_list += set_proxy_fallback_list;
2878  }
2879  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s') full proxy list %s",
2880  name_.c_str(), all_proxy_list.c_str());
2881 
2882  // Resolve server names in provided urls
2883  vector<string> hostnames; // All encountered hostnames
2884  vector<string> proxy_groups;
2885  if (all_proxy_list != "")
2886  proxy_groups = SplitString(all_proxy_list, ';');
2887  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2888  vector<string> this_group = SplitString(proxy_groups[i], '|');
2889  for (unsigned j = 0; j < this_group.size(); ++j) {
2890  this_group[j] = dns::AddDefaultScheme(this_group[j]);
2891  // Note: DIRECT strings will be "extracted" to an empty string.
2892  string hostname = dns::ExtractHost(this_group[j]);
2893  // Save the hostname. Leave empty (DIRECT) names so indexes will
2894  // match later.
2895  hostnames.push_back(hostname);
2896  }
2897  }
2898  vector<dns::Host> hosts;
2899  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s') "
2900  "resolving %lu proxy addresses",
2901  name_.c_str(), hostnames.size());
2902  resolver_->ResolveMany(hostnames, &hosts);
2903 
2904  // Construct opt_proxy_groups_: traverse proxy list in same order and expand
2905  // names to resolved IP addresses.
2906  opt_proxy_groups_ = new vector< vector<ProxyInfo> >();
2907  opt_num_proxies_ = 0;
2908  unsigned num_proxy = 0; // Combined i, j counter
2909  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2910  vector<string> this_group = SplitString(proxy_groups[i], '|');
2911  // Construct ProxyInfo objects from proxy string and DNS resolver result for
2912  // every proxy in this_group. One URL can result in multiple ProxyInfo
2913  // objects, one for each IP address.
2914  vector<ProxyInfo> infos;
2915  for (unsigned j = 0; j < this_group.size(); ++j, ++num_proxy) {
2916  this_group[j] = dns::AddDefaultScheme(this_group[j]);
2917  if (this_group[j] == "DIRECT") {
2918  infos.push_back(ProxyInfo("DIRECT"));
2919  continue;
2920  }
2921 
2922  if (hosts[num_proxy].status() != dns::kFailOk) {
2923  LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn, "(manager '%s') "
2924  "failed to resolve IP addresses for %s (%d - %s)", name_.c_str(),
2925  hosts[num_proxy].name().c_str(), hosts[num_proxy].status(),
2926  dns::Code2Ascii(hosts[num_proxy].status()));
2927  dns::Host failed_host =
2928  dns::Host::ExtendDeadline(hosts[num_proxy], resolver_->min_ttl());
2929  infos.push_back(ProxyInfo(failed_host, this_group[j]));
2930  continue;
2931  }
2932 
2933  // IPv4 addresses have precedence
2934  set<string> best_addresses =
2935  hosts[num_proxy].ViewBestAddresses(opt_ip_preference_);
2936  set<string>::const_iterator iter_ips = best_addresses.begin();
2937  for (; iter_ips != best_addresses.end(); ++iter_ips) {
2938  string url_ip = dns::RewriteUrl(this_group[j], *iter_ips);
2939  infos.push_back(ProxyInfo(hosts[num_proxy], url_ip));
2940 
2941  if (sharding_policy_.UseCount() > 0) {
2942  sharding_policy_->AddProxy(url_ip);
2943  }
2944  }
2945  }
2946  opt_proxy_groups_->push_back(infos);
2947  opt_num_proxies_ += infos.size();
2948  }
2950  "(manager '%s') installed %u proxies in %lu load-balance groups",
2951  name_.c_str(), opt_num_proxies_, opt_proxy_groups_->size());
2954 
2955  // Select random start proxy from the first group.
2956  if (opt_proxy_groups_->size() > 0) {
2957  // Select random start proxy from the first group.
2958  UpdateProxiesUnlocked("set random start proxy from the first proxy group");
2959  }
2960 }
2961 
2962 
2969 void DownloadManager::GetProxyInfo(vector< vector<ProxyInfo> > *proxy_chain,
2970  unsigned *current_group,
2971  unsigned *fallback_group)
2972 {
2973  assert(proxy_chain != NULL);
2975 
2976 
2977  if (!opt_proxy_groups_) {
2978  vector< vector<ProxyInfo> > empty_chain;
2979  *proxy_chain = empty_chain;
2980  if (current_group != NULL)
2981  *current_group = 0;
2982  if (fallback_group != NULL)
2983  *fallback_group = 0;
2984  return;
2985  }
2986 
2987  *proxy_chain = *opt_proxy_groups_;
2988  if (current_group != NULL)
2989  *current_group = opt_proxy_groups_current_;
2990  if (fallback_group != NULL)
2991  *fallback_group = opt_proxy_groups_fallback_;
2992 }
2993 
2995  return opt_proxy_list_;
2996 }
2997 
2999  return opt_proxy_fallback_list_;
3000 }
3001 
3007  if (!opt_proxy_groups_)
3008  return NULL;
3009 
3010  uint32_t key = (hash ? hash->Partial32() : 0);
3011  map<uint32_t, ProxyInfo *>::iterator it = opt_proxy_map_.lower_bound(key);
3012  ProxyInfo *proxy = it->second;
3013 
3014  return proxy;
3015 }
3016 
3020 void DownloadManager::UpdateProxiesUnlocked(const string &reason) {
3021  if (!opt_proxy_groups_)
3022  return;
3023 
3024  // Identify number of non-burned proxies within the current group
3025  vector<ProxyInfo> *group = current_proxy_group();
3026  unsigned num_alive = (group->size() - opt_proxy_groups_current_burned_);
3027  string old_proxy = JoinStrings(opt_proxies_, "|");
3028 
3029  // Rebuild proxy map and URL list
3030  opt_proxy_map_.clear();
3031  opt_proxies_.clear();
3032  const uint32_t max_key = 0xffffffffUL;
3033  if (opt_proxy_shard_) {
3034  // Build a consistent map with multiple entries for each proxy
3035  for (unsigned i = 0; i < num_alive; ++i) {
3036  ProxyInfo *proxy = &(*group)[i];
3037  shash::Any proxy_hash(shash::kSha1);
3038  HashString(proxy->url, &proxy_hash);
3039  Prng prng;
3040  prng.InitSeed(proxy_hash.Partial32());
3041  for (unsigned j = 0; j < kProxyMapScale; ++j) {
3042  const std::pair<uint32_t, ProxyInfo *> entry(prng.Next(max_key), proxy);
3043  opt_proxy_map_.insert(entry);
3044  }
3045  std::string proxy_name = proxy->host.name().empty() ?
3046  "" : " (" + proxy->host.name() + ")";
3047  opt_proxies_.push_back(proxy->url + proxy_name);
3048  }
3049  // Ensure lower_bound() finds a value for all keys
3050  ProxyInfo *first_proxy = opt_proxy_map_.begin()->second;
3051  const std::pair<uint32_t, ProxyInfo *> last_entry(max_key, first_proxy);
3052  opt_proxy_map_.insert(last_entry);
3053  } else {
3054  // Build a map with a single entry for one randomly selected proxy
3055  unsigned select = prng_.Next(num_alive);
3056  ProxyInfo *proxy = &(*group)[select];
3057  const std::pair<uint32_t, ProxyInfo *> entry(max_key, proxy);
3058  opt_proxy_map_.insert(entry);
3059  std::string proxy_name = proxy->host.name().empty() ?
3060  "" : " (" + proxy->host.name() + ")";
3061  opt_proxies_.push_back(proxy->url + proxy_name);
3062  }
3063  sort(opt_proxies_.begin(), opt_proxies_.end());
3064 
3065  // Report any change in proxy usage
3066  string new_proxy = JoinStrings(opt_proxies_, "|");
3067  const string curr_host = "Current host: " + (opt_host_.chain ?
3068  (*opt_host_.chain)[opt_host_.current] : "");
3069  if (new_proxy != old_proxy) {
3071  "(manager '%s') switching proxy from %s to %s. Reason: %s [%s]",
3072  name_.c_str(), (old_proxy.empty() ? "(none)" : old_proxy.c_str()),
3073  (new_proxy.empty() ? "(none)" : new_proxy.c_str()),
3074  reason.c_str(), curr_host.c_str());
3075  }
3076 }
3077 
3082  opt_proxy_shard_ = true;
3083  RebalanceProxiesUnlocked("enable sharding");
3084 }
3085 
3090 void DownloadManager::RebalanceProxiesUnlocked(const string &reason) {
3091  if (!opt_proxy_groups_)
3092  return;
3093 
3096  UpdateProxiesUnlocked(reason);
3097 }
3098 
3099 
3102  RebalanceProxiesUnlocked("rebalance invoked manually");
3103 }
3104 
3105 
3111 
3112  if (!opt_proxy_groups_ || (opt_proxy_groups_->size() < 2)) {
3113  return;
3114  }
3115 
3117  opt_proxy_groups_->size();
3118  opt_timestamp_backup_proxies_ = time(NULL);
3119 
3120  std::string msg = "switch to proxy group " +
3123 }
3124 
3125 
3126 void DownloadManager::SetProxyGroupResetDelay(const unsigned seconds) {
3129  if (opt_proxy_groups_reset_after_ == 0) {
3132  }
3133 }
3134 
3135 
3136 void DownloadManager::SetMetalinkResetDelay(const unsigned seconds)
3137 {
3138  const MutexLockGuard m(lock_options_);
3139  opt_metalink_.reset_after = seconds;
3140  if (opt_metalink_.reset_after == 0)
3142 }
3143 
3144 
3145 void DownloadManager::SetHostResetDelay(const unsigned seconds)
3146 {
3148  opt_host_.reset_after = seconds;
3149  if (opt_host_.reset_after == 0)
3151 }
3152 
3153 
3154 void DownloadManager::SetRetryParameters(const unsigned max_retries,
3155  const unsigned backoff_init_ms,
3156  const unsigned backoff_max_ms)
3157 {
3159  opt_max_retries_ = max_retries;
3160  opt_backoff_init_ms_ = backoff_init_ms;
3161  opt_backoff_max_ms_ = backoff_max_ms;
3162 }
3163 
3164 
3167  resolver_->set_throttle(limit);
3168 }
3169 
3170 
3172  const std::string &direct,
3173  const std::string &forced)
3174 {
3176  proxy_template_direct_ = direct;
3177  proxy_template_forced_ = forced;
3178 }
3179 
3180 
3182  enable_info_header_ = true;
3183 }
3184 
3185 
3187  follow_redirects_ = true;
3188 }
3189 
3192 }
3193 
3195  enable_http_tracing_ = true;
3196 }
3197 
3198 void DownloadManager::AddHTTPTracingHeader(const std::string &header) {
3199  http_tracing_headers_.push_back(header);
3200 }
3201 
3204 }
3205 
3207  bool success = false;
3208  switch (type) {
3209  default:
3210  LogCvmfs(kLogDownload, kLogDebug | kLogSyslogErr, "(manager '%s') "
3211  "Proposed sharding policy does not exist. Falling back to default",
3212  name_.c_str());
3213  }
3214  return success;
3215 }
3216 
3218  failover_indefinitely_ = true;
3219 }
3220 
3226  const perf::StatisticsTemplate &statistics, const std::string &cloned_name)
3227 {
3228  DownloadManager *clone = new DownloadManager(pool_max_handles_, statistics,
3229  cloned_name);
3230 
3234 
3235  if (!opt_dns_server_.empty())
3236  clone->SetDnsServer(opt_dns_server_);
3248  if (opt_host_.chain) {
3249  clone->opt_host_.chain = new vector<string>(*opt_host_.chain);
3250  clone->opt_host_chain_rtt_ = new vector<int>(*opt_host_chain_rtt_);
3251  }
3252 
3253  CloneProxyConfig(clone);
3262 
3263  clone->health_check_ = health_check_;
3266  clone->fqrn_ = fqrn_;
3267 
3268  return clone;
3269 }
3270 
3271 
3280  if (opt_proxy_groups_ == NULL)
3281  return;
3282 
3283  clone->opt_proxy_groups_ = new vector< vector<ProxyInfo> >(
3285  clone->UpdateProxiesUnlocked("cloned");
3286 }
3287 
3288 } // namespace download
unsigned opt_timeout_direct_
Definition: download.h:331
std::vector< std::string > http_tracing_headers_
Definition: download.h:347
bool StripDirect(const std::string &proxy_list, std::string *cleaned_list)
Definition: download.cc:2772
unsigned opt_low_speed_limit_
Definition: download.h:332
void HashString(const std::string &content, Any *any_digest)
Definition: hash.cc:268
Definition: prng.h:28
static const unsigned kDnsDefaultTimeoutMs
Definition: download.h:177
std::vector< T > Shuffle(const std::vector< T > &input, Prng *prng)
Definition: algorithm.h:50
unsigned throttle() const
Definition: dns.h:206
z_stream * GetZstreamPtr()
Definition: jobinfo.h:165
unsigned opt_backoff_init_ms_
Definition: download.h:334
void SetInfoHeader(char *info_header)
Definition: jobinfo.h:244
shash::ContextPtr * GetHashContextPtr()
Definition: jobinfo.h:170
uid_t uid() const
Definition: jobinfo.h:182
int64_t Xadd(class Counter *counter, const int64_t delta)
Definition: statistics.h:51
struct stat64 platform_stat64
const char * Code2Ascii(const Failures error)
Definition: dns.h:53
int64_t id() const
Definition: dns.h:108
unsigned opt_proxy_groups_current_burned_
Definition: download.h:371
double DiffTimeSeconds(struct timeval start, struct timeval end)
Definition: algorithm.cc:31
unsigned opt_proxy_groups_reset_after_
Definition: download.h:465
bool probe_hosts() const
Definition: jobinfo.h:177
virtual bool IsCanceled()
Definition: interrupt.h:16
void SetUrlOptions(JobInfo *info)
Definition: download.cc:1046
SharedPtr< ShardingPolicy > sharding_policy_
Definition: download.h:410
StreamStates DecompressZStream2Sink(const void *buf, const int64_t size, z_stream *strm, cvmfs::Sink *sink)
Definition: compression.cc:234
void ResolveMany(const std::vector< std::string > &names, std::vector< Host > *hosts)
Definition: dns.cc:370
std::string opt_proxy_fallback_list_
Definition: download.h:388
void SetHostChain(const std::string &host_list)
bool CheckMetalinkChain(const time_t now)
Definition: download.cc:2458
static NormalResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
Definition: dns.cc:1269
int64_t id() const
Definition: jobinfo.h:216
void SetMetalinkChain(const std::string &metalink_list)
void SetLowSpeedLimit(const unsigned low_speed_limit)
Definition: download.cc:2201
virtual int Purge()=0
DownloadManager(const unsigned max_pool_handles, const perf::StatisticsTemplate &statistics, const std::string &name="standard")
Definition: download.cc:1895
std::string proxy_template_direct_
Definition: download.h:449
std::vector< std::string > opt_proxies_
Definition: download.h:396
curl_slist ** GetHeadersPtr()
Definition: jobinfo.h:168
static const int kProbeGeo
Definition: download.h:174
#define PANIC(...)
Definition: exception.h:29
string Trim(const string &raw, bool trim_newline)
Definition: string.cc:446
string ReplaceAll(const string &haystack, const string &needle, const string &replace_by)
Definition: string.cc:502
void set_min_ttl(unsigned seconds)
Definition: dns.h:207
string JoinStrings(const vector< string > &strings, const string &joint)
Definition: string.cc:343
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:249
unsigned opt_proxy_groups_current_
Definition: download.h:366
void SetCurrentHostChainIndex(int current_host_chain_index)
Definition: jobinfo.h:269
virtual bool RequiresReserve()=0
bool ValidateGeoReply(const std::string &reply_order, const unsigned expected_size, std::vector< uint64_t > *reply_vals)
Definition: download.cc:2732
std::vector< ProxyInfo > * current_proxy_group() const
Definition: download.h:303
void DecompressInit(z_stream *strm)
Definition: compression.cc:185
const std::string * url() const
Definition: jobinfo.h:175
void * cred_data() const
Definition: jobinfo.h:184
time_t opt_timestamp_backup_proxies_
Definition: download.h:463
void SetProxyChain(const std::string &proxy_list, const std::string &fallback_proxy_list, const ProxySetModes set_mode)
Definition: download.cc:2812
std::string GetProxyList()
Definition: download.cc:2994
unsigned min_ttl() const
Definition: dns.h:208
void InitLocaltime()
Definition: prng.h:39
bool allow_failure() const
Definition: jobinfo.h:215
void GetMetalinkInfo(std::vector< std::string > *metalink_chain, unsigned *current_metalink)
Definition: download.cc:2248
std::set< CURL * > * pool_handles_inuse_
Definition: download.h:310
CURL * curl_handle() const
Definition: jobinfo.h:193
pthread_mutex_t * lock_options_
Definition: download.h:327
ProxyInfo * ChooseProxyUnlocked(const shash::Any *hash)
Definition: download.cc:3006
pthread_t thread_download_
Definition: download.h:317
std::string opt_proxy_list_
Definition: download.h:384
const std::string & name() const
Definition: dns.h:118
perf::Counter * sz_transfer_time
Definition: download.h:44
void SetTracingHeaderGid(char *tracing_header_gid)
Definition: jobinfo.h:247
std::vector< std::vector< ProxyInfo > > * opt_proxy_groups_
Definition: download.h:362
assert((mem||(size==0))&&"Out Of Memory")
void SetNocache(bool nocache)
Definition: jobinfo.h:256
unsigned opt_proxy_groups_fallback_
Definition: download.h:376
void ReleaseCurlHandle(CURL *handle)
Definition: download.cc:916
bool IsValidDataTube()
Definition: jobinfo.h:151
static bool sortlinks(const std::string &s1, const std::string &s2)
Definition: download.cc:1406
StreamStates
Definition: compression.h:36
void SetTracingHeaderPid(char *tracing_header_pid)
Definition: jobinfo.h:245
void set_max_ttl(unsigned seconds)
Definition: dns.h:209
Tube< DataTubeElement > * GetDataTubePtr()
Definition: jobinfo.h:173
Algorithms algorithm
Definition: hash.h:125
char * tracing_header_gid() const
Definition: jobinfo.h:197
const std::string path()
Definition: sink_path.h:109
const std::set< std::string > & ViewBestAddresses(IpPreference preference) const
Definition: dns.cc:205
char * info_header() const
Definition: jobinfo.h:195
void SetDnsServer(const std::string &address)
Definition: download.cc:2129
int platform_stat(const char *path, platform_stat64 *buf)
DownloadManager * Clone(const perf::StatisticsTemplate &statistics, const std::string &cloned_name)
Definition: download.cc:3225
bool force_nocache() const
Definition: jobinfo.h:180
std::string StringifyUint(const uint64_t value)
Definition: string.cc:84
void DecompressFini(z_stream *strm)
Definition: compression.cc:201
virtual std::string Describe()=0
static void * MainDownload(void *data)
Definition: download.cc:577
void InitSeed(const uint64_t seed)
Definition: prng.h:35
void SetTimeout(const unsigned seconds_proxy, const unsigned seconds_direct)
Definition: download.cc:2187
void SetLink(const std::string &link)
Definition: jobinfo.h:255
std::string opt_dns_server_
Definition: download.h:329
void SwitchHostInfo(const std::string &typ, HostInfo &info, JobInfo *jobinfo)
Definition: download.cc:2381
off_t range_offset() const
Definition: jobinfo.h:190
bool nocache() const
Definition: jobinfo.h:203
char algorithm
unsigned char num_used_hosts() const
Definition: jobinfo.h:208
virtual bool IsValid()=0
bool follow_redirects() const
Definition: jobinfo.h:179
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:138
void Init(ContextPtr context)
Definition: hash.cc:164
int http_code() const
Definition: jobinfo.h:205
bool IsValid(const std::string &input) const
Definition: sanitizer.cc:114
Algorithms
Definition: hash.h:41
void CreateDataTube()
Definition: jobinfo.h:146
void SetNumUsedMetalinks(unsigned char num_used_metalinks)
Definition: jobinfo.h:261
bool FileExists(const std::string &path)
Definition: posix.cc:802
unsigned max_ttl() const
Definition: dns.h:210
Pipe< kPipeDownloadJobsResults > * GetPipeJobResultPtr()
Definition: jobinfo.h:171
void SetDnsTtlLimits(const unsigned min_seconds, const unsigned max_seconds)
Definition: download.cc:2166
static Failures PrepareDownloadDestination(JobInfo *info)
Definition: download.cc:115
perf::Counter * n_metalink_failover
Definition: download.h:47
void SetHttpCode(int http_code)
Definition: jobinfo.h:258
DataTubeAction action
Definition: jobinfo.h:49
const char * Code2Ascii(const Failures error)
bool head_request() const
Definition: jobinfo.h:178
unsigned char num_retries() const
Definition: jobinfo.h:209
std::vector< std::string > * chain
Definition: download.h:138
bool IsValidPipeJobResults()
Definition: jobinfo.h:142
off_t range_size() const
Definition: jobinfo.h:191
void GetProxyInfo(std::vector< std::vector< ProxyInfo > > *proxy_chain, unsigned *current_group, unsigned *fallback_group)
Definition: download.cc:2969
void SetProxyGroupResetDelay(const unsigned seconds)
Definition: download.cc:3126
void SetCurrentMetalinkChainIndex(int current_metalink_chain_index)
Definition: jobinfo.h:267
atomic_int32 multi_threaded_
Definition: download.h:318
bool compressed() const
Definition: jobinfo.h:176
std::string AddDefaultScheme(const std::string &proxy)
Definition: dns.cc:187
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:308
cvmfs::Sink * sink() const
Definition: jobinfo.h:186
gid_t gid() const
Definition: jobinfo.h:183
dns::NormalResolver * resolver_
Definition: download.h:437
bool Interrupted(const std::string &fqrn, JobInfo *info)
Definition: download.cc:85
std::string proxy() const
Definition: jobinfo.h:201
dns::IpPreference opt_ip_preference_
Definition: download.h:442
Algorithms algorithm
Definition: hash.h:500
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:257
void UseSystemCertificatePath()
Definition: ssl.cc:68
Definition: dns.h:90
bool SetShardingPolicy(const ShardingPolicySelector type)
Definition: download.cc:3206
perf::Counter * n_host_failover
Definition: download.h:48
void UpdateProxiesUnlocked(const std::string &reason)
Definition: download.cc:3020
time_t opt_metalink_timestamp_link_
Definition: download.h:351
bool IsEquivalent(const Host &other) const
Definition: dns.cc:269
std::string link() const
Definition: jobinfo.h:202
bool IsProxyTransferError(const Failures error)
void SetNumRetries(unsigned char num_retries)
Definition: jobinfo.h:265
bool IsExpired() const
Definition: dns.cc:280
void set_throttle(const unsigned throttle)
Definition: dns.h:205
InterruptCue * interrupt_cue() const
Definition: jobinfo.h:185
void SetIpPreference(const dns::IpPreference preference)
Definition: download.cc:2176
virtual int Reset()=0
shash::ContextPtr hash_context() const
Definition: jobinfo.h:200
perf::Counter * n_requests
Definition: download.h:45
bool Read(T *data)
Definition: pipe.h:166
void Final(ContextPtr context, Any *any_digest)
Definition: hash.cc:221
void SetRetryParameters(const unsigned max_retries, const unsigned backoff_init_ms, const unsigned backoff_max_ms)
Definition: download.cc:3154
string StringifyInt(const int64_t value)
Definition: string.cc:78
char * tracing_header_uid() const
Definition: jobinfo.h:198
Failures error_code() const
Definition: jobinfo.h:204
void CloneProxyConfig(DownloadManager *clone)
Definition: download.cc:3272
void SetMaxIpaddrPerProxy(unsigned limit)
Definition: download.cc:3165
void EnableIgnoreSignatureFailures()
Definition: download.cc:3190
void Inc(class Counter *counter)
Definition: statistics.h:50
void * buffer
Definition: hash.h:501
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:285
std::string ExtractHost(const std::string &url)
Definition: dns.cc:110
void ** GetCredDataPtr()
Definition: jobinfo.h:167
std::vector< int > * opt_host_chain_rtt_
Definition: download.h:359
SslCertificateStore ssl_certificate_store_
Definition: download.h:478
std::string GetFallbackProxyList()
Definition: download.cc:2998
uint32_t Partial32() const
Definition: hash.h:394
void SetProxyTemplates(const std::string &direct, const std::string &forced)
Definition: download.cc:3171
IpPreference
Definition: dns.h:46
unsigned retries() const
Definition: dns.h:203
unsigned opt_backoff_max_ms_
Definition: download.h:335
void SetErrorCode(Failures error_code)
Definition: jobinfo.h:257
void SetNumUsedProxies(unsigned char num_used_proxies)
Definition: jobinfo.h:259
unsigned GetContextSize(const Algorithms algorithm)
Definition: hash.cc:148
std::string GetDnsServer() const
Definition: download.cc:2121
int current_metalink_chain_index() const
Definition: jobinfo.h:211
CredentialsAttachment * credentials_attachment_
Definition: download.h:467
struct pollfd * watch_fds_
Definition: download.h:322
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
Definition: hash.cc:190
std::map< uint32_t, ProxyInfo * > opt_proxy_map_
Definition: download.h:392
uint64_t String2Uint64(const string &value)
Definition: string.cc:246
UniquePtr< Pipe< kPipeDownloadJobs > > pipe_jobs_
Definition: download.h:321
void CreatePipeJobResults()
Definition: jobinfo.h:138
Failures Fetch(JobInfo *info)
Definition: download.cc:2001
unsigned char num_used_metalinks() const
Definition: jobinfo.h:207
void SetCurlHandle(CURL *curl_handle)
Definition: jobinfo.h:242
void SetTracingHeaderUid(char *tracing_header_uid)
Definition: jobinfo.h:249
Definition: mutex.h:42
curl_slist * headers() const
Definition: jobinfo.h:194
const shash::Any * expected_hash() const
Definition: jobinfo.h:187
perf::Counter * n_proxy_failover
Definition: download.h:49
virtual int Flush()=0
void GetTimeout(unsigned *seconds_proxy, unsigned *seconds_direct)
Definition: download.cc:2210
void SetCredData(void *cred_data)
Definition: jobinfo.h:229
std::string Filter(const std::string &input) const
Definition: sanitizer.cc:107
std::string proxy_template_forced_
Definition: download.h:455
time_t opt_timestamp_failover_proxies_
Definition: download.h:464
void SetDnsParameters(const unsigned retries, const unsigned timeout_ms)
Definition: download.cc:2148
unsigned EscapeHeader(const std::string &header, char *escaped_buf, size_t buf_size)
Definition: download.cc:449
const std::string * extra_info() const
Definition: jobinfo.h:188
static Host ExtendDeadline(const Host &original, unsigned seconds_from_now)
Definition: dns.cc:231
UniquePtr< Pipe< kPipeThreadTerminator > > pipe_terminate_
Definition: download.h:319
void SetProxy(const std::string &proxy)
Definition: jobinfo.h:254
static const int kProbeUnprobed
Definition: download.h:165
unsigned backoff_ms() const
Definition: jobinfo.h:210
virtual int Reset()
Definition: sink_mem.cc:52
pid_t pid() const
Definition: jobinfo.h:181
unsigned char num_used_proxies() const
Definition: jobinfo.h:206
int current_host_chain_index() const
Definition: jobinfo.h:213
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:2049
void SetMetalinkResetDelay(const unsigned seconds)
Definition: download.cc:3136
bool IsHostTransferError(const Failures error)
static const unsigned kDnsDefaultRetries
Definition: download.h:176
void SortTeam(std::vector< T > *tractor, std::vector< U > *towed)
Definition: algorithm.h:68
bool GeoSortServers(std::vector< std::string > *servers, std::vector< uint64_t > *output_order=NULL)
Definition: download.cc:2525
virtual bool Reserve(size_t size)=0
static const int kProbeDown
Definition: download.h:170
void SetNumUsedHosts(unsigned char num_used_hosts)
Definition: jobinfo.h:263
unsigned size
Definition: hash.h:502
void SetHeaders(curl_slist *headers)
Definition: jobinfo.h:243
Failures status() const
Definition: dns.h:119
static const unsigned kProxyMapScale
Definition: download.h:178
void GetHostInfo(std::vector< std::string > *host_chain, std::vector< int > *rtt, unsigned *current_host)
Definition: download.cc:2293
static void size_t size
Definition: smalloc.h:54
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
Definition: download.cc:1486
char * tracing_header_pid() const
Definition: jobinfo.h:196
void SetBackoffMs(unsigned backoff_ms)
Definition: jobinfo.h:266
SharedPtr< HealthCheck > health_check_
Definition: download.h:418
void SwitchProxy(JobInfo *info)
Definition: download.cc:2313
void AddHTTPTracingHeader(const std::string &header)
Definition: download.cc:3198
void SetCredentialsAttachment(CredentialsAttachment *ca)
Definition: download.cc:2113
void SetFollowRedirects(bool follow_redirects)
Definition: jobinfo.h:223
void RebalanceProxiesUnlocked(const std::string &reason)
Definition: download.cc:3090
pthread_mutex_t * lock_synchronous_mode_
Definition: download.h:328
virtual bool SetResolvers(const std::vector< std::string > &resolvers)
Definition: dns.cc:1300
void InitializeRequest(JobInfo *info, CURL *handle)
Definition: download.cc:934
static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp)
Definition: download.cc:497
string RewriteUrl(const string &url, const string &ip)
Definition: dns.cc:156
void SetHostResetDelay(const unsigned seconds)
Definition: download.cc:3145
uint32_t Next(const uint64_t boundary)
Definition: prng.h:49
virtual int64_t Write(const void *buf, uint64_t sz)=0
unsigned timeout_ms() const
Definition: dns.h:204
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528