CernVM-FS  2.10.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
download.cc
Go to the documentation of this file.
1 
27 // TODO(jblomer): MS for time summing
28 // NOLINTNEXTLINE
29 #define __STDC_FORMAT_MACROS
30 
31 #include "cvmfs_config.h"
32 #include "download.h"
33 
34 #include <alloca.h>
35 #include <errno.h>
36 #include <inttypes.h>
37 #include <poll.h>
38 #include <pthread.h>
39 #include <signal.h>
40 #include <stdint.h>
41 #include <sys/time.h>
42 #include <unistd.h>
43 
44 #include <algorithm>
45 #include <cassert>
46 #include <cstdio>
47 #include <cstdlib>
48 #include <cstring>
49 #include <map>
50 #include <set>
51 #include <utility>
52 
53 #include "compression.h"
54 #include "crypto/hash.h"
55 #include "duplex_curl.h"
56 #include "interrupt.h"
57 #include "sanitizer.h"
58 #include "ssl.h"
59 #include "util/algorithm.h"
60 #include "util/atomic.h"
61 #include "util/concurrency.h"
62 #include "util/exception.h"
63 #include "util/logging.h"
64 #include "util/posix.h"
65 #include "util/prng.h"
66 #include "util/smalloc.h"
67 #include "util/string.h"
68 
69 using namespace std; // NOLINT
70 
71 namespace download {
72 
/**
 * Percent-encodes a single URL character.
 *
 * Characters in the allowed set (ASCII alphanumerics and "/:.@+-_~[],") are
 * copied verbatim into output[0] and false is returned.  Every other byte is
 * written as "%XX" (upper-case hex) into output[0..2] and true is returned.
 *
 * The byte is converted to unsigned char before it is split into nibbles:
 * on platforms where char is signed, bytes >= 0x80 (e.g. UTF-8 sequences)
 * would otherwise be negative and produce invalid hex digits.
 *
 * @param input   character to encode
 * @param output  receives 1 (verbatim) or 3 (escaped) characters
 * @return true if the character was escaped
 */
static inline bool EscapeUrlChar(char input, char output[3]) {
  if (((input >= '0') && (input <= '9')) ||
      ((input >= 'A') && (input <= 'Z')) ||
      ((input >= 'a') && (input <= 'z')) ||
      (input == '/') || (input == ':') || (input == '.') ||
      (input == '@') ||
      (input == '+') || (input == '-') ||
      (input == '_') || (input == '~') ||
      (input == '[') || (input == ']') || (input == ','))
  {
    output[0] = input;
    return false;
  }

  static const char kHexDigits[] = "0123456789ABCDEF";
  const unsigned char byte = static_cast<unsigned char>(input);
  output[0] = '%';
  output[1] = kHexDigits[byte >> 4];
  output[2] = kHexDigits[byte & 0x0F];
  return true;
}
94 
95 
100 static string EscapeUrl(const string &url) {
101  string escaped;
102  escaped.reserve(url.length());
103 
104  char escaped_char[3];
105  for (unsigned i = 0, s = url.length(); i < s; ++i) {
106  if (EscapeUrlChar(url[i], escaped_char)) {
107  escaped.append(escaped_char, 3);
108  } else {
109  escaped.push_back(escaped_char[0]);
110  }
111  }
112  LogCvmfs(kLogDownload, kLogDebug, "escaped %s to %s",
113  url.c_str(), escaped.c_str());
114 
115  return escaped;
116 }
117 
118 
123 static unsigned EscapeHeader(const string &header,
124  char *escaped_buf,
125  size_t buf_size)
126 {
127  unsigned esc_pos = 0;
128  char escaped_char[3];
129  for (unsigned i = 0, s = header.size(); i < s; ++i) {
130  if (EscapeUrlChar(header[i], escaped_char)) {
131  for (unsigned j = 0; j < 3; ++j) {
132  if (escaped_buf) {
133  if (esc_pos >= buf_size)
134  return esc_pos;
135  escaped_buf[esc_pos] = escaped_char[j];
136  }
137  esc_pos++;
138  }
139  } else {
140  if (escaped_buf) {
141  if (esc_pos >= buf_size)
142  return esc_pos;
143  escaped_buf[esc_pos] = escaped_char[0];
144  }
145  esc_pos++;
146  }
147  }
148 
149  return esc_pos;
150 }
151 
152 
// Prepares the output target of a download job before the transfer starts.
// NOTE(review): the function signature (source line 153) is missing from
// this listing; presumably `static Failures PrepareDownloadDestination(
// JobInfo *info)` -- confirm against the real source.
// Returns kFailLocalIO if a kDestinationPath file cannot be opened, otherwise
// kFailOk.
//
// Reset the in-memory buffer bookkeeping.  For kDestinationMem the buffer is
// allocated later: from Content-Length in CallbackCurlHeader, or with a fixed
// 64 KiB size for file:// URLs in SetUrlOptions.
154  info->destination_mem.size = 0;
155  info->destination_mem.pos = 0;
156  info->destination_mem.data = NULL;
157 
// kDestinationFile: the caller must already provide an open FILE*.
158  if (info->destination == kDestinationFile)
159  assert(info->destination_file != NULL);
160 
// kDestinationPath: open (and truncate) the path for writing.  The resulting
// FILE* is stored in destination_file, through which CallbackCurlData writes.
// NOTE(review): closing this FILE* is not visible in this section.
161  if (info->destination == kDestinationPath) {
162  assert(info->destination_path != NULL);
163  info->destination_file = fopen(info->destination_path->c_str(), "w");
164  if (info->destination_file == NULL) {
165  LogCvmfs(kLogDownload, kLogDebug, "Failed to open path %s: %s"
166  " (errno=%d).",
167  info->destination_path->c_str(), strerror(errno), errno);
168  return kFailLocalIO;
169  }
170  }
171 
// kDestinationSink: the caller must provide a sink object.
172  if (info->destination == kDestinationSink)
173  assert(info->destination_sink != NULL);
174 
175  return kFailOk;
176 }
177 
178 
// libcurl header callback (installed as CURLOPT_HEADERFUNCTION in
// AcquireCurlHandle).  It is called once per received header line;
// info_link is the JobInfo registered via CURLOPT_WRITEHEADER in
// InitializeRequest.
//  - Parses the HTTP status line into info->http_code and maps error codes
//    to info->error_code.
//  - For kDestinationMem, allocates the receive buffer from Content-Length.
//  - Logs Location headers and reclassifies host errors as proxy errors when
//    X-Squid-Error / Proxy-Status headers indicate a proxy-side failure.
// Returns num_bytes to continue.  Returning 0 (!= num_bytes) makes libcurl
// treat the callback as failed and abort the transfer.
// NOTE(review): source lines 234, 237 and 252 are missing from this listing.
182 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
183  void *info_link)
184 {
185  const size_t num_bytes = size*nmemb;
186  const string header_line(static_cast<const char *>(ptr), num_bytes);
187  JobInfo *info = static_cast<JobInfo *>(info_link);
188 
189 // LogCvmfs(kLogDownload, kLogDebug, "REMOVE-ME: Header callback with %s",
190 // header_line.c_str());
191 
192 // Check http status codes
// NOTE(review): only "HTTP/1." status lines are matched (case-sensitive);
// an "HTTP/2" status line would not be parsed here -- confirm HTTP/2 is
// never negotiated.
193  if (HasPrefix(header_line, "HTTP/1.", false)) {
// Too short to hold "HTTP/1.x NNN": treat as malformed and abort
194  if (header_line.length() < 10)
195  return 0;
196 
// Skip any spaces that follow "HTTP/1.x" (status code starts at index 8+)
197  unsigned i;
198  for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {}
199 
200 // Code is initialized to -1
// ParseHttpCode reads three characters, so at least i+3 bytes are needed
201  if (header_line.length() > i+2) {
202  info->http_code = DownloadManager::ParseHttpCode(&header_line[i]);
203  }
204 
// 2xx: success, keep receiving headers
205  if ((info->http_code / 100) == 2) {
206  return num_bytes;
207  } else if ((info->http_code == 301) ||
208  (info->http_code == 302) ||
209  (info->http_code == 303) ||
210  (info->http_code == 307))
211  {
212  if (!info->follow_redirects) {
213  LogCvmfs(kLogDownload, kLogDebug, "redirect support not enabled: %s",
214  header_line.c_str());
215  info->error_code = kFailHostHttp;
216  return 0;
217  }
218  LogCvmfs(kLogDownload, kLogDebug, "http redirect: %s",
219  header_line.c_str());
220 // libcurl will handle this because of CURLOPT_FOLLOWLOCATION
221  return num_bytes;
222  } else {
223  LogCvmfs(kLogDownload, kLogDebug, "http status error code: %s [%d]",
224  header_line.c_str(), info->http_code);
225  if (((info->http_code / 100) == 5) ||
226  (info->http_code == 400) || (info->http_code == 404))
227  {
228 // 5XX returned by host
229 // 400: error from the GeoAPI module
230 // 404: the stratum 1 does not have the newest files
231  info->error_code = kFailHostHttp;
232  } else if (info->http_code == 429) {
233 // 429: rate throttling (we ignore the backoff hint for the time being)
// NOTE(review): the statement of this branch (source line 234) is missing
// from this listing.
235  } else {
// Other status codes: attributed to the host for DIRECT connections,
// otherwise (continuation on missing source line 237, presumably a proxy
// error code -- confirm).
236  info->error_code = (info->proxy == "DIRECT") ? kFailHostHttp :
238  }
239  return 0;
240  }
241  }
242 
243 // Allocate memory for kDestinationMemory
// Header names below are matched case-insensitively (HasPrefix third
// argument true).  tmp is sized num_bytes+1 so the "%s" token cannot
// overflow it; it only receives the header name and is discarded.
// Resources above kMaxMemSize are rejected with kFailTooBig.
// NOTE(review): if more than one response carries Content-Length (e.g. a
// followed redirect), the earlier destination_mem.data buffer is overwritten
// here without being freed -- confirm whether this can leak.
244  if ((info->destination == kDestinationMem) &&
245  HasPrefix(header_line, "CONTENT-LENGTH:", true))
246  {
247  char *tmp = reinterpret_cast<char *>(alloca(num_bytes+1));
248  uint64_t length = 0;
249  sscanf(header_line.c_str(), "%s %" PRIu64, tmp, &length);
250  if (length > 0) {
251  if (length > DownloadManager::kMaxMemSize) {
253  "resource %s too large to store in memory (%" PRIu64 ")",
254  info->url->c_str(), length);
255  info->error_code = kFailTooBig;
256  return 0;
257  }
258  info->destination_mem.data = static_cast<char *>(smalloc(length));
259  } else {
260 // Empty resource
261  info->destination_mem.data = NULL;
262  }
263  info->destination_mem.size = length;
264  } else if (HasPrefix(header_line, "LOCATION:", true)) {
265 // This comes along with redirects
266  LogCvmfs(kLogDownload, kLogDebug, "%s", header_line.c_str());
267  } else if (HasPrefix(header_line, "X-SQUID-ERROR:", true)) {
268 // Reinterpret host error as proxy error
269  if (info->error_code == kFailHostHttp) {
270  info->error_code = kFailProxyHttp;
271  }
272  } else if (HasPrefix(header_line, "PROXY-STATUS:", true)) {
273 // Reinterpret host error as proxy error if applicable
274  if ((info->error_code == kFailHostHttp) &&
275  (header_line.find("error=") != string::npos)) {
276  info->error_code = kFailProxyHttp;
277  }
278  }
279 
280  return num_bytes;
281 }
282 
283 
// libcurl body-data callback (installed as CURLOPT_WRITEFUNCTION in
// AcquireCurlHandle).  info_link is the JobInfo registered via
// CURLOPT_WRITEDATA in InitializeRequest.
// Feeds the received bytes into the hash context when a hash is expected.
// The hash is taken over the bytes as received, i.e. before decompression.
// It then writes the bytes to the job's destination:
//  - kDestinationSink: zlib-decompressed into the sink if compressed,
//    otherwise written verbatim;
//  - kDestinationMem: copied verbatim into the preallocated buffer (the
//    compressed flag is not consulted here);
//  - anything else: to destination_file, zlib-decompressed if compressed.
// Returns num_bytes on success.  On failure it sets info->error_code and
// returns 0, which libcurl treats as a write error.
// NOTE(review): source lines 314, 332, 362 and 369 (heads of LogCvmfs calls)
// are missing from this listing.
287 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
288  void *info_link)
289 {
290  const size_t num_bytes = size*nmemb;
291  JobInfo *info = static_cast<JobInfo *>(info_link);
292 
293 // LogCvmfs(kLogDownload, kLogDebug, "Data callback, %d bytes", num_bytes);
294 
// Empty chunk: nothing to do (0 == num_bytes, so this is not an error)
295  if (num_bytes == 0)
296  return 0;
297 
298  if (info->expected_hash) {
299  shash::Update(reinterpret_cast<unsigned char *>(ptr),
300  num_bytes, info->hash_context);
301  }
302 
303  if (info->destination == kDestinationSink) {
304  if (info->compressed) {
305  zlib::StreamStates retval =
306  zlib::DecompressZStream2Sink(ptr, static_cast<int64_t>(num_bytes),
307  &info->zstream, info->destination_sink);
308  if (retval == zlib::kStreamDataError) {
309  LogCvmfs(kLogDownload, kLogSyslogErr, "failed to decompress %s",
310  info->url->c_str());
311  info->error_code = kFailBadData;
312  return 0;
313  } else if (retval == zlib::kStreamIOError) {
315  "decompressing %s, local IO error", info->url->c_str());
316  info->error_code = kFailLocalIO;
317  return 0;
318  }
319  } else {
// A short or failed sink write is reported as a local I/O error
320  int64_t written = info->destination_sink->Write(ptr, num_bytes);
321  if ((written < 0) || (static_cast<uint64_t>(written) != num_bytes)) {
322  LogCvmfs(kLogDownload, kLogDebug, "Failed to perform write on %s (%"
323  PRId64 ")", info->url->c_str(), written);
324  info->error_code = kFailLocalIO;
325  return 0;
326  }
327  }
328  } else if (info->destination == kDestinationMem) {
329 // Write to memory
// The buffer never grows: data beyond destination_mem.size is rejected
330  if (info->destination_mem.pos + num_bytes > info->destination_mem.size) {
331  if (info->destination_mem.size == 0) {
333  "Content-Length was missing or zero, but %zu bytes received",
334  info->destination_mem.pos + num_bytes);
335  } else {
336  LogCvmfs(kLogDownload, kLogDebug, "Callback had too much data: "
337  "start %zu, bytes %zu, expected %zu",
338  info->destination_mem.pos,
339  num_bytes,
340  info->destination_mem.size);
341  }
342  info->error_code = kFailBadData;
343  return 0;
344  }
345  memcpy(info->destination_mem.data + info->destination_mem.pos,
346  ptr, num_bytes);
347  info->destination_mem.pos += num_bytes;
348  } else {
349 // Write to file
350  if (info->compressed) {
351 // LogCvmfs(kLogDownload, kLogDebug, "REMOVE-ME: writing %d bytes for %s",
352 // num_bytes, info->url->c_str());
353  zlib::StreamStates retval =
354  zlib::DecompressZStream2File(ptr, static_cast<int64_t>(num_bytes),
355  &info->zstream, info->destination_file);
356  if (retval == zlib::kStreamDataError) {
357  LogCvmfs(kLogDownload, kLogSyslogErr, "failed to decompress %s",
358  info->url->c_str());
359  info->error_code = kFailBadData;
360  return 0;
361  } else if (retval == zlib::kStreamIOError) {
363  "decompressing %s, local IO error", info->url->c_str());
364  info->error_code = kFailLocalIO;
365  return 0;
366  }
367  } else {
368  if (fwrite(ptr, 1, num_bytes, info->destination_file) != num_bytes) {
370  "downloading %s, IO failure: %s (errno=%d)",
371  info->url->c_str(), strerror(errno), errno);
372  info->error_code = kFailLocalIO;
373  return 0;
374  }
375  }
376  }
377 
378  return num_bytes;
379 }
380 
381 
382 //------------------------------------------------------------------------------
383 
384 
385 bool JobInfo::IsFileNotFound() {
386  if (HasPrefix(*url, "file://", true /* ignore_case */))
387  return error_code == kFailHostConnection;
388 
389  return http_code == 404;
390 }
391 
392 
393 //------------------------------------------------------------------------------
394 
395 
396 const int DownloadManager::kProbeUnprobed = -1;
397 const int DownloadManager::kProbeDown = -2;
398 const int DownloadManager::kProbeGeo = -3;
399 const unsigned DownloadManager::kMaxMemSize = 1024*1024;
400 
401 
405 int DownloadManager::ParseHttpCode(const char digits[3]) {
406  int result = 0;
407  int factor = 100;
408  for (int i = 0; i < 3; ++i) {
409  if ((digits[i] < '0') || (digits[i] > '9'))
410  return -1;
411  result += (digits[i] - '0') * factor;
412  factor /= 10;
413  }
414  return result;
415 }
416 
417 
421 int DownloadManager::CallbackCurlSocket(CURL * /* easy */,
422  curl_socket_t s,
423  int action,
424  void *userp,
425  void * /* socketp */)
426 {
427  // LogCvmfs(kLogDownload, kLogDebug, "CallbackCurlSocket called with easy "
428  // "handle %p, socket %d, action %d", easy, s, action);
429  DownloadManager *download_mgr = static_cast<DownloadManager *>(userp);
430  if (action == CURL_POLL_NONE)
431  return 0;
432 
433  // Find s in watch_fds_
434  unsigned index;
435  for (index = 0; index < download_mgr->watch_fds_inuse_; ++index) {
436  if (download_mgr->watch_fds_[index].fd == s)
437  break;
438  }
439  // Or create newly
440  if (index == download_mgr->watch_fds_inuse_) {
441  // Extend array if necessary
442  if (download_mgr->watch_fds_inuse_ == download_mgr->watch_fds_size_)
443  {
444  assert(download_mgr->watch_fds_size_ > 0);
445  download_mgr->watch_fds_size_ *= 2;
446  download_mgr->watch_fds_ = static_cast<struct pollfd *>(
447  srealloc(download_mgr->watch_fds_,
448  download_mgr->watch_fds_size_ * sizeof(struct pollfd)));
449  }
450  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].fd = s;
451  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].events = 0;
452  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].revents = 0;
453  download_mgr->watch_fds_inuse_++;
454  }
455 
456  switch (action) {
457  case CURL_POLL_IN:
458  download_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
459  break;
460  case CURL_POLL_OUT:
461  download_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
462  break;
463  case CURL_POLL_INOUT:
464  download_mgr->watch_fds_[index].events =
465  POLLIN | POLLPRI | POLLOUT | POLLWRBAND;
466  break;
467  case CURL_POLL_REMOVE:
468  if (index < download_mgr->watch_fds_inuse_-1) {
469  download_mgr->watch_fds_[index] =
470  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_-1];
471  }
472  download_mgr->watch_fds_inuse_--;
473  // Shrink array if necessary
474  if ((download_mgr->watch_fds_inuse_ > download_mgr->watch_fds_max_) &&
475  (download_mgr->watch_fds_inuse_ < download_mgr->watch_fds_size_/2))
476  {
477  download_mgr->watch_fds_size_ /= 2;
478  // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ (%d)",
479  // watch_fds_size_);
480  download_mgr->watch_fds_ = static_cast<struct pollfd *>(
481  srealloc(download_mgr->watch_fds_,
482  download_mgr->watch_fds_size_*sizeof(struct pollfd)));
483  // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ done",
484  // watch_fds_size_);
485  }
486  break;
487  default:
488  break;
489  }
490 
491  return 0;
492 }
493 
494 
// Body of the download I/O thread; data is the DownloadManager.
// Drives the libcurl multi interface with poll().  watch_fds_ layout:
//   [0] read end of pipe_terminate_ (any event stops the thread),
//   [1] read end of pipe_jobs_ (JobInfo pointers of new jobs),
//   [2..] sockets registered by CallbackCurlSocket.
// A new job gets a pooled curl handle, is initialized (InitializeRequest,
// SetUrlOptions) and added to the multi handle.  For a finished transfer,
// VerifyAndFinalize() either asks for another attempt (the handle is
// re-added) or the handle is released and info->error_code is written to
// the job's wait_at pipe.  On termination, all in-use handles are removed
// and cleaned up and watch_fds_ is freed.
498 void *DownloadManager::MainDownload(void *data) {
499  LogCvmfs(kLogDownload, kLogDebug, "download I/O thread started");
500  DownloadManager *download_mgr = static_cast<DownloadManager *>(data);
501 
502  download_mgr->watch_fds_ =
503  static_cast<struct pollfd *>(smalloc(2 * sizeof(struct pollfd)));
504  download_mgr->watch_fds_size_ = 2;
505  download_mgr->watch_fds_[0].fd = download_mgr->pipe_terminate_[0];
506  download_mgr->watch_fds_[0].events = POLLIN | POLLPRI;
507  download_mgr->watch_fds_[0].revents = 0;
508  download_mgr->watch_fds_[1].fd = download_mgr->pipe_jobs_[0];
509  download_mgr->watch_fds_[1].events = POLLIN | POLLPRI;
510  download_mgr->watch_fds_[1].revents = 0;
511  download_mgr->watch_fds_inuse_ = 2;
512 
// still_running: number of active transfers as reported by libcurl.
// timeval_start marks the beginning of a busy period (used for the
// sz_transfer_time counter).
513  int still_running = 0;
514  struct timeval timeval_start, timeval_stop;
515  gettimeofday(&timeval_start, NULL);
516  while (true) {
517  int timeout;
518  if (still_running) {
519 /* NOTE: The following might degrade the performance for many small files
520 * use case. TODO(jblomer): look into it.
521 // Specify a timeout for polling in ms; this allows us to return
522 // to libcurl once a second so it can look for internal operations
523 // which timed out. libcurl has a more elaborate mechanism
524 // (CURLMOPT_TIMERFUNCTION) that would inform us of the next potential
525 // timeout. TODO(bbockelm) we should switch to that in the future.
526 timeout = 100;
527 */
// Transfers running: wake up after 1 ms so libcurl timeouts are serviced
528  timeout = 1;
529  } else {
// Idle: block until a pipe or socket becomes ready, and add the time since
// timeval_start to sz_transfer_time (presumably in ms: 1000 *
// DiffTimeSeconds).
// NOTE(review): this runs on every idle iteration, so an idle wakeup that
// does not reset timeval_start (poll() < 0, e.g. EINTR, or an event on a
// leftover socket) adds the same interval again -- confirm.
530  timeout = -1;
531  gettimeofday(&timeval_stop, NULL);
532  int64_t delta = static_cast<int64_t>(
533  1000 * DiffTimeSeconds(timeval_start, timeval_stop));
534  perf::Xadd(download_mgr->counters_->sz_transfer_time, delta);
535  }
536  int retval = poll(download_mgr->watch_fds_, download_mgr->watch_fds_inuse_,
537  timeout);
// NOTE(review): every poll() error is retried immediately; a persistent
// error other than EINTR would make this loop spin.
538  if (retval < 0) {
539  continue;
540  }
541 
542 // Handle timeout
543  if (retval == 0) {
544  curl_multi_socket_action(download_mgr->curl_multi_,
545  CURL_SOCKET_TIMEOUT,
546  0,
547  &still_running);
548  }
549 
550 // Terminate I/O thread
551  if (download_mgr->watch_fds_[0].revents)
552  break;
553 
554 // New job arrives
555  if (download_mgr->watch_fds_[1].revents) {
556  download_mgr->watch_fds_[1].revents = 0;
557  JobInfo *info;
558 // NOLINTNEXTLINE(bugprone-sizeof-expression)
559  ReadPipe(download_mgr->pipe_jobs_[0], &info, sizeof(info));
// A new busy period starts when no transfer was running
560  if (!still_running)
561  gettimeofday(&timeval_start, NULL);
562  CURL *handle = download_mgr->AcquireCurlHandle();
563  download_mgr->InitializeRequest(info, handle);
564  download_mgr->SetUrlOptions(info);
565  curl_multi_add_handle(download_mgr->curl_multi_, handle);
566  curl_multi_socket_action(download_mgr->curl_multi_,
567  CURL_SOCKET_TIMEOUT,
568  0,
569  &still_running);
570  }
571 
572 // Activity on curl sockets
573 // Within this loop the curl_multi_socket_action() may cause socket(s)
574 // to be removed from watch_fds_. If a socket is removed it is replaced
575 // by the socket at the end of the array and the inuse count is decreased.
576 // Therefore loop over the array in reverse order.
577  for (int64_t i = download_mgr->watch_fds_inuse_-1; i >= 2; --i) {
// Skip slots that have been dropped by an earlier socket action
578  if (i >= download_mgr->watch_fds_inuse_) {
579  continue;
580  }
581  if (download_mgr->watch_fds_[i].revents) {
// Translate poll() revents into libcurl's CURL_CSELECT_* bitmask
582  int ev_bitmask = 0;
583  if (download_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
584  ev_bitmask |= CURL_CSELECT_IN;
585  if (download_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
586  ev_bitmask |= CURL_CSELECT_OUT;
587  if (download_mgr->watch_fds_[i].revents &
588  (POLLERR | POLLHUP | POLLNVAL))
589  {
590  ev_bitmask |= CURL_CSELECT_ERR;
591  }
592  download_mgr->watch_fds_[i].revents = 0;
593 
594  curl_multi_socket_action(download_mgr->curl_multi_,
595  download_mgr->watch_fds_[i].fd,
596  ev_bitmask,
597  &still_running);
598  }
599  }
600 
601 // Check if transfers are completed
602  CURLMsg *curl_msg;
603  int msgs_in_queue;
604  while ((curl_msg = curl_multi_info_read(download_mgr->curl_multi_,
605  &msgs_in_queue)))
606  {
607  if (curl_msg->msg == CURLMSG_DONE) {
608  perf::Inc(download_mgr->counters_->n_requests);
609  JobInfo *info;
610  CURL *easy_handle = curl_msg->easy_handle;
611  int curl_error = curl_msg->data.result;
// The JobInfo was attached as CURLOPT_PRIVATE in InitializeRequest
612  curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
613 
614  curl_multi_remove_handle(download_mgr->curl_multi_, easy_handle);
// true: another attempt is wanted, so the same handle is re-added
615  if (download_mgr->VerifyAndFinalize(curl_error, info)) {
616  curl_multi_add_handle(download_mgr->curl_multi_, easy_handle);
617  curl_multi_socket_action(download_mgr->curl_multi_,
618  CURL_SOCKET_TIMEOUT,
619  0,
620  &still_running);
621  } else {
622 // Return easy handle into pool and write result back
623  download_mgr->ReleaseCurlHandle(easy_handle);
624 
625  WritePipe(info->wait_at[1], &info->error_code,
626  sizeof(info->error_code));
627  }
628  }
629  }
630  }
631 
// Shutdown: drop every handle that is still in use and free watch_fds_
632  for (set<CURL *>::iterator i = download_mgr->pool_handles_inuse_->begin(),
633  iEnd = download_mgr->pool_handles_inuse_->end(); i != iEnd; ++i)
634  {
635  curl_multi_remove_handle(download_mgr->curl_multi_, *i);
636  curl_easy_cleanup(*i);
637  }
638  download_mgr->pool_handles_inuse_->clear();
639  free(download_mgr->watch_fds_);
640 
641  LogCvmfs(kLogDownload, kLogDebug, "download I/O thread terminated");
642  return NULL;
643 }
644 
645 
646 //------------------------------------------------------------------------------
647 
648 
649 HeaderLists::~HeaderLists() {
650  for (unsigned i = 0; i < blocks_.size(); ++i) {
651  delete[] blocks_[i];
652  }
653  blocks_.clear();
654 }
655 
656 
657 curl_slist *HeaderLists::GetList(const char *header) {
658  return Get(header);
659 }
660 
661 
662 curl_slist *HeaderLists::DuplicateList(curl_slist *slist) {
663  assert(slist);
664  curl_slist *copy = GetList(slist->data);
665  copy->next = slist->next;
666  curl_slist *prev = copy;
667  slist = slist->next;
668  while (slist) {
669  curl_slist *new_link = Get(slist->data);
670  new_link->next = slist->next;
671  prev->next = new_link;
672  prev = new_link;
673  slist = slist->next;
674  }
675  return copy;
676 }
677 
678 
679 void HeaderLists::AppendHeader(curl_slist *slist, const char *header) {
680  assert(slist);
681  curl_slist *new_link = Get(header);
682  new_link->next = NULL;
683 
684  while (slist->next)
685  slist = slist->next;
686  slist->next = new_link;
687 }
688 
689 
695 void HeaderLists::CutHeader(const char *header, curl_slist **slist) {
696  assert(slist);
697  curl_slist head;
698  head.next = *slist;
699  curl_slist *prev = &head;
700  curl_slist *rover = *slist;
701  while (rover) {
702  if (strcmp(rover->data, header) == 0) {
703  prev->next = rover->next;
704  Put(rover);
705  rover = prev;
706  }
707  prev = rover;
708  rover = rover->next;
709  }
710  *slist = head.next;
711 }
712 
713 
714 void HeaderLists::PutList(curl_slist *slist) {
715  while (slist) {
716  curl_slist *next = slist->next;
717  Put(slist);
718  slist = next;
719  }
720 }
721 
722 
723 string HeaderLists::Print(curl_slist *slist) {
724  string verbose;
725  while (slist) {
726  verbose += string(slist->data) + "\n";
727  slist = slist->next;
728  }
729  return verbose;
730 }
731 
732 
733 curl_slist *HeaderLists::Get(const char *header) {
734  for (unsigned i = 0; i < blocks_.size(); ++i) {
735  for (unsigned j = 0; j < kBlockSize; ++j) {
736  if (!IsUsed(&(blocks_[i][j]))) {
737  blocks_[i][j].data = const_cast<char *>(header);
738  return &(blocks_[i][j]);
739  }
740  }
741  }
742 
743  // All used, new block
744  AddBlock();
745  blocks_[blocks_.size()-1][0].data = const_cast<char *>(header);
746  return &(blocks_[blocks_.size()-1][0]);
747 }
748 
749 
750 void HeaderLists::Put(curl_slist *slist) {
751  slist->data = NULL;
752  slist->next = NULL;
753 }
754 
755 
756 void HeaderLists::AddBlock() {
757  curl_slist *new_block = new curl_slist[kBlockSize];
758  for (unsigned i = 0; i < kBlockSize; ++i) {
759  Put(&new_block[i]);
760  }
761  blocks_.push_back(new_block);
762 }
763 
764 
765 //------------------------------------------------------------------------------
766 
767 
768 string DownloadManager::ProxyInfo::Print() {
769  if (url == "DIRECT")
770  return url;
771 
772  string result = url;
773  int remaining =
774  static_cast<int>(host.deadline()) - static_cast<int>(time(NULL));
775  string expinfo = (remaining >= 0) ? "+" : "";
776  if (abs(remaining) >= 3600) {
777  expinfo += StringifyInt(remaining/3600) + "h";
778  } else if (abs(remaining) >= 60) {
779  expinfo += StringifyInt(remaining/60) + "m";
780  } else {
781  expinfo += StringifyInt(remaining) + "s";
782  }
783  if (host.status() == dns::kFailOk) {
784  result += " (" + host.name() + ", " + expinfo + ")";
785  } else {
786  result += " (:unresolved:, " + expinfo + ")";
787  }
788  return result;
789 }
790 
791 
796 CURL *DownloadManager::AcquireCurlHandle() {
797  CURL *handle;
798 
799  if (pool_handles_idle_->empty()) {
800  // Create a new handle
801  handle = curl_easy_init();
802  assert(handle != NULL);
803 
804  curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
805  // curl_easy_setopt(curl_default, CURLOPT_FAILONERROR, 1);
806  curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, CallbackCurlHeader);
807  curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlData);
808  } else {
809  handle = *(pool_handles_idle_->begin());
810  pool_handles_idle_->erase(pool_handles_idle_->begin());
811  }
812 
813  pool_handles_inuse_->insert(handle);
814 
815  return handle;
816 }
817 
818 
819 void DownloadManager::ReleaseCurlHandle(CURL *handle) {
820  set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
821  assert(elem != pool_handles_inuse_->end());
822 
823  if (pool_handles_idle_->size() > pool_max_handles_) {
824  curl_easy_cleanup(*elem);
825  } else {
826  pool_handles_idle_->insert(*elem);
827  }
828 
829  pool_handles_inuse_->erase(elem);
830 }
831 
832 
837 void DownloadManager::InitializeRequest(JobInfo *info, CURL *handle) {
838  // Initialize internal download state
839  info->curl_handle = handle;
840  info->error_code = kFailOk;
841  info->http_code = -1;
842  info->follow_redirects = follow_redirects_;
843  info->num_used_proxies = 1;
844  info->num_used_hosts = 1;
845  info->num_retries = 0;
846  info->backoff_ms = 0;
847  info->headers = header_lists_->DuplicateList(default_headers_);
848  if (info->info_header) {
849  header_lists_->AppendHeader(info->headers, info->info_header);
850  }
851  if (info->force_nocache) {
852  SetNocache(info);
853  } else {
854  info->nocache = false;
855  }
856  if (info->compressed) {
857  zlib::DecompressInit(&(info->zstream));
858  }
859  if (info->expected_hash) {
860  assert(info->hash_context.buffer != NULL);
861  shash::Init(info->hash_context);
862  }
863 
864  if ((info->range_offset != -1) && (info->range_size)) {
865  char byte_range_array[100];
866  const int64_t range_lower = static_cast<int64_t>(info->range_offset);
867  const int64_t range_upper = static_cast<int64_t>(
868  info->range_offset + info->range_size - 1);
869  if (snprintf(byte_range_array, sizeof(byte_range_array),
870  "%" PRId64 "-%" PRId64,
871  range_lower, range_upper) == 100)
872  {
873  PANIC(NULL); // Should be impossible given limits on offset size.
874  }
875  curl_easy_setopt(handle, CURLOPT_RANGE, byte_range_array);
876  } else {
877  curl_easy_setopt(handle, CURLOPT_RANGE, NULL);
878  }
879 
880  // Set curl parameters
881  curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
882  curl_easy_setopt(handle, CURLOPT_WRITEHEADER,
883  static_cast<void *>(info));
884  curl_easy_setopt(handle, CURLOPT_WRITEDATA, static_cast<void *>(info));
885  curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->headers);
886  if (info->head_request) {
887  curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
888  } else {
889  curl_easy_setopt(handle, CURLOPT_HTTPGET, 1);
890  }
891  if (opt_ipv4_only_) {
892  curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
893  }
894  if (follow_redirects_) {
895  curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1);
896  curl_easy_setopt(handle, CURLOPT_MAXREDIRS, 4);
897  }
898 }
899 
900 
// Applies the per-attempt, proxy/host dependent curl options for info's
// handle while holding lock_options_:
//  - resets the backup proxy group, the load-balanced proxies and the backup
//    host once their reset intervals have elapsed;
//  - selects the proxy (re-validating its DNS entry) or DIRECT;
//  - sets low-speed limits, connect timeouts and DNS servers;
//  - builds the URL from the current host (if info->probe_hosts) and
//    info->url;
//  - configures TLS for https URLs (CA path, optional credentials);
//  - substitutes @proxy@ in Geo-API URLs;
//  - preallocates a 64 KiB buffer for in-memory file:// downloads;
//  - sets the escaped URL.
// NOTE(review): source lines 939, 992 and 998 (heads of LogCvmfs calls) are
// missing from this listing.
905 void DownloadManager::SetUrlOptions(JobInfo *info) {
906  CURL *curl_handle = info->curl_handle;
907  string url_prefix;
908 
909  MutexLockGuard m(lock_options_);
910 // Check if proxy group needs to be reset from backup to primary
911  if (opt_timestamp_backup_proxies_ > 0) {
912  const time_t now = time(NULL);
913  if (static_cast<int64_t>(now) >
914  static_cast<int64_t>(opt_timestamp_backup_proxies_ +
915  opt_proxy_groups_reset_after_))
916  {
917  opt_proxy_groups_current_ = 0;
918  opt_timestamp_backup_proxies_ = 0;
919  RebalanceProxiesUnlocked("reset proxy group");
920  }
921  }
922 // Check if load-balanced proxies within the group need to be reset
923  if (opt_timestamp_failover_proxies_ > 0) {
924  const time_t now = time(NULL);
925  if (static_cast<int64_t>(now) >
926  static_cast<int64_t>(opt_timestamp_failover_proxies_ +
927  opt_proxy_groups_reset_after_))
928  {
929  RebalanceProxiesUnlocked("reset load-balanced proxies");
930  }
931  }
932 // Check if host needs to be reset
// (logs the switch, then returns to the first host of the chain)
933  if (opt_timestamp_backup_host_ > 0) {
934  const time_t now = time(NULL);
935  if (static_cast<int64_t>(now) >
936  static_cast<int64_t>(opt_timestamp_backup_host_ +
937  opt_host_reset_after_))
938  {
940  "switching host from %s to %s (reset host)",
941  (*opt_host_chain_)[opt_host_chain_current_].c_str(),
942  (*opt_host_chain_)[0].c_str());
943  opt_host_chain_current_ = 0;
944  opt_timestamp_backup_host_ = 0;
945  }
946  }
947 
// No proxy or "DIRECT": an empty CURLOPT_PROXY disables proxy use for this
// handle.
948  ProxyInfo *proxy = ChooseProxyUnlocked(info->expected_hash);
949  if (!proxy || (proxy->url == "DIRECT")) {
950  info->proxy = "DIRECT";
951  curl_easy_setopt(info->curl_handle, CURLOPT_PROXY, "");
952  } else {
953 // Note: inside ValidateProxyIpsUnlocked() we may change the proxy data
954 // structure, so we must not pass proxy->... (== current_proxy())
955 // parameters directly
956  std::string purl = proxy->url;
957  dns::Host phost = proxy->host;
958  const bool changed = ValidateProxyIpsUnlocked(purl, phost);
959 // Current proxy may have changed
// NOTE(review): the re-chosen proxy is dereferenced below without a NULL
// check -- confirm ChooseProxyUnlocked() cannot return NULL here.
960  if (changed)
961  proxy = ChooseProxyUnlocked(info->expected_hash);
962  info->proxy = proxy->url;
963  if (proxy->host.status() == dns::kFailOk) {
964  curl_easy_setopt(info->curl_handle, CURLOPT_PROXY, info->proxy.c_str());
965  } else {
966 // We know it can't work, don't even try to download
967  curl_easy_setopt(info->curl_handle, CURLOPT_PROXY, "0.0.0.0");
968  }
969  }
// Separate connect / low-speed timeouts for proxied and direct connections
970  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, opt_low_speed_limit_);
971  if (info->proxy != "DIRECT") {
972  curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_proxy_);
973  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_proxy_);
974  } else {
975  curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_direct_);
976  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_direct_);
977  }
978  if (!opt_dns_server_.empty())
979  curl_easy_setopt(curl_handle, CURLOPT_DNS_SERVERS, opt_dns_server_.c_str());
980 
// With host probing, info->url is relative to the currently selected host
// of the chain; the index used is remembered in the job.
981  if (info->probe_hosts && opt_host_chain_) {
982  url_prefix = (*opt_host_chain_)[opt_host_chain_current_];
983  info->current_host_chain_index = opt_host_chain_current_;
984  }
985 
986  string url = url_prefix + *(info->url);
987 
// Peer verification is always on; the https prefix test is case-sensitive.
// A failure to set the CA path or to attach credentials is only logged.
988  curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 1L);
989  if (url.substr(0, 5) == "https") {
990  bool rvb = ssl_certificate_store_.ApplySslCertificatePath(curl_handle);
991  if (!rvb) {
993  "Failed to set SSL certificate path %s",
994  ssl_certificate_store_.GetCaPath().c_str());
995  }
996  if (info->pid != -1) {
997  if (credentials_attachment_ == NULL) {
999  "uses secure downloads but no credentials attachment set");
1000  } else {
1001  bool retval = credentials_attachment_->ConfigureCurlHandle(
1002  curl_handle, info->pid, &info->cred_data);
1003  if (!retval) {
1004  LogCvmfs(kLogDownload, kLogDebug, "failed attaching credentials");
1005  }
1006  }
1007  }
1008 // The download manager disables signal handling in the curl library;
1009 // as OpenSSL's implementation of TLS will generate a sigpipe in some
1010 // error paths, we must explicitly disable SIGPIPE here.
1011 // TODO(jblomer): it should be enough to do this once
1012  signal(SIGPIPE, SIG_IGN);
1013  }
1014 
1015  if (url.find("@proxy@") != string::npos) {
1016 // This is used in Geo-API requests (only), to replace a portion of the
1017 // URL with the current proxy name for the sake of caching the result.
1018 // Replace the @proxy@ either with a passed in "forced" template (which
1019 // is set from $CVMFS_PROXY_TEMPLATE) if there is one, or a "direct"
1020 // template (which is the uuid) if there's no proxy, or the name of the
1021 // proxy.
1022  string replacement;
1023  if (proxy_template_forced_ != "") {
1024  replacement = proxy_template_forced_;
1025  } else if (info->proxy == "DIRECT") {
1026  replacement = proxy_template_direct_;
1027  } else {
1028  if (opt_proxy_groups_current_ >= opt_proxy_groups_fallback_) {
1029 // It doesn't make sense to use the fallback proxies in Geo-API requests
1030 // since the fallback proxies are supposed to get sorted, too.
1031  info->proxy = "DIRECT";
1032  curl_easy_setopt(info->curl_handle, CURLOPT_PROXY, "");
1033  replacement = proxy_template_direct_;
1034  } else {
1035  replacement = ChooseProxyUnlocked(info->expected_hash)->host.name();
1036  }
1037  }
1038  replacement = (replacement == "") ? proxy_template_direct_ : replacement;
1039  LogCvmfs(kLogDownload, kLogDebug, "replacing @proxy@ by %s",
1040  replacement.c_str());
1041  url = ReplaceAll(url, "@proxy@", replacement);
1042  }
1043 
// In-memory downloads of file:// URLs whose size is still unknown (0) get a
// fixed 64 KiB buffer; CallbackCurlData rejects any data beyond it with
// kFailBadData.  Presumably no usable Content-Length is seen for file://
// URLs -- confirm.
1044  if ((info->destination == kDestinationMem) &&
1045  (info->destination_mem.size == 0) &&
1046  HasPrefix(url, "file://", false))
1047  {
1048  info->destination_mem.size = 64*1024;
1049  info->destination_mem.data = static_cast<char *>(smalloc(64*1024));
1050  }
1051 
1052  curl_easy_setopt(curl_handle, CURLOPT_URL, EscapeUrl(url).c_str());
1053 }
1054 
1055 
1065 bool DownloadManager::ValidateProxyIpsUnlocked(
1066  const string &url,
1067  const dns::Host &host)
1068 {
1069  if (!host.IsExpired())
1070  return false;
1071  LogCvmfs(kLogDownload, kLogDebug, "validate DNS entry for %s",
1072  host.name().c_str());
1073 
1074  unsigned group_idx = opt_proxy_groups_current_;
1075  dns::Host new_host = resolver_->Resolve(host.name());
1076 
1077  bool update_only = true; // No changes to the list of IP addresses.
1078  if (new_host.status() != dns::kFailOk) {
1079  // Try again later in case resolving fails.
1081  "failed to resolve IP addresses for %s (%d - %s)",
1082  host.name().c_str(), new_host.status(),
1083  dns::Code2Ascii(new_host.status()));
1084  new_host = dns::Host::ExtendDeadline(host, resolver_->min_ttl());
1085  } else if (!host.IsEquivalent(new_host)) {
1086  update_only = false;
1087  }
1088 
1089  if (update_only) {
1090  for (unsigned i = 0; i < (*opt_proxy_groups_)[group_idx].size(); ++i) {
1091  if ((*opt_proxy_groups_)[group_idx][i].host.id() == host.id())
1092  (*opt_proxy_groups_)[group_idx][i].host = new_host;
1093  }
1094  return false;
1095  }
1096 
1097  assert(new_host.status() == dns::kFailOk);
1098 
1099  // Remove old host objects, insert new objects, and rebalance.
1101  "DNS entries for proxy %s changed, adjusting", host.name().c_str());
1102  vector<ProxyInfo> *group = current_proxy_group();
1103  opt_num_proxies_ -= group->size();
1104  for (unsigned i = 0; i < group->size(); ) {
1105  if ((*group)[i].host.id() == host.id()) {
1106  group->erase(group->begin() + i);
1107  } else {
1108  i++;
1109  }
1110  }
1111  vector<ProxyInfo> new_infos;
1112  set<string> best_addresses = new_host.ViewBestAddresses(opt_ip_preference_);
1113  set<string>::const_iterator iter_ips = best_addresses.begin();
1114  for (; iter_ips != best_addresses.end(); ++iter_ips) {
1115  string url_ip = dns::RewriteUrl(url, *iter_ips);
1116  new_infos.push_back(ProxyInfo(new_host, url_ip));
1117  }
1118  group->insert(group->end(), new_infos.begin(), new_infos.end());
1119  opt_num_proxies_ += new_infos.size();
1120 
1121  RebalanceProxiesUnlocked("DNS change");
1122  return true;
1123 }
1124 
1125 
1129 void DownloadManager::UpdateStatistics(CURL *handle) {
1130  double val;
1131  int retval;
1132  int64_t sum = 0;
1133 
1134  retval = curl_easy_getinfo(handle, CURLINFO_SIZE_DOWNLOAD, &val);
1135  assert(retval == CURLE_OK);
1136  sum += static_cast<int64_t>(val);
1137  /*retval = curl_easy_getinfo(handle, CURLINFO_HEADER_SIZE, &val);
1138  assert(retval == CURLE_OK);
1139  sum += static_cast<int64_t>(val);*/
1140  perf::Xadd(counters_->sz_transferred_bytes, sum);
1141 }
1142 
1143 
1147 bool DownloadManager::CanRetry(const JobInfo *info) {
1148  MutexLockGuard m(lock_options_);
1149  unsigned max_retries = opt_max_retries_;
1150 
1151  return !info->nocache && (info->num_retries < max_retries) &&
1152  (IsProxyTransferError(info->error_code) ||
1154 }
1155 
1163 void DownloadManager::Backoff(JobInfo *info) {
1164  unsigned backoff_init_ms = 0;
1165  unsigned backoff_max_ms = 0;
1166  {
1167  MutexLockGuard m(lock_options_);
1168  backoff_init_ms = opt_backoff_init_ms_;
1169  backoff_max_ms = opt_backoff_max_ms_;
1170  }
1171 
1172  info->num_retries++;
1173  perf::Inc(counters_->n_retries);
1174  if (info->backoff_ms == 0) {
1175  info->backoff_ms = prng_.Next(backoff_init_ms + 1); // Must be != 0
1176  } else {
1177  info->backoff_ms *= 2;
1178  }
1179  if (info->backoff_ms > backoff_max_ms) info->backoff_ms = backoff_max_ms;
1180 
1181  LogCvmfs(kLogDownload, kLogDebug, "backing off for %d ms", info->backoff_ms);
1182  SafeSleepMs(info->backoff_ms);
1183 }
1184 
1185 void DownloadManager::SetNocache(JobInfo *info) {
1186  if (info->nocache)
1187  return;
1188  header_lists_->AppendHeader(info->headers, "Pragma: no-cache");
1189  header_lists_->AppendHeader(info->headers, "Cache-Control: no-cache");
1190  curl_easy_setopt(info->curl_handle, CURLOPT_HTTPHEADER, info->headers);
1191  info->nocache = true;
1192 }
1193 
1194 
1199 void DownloadManager::SetRegularCache(JobInfo *info) {
1200  if (info->nocache == false)
1201  return;
1202  header_lists_->CutHeader("Pragma: no-cache", &(info->headers));
1203  header_lists_->CutHeader("Cache-Control: no-cache", &(info->headers));
1204  curl_easy_setopt(info->curl_handle, CURLOPT_HTTPHEADER, info->headers);
1205  info->nocache = false;
1206 }
1207 
1208 
1212 void DownloadManager::ReleaseCredential(JobInfo *info) {
1213  if (info->cred_data) {
1214  assert(credentials_attachment_ != NULL); // Someone must have set it
1215  credentials_attachment_->ReleaseCurlHandle(info->curl_handle,
1216  info->cred_data);
1217  info->cred_data = NULL;
1218  }
1219 }
1220 
1221 
1228 bool DownloadManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1230  "Verify downloaded url %s, proxy %s (curl error %d)",
1231  info->url->c_str(), info->proxy.c_str(), curl_error);
1232  UpdateStatistics(info->curl_handle);
1233 
1234  // Verification and error classification
1235  switch (curl_error) {
1236  case CURLE_OK:
1237  // Verify content hash
1238  if (info->expected_hash) {
1239  shash::Any match_hash;
1240  shash::Final(info->hash_context, &match_hash);
1241  if (match_hash != *(info->expected_hash)) {
1243  "hash verification of %s failed (expected %s, got %s)",
1244  info->url->c_str(), info->expected_hash->ToString().c_str(),
1245  match_hash.ToString().c_str());
1246  info->error_code = kFailBadData;
1247  break;
1248  }
1249  }
1250 
1251  // Decompress memory in a single run
1252  if ((info->destination == kDestinationMem) && info->compressed) {
1253  void *buf;
1254  uint64_t size;
1255  bool retval = zlib::DecompressMem2Mem(
1256  info->destination_mem.data,
1257  static_cast<int64_t>(info->destination_mem.pos),
1258  &buf, &size);
1259  if (retval) {
1260  free(info->destination_mem.data);
1261  info->destination_mem.data = static_cast<char *>(buf);
1262  info->destination_mem.pos = info->destination_mem.size = size;
1263  } else {
1265  "decompression (memory) of url %s failed",
1266  info->url->c_str());
1267  info->error_code = kFailBadData;
1268  break;
1269  }
1270  }
1271 
1272  info->error_code = kFailOk;
1273  break;
1274  case CURLE_UNSUPPORTED_PROTOCOL:
1276  break;
1277  case CURLE_URL_MALFORMAT:
1278  info->error_code = kFailBadUrl;
1279  break;
1280  case CURLE_COULDNT_RESOLVE_PROXY:
1281  info->error_code = kFailProxyResolve;
1282  break;
1283  case CURLE_COULDNT_RESOLVE_HOST:
1284  info->error_code = kFailHostResolve;
1285  break;
1286  case CURLE_OPERATION_TIMEDOUT:
1287  info->error_code = (info->proxy == "DIRECT") ?
1289  break;
1290  case CURLE_PARTIAL_FILE:
1291  case CURLE_GOT_NOTHING:
1292  case CURLE_RECV_ERROR:
1293  info->error_code = (info->proxy == "DIRECT") ?
1295  break;
1296  case CURLE_FILE_COULDNT_READ_FILE:
1297  case CURLE_COULDNT_CONNECT:
1298  if (info->proxy != "DIRECT") {
1299  // This is a guess. Fail-over can still change to switching host
1301  } else {
1303  }
1304  break;
1305  case CURLE_TOO_MANY_REDIRECTS:
1307  break;
1308  case CURLE_SSL_CACERT_BADFILE:
1310  "Failed to load certificate bundle. "
1311  "X509_CERT_BUNDLE might point to the wrong location.");
1313  break;
1314  // As of curl 7.62.0, CURLE_SSL_CACERT is the same as
1315  // CURLE_PEER_FAILED_VERIFICATION
1316  case CURLE_PEER_FAILED_VERIFICATION:
1318  "invalid SSL certificate of remote host. "
1319  "X509_CERT_DIR and/or X509_CERT_BUNDLE might point to the wrong "
1320  "location.");
1322  break;
1323  case CURLE_ABORTED_BY_CALLBACK:
1324  case CURLE_WRITE_ERROR:
1325  // Error set by callback
1326  break;
1327  default:
1328  LogCvmfs(kLogDownload, kLogSyslogErr, "unexpected curl error (%d) while "
1329  "trying to fetch %s", curl_error, info->url->c_str());
1330  info->error_code = kFailOther;
1331  break;
1332  }
1333 
1334  std::vector<std::string> *host_chain = opt_host_chain_;
1335 
1336  // Determination if download should be repeated
1337  bool try_again = false;
1338  bool same_url_retry = CanRetry(info);
1339  if (info->error_code != kFailOk) {
1340  MutexLockGuard m(lock_options_);
1341  if (info->error_code == kFailBadData) {
1342  if (!info->nocache) {
1343  try_again = true;
1344  } else {
1345  // Make it a host failure
1347  "data corruption with no-cache header, try another host");
1348 
1349  info->error_code = kFailHostHttp;
1350  }
1351  }
1352  if ( same_url_retry || (
1353  ( (info->error_code == kFailHostResolve) ||
1355  (info->error_code == kFailHostHttp)) &&
1356  info->probe_hosts &&
1357  host_chain && (info->num_used_hosts < host_chain->size()))
1358  )
1359  {
1360  try_again = true;
1361  }
1362  if ( same_url_retry || (
1363  ( (info->error_code == kFailProxyResolve) ||
1365  (info->error_code == kFailProxyHttp)) )
1366  )
1367  {
1368  try_again = true;
1369  // If all proxies failed, do a next round with the next host
1370  if (!same_url_retry && (info->num_used_proxies >= opt_num_proxies_)) {
1371  // Check if this can be made a host fail-over
1372  if (info->probe_hosts &&
1373  host_chain &&
1374  (info->num_used_hosts < host_chain->size()))
1375  {
1376  // reset proxy group if not already performed by other handle
1377  if (opt_proxy_groups_) {
1378  if ((opt_proxy_groups_current_ > 0) ||
1379  (opt_proxy_groups_current_burned_ > 0))
1380  {
1381  opt_proxy_groups_current_ = 0;
1382  opt_timestamp_backup_proxies_ = 0;
1383  RebalanceProxiesUnlocked("reset proxies for host failover");
1384  }
1385  }
1386 
1387  // Make it a host failure
1388  LogCvmfs(kLogDownload, kLogDebug, "make it a host failure");
1389  info->num_used_proxies = 1;
1391  } else {
1392  try_again = false;
1393  }
1394  } // Make a proxy failure a host failure
1395  } // Proxy failure assumed
1396  }
1397 
1398  if (try_again) {
1399  LogCvmfs(kLogDownload, kLogDebug, "Trying again on same curl handle, "
1400  "same url: %d, error code %d", same_url_retry, info->error_code);
1401  // Reset internal state and destination
1402  if ((info->destination == kDestinationMem) && info->destination_mem.data) {
1403  free(info->destination_mem.data);
1404  info->destination_mem.data = NULL;
1405  info->destination_mem.size = 0;
1406  info->destination_mem.pos = 0;
1407  }
1408  if (info->interrupt_cue && info->interrupt_cue->IsCanceled()) {
1409  info->error_code = kFailCanceled;
1410  goto verify_and_finalize_stop;
1411  }
1412  if ((info->destination == kDestinationFile) ||
1413  (info->destination == kDestinationPath))
1414  {
1415  if ((fflush(info->destination_file) != 0) ||
1416  (ftruncate(fileno(info->destination_file), 0) != 0))
1417  {
1418  info->error_code = kFailLocalIO;
1419  goto verify_and_finalize_stop;
1420  }
1421  rewind(info->destination_file);
1422  }
1423  if (info->destination == kDestinationSink) {
1424  if (info->destination_sink->Reset() != 0) {
1425  info->error_code = kFailLocalIO;
1426  goto verify_and_finalize_stop;
1427  }
1428  }
1429  if (info->expected_hash)
1430  shash::Init(info->hash_context);
1431  if (info->compressed)
1432  zlib::DecompressInit(&info->zstream);
1433  SetRegularCache(info);
1434 
1435  // Failure handling
1436  bool switch_proxy = false;
1437  bool switch_host = false;
1438  switch (info->error_code) {
1439  case kFailBadData:
1440  SetNocache(info);
1441  break;
1442  case kFailProxyResolve:
1443  case kFailProxyHttp:
1444  switch_proxy = true;
1445  break;
1446  case kFailHostResolve:
1447  case kFailHostHttp:
1448  case kFailHostAfterProxy:
1449  switch_host = true;
1450  break;
1451  default:
1452  if (IsProxyTransferError(info->error_code)) {
1453  if (same_url_retry) {
1454  Backoff(info);
1455  } else {
1456  switch_proxy = true;
1457  }
1458  } else if (IsHostTransferError(info->error_code)) {
1459  if (same_url_retry) {
1460  Backoff(info);
1461  } else {
1462  switch_host = true;
1463  }
1464  } else {
1465  // No other errors expected when retrying
1466  PANIC(NULL);
1467  }
1468  }
1469  if (switch_proxy) {
1470  ReleaseCredential(info);
1471  SwitchProxy(info);
1472  info->num_used_proxies++;
1473  SetUrlOptions(info);
1474  }
1475  if (switch_host) {
1476  ReleaseCredential(info);
1477  SwitchHost(info);
1478  info->num_used_hosts++;
1479  SetUrlOptions(info);
1480  }
1481 
1482  return true; // try again
1483  }
1484 
1485  verify_and_finalize_stop:
1486  // Finalize, flush destination file
1487  ReleaseCredential(info);
1488  if ((info->destination == kDestinationFile) &&
1489  fflush(info->destination_file) != 0)
1490  {
1491  info->error_code = kFailLocalIO;
1492  } else if (info->destination == kDestinationPath) {
1493  if (fclose(info->destination_file) != 0)
1494  info->error_code = kFailLocalIO;
1495  info->destination_file = NULL;
1496  }
1497 
1498  if (info->compressed)
1499  zlib::DecompressFini(&info->zstream);
1500 
1501  if (info->headers) {
1502  header_lists_->PutList(info->headers);
1503  info->headers = NULL;
1504  }
1505 
1506  return false; // stop transfer and return to Fetch()
1507 }
1508 
1509 
1510 DownloadManager::DownloadManager() {
1511  pool_handles_idle_ = NULL;
1512  pool_handles_inuse_ = NULL;
1513  pool_max_handles_ = 0;
1514  curl_multi_ = NULL;
1515  default_headers_ = NULL;
1516 
1517  atomic_init32(&multi_threaded_);
1518  pipe_terminate_[0] = pipe_terminate_[1] = -1;
1519 
1520  pipe_jobs_[0] = pipe_jobs_[1] = -1;
1521  watch_fds_ = NULL;
1522  watch_fds_size_ = 0;
1523  watch_fds_inuse_ = 0;
1524  watch_fds_max_ = 0;
1525 
1526  lock_options_ =
1527  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1528  int retval = pthread_mutex_init(lock_options_, NULL);
1529  assert(retval == 0);
1530  lock_synchronous_mode_ =
1531  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1532  retval = pthread_mutex_init(lock_synchronous_mode_, NULL);
1533  assert(retval == 0);
1534 
1535  opt_dns_server_ = "";
1536  opt_ip_preference_ = dns::kIpPreferSystem;
1537  opt_timeout_proxy_ = 0;
1538  opt_timeout_direct_ = 0;
1539  opt_low_speed_limit_ = 0;
1540  opt_host_chain_ = NULL;
1541  opt_host_chain_rtt_ = NULL;
1542  opt_host_chain_current_ = 0;
1543  opt_proxy_groups_ = NULL;
1544  opt_proxy_groups_current_ = 0;
1545  opt_proxy_groups_current_burned_ = 0;
1546  opt_num_proxies_ = 0;
1547  opt_proxy_shard_ = false;
1548  opt_max_retries_ = 0;
1549  opt_backoff_init_ms_ = 0;
1550  opt_backoff_max_ms_ = 0;
1551  enable_info_header_ = false;
1552  opt_ipv4_only_ = false;
1553  follow_redirects_ = false;
1554 
1555  resolver_ = NULL;
1556 
1557  opt_timestamp_backup_proxies_ = 0;
1558  opt_timestamp_failover_proxies_ = 0;
1559  opt_proxy_groups_reset_after_ = 0;
1560  opt_timestamp_backup_host_ = 0;
1561  opt_host_reset_after_ = 0;
1562 
1563  credentials_attachment_ = NULL;
1564 
1565  counters_ = NULL;
1566 }
1567 
1568 
1569 DownloadManager::~DownloadManager() {
1570  pthread_mutex_destroy(lock_options_);
1571  pthread_mutex_destroy(lock_synchronous_mode_);
1572  free(lock_options_);
1573  free(lock_synchronous_mode_);
1574 }
1575 
1576 void DownloadManager::InitHeaders() {
1577  // User-Agent
1578  string cernvm_id = "User-Agent: cvmfs ";
1579 #ifdef CVMFS_LIBCVMFS
1580  cernvm_id += "libcvmfs ";
1581 #else
1582  cernvm_id += "Fuse ";
1583 #endif
1584  cernvm_id += string(VERSION);
1585  if (getenv("CERNVM_UUID") != NULL) {
1586  cernvm_id += " " +
1587  sanitizer::InputSanitizer("az AZ 09 -").Filter(getenv("CERNVM_UUID"));
1588  }
1589  user_agent_ = strdup(cernvm_id.c_str());
1590 
1591  header_lists_ = new HeaderLists();
1592 
1593  default_headers_ = header_lists_->GetList("Connection: Keep-Alive");
1594  header_lists_->AppendHeader(default_headers_, "Pragma:");
1595  header_lists_->AppendHeader(default_headers_, user_agent_);
1596 }
1597 
1598 
1599 void DownloadManager::FiniHeaders() {
1600  delete header_lists_;
1601  header_lists_ = NULL;
1602  default_headers_ = NULL;
1603 }
1604 
1605 
1606 void DownloadManager::Init(const unsigned max_pool_handles,
1607  const perf::StatisticsTemplate &statistics)
1608 {
1609  atomic_init32(&multi_threaded_);
1610  int retval = curl_global_init(CURL_GLOBAL_ALL);
1611  assert(retval == CURLE_OK);
1612  pool_handles_idle_ = new set<CURL *>;
1613  pool_handles_inuse_ = new set<CURL *>;
1614  pool_max_handles_ = max_pool_handles;
1615  watch_fds_max_ = 4*pool_max_handles_;
1616 
1617  opt_timeout_proxy_ = 5;
1618  opt_timeout_direct_ = 10;
1619  opt_low_speed_limit_ = 1024;
1620  opt_proxy_groups_current_ = 0;
1621  opt_proxy_groups_current_burned_ = 0;
1622  opt_num_proxies_ = 0;
1623  opt_proxy_shard_ = false;
1624  opt_host_chain_current_ = 0;
1625  opt_ip_preference_ = dns::kIpPreferSystem;
1626 
1627  counters_ = new Counters(statistics);
1628 
1629  user_agent_ = NULL;
1630  InitHeaders();
1631 
1632  curl_multi_ = curl_multi_init();
1633  assert(curl_multi_ != NULL);
1634  curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION, CallbackCurlSocket);
1635  curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1636  static_cast<void *>(this));
1637  curl_multi_setopt(curl_multi_, CURLMOPT_MAXCONNECTS, watch_fds_max_);
1638  curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1639  pool_max_handles_);
1640 
1641  prng_.InitLocaltime();
1642 
1643  // Name resolving
1644  if ((getenv("CVMFS_IPV4_ONLY") != NULL) &&
1645  (strlen(getenv("CVMFS_IPV4_ONLY")) > 0))
1646  {
1647  opt_ipv4_only_ = true;
1648  }
1649  resolver_ = dns::NormalResolver::Create(opt_ipv4_only_,
1650  kDnsDefaultRetries, kDnsDefaultTimeoutMs);
1651  assert(resolver_);
1652 }
1653 
1654 
1656  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1657  // Shutdown I/O thread
1658  char buf = 'T';
1659  WritePipe(pipe_terminate_[1], &buf, 1);
1660  pthread_join(thread_download_, NULL);
1661  // All handles are removed from the multi stack
1662  close(pipe_terminate_[1]);
1663  close(pipe_terminate_[0]);
1664  close(pipe_jobs_[1]);
1665  close(pipe_jobs_[0]);
1666  }
1667 
1668  for (set<CURL *>::iterator i = pool_handles_idle_->begin(),
1669  iEnd = pool_handles_idle_->end(); i != iEnd; ++i)
1670  {
1671  curl_easy_cleanup(*i);
1672  }
1673  delete pool_handles_idle_;
1674  delete pool_handles_inuse_;
1675  curl_multi_cleanup(curl_multi_);
1676  pool_handles_idle_ = NULL;
1677  pool_handles_inuse_ = NULL;
1678  curl_multi_ = NULL;
1679 
1680  FiniHeaders();
1681  if (user_agent_)
1682  free(user_agent_);
1683  user_agent_ = NULL;
1684 
1685  delete counters_;
1686  counters_ = NULL;
1687 
1688  delete opt_host_chain_;
1689  delete opt_host_chain_rtt_;
1690  opt_proxy_map_.clear();
1691  delete opt_proxy_groups_;
1692  opt_host_chain_ = NULL;
1693  opt_host_chain_rtt_ = NULL;
1694  opt_proxy_groups_ = NULL;
1695 
1696  curl_global_cleanup();
1697 
1698  delete resolver_;
1699  resolver_ = NULL;
1700 }
1701 
1702 
1708  MakePipe(pipe_terminate_);
1709  MakePipe(pipe_jobs_);
1710 
1711  int retval = pthread_create(&thread_download_, NULL, MainDownload,
1712  static_cast<void *>(this));
1713  assert(retval == 0);
1714 
1715  atomic_inc32(&multi_threaded_);
1716 }
1717 
1718 
1723  assert(info != NULL);
1724  assert(info->url != NULL);
1725 
1726  Failures result;
1727  result = PrepareDownloadDestination(info);
1728  if (result != kFailOk)
1729  return result;
1730 
1731  if (info->expected_hash) {
1734  info->hash_context.size = shash::GetContextSize(algorithm);
1735  info->hash_context.buffer = alloca(info->hash_context.size);
1736  }
1737 
1738  // Prepare cvmfs-info: header, allocate string on the stack
1739  info->info_header = NULL;
1740  if (enable_info_header_ && info->extra_info) {
1741  const char *header_name = "cvmfs-info: ";
1742  const size_t header_name_len = strlen(header_name);
1743  const unsigned header_size = 1 + header_name_len +
1744  EscapeHeader(*(info->extra_info), NULL, 0);
1745  info->info_header = static_cast<char *>(alloca(header_size));
1746  memcpy(info->info_header, header_name, header_name_len);
1747  EscapeHeader(*(info->extra_info), info->info_header + header_name_len,
1748  header_size - header_name_len);
1749  info->info_header[header_size-1] = '\0';
1750  }
1751 
1752  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1753  if (info->wait_at[0] == -1) {
1754  MakePipe(info->wait_at);
1755  }
1756 
1757  // LogCvmfs(kLogDownload, kLogDebug, "send job to thread, pipe %d %d",
1758  // info->wait_at[0], info->wait_at[1]);
1759  // NOLINTNEXTLINE(bugprone-sizeof-expression)
1760  WritePipe(pipe_jobs_[1], &info, sizeof(info));
1761  ReadPipe(info->wait_at[0], &result, sizeof(result));
1762  // LogCvmfs(kLogDownload, kLogDebug, "got result %d", result);
1763  } else {
1764  MutexLockGuard l(lock_synchronous_mode_);
1765  CURL *handle = AcquireCurlHandle();
1766  InitializeRequest(info, handle);
1767  SetUrlOptions(info);
1768  // curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
1769  int retval;
1770  do {
1771  retval = curl_easy_perform(handle);
1772  perf::Inc(counters_->n_requests);
1773  double elapsed;
1774  if (curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &elapsed) == CURLE_OK)
1775  {
1776  perf::Xadd(counters_->sz_transfer_time,
1777  static_cast<int64_t>(elapsed * 1000));
1778  }
1779  } while (VerifyAndFinalize(retval, info));
1780  result = info->error_code;
1781  ReleaseCurlHandle(info->curl_handle);
1782  }
1783 
1784  if (result != kFailOk) {
1785  LogCvmfs(kLogDownload, kLogDebug, "download failed (error %d - %s)", result,
1786  Code2Ascii(result));
1787 
1788  if (info->destination == kDestinationPath)
1789  unlink(info->destination_path->c_str());
1790 
1791  if (info->destination_mem.data) {
1792  free(info->destination_mem.data);
1793  info->destination_mem.data = NULL;
1794  info->destination_mem.size = 0;
1795  }
1796  }
1797 
1798  return result;
1799 }
1800 
1801 
1806 void DownloadManager::SetCredentialsAttachment(CredentialsAttachment *ca) {
1807  MutexLockGuard m(lock_options_);
1808  credentials_attachment_ = ca;
1809 }
1810 
1814 std::string DownloadManager::GetDnsServer() const {
1815  return opt_dns_server_;
1816 }
1817 
1822 void DownloadManager::SetDnsServer(const string &address) {
1823  if (!address.empty()) {
1824  MutexLockGuard m(lock_options_);
1825  opt_dns_server_ = address;
1826  assert(!opt_dns_server_.empty());
1827 
1828  vector<string> servers;
1829  servers.push_back(address);
1830  bool retval = resolver_->SetResolvers(servers);
1831  assert(retval);
1832  }
1833  LogCvmfs(kLogDownload, kLogSyslog, "set nameserver to %s", address.c_str());
1834 }
1835 
1836 
1840 void DownloadManager::SetDnsParameters(
1841  const unsigned retries,
1842  const unsigned timeout_ms)
1843 {
1844  MutexLockGuard m(lock_options_);
1845  if ((resolver_->retries() == retries) &&
1846  (resolver_->timeout_ms() == timeout_ms))
1847  {
1848  return;
1849  }
1850  delete resolver_;
1851  resolver_ = NULL;
1852  resolver_ =
1853  dns::NormalResolver::Create(opt_ipv4_only_, retries, timeout_ms);
1854  assert(resolver_);
1855 }
1856 
1857 
1858 void DownloadManager::SetDnsTtlLimits(
1859  const unsigned min_seconds,
1860  const unsigned max_seconds)
1861 {
1862  MutexLockGuard m(lock_options_);
1863  resolver_->set_min_ttl(min_seconds);
1864  resolver_->set_max_ttl(max_seconds);
1865 }
1866 
1867 
1868 void DownloadManager::SetIpPreference(dns::IpPreference preference) {
1869  MutexLockGuard m(lock_options_);
1870  opt_ip_preference_ = preference;
1871 }
1872 
1873 
1879 void DownloadManager::SetTimeout(const unsigned seconds_proxy,
1880  const unsigned seconds_direct)
1881 {
1882  MutexLockGuard m(lock_options_);
1883  opt_timeout_proxy_ = seconds_proxy;
1884  opt_timeout_direct_ = seconds_direct;
1885 }
1886 
1887 
1893 void DownloadManager::SetLowSpeedLimit(const unsigned low_speed_limit) {
1894  MutexLockGuard m(lock_options_);
1895  opt_low_speed_limit_ = low_speed_limit;
1896 }
1897 
1898 
1902 void DownloadManager::GetTimeout(unsigned *seconds_proxy,
1903  unsigned *seconds_direct)
1904 {
1905  MutexLockGuard m(lock_options_);
1906  *seconds_proxy = opt_timeout_proxy_;
1907  *seconds_direct = opt_timeout_direct_;
1908 }
1909 
1910 
1915 void DownloadManager::SetHostChain(const string &host_list) {
1916  SetHostChain(SplitString(host_list, ';'));
1917 }
1918 
1919 
1920 void DownloadManager::SetHostChain(const std::vector<std::string> &host_list) {
1921  MutexLockGuard m(lock_options_);
1922  opt_timestamp_backup_host_ = 0;
1923  delete opt_host_chain_;
1924  delete opt_host_chain_rtt_;
1925  opt_host_chain_current_ = 0;
1926 
1927  if (host_list.empty()) {
1928  opt_host_chain_ = NULL;
1929  opt_host_chain_rtt_ = NULL;
1930  return;
1931  }
1932 
1933  opt_host_chain_ = new vector<string>(host_list);
1934  opt_host_chain_rtt_ =
1935  new vector<int>(opt_host_chain_->size(), kProbeUnprobed);
1936  // LogCvmfs(kLogDownload, kLogSyslog, "using host %s",
1937  // (*opt_host_chain_)[0].c_str());
1938 }
1939 
1940 
1941 
1946 void DownloadManager::GetHostInfo(vector<string> *host_chain, vector<int> *rtt,
1947  unsigned *current_host)
1948 {
1949  MutexLockGuard m(lock_options_);
1950  if (opt_host_chain_) {
1951  if (current_host) {*current_host = opt_host_chain_current_;}
1952  if (host_chain) {*host_chain = *opt_host_chain_;}
1953  if (rtt) {*rtt = *opt_host_chain_rtt_;}
1954  }
1955 }
1956 
1957 
1966 void DownloadManager::SwitchProxy(JobInfo *info) {
1967  MutexLockGuard m(lock_options_);
1968 
1969  if (!opt_proxy_groups_) {
1970  return;
1971  }
1972 
1973  // Fail any matching proxies within the current load-balancing group
1974  vector<ProxyInfo> *group = current_proxy_group();
1975  const unsigned group_size = group->size();
1976  unsigned failed = 0;
1977  for (unsigned i = 0; i < group_size - opt_proxy_groups_current_burned_; ++i) {
1978  if (info && (info->proxy == (*group)[i].url)) {
1979  // Move to list of failed proxies
1980  opt_proxy_groups_current_burned_++;
1981  swap((*group)[i],
1982  (*group)[group_size - opt_proxy_groups_current_burned_]);
1983  perf::Inc(counters_->n_proxy_failover);
1984  failed++;
1985  }
1986  }
1987 
1988  // Do nothing more unless at least one proxy was marked as failed
1989  if (!failed)
1990  return;
1991 
1992  // If all proxies from the current load-balancing group are burned, switch to
1993  // another group
1994  if (opt_proxy_groups_current_burned_ == group->size()) {
1995  opt_proxy_groups_current_burned_ = 0;
1996  if (opt_proxy_groups_->size() > 1) {
1997  opt_proxy_groups_current_ = (opt_proxy_groups_current_ + 1) %
1998  opt_proxy_groups_->size();
1999  // Remeber the timestamp of switching to backup proxies
2000  if (opt_proxy_groups_reset_after_ > 0) {
2001  if (opt_proxy_groups_current_ > 0) {
2002  if (opt_timestamp_backup_proxies_ == 0)
2003  opt_timestamp_backup_proxies_ = time(NULL);
2004  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2005  // "switched to (another) backup proxy group");
2006  } else {
2007  opt_timestamp_backup_proxies_ = 0;
2008  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2009  // "switched back to primary proxy group");
2010  }
2011  opt_timestamp_failover_proxies_ = 0;
2012  }
2013  }
2014  } else {
2015  // Record failover time
2016  if (opt_proxy_groups_reset_after_ > 0) {
2017  if (opt_timestamp_failover_proxies_ == 0)
2018  opt_timestamp_failover_proxies_ = time(NULL);
2019  }
2020  }
2021 
2022  UpdateProxiesUnlocked("failed proxy");
2023  LogCvmfs(kLogDownload, kLogDebug, "%d proxies remain in group",
2024  current_proxy_group()->size() - opt_proxy_groups_current_burned_);
2025 }
2026 
2027 
2033 void DownloadManager::SwitchHost(JobInfo *info) {
2034  MutexLockGuard m(lock_options_);
2035 
2036  if (!opt_host_chain_ || (opt_host_chain_->size() == 1)) {
2037  return;
2038  }
2039 
2040  if (info && (info->current_host_chain_index != opt_host_chain_current_)) {
2042  "don't switch host, "
2043  "last used host: %s, current host: %s",
2044  (*opt_host_chain_)[info->current_host_chain_index].c_str(),
2045  (*opt_host_chain_)[opt_host_chain_current_].c_str());
2046  return;
2047  }
2048 
2049  string reason = "manually triggered";
2050  if (info) {
2051  reason = download::Code2Ascii(info->error_code);
2052  }
2053 
2054  string old_host = (*opt_host_chain_)[opt_host_chain_current_];
2055  opt_host_chain_current_ =
2056  (opt_host_chain_current_ + 1) % opt_host_chain_->size();
2057  perf::Inc(counters_->n_host_failover);
2059  "switching host from %s to %s (%s)", old_host.c_str(),
2060  (*opt_host_chain_)[opt_host_chain_current_].c_str(),
2061  reason.c_str());
2062 
2063  // Remember the timestamp of switching to backup host
2064  if (opt_host_reset_after_ > 0) {
2065  if (opt_host_chain_current_ != 0) {
2066  if (opt_timestamp_backup_host_ == 0)
2067  opt_timestamp_backup_host_ = time(NULL);
2068  } else {
2069  opt_timestamp_backup_host_ = 0;
2070  }
2071  }
2072 }
2073 
2074 void DownloadManager::SwitchHost() {
2075  SwitchHost(NULL);
2076 }
2077 
2078 
2085 void DownloadManager::ProbeHosts() {
2086  vector<string> host_chain;
2087  vector<int> host_rtt;
2088  unsigned current_host;
2089 
2090  GetHostInfo(&host_chain, &host_rtt, &current_host);
2091 
2092  // Stopwatch, two times to fill caches first
2093  unsigned i, retries;
2094  string url;
2095  JobInfo info(&url, false, false, NULL);
2096  for (retries = 0; retries < 2; ++retries) {
2097  for (i = 0; i < host_chain.size(); ++i) {
2098  url = host_chain[i] + "/.cvmfspublished";
2099 
2100  struct timeval tv_start, tv_end;
2101  gettimeofday(&tv_start, NULL);
2102  Failures result = Fetch(&info);
2103  gettimeofday(&tv_end, NULL);
2104  if (info.destination_mem.data)
2105  free(info.destination_mem.data);
2106  if (result == kFailOk) {
2107  host_rtt[i] = static_cast<int>(
2108  DiffTimeSeconds(tv_start, tv_end) * 1000);
2109  LogCvmfs(kLogDownload, kLogDebug, "probing host %s had %dms rtt",
2110  url.c_str(), host_rtt[i]);
2111  } else {
2112  LogCvmfs(kLogDownload, kLogDebug, "error while probing host %s: %d %s",
2113  url.c_str(), result, Code2Ascii(result));
2114  host_rtt[i] = INT_MAX;
2115  }
2116  }
2117  }
2118 
2119  SortTeam(&host_rtt, &host_chain);
2120  for (i = 0; i < host_chain.size(); ++i) {
2121  if (host_rtt[i] == INT_MAX) host_rtt[i] = kProbeDown;
2122  }
2123 
2124  MutexLockGuard m(lock_options_);
2125  delete opt_host_chain_;
2126  delete opt_host_chain_rtt_;
2127  opt_host_chain_ = new vector<string>(host_chain);
2128  opt_host_chain_rtt_ = new vector<int>(host_rtt);
2129  opt_host_chain_current_ = 0;
2130 }
2131 
/**
 * Orders `servers` by geographic proximity using the Geo-API of the current
 * host chain.
 *
 * - Returns false if `servers` is NULL.  A single server is trivially in
 *   order: `output_order` (if non-NULL) becomes {0} and true is returned.
 * - Otherwise the host name of every entry (or the raw entry when no host
 *   can be extracted) is joined with ',' and requested as
 *   "<host>/api/v1.0/geo/@proxy@/<list>" from up to three hosts of the host
 *   chain, in an order shuffled with prng_ (under lock_options_).  The first
 *   reply accepted by ValidateGeoReply() wins.
 * - On success, `output_order` (if non-NULL) receives the 0-based
 *   permutation; if `output_order` is NULL, *servers is reordered in place.
 *   Returns false if no attempt yielded a valid reply.
 *
 * NOTE(review): this listing omits the opening `LogCvmfs(...` line of several
 * log statements (source lines 2167, 2176, 2180, 2188 and 2194 are missing),
 * so the orphaned argument lists below do not compile as shown.
 */
2132 bool DownloadManager::GeoSortServers(std::vector<std::string> *servers,
2133  std::vector<uint64_t> *output_order) {
2134  if (!servers) {return false;}
2135  if (servers->size() == 1) {
2136  if (output_order) {
2137  output_order->clear();
2138  output_order->push_back(0);
2139  }
2140  return true;
2141  }
2142 
2143  std::vector<std::string> host_chain;
2144  GetHostInfo(&host_chain, NULL, NULL);
2145 
2146  std::vector<std::string> server_dns_names;
2147  server_dns_names.reserve(servers->size());
2148  for (unsigned i = 0; i < servers->size(); ++i) {
2149  std::string host = dns::ExtractHost((*servers)[i]);
2150  server_dns_names.push_back(host.empty() ? (*servers)[i] : host);
2151  }
2152  std::string host_list = JoinStrings(server_dns_names, ",");
2153 
2154  vector<string> host_chain_shuffled;
2155  {
2156  // Protect against concurrent access to prng_
2157  MutexLockGuard m(lock_options_);
2158  // Determine random hosts for the Geo-API query
2159  host_chain_shuffled = Shuffle(host_chain, &prng_);
2160  }
2161  // Request ordered list via Geo-API
2162  bool success = false;
2163  unsigned max_attempts = std::min(host_chain_shuffled.size(), size_t(3));
2164  vector<uint64_t> geo_order(servers->size());
2165  for (unsigned i = 0; i < max_attempts; ++i) {
2166  string url = host_chain_shuffled[i] + "/api/v1.0/geo/@proxy@/" + host_list;
2168  "requesting ordered server list from %s", url.c_str());
2169  JobInfo info(&url, false, false, NULL);
2170  Failures result = Fetch(&info);
2171  if (result == kFailOk) {
2172  string order(info.destination_mem.data, info.destination_mem.size);
2173  free(info.destination_mem.data);
2174  bool retval = ValidateGeoReply(order, servers->size(), &geo_order);
2175  if (!retval) {
2177  "retrieved invalid GeoAPI reply from %s [%s]",
2178  url.c_str(), order.c_str());
2179  } else {
2181  "geographic order of servers retrieved from %s",
2182  dns::ExtractHost(host_chain_shuffled[i]).c_str());
2183  LogCvmfs(kLogDownload, kLogDebug, "order is %s", order.c_str());
2184  success = true;
2185  break;
2186  }
2187  } else {
2189  "GeoAPI request %s failed with error %d [%s]",
2190  url.c_str(), result, Code2Ascii(result));
2191  }
2192  }
2193  if (!success) {
2195  "failed to retrieve geographic order from stratum 1 servers");
2196  return false;
2197  }
2198 
2199  if (output_order) {
2200  output_order->swap(geo_order);
2201  } else {
2202  std::vector<std::string> sorted_servers;
2203  sorted_servers.reserve(geo_order.size());
2204  for (unsigned i = 0; i < geo_order.size(); ++i) {
2205  uint64_t orderval = geo_order[i];
2206  sorted_servers.push_back((*servers)[orderval]);
2207  }
2208  servers->swap(sorted_servers);
2209  }
2210  return true;
2211 }
2212 
2213 
2221 bool DownloadManager::ProbeGeo() {
2222  vector<string> host_chain;
2223  vector<int> host_rtt;
2224  unsigned current_host;
2225  vector< vector<ProxyInfo> > proxy_chain;
2226  unsigned fallback_group;
2227 
2228  GetHostInfo(&host_chain, &host_rtt, &current_host);
2229  GetProxyInfo(&proxy_chain, NULL, &fallback_group);
2230  if ((host_chain.size() < 2) && ((proxy_chain.size() - fallback_group) < 2))
2231  return true;
2232 
2233  vector<string> host_names;
2234  for (unsigned i = 0; i < host_chain.size(); ++i)
2235  host_names.push_back(dns::ExtractHost(host_chain[i]));
2236  SortTeam(&host_names, &host_chain);
2237  unsigned last_geo_host = host_names.size();
2238 
2239  if ((fallback_group == 0) && (last_geo_host > 1)) {
2240  // There are no non-fallback proxies, which means that the client
2241  // will always use the fallback proxies. Add a keyword separator
2242  // between the hosts and fallback proxies so the geosorting service
2243  // will know to sort the hosts based on the distance from the
2244  // closest fallback proxy rather than the distance from the client.
2245  host_names.push_back("+PXYSEP+");
2246  }
2247 
2248  // Add fallback proxy names to the end of the host list
2249  unsigned first_geo_fallback = host_names.size();
2250  for (unsigned i = fallback_group; i < proxy_chain.size(); ++i) {
2251  // We only take the first fallback proxy name from every group under the
2252  // assumption that load-balanced servers are at the same location
2253  host_names.push_back(proxy_chain[i][0].host.name());
2254  }
2255 
2256  std::vector<uint64_t> geo_order;
2257  bool success = GeoSortServers(&host_names, &geo_order);
2258  if (!success) {
2259  // GeoSortServers already logged a failure message.
2260  return false;
2261  }
2262 
2263  // Re-install host chain and proxy chain
2264  MutexLockGuard m(lock_options_);
2265  delete opt_host_chain_;
2266  opt_num_proxies_ = 0;
2267  opt_host_chain_ = new vector<string>(host_chain.size());
2268 
2269  // It's possible that opt_proxy_groups_fallback_ might have changed while
2270  // the lock wasn't held
2271  vector<vector<ProxyInfo> > *proxy_groups = new vector<vector<ProxyInfo> >(
2272  opt_proxy_groups_fallback_ + proxy_chain.size() - fallback_group);
2273  // First copy the non-fallback part of the current proxy chain
2274  for (unsigned i = 0; i < opt_proxy_groups_fallback_; ++i) {
2275  (*proxy_groups)[i] = (*opt_proxy_groups_)[i];
2276  opt_num_proxies_ += (*opt_proxy_groups_)[i].size();
2277  }
2278 
2279  // Copy the host chain and fallback proxies by geo order. Array indices
2280  // in geo_order that are smaller than last_geo_host refer to a stratum 1,
2281  // and those indices greater than or equal to first_geo_fallback refer to
2282  // a fallback proxy.
2283  unsigned hosti = 0;
2284  unsigned proxyi = opt_proxy_groups_fallback_;
2285  for (unsigned i = 0; i < geo_order.size(); ++i) {
2286  uint64_t orderval = geo_order[i];
2287  if (orderval < static_cast<uint64_t>(last_geo_host)) {
2288  // LogCvmfs(kLogCvmfs, kLogSyslog, "this is orderval %u at host index
2289  // %u", orderval, hosti);
2290  (*opt_host_chain_)[hosti++] = host_chain[orderval];
2291  } else if (orderval >= static_cast<uint64_t>(first_geo_fallback)) {
2292  // LogCvmfs(kLogCvmfs, kLogSyslog,
2293  // "this is orderval %u at proxy index %u, using proxy_chain index %u",
2294  // orderval, proxyi, fallback_group + orderval - first_geo_fallback);
2295  (*proxy_groups)[proxyi] =
2296  proxy_chain[fallback_group + orderval - first_geo_fallback];
2297  opt_num_proxies_ += (*proxy_groups)[proxyi].size();
2298  proxyi++;
2299  }
2300  }
2301 
2302  opt_proxy_map_.clear();
2303  delete opt_proxy_groups_;
2304  opt_proxy_groups_ = proxy_groups;
2305  // In pathological cases, opt_proxy_groups_current_ can be larger now when
2306  // proxies changed in-between.
2307  if (opt_proxy_groups_current_ > opt_proxy_groups_->size()) {
2308  if (opt_proxy_groups_->size() == 0) {
2309  opt_proxy_groups_current_ = 0;
2310  } else {
2311  opt_proxy_groups_current_ = opt_proxy_groups_->size() - 1;
2312  }
2313  opt_proxy_groups_current_burned_ = 0;
2314  }
2315 
2316  UpdateProxiesUnlocked("geosort");
2317 
2318  delete opt_host_chain_rtt_;
2319  opt_host_chain_rtt_ = new vector<int>(host_chain.size(), kProbeGeo);
2320  opt_host_chain_current_ = 0;
2321 
2322  return true;
2323 }
2324 
2325 
2333 bool DownloadManager::ValidateGeoReply(
2334  const string &reply_order,
2335  const unsigned expected_size,
2336  vector<uint64_t> *reply_vals)
2337 {
2338  if (reply_order.empty())
2339  return false;
2340  sanitizer::InputSanitizer sanitizer("09 , \n");
2341  if (!sanitizer.IsValid(reply_order))
2342  return false;
2343  sanitizer::InputSanitizer strip_newline("09 ,");
2344  vector<string> reply_strings =
2345  SplitString(strip_newline.Filter(reply_order), ',');
2346  vector<uint64_t> tmp_vals;
2347  for (unsigned i = 0; i < reply_strings.size(); ++i) {
2348  if (reply_strings[i].empty())
2349  return false;
2350  tmp_vals.push_back(String2Uint64(reply_strings[i]));
2351  }
2352  if (tmp_vals.size() != expected_size)
2353  return false;
2354 
2355  // Check if tmp_vals contains the number 1..n
2356  set<uint64_t> coverage(tmp_vals.begin(), tmp_vals.end());
2357  if (coverage.size() != tmp_vals.size())
2358  return false;
2359  if ((*coverage.begin() != 1) || (*coverage.rbegin() != coverage.size()))
2360  return false;
2361 
2362  for (unsigned i = 0; i < expected_size; ++i) {
2363  (*reply_vals)[i] = tmp_vals[i] - 1;
2364  }
2365  return true;
2366 }
2367 
2368 
2373 bool DownloadManager::StripDirect(
2374  const string &proxy_list,
2375  string *cleaned_list)
2376 {
2377  assert(cleaned_list);
2378  if (proxy_list == "") {
2379  *cleaned_list = "";
2380  return false;
2381  }
2382  bool result = false;
2383 
2384  vector<string> proxy_groups = SplitString(proxy_list, ';');
2385  vector<string> cleaned_groups;
2386  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2387  vector<string> group = SplitString(proxy_groups[i], '|');
2388  vector<string> cleaned;
2389  for (unsigned j = 0; j < group.size(); ++j) {
2390  if ((group[j] == "DIRECT") || (group[j] == "")) {
2391  result = true;
2392  } else {
2393  cleaned.push_back(group[j]);
2394  }
2395  }
2396  if (!cleaned.empty())
2397  cleaned_groups.push_back(JoinStrings(cleaned, "|"));
2398  }
2399 
2400  *cleaned_list = JoinStrings(cleaned_groups, ";");
2401  return result;
2402 }
2403 
2404 
/**
 * Installs a new regular and/or fallback proxy chain (selected by set_mode)
 * while holding lock_options_.
 *
 * Groups are separated by ';' and load-balanced proxies within a group by
 * '|'.  DIRECT (and empty) entries are always stripped from the fallback
 * list.  They are stripped from the regular list only if a non-empty fallback
 * list remains.  Every proxy host name is resolved through resolver_.  One
 * ProxyInfo is created per best address according to opt_ip_preference_.  A
 * failed resolution yields one ProxyInfo whose host deadline is extended by
 * resolver_->min_ttl().  Regular groups come first, and the fallback groups
 * start at index opt_proxy_groups_fallback_.  If both effective lists are
 * empty, opt_proxy_groups_ is reset to NULL.
 *
 * NOTE(review): this listing omits the opening `LogCvmfs(...` line of four
 * log statements (source lines 2434, 2442, 2520 and 2542 are missing), so the
 * orphaned argument lists below do not compile as shown.
 * NOTE(review): "%u" is used with size() arguments ("resolving %u proxy
 * addresses", "installed %u proxies in %u load-balance groups"); size_t needs
 * "%zu" or an explicit cast to unsigned.
 */
2413 void DownloadManager::SetProxyChain(
2414  const string &proxy_list,
2415  const string &fallback_proxy_list,
2416  const ProxySetModes set_mode)
2417 {
2418  MutexLockGuard m(lock_options_);
2419 
2420  opt_timestamp_backup_proxies_ = 0;
2421  opt_timestamp_failover_proxies_ = 0;
2422  string set_proxy_list = opt_proxy_list_;
2423  string set_proxy_fallback_list = opt_proxy_fallback_list_;
2424  bool contains_direct;
2425  if ((set_mode == kSetProxyFallback) || (set_mode == kSetProxyBoth)) {
2426  opt_proxy_fallback_list_ = fallback_proxy_list;
2427  }
2428  if ((set_mode == kSetProxyRegular) || (set_mode == kSetProxyBoth)) {
2429  opt_proxy_list_ = proxy_list;
2430  }
2431  contains_direct =
2432  StripDirect(opt_proxy_fallback_list_, &set_proxy_fallback_list);
2433  if (contains_direct) {
2435  "fallback proxies do not support DIRECT, removing");
2436  }
2437  if (set_proxy_fallback_list == "") {
2438  set_proxy_list = opt_proxy_list_;
2439  } else {
2440  bool contains_direct = StripDirect(opt_proxy_list_, &set_proxy_list);
2441  if (contains_direct) {
2443  "skipping DIRECT proxy to use fallback proxy");
2444  }
2445  }
2446 
2447  // From this point on, use set_proxy_list and set_proxy_fallback_list as
2448  // effective proxy lists!
2449 
2450  opt_proxy_map_.clear();
2451  delete opt_proxy_groups_;
2452  if ((set_proxy_list == "") && (set_proxy_fallback_list == "")) {
2453  opt_proxy_groups_ = NULL;
2454  opt_proxy_groups_current_ = 0;
2455  opt_proxy_groups_current_burned_ = 0;
2456  opt_proxy_groups_fallback_ = 0;
2457  opt_num_proxies_ = 0;
2458  return;
2459  }
2460 
2461  // Determine number of regular proxy groups (== first fallback proxy group)
2462  opt_proxy_groups_fallback_ = 0;
2463  if (set_proxy_list != "") {
2464  opt_proxy_groups_fallback_ = SplitString(set_proxy_list, ';').size();
2465  }
2466  LogCvmfs(kLogDownload, kLogDebug, "first fallback proxy group %u",
2467  opt_proxy_groups_fallback_);
2468 
2469  // Concatenate regular proxies and fallback proxies, both of which can be
2470  // empty.
2471  string all_proxy_list = set_proxy_list;
2472  if (set_proxy_fallback_list != "") {
2473  if (all_proxy_list != "")
2474  all_proxy_list += ";";
2475  all_proxy_list += set_proxy_fallback_list;
2476  }
2477  LogCvmfs(kLogDownload, kLogDebug, "full proxy list %s",
2478  all_proxy_list.c_str());
2479 
2480  // Resolve server names in provided urls
2481  vector<string> hostnames; // All encountered hostnames
2482  vector<string> proxy_groups;
2483  if (all_proxy_list != "")
2484  proxy_groups = SplitString(all_proxy_list, ';');
2485  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2486  vector<string> this_group = SplitString(proxy_groups[i], '|');
2487  for (unsigned j = 0; j < this_group.size(); ++j) {
2488  this_group[j] = dns::AddDefaultScheme(this_group[j]);
2489  // Note: DIRECT strings will be "extracted" to an empty string.
2490  string hostname = dns::ExtractHost(this_group[j]);
2491  // Save the hostname. Leave empty (DIRECT) names so indexes will
2492  // match later.
2493  hostnames.push_back(hostname);
2494  }
2495  }
2496  vector<dns::Host> hosts;
2497  LogCvmfs(kLogDownload, kLogDebug, "resolving %u proxy addresses",
2498  hostnames.size());
2499  resolver_->ResolveMany(hostnames, &hosts);
2500 
2501  // Construct opt_proxy_groups_: traverse proxy list in same order and expand
2502  // names to resolved IP addresses.
2503  opt_proxy_groups_ = new vector< vector<ProxyInfo> >();
2504  opt_num_proxies_ = 0;
2505  unsigned num_proxy = 0; // Combined i, j counter
2506  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2507  vector<string> this_group = SplitString(proxy_groups[i], '|');
2508  // Construct ProxyInfo objects from proxy string and DNS resolver result for
2509  // every proxy in this_group. One URL can result in multiple ProxyInfo
2510  // objects, one for each IP address.
2511  vector<ProxyInfo> infos;
2512  for (unsigned j = 0; j < this_group.size(); ++j, ++num_proxy) {
2513  this_group[j] = dns::AddDefaultScheme(this_group[j]);
2514  if (this_group[j] == "DIRECT") {
2515  infos.push_back(ProxyInfo("DIRECT"));
2516  continue;
2517  }
2518 
2519  if (hosts[num_proxy].status() != dns::kFailOk) {
2521  "failed to resolve IP addresses for %s (%d - %s)",
2522  hosts[num_proxy].name().c_str(), hosts[num_proxy].status(),
2523  dns::Code2Ascii(hosts[num_proxy].status()));
2524  dns::Host failed_host =
2525  dns::Host::ExtendDeadline(hosts[num_proxy], resolver_->min_ttl());
2526  infos.push_back(ProxyInfo(failed_host, this_group[j]));
2527  continue;
2528  }
2529 
2530  // IPv4 addresses have precedence
2531  set<string> best_addresses =
2532  hosts[num_proxy].ViewBestAddresses(opt_ip_preference_);
2533  set<string>::const_iterator iter_ips = best_addresses.begin();
2534  for (; iter_ips != best_addresses.end(); ++iter_ips) {
2535  string url_ip = dns::RewriteUrl(this_group[j], *iter_ips);
2536  infos.push_back(ProxyInfo(hosts[num_proxy], url_ip));
2537  }
2538  }
2539  opt_proxy_groups_->push_back(infos);
2540  opt_num_proxies_ += infos.size();
2541  }
2543  "installed %u proxies in %u load-balance groups",
2544  opt_num_proxies_, opt_proxy_groups_->size());
2545  opt_proxy_groups_current_ = 0;
2546  opt_proxy_groups_current_burned_ = 0;
2547 
2548  // Select random start proxy from the first group.
2549  if (opt_proxy_groups_->size() > 0) {
2550  // Select random start proxy from the first group.
2551  UpdateProxiesUnlocked("set proxies");
2552  }
2553 }
2554 
2555 
2562 void DownloadManager::GetProxyInfo(vector< vector<ProxyInfo> > *proxy_chain,
2563  unsigned *current_group,
2564  unsigned *fallback_group)
2565 {
2566  assert(proxy_chain != NULL);
2567  MutexLockGuard m(lock_options_);
2568 
2569 
2570  if (!opt_proxy_groups_) {
2571  vector< vector<ProxyInfo> > empty_chain;
2572  *proxy_chain = empty_chain;
2573  if (current_group != NULL)
2574  *current_group = 0;
2575  if (fallback_group != NULL)
2576  *fallback_group = 0;
2577  return;
2578  }
2579 
2580  *proxy_chain = *opt_proxy_groups_;
2581  if (current_group != NULL)
2582  *current_group = opt_proxy_groups_current_;
2583  if (fallback_group != NULL)
2584  *fallback_group = opt_proxy_groups_fallback_;
2585 }
2586 
2587 string DownloadManager::GetProxyList() {
2588  return opt_proxy_list_;
2589 }
2590 
2591 string DownloadManager::GetFallbackProxyList() {
2592  return opt_proxy_fallback_list_;
2593 }
2594 
2599 DownloadManager::ChooseProxyUnlocked(const shash::Any *hash) {
2600  if (!opt_proxy_groups_)
2601  return NULL;
2602 
2603  uint32_t key = (hash ? hash->Partial32() : 0);
2604  map<uint32_t, ProxyInfo *>::iterator it = opt_proxy_map_.lower_bound(key);
2605  ProxyInfo *proxy = it->second;
2606 
2607  return proxy;
2608 }
2609 
/**
 * Rebuilds opt_proxy_map_ and opt_proxy_urls_ from the non-burned proxies of
 * the current proxy group.  The first (size - opt_proxy_groups_current_burned_)
 * entries of the group are treated as alive; this presumably relies on burned
 * proxies being kept at the end of the group (TODO confirm).
 *
 * - With opt_proxy_shard_, every alive proxy gets kProxyMapScale keys drawn
 *   from a Prng seeded with Partial32() of the SHA-1 of its URL.  A sentinel
 *   entry at max_key maps to the proxy owning the smallest key, so
 *   lower_bound() finds an entry for every key.
 * - Otherwise a single randomly selected alive proxy (via prng_) is mapped
 *   at max_key.
 *
 * A change of the sorted, '|'-joined proxy URL list is logged with `reason`.
 * No-op if no proxy groups are installed.  This function does not take
 * lock_options_ itself.
 *
 * NOTE(review): num_alive == 0 is not guarded.  In sharded mode the map would
 * then be empty and begin()->second is dereferenced.
 * NOTE(review): this listing omits the opening `LogCvmfs(...` line of the
 * "switching proxy" message (source line 2657 is missing).
 */
2613 void DownloadManager::UpdateProxiesUnlocked(const string &reason) {
2614  if (!opt_proxy_groups_)
2615  return;
2616 
2617  // Identify number of non-burned proxies within the current group
2618  vector<ProxyInfo> *group = current_proxy_group();
2619  unsigned num_alive = (group->size() - opt_proxy_groups_current_burned_);
2620  string old_proxy = JoinStrings(opt_proxy_urls_, "|");
2621 
2622  // Rebuild proxy map and URL list
2623  opt_proxy_map_.clear();
2624  opt_proxy_urls_.clear();
2625  const uint32_t max_key = 0xffffffffUL;
2626  if (opt_proxy_shard_) {
2627  // Build a consistent map with multiple entries for each proxy
2628  for (unsigned i = 0; i < num_alive; ++i) {
2629  ProxyInfo *proxy = &(*group)[i];
2630  shash::Any proxy_hash(shash::kSha1);
2631  HashString(proxy->url, &proxy_hash);
2632  Prng prng;
2633  prng.InitSeed(proxy_hash.Partial32());
2634  for (unsigned j = 0; j < kProxyMapScale; ++j) {
2635  const std::pair<uint32_t, ProxyInfo *> entry(prng.Next(max_key), proxy);
2636  opt_proxy_map_.insert(entry);
2637  }
2638  opt_proxy_urls_.push_back(proxy->url);
2639  }
2640  // Ensure lower_bound() finds a value for all keys
2641  ProxyInfo *first_proxy = opt_proxy_map_.begin()->second;
2642  const std::pair<uint32_t, ProxyInfo *> last_entry(max_key, first_proxy);
2643  opt_proxy_map_.insert(last_entry);
2644  } else {
2645  // Build a map with a single entry for one randomly selected proxy
2646  unsigned select = prng_.Next(num_alive);
2647  ProxyInfo *proxy = &(*group)[select];
2648  const std::pair<uint32_t, ProxyInfo *> entry(max_key, proxy);
2649  opt_proxy_map_.insert(entry);
2650  opt_proxy_urls_.push_back(proxy->url);
2651  }
2652  sort(opt_proxy_urls_.begin(), opt_proxy_urls_.end());
2653 
2654  // Report any change in proxy usage
2655  string new_proxy = JoinStrings(opt_proxy_urls_, "|");
2656  if (new_proxy != old_proxy) {
2658  "switching proxy from %s to %s (%s)",
2659  (old_proxy.empty() ? "(none)" : old_proxy.c_str()),
2660  (new_proxy.empty() ? "(none)" : new_proxy.c_str()),
2661  reason.c_str());
2662  }
2663 }
2664 
2668 void DownloadManager::ShardProxies() {
2669  opt_proxy_shard_ = true;
2670  RebalanceProxiesUnlocked("enable sharding");
2671 }
2672 
2677 void DownloadManager::RebalanceProxiesUnlocked(const string &reason) {
2678  if (!opt_proxy_groups_)
2679  return;
2680 
2681  opt_timestamp_failover_proxies_ = 0;
2682  opt_proxy_groups_current_burned_ = 0;
2683  UpdateProxiesUnlocked(reason);
2684 }
2685 
2686 
2687 void DownloadManager::RebalanceProxies() {
2688  MutexLockGuard m(lock_options_);
2689  RebalanceProxiesUnlocked("rebalance");
2690 }
2691 
2692 
2696 void DownloadManager::SwitchProxyGroup() {
2697  MutexLockGuard m(lock_options_);
2698 
2699  if (!opt_proxy_groups_ || (opt_proxy_groups_->size() < 2)) {
2700  return;
2701  }
2702 
2703  opt_proxy_groups_current_ = (opt_proxy_groups_current_ + 1) %
2704  opt_proxy_groups_->size();
2705  opt_timestamp_backup_proxies_ = time(NULL);
2706  RebalanceProxiesUnlocked("switch proxy group");
2707 }
2708 
2709 
2710 void DownloadManager::SetProxyGroupResetDelay(const unsigned seconds) {
2711  MutexLockGuard m(lock_options_);
2712  opt_proxy_groups_reset_after_ = seconds;
2713  if (opt_proxy_groups_reset_after_ == 0) {
2714  opt_timestamp_backup_proxies_ = 0;
2715  opt_timestamp_failover_proxies_ = 0;
2716  }
2717 }
2718 
2719 
2720 void DownloadManager::SetHostResetDelay(const unsigned seconds)
2721 {
2722  MutexLockGuard m(lock_options_);
2723  opt_host_reset_after_ = seconds;
2724  if (opt_host_reset_after_ == 0)
2725  opt_timestamp_backup_host_ = 0;
2726 }
2727 
2728 
2729 void DownloadManager::SetRetryParameters(const unsigned max_retries,
2730  const unsigned backoff_init_ms,
2731  const unsigned backoff_max_ms)
2732 {
2733  MutexLockGuard m(lock_options_);
2734  opt_max_retries_ = max_retries;
2735  opt_backoff_init_ms_ = backoff_init_ms;
2736  opt_backoff_max_ms_ = backoff_max_ms;
2737 }
2738 
2739 
2740 void DownloadManager::SetMaxIpaddrPerProxy(unsigned limit) {
2741  MutexLockGuard m(lock_options_);
2742  resolver_->set_throttle(limit);
2743 }
2744 
2745 
2746 void DownloadManager::SetProxyTemplates(
2747  const std::string &direct,
2748  const std::string &forced)
2749 {
2750  MutexLockGuard m(lock_options_);
2751  proxy_template_direct_ = direct;
2752  proxy_template_forced_ = forced;
2753 }
2754 
2755 
2756 void DownloadManager::EnableInfoHeader() {
2757  enable_info_header_ = true;
2758 }
2759 
2760 
2761 void DownloadManager::EnableRedirects() {
2762  follow_redirects_ = true;
2763 }
2764 
2765 void DownloadManager::UseSystemCertificatePath() {
2766  ssl_certificate_store_.UseSystemCertificatePath();
2767 }
2768 
2773 DownloadManager *DownloadManager::Clone(
2774  const perf::StatisticsTemplate &statistics)
2775 {
2776  DownloadManager *clone = new DownloadManager();
2777  clone->Init(pool_max_handles_, statistics);
2778  if (resolver_) {
2779  clone->SetDnsParameters(resolver_->retries(), resolver_->timeout_ms());
2780  clone->SetDnsTtlLimits(resolver_->min_ttl(), resolver_->max_ttl());
2781  clone->SetMaxIpaddrPerProxy(resolver_->throttle());
2782  }
2783  if (!opt_dns_server_.empty())
2784  clone->SetDnsServer(opt_dns_server_);
2785  clone->opt_timeout_proxy_ = opt_timeout_proxy_;
2786  clone->opt_timeout_direct_ = opt_timeout_direct_;
2787  clone->opt_low_speed_limit_ = opt_low_speed_limit_;
2788  clone->opt_max_retries_ = opt_max_retries_;
2789  clone->opt_backoff_init_ms_ = opt_backoff_init_ms_;
2790  clone->opt_backoff_max_ms_ = opt_backoff_max_ms_;
2791  clone->enable_info_header_ = enable_info_header_;
2792  clone->follow_redirects_ = follow_redirects_;
2793  if (opt_host_chain_) {
2794  clone->opt_host_chain_ = new vector<string>(*opt_host_chain_);
2795  clone->opt_host_chain_rtt_ = new vector<int>(*opt_host_chain_rtt_);
2796  }
2797  CloneProxyConfig(clone);
2798  clone->opt_ip_preference_ = opt_ip_preference_;
2799  clone->proxy_template_direct_ = proxy_template_direct_;
2800  clone->proxy_template_forced_ = proxy_template_forced_;
2801  clone->opt_proxy_groups_reset_after_ = opt_proxy_groups_reset_after_;
2802  clone->opt_host_reset_after_ = opt_host_reset_after_;
2803  clone->credentials_attachment_ = credentials_attachment_;
2804  clone->ssl_certificate_store_ = ssl_certificate_store_;
2805 
2806  return clone;
2807 }
2808 
2809 
2810 void DownloadManager::CloneProxyConfig(DownloadManager *clone) {
2811  clone->opt_proxy_groups_current_ = opt_proxy_groups_current_;
2812  clone->opt_proxy_groups_current_burned_ = opt_proxy_groups_current_burned_;
2813  clone->opt_proxy_groups_fallback_ = opt_proxy_groups_fallback_;
2814  clone->opt_num_proxies_ = opt_num_proxies_;
2815  clone->opt_proxy_shard_ = opt_proxy_shard_;
2816  clone->opt_proxy_list_ = opt_proxy_list_;
2817  clone->opt_proxy_fallback_list_ = opt_proxy_fallback_list_;
2818  if (opt_proxy_groups_ == NULL)
2819  return;
2820 
2821  clone->opt_proxy_groups_ = new vector< vector<ProxyInfo> >(
2822  *opt_proxy_groups_);
2823  clone->UpdateProxiesUnlocked("cloned");
2824 }
2825 
2826 } // namespace download
unsigned opt_timeout_direct_
Definition: download.h:524
unsigned opt_low_speed_limit_
Definition: download.h:525
void HashString(const std::string &content, Any *any_digest)
Definition: hash.cc:268
Destination destination
Definition: download.h:166
#define LogCvmfs(source, mask,...)
Definition: logging.h:22
Definition: prng.h:28
std::vector< T > Shuffle(const std::vector< T > &input, Prng *prng)
Definition: algorithm.h:49
const char * Code2Ascii(const ObjectFetcherFailures::Failures error)
unsigned opt_backoff_init_ms_
Definition: download.h:527
static unsigned EscapeHeader(const string &header, char *escaped_buf, size_t buf_size)
Definition: download.cc:123
unsigned char num_used_hosts
Definition: download.h:297
static bool EscapeUrlChar(char input, char output[3])
Definition: download.cc:73
int64_t Xadd(class Counter *counter, const int64_t delta)
Definition: statistics.h:51
const char * Code2Ascii(const Failures error)
Definition: dns.h:53
int64_t id() const
Definition: dns.h:108
unsigned opt_proxy_groups_current_burned_
Definition: download.h:552
double DiffTimeSeconds(struct timeval start, struct timeval end)
Definition: algorithm.cc:31
unsigned opt_proxy_groups_reset_after_
Definition: download.h:614
virtual bool IsCanceled()
Definition: interrupt.h:16
void SetUrlOptions(JobInfo *info)
Definition: download.cc:905
StreamStates DecompressZStream2Sink(const void *buf, const int64_t size, z_stream *strm, cvmfs::Sink *sink)
Definition: compression.cc:234
unsigned backoff_ms
Definition: download.h:299
std::string opt_proxy_fallback_list_
Definition: download.h:569
static NormalResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
Definition: dns.cc:1259
unsigned opt_host_reset_after_
Definition: download.h:622
std::string proxy_template_direct_
Definition: download.h:598
#define PANIC(...)
Definition: exception.h:27
string ReplaceAll(const string &haystack, const string &needle, const string &replace_by)
Definition: string.cc:482
string JoinStrings(const vector< string > &strings, const string &joint)
Definition: string.cc:323
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:249
unsigned opt_proxy_groups_current_
Definition: download.h:547
off_t range_offset
Definition: download.h:179
shash::ContextPtr hash_context
Definition: download.h:290
void DecompressInit(z_stream *strm)
Definition: compression.cc:185
unsigned int current_host_chain_index
Definition: download.h:300
bool DecompressMem2Mem(const void *buf, const int64_t size, void **out_buf, uint64_t *out_size)
Definition: compression.cc:797
std::set< CURL * > * pool_handles_inuse_
Definition: download.h:503
void * cred_data
Definition: download.h:164
StreamStates DecompressZStream2File(const void *buf, const int64_t size, z_stream *strm, FILE *f)
Definition: compression.cc:275
std::string opt_proxy_list_
Definition: download.h:565
const std::string & name() const
Definition: dns.h:118
perf::Counter * sz_transfer_time
Definition: download.h:129
std::vector< std::vector< ProxyInfo > > * opt_proxy_groups_
Definition: download.h:543
assert((mem||(size==0))&&"Out Of Memory")
unsigned opt_proxy_groups_fallback_
Definition: download.h:557
void ReleaseCurlHandle(CURL *handle)
Definition: download.cc:819
StreamStates
Definition: compression.h:36
Algorithms algorithm
Definition: hash.h:125
z_stream zstream
Definition: download.h:289
const std::set< std::string > & ViewBestAddresses(IpPreference preference) const
Definition: dns.cc:205
void SetDnsServer(const std::string &address)
Definition: download.cc:1822
void DecompressFini(z_stream *strm)
Definition: compression.cc:201
void InitSeed(const uint64_t seed)
Definition: prng.h:35
void MakePipe(int pipe_fd[2])
Definition: posix.cc:487
char algorithm
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:182
void Init(ContextPtr context)
Definition: hash.cc:164
bool IsValid(const std::string &input) const
Definition: sanitizer.cc:114
Algorithms
Definition: hash.h:41
void SetDnsTtlLimits(const unsigned min_seconds, const unsigned max_seconds)
Definition: download.cc:1858
static Failures PrepareDownloadDestination(JobInfo *info)
Definition: download.cc:153
void Init(const unsigned max_pool_handles, const perf::StatisticsTemplate &statistics)
Definition: download.cc:1606
const char * Code2Ascii(const Failures error)
Definition: download.h:90
unsigned char num_retries
Definition: download.h:298
std::string AddDefaultScheme(const std::string &proxy)
Definition: dns.cc:187
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:288
static string EscapeUrl(const string &url)
Definition: download.cc:100
dns::IpPreference opt_ip_preference_
Definition: download.h:591
Algorithms algorithm
Definition: hash.h:500
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:287
Definition: dns.h:90
Failures Fetch(const std::string &base_url, const std::string &repository_name, const uint64_t minimum_timestamp, const shash::Any *base_catalog, signature::SignatureManager *signature_manager, download::DownloadManager *download_manager, ManifestEnsemble *ensemble)
void UpdateProxiesUnlocked(const std::string &reason)
Definition: download.cc:2613
bool IsEquivalent(const Host &other) const
Definition: dns.cc:269
bool IsProxyTransferError(const Failures error)
Definition: download.h:78
bool IsExpired() const
Definition: dns.cc:280
FILE * destination_file
Definition: download.h:172
bool follow_redirects
Definition: download.h:159
virtual int Reset()=0
perf::Counter * n_requests
Definition: download.h:130
static int Init(const loader::LoaderExports *loader_exports)
Definition: cvmfs.cc:1950
void Final(ContextPtr context, Any *any_digest)
Definition: hash.cc:221
string StringifyInt(const int64_t value)
Definition: string.cc:78
void SetMaxIpaddrPerProxy(unsigned limit)
Definition: download.cc:2740
const shash::Any * expected_hash
Definition: download.h:175
void Inc(class Counter *counter)
Definition: statistics.h:50
void * buffer
Definition: hash.h:501
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:265
const std::string * extra_info
Definition: download.h:176
std::string ExtractHost(const std::string &url)
Definition: dns.cc:110
std::vector< int > * opt_host_chain_rtt_
Definition: download.h:539
cvmfs::Sink * destination_sink
Definition: download.h:174
SslCertificateStore ssl_certificate_store_
Definition: download.h:635
uint32_t Partial32() const
Definition: hash.h:394
IpPreference
Definition: dns.h:46
unsigned opt_backoff_max_ms_
Definition: download.h:528
unsigned GetContextSize(const Algorithms algorithm)
Definition: hash.cc:148
CURL * curl_handle
Definition: download.h:286
CredentialsAttachment * credentials_attachment_
Definition: download.h:624
std::vector< std::string > * opt_host_chain_
Definition: download.h:534
struct pollfd * watch_fds_
Definition: download.h:515
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
Definition: hash.cc:190
uint64_t String2Uint64(const string &value)
Definition: string.cc:228
Failures error_code
Definition: download.h:294
static void Fini()
Definition: cvmfs.cc:2156
static void Spawn()
Definition: cvmfs.cc:2071
InterruptCue * interrupt_cue
Definition: download.h:165
struct download::JobInfo::@3 destination_mem
Definition: mutex.h:42
std::string Filter(const std::string &input) const
Definition: sanitizer.cc:107
std::string proxy_template_forced_
Definition: download.h:604
const std::string * destination_path
Definition: download.h:173
void SetDnsParameters(const unsigned retries, const unsigned timeout_ms)
Definition: download.cc:1840
static Host ExtendDeadline(const Host &original, unsigned seconds_from_now)
Definition: dns.cc:231
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:1879
bool IsHostTransferError(const Failures error)
Definition: download.h:66
void SortTeam(std::vector< T > *tractor, std::vector< U > *towed)
Definition: algorithm.h:67
curl_slist * headers
Definition: download.h:287
unsigned size
Definition: hash.h:502
unsigned char num_used_proxies
Definition: download.h:296
Failures status() const
Definition: dns.h:119
std::string proxy
Definition: download.h:292
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:496
static void size_t size
Definition: smalloc.h:47
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
Definition: download.cc:1228
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:508
const std::string * url
Definition: download.h:155
void InitializeRequest(JobInfo *info, CURL *handle)
Definition: download.cc:837
string RewriteUrl(const string &url, const string &ip)
Definition: dns.cc:156
uint32_t Next(const uint64_t boundary)
Definition: prng.h:49
virtual int64_t Write(const void *buf, uint64_t sz)=0
char * info_header
Definition: download.h:288