CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
download.cc
Go to the documentation of this file.
1 
27 // TODO(jblomer): MS for time summing
28 // NOLINTNEXTLINE
29 #define __STDC_FORMAT_MACROS
30 
31 
32 #include "download.h"
33 
34 #include <alloca.h>
35 #include <errno.h>
36 #include <inttypes.h>
37 #include <poll.h>
38 #include <pthread.h>
39 #include <signal.h>
40 #include <stdint.h>
41 #include <sys/time.h>
42 #include <unistd.h>
43 
44 #include <algorithm>
45 #include <cassert>
46 #include <cstdio>
47 #include <cstdlib>
48 #include <cstring>
49 #include <map>
50 #include <set>
51 #include <utility>
52 
54 #include "crypto/hash.h"
55 #include "duplex_curl.h"
56 #include "interrupt.h"
57 #include "sanitizer.h"
58 #include "ssl.h"
59 #include "util/algorithm.h"
60 #include "util/atomic.h"
61 #include "util/concurrency.h"
62 #include "util/exception.h"
63 #include "util/logging.h"
64 #include "util/posix.h"
65 #include "util/prng.h"
66 #include "util/smalloc.h"
67 #include "util/string.h"
68 
69 using namespace std; // NOLINT
70 
71 namespace download {
72 
/**
 * Decides whether the download described by `info` should be given up.
 *
 * Returns true right away when info->allow_failure() is set. Otherwise, for a
 * non-empty `fqrn`, looks for the marker file /var/run/cvmfs/interrupt.<fqrn>:
 * if it exists, an attempt is made to unlink it and true is returned whether
 * or not the unlink succeeded. Returns false in every other case, including
 * an empty `fqrn`.
 *
 * NOTE(review): the lines that open the three logging calls in the body are
 * absent from this listing (source lines 97, 101 and 106 are skipped).
 */
85 bool Interrupted(const std::string &fqrn, JobInfo *info) {
86  if (info->allow_failure()) {
87  return true;
88  }
89 
90  if (!fqrn.empty()) {
91  // It is up to the user to create this sentinel file ("pause_file") if
92  // CVMFS_FAILOVER_INDEFINITELY is used. It must be created during
93  // "cvmfs_config reload" and "cvmfs_config reload $fqrn"
94  const std::string pause_file =
95  std::string("/var/run/cvmfs/interrupt.") + fqrn;
96 
98  "(id %" PRId64 ") Interrupted(): checking for existence of %s",
99  info->id(), pause_file.c_str());
100  if (FileExists(pause_file)) {
102  "(id %" PRId64 ") Interrupt marker found - "
103  "Interrupting current download, this will EIO outstanding IO.",
104  info->id());
105  if (0 != unlink(pause_file.c_str())) {
107  "(id %" PRId64 ") Couldn't delete interrupt marker: errno=%d",
108  info->id(), errno);
109  }
110  return true;
111  }
112  }
113  return false;
114 }
115 
// NOTE(review): the signature line of this function (source line 116) and the
// openers of its two logging calls (lines 120 and 125) are absent from this
// listing; only the body is visible.
//
// Checks the sink attached to `info`. A NULL sink or a sink whose IsValid()
// is true yields kFailOk. An invalid sink yields kFailLocalIO when it is a
// cvmfs::PathSink (the message reports the path and the current errno), and
// kFailOther for any other sink type.
117  if (info->sink() != NULL && !info->sink()->IsValid()) {
118  cvmfs::PathSink *psink = dynamic_cast<cvmfs::PathSink *>(info->sink());
119  if (psink != NULL) {
121  "(id %" PRId64 ") Failed to open path %s: %s (errno=%d).",
122  info->id(), psink->path().c_str(), strerror(errno), errno);
123  return kFailLocalIO;
124  } else {
126  "(id %" PRId64 ") "
127  "Failed to create a valid sink: \n %s",
128  info->id(), info->sink()->Describe().c_str());
129  return kFailOther;
130  }
131  }
132 
133  return kFailOk;
134 }
135 
136 
/**
 * Header callback handed to libcurl (see CURLOPT_HEADERFUNCTION in
 * AcquireCurlHandle). Called with one header line of `size * nmemb` bytes at
 * `ptr`; `info_link` is the JobInfo of the transfer.
 *
 * Returns the full byte count to let the transfer continue, or 0 to signal an
 * error to libcurl.
 *
 * Status lines ("HTTP/1."): the three-digit code after the blanks following
 * position 8 is stored via info->SetHttpCode(). 2xx continues; 301/302/303/307
 * continue only when info->follow_redirects() is set; any other code stops
 * the transfer.
 *
 * Other headers: CONTENT-LENGTH reserves space in sinks that require it
 * (kFailTooBig if the reservation fails), LOCATION is only logged, LINK
 * values are accumulated (comma-separated) through info->SetLink().
 *
 * NOTE(review): this listing omits source lines 169, 172, 180, 188, 191, 238
 * and 244. The statements executed for 5xx/400/404, for 429, and inside the
 * X-SQUID-ERROR / PROXY-STATUS branches are therefore not visible here.
 */
140 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
141  void *info_link) {
142  const size_t num_bytes = size * nmemb;
143  const string header_line(static_cast<const char *>(ptr), num_bytes);
144  JobInfo *info = static_cast<JobInfo *>(info_link);
145 
146  // LogCvmfs(kLogDownload, kLogDebug, "REMOVE-ME: Header callback with %s",
147  // header_line.c_str());
148 
149  // Check http status codes
150  if (HasPrefix(header_line, "HTTP/1.", false)) {
    // Too short to hold "HTTP/1.x" plus a status code
151  if (header_line.length() < 10) {
152  return 0;
153  }
154 
    // Skip the blanks between the protocol version and the status code
155  unsigned i;
156  for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {
157  }
158 
159  // Code is initialized to -1
160  if (header_line.length() > i + 2) {
161  info->SetHttpCode(DownloadManager::ParseHttpCode(&header_line[i]));
162  }
163 
164  if ((info->http_code() / 100) == 2) {
165  return num_bytes;
166  } else if ((info->http_code() == 301) || (info->http_code() == 302)
167  || (info->http_code() == 303) || (info->http_code() == 307)) {
168  if (!info->follow_redirects()) {
170  "(id %" PRId64 ") redirect support not enabled: %s",
171  info->id(), header_line.c_str());
173  return 0;
174  }
175  LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") http redirect: %s",
176  info->id(), header_line.c_str());
177  // libcurl will handle this because of CURLOPT_FOLLOWLOCATION
178  return num_bytes;
179  } else {
181  "(id %" PRId64 ") http status error code: %s [%d]", info->id(),
182  header_line.c_str(), info->http_code());
183  if (((info->http_code() / 100) == 5) || (info->http_code() == 400)
184  || (info->http_code() == 404)) {
185  // 5XX returned by host
186  // 400: error from the GeoAPI module
187  // 404: the stratum 1 does not have the newest files
189  } else if (info->http_code() == 429) {
190  // 429: rate throttling (we ignore the backoff hint for the time being)
192  } else {
193  info->SetErrorCode((info->proxy() == "DIRECT") ? kFailHostHttp
194  : kFailProxyHttp);
195  }
196  return 0;
197  }
198  }
199 
200  // If needed: allocate space in sink
201  if (info->sink() != NULL && info->sink()->RequiresReserve()
202  && HasPrefix(header_line, "CONTENT-LENGTH:", true)) {
    // tmp receives the header name token; it can hold the whole line
203  char *tmp = reinterpret_cast<char *>(alloca(num_bytes + 1));
204  uint64_t length = 0;
205  sscanf(header_line.c_str(), "%s %" PRIu64, tmp, &length);
206  if (length > 0) {
207  if (!info->sink()->Reserve(length)) {
209  "(id %" PRId64 ") "
210  "resource %s too large to store in memory (%" PRIu64 ")",
211  info->id(), info->url()->c_str(), length);
212  info->SetErrorCode(kFailTooBig);
213  return 0;
214  }
215  } else {
216  // Empty resource
217  info->sink()->Reserve(0);
218  }
219  } else if (HasPrefix(header_line, "LOCATION:", true)) {
220  // This comes along with redirects
221  LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") %s", info->id(),
222  header_line.c_str());
223  } else if (HasPrefix(header_line, "LINK:", true)) {
224  // This is metalink info
225  LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") %s", info->id(),
226  header_line.c_str());
227  std::string link = info->link();
228  if (link.size() != 0) {
229  // multiple LINK headers are allowed
230  link = link + ", " + header_line.substr(5);
231  } else {
232  link = header_line.substr(5);
233  }
234  info->SetLink(link);
235  } else if (HasPrefix(header_line, "X-SQUID-ERROR:", true)) {
236  // Reinterpret host error as proxy error
237  if (info->error_code() == kFailHostHttp) {
239  }
240  } else if (HasPrefix(header_line, "PROXY-STATUS:", true)) {
241  // Reinterpret host error as proxy error if applicable
242  if ((info->error_code() == kFailHostHttp)
243  && (header_line.find("error=") != string::npos)) {
245  }
246  }
247 
248  return num_bytes;
249 }
250 
251 
/**
 * Body-data callback handed to libcurl (see CURLOPT_WRITEFUNCTION in
 * AcquireCurlHandle). Receives `size * nmemb` bytes at `ptr`; `info_link` is
 * the JobInfo of the transfer, which must have a sink.
 *
 * The bytes are fed into the job's hash context when an expected hash is set,
 * and then either decompressed into the sink (compressed jobs) or written to
 * the sink as they are.
 *
 * Returns the byte count on success. Returns 0 for an empty chunk and after a
 * decompression failure, in which case the error code is set to kFailBadData
 * or kFailLocalIO.
 *
 * NOTE(review): the openers of the logging calls (source lines 280, 286 and
 * 295) are absent from this listing.
 */
255 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
256  void *info_link) {
257  const size_t num_bytes = size * nmemb;
258  JobInfo *info = static_cast<JobInfo *>(info_link);
259 
260  // TODO(heretherebedragons) remove if no error comes up
261  // as this means only jobinfo data request (and not header only)
262  // come here
263  assert(info->sink() != NULL);
264 
265  // LogCvmfs(kLogDownload, kLogDebug, "Data callback, %d bytes", num_bytes);
266 
267  if (num_bytes == 0)
268  return 0;
269 
  // The hash is computed over the bytes as received, before decompression
270  if (info->expected_hash()) {
271  shash::Update(reinterpret_cast<unsigned char *>(ptr), num_bytes,
272  info->hash_context());
273  }
274 
275  if (info->compressed()) {
276  const zlib::StreamStates retval =
277  zlib::DecompressZStream2Sink(ptr, static_cast<int64_t>(num_bytes),
278  info->GetZstreamPtr(), info->sink());
279  if (retval == zlib::kStreamDataError) {
281  "(id %" PRId64 ") failed to decompress %s", info->id(),
282  info->url()->c_str());
283  info->SetErrorCode(kFailBadData);
284  return 0;
285  } else if (retval == zlib::kStreamIOError) {
287  "(id %" PRId64 ") decompressing %s, local IO error", info->id(),
288  info->url()->c_str());
289  info->SetErrorCode(kFailLocalIO);
290  return 0;
291  }
292  } else {
    // NOTE(review): a failed or short write is only reported here; no error
    // code is set and the function still returns num_bytes below. Confirm
    // whether this should fail the transfer like the decompression path does.
293  const int64_t written = info->sink()->Write(ptr, num_bytes);
294  if (written < 0 || static_cast<uint64_t>(written) != num_bytes) {
296  "(id %" PRId64 ") "
297  "Failed to perform write of %zu bytes to sink %s with errno %ld",
298  info->id(), num_bytes, info->sink()->Describe().c_str(),
299  written);
300  }
301  }
302 
303  return num_bytes;
304 }
305 
306 #ifdef DEBUGMSG
307 static int CallbackCurlDebug(CURL *handle,
308  curl_infotype type,
309  char *data,
310  size_t size,
311  void * /* clientp */) {
312  JobInfo *info;
313  curl_easy_getinfo(handle, CURLINFO_PRIVATE, &info);
314 
315  std::string prefix = "(id " + StringifyInt(info->id()) + ") ";
316  switch (type) {
317  case CURLINFO_TEXT:
318  prefix += "{info} ";
319  break;
320  case CURLINFO_HEADER_IN:
321  prefix += "{header/recv} ";
322  break;
323  case CURLINFO_HEADER_OUT:
324  prefix += "{header/sent} ";
325  break;
326  case CURLINFO_DATA_IN:
327  if (size < 50) {
328  prefix += "{data/recv} ";
329  break;
330  } else {
331  LogCvmfs(kLogCurl, kLogDebug, "%s{data/recv} <snip>", prefix.c_str());
332  return 0;
333  }
334  case CURLINFO_DATA_OUT:
335  if (size < 50) {
336  prefix += "{data/sent} ";
337  break;
338  } else {
339  LogCvmfs(kLogCurl, kLogDebug, "%s{data/sent} <snip>", prefix.c_str());
340  return 0;
341  }
342  case CURLINFO_SSL_DATA_IN:
343  if (size < 50) {
344  prefix += "{ssldata/recv} ";
345  break;
346  } else {
347  LogCvmfs(kLogCurl, kLogDebug, "%s{ssldata/recv} <snip>",
348  prefix.c_str());
349  return 0;
350  }
351  case CURLINFO_SSL_DATA_OUT:
352  if (size < 50) {
353  prefix += "{ssldata/sent} ";
354  break;
355  } else {
356  LogCvmfs(kLogCurl, kLogDebug, "%s{ssldata/sent} <snip>",
357  prefix.c_str());
358  return 0;
359  }
360  default:
361  // just log the message
362  break;
363  }
364 
365  bool valid_char = true;
366  std::string msg(data, size);
367  for (size_t i = 0; i < msg.length(); ++i) {
368  if (msg[i] == '\0') {
369  msg[i] = '~';
370  }
371 
372  // verify that char is a valid printable char
373  if ((msg[i] < ' ' || msg[i] > '~')
374  && (msg[i] != 10 /*line feed*/
375  && msg[i] != 13 /*carriage return*/)) {
376  valid_char = false;
377  }
378  }
379 
380  if (!valid_char) {
381  msg = "<Non-plaintext sequence>";
382  }
383 
384  LogCvmfs(kLogCurl, kLogDebug, "%s%s", prefix.c_str(),
385  Trim(msg, true /* trim_newline */).c_str());
386  return 0;
387 }
388 #endif
389 
390 //------------------------------------------------------------------------------
391 
392 
// Out-of-class definitions of DownloadManager's three probe constants. All
// are negative; presumably sentinels that cannot collide with a real probe
// result -- TODO confirm against the users of these constants.
393 const int DownloadManager::kProbeUnprobed = -1;
394 const int DownloadManager::kProbeDown = -2;
395 const int DownloadManager::kProbeGeo = -3;
396 
397 bool DownloadManager::EscapeUrlChar(unsigned char input, char output[3]) {
398  if (((input >= '0') && (input <= '9')) || ((input >= 'A') && (input <= 'Z'))
399  || ((input >= 'a') && (input <= 'z')) || (input == '/') || (input == ':')
400  || (input == '.') || (input == '@') || (input == '+') || (input == '-')
401  || (input == '_') || (input == '~') || (input == '[') || (input == ']')
402  || (input == ',')) {
403  output[0] = static_cast<char>(input);
404  return false;
405  }
406 
407  output[0] = '%';
408  output[1] = static_cast<char>((input / 16)
409  + ((input / 16 <= 9) ? '0' : 'A' - 10));
410  output[2] = static_cast<char>((input % 16)
411  + ((input % 16 <= 9) ? '0' : 'A' - 10));
412  return true;
413 }
414 
415 
420 string DownloadManager::EscapeUrl(const int64_t jobinfo_id, const string &url) {
421  string escaped;
422  escaped.reserve(url.length());
423 
424  char escaped_char[3];
425  for (unsigned i = 0, s = url.length(); i < s; ++i) {
426  if (EscapeUrlChar(url[i], escaped_char)) {
427  escaped.append(escaped_char, 3);
428  } else {
429  escaped.push_back(escaped_char[0]);
430  }
431  }
432  LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") escaped %s to %s",
433  jobinfo_id, url.c_str(), escaped.c_str());
434 
435  return escaped;
436 }
437 
442 unsigned DownloadManager::EscapeHeader(const string &header,
443  char *escaped_buf,
444  size_t buf_size) {
445  unsigned esc_pos = 0;
446  char escaped_char[3];
447  for (unsigned i = 0, s = header.size(); i < s; ++i) {
448  if (EscapeUrlChar(header[i], escaped_char)) {
449  for (unsigned j = 0; j < 3; ++j) {
450  if (escaped_buf) {
451  if (esc_pos >= buf_size)
452  return esc_pos;
453  escaped_buf[esc_pos] = escaped_char[j];
454  }
455  esc_pos++;
456  }
457  } else {
458  if (escaped_buf) {
459  if (esc_pos >= buf_size)
460  return esc_pos;
461  escaped_buf[esc_pos] = escaped_char[0];
462  }
463  esc_pos++;
464  }
465  }
466 
467  return esc_pos;
468 }
469 
473 int DownloadManager::ParseHttpCode(const char digits[3]) {
474  int result = 0;
475  int factor = 100;
476  for (int i = 0; i < 3; ++i) {
477  if ((digits[i] < '0') || (digits[i] > '9'))
478  return -1;
479  result += (digits[i] - '0') * factor;
480  factor /= 10;
481  }
482  return result;
483 }
484 
485 
489 int DownloadManager::CallbackCurlSocket(CURL * /* easy */,
490  curl_socket_t s,
491  int action,
492  void *userp,
493  void * /* socketp */) {
494  // LogCvmfs(kLogDownload, kLogDebug, "CallbackCurlSocket called with easy "
495  // "handle %p, socket %d, action %d", easy, s, action);
496  DownloadManager *download_mgr = static_cast<DownloadManager *>(userp);
497  if (action == CURL_POLL_NONE)
498  return 0;
499 
500  // Find s in watch_fds_
501  unsigned index;
502 
503  // TODO(heretherebedragons) why start at index = 0 and not 2?
504  // fd[0] and fd[1] are fixed?
505  for (index = 0; index < download_mgr->watch_fds_inuse_; ++index) {
506  if (download_mgr->watch_fds_[index].fd == s)
507  break;
508  }
509  // Or create newly
510  if (index == download_mgr->watch_fds_inuse_) {
511  // Extend array if necessary
512  if (download_mgr->watch_fds_inuse_ == download_mgr->watch_fds_size_) {
513  assert(download_mgr->watch_fds_size_ > 0);
514  download_mgr->watch_fds_size_ *= 2;
515  download_mgr->watch_fds_ = static_cast<struct pollfd *>(
516  srealloc(download_mgr->watch_fds_,
517  download_mgr->watch_fds_size_ * sizeof(struct pollfd)));
518  }
519  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].fd = s;
520  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].events = 0;
521  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].revents = 0;
522  download_mgr->watch_fds_inuse_++;
523  }
524 
525  switch (action) {
526  case CURL_POLL_IN:
527  download_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
528  break;
529  case CURL_POLL_OUT:
530  download_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
531  break;
532  case CURL_POLL_INOUT:
533  download_mgr->watch_fds_[index].events = POLLIN | POLLPRI | POLLOUT
534  | POLLWRBAND;
535  break;
536  case CURL_POLL_REMOVE:
537  if (index < download_mgr->watch_fds_inuse_ - 1) {
538  download_mgr
539  ->watch_fds_[index] = download_mgr->watch_fds_
540  [download_mgr->watch_fds_inuse_ - 1];
541  }
542  download_mgr->watch_fds_inuse_--;
543  // Shrink array if necessary
544  if ((download_mgr->watch_fds_inuse_ > download_mgr->watch_fds_max_)
545  && (download_mgr->watch_fds_inuse_
546  < download_mgr->watch_fds_size_ / 2)) {
547  download_mgr->watch_fds_size_ /= 2;
548  // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ (%d)",
549  // watch_fds_size_);
550  download_mgr->watch_fds_ = static_cast<struct pollfd *>(
551  srealloc(download_mgr->watch_fds_,
552  download_mgr->watch_fds_size_ * sizeof(struct pollfd)));
553  // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ done",
554  // watch_fds_size_);
555  }
556  break;
557  default:
558  break;
559  }
560 
561  return 0;
562 }
563 
564 
/**
 * Entry point of the download I/O thread; `data` is the DownloadManager.
 *
 * Sets up the poll array with the read end of the terminate pipe at index 0
 * and the read end of the job pipe at index 1; the loop treats all entries
 * from index 2 on as curl sockets. Each iteration polls, lets libcurl make
 * progress through curl_multi_socket_action(), starts a job read from the job
 * pipe, and processes finished transfers: a transfer for which
 * VerifyAndFinalize() returns true is re-added to the multi handle, otherwise
 * its easy handle goes back to the pool and the result is handed back to the
 * requester. Activity on the terminate pipe ends the loop.
 *
 * On exit all easy handles still in use are removed from the multi handle
 * and cleaned up, and the poll array is freed. Always returns NULL.
 *
 * NOTE(review): this listing omits source lines 570, 687, 703, 705 and 722
 * (openers of logging calls and the statements around EnqueueBack).
 */
568 void *DownloadManager::MainDownload(void *data) {
569  DownloadManager *download_mgr = static_cast<DownloadManager *>(data);
571  "download I/O thread of DownloadManager '%s' started",
572  download_mgr->name_.c_str());
573 
574  const int kIdxPipeTerminate = 0;
575  const int kIdxPipeJobs = 1;
576 
577  download_mgr->watch_fds_ = static_cast<struct pollfd *>(
578  smalloc(2 * sizeof(struct pollfd)));
579  download_mgr->watch_fds_size_ = 2;
580  download_mgr->watch_fds_[kIdxPipeTerminate].fd = download_mgr->pipe_terminate_
581  ->GetReadFd();
582  download_mgr->watch_fds_[kIdxPipeTerminate].events = POLLIN | POLLPRI;
583  download_mgr->watch_fds_[kIdxPipeTerminate].revents = 0;
584  download_mgr->watch_fds_[kIdxPipeJobs].fd = download_mgr->pipe_jobs_
585  ->GetReadFd();
586  download_mgr->watch_fds_[kIdxPipeJobs].events = POLLIN | POLLPRI;
587  download_mgr->watch_fds_[kIdxPipeJobs].revents = 0;
588  download_mgr->watch_fds_inuse_ = 2;
589 
590  int still_running = 0;
591  struct timeval timeval_start, timeval_stop;
592  gettimeofday(&timeval_start, NULL);
593  while (true) {
594  int timeout;
595  if (still_running) {
596  /* NOTE: The following might degrade the performance for many small files
597  * use case. TODO(jblomer): look into it.
598  // Specify a timeout for polling in ms; this allows us to return
599  // to libcurl once a second so it can look for internal operations
600  // which timed out. libcurl has a more elaborate mechanism
601  // (CURLMOPT_TIMERFUNCTION) that would inform us of the next potential
602  // timeout. TODO(bbockelm) we should switch to that in the future.
603  timeout = 100;
604  */
605  timeout = 1;
606  } else {
    // Nothing in flight: block in poll() and account the time elapsed since
    // timeval_start (in ms) to the transfer time counter
607  timeout = -1;
608  gettimeofday(&timeval_stop, NULL);
609  const int64_t delta = static_cast<int64_t>(
610  1000 * DiffTimeSeconds(timeval_start, timeval_stop));
611  perf::Xadd(download_mgr->counters_->sz_transfer_time, delta);
612  }
613  const int retval =
614  poll(download_mgr->watch_fds_, download_mgr->watch_fds_inuse_, timeout);
    // NOTE(review): every poll() failure is retried without looking at errno
615  if (retval < 0) {
616  continue;
617  }
618 
619  // Handle timeout
620  if (retval == 0) {
621  curl_multi_socket_action(
622  download_mgr->curl_multi_, CURL_SOCKET_TIMEOUT, 0, &still_running);
623  }
624 
625  // Terminate I/O thread
626  if (download_mgr->watch_fds_[kIdxPipeTerminate].revents)
627  break;
628 
629  // New job arrives
630  if (download_mgr->watch_fds_[kIdxPipeJobs].revents) {
631  download_mgr->watch_fds_[kIdxPipeJobs].revents = 0;
632  JobInfo *info;
633  download_mgr->pipe_jobs_->Read<JobInfo *>(&info);
    // First job after an idle period: restart the transfer time measurement
634  if (!still_running) {
635  gettimeofday(&timeval_start, NULL);
636  }
637  CURL *handle = download_mgr->AcquireCurlHandle();
638  download_mgr->InitializeRequest(info, handle);
639  download_mgr->SetUrlOptions(info);
640  curl_multi_add_handle(download_mgr->curl_multi_, handle);
641  curl_multi_socket_action(
642  download_mgr->curl_multi_, CURL_SOCKET_TIMEOUT, 0, &still_running);
643  }
644 
645  // Activity on curl sockets
646  // Within this loop the curl_multi_socket_action() may cause socket(s)
647  // to be removed from watch_fds_. If a socket is removed it is replaced
648  // by the socket at the end of the array and the inuse count is decreased.
649  // Therefore loop over the array in reverse order.
650  for (int64_t i = download_mgr->watch_fds_inuse_ - 1; i >= 2; --i) {
    // The array may have shrunk below i during a previous iteration
651  if (i >= download_mgr->watch_fds_inuse_) {
652  continue;
653  }
654  if (download_mgr->watch_fds_[i].revents) {
    // Translate poll() result flags into libcurl's select bitmask
655  int ev_bitmask = 0;
656  if (download_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
657  ev_bitmask |= CURL_CSELECT_IN;
658  if (download_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
659  ev_bitmask |= CURL_CSELECT_OUT;
660  if (download_mgr->watch_fds_[i].revents
661  & (POLLERR | POLLHUP | POLLNVAL)) {
662  ev_bitmask |= CURL_CSELECT_ERR;
663  }
664  download_mgr->watch_fds_[i].revents = 0;
665 
666  curl_multi_socket_action(download_mgr->curl_multi_,
667  download_mgr->watch_fds_[i].fd,
668  ev_bitmask,
669  &still_running);
670  }
671  }
672 
673  // Check if transfers are completed
674  CURLMsg *curl_msg;
675  int msgs_in_queue;
676  while ((curl_msg = curl_multi_info_read(download_mgr->curl_multi_,
677  &msgs_in_queue))) {
678  if (curl_msg->msg == CURLMSG_DONE) {
679  perf::Inc(download_mgr->counters_->n_requests);
680  JobInfo *info;
681  CURL *easy_handle = curl_msg->easy_handle;
682  const int curl_error = curl_msg->data.result;
683  curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
684 
    // NOTE(review): libcurl documents CURLINFO_REDIRECT_COUNT as writing
    // through a pointer to long; redir_count is an int64_t and is left
    // uninitialized if the call fails -- confirm this is intended.
685  int64_t redir_count;
686  curl_easy_getinfo(easy_handle, CURLINFO_REDIRECT_COUNT, &redir_count);
688  "(manager '%s' - id %" PRId64 ") "
689  "Number of CURL redirects %" PRId64,
690  download_mgr->name_.c_str(), info->id(), redir_count);
691 
692  curl_multi_remove_handle(download_mgr->curl_multi_, easy_handle);
    // A true result means the transfer is attempted again on the same handle
693  if (download_mgr->VerifyAndFinalize(curl_error, info)) {
694  curl_multi_add_handle(download_mgr->curl_multi_, easy_handle);
695  curl_multi_socket_action(download_mgr->curl_multi_,
696  CURL_SOCKET_TIMEOUT,
697  0,
698  &still_running);
699  } else {
700  // Return easy handle into pool and write result back
701  download_mgr->ReleaseCurlHandle(easy_handle);
702 
704  info->GetDataTubePtr()->EnqueueBack(ele);
706  info->error_code());
707  }
708  }
709  }
710  }
711 
  // Shutdown: dispose of the easy handles of transfers still in flight
712  for (set<CURL *>::iterator i = download_mgr->pool_handles_inuse_->begin(),
713  iEnd = download_mgr->pool_handles_inuse_->end();
714  i != iEnd;
715  ++i) {
716  curl_multi_remove_handle(download_mgr->curl_multi_, *i);
717  curl_easy_cleanup(*i);
718  }
719  download_mgr->pool_handles_inuse_->clear();
720  free(download_mgr->watch_fds_);
721 
723  "download I/O thread of DownloadManager '%s' terminated",
724  download_mgr->name_.c_str());
725  return NULL;
726 }
727 
728 
729 //------------------------------------------------------------------------------
730 
731 
732 HeaderLists::~HeaderLists() {
733  for (unsigned i = 0; i < blocks_.size(); ++i) {
734  delete[] blocks_[i];
735  }
736  blocks_.clear();
737 }
738 
739 
740 curl_slist *HeaderLists::GetList(const char *header) { return Get(header); }
741 
742 
743 curl_slist *HeaderLists::DuplicateList(curl_slist *slist) {
744  assert(slist);
745  curl_slist *copy = GetList(slist->data);
746  copy->next = slist->next;
747  curl_slist *prev = copy;
748  slist = slist->next;
749  while (slist) {
750  curl_slist *new_link = Get(slist->data);
751  new_link->next = slist->next;
752  prev->next = new_link;
753  prev = new_link;
754  slist = slist->next;
755  }
756  return copy;
757 }
758 
759 
760 void HeaderLists::AppendHeader(curl_slist *slist, const char *header) {
761  assert(slist);
762  curl_slist *new_link = Get(header);
763  new_link->next = NULL;
764 
765  while (slist->next)
766  slist = slist->next;
767  slist->next = new_link;
768 }
769 
770 
776 void HeaderLists::CutHeader(const char *header, curl_slist **slist) {
777  assert(slist);
778  curl_slist head;
779  head.next = *slist;
780  curl_slist *prev = &head;
781  curl_slist *rover = *slist;
782  while (rover) {
783  if (strcmp(rover->data, header) == 0) {
784  prev->next = rover->next;
785  Put(rover);
786  rover = prev;
787  }
788  prev = rover;
789  rover = rover->next;
790  }
791  *slist = head.next;
792 }
793 
794 
795 void HeaderLists::PutList(curl_slist *slist) {
796  while (slist) {
797  curl_slist *next = slist->next;
798  Put(slist);
799  slist = next;
800  }
801 }
802 
803 
804 string HeaderLists::Print(curl_slist *slist) {
805  string verbose;
806  while (slist) {
807  verbose += string(slist->data) + "\n";
808  slist = slist->next;
809  }
810  return verbose;
811 }
812 
813 
814 curl_slist *HeaderLists::Get(const char *header) {
815  for (unsigned i = 0; i < blocks_.size(); ++i) {
816  for (unsigned j = 0; j < kBlockSize; ++j) {
817  if (!IsUsed(&(blocks_[i][j]))) {
818  blocks_[i][j].data = const_cast<char *>(header);
819  return &(blocks_[i][j]);
820  }
821  }
822  }
823 
824  // All used, new block
825  AddBlock();
826  blocks_[blocks_.size() - 1][0].data = const_cast<char *>(header);
827  return &(blocks_[blocks_.size() - 1][0]);
828 }
829 
830 
831 void HeaderLists::Put(curl_slist *slist) {
832  slist->data = NULL;
833  slist->next = NULL;
834 }
835 
836 
837 void HeaderLists::AddBlock() {
838  curl_slist *new_block = new curl_slist[kBlockSize];
839  for (unsigned i = 0; i < kBlockSize; ++i) {
840  Put(&new_block[i]);
841  }
842  blocks_.push_back(new_block);
843 }
844 
845 
846 //------------------------------------------------------------------------------
847 
848 
849 string DownloadManager::ProxyInfo::Print() {
850  if (url == "DIRECT")
851  return url;
852 
853  string result = url;
854  const int remaining =
855  static_cast<int>(host.deadline()) - static_cast<int>(time(NULL));
856  string expinfo = (remaining >= 0) ? "+" : "";
857  if (abs(remaining) >= 3600) {
858  expinfo += StringifyInt(remaining / 3600) + "h";
859  } else if (abs(remaining) >= 60) {
860  expinfo += StringifyInt(remaining / 60) + "m";
861  } else {
862  expinfo += StringifyInt(remaining) + "s";
863  }
864  if (host.status() == dns::kFailOk) {
865  result += " (" + host.name() + ", " + expinfo + ")";
866  } else {
867  result += " (:unresolved:, " + expinfo + ")";
868  }
869  return result;
870 }
871 
872 
877 CURL *DownloadManager::AcquireCurlHandle() {
878  CURL *handle;
879 
880  if (pool_handles_idle_->empty()) {
881  // Create a new handle
882  handle = curl_easy_init();
883  assert(handle != NULL);
884 
885  curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
886  // curl_easy_setopt(curl_default, CURLOPT_FAILONERROR, 1);
887  curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, CallbackCurlHeader);
888  curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlData);
889  } else {
890  handle = *(pool_handles_idle_->begin());
891  pool_handles_idle_->erase(pool_handles_idle_->begin());
892  }
893 
894  pool_handles_inuse_->insert(handle);
895 
896  return handle;
897 }
898 
899 
900 void DownloadManager::ReleaseCurlHandle(CURL *handle) {
901  const set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
902  assert(elem != pool_handles_inuse_->end());
903 
904  if (pool_handles_idle_->size() > pool_max_handles_) {
905  curl_easy_cleanup(*elem);
906  } else {
907  pool_handles_idle_->insert(*elem);
908  }
909 
910  pool_handles_inuse_->erase(elem);
911 }
912 
913 
/**
 * Prepares `info` and the easy handle `handle` for a fresh transfer.
 *
 * Resets the per-request state of `info` (error and HTTP code, proxy /
 * metalink / host usage counters, retries, backoff), gives it a private copy
 * of the default headers extended by the job's info header and, if enabled,
 * the HTTP tracing headers, and initializes the hash context when an expected
 * hash is set. On the handle it sets the byte range (or clears it), the
 * JobInfo as private / header / data pointer, the header list, HEAD vs. GET,
 * IPv4-only resolution and redirect following as configured.
 *
 * NOTE(review): this listing omits source lines 943 (opener of the logging
 * call) and 956 (the statement inside the `compressed()` branch).
 */
918 void DownloadManager::InitializeRequest(JobInfo *info, CURL *handle) {
919  // Initialize internal download state
920  info->SetCurlHandle(handle);
921  info->SetErrorCode(kFailOk);
922  info->SetHttpCode(-1);
923  info->SetFollowRedirects(follow_redirects_);
924  info->SetNumUsedProxies(1);
925  info->SetNumUsedMetalinks(1);
926  info->SetNumUsedHosts(1);
927  info->SetNumRetries(0);
928  info->SetBackoffMs(0);
929  info->SetHeaders(header_lists_->DuplicateList(default_headers_));
930  if (info->info_header()) {
931  header_lists_->AppendHeader(info->headers(), info->info_header());
932  }
933  if (enable_http_tracing_) {
934  for (unsigned int i = 0; i < http_tracing_headers_.size(); i++) {
935  header_lists_->AppendHeader(info->headers(),
936  (http_tracing_headers_)[i].c_str());
937  }
938 
939  header_lists_->AppendHeader(info->headers(), info->tracing_header_pid());
940  header_lists_->AppendHeader(info->headers(), info->tracing_header_gid());
941  header_lists_->AppendHeader(info->headers(), info->tracing_header_uid());
942 
944  "(manager '%s' - id %" PRId64 ") "
945  "CURL Header for URL: %s is:\n %s",
946  name_.c_str(), info->id(), info->url()->c_str(),
947  header_lists_->Print(info->headers()).c_str());
948  }
949 
950  if (info->force_nocache()) {
951  SetNocache(info);
952  } else {
953  info->SetNocache(false);
954  }
955  if (info->compressed()) {
957  }
958  if (info->expected_hash()) {
959  assert(info->hash_context().buffer != NULL);
960  shash::Init(info->hash_context());
961  }
962 
  // A range request is issued only for an offset other than -1 together with
  // a non-zero size; the range is inclusive on both ends
963  if ((info->range_offset() != -1) && (info->range_size())) {
964  char byte_range_array[100];
965  const int64_t range_lower = static_cast<int64_t>(info->range_offset());
966  const int64_t range_upper = static_cast<int64_t>(info->range_offset()
967  + info->range_size() - 1);
    // NOTE(review): snprintf returns the length the full output would have
    // had, so truncation shows as a value >= sizeof(byte_range_array), not
    // only as exactly 100 -- confirm whether `>=` was intended.
968  if (snprintf(byte_range_array, sizeof(byte_range_array),
969  "%" PRId64 "-%" PRId64, range_lower, range_upper)
970  == 100) {
971  PANIC(NULL); // Should be impossible given limits on offset size.
972  }
973  curl_easy_setopt(handle, CURLOPT_RANGE, byte_range_array);
974  } else {
975  curl_easy_setopt(handle, CURLOPT_RANGE, NULL);
976  }
977 
978  // Set curl parameters
  // NOTE(review): libcurl documents the numeric options below (NOBODY,
  // HTTPGET, FOLLOWLOCATION, MAXREDIRS, VERBOSE) as taking a long; the
  // arguments here are int literals.
979  curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
980  curl_easy_setopt(handle, CURLOPT_WRITEHEADER, static_cast<void *>(info));
981  curl_easy_setopt(handle, CURLOPT_WRITEDATA, static_cast<void *>(info));
982  curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->headers());
983  if (info->head_request()) {
984  curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
985  } else {
986  curl_easy_setopt(handle, CURLOPT_HTTPGET, 1);
987  }
988  if (opt_ipv4_only_) {
989  curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
990  }
991  if (follow_redirects_) {
992  curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1);
993  curl_easy_setopt(handle, CURLOPT_MAXREDIRS, 4);
994  }
995 #ifdef DEBUGMSG
996  curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
997  curl_easy_setopt(handle, CURLOPT_DEBUGFUNCTION, CallbackCurlDebug);
998 #endif
999 }
1000 
/**
 * Switches the chain described by `info` back to its first entry once the
 * time spent on a later entry has run out.
 *
 * Nothing happens unless info.timestamp_backup is positive. If it is, and
 * `now` is later than timestamp_backup + reset_after, then info.current is
 * set to 0 and timestamp_backup is cleared.
 *
 * @param typ     label used in the log message (callers pass "metalink" or
 *                "host")
 * @param info    chain state to check and possibly reset
 * @param jobinfo only used for its id in the log message
 * @param now     current time, or 0 if not yet determined; in that case it
 *                is filled in with time(NULL) when it is needed, so callers
 *                can reuse the value
 *
 * NOTE(review): the opener of the logging call (source line 1010) is absent
 * from this listing.
 */
1001 void DownloadManager::CheckHostInfoReset(const std::string &typ,
1002  HostInfo &info,
1003  JobInfo *jobinfo,
1004  time_t &now) {
1005  if (info.timestamp_backup > 0) {
1006  if (now == 0)
1007  now = time(NULL);
1008  if (static_cast<int64_t>(now)
1009  > static_cast<int64_t>(info.timestamp_backup + info.reset_after)) {
1011  "(manager %s - id %" PRId64 ") "
1012  "switching %s from %s to %s (reset %s)",
1013  name_.c_str(), jobinfo->id(), typ.c_str(),
1014  (*info.chain)[info.current].c_str(), (*info.chain)[0].c_str(),
1015  typ.c_str());
1016  info.current = 0;
1017  info.timestamp_backup = 0;
1018  }
1019  }
1020 }
1021 
1022 
1027 void DownloadManager::SetUrlOptions(JobInfo *info) {
1028  CURL *curl_handle = info->curl_handle();
1029  string url_prefix;
1030  time_t now = 0;
1031 
1032  const MutexLockGuard m(lock_options_);
1033 
1034  // sharding policy
1035  if (sharding_policy_.UseCount() > 0) {
1036  if (info->proxy() != "") {
1037  // proxy already set, so this is a failover event
1038  perf::Inc(counters_->n_proxy_failover);
1039  }
1040  info->SetProxy(sharding_policy_->GetNextProxy(
1041  info->url(), info->proxy(),
1042  info->range_offset() == -1 ? 0 : info->range_offset()));
1043 
1044  curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, info->proxy().c_str());
1045  } else { // no sharding policy
1046  // Check if proxy group needs to be reset from backup to primary
1047  if (opt_timestamp_backup_proxies_ > 0) {
1048  now = time(NULL);
1049  if (static_cast<int64_t>(now) > static_cast<int64_t>(
1050  opt_timestamp_backup_proxies_ + opt_proxy_groups_reset_after_)) {
1051  opt_proxy_groups_current_ = 0;
1052  opt_timestamp_backup_proxies_ = 0;
1053  RebalanceProxiesUnlocked("Reset proxy group from backup to primary");
1054  }
1055  }
1056  // Check if load-balanced proxies within the group need to be reset
1057  if (opt_timestamp_failover_proxies_ > 0) {
1058  if (now == 0)
1059  now = time(NULL);
1060  if (static_cast<int64_t>(now)
1061  > static_cast<int64_t>(opt_timestamp_failover_proxies_
1062  + opt_proxy_groups_reset_after_)) {
1063  RebalanceProxiesUnlocked(
1064  "Reset load-balanced proxies within the active group");
1065  }
1066  }
1067 
1068  ProxyInfo *proxy = ChooseProxyUnlocked(info->expected_hash());
1069  if (!proxy || (proxy->url == "DIRECT")) {
1070  info->SetProxy("DIRECT");
1071  curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, "");
1072  } else {
1073  // Note: inside ValidateProxyIpsUnlocked() we may change the proxy data
1074  // structure, so we must not pass proxy->... (== current_proxy())
1075  // parameters directly
1076  const std::string purl = proxy->url;
1077  const dns::Host phost = proxy->host;
1078  const bool changed = ValidateProxyIpsUnlocked(purl, phost);
1079  // Current proxy may have changed
1080  if (changed) {
1081  proxy = ChooseProxyUnlocked(info->expected_hash());
1082  }
1083  info->SetProxy(proxy->url);
1084  if (proxy->host.status() == dns::kFailOk) {
1085  curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY,
1086  info->proxy().c_str());
1087  } else {
1088  // We know it can't work, don't even try to download
1089  curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, "0.0.0.0");
1090  }
1091  }
1092  } // end !sharding
1093 
1094  // Check if metalink and host chains need to be reset
1095  CheckHostInfoReset("metalink", opt_metalink_, info, now);
1096  CheckHostInfoReset("host", opt_metalink_, info, now);
1097 
1098  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, opt_low_speed_limit_);
1099  if (info->proxy() != "DIRECT") {
1100  curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_proxy_);
1101  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_proxy_);
1102  } else {
1103  curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_direct_);
1104  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_direct_);
1105  }
1106  if (!opt_dns_server_.empty())
1107  curl_easy_setopt(curl_handle, CURLOPT_DNS_SERVERS, opt_dns_server_.c_str());
1108 
1109  if (info->probe_hosts()) {
1110  if (CheckMetalinkChain(now)) {
1111  url_prefix = (*opt_metalink_.chain)[opt_metalink_.current];
1112  info->SetCurrentMetalinkChainIndex(opt_metalink_.current);
1114  "(manager %s - id %" PRId64 ") "
1115  "reading from metalink %d",
1116  name_.c_str(), info->id(), opt_metalink_.current);
1117  } else if (opt_host_.chain) {
1118  url_prefix = (*opt_host_.chain)[opt_host_.current];
1119  info->SetCurrentHostChainIndex(opt_host_.current);
1121  "(manager %s - id %" PRId64 ") "
1122  "reading from host %d",
1123  name_.c_str(), info->id(), opt_host_.current);
1124  }
1125  }
1126 
1127  string url = url_prefix + *(info->url());
1128 
1129  curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 1L);
1130  if (url.substr(0, 5) == "https") {
1131  const bool rvb =
1132  ssl_certificate_store_.ApplySslCertificatePath(curl_handle);
1133  if (!rvb) {
1135  "(manager %s - id %" PRId64 ") "
1136  "Failed to set SSL certificate path %s",
1137  name_.c_str(), info->id(),
1138  ssl_certificate_store_.GetCaPath().c_str());
1139  }
1140  if (info->pid() != -1) {
1141  if (credentials_attachment_ == NULL) {
1143  "(manager %s - id %" PRId64 ") "
1144  "uses secure downloads but no credentials attachment set",
1145  name_.c_str(), info->id());
1146  } else {
1147  const bool retval = credentials_attachment_->ConfigureCurlHandle(
1148  curl_handle, info->pid(), info->GetCredDataPtr());
1149  if (!retval) {
1151  "(manager %s - id %" PRId64 ") "
1152  "failed attaching credentials",
1153  name_.c_str(), info->id());
1154  }
1155  }
1156  }
1157  // The download manager disables signal handling in the curl library;
1158  // as OpenSSL's implementation of TLS will generate a sigpipe in some
1159  // error paths, we must explicitly disable SIGPIPE here.
1160  // TODO(jblomer): it should be enough to do this once
1161  signal(SIGPIPE, SIG_IGN);
1162  }
1163 
1164  if (url.find("@proxy@") != string::npos) {
1165  // This is used in Geo-API requests (only), to replace a portion of the
1166  // URL with the current proxy name for the sake of caching the result.
1167  // Replace the @proxy@ either with a passed in "forced" template (which
1168  // is set from $CVMFS_PROXY_TEMPLATE) if there is one, or a "direct"
1169  // template (which is the uuid) if there's no proxy, or the name of the
1170  // proxy.
1171  string replacement;
1172  if (proxy_template_forced_ != "") {
1173  replacement = proxy_template_forced_;
1174  } else if (info->proxy() == "DIRECT") {
1175  replacement = proxy_template_direct_;
1176  } else {
1177  if (opt_proxy_groups_current_ >= opt_proxy_groups_fallback_) {
1178  // It doesn't make sense to use the fallback proxies in Geo-API requests
1179  // since the fallback proxies are supposed to get sorted, too.
1180  info->SetProxy("DIRECT");
1181  curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, "");
1182  replacement = proxy_template_direct_;
1183  } else {
1184  replacement = ChooseProxyUnlocked(info->expected_hash())->host.name();
1185  }
1186  }
1187  replacement = (replacement == "") ? proxy_template_direct_ : replacement;
1189  "(manager %s - id %" PRId64 ") "
1190  "replacing @proxy@ by %s",
1191  name_.c_str(), info->id(), replacement.c_str());
1192  url = ReplaceAll(url, "@proxy@", replacement);
1193  }
1194 
1195  // TODO(heretherebedragons) before removing
1196  // static_cast<cvmfs::MemSink*>(info->sink)->size() == 0
1197  // and just always call info->sink->Reserve()
1198  // we should do a speed check
1199  if ((info->sink() != NULL) && info->sink()->RequiresReserve()
1200  && (static_cast<cvmfs::MemSink *>(info->sink())->size() == 0)
1201  && HasPrefix(url, "file://", false)) {
1202  platform_stat64 stat_buf;
1203  const int retval = platform_stat(url.c_str(), &stat_buf);
1204  if (retval != 0) {
1205  // this is an error: file does not exist or out of memory
1206  // error is caught in other code section.
1207  info->sink()->Reserve(64ul * 1024ul);
1208  } else {
1209  info->sink()->Reserve(stat_buf.st_size);
1210  }
1211  }
1212 
1213  curl_easy_setopt(curl_handle, CURLOPT_URL,
1214  EscapeUrl(info->id(), url).c_str());
1215 }
1216 
1217 
1227 bool DownloadManager::ValidateProxyIpsUnlocked(const string &url,
1228  const dns::Host &host) {
1229  if (!host.IsExpired())
1230  return false;
1231  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s') validate DNS entry for %s",
1232  name_.c_str(), host.name().c_str());
1233 
1234  const unsigned group_idx = opt_proxy_groups_current_;
1235  dns::Host new_host = resolver_->Resolve(host.name());
1236 
1237  bool update_only = true; // No changes to the list of IP addresses.
1238  if (new_host.status() != dns::kFailOk) {
1239  // Try again later in case resolving fails.
1241  "(manager '%s') failed to resolve IP addresses for %s (%d - %s)",
1242  name_.c_str(), host.name().c_str(), new_host.status(),
1243  dns::Code2Ascii(new_host.status()));
1244  new_host = dns::Host::ExtendDeadline(host, resolver_->min_ttl());
1245  } else if (!host.IsEquivalent(new_host)) {
1246  update_only = false;
1247  }
1248 
1249  if (update_only) {
1250  for (unsigned i = 0; i < (*opt_proxy_groups_)[group_idx].size(); ++i) {
1251  if ((*opt_proxy_groups_)[group_idx][i].host.id() == host.id())
1252  (*opt_proxy_groups_)[group_idx][i].host = new_host;
1253  }
1254  return false;
1255  }
1256 
1257  assert(new_host.status() == dns::kFailOk);
1258 
1259  // Remove old host objects, insert new objects, and rebalance.
1261  "(manager '%s') DNS entries for proxy %s changed, adjusting",
1262  name_.c_str(), host.name().c_str());
1263  vector<ProxyInfo> *group = current_proxy_group();
1264  opt_num_proxies_ -= group->size();
1265  for (unsigned i = 0; i < group->size();) {
1266  if ((*group)[i].host.id() == host.id()) {
1267  group->erase(group->begin() + i);
1268  } else {
1269  i++;
1270  }
1271  }
1272  vector<ProxyInfo> new_infos;
1273  set<string> best_addresses = new_host.ViewBestAddresses(opt_ip_preference_);
1274  set<string>::const_iterator iter_ips = best_addresses.begin();
1275  for (; iter_ips != best_addresses.end(); ++iter_ips) {
1276  const string url_ip = dns::RewriteUrl(url, *iter_ips);
1277  new_infos.push_back(ProxyInfo(new_host, url_ip));
1278  }
1279  group->insert(group->end(), new_infos.begin(), new_infos.end());
1280  opt_num_proxies_ += new_infos.size();
1281 
1282  const std::string msg = "DNS entries for proxy " + host.name() + " changed";
1283 
1284  RebalanceProxiesUnlocked(msg);
1285  return true;
1286 }
1287 
1288 
1292 void DownloadManager::UpdateStatistics(CURL *handle) {
1293  double val;
1294  int retval;
1295  int64_t sum = 0;
1296 
1297  retval = curl_easy_getinfo(handle, CURLINFO_SIZE_DOWNLOAD, &val);
1298  assert(retval == CURLE_OK);
1299  sum += static_cast<int64_t>(val);
1300  /*retval = curl_easy_getinfo(handle, CURLINFO_HEADER_SIZE, &val);
1301  assert(retval == CURLE_OK);
1302  sum += static_cast<int64_t>(val);*/
1303  perf::Xadd(counters_->sz_transferred_bytes, sum);
1304 }
1305 
1306 
1310 bool DownloadManager::CanRetry(const JobInfo *info) {
1311  const MutexLockGuard m(lock_options_);
1312  const unsigned max_retries = opt_max_retries_;
1313 
1314  return !(info->nocache()) && (info->num_retries() < max_retries)
1315  && (IsProxyTransferError(info->error_code())
1316  || IsHostTransferError(info->error_code()));
1317 }
1318 
1326 void DownloadManager::Backoff(JobInfo *info) {
1327  unsigned backoff_init_ms = 0;
1328  unsigned backoff_max_ms = 0;
1329  {
1330  const MutexLockGuard m(lock_options_);
1331  backoff_init_ms = opt_backoff_init_ms_;
1332  backoff_max_ms = opt_backoff_max_ms_;
1333  }
1334 
1335  info->SetNumRetries(info->num_retries() + 1);
1336  perf::Inc(counters_->n_retries);
1337  if (info->backoff_ms() == 0) {
1338  info->SetBackoffMs(prng_.Next(backoff_init_ms + 1)); // Must be != 0
1339  } else {
1340  info->SetBackoffMs(info->backoff_ms() * 2);
1341  }
1342  if (info->backoff_ms() > backoff_max_ms) {
1343  info->SetBackoffMs(backoff_max_ms);
1344  }
1345 
1347  "(manager '%s' - id %" PRId64 ") backing off for %d ms",
1348  name_.c_str(), info->id(), info->backoff_ms());
1349  SafeSleepMs(info->backoff_ms());
1350 }
1351 
1352 void DownloadManager::SetNocache(JobInfo *info) {
1353  if (info->nocache())
1354  return;
1355  header_lists_->AppendHeader(info->headers(), "Pragma: no-cache");
1356  header_lists_->AppendHeader(info->headers(), "Cache-Control: no-cache");
1357  curl_easy_setopt(info->curl_handle(), CURLOPT_HTTPHEADER, info->headers());
1358  info->SetNocache(true);
1359 }
1360 
1361 
1366 void DownloadManager::SetRegularCache(JobInfo *info) {
1367  if (info->nocache() == false)
1368  return;
1369  header_lists_->CutHeader("Pragma: no-cache", info->GetHeadersPtr());
1370  header_lists_->CutHeader("Cache-Control: no-cache", info->GetHeadersPtr());
1371  curl_easy_setopt(info->curl_handle(), CURLOPT_HTTPHEADER, info->headers());
1372  info->SetNocache(false);
1373 }
1374 
1375 
1379 void DownloadManager::ReleaseCredential(JobInfo *info) {
1380  if (info->cred_data()) {
1381  assert(credentials_attachment_ != NULL); // Someone must have set it
1382  credentials_attachment_->ReleaseCurlHandle(info->curl_handle(),
1383  info->cred_data());
1384  info->SetCredData(NULL);
1385  }
1386 }
1387 
1388 
/**
 * Extracts the integer value of the "; pri=" parameter of a single link.
 *
 * \param link one element of a Link header
 * \param priority receives the parsed value on success
 * \return true if the link has a "; pri=" parameter followed by an integer
 */
static bool ParseLinkPriority(const std::string &link, int *priority) {
  const size_t pos = link.find("; pri=");
  if (pos == std::string::npos)
    return false;
  // Skip the 6 characters of "; pri=" and parse the number that follows
  return sscanf(link.c_str() + pos + 6, "%d", priority) == 1;
}

/**
 * Comparator to sort links based on the "pri=" parameter (smaller value
 * first).
 *
 * Links without a parseable "pri=" parameter are ordered after all links that
 * have one and are equivalent among each other. This keeps the comparison a
 * strict weak ordering, as std::sort requires, even if only some of the links
 * carry a priority.
 *
 * \return true if s1 has to be placed before s2
 */
static bool sortlinks(const std::string &s1, const std::string &s2) {
  int pri1 = 0;
  int pri2 = 0;
  const bool has_pri1 = ParseLinkPriority(s1, &pri1);
  const bool has_pri2 = ParseLinkPriority(s2, &pri2);

  if (has_pri1 && has_pri2)
    return pri1 < pri2;
  // At least one side has no priority: only "with priority" < "without"
  return has_pri1 && !has_pri2;
}
1401 
1406 void DownloadManager::ProcessLink(JobInfo *info) {
1407  std::vector<std::string> links = SplitString(info->link(), ',');
1408  if (info->link().find("; pri=") != std::string::npos)
1409  std::sort(links.begin(), links.end(), sortlinks);
1410 
1411  std::vector<std::string> host_list;
1412 
1413  std::vector<std::string>::const_iterator il = links.begin();
1414  for (; il != links.end(); ++il) {
1415  const std::string &link = *il;
1416  if ((link.find("; rel=duplicate") == std::string::npos)
1417  && (link.find("; rel=\"duplicate\"") == std::string::npos)) {
1419  "skipping link '%s' because it does not contain rel=duplicate",
1420  link.c_str());
1421  continue;
1422  }
1423  // ignore depth= field since there's nothing useful we can do with it
1424 
1425  size_t start = link.find('<');
1426  if (start == std::string::npos) {
1427  LogCvmfs(
1429  "skipping link '%s' because it does not have a left angle bracket",
1430  link.c_str());
1431  continue;
1432  }
1433 
1434  start++;
1435  if ((link.substr(start, 7) != "http://")
1436  && (link.substr(start, 8) != "https://")) {
1438  "skipping link '%s' of unrecognized url protocol", link.c_str());
1439  continue;
1440  }
1441 
1442  size_t end = link.find('/', start + 8);
1443  if (end == std::string::npos)
1444  end = link.find('>');
1445  if (end == std::string::npos) {
1447  "skipping link '%s' because no slash in url and no right angle "
1448  "bracket",
1449  link.c_str());
1450  continue;
1451  }
1452  const std::string host = link.substr(start, end - start);
1453  LogCvmfs(kLogDownload, kLogDebug, "adding linked host '%s'", host.c_str());
1454  host_list.push_back(host);
1455  }
1456 
1457  if (host_list.size() > 0) {
1458  SetHostChain(host_list);
1459  opt_metalink_timestamp_link_ = time(NULL);
1460  }
1461 }
1462 
1463 
1470 bool DownloadManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1472  "(manager '%s' - id %" PRId64 ") "
1473  "Verify downloaded url %s, proxy %s (curl error %d)",
1474  name_.c_str(), info->id(), info->url()->c_str(),
1475  info->proxy().c_str(), curl_error);
1476  UpdateStatistics(info->curl_handle());
1477 
1478  bool was_metalink;
1479  std::string typ;
1480  if (info->current_metalink_chain_index() >= 0) {
1481  was_metalink = true;
1482  typ = "metalink";
1483  if (info->link() != "") {
1484  // process Link header whether or not the redirected URL got an error
1485  ProcessLink(info);
1486  }
1487  } else {
1488  was_metalink = false;
1489  typ = "host";
1490  }
1491 
1492 
1493  // Verification and error classification
1494  switch (curl_error) {
1495  case CURLE_OK:
1496  // Verify content hash
1497  if (info->expected_hash()) {
1498  shash::Any match_hash;
1499  shash::Final(info->hash_context(), &match_hash);
1500  if (match_hash != *(info->expected_hash())) {
1501  if (ignore_signature_failures_) {
1502  LogCvmfs(
1504  "(manager '%s' - id %" PRId64 ") "
1505  "ignoring failed hash verification of %s (expected %s, got %s)",
1506  name_.c_str(), info->id(), info->url()->c_str(),
1507  info->expected_hash()->ToString().c_str(),
1508  match_hash.ToString().c_str());
1509  } else {
1511  "(manager '%s' - id %" PRId64 ") "
1512  "hash verification of %s failed (expected %s, got %s)",
1513  name_.c_str(), info->id(), info->url()->c_str(),
1514  info->expected_hash()->ToString().c_str(),
1515  match_hash.ToString().c_str());
1516  info->SetErrorCode(kFailBadData);
1517  break;
1518  }
1519  }
1520  }
1521 
1522  info->SetErrorCode(kFailOk);
1523  break;
1524  case CURLE_UNSUPPORTED_PROTOCOL:
1526  break;
1527  case CURLE_URL_MALFORMAT:
1528  info->SetErrorCode(kFailBadUrl);
1529  break;
1530  case CURLE_COULDNT_RESOLVE_PROXY:
1532  break;
1533  case CURLE_COULDNT_RESOLVE_HOST:
1535  break;
1536  case CURLE_OPERATION_TIMEDOUT:
1537  info->SetErrorCode((info->proxy() == "DIRECT") ? kFailHostTooSlow
1538  : kFailProxyTooSlow);
1539  break;
1540  case CURLE_PARTIAL_FILE:
1541  case CURLE_GOT_NOTHING:
1542  case CURLE_RECV_ERROR:
1543  info->SetErrorCode((info->proxy() == "DIRECT") ? kFailHostShortTransfer
1545  break;
1546  case CURLE_FILE_COULDNT_READ_FILE:
1547  case CURLE_COULDNT_CONNECT:
1548  if (info->proxy() != "DIRECT") {
1549  // This is a guess. Fail-over can still change to switching host
1551  } else {
1553  }
1554  break;
1555  case CURLE_TOO_MANY_REDIRECTS:
1557  break;
1558  case CURLE_SSL_CACERT_BADFILE:
1560  "(manager '%s' -id %" PRId64 ") "
1561  "Failed to load certificate bundle. "
1562  "X509_CERT_BUNDLE might point to the wrong location.",
1563  name_.c_str(), info->id());
1565  break;
1566  // As of curl 7.62.0, CURLE_SSL_CACERT is the same as
1567  // CURLE_PEER_FAILED_VERIFICATION
1568  case CURLE_PEER_FAILED_VERIFICATION:
1570  "(manager '%s' - id %" PRId64 ") "
1571  "invalid SSL certificate of remote host. "
1572  "X509_CERT_DIR and/or X509_CERT_BUNDLE might point to the wrong "
1573  "location.",
1574  name_.c_str(), info->id());
1576  break;
1577  case CURLE_ABORTED_BY_CALLBACK:
1578  case CURLE_WRITE_ERROR:
1579  // Error set by callback
1580  break;
1581  case CURLE_SEND_ERROR:
1582  // The curl error CURLE_SEND_ERROR can be seen when a cache is misbehaving
1583  // and closing connections before the http request send is completed.
1584  // Handle this error, treating it as a short transfer error.
1585  info->SetErrorCode((info->proxy() == "DIRECT") ? kFailHostShortTransfer
1586  : kFailProxyShortTransfer);
1587  break;
1588  default:
1590  "(manager '%s' - id %" PRId64 ") "
1591  "unexpected curl error (%d) while trying to fetch %s",
1592  name_.c_str(), info->id(), curl_error, info->url()->c_str());
1593  info->SetErrorCode(kFailOther);
1594  break;
1595  }
1596 
1597  std::vector<std::string> *host_chain;
1598  unsigned char num_used_hosts;
1599  if (was_metalink) {
1600  host_chain = opt_metalink_.chain;
1601  num_used_hosts = info->num_used_metalinks();
1602  } else {
1603  host_chain = opt_host_.chain;
1604  num_used_hosts = info->num_used_hosts();
1605  }
1606 
1607  // Determination if download should be repeated
1608  bool try_again = false;
1609  bool same_url_retry = CanRetry(info);
1610  if (info->error_code() != kFailOk) {
1611  const MutexLockGuard m(lock_options_);
1612  if (info->error_code() == kFailBadData) {
1613  if (!info->nocache()) {
1614  try_again = true;
1615  } else {
1616  // Make it a host failure
1618  "(manager '%s' - id %" PRId64 ") "
1619  "data corruption with no-cache header, try another %s",
1620  name_.c_str(), info->id(), typ.c_str());
1621 
1622  info->SetErrorCode(kFailHostHttp);
1623  }
1624  }
1625  if (same_url_retry
1626  || (((info->error_code() == kFailHostResolve)
1627  || IsHostTransferError(info->error_code())
1628  || (info->error_code() == kFailHostHttp))
1629  && info->probe_hosts() && host_chain
1630  && (num_used_hosts < host_chain->size()))) {
1631  try_again = true;
1632  }
1633  if (same_url_retry
1634  || (((info->error_code() == kFailProxyResolve)
1635  || IsProxyTransferError(info->error_code())
1636  || (info->error_code() == kFailProxyHttp)))) {
1637  if (sharding_policy_.UseCount() > 0) { // sharding policy
1638  try_again = true;
1639  same_url_retry = false;
1640  } else { // no sharding
1641  try_again = true;
1642  // If all proxies failed, do a next round with the next host
1643  if (!same_url_retry && (info->num_used_proxies() >= opt_num_proxies_)) {
1644  // Check if this can be made a host fail-over
1645  if (info->probe_hosts() && host_chain
1646  && (num_used_hosts < host_chain->size())) {
1647  // reset proxy group if not already performed by other handle
1648  if (opt_proxy_groups_) {
1649  if ((opt_proxy_groups_current_ > 0)
1650  || (opt_proxy_groups_current_burned_ > 0)) {
1651  opt_proxy_groups_current_ = 0;
1652  opt_timestamp_backup_proxies_ = 0;
1653  const std::string msg = "reset proxies for " + typ
1654  + " failover";
1655  RebalanceProxiesUnlocked(msg);
1656  }
1657  }
1658 
1659  // Make it a host failure
1661  "(manager '%s' - id %" PRId64 ") make it a %s failure",
1662  name_.c_str(), info->id(), typ.c_str());
1663  info->SetNumUsedProxies(1);
1665  } else {
1666  if (failover_indefinitely_) {
1667  // Instead of giving up, reset the num_used_proxies counter,
1668  // switch proxy and try again
1670  "(manager '%s' - id %" PRId64 ") "
1671  "VerifyAndFinalize() would fail the download here. "
1672  "Instead switch proxy and retry download. "
1673  "typ=%s "
1674  "info->probe_hosts=%d host_chain=%p num_used_hosts=%d "
1675  "host_chain->size()=%lu same_url_retry=%d "
1676  "info->num_used_proxies=%d opt_num_proxies_=%d",
1677  name_.c_str(), info->id(), typ.c_str(),
1678  static_cast<int>(info->probe_hosts()), host_chain,
1679  num_used_hosts, host_chain ? host_chain->size() : -1,
1680  static_cast<int>(same_url_retry),
1681  info->num_used_proxies(), opt_num_proxies_);
1682  info->SetNumUsedProxies(1);
1683  RebalanceProxiesUnlocked(
1684  "download failed - failover indefinitely");
1685  try_again = !Interrupted(fqrn_, info);
1686  } else {
1687  try_again = false;
1688  }
1689  }
1690  } // Make a proxy failure a host failure
1691  } // Proxy failure assumed
1692  } // end !sharding
1693  }
1694 
1695  if (try_again) {
1697  "(manager '%s' - id %" PRId64 ") "
1698  "Trying again on same curl handle, same url: %d, "
1699  "error code %d no-cache %d",
1700  name_.c_str(), info->id(), same_url_retry, info->error_code(),
1701  info->nocache());
1702  // Reset internal state and destination
1703  if (info->sink() != NULL && info->sink()->Reset() != 0) {
1704  info->SetErrorCode(kFailLocalIO);
1705  goto verify_and_finalize_stop;
1706  }
1707  if (info->interrupt_cue() && info->interrupt_cue()->IsCanceled()) {
1708  info->SetErrorCode(kFailCanceled);
1709  goto verify_and_finalize_stop;
1710  }
1711 
1712  if (info->expected_hash()) {
1713  shash::Init(info->hash_context());
1714  }
1715  if (info->compressed()) {
1717  }
1718 
1719  if (sharding_policy_.UseCount() > 0) { // sharding policy
1720  ReleaseCredential(info);
1721  SetUrlOptions(info);
1722  } else { // no sharding policy
1723  SetRegularCache(info);
1724 
1725  // Failure handling
1726  bool switch_proxy = false;
1727  bool switch_host = false;
1728  switch (info->error_code()) {
1729  case kFailBadData:
1730  SetNocache(info);
1731  break;
1732  case kFailProxyResolve:
1733  case kFailProxyHttp:
1734  switch_proxy = true;
1735  break;
1736  case kFailHostResolve:
1737  case kFailHostHttp:
1738  case kFailHostAfterProxy:
1739  switch_host = true;
1740  break;
1741  default:
1742  if (IsProxyTransferError(info->error_code())) {
1743  if (same_url_retry) {
1744  Backoff(info);
1745  } else {
1746  switch_proxy = true;
1747  }
1748  } else if (IsHostTransferError(info->error_code())) {
1749  if (same_url_retry) {
1750  Backoff(info);
1751  } else {
1752  switch_host = true;
1753  }
1754  } else {
1755  // No other errors expected when retrying
1756  PANIC(NULL);
1757  }
1758  }
1759  if (switch_proxy) {
1760  ReleaseCredential(info);
1761  SwitchProxy(info);
1762  info->SetNumUsedProxies(info->num_used_proxies() + 1);
1763  SetUrlOptions(info);
1764  }
1765  if (switch_host) {
1766  ReleaseCredential(info);
1767  if (was_metalink) {
1768  SwitchMetalink(info);
1769  info->SetNumUsedMetalinks(num_used_hosts + 1);
1770  } else {
1771  SwitchHost(info);
1772  info->SetNumUsedHosts(num_used_hosts + 1);
1773  }
1774  SetUrlOptions(info);
1775  }
1776  } // end !sharding
1777 
1778  if (failover_indefinitely_) {
1779  // try again, breaking if there's a cvmfs reload happening and we are in a
1780  // proxy failover. This will EIO the call application.
1781  return !Interrupted(fqrn_, info);
1782  }
1783  return true; // try again
1784  }
1785 
1786 verify_and_finalize_stop:
1787  // Finalize, flush destination file
1788  ReleaseCredential(info);
1789  if (info->sink() != NULL && info->sink()->Flush() != 0) {
1790  info->SetErrorCode(kFailLocalIO);
1791  }
1792 
1793  if (info->compressed())
1795 
1796  if (info->headers()) {
1797  header_lists_->PutList(info->headers());
1798  info->SetHeaders(NULL);
1799  }
1800 
1801  return false; // stop transfer and return to Fetch()
1802 }
1803 
1804 DownloadManager::~DownloadManager() {
1805  // cleaned up fini
1806  if (sharding_policy_.UseCount() > 0) {
1807  sharding_policy_.Reset();
1808  }
1809  if (health_check_.UseCount() > 0) {
1810  if (health_check_.Unique()) {
1812  "(manager '%s') Stopping healthcheck thread", name_.c_str());
1813  health_check_->StopHealthcheck();
1814  }
1815  health_check_.Reset();
1816  }
1817 
1818  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1819  // Shutdown I/O thread
1820  pipe_terminate_->Write(kPipeTerminateSignal);
1821  pthread_join(thread_download_, NULL);
1822  // All handles are removed from the multi stack
1823  pipe_terminate_.Destroy();
1824  pipe_jobs_.Destroy();
1825  }
1826 
1827  for (set<CURL *>::iterator i = pool_handles_idle_->begin(),
1828  iEnd = pool_handles_idle_->end();
1829  i != iEnd;
1830  ++i) {
1831  curl_easy_cleanup(*i);
1832  }
1833 
1834  delete pool_handles_idle_;
1835  delete pool_handles_inuse_;
1836  curl_multi_cleanup(curl_multi_);
1837 
1838  delete header_lists_;
1839  if (user_agent_)
1840  free(user_agent_);
1841 
1842  delete counters_;
1843  delete opt_host_.chain;
1844  delete opt_host_chain_rtt_;
1845  delete opt_proxy_groups_;
1846 
1847  curl_global_cleanup();
1848  delete resolver_;
1849 
1850  // old destructor
1851  pthread_mutex_destroy(lock_options_);
1852  pthread_mutex_destroy(lock_synchronous_mode_);
1853  free(lock_options_);
1854  free(lock_synchronous_mode_);
1855 }
1856 
1857 void DownloadManager::InitHeaders() {
1858  // User-Agent
1859  string cernvm_id = "User-Agent: cvmfs ";
1860 #ifdef CVMFS_LIBCVMFS
1861  cernvm_id += "libcvmfs ";
1862 #else
1863  cernvm_id += "Fuse ";
1864 #endif
1865  cernvm_id += string(CVMFS_VERSION);
1866  if (getenv("CERNVM_UUID") != NULL) {
1867  cernvm_id += " "
1868  + sanitizer::InputSanitizer("az AZ 09 -")
1869  .Filter(getenv("CERNVM_UUID"));
1870  }
1871  user_agent_ = strdup(cernvm_id.c_str());
1872 
1873  header_lists_ = new HeaderLists();
1874 
1875  default_headers_ = header_lists_->GetList("Connection: Keep-Alive");
1876  header_lists_->AppendHeader(default_headers_, "Pragma:");
1877  header_lists_->AppendHeader(default_headers_, user_agent_);
1878 }
1879 
1880 DownloadManager::DownloadManager(const unsigned max_pool_handles,
1881  const perf::StatisticsTemplate &statistics,
1882  const std::string &name)
1883  : prng_(Prng())
1884  , pool_handles_idle_(new set<CURL *>)
1885  , pool_handles_inuse_(new set<CURL *>)
1886  , pool_max_handles_(max_pool_handles)
1887  , pipe_terminate_(NULL)
1888  , pipe_jobs_(NULL)
1889  , watch_fds_(NULL)
1890  , watch_fds_size_(0)
1891  , watch_fds_inuse_(0)
1892  , watch_fds_max_(4 * max_pool_handles)
1893  , opt_timeout_proxy_(5)
1894  , opt_timeout_direct_(10)
1895  , opt_low_speed_limit_(1024)
1896  , opt_max_retries_(0)
1897  , opt_backoff_init_ms_(0)
1898  , opt_backoff_max_ms_(0)
1899  , enable_info_header_(false)
1900  , opt_ipv4_only_(false)
1901  , follow_redirects_(false)
1902  , ignore_signature_failures_(false)
1903  , enable_http_tracing_(false)
1904  , opt_metalink_(NULL, 0, 0, 0)
1905  , opt_metalink_timestamp_link_(0)
1906  , opt_host_(NULL, 0, 0, 0)
1907  , opt_host_chain_rtt_(NULL)
1908  , opt_proxy_groups_(NULL)
1909  , opt_proxy_groups_current_(0)
1910  , opt_proxy_groups_current_burned_(0)
1911  , opt_proxy_groups_fallback_(0)
1912  , opt_num_proxies_(0)
1913  , opt_proxy_shard_(false)
1914  , failover_indefinitely_(false)
1915  , name_(name)
1916  , opt_ip_preference_(dns::kIpPreferSystem)
1917  , opt_timestamp_backup_proxies_(0)
1918  , opt_timestamp_failover_proxies_(0)
1919  , opt_proxy_groups_reset_after_(0)
1920  , credentials_attachment_(NULL)
1921  , counters_(new Counters(statistics)) {
1922  atomic_init32(&multi_threaded_);
1923 
1924  lock_options_ = reinterpret_cast<pthread_mutex_t *>(
1925  smalloc(sizeof(pthread_mutex_t)));
1926  int retval = pthread_mutex_init(lock_options_, NULL);
1927  assert(retval == 0);
1928  lock_synchronous_mode_ = reinterpret_cast<pthread_mutex_t *>(
1929  smalloc(sizeof(pthread_mutex_t)));
1930  retval = pthread_mutex_init(lock_synchronous_mode_, NULL);
1931  assert(retval == 0);
1932 
1933  retval = curl_global_init(CURL_GLOBAL_ALL);
1934  assert(retval == CURLE_OK);
1935 
1936  InitHeaders();
1937 
1938  curl_multi_ = curl_multi_init();
1939  assert(curl_multi_ != NULL);
1940  curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION, CallbackCurlSocket);
1941  curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1942  static_cast<void *>(this));
1943  curl_multi_setopt(curl_multi_, CURLMOPT_MAXCONNECTS, watch_fds_max_);
1944  curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1946 
1947  prng_.InitLocaltime();
1948 
1949  // Name resolving
1950  if ((getenv("CVMFS_IPV4_ONLY") != NULL)
1951  && (strlen(getenv("CVMFS_IPV4_ONLY")) > 0)) {
1952  opt_ipv4_only_ = true;
1953  }
1956  assert(resolver_);
1957 }
1958 
1966 
1967  const int retval = pthread_create(&thread_download_, NULL, MainDownload,
1968  static_cast<void *>(this));
1969  assert(retval == 0);
1970 
1971  atomic_inc32(&multi_threaded_);
1972 
1973  if (health_check_.UseCount() > 0) {
1975  "(manager '%s') Starting healthcheck thread", name_.c_str());
1976  health_check_->StartHealthcheck();
1977  }
1978 }
1979 
1980 
1985  assert(info != NULL);
1986  assert(info->url() != NULL);
1987 
1988  Failures result;
1989  result = PrepareDownloadDestination(info);
1990  if (result != kFailOk)
1991  return result;
1992 
1993  if (info->expected_hash()) {
1996  info->GetHashContextPtr()->size = shash::GetContextSize(algorithm);
1997  info->GetHashContextPtr()->buffer = alloca(info->hash_context().size);
1998  }
1999 
2000  // In case JobInfo object is being reused
2001  info->SetLink("");
2002 
2003  // Prepare cvmfs-info: header, allocate string on the stack
2004  info->SetInfoHeader(NULL);
2005  if (enable_info_header_ && info->extra_info()) {
2006  const char *header_name = "cvmfs-info: ";
2007  const size_t header_name_len = strlen(header_name);
2008  const unsigned header_size = 1 + header_name_len
2009  + EscapeHeader(*(info->extra_info()), NULL, 0);
2010  info->SetInfoHeader(static_cast<char *>(alloca(header_size)));
2011  memcpy(info->info_header(), header_name, header_name_len);
2012  EscapeHeader(*(info->extra_info()), info->info_header() + header_name_len,
2013  header_size - header_name_len);
2014  info->info_header()[header_size - 1] = '\0';
2015  }
2016 
2017  if (enable_http_tracing_) {
2018  const std::string str_pid = "X-CVMFS-PID: " + StringifyInt(info->pid());
2019  const std::string str_gid = "X-CVMFS-GID: " + StringifyUint(info->gid());
2020  const std::string str_uid = "X-CVMFS-UID: " + StringifyUint(info->uid());
2021 
2022  // will be auto freed at the end of this function Fetch(JobInfo *info)
2023  info->SetTracingHeaderPid(static_cast<char *>(alloca(str_pid.size() + 1)));
2024  info->SetTracingHeaderGid(static_cast<char *>(alloca(str_gid.size() + 1)));
2025  info->SetTracingHeaderUid(static_cast<char *>(alloca(str_uid.size() + 1)));
2026 
2027  memcpy(info->tracing_header_pid(), str_pid.c_str(), str_pid.size() + 1);
2028  memcpy(info->tracing_header_gid(), str_gid.c_str(), str_gid.size() + 1);
2029  memcpy(info->tracing_header_uid(), str_uid.c_str(), str_uid.size() + 1);
2030  }
2031 
2032  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
2033  if (!info->IsValidPipeJobResults()) {
2034  info->CreatePipeJobResults();
2035  }
2036  if (!info->IsValidDataTube()) {
2037  info->CreateDataTube();
2038  }
2039 
2040  // LogCvmfs(kLogDownload, kLogDebug, "send job to thread, pipe %d %d",
2041  // info->wait_at[0], info->wait_at[1]);
2042  pipe_jobs_->Write<JobInfo *>(info);
2043 
2044  do {
2045  DataTubeElement *ele = info->GetDataTubePtr()->PopFront();
2046 
2047  if (ele->action == kActionStop) {
2048  delete ele;
2049  break;
2050  }
2051  // TODO(heretherebedragons) add compression
2052  } while (true);
2053 
2054  info->GetPipeJobResultPtr()->Read<download::Failures>(&result);
2055  // LogCvmfs(kLogDownload, kLogDebug, "got result %d", result);
2056  } else {
2058  CURL *handle = AcquireCurlHandle();
2059  InitializeRequest(info, handle);
2060  SetUrlOptions(info);
2061  // curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
2062  int retval;
2063  do {
2064  retval = curl_easy_perform(handle);
2066  double elapsed;
2067  if (curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &elapsed)
2068  == CURLE_OK) {
2070  static_cast<int64_t>(elapsed * 1000));
2071  }
2072  } while (VerifyAndFinalize(retval, info));
2073  result = info->error_code();
2074  ReleaseCurlHandle(info->curl_handle());
2075  }
2076 
2077  if (result != kFailOk) {
2079  "(manager '%s' - id %" PRId64 ") "
2080  "download failed (error %d - %s)",
2081  name_.c_str(), info->id(), result, Code2Ascii(result));
2082 
2083  if (info->sink() != NULL) {
2084  info->sink()->Purge();
2085  }
2086  }
2087 
2088  return result;
2089 }
2090 
2091 
2097  const MutexLockGuard m(lock_options_);
2099 }
2100 
2104 std::string DownloadManager::GetDnsServer() const { return opt_dns_server_; }
2105 
2110 void DownloadManager::SetDnsServer(const string &address) {
2111  if (!address.empty()) {
2112  const MutexLockGuard m(lock_options_);
2113  opt_dns_server_ = address;
2114  assert(!opt_dns_server_.empty());
2115 
2116  vector<string> servers;
2117  servers.push_back(address);
2118  const bool retval = resolver_->SetResolvers(servers);
2119  assert(retval);
2120  }
2121  LogCvmfs(kLogDownload, kLogSyslog, "(manager '%s') set nameserver to %s",
2122  name_.c_str(), address.c_str());
2123 }
2124 
2125 
2130  const unsigned timeout_ms) {
2131  const MutexLockGuard m(lock_options_);
2132  if ((resolver_->retries() == retries)
2133  && (resolver_->timeout_ms() == timeout_ms)) {
2134  return;
2135  }
2136  delete resolver_;
2137  resolver_ = NULL;
2138  resolver_ = dns::NormalResolver::Create(opt_ipv4_only_, retries, timeout_ms);
2139  assert(resolver_);
2140 }
2141 
2142 
2143 void DownloadManager::SetDnsTtlLimits(const unsigned min_seconds,
2144  const unsigned max_seconds) {
2145  const MutexLockGuard m(lock_options_);
2146  resolver_->set_min_ttl(min_seconds);
2147  resolver_->set_max_ttl(max_seconds);
2148 }
2149 
2150 
2152  const MutexLockGuard m(lock_options_);
2153  opt_ip_preference_ = preference;
2154 }
2155 
2156 
2162 void DownloadManager::SetTimeout(const unsigned seconds_proxy,
2163  const unsigned seconds_direct) {
2164  const MutexLockGuard m(lock_options_);
2165  opt_timeout_proxy_ = seconds_proxy;
2166  opt_timeout_direct_ = seconds_direct;
2167 }
2168 
2169 
2175 void DownloadManager::SetLowSpeedLimit(const unsigned low_speed_limit) {
2176  const MutexLockGuard m(lock_options_);
2177  opt_low_speed_limit_ = low_speed_limit;
2178 }
2179 
2180 
2184 void DownloadManager::GetTimeout(unsigned *seconds_proxy,
2185  unsigned *seconds_direct) {
2186  const MutexLockGuard m(lock_options_);
2187  *seconds_proxy = opt_timeout_proxy_;
2188  *seconds_direct = opt_timeout_direct_;
2189 }
2190 
2191 
2196 void DownloadManager::SetMetalinkChain(const string &metalink_list) {
2197  SetMetalinkChain(SplitString(metalink_list, ';'));
2198 }
2199 
2200 
2202  const std::vector<std::string> &metalink_list) {
2203  const MutexLockGuard m(lock_options_);
2205  delete opt_metalink_.chain;
2206  opt_metalink_.current = 0;
2207 
2208  if (metalink_list.empty()) {
2209  opt_metalink_.chain = NULL;
2210  return;
2211  }
2212 
2213  opt_metalink_.chain = new vector<string>(metalink_list);
2214 }
2215 
2216 
2221 void DownloadManager::GetMetalinkInfo(vector<string> *metalink_chain,
2222  unsigned *current_metalink) {
2223  const MutexLockGuard m(lock_options_);
2224  if (opt_metalink_.chain) {
2225  if (current_metalink) {
2226  *current_metalink = opt_metalink_.current;
2227  }
2228  if (metalink_chain) {
2229  *metalink_chain = *opt_metalink_.chain;
2230  }
2231  }
2232 }
2233 
2234 
2239 void DownloadManager::SetHostChain(const string &host_list) {
2240  SetHostChain(SplitString(host_list, ';'));
2241 }
2242 
2243 
2244 void DownloadManager::SetHostChain(const std::vector<std::string> &host_list) {
2245  const MutexLockGuard m(lock_options_);
2247  delete opt_host_.chain;
2248  delete opt_host_chain_rtt_;
2249  opt_host_.current = 0;
2250 
2251  if (host_list.empty()) {
2252  opt_host_.chain = NULL;
2253  opt_host_chain_rtt_ = NULL;
2254  return;
2255  }
2256 
2257  opt_host_.chain = new vector<string>(host_list);
2258  opt_host_chain_rtt_ = new vector<int>(opt_host_.chain->size(),
2259  kProbeUnprobed);
2260  // LogCvmfs(kLogDownload, kLogSyslog, "using host %s",
2261  // (*opt_host_.chain)[0].c_str());
2262 }
2263 
2264 
2269 void DownloadManager::GetHostInfo(vector<string> *host_chain, vector<int> *rtt,
2270  unsigned *current_host) {
2271  const MutexLockGuard m(lock_options_);
2272  if (opt_host_.chain) {
2273  if (current_host) {
2274  *current_host = opt_host_.current;
2275  }
2276  if (host_chain) {
2277  *host_chain = *opt_host_.chain;
2278  }
2279  if (rtt) {
2280  *rtt = *opt_host_chain_rtt_;
2281  }
2282  }
2283 }
2284 
2285 
2295  const MutexLockGuard m(lock_options_);
2296 
2297  if (!opt_proxy_groups_) {
2298  return;
2299  }
2300 
2301  // Fail any matching proxies within the current load-balancing group
2302  vector<ProxyInfo> *group = current_proxy_group();
2303  const unsigned group_size = group->size();
2304  unsigned failed = 0;
2305  for (unsigned i = 0; i < group_size - opt_proxy_groups_current_burned_; ++i) {
2306  if (info && (info->proxy() == (*group)[i].url)) {
2307  // Move to list of failed proxies
2308  opt_proxy_groups_current_burned_++;
2309  swap((*group)[i],
2310  (*group)[group_size - opt_proxy_groups_current_burned_]);
2312  failed++;
2313  }
2314  }
2315 
2316  // Do nothing more unless at least one proxy was marked as failed
2317  if (!failed)
2318  return;
2319 
2320  // If all proxies from the current load-balancing group are burned, switch to
2321  // another group
2322  if (opt_proxy_groups_current_burned_ == group->size()) {
2323  opt_proxy_groups_current_burned_ = 0;
2324  if (opt_proxy_groups_->size() > 1) {
2326  % opt_proxy_groups_->size();
2327  // Remember the timestamp of switching to backup proxies
2329  if (opt_proxy_groups_current_ > 0) {
2331  opt_timestamp_backup_proxies_ = time(NULL);
2332  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2333  // "switched to (another) backup proxy group");
2334  } else {
2336  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2337  // "switched back to primary proxy group");
2338  }
2340  }
2341  }
2342  } else {
2343  // Record failover time
2346  opt_timestamp_failover_proxies_ = time(NULL);
2347  }
2348  }
2349 
2350  UpdateProxiesUnlocked("failed proxy");
2352  "(manager '%s' - id %" PRId64 ") "
2353  "%lu proxies remain in group",
2354  name_.c_str(), info->id(),
2356 }
2357 
2358 
2364 void DownloadManager::SwitchHostInfo(const std::string &typ,
2365  HostInfo &info,
2366  JobInfo *jobinfo) {
2367  const MutexLockGuard m(lock_options_);
2368 
2369  if (!info.chain || (info.chain->size() == 1)) {
2370  return;
2371  }
2372 
2373  if (jobinfo) {
2374  int lastused;
2375  if (typ == "host") {
2376  lastused = jobinfo->current_host_chain_index();
2377  } else {
2378  lastused = jobinfo->current_metalink_chain_index();
2379  }
2380  if (lastused != info.current) {
2382  "(manager '%s' - id %" PRId64 ")"
2383  "don't switch %s, "
2384  "last used %s: %s, current %s: %s",
2385  name_.c_str(), jobinfo->id(), typ.c_str(), typ.c_str(),
2386  (*info.chain)[lastused].c_str(), typ.c_str(),
2387  (*info.chain)[info.current].c_str());
2388  return;
2389  }
2390  }
2391 
2392  string reason = "manually triggered";
2393  string info_id = "(manager " + name_;
2394  if (jobinfo) {
2395  reason = download::Code2Ascii(jobinfo->error_code());
2396  info_id = " - id " + StringifyInt(jobinfo->id());
2397  }
2398  info_id += ")";
2399 
2400  const std::string old_host = (*info.chain)[info.current];
2401  info.current = (info.current + 1) % static_cast<int>(info.chain->size());
2402  if (typ == "host") {
2404  } else {
2406  }
2408  "%s switching %s from %s to %s (%s)", info_id.c_str(), typ.c_str(),
2409  old_host.c_str(), (*info.chain)[info.current].c_str(),
2410  reason.c_str());
2411 
2412  // Remember the timestamp of switching to backup host
2413  if (info.reset_after > 0) {
2414  if (info.current != 0) {
2415  if (info.timestamp_backup == 0)
2416  info.timestamp_backup = time(NULL);
2417  } else {
2418  info.timestamp_backup = 0;
2419  }
2420  }
2421 }
2422 
2424  SwitchHostInfo("host", opt_host_, info);
2425 }
2426 
2428 
2429 
2431  SwitchHostInfo("metalink", opt_metalink_, info);
2432 }
2433 
2434 
2436 
2438  return (opt_metalink_.chain
2439  && ((opt_metalink_timestamp_link_ == 0)
2440  || (static_cast<int64_t>((now == 0) ? time(NULL) : now)
2441  > static_cast<int64_t>(opt_metalink_timestamp_link_
2442  + opt_metalink_.reset_after))));
2443 }
2444 
2445 
2453  vector<string> host_chain;
2454  vector<int> host_rtt;
2455  unsigned current_host;
2456 
2457  GetHostInfo(&host_chain, &host_rtt, &current_host);
2458 
2459  // Stopwatch, two times to fill caches first
2460  unsigned i, retries;
2461  string url;
2462 
2463  cvmfs::MemSink memsink;
2464  JobInfo info(&url, false, false, NULL, &memsink);
2465  for (retries = 0; retries < 2; ++retries) {
2466  for (i = 0; i < host_chain.size(); ++i) {
2467  url = host_chain[i] + "/.cvmfspublished";
2468 
2469  struct timeval tv_start, tv_end;
2470  gettimeofday(&tv_start, NULL);
2471  const Failures result = Fetch(&info);
2472  gettimeofday(&tv_end, NULL);
2473  memsink.Reset();
2474  if (result == kFailOk) {
2475  host_rtt[i] = static_cast<int>(DiffTimeSeconds(tv_start, tv_end)
2476  * 1000);
2478  "(manager '%s' - id %" PRId64 ") "
2479  "probing host %s had %dms rtt",
2480  name_.c_str(), info.id(), url.c_str(), host_rtt[i]);
2481  } else {
2483  "(manager '%s' - id %" PRId64 ") "
2484  "error while probing host %s: %d %s",
2485  name_.c_str(), info.id(), url.c_str(), result,
2486  Code2Ascii(result));
2487  host_rtt[i] = INT_MAX;
2488  }
2489  }
2490  }
2491 
2492  SortTeam(&host_rtt, &host_chain);
2493  for (i = 0; i < host_chain.size(); ++i) {
2494  if (host_rtt[i] == INT_MAX)
2495  host_rtt[i] = kProbeDown;
2496  }
2497 
2498  const MutexLockGuard m(lock_options_);
2499  delete opt_host_.chain;
2500  delete opt_host_chain_rtt_;
2501  opt_host_.chain = new vector<string>(host_chain);
2502  opt_host_chain_rtt_ = new vector<int>(host_rtt);
2503  opt_host_.current = 0;
2504 }
2505 
2506 bool DownloadManager::GeoSortServers(std::vector<std::string> *servers,
2507  std::vector<uint64_t> *output_order) {
2508  if (!servers) {
2509  return false;
2510  }
2511  if (servers->size() == 1) {
2512  if (output_order) {
2513  output_order->clear();
2514  output_order->push_back(0);
2515  }
2516  return true;
2517  }
2518 
2519  std::vector<std::string> host_chain;
2520  GetHostInfo(&host_chain, NULL, NULL);
2521 
2522  std::vector<std::string> server_dns_names;
2523  server_dns_names.reserve(servers->size());
2524  for (unsigned i = 0; i < servers->size(); ++i) {
2525  const std::string host = dns::ExtractHost((*servers)[i]);
2526  server_dns_names.push_back(host.empty() ? (*servers)[i] : host);
2527  }
2528  const std::string host_list = JoinStrings(server_dns_names, ",");
2529 
2530  vector<string> host_chain_shuffled;
2531  {
2532  // Protect against concurrent access to prng_
2533  const MutexLockGuard m(lock_options_);
2534  // Determine random hosts for the Geo-API query
2535  host_chain_shuffled = Shuffle(host_chain, &prng_);
2536  }
2537  // Request ordered list via Geo-API
2538  bool success = false;
2539  const unsigned max_attempts = std::min(host_chain_shuffled.size(), size_t(3));
2540  vector<uint64_t> geo_order(servers->size());
2541  for (unsigned i = 0; i < max_attempts; ++i) {
2542  const string url =
2543  host_chain_shuffled[i] + "/api/v1.0/geo/@proxy@/" + host_list;
2545  "(manager '%s') requesting ordered server list from %s",
2546  name_.c_str(), url.c_str());
2547  cvmfs::MemSink memsink;
2548  JobInfo info(&url, false, false, NULL, &memsink);
2549  const Failures result = Fetch(&info);
2550  if (result == kFailOk) {
2551  const string order(reinterpret_cast<char *>(memsink.data()),
2552  memsink.pos());
2553  memsink.Reset();
2554  const bool retval = ValidateGeoReply(order, servers->size(), &geo_order);
2555  if (!retval) {
2557  "(manager '%s') retrieved invalid GeoAPI reply from %s [%s]",
2558  name_.c_str(), url.c_str(), order.c_str());
2559  } else {
2561  "(manager '%s') "
2562  "geographic order of servers retrieved from %s",
2563  name_.c_str(),
2564  dns::ExtractHost(host_chain_shuffled[i]).c_str());
2565  // remove new line at end of "order"
2566  LogCvmfs(kLogDownload, kLogDebug, "order is %s",
2567  Trim(order, true /* trim_newline */).c_str());
2568  success = true;
2569  break;
2570  }
2571  } else {
2573  "(manager '%s') GeoAPI request for %s failed with error %d [%s]",
2574  name_.c_str(), url.c_str(), result, Code2Ascii(result));
2575  }
2576  }
2577  if (!success) {
2579  "(manager '%s') "
2580  "failed to retrieve geographic order from stratum 1 servers",
2581  name_.c_str());
2582  return false;
2583  }
2584 
2585  if (output_order) {
2586  output_order->swap(geo_order);
2587  } else {
2588  std::vector<std::string> sorted_servers;
2589  sorted_servers.reserve(geo_order.size());
2590  for (unsigned i = 0; i < geo_order.size(); ++i) {
2591  const uint64_t orderval = geo_order[i];
2592  sorted_servers.push_back((*servers)[orderval]);
2593  }
2594  servers->swap(sorted_servers);
2595  }
2596  return true;
2597 }
2598 
2599 
2608  vector<string> host_chain;
2609  vector<int> host_rtt;
2610  unsigned current_host;
2611  vector<vector<ProxyInfo> > proxy_chain;
2612  unsigned fallback_group;
2613 
2614  GetHostInfo(&host_chain, &host_rtt, &current_host);
2615  GetProxyInfo(&proxy_chain, NULL, &fallback_group);
2616  if ((host_chain.size() < 2) && ((proxy_chain.size() - fallback_group) < 2))
2617  return true;
2618 
2619  vector<string> host_names;
2620  for (unsigned i = 0; i < host_chain.size(); ++i)
2621  host_names.push_back(dns::ExtractHost(host_chain[i]));
2622  SortTeam(&host_names, &host_chain);
2623  const unsigned last_geo_host = host_names.size();
2624 
2625  if ((fallback_group == 0) && (last_geo_host > 1)) {
2626  // There are no non-fallback proxies, which means that the client
2627  // will always use the fallback proxies. Add a keyword separator
2628  // between the hosts and fallback proxies so the geosorting service
2629  // will know to sort the hosts based on the distance from the
2630  // closest fallback proxy rather than the distance from the client.
2631  host_names.push_back("+PXYSEP+");
2632  }
2633 
2634  // Add fallback proxy names to the end of the host list
2635  const unsigned first_geo_fallback = host_names.size();
2636  for (unsigned i = fallback_group; i < proxy_chain.size(); ++i) {
2637  // We only take the first fallback proxy name from every group under the
2638  // assumption that load-balanced servers are at the same location
2639  host_names.push_back(proxy_chain[i][0].host.name());
2640  }
2641 
2642  std::vector<uint64_t> geo_order;
2643  const bool success = GeoSortServers(&host_names, &geo_order);
2644  if (!success) {
2645  // GeoSortServers already logged a failure message.
2646  return false;
2647  }
2648 
2649  // Re-install host chain and proxy chain
2650  const MutexLockGuard m(lock_options_);
2651  delete opt_host_.chain;
2652  opt_num_proxies_ = 0;
2653  opt_host_.chain = new vector<string>(host_chain.size());
2654 
2655  // It's possible that opt_proxy_groups_fallback_ might have changed while
2656  // the lock wasn't held
2657  vector<vector<ProxyInfo> > *proxy_groups = new vector<vector<ProxyInfo> >(
2658  opt_proxy_groups_fallback_ + proxy_chain.size() - fallback_group);
2659  // First copy the non-fallback part of the current proxy chain
2660  for (unsigned i = 0; i < opt_proxy_groups_fallback_; ++i) {
2661  (*proxy_groups)[i] = (*opt_proxy_groups_)[i];
2662  opt_num_proxies_ += (*opt_proxy_groups_)[i].size();
2663  }
2664 
2665  // Copy the host chain and fallback proxies by geo order. Array indices
2666  // in geo_order that are smaller than last_geo_host refer to a stratum 1,
2667  // and those indices greater than or equal to first_geo_fallback refer to
2668  // a fallback proxy.
2669  unsigned hosti = 0;
2670  unsigned proxyi = opt_proxy_groups_fallback_;
2671  for (unsigned i = 0; i < geo_order.size(); ++i) {
2672  const uint64_t orderval = geo_order[i];
2673  if (orderval < static_cast<uint64_t>(last_geo_host)) {
2674  // LogCvmfs(kLogCvmfs, kLogSyslog, "this is orderval %u at host index
2675  // %u", orderval, hosti);
2676  (*opt_host_.chain)[hosti++] = host_chain[orderval];
2677  } else if (orderval >= static_cast<uint64_t>(first_geo_fallback)) {
2678  // LogCvmfs(kLogCvmfs, kLogSyslog,
2679  // "this is orderval %u at proxy index %u, using proxy_chain index %u",
2680  // orderval, proxyi, fallback_group + orderval - first_geo_fallback);
2681  (*proxy_groups)[proxyi] = proxy_chain[fallback_group + orderval
2682  - first_geo_fallback];
2683  opt_num_proxies_ += (*proxy_groups)[proxyi].size();
2684  proxyi++;
2685  }
2686  }
2687 
2688  opt_proxy_map_.clear();
2689  delete opt_proxy_groups_;
2690  opt_proxy_groups_ = proxy_groups;
2691  // In pathological cases, opt_proxy_groups_current_ can be larger now when
2692  // proxies changed in-between.
2694  if (opt_proxy_groups_->size() == 0) {
2696  } else {
2698  }
2700  }
2701 
2702  UpdateProxiesUnlocked("geosort");
2703 
2704  delete opt_host_chain_rtt_;
2705  opt_host_chain_rtt_ = new vector<int>(host_chain.size(), kProbeGeo);
2706  opt_host_.current = 0;
2707 
2708  return true;
2709 }
2710 
2711 
2719 bool DownloadManager::ValidateGeoReply(const string &reply_order,
2720  const unsigned expected_size,
2721  vector<uint64_t> *reply_vals) {
2722  if (reply_order.empty())
2723  return false;
2724  const sanitizer::InputSanitizer sanitizer("09 , \n");
2725  if (!sanitizer.IsValid(reply_order))
2726  return false;
2727  const sanitizer::InputSanitizer strip_newline("09 ,");
2728  vector<string> reply_strings = SplitString(strip_newline.Filter(reply_order),
2729  ',');
2730  vector<uint64_t> tmp_vals;
2731  for (unsigned i = 0; i < reply_strings.size(); ++i) {
2732  if (reply_strings[i].empty())
2733  return false;
2734  tmp_vals.push_back(String2Uint64(reply_strings[i]));
2735  }
2736  if (tmp_vals.size() != expected_size)
2737  return false;
2738 
2739  // Check if tmp_vals contains the number 1..n
2740  set<uint64_t> coverage(tmp_vals.begin(), tmp_vals.end());
2741  if (coverage.size() != tmp_vals.size())
2742  return false;
2743  if ((*coverage.begin() != 1) || (*coverage.rbegin() != coverage.size()))
2744  return false;
2745 
2746  for (unsigned i = 0; i < expected_size; ++i) {
2747  (*reply_vals)[i] = tmp_vals[i] - 1;
2748  }
2749  return true;
2750 }
2751 
2752 
2757 bool DownloadManager::StripDirect(const string &proxy_list,
2758  string *cleaned_list) {
2759  assert(cleaned_list);
2760  if (proxy_list == "") {
2761  *cleaned_list = "";
2762  return false;
2763  }
2764  bool result = false;
2765 
2766  vector<string> proxy_groups = SplitString(proxy_list, ';');
2767  vector<string> cleaned_groups;
2768  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2769  vector<string> group = SplitString(proxy_groups[i], '|');
2770  vector<string> cleaned;
2771  for (unsigned j = 0; j < group.size(); ++j) {
2772  if ((group[j] == "DIRECT") || (group[j] == "")) {
2773  result = true;
2774  } else {
2775  cleaned.push_back(group[j]);
2776  }
2777  }
2778  if (!cleaned.empty())
2779  cleaned_groups.push_back(JoinStrings(cleaned, "|"));
2780  }
2781 
2782  *cleaned_list = JoinStrings(cleaned_groups, ";");
2783  return result;
2784 }
2785 
2786 
2795 void DownloadManager::SetProxyChain(const string &proxy_list,
2796  const string &fallback_proxy_list,
2797  const ProxySetModes set_mode) {
2798  const MutexLockGuard m(lock_options_);
2799 
2802  string set_proxy_list = opt_proxy_list_;
2803  string set_proxy_fallback_list = opt_proxy_fallback_list_;
2804  bool contains_direct;
2805  if ((set_mode == kSetProxyFallback) || (set_mode == kSetProxyBoth)) {
2806  opt_proxy_fallback_list_ = fallback_proxy_list;
2807  }
2808  if ((set_mode == kSetProxyRegular) || (set_mode == kSetProxyBoth)) {
2809  opt_proxy_list_ = proxy_list;
2810  }
2811  contains_direct = StripDirect(opt_proxy_fallback_list_,
2812  &set_proxy_fallback_list);
2813  if (contains_direct) {
2815  "(manager '%s') fallback proxies do not support DIRECT, removing",
2816  name_.c_str());
2817  }
2818  if (set_proxy_fallback_list == "") {
2819  set_proxy_list = opt_proxy_list_;
2820  } else {
2821  const bool contains_direct = StripDirect(opt_proxy_list_, &set_proxy_list);
2822  if (contains_direct) {
2824  "(manager '%s') skipping DIRECT proxy to use fallback proxy",
2825  name_.c_str());
2826  }
2827  }
2828 
2829  // From this point on, use set_proxy_list and set_fallback_proxy_list as
2830  // effective proxy lists!
2831 
2832  opt_proxy_map_.clear();
2833  delete opt_proxy_groups_;
2834  if ((set_proxy_list == "") && (set_proxy_fallback_list == "")) {
2835  opt_proxy_groups_ = NULL;
2839  opt_num_proxies_ = 0;
2840  return;
2841  }
2842 
2843  // Determine number of regular proxy groups (== first fallback proxy group)
2845  if (set_proxy_list != "") {
2846  opt_proxy_groups_fallback_ = SplitString(set_proxy_list, ';').size();
2847  }
2849  "(manager '%s') "
2850  "first fallback proxy group %u",
2852 
2853  // Concatenate regular proxies and fallback proxies, both of which can be
2854  // empty.
2855  string all_proxy_list = set_proxy_list;
2856  if (set_proxy_fallback_list != "") {
2857  if (all_proxy_list != "")
2858  all_proxy_list += ";";
2859  all_proxy_list += set_proxy_fallback_list;
2860  }
2861  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s') full proxy list %s",
2862  name_.c_str(), all_proxy_list.c_str());
2863 
2864  // Resolve server names in provided urls
2865  vector<string> hostnames; // All encountered hostnames
2866  vector<string> proxy_groups;
2867  if (all_proxy_list != "")
2868  proxy_groups = SplitString(all_proxy_list, ';');
2869  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2870  vector<string> this_group = SplitString(proxy_groups[i], '|');
2871  for (unsigned j = 0; j < this_group.size(); ++j) {
2872  this_group[j] = dns::AddDefaultScheme(this_group[j]);
2873  // Note: DIRECT strings will be "extracted" to an empty string.
2874  const string hostname = dns::ExtractHost(this_group[j]);
2875  // Save the hostname. Leave empty (DIRECT) names so indexes will
2876  // match later.
2877  hostnames.push_back(hostname);
2878  }
2879  }
2880  vector<dns::Host> hosts;
2882  "(manager '%s') "
2883  "resolving %lu proxy addresses",
2884  name_.c_str(), hostnames.size());
2885  resolver_->ResolveMany(hostnames, &hosts);
2886 
2887  // Construct opt_proxy_groups_: traverse proxy list in same order and expand
2888  // names to resolved IP addresses.
2889  opt_proxy_groups_ = new vector<vector<ProxyInfo> >();
2890  opt_num_proxies_ = 0;
2891  unsigned num_proxy = 0; // Combined i, j counter
2892  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2893  vector<string> this_group = SplitString(proxy_groups[i], '|');
2894  // Construct ProxyInfo objects from proxy string and DNS resolver result for
2895  // every proxy in this_group. One URL can result in multiple ProxyInfo
2896  // objects, one for each IP address.
2897  vector<ProxyInfo> infos;
2898  for (unsigned j = 0; j < this_group.size(); ++j, ++num_proxy) {
2899  this_group[j] = dns::AddDefaultScheme(this_group[j]);
2900  if (this_group[j] == "DIRECT") {
2901  infos.push_back(ProxyInfo("DIRECT"));
2902  continue;
2903  }
2904 
2905  if (hosts[num_proxy].status() != dns::kFailOk) {
2907  "(manager '%s') "
2908  "failed to resolve IP addresses for %s (%d - %s)",
2909  name_.c_str(), hosts[num_proxy].name().c_str(),
2910  hosts[num_proxy].status(),
2911  dns::Code2Ascii(hosts[num_proxy].status()));
2912  const dns::Host failed_host =
2913  dns::Host::ExtendDeadline(hosts[num_proxy], resolver_->min_ttl());
2914  infos.push_back(ProxyInfo(failed_host, this_group[j]));
2915  continue;
2916  }
2917 
2918  // IPv4 addresses have precedence
2919  set<string> best_addresses = hosts[num_proxy].ViewBestAddresses(
2921  set<string>::const_iterator iter_ips = best_addresses.begin();
2922  for (; iter_ips != best_addresses.end(); ++iter_ips) {
2923  const string url_ip = dns::RewriteUrl(this_group[j], *iter_ips);
2924  infos.push_back(ProxyInfo(hosts[num_proxy], url_ip));
2925 
2926  if (sharding_policy_.UseCount() > 0) {
2927  sharding_policy_->AddProxy(url_ip);
2928  }
2929  }
2930  }
2931  opt_proxy_groups_->push_back(infos);
2932  opt_num_proxies_ += infos.size();
2933  }
2935  "(manager '%s') installed %u proxies in %lu load-balance groups",
2936  name_.c_str(), opt_num_proxies_, opt_proxy_groups_->size());
2939 
2940  // Select random start proxy from the first group.
2941  if (opt_proxy_groups_->size() > 0) {
2942  // Select random start proxy from the first group.
2943  UpdateProxiesUnlocked("set random start proxy from the first proxy group");
2944  }
2945 }
2946 
2947 
2954 void DownloadManager::GetProxyInfo(vector<vector<ProxyInfo> > *proxy_chain,
2955  unsigned *current_group,
2956  unsigned *fallback_group) {
2957  assert(proxy_chain != NULL);
2958  const MutexLockGuard m(lock_options_);
2959 
2960  if (!opt_proxy_groups_) {
2961  const vector< vector<ProxyInfo> > empty_chain;
2962  *proxy_chain = empty_chain;
2963  if (current_group != NULL)
2964  *current_group = 0;
2965  if (fallback_group != NULL)
2966  *fallback_group = 0;
2967  return;
2968  }
2969 
2970  *proxy_chain = *opt_proxy_groups_;
2971  if (current_group != NULL)
2972  *current_group = opt_proxy_groups_current_;
2973  if (fallback_group != NULL)
2974  *fallback_group = opt_proxy_groups_fallback_;
2975 }
2976 
2978 
2980  return opt_proxy_fallback_list_;
2981 }
2982 
2987  const shash::Any *hash) {
2988  if (!opt_proxy_groups_)
2989  return NULL;
2990 
2991  const uint32_t key = (hash ? hash->Partial32() : 0);
2992  const map<uint32_t, ProxyInfo *>::iterator it =
2993  opt_proxy_map_.lower_bound(key);
2994  ProxyInfo *proxy = it->second;
2995 
2996  return proxy;
2997 }
2998 
3002 void DownloadManager::UpdateProxiesUnlocked(const string &reason) {
3003  if (!opt_proxy_groups_)
3004  return;
3005 
3006  // Identify number of non-burned proxies within the current group
3007  vector<ProxyInfo> *group = current_proxy_group();
3008  const unsigned num_alive = (group->size() - opt_proxy_groups_current_burned_);
3009  const string old_proxy = JoinStrings(opt_proxies_, "|");
3010 
3011  // Rebuild proxy map and URL list
3012  opt_proxy_map_.clear();
3013  opt_proxies_.clear();
3014  const uint32_t max_key = 0xffffffffUL;
3015  if (opt_proxy_shard_) {
3016  // Build a consistent map with multiple entries for each proxy
3017  for (unsigned i = 0; i < num_alive; ++i) {
3018  ProxyInfo *proxy = &(*group)[i];
3019  shash::Any proxy_hash(shash::kSha1);
3020  HashString(proxy->url, &proxy_hash);
3021  Prng prng;
3022  prng.InitSeed(proxy_hash.Partial32());
3023  for (unsigned j = 0; j < kProxyMapScale; ++j) {
3024  const std::pair<uint32_t, ProxyInfo *> entry(prng.Next(max_key), proxy);
3025  opt_proxy_map_.insert(entry);
3026  }
3027  const std::string proxy_name =
3028  proxy->host.name().empty() ? "" : " (" + proxy->host.name() + ")";
3029  opt_proxies_.push_back(proxy->url + proxy_name);
3030  }
3031  // Ensure lower_bound() finds a value for all keys
3032  ProxyInfo *first_proxy = opt_proxy_map_.begin()->second;
3033  const std::pair<uint32_t, ProxyInfo *> last_entry(max_key, first_proxy);
3034  opt_proxy_map_.insert(last_entry);
3035  } else {
3036  // Build a map with a single entry for one randomly selected proxy
3037  const unsigned select = prng_.Next(num_alive);
3038  ProxyInfo *proxy = &(*group)[select];
3039  const std::pair<uint32_t, ProxyInfo *> entry(max_key, proxy);
3040  opt_proxy_map_.insert(entry);
3041  const std::string proxy_name =
3042  proxy->host.name().empty() ? "" : " (" + proxy->host.name() + ")";
3043  opt_proxies_.push_back(proxy->url + proxy_name);
3044  }
3045  sort(opt_proxies_.begin(), opt_proxies_.end());
3046 
3047  // Report any change in proxy usage
3048  const string new_proxy = JoinStrings(opt_proxies_, "|");
3049  const string curr_host = "Current host: "
3050  + (opt_host_.chain
3052  : "");
3053  if (new_proxy != old_proxy) {
3055  "(manager '%s') switching proxy from %s to %s. Reason: %s [%s]",
3056  name_.c_str(), (old_proxy.empty() ? "(none)" : old_proxy.c_str()),
3057  (new_proxy.empty() ? "(none)" : new_proxy.c_str()), reason.c_str(),
3058  curr_host.c_str());
3059  }
3060 }
3061 
3066  opt_proxy_shard_ = true;
3067  RebalanceProxiesUnlocked("enable sharding");
3068 }
3069 
3074 void DownloadManager::RebalanceProxiesUnlocked(const string &reason) {
3075  if (!opt_proxy_groups_)
3076  return;
3077 
3080  UpdateProxiesUnlocked(reason);
3081 }
3082 
3083 
3085  const MutexLockGuard m(lock_options_);
3086  RebalanceProxiesUnlocked("rebalance invoked manually");
3087 }
3088 
3089 
3094  const MutexLockGuard m(lock_options_);
3095 
3096  if (!opt_proxy_groups_ || (opt_proxy_groups_->size() < 2)) {
3097  return;
3098  }
3099 
3101  % opt_proxy_groups_->size();
3102  opt_timestamp_backup_proxies_ = time(NULL);
3103 
3104  const std::string msg =
3105  "switch to proxy group " + StringifyUint(opt_proxy_groups_current_);
3107 }
3108 
3109 
3110 void DownloadManager::SetProxyGroupResetDelay(const unsigned seconds) {
3111  const MutexLockGuard m(lock_options_);
3113  if (opt_proxy_groups_reset_after_ == 0) {
3116  }
3117 }
3118 
3119 
3120 void DownloadManager::SetMetalinkResetDelay(const unsigned seconds) {
3121  const MutexLockGuard m(lock_options_);
3122  opt_metalink_.reset_after = seconds;
3123  if (opt_metalink_.reset_after == 0)
3125 }
3126 
3127 
3128 void DownloadManager::SetHostResetDelay(const unsigned seconds) {
3129  const MutexLockGuard m(lock_options_);
3130  opt_host_.reset_after = seconds;
3131  if (opt_host_.reset_after == 0)
3133 }
3134 
3135 
3136 void DownloadManager::SetRetryParameters(const unsigned max_retries,
3137  const unsigned backoff_init_ms,
3138  const unsigned backoff_max_ms) {
3139  const MutexLockGuard m(lock_options_);
3140  opt_max_retries_ = max_retries;
3141  opt_backoff_init_ms_ = backoff_init_ms;
3142  opt_backoff_max_ms_ = backoff_max_ms;
3143 }
3144 
3145 
3147  const MutexLockGuard m(lock_options_);
3148  resolver_->set_throttle(limit);
3149 }
3150 
3151 
3152 void DownloadManager::SetProxyTemplates(const std::string &direct,
3153  const std::string &forced) {
3154  const MutexLockGuard m(lock_options_);
3155  proxy_template_direct_ = direct;
3156  proxy_template_forced_ = forced;
3157 }
3158 
3159 
3161 
3162 
3164 
3167 }
3168 
3170 
3171 void DownloadManager::AddHTTPTracingHeader(const std::string &header) {
3172  http_tracing_headers_.push_back(header);
3173 }
3174 
3177 }
3178 
3180  const bool success = false;
3181  switch (type) {
3182  default:
3183  LogCvmfs(
3185  "(manager '%s') "
3186  "Proposed sharding policy does not exist. Falling back to default",
3187  name_.c_str());
3188  }
3189  return success;
3190 }
3191 
3193  failover_indefinitely_ = true;
3194 }
3195 
3201  const perf::StatisticsTemplate &statistics,
3202  const std::string &cloned_name) {
3203  DownloadManager *clone = new DownloadManager(pool_max_handles_, statistics,
3204  cloned_name);
3205 
3209 
3210  if (!opt_dns_server_.empty())
3211  clone->SetDnsServer(opt_dns_server_);
3223  if (opt_host_.chain) {
3224  clone->opt_host_.chain = new vector<string>(*opt_host_.chain);
3225  clone->opt_host_chain_rtt_ = new vector<int>(*opt_host_chain_rtt_);
3226  }
3227 
3228  CloneProxyConfig(clone);
3237 
3238  clone->health_check_ = health_check_;
3241  clone->fqrn_ = fqrn_;
3242 
3243  return clone;
3244 }
3245 
3246 
3255  if (opt_proxy_groups_ == NULL)
3256  return;
3257 
3258  clone->opt_proxy_groups_ = new vector<vector<ProxyInfo> >(*opt_proxy_groups_);
3259  clone->UpdateProxiesUnlocked("cloned");
3260 }
3261 
3262 } // namespace download
unsigned opt_timeout_direct_
Definition: download.h:328
std::vector< std::string > http_tracing_headers_
Definition: download.h:344
bool StripDirect(const std::string &proxy_list, std::string *cleaned_list)
Definition: download.cc:2757
unsigned opt_low_speed_limit_
Definition: download.h:329
void HashString(const std::string &content, Any *any_digest)
Definition: hash.cc:267
Definition: prng.h:27
static const unsigned kDnsDefaultTimeoutMs
Definition: download.h:173
std::vector< T > Shuffle(const std::vector< T > &input, Prng *prng)
Definition: algorithm.h:50
unsigned throttle() const
Definition: dns.h:206
z_stream * GetZstreamPtr()
Definition: jobinfo.h:160
unsigned opt_backoff_init_ms_
Definition: download.h:331
void SetInfoHeader(char *info_header)
Definition: jobinfo.h:242
shash::ContextPtr * GetHashContextPtr()
Definition: jobinfo.h:165
uid_t uid() const
Definition: jobinfo.h:178
int64_t Xadd(class Counter *counter, const int64_t delta)
Definition: statistics.h:51
struct stat64 platform_stat64
const char * Code2Ascii(const Failures error)
Definition: dns.h:53
bool Write(const T &data)
Definition: pipe.h:123
int64_t id() const
Definition: dns.h:108
unsigned opt_proxy_groups_current_burned_
Definition: download.h:368
double DiffTimeSeconds(struct timeval start, struct timeval end)
Definition: algorithm.cc:31
unsigned opt_proxy_groups_reset_after_
Definition: download.h:462
bool probe_hosts() const
Definition: jobinfo.h:173
virtual bool IsCanceled()
Definition: interrupt.h:16
void SetUrlOptions(JobInfo *info)
Definition: download.cc:1027
SharedPtr< ShardingPolicy > sharding_policy_
Definition: download.h:407
StreamStates DecompressZStream2Sink(const void *buf, const int64_t size, z_stream *strm, cvmfs::Sink *sink)
Definition: compression.cc:233
void ResolveMany(const std::vector< std::string > &names, std::vector< Host > *hosts)
Definition: dns.cc:355
std::string opt_proxy_fallback_list_
Definition: download.h:385
void SetHostChain(const std::string &host_list)
bool CheckMetalinkChain(const time_t now)
Definition: download.cc:2437
static NormalResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
Definition: dns.cc:1228
int64_t id() const
Definition: jobinfo.h:213
void SetMetalinkChain(const std::string &metalink_list)
void SetLowSpeedLimit(const unsigned low_speed_limit)
Definition: download.cc:2175
virtual int Purge()=0
DownloadManager(const unsigned max_pool_handles, const perf::StatisticsTemplate &statistics, const std::string &name="standard")
Definition: download.cc:1880
std::string proxy_template_direct_
Definition: download.h:446
std::vector< std::string > opt_proxies_
Definition: download.h:393
curl_slist ** GetHeadersPtr()
Definition: jobinfo.h:163
static const int kProbeGeo
Definition: download.h:170
#define PANIC(...)
Definition: exception.h:29
string Trim(const string &raw, bool trim_newline)
Definition: string.cc:466
string ReplaceAll(const string &haystack, const string &needle, const string &replace_by)
Definition: string.cc:516
void set_min_ttl(unsigned seconds)
Definition: dns.h:207
string JoinStrings(const vector< string > &strings, const string &joint)
Definition: string.cc:356
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:241
unsigned opt_proxy_groups_current_
Definition: download.h:363
void SetCurrentHostChainIndex(int current_host_chain_index)
Definition: jobinfo.h:275
virtual bool RequiresReserve()=0
bool ValidateGeoReply(const std::string &reply_order, const unsigned expected_size, std::vector< uint64_t > *reply_vals)
Definition: download.cc:2719
std::vector< ProxyInfo > * current_proxy_group() const
Definition: download.h:299
void DecompressInit(z_stream *strm)
Definition: compression.cc:190
const std::string * url() const
Definition: jobinfo.h:171
void * cred_data() const
Definition: jobinfo.h:180
time_t opt_timestamp_backup_proxies_
Definition: download.h:460
void SetProxyChain(const std::string &proxy_list, const std::string &fallback_proxy_list, const ProxySetModes set_mode)
Definition: download.cc:2795
std::string GetProxyList()
Definition: download.cc:2977
unsigned min_ttl() const
Definition: dns.h:208
void InitLocaltime()
Definition: prng.h:34
bool allow_failure() const
Definition: jobinfo.h:212
void GetMetalinkInfo(std::vector< std::string > *metalink_chain, unsigned *current_metalink)
Definition: download.cc:2221
std::set< CURL * > * pool_handles_inuse_
Definition: download.h:307
CURL * curl_handle() const
Definition: jobinfo.h:189
pthread_mutex_t * lock_options_
Definition: download.h:324
ProxyInfo * ChooseProxyUnlocked(const shash::Any *hash)
Definition: download.cc:2986
pthread_t thread_download_
Definition: download.h:314
std::string opt_proxy_list_
Definition: download.h:381
const std::string & name() const
Definition: dns.h:118
perf::Counter * sz_transfer_time
Definition: download.h:43
void SetTracingHeaderGid(char *tracing_header_gid)
Definition: jobinfo.h:246
assert((mem||(size==0))&&"Out Of Memory")
void SetNocache(bool nocache)
Definition: jobinfo.h:258
unsigned opt_proxy_groups_fallback_
Definition: download.h:373
void ReleaseCurlHandle(CURL *handle)
Definition: download.cc:900
bool IsValidDataTube()
Definition: jobinfo.h:148
static bool sortlinks(const std::string &s1, const std::string &s2)
Definition: download.cc:1390
StreamStates
Definition: compression.h:36
void SetTracingHeaderPid(char *tracing_header_pid)
Definition: jobinfo.h:243
void set_max_ttl(unsigned seconds)
Definition: dns.h:209
Tube< DataTubeElement > * GetDataTubePtr()
Definition: jobinfo.h:169
Algorithms algorithm
Definition: hash.h:122
char * tracing_header_gid() const
Definition: jobinfo.h:193
const std::string path()
Definition: sink_path.h:98
const std::set< std::string > & ViewBestAddresses(IpPreference preference) const
Definition: dns.cc:198
char * info_header() const
Definition: jobinfo.h:191
void SetDnsServer(const std::string &address)
Definition: download.cc:2110
int platform_stat(const char *path, platform_stat64 *buf)
DownloadManager * Clone(const perf::StatisticsTemplate &statistics, const std::string &cloned_name)
Definition: download.cc:3200
bool force_nocache() const
Definition: jobinfo.h:176
std::string StringifyUint(const uint64_t value)
Definition: string.cc:83
void DecompressFini(z_stream *strm)
Definition: compression.cc:204
virtual std::string Describe()=0
static void * MainDownload(void *data)
Definition: download.cc:568
void InitSeed(const uint64_t seed)
Definition: prng.h:32
void SetTimeout(const unsigned seconds_proxy, const unsigned seconds_direct)
Definition: download.cc:2162
void SetLink(const std::string &link)
Definition: jobinfo.h:257
std::string opt_dns_server_
Definition: download.h:326
void SwitchHostInfo(const std::string &typ, HostInfo &info, JobInfo *jobinfo)
Definition: download.cc:2364
off_t range_offset() const
Definition: jobinfo.h:186
bool nocache() const
Definition: jobinfo.h:199
char algorithm
unsigned char num_used_hosts() const
Definition: jobinfo.h:204
virtual bool IsValid()=0
bool follow_redirects() const
Definition: jobinfo.h:175
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:140
void Init(ContextPtr context)
Definition: hash.cc:164
int http_code() const
Definition: jobinfo.h:201
bool IsValid(const std::string &input) const
Definition: sanitizer.cc:112
Algorithms
Definition: hash.h:41
void CreateDataTube()
Definition: jobinfo.h:143
void SetNumUsedMetalinks(unsigned char num_used_metalinks)
Definition: jobinfo.h:264
bool FileExists(const std::string &path)
Definition: posix.cc:803
unsigned max_ttl() const
Definition: dns.h:210
Pipe< kPipeDownloadJobsResults > * GetPipeJobResultPtr()
Definition: jobinfo.h:166
void SetDnsTtlLimits(const unsigned min_seconds, const unsigned max_seconds)
Definition: download.cc:2143
static Failures PrepareDownloadDestination(JobInfo *info)
Definition: download.cc:116
perf::Counter * n_metalink_failover
Definition: download.h:46
void SetHttpCode(int http_code)
Definition: jobinfo.h:260
DataTubeAction action
Definition: jobinfo.h:50
const char * Code2Ascii(const Failures error)
bool head_request() const
Definition: jobinfo.h:174
unsigned char num_retries() const
Definition: jobinfo.h:205
std::vector< std::string > * chain
Definition: download.h:136
bool IsValidPipeJobResults()
Definition: jobinfo.h:141
std::vector< std::vector< ProxyInfo > > * opt_proxy_groups_
Definition: download.h:359
off_t range_size() const
Definition: jobinfo.h:187
void GetProxyInfo(std::vector< std::vector< ProxyInfo > > *proxy_chain, unsigned *current_group, unsigned *fallback_group)
Definition: download.cc:2954
void SetProxyGroupResetDelay(const unsigned seconds)
Definition: download.cc:3110
void SetCurrentMetalinkChainIndex(int current_metalink_chain_index)
Definition: jobinfo.h:272
atomic_int32 multi_threaded_
Definition: download.h:315
bool compressed() const
Definition: jobinfo.h:172
std::string AddDefaultScheme(const std::string &proxy)
Definition: dns.cc:182
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:306
cvmfs::Sink * sink() const
Definition: jobinfo.h:182
gid_t gid() const
Definition: jobinfo.h:179
dns::NormalResolver * resolver_
Definition: download.h:434
bool Interrupted(const std::string &fqrn, JobInfo *info)
Definition: download.cc:85
std::string proxy() const
Definition: jobinfo.h:197
dns::IpPreference opt_ip_preference_
Definition: download.h:439
Algorithms algorithm
Definition: hash.h:488
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:255
void UseSystemCertificatePath()
Definition: ssl.cc:72
Definition: dns.h:90
bool SetShardingPolicy(const ShardingPolicySelector type)
Definition: download.cc:3179
perf::Counter * n_host_failover
Definition: download.h:47
void UpdateProxiesUnlocked(const std::string &reason)
Definition: download.cc:3002
time_t opt_metalink_timestamp_link_
Definition: download.h:348
bool IsEquivalent(const Host &other) const
Definition: dns.cc:257
std::string link() const
Definition: jobinfo.h:198
bool IsProxyTransferError(const Failures error)
void SetNumRetries(unsigned char num_retries)
Definition: jobinfo.h:270
bool IsExpired() const
Definition: dns.cc:267
void set_throttle(const unsigned throttle)
Definition: dns.h:205
InterruptCue * interrupt_cue() const
Definition: jobinfo.h:181
void SetIpPreference(const dns::IpPreference preference)
Definition: download.cc:2151
virtual int Reset()=0
shash::ContextPtr hash_context() const
Definition: jobinfo.h:196
perf::Counter * n_requests
Definition: download.h:44
bool Read(T *data)
Definition: pipe.h:164
void Final(ContextPtr context, Any *any_digest)
Definition: hash.cc:221
void SetRetryParameters(const unsigned max_retries, const unsigned backoff_init_ms, const unsigned backoff_max_ms)
Definition: download.cc:3136
string StringifyInt(const int64_t value)
Definition: string.cc:77
char * tracing_header_uid() const
Definition: jobinfo.h:194
Failures error_code() const
Definition: jobinfo.h:200
void CloneProxyConfig(DownloadManager *clone)
Definition: download.cc:3247
void SetMaxIpaddrPerProxy(unsigned limit)
Definition: download.cc:3146
void EnableIgnoreSignatureFailures()
Definition: download.cc:3165
void Inc(class Counter *counter)
Definition: statistics.h:50
void * buffer
Definition: hash.h:489
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:279
std::string ExtractHost(const std::string &url)
Definition: dns.cc:108
void ** GetCredDataPtr()
Definition: jobinfo.h:162
std::vector< int > * opt_host_chain_rtt_
Definition: download.h:356
SslCertificateStore ssl_certificate_store_
Definition: download.h:475
std::string GetFallbackProxyList()
Definition: download.cc:2979
uint32_t Partial32() const
Definition: hash.h:382
void SetProxyTemplates(const std::string &direct, const std::string &forced)
Definition: download.cc:3152
IpPreference
Definition: dns.h:46
unsigned retries() const
Definition: dns.h:203
unsigned opt_backoff_max_ms_
Definition: download.h:332
void SetErrorCode(Failures error_code)
Definition: jobinfo.h:259
void SetNumUsedProxies(unsigned char num_used_proxies)
Definition: jobinfo.h:261
unsigned GetContextSize(const Algorithms algorithm)
Definition: hash.cc:148
std::string GetDnsServer() const
Definition: download.cc:2104
int current_metalink_chain_index() const
Definition: jobinfo.h:207
CredentialsAttachment * credentials_attachment_
Definition: download.h:464
struct pollfd * watch_fds_
Definition: download.h:319
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
Definition: hash.cc:190
std::map< uint32_t, ProxyInfo * > opt_proxy_map_
Definition: download.h:389
uint64_t String2Uint64(const string &value)
Definition: string.cc:240
UniquePtr< Pipe< kPipeDownloadJobs > > pipe_jobs_
Definition: download.h:318
void CreatePipeJobResults()
Definition: jobinfo.h:137
Failures Fetch(JobInfo *info)
Definition: download.cc:1984
unsigned char num_used_metalinks() const
Definition: jobinfo.h:203
void SetCurlHandle(CURL *curl_handle)
Definition: jobinfo.h:240
void SetTracingHeaderUid(char *tracing_header_uid)
Definition: jobinfo.h:249
Definition: mutex.h:42
curl_slist * headers() const
Definition: jobinfo.h:190
const shash::Any * expected_hash() const
Definition: jobinfo.h:183
perf::Counter * n_proxy_failover
Definition: download.h:48
virtual int Flush()=0
void GetTimeout(unsigned *seconds_proxy, unsigned *seconds_direct)
Definition: download.cc:2184
void SetCredData(void *cred_data)
Definition: jobinfo.h:227
std::string Filter(const std::string &input) const
Definition: sanitizer.cc:105
std::string proxy_template_forced_
Definition: download.h:452
time_t opt_timestamp_failover_proxies_
Definition: download.h:461
void SetDnsParameters(const unsigned retries, const unsigned timeout_ms)
Definition: download.cc:2129
unsigned EscapeHeader(const std::string &header, char *escaped_buf, size_t buf_size)
Definition: download.cc:442
const std::string * extra_info() const
Definition: jobinfo.h:184
static Host ExtendDeadline(const Host &original, unsigned seconds_from_now)
Definition: dns.cc:223
UniquePtr< Pipe< kPipeThreadTerminator > > pipe_terminate_
Definition: download.h:316
void SetProxy(const std::string &proxy)
Definition: jobinfo.h:256
static const int kProbeUnprobed
Definition: download.h:161
unsigned backoff_ms() const
Definition: jobinfo.h:206
virtual int Reset()
Definition: sink_mem.cc:52
pid_t pid() const
Definition: jobinfo.h:177
unsigned char num_used_proxies() const
Definition: jobinfo.h:202
int current_host_chain_index() const
Definition: jobinfo.h:210
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:2025
void SetMetalinkResetDelay(const unsigned seconds)
Definition: download.cc:3120
bool IsHostTransferError(const Failures error)
static const unsigned kDnsDefaultRetries
Definition: download.h:172
void SortTeam(std::vector< T > *tractor, std::vector< U > *towed)
Definition: algorithm.h:68
bool GeoSortServers(std::vector< std::string > *servers, std::vector< uint64_t > *output_order=NULL)
Definition: download.cc:2506
virtual bool Reserve(size_t size)=0
static const int kProbeDown
Definition: download.h:166
void SetNumUsedHosts(unsigned char num_used_hosts)
Definition: jobinfo.h:267
unsigned size
Definition: hash.h:490
void SetHeaders(curl_slist *headers)
Definition: jobinfo.h:241
Failures status() const
Definition: dns.h:119
static const unsigned kProxyMapScale
Definition: download.h:174
void GetHostInfo(std::vector< std::string > *host_chain, std::vector< int > *rtt, unsigned *current_host)
Definition: download.cc:2269
static void size_t size
Definition: smalloc.h:54
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
Definition: download.cc:1470
char * tracing_header_pid() const
Definition: jobinfo.h:192
void SetBackoffMs(unsigned backoff_ms)
Definition: jobinfo.h:271
SharedPtr< HealthCheck > health_check_
Definition: download.h:415
void SwitchProxy(JobInfo *info)
Definition: download.cc:2294
void AddHTTPTracingHeader(const std::string &header)
Definition: download.cc:3171
void SetCredentialsAttachment(CredentialsAttachment *ca)
Definition: download.cc:2096
void SetFollowRedirects(bool follow_redirects)
Definition: jobinfo.h:220
void RebalanceProxiesUnlocked(const std::string &reason)
Definition: download.cc:3074
pthread_mutex_t * lock_synchronous_mode_
Definition: download.h:325
virtual bool SetResolvers(const std::vector< std::string > &resolvers)
Definition: dns.cc:1258
void InitializeRequest(JobInfo *info, CURL *handle)
Definition: download.cc:918
static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp)
Definition: download.cc:489
string RewriteUrl(const string &url, const string &ip)
Definition: dns.cc:152
void SetHostResetDelay(const unsigned seconds)
Definition: download.cc:3128
uint32_t Next(const uint64_t boundary)
Definition: prng.h:44
virtual int64_t Write(const void *buf, uint64_t sz)=0
unsigned timeout_ms() const
Definition: dns.h:204
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545