CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
download.cc
Go to the documentation of this file.
1 
27 // TODO(jblomer): MS for time summing
28 // NOLINTNEXTLINE
29 #define __STDC_FORMAT_MACROS
30 
31 #include "cvmfs_config.h"
32 #include "download.h"
33 
34 #include <alloca.h>
35 #include <errno.h>
36 #include <inttypes.h>
37 #include <poll.h>
38 #include <pthread.h>
39 #include <signal.h>
40 #include <stdint.h>
41 #include <sys/time.h>
42 #include <unistd.h>
43 
44 #include <algorithm>
45 #include <cassert>
46 #include <cstdio>
47 #include <cstdlib>
48 #include <cstring>
49 #include <map>
50 #include <set>
51 #include <utility>
52 
53 #include "compression.h"
54 #include "crypto/hash.h"
55 #include "duplex_curl.h"
56 #include "interrupt.h"
57 #include "sanitizer.h"
58 #include "ssl.h"
59 #include "util/algorithm.h"
60 #include "util/atomic.h"
61 #include "util/concurrency.h"
62 #include "util/exception.h"
63 #include "util/logging.h"
64 #include "util/posix.h"
65 #include "util/prng.h"
66 #include "util/smalloc.h"
67 #include "util/string.h"
68 
69 using namespace std; // NOLINT
70 
71 namespace download {
72 
// Decides whether the current download of `info` is to be interrupted.
//  - Returns true immediately when info->allow_failure() is set.
//  - Otherwise, for a non-empty fqrn, checks for the sentinel file
//    /var/run/cvmfs/interrupt.<fqrn>; if it exists it is unlinked (a failed
//    unlink is only logged) and true is returned.
//  - Returns false in every other case.
// NOTE(review): the opening `LogCvmfs(...` lines of the three log statements
// in this function (source lines 96, 100 and 105) are missing from this
// listing.
85 bool Interrupted(const std::string &fqrn, JobInfo *info) {
86  if (info->allow_failure()) {
87  return true;
88  }
89 
90  if (!fqrn.empty()) {
91  // it is up to the user to create this sentinel file ("pause_file") if
92  // CVMFS_FAILOVER_INDEFINITELY is used. It must be created during
93  // "cvmfs_config reload" and "cvmfs_config reload $fqrn"
94  std::string pause_file = std::string("/var/run/cvmfs/interrupt.") + fqrn;
95 
97  "(id %" PRId64 ") Interrupted(): checking for existence of %s",
98  info->id(), pause_file.c_str());
99  if (FileExists(pause_file)) {
101  "(id %" PRId64 ") Interrupt marker found - "
102  "Interrupting current download, this will EIO outstanding IO.",
103  info->id());
// Consume the marker: later calls no longer see it unless it is recreated.
104  if (0 != unlink(pause_file.c_str())) {
106  "(id %" PRId64 ") Couldn't delete interrupt marker: errno=%d",
107  info->id(), errno);
108  }
109  return true;
110  }
111  }
112  return false;
113 }
114 
// NOTE(review): the signature line of this function (source line 115) and the
// opening line of the first log statement (source line 119) are missing from
// this listing.
// Validates the job's sink before a download. If a sink is set but is not
// valid, the problem is logged and the function returns kFailLocalIO when the
// sink is a cvmfs::PathSink, or kFailOther for any other sink type. Otherwise
// (no sink, or a valid sink) it returns kFailOk.
116  if (info->sink() != NULL && !info->sink()->IsValid()) {
117  cvmfs::PathSink* psink = dynamic_cast<cvmfs::PathSink*>(info->sink());
118  if (psink != NULL) {
// Assumes errno still holds the error from opening the path -- TODO confirm.
120  "(id %" PRId64 ") Failed to open path %s: %s (errno=%d).",
121  info->id(), psink->path().c_str(), strerror(errno), errno);
122  return kFailLocalIO;
123  } else {
124  LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") "
125  "Failed to create a valid sink: \n %s",
126  info->id(), info->sink()->Describe().c_str());
127  return kFailOther;
128  }
129  }
130 
131  return kFailOk;
132 }
133 
134 
// libcurl header callback: installed as CURLOPT_HEADERFUNCTION for new
// handles, with info_link being the JobInfo passed via CURLOPT_WRITEHEADER.
// Returns num_bytes to continue, or 0 to make libcurl abort the transfer.
//  - "HTTP/1." status lines: the status code is parsed into info. 2xx
//    continues. 301/302/303/307 continue only when redirects are followed.
//    Any other code is logged and the transfer is aborted. Status lines
//    shorter than 10 characters abort immediately.
//  - "CONTENT-LENGTH:" reserves space in the sink when the sink requires it.
//    A failed reservation sets kFailTooBig and aborts.
//  - "LOCATION:" is logged.
//  - "X-SQUID-ERROR:" and "PROXY-STATUS:" (the latter only when it contains
//    "error=") reinterpret a kFailHostHttp error as a proxy error, per the
//    inline comments.
// The third HasPrefix() argument presumably selects case-insensitive
// matching -- TODO confirm against util/string.h.
// NOTE(review): source lines 170, 173, 181, 190, 193, 196, 228 and 234 are
// missing from this listing (log openers and SetErrorCode calls), so the
// exact error codes set in those branches cannot be read here.
// NOTE(review): only status lines starting with "HTTP/1." are inspected; a
// status line of another form (e.g. "HTTP/2 200") skips code parsing.
138 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
139  void *info_link)
140 {
141  const size_t num_bytes = size*nmemb;
142  const string header_line(static_cast<const char *>(ptr), num_bytes);
143  JobInfo *info = static_cast<JobInfo *>(info_link);
144 
145  // LogCvmfs(kLogDownload, kLogDebug, "REMOVE-ME: Header callback with %s",
146  // header_line.c_str());
147 
148  // Check http status codes
149  if (HasPrefix(header_line, "HTTP/1.", false)) {
150  if (header_line.length() < 10) {
151  return 0;
152  }
153 
// Skip the blanks after "HTTP/1.x" to reach the three status digits.
154  unsigned i;
155  for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {}
156 
157  // Code is initialized to -1
// Only parse when three characters are available from position i onwards.
158  if (header_line.length() > i+2) {
159  info->SetHttpCode(DownloadManager::ParseHttpCode(&header_line[i]));
160  }
161 
162  if ((info->http_code() / 100) == 2) {
163  return num_bytes;
164  } else if ((info->http_code() == 301) ||
165  (info->http_code() == 302) ||
166  (info->http_code() == 303) ||
167  (info->http_code() == 307))
168  {
169  if (!info->follow_redirects()) {
171  "(id %" PRId64 ") redirect support not enabled: %s",
172  info->id(), header_line.c_str());
174  return 0;
175  }
176  LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") http redirect: %s",
177  info->id(), header_line.c_str());
178  // libcurl will handle this because of CURLOPT_FOLLOWLOCATION
179  return num_bytes;
180  } else {
182  "(id %" PRId64 ") http status error code: %s [%d]",
183  info->id(), header_line.c_str(), info->http_code());
184  if (((info->http_code() / 100) == 5) ||
185  (info->http_code() == 400) || (info->http_code() == 404))
186  {
187  // 5XX returned by host
188  // 400: error from the GeoAPI module
189  // 404: the stratum 1 does not have the newest files
191  } else if (info->http_code() == 429) {
192  // 429: rate throttling (we ignore the backoff hint for the time being)
194  } else {
195  info->SetErrorCode((info->proxy() == "DIRECT") ? kFailHostHttp :
197  }
198  return 0;
199  }
200  }
201 
202  // If needed: allocate space in sink
203  if (info->sink() != NULL && info->sink()->RequiresReserve() &&
204  HasPrefix(header_line, "CONTENT-LENGTH:", true))
205  {
// tmp is as large as the whole header line, so the unbounded %s in the
// sscanf() below cannot overflow it.
206  char *tmp = reinterpret_cast<char *>(alloca(num_bytes+1));
207  uint64_t length = 0;
208  sscanf(header_line.c_str(), "%s %" PRIu64, tmp, &length);
209  if (length > 0) {
210  if (!info->sink()->Reserve(length)) {
211  LogCvmfs(kLogDownload, kLogDebug | kLogSyslogErr, "(id %" PRId64 ") "
212  "resource %s too large to store in memory (%" PRIu64 ")",
213  info->id(), info->url()->c_str(), length);
214  info->SetErrorCode(kFailTooBig);
215  return 0;
216  }
217  } else {
218  // Empty resource
219  info->sink()->Reserve(0);
220  }
221  } else if (HasPrefix(header_line, "LOCATION:", true)) {
222  // This comes along with redirects
223  LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") %s",
224  info->id(), header_line.c_str());
225  } else if (HasPrefix(header_line, "X-SQUID-ERROR:", true)) {
226  // Reinterpret host error as proxy error
227  if (info->error_code() == kFailHostHttp) {
229  }
230  } else if (HasPrefix(header_line, "PROXY-STATUS:", true)) {
231  // Reinterpret host error as proxy error if applicable
232  if ((info->error_code() == kFailHostHttp) &&
233  (header_line.find("error=") != string::npos)) {
235  }
236  }
237 
238  return num_bytes;
239 }
240 
241 
// libcurl write callback: installed as CURLOPT_WRITEFUNCTION for new handles,
// with info_link being the JobInfo passed via CURLOPT_WRITEDATA.
// Feeds the received bytes into the hash context when an expected hash is
// set, then either decompresses them into the sink (compressed jobs) or
// writes them to the sink unchanged. Returns num_bytes to continue, or 0 to
// make libcurl abort the transfer (empty chunk, decompression data error or
// decompression local IO error; the latter two also set an error code).
// NOTE(review): source lines 271 and 277 (the opening LogCvmfs lines of the
// two decompression error logs) are missing from this listing.
245 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
246  void *info_link)
247 {
248  const size_t num_bytes = size*nmemb;
249  JobInfo *info = static_cast<JobInfo *>(info_link);
250 
251  // TODO(heretherebedragons) remove if no error comes up
252  // as this means only jobinfo data request (and not header only)
253  // come here
254  assert(info->sink() != NULL);
255 
256  // LogCvmfs(kLogDownload, kLogDebug, "Data callback, %d bytes", num_bytes);
257 
258  if (num_bytes == 0)
259  return 0;
260 
// The hash is computed over the bytes as received, i.e. before decompression.
261  if (info->expected_hash()) {
262  shash::Update(reinterpret_cast<unsigned char *>(ptr),
263  num_bytes, info->hash_context());
264  }
265 
266  if (info->compressed()) {
267  zlib::StreamStates retval =
268  zlib::DecompressZStream2Sink(ptr, static_cast<int64_t>(num_bytes),
269  info->GetZstreamPtr(), info->sink());
270  if (retval == zlib::kStreamDataError) {
272  "(id %" PRId64 ") failed to decompress %s",
273  info->id(), info->url()->c_str());
274  info->SetErrorCode(kFailBadData);
275  return 0;
276  } else if (retval == zlib::kStreamIOError) {
278  "(id %" PRId64 ") decompressing %s, local IO error",
279  info->id(), info->url()->c_str());
280  info->SetErrorCode(kFailLocalIO);
281  return 0;
282  }
283  } else {
284  int64_t written = info->sink()->Write(ptr, num_bytes);
// NOTE(review): a failed or short Write() is only logged; the callback still
// returns num_bytes and sets no error code, so the transfer continues. The
// message says "errno" but prints `written`, and "%ld" assumes int64_t is
// long (true on LP64 Linux); PRId64 would be portable.
285  if (written < 0 || static_cast<uint64_t>(written) != num_bytes) {
286  LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") "
287  "Failed to perform write of %zu bytes to sink %s with errno %ld",
288  info->id(), num_bytes, info->sink()->Describe().c_str(), written);
289  }
290  }
291 
292  return num_bytes;
293 }
294 
#ifdef DEBUGMSG
/**
 * Debug callback installed via CURLOPT_DEBUGFUNCTION when DEBUGMSG is defined.
 * Logs every libcurl debug event to kLogCurl, prefixed with the job id read
 * from CURLINFO_PRIVATE and a tag naming the event type.  Data and SSL-data
 * payloads of 50 bytes or more are logged only as "<snip>".  A payload that
 * contains characters outside printable ASCII (CR/LF allowed, NUL displayed
 * as '~') is logged as "<Non-plaintext sequence>".  Always returns 0.
 */
static int CallbackCurlDebug(
  CURL * handle,
  curl_infotype type,
  char *data,
  size_t size,
  void * /* clientp */)
{
  JobInfo *job;
  curl_easy_getinfo(handle, CURLINFO_PRIVATE, &job);

  std::string prefix = "(id " + StringifyInt(job->id()) + ") ";
  const bool too_long = !(size < 50);

  switch (type) {
    case CURLINFO_TEXT:
      prefix += "{info} ";
      break;
    case CURLINFO_HEADER_IN:
      prefix += "{header/recv} ";
      break;
    case CURLINFO_HEADER_OUT:
      prefix += "{header/sent} ";
      break;
    case CURLINFO_DATA_IN:
      if (too_long) {
        LogCvmfs(kLogCurl, kLogDebug, "%s{data/recv} <snip>", prefix.c_str());
        return 0;
      }
      prefix += "{data/recv} ";
      break;
    case CURLINFO_DATA_OUT:
      if (too_long) {
        LogCvmfs(kLogCurl, kLogDebug, "%s{data/sent} <snip>", prefix.c_str());
        return 0;
      }
      prefix += "{data/sent} ";
      break;
    case CURLINFO_SSL_DATA_IN:
      if (too_long) {
        LogCvmfs(kLogCurl, kLogDebug, "%s{ssldata/recv} <snip>",
                 prefix.c_str());
        return 0;
      }
      prefix += "{ssldata/recv} ";
      break;
    case CURLINFO_SSL_DATA_OUT:
      if (too_long) {
        LogCvmfs(kLogCurl, kLogDebug, "%s{ssldata/sent} <snip>",
                 prefix.c_str());
        return 0;
      }
      prefix += "{ssldata/sent} ";
      break;
    default:
      // Unknown event type: log the payload without a tag
      break;
  }

  std::string msg(data, size);
  // NUL bytes are displayed as '~' rather than treated as binary
  std::replace(msg.begin(), msg.end(), '\0', '~');

  bool plaintext = true;
  for (std::string::const_iterator it = msg.begin(); it != msg.end(); ++it) {
    const bool printable = (*it >= ' ') && (*it <= '~');
    const bool line_break = (*it == '\n') || (*it == '\r');
    if (!printable && !line_break) {
      plaintext = false;
      break;
    }
  }
  if (!plaintext)
    msg = "<Non-plaintext sequence>";

  LogCvmfs(kLogCurl, kLogDebug, "%s%s",
           prefix.c_str(), Trim(msg, true /* trim_newline */).c_str());
  return 0;
}
#endif
380 
381 //------------------------------------------------------------------------------
382 
383 
384 const int DownloadManager::kProbeUnprobed = -1;
385 const int DownloadManager::kProbeDown = -2;
386 const int DownloadManager::kProbeGeo = -3;
387 
388 bool DownloadManager::EscapeUrlChar(unsigned char input, char output[3]) {
389  if (((input >= '0') && (input <= '9')) ||
390  ((input >= 'A') && (input <= 'Z')) ||
391  ((input >= 'a') && (input <= 'z')) ||
392  (input == '/') || (input == ':') || (input == '.') ||
393  (input == '@') ||
394  (input == '+') || (input == '-') ||
395  (input == '_') || (input == '~') ||
396  (input == '[') || (input == ']') || (input == ','))
397  {
398  output[0] = static_cast<char>(input);
399  return false;
400  }
401 
402  output[0] = '%';
403  output[1] = static_cast<char>(
404  (input / 16) + ((input / 16 <= 9) ? '0' : 'A'-10));
405  output[2] = static_cast<char>(
406  (input % 16) + ((input % 16 <= 9) ? '0' : 'A'-10));
407  return true;
408 }
409 
410 
415 string DownloadManager::EscapeUrl(const int64_t jobinfo_id, const string &url) {
416  string escaped;
417  escaped.reserve(url.length());
418 
419  char escaped_char[3];
420  for (unsigned i = 0, s = url.length(); i < s; ++i) {
421  if (EscapeUrlChar(url[i], escaped_char)) {
422  escaped.append(escaped_char, 3);
423  } else {
424  escaped.push_back(escaped_char[0]);
425  }
426  }
427  LogCvmfs(kLogDownload, kLogDebug, "(id %" PRId64 ") escaped %s to %s",
428  jobinfo_id, url.c_str(), escaped.c_str());
429 
430  return escaped;
431 }
432 
437 unsigned DownloadManager::EscapeHeader(const string &header,
438  char *escaped_buf,
439  size_t buf_size)
440 {
441  unsigned esc_pos = 0;
442  char escaped_char[3];
443  for (unsigned i = 0, s = header.size(); i < s; ++i) {
444  if (EscapeUrlChar(header[i], escaped_char)) {
445  for (unsigned j = 0; j < 3; ++j) {
446  if (escaped_buf) {
447  if (esc_pos >= buf_size)
448  return esc_pos;
449  escaped_buf[esc_pos] = escaped_char[j];
450  }
451  esc_pos++;
452  }
453  } else {
454  if (escaped_buf) {
455  if (esc_pos >= buf_size)
456  return esc_pos;
457  escaped_buf[esc_pos] = escaped_char[0];
458  }
459  esc_pos++;
460  }
461  }
462 
463  return esc_pos;
464 }
465 
469 int DownloadManager::ParseHttpCode(const char digits[3]) {
470  int result = 0;
471  int factor = 100;
472  for (int i = 0; i < 3; ++i) {
473  if ((digits[i] < '0') || (digits[i] > '9'))
474  return -1;
475  result += (digits[i] - '0') * factor;
476  factor /= 10;
477  }
478  return result;
479 }
480 
481 
485 int DownloadManager::CallbackCurlSocket(CURL * /* easy */,
486  curl_socket_t s,
487  int action,
488  void *userp,
489  void * /* socketp */)
490 {
491  // LogCvmfs(kLogDownload, kLogDebug, "CallbackCurlSocket called with easy "
492  // "handle %p, socket %d, action %d", easy, s, action);
493  DownloadManager *download_mgr = static_cast<DownloadManager *>(userp);
494  if (action == CURL_POLL_NONE)
495  return 0;
496 
497  // Find s in watch_fds_
498  unsigned index;
499 
500  // TODO(heretherebedragons) why start at index = 0 and not 2?
501  // fd[0] and fd[1] are fixed?
502  for (index = 0; index < download_mgr->watch_fds_inuse_; ++index) {
503  if (download_mgr->watch_fds_[index].fd == s)
504  break;
505  }
506  // Or create newly
507  if (index == download_mgr->watch_fds_inuse_) {
508  // Extend array if necessary
509  if (download_mgr->watch_fds_inuse_ == download_mgr->watch_fds_size_)
510  {
511  assert(download_mgr->watch_fds_size_ > 0);
512  download_mgr->watch_fds_size_ *= 2;
513  download_mgr->watch_fds_ = static_cast<struct pollfd *>(
514  srealloc(download_mgr->watch_fds_,
515  download_mgr->watch_fds_size_ * sizeof(struct pollfd)));
516  }
517  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].fd = s;
518  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].events = 0;
519  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].revents = 0;
520  download_mgr->watch_fds_inuse_++;
521  }
522 
523  switch (action) {
524  case CURL_POLL_IN:
525  download_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
526  break;
527  case CURL_POLL_OUT:
528  download_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
529  break;
530  case CURL_POLL_INOUT:
531  download_mgr->watch_fds_[index].events =
532  POLLIN | POLLPRI | POLLOUT | POLLWRBAND;
533  break;
534  case CURL_POLL_REMOVE:
535  if (index < download_mgr->watch_fds_inuse_-1) {
536  download_mgr->watch_fds_[index] =
537  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_-1];
538  }
539  download_mgr->watch_fds_inuse_--;
540  // Shrink array if necessary
541  if ((download_mgr->watch_fds_inuse_ > download_mgr->watch_fds_max_) &&
542  (download_mgr->watch_fds_inuse_ < download_mgr->watch_fds_size_/2))
543  {
544  download_mgr->watch_fds_size_ /= 2;
545  // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ (%d)",
546  // watch_fds_size_);
547  download_mgr->watch_fds_ = static_cast<struct pollfd *>(
548  srealloc(download_mgr->watch_fds_,
549  download_mgr->watch_fds_size_*sizeof(struct pollfd)));
550  // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ done",
551  // watch_fds_size_);
552  }
553  break;
554  default:
555  break;
556  }
557 
558  return 0;
559 }
560 
561 
// Body of the download I/O thread; `data` is the owning DownloadManager.
// Sets up watch_fds_ with the terminate pipe (slot 0) and the job pipe
// (slot 1), then loops on poll():
//  - JobInfo pointers read from pipe_jobs_ get a curl handle, are initialized
//    and added to curl_multi_;
//  - socket activity and timeouts are forwarded to curl_multi_socket_action();
//  - finished transfers are re-added when VerifyAndFinalize() returns true,
//    otherwise the handle is released and the result is handed back to the
//    job (data tube element and error code on the job's result pipe).
// Any readiness on the terminate pipe ends the loop; in-use handles are then
// removed and cleaned up, and watch_fds_ is freed.
// NOTE(review): source lines 567, 706 and 724 (two log openers and the
// declaration of `ele`) are missing from this listing.
565 void *DownloadManager::MainDownload(void *data) {
566  DownloadManager *download_mgr = static_cast<DownloadManager *>(data);
568  "download I/O thread of DownloadManager '%s' started",
569  download_mgr->name_.c_str());
570 
571  const int kIdxPipeTerminate = 0;
572  const int kIdxPipeJobs = 1;
573 
574  download_mgr->watch_fds_ =
575  static_cast<struct pollfd *>(smalloc(2 * sizeof(struct pollfd)));
576  download_mgr->watch_fds_size_ = 2;
577  download_mgr->watch_fds_[kIdxPipeTerminate].fd =
578  download_mgr->pipe_terminate_->GetReadFd();
579  download_mgr->watch_fds_[kIdxPipeTerminate].events = POLLIN | POLLPRI;
580  download_mgr->watch_fds_[kIdxPipeTerminate].revents = 0;
581  download_mgr->watch_fds_[kIdxPipeJobs].fd =
582  download_mgr->pipe_jobs_->GetReadFd();
583  download_mgr->watch_fds_[kIdxPipeJobs].events = POLLIN | POLLPRI;
584  download_mgr->watch_fds_[kIdxPipeJobs].revents = 0;
585  download_mgr->watch_fds_inuse_ = 2;
586 
587  int still_running = 0;
588  struct timeval timeval_start, timeval_stop;
589  gettimeofday(&timeval_start, NULL);
590  while (true) {
591  int timeout;
592  if (still_running) {
593  /* NOTE: The following might degrade the performance for many small files
594  * use case. TODO(jblomer): look into it.
595  // Specify a timeout for polling in ms; this allows us to return
596  // to libcurl once a second so it can look for internal operations
597  // which timed out. libcurl has a more elaborate mechanism
598  // (CURLMOPT_TIMERFUNCTION) that would inform us of the next potential
599  // timeout. TODO(bbockelm) we should switch to that in the future.
600  timeout = 100;
601  */
602  timeout = 1;
603  } else {
// Idle: block in poll() indefinitely and account the elapsed transfer time.
// NOTE(review): timeval_start is only reset when a job arrives while idle, so
// an idle wakeup without a new job (e.g. poll() returning -1) adds the same
// interval to sz_transfer_time again.
604  timeout = -1;
605  gettimeofday(&timeval_stop, NULL);
606  int64_t delta = static_cast<int64_t>(
607  1000 * DiffTimeSeconds(timeval_start, timeval_stop));
608  perf::Xadd(download_mgr->counters_->sz_transfer_time, delta);
609  }
610  int retval = poll(download_mgr->watch_fds_, download_mgr->watch_fds_inuse_,
611  timeout);
// NOTE(review): any poll() error (not only EINTR) is retried immediately; a
// persistent error would make this loop spin.
612  if (retval < 0) {
613  continue;
614  }
615 
616  // Handle timeout
617  if (retval == 0) {
618  curl_multi_socket_action(download_mgr->curl_multi_,
619  CURL_SOCKET_TIMEOUT,
620  0,
621  &still_running);
622  }
623 
624  // Terminate I/O thread
625  if (download_mgr->watch_fds_[kIdxPipeTerminate].revents)
626  break;
627 
628  // New job arrives
629  if (download_mgr->watch_fds_[kIdxPipeJobs].revents) {
630  download_mgr->watch_fds_[kIdxPipeJobs].revents = 0;
631  JobInfo *info;
632  download_mgr->pipe_jobs_->Read<JobInfo*>(&info);
633  if (!still_running) {
634  gettimeofday(&timeval_start, NULL);
635  }
636  CURL *handle = download_mgr->AcquireCurlHandle();
637  download_mgr->InitializeRequest(info, handle);
638  download_mgr->SetUrlOptions(info);
639  curl_multi_add_handle(download_mgr->curl_multi_, handle);
640  curl_multi_socket_action(download_mgr->curl_multi_,
641  CURL_SOCKET_TIMEOUT,
642  0,
643  &still_running);
644  }
645 
646  // Activity on curl sockets
647  // Within this loop the curl_multi_socket_action() may cause socket(s)
648  // to be removed from watch_fds_. If a socket is removed it is replaced
649  // by the socket at the end of the array and the inuse count is decreased.
650  // Therefore loop over the array in reverse order.
651  for (int64_t i = download_mgr->watch_fds_inuse_-1; i >= 2; --i) {
652  if (i >= download_mgr->watch_fds_inuse_) {
653  continue;
654  }
655  if (download_mgr->watch_fds_[i].revents) {
// Translate poll() revents into libcurl's CURL_CSELECT_* bitmask.
656  int ev_bitmask = 0;
657  if (download_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
658  ev_bitmask |= CURL_CSELECT_IN;
659  if (download_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
660  ev_bitmask |= CURL_CSELECT_OUT;
661  if (download_mgr->watch_fds_[i].revents &
662  (POLLERR | POLLHUP | POLLNVAL))
663  {
664  ev_bitmask |= CURL_CSELECT_ERR;
665  }
666  download_mgr->watch_fds_[i].revents = 0;
667 
668  curl_multi_socket_action(download_mgr->curl_multi_,
669  download_mgr->watch_fds_[i].fd,
670  ev_bitmask,
671  &still_running);
672  }
673  }
674 
675  // Check if transfers are completed
676  CURLMsg *curl_msg;
677  int msgs_in_queue;
678  while ((curl_msg = curl_multi_info_read(download_mgr->curl_multi_,
679  &msgs_in_queue)))
680  {
681  if (curl_msg->msg == CURLMSG_DONE) {
682  perf::Inc(download_mgr->counters_->n_requests);
683  JobInfo *info;
684  CURL *easy_handle = curl_msg->easy_handle;
685  int curl_error = curl_msg->data.result;
686  curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
687 
// NOTE(review): libcurl documents CURLINFO_REDIRECT_COUNT as writing a long;
// passing an int64_t* relies on int64_t being long (true on LP64 Linux).
688  int64_t redir_count;
689  curl_easy_getinfo(easy_handle, CURLINFO_REDIRECT_COUNT, &redir_count);
690  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s' - id %" PRId64 ") "
691  "Number of CURL redirects %" PRId64 ,
692  download_mgr->name_.c_str(), info->id(),
693  redir_count);
694 
// A true result from VerifyAndFinalize() means the same handle is submitted
// again (retry or failover); otherwise the job is finished.
695  curl_multi_remove_handle(download_mgr->curl_multi_, easy_handle);
696  if (download_mgr->VerifyAndFinalize(curl_error, info)) {
697  curl_multi_add_handle(download_mgr->curl_multi_, easy_handle);
698  curl_multi_socket_action(download_mgr->curl_multi_,
699  CURL_SOCKET_TIMEOUT,
700  0,
701  &still_running);
702  } else {
703  // Return easy handle into pool and write result back
704  download_mgr->ReleaseCurlHandle(easy_handle);
705 
707  info->GetDataTubePtr()->EnqueueBack(ele);
708  info->GetPipeJobResultPtr()->
709  Write<download::Failures>(info->error_code());
710  }
711  }
712  }
713  }
714 
715  for (set<CURL *>::iterator i = download_mgr->pool_handles_inuse_->begin(),
716  iEnd = download_mgr->pool_handles_inuse_->end(); i != iEnd; ++i)
717  {
718  curl_multi_remove_handle(download_mgr->curl_multi_, *i);
719  curl_easy_cleanup(*i);
720  }
721  download_mgr->pool_handles_inuse_->clear();
722  free(download_mgr->watch_fds_);
723 
725  "download I/O thread of DownloadManager '%s' terminated",
726  download_mgr->name_.c_str());
727  return NULL;
728 }
729 
730 
731 //------------------------------------------------------------------------------
732 
733 
734 HeaderLists::~HeaderLists() {
735  for (unsigned i = 0; i < blocks_.size(); ++i) {
736  delete[] blocks_[i];
737  }
738  blocks_.clear();
739 }
740 
741 
742 curl_slist *HeaderLists::GetList(const char *header) {
743  return Get(header);
744 }
745 
746 
747 curl_slist *HeaderLists::DuplicateList(curl_slist *slist) {
748  assert(slist);
749  curl_slist *copy = GetList(slist->data);
750  copy->next = slist->next;
751  curl_slist *prev = copy;
752  slist = slist->next;
753  while (slist) {
754  curl_slist *new_link = Get(slist->data);
755  new_link->next = slist->next;
756  prev->next = new_link;
757  prev = new_link;
758  slist = slist->next;
759  }
760  return copy;
761 }
762 
763 
764 void HeaderLists::AppendHeader(curl_slist *slist, const char *header) {
765  assert(slist);
766  curl_slist *new_link = Get(header);
767  new_link->next = NULL;
768 
769  while (slist->next)
770  slist = slist->next;
771  slist->next = new_link;
772 }
773 
774 
780 void HeaderLists::CutHeader(const char *header, curl_slist **slist) {
781  assert(slist);
782  curl_slist head;
783  head.next = *slist;
784  curl_slist *prev = &head;
785  curl_slist *rover = *slist;
786  while (rover) {
787  if (strcmp(rover->data, header) == 0) {
788  prev->next = rover->next;
789  Put(rover);
790  rover = prev;
791  }
792  prev = rover;
793  rover = rover->next;
794  }
795  *slist = head.next;
796 }
797 
798 
799 void HeaderLists::PutList(curl_slist *slist) {
800  while (slist) {
801  curl_slist *next = slist->next;
802  Put(slist);
803  slist = next;
804  }
805 }
806 
807 
808 string HeaderLists::Print(curl_slist *slist) {
809  string verbose;
810  while (slist) {
811  verbose += string(slist->data) + "\n";
812  slist = slist->next;
813  }
814  return verbose;
815 }
816 
817 
818 curl_slist *HeaderLists::Get(const char *header) {
819  for (unsigned i = 0; i < blocks_.size(); ++i) {
820  for (unsigned j = 0; j < kBlockSize; ++j) {
821  if (!IsUsed(&(blocks_[i][j]))) {
822  blocks_[i][j].data = const_cast<char *>(header);
823  return &(blocks_[i][j]);
824  }
825  }
826  }
827 
828  // All used, new block
829  AddBlock();
830  blocks_[blocks_.size()-1][0].data = const_cast<char *>(header);
831  return &(blocks_[blocks_.size()-1][0]);
832 }
833 
834 
835 void HeaderLists::Put(curl_slist *slist) {
836  slist->data = NULL;
837  slist->next = NULL;
838 }
839 
840 
841 void HeaderLists::AddBlock() {
842  curl_slist *new_block = new curl_slist[kBlockSize];
843  for (unsigned i = 0; i < kBlockSize; ++i) {
844  Put(&new_block[i]);
845  }
846  blocks_.push_back(new_block);
847 }
848 
849 
850 //------------------------------------------------------------------------------
851 
852 
853 string DownloadManager::ProxyInfo::Print() {
854  if (url == "DIRECT")
855  return url;
856 
857  string result = url;
858  int remaining =
859  static_cast<int>(host.deadline()) - static_cast<int>(time(NULL));
860  string expinfo = (remaining >= 0) ? "+" : "";
861  if (abs(remaining) >= 3600) {
862  expinfo += StringifyInt(remaining/3600) + "h";
863  } else if (abs(remaining) >= 60) {
864  expinfo += StringifyInt(remaining/60) + "m";
865  } else {
866  expinfo += StringifyInt(remaining) + "s";
867  }
868  if (host.status() == dns::kFailOk) {
869  result += " (" + host.name() + ", " + expinfo + ")";
870  } else {
871  result += " (:unresolved:, " + expinfo + ")";
872  }
873  return result;
874 }
875 
876 
881 CURL *DownloadManager::AcquireCurlHandle() {
882  CURL *handle;
883 
884  if (pool_handles_idle_->empty()) {
885  // Create a new handle
886  handle = curl_easy_init();
887  assert(handle != NULL);
888 
889  curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
890  // curl_easy_setopt(curl_default, CURLOPT_FAILONERROR, 1);
891  curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, CallbackCurlHeader);
892  curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlData);
893  } else {
894  handle = *(pool_handles_idle_->begin());
895  pool_handles_idle_->erase(pool_handles_idle_->begin());
896  }
897 
898  pool_handles_inuse_->insert(handle);
899 
900  return handle;
901 }
902 
903 
904 void DownloadManager::ReleaseCurlHandle(CURL *handle) {
905  set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
906  assert(elem != pool_handles_inuse_->end());
907 
908  if (pool_handles_idle_->size() > pool_max_handles_) {
909  curl_easy_cleanup(*elem);
910  } else {
911  pool_handles_idle_->insert(*elem);
912  }
913 
914  pool_handles_inuse_->erase(elem);
915 }
916 
917 
// Prepares `info` and the easy `handle` for a fresh transfer:
//  - resets per-job state (error code kFailOk, HTTP code -1, redirect flag,
//    used proxies/hosts = 1, retries = 0, backoff = 0);
//  - builds the request headers from default_headers_, plus the job's info
//    header and, when HTTP tracing is enabled, the tracing headers;
//  - configures no-cache, the hash context and the optional byte range;
//  - points the curl callbacks' user data at `info` and sets HEAD vs GET,
//    IPv4-only and redirect options.
// NOTE(review): source line 958 (the body of the `info->compressed()`
// branch) is missing from this listing.
922 void DownloadManager::InitializeRequest(JobInfo *info, CURL *handle) {
923  // Initialize internal download state
924  info->SetCurlHandle(handle);
925  info->SetErrorCode(kFailOk);
926  info->SetHttpCode(-1);
927  info->SetFollowRedirects(follow_redirects_);
928  info->SetNumUsedProxies(1);
929  info->SetNumUsedHosts(1);
930  info->SetNumRetries(0);
931  info->SetBackoffMs(0);
932  info->SetHeaders(header_lists_->DuplicateList(default_headers_));
933  if (info->info_header()) {
934  header_lists_->AppendHeader(info->headers(), info->info_header());
935  }
936  if (enable_http_tracing_) {
937  for (unsigned int i = 0; i < http_tracing_headers_.size(); i++) {
938  header_lists_->AppendHeader(info->headers(),
939  (http_tracing_headers_)[i].c_str());
940  }
941 
942  header_lists_->AppendHeader(info->headers(), info->tracing_header_pid());
943  header_lists_->AppendHeader(info->headers(), info->tracing_header_gid());
944  header_lists_->AppendHeader(info->headers(), info->tracing_header_uid());
945 
946  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s' - id %" PRId64 ") "
947  "CURL Header for URL: %s is:\n %s",
948  name_.c_str(), info->id(), info->url()->c_str(),
949  header_lists_->Print(info->headers()).c_str());
950  }
951 
952  if (info->force_nocache()) {
953  SetNocache(info);
954  } else {
955  info->SetNocache(false);
956  }
957  if (info->compressed()) {
959  }
960  if (info->expected_hash()) {
961  assert(info->hash_context().buffer != NULL);
962  shash::Init(info->hash_context());
963  }
964 
// Byte range "lower-upper" (inclusive) when an offset and a non-zero size are
// given; otherwise any range left on a reused handle is cleared.
// NOTE(review): snprintf() returns the length it would have written, so the
// truncation test should be `>= sizeof(byte_range_array)` rather than
// `== 100`; harmless here since two int64 values need at most 41 characters.
965  if ((info->range_offset() != -1) && (info->range_size())) {
966  char byte_range_array[100];
967  const int64_t range_lower = static_cast<int64_t>(info->range_offset());
968  const int64_t range_upper = static_cast<int64_t>(
969  info->range_offset() + info->range_size() - 1);
970  if (snprintf(byte_range_array, sizeof(byte_range_array),
971  "%" PRId64 "-%" PRId64,
972  range_lower, range_upper) == 100)
973  {
974  PANIC(NULL); // Should be impossible given limits on offset size.
975  }
976  curl_easy_setopt(handle, CURLOPT_RANGE, byte_range_array);
977  } else {
978  curl_easy_setopt(handle, CURLOPT_RANGE, NULL);
979  }
980 
981  // Set curl parameters
// NOTE(review): libcurl documents CURLOPT_NOBODY, CURLOPT_HTTPGET,
// CURLOPT_FOLLOWLOCATION, CURLOPT_MAXREDIRS and CURLOPT_VERBOSE as taking a
// long; the int literals below rely on the varargs ABI (1L/4L would be
// strictly correct).
982  curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
983  curl_easy_setopt(handle, CURLOPT_WRITEHEADER,
984  static_cast<void *>(info));
985  curl_easy_setopt(handle, CURLOPT_WRITEDATA, static_cast<void *>(info));
986  curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->headers());
987  if (info->head_request()) {
988  curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
989  } else {
990  curl_easy_setopt(handle, CURLOPT_HTTPGET, 1);
991  }
992  if (opt_ipv4_only_) {
993  curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
994  }
995  if (follow_redirects_) {
996  curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1);
997  curl_easy_setopt(handle, CURLOPT_MAXREDIRS, 4);
998  }
999 #ifdef DEBUGMSG
1000  curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
1001  curl_easy_setopt(handle, CURLOPT_DEBUGFUNCTION, CallbackCurlDebug);
1002 #endif
1003 }
1004 
1005 
1010 void DownloadManager::SetUrlOptions(JobInfo *info) {
1011  CURL *curl_handle = info->curl_handle();
1012  string url_prefix;
1013 
1014  MutexLockGuard m(lock_options_);
1015 
1016  // sharding policy
1017  if (sharding_policy_.UseCount() > 0) {
1018  if (info->proxy() != "") {
1019  // proxy already set, so this is a failover event
1020  perf::Inc(counters_->n_proxy_failover);
1021  }
1022  info->SetProxy(sharding_policy_->GetNextProxy(info->url(), info->proxy(),
1023  info->range_offset() == -1 ? 0 : info->range_offset()));
1024 
1025  curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, info->proxy().c_str());
1026  } else { // no sharding policy
1027  // Check if proxy group needs to be reset from backup to primary
1028  if (opt_timestamp_backup_proxies_ > 0) {
1029  const time_t now = time(NULL);
1030  if (static_cast<int64_t>(now) >
1031  static_cast<int64_t>(opt_timestamp_backup_proxies_ +
1032  opt_proxy_groups_reset_after_))
1033  {
1034  opt_proxy_groups_current_ = 0;
1035  opt_timestamp_backup_proxies_ = 0;
1036  RebalanceProxiesUnlocked("Reset proxy group from backup to primary");
1037  }
1038  }
1039  // Check if load-balanced proxies within the group need to be reset
1040  if (opt_timestamp_failover_proxies_ > 0) {
1041  const time_t now = time(NULL);
1042  if (static_cast<int64_t>(now) >
1043  static_cast<int64_t>(opt_timestamp_failover_proxies_ +
1044  opt_proxy_groups_reset_after_))
1045  {
1046  RebalanceProxiesUnlocked(
1047  "Reset load-balanced proxies within the active group");
1048  }
1049  }
1050  // Check if host needs to be reset
1051  if (opt_timestamp_backup_host_ > 0) {
1052  const time_t now = time(NULL);
1053  if (static_cast<int64_t>(now) >
1054  static_cast<int64_t>(opt_timestamp_backup_host_ +
1055  opt_host_reset_after_))
1056  {
1058  "(manager %s - id %" PRId64 ") "
1059  "switching host from %s to %s (reset host)", name_.c_str(),
1060  info->id(), (*opt_host_chain_)[opt_host_chain_current_].c_str(),
1061  (*opt_host_chain_)[0].c_str());
1062  opt_host_chain_current_ = 0;
1063  opt_timestamp_backup_host_ = 0;
1064  }
1065  }
1066 
1067  ProxyInfo *proxy = ChooseProxyUnlocked(info->expected_hash());
1068  if (!proxy || (proxy->url == "DIRECT")) {
1069  info->SetProxy("DIRECT");
1070  curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, "");
1071  } else {
1072  // Note: inside ValidateProxyIpsUnlocked() we may change the proxy data
1073  // structure, so we must not pass proxy->... (== current_proxy())
1074  // parameters directly
1075  std::string purl = proxy->url;
1076  dns::Host phost = proxy->host;
1077  const bool changed = ValidateProxyIpsUnlocked(purl, phost);
1078  // Current proxy may have changed
1079  if (changed) {
1080  proxy = ChooseProxyUnlocked(info->expected_hash());
1081  }
1082  info->SetProxy(proxy->url);
1083  if (proxy->host.status() == dns::kFailOk) {
1084  curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY,
1085  info->proxy().c_str());
1086  } else {
1087  // We know it can't work, don't even try to download
1088  curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, "0.0.0.0");
1089  }
1090  }
1091  } // end !sharding
1092 
1093  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, opt_low_speed_limit_);
1094  if (info->proxy() != "DIRECT") {
1095  curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_proxy_);
1096  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_proxy_);
1097  } else {
1098  curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_direct_);
1099  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_direct_);
1100  }
1101  if (!opt_dns_server_.empty())
1102  curl_easy_setopt(curl_handle, CURLOPT_DNS_SERVERS, opt_dns_server_.c_str());
1103 
1104  if (info->probe_hosts() && opt_host_chain_) {
1105  url_prefix = (*opt_host_chain_)[opt_host_chain_current_];
1106  info->SetCurrentHostChainIndex(opt_host_chain_current_);
1107  }
1108 
1109  string url = url_prefix + *(info->url());
1110 
1111  curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 1L);
1112  if (url.substr(0, 5) == "https") {
1113  bool rvb = ssl_certificate_store_.ApplySslCertificatePath(curl_handle);
1114  if (!rvb) {
1116  "(manager %s - id %" PRId64 ") "
1117  "Failed to set SSL certificate path %s", name_.c_str(),
1118  info->id(), ssl_certificate_store_.GetCaPath().c_str());
1119  }
1120  if (info->pid() != -1) {
1121  if (credentials_attachment_ == NULL) {
1122  LogCvmfs(kLogDownload, kLogDebug, "(manager %s - id %" PRId64 ") "
1123  "uses secure downloads but no credentials attachment set",
1124  name_.c_str(), info->id());
1125  } else {
1126  bool retval = credentials_attachment_->ConfigureCurlHandle(
1127  curl_handle, info->pid(), info->GetCredDataPtr());
1128  if (!retval) {
1129  LogCvmfs(kLogDownload, kLogDebug, "(manager %s - id %" PRId64 ") "
1130  "failed attaching credentials",
1131  name_.c_str(), info->id());
1132  }
1133  }
1134  }
1135  // The download manager disables signal handling in the curl library;
1136  // as OpenSSL's implementation of TLS will generate a sigpipe in some
1137  // error paths, we must explicitly disable SIGPIPE here.
1138  // TODO(jblomer): it should be enough to do this once
1139  signal(SIGPIPE, SIG_IGN);
1140  }
1141 
1142  if (url.find("@proxy@") != string::npos) {
1143  // This is used in Geo-API requests (only), to replace a portion of the
1144  // URL with the current proxy name for the sake of caching the result.
1145  // Replace the @proxy@ either with a passed in "forced" template (which
1146  // is set from $CVMFS_PROXY_TEMPLATE) if there is one, or a "direct"
1147  // template (which is the uuid) if there's no proxy, or the name of the
1148  // proxy.
1149  string replacement;
1150  if (proxy_template_forced_ != "") {
1151  replacement = proxy_template_forced_;
1152  } else if (info->proxy() == "DIRECT") {
1153  replacement = proxy_template_direct_;
1154  } else {
1155  if (opt_proxy_groups_current_ >= opt_proxy_groups_fallback_) {
1156  // It doesn't make sense to use the fallback proxies in Geo-API requests
1157  // since the fallback proxies are supposed to get sorted, too.
1158  info->SetProxy("DIRECT");
1159  curl_easy_setopt(info->curl_handle(), CURLOPT_PROXY, "");
1160  replacement = proxy_template_direct_;
1161  } else {
1162  replacement = ChooseProxyUnlocked(info->expected_hash())->host.name();
1163  }
1164  }
1165  replacement = (replacement == "") ? proxy_template_direct_ : replacement;
1166  LogCvmfs(kLogDownload, kLogDebug, "(manager %s - id %" PRId64 ") "
1167  "replacing @proxy@ by %s",
1168  name_.c_str(), info->id(), replacement.c_str());
1169  url = ReplaceAll(url, "@proxy@", replacement);
1170  }
1171 
1172  // TODO(heretherebedragons) before removing
1173  // static_cast<cvmfs::MemSink*>(info->sink)->size() == 0
1174  // and just always call info->sink->Reserve()
1175  // we should do a speed check
1176  if ((info->sink() != NULL) && info->sink()->RequiresReserve() &&
1177  (static_cast<cvmfs::MemSink*>(info->sink())->size() == 0) &&
1178  HasPrefix(url, "file://", false)) {
1179  platform_stat64 stat_buf;
1180  int retval = platform_stat(url.c_str(), &stat_buf);
1181  if (retval != 0) {
1182  // this is an error: file does not exist or out of memory
1183  // error is caught in other code section.
1184  info->sink()->Reserve(64ul * 1024ul);
1185  } else {
1186  info->sink()->Reserve(stat_buf.st_size);
1187  }
1188  }
1189 
1190  curl_easy_setopt(curl_handle, CURLOPT_URL,
1191  EscapeUrl(info->id(), url).c_str());
1192 }
1193 
1194 
1204 bool DownloadManager::ValidateProxyIpsUnlocked(
1205  const string &url,
1206  const dns::Host &host)
1207 {
1208  if (!host.IsExpired())
1209  return false;
1210  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s') validate DNS entry for %s",
1211  name_.c_str(), host.name().c_str());
1212 
1213  unsigned group_idx = opt_proxy_groups_current_;
1214  dns::Host new_host = resolver_->Resolve(host.name());
1215 
1216  bool update_only = true; // No changes to the list of IP addresses.
1217  if (new_host.status() != dns::kFailOk) {
1218  // Try again later in case resolving fails.
1220  "(manager '%s') failed to resolve IP addresses for %s (%d - %s)",
1221  name_.c_str(), host.name().c_str(), new_host.status(),
1222  dns::Code2Ascii(new_host.status()));
1223  new_host = dns::Host::ExtendDeadline(host, resolver_->min_ttl());
1224  } else if (!host.IsEquivalent(new_host)) {
1225  update_only = false;
1226  }
1227 
1228  if (update_only) {
1229  for (unsigned i = 0; i < (*opt_proxy_groups_)[group_idx].size(); ++i) {
1230  if ((*opt_proxy_groups_)[group_idx][i].host.id() == host.id())
1231  (*opt_proxy_groups_)[group_idx][i].host = new_host;
1232  }
1233  return false;
1234  }
1235 
1236  assert(new_host.status() == dns::kFailOk);
1237 
1238  // Remove old host objects, insert new objects, and rebalance.
1240  "(manager '%s') DNS entries for proxy %s changed, adjusting",
1241  name_.c_str(), host.name().c_str());
1242  vector<ProxyInfo> *group = current_proxy_group();
1243  opt_num_proxies_ -= group->size();
1244  for (unsigned i = 0; i < group->size(); ) {
1245  if ((*group)[i].host.id() == host.id()) {
1246  group->erase(group->begin() + i);
1247  } else {
1248  i++;
1249  }
1250  }
1251  vector<ProxyInfo> new_infos;
1252  set<string> best_addresses = new_host.ViewBestAddresses(opt_ip_preference_);
1253  set<string>::const_iterator iter_ips = best_addresses.begin();
1254  for (; iter_ips != best_addresses.end(); ++iter_ips) {
1255  string url_ip = dns::RewriteUrl(url, *iter_ips);
1256  new_infos.push_back(ProxyInfo(new_host, url_ip));
1257  }
1258  group->insert(group->end(), new_infos.begin(), new_infos.end());
1259  opt_num_proxies_ += new_infos.size();
1260 
1261  std::string msg = "DNS entries for proxy " + host.name() + " changed";
1262 
1263  RebalanceProxiesUnlocked(msg);
1264  return true;
1265 }
1266 
1267 
1271 void DownloadManager::UpdateStatistics(CURL *handle) {
1272  double val;
1273  int retval;
1274  int64_t sum = 0;
1275 
1276  retval = curl_easy_getinfo(handle, CURLINFO_SIZE_DOWNLOAD, &val);
1277  assert(retval == CURLE_OK);
1278  sum += static_cast<int64_t>(val);
1279  /*retval = curl_easy_getinfo(handle, CURLINFO_HEADER_SIZE, &val);
1280  assert(retval == CURLE_OK);
1281  sum += static_cast<int64_t>(val);*/
1282  perf::Xadd(counters_->sz_transferred_bytes, sum);
1283 }
1284 
1285 
1289 bool DownloadManager::CanRetry(const JobInfo *info) {
1290  MutexLockGuard m(lock_options_);
1291  unsigned max_retries = opt_max_retries_;
1292 
1293  return !(info->nocache()) && (info->num_retries() < max_retries) &&
1294  (IsProxyTransferError(info->error_code()) ||
1295  IsHostTransferError(info->error_code()));
1296 }
1297 
1305 void DownloadManager::Backoff(JobInfo *info) {
1306  unsigned backoff_init_ms = 0;
1307  unsigned backoff_max_ms = 0;
1308  {
1309  MutexLockGuard m(lock_options_);
1310  backoff_init_ms = opt_backoff_init_ms_;
1311  backoff_max_ms = opt_backoff_max_ms_;
1312  }
1313 
1314  info->SetNumRetries(info->num_retries() + 1);
1315  perf::Inc(counters_->n_retries);
1316  if (info->backoff_ms() == 0) {
1317  info->SetBackoffMs(prng_.Next(backoff_init_ms + 1)); // Must be != 0
1318  } else {
1319  info->SetBackoffMs(info->backoff_ms() * 2);
1320  }
1321  if (info->backoff_ms() > backoff_max_ms) {
1322  info->SetBackoffMs(backoff_max_ms);
1323  }
1324 
1326  "(manager '%s' - id %" PRId64 ") backing off for %d ms",
1327  name_.c_str(), info->id(), info->backoff_ms());
1328  SafeSleepMs(info->backoff_ms());
1329 }
1330 
1331 void DownloadManager::SetNocache(JobInfo *info) {
1332  if (info->nocache())
1333  return;
1334  header_lists_->AppendHeader(info->headers(), "Pragma: no-cache");
1335  header_lists_->AppendHeader(info->headers(), "Cache-Control: no-cache");
1336  curl_easy_setopt(info->curl_handle(), CURLOPT_HTTPHEADER, info->headers());
1337  info->SetNocache(true);
1338 }
1339 
1340 
1345 void DownloadManager::SetRegularCache(JobInfo *info) {
1346  if (info->nocache() == false)
1347  return;
1348  header_lists_->CutHeader("Pragma: no-cache", info->GetHeadersPtr());
1349  header_lists_->CutHeader("Cache-Control: no-cache", info->GetHeadersPtr());
1350  curl_easy_setopt(info->curl_handle(), CURLOPT_HTTPHEADER, info->headers());
1351  info->SetNocache(false);
1352 }
1353 
1354 
1358 void DownloadManager::ReleaseCredential(JobInfo *info) {
1359  if (info->cred_data()) {
1360  assert(credentials_attachment_ != NULL); // Someone must have set it
1361  credentials_attachment_->ReleaseCurlHandle(info->curl_handle(),
1362  info->cred_data());
1363  info->SetCredData(NULL);
1364  }
1365 }
1366 
1367 
1374 bool DownloadManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1375  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s' - id %" PRId64 ") "
1376  "Verify downloaded url %s, proxy %s (curl error %d)",
1377  name_.c_str(), info->id(), info->url()->c_str(),
1378  info->proxy().c_str(), curl_error);
1379  UpdateStatistics(info->curl_handle());
1380 
1381  // Verification and error classification
1382  switch (curl_error) {
1383  case CURLE_OK:
1384  // Verify content hash
1385  if (info->expected_hash()) {
1386  shash::Any match_hash;
1387  shash::Final(info->hash_context(), &match_hash);
1388  if (match_hash != *(info->expected_hash())) {
1389  if (ignore_signature_failures_) {
1391  "(manager '%s' - id %" PRId64 ") "
1392  "ignoring failed hash verification of %s (expected %s, got %s)",
1393  name_.c_str(), info->id(), info->url()->c_str(),
1394  info->expected_hash()->ToString().c_str(),
1395  match_hash.ToString().c_str());
1396  } else {
1397  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s' - id %" PRId64 ") "
1398  "hash verification of %s failed (expected %s, got %s)",
1399  name_.c_str(), info->id(), info->url()->c_str(),
1400  info->expected_hash()->ToString().c_str(),
1401  match_hash.ToString().c_str());
1402  info->SetErrorCode(kFailBadData);
1403  break;
1404  }
1405  }
1406  }
1407 
1408  info->SetErrorCode(kFailOk);
1409  break;
1410  case CURLE_UNSUPPORTED_PROTOCOL:
1412  break;
1413  case CURLE_URL_MALFORMAT:
1414  info->SetErrorCode(kFailBadUrl);
1415  break;
1416  case CURLE_COULDNT_RESOLVE_PROXY:
1418  break;
1419  case CURLE_COULDNT_RESOLVE_HOST:
1421  break;
1422  case CURLE_OPERATION_TIMEDOUT:
1423  info->SetErrorCode((info->proxy() == "DIRECT") ?
1425  break;
1426  case CURLE_PARTIAL_FILE:
1427  case CURLE_GOT_NOTHING:
1428  case CURLE_RECV_ERROR:
1429  info->SetErrorCode((info->proxy() == "DIRECT") ?
1431  break;
1432  case CURLE_FILE_COULDNT_READ_FILE:
1433  case CURLE_COULDNT_CONNECT:
1434  if (info->proxy() != "DIRECT") {
1435  // This is a guess. Fail-over can still change to switching host
1437  } else {
1439  }
1440  break;
1441  case CURLE_TOO_MANY_REDIRECTS:
1443  break;
1444  case CURLE_SSL_CACERT_BADFILE:
1446  "(manager '%s' -id %" PRId64 ") "
1447  "Failed to load certificate bundle. "
1448  "X509_CERT_BUNDLE might point to the wrong location.",
1449  name_.c_str(), info->id());
1451  break;
1452  // As of curl 7.62.0, CURLE_SSL_CACERT is the same as
1453  // CURLE_PEER_FAILED_VERIFICATION
1454  case CURLE_PEER_FAILED_VERIFICATION:
1456  "(manager '%s' - id %" PRId64 ") "
1457  "invalid SSL certificate of remote host. "
1458  "X509_CERT_DIR and/or X509_CERT_BUNDLE might point to the wrong "
1459  "location.", name_.c_str(), info->id());
1461  break;
1462  case CURLE_ABORTED_BY_CALLBACK:
1463  case CURLE_WRITE_ERROR:
1464  // Error set by callback
1465  break;
1466  case CURLE_SEND_ERROR:
1467  // The curl error CURLE_SEND_ERROR can be seen when a cache is misbehaving
1468  // and closing connections before the http request send is completed.
1469  // Handle this error, treating it as a short transfer error.
1470  info->SetErrorCode((info->proxy() == "DIRECT") ?
1471  kFailHostShortTransfer : kFailProxyShortTransfer);
1472  break;
1473  default:
1474  LogCvmfs(kLogDownload, kLogSyslogErr, "(manager '%s' - id %" PRId64 ") "
1475  "unexpected curl error (%d) while trying to fetch %s",
1476  name_.c_str(), info->id(), curl_error, info->url()->c_str());
1477  info->SetErrorCode(kFailOther);
1478  break;
1479  }
1480 
1481  std::vector<std::string> *host_chain = opt_host_chain_;
1482 
1483  // Determination if download should be repeated
1484  bool try_again = false;
1485  bool same_url_retry = CanRetry(info);
1486  if (info->error_code() != kFailOk) {
1487  MutexLockGuard m(lock_options_);
1488  if (info->error_code() == kFailBadData) {
1489  if (!info->nocache()) {
1490  try_again = true;
1491  } else {
1492  // Make it a host failure
1494  "(manager '%s' - id %" PRId64 ") "
1495  "data corruption with no-cache header, try another host",
1496  name_.c_str(), info->id());
1497 
1498  info->SetErrorCode(kFailHostHttp);
1499  }
1500  }
1501  if ( same_url_retry || (
1502  ( (info->error_code() == kFailHostResolve) ||
1503  IsHostTransferError(info->error_code()) ||
1504  (info->error_code() == kFailHostHttp)) &&
1505  info->probe_hosts() &&
1506  host_chain && (info->num_used_hosts() < host_chain->size()))
1507  )
1508  {
1509  try_again = true;
1510  }
1511  if ( same_url_retry || (
1512  ( (info->error_code() == kFailProxyResolve) ||
1513  IsProxyTransferError(info->error_code()) ||
1514  (info->error_code() == kFailProxyHttp)) )
1515  )
1516  {
1517  if (sharding_policy_.UseCount() > 0) { // sharding policy
1518  try_again = true;
1519  same_url_retry = false;
1520  } else { // no sharding
1521  try_again = true;
1522  // If all proxies failed, do a next round with the next host
1523  if (!same_url_retry && (info->num_used_proxies() >= opt_num_proxies_)) {
1524  // Check if this can be made a host fail-over
1525  if (info->probe_hosts() &&
1526  host_chain &&
1527  (info->num_used_hosts() < host_chain->size()))
1528  {
1529  // reset proxy group if not already performed by other handle
1530  if (opt_proxy_groups_) {
1531  if ((opt_proxy_groups_current_ > 0) ||
1532  (opt_proxy_groups_current_burned_ > 0))
1533  {
1534  opt_proxy_groups_current_ = 0;
1535  opt_timestamp_backup_proxies_ = 0;
1536  RebalanceProxiesUnlocked("reset proxies for host failover");
1537  }
1538  }
1539 
1540  // Make it a host failure
1542  "(manager '%s' - id %" PRId64 ") make it a host failure",
1543  name_.c_str(), info->id());
1544  info->SetNumUsedProxies(1);
1546  } else {
1547  if (failover_indefinitely_) {
1548  // Instead of giving up, reset the num_used_proxies counter,
1549  // switch proxy and try again
1551  "(manager '%s' - id %" PRId64 ") "
1552  "VerifyAndFinalize() would fail the download here. "
1553  "Instead switch proxy and retry download. "
1554  "info->probe_hosts=%d host_chain=%p info->num_used_hosts=%d "
1555  "host_chain->size()=%lu same_url_retry=%d "
1556  "info->num_used_proxies=%d opt_num_proxies_=%d",
1557  name_.c_str(), info->id(),
1558  static_cast<int>(info->probe_hosts()),
1559  host_chain, info->num_used_hosts(),
1560  host_chain ?
1561  host_chain->size() : -1, static_cast<int>(same_url_retry),
1562  info->num_used_proxies(), opt_num_proxies_);
1563  info->SetNumUsedProxies(1);
1564  RebalanceProxiesUnlocked(
1565  "download failed - failover indefinitely");
1566  try_again = !Interrupted(fqrn_, info);
1567  } else {
1568  try_again = false;
1569  }
1570  }
1571  } // Make a proxy failure a host failure
1572  } // Proxy failure assumed
1573  } // end !sharding
1574  }
1575 
1576  if (try_again) {
1577  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s' - id %" PRId64 ") "
1578  "Trying again on same curl handle, same url: %d, "
1579  "error code %d no-cache %d",
1580  name_.c_str(), info->id(), same_url_retry,
1581  info->error_code(), info->nocache());
1582  // Reset internal state and destination
1583  if (info->sink() != NULL && info->sink()->Reset() != 0) {
1584  info->SetErrorCode(kFailLocalIO);
1585  goto verify_and_finalize_stop;
1586  }
1587  if (info->interrupt_cue() && info->interrupt_cue()->IsCanceled()) {
1588  info->SetErrorCode(kFailCanceled);
1589  goto verify_and_finalize_stop;
1590  }
1591 
1592  if (info->expected_hash()) {
1593  shash::Init(info->hash_context());
1594  }
1595  if (info->compressed()) {
1597  }
1598 
1599  if (sharding_policy_.UseCount() > 0) { // sharding policy
1600  ReleaseCredential(info);
1601  SetUrlOptions(info);
1602  } else { // no sharding policy
1603  SetRegularCache(info);
1604 
1605  // Failure handling
1606  bool switch_proxy = false;
1607  bool switch_host = false;
1608  switch (info->error_code()) {
1609  case kFailBadData:
1610  SetNocache(info);
1611  break;
1612  case kFailProxyResolve:
1613  case kFailProxyHttp:
1614  switch_proxy = true;
1615  break;
1616  case kFailHostResolve:
1617  case kFailHostHttp:
1618  case kFailHostAfterProxy:
1619  switch_host = true;
1620  break;
1621  default:
1622  if (IsProxyTransferError(info->error_code())) {
1623  if (same_url_retry) {
1624  Backoff(info);
1625  } else {
1626  switch_proxy = true;
1627  }
1628  } else if (IsHostTransferError(info->error_code())) {
1629  if (same_url_retry) {
1630  Backoff(info);
1631  } else {
1632  switch_host = true;
1633  }
1634  } else {
1635  // No other errors expected when retrying
1636  PANIC(NULL);
1637  }
1638  }
1639  if (switch_proxy) {
1640  ReleaseCredential(info);
1641  SwitchProxy(info);
1642  info->SetNumUsedProxies(info->num_used_proxies() + 1);
1643  SetUrlOptions(info);
1644  }
1645  if (switch_host) {
1646  ReleaseCredential(info);
1647  SwitchHost(info);
1648  info->SetNumUsedHosts(info->num_used_hosts() + 1);
1649  SetUrlOptions(info);
1650  }
1651  } // end !sharding
1652 
1653  if (failover_indefinitely_) {
1654  // try again, breaking if there's a cvmfs reload happening and we are in a
1655  // proxy failover. This will EIO the call application.
1656  return !Interrupted(fqrn_, info);
1657  }
1658  return true; // try again
1659  }
1660 
1661  verify_and_finalize_stop:
1662  // Finalize, flush destination file
1663  ReleaseCredential(info);
1664  if (info->sink() != NULL && info->sink()->Flush() != 0) {
1665  info->SetErrorCode(kFailLocalIO);
1666  }
1667 
1668  if (info->compressed())
1670 
1671  if (info->headers()) {
1672  header_lists_->PutList(info->headers());
1673  info->SetHeaders(NULL);
1674  }
1675 
1676  return false; // stop transfer and return to Fetch()
1677 }
1678 
1679 DownloadManager::~DownloadManager() {
1680  // cleaned up fini
1681  if (sharding_policy_.UseCount() > 0) {
1682  sharding_policy_.Reset();
1683  }
1684  if (health_check_.UseCount() > 0) {
1685  if (health_check_.Unique()) {
1687  "(manager '%s') Stopping healthcheck thread", name_.c_str());
1688  health_check_->StopHealthcheck();
1689  }
1690  health_check_.Reset();
1691  }
1692 
1693  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1694  // Shutdown I/O thread
1695  pipe_terminate_->Write(kPipeTerminateSignal);
1696  pthread_join(thread_download_, NULL);
1697  // All handles are removed from the multi stack
1698  pipe_terminate_.Destroy();
1699  pipe_jobs_.Destroy();
1700  }
1701 
1702  for (set<CURL *>::iterator i = pool_handles_idle_->begin(),
1703  iEnd = pool_handles_idle_->end(); i != iEnd; ++i)
1704  {
1705  curl_easy_cleanup(*i);
1706  }
1707 
1708  delete pool_handles_idle_;
1709  delete pool_handles_inuse_;
1710  curl_multi_cleanup(curl_multi_);
1711 
1712  delete header_lists_;
1713  if (user_agent_)
1714  free(user_agent_);
1715 
1716  delete counters_;
1717  delete opt_host_chain_;
1718  delete opt_host_chain_rtt_;
1719  delete opt_proxy_groups_;
1720 
1721  curl_global_cleanup();
1722  delete resolver_;
1723 
1724  // old destructor
1725  pthread_mutex_destroy(lock_options_);
1726  pthread_mutex_destroy(lock_synchronous_mode_);
1727  free(lock_options_);
1728  free(lock_synchronous_mode_);
1729 }
1730 
1731 void DownloadManager::InitHeaders() {
1732  // User-Agent
1733  string cernvm_id = "User-Agent: cvmfs ";
1734 #ifdef CVMFS_LIBCVMFS
1735  cernvm_id += "libcvmfs ";
1736 #else
1737  cernvm_id += "Fuse ";
1738 #endif
1739  cernvm_id += string(VERSION);
1740  if (getenv("CERNVM_UUID") != NULL) {
1741  cernvm_id += " " +
1742  sanitizer::InputSanitizer("az AZ 09 -").Filter(getenv("CERNVM_UUID"));
1743  }
1744  user_agent_ = strdup(cernvm_id.c_str());
1745 
1746  header_lists_ = new HeaderLists();
1747 
1748  default_headers_ = header_lists_->GetList("Connection: Keep-Alive");
1749  header_lists_->AppendHeader(default_headers_, "Pragma:");
1750  header_lists_->AppendHeader(default_headers_, user_agent_);
1751 }
1752 
1753 DownloadManager::DownloadManager(const unsigned max_pool_handles,
1754  const perf::StatisticsTemplate &statistics,
1755  const std::string &name) :
1756  prng_(Prng()),
1757  pool_handles_idle_(new set<CURL *>),
1758  pool_handles_inuse_(new set<CURL *>),
1759  pool_max_handles_(max_pool_handles),
1760  pipe_terminate_(NULL),
1761  pipe_jobs_(NULL),
1762  watch_fds_(NULL),
1763  watch_fds_size_(0),
1764  watch_fds_inuse_(0),
1765  watch_fds_max_(4 * max_pool_handles),
1766  opt_timeout_proxy_(5),
1767  opt_timeout_direct_(10),
1768  opt_low_speed_limit_(1024),
1769  opt_max_retries_(0),
1770  opt_backoff_init_ms_(0),
1771  opt_backoff_max_ms_(0),
1772  enable_info_header_(false),
1773  opt_ipv4_only_(false),
1774  follow_redirects_(false),
1775  ignore_signature_failures_(false),
1776  enable_http_tracing_(false),
1777  opt_host_chain_(NULL),
1778  opt_host_chain_rtt_(NULL),
1779  opt_host_chain_current_(0),
1780  opt_proxy_groups_(NULL),
1781  opt_proxy_groups_current_(0),
1782  opt_proxy_groups_current_burned_(0),
1783  opt_proxy_groups_fallback_(0),
1784  opt_num_proxies_(0),
1785  opt_proxy_shard_(false),
1786  failover_indefinitely_(false),
1787  name_(name),
1788  opt_ip_preference_(dns::kIpPreferSystem),
1789  opt_timestamp_backup_proxies_(0),
1790  opt_timestamp_failover_proxies_(0),
1791  opt_proxy_groups_reset_after_(0),
1792  opt_timestamp_backup_host_(0),
1793  opt_host_reset_after_(0),
1794  credentials_attachment_(NULL),
1795  counters_(new Counters(statistics))
1796 {
1797  atomic_init32(&multi_threaded_);
1798 
1799  lock_options_ =
1800  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1801  int retval = pthread_mutex_init(lock_options_, NULL);
1802  assert(retval == 0);
1804  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1805  retval = pthread_mutex_init(lock_synchronous_mode_, NULL);
1806  assert(retval == 0);
1807 
1808  retval = curl_global_init(CURL_GLOBAL_ALL);
1809  assert(retval == CURLE_OK);
1810 
1811  InitHeaders();
1812 
1813  curl_multi_ = curl_multi_init();
1814  assert(curl_multi_ != NULL);
1815  curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION, CallbackCurlSocket);
1816  curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1817  static_cast<void *>(this));
1818  curl_multi_setopt(curl_multi_, CURLMOPT_MAXCONNECTS, watch_fds_max_);
1819  curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1821 
1822  prng_.InitLocaltime();
1823 
1824  // Name resolving
1825  if ((getenv("CVMFS_IPV4_ONLY") != NULL) &&
1826  (strlen(getenv("CVMFS_IPV4_ONLY")) > 0))
1827  {
1828  opt_ipv4_only_ = true;
1829  }
1832  assert(resolver_);
1833 }
1834 
1842 
1843  int retval = pthread_create(&thread_download_, NULL, MainDownload,
1844  static_cast<void *>(this));
1845  assert(retval == 0);
1846 
1847  atomic_inc32(&multi_threaded_);
1848 
1849  if (health_check_.UseCount() > 0) {
1851  "(manager '%s') Starting healthcheck thread", name_.c_str());
1852  health_check_->StartHealthcheck();
1853  }
1854 }
1855 
1856 
1861  assert(info != NULL);
1862  assert(info->url() != NULL);
1863 
1864  Failures result;
1865  result = PrepareDownloadDestination(info);
1866  if (result != kFailOk)
1867  return result;
1868 
1869  if (info->expected_hash()) {
1872  info->GetHashContextPtr()->size = shash::GetContextSize(algorithm);
1873  info->GetHashContextPtr()->buffer = alloca(info->hash_context().size);
1874  }
1875 
1876  // Prepare cvmfs-info: header, allocate string on the stack
1877  info->SetInfoHeader(NULL);
1878  if (enable_info_header_ && info->extra_info()) {
1879  const char *header_name = "cvmfs-info: ";
1880  const size_t header_name_len = strlen(header_name);
1881  const unsigned header_size = 1 + header_name_len +
1882  EscapeHeader(*(info->extra_info()), NULL, 0);
1883  info->SetInfoHeader(static_cast<char *>(alloca(header_size)));
1884  memcpy(info->info_header(), header_name, header_name_len);
1885  EscapeHeader(*(info->extra_info()), info->info_header() + header_name_len,
1886  header_size - header_name_len);
1887  info->info_header()[header_size-1] = '\0';
1888  }
1889 
1890  if (enable_http_tracing_) {
1891  const std::string str_pid = "X-CVMFS-PID: " + StringifyInt(info->pid());
1892  const std::string str_gid = "X-CVMFS-GID: " + StringifyUint(info->gid());
1893  const std::string str_uid = "X-CVMFS-UID: " + StringifyUint(info->uid());
1894 
1895  // will be auto freed at the end of this function Fetch(JobInfo *info)
1896  info->SetTracingHeaderPid(static_cast<char *>(alloca(str_pid.size() + 1)));
1897  info->SetTracingHeaderGid(static_cast<char *>(alloca(str_gid.size() + 1)));
1898  info->SetTracingHeaderUid(static_cast<char *>(alloca(str_uid.size() + 1)));
1899 
1900  memcpy(info->tracing_header_pid(), str_pid.c_str(), str_pid.size() + 1);
1901  memcpy(info->tracing_header_gid(), str_gid.c_str(), str_gid.size() + 1);
1902  memcpy(info->tracing_header_uid(), str_uid.c_str(), str_uid.size() + 1);
1903  }
1904 
1905  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1906  if (!info->IsValidPipeJobResults()) {
1907  info->CreatePipeJobResults();
1908  }
1909  if (!info->IsValidDataTube()) {
1910  info->CreateDataTube();
1911  }
1912 
1913  // LogCvmfs(kLogDownload, kLogDebug, "send job to thread, pipe %d %d",
1914  // info->wait_at[0], info->wait_at[1]);
1915  pipe_jobs_->Write<JobInfo*>(info);
1916 
1917  do {
1918  DataTubeElement* ele = info->GetDataTubePtr()->PopFront();
1919 
1920  if (ele->action == kActionStop) {
1921  delete ele;
1922  break;
1923  }
1924  // TODO(heretherebedragons) add compression
1925  } while (true);
1926 
1927  info->GetPipeJobResultPtr()->Read<download::Failures>(&result);
1928  // LogCvmfs(kLogDownload, kLogDebug, "got result %d", result);
1929  } else {
1931  CURL *handle = AcquireCurlHandle();
1932  InitializeRequest(info, handle);
1933  SetUrlOptions(info);
1934  // curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
1935  int retval;
1936  do {
1937  retval = curl_easy_perform(handle);
1939  double elapsed;
1940  if (curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &elapsed) == CURLE_OK)
1941  {
1943  static_cast<int64_t>(elapsed * 1000));
1944  }
1945  } while (VerifyAndFinalize(retval, info));
1946  result = info->error_code();
1947  ReleaseCurlHandle(info->curl_handle());
1948  }
1949 
1950  if (result != kFailOk) {
1951  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s' - id %" PRId64 ") "
1952  "download failed (error %d - %s)",
1953  name_.c_str(),
1954  info->id(), result, Code2Ascii(result));
1955 
1956  if (info->sink() != NULL) {
1957  info->sink()->Purge();
1958  }
1959  }
1960 
1961  return result;
1962 }
1963 
1964 
1972 }
1973 
1977 std::string DownloadManager::GetDnsServer() const {
1978  return opt_dns_server_;
1979 }
1980 
1985 void DownloadManager::SetDnsServer(const string &address) {
1986  if (!address.empty()) {
1988  opt_dns_server_ = address;
1989  assert(!opt_dns_server_.empty());
1990 
1991  vector<string> servers;
1992  servers.push_back(address);
1993  bool retval = resolver_->SetResolvers(servers);
1994  assert(retval);
1995  }
1996  LogCvmfs(kLogDownload, kLogSyslog, "(manager '%s') set nameserver to %s",
1997  name_.c_str(), address.c_str());
1998 }
1999 
2000 
2005  const unsigned retries,
2006  const unsigned timeout_ms)
2007 {
2009  if ((resolver_->retries() == retries) &&
2010  (resolver_->timeout_ms() == timeout_ms))
2011  {
2012  return;
2013  }
2014  delete resolver_;
2015  resolver_ = NULL;
2016  resolver_ =
2017  dns::NormalResolver::Create(opt_ipv4_only_, retries, timeout_ms);
2018  assert(resolver_);
2019 }
2020 
2021 
2023  const unsigned min_seconds,
2024  const unsigned max_seconds)
2025 {
2027  resolver_->set_min_ttl(min_seconds);
2028  resolver_->set_max_ttl(max_seconds);
2029 }
2030 
2031 
2034  opt_ip_preference_ = preference;
2035 }
2036 
2037 
2043 void DownloadManager::SetTimeout(const unsigned seconds_proxy,
2044  const unsigned seconds_direct)
2045 {
2047  opt_timeout_proxy_ = seconds_proxy;
2048  opt_timeout_direct_ = seconds_direct;
2049 }
2050 
2051 
2057 void DownloadManager::SetLowSpeedLimit(const unsigned low_speed_limit) {
2059  opt_low_speed_limit_ = low_speed_limit;
2060 }
2061 
2062 
2066 void DownloadManager::GetTimeout(unsigned *seconds_proxy,
2067  unsigned *seconds_direct)
2068 {
2070  *seconds_proxy = opt_timeout_proxy_;
2071  *seconds_direct = opt_timeout_direct_;
2072 }
2073 
2074 
2079 void DownloadManager::SetHostChain(const string &host_list) {
2080  SetHostChain(SplitString(host_list, ';'));
2081 }
2082 
2083 
2084 void DownloadManager::SetHostChain(const std::vector<std::string> &host_list) {
2087  delete opt_host_chain_;
2088  delete opt_host_chain_rtt_;
2090 
2091  if (host_list.empty()) {
2092  opt_host_chain_ = NULL;
2093  opt_host_chain_rtt_ = NULL;
2094  return;
2095  }
2096 
2097  opt_host_chain_ = new vector<string>(host_list);
2099  new vector<int>(opt_host_chain_->size(), kProbeUnprobed);
2100  // LogCvmfs(kLogDownload, kLogSyslog, "using host %s",
2101  // (*opt_host_chain_)[0].c_str());
2102 }
2103 
2104 
2105 
2110 void DownloadManager::GetHostInfo(vector<string> *host_chain, vector<int> *rtt,
2111  unsigned *current_host)
2112 {
2114  if (opt_host_chain_) {
2115  if (current_host) {*current_host = opt_host_chain_current_;}
2116  if (host_chain) {*host_chain = *opt_host_chain_;}
2117  if (rtt) {*rtt = *opt_host_chain_rtt_;}
2118  }
2119 }
2120 
2121 
2132 
2133  if (!opt_proxy_groups_) {
2134  return;
2135  }
2136 
2137  // Fail any matching proxies within the current load-balancing group
2138  vector<ProxyInfo> *group = current_proxy_group();
2139  const unsigned group_size = group->size();
2140  unsigned failed = 0;
2141  for (unsigned i = 0; i < group_size - opt_proxy_groups_current_burned_; ++i) {
2142  if (info && (info->proxy() == (*group)[i].url)) {
2143  // Move to list of failed proxies
2144  opt_proxy_groups_current_burned_++;
2145  swap((*group)[i],
2146  (*group)[group_size - opt_proxy_groups_current_burned_]);
2148  failed++;
2149  }
2150  }
2151 
2152  // Do nothing more unless at least one proxy was marked as failed
2153  if (!failed)
2154  return;
2155 
2156  // If all proxies from the current load-balancing group are burned, switch to
2157  // another group
2158  if (opt_proxy_groups_current_burned_ == group->size()) {
2159  opt_proxy_groups_current_burned_ = 0;
2160  if (opt_proxy_groups_->size() > 1) {
2162  opt_proxy_groups_->size();
2163  // Remember the timestamp of switching to backup proxies
2165  if (opt_proxy_groups_current_ > 0) {
2167  opt_timestamp_backup_proxies_ = time(NULL);
2168  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2169  // "switched to (another) backup proxy group");
2170  } else {
2172  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2173  // "switched back to primary proxy group");
2174  }
2176  }
2177  }
2178  } else {
2179  // Record failover time
2182  opt_timestamp_failover_proxies_ = time(NULL);
2183  }
2184  }
2185 
2186  UpdateProxiesUnlocked("failed proxy");
2187  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s' - id %" PRId64 ") "
2188  "%lu proxies remain in group", name_.c_str(), info->id(),
2190 }
2191 
2192 
2200 
2201  if (!opt_host_chain_ || (opt_host_chain_->size() == 1)) {
2202  return;
2203  }
2204 
2205  if (info && (info->current_host_chain_index() != opt_host_chain_current_)) {
2207  "(manager '%s' - id %" PRId64 ")"
2208  "don't switch host, "
2209  "last used host: %s, current host: %s", name_.c_str(), info->id(),
2210  (*opt_host_chain_)[info->current_host_chain_index()].c_str(),
2211  (*opt_host_chain_)[opt_host_chain_current_].c_str());
2212  return;
2213  }
2214 
2215  string reason = "manually triggered";
2216  string info_id = "(manager " + name_;
2217  if (info) {
2218  reason = download::Code2Ascii(info->error_code());
2219  info_id = " - id " + StringifyInt(info->id());
2220  }
2221  info_id += ")";
2222 
2223  string old_host = (*opt_host_chain_)[opt_host_chain_current_];
2225  (opt_host_chain_current_ + 1) % opt_host_chain_->size();
2228  "%s switching host from %s to %s (%s)", info_id.c_str(),
2229  old_host.c_str(), (*opt_host_chain_)[opt_host_chain_current_].c_str(),
2230  reason.c_str());
2231 
2232  // Remember the timestamp of switching to backup host
2233  if (opt_host_reset_after_ > 0) {
2234  if (opt_host_chain_current_ != 0) {
2235  if (opt_timestamp_backup_host_ == 0)
2236  opt_timestamp_backup_host_ = time(NULL);
2237  } else {
2239  }
2240  }
2241 }
2242 
2244  SwitchHost(NULL);
2245 }
2246 
2247 
2255  vector<string> host_chain;
2256  vector<int> host_rtt;
2257  unsigned current_host;
2258 
2259  GetHostInfo(&host_chain, &host_rtt, &current_host);
2260 
2261  // Stopwatch, two times to fill caches first
2262  unsigned i, retries;
2263  string url;
2264 
2265  cvmfs::MemSink memsink;
2266  JobInfo info(&url, false, false, NULL, &memsink);
2267  for (retries = 0; retries < 2; ++retries) {
2268  for (i = 0; i < host_chain.size(); ++i) {
2269  url = host_chain[i] + "/.cvmfspublished";
2270 
2271  struct timeval tv_start, tv_end;
2272  gettimeofday(&tv_start, NULL);
2273  Failures result = Fetch(&info);
2274  gettimeofday(&tv_end, NULL);
2275  memsink.Reset();
2276  if (result == kFailOk) {
2277  host_rtt[i] = static_cast<int>(
2278  DiffTimeSeconds(tv_start, tv_end) * 1000);
2279  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s' - id %" PRId64 ") "
2280  "probing host %s had %dms rtt",
2281  name_.c_str(), info.id(),
2282  url.c_str(), host_rtt[i]);
2283  } else {
2284  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s' - id %" PRId64 ") "
2285  "error while probing host %s: %d %s",
2286  name_.c_str(), info.id(),
2287  url.c_str(), result, Code2Ascii(result));
2288  host_rtt[i] = INT_MAX;
2289  }
2290  }
2291  }
2292 
2293  SortTeam(&host_rtt, &host_chain);
2294  for (i = 0; i < host_chain.size(); ++i) {
2295  if (host_rtt[i] == INT_MAX) host_rtt[i] = kProbeDown;
2296  }
2297 
2299  delete opt_host_chain_;
2300  delete opt_host_chain_rtt_;
2301  opt_host_chain_ = new vector<string>(host_chain);
2302  opt_host_chain_rtt_ = new vector<int>(host_rtt);
2304 }
2305 
2306 bool DownloadManager::GeoSortServers(std::vector<std::string> *servers,
2307  std::vector<uint64_t> *output_order) {
2308  if (!servers) {return false;}
2309  if (servers->size() == 1) {
2310  if (output_order) {
2311  output_order->clear();
2312  output_order->push_back(0);
2313  }
2314  return true;
2315  }
2316 
2317  std::vector<std::string> host_chain;
2318  GetHostInfo(&host_chain, NULL, NULL);
2319 
2320  std::vector<std::string> server_dns_names;
2321  server_dns_names.reserve(servers->size());
2322  for (unsigned i = 0; i < servers->size(); ++i) {
2323  std::string host = dns::ExtractHost((*servers)[i]);
2324  server_dns_names.push_back(host.empty() ? (*servers)[i] : host);
2325  }
2326  std::string host_list = JoinStrings(server_dns_names, ",");
2327 
2328  vector<string> host_chain_shuffled;
2329  {
2330  // Protect against concurrent access to prng_
2332  // Determine random hosts for the Geo-API query
2333  host_chain_shuffled = Shuffle(host_chain, &prng_);
2334  }
2335  // Request ordered list via Geo-API
2336  bool success = false;
2337  unsigned max_attempts = std::min(host_chain_shuffled.size(), size_t(3));
2338  vector<uint64_t> geo_order(servers->size());
2339  for (unsigned i = 0; i < max_attempts; ++i) {
2340  string url = host_chain_shuffled[i] + "/api/v1.0/geo/@proxy@/" + host_list;
2342  "(manager '%s') requesting ordered server list from %s",
2343  name_.c_str(), url.c_str());
2344  cvmfs::MemSink memsink;
2345  JobInfo info(&url, false, false, NULL, &memsink);
2346  Failures result = Fetch(&info);
2347  if (result == kFailOk) {
2348  string order(reinterpret_cast<char*>(memsink.data()), memsink.pos());
2349  memsink.Reset();
2350  bool retval = ValidateGeoReply(order, servers->size(), &geo_order);
2351  if (!retval) {
2353  "(manager '%s') retrieved invalid GeoAPI reply from %s [%s]",
2354  name_.c_str(), url.c_str(), order.c_str());
2355  } else {
2356  LogCvmfs(kLogDownload, kLogDebug | kLogSyslog, "(manager '%s') "
2357  "geographic order of servers retrieved from %s",
2358  name_.c_str(),
2359  dns::ExtractHost(host_chain_shuffled[i]).c_str());
2360  // remove new line at end of "order"
2361  LogCvmfs(kLogDownload, kLogDebug, "order is %s",
2362  Trim(order, true /* trim_newline */).c_str());
2363  success = true;
2364  break;
2365  }
2366  } else {
2368  "(manager '%s') GeoAPI request %s failed with error %d [%s]",
2369  name_.c_str(), url.c_str(), result, Code2Ascii(result));
2370  }
2371  }
2372  if (!success) {
2373  LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn, "(manager '%s') "
2374  "failed to retrieve geographic order from stratum 1 servers",
2375  name_.c_str());
2376  return false;
2377  }
2378 
2379  if (output_order) {
2380  output_order->swap(geo_order);
2381  } else {
2382  std::vector<std::string> sorted_servers;
2383  sorted_servers.reserve(geo_order.size());
2384  for (unsigned i = 0; i < geo_order.size(); ++i) {
2385  uint64_t orderval = geo_order[i];
2386  sorted_servers.push_back((*servers)[orderval]);
2387  }
2388  servers->swap(sorted_servers);
2389  }
2390  return true;
2391 }
2392 
2393 
2402  vector<string> host_chain;
2403  vector<int> host_rtt;
2404  unsigned current_host;
2405  vector< vector<ProxyInfo> > proxy_chain;
2406  unsigned fallback_group;
2407 
2408  GetHostInfo(&host_chain, &host_rtt, &current_host);
2409  GetProxyInfo(&proxy_chain, NULL, &fallback_group);
2410  if ((host_chain.size() < 2) && ((proxy_chain.size() - fallback_group) < 2))
2411  return true;
2412 
2413  vector<string> host_names;
2414  for (unsigned i = 0; i < host_chain.size(); ++i)
2415  host_names.push_back(dns::ExtractHost(host_chain[i]));
2416  SortTeam(&host_names, &host_chain);
2417  unsigned last_geo_host = host_names.size();
2418 
2419  if ((fallback_group == 0) && (last_geo_host > 1)) {
2420  // There are no non-fallback proxies, which means that the client
2421  // will always use the fallback proxies. Add a keyword separator
2422  // between the hosts and fallback proxies so the geosorting service
2423  // will know to sort the hosts based on the distance from the
2424  // closest fallback proxy rather than the distance from the client.
2425  host_names.push_back("+PXYSEP+");
2426  }
2427 
2428  // Add fallback proxy names to the end of the host list
2429  unsigned first_geo_fallback = host_names.size();
2430  for (unsigned i = fallback_group; i < proxy_chain.size(); ++i) {
2431  // We only take the first fallback proxy name from every group under the
2432  // assumption that load-balanced servers are at the same location
2433  host_names.push_back(proxy_chain[i][0].host.name());
2434  }
2435 
2436  std::vector<uint64_t> geo_order;
2437  bool success = GeoSortServers(&host_names, &geo_order);
2438  if (!success) {
2439  // GeoSortServers already logged a failure message.
2440  return false;
2441  }
2442 
2443  // Re-install host chain and proxy chain
2445  delete opt_host_chain_;
2446  opt_num_proxies_ = 0;
2447  opt_host_chain_ = new vector<string>(host_chain.size());
2448 
2449  // It's possible that opt_proxy_groups_fallback_ might have changed while
2450  // the lock wasn't held
2451  vector<vector<ProxyInfo> > *proxy_groups = new vector<vector<ProxyInfo> >(
2452  opt_proxy_groups_fallback_ + proxy_chain.size() - fallback_group);
2453  // First copy the non-fallback part of the current proxy chain
2454  for (unsigned i = 0; i < opt_proxy_groups_fallback_; ++i) {
2455  (*proxy_groups)[i] = (*opt_proxy_groups_)[i];
2456  opt_num_proxies_ += (*opt_proxy_groups_)[i].size();
2457  }
2458 
2459  // Copy the host chain and fallback proxies by geo order. Array indices
2460  // in geo_order that are smaller than last_geo_host refer to a stratum 1,
2461  // and those indices greater than or equal to first_geo_fallback refer to
2462  // a fallback proxy.
2463  unsigned hosti = 0;
2464  unsigned proxyi = opt_proxy_groups_fallback_;
2465  for (unsigned i = 0; i < geo_order.size(); ++i) {
2466  uint64_t orderval = geo_order[i];
2467  if (orderval < static_cast<uint64_t>(last_geo_host)) {
2468  // LogCvmfs(kLogCvmfs, kLogSyslog, "this is orderval %u at host index
2469  // %u", orderval, hosti);
2470  (*opt_host_chain_)[hosti++] = host_chain[orderval];
2471  } else if (orderval >= static_cast<uint64_t>(first_geo_fallback)) {
2472  // LogCvmfs(kLogCvmfs, kLogSyslog,
2473  // "this is orderval %u at proxy index %u, using proxy_chain index %u",
2474  // orderval, proxyi, fallback_group + orderval - first_geo_fallback);
2475  (*proxy_groups)[proxyi] =
2476  proxy_chain[fallback_group + orderval - first_geo_fallback];
2477  opt_num_proxies_ += (*proxy_groups)[proxyi].size();
2478  proxyi++;
2479  }
2480  }
2481 
2482  opt_proxy_map_.clear();
2483  delete opt_proxy_groups_;
2484  opt_proxy_groups_ = proxy_groups;
2485  // In pathological cases, opt_proxy_groups_current_ can be larger now when
2486  // proxies changed in-between.
2488  if (opt_proxy_groups_->size() == 0) {
2490  } else {
2492  }
2494  }
2495 
2496  UpdateProxiesUnlocked("geosort");
2497 
2498  delete opt_host_chain_rtt_;
2499  opt_host_chain_rtt_ = new vector<int>(host_chain.size(), kProbeGeo);
2501 
2502  return true;
2503 }
2504 
2505 
2514  const string &reply_order,
2515  const unsigned expected_size,
2516  vector<uint64_t> *reply_vals)
2517 {
2518  if (reply_order.empty())
2519  return false;
2520  sanitizer::InputSanitizer sanitizer("09 , \n");
2521  if (!sanitizer.IsValid(reply_order))
2522  return false;
2523  sanitizer::InputSanitizer strip_newline("09 ,");
2524  vector<string> reply_strings =
2525  SplitString(strip_newline.Filter(reply_order), ',');
2526  vector<uint64_t> tmp_vals;
2527  for (unsigned i = 0; i < reply_strings.size(); ++i) {
2528  if (reply_strings[i].empty())
2529  return false;
2530  tmp_vals.push_back(String2Uint64(reply_strings[i]));
2531  }
2532  if (tmp_vals.size() != expected_size)
2533  return false;
2534 
2535  // Check if tmp_vals contains the number 1..n
2536  set<uint64_t> coverage(tmp_vals.begin(), tmp_vals.end());
2537  if (coverage.size() != tmp_vals.size())
2538  return false;
2539  if ((*coverage.begin() != 1) || (*coverage.rbegin() != coverage.size()))
2540  return false;
2541 
2542  for (unsigned i = 0; i < expected_size; ++i) {
2543  (*reply_vals)[i] = tmp_vals[i] - 1;
2544  }
2545  return true;
2546 }
2547 
2548 
2554  const string &proxy_list,
2555  string *cleaned_list)
2556 {
2557  assert(cleaned_list);
2558  if (proxy_list == "") {
2559  *cleaned_list = "";
2560  return false;
2561  }
2562  bool result = false;
2563 
2564  vector<string> proxy_groups = SplitString(proxy_list, ';');
2565  vector<string> cleaned_groups;
2566  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2567  vector<string> group = SplitString(proxy_groups[i], '|');
2568  vector<string> cleaned;
2569  for (unsigned j = 0; j < group.size(); ++j) {
2570  if ((group[j] == "DIRECT") || (group[j] == "")) {
2571  result = true;
2572  } else {
2573  cleaned.push_back(group[j]);
2574  }
2575  }
2576  if (!cleaned.empty())
2577  cleaned_groups.push_back(JoinStrings(cleaned, "|"));
2578  }
2579 
2580  *cleaned_list = JoinStrings(cleaned_groups, ";");
2581  return result;
2582 }
2583 
2584 
2594  const string &proxy_list,
2595  const string &fallback_proxy_list,
2596  const ProxySetModes set_mode)
2597 {
2599 
2602  string set_proxy_list = opt_proxy_list_;
2603  string set_proxy_fallback_list = opt_proxy_fallback_list_;
2604  bool contains_direct;
2605  if ((set_mode == kSetProxyFallback) || (set_mode == kSetProxyBoth)) {
2606  opt_proxy_fallback_list_ = fallback_proxy_list;
2607  }
2608  if ((set_mode == kSetProxyRegular) || (set_mode == kSetProxyBoth)) {
2609  opt_proxy_list_ = proxy_list;
2610  }
2611  contains_direct =
2612  StripDirect(opt_proxy_fallback_list_, &set_proxy_fallback_list);
2613  if (contains_direct) {
2615  "(manager '%s') fallback proxies do not support DIRECT, removing",
2616  name_.c_str());
2617  }
2618  if (set_proxy_fallback_list == "") {
2619  set_proxy_list = opt_proxy_list_;
2620  } else {
2621  bool contains_direct = StripDirect(opt_proxy_list_, &set_proxy_list);
2622  if (contains_direct) {
2624  "(manager '%s') skipping DIRECT proxy to use fallback proxy",
2625  name_.c_str());
2626  }
2627  }
2628 
2629  // From this point on, use set_proxy_list and set_fallback_proxy_list as
2630  // effective proxy lists!
2631 
2632  opt_proxy_map_.clear();
2633  delete opt_proxy_groups_;
2634  if ((set_proxy_list == "") && (set_proxy_fallback_list == "")) {
2635  opt_proxy_groups_ = NULL;
2639  opt_num_proxies_ = 0;
2640  return;
2641  }
2642 
2643  // Determine number of regular proxy groups (== first fallback proxy group)
2645  if (set_proxy_list != "") {
2646  opt_proxy_groups_fallback_ = SplitString(set_proxy_list, ';').size();
2647  }
2648  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s') "
2649  "first fallback proxy group %u",
2651 
2652  // Concatenate regular proxies and fallback proxies, both of which can be
2653  // empty.
2654  string all_proxy_list = set_proxy_list;
2655  if (set_proxy_fallback_list != "") {
2656  if (all_proxy_list != "")
2657  all_proxy_list += ";";
2658  all_proxy_list += set_proxy_fallback_list;
2659  }
2660  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s') full proxy list %s",
2661  name_.c_str(), all_proxy_list.c_str());
2662 
2663  // Resolve server names in provided urls
2664  vector<string> hostnames; // All encountered hostnames
2665  vector<string> proxy_groups;
2666  if (all_proxy_list != "")
2667  proxy_groups = SplitString(all_proxy_list, ';');
2668  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2669  vector<string> this_group = SplitString(proxy_groups[i], '|');
2670  for (unsigned j = 0; j < this_group.size(); ++j) {
2671  this_group[j] = dns::AddDefaultScheme(this_group[j]);
2672  // Note: DIRECT strings will be "extracted" to an empty string.
2673  string hostname = dns::ExtractHost(this_group[j]);
2674  // Save the hostname. Leave empty (DIRECT) names so indexes will
2675  // match later.
2676  hostnames.push_back(hostname);
2677  }
2678  }
2679  vector<dns::Host> hosts;
2680  LogCvmfs(kLogDownload, kLogDebug, "(manager '%s') "
2681  "resolving %lu proxy addresses",
2682  name_.c_str(), hostnames.size());
2683  resolver_->ResolveMany(hostnames, &hosts);
2684 
2685  // Construct opt_proxy_groups_: traverse proxy list in same order and expand
2686  // names to resolved IP addresses.
2687  opt_proxy_groups_ = new vector< vector<ProxyInfo> >();
2688  opt_num_proxies_ = 0;
2689  unsigned num_proxy = 0; // Combined i, j counter
2690  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2691  vector<string> this_group = SplitString(proxy_groups[i], '|');
2692  // Construct ProxyInfo objects from proxy string and DNS resolver result for
2693  // every proxy in this_group. One URL can result in multiple ProxyInfo
2694  // objects, one for each IP address.
2695  vector<ProxyInfo> infos;
2696  for (unsigned j = 0; j < this_group.size(); ++j, ++num_proxy) {
2697  this_group[j] = dns::AddDefaultScheme(this_group[j]);
2698  if (this_group[j] == "DIRECT") {
2699  infos.push_back(ProxyInfo("DIRECT"));
2700  continue;
2701  }
2702 
2703  if (hosts[num_proxy].status() != dns::kFailOk) {
2704  LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn, "(manager '%s') "
2705  "failed to resolve IP addresses for %s (%d - %s)", name_.c_str(),
2706  hosts[num_proxy].name().c_str(), hosts[num_proxy].status(),
2707  dns::Code2Ascii(hosts[num_proxy].status()));
2708  dns::Host failed_host =
2709  dns::Host::ExtendDeadline(hosts[num_proxy], resolver_->min_ttl());
2710  infos.push_back(ProxyInfo(failed_host, this_group[j]));
2711  continue;
2712  }
2713 
2714  // IPv4 addresses have precedence
2715  set<string> best_addresses =
2716  hosts[num_proxy].ViewBestAddresses(opt_ip_preference_);
2717  set<string>::const_iterator iter_ips = best_addresses.begin();
2718  for (; iter_ips != best_addresses.end(); ++iter_ips) {
2719  string url_ip = dns::RewriteUrl(this_group[j], *iter_ips);
2720  infos.push_back(ProxyInfo(hosts[num_proxy], url_ip));
2721 
2722  if (sharding_policy_.UseCount() > 0) {
2723  sharding_policy_->AddProxy(url_ip);
2724  }
2725  }
2726  }
2727  opt_proxy_groups_->push_back(infos);
2728  opt_num_proxies_ += infos.size();
2729  }
2731  "(manager '%s') installed %u proxies in %lu load-balance groups",
2732  name_.c_str(), opt_num_proxies_, opt_proxy_groups_->size());
2735 
2736  // Select random start proxy from the first group.
2737  if (opt_proxy_groups_->size() > 0) {
2738  // Select random start proxy from the first group.
2739  UpdateProxiesUnlocked("set random start proxy from the first proxy group");
2740  }
2741 }
2742 
2743 
2750 void DownloadManager::GetProxyInfo(vector< vector<ProxyInfo> > *proxy_chain,
2751  unsigned *current_group,
2752  unsigned *fallback_group)
2753 {
2754  assert(proxy_chain != NULL);
2756 
2757 
2758  if (!opt_proxy_groups_) {
2759  vector< vector<ProxyInfo> > empty_chain;
2760  *proxy_chain = empty_chain;
2761  if (current_group != NULL)
2762  *current_group = 0;
2763  if (fallback_group != NULL)
2764  *fallback_group = 0;
2765  return;
2766  }
2767 
2768  *proxy_chain = *opt_proxy_groups_;
2769  if (current_group != NULL)
2770  *current_group = opt_proxy_groups_current_;
2771  if (fallback_group != NULL)
2772  *fallback_group = opt_proxy_groups_fallback_;
2773 }
2774 
2776  return opt_proxy_list_;
2777 }
2778 
2780  return opt_proxy_fallback_list_;
2781 }
2782 
2788  if (!opt_proxy_groups_)
2789  return NULL;
2790 
2791  uint32_t key = (hash ? hash->Partial32() : 0);
2792  map<uint32_t, ProxyInfo *>::iterator it = opt_proxy_map_.lower_bound(key);
2793  ProxyInfo *proxy = it->second;
2794 
2795  return proxy;
2796 }
2797 
2801 void DownloadManager::UpdateProxiesUnlocked(const string &reason) {
2802  if (!opt_proxy_groups_)
2803  return;
2804 
2805  // Identify number of non-burned proxies within the current group
2806  vector<ProxyInfo> *group = current_proxy_group();
2807  unsigned num_alive = (group->size() - opt_proxy_groups_current_burned_);
2808  string old_proxy = JoinStrings(opt_proxy_urls_, "|");
2809 
2810  // Rebuild proxy map and URL list
2811  opt_proxy_map_.clear();
2812  opt_proxy_urls_.clear();
2813  const uint32_t max_key = 0xffffffffUL;
2814  if (opt_proxy_shard_) {
2815  // Build a consistent map with multiple entries for each proxy
2816  for (unsigned i = 0; i < num_alive; ++i) {
2817  ProxyInfo *proxy = &(*group)[i];
2818  shash::Any proxy_hash(shash::kSha1);
2819  HashString(proxy->url, &proxy_hash);
2820  Prng prng;
2821  prng.InitSeed(proxy_hash.Partial32());
2822  for (unsigned j = 0; j < kProxyMapScale; ++j) {
2823  const std::pair<uint32_t, ProxyInfo *> entry(prng.Next(max_key), proxy);
2824  opt_proxy_map_.insert(entry);
2825  }
2826  opt_proxy_urls_.push_back(proxy->url);
2827  }
2828  // Ensure lower_bound() finds a value for all keys
2829  ProxyInfo *first_proxy = opt_proxy_map_.begin()->second;
2830  const std::pair<uint32_t, ProxyInfo *> last_entry(max_key, first_proxy);
2831  opt_proxy_map_.insert(last_entry);
2832  } else {
2833  // Build a map with a single entry for one randomly selected proxy
2834  unsigned select = prng_.Next(num_alive);
2835  ProxyInfo *proxy = &(*group)[select];
2836  const std::pair<uint32_t, ProxyInfo *> entry(max_key, proxy);
2837  opt_proxy_map_.insert(entry);
2838  opt_proxy_urls_.push_back(proxy->url);
2839  }
2840  sort(opt_proxy_urls_.begin(), opt_proxy_urls_.end());
2841 
2842  // Report any change in proxy usage
2843  string new_proxy = JoinStrings(opt_proxy_urls_, "|");
2844  if (new_proxy != old_proxy) {
2846  "(manager '%s') switching proxy from %s to %s. Reason: %s",
2847  name_.c_str(), (old_proxy.empty() ? "(none)" : old_proxy.c_str()),
2848  (new_proxy.empty() ? "(none)" : new_proxy.c_str()),
2849  reason.c_str());
2850  }
2851 }
2852 
2857  opt_proxy_shard_ = true;
2858  RebalanceProxiesUnlocked("enable sharding");
2859 }
2860 
2865 void DownloadManager::RebalanceProxiesUnlocked(const string &reason) {
2866  if (!opt_proxy_groups_)
2867  return;
2868 
2871  UpdateProxiesUnlocked(reason);
2872 }
2873 
2874 
2877  RebalanceProxiesUnlocked("rebalance invoked manually");
2878 }
2879 
2880 
2886 
2887  if (!opt_proxy_groups_ || (opt_proxy_groups_->size() < 2)) {
2888  return;
2889  }
2890 
2892  opt_proxy_groups_->size();
2893  opt_timestamp_backup_proxies_ = time(NULL);
2894 
2895  std::string msg = "switch to proxy group " +
2898 }
2899 
2900 
2901 void DownloadManager::SetProxyGroupResetDelay(const unsigned seconds) {
2904  if (opt_proxy_groups_reset_after_ == 0) {
2907  }
2908 }
2909 
2910 
2911 void DownloadManager::SetHostResetDelay(const unsigned seconds)
2912 {
2914  opt_host_reset_after_ = seconds;
2915  if (opt_host_reset_after_ == 0)
2917 }
2918 
2919 
2920 void DownloadManager::SetRetryParameters(const unsigned max_retries,
2921  const unsigned backoff_init_ms,
2922  const unsigned backoff_max_ms)
2923 {
2925  opt_max_retries_ = max_retries;
2926  opt_backoff_init_ms_ = backoff_init_ms;
2927  opt_backoff_max_ms_ = backoff_max_ms;
2928 }
2929 
2930 
2933  resolver_->set_throttle(limit);
2934 }
2935 
2936 
2938  const std::string &direct,
2939  const std::string &forced)
2940 {
2942  proxy_template_direct_ = direct;
2943  proxy_template_forced_ = forced;
2944 }
2945 
2946 
2948  enable_info_header_ = true;
2949 }
2950 
2951 
2953  follow_redirects_ = true;
2954 }
2955 
2958 }
2959 
2961  enable_http_tracing_ = true;
2962 }
2963 
2964 void DownloadManager::AddHTTPTracingHeader(const std::string &header) {
2965  http_tracing_headers_.push_back(header);
2966 }
2967 
2970 }
2971 
2973  bool success = false;
2974  switch (type) {
2975  default:
2976  LogCvmfs(kLogDownload, kLogDebug | kLogSyslogErr, "(manager '%s') "
2977  "Proposed sharding policy does not exist. Falling back to default",
2978  name_.c_str());
2979  }
2980  return success;
2981 }
2982 
2984  failover_indefinitely_ = true;
2985 }
2986 
2992  const perf::StatisticsTemplate &statistics, const std::string &cloned_name)
2993 {
2994  DownloadManager *clone = new DownloadManager(pool_max_handles_, statistics,
2995  cloned_name);
2996 
3000 
3001  if (!opt_dns_server_.empty())
3002  clone->SetDnsServer(opt_dns_server_);
3014  if (opt_host_chain_) {
3015  clone->opt_host_chain_ = new vector<string>(*opt_host_chain_);
3016  clone->opt_host_chain_rtt_ = new vector<int>(*opt_host_chain_rtt_);
3017  }
3018 
3019  CloneProxyConfig(clone);
3027 
3028  clone->health_check_ = health_check_;
3031  clone->fqrn_ = fqrn_;
3032 
3033  return clone;
3034 }
3035 
3036 
3045  if (opt_proxy_groups_ == NULL)
3046  return;
3047 
3048  clone->opt_proxy_groups_ = new vector< vector<ProxyInfo> >(
3050  clone->UpdateProxiesUnlocked("cloned");
3051 }
3052 
3053 } // namespace download
unsigned opt_timeout_direct_
Definition: download.h:292
std::vector< std::string > http_tracing_headers_
Definition: download.h:308
bool StripDirect(const std::string &proxy_list, std::string *cleaned_list)
Definition: download.cc:2553
unsigned opt_low_speed_limit_
Definition: download.h:293
void HashString(const std::string &content, Any *any_digest)
Definition: hash.cc:268
Definition: prng.h:28
static const unsigned kDnsDefaultTimeoutMs
Definition: download.h:155
std::vector< T > Shuffle(const std::vector< T > &input, Prng *prng)
Definition: algorithm.h:50
unsigned throttle() const
Definition: dns.h:206
z_stream * GetZstreamPtr()
Definition: jobinfo.h:162
unsigned opt_backoff_init_ms_
Definition: download.h:295
void SetInfoHeader(char *info_header)
Definition: jobinfo.h:238
shash::ContextPtr * GetHashContextPtr()
Definition: jobinfo.h:167
uid_t uid() const
Definition: jobinfo.h:179
int64_t Xadd(class Counter *counter, const int64_t delta)
Definition: statistics.h:51
struct stat64 platform_stat64
const char * Code2Ascii(const Failures error)
Definition: dns.h:53
int64_t id() const
Definition: dns.h:108
unsigned opt_proxy_groups_current_burned_
Definition: download.h:329
double DiffTimeSeconds(struct timeval start, struct timeval end)
Definition: algorithm.cc:31
unsigned opt_proxy_groups_reset_after_
Definition: download.h:423
bool probe_hosts() const
Definition: jobinfo.h:174
virtual bool IsCanceled()
Definition: interrupt.h:16
void SetUrlOptions(JobInfo *info)
Definition: download.cc:1010
SharedPtr< ShardingPolicy > sharding_policy_
Definition: download.h:368
StreamStates DecompressZStream2Sink(const void *buf, const int64_t size, z_stream *strm, cvmfs::Sink *sink)
Definition: compression.cc:234
void ResolveMany(const std::vector< std::string > &names, std::vector< Host > *hosts)
Definition: dns.cc:370
std::string opt_proxy_fallback_list_
Definition: download.h:346
void SetHostChain(const std::string &host_list)
static NormalResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
Definition: dns.cc:1269
int64_t id() const
Definition: jobinfo.h:210
unsigned opt_host_reset_after_
Definition: download.h:431
void SetLowSpeedLimit(const unsigned low_speed_limit)
Definition: download.cc:2057
virtual int Purge()=0
DownloadManager(const unsigned max_pool_handles, const perf::StatisticsTemplate &statistics, const std::string &name="standard")
Definition: download.cc:1753
std::string proxy_template_direct_
Definition: download.h:407
curl_slist ** GetHeadersPtr()
Definition: jobinfo.h:165
static const int kProbeGeo
Definition: download.h:152
#define PANIC(...)
Definition: exception.h:29
string Trim(const string &raw, bool trim_newline)
Definition: string.cc:428
string ReplaceAll(const string &haystack, const string &needle, const string &replace_by)
Definition: string.cc:484
void set_min_ttl(unsigned seconds)
Definition: dns.h:207
string JoinStrings(const vector< string > &strings, const string &joint)
Definition: string.cc:325
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:249
unsigned opt_proxy_groups_current_
Definition: download.h:324
virtual bool RequiresReserve()=0
bool ValidateGeoReply(const std::string &reply_order, const unsigned expected_size, std::vector< uint64_t > *reply_vals)
Definition: download.cc:2513
std::vector< ProxyInfo > * current_proxy_group() const
Definition: download.h:264
void DecompressInit(z_stream *strm)
Definition: compression.cc:185
const std::string * url() const
Definition: jobinfo.h:172
void * cred_data() const
Definition: jobinfo.h:181
time_t opt_timestamp_backup_proxies_
Definition: download.h:421
void SetProxyChain(const std::string &proxy_list, const std::string &fallback_proxy_list, const ProxySetModes set_mode)
Definition: download.cc:2593
std::string GetProxyList()
Definition: download.cc:2775
unsigned min_ttl() const
Definition: dns.h:208
void InitLocaltime()
Definition: prng.h:39
bool allow_failure() const
Definition: jobinfo.h:209
std::set< CURL * > * pool_handles_inuse_
Definition: download.h:271
CURL * curl_handle() const
Definition: jobinfo.h:190
pthread_mutex_t * lock_options_
Definition: download.h:288
ProxyInfo * ChooseProxyUnlocked(const shash::Any *hash)
Definition: download.cc:2787
pthread_t thread_download_
Definition: download.h:278
std::string opt_proxy_list_
Definition: download.h:342
const std::string & name() const
Definition: dns.h:118
perf::Counter * sz_transfer_time
Definition: download.h:44
void SetTracingHeaderGid(char *tracing_header_gid)
Definition: jobinfo.h:241
std::vector< std::vector< ProxyInfo > > * opt_proxy_groups_
Definition: download.h:320
assert((mem||(size==0))&&"Out Of Memory")
void SetNocache(bool nocache)
Definition: jobinfo.h:249
unsigned opt_proxy_groups_fallback_
Definition: download.h:334
void ReleaseCurlHandle(CURL *handle)
Definition: download.cc:904
bool IsValidDataTube()
Definition: jobinfo.h:148
StreamStates
Definition: compression.h:36
void SetTracingHeaderPid(char *tracing_header_pid)
Definition: jobinfo.h:239
void set_max_ttl(unsigned seconds)
Definition: dns.h:209
Tube< DataTubeElement > * GetDataTubePtr()
Definition: jobinfo.h:170
Algorithms algorithm
Definition: hash.h:125
char * tracing_header_gid() const
Definition: jobinfo.h:194
const std::string path()
Definition: sink_path.h:109
const std::set< std::string > & ViewBestAddresses(IpPreference preference) const
Definition: dns.cc:205
char * info_header() const
Definition: jobinfo.h:192
void SetDnsServer(const std::string &address)
Definition: download.cc:1985
int platform_stat(const char *path, platform_stat64 *buf)
DownloadManager * Clone(const perf::StatisticsTemplate &statistics, const std::string &cloned_name)
Definition: download.cc:2991
bool force_nocache() const
Definition: jobinfo.h:177
std::string StringifyUint(const uint64_t value)
Definition: string.cc:84
void DecompressFini(z_stream *strm)
Definition: compression.cc:201
virtual std::string Describe()=0
static void * MainDownload(void *data)
Definition: download.cc:565
void InitSeed(const uint64_t seed)
Definition: prng.h:35
void SetTimeout(const unsigned seconds_proxy, const unsigned seconds_direct)
Definition: download.cc:2043
std::string opt_dns_server_
Definition: download.h:290
off_t range_offset() const
Definition: jobinfo.h:187
bool nocache() const
Definition: jobinfo.h:199
char algorithm
unsigned char num_used_hosts() const
Definition: jobinfo.h:203
virtual bool IsValid()=0
bool follow_redirects() const
Definition: jobinfo.h:176
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:138
void Init(ContextPtr context)
Definition: hash.cc:164
int http_code() const
Definition: jobinfo.h:201
bool IsValid(const std::string &input) const
Definition: sanitizer.cc:114
Algorithms
Definition: hash.h:41
void CreateDataTube()
Definition: jobinfo.h:143
bool FileExists(const std::string &path)
Definition: posix.cc:791
unsigned max_ttl() const
Definition: dns.h:210
Pipe< kPipeDownloadJobsResults > * GetPipeJobResultPtr()
Definition: jobinfo.h:168
void SetDnsTtlLimits(const unsigned min_seconds, const unsigned max_seconds)
Definition: download.cc:2022
static Failures PrepareDownloadDestination(JobInfo *info)
Definition: download.cc:115
void SetHttpCode(int http_code)
Definition: jobinfo.h:251
DataTubeAction action
Definition: jobinfo.h:49
std::vector< std::string > opt_proxy_urls_
Definition: download.h:354
const char * Code2Ascii(const Failures error)
bool head_request() const
Definition: jobinfo.h:175
unsigned char num_retries() const
Definition: jobinfo.h:204
bool IsValidPipeJobResults()
Definition: jobinfo.h:139
off_t range_size() const
Definition: jobinfo.h:188
void GetProxyInfo(std::vector< std::vector< ProxyInfo > > *proxy_chain, unsigned *current_group, unsigned *fallback_group)
Definition: download.cc:2750
void SetProxyGroupResetDelay(const unsigned seconds)
Definition: download.cc:2901
atomic_int32 multi_threaded_
Definition: download.h:279
bool compressed() const
Definition: jobinfo.h:173
void SetCurrentHostChainIndex(unsigned int current_host_chain_index)
Definition: jobinfo.h:258
std::string AddDefaultScheme(const std::string &proxy)
Definition: dns.cc:187
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:290
cvmfs::Sink * sink() const
Definition: jobinfo.h:183
gid_t gid() const
Definition: jobinfo.h:180
dns::NormalResolver * resolver_
Definition: download.h:395
bool Interrupted(const std::string &fqrn, JobInfo *info)
Definition: download.cc:85
std::string proxy() const
Definition: jobinfo.h:198
dns::IpPreference opt_ip_preference_
Definition: download.h:400
Algorithms algorithm
Definition: hash.h:500
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:245
unsigned int current_host_chain_index() const
Definition: jobinfo.h:206
void UseSystemCertificatePath()
Definition: ssl.cc:68
Definition: dns.h:90
bool SetShardingPolicy(const ShardingPolicySelector type)
Definition: download.cc:2972
perf::Counter * n_host_failover
Definition: download.h:48
void UpdateProxiesUnlocked(const std::string &reason)
Definition: download.cc:2801
bool IsEquivalent(const Host &other) const
Definition: dns.cc:269
bool IsProxyTransferError(const Failures error)
void SetNumRetries(unsigned char num_retries)
Definition: jobinfo.h:256
bool IsExpired() const
Definition: dns.cc:280
void set_throttle(const unsigned throttle)
Definition: dns.h:205
InterruptCue * interrupt_cue() const
Definition: jobinfo.h:182
void SetIpPreference(const dns::IpPreference preference)
Definition: download.cc:2032
virtual int Reset()=0
shash::ContextPtr hash_context() const
Definition: jobinfo.h:197
perf::Counter * n_requests
Definition: download.h:45
bool Read(T *data)
Definition: pipe.h:166
void Final(ContextPtr context, Any *any_digest)
Definition: hash.cc:221
void SetRetryParameters(const unsigned max_retries, const unsigned backoff_init_ms, const unsigned backoff_max_ms)
Definition: download.cc:2920
string StringifyInt(const int64_t value)
Definition: string.cc:78
char * tracing_header_uid() const
Definition: jobinfo.h:195
Failures error_code() const
Definition: jobinfo.h:200
void CloneProxyConfig(DownloadManager *clone)
Definition: download.cc:3037
void SetMaxIpaddrPerProxy(unsigned limit)
Definition: download.cc:2931
void EnableIgnoreSignatureFailures()
Definition: download.cc:2956
void Inc(class Counter *counter)
Definition: statistics.h:50
void * buffer
Definition: hash.h:501
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:267
std::string ExtractHost(const std::string &url)
Definition: dns.cc:110
void ** GetCredDataPtr()
Definition: jobinfo.h:164
std::vector< int > * opt_host_chain_rtt_
Definition: download.h:316
SslCertificateStore ssl_certificate_store_
Definition: download.h:444
time_t opt_timestamp_backup_host_
Definition: download.h:430
std::string GetFallbackProxyList()
Definition: download.cc:2779
uint32_t Partial32() const
Definition: hash.h:394
void SetProxyTemplates(const std::string &direct, const std::string &forced)
Definition: download.cc:2937
IpPreference
Definition: dns.h:46
unsigned retries() const
Definition: dns.h:203
unsigned opt_backoff_max_ms_
Definition: download.h:296
void SetErrorCode(Failures error_code)
Definition: jobinfo.h:250
void SetNumUsedProxies(unsigned char num_used_proxies)
Definition: jobinfo.h:252
unsigned GetContextSize(const Algorithms algorithm)
Definition: hash.cc:148
std::string GetDnsServer() const
Definition: download.cc:1977
unsigned opt_host_chain_current_
Definition: download.h:317
CredentialsAttachment * credentials_attachment_
Definition: download.h:433
std::vector< std::string > * opt_host_chain_
Definition: download.h:311
struct pollfd * watch_fds_
Definition: download.h:283
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
Definition: hash.cc:190
std::map< uint32_t, ProxyInfo * > opt_proxy_map_
Definition: download.h:350
uint64_t String2Uint64(const string &value)
Definition: string.cc:228
UniquePtr< Pipe< kPipeDownloadJobs > > pipe_jobs_
Definition: download.h:282
void CreatePipeJobResults()
Definition: jobinfo.h:135
Failures Fetch(JobInfo *info)
Definition: download.cc:1860
void SetCurlHandle(CURL *curl_handle)
Definition: jobinfo.h:236
void SetTracingHeaderUid(char *tracing_header_uid)
Definition: jobinfo.h:243
Definition: mutex.h:42
curl_slist * headers() const
Definition: jobinfo.h:191
const shash::Any * expected_hash() const
Definition: jobinfo.h:184
perf::Counter * n_proxy_failover
Definition: download.h:47
virtual int Flush()=0
void GetTimeout(unsigned *seconds_proxy, unsigned *seconds_direct)
Definition: download.cc:2066
void SetCredData(void *cred_data)
Definition: jobinfo.h:223
std::string Filter(const std::string &input) const
Definition: sanitizer.cc:107
std::string proxy_template_forced_
Definition: download.h:413
time_t opt_timestamp_failover_proxies_
Definition: download.h:422
void SetDnsParameters(const unsigned retries, const unsigned timeout_ms)
Definition: download.cc:2004
unsigned EscapeHeader(const std::string &header, char *escaped_buf, size_t buf_size)
Definition: download.cc:437
const std::string * extra_info() const
Definition: jobinfo.h:185
static Host ExtendDeadline(const Host &original, unsigned seconds_from_now)
Definition: dns.cc:231
UniquePtr< Pipe< kPipeThreadTerminator > > pipe_terminate_
Definition: download.h:280
void SetProxy(const std::string &proxy)
Definition: jobinfo.h:248
static const int kProbeUnprobed
Definition: download.h:143
unsigned backoff_ms() const
Definition: jobinfo.h:205
virtual int Reset()
Definition: sink_mem.cc:52
pid_t pid() const
Definition: jobinfo.h:178
unsigned char num_used_proxies() const
Definition: jobinfo.h:202
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:1975
bool IsHostTransferError(const Failures error)
static const unsigned kDnsDefaultRetries
Definition: download.h:154
void SortTeam(std::vector< T > *tractor, std::vector< U > *towed)
Definition: algorithm.h:68
bool GeoSortServers(std::vector< std::string > *servers, std::vector< uint64_t > *output_order=NULL)
Definition: download.cc:2306
virtual bool Reserve(size_t size)=0
static const int kProbeDown
Definition: download.h:148
void SetNumUsedHosts(unsigned char num_used_hosts)
Definition: jobinfo.h:254
unsigned size
Definition: hash.h:502
void SetHeaders(curl_slist *headers)
Definition: jobinfo.h:237
Failures status() const
Definition: dns.h:119
static const unsigned kProxyMapScale
Definition: download.h:156
void GetHostInfo(std::vector< std::string > *host_chain, std::vector< int > *rtt, unsigned *current_host)
Definition: download.cc:2110
static void size_t size
Definition: smalloc.h:54
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
Definition: download.cc:1374
char * tracing_header_pid() const
Definition: jobinfo.h:193
void SetBackoffMs(unsigned backoff_ms)
Definition: jobinfo.h:257
SharedPtr< HealthCheck > health_check_
Definition: download.h:376
void SwitchProxy(JobInfo *info)
Definition: download.cc:2130
void AddHTTPTracingHeader(const std::string &header)
Definition: download.cc:2964
void SetCredentialsAttachment(CredentialsAttachment *ca)
Definition: download.cc:1969
void SetFollowRedirects(bool follow_redirects)
Definition: jobinfo.h:217
void RebalanceProxiesUnlocked(const std::string &reason)
Definition: download.cc:2865
pthread_mutex_t * lock_synchronous_mode_
Definition: download.h:289
virtual bool SetResolvers(const std::vector< std::string > &resolvers)
Definition: dns.cc:1300
void InitializeRequest(JobInfo *info, CURL *handle)
Definition: download.cc:922
static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp)
Definition: download.cc:485
string RewriteUrl(const string &url, const string &ip)
Definition: dns.cc:156
void SetHostResetDelay(const unsigned seconds)
Definition: download.cc:2911
uint32_t Next(const uint64_t boundary)
Definition: prng.h:49
virtual int64_t Write(const void *buf, uint64_t sz)=0
unsigned timeout_ms() const
Definition: dns.h:204
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528