CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
download.cc
Go to the documentation of this file.
1 
27 // TODO(jblomer): MS for time summing
28 // NOLINTNEXTLINE
29 #define __STDC_FORMAT_MACROS
30 
31 #include "cvmfs_config.h"
32 #include "download.h"
33 
34 #include <alloca.h>
35 #include <errno.h>
36 #include <inttypes.h>
37 #include <poll.h>
38 #include <pthread.h>
39 #include <signal.h>
40 #include <stdint.h>
41 #include <sys/time.h>
42 #include <unistd.h>
43 
44 #include <algorithm>
45 #include <cassert>
46 #include <cstdio>
47 #include <cstdlib>
48 #include <cstring>
49 #include <map>
50 #include <set>
51 #include <utility>
52 
53 #include "compression.h"
54 #include "crypto/hash.h"
55 #include "duplex_curl.h"
56 #include "interrupt.h"
57 #include "sanitizer.h"
58 #include "ssl.h"
59 #include "util/algorithm.h"
60 #include "util/atomic.h"
61 #include "util/concurrency.h"
62 #include "util/exception.h"
63 #include "util/logging.h"
64 #include "util/posix.h"
65 #include "util/prng.h"
66 #include "util/smalloc.h"
67 #include "util/string.h"
68 
69 using namespace std; // NOLINT
70 
71 namespace download {
72 
85 bool Interrupted(const std::string &fqrn, JobInfo *info) {
86  if (info->allow_failure()) {
87  return true;
88  }
89 
90  if (!fqrn.empty()) {
91  // it is up to the user the create this sentinel file ("pause_file") if
92  // CVMFS_FAILOVER_INDEFINITELY is used. It must be created during
93  // "cvmfs_config reload" and "cvmfs_config reload $fqrn"
94  std::string pause_file = std::string("/var/run/cvmfs/interrupt.") + fqrn;
95 
97  "Interrupted(): checking for existence of %s", pause_file.c_str());
98  if (FileExists(pause_file)) {
99  LogCvmfs(kLogDownload, kLogDebug, "Interrupt marker found - "
100  "Interrupting current download, this will EIO outstanding IO.");
101  if (0 != unlink(pause_file.c_str())) {
103  "Couldn't delete interrupt marker: errno=%d", errno);
104  }
105  return true;
106  }
107  }
108  return false;
109 }
110 
 // NOTE(review): the line carrying this function's signature is missing from
 // this listing; from the body it takes a job object named `info` and returns
 // one of the kFail* codes -- restore the header from the original source.
 //
 // Checks that the job's sink, if there is one, is usable:
 //   - no sink, or a valid sink: kFailOk
 //   - invalid sink that is a cvmfs::PathSink: kFailLocalIO (path and errno
 //     are logged)
 //   - any other invalid sink: kFailOther (sink description is logged)
112  if (info->sink() != NULL && !info->sink()->IsValid()) {
 // dynamic_cast yields NULL if the sink is not a PathSink
113  cvmfs::PathSink* psink = dynamic_cast<cvmfs::PathSink*>(info->sink());
114  if (psink != NULL) {
115  LogCvmfs(kLogDownload, kLogDebug, "Failed to open path %s: %s"
116  " (errno=%d).", psink->path().c_str(),
117  strerror(errno), errno);
118  return kFailLocalIO;
119  } else {
120  LogCvmfs(kLogDownload, kLogDebug, "Failed to create a valid sink: \n %s",
121  info->sink()->Describe().c_str());
122  return kFailOther;
123  }
124  }
125 
126  return kFailOk;
127 }
128 
129 
133 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
134  void *info_link)
135 {
136  const size_t num_bytes = size*nmemb;
137  const string header_line(static_cast<const char *>(ptr), num_bytes);
138  JobInfo *info = static_cast<JobInfo *>(info_link);
139 
140  // LogCvmfs(kLogDownload, kLogDebug, "REMOVE-ME: Header callback with %s",
141  // header_line.c_str());
142 
143  // Check http status codes
144  if (HasPrefix(header_line, "HTTP/1.", false)) {
145  if (header_line.length() < 10) {
146  return 0;
147  }
148 
149  unsigned i;
150  for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {}
151 
152  // Code is initialized to -1
153  if (header_line.length() > i+2) {
154  info->SetHttpCode(DownloadManager::ParseHttpCode(&header_line[i]));
155  }
156 
157  if ((info->http_code() / 100) == 2) {
158  return num_bytes;
159  } else if ((info->http_code() == 301) ||
160  (info->http_code() == 302) ||
161  (info->http_code() == 303) ||
162  (info->http_code() == 307))
163  {
164  if (!info->follow_redirects()) {
165  LogCvmfs(kLogDownload, kLogDebug, "redirect support not enabled: %s",
166  header_line.c_str());
168  return 0;
169  }
170  LogCvmfs(kLogDownload, kLogDebug, "http redirect: %s",
171  header_line.c_str());
172  // libcurl will handle this because of CURLOPT_FOLLOWLOCATION
173  return num_bytes;
174  } else {
175  LogCvmfs(kLogDownload, kLogDebug, "http status error code: %s [%d]",
176  header_line.c_str(), info->http_code());
177  if (((info->http_code() / 100) == 5) ||
178  (info->http_code() == 400) || (info->http_code() == 404))
179  {
180  // 5XX returned by host
181  // 400: error from the GeoAPI module
182  // 404: the stratum 1 does not have the newest files
184  } else if (info->http_code() == 429) {
185  // 429: rate throttling (we ignore the backoff hint for the time being)
187  } else {
188  info->SetErrorCode((info->proxy() == "DIRECT") ? kFailHostHttp :
190  }
191  return 0;
192  }
193  }
194 
195  // If needed: allocate space in sink
196  if (info->sink() != NULL && info->sink()->RequiresReserve() &&
197  HasPrefix(header_line, "CONTENT-LENGTH:", true))
198  {
199  char *tmp = reinterpret_cast<char *>(alloca(num_bytes+1));
200  uint64_t length = 0;
201  sscanf(header_line.c_str(), "%s %" PRIu64, tmp, &length);
202  if (length > 0) {
203  if (!info->sink()->Reserve(length)) {
205  "resource %s too large to store in memory (%" PRIu64 ")",
206  info->url()->c_str(), length);
207  info->SetErrorCode(kFailTooBig);
208  return 0;
209  }
210  } else {
211  // Empty resource
212  info->sink()->Reserve(0);
213  }
214  } else if (HasPrefix(header_line, "LOCATION:", true)) {
215  // This comes along with redirects
216  LogCvmfs(kLogDownload, kLogDebug, "%s", header_line.c_str());
217  } else if (HasPrefix(header_line, "X-SQUID-ERROR:", true)) {
218  // Reinterpret host error as proxy error
219  if (info->error_code() == kFailHostHttp) {
221  }
222  } else if (HasPrefix(header_line, "PROXY-STATUS:", true)) {
223  // Reinterpret host error as proxy error if applicable
224  if ((info->error_code() == kFailHostHttp) &&
225  (header_line.find("error=") != string::npos)) {
227  }
228  }
229 
230  return num_bytes;
231 }
232 
233 
237 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
238  void *info_link)
239 {
240  const size_t num_bytes = size*nmemb;
241  JobInfo *info = static_cast<JobInfo *>(info_link);
242 
243  // TODO(heretherebedragons) remove if no error comes up
244  // as this means only jobinfo data request (and not header only)
245  // come here
246  assert(info->sink() != NULL);
247 
248  // LogCvmfs(kLogDownload, kLogDebug, "Data callback, %d bytes", num_bytes);
249 
250  if (num_bytes == 0)
251  return 0;
252 
253  if (info->expected_hash()) {
254  shash::Update(reinterpret_cast<unsigned char *>(ptr),
255  num_bytes, info->hash_context());
256  }
257 
258  if (info->compressed()) {
259  zlib::StreamStates retval =
260  zlib::DecompressZStream2Sink(ptr, static_cast<int64_t>(num_bytes),
261  info->GetZstreamPtr(), info->sink());
262  if (retval == zlib::kStreamDataError) {
263  LogCvmfs(kLogDownload, kLogSyslogErr, "failed to decompress %s",
264  info->url()->c_str());
265  info->SetErrorCode(kFailBadData);
266  return 0;
267  } else if (retval == zlib::kStreamIOError) {
269  "decompressing %s, local IO error", info->url()->c_str());
270  info->SetErrorCode(kFailLocalIO);
271  return 0;
272  }
273  } else {
274  int64_t written = info->sink()->Write(ptr, num_bytes);
275  if (written < 0 || static_cast<uint64_t>(written) != num_bytes) {
277  "Failed to perform write of %zu bytes to sink %s with errno %d",
278  num_bytes, info->sink()->Describe().c_str(), written);
279  }
280  }
281 
282  return num_bytes;
283 }
284 
285 #ifdef DEBUGMSG
286 static int CallbackCurlDebug(
287  CURL * /* handle */,
288  curl_infotype type,
289  char *data,
290  size_t size,
291  void * /* clientp */)
292 {
293  const char *prefix = "";
294  switch (type) {
295  case CURLINFO_TEXT:
296  prefix = "{info} ";
297  break;
298  case CURLINFO_HEADER_IN:
299  prefix = "{header/in} ";
300  break;
301  case CURLINFO_HEADER_OUT:
302  prefix = "{header/out} ";
303  break;
304  case CURLINFO_DATA_IN:
305  LogCvmfs(kLogCurl, kLogDebug, "{data/in} <snip>");
306  return 0;
307  case CURLINFO_DATA_OUT:
308  LogCvmfs(kLogCurl, kLogDebug, "{data/out} <snip>");
309  return 0;
310  case CURLINFO_SSL_DATA_IN:
311  LogCvmfs(kLogCurl, kLogDebug, "{ssldata/in} <snip>");
312  return 0;
313  case CURLINFO_SSL_DATA_OUT:
314  LogCvmfs(kLogCurl, kLogDebug, "{ssldata/out} <snip>");
315  return 0;
316  default:
317  // just log the message
318  break;
319  }
320  std::string msg(data, size);
321  for (size_t i = 0; i < msg.length(); ++i) {
322  if (msg[i] == '\0')
323  msg[i] = '~';
324  }
325 
326  LogCvmfs(kLogCurl, kLogDebug, "%s%s",
327  prefix, Trim(msg, true /* trim_newline */).c_str());
328  return 0;
329 }
330 #endif
331 
332 //------------------------------------------------------------------------------
333 
334 
 // Definitions of the negative kProbe* marker constants of DownloadManager.
 // NOTE(review): judging by the names only, these mark a host as not yet
 // probed, as down, or as ordered by geo location -- confirm with the
 // declaration in download.h.
335 const int DownloadManager::kProbeUnprobed = -1;
336 const int DownloadManager::kProbeDown = -2;
337 const int DownloadManager::kProbeGeo = -3;
338 
339 bool DownloadManager::EscapeUrlChar(unsigned char input, char output[3]) {
340  if (((input >= '0') && (input <= '9')) ||
341  ((input >= 'A') && (input <= 'Z')) ||
342  ((input >= 'a') && (input <= 'z')) ||
343  (input == '/') || (input == ':') || (input == '.') ||
344  (input == '@') ||
345  (input == '+') || (input == '-') ||
346  (input == '_') || (input == '~') ||
347  (input == '[') || (input == ']') || (input == ','))
348  {
349  output[0] = static_cast<char>(input);
350  return false;
351  }
352 
353  output[0] = '%';
354  output[1] = static_cast<char>(
355  (input / 16) + ((input / 16 <= 9) ? '0' : 'A'-10));
356  output[2] = static_cast<char>(
357  (input % 16) + ((input % 16 <= 9) ? '0' : 'A'-10));
358  return true;
359 }
360 
361 
366 string DownloadManager::EscapeUrl(const string &url) {
367  string escaped;
368  escaped.reserve(url.length());
369 
370  char escaped_char[3];
371  for (unsigned i = 0, s = url.length(); i < s; ++i) {
372  if (EscapeUrlChar(url[i], escaped_char)) {
373  escaped.append(escaped_char, 3);
374  } else {
375  escaped.push_back(escaped_char[0]);
376  }
377  }
378  LogCvmfs(kLogDownload, kLogDebug, "escaped %s to %s",
379  url.c_str(), escaped.c_str());
380 
381  return escaped;
382 }
383 
388 unsigned DownloadManager::EscapeHeader(const string &header,
389  char *escaped_buf,
390  size_t buf_size)
391 {
392  unsigned esc_pos = 0;
393  char escaped_char[3];
394  for (unsigned i = 0, s = header.size(); i < s; ++i) {
395  if (EscapeUrlChar(header[i], escaped_char)) {
396  for (unsigned j = 0; j < 3; ++j) {
397  if (escaped_buf) {
398  if (esc_pos >= buf_size)
399  return esc_pos;
400  escaped_buf[esc_pos] = escaped_char[j];
401  }
402  esc_pos++;
403  }
404  } else {
405  if (escaped_buf) {
406  if (esc_pos >= buf_size)
407  return esc_pos;
408  escaped_buf[esc_pos] = escaped_char[0];
409  }
410  esc_pos++;
411  }
412  }
413 
414  return esc_pos;
415 }
416 
420 int DownloadManager::ParseHttpCode(const char digits[3]) {
421  int result = 0;
422  int factor = 100;
423  for (int i = 0; i < 3; ++i) {
424  if ((digits[i] < '0') || (digits[i] > '9'))
425  return -1;
426  result += (digits[i] - '0') * factor;
427  factor /= 10;
428  }
429  return result;
430 }
431 
432 
436 int DownloadManager::CallbackCurlSocket(CURL * /* easy */,
437  curl_socket_t s,
438  int action,
439  void *userp,
440  void * /* socketp */)
441 {
442  // LogCvmfs(kLogDownload, kLogDebug, "CallbackCurlSocket called with easy "
443  // "handle %p, socket %d, action %d", easy, s, action);
444  DownloadManager *download_mgr = static_cast<DownloadManager *>(userp);
445  if (action == CURL_POLL_NONE)
446  return 0;
447 
448  // Find s in watch_fds_
449  unsigned index;
450 
451  // TODO(heretherebedragons) why start at index = 0 and not 2?
452  // fd[0] and fd[1] are fixed?
453  for (index = 0; index < download_mgr->watch_fds_inuse_; ++index) {
454  if (download_mgr->watch_fds_[index].fd == s)
455  break;
456  }
457  // Or create newly
458  if (index == download_mgr->watch_fds_inuse_) {
459  // Extend array if necessary
460  if (download_mgr->watch_fds_inuse_ == download_mgr->watch_fds_size_)
461  {
462  assert(download_mgr->watch_fds_size_ > 0);
463  download_mgr->watch_fds_size_ *= 2;
464  download_mgr->watch_fds_ = static_cast<struct pollfd *>(
465  srealloc(download_mgr->watch_fds_,
466  download_mgr->watch_fds_size_ * sizeof(struct pollfd)));
467  }
468  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].fd = s;
469  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].events = 0;
470  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].revents = 0;
471  download_mgr->watch_fds_inuse_++;
472  }
473 
474  switch (action) {
475  case CURL_POLL_IN:
476  download_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
477  break;
478  case CURL_POLL_OUT:
479  download_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
480  break;
481  case CURL_POLL_INOUT:
482  download_mgr->watch_fds_[index].events =
483  POLLIN | POLLPRI | POLLOUT | POLLWRBAND;
484  break;
485  case CURL_POLL_REMOVE:
486  if (index < download_mgr->watch_fds_inuse_-1) {
487  download_mgr->watch_fds_[index] =
488  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_-1];
489  }
490  download_mgr->watch_fds_inuse_--;
491  // Shrink array if necessary
492  if ((download_mgr->watch_fds_inuse_ > download_mgr->watch_fds_max_) &&
493  (download_mgr->watch_fds_inuse_ < download_mgr->watch_fds_size_/2))
494  {
495  download_mgr->watch_fds_size_ /= 2;
496  // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ (%d)",
497  // watch_fds_size_);
498  download_mgr->watch_fds_ = static_cast<struct pollfd *>(
499  srealloc(download_mgr->watch_fds_,
500  download_mgr->watch_fds_size_*sizeof(struct pollfd)));
501  // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ done",
502  // watch_fds_size_);
503  }
504  break;
505  default:
506  break;
507  }
508 
509  return 0;
510 }
511 
512 
 // Entry point of the download I/O thread.
 //
 // Polls the terminate pipe, the jobs pipe and all sockets registered by
 // CallbackCurlSocket().  New jobs read from the jobs pipe are attached to
 // the curl multi handle; finished transfers are passed to
 // VerifyAndFinalize() and then either run again or reported back through
 // the job's result pipe.  The loop ends when the terminate pipe becomes
 // readable.
 //
 // data: the DownloadManager this thread works for
 // Returns NULL.
516 void *DownloadManager::MainDownload(void *data) {
517  LogCvmfs(kLogDownload, kLogDebug, "download I/O thread started");
518  DownloadManager *download_mgr = static_cast<DownloadManager *>(data);
519 
 // The first two slots of watch_fds_ are reserved for the control pipes
520  const int kIdxPipeTerminate = 0;
521  const int kIdxPipeJobs = 1;
522 
523  download_mgr->watch_fds_ =
524  static_cast<struct pollfd *>(smalloc(2 * sizeof(struct pollfd)));
525  download_mgr->watch_fds_size_ = 2;
526  download_mgr->watch_fds_[kIdxPipeTerminate].fd =
527  download_mgr->pipe_terminate_->GetReadFd();
528  download_mgr->watch_fds_[kIdxPipeTerminate].events = POLLIN | POLLPRI;
529  download_mgr->watch_fds_[kIdxPipeTerminate].revents = 0;
530  download_mgr->watch_fds_[kIdxPipeJobs].fd =
531  download_mgr->pipe_jobs_->GetReadFd();
532  download_mgr->watch_fds_[kIdxPipeJobs].events = POLLIN | POLLPRI;
533  download_mgr->watch_fds_[kIdxPipeJobs].revents = 0;
534  download_mgr->watch_fds_inuse_ = 2;
535 
 // still_running is updated by every curl_multi_socket_action() call below
536  int still_running = 0;
537  struct timeval timeval_start, timeval_stop;
538  gettimeofday(&timeval_start, NULL);
539  while (true) {
540  int timeout;
541  if (still_running) {
542  /* NOTE: The following might degrade the performance for many small files
543  * use case. TODO(jblomer): look into it.
544  // Specify a timeout for polling in ms; this allows us to return
545  // to libcurl once a second so it can look for internal operations
546  // which timed out. libcurl has a more elaborate mechanism
547  // (CURLMOPT_TIMERFUNCTION) that would inform us of the next potential
548  // timeout. TODO(bbockelm) we should switch to that in the future.
549  timeout = 100;
550  */
551  timeout = 1;
552  } else {
 // Nothing in flight: block in poll() until a pipe becomes readable and
 // add the time since timeval_start (in ms) to the transfer time counter
553  timeout = -1;
554  gettimeofday(&timeval_stop, NULL);
555  int64_t delta = static_cast<int64_t>(
556  1000 * DiffTimeSeconds(timeval_start, timeval_stop));
557  perf::Xadd(download_mgr->counters_->sz_transfer_time, delta);
558  }
559  int retval = poll(download_mgr->watch_fds_, download_mgr->watch_fds_inuse_,
560  timeout);
 // poll() failed: simply try again
561  if (retval < 0) {
562  continue;
563  }
564 
565  // Handle timeout
566  if (retval == 0) {
567  curl_multi_socket_action(download_mgr->curl_multi_,
568  CURL_SOCKET_TIMEOUT,
569  0,
570  &still_running);
571  }
572 
573  // Terminate I/O thread
574  if (download_mgr->watch_fds_[kIdxPipeTerminate].revents)
575  break;
576 
577  // New job arrives
578  if (download_mgr->watch_fds_[kIdxPipeJobs].revents) {
579  download_mgr->watch_fds_[kIdxPipeJobs].revents = 0;
580  JobInfo *info;
581  download_mgr->pipe_jobs_->Read<JobInfo*>(&info);
 // First job after an idle phase: restart the transfer time measurement
582  if (!still_running) {
583  gettimeofday(&timeval_start, NULL);
584  }
585  CURL *handle = download_mgr->AcquireCurlHandle();
586  download_mgr->InitializeRequest(info, handle);
587  download_mgr->SetUrlOptions(info);
588  curl_multi_add_handle(download_mgr->curl_multi_, handle);
589  curl_multi_socket_action(download_mgr->curl_multi_,
590  CURL_SOCKET_TIMEOUT,
591  0,
592  &still_running);
593  }
594 
595  // Activity on curl sockets
596  // Within this loop the curl_multi_socket_action() may cause socket(s)
597  // to be removed from watch_fds_. If a socket is removed it is replaced
598  // by the socket at the end of the array and the inuse count is decreased.
599  // Therefore loop over the array in reverse order.
600  for (int64_t i = download_mgr->watch_fds_inuse_-1; i >= 2; --i) {
 // Skip indices that fell off the end because entries were removed
601  if (i >= download_mgr->watch_fds_inuse_) {
602  continue;
603  }
604  if (download_mgr->watch_fds_[i].revents) {
 // Translate the poll() result flags into libcurl's CURL_CSELECT_* flags
605  int ev_bitmask = 0;
606  if (download_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
607  ev_bitmask |= CURL_CSELECT_IN;
608  if (download_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
609  ev_bitmask |= CURL_CSELECT_OUT;
610  if (download_mgr->watch_fds_[i].revents &
611  (POLLERR | POLLHUP | POLLNVAL))
612  {
613  ev_bitmask |= CURL_CSELECT_ERR;
614  }
615  download_mgr->watch_fds_[i].revents = 0;
616 
617  curl_multi_socket_action(download_mgr->curl_multi_,
618  download_mgr->watch_fds_[i].fd,
619  ev_bitmask,
620  &still_running);
621  }
622  }
623 
624  // Check if transfers are completed
625  CURLMsg *curl_msg;
626  int msgs_in_queue;
627  while ((curl_msg = curl_multi_info_read(download_mgr->curl_multi_,
628  &msgs_in_queue)))
629  {
630  if (curl_msg->msg == CURLMSG_DONE) {
631  perf::Inc(download_mgr->counters_->n_requests);
632  JobInfo *info;
633  CURL *easy_handle = curl_msg->easy_handle;
634  int curl_error = curl_msg->data.result;
 // The job was attached to the handle via CURLOPT_PRIVATE
635  curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
636 
637  curl_multi_remove_handle(download_mgr->curl_multi_, easy_handle);
 // If VerifyAndFinalize() returns true, the same handle is put back
 // into the multi handle, i.e. the transfer runs again
638  if (download_mgr->VerifyAndFinalize(curl_error, info)) {
639  curl_multi_add_handle(download_mgr->curl_multi_, easy_handle);
640  curl_multi_socket_action(download_mgr->curl_multi_,
641  CURL_SOCKET_TIMEOUT,
642  0,
643  &still_running);
644  } else {
645  // Return easy handle into pool and write result back
646  download_mgr->ReleaseCurlHandle(easy_handle);
647 
648  info->GetPipeJobResultWeakRef()->
649  Write<download::Failures>(info->error_code());
650  }
651  }
652  }
653  }
654 
 // Shutdown: detach and destroy all handles that are still in use
655  for (set<CURL *>::iterator i = download_mgr->pool_handles_inuse_->begin(),
656  iEnd = download_mgr->pool_handles_inuse_->end(); i != iEnd; ++i)
657  {
658  curl_multi_remove_handle(download_mgr->curl_multi_, *i);
659  curl_easy_cleanup(*i);
660  }
661  download_mgr->pool_handles_inuse_->clear();
662  free(download_mgr->watch_fds_);
663 
664  LogCvmfs(kLogDownload, kLogDebug, "download I/O thread terminated");
665  return NULL;
666 }
667 
668 
669 //------------------------------------------------------------------------------
670 
671 
672 HeaderLists::~HeaderLists() {
673  for (unsigned i = 0; i < blocks_.size(); ++i) {
674  delete[] blocks_[i];
675  }
676  blocks_.clear();
677 }
678 
679 
680 curl_slist *HeaderLists::GetList(const char *header) {
681  return Get(header);
682 }
683 
684 
685 curl_slist *HeaderLists::DuplicateList(curl_slist *slist) {
686  assert(slist);
687  curl_slist *copy = GetList(slist->data);
688  copy->next = slist->next;
689  curl_slist *prev = copy;
690  slist = slist->next;
691  while (slist) {
692  curl_slist *new_link = Get(slist->data);
693  new_link->next = slist->next;
694  prev->next = new_link;
695  prev = new_link;
696  slist = slist->next;
697  }
698  return copy;
699 }
700 
701 
702 void HeaderLists::AppendHeader(curl_slist *slist, const char *header) {
703  assert(slist);
704  curl_slist *new_link = Get(header);
705  new_link->next = NULL;
706 
707  while (slist->next)
708  slist = slist->next;
709  slist->next = new_link;
710 }
711 
712 
718 void HeaderLists::CutHeader(const char *header, curl_slist **slist) {
719  assert(slist);
720  curl_slist head;
721  head.next = *slist;
722  curl_slist *prev = &head;
723  curl_slist *rover = *slist;
724  while (rover) {
725  if (strcmp(rover->data, header) == 0) {
726  prev->next = rover->next;
727  Put(rover);
728  rover = prev;
729  }
730  prev = rover;
731  rover = rover->next;
732  }
733  *slist = head.next;
734 }
735 
736 
737 void HeaderLists::PutList(curl_slist *slist) {
738  while (slist) {
739  curl_slist *next = slist->next;
740  Put(slist);
741  slist = next;
742  }
743 }
744 
745 
746 string HeaderLists::Print(curl_slist *slist) {
747  string verbose;
748  while (slist) {
749  verbose += string(slist->data) + "\n";
750  slist = slist->next;
751  }
752  return verbose;
753 }
754 
755 
756 curl_slist *HeaderLists::Get(const char *header) {
757  for (unsigned i = 0; i < blocks_.size(); ++i) {
758  for (unsigned j = 0; j < kBlockSize; ++j) {
759  if (!IsUsed(&(blocks_[i][j]))) {
760  blocks_[i][j].data = const_cast<char *>(header);
761  return &(blocks_[i][j]);
762  }
763  }
764  }
765 
766  // All used, new block
767  AddBlock();
768  blocks_[blocks_.size()-1][0].data = const_cast<char *>(header);
769  return &(blocks_[blocks_.size()-1][0]);
770 }
771 
772 
773 void HeaderLists::Put(curl_slist *slist) {
774  slist->data = NULL;
775  slist->next = NULL;
776 }
777 
778 
779 void HeaderLists::AddBlock() {
780  curl_slist *new_block = new curl_slist[kBlockSize];
781  for (unsigned i = 0; i < kBlockSize; ++i) {
782  Put(&new_block[i]);
783  }
784  blocks_.push_back(new_block);
785 }
786 
787 
788 //------------------------------------------------------------------------------
789 
790 
791 string DownloadManager::ProxyInfo::Print() {
792  if (url == "DIRECT")
793  return url;
794 
795  string result = url;
796  int remaining =
797  static_cast<int>(host.deadline()) - static_cast<int>(time(NULL));
798  string expinfo = (remaining >= 0) ? "+" : "";
799  if (abs(remaining) >= 3600) {
800  expinfo += StringifyInt(remaining/3600) + "h";
801  } else if (abs(remaining) >= 60) {
802  expinfo += StringifyInt(remaining/60) + "m";
803  } else {
804  expinfo += StringifyInt(remaining) + "s";
805  }
806  if (host.status() == dns::kFailOk) {
807  result += " (" + host.name() + ", " + expinfo + ")";
808  } else {
809  result += " (:unresolved:, " + expinfo + ")";
810  }
811  return result;
812 }
813 
814 
819 CURL *DownloadManager::AcquireCurlHandle() {
820  CURL *handle;
821 
822  if (pool_handles_idle_->empty()) {
823  // Create a new handle
824  handle = curl_easy_init();
825  assert(handle != NULL);
826 
827  curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
828  // curl_easy_setopt(curl_default, CURLOPT_FAILONERROR, 1);
829  curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, CallbackCurlHeader);
830  curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlData);
831  } else {
832  handle = *(pool_handles_idle_->begin());
833  pool_handles_idle_->erase(pool_handles_idle_->begin());
834  }
835 
836  pool_handles_inuse_->insert(handle);
837 
838  return handle;
839 }
840 
841 
842 void DownloadManager::ReleaseCurlHandle(CURL *handle) {
843  set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
844  assert(elem != pool_handles_inuse_->end());
845 
846  if (pool_handles_idle_->size() > pool_max_handles_) {
847  curl_easy_cleanup(*elem);
848  } else {
849  pool_handles_idle_->insert(*elem);
850  }
851 
852  pool_handles_inuse_->erase(elem);
853 }
854 
855 
860 void DownloadManager::InitializeRequest(JobInfo *info, CURL *handle) {
861  // Initialize internal download state
862  info->SetCurlHandle(handle);
863  info->SetErrorCode(kFailOk);
864  info->SetHttpCode(-1);
865  info->SetFollowRedirects(follow_redirects_);
866  info->SetNumUsedProxies(1);
867  info->SetNumUsedHosts(1);
868  info->SetNumRetries(0);
869  info->SetBackoffMs(0);
870  info->SetHeaders(header_lists_->DuplicateList(default_headers_));
871  if (info->info_header()) {
872  header_lists_->AppendHeader(info->headers(), info->info_header());
873  }
874  if (enable_http_tracing_) {
875  for (unsigned int i = 0; i < http_tracing_headers_.size(); i++) {
876  header_lists_->AppendHeader(info->headers(),
877  (http_tracing_headers_)[i].c_str());
878  }
879 
880  header_lists_->AppendHeader(info->headers(), info->tracing_header_pid());
881  header_lists_->AppendHeader(info->headers(), info->tracing_header_gid());
882  header_lists_->AppendHeader(info->headers(), info->tracing_header_uid());
883 
884  LogCvmfs(kLogDownload, kLogDebug, "CURL Header for URL: %s is:\n %s",
885  info->url()->c_str(), header_lists_->Print(info->headers()).c_str());
886  }
887 
888  if (info->force_nocache()) {
889  SetNocache(info);
890  } else {
891  info->SetNocache(false);
892  }
893  if (info->compressed()) {
895  }
896  if (info->expected_hash()) {
897  assert(info->hash_context().buffer != NULL);
898  shash::Init(info->hash_context());
899  }
900 
901  if ((info->range_offset() != -1) && (info->range_size())) {
902  char byte_range_array[100];
903  const int64_t range_lower = static_cast<int64_t>(info->range_offset());
904  const int64_t range_upper = static_cast<int64_t>(
905  info->range_offset() + info->range_size() - 1);
906  if (snprintf(byte_range_array, sizeof(byte_range_array),
907  "%" PRId64 "-%" PRId64,
908  range_lower, range_upper) == 100)
909  {
910  PANIC(NULL); // Should be impossible given limits on offset size.
911  }
912  curl_easy_setopt(handle, CURLOPT_RANGE, byte_range_array);
913  } else {
914  curl_easy_setopt(handle, CURLOPT_RANGE, NULL);
915  }
916 
917  // Set curl parameters
918  curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
919  curl_easy_setopt(handle, CURLOPT_WRITEHEADER,
920  static_cast<void *>(info));
921  curl_easy_setopt(handle, CURLOPT_WRITEDATA, static_cast<void *>(info));
922  curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->headers());
923  if (info->head_request()) {
924  curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
925  } else {
926  curl_easy_setopt(handle, CURLOPT_HTTPGET, 1);
927  }
928  if (opt_ipv4_only_) {
929  curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
930  }
931  if (follow_redirects_) {
932  curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1);
933  curl_easy_setopt(handle, CURLOPT_MAXREDIRS, 4);
934  }
935 #ifdef DEBUGMSG
936  curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
937  curl_easy_setopt(handle, CURLOPT_DEBUGFUNCTION, CallbackCurlDebug);
938 #endif
939 }
940 
941 
946 void DownloadManager::SetUrlOptions(JobInfo *info) {
947  CURL *curl_handle = info->curl_handle();
948  string url_prefix;
949 
950  MutexLockGuard m(lock_options_);
951 
952  // sharding policy
953  if (sharding_policy_.UseCount() > 0) {
954  if (info->proxy() != "") {
955  // proxy already set, so this is a failover event
956  perf::Inc(counters_->n_proxy_failover);
957  }
958  info->SetProxy(sharding_policy_->GetNextProxy(info->url(), info->proxy(),
959  info->range_offset() == -1 ? 0 : info->range_offset()));
960 
961  curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, info->proxy().c_str());
962  } else { // no sharding policy
963  // Check if proxy group needs to be reset from backup to primary
964  if (opt_timestamp_backup_proxies_ > 0) {
965  const time_t now = time(NULL);
966  if (static_cast<int64_t>(now) >
967  static_cast<int64_t>(opt_timestamp_backup_proxies_ +
968  opt_proxy_groups_reset_after_))
969  {
970  opt_proxy_groups_current_ = 0;
971  opt_timestamp_backup_proxies_ = 0;
972  RebalanceProxiesUnlocked("reset proxy group");
973  }
974  }
975  // Check if load-balanced proxies within the group need to be reset
976  if (opt_timestamp_failover_proxies_ > 0) {
977  const time_t now = time(NULL);
978  if (static_cast<int64_t>(now) >
979  static_cast<int64_t>(opt_timestamp_failover_proxies_ +
980  opt_proxy_groups_reset_after_))
981  {
982  RebalanceProxiesUnlocked("reset load-balanced proxies");
983  }
984  }
985  // Check if host needs to be reset
986  if (opt_timestamp_backup_host_ > 0) {
987  const time_t now = time(NULL);
988  if (static_cast<int64_t>(now) >
989  static_cast<int64_t>(opt_timestamp_backup_host_ +
990  opt_host_reset_after_))
991  {
993  "switching host from %s to %s (reset host)",
994  (*opt_host_chain_)[opt_host_chain_current_].c_str(),
995  (*opt_host_chain_)[0].c_str());
996  opt_host_chain_current_ = 0;
997  opt_timestamp_backup_host_ = 0;
998  }
999  }
1000 
1001  ProxyInfo *proxy = ChooseProxyUnlocked(info->expected_hash());
1002  if (!proxy || (proxy->url == "DIRECT")) {
1003  info->SetProxy("DIRECT");
1004  curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, "");
1005  } else {
1006  // Note: inside ValidateProxyIpsUnlocked() we may change the proxy data
1007  // structure, so we must not pass proxy->... (== current_proxy())
1008  // parameters directly
1009  std::string purl = proxy->url;
1010  dns::Host phost = proxy->host;
1011  const bool changed = ValidateProxyIpsUnlocked(purl, phost);
1012  // Current proxy may have changed
1013  if (changed) {
1014  proxy = ChooseProxyUnlocked(info->expected_hash());
1015  }
1016  info->SetProxy(proxy->url);
1017  if (proxy->host.status() == dns::kFailOk) {
1018  curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY,
1019  info->proxy().c_str());
1020  } else {
1021  // We know it can't work, don't even try to download
1022  curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, "0.0.0.0");
1023  }
1024  }
1025  } // end !sharding
1026 
1027  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, opt_low_speed_limit_);
1028  if (info->proxy() != "DIRECT") {
1029  curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_proxy_);
1030  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_proxy_);
1031  } else {
1032  curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_direct_);
1033  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_direct_);
1034  }
1035  if (!opt_dns_server_.empty())
1036  curl_easy_setopt(curl_handle, CURLOPT_DNS_SERVERS, opt_dns_server_.c_str());
1037 
1038  if (info->probe_hosts() && opt_host_chain_) {
1039  url_prefix = (*opt_host_chain_)[opt_host_chain_current_];
1040  info->SetCurrentHostChainIndex(opt_host_chain_current_);
1041  }
1042 
1043  string url = url_prefix + *(info->url());
1044 
1045  curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 1L);
1046  if (url.substr(0, 5) == "https") {
1047  bool rvb = ssl_certificate_store_.ApplySslCertificatePath(curl_handle);
1048  if (!rvb) {
1050  "Failed to set SSL certificate path %s",
1051  ssl_certificate_store_.GetCaPath().c_str());
1052  }
1053  if (info->pid() != -1) {
1054  if (credentials_attachment_ == NULL) {
1056  "uses secure downloads but no credentials attachment set");
1057  } else {
1058  bool retval = credentials_attachment_->ConfigureCurlHandle(
1059  curl_handle, info->pid(), info->GetCredDataPtr());
1060  if (!retval) {
1061  LogCvmfs(kLogDownload, kLogDebug, "failed attaching credentials");
1062  }
1063  }
1064  }
1065  // The download manager disables signal handling in the curl library;
1066  // as OpenSSL's implementation of TLS will generate a sigpipe in some
1067  // error paths, we must explicitly disable SIGPIPE here.
1068  // TODO(jblomer): it should be enough to do this once
1069  signal(SIGPIPE, SIG_IGN);
1070  }
1071 
1072  if (url.find("@proxy@") != string::npos) {
1073  // This is used in Geo-API requests (only), to replace a portion of the
1074  // URL with the current proxy name for the sake of caching the result.
1075  // Replace the @proxy@ either with a passed in "forced" template (which
1076  // is set from $CVMFS_PROXY_TEMPLATE) if there is one, or a "direct"
1077  // template (which is the uuid) if there's no proxy, or the name of the
1078  // proxy.
1079  string replacement;
1080  if (proxy_template_forced_ != "") {
1081  replacement = proxy_template_forced_;
1082  } else if (info->proxy() == "DIRECT") {
1083  replacement = proxy_template_direct_;
1084  } else {
1085  if (opt_proxy_groups_current_ >= opt_proxy_groups_fallback_) {
1086  // It doesn't make sense to use the fallback proxies in Geo-API requests
1087  // since the fallback proxies are supposed to get sorted, too.
1088  info->SetProxy("DIRECT");
1089  curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, "");
1090  replacement = proxy_template_direct_;
1091  } else {
1092  replacement = ChooseProxyUnlocked(info->expected_hash())->host.name();
1093  }
1094  }
1095  replacement = (replacement == "") ? proxy_template_direct_ : replacement;
1096  LogCvmfs(kLogDownload, kLogDebug, "replacing @proxy@ by %s",
1097  replacement.c_str());
1098  url = ReplaceAll(url, "@proxy@", replacement);
1099  }
1100 
1101  // TODO(heretherebedragons) before removing
1102  // static_cast<cvmfs::MemSink*>(info->sink)->size() == 0
1103  // and just always call info->sink->Reserve()
1104  // we should do a speed check
1105  if ((info->sink() != NULL) && info->sink()->RequiresReserve() &&
1106  (static_cast<cvmfs::MemSink*>(info->sink())->size() == 0) &&
1107  HasPrefix(url, "file://", false)) {
1108  platform_stat64 stat_buf;
1109  int retval = platform_stat(url.c_str(), &stat_buf);
1110  if (retval != 0) {
1111  // this is an error: file does not exist or out of memory
1112  // error is caught in other code section.
1113  info->sink()->Reserve(64ul * 1024ul);
1114  } else {
1115  info->sink()->Reserve(stat_buf.st_size);
1116  }
1117  }
1118 
1119  curl_easy_setopt(curl_handle, CURLOPT_URL, EscapeUrl(url).c_str());
1120 }
1121 
1122 
/**
 * Re-resolves the name of a proxy host whose DNS record has expired and brings
 * the current proxy group in line with the result.
 *
 * \param url  proxy URL; combined with each resolved address through
 *             dns::RewriteUrl() to build the per-address proxy entries
 * \param host the host record to validate
 * \return false if the record is not expired, or if only the stored host
 *         records were refreshed (resolution failed or the new record is
 *         equivalent); true if the group's entries for this host were replaced
 *         and the proxies were rebalanced
 *
 * NOTE(review): original lines 1147 and 1167 are absent from this listing; the
 * calls that take the two dangling string arguments below are not visible.
 */
1132 bool DownloadManager::ValidateProxyIpsUnlocked(
1133  const string &url,
1134  const dns::Host &host)
1135 {
// Nothing to do while the cached record is still valid.
1136  if (!host.IsExpired())
1137  return false;
1138  LogCvmfs(kLogDownload, kLogDebug, "validate DNS entry for %s",
1139  host.name().c_str());
1140 
1141  unsigned group_idx = opt_proxy_groups_current_;
1142  dns::Host new_host = resolver_->Resolve(host.name());
1143 
1144  bool update_only = true; // No changes to the list of IP addresses.
1145  if (new_host.status() != dns::kFailOk) {
1146  // Try again later in case resolving fails.
1148  "failed to resolve IP addresses for %s (%d - %s)",
1149  host.name().c_str(), new_host.status(),
1150  dns::Code2Ascii(new_host.status()));
// Keep using the old record, with a deadline extended by the resolver's
// minimum TTL.
1151  new_host = dns::Host::ExtendDeadline(host, resolver_->min_ttl());
1152  } else if (!host.IsEquivalent(new_host)) {
1153  update_only = false;
1154  }
1155 
1156  if (update_only) {
// Swap the host record of every entry in the group that refers to the same
// host id; the group's membership is left untouched.
1157  for (unsigned i = 0; i < (*opt_proxy_groups_)[group_idx].size(); ++i) {
1158  if ((*opt_proxy_groups_)[group_idx][i].host.id() == host.id())
1159  (*opt_proxy_groups_)[group_idx][i].host = new_host;
1160  }
1161  return false;
1162  }
1163 
1164  assert(new_host.status() == dns::kFailOk);
1165 
1166  // Remove old host objects, insert new objects, and rebalance.
1168  "DNS entries for proxy %s changed, adjusting", host.name().c_str());
1169  vector<ProxyInfo> *group = current_proxy_group();
// The proxy counter is reduced by the whole group size here and increased
// again below by the number of newly inserted entries only.
1170  opt_num_proxies_ -= group->size();
// Erase in place; the index only advances when nothing was removed.
1171  for (unsigned i = 0; i < group->size(); ) {
1172  if ((*group)[i].host.id() == host.id()) {
1173  group->erase(group->begin() + i);
1174  } else {
1175  i++;
1176  }
1177  }
// One ProxyInfo per address returned by ViewBestAddresses().
1178  vector<ProxyInfo> new_infos;
1179  set<string> best_addresses = new_host.ViewBestAddresses(opt_ip_preference_);
1180  set<string>::const_iterator iter_ips = best_addresses.begin();
1181  for (; iter_ips != best_addresses.end(); ++iter_ips) {
1182  string url_ip = dns::RewriteUrl(url, *iter_ips);
1183  new_infos.push_back(ProxyInfo(new_host, url_ip));
1184  }
1185  group->insert(group->end(), new_infos.begin(), new_infos.end());
1186  opt_num_proxies_ += new_infos.size();
1187 
1188  RebalanceProxiesUnlocked("DNS change");
1189  return true;
1190 }
1191 
1192 
1196 void DownloadManager::UpdateStatistics(CURL *handle) {
1197  double val;
1198  int retval;
1199  int64_t sum = 0;
1200 
1201  retval = curl_easy_getinfo(handle, CURLINFO_SIZE_DOWNLOAD, &val);
1202  assert(retval == CURLE_OK);
1203  sum += static_cast<int64_t>(val);
1204  /*retval = curl_easy_getinfo(handle, CURLINFO_HEADER_SIZE, &val);
1205  assert(retval == CURLE_OK);
1206  sum += static_cast<int64_t>(val);*/
1207  perf::Xadd(counters_->sz_transferred_bytes, sum);
1208 }
1209 
1210 
1214 bool DownloadManager::CanRetry(const JobInfo *info) {
1215  MutexLockGuard m(lock_options_);
1216  unsigned max_retries = opt_max_retries_;
1217 
1218  return !(info->nocache()) && (info->num_retries() < max_retries) &&
1219  (IsProxyTransferError(info->error_code()) ||
1220  IsHostTransferError(info->error_code()));
1221 }
1222 
1230 void DownloadManager::Backoff(JobInfo *info) {
1231  unsigned backoff_init_ms = 0;
1232  unsigned backoff_max_ms = 0;
1233  {
1234  MutexLockGuard m(lock_options_);
1235  backoff_init_ms = opt_backoff_init_ms_;
1236  backoff_max_ms = opt_backoff_max_ms_;
1237  }
1238 
1239  info->SetNumRetries(info->num_retries() + 1);
1240  perf::Inc(counters_->n_retries);
1241  if (info->backoff_ms() == 0) {
1242  info->SetBackoffMs(prng_.Next(backoff_init_ms + 1)); // Must be != 0
1243  } else {
1244  info->SetBackoffMs(info->backoff_ms() * 2);
1245  }
1246  if (info->backoff_ms() > backoff_max_ms) {
1247  info->SetBackoffMs(backoff_max_ms);
1248  }
1249 
1250  LogCvmfs(kLogDownload, kLogDebug, "backing off for %d ms",
1251  info->backoff_ms());
1252  SafeSleepMs(info->backoff_ms());
1253 }
1254 
1255 void DownloadManager::SetNocache(JobInfo *info) {
1256  if (info->nocache())
1257  return;
1258  header_lists_->AppendHeader(info->headers(), "Pragma: no-cache");
1259  header_lists_->AppendHeader(info->headers(), "Cache-Control: no-cache");
1260  curl_easy_setopt(info->curl_handle(), CURLOPT_HTTPHEADER, info->headers());
1261  info->SetNocache(true);
1262 }
1263 
1264 
1269 void DownloadManager::SetRegularCache(JobInfo *info) {
1270  if (info->nocache() == false)
1271  return;
1272  header_lists_->CutHeader("Pragma: no-cache", info->GetHeadersPtr());
1273  header_lists_->CutHeader("Cache-Control: no-cache", info->GetHeadersPtr());
1274  curl_easy_setopt(info->curl_handle(), CURLOPT_HTTPHEADER, info->headers());
1275  info->SetNocache(false);
1276 }
1277 
1278 
1282 void DownloadManager::ReleaseCredential(JobInfo *info) {
1283  if (info->cred_data()) {
1284  assert(credentials_attachment_ != NULL); // Someone must have set it
1285  credentials_attachment_->ReleaseCurlHandle(info->curl_handle(),
1286  info->cred_data());
1287  info->SetCredData(NULL);
1288  }
1289 }
1290 
1291 
/**
 * Evaluates the outcome of a finished transfer and decides whether it has to
 * be repeated.
 *
 * Visible steps: update the transfer statistics, translate curl_error into a
 * job error code (including the content hash comparison on CURLE_OK), work out
 * whether a retry, a proxy switch or a host switch is possible, and then
 * either re-arm the job for another attempt or finalize it (release the
 * credentials, flush the sink, return the header list).
 *
 * \param curl_error result code of the transfer as reported by curl
 * \param info the job that was transferred; its error code, retry counters,
 *        headers and sink are updated in place
 * \return true if the caller should run the transfer again on the same curl
 *         handle, false if the job is finished
 *
 * NOTE(review): the embedded line numbers of this listing skip in several
 * places (e.g. 1299, 1334, 1340, 1347); the statements on those lines are not
 * visible here, which is why some case labels below appear to contain only a
 * break and some string arguments have no visible call.
 */
1298 bool DownloadManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1300  "Verify downloaded url %s, proxy %s (curl error %d)",
1301  info->url()->c_str(), info->proxy().c_str(), curl_error);
1302  UpdateStatistics(info->curl_handle());
1303 
1304  // Verification and error classification
1305  switch (curl_error) {
1306  case CURLE_OK:
1307  // Verify content hash
1308  if (info->expected_hash()) {
1309  shash::Any match_hash;
1310  shash::Final(info->hash_context(), &match_hash);
1311  if (match_hash != *(info->expected_hash())) {
// A mismatch is fatal (kFailBadData) unless ignore_signature_failures_ is
// set, in which case control falls through to kFailOk below.
1312  if (ignore_signature_failures_) {
1314  "ignoring failed hash verification of %s "
1315  "(expected %s, got %s)",
1316  info->url()->c_str(),
1317  info->expected_hash()->ToString().c_str(),
1318  match_hash.ToString().c_str());
1319  } else {
1321  "hash verification of %s failed (expected %s, got %s)",
1322  info->url()->c_str(),
1323  info->expected_hash()->ToString().c_str(),
1324  match_hash.ToString().c_str());
1325  info->SetErrorCode(kFailBadData);
1326  break;
1327  }
1328  }
1329  }
1330 
1331  info->SetErrorCode(kFailOk);
1332  break;
1333  case CURLE_UNSUPPORTED_PROTOCOL:
1335  break;
1336  case CURLE_URL_MALFORMAT:
1337  info->SetErrorCode(kFailBadUrl);
1338  break;
1339  case CURLE_COULDNT_RESOLVE_PROXY:
1341  break;
1342  case CURLE_COULDNT_RESOLVE_HOST:
1344  break;
1345  case CURLE_OPERATION_TIMEDOUT:
1346  info->SetErrorCode((info->proxy() == "DIRECT") ?
1348  break;
1349  case CURLE_PARTIAL_FILE:
1350  case CURLE_GOT_NOTHING:
1351  case CURLE_RECV_ERROR:
1352  info->SetErrorCode((info->proxy() == "DIRECT") ?
1354  break;
1355  case CURLE_FILE_COULDNT_READ_FILE:
1356  case CURLE_COULDNT_CONNECT:
1357  if (info->proxy() != "DIRECT") {
1358  // This is a guess. Fail-over can still change to switching host
1360  } else {
1362  }
1363  break;
1364  case CURLE_TOO_MANY_REDIRECTS:
1366  break;
1367  case CURLE_SSL_CACERT_BADFILE:
1369  "Failed to load certificate bundle. "
1370  "X509_CERT_BUNDLE might point to the wrong location.");
1372  break;
1373  // As of curl 7.62.0, CURLE_SSL_CACERT is the same as
1374  // CURLE_PEER_FAILED_VERIFICATION
1375  case CURLE_PEER_FAILED_VERIFICATION:
1377  "invalid SSL certificate of remote host. "
1378  "X509_CERT_DIR and/or X509_CERT_BUNDLE might point to the wrong "
1379  "location.");
1381  break;
1382  case CURLE_ABORTED_BY_CALLBACK:
1383  case CURLE_WRITE_ERROR:
1384  // Error set by callback
1385  break;
1386  case CURLE_SEND_ERROR:
1387  // The curl error CURLE_SEND_ERROR can be seen when a cache is misbehaving
1388  // and closing connections before the http request send is completed.
1389  // Handle this error, treating it as a short transfer error.
1390  info->SetErrorCode((info->proxy() == "DIRECT") ?
1391  kFailHostShortTransfer : kFailProxyShortTransfer);
1392  break;
1393  default:
1394  LogCvmfs(kLogDownload, kLogSyslogErr, "unexpected curl error (%d) while "
1395  "trying to fetch %s", curl_error, info->url()->c_str());
1396  info->SetErrorCode(kFailOther);
1397  break;
1398  }
1399 
1400  std::vector<std::string> *host_chain = opt_host_chain_;
1401 
1402  // Determination if download should be repeated
1403  bool try_again = false;
// CanRetry() takes lock_options_ itself, so it is called before the lock
// guard below is created.
1404  bool same_url_retry = CanRetry(info);
1405  if (info->error_code() != kFailOk) {
1406  MutexLockGuard m(lock_options_);
1407  if (info->error_code() == kFailBadData) {
// Corrupted data: retry once with no-cache headers; if the job already ran
// with no-cache, the failure is attributed to the host instead.
1408  if (!info->nocache()) {
1409  try_again = true;
1410  } else {
1411  // Make it a host failure
1413  "data corruption with no-cache header, try another host");
1414 
1415  info->SetErrorCode(kFailHostHttp);
1416  }
1417  }
// Host-side failure: retry if a same-URL retry is allowed, or if hosts are
// probed and not every host of the chain has been used yet.
1418  if ( same_url_retry || (
1419  ( (info->error_code() == kFailHostResolve) ||
1420  IsHostTransferError(info->error_code()) ||
1421  (info->error_code() == kFailHostHttp)) &&
1422  info->probe_hosts() &&
1423  host_chain && (info->num_used_hosts() < host_chain->size()))
1424  )
1425  {
1426  try_again = true;
1427  }
// Proxy-side failure (or same-URL retry)
1428  if ( same_url_retry || (
1429  ( (info->error_code() == kFailProxyResolve) ||
1430  IsProxyTransferError(info->error_code()) ||
1431  (info->error_code() == kFailProxyHttp)) )
1432  )
1433  {
1434  if (sharding_policy_.UseCount() > 0) { // sharding policy
1435  try_again = true;
1436  same_url_retry = false;
1437  } else { // no sharding
1438  try_again = true;
1439  // If all proxies failed, do a next round with the next host
1440  if (!same_url_retry && (info->num_used_proxies() >= opt_num_proxies_)) {
1441  // Check if this can be made a host fail-over
1442  if (info->probe_hosts() &&
1443  host_chain &&
1444  (info->num_used_hosts() < host_chain->size()))
1445  {
1446  // reset proxy group if not already performed by other handle
1447  if (opt_proxy_groups_) {
1448  if ((opt_proxy_groups_current_ > 0) ||
1449  (opt_proxy_groups_current_burned_ > 0))
1450  {
1451  opt_proxy_groups_current_ = 0;
1452  opt_timestamp_backup_proxies_ = 0;
1453  RebalanceProxiesUnlocked("reset proxies for host failover");
1454  }
1455  }
1456 
1457  // Make it a host failure
1458  LogCvmfs(kLogDownload, kLogDebug, "make it a host failure");
1459  info->SetNumUsedProxies(1);
1461  } else {
1462  if (failover_indefinitely_) {
1463  // Instead of giving up, reset the num_used_proxies counter,
1464  // switch proxy and try again
1466  "VerifyAndFinalize() would fail the download here. "
1467  "Instead switch proxy and retry download. "
1468  "info->probe_hosts=%d host_chain=%x info->num_used_hosts=%d "
1469  "host_chain->size()=%d same_url_retry=%d "
1470  "info->num_used_proxies=%d opt_num_proxies_=%d",
1471  static_cast<int>(info->probe_hosts()),
1472  host_chain, info->num_used_hosts(),
1473  host_chain ?
1474  host_chain->size() : -1, static_cast<int>(same_url_retry),
1475  info->num_used_proxies(), opt_num_proxies_);
1476  info->SetNumUsedProxies(1);
1477  RebalanceProxiesUnlocked("failover indefinitely");
1478  try_again = !Interrupted(fqrn_, info);
1479  } else {
1480  try_again = false;
1481  }
1482  }
1483  } // Make a proxy failure a host failure
1484  } // Proxy failure assumed
1485  } // end !sharding
1486  }
1487 
1488  if (try_again) {
1489  LogCvmfs(kLogDownload, kLogDebug, "Trying again on same curl handle, "
1490  "same url: %d, error code %d no-cache %d",
1491  same_url_retry, info->error_code(), info->nocache());
1492  // Reset internal state and destination
1493  if (info->sink() != NULL && info->sink()->Reset() != 0) {
1494  info->SetErrorCode(kFailLocalIO);
1495  goto verify_and_finalize_stop;
1496  }
// A cancelled job is not retried.
1497  if (info->interrupt_cue() && info->interrupt_cue()->IsCanceled()) {
1498  info->SetErrorCode(kFailCanceled);
1499  goto verify_and_finalize_stop;
1500  }
1501 
// Start a fresh hash computation for the next attempt.
1502  if (info->expected_hash()) {
1503  shash::Init(info->hash_context());
1504  }
1505  if (info->compressed()) {
1507  }
1508 
1509  if (sharding_policy_.UseCount() > 0) { // sharding policy
1510  ReleaseCredential(info);
1511  SetUrlOptions(info);
1512  } else { // no sharding policy
1513  SetRegularCache(info);
1514 
1515  // Failure handling
1516  bool switch_proxy = false;
1517  bool switch_host = false;
1518  switch (info->error_code()) {
1519  case kFailBadData:
1520  SetNocache(info);
1521  break;
1522  case kFailProxyResolve:
1523  case kFailProxyHttp:
1524  switch_proxy = true;
1525  break;
1526  case kFailHostResolve:
1527  case kFailHostHttp:
1528  case kFailHostAfterProxy:
1529  switch_host = true;
1530  break;
1531  default:
// Transfer errors: back off and retry the same URL if allowed, otherwise
// move on to the next proxy or host.
1532  if (IsProxyTransferError(info->error_code())) {
1533  if (same_url_retry) {
1534  Backoff(info);
1535  } else {
1536  switch_proxy = true;
1537  }
1538  } else if (IsHostTransferError(info->error_code())) {
1539  if (same_url_retry) {
1540  Backoff(info);
1541  } else {
1542  switch_host = true;
1543  }
1544  } else {
1545  // No other errors expected when retrying
1546  PANIC(NULL);
1547  }
1548  }
1549  if (switch_proxy) {
1550  ReleaseCredential(info);
1551  SwitchProxy(info);
1552  info->SetNumUsedProxies(info->num_used_proxies() + 1);
1553  SetUrlOptions(info);
1554  }
1555  if (switch_host) {
1556  ReleaseCredential(info);
1557  SwitchHost(info);
1558  info->SetNumUsedHosts(info->num_used_hosts() + 1);
1559  SetUrlOptions(info);
1560  }
1561  } // end !sharding
1562 
1563  if (failover_indefinitely_) {
1564  // try again, breaking if there's a cvmfs reload happening and we are in a
1565  // proxy failover. This will EIO the call application.
1566  return !Interrupted(fqrn_, info);
1567  }
1568  return true; // try again
1569  }
1570 
1571  verify_and_finalize_stop:
1572  // Finalize, flush destination file
1573  ReleaseCredential(info);
1574  if (info->sink() != NULL && info->sink()->Flush() != 0) {
1575  info->SetErrorCode(kFailLocalIO);
1576  }
1577 
1578  if (info->compressed())
1580 
// Give the header list back to the pool and detach it from the job.
1581  if (info->headers()) {
1582  header_lists_->PutList(info->headers());
1583  info->SetHeaders(NULL);
1584  }
1585 
1586  return false; // stop transfer and return to Fetch()
1587 }
1588 
1589 DownloadManager::~DownloadManager() {
1590  // cleaned up fini
1591  if (sharding_policy_.UseCount() > 0) {
1592  sharding_policy_.Reset();
1593  }
1594  if (health_check_.UseCount() > 0) {
1595  if (health_check_.Unique()) {
1596  LogCvmfs(kLogDownload, kLogDebug, "Stopping healthcheck thread");
1597  health_check_->StopHealthcheck();
1598  }
1599  health_check_.Reset();
1600  }
1601 
1602  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1603  // Shutdown I/O thread
1604  pipe_terminate_->Write(kPipeTerminateSignal);
1605  pthread_join(thread_download_, NULL);
1606  // All handles are removed from the multi stack
1607  pipe_terminate_.Destroy();
1608  pipe_jobs_.Destroy();
1609  }
1610 
1611  for (set<CURL *>::iterator i = pool_handles_idle_->begin(),
1612  iEnd = pool_handles_idle_->end(); i != iEnd; ++i)
1613  {
1614  curl_easy_cleanup(*i);
1615  }
1616 
1617  delete pool_handles_idle_;
1618  delete pool_handles_inuse_;
1619  curl_multi_cleanup(curl_multi_);
1620 
1621  delete header_lists_;
1622  if (user_agent_)
1623  free(user_agent_);
1624 
1625  delete counters_;
1626  delete opt_host_chain_;
1627  delete opt_host_chain_rtt_;
1628  delete opt_proxy_groups_;
1629 
1630  curl_global_cleanup();
1631  delete resolver_;
1632 
1633  // old destructor
1634  pthread_mutex_destroy(lock_options_);
1635  pthread_mutex_destroy(lock_synchronous_mode_);
1636  free(lock_options_);
1637  free(lock_synchronous_mode_);
1638 }
1639 
1640 void DownloadManager::InitHeaders() {
1641  // User-Agent
1642  string cernvm_id = "User-Agent: cvmfs ";
1643 #ifdef CVMFS_LIBCVMFS
1644  cernvm_id += "libcvmfs ";
1645 #else
1646  cernvm_id += "Fuse ";
1647 #endif
1648  cernvm_id += string(VERSION);
1649  if (getenv("CERNVM_UUID") != NULL) {
1650  cernvm_id += " " +
1651  sanitizer::InputSanitizer("az AZ 09 -").Filter(getenv("CERNVM_UUID"));
1652  }
1653  user_agent_ = strdup(cernvm_id.c_str());
1654 
1655  header_lists_ = new HeaderLists();
1656 
1657  default_headers_ = header_lists_->GetList("Connection: Keep-Alive");
1658  header_lists_->AppendHeader(default_headers_, "Pragma:");
1659  header_lists_->AppendHeader(default_headers_, user_agent_);
1660 }
1661 
/**
 * Constructs a download manager in single-threaded state.
 *
 * The initializer list sets the option members to their defaults.  The body
 * then allocates and initializes the two mutexes, initializes libcurl
 * globally, builds the default headers, creates the curl multi handle, seeds
 * the random number generator and evaluates the CVMFS_IPV4_ONLY environment
 * variable.
 *
 * \param max_pool_handles stored in pool_max_handles_; watch_fds_max_ is set
 *        to four times this value
 * \param statistics passed on to the Counters constructor
 *
 * NOTE(review): original lines 1710, 1727 and 1737-1738 are absent from this
 * listing, so the assignment target of the second smalloc(), the value passed
 * for CURLMOPT_MAX_TOTAL_CONNECTIONS and the creation of resolver_ are not
 * visible.
 */
1662 DownloadManager::DownloadManager(const unsigned max_pool_handles,
1663  const perf::StatisticsTemplate &statistics) :
1664  prng_(Prng()),
1665  pool_handles_idle_(new set<CURL *>),
1666  pool_handles_inuse_(new set<CURL *>),
1667  pool_max_handles_(max_pool_handles),
1668  pipe_terminate_(NULL),
1669  pipe_jobs_(NULL),
1670  watch_fds_(NULL),
1671  watch_fds_size_(0),
1672  watch_fds_inuse_(0),
1673  watch_fds_max_(4 * max_pool_handles),
1674  opt_timeout_proxy_(5),
1675  opt_timeout_direct_(10),
1676  opt_low_speed_limit_(1024),
1677  opt_max_retries_(0),
1678  opt_backoff_init_ms_(0),
1679  opt_backoff_max_ms_(0),
1680  enable_info_header_(false),
1681  opt_ipv4_only_(false),
1682  follow_redirects_(false),
1683  ignore_signature_failures_(false),
1684  enable_http_tracing_(false),
1685  opt_host_chain_(NULL),
1686  opt_host_chain_rtt_(NULL),
1687  opt_host_chain_current_(0),
1688  opt_proxy_groups_(NULL),
1689  opt_proxy_groups_current_(0),
1690  opt_proxy_groups_current_burned_(0),
1691  opt_proxy_groups_fallback_(0),
1692  opt_num_proxies_(0),
1693  opt_proxy_shard_(false),
1694  failover_indefinitely_(false),
1695  opt_ip_preference_(dns::kIpPreferSystem),
1696  opt_timestamp_backup_proxies_(0),
1697  opt_timestamp_failover_proxies_(0),
1698  opt_proxy_groups_reset_after_(0),
1699  opt_timestamp_backup_host_(0),
1700  opt_host_reset_after_(0),
1701  credentials_attachment_(NULL),
1702  counters_(new Counters(statistics))
1703 {
1704  atomic_init32(&multi_threaded_);
1705 
// The mutexes are allocated with smalloc(); the destructor releases them
// with pthread_mutex_destroy() and free().
1706  lock_options_ =
1707  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1708  int retval = pthread_mutex_init(lock_options_, NULL);
1709  assert(retval == 0);
1711  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1712  retval = pthread_mutex_init(lock_synchronous_mode_, NULL);
1713  assert(retval == 0);
1714 
1715  retval = curl_global_init(CURL_GLOBAL_ALL);
1716  assert(retval == CURLE_OK);
1717 
1718  InitHeaders();
1719 
// The socket callback receives this object through CURLMOPT_SOCKETDATA.
1720  curl_multi_ = curl_multi_init();
1721  assert(curl_multi_ != NULL);
1722  curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION, CallbackCurlSocket);
1723  curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1724  static_cast<void *>(this));
1725  curl_multi_setopt(curl_multi_, CURLMOPT_MAXCONNECTS, watch_fds_max_);
1726  curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1728 
1729  prng_.InitLocaltime();
1730 
1731  // Name resolving
// Any non-empty value of CVMFS_IPV4_ONLY enables IPv4-only mode.
1732  if ((getenv("CVMFS_IPV4_ONLY") != NULL) &&
1733  (strlen(getenv("CVMFS_IPV4_ONLY")) > 0))
1734  {
1735  opt_ipv4_only_ = true;
1736  }
1739  assert(resolver_);
1740 }
1741 
1749 
1750  int retval = pthread_create(&thread_download_, NULL, MainDownload,
1751  static_cast<void *>(this));
1752  assert(retval == 0);
1753 
1754  atomic_inc32(&multi_threaded_);
1755 
1756  if (health_check_.UseCount() > 0) {
1757  LogCvmfs(kLogDownload, kLogDebug, "Starting healthcheck thread");
1758  health_check_->StartHealthcheck();
1759  }
1760 }
1761 
1762 
1767  assert(info != NULL);
1768  assert(info->url() != NULL);
1769 
1770  Failures result;
1771  result = PrepareDownloadDestination(info);
1772  if (result != kFailOk)
1773  return result;
1774 
1775  if (info->expected_hash()) {
1778  info->GetHashContextPtr()->size = shash::GetContextSize(algorithm);
1779  info->GetHashContextPtr()->buffer = alloca(info->hash_context().size);
1780  }
1781 
1782  // Prepare cvmfs-info: header, allocate string on the stack
1783  info->SetInfoHeader(NULL);
1784  if (enable_info_header_ && info->extra_info()) {
1785  const char *header_name = "cvmfs-info: ";
1786  const size_t header_name_len = strlen(header_name);
1787  const unsigned header_size = 1 + header_name_len +
1788  EscapeHeader(*(info->extra_info()), NULL, 0);
1789  info->SetInfoHeader(static_cast<char *>(alloca(header_size)));
1790  memcpy(info->info_header(), header_name, header_name_len);
1791  EscapeHeader(*(info->extra_info()), info->info_header() + header_name_len,
1792  header_size - header_name_len);
1793  info->info_header()[header_size-1] = '\0';
1794  }
1795 
1796  if (enable_http_tracing_) {
1797  const std::string str_pid = "X-CVMFS-PID: " + StringifyInt(info->pid());
1798  const std::string str_gid = "X-CVMFS-GID: " + StringifyUint(info->gid());
1799  const std::string str_uid = "X-CVMFS-UID: " + StringifyUint(info->uid());
1800 
1801  // will be auto freed at the end of this function Fetch(JobInfo *info)
1802  info->SetTracingHeaderPid(static_cast<char *>(alloca(str_pid.size() + 1)));
1803  info->SetTracingHeaderGid(static_cast<char *>(alloca(str_gid.size() + 1)));
1804  info->SetTracingHeaderUid(static_cast<char *>(alloca(str_uid.size() + 1)));
1805 
1806  memcpy(info->tracing_header_pid(), str_pid.c_str(), str_pid.size() + 1);
1807  memcpy(info->tracing_header_gid(), str_gid.c_str(), str_gid.size() + 1);
1808  memcpy(info->tracing_header_uid(), str_uid.c_str(), str_uid.size() + 1);
1809  }
1810 
1811  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1812  if (!info->IsValidPipeJobResults()) {
1813  info->CreatePipeJobResults();
1814  }
1815 
1816  // LogCvmfs(kLogDownload, kLogDebug, "send job to thread, pipe %d %d",
1817  // info->wait_at[0], info->wait_at[1]);
1818  pipe_jobs_->Write<JobInfo*>(info);
1819  info->GetPipeJobResultWeakRef()->Read<download::Failures>(&result);
1820  // LogCvmfs(kLogDownload, kLogDebug, "got result %d", result);
1821  } else {
1823  CURL *handle = AcquireCurlHandle();
1824  InitializeRequest(info, handle);
1825  SetUrlOptions(info);
1826  // curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
1827  int retval;
1828  do {
1829  retval = curl_easy_perform(handle);
1831  double elapsed;
1832  if (curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &elapsed) == CURLE_OK)
1833  {
1835  static_cast<int64_t>(elapsed * 1000));
1836  }
1837  } while (VerifyAndFinalize(retval, info));
1838  result = info->error_code();
1839  ReleaseCurlHandle(info->curl_handle());
1840  }
1841 
1842  if (result != kFailOk) {
1843  LogCvmfs(kLogDownload, kLogDebug, "download failed (error %d - %s)", result,
1844  Code2Ascii(result));
1845 
1846  if (info->sink() != NULL) {
1847  info->sink()->Purge();
1848  }
1849  }
1850 
1851  return result;
1852 }
1853 
1854 
1862 }
1863 
1867 std::string DownloadManager::GetDnsServer() const {
1868  return opt_dns_server_;
1869 }
1870 
/**
 * Sets the DNS server used for name resolution.
 *
 * A non-empty address is stored in opt_dns_server_ and handed to the resolver
 * as its only resolver address.  An empty address changes nothing; the log
 * message at the end is emitted in either case.
 *
 * \param address address of the DNS server
 *
 * NOTE(review): original line 1877 is absent from this listing.
 */
1875 void DownloadManager::SetDnsServer(const string &address) {
1876  if (!address.empty()) {
1878  opt_dns_server_ = address;
1879  assert(!opt_dns_server_.empty());
1880 
1881  vector<string> servers;
1882  servers.push_back(address);
1883  bool retval = resolver_->SetResolvers(servers);
1884  assert(retval);
1885  }
1886  LogCvmfs(kLogDownload, kLogSyslog, "set nameserver to %s", address.c_str());
1887 }
1888 
1889 
1894  const unsigned retries,
1895  const unsigned timeout_ms)
1896 {
1898  if ((resolver_->retries() == retries) &&
1899  (resolver_->timeout_ms() == timeout_ms))
1900  {
1901  return;
1902  }
1903  delete resolver_;
1904  resolver_ = NULL;
1905  resolver_ =
1906  dns::NormalResolver::Create(opt_ipv4_only_, retries, timeout_ms);
1907  assert(resolver_);
1908 }
1909 
1910 
1912  const unsigned min_seconds,
1913  const unsigned max_seconds)
1914 {
1916  resolver_->set_min_ttl(min_seconds);
1917  resolver_->set_max_ttl(max_seconds);
1918 }
1919 
1920 
1923  opt_ip_preference_ = preference;
1924 }
1925 
1926 
/**
 * Stores the timeouts for transfers through a proxy and for direct transfers.
 *
 * \param seconds_proxy  new value of opt_timeout_proxy_
 * \param seconds_direct new value of opt_timeout_direct_
 *
 * NOTE(review): original line 1935 is absent from this listing.
 */
1932 void DownloadManager::SetTimeout(const unsigned seconds_proxy,
1933  const unsigned seconds_direct)
1934 {
1936  opt_timeout_proxy_ = seconds_proxy;
1937  opt_timeout_direct_ = seconds_direct;
1938 }
1939 
1940 
/**
 * Stores the low speed limit in opt_low_speed_limit_.
 *
 * \param low_speed_limit new value of the limit
 *
 * NOTE(review): original line 1947 is absent from this listing, and so are
 * the closing lines of the function.
 */
1946 void DownloadManager::SetLowSpeedLimit(const unsigned low_speed_limit) {
1948  opt_low_speed_limit_ = low_speed_limit;
1949 }
1950 
1951 
/**
 * Reads back the timeouts stored in opt_timeout_proxy_ and
 * opt_timeout_direct_.
 *
 * \param[out] seconds_proxy  receives opt_timeout_proxy_; dereferenced
 *             unconditionally, so it must not be NULL
 * \param[out] seconds_direct receives opt_timeout_direct_; dereferenced
 *             unconditionally, so it must not be NULL
 *
 * NOTE(review): original line 1958 is absent from this listing.
 */
1955 void DownloadManager::GetTimeout(unsigned *seconds_proxy,
1956  unsigned *seconds_direct)
1957 {
1959  *seconds_proxy = opt_timeout_proxy_;
1960  *seconds_direct = opt_timeout_direct_;
1961 }
1962 
1963 
1968 void DownloadManager::SetHostChain(const string &host_list) {
1969  SetHostChain(SplitString(host_list, ';'));
1970 }
1971 
1972 
/**
 * Replaces the host chain with a copy of host_list.
 *
 * The previous chain and its round-trip-time vector are deleted first.  An
 * empty list leaves both pointers NULL.
 *
 * \param host_list the new hosts
 *
 * NOTE(review): original lines 1974-1975, 1978 and 1987 are absent from this
 * listing.  Line 1987 presumably assigns the kProbeUnprobed-filled vector
 * below to opt_host_chain_rtt_ -- confirm against the upstream file.
 */
1973 void DownloadManager::SetHostChain(const std::vector<std::string> &host_list) {
1976  delete opt_host_chain_;
1977  delete opt_host_chain_rtt_;
1979 
1980  if (host_list.empty()) {
1981  opt_host_chain_ = NULL;
1982  opt_host_chain_rtt_ = NULL;
1983  return;
1984  }
1985 
1986  opt_host_chain_ = new vector<string>(host_list);
// One entry per host, each initialized to kProbeUnprobed
1988  new vector<int>(opt_host_chain_->size(), kProbeUnprobed);
1989  // LogCvmfs(kLogDownload, kLogSyslog, "using host %s",
1990  // (*opt_host_chain_)[0].c_str());
1991 }
1992 
1993 
1994 
/**
 * Copies the current host chain state into the supplied output parameters.
 *
 * Each output pointer may be NULL, in which case it is skipped.  If no host
 * chain is set, none of the outputs is written.
 *
 * \param[out] host_chain   receives a copy of the host chain
 * \param[out] rtt          receives a copy of opt_host_chain_rtt_
 * \param[out] current_host receives opt_host_chain_current_
 *
 * NOTE(review): original line 2002 is absent from this listing.
 */
1999 void DownloadManager::GetHostInfo(vector<string> *host_chain, vector<int> *rtt,
2000  unsigned *current_host)
2001 {
2003  if (opt_host_chain_) {
2004  if (current_host) {*current_host = opt_host_chain_current_;}
2005  if (host_chain) {*host_chain = *opt_host_chain_;}
2006  if (rtt) {*rtt = *opt_host_chain_rtt_;}
2007  }
2008 }
2009 
2010 
2021 
2022  if (!opt_proxy_groups_) {
2023  return;
2024  }
2025 
2026  // Fail any matching proxies within the current load-balancing group
2027  vector<ProxyInfo> *group = current_proxy_group();
2028  const unsigned group_size = group->size();
2029  unsigned failed = 0;
2030  for (unsigned i = 0; i < group_size - opt_proxy_groups_current_burned_; ++i) {
2031  if (info && (info->proxy() == (*group)[i].url)) {
2032  // Move to list of failed proxies
2033  opt_proxy_groups_current_burned_++;
2034  swap((*group)[i],
2035  (*group)[group_size - opt_proxy_groups_current_burned_]);
2037  failed++;
2038  }
2039  }
2040 
2041  // Do nothing more unless at least one proxy was marked as failed
2042  if (!failed)
2043  return;
2044 
2045  // If all proxies from the current load-balancing group are burned, switch to
2046  // another group
2047  if (opt_proxy_groups_current_burned_ == group->size()) {
2048  opt_proxy_groups_current_burned_ = 0;
2049  if (opt_proxy_groups_->size() > 1) {
2051  opt_proxy_groups_->size();
2052  // Remember the timestamp of switching to backup proxies
2054  if (opt_proxy_groups_current_ > 0) {
2056  opt_timestamp_backup_proxies_ = time(NULL);
2057  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2058  // "switched to (another) backup proxy group");
2059  } else {
2061  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2062  // "switched back to primary proxy group");
2063  }
2065  }
2066  }
2067  } else {
2068  // Record failover time
2071  opt_timestamp_failover_proxies_ = time(NULL);
2072  }
2073  }
2074 
2075  UpdateProxiesUnlocked("failed proxy");
2076  LogCvmfs(kLogDownload, kLogDebug, "%d proxies remain in group",
2077  current_proxy_group()->size() - opt_proxy_groups_current_burned_);
2078 }
2079 
2080 
2088 
2089  if (!opt_host_chain_ || (opt_host_chain_->size() == 1)) {
2090  return;
2091  }
2092 
2093  if (info && (info->current_host_chain_index() != opt_host_chain_current_)) {
2095  "don't switch host, "
2096  "last used host: %s, current host: %s",
2097  (*opt_host_chain_)[info->current_host_chain_index()].c_str(),
2098  (*opt_host_chain_)[opt_host_chain_current_].c_str());
2099  return;
2100  }
2101 
2102  string reason = "manually triggered";
2103  if (info) {
2104  reason = download::Code2Ascii(info->error_code());
2105  }
2106 
2107  string old_host = (*opt_host_chain_)[opt_host_chain_current_];
2109  (opt_host_chain_current_ + 1) % opt_host_chain_->size();
2112  "switching host from %s to %s (%s)", old_host.c_str(),
2113  (*opt_host_chain_)[opt_host_chain_current_].c_str(),
2114  reason.c_str());
2115 
2116  // Remember the timestamp of switching to backup host
2117  if (opt_host_reset_after_ > 0) {
2118  if (opt_host_chain_current_ != 0) {
2119  if (opt_timestamp_backup_host_ == 0)
2120  opt_timestamp_backup_host_ = time(NULL);
2121  } else {
2123  }
2124  }
2125 }
2126 
2128  SwitchHost(NULL);
2129 }
2130 
2131 
2139  vector<string> host_chain;
2140  vector<int> host_rtt;
2141  unsigned current_host;
2142 
2143  GetHostInfo(&host_chain, &host_rtt, &current_host);
2144 
2145  // Stopwatch, two times to fill caches first
2146  unsigned i, retries;
2147  string url;
2148 
2149  cvmfs::MemSink memsink;
2150  JobInfo info(&url, false, false, NULL, &memsink);
2151  for (retries = 0; retries < 2; ++retries) {
2152  for (i = 0; i < host_chain.size(); ++i) {
2153  url = host_chain[i] + "/.cvmfspublished";
2154 
2155  struct timeval tv_start, tv_end;
2156  gettimeofday(&tv_start, NULL);
2157  Failures result = Fetch(&info);
2158  gettimeofday(&tv_end, NULL);
2159  memsink.Reset();
2160  if (result == kFailOk) {
2161  host_rtt[i] = static_cast<int>(
2162  DiffTimeSeconds(tv_start, tv_end) * 1000);
2163  LogCvmfs(kLogDownload, kLogDebug, "probing host %s had %dms rtt",
2164  url.c_str(), host_rtt[i]);
2165  } else {
2166  LogCvmfs(kLogDownload, kLogDebug, "error while probing host %s: %d %s",
2167  url.c_str(), result, Code2Ascii(result));
2168  host_rtt[i] = INT_MAX;
2169  }
2170  }
2171  }
2172 
2173  SortTeam(&host_rtt, &host_chain);
2174  for (i = 0; i < host_chain.size(); ++i) {
2175  if (host_rtt[i] == INT_MAX) host_rtt[i] = kProbeDown;
2176  }
2177 
2179  delete opt_host_chain_;
2180  delete opt_host_chain_rtt_;
2181  opt_host_chain_ = new vector<string>(host_chain);
2182  opt_host_chain_rtt_ = new vector<int>(host_rtt);
2184 }
2185 
/**
 * Orders a list of servers according to the reply of a Geo-API request.
 *
 * Up to three hosts, picked in shuffled order from the current host chain,
 * are asked in turn; the first reply accepted by ValidateGeoReply() is used.
 *
 * \param[in,out] servers server list; reordered in place only if output_order
 *                is NULL
 * \param[out] output_order if not NULL, receives the order and servers is
 *             left unmodified
 * \return false if servers is NULL or no host delivered a valid reply, true
 *         otherwise (a single-element list succeeds without any request)
 *
 * NOTE(review): original lines 2211, 2221, 2231, 2235, 2243 and 2249 are
 * absent from this listing; the calls taking the dangling string arguments
 * below are not visible.
 */
2186 bool DownloadManager::GeoSortServers(std::vector<std::string> *servers,
2187  std::vector<uint64_t> *output_order) {
2188  if (!servers) {return false;}
// Nothing to sort for a single server
2189  if (servers->size() == 1) {
2190  if (output_order) {
2191  output_order->clear();
2192  output_order->push_back(0);
2193  }
2194  return true;
2195  }
2196 
2197  std::vector<std::string> host_chain;
2198  GetHostInfo(&host_chain, NULL, NULL);
2199 
// The request carries the host part of each server; entries for which
// ExtractHost() returns an empty string are passed on unchanged.
2200  std::vector<std::string> server_dns_names;
2201  server_dns_names.reserve(servers->size());
2202  for (unsigned i = 0; i < servers->size(); ++i) {
2203  std::string host = dns::ExtractHost((*servers)[i]);
2204  server_dns_names.push_back(host.empty() ? (*servers)[i] : host);
2205  }
2206  std::string host_list = JoinStrings(server_dns_names, ",");
2207 
2208  vector<string> host_chain_shuffled;
2209  {
2210  // Protect against concurrent access to prng_
2212  // Determine random hosts for the Geo-API query
2213  host_chain_shuffled = Shuffle(host_chain, &prng_);
2214  }
2215  // Request ordered list via Geo-API
2216  bool success = false;
2217  unsigned max_attempts = std::min(host_chain_shuffled.size(), size_t(3));
2218  vector<uint64_t> geo_order(servers->size());
2219  for (unsigned i = 0; i < max_attempts; ++i) {
// The URL contains the "@proxy@" placeholder
2220  string url = host_chain_shuffled[i] + "/api/v1.0/geo/@proxy@/" + host_list;
2222  "requesting ordered server list from %s", url.c_str());
2223  cvmfs::MemSink memsink;
2224  JobInfo info(&url, false, false, NULL, &memsink);
2225  Failures result = Fetch(&info);
2226  if (result == kFailOk) {
2227  string order(reinterpret_cast<char*>(memsink.data()), memsink.pos());
2228  memsink.Reset();
2229  bool retval = ValidateGeoReply(order, servers->size(), &geo_order);
2230  if (!retval) {
2232  "retrieved invalid GeoAPI reply from %s [%s]",
2233  url.c_str(), order.c_str());
2234  } else {
2236  "geographic order of servers retrieved from %s",
2237  dns::ExtractHost(host_chain_shuffled[i]).c_str());
2238  LogCvmfs(kLogDownload, kLogDebug, "order is %s", order.c_str());
2239  success = true;
2240  break;
2241  }
2242  } else {
2244  "GeoAPI request %s failed with error %d [%s]",
2245  url.c_str(), result, Code2Ascii(result));
2246  }
2247  }
2248  if (!success) {
2250  "failed to retrieve geographic order from stratum 1 servers");
2251  return false;
2252  }
2253 
2254  if (output_order) {
2255  output_order->swap(geo_order);
2256  } else {
// Each value in geo_order is used as an index into the original list
2257  std::vector<std::string> sorted_servers;
2258  sorted_servers.reserve(geo_order.size());
2259  for (unsigned i = 0; i < geo_order.size(); ++i) {
2260  uint64_t orderval = geo_order[i];
2261  sorted_servers.push_back((*servers)[orderval]);
2262  }
2263  servers->swap(sorted_servers);
2264  }
2265  return true;
2266 }
2267 
2268 
// Body of the routine that re-orders the installed host chain and the
// fallback proxy groups according to the order returned by GeoSortServers().
// Returns true without doing anything when there are fewer than two hosts and
// fewer than two fallback proxy groups, false when GeoSortServers() fails,
// and true after the new chains have been installed.
//
// NOTE(review): the defining line of this function (listing lines 2269-2276)
// is not part of this listing; presumably this is DownloadManager::ProbeGeo()
// -- confirm against the header.  Listing lines 2319, 2362, 2364, 2366, 2368
// and 2375 are missing as well, so the statements inside the if/else at
// listing lines 2363-2367 and the opener of the brace closed at 2369 are not
// visible here.
2277  vector<string> host_chain;
2278  vector<int> host_rtt;
2279  unsigned current_host;
2280  vector< vector<ProxyInfo> > proxy_chain;
2281  unsigned fallback_group;
2282 
2283  GetHostInfo(&host_chain, &host_rtt, &current_host);
2284  GetProxyInfo(&proxy_chain, NULL, &fallback_group);
// Nothing to order with at most one host and at most one fallback group.
2285  if ((host_chain.size() < 2) && ((proxy_chain.size() - fallback_group) < 2))
2286  return true;
2287 
2288  vector<string> host_names;
2289  for (unsigned i = 0; i < host_chain.size(); ++i)
2290  host_names.push_back(dns::ExtractHost(host_chain[i]));
// presumably sorts host_names and reorders host_chain alongside it, so both
// vectors stay index-aligned -- verify against SortTeam() in util/algorithm.h
2291  SortTeam(&host_names, &host_chain);
// Indices below last_geo_host in the query list denote hosts.
2292  unsigned last_geo_host = host_names.size();
2293 
2294  if ((fallback_group == 0) && (last_geo_host > 1)) {
2295  // There are no non-fallback proxies, which means that the client
2296  // will always use the fallback proxies. Add a keyword separator
2297  // between the hosts and fallback proxies so the geosorting service
2298  // will know to sort the hosts based on the distance from the
2299  // closest fallback proxy rather than the distance from the client.
2300  host_names.push_back("+PXYSEP+");
2301  }
2302 
2303  // Add fallback proxy names to the end of the host list
// When the separator was added it sits at index last_geo_host, i.e. between
// the two index ranges tested in the copy loop further down, and is skipped.
2304  unsigned first_geo_fallback = host_names.size();
2305  for (unsigned i = fallback_group; i < proxy_chain.size(); ++i) {
2306  // We only take the first fallback proxy name from every group under the
2307  // assumption that load-balanced servers are at the same location
2308  host_names.push_back(proxy_chain[i][0].host.name());
2309  }
2310 
2311  std::vector<uint64_t> geo_order;
2312  bool success = GeoSortServers(&host_names, &geo_order);
2313  if (!success) {
2314  // GeoSortServers already logged a failure message.
2315  return false;
2316  }
2317 
2318  // Re-install host chain and proxy chain
2320  delete opt_host_chain_;
2321  opt_num_proxies_ = 0;
2322  opt_host_chain_ = new vector<string>(host_chain.size());
2323 
2324  // It's possible that opt_proxy_groups_fallback_ might have changed while
2325  // the lock wasn't held
// The new group vector holds the currently installed regular groups plus the
// fallback groups captured in proxy_chain at the top of this function.
2326  vector<vector<ProxyInfo> > *proxy_groups = new vector<vector<ProxyInfo> >(
2327  opt_proxy_groups_fallback_ + proxy_chain.size() - fallback_group);
2328  // First copy the non-fallback part of the current proxy chain
2329  for (unsigned i = 0; i < opt_proxy_groups_fallback_; ++i) {
2330  (*proxy_groups)[i] = (*opt_proxy_groups_)[i];
2331  opt_num_proxies_ += (*opt_proxy_groups_)[i].size();
2332  }
2333 
2334  // Copy the host chain and fallback proxies by geo order. Array indices
2335  // in geo_order that are smaller than last_geo_host refer to a stratum 1,
2336  // and those indices greater than or equal to first_geo_fallback refer to
2337  // a fallback proxy.
2338  unsigned hosti = 0;
2339  unsigned proxyi = opt_proxy_groups_fallback_;
2340  for (unsigned i = 0; i < geo_order.size(); ++i) {
2341  uint64_t orderval = geo_order[i];
2342  if (orderval < static_cast<uint64_t>(last_geo_host)) {
2343  // LogCvmfs(kLogCvmfs, kLogSyslog, "this is orderval %u at host index
2344  // %u", orderval, hosti);
2345  (*opt_host_chain_)[hosti++] = host_chain[orderval];
2346  } else if (orderval >= static_cast<uint64_t>(first_geo_fallback)) {
2347  // LogCvmfs(kLogCvmfs, kLogSyslog,
2348  // "this is orderval %u at proxy index %u, using proxy_chain index %u",
2349  // orderval, proxyi, fallback_group + orderval - first_geo_fallback);
// Map the position in the query list back to the index in proxy_chain.
2350  (*proxy_groups)[proxyi] =
2351  proxy_chain[fallback_group + orderval - first_geo_fallback];
2352  opt_num_proxies_ += (*proxy_groups)[proxyi].size();
2353  proxyi++;
2354  }
2355  }
2356 
// Drop the old proxy map before the groups it points into are deleted; it is
// rebuilt by UpdateProxiesUnlocked() below.
2357  opt_proxy_map_.clear();
2358  delete opt_proxy_groups_;
2359  opt_proxy_groups_ = proxy_groups;
2360  // In pathological cases, opt_proxy_groups_current_ can be larger now when
2361  // proxies changed in-between.
2363  if (opt_proxy_groups_->size() == 0) {
2365  } else {
2367  }
2369  }
2370 
2371  UpdateProxiesUnlocked("geosort");
2372 
// Every host gets the kProbeGeo marker instead of a measured round trip time.
2373  delete opt_host_chain_rtt_;
2374  opt_host_chain_rtt_ = new vector<int>(host_chain.size(), kProbeGeo);
2376 
2377  return true;
2378 }
2379 
2380 
// Parameter list and body of ValidateGeoReply(); the line carrying the
// function name (listing line 2388) is not part of this listing.
//
// Accepts reply_order only if it is a comma-separated list of exactly
// expected_size distinct numbers whose smallest value is 1 and whose largest
// value equals the number of entries, i.e. a permutation of 1..expected_size.
// On success the values are stored 0-based in *reply_vals and true is
// returned; on any violation false is returned and *reply_vals is untouched.
//
// NOTE(review): results are written with operator[] and the vector is never
// resized here, so *reply_vals must already hold at least expected_size
// elements when this is called.
2389  const string &reply_order,
2390  const unsigned expected_size,
2391  vector<uint64_t> *reply_vals)
2392 {
2393  if (reply_order.empty())
2394  return false;
// Reject replies with characters outside the whitelist (see
// sanitizer::InputSanitizer for the syntax of the whitelist string).
2395  sanitizer::InputSanitizer sanitizer("09 , \n");
2396  if (!sanitizer.IsValid(reply_order))
2397  return false;
// Same whitelist without the newline: Filter() is used to strip it before
// the reply is split at the commas.
2398  sanitizer::InputSanitizer strip_newline("09 ,");
2399  vector<string> reply_strings =
2400  SplitString(strip_newline.Filter(reply_order), ',');
2401  vector<uint64_t> tmp_vals;
2402  for (unsigned i = 0; i < reply_strings.size(); ++i) {
// An empty field (e.g. two consecutive commas) invalidates the whole reply.
2403  if (reply_strings[i].empty())
2404  return false;
2405  tmp_vals.push_back(String2Uint64(reply_strings[i]));
2406  }
2407  if (tmp_vals.size() != expected_size)
2408  return false;
2409 
2410  // Check that tmp_vals contains exactly the numbers 1..n
// The set removes duplicates; with n distinct values, minimum 1 and maximum n
// the values can only be 1..n.
2411  set<uint64_t> coverage(tmp_vals.begin(), tmp_vals.end());
2412  if (coverage.size() != tmp_vals.size())
2413  return false;
2414  if ((*coverage.begin() != 1) || (*coverage.rbegin() != coverage.size()))
2415  return false;
2416 
// Convert the 1-based reply into 0-based indices for the caller.
2417  for (unsigned i = 0; i < expected_size; ++i) {
2418  (*reply_vals)[i] = tmp_vals[i] - 1;
2419  }
2420  return true;
2421 }
2422 
2423 
// Parameter list and body of StripDirect(); the line carrying the function
// name (listing line 2428) is not part of this listing.
//
// Removes every "DIRECT" entry and every empty entry from proxy_list, where
// groups are separated by ';' and the members of a group by '|'.
//
// proxy_list:   list to clean.
// cleaned_list: must not be NULL (asserted); receives the cleaned list.
//               Groups that end up without members are dropped entirely.
// Returns true if at least one "DIRECT" or empty entry was encountered.  An
// empty proxy_list yields an empty cleaned_list and false.
2429  const string &proxy_list,
2430  string *cleaned_list)
2431 {
2432  assert(cleaned_list);
2433  if (proxy_list == "") {
2434  *cleaned_list = "";
2435  return false;
2436  }
2437  bool result = false;
2438 
2439  vector<string> proxy_groups = SplitString(proxy_list, ';');
2440  vector<string> cleaned_groups;
2441  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2442  vector<string> group = SplitString(proxy_groups[i], '|');
2443  vector<string> cleaned;
2444  for (unsigned j = 0; j < group.size(); ++j) {
// Empty entries are treated like DIRECT: dropped and reported.
2445  if ((group[j] == "DIRECT") || (group[j] == "")) {
2446  result = true;
2447  } else {
2448  cleaned.push_back(group[j]);
2449  }
2450  }
// Only groups that still have a member are kept.
2451  if (!cleaned.empty())
2452  cleaned_groups.push_back(JoinStrings(cleaned, "|"));
2453  }
2454 
2455  *cleaned_list = JoinStrings(cleaned_groups, ";");
2456  return result;
2457 }
2458 
2459 
// Parameter list and body of SetProxyChain(); the line carrying the function
// name (listing line 2468) is not part of this listing.
//
// Stores the given proxy list(s), resolves the proxy host names and rebuilds
// opt_proxy_groups_ with the regular groups first and the fallback groups
// after them.
//
// proxy_list:          regular proxies, groups separated by ';' and group
//                      members by '|'.
// fallback_proxy_list: fallback proxies in the same format.
// set_mode:            kSetProxyRegular, kSetProxyFallback or kSetProxyBoth;
//                      selects which of the two stored lists is overwritten.
//
// NOTE(review): listing lines 2473, 2475-2476, 2489, 2497, 2509-2511, 2517,
// 2522, 2575, 2601 and 2604-2605 are missing from this listing, so some call
// heads and statements (among them the first argument list line of several
// log calls) are not visible below.
2469  const string &proxy_list,
2470  const string &fallback_proxy_list,
2471  const ProxySetModes set_mode)
2472 {
2474 
2477  string set_proxy_list = opt_proxy_list_;
2478  string set_proxy_fallback_list = opt_proxy_fallback_list_;
2479  bool contains_direct;
2480  if ((set_mode == kSetProxyFallback) || (set_mode == kSetProxyBoth)) {
2481  opt_proxy_fallback_list_ = fallback_proxy_list;
2482  }
2483  if ((set_mode == kSetProxyRegular) || (set_mode == kSetProxyBoth)) {
2484  opt_proxy_list_ = proxy_list;
2485  }
// DIRECT is always stripped from the fallback list.
2486  contains_direct =
2487  StripDirect(opt_proxy_fallback_list_, &set_proxy_fallback_list);
2488  if (contains_direct) {
2490  "fallback proxies do not support DIRECT, removing");
2491  }
// The regular list keeps its DIRECT entries only when no fallback proxy
// remains; otherwise DIRECT is stripped from the regular list as well.
2492  if (set_proxy_fallback_list == "") {
2493  set_proxy_list = opt_proxy_list_;
2494  } else {
// Note: this declaration shadows the contains_direct declared above.
2495  bool contains_direct = StripDirect(opt_proxy_list_, &set_proxy_list);
2496  if (contains_direct) {
2498  "skipping DIRECT proxy to use fallback proxy");
2499  }
2500  }
2501 
2502  // From this point on, use set_proxy_list and set_proxy_fallback_list as
2503  // effective proxy lists!
2504 
2505  opt_proxy_map_.clear();
2506  delete opt_proxy_groups_;
// Without any effective proxy the manager is left with no proxy groups.
2507  if ((set_proxy_list == "") && (set_proxy_fallback_list == "")) {
2508  opt_proxy_groups_ = NULL;
2512  opt_num_proxies_ = 0;
2513  return;
2514  }
2515 
2516  // Determine number of regular proxy groups (== first fallback proxy group)
2518  if (set_proxy_list != "") {
2519  opt_proxy_groups_fallback_ = SplitString(set_proxy_list, ';').size();
2520  }
2521  LogCvmfs(kLogDownload, kLogDebug, "first fallback proxy group %u",
2523 
2524  // Concatenate regular proxies and fallback proxies, both of which can be
2525  // empty.
2526  string all_proxy_list = set_proxy_list;
2527  if (set_proxy_fallback_list != "") {
2528  if (all_proxy_list != "")
2529  all_proxy_list += ";";
2530  all_proxy_list += set_proxy_fallback_list;
2531  }
2532  LogCvmfs(kLogDownload, kLogDebug, "full proxy list %s",
2533  all_proxy_list.c_str());
2534 
2535  // Resolve server names in provided urls
2536  vector<string> hostnames; // All encountered hostnames
2537  vector<string> proxy_groups;
2538  if (all_proxy_list != "")
2539  proxy_groups = SplitString(all_proxy_list, ';');
// First pass: collect one host name per proxy entry, in list order.
2540  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2541  vector<string> this_group = SplitString(proxy_groups[i], '|');
2542  for (unsigned j = 0; j < this_group.size(); ++j) {
2543  this_group[j] = dns::AddDefaultScheme(this_group[j]);
2544  // Note: DIRECT strings will be "extracted" to an empty string.
2545  string hostname = dns::ExtractHost(this_group[j]);
2546  // Save the hostname. Leave empty (DIRECT) names so indexes will
2547  // match later.
2548  hostnames.push_back(hostname);
2549  }
2550  }
2551  vector<dns::Host> hosts;
2552  LogCvmfs(kLogDownload, kLogDebug, "resolving %u proxy addresses",
2553  hostnames.size());
2554  resolver_->ResolveMany(hostnames, &hosts);
2555 
2556  // Construct opt_proxy_groups_: traverse proxy list in same order and expand
2557  // names to resolved IP addresses.
2558  opt_proxy_groups_ = new vector< vector<ProxyInfo> >();
2559  opt_num_proxies_ = 0;
2560  unsigned num_proxy = 0; // Combined i, j counter
2561  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2562  vector<string> this_group = SplitString(proxy_groups[i], '|');
2563  // Construct ProxyInfo objects from proxy string and DNS resolver result for
2564  // every proxy in this_group. One URL can result in multiple ProxyInfo
2565  // objects, one for each IP address.
2566  vector<ProxyInfo> infos;
// num_proxy advances in the loop header, so it also moves on for the
// `continue` branches and keeps indexing the matching entry of `hosts`.
2567  for (unsigned j = 0; j < this_group.size(); ++j, ++num_proxy) {
2568  this_group[j] = dns::AddDefaultScheme(this_group[j]);
2569  if (this_group[j] == "DIRECT") {
2570  infos.push_back(ProxyInfo("DIRECT"));
2571  continue;
2572  }
2573 
// Unresolvable proxies stay in the group, using the unresolved URL and a
// host record whose deadline is extended by the resolver's minimum TTL.
2574  if (hosts[num_proxy].status() != dns::kFailOk) {
2576  "failed to resolve IP addresses for %s (%d - %s)",
2577  hosts[num_proxy].name().c_str(), hosts[num_proxy].status(),
2578  dns::Code2Ascii(hosts[num_proxy].status()));
2579  dns::Host failed_host =
2580  dns::Host::ExtendDeadline(hosts[num_proxy], resolver_->min_ttl());
2581  infos.push_back(ProxyInfo(failed_host, this_group[j]));
2582  continue;
2583  }
2584 
2585  // IPv4 addresses have precedence
// NOTE(review): the address family actually chosen depends on
// opt_ip_preference_, which is passed to ViewBestAddresses() -- confirm that
// the comment above still matches.
2586  set<string> best_addresses =
2587  hosts[num_proxy].ViewBestAddresses(opt_ip_preference_);
2588  set<string>::const_iterator iter_ips = best_addresses.begin();
2589  for (; iter_ips != best_addresses.end(); ++iter_ips) {
// One ProxyInfo per address, with the address written into the proxy URL.
2590  string url_ip = dns::RewriteUrl(this_group[j], *iter_ips);
2591  infos.push_back(ProxyInfo(hosts[num_proxy], url_ip));
2592 
// Register the proxy with the sharding policy only if one is installed.
2593  if (sharding_policy_.UseCount() > 0) {
2594  sharding_policy_->AddProxy(url_ip);
2595  }
2596  }
2597  }
2598  opt_proxy_groups_->push_back(infos);
2599  opt_num_proxies_ += infos.size();
2600  }
2602  "installed %u proxies in %u load-balance groups",
2603  opt_num_proxies_, opt_proxy_groups_->size());
2606 
2607  // Rebuild the proxy map for the freshly installed groups.
2608  if (opt_proxy_groups_->size() > 0) {
2609  // Select random start proxy from the first group.
2610  UpdateProxiesUnlocked("set proxies");
2611  }
2612 }
2613 
2614 
// Copies the installed proxy groups and the indices of the current group and
// of the first fallback group to the caller.
//
// proxy_chain:    must not be NULL (asserted); receives a copy of
//                 *opt_proxy_groups_, or an empty chain if none is installed.
// current_group:  optional; receives opt_proxy_groups_current_, or 0 when no
//                 groups are installed.
// fallback_group: optional; receives opt_proxy_groups_fallback_, or 0 when no
//                 groups are installed.
//
// NOTE(review): listing line 2626 is missing from this listing.
2621 void DownloadManager::GetProxyInfo(vector< vector<ProxyInfo> > *proxy_chain,
2622  unsigned *current_group,
2623  unsigned *fallback_group)
2624 {
2625  assert(proxy_chain != NULL);
2627 
2628 
// No proxy groups installed: report an empty chain and zero indices.
2629  if (!opt_proxy_groups_) {
2630  vector< vector<ProxyInfo> > empty_chain;
2631  *proxy_chain = empty_chain;
2632  if (current_group != NULL)
2633  *current_group = 0;
2634  if (fallback_group != NULL)
2635  *fallback_group = 0;
2636  return;
2637  }
2638 
2639  *proxy_chain = *opt_proxy_groups_;
2640  if (current_group != NULL)
2641  *current_group = opt_proxy_groups_current_;
2642  if (fallback_group != NULL)
2643  *fallback_group = opt_proxy_groups_fallback_;
2644 }
2645 
// Body of GetProxyList(); its defining line (listing line 2646) is not part
// of this listing.  Returns the stored regular proxy list.
2647  return opt_proxy_list_;
2648 }
2649 
// Body of GetFallbackProxyList(); its defining line (listing line 2650) is
// not part of this listing.  Returns the stored fallback proxy list.
2651  return opt_proxy_fallback_list_;
2652 }
2653 
// Body of ChooseProxyUnlocked(); its defining line (listing line 2658) is not
// part of this listing.  `hash` is the function's parameter and may be NULL.
//
// Returns NULL when no proxy groups are installed.  Otherwise looks up the
// first entry of opt_proxy_map_ whose key is not smaller than the 32 bit
// partial of `hash` (key 0 for a NULL hash) and returns the proxy stored there.
2659  if (!opt_proxy_groups_)
2660  return NULL;
2661 
2662  uint32_t key = (hash ? hash->Partial32() : 0);
// NOTE(review): the iterator is dereferenced without a comparison against
// end().  This relies on the map holding an entry with key 0xffffffff, which
// UpdateProxiesUnlocked() inserts; confirm that the map can never be empty
// while proxy groups are installed.
2663  map<uint32_t, ProxyInfo *>::iterator it = opt_proxy_map_.lower_bound(key);
2664  ProxyInfo *proxy = it->second;
2665 
2666  return proxy;
2667 }
2668 
// Rebuilds opt_proxy_map_ and opt_proxy_urls_ from the current proxy group
// and reports a change of the resulting proxy set.  Does nothing when no
// proxy groups are installed.
//
// reason: free text that is appended to the "switching proxy" message.
//
// NOTE(review): listing line 2716 is missing from this listing, so the head
// of the final log call is not visible.
2672 void DownloadManager::UpdateProxiesUnlocked(const string &reason) {
2673  if (!opt_proxy_groups_)
2674  return;
2675 
2676  // Identify number of non-burned proxies within the current group
// Only the first num_alive entries of the group are considered below.
// NOTE(review): nothing visible here guards against num_alive == 0; in that
// case the sharding branch would read begin() of an empty map and the other
// branch would call prng_.Next(0) -- confirm callers rule this out.
2677  vector<ProxyInfo> *group = current_proxy_group();
2678  unsigned num_alive = (group->size() - opt_proxy_groups_current_burned_);
// Remember the previous selection so that a change can be reported.
2679  string old_proxy = JoinStrings(opt_proxy_urls_, "|");
2680 
2681  // Rebuild proxy map and URL list
2682  opt_proxy_map_.clear();
2683  opt_proxy_urls_.clear();
2684  const uint32_t max_key = 0xffffffffUL;
2685  if (opt_proxy_shard_) {
2686  // Build a consistent map with multiple entries for each proxy
2687  for (unsigned i = 0; i < num_alive; ++i) {
2688  ProxyInfo *proxy = &(*group)[i];
// The PRNG is seeded from the SHA-1 of the proxy URL, so the keys drawn for
// a proxy depend only on its URL (presumably deterministic -- see Prng).
2689  shash::Any proxy_hash(shash::kSha1);
2690  HashString(proxy->url, &proxy_hash);
2691  Prng prng;
2692  prng.InitSeed(proxy_hash.Partial32());
2693  for (unsigned j = 0; j < kProxyMapScale; ++j) {
2694  const std::pair<uint32_t, ProxyInfo *> entry(prng.Next(max_key), proxy);
2695  opt_proxy_map_.insert(entry);
2696  }
2697  opt_proxy_urls_.push_back(proxy->url);
2698  }
2699  // Ensure lower_bound() finds a value for all keys
// Keys above the largest drawn key map to the proxy of the smallest key.
2700  ProxyInfo *first_proxy = opt_proxy_map_.begin()->second;
2701  const std::pair<uint32_t, ProxyInfo *> last_entry(max_key, first_proxy);
2702  opt_proxy_map_.insert(last_entry);
2703  } else {
2704  // Build a map with a single entry for one randomly selected proxy
// Stored under max_key, so lower_bound() returns it for every key.
2705  unsigned select = prng_.Next(num_alive);
2706  ProxyInfo *proxy = &(*group)[select];
2707  const std::pair<uint32_t, ProxyInfo *> entry(max_key, proxy);
2708  opt_proxy_map_.insert(entry);
2709  opt_proxy_urls_.push_back(proxy->url);
2710  }
// Sorted so that the joined string does not depend on insertion order.
2711  sort(opt_proxy_urls_.begin(), opt_proxy_urls_.end());
2712 
2713  // Report any change in proxy usage
2714  string new_proxy = JoinStrings(opt_proxy_urls_, "|");
2715  if (new_proxy != old_proxy) {
2717  "switching proxy from %s to %s (%s)",
2718  (old_proxy.empty() ? "(none)" : old_proxy.c_str()),
2719  (new_proxy.empty() ? "(none)" : new_proxy.c_str()),
2720  reason.c_str());
2721  }
2722 }
2723 
2728  opt_proxy_shard_ = true;
2729  RebalanceProxiesUnlocked("enable sharding");
2730 }
2731 
// Re-runs the proxy selection via UpdateProxiesUnlocked(), passing `reason`
// through for its log message.  Does nothing when no proxy groups are
// installed.
//
// NOTE(review): listing lines 2740-2741 are missing from this listing, so the
// statements executed before the UpdateProxiesUnlocked() call are not visible.
2736 void DownloadManager::RebalanceProxiesUnlocked(const string &reason) {
2737  if (!opt_proxy_groups_)
2738  return;
2739 
2742  UpdateProxiesUnlocked(reason);
2743 }
2744 
2745 
2748  RebalanceProxiesUnlocked("rebalance");
2749 }
2750 
2751 
2757 
2758  if (!opt_proxy_groups_ || (opt_proxy_groups_->size() < 2)) {
2759  return;
2760  }
2761 
2763  opt_proxy_groups_->size();
2764  opt_timestamp_backup_proxies_ = time(NULL);
2765  RebalanceProxiesUnlocked("switch proxy group");
2766 }
2767 
2768 
// Setter taking a delay in seconds for the proxy group reset.
//
// NOTE(review): listing lines 2770-2771 and 2773-2774 are missing from this
// listing.  Only the test for a zero opt_proxy_groups_reset_after_ is visible;
// neither the assignment from `seconds` nor the body of the branch is shown.
2769 void DownloadManager::SetProxyGroupResetDelay(const unsigned seconds) {
2772  if (opt_proxy_groups_reset_after_ == 0) {
2775  }
2776 }
2777 
2778 
// Stores `seconds` in opt_host_reset_after_.
//
// NOTE(review): listing lines 2781 and 2784 are missing from this listing; the
// statement controlled by the zero test below is therefore not visible.
2779 void DownloadManager::SetHostResetDelay(const unsigned seconds)
2780 {
2782  opt_host_reset_after_ = seconds;
2783  if (opt_host_reset_after_ == 0)
2785 }
2786 
2787 
// Stores the retry configuration.
//
// max_retries:     copied to opt_max_retries_.
// backoff_init_ms: copied to opt_backoff_init_ms_.
// backoff_max_ms:  copied to opt_backoff_max_ms_.
//
// NOTE(review): listing line 2792 is missing from this listing.
2788 void DownloadManager::SetRetryParameters(const unsigned max_retries,
2789  const unsigned backoff_init_ms,
2790  const unsigned backoff_max_ms)
2791 {
2793  opt_max_retries_ = max_retries;
2794  opt_backoff_init_ms_ = backoff_init_ms;
2795  opt_backoff_max_ms_ = backoff_max_ms;
2796 }
2797 
2798 
// Body of SetMaxIpaddrPerProxy(); its defining line (listing line 2799) and
// listing line 2800 are not part of this listing.  Forwards `limit` to the
// DNS resolver's throttle setting.
2801  resolver_->set_throttle(limit);
2802 }
2803 
2804 
// Parameter list and body of SetProxyTemplates(); the line carrying the
// function name (listing line 2805) and listing line 2809 are not part of
// this listing.
//
// direct: copied to proxy_template_direct_.
// forced: copied to proxy_template_forced_.
2806  const std::string &direct,
2807  const std::string &forced)
2808 {
2810  proxy_template_direct_ = direct;
2811  proxy_template_forced_ = forced;
2812 }
2813 
2814 
2816  enable_info_header_ = true;
2817 }
2818 
2819 
2821  follow_redirects_ = true;
2822 }
2823 
2826 }
2827 
2829  enable_http_tracing_ = true;
2830 }
2831 
// Appends `header` to http_tracing_headers_.  No validation and no
// de-duplication is performed here.
2832 void DownloadManager::AddHTTPTracingHeader(const std::string &header) {
2833  http_tracing_headers_.push_back(header);
2834 }
2835 
2838 }
2839 
// Body of SetShardingPolicy(); its defining line (listing line 2840) and
// listing line 2844 are not part of this listing.  `type` is the function's
// parameter.
//
// In the code visible here the switch has only a default case, which emits a
// message that the proposed policy does not exist, and `success` is never set
// to true, so false is returned.
2841  bool success = false;
2842  switch (type) {
2843  default:
2845  "Proposed sharding policy does not exist. Falling back to default");
2846  }
2847  return success;
2848 }
2849 
2851  failover_indefinitely_ = true;
2852 }
2853 
// Parameter list and body of Clone(); the line carrying the function name
// (listing line 2858) is not part of this listing.
//
// Creates a new DownloadManager with the same pool size and the given
// statistics template, copies configuration into it and returns it.  The
// clone is allocated with `new`; nothing in the visible code releases it.
//
// NOTE(review): listing lines 2863-2865, 2869-2879, 2886-2892 and 2895-2896
// are missing from this listing, so only part of the copied configuration is
// visible below.
2859  const perf::StatisticsTemplate &statistics)
2860 {
2861  DownloadManager *clone = new DownloadManager(pool_max_handles_, statistics);
2862 
2866 
// The DNS server is only set on the clone when one is configured here.
2867  if (!opt_dns_server_.empty())
2868  clone->SetDnsServer(opt_dns_server_);
// The clone gets its own copies of the host chain and the rtt vector.
2880  if (opt_host_chain_) {
2881  clone->opt_host_chain_ = new vector<string>(*opt_host_chain_);
2882  clone->opt_host_chain_rtt_ = new vector<int>(*opt_host_chain_rtt_);
2883  }
2884 
2885  CloneProxyConfig(clone);
2893 
2894  clone->health_check_ = health_check_;
2897  clone->fqrn_ = fqrn_;
2898 
2899  return clone;
2900 }
2901 
2902 
// Tail of CloneProxyConfig(), whose definition starts at listing line 2903;
// listing lines 2903-2910 and 2915 are not part of this listing.  `clone` is
// the function's parameter.
//
// When proxy groups are installed, the clone receives a newly allocated
// group vector (the constructor argument is on the missing line 2915) and
// rebuilds its own proxy map via UpdateProxiesUnlocked().
2911  if (opt_proxy_groups_ == NULL)
2912  return;
2913 
2914  clone->opt_proxy_groups_ = new vector< vector<ProxyInfo> >(
2916  clone->UpdateProxiesUnlocked("cloned");
2917 }
2918 
2919 } // namespace download
unsigned opt_timeout_direct_
Definition: download.h:290
std::vector< std::string > http_tracing_headers_
Definition: download.h:306
bool StripDirect(const std::string &proxy_list, std::string *cleaned_list)
Definition: download.cc:2428
unsigned opt_low_speed_limit_
Definition: download.h:291
void HashString(const std::string &content, Any *any_digest)
Definition: hash.cc:268
#define LogCvmfs(source, mask,...)
Definition: logging.h:25
Definition: prng.h:28
static const unsigned kDnsDefaultTimeoutMs
Definition: download.h:155
std::vector< T > Shuffle(const std::vector< T > &input, Prng *prng)
Definition: algorithm.h:50
unsigned throttle() const
Definition: dns.h:206
z_stream * GetZstreamPtr()
Definition: jobinfo.h:121
unsigned opt_backoff_init_ms_
Definition: download.h:293
void SetInfoHeader(char *info_header)
Definition: jobinfo.h:195
shash::ContextPtr * GetHashContextPtr()
Definition: jobinfo.h:126
uid_t uid() const
Definition: jobinfo.h:137
int64_t Xadd(class Counter *counter, const int64_t delta)
Definition: statistics.h:51
struct stat64 platform_stat64
const char * Code2Ascii(const Failures error)
Definition: dns.h:53
int64_t id() const
Definition: dns.h:108
unsigned opt_proxy_groups_current_burned_
Definition: download.h:327
double DiffTimeSeconds(struct timeval start, struct timeval end)
Definition: algorithm.cc:31
unsigned opt_proxy_groups_reset_after_
Definition: download.h:416
bool probe_hosts() const
Definition: jobinfo.h:132
virtual bool IsCanceled()
Definition: interrupt.h:16
void SetUrlOptions(JobInfo *info)
Definition: download.cc:946
SharedPtr< ShardingPolicy > sharding_policy_
Definition: download.h:366
StreamStates DecompressZStream2Sink(const void *buf, const int64_t size, z_stream *strm, cvmfs::Sink *sink)
Definition: compression.cc:234
void ResolveMany(const std::vector< std::string > &names, std::vector< Host > *hosts)
Definition: dns.cc:370
std::string opt_proxy_fallback_list_
Definition: download.h:344
void SetHostChain(const std::string &host_list)
static NormalResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
Definition: dns.cc:1259
unsigned opt_host_reset_after_
Definition: download.h:424
void SetLowSpeedLimit(const unsigned low_speed_limit)
Definition: download.cc:1946
virtual int Purge()=0
std::string proxy_template_direct_
Definition: download.h:400
curl_slist ** GetHeadersPtr()
Definition: jobinfo.h:124
static const int kProbeGeo
Definition: download.h:152
#define PANIC(...)
Definition: exception.h:29
string Trim(const string &raw, bool trim_newline)
Definition: string.cc:428
string ReplaceAll(const string &haystack, const string &needle, const string &replace_by)
Definition: string.cc:484
void set_min_ttl(unsigned seconds)
Definition: dns.h:207
string JoinStrings(const vector< string > &strings, const string &joint)
Definition: string.cc:325
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:249
unsigned opt_proxy_groups_current_
Definition: download.h:322
virtual bool RequiresReserve()=0
Pipe< kPipeDownloadJobsResults > * GetPipeJobResultWeakRef()
Definition: jobinfo.h:127
bool ValidateGeoReply(const std::string &reply_order, const unsigned expected_size, std::vector< uint64_t > *reply_vals)
Definition: download.cc:2388
std::vector< ProxyInfo > * current_proxy_group() const
Definition: download.h:262
void DecompressInit(z_stream *strm)
Definition: compression.cc:185
const std::string * url() const
Definition: jobinfo.h:130
void * cred_data() const
Definition: jobinfo.h:139
time_t opt_timestamp_backup_proxies_
Definition: download.h:414
void SetProxyChain(const std::string &proxy_list, const std::string &fallback_proxy_list, const ProxySetModes set_mode)
Definition: download.cc:2468
std::string GetProxyList()
Definition: download.cc:2646
unsigned min_ttl() const
Definition: dns.h:208
void InitLocaltime()
Definition: prng.h:39
bool allow_failure() const
Definition: jobinfo.h:167
std::set< CURL * > * pool_handles_inuse_
Definition: download.h:269
CURL * curl_handle() const
Definition: jobinfo.h:148
pthread_mutex_t * lock_options_
Definition: download.h:286
ProxyInfo * ChooseProxyUnlocked(const shash::Any *hash)
Definition: download.cc:2658
pthread_t thread_download_
Definition: download.h:276
DownloadManager(const unsigned max_pool_handles, const perf::StatisticsTemplate &statistics)
Definition: download.cc:1662
std::string opt_proxy_list_
Definition: download.h:340
const std::string & name() const
Definition: dns.h:118
perf::Counter * sz_transfer_time
Definition: download.h:44
void SetTracingHeaderGid(char *tracing_header_gid)
Definition: jobinfo.h:198
std::vector< std::vector< ProxyInfo > > * opt_proxy_groups_
Definition: download.h:318
assert((mem||(size==0))&&"Out Of Memory")
void SetNocache(bool nocache)
Definition: jobinfo.h:206
unsigned opt_proxy_groups_fallback_
Definition: download.h:332
void ReleaseCurlHandle(CURL *handle)
Definition: download.cc:842
StreamStates
Definition: compression.h:36
void SetTracingHeaderPid(char *tracing_header_pid)
Definition: jobinfo.h:196
void set_max_ttl(unsigned seconds)
Definition: dns.h:209
Algorithms algorithm
Definition: hash.h:125
char * tracing_header_gid() const
Definition: jobinfo.h:152
const std::string path()
Definition: sink_path.h:109
const std::set< std::string > & ViewBestAddresses(IpPreference preference) const
Definition: dns.cc:205
char * info_header() const
Definition: jobinfo.h:150
void SetDnsServer(const std::string &address)
Definition: download.cc:1875
int platform_stat(const char *path, platform_stat64 *buf)
bool force_nocache() const
Definition: jobinfo.h:135
std::string StringifyUint(const uint64_t value)
Definition: string.cc:84
void DecompressFini(z_stream *strm)
Definition: compression.cc:201
virtual std::string Describe()=0
static void * MainDownload(void *data)
Definition: download.cc:516
void InitSeed(const uint64_t seed)
Definition: prng.h:35
void SetTimeout(const unsigned seconds_proxy, const unsigned seconds_direct)
Definition: download.cc:1932
std::string opt_dns_server_
Definition: download.h:288
off_t range_offset() const
Definition: jobinfo.h:145
bool nocache() const
Definition: jobinfo.h:157
char algorithm
unsigned char num_used_hosts() const
Definition: jobinfo.h:161
virtual bool IsValid()=0
bool follow_redirects() const
Definition: jobinfo.h:134
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:133
void Init(ContextPtr context)
Definition: hash.cc:164
int http_code() const
Definition: jobinfo.h:159
bool IsValid(const std::string &input) const
Definition: sanitizer.cc:114
Algorithms
Definition: hash.h:41
bool FileExists(const std::string &path)
Definition: posix.cc:791
unsigned max_ttl() const
Definition: dns.h:210
void SetDnsTtlLimits(const unsigned min_seconds, const unsigned max_seconds)
Definition: download.cc:1911
static Failures PrepareDownloadDestination(JobInfo *info)
Definition: download.cc:111
void SetHttpCode(int http_code)
Definition: jobinfo.h:208
std::vector< std::string > opt_proxy_urls_
Definition: download.h:352
const char * Code2Ascii(const Failures error)
bool head_request() const
Definition: jobinfo.h:133
unsigned char num_retries() const
Definition: jobinfo.h:162
bool IsValidPipeJobResults()
Definition: jobinfo.h:107
off_t range_size() const
Definition: jobinfo.h:146
void GetProxyInfo(std::vector< std::vector< ProxyInfo > > *proxy_chain, unsigned *current_group, unsigned *fallback_group)
Definition: download.cc:2621
void SetProxyGroupResetDelay(const unsigned seconds)
Definition: download.cc:2769
atomic_int32 multi_threaded_
Definition: download.h:277
bool compressed() const
Definition: jobinfo.h:131
void SetCurrentHostChainIndex(unsigned int current_host_chain_index)
Definition: jobinfo.h:215
std::string AddDefaultScheme(const std::string &proxy)
Definition: dns.cc:187
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:290
cvmfs::Sink * sink() const
Definition: jobinfo.h:141
gid_t gid() const
Definition: jobinfo.h:138
dns::NormalResolver * resolver_
Definition: download.h:388
bool Interrupted(const std::string &fqrn, JobInfo *info)
Definition: download.cc:85
std::string proxy() const
Definition: jobinfo.h:156
dns::IpPreference opt_ip_preference_
Definition: download.h:393
Algorithms algorithm
Definition: hash.h:500
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:237
unsigned int current_host_chain_index() const
Definition: jobinfo.h:164
void UseSystemCertificatePath()
Definition: ssl.cc:68
Definition: dns.h:90
bool SetShardingPolicy(const ShardingPolicySelector type)
Definition: download.cc:2840
perf::Counter * n_host_failover
Definition: download.h:48
void UpdateProxiesUnlocked(const std::string &reason)
Definition: download.cc:2672
bool IsEquivalent(const Host &other) const
Definition: dns.cc:269
bool IsProxyTransferError(const Failures error)
void SetNumRetries(unsigned char num_retries)
Definition: jobinfo.h:213
bool IsExpired() const
Definition: dns.cc:280
void set_throttle(const unsigned throttle)
Definition: dns.h:205
InterruptCue * interrupt_cue() const
Definition: jobinfo.h:140
void SetIpPreference(const dns::IpPreference preference)
Definition: download.cc:1921
virtual int Reset()=0
shash::ContextPtr hash_context() const
Definition: jobinfo.h:155
perf::Counter * n_requests
Definition: download.h:45
bool Read(T *data)
Definition: pipe.h:166
void Final(ContextPtr context, Any *any_digest)
Definition: hash.cc:221
void SetRetryParameters(const unsigned max_retries, const unsigned backoff_init_ms, const unsigned backoff_max_ms)
Definition: download.cc:2788
string StringifyInt(const int64_t value)
Definition: string.cc:78
char * tracing_header_uid() const
Definition: jobinfo.h:153
Failures error_code() const
Definition: jobinfo.h:158
void CloneProxyConfig(DownloadManager *clone)
Definition: download.cc:2903
void SetMaxIpaddrPerProxy(unsigned limit)
Definition: download.cc:2799
void EnableIgnoreSignatureFailures()
Definition: download.cc:2824
DownloadManager * Clone(const perf::StatisticsTemplate &statistics)
Definition: download.cc:2858
void Inc(class Counter *counter)
Definition: statistics.h:50
void * buffer
Definition: hash.h:501
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:267
std::string ExtractHost(const std::string &url)
Definition: dns.cc:110
void ** GetCredDataPtr()
Definition: jobinfo.h:123
std::vector< int > * opt_host_chain_rtt_
Definition: download.h:314
SslCertificateStore ssl_certificate_store_
Definition: download.h:437
time_t opt_timestamp_backup_host_
Definition: download.h:423
std::string GetFallbackProxyList()
Definition: download.cc:2650
uint32_t Partial32() const
Definition: hash.h:394
void SetProxyTemplates(const std::string &direct, const std::string &forced)
Definition: download.cc:2805
IpPreference
Definition: dns.h:46
unsigned retries() const
Definition: dns.h:203
unsigned opt_backoff_max_ms_
Definition: download.h:294
void SetErrorCode(Failures error_code)
Definition: jobinfo.h:207
void SetNumUsedProxies(unsigned char num_used_proxies)
Definition: jobinfo.h:209
unsigned GetContextSize(const Algorithms algorithm)
Definition: hash.cc:148
std::string GetDnsServer() const
Definition: download.cc:1867
unsigned opt_host_chain_current_
Definition: download.h:315
CredentialsAttachment * credentials_attachment_
Definition: download.h:426
std::vector< std::string > * opt_host_chain_
Definition: download.h:309
struct pollfd * watch_fds_
Definition: download.h:281
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
Definition: hash.cc:190
std::map< uint32_t, ProxyInfo * > opt_proxy_map_
Definition: download.h:348
uint64_t String2Uint64(const string &value)
Definition: string.cc:228
UniquePtr< Pipe< kPipeDownloadJobs > > pipe_jobs_
Definition: download.h:280
void CreatePipeJobResults()
Definition: jobinfo.h:103
Failures Fetch(JobInfo *info)
Definition: download.cc:1766
void SetCurlHandle(CURL *curl_handle)
Definition: jobinfo.h:193
void SetTracingHeaderUid(char *tracing_header_uid)
Definition: jobinfo.h:200
Definition: mutex.h:42
curl_slist * headers() const
Definition: jobinfo.h:149
const shash::Any * expected_hash() const
Definition: jobinfo.h:142
perf::Counter * n_proxy_failover
Definition: download.h:47
virtual int Flush()=0
void GetTimeout(unsigned *seconds_proxy, unsigned *seconds_direct)
Definition: download.cc:1955
void SetCredData(void *cred_data)
Definition: jobinfo.h:180
std::string Filter(const std::string &input) const
Definition: sanitizer.cc:107
std::string proxy_template_forced_
Definition: download.h:406
time_t opt_timestamp_failover_proxies_
Definition: download.h:415
void SetDnsParameters(const unsigned retries, const unsigned timeout_ms)
Definition: download.cc:1893
unsigned EscapeHeader(const std::string &header, char *escaped_buf, size_t buf_size)
Definition: download.cc:388
const std::string * extra_info() const
Definition: jobinfo.h:143
static Host ExtendDeadline(const Host &original, unsigned seconds_from_now)
Definition: dns.cc:231
UniquePtr< Pipe< kPipeThreadTerminator > > pipe_terminate_
Definition: download.h:278
void SetProxy(const std::string &proxy)
Definition: jobinfo.h:205
static const int kProbeUnprobed
Definition: download.h:143
unsigned backoff_ms() const
Definition: jobinfo.h:163
virtual int Reset()
Definition: sink_mem.cc:52
pid_t pid() const
Definition: jobinfo.h:136
unsigned char num_used_proxies() const
Definition: jobinfo.h:160
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:1972
bool IsHostTransferError(const Failures error)
static const unsigned kDnsDefaultRetries
Definition: download.h:154
void SortTeam(std::vector< T > *tractor, std::vector< U > *towed)
Definition: algorithm.h:68
bool GeoSortServers(std::vector< std::string > *servers, std::vector< uint64_t > *output_order=NULL)
Definition: download.cc:2186
virtual bool Reserve(size_t size)=0
static const int kProbeDown
Definition: download.h:148
void SetNumUsedHosts(unsigned char num_used_hosts)
Definition: jobinfo.h:211
unsigned size
Definition: hash.h:502
void SetHeaders(curl_slist *headers)
Definition: jobinfo.h:194
Failures status() const
Definition: dns.h:119
static const unsigned kProxyMapScale
Definition: download.h:156
void GetHostInfo(std::vector< std::string > *host_chain, std::vector< int > *rtt, unsigned *current_host)
Definition: download.cc:1999
static void size_t size
Definition: smalloc.h:54
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
Definition: download.cc:1298
char * tracing_header_pid() const
Definition: jobinfo.h:151
void SetBackoffMs(unsigned backoff_ms)
Definition: jobinfo.h:214
SharedPtr< HealthCheck > health_check_
Definition: download.h:374
void SwitchProxy(JobInfo *info)
Definition: download.cc:2019
void AddHTTPTracingHeader(const std::string &header)
Definition: download.cc:2832
void SetCredentialsAttachment(CredentialsAttachment *ca)
Definition: download.cc:1859
void SetFollowRedirects(bool follow_redirects)
Definition: jobinfo.h:174
void RebalanceProxiesUnlocked(const std::string &reason)
Definition: download.cc:2736
pthread_mutex_t * lock_synchronous_mode_
Definition: download.h:287
virtual bool SetResolvers(const std::vector< std::string > &resolvers)
Definition: dns.cc:1290
void InitializeRequest(JobInfo *info, CURL *handle)
Definition: download.cc:860
static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp)
Definition: download.cc:436
string RewriteUrl(const string &url, const string &ip)
Definition: dns.cc:156
void SetHostResetDelay(const unsigned seconds)
Definition: download.cc:2779
uint32_t Next(const uint64_t boundary)
Definition: prng.h:49
virtual int64_t Write(const void *buf, uint64_t sz)=0
unsigned timeout_ms() const
Definition: dns.h:204