CernVM-FS  2.11.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
download.cc
Go to the documentation of this file.
1 
27 // TODO(jblomer): MS for time summing
28 // NOLINTNEXTLINE
29 #define __STDC_FORMAT_MACROS
30 
31 #include "cvmfs_config.h"
32 #include "download.h"
33 
34 #include <alloca.h>
35 #include <errno.h>
36 #include <inttypes.h>
37 #include <poll.h>
38 #include <pthread.h>
39 #include <signal.h>
40 #include <stdint.h>
41 #include <sys/time.h>
42 #include <unistd.h>
43 
44 #include <algorithm>
45 #include <cassert>
46 #include <cstdio>
47 #include <cstdlib>
48 #include <cstring>
49 #include <map>
50 #include <set>
51 #include <utility>
52 
53 #include "compression.h"
54 #include "crypto/hash.h"
55 #include "duplex_curl.h"
56 #include "interrupt.h"
57 #include "sanitizer.h"
58 #include "ssl.h"
59 #include "util/algorithm.h"
60 #include "util/atomic.h"
61 #include "util/concurrency.h"
62 #include "util/exception.h"
63 #include "util/logging.h"
64 #include "util/posix.h"
65 #include "util/prng.h"
66 #include "util/smalloc.h"
67 #include "util/string.h"
68 
69 using namespace std; // NOLINT
70 
71 namespace download {
72 
/**
 * Percent-encodes a single URL character.
 *
 * Whitelisted characters (0-9, A-Z, a-z and / : . @ + - _ ~ [ ] ,) are
 * copied unchanged to output[0] and false is returned. Every other byte is
 * written to output[0..2] as "%XX" (upper-case hex of the byte value) and
 * true is returned.
 *
 * @param input  the character to encode
 * @param output receives 1 (unescaped) or 3 (escaped) characters; not
 *               NUL-terminated
 * @return true if output holds a 3-character escape sequence
 */
static inline bool EscapeUrlChar(char input, char output[3]) {
  if (((input >= '0') && (input <= '9')) ||
      ((input >= 'A') && (input <= 'Z')) ||
      ((input >= 'a') && (input <= 'z')) ||
      (input == '/') || (input == ':') || (input == '.') ||
      (input == '@') ||
      (input == '+') || (input == '-') ||
      (input == '_') || (input == '~') ||
      (input == '[') || (input == ']') || (input == ','))
  {
    output[0] = input;
    return false;
  }

  // Encode the unsigned byte value. Where char is signed, bytes >= 0x80
  // (e.g. UTF-8 sequences) are negative, and dividing / taking the modulo of
  // the raw char yields negative "nibbles" and thus garbage instead of hex.
  static const char kHexDigits[] = "0123456789ABCDEF";
  const unsigned char byte = static_cast<unsigned char>(input);
  output[0] = '%';
  output[1] = kHexDigits[byte >> 4];
  output[2] = kHexDigits[byte & 0x0F];
  return true;
}
94 
95 
100 static string EscapeUrl(const string &url) {
101  string escaped;
102  escaped.reserve(url.length());
103 
104  char escaped_char[3];
105  for (unsigned i = 0, s = url.length(); i < s; ++i) {
106  if (EscapeUrlChar(url[i], escaped_char)) {
107  escaped.append(escaped_char, 3);
108  } else {
109  escaped.push_back(escaped_char[0]);
110  }
111  }
112  LogCvmfs(kLogDownload, kLogDebug, "escaped %s to %s",
113  url.c_str(), escaped.c_str());
114 
115  return escaped;
116 }
117 
118 
123 static unsigned EscapeHeader(const string &header,
124  char *escaped_buf,
125  size_t buf_size)
126 {
127  unsigned esc_pos = 0;
128  char escaped_char[3];
129  for (unsigned i = 0, s = header.size(); i < s; ++i) {
130  if (EscapeUrlChar(header[i], escaped_char)) {
131  for (unsigned j = 0; j < 3; ++j) {
132  if (escaped_buf) {
133  if (esc_pos >= buf_size)
134  return esc_pos;
135  escaped_buf[esc_pos] = escaped_char[j];
136  }
137  esc_pos++;
138  }
139  } else {
140  if (escaped_buf) {
141  if (esc_pos >= buf_size)
142  return esc_pos;
143  escaped_buf[esc_pos] = escaped_char[0];
144  }
145  esc_pos++;
146  }
147  }
148 
149  return esc_pos;
150 }
151 
152 
154  info->destination_mem.size = 0;
155  info->destination_mem.pos = 0;
156  info->destination_mem.data = NULL;
157 
158  if (info->destination == kDestinationFile)
159  assert(info->destination_file != NULL);
160 
161  if (info->destination == kDestinationPath) {
162  assert(info->destination_path != NULL);
163  info->destination_file = fopen(info->destination_path->c_str(), "w");
164  if (info->destination_file == NULL) {
165  LogCvmfs(kLogDownload, kLogDebug, "Failed to open path %s: %s"
166  " (errno=%d).",
167  info->destination_path->c_str(), strerror(errno), errno);
168  return kFailLocalIO;
169  }
170  }
171 
172  if (info->destination == kDestinationSink)
173  assert(info->destination_sink != NULL);
174 
175  return kFailOk;
176 }
177 
178 
/**
 * libcurl header callback (installed as CURLOPT_HEADERFUNCTION in
 * AcquireCurlHandle()). It is called once per response header line.
 * info_link is the JobInfo registered via CURLOPT_WRITEHEADER in
 * InitializeRequest().
 *
 * - Status lines starting with "HTTP/1." are parsed into info->http_code.
 *   Codes other than 2xx and followed redirects are mapped to
 *   info->error_code.
 * - For kDestinationMem, a Content-Length header allocates
 *   info->destination_mem. Sizes above DownloadManager::kMaxMemSize are
 *   rejected with kFailTooBig.
 * - An X-Squid-Error header, or a Proxy-Status header containing "error=",
 *   turns a kFailHostHttp error into kFailProxyHttp.
 *
 * Returns num_bytes to continue the transfer. Returns 0, which makes libcurl
 * abort the transfer, for:
 * - status lines shorter than 10 bytes;
 * - redirects while follow_redirects is off;
 * - error status codes;
 * - oversized memory downloads.
 */
182 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
183  void *info_link)
184 {
185  const size_t num_bytes = size*nmemb;
186  const string header_line(static_cast<const char *>(ptr), num_bytes);
187  JobInfo *info = static_cast<JobInfo *>(info_link);
188
189  // LogCvmfs(kLogDownload, kLogDebug, "REMOVE-ME: Header callback with %s",
190  // header_line.c_str());
191
192  // Check http status codes
  // The prefix match is case-sensitive (HasPrefix(..., false)).
  // NOTE(review): status lines of other protocol versions, e.g. "HTTP/2 200",
  // do not match this prefix and fall through to the header checks below --
  // confirm that HTTP/2 is never negotiated for these transfers.
193  if (HasPrefix(header_line, "HTTP/1.", false)) {
194  if (header_line.length() < 10)
195  return 0;
196
  // Skip the blanks after "HTTP/1.x" (index 8 onwards) to reach the code.
  // ParseHttpCode() needs three characters. If fewer remain, http_code keeps
  // the value InitializeRequest() set (-1), which ends up in the error branch.
197  unsigned i;
198  for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {}
199
200  // Code is initialized to -1
201  if (header_line.length() > i+2) {
202  info->http_code = DownloadManager::ParseHttpCode(&header_line[i]);
203  }
204
205  if ((info->http_code / 100) == 2) {
206  return num_bytes;
207  } else if ((info->http_code == 301) ||
208  (info->http_code == 302) ||
209  (info->http_code == 303) ||
210  (info->http_code == 307))
211  {
212  if (!info->follow_redirects) {
213  LogCvmfs(kLogDownload, kLogDebug, "redirect support not enabled: %s",
214  header_line.c_str());
215  info->error_code = kFailHostHttp;
216  return 0;
217  }
218  LogCvmfs(kLogDownload, kLogDebug, "http redirect: %s",
219  header_line.c_str());
220  // libcurl will handle this because of CURLOPT_FOLLOWLOCATION
221  return num_bytes;
222  } else {
223  LogCvmfs(kLogDownload, kLogDebug, "http status error code: %s [%d]",
224  header_line.c_str(), info->http_code);
225  if (((info->http_code / 100) == 5) ||
226  (info->http_code == 400) || (info->http_code == 404))
227  {
228  // 5XX returned by host
229  // 400: error from the GeoAPI module
230  // 404: the stratum 1 does not have the newest files
231  info->error_code = kFailHostHttp;
232  } else if (info->http_code == 429) {
233  // 429: rate throttling (we ignore the backoff hint for the time being)
235  } else {
236  info->error_code = (info->proxy == "DIRECT") ? kFailHostHttp :
238  }
239  return 0;
240  }
241  }
242
  // Non-status header lines. Because a followed redirect returns num_bytes
  // above, header lines of redirect responses reach these checks as well.
  // NOTE(review): a Content-Length in a redirect response already allocates
  // destination_mem.data. The final response's Content-Length then allocates
  // again without freeing the first buffer -- possible leak, confirm.
243  // Allocate memory for kDestinationMemory
244  if ((info->destination == kDestinationMem) &&
245  HasPrefix(header_line, "CONTENT-LENGTH:", true))
246  {
  // tmp is as long as the whole line, so the "%s" (header name) fits.
247  char *tmp = reinterpret_cast<char *>(alloca(num_bytes+1));
248  uint64_t length = 0;
249  sscanf(header_line.c_str(), "%s %" PRIu64, tmp, &length);
250  if (length > 0) {
251  if (length > DownloadManager::kMaxMemSize) {
253  "resource %s too large to store in memory (%" PRIu64 ")",
254  info->url->c_str(), length);
255  info->error_code = kFailTooBig;
256  return 0;
257  }
258  info->destination_mem.data = static_cast<char *>(smalloc(length));
259  } else {
260  // Empty resource
261  info->destination_mem.data = NULL;
262  }
  // destination_mem.size is the capacity enforced by CallbackCurlData().
263  info->destination_mem.size = length;
264  } else if (HasPrefix(header_line, "LOCATION:", true)) {
265  // This comes along with redirects
266  LogCvmfs(kLogDownload, kLogDebug, "%s", header_line.c_str());
267  } else if (HasPrefix(header_line, "X-SQUID-ERROR:", true)) {
268  // Reinterpret host error as proxy error
269  if (info->error_code == kFailHostHttp) {
270  info->error_code = kFailProxyHttp;
271  }
272  } else if (HasPrefix(header_line, "PROXY-STATUS:", true)) {
273  // Reinterpret host error as proxy error if applicable
274  if ((info->error_code == kFailHostHttp) &&
275  (header_line.find("error=") != string::npos)) {
276  info->error_code = kFailProxyHttp;
277  }
278  }
279
280  return num_bytes;
281 }
282 
283 
/**
 * libcurl write callback (installed as CURLOPT_WRITEFUNCTION in
 * AcquireCurlHandle()). info_link is the JobInfo registered via
 * CURLOPT_WRITEDATA in InitializeRequest().
 *
 * The received bytes are first fed into the hash context, if a hash is
 * expected. They are then delivered, depending on info->destination, to:
 * - the sink;
 * - the memory buffer;
 * - the destination file.
 * When info->compressed is set, they go through zlib decompression on the way.
 *
 * Returns num_bytes on success. Returns 0 with info->error_code set on
 * failure, which makes libcurl abort the transfer.
 */
287 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
288  void *info_link)
289 {
290  const size_t num_bytes = size*nmemb;
291  JobInfo *info = static_cast<JobInfo *>(info_link);
292
293  // LogCvmfs(kLogDownload, kLogDebug, "Data callback, %d bytes", num_bytes);
294
  // Empty chunk: nothing to do (0 == num_bytes, so this is not an error).
295  if (num_bytes == 0)
296  return 0;
297
  // The hash covers the bytes as received, i.e. before decompression.
298  if (info->expected_hash) {
299  shash::Update(reinterpret_cast<unsigned char *>(ptr),
300  num_bytes, info->hash_context);
301  }
302
303  if (info->destination == kDestinationSink) {
304  if (info->compressed) {
305  zlib::StreamStates retval =
306  zlib::DecompressZStream2Sink(ptr, static_cast<int64_t>(num_bytes),
307  &info->zstream, info->destination_sink);
308  if (retval == zlib::kStreamDataError) {
309  LogCvmfs(kLogDownload, kLogSyslogErr, "failed to decompress %s",
310  info->url->c_str());
311  info->error_code = kFailBadData;
312  return 0;
313  } else if (retval == zlib::kStreamIOError) {
315  "decompressing %s, local IO error", info->url->c_str());
316  info->error_code = kFailLocalIO;
317  return 0;
318  }
319  } else {
  // A short or failed sink write is treated as a local I/O error.
320  int64_t written = info->destination_sink->Write(ptr, num_bytes);
321  if ((written < 0) || (static_cast<uint64_t>(written) != num_bytes)) {
322  LogCvmfs(kLogDownload, kLogDebug, "Failed to perform write on %s (%"
323  PRId64 ")", info->url->c_str(), written);
324  info->error_code = kFailLocalIO;
325  return 0;
326  }
327  }
328  } else if (info->destination == kDestinationMem) {
329  // Write to memory
  // destination_mem.size is the buffer capacity. It comes from the
  // Content-Length header (CallbackCurlHeader()) or is preset to 64 KiB for
  // file:// URLs (SetUrlOptions()). Data beyond it is rejected rather than
  // reallocated.
330  if (info->destination_mem.pos + num_bytes > info->destination_mem.size) {
331  if (info->destination_mem.size == 0) {
333  "Content-Length was missing or zero, but %zu bytes received",
334  info->destination_mem.pos + num_bytes);
335  } else {
336  LogCvmfs(kLogDownload, kLogDebug, "Callback had too much data: "
337  "start %zu, bytes %zu, expected %zu",
338  info->destination_mem.pos,
339  num_bytes,
340  info->destination_mem.size);
341  }
342  info->error_code = kFailBadData;
343  return 0;
344  }
345  memcpy(info->destination_mem.data + info->destination_mem.pos,
346  ptr, num_bytes);
347  info->destination_mem.pos += num_bytes;
348  } else {
349  // Write to file
350  if (info->compressed) {
351  // LogCvmfs(kLogDownload, kLogDebug, "REMOVE-ME: writing %d bytes for %s",
352  // num_bytes, info->url->c_str());
353  zlib::StreamStates retval =
354  zlib::DecompressZStream2File(ptr, static_cast<int64_t>(num_bytes),
355  &info->zstream, info->destination_file);
356  if (retval == zlib::kStreamDataError) {
357  LogCvmfs(kLogDownload, kLogSyslogErr, "failed to decompress %s",
358  info->url->c_str());
359  info->error_code = kFailBadData;
360  return 0;
361  } else if (retval == zlib::kStreamIOError) {
363  "decompressing %s, local IO error", info->url->c_str());
364  info->error_code = kFailLocalIO;
365  return 0;
366  }
367  } else {
368  if (fwrite(ptr, 1, num_bytes, info->destination_file) != num_bytes) {
370  "downloading %s, IO failure: %s (errno=%d)",
371  info->url->c_str(), strerror(errno), errno);
372  info->error_code = kFailLocalIO;
373  return 0;
374  }
375  }
376  }
377
378  return num_bytes;
379 }
380 
381 
382 //------------------------------------------------------------------------------
383 
384 
385 bool JobInfo::IsFileNotFound() {
386  if (HasPrefix(*url, "file://", true /* ignore_case */))
387  return error_code == kFailHostConnection;
388 
389  return http_code == 404;
390 }
391 
392 
393 //------------------------------------------------------------------------------
394 
395 
396 const int DownloadManager::kProbeUnprobed = -1;
397 const int DownloadManager::kProbeDown = -2;
398 const int DownloadManager::kProbeGeo = -3;
399 const unsigned DownloadManager::kMaxMemSize = 1024*1024;
400 
401 
405 int DownloadManager::ParseHttpCode(const char digits[3]) {
406  int result = 0;
407  int factor = 100;
408  for (int i = 0; i < 3; ++i) {
409  if ((digits[i] < '0') || (digits[i] > '9'))
410  return -1;
411  result += (digits[i] - '0') * factor;
412  factor /= 10;
413  }
414  return result;
415 }
416 
417 
421 int DownloadManager::CallbackCurlSocket(CURL * /* easy */,
422  curl_socket_t s,
423  int action,
424  void *userp,
425  void * /* socketp */)
426 {
427  // LogCvmfs(kLogDownload, kLogDebug, "CallbackCurlSocket called with easy "
428  // "handle %p, socket %d, action %d", easy, s, action);
429  DownloadManager *download_mgr = static_cast<DownloadManager *>(userp);
430  if (action == CURL_POLL_NONE)
431  return 0;
432 
433  // Find s in watch_fds_
434  unsigned index;
435  for (index = 0; index < download_mgr->watch_fds_inuse_; ++index) {
436  if (download_mgr->watch_fds_[index].fd == s)
437  break;
438  }
439  // Or create newly
440  if (index == download_mgr->watch_fds_inuse_) {
441  // Extend array if necessary
442  if (download_mgr->watch_fds_inuse_ == download_mgr->watch_fds_size_)
443  {
444  assert(download_mgr->watch_fds_size_ > 0);
445  download_mgr->watch_fds_size_ *= 2;
446  download_mgr->watch_fds_ = static_cast<struct pollfd *>(
447  srealloc(download_mgr->watch_fds_,
448  download_mgr->watch_fds_size_ * sizeof(struct pollfd)));
449  }
450  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].fd = s;
451  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].events = 0;
452  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].revents = 0;
453  download_mgr->watch_fds_inuse_++;
454  }
455 
456  switch (action) {
457  case CURL_POLL_IN:
458  download_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
459  break;
460  case CURL_POLL_OUT:
461  download_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
462  break;
463  case CURL_POLL_INOUT:
464  download_mgr->watch_fds_[index].events =
465  POLLIN | POLLPRI | POLLOUT | POLLWRBAND;
466  break;
467  case CURL_POLL_REMOVE:
468  if (index < download_mgr->watch_fds_inuse_-1) {
469  download_mgr->watch_fds_[index] =
470  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_-1];
471  }
472  download_mgr->watch_fds_inuse_--;
473  // Shrink array if necessary
474  if ((download_mgr->watch_fds_inuse_ > download_mgr->watch_fds_max_) &&
475  (download_mgr->watch_fds_inuse_ < download_mgr->watch_fds_size_/2))
476  {
477  download_mgr->watch_fds_size_ /= 2;
478  // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ (%d)",
479  // watch_fds_size_);
480  download_mgr->watch_fds_ = static_cast<struct pollfd *>(
481  srealloc(download_mgr->watch_fds_,
482  download_mgr->watch_fds_size_*sizeof(struct pollfd)));
483  // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ done",
484  // watch_fds_size_);
485  }
486  break;
487  default:
488  break;
489  }
490 
491  return 0;
492 }
493 
494 
/**
 * Entry point of the download I/O thread; data is the owning
 * DownloadManager.
 *
 * The thread:
 * - polls the termination pipe, the job pipe and every socket registered
 *   through CallbackCurlSocket();
 * - drives curl_multi_ via curl_multi_socket_action();
 * - reports each finished job by writing its error code to info->wait_at[1].
 *
 * It leaves the loop as soon as any event is reported for the termination
 * pipe. It then destroys the in-use curl handles, frees watch_fds_ and
 * returns NULL.
 */
498 void *DownloadManager::MainDownload(void *data) {
499  LogCvmfs(kLogDownload, kLogDebug, "download I/O thread started");
500  DownloadManager *download_mgr = static_cast<DownloadManager *>(data);
501
  // Slot 0 watches the termination pipe and slot 1 the job pipe. Sockets
  // requested by libcurl are appended after them by CallbackCurlSocket(),
  // which is why the socket loop below only visits indices >= 2.
502  download_mgr->watch_fds_ =
503  static_cast<struct pollfd *>(smalloc(2 * sizeof(struct pollfd)));
504  download_mgr->watch_fds_size_ = 2;
505  download_mgr->watch_fds_[0].fd = download_mgr->pipe_terminate_[0];
506  download_mgr->watch_fds_[0].events = POLLIN | POLLPRI;
507  download_mgr->watch_fds_[0].revents = 0;
508  download_mgr->watch_fds_[1].fd = download_mgr->pipe_jobs_[0];
509  download_mgr->watch_fds_[1].events = POLLIN | POLLPRI;
510  download_mgr->watch_fds_[1].revents = 0;
511  download_mgr->watch_fds_inuse_ = 2;
512
  // still_running is refreshed by every curl_multi_socket_action() call.
  // timeval_start marks the beginning of the current busy period. The elapsed
  // time is added to counters_->sz_transfer_time once the multi handle is idle.
513  int still_running = 0;
514  struct timeval timeval_start, timeval_stop;
515  gettimeofday(&timeval_start, NULL);
516  while (true) {
517  int timeout;
518  if (still_running) {
519  /* NOTE: The following might degrade the performance for many small files
520  * use case. TODO(jblomer): look into it.
521  // Specify a timeout for polling in ms; this allows us to return
522  // to libcurl once a second so it can look for internal operations
523  // which timed out. libcurl has a more elaborate mechanism
524  // (CURLMOPT_TIMERFUNCTION) that would inform us of the next potential
525  // timeout. TODO(bbockelm) we should switch to that in the future.
526  timeout = 100;
527  */
  // Transfers in flight: poll() returns after at most 1 ms, so the timeout
  // branch below regularly hands control to libcurl.
528  timeout = 1;
529  } else {
  // Idle: block until a pipe or socket signals, and account the busy period
  // that just ended.
  // NOTE(review): timeval_start is only reset when a job arrives while idle
  // (see "New job arrives"). An idle iteration that picks up no job -- e.g.
  // after poll() fails with EINTR -- adds the same interval again. Confirm
  // whether this double counting is acceptable.
530  timeout = -1;
531  gettimeofday(&timeval_stop, NULL);
532  int64_t delta = static_cast<int64_t>(
533  1000 * DiffTimeSeconds(timeval_start, timeval_stop));
534  perf::Xadd(download_mgr->counters_->sz_transfer_time, delta);
535  }
536  int retval = poll(download_mgr->watch_fds_, download_mgr->watch_fds_inuse_,
537  timeout);
  // poll() failed (e.g. interrupted by a signal): simply poll again.
538  if (retval < 0) {
539  continue;
540  }
541
542  // Handle timeout
543  if (retval == 0) {
544  curl_multi_socket_action(download_mgr->curl_multi_,
545  CURL_SOCKET_TIMEOUT,
546  0,
547  &still_running);
548  }
549
550  // Terminate I/O thread
551  if (download_mgr->watch_fds_[0].revents)
552  break;
553
554  // New job arrives
  // Each message on the job pipe is a raw JobInfo pointer (sizeof(info)
  // bytes). The job gets a pooled curl handle, is configured and is added
  // to the multi handle.
555  if (download_mgr->watch_fds_[1].revents) {
556  download_mgr->watch_fds_[1].revents = 0;
557  JobInfo *info;
558  // NOLINTNEXTLINE(bugprone-sizeof-expression)
559  ReadPipe(download_mgr->pipe_jobs_[0], &info, sizeof(info));
560  if (!still_running)
561  gettimeofday(&timeval_start, NULL);
562  CURL *handle = download_mgr->AcquireCurlHandle();
563  download_mgr->InitializeRequest(info, handle);
564  download_mgr->SetUrlOptions(info);
565  curl_multi_add_handle(download_mgr->curl_multi_, handle);
566  curl_multi_socket_action(download_mgr->curl_multi_,
567  CURL_SOCKET_TIMEOUT,
568  0,
569  &still_running);
570  }
571
572  // Activity on curl sockets
573  // Within this loop the curl_multi_socket_action() may cause socket(s)
574  // to be removed from watch_fds_. If a socket is removed it is replaced
575  // by the socket at the end of the array and the inuse count is decreased.
576  // Therefore loop over the array in reverse order.
577  for (int64_t i = download_mgr->watch_fds_inuse_-1; i >= 2; --i) {
578  if (i >= download_mgr->watch_fds_inuse_) {
579  continue;
580  }
581  if (download_mgr->watch_fds_[i].revents) {
  // Translate poll() results into libcurl's CURL_CSELECT_* bitmask.
582  int ev_bitmask = 0;
583  if (download_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
584  ev_bitmask |= CURL_CSELECT_IN;
585  if (download_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
586  ev_bitmask |= CURL_CSELECT_OUT;
587  if (download_mgr->watch_fds_[i].revents &
588  (POLLERR | POLLHUP | POLLNVAL))
589  {
590  ev_bitmask |= CURL_CSELECT_ERR;
591  }
592  download_mgr->watch_fds_[i].revents = 0;
593
594  curl_multi_socket_action(download_mgr->curl_multi_,
595  download_mgr->watch_fds_[i].fd,
596  ev_bitmask,
597  &still_running);
598  }
599  }
600
601  // Check if transfers are completed
602  CURLMsg *curl_msg;
603  int msgs_in_queue;
604  while ((curl_msg = curl_multi_info_read(download_mgr->curl_multi_,
605  &msgs_in_queue)))
606  {
607  if (curl_msg->msg == CURLMSG_DONE) {
608  perf::Inc(download_mgr->counters_->n_requests);
609  JobInfo *info;
610  CURL *easy_handle = curl_msg->easy_handle;
611  int curl_error = curl_msg->data.result;
612  curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
613
614  curl_multi_remove_handle(download_mgr->curl_multi_, easy_handle);
  // If VerifyAndFinalize() returns true, the same easy handle is re-added
  // and the transfer runs again (presumably a retry / failover -- see
  // VerifyAndFinalize()). Otherwise the handle goes back to the pool and the
  // waiting caller receives the final error code.
615  if (download_mgr->VerifyAndFinalize(curl_error, info)) {
616  curl_multi_add_handle(download_mgr->curl_multi_, easy_handle);
617  curl_multi_socket_action(download_mgr->curl_multi_,
618  CURL_SOCKET_TIMEOUT,
619  0,
620  &still_running);
621  } else {
622  // Return easy handle into pool and write result back
623  WritePipe_placeholder_do_not_use
644 
645 
646 //------------------------------------------------------------------------------
647 
648 
649 HeaderLists::~HeaderLists() {
650  for (unsigned i = 0; i < blocks_.size(); ++i) {
651  delete[] blocks_[i];
652  }
653  blocks_.clear();
654 }
655 
656 
657 curl_slist *HeaderLists::GetList(const char *header) {
658  return Get(header);
659 }
660 
661 
662 curl_slist *HeaderLists::DuplicateList(curl_slist *slist) {
663  assert(slist);
664  curl_slist *copy = GetList(slist->data);
665  copy->next = slist->next;
666  curl_slist *prev = copy;
667  slist = slist->next;
668  while (slist) {
669  curl_slist *new_link = Get(slist->data);
670  new_link->next = slist->next;
671  prev->next = new_link;
672  prev = new_link;
673  slist = slist->next;
674  }
675  return copy;
676 }
677 
678 
679 void HeaderLists::AppendHeader(curl_slist *slist, const char *header) {
680  assert(slist);
681  curl_slist *new_link = Get(header);
682  new_link->next = NULL;
683 
684  while (slist->next)
685  slist = slist->next;
686  slist->next = new_link;
687 }
688 
689 
695 void HeaderLists::CutHeader(const char *header, curl_slist **slist) {
696  assert(slist);
697  curl_slist head;
698  head.next = *slist;
699  curl_slist *prev = &head;
700  curl_slist *rover = *slist;
701  while (rover) {
702  if (strcmp(rover->data, header) == 0) {
703  prev->next = rover->next;
704  Put(rover);
705  rover = prev;
706  }
707  prev = rover;
708  rover = rover->next;
709  }
710  *slist = head.next;
711 }
712 
713 
714 void HeaderLists::PutList(curl_slist *slist) {
715  while (slist) {
716  curl_slist *next = slist->next;
717  Put(slist);
718  slist = next;
719  }
720 }
721 
722 
723 string HeaderLists::Print(curl_slist *slist) {
724  string verbose;
725  while (slist) {
726  verbose += string(slist->data) + "\n";
727  slist = slist->next;
728  }
729  return verbose;
730 }
731 
732 
733 curl_slist *HeaderLists::Get(const char *header) {
734  for (unsigned i = 0; i < blocks_.size(); ++i) {
735  for (unsigned j = 0; j < kBlockSize; ++j) {
736  if (!IsUsed(&(blocks_[i][j]))) {
737  blocks_[i][j].data = const_cast<char *>(header);
738  return &(blocks_[i][j]);
739  }
740  }
741  }
742 
743  // All used, new block
744  AddBlock();
745  blocks_[blocks_.size()-1][0].data = const_cast<char *>(header);
746  return &(blocks_[blocks_.size()-1][0]);
747 }
748 
749 
750 void HeaderLists::Put(curl_slist *slist) {
751  slist->data = NULL;
752  slist->next = NULL;
753 }
754 
755 
756 void HeaderLists::AddBlock() {
757  curl_slist *new_block = new curl_slist[kBlockSize];
758  for (unsigned i = 0; i < kBlockSize; ++i) {
759  Put(&new_block[i]);
760  }
761  blocks_.push_back(new_block);
762 }
763 
764 
765 //------------------------------------------------------------------------------
766 
767 
768 string DownloadManager::ProxyInfo::Print() {
769  if (url == "DIRECT")
770  return url;
771 
772  string result = url;
773  int remaining =
774  static_cast<int>(host.deadline()) - static_cast<int>(time(NULL));
775  string expinfo = (remaining >= 0) ? "+" : "";
776  if (abs(remaining) >= 3600) {
777  expinfo += StringifyInt(remaining/3600) + "h";
778  } else if (abs(remaining) >= 60) {
779  expinfo += StringifyInt(remaining/60) + "m";
780  } else {
781  expinfo += StringifyInt(remaining) + "s";
782  }
783  if (host.status() == dns::kFailOk) {
784  result += " (" + host.name() + ", " + expinfo + ")";
785  } else {
786  result += " (:unresolved:, " + expinfo + ")";
787  }
788  return result;
789 }
790 
791 
796 CURL *DownloadManager::AcquireCurlHandle() {
797  CURL *handle;
798 
799  if (pool_handles_idle_->empty()) {
800  // Create a new handle
801  handle = curl_easy_init();
802  assert(handle != NULL);
803 
804  curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
805  // curl_easy_setopt(curl_default, CURLOPT_FAILONERROR, 1);
806  curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, CallbackCurlHeader);
807  curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlData);
808  } else {
809  handle = *(pool_handles_idle_->begin());
810  pool_handles_idle_->erase(pool_handles_idle_->begin());
811  }
812 
813  pool_handles_inuse_->insert(handle);
814 
815  return handle;
816 }
817 
818 
819 void DownloadManager::ReleaseCurlHandle(CURL *handle) {
820  set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
821  assert(elem != pool_handles_inuse_->end());
822 
823  if (pool_handles_idle_->size() > pool_max_handles_) {
824  curl_easy_cleanup(*elem);
825  } else {
826  pool_handles_idle_->insert(*elem);
827  }
828 
829  pool_handles_inuse_->erase(elem);
830 }
831 
832 
837 void DownloadManager::InitializeRequest(JobInfo *info, CURL *handle) {
838  // Initialize internal download state
839  info->curl_handle = handle;
840  info->error_code = kFailOk;
841  info->http_code = -1;
842  info->follow_redirects = follow_redirects_;
843  info->num_used_proxies = 1;
844  info->num_used_hosts = 1;
845  info->num_retries = 0;
846  info->backoff_ms = 0;
847  info->headers = header_lists_->DuplicateList(default_headers_);
848  if (info->info_header) {
849  header_lists_->AppendHeader(info->headers, info->info_header);
850  }
851  if (info->force_nocache) {
852  SetNocache(info);
853  } else {
854  info->nocache = false;
855  }
856  if (info->compressed) {
857  zlib::DecompressInit(&(info->zstream));
858  }
859  if (info->expected_hash) {
860  assert(info->hash_context.buffer != NULL);
861  shash::Init(info->hash_context);
862  }
863 
864  if ((info->range_offset != -1) && (info->range_size)) {
865  char byte_range_array[100];
866  const int64_t range_lower = static_cast<int64_t>(info->range_offset);
867  const int64_t range_upper = static_cast<int64_t>(
868  info->range_offset + info->range_size - 1);
869  if (snprintf(byte_range_array, sizeof(byte_range_array),
870  "%" PRId64 "-%" PRId64,
871  range_lower, range_upper) == 100)
872  {
873  PANIC(NULL); // Should be impossible given limits on offset size.
874  }
875  curl_easy_setopt(handle, CURLOPT_RANGE, byte_range_array);
876  } else {
877  curl_easy_setopt(handle, CURLOPT_RANGE, NULL);
878  }
879 
880  // Set curl parameters
881  curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
882  curl_easy_setopt(handle, CURLOPT_WRITEHEADER,
883  static_cast<void *>(info));
884  curl_easy_setopt(handle, CURLOPT_WRITEDATA, static_cast<void *>(info));
885  curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->headers);
886  if (info->head_request) {
887  curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
888  } else {
889  curl_easy_setopt(handle, CURLOPT_HTTPGET, 1);
890  }
891  if (opt_ipv4_only_) {
892  curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
893  }
894  if (follow_redirects_) {
895  curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1);
896  curl_easy_setopt(handle, CURLOPT_MAXREDIRS, 4);
897  }
898 }
899 
900 
/**
 * Applies the current proxy, host, timeout and TLS configuration to
 * info->curl_handle and sets the final, escaped URL. Runs with lock_options_
 * held (scoped MutexLockGuard).
 *
 * Side effects on the manager: once their reset intervals have passed, the
 * proxy group is reset to the primary group, load-balanced proxies are
 * rebalanced, and the host chain is reset to its first host.
 *
 * Side effects on info:
 * - sets info->proxy;
 * - with probe_hosts, sets info->current_host_chain_index;
 * - for file:// memory downloads without a known size, allocates a 64 KiB
 *   destination_mem buffer.
 */
905 void DownloadManager::SetUrlOptions(JobInfo *info) {
906  CURL *curl_handle = info->curl_handle;
907  string url_prefix;
908
909  MutexLockGuard m(lock_options_);
910  // Check if proxy group needs to be reset from backup to primary
911  if (opt_timestamp_backup_proxies_ > 0) {
912  const time_t now = time(NULL);
913  if (static_cast<int64_t>(now) >
914  static_cast<int64_t>(opt_timestamp_backup_proxies_ +
915  opt_proxy_groups_reset_after_))
916  {
917  opt_proxy_groups_current_ = 0;
918  opt_timestamp_backup_proxies_ = 0;
919  RebalanceProxiesUnlocked("reset proxy group");
920  }
921  }
922  // Check if load-balanced proxies within the group need to be reset
923  if (opt_timestamp_failover_proxies_ > 0) {
924  const time_t now = time(NULL);
925  if (static_cast<int64_t>(now) >
926  static_cast<int64_t>(opt_timestamp_failover_proxies_ +
927  opt_proxy_groups_reset_after_))
928  {
929  RebalanceProxiesUnlocked("reset load-balanced proxies");
930  }
931  }
932  // Check if host needs to be reset
933  if (opt_timestamp_backup_host_ > 0) {
934  const time_t now = time(NULL);
935  if (static_cast<int64_t>(now) >
936  static_cast<int64_t>(opt_timestamp_backup_host_ +
937  opt_host_reset_after_))
938  {
940  "switching host from %s to %s (reset host)",
941  (*opt_host_chain_)[opt_host_chain_current_].c_str(),
942  (*opt_host_chain_)[0].c_str());
943  opt_host_chain_current_ = 0;
944  opt_timestamp_backup_host_ = 0;
945  }
946  }
947
  // Pick the proxy for this request. No proxy, or DIRECT, clears
  // CURLOPT_PROXY.
948  ProxyInfo *proxy = ChooseProxyUnlocked(info->expected_hash);
949  if (!proxy || (proxy->url == "DIRECT")) {
950  info->proxy = "DIRECT";
951  curl_easy_setopt(info->curl_handle, CURLOPT_PROXY, "");
952  } else {
953  // Note: inside ValidateProxyIpsUnlocked() we may change the proxy data
954  // structure, so we must not pass proxy->... (== current_proxy())
955  // parameters directly
956  std::string purl = proxy->url;
957  dns::Host phost = proxy->host;
958  const bool changed = ValidateProxyIpsUnlocked(purl, phost);
959  // Current proxy may have changed
  // NOTE(review): unlike the first ChooseProxyUnlocked() call above, this
  // result is dereferenced without a NULL check.
960  if (changed)
961  proxy = ChooseProxyUnlocked(info->expected_hash);
962  info->proxy = proxy->url;
963  if (proxy->host.status() == dns::kFailOk) {
964  curl_easy_setopt(info->curl_handle, CURLOPT_PROXY, info->proxy.c_str());
965  } else {
966  // We know it can't work, don't even try to download
967  curl_easy_setopt(info->curl_handle, CURLOPT_PROXY, "0.0.0.0");
968  }
969  }
  // Connect and low-speed timeouts differ between proxied and direct access.
970  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, opt_low_speed_limit_);
971  if (info->proxy != "DIRECT") {
972  curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_proxy_);
973  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_proxy_);
974  } else {
975  curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_direct_);
976  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_direct_);
977  }
978  if (!opt_dns_server_.empty())
979  curl_easy_setopt(curl_handle, CURLOPT_DNS_SERVERS, opt_dns_server_.c_str());
980
  // With probe_hosts, the URL is prefixed with the currently selected host of
  // the host chain, and that index is remembered in the job.
981  if (info->probe_hosts && opt_host_chain_) {
982  url_prefix = (*opt_host_chain_)[opt_host_chain_current_];
983  info->current_host_chain_index = opt_host_chain_current_;
984  }
985
986  string url = url_prefix + *(info->url);
987
988  curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 1L);
989  if (url.substr(0, 5) == "https") {
990  bool rvb = ssl_certificate_store_.ApplySslCertificatePath(curl_handle);
991  if (!rvb) {
993  "Failed to set SSL certificate path %s",
994  ssl_certificate_store_.GetCaPath().c_str());
995  }
996  if (info->pid != -1) {
997  if (credentials_attachment_ == NULL) {
999  "uses secure downloads but no credentials attachment set");
1000  } else {
1001  bool retval = credentials_attachment_->ConfigureCurlHandle(
1002  curl_handle, info->pid, &info->cred_data);
1003  if (!retval) {
1004  LogCvmfs(kLogDownload, kLogDebug, "failed attaching credentials");
1005  }
1006  }
1007  }
1008  // The download manager disables signal handling in the curl library;
1009  // as OpenSSL's implementation of TLS will generate a sigpipe in some
1010  // error paths, we must explicitly disable SIGPIPE here.
1011  // TODO(jblomer): it should be enough to do this once
1012  signal(SIGPIPE, SIG_IGN);
1013  }
1014
1015  if (url.find("@proxy@") != string::npos) {
1016  // This is used in Geo-API requests (only), to replace a portion of the
1017  // URL with the current proxy name for the sake of caching the result.
1018  // Replace the @proxy@ either with a passed in "forced" template (which
1019  // is set from $CVMFS_PROXY_TEMPLATE) if there is one, or a "direct"
1020  // template (which is the uuid) if there's no proxy, or the name of the
1021  // proxy.
1022  string replacement;
1023  if (proxy_template_forced_ != "") {
1024  replacement = proxy_template_forced_;
1025  } else if (info->proxy == "DIRECT") {
1026  replacement = proxy_template_direct_;
1027  } else {
1028  if (opt_proxy_groups_current_ >= opt_proxy_groups_fallback_) {
1029  // It doesn't make sense to use the fallback proxies in Geo-API requests
1030  // since the fallback proxies are supposed to get sorted, too.
  // NOTE(review): the connect / low-speed timeouts were already set for the
  // proxy case above and are not switched to the direct values here.
1031  info->proxy = "DIRECT";
1032  curl_easy_setopt(info->curl_handle, CURLOPT_PROXY, "");
1033  replacement = proxy_template_direct_;
1034  } else {
1035  replacement = ChooseProxyUnlocked(info->expected_hash)->host.name();
1036  }
1037  }
1038  replacement = (replacement == "") ? proxy_template_direct_ : replacement;
1039  LogCvmfs(kLogDownload, kLogDebug, "replacing @proxy@ by %s",
1040  replacement.c_str());
1041  url = ReplaceAll(url, "@proxy@", replacement);
1042  }
1043
  // file:// downloads into memory without a known size get a fixed 64 KiB
  // buffer. CallbackCurlData() rejects larger content with kFailBadData.
  // NOTE(review): presumably needed because no Content-Length header arrives
  // for file:// URLs -- confirm.
1044  if ((info->destination == kDestinationMem) &&
1045  (info->destination_mem.size == 0) &&
1046  HasPrefix(url, "file://", false))
1047  {
1048  info->destination_mem.size = 64*1024;
1049  info->destination_mem.data = static_cast<char *>(smalloc(64*1024));
1050  }
1051
  // The URL is percent-encoded with EscapeUrl() before it is handed to curl.
1052  curl_easy_setopt(curl_handle, CURLOPT_URL, EscapeUrl(url).c_str());
1053 }
1054 
1055 
// Re-resolves the DNS name of the proxy `host` once its DNS entry has expired
// and brings the current proxy group in line with the result.
//   - Entry not expired yet: nothing to do, returns false.
//   - Resolving failed, or the new addresses are equivalent to the old ones:
//     every ProxyInfo of this host in the current group gets the new host
//     object (after a failure: the old entry with its deadline extended by
//     resolver_->min_ttl(), so it is retried later); returns false.
//   - Addresses changed: all entries of this host are removed from the
//     current group, one entry per best address (per opt_ip_preference_) is
//     appended with `url` rewritten to that IP, and the proxies are
//     rebalanced; returns true.
// NOTE(review): the "Unlocked" suffix suggests the caller already holds
// lock_options_ -- confirm against the callers.
1065 bool DownloadManager::ValidateProxyIpsUnlocked(
1066  const string &url,
1067  const dns::Host &host)
1068 {
1069  if (!host.IsExpired())
1070  return false;
1071  LogCvmfs(kLogDownload, kLogDebug, "validate DNS entry for %s",
1072  host.name().c_str());
1073 
1074  unsigned group_idx = opt_proxy_groups_current_;
1075  dns::Host new_host = resolver_->Resolve(host.name());
1076 
1077  bool update_only = true; // No changes to the list of IP addresses.
1078  if (new_host.status() != dns::kFailOk) {
1079  // Try again later in case resolving fails.
1081  "failed to resolve IP addresses for %s (%d - %s)",
1082  host.name().c_str(), new_host.status(),
1083  dns::Code2Ascii(new_host.status()));
// Keep the old addresses but postpone the next validation by min_ttl
1084  new_host = dns::Host::ExtendDeadline(host, resolver_->min_ttl());
1085  } else if (!host.IsEquivalent(new_host)) {
1086  update_only = false;
1087  }
1088 
// Same addresses (or resolving failed): only refresh the host objects in place
1089  if (update_only) {
1090  for (unsigned i = 0; i < (*opt_proxy_groups_)[group_idx].size(); ++i) {
1091  if ((*opt_proxy_groups_)[group_idx][i].host.id() == host.id())
1092  (*opt_proxy_groups_)[group_idx][i].host = new_host;
1093  }
1094  return false;
1095  }
1096 
1097  assert(new_host.status() == dns::kFailOk);
1098 
1099  // Remove old host objects, insert new objects, and rebalance.
1101  "DNS entries for proxy %s changed, adjusting", host.name().c_str());
1102  vector<ProxyInfo> *group = current_proxy_group();
1103  opt_num_proxies_ -= group->size();
1104  for (unsigned i = 0; i < group->size(); ) {
1105  if ((*group)[i].host.id() == host.id()) {
1106  group->erase(group->begin() + i);
1107  } else {
1108  i++;
1109  }
1110  }
// One ProxyInfo per preferred IP address, with the URL pointing to that IP
1111  vector<ProxyInfo> new_infos;
1112  set<string> best_addresses = new_host.ViewBestAddresses(opt_ip_preference_);
1113  set<string>::const_iterator iter_ips = best_addresses.begin();
1114  for (; iter_ips != best_addresses.end(); ++iter_ips) {
1115  string url_ip = dns::RewriteUrl(url, *iter_ips);
1116  new_infos.push_back(ProxyInfo(new_host, url_ip));
1117  }
1118  group->insert(group->end(), new_infos.begin(), new_infos.end());
// NOTE(review): opt_num_proxies_ was reduced by the size of the WHOLE group
// above, but is only increased by new_infos.size() here.  Entries of other
// proxy hosts that remain in the group are therefore no longer counted.  This
// likely should add group->size() after the insert -- confirm.
1119  opt_num_proxies_ += new_infos.size();
1120 
1121  RebalanceProxiesUnlocked("DNS change");
1122  return true;
1123 }
1124 
1125 
1129 void DownloadManager::UpdateStatistics(CURL *handle) {
1130  double val;
1131  int retval;
1132  int64_t sum = 0;
1133 
1134  retval = curl_easy_getinfo(handle, CURLINFO_SIZE_DOWNLOAD, &val);
1135  assert(retval == CURLE_OK);
1136  sum += static_cast<int64_t>(val);
1137  /*retval = curl_easy_getinfo(handle, CURLINFO_HEADER_SIZE, &val);
1138  assert(retval == CURLE_OK);
1139  sum += static_cast<int64_t>(val);*/
1140  perf::Xadd(counters_->sz_transferred_bytes, sum);
1141 }
1142 
1143 
// Decides whether a failed job may simply be retried against the same URL
// (with back-off) instead of failing over to another proxy or host.  Requires
// that the job is not in no-cache mode, that it has retries left
// (num_retries < opt_max_retries_, read under lock_options_), and that its
// error_code is a transfer error (IsProxyTransferError(), or the alternative
// tested on the continuation line of the condition).
1147 bool DownloadManager::CanRetry(const JobInfo *info) {
1148  MutexLockGuard m(lock_options_);
1149  unsigned max_retries = opt_max_retries_;
1150 
1151  return !info->nocache && (info->num_retries < max_retries) &&
1152  (IsProxyTransferError(info->error_code) ||
1154 }
1155 
1163 void DownloadManager::Backoff(JobInfo *info) {
1164  unsigned backoff_init_ms = 0;
1165  unsigned backoff_max_ms = 0;
1166  {
1167  MutexLockGuard m(lock_options_);
1168  backoff_init_ms = opt_backoff_init_ms_;
1169  backoff_max_ms = opt_backoff_max_ms_;
1170  }
1171 
1172  info->num_retries++;
1173  perf::Inc(counters_->n_retries);
1174  if (info->backoff_ms == 0) {
1175  info->backoff_ms = prng_.Next(backoff_init_ms + 1); // Must be != 0
1176  } else {
1177  info->backoff_ms *= 2;
1178  }
1179  if (info->backoff_ms > backoff_max_ms) info->backoff_ms = backoff_max_ms;
1180 
1181  LogCvmfs(kLogDownload, kLogDebug, "backing off for %d ms", info->backoff_ms);
1182  SafeSleepMs(info->backoff_ms);
1183 }
1184 
1185 void DownloadManager::SetNocache(JobInfo *info) {
1186  if (info->nocache)
1187  return;
1188  header_lists_->AppendHeader(info->headers, "Pragma: no-cache");
1189  header_lists_->AppendHeader(info->headers, "Cache-Control: no-cache");
1190  curl_easy_setopt(info->curl_handle, CURLOPT_HTTPHEADER, info->headers);
1191  info->nocache = true;
1192 }
1193 
1194 
1199 void DownloadManager::SetRegularCache(JobInfo *info) {
1200  if (info->nocache == false)
1201  return;
1202  header_lists_->CutHeader("Pragma: no-cache", &(info->headers));
1203  header_lists_->CutHeader("Cache-Control: no-cache", &(info->headers));
1204  curl_easy_setopt(info->curl_handle, CURLOPT_HTTPHEADER, info->headers);
1205  info->nocache = false;
1206 }
1207 
1208 
1212 void DownloadManager::ReleaseCredential(JobInfo *info) {
1213  if (info->cred_data) {
1214  assert(credentials_attachment_ != NULL); // Someone must have set it
1215  credentials_attachment_->ReleaseCurlHandle(info->curl_handle,
1216  info->cred_data);
1217  info->cred_data = NULL;
1218  }
1219 }
1220 
1221 
// Called after every curl transfer attempt of `info`.
// First classifies curl_error into info->error_code.  For CURLE_OK it also
// verifies the content hash.  For compressed in-memory downloads it inflates
// the buffer.  Then it decides whether the transfer is tried again:
//   - returns true after resetting the destination, the hash/zlib state and
//     the cache headers, and (depending on the error) backing off, switching
//     proxy, or switching host; the caller runs the transfer again.
//   - returns false after releasing credentials, flushing/closing the
//     destination file, finishing the zlib state and returning the header
//     list; the transfer is over and info->error_code holds the result.
1228 bool DownloadManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1230  "Verify downloaded url %s, proxy %s (curl error %d)",
1231  info->url->c_str(), info->proxy.c_str(), curl_error);
1232  UpdateStatistics(info->curl_handle);
1233 
1234  // Verification and error classification
1235  switch (curl_error) {
1236  case CURLE_OK:
1237  // Verify content hash
1238  if (info->expected_hash) {
1239  shash::Any match_hash;
1240  shash::Final(info->hash_context, &match_hash);
1241  if (match_hash != *(info->expected_hash)) {
1243  "hash verification of %s failed (expected %s, got %s)",
1244  info->url->c_str(), info->expected_hash->ToString().c_str(),
1245  match_hash.ToString().c_str());
1246  info->error_code = kFailBadData;
1247  break;
1248  }
1249  }
1250 
1251  // Decompress memory in a single run
1252  if ((info->destination == kDestinationMem) && info->compressed) {
1253  void *buf;
1254  uint64_t size;
1255  bool retval = zlib::DecompressMem2Mem(
1256  info->destination_mem.data,
1257  static_cast<int64_t>(info->destination_mem.pos),
1258  &buf, &size);
1259  if (retval) {
// Replace the compressed buffer by the inflated one
1260  free(info->destination_mem.data);
1261  info->destination_mem.data = static_cast<char *>(buf);
1262  info->destination_mem.pos = info->destination_mem.size = size;
1263  } else {
1265  "decompression (memory) of url %s failed",
1266  info->url->c_str());
1267  info->error_code = kFailBadData;
1268  break;
1269  }
1270  }
1271 
1272  info->error_code = kFailOk;
1273  break;
1274  case CURLE_UNSUPPORTED_PROTOCOL:
1276  break;
1277  case CURLE_URL_MALFORMAT:
1278  info->error_code = kFailBadUrl;
1279  break;
1280  case CURLE_COULDNT_RESOLVE_PROXY:
1281  info->error_code = kFailProxyResolve;
1282  break;
1283  case CURLE_COULDNT_RESOLVE_HOST:
1284  info->error_code = kFailHostResolve;
1285  break;
1286  case CURLE_OPERATION_TIMEDOUT:
1287  info->error_code = (info->proxy == "DIRECT") ?
1289  break;
1290  case CURLE_PARTIAL_FILE:
1291  case CURLE_GOT_NOTHING:
1292  case CURLE_RECV_ERROR:
1293  info->error_code = (info->proxy == "DIRECT") ?
1295  break;
1296  case CURLE_FILE_COULDNT_READ_FILE:
1297  case CURLE_COULDNT_CONNECT:
1298  if (info->proxy != "DIRECT") {
1299  // This is a guess. Fail-over can still change to switching host
1301  } else {
1303  }
1304  break;
1305  case CURLE_TOO_MANY_REDIRECTS:
1307  break;
1308  case CURLE_SSL_CACERT_BADFILE:
1310  "Failed to load certificate bundle. "
1311  "X509_CERT_BUNDLE might point to the wrong location.");
1313  break;
1314  // As of curl 7.62.0, CURLE_SSL_CACERT is the same as
1315  // CURLE_PEER_FAILED_VERIFICATION
1316  case CURLE_PEER_FAILED_VERIFICATION:
1318  "invalid SSL certificate of remote host. "
1319  "X509_CERT_DIR and/or X509_CERT_BUNDLE might point to the wrong "
1320  "location.");
1322  break;
1323  case CURLE_ABORTED_BY_CALLBACK:
1324  case CURLE_WRITE_ERROR:
1325  // Error set by callback
1326  break;
1327  case CURLE_SEND_ERROR:
1328  // The curl error CURLE_SEND_ERROR can be seen when a cache is misbehaving
1329  // and closing connections before the http request send is completed.
1330  // Handle this error, treating it as a short transfer error.
1331  info->error_code = (info->proxy == "DIRECT") ?
1333  break;
1334  default:
1335  LogCvmfs(kLogDownload, kLogSyslogErr, "unexpected curl error (%d) while "
1336  "trying to fetch %s", curl_error, info->url->c_str());
1337  info->error_code = kFailOther;
1338  break;
1339  }
1340 
// NOTE(review): opt_host_chain_ is read here without lock_options_, while
// SetHostChain()/ProbeHosts() delete and replace it under that lock.  The
// copied pointer may dangle by the time it is dereferenced in the locked
// section below -- consider taking the copy inside that section.
1341  std::vector<std::string> *host_chain = opt_host_chain_;
1342 
1343  // Determination if download should be repeated
1344  bool try_again = false;
1345  bool same_url_retry = CanRetry(info);
1346  if (info->error_code != kFailOk) {
1347  MutexLockGuard m(lock_options_);
// Corrupted data: retry once more in no-cache mode (see the failure handling
// below); if the request already was no-cache, blame the host instead.
1348  if (info->error_code == kFailBadData) {
1349  if (!info->nocache) {
1350  try_again = true;
1351  } else {
1352  // Make it a host failure
1354  "data corruption with no-cache header, try another host");
1355 
1356  info->error_code = kFailHostHttp;
1357  }
1358  }
// Host failures are retried while untried hosts remain in the chain
1359  if ( same_url_retry || (
1360  ( (info->error_code == kFailHostResolve) ||
1362  (info->error_code == kFailHostHttp)) &&
1363  info->probe_hosts &&
1364  host_chain && (info->num_used_hosts < host_chain->size()))
1365  )
1366  {
1367  try_again = true;
1368  }
1369  if ( same_url_retry || (
1370  ( (info->error_code == kFailProxyResolve) ||
1372  (info->error_code == kFailProxyHttp)) )
1373  )
1374  {
1375  try_again = true;
1376  // If all proxies failed, do a next round with the next host
1377  if (!same_url_retry && (info->num_used_proxies >= opt_num_proxies_)) {
1378  // Check if this can be made a host fail-over
1379  if (info->probe_hosts &&
1380  host_chain &&
1381  (info->num_used_hosts < host_chain->size()))
1382  {
1383  // reset proxy group if not already performed by other handle
1384  if (opt_proxy_groups_) {
1385  if ((opt_proxy_groups_current_ > 0) ||
1386  (opt_proxy_groups_current_burned_ > 0))
1387  {
1388  opt_proxy_groups_current_ = 0;
1389  opt_timestamp_backup_proxies_ = 0;
1390  RebalanceProxiesUnlocked("reset proxies for host failover");
1391  }
1392  }
1393 
1394  // Make it a host failure
1395  LogCvmfs(kLogDownload, kLogDebug, "make it a host failure");
1396  info->num_used_proxies = 1;
1398  } else {
1399  try_again = false;
1400  }
1401  } // Make a proxy failure a host failure
1402  } // Proxy failure assumed
1403  }
1404 
1405  if (try_again) {
1406  LogCvmfs(kLogDownload, kLogDebug, "Trying again on same curl handle, "
1407  "same url: %d, error code %d", same_url_retry, info->error_code);
1408  // Reset internal state and destination
1409  if ((info->destination == kDestinationMem) && info->destination_mem.data) {
1410  free(info->destination_mem.data);
1411  info->destination_mem.data = NULL;
1412  info->destination_mem.size = 0;
1413  info->destination_mem.pos = 0;
1414  }
1415  if (info->interrupt_cue && info->interrupt_cue->IsCanceled()) {
1416  info->error_code = kFailCanceled;
1417  goto verify_and_finalize_stop;
1418  }
// Throw away the partially written file content before the next attempt
1419  if ((info->destination == kDestinationFile) ||
1420  (info->destination == kDestinationPath))
1421  {
1422  if ((fflush(info->destination_file) != 0) ||
1423  (ftruncate(fileno(info->destination_file), 0) != 0) ||
1424  (freopen(NULL, "w", info->destination_file) != info->destination_file)
1425  )
1426  {
1427  info->error_code = kFailLocalIO;
1428  goto verify_and_finalize_stop;
1429  }
1430  }
1431  if (info->destination == kDestinationSink) {
1432  if (info->destination_sink->Reset() != 0) {
1433  info->error_code = kFailLocalIO;
1434  goto verify_and_finalize_stop;
1435  }
1436  }
1437  if (info->expected_hash)
1438  shash::Init(info->hash_context);
1439  if (info->compressed)
1440  zlib::DecompressInit(&info->zstream);
1441  SetRegularCache(info);
1442 
1443  // Failure handling
1444  bool switch_proxy = false;
1445  bool switch_host = false;
1446  switch (info->error_code) {
1447  case kFailBadData:
1448  SetNocache(info);
1449  break;
1450  case kFailProxyResolve:
1451  case kFailProxyHttp:
1452  switch_proxy = true;
1453  break;
1454  case kFailHostResolve:
1455  case kFailHostHttp:
1456  case kFailHostAfterProxy:
1457  switch_host = true;
1458  break;
1459  default:
1460  if (IsProxyTransferError(info->error_code)) {
1461  if (same_url_retry) {
1462  Backoff(info);
1463  } else {
1464  switch_proxy = true;
1465  }
1466  } else if (IsHostTransferError(info->error_code)) {
1467  if (same_url_retry) {
1468  Backoff(info);
1469  } else {
1470  switch_host = true;
1471  }
1472  } else {
1473  // No other errors expected when retrying
1474  PANIC(NULL);
1475  }
1476  }
// Credentials are bound to the old URL; release them before re-targeting
1477  if (switch_proxy) {
1478  ReleaseCredential(info);
1479  SwitchProxy(info);
1480  info->num_used_proxies++;
1481  SetUrlOptions(info);
1482  }
1483  if (switch_host) {
1484  ReleaseCredential(info);
1485  SwitchHost(info);
1486  info->num_used_hosts++;
1487  SetUrlOptions(info);
1488  }
1489 
1490  return true; // try again
1491  }
1492 
1493  verify_and_finalize_stop:
1494  // Finalize, flush destination file
// kDestinationFile streams are only flushed; kDestinationPath streams are
// closed and forgotten here.
1495  ReleaseCredential(info);
1496  if ((info->destination == kDestinationFile) &&
1497  fflush(info->destination_file) != 0)
1498  {
1499  info->error_code = kFailLocalIO;
1500  } else if (info->destination == kDestinationPath) {
1501  if (fclose(info->destination_file) != 0)
1502  info->error_code = kFailLocalIO;
1503  info->destination_file = NULL;
1504  }
1505 
1506  if (info->compressed)
1507  zlib::DecompressFini(&info->zstream);
1508 
1509  if (info->headers) {
1510  header_lists_->PutList(info->headers);
1511  info->headers = NULL;
1512  }
1513 
1514  return false; // stop transfer and return to Fetch()
1515 }
1516 
1517 
1518 DownloadManager::DownloadManager() {
1519  pool_handles_idle_ = NULL;
1520  pool_handles_inuse_ = NULL;
1521  pool_max_handles_ = 0;
1522  curl_multi_ = NULL;
1523  default_headers_ = NULL;
1524 
1525  atomic_init32(&multi_threaded_);
1526  pipe_terminate_[0] = pipe_terminate_[1] = -1;
1527 
1528  pipe_jobs_[0] = pipe_jobs_[1] = -1;
1529  watch_fds_ = NULL;
1530  watch_fds_size_ = 0;
1531  watch_fds_inuse_ = 0;
1532  watch_fds_max_ = 0;
1533 
1534  lock_options_ =
1535  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1536  int retval = pthread_mutex_init(lock_options_, NULL);
1537  assert(retval == 0);
1538  lock_synchronous_mode_ =
1539  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1540  retval = pthread_mutex_init(lock_synchronous_mode_, NULL);
1541  assert(retval == 0);
1542 
1543  opt_dns_server_ = "";
1544  opt_ip_preference_ = dns::kIpPreferSystem;
1545  opt_timeout_proxy_ = 0;
1546  opt_timeout_direct_ = 0;
1547  opt_low_speed_limit_ = 0;
1548  opt_host_chain_ = NULL;
1549  opt_host_chain_rtt_ = NULL;
1550  opt_host_chain_current_ = 0;
1551  opt_proxy_groups_ = NULL;
1552  opt_proxy_groups_current_ = 0;
1553  opt_proxy_groups_current_burned_ = 0;
1554  opt_num_proxies_ = 0;
1555  opt_proxy_shard_ = false;
1556  opt_max_retries_ = 0;
1557  opt_backoff_init_ms_ = 0;
1558  opt_backoff_max_ms_ = 0;
1559  enable_info_header_ = false;
1560  opt_ipv4_only_ = false;
1561  follow_redirects_ = false;
1562 
1563  resolver_ = NULL;
1564 
1565  opt_timestamp_backup_proxies_ = 0;
1566  opt_timestamp_failover_proxies_ = 0;
1567  opt_proxy_groups_reset_after_ = 0;
1568  opt_timestamp_backup_host_ = 0;
1569  opt_host_reset_after_ = 0;
1570 
1571  credentials_attachment_ = NULL;
1572 
1573  counters_ = NULL;
1574 }
1575 
1576 
1577 DownloadManager::~DownloadManager() {
1578  pthread_mutex_destroy(lock_options_);
1579  pthread_mutex_destroy(lock_synchronous_mode_);
1580  free(lock_options_);
1581  free(lock_synchronous_mode_);
1582 }
1583 
1584 void DownloadManager::InitHeaders() {
1585  // User-Agent
1586  string cernvm_id = "User-Agent: cvmfs ";
1587 #ifdef CVMFS_LIBCVMFS
1588  cernvm_id += "libcvmfs ";
1589 #else
1590  cernvm_id += "Fuse ";
1591 #endif
1592  cernvm_id += string(VERSION);
1593  if (getenv("CERNVM_UUID") != NULL) {
1594  cernvm_id += " " +
1595  sanitizer::InputSanitizer("az AZ 09 -").Filter(getenv("CERNVM_UUID"));
1596  }
1597  user_agent_ = strdup(cernvm_id.c_str());
1598 
1599  header_lists_ = new HeaderLists();
1600 
1601  default_headers_ = header_lists_->GetList("Connection: Keep-Alive");
1602  header_lists_->AppendHeader(default_headers_, "Pragma:");
1603  header_lists_->AppendHeader(default_headers_, user_agent_);
1604 }
1605 
1606 
1607 void DownloadManager::FiniHeaders() {
1608  delete header_lists_;
1609  header_lists_ = NULL;
1610  default_headers_ = NULL;
1611 }
1612 
1613 
1614 void DownloadManager::Init(const unsigned max_pool_handles,
1615  const perf::StatisticsTemplate &statistics)
1616 {
1617  atomic_init32(&multi_threaded_);
1618  int retval = curl_global_init(CURL_GLOBAL_ALL);
1619  assert(retval == CURLE_OK);
1620  pool_handles_idle_ = new set<CURL *>;
1621  pool_handles_inuse_ = new set<CURL *>;
1622  pool_max_handles_ = max_pool_handles;
1623  watch_fds_max_ = 4*pool_max_handles_;
1624 
1625  opt_timeout_proxy_ = 5;
1626  opt_timeout_direct_ = 10;
1627  opt_low_speed_limit_ = 1024;
1628  opt_proxy_groups_current_ = 0;
1629  opt_proxy_groups_current_burned_ = 0;
1630  opt_num_proxies_ = 0;
1631  opt_proxy_shard_ = false;
1632  opt_host_chain_current_ = 0;
1633  opt_ip_preference_ = dns::kIpPreferSystem;
1634 
1635  counters_ = new Counters(statistics);
1636 
1637  user_agent_ = NULL;
1638  InitHeaders();
1639 
1640  curl_multi_ = curl_multi_init();
1641  assert(curl_multi_ != NULL);
1642  curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION, CallbackCurlSocket);
1643  curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1644  static_cast<void *>(this));
1645  curl_multi_setopt(curl_multi_, CURLMOPT_MAXCONNECTS, watch_fds_max_);
1646  curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1647  pool_max_handles_);
1648 
1649  prng_.InitLocaltime();
1650 
1651  // Name resolving
1652  if ((getenv("CVMFS_IPV4_ONLY") != NULL) &&
1653  (strlen(getenv("CVMFS_IPV4_ONLY")) > 0))
1654  {
1655  opt_ipv4_only_ = true;
1656  }
1657  resolver_ = dns::NormalResolver::Create(opt_ipv4_only_,
1658  kDnsDefaultRetries, kDnsDefaultTimeoutMs);
1659  assert(resolver_);
1660 }
1661 
1662 
// Releases what Init() and the thread-spawning code set up.  If the I/O
// thread runs (multi_threaded_ == 1), it is told to terminate by writing 'T'
// to pipe_terminate_, joined, and both pipe pairs are closed.  Afterwards the
// idle curl handles, both handle pools, the multi handle, the headers and
// User-Agent, the counters, host chain, proxy configuration, the global curl
// state and the resolver are released, and the pointers are reset to NULL.
1664  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1665  // Shutdown I/O thread
1666  char buf = 'T';
1667  WritePipe(pipe_terminate_[1], &buf, 1);
1668  pthread_join(thread_download_, NULL);
1669  // All handles are removed from the multi stack
1670  close(pipe_terminate_[1]);
1671  close(pipe_terminate_[0]);
1672  close(pipe_jobs_[1]);
1673  close(pipe_jobs_[0]);
1674  }
1675 
// NOTE(review): only idle handles are cleaned up; handles still listed in
// pool_handles_inuse_ are not curl_easy_cleanup'ed -- presumably none are in
// use at this point, verify.
1676  for (set<CURL *>::iterator i = pool_handles_idle_->begin(),
1677  iEnd = pool_handles_idle_->end(); i != iEnd; ++i)
1678  {
1679  curl_easy_cleanup(*i);
1680  }
1681  delete pool_handles_idle_;
1682  delete pool_handles_inuse_;
1683  curl_multi_cleanup(curl_multi_);
1684  pool_handles_idle_ = NULL;
1685  pool_handles_inuse_ = NULL;
1686  curl_multi_ = NULL;
1687 
1688  FiniHeaders();
1689  if (user_agent_)
1690  free(user_agent_);
1691  user_agent_ = NULL;
1692 
1693  delete counters_;
1694  counters_ = NULL;
1695 
1696  delete opt_host_chain_;
1697  delete opt_host_chain_rtt_;
1698  opt_proxy_map_.clear();
1699  delete opt_proxy_groups_;
1700  opt_host_chain_ = NULL;
1701  opt_host_chain_rtt_ = NULL;
1702  opt_proxy_groups_ = NULL;
1703 
// Matches curl_global_init() in Init()
1704  curl_global_cleanup();
1705 
1706  delete resolver_;
1707  resolver_ = NULL;
1708 }
1709 
1710 
// Starts the download I/O thread (MainDownload, which receives `this`) and
// switches the manager into multi-threaded mode.  From then on, jobs are
// handed to that thread through pipe_jobs_.  Shutdown is signalled through
// pipe_terminate_.
1716  MakePipe(pipe_terminate_);
1717  MakePipe(pipe_jobs_);
1718 
1719  int retval = pthread_create(&thread_download_, NULL, MainDownload,
1720  static_cast<void *>(this));
1721  assert(retval == 0);
1722 
// Only flip the mode once the thread exists, so that no job is written to
// pipe_jobs_ before anyone reads it
1723  atomic_inc32(&multi_threaded_);
1724 }
1725 
1726 
// Downloads info->url into the destination described by `info` and blocks
// until the transfer has finished; returns the final Failures code.  On
// failure, a kDestinationPath target file is unlinked and an in-memory
// buffer is freed.
1731  assert(info != NULL);
1732  assert(info->url != NULL);
1733 
1734  Failures result;
1735  result = PrepareDownloadDestination(info);
1736  if (result != kFailOk)
1737  return result;
1738 
// alloca(): the hash context and the info header live on this stack frame.
// That is sufficient because this function does not return before the
// transfer is over (it blocks on info->wait_at or loops on
// VerifyAndFinalize()).
1739  if (info->expected_hash) {
1742  info->hash_context.size = shash::GetContextSize(algorithm);
1743  info->hash_context.buffer = alloca(info->hash_context.size);
1744  }
1745 
1746  // Prepare cvmfs-info: header, allocate string on the stack
1747  info->info_header = NULL;
1748  if (enable_info_header_ && info->extra_info) {
1749  const char *header_name = "cvmfs-info: ";
1750  const size_t header_name_len = strlen(header_name);
1751  const unsigned header_size = 1 + header_name_len +
1752  EscapeHeader(*(info->extra_info), NULL, 0);
1753  info->info_header = static_cast<char *>(alloca(header_size));
1754  memcpy(info->info_header, header_name, header_name_len);
1755  EscapeHeader(*(info->extra_info), info->info_header + header_name_len,
1756  header_size - header_name_len);
1757  info->info_header[header_size-1] = '\0';
1758  }
1759 
// Multi-threaded mode: pass the job pointer to the I/O thread through
// pipe_jobs_ and wait for the result on the job's own pipe
1760  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1761  if (info->wait_at[0] == -1) {
1762  MakePipe(info->wait_at);
1763  }
1764 
1765  // LogCvmfs(kLogDownload, kLogDebug, "send job to thread, pipe %d %d",
1766  // info->wait_at[0], info->wait_at[1]);
1767  // NOLINTNEXTLINE(bugprone-sizeof-expression)
1768  WritePipe(pipe_jobs_[1], &info, sizeof(info));
1769  ReadPipe(info->wait_at[0], &result, sizeof(result));
1770  // LogCvmfs(kLogDownload, kLogDebug, "got result %d", result);
1771  } else {
// Synchronous mode: run the transfer in this thread; lock_synchronous_mode_
// serializes concurrent callers
1772  MutexLockGuard l(lock_synchronous_mode_);
1773  CURL *handle = AcquireCurlHandle();
1774  InitializeRequest(info, handle);
1775  SetUrlOptions(info);
1776  // curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
1777  int retval;
1778  do {
1779  retval = curl_easy_perform(handle);
1780  perf::Inc(counters_->n_requests);
1781  double elapsed;
1782  if (curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &elapsed) == CURLE_OK)
1783  {
1784  perf::Xadd(counters_->sz_transfer_time,
1785  static_cast<int64_t>(elapsed * 1000));
1786  }
1787  } while (VerifyAndFinalize(retval, info));
1788  result = info->error_code;
1789  ReleaseCurlHandle(info->curl_handle);
1790  }
1791 
1792  if (result != kFailOk) {
1793  LogCvmfs(kLogDownload, kLogDebug, "download failed (error %d - %s)", result,
1794  Code2Ascii(result));
1795 
1796  if (info->destination == kDestinationPath)
1797  unlink(info->destination_path->c_str());
1798 
1799  if (info->destination_mem.data) {
1800  free(info->destination_mem.data);
1801  info->destination_mem.data = NULL;
1802  info->destination_mem.size = 0;
1803  }
1804  }
1805 
1806  return result;
1807 }
1808 
1809 
1814 void DownloadManager::SetCredentialsAttachment(CredentialsAttachment *ca) {
1815  MutexLockGuard m(lock_options_);
1816  credentials_attachment_ = ca;
1817 }
1818 
1822 std::string DownloadManager::GetDnsServer() const {
1823  return opt_dns_server_;
1824 }
1825 
1830 void DownloadManager::SetDnsServer(const string &address) {
1831  if (!address.empty()) {
1832  MutexLockGuard m(lock_options_);
1833  opt_dns_server_ = address;
1834  assert(!opt_dns_server_.empty());
1835 
1836  vector<string> servers;
1837  servers.push_back(address);
1838  bool retval = resolver_->SetResolvers(servers);
1839  assert(retval);
1840  }
1841  LogCvmfs(kLogDownload, kLogSyslog, "set nameserver to %s", address.c_str());
1842 }
1843 
1844 
1848 void DownloadManager::SetDnsParameters(
1849  const unsigned retries,
1850  const unsigned timeout_ms)
1851 {
1852  MutexLockGuard m(lock_options_);
1853  if ((resolver_->retries() == retries) &&
1854  (resolver_->timeout_ms() == timeout_ms))
1855  {
1856  return;
1857  }
1858  delete resolver_;
1859  resolver_ = NULL;
1860  resolver_ =
1861  dns::NormalResolver::Create(opt_ipv4_only_, retries, timeout_ms);
1862  assert(resolver_);
1863 }
1864 
1865 
1866 void DownloadManager::SetDnsTtlLimits(
1867  const unsigned min_seconds,
1868  const unsigned max_seconds)
1869 {
1870  MutexLockGuard m(lock_options_);
1871  resolver_->set_min_ttl(min_seconds);
1872  resolver_->set_max_ttl(max_seconds);
1873 }
1874 
1875 
1876 void DownloadManager::SetIpPreference(dns::IpPreference preference) {
1877  MutexLockGuard m(lock_options_);
1878  opt_ip_preference_ = preference;
1879 }
1880 
1881 
1887 void DownloadManager::SetTimeout(const unsigned seconds_proxy,
1888  const unsigned seconds_direct)
1889 {
1890  MutexLockGuard m(lock_options_);
1891  opt_timeout_proxy_ = seconds_proxy;
1892  opt_timeout_direct_ = seconds_direct;
1893 }
1894 
1895 
1901 void DownloadManager::SetLowSpeedLimit(const unsigned low_speed_limit) {
1902  MutexLockGuard m(lock_options_);
1903  opt_low_speed_limit_ = low_speed_limit;
1904 }
1905 
1906 
1910 void DownloadManager::GetTimeout(unsigned *seconds_proxy,
1911  unsigned *seconds_direct)
1912 {
1913  MutexLockGuard m(lock_options_);
1914  *seconds_proxy = opt_timeout_proxy_;
1915  *seconds_direct = opt_timeout_direct_;
1916 }
1917 
1918 
1923 void DownloadManager::SetHostChain(const string &host_list) {
1924  SetHostChain(SplitString(host_list, ';'));
1925 }
1926 
1927 
1928 void DownloadManager::SetHostChain(const std::vector<std::string> &host_list) {
1929  MutexLockGuard m(lock_options_);
1930  opt_timestamp_backup_host_ = 0;
1931  delete opt_host_chain_;
1932  delete opt_host_chain_rtt_;
1933  opt_host_chain_current_ = 0;
1934 
1935  if (host_list.empty()) {
1936  opt_host_chain_ = NULL;
1937  opt_host_chain_rtt_ = NULL;
1938  return;
1939  }
1940 
1941  opt_host_chain_ = new vector<string>(host_list);
1942  opt_host_chain_rtt_ =
1943  new vector<int>(opt_host_chain_->size(), kProbeUnprobed);
1944  // LogCvmfs(kLogDownload, kLogSyslog, "using host %s",
1945  // (*opt_host_chain_)[0].c_str());
1946 }
1947 
1948 
1949 
1954 void DownloadManager::GetHostInfo(vector<string> *host_chain, vector<int> *rtt,
1955  unsigned *current_host)
1956 {
1957  MutexLockGuard m(lock_options_);
1958  if (opt_host_chain_) {
1959  if (current_host) {*current_host = opt_host_chain_current_;}
1960  if (host_chain) {*host_chain = *opt_host_chain_;}
1961  if (rtt) {*rtt = *opt_host_chain_rtt_;}
1962  }
1963 }
1964 
1965 
1974 void DownloadManager::SwitchProxy(JobInfo *info) {
1975  MutexLockGuard m(lock_options_);
1976 
1977  if (!opt_proxy_groups_) {
1978  return;
1979  }
1980 
1981  // Fail any matching proxies within the current load-balancing group
1982  vector<ProxyInfo> *group = current_proxy_group();
1983  const unsigned group_size = group->size();
1984  unsigned failed = 0;
1985  for (unsigned i = 0; i < group_size - opt_proxy_groups_current_burned_; ++i) {
1986  if (info && (info->proxy == (*group)[i].url)) {
1987  // Move to list of failed proxies
1988  opt_proxy_groups_current_burned_++;
1989  swap((*group)[i],
1990  (*group)[group_size - opt_proxy_groups_current_burned_]);
1991  perf::Inc(counters_->n_proxy_failover);
1992  failed++;
1993  }
1994  }
1995 
1996  // Do nothing more unless at least one proxy was marked as failed
1997  if (!failed)
1998  return;
1999 
2000  // If all proxies from the current load-balancing group are burned, switch to
2001  // another group
2002  if (opt_proxy_groups_current_burned_ == group->size()) {
2003  opt_proxy_groups_current_burned_ = 0;
2004  if (opt_proxy_groups_->size() > 1) {
2005  opt_proxy_groups_current_ = (opt_proxy_groups_current_ + 1) %
2006  opt_proxy_groups_->size();
2007  // Remeber the timestamp of switching to backup proxies
2008  if (opt_proxy_groups_reset_after_ > 0) {
2009  if (opt_proxy_groups_current_ > 0) {
2010  if (opt_timestamp_backup_proxies_ == 0)
2011  opt_timestamp_backup_proxies_ = time(NULL);
2012  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2013  // "switched to (another) backup proxy group");
2014  } else {
2015  opt_timestamp_backup_proxies_ = 0;
2016  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2017  // "switched back to primary proxy group");
2018  }
2019  opt_timestamp_failover_proxies_ = 0;
2020  }
2021  }
2022  } else {
2023  // Record failover time
2024  if (opt_proxy_groups_reset_after_ > 0) {
2025  if (opt_timestamp_failover_proxies_ == 0)
2026  opt_timestamp_failover_proxies_ = time(NULL);
2027  }
2028  }
2029 
2030  UpdateProxiesUnlocked("failed proxy");
2031  LogCvmfs(kLogDownload, kLogDebug, "%d proxies remain in group",
2032  current_proxy_group()->size() - opt_proxy_groups_current_burned_);
2033 }
2034 
2035 
// Advances opt_host_chain_current_ to the next host of the chain (round
// robin) under lock_options_, and counts the failover.  It does nothing if:
//   - there is no chain;
//   - the chain has a single host;
//   - `info` was served by a host other than the current one, i.e. another
//     job has already switched.
// info == NULL means a manually triggered switch.  With
// opt_host_reset_after_ > 0, the time a backup host (index != 0) is first
// selected is remembered.  That timestamp is cleared when the chain wraps
// back to the first host.
2041 void DownloadManager::SwitchHost(JobInfo *info) {
2042  MutexLockGuard m(lock_options_);
2043 
2044  if (!opt_host_chain_ || (opt_host_chain_->size() == 1)) {
2045  return;
2046  }
2047 
// Avoid switching twice for the same failure reported by several jobs
2048  if (info && (info->current_host_chain_index != opt_host_chain_current_)) {
2050  "don't switch host, "
2051  "last used host: %s, current host: %s",
2052  (*opt_host_chain_)[info->current_host_chain_index].c_str(),
2053  (*opt_host_chain_)[opt_host_chain_current_].c_str());
2054  return;
2055  }
2056 
2057  string reason = "manually triggered";
2058  if (info) {
2059  reason = download::Code2Ascii(info->error_code);
2060  }
2061 
2062  string old_host = (*opt_host_chain_)[opt_host_chain_current_];
2063  opt_host_chain_current_ =
2064  (opt_host_chain_current_ + 1) % opt_host_chain_->size();
2065  perf::Inc(counters_->n_host_failover);
2067  "switching host from %s to %s (%s)", old_host.c_str(),
2068  (*opt_host_chain_)[opt_host_chain_current_].c_str(),
2069  reason.c_str());
2070 
2071  // Remember the timestamp of switching to backup host
2072  if (opt_host_reset_after_ > 0) {
2073  if (opt_host_chain_current_ != 0) {
2074  if (opt_timestamp_backup_host_ == 0)
2075  opt_timestamp_backup_host_ = time(NULL);
2076  } else {
2077  opt_timestamp_backup_host_ = 0;
2078  }
2079  }
2080 }
2081 
2082 void DownloadManager::SwitchHost() {
2083  SwitchHost(NULL);
2084 }
2085 
2086 
2093 void DownloadManager::ProbeHosts() {
2094  vector<string> host_chain;
2095  vector<int> host_rtt;
2096  unsigned current_host;
2097 
2098  GetHostInfo(&host_chain, &host_rtt, &current_host);
2099 
2100  // Stopwatch, two times to fill caches first
2101  unsigned i, retries;
2102  string url;
2103  JobInfo info(&url, false, false, NULL);
2104  for (retries = 0; retries < 2; ++retries) {
2105  for (i = 0; i < host_chain.size(); ++i) {
2106  url = host_chain[i] + "/.cvmfspublished";
2107 
2108  struct timeval tv_start, tv_end;
2109  gettimeofday(&tv_start, NULL);
2110  Failures result = Fetch(&info);
2111  gettimeofday(&tv_end, NULL);
2112  if (info.destination_mem.data)
2113  free(info.destination_mem.data);
2114  if (result == kFailOk) {
2115  host_rtt[i] = static_cast<int>(
2116  DiffTimeSeconds(tv_start, tv_end) * 1000);
2117  LogCvmfs(kLogDownload, kLogDebug, "probing host %s had %dms rtt",
2118  url.c_str(), host_rtt[i]);
2119  } else {
2120  LogCvmfs(kLogDownload, kLogDebug, "error while probing host %s: %d %s",
2121  url.c_str(), result, Code2Ascii(result));
2122  host_rtt[i] = INT_MAX;
2123  }
2124  }
2125  }
2126 
2127  SortTeam(&host_rtt, &host_chain);
2128  for (i = 0; i < host_chain.size(); ++i) {
2129  if (host_rtt[i] == INT_MAX) host_rtt[i] = kProbeDown;
2130  }
2131 
2132  MutexLockGuard m(lock_options_);
2133  delete opt_host_chain_;
2134  delete opt_host_chain_rtt_;
2135  opt_host_chain_ = new vector<string>(host_chain);
2136  opt_host_chain_rtt_ = new vector<int>(host_rtt);
2137  opt_host_chain_current_ = 0;
2138 }
2139 
// Orders `servers` by geographic proximity using the Geo-API of the
// stratum 1s.  Up to three randomly chosen hosts of the host chain are asked
// for <host>/api/v1.0/geo/@proxy@/<comma-separated server names>.  The
// @proxy@ placeholder is substituted when the request URL is set up.  The
// first valid reply wins.
//   - output_order != NULL: receives the permutation (indexes into
//     `servers`), and `servers` stays untouched.
//   - output_order == NULL: `servers` is reordered in place.
// Returns false if `servers` is NULL or no valid reply could be obtained.  A
// single server trivially succeeds with order {0}.
// NOTE(review): an empty `servers` vector is not special-cased and still
// triggers Geo-API requests with an empty server list -- confirm intended.
2140 bool DownloadManager::GeoSortServers(std::vector<std::string> *servers,
2141  std::vector<uint64_t> *output_order) {
2142  if (!servers) {return false;}
2143  if (servers->size() == 1) {
2144  if (output_order) {
2145  output_order->clear();
2146  output_order->push_back(0);
2147  }
2148  return true;
2149  }
2150 
2151  std::vector<std::string> host_chain;
2152  GetHostInfo(&host_chain, NULL, NULL);
2153 
// The Geo-API expects bare host names; entries without one are sent verbatim
2154  std::vector<std::string> server_dns_names;
2155  server_dns_names.reserve(servers->size());
2156  for (unsigned i = 0; i < servers->size(); ++i) {
2157  std::string host = dns::ExtractHost((*servers)[i]);
2158  server_dns_names.push_back(host.empty() ? (*servers)[i] : host);
2159  }
2160  std::string host_list = JoinStrings(server_dns_names, ",");
2161 
2162  vector<string> host_chain_shuffled;
2163  {
2164  // Protect against concurrent access to prng_
2165  MutexLockGuard m(lock_options_);
2166  // Determine random hosts for the Geo-API query
2167  host_chain_shuffled = Shuffle(host_chain, &prng_);
2168  }
2169  // Request ordered list via Geo-API
2170  bool success = false;
2171  unsigned max_attempts = std::min(host_chain_shuffled.size(), size_t(3));
2172  vector<uint64_t> geo_order(servers->size());
2173  for (unsigned i = 0; i < max_attempts; ++i) {
2174  string url = host_chain_shuffled[i] + "/api/v1.0/geo/@proxy@/" + host_list;
2176  "requesting ordered server list from %s", url.c_str());
2177  JobInfo info(&url, false, false, NULL);
2178  Failures result = Fetch(&info);
2179  if (result == kFailOk) {
// The reply body is in memory; copy it and release the buffer right away
2180  string order(info.destination_mem.data, info.destination_mem.size);
2181  free(info.destination_mem.data);
2182  bool retval = ValidateGeoReply(order, servers->size(), &geo_order);
2183  if (!retval) {
2185  "retrieved invalid GeoAPI reply from %s [%s]",
2186  url.c_str(), order.c_str());
2187  } else {
2189  "geographic order of servers retrieved from %s",
2190  dns::ExtractHost(host_chain_shuffled[i]).c_str());
2191  LogCvmfs(kLogDownload, kLogDebug, "order is %s", order.c_str());
2192  success = true;
2193  break;
2194  }
2195  } else {
2197  "GeoAPI request %s failed with error %d [%s]",
2198  url.c_str(), result, Code2Ascii(result));
2199  }
2200  }
2201  if (!success) {
2203  "failed to retrieve geographic order from stratum 1 servers");
2204  return false;
2205  }
2206 
2207  if (output_order) {
2208  output_order->swap(geo_order);
2209  } else {
2210  std::vector<std::string> sorted_servers;
2211  sorted_servers.reserve(geo_order.size());
2212  for (unsigned i = 0; i < geo_order.size(); ++i) {
2213  uint64_t orderval = geo_order[i];
2214  sorted_servers.push_back((*servers)[orderval]);
2215  }
2216  servers->swap(sorted_servers);
2217  }
2218  return true;
2219 }
2220 
2221 
2229 bool DownloadManager::ProbeGeo() {
2230  vector<string> host_chain;
2231  vector<int> host_rtt;
2232  unsigned current_host;
2233  vector< vector<ProxyInfo> > proxy_chain;
2234  unsigned fallback_group;
2235 
2236  GetHostInfo(&host_chain, &host_rtt, &current_host);
2237  GetProxyInfo(&proxy_chain, NULL, &fallback_group);
2238  if ((host_chain.size() < 2) && ((proxy_chain.size() - fallback_group) < 2))
2239  return true;
2240 
2241  vector<string> host_names;
2242  for (unsigned i = 0; i < host_chain.size(); ++i)
2243  host_names.push_back(dns::ExtractHost(host_chain[i]));
2244  SortTeam(&host_names, &host_chain);
2245  unsigned last_geo_host = host_names.size();
2246 
2247  if ((fallback_group == 0) && (last_geo_host > 1)) {
2248  // There are no non-fallback proxies, which means that the client
2249  // will always use the fallback proxies. Add a keyword separator
2250  // between the hosts and fallback proxies so the geosorting service
2251  // will know to sort the hosts based on the distance from the
2252  // closest fallback proxy rather than the distance from the client.
2253  host_names.push_back("+PXYSEP+");
2254  }
2255 
2256  // Add fallback proxy names to the end of the host list
2257  unsigned first_geo_fallback = host_names.size();
2258  for (unsigned i = fallback_group; i < proxy_chain.size(); ++i) {
2259  // We only take the first fallback proxy name from every group under the
2260  // assumption that load-balanced servers are at the same location
2261  host_names.push_back(proxy_chain[i][0].host.name());
2262  }
2263 
2264  std::vector<uint64_t> geo_order;
2265  bool success = GeoSortServers(&host_names, &geo_order);
2266  if (!success) {
2267  // GeoSortServers already logged a failure message.
2268  return false;
2269  }
2270 
2271  // Re-install host chain and proxy chain
2272  MutexLockGuard m(lock_options_);
2273  delete opt_host_chain_;
2274  opt_num_proxies_ = 0;
2275  opt_host_chain_ = new vector<string>(host_chain.size());
2276 
2277  // It's possible that opt_proxy_groups_fallback_ might have changed while
2278  // the lock wasn't held
2279  vector<vector<ProxyInfo> > *proxy_groups = new vector<vector<ProxyInfo> >(
2280  opt_proxy_groups_fallback_ + proxy_chain.size() - fallback_group);
2281  // First copy the non-fallback part of the current proxy chain
2282  for (unsigned i = 0; i < opt_proxy_groups_fallback_; ++i) {
2283  (*proxy_groups)[i] = (*opt_proxy_groups_)[i];
2284  opt_num_proxies_ += (*opt_proxy_groups_)[i].size();
2285  }
2286 
2287  // Copy the host chain and fallback proxies by geo order. Array indices
2288  // in geo_order that are smaller than last_geo_host refer to a stratum 1,
2289  // and those indices greater than or equal to first_geo_fallback refer to
2290  // a fallback proxy.
2291  unsigned hosti = 0;
2292  unsigned proxyi = opt_proxy_groups_fallback_;
2293  for (unsigned i = 0; i < geo_order.size(); ++i) {
2294  uint64_t orderval = geo_order[i];
2295  if (orderval < static_cast<uint64_t>(last_geo_host)) {
2296  // LogCvmfs(kLogCvmfs, kLogSyslog, "this is orderval %u at host index
2297  // %u", orderval, hosti);
2298  (*opt_host_chain_)[hosti++] = host_chain[orderval];
2299  } else if (orderval >= static_cast<uint64_t>(first_geo_fallback)) {
2300  // LogCvmfs(kLogCvmfs, kLogSyslog,
2301  // "this is orderval %u at proxy index %u, using proxy_chain index %u",
2302  // orderval, proxyi, fallback_group + orderval - first_geo_fallback);
2303  (*proxy_groups)[proxyi] =
2304  proxy_chain[fallback_group + orderval - first_geo_fallback];
2305  opt_num_proxies_ += (*proxy_groups)[proxyi].size();
2306  proxyi++;
2307  }
2308  }
2309 
2310  opt_proxy_map_.clear();
2311  delete opt_proxy_groups_;
2312  opt_proxy_groups_ = proxy_groups;
2313  // In pathological cases, opt_proxy_groups_current_ can be larger now when
2314  // proxies changed in-between.
2315  if (opt_proxy_groups_current_ > opt_proxy_groups_->size()) {
2316  if (opt_proxy_groups_->size() == 0) {
2317  opt_proxy_groups_current_ = 0;
2318  } else {
2319  opt_proxy_groups_current_ = opt_proxy_groups_->size() - 1;
2320  }
2321  opt_proxy_groups_current_burned_ = 0;
2322  }
2323 
2324  UpdateProxiesUnlocked("geosort");
2325 
2326  delete opt_host_chain_rtt_;
2327  opt_host_chain_rtt_ = new vector<int>(host_chain.size(), kProbeGeo);
2328  opt_host_chain_current_ = 0;
2329 
2330  return true;
2331 }
2332 
2333 
2341 bool DownloadManager::ValidateGeoReply(
2342  const string &reply_order,
2343  const unsigned expected_size,
2344  vector<uint64_t> *reply_vals)
2345 {
2346  if (reply_order.empty())
2347  return false;
2348  sanitizer::InputSanitizer sanitizer("09 , \n");
2349  if (!sanitizer.IsValid(reply_order))
2350  return false;
2351  sanitizer::InputSanitizer strip_newline("09 ,");
2352  vector<string> reply_strings =
2353  SplitString(strip_newline.Filter(reply_order), ',');
2354  vector<uint64_t> tmp_vals;
2355  for (unsigned i = 0; i < reply_strings.size(); ++i) {
2356  if (reply_strings[i].empty())
2357  return false;
2358  tmp_vals.push_back(String2Uint64(reply_strings[i]));
2359  }
2360  if (tmp_vals.size() != expected_size)
2361  return false;
2362 
2363  // Check if tmp_vals contains the number 1..n
2364  set<uint64_t> coverage(tmp_vals.begin(), tmp_vals.end());
2365  if (coverage.size() != tmp_vals.size())
2366  return false;
2367  if ((*coverage.begin() != 1) || (*coverage.rbegin() != coverage.size()))
2368  return false;
2369 
2370  for (unsigned i = 0; i < expected_size; ++i) {
2371  (*reply_vals)[i] = tmp_vals[i] - 1;
2372  }
2373  return true;
2374 }
2375 
2376 
2381 bool DownloadManager::StripDirect(
2382  const string &proxy_list,
2383  string *cleaned_list)
2384 {
2385  assert(cleaned_list);
2386  if (proxy_list == "") {
2387  *cleaned_list = "";
2388  return false;
2389  }
2390  bool result = false;
2391 
2392  vector<string> proxy_groups = SplitString(proxy_list, ';');
2393  vector<string> cleaned_groups;
2394  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2395  vector<string> group = SplitString(proxy_groups[i], '|');
2396  vector<string> cleaned;
2397  for (unsigned j = 0; j < group.size(); ++j) {
2398  if ((group[j] == "DIRECT") || (group[j] == "")) {
2399  result = true;
2400  } else {
2401  cleaned.push_back(group[j]);
2402  }
2403  }
2404  if (!cleaned.empty())
2405  cleaned_groups.push_back(JoinStrings(cleaned, "|"));
2406  }
2407 
2408  *cleaned_list = JoinStrings(cleaned_groups, ";");
2409  return result;
2410 }
2411 
2412 
// Installs the regular and/or fallback proxy list (selected by set_mode) and
// rebuilds opt_proxy_groups_ while holding lock_options_.
//
// Lists separate load-balance groups with ';' and proxies within a group with
// '|'.  DIRECT (and empty) entries are always stripped from the fallback list
// and are stripped from the regular list only if a non-empty fallback list
// remains.  Each proxy host name is resolved; a resolved proxy is expanded
// into one ProxyInfo per address from ViewBestAddresses(opt_ip_preference_),
// while a failed one is kept as a single ProxyInfo whose deadline is extended
// by resolver_->min_ttl().  Regular groups come first and
// opt_proxy_groups_fallback_ is the index of the first fallback group.  If
// both effective lists are empty, opt_proxy_groups_ becomes NULL.
//
// NOTE(review): "%u" is used with hostnames.size() and
// opt_proxy_groups_->size(), which are size_t — a varargs type mismatch on
// LP64; consider a cast to unsigned.
// NOTE(review): the inner "bool contains_direct" (source line 2448) shadows
// the outer variable of the same name.
2421 void DownloadManager::SetProxyChain(
2422  const string &proxy_list,
2423  const string &fallback_proxy_list,
2424  const ProxySetModes set_mode)
2425 {
2426  MutexLockGuard m(lock_options_);
2427 
2428  opt_timestamp_backup_proxies_ = 0;
2429  opt_timestamp_failover_proxies_ = 0;
2430  string set_proxy_list = opt_proxy_list_;
2431  string set_proxy_fallback_list = opt_proxy_fallback_list_;
2432  bool contains_direct;
2433  if ((set_mode == kSetProxyFallback) || (set_mode == kSetProxyBoth)) {
2434  opt_proxy_fallback_list_ = fallback_proxy_list;
2435  }
2436  if ((set_mode == kSetProxyRegular) || (set_mode == kSetProxyBoth)) {
2437  opt_proxy_list_ = proxy_list;
2438  }
2439  contains_direct =
2440  StripDirect(opt_proxy_fallback_list_, &set_proxy_fallback_list);
2441  if (contains_direct) {
// NOTE(review): source line 2442 (opening "LogCvmfs(") is missing here.
2443  "fallback proxies do not support DIRECT, removing");
2444  }
2445  if (set_proxy_fallback_list == "") {
2446  set_proxy_list = opt_proxy_list_;
2447  } else {
2448  bool contains_direct = StripDirect(opt_proxy_list_, &set_proxy_list);
2449  if (contains_direct) {
// NOTE(review): source line 2450 (opening "LogCvmfs(") is missing here.
2451  "skipping DIRECT proxy to use fallback proxy");
2452  }
2453  }
2454 
2455  // From this point on, use set_proxy_list and set_fallback_proxy_list as
2456  // effective proxy lists!
2457 
2458  opt_proxy_map_.clear();
2459  delete opt_proxy_groups_;
2460  if ((set_proxy_list == "") && (set_proxy_fallback_list == "")) {
2461  opt_proxy_groups_ = NULL;
2462  opt_proxy_groups_current_ = 0;
2463  opt_proxy_groups_current_burned_ = 0;
2464  opt_proxy_groups_fallback_ = 0;
2465  opt_num_proxies_ = 0;
2466  return;
2467  }
2468 
2469  // Determine number of regular proxy groups (== first fallback proxy group)
2470  opt_proxy_groups_fallback_ = 0;
2471  if (set_proxy_list != "") {
2472  opt_proxy_groups_fallback_ = SplitString(set_proxy_list, ';').size();
2473  }
2474  LogCvmfs(kLogDownload, kLogDebug, "first fallback proxy group %u",
2475  opt_proxy_groups_fallback_);
2476 
2477  // Concatenate regular proxies and fallback proxies, both of which can be
2478  // empty.
2479  string all_proxy_list = set_proxy_list;
2480  if (set_proxy_fallback_list != "") {
2481  if (all_proxy_list != "")
2482  all_proxy_list += ";";
2483  all_proxy_list += set_proxy_fallback_list;
2484  }
2485  LogCvmfs(kLogDownload, kLogDebug, "full proxy list %s",
2486  all_proxy_list.c_str());
2487 
2488  // Resolve server names in provided urls
2489  vector<string> hostnames; // All encountered hostnames
2490  vector<string> proxy_groups;
2491  if (all_proxy_list != "")
2492  proxy_groups = SplitString(all_proxy_list, ';');
2493  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2494  vector<string> this_group = SplitString(proxy_groups[i], '|');
2495  for (unsigned j = 0; j < this_group.size(); ++j) {
2496  this_group[j] = dns::AddDefaultScheme(this_group[j]);
2497  // Note: DIRECT strings will be "extracted" to an empty string.
2498  string hostname = dns::ExtractHost(this_group[j]);
2499  // Save the hostname. Leave empty (DIRECT) names so indexes will
2500  // match later.
2501  hostnames.push_back(hostname);
2502  }
2503  }
2504  vector<dns::Host> hosts;
2505  LogCvmfs(kLogDownload, kLogDebug, "resolving %u proxy addresses",
2506  hostnames.size());
2507  resolver_->ResolveMany(hostnames, &hosts);
2508 
2509  // Construct opt_proxy_groups_: traverse proxy list in same order and expand
2510  // names to resolved IP addresses.
2511  opt_proxy_groups_ = new vector< vector<ProxyInfo> >();
2512  opt_num_proxies_ = 0;
2513  unsigned num_proxy = 0; // Combined i, j counter
2514  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2515  vector<string> this_group = SplitString(proxy_groups[i], '|');
2516  // Construct ProxyInfo objects from proxy string and DNS resolver result for
2517  // every proxy in this_group. One URL can result in multiple ProxyInfo
2518  // objects, one for each IP address.
2519  vector<ProxyInfo> infos;
2520  for (unsigned j = 0; j < this_group.size(); ++j, ++num_proxy) {
2521  this_group[j] = dns::AddDefaultScheme(this_group[j]);
2522  if (this_group[j] == "DIRECT") {
2523  infos.push_back(ProxyInfo("DIRECT"));
2524  continue;
2525  }
2526 
2527  if (hosts[num_proxy].status() != dns::kFailOk) {
// NOTE(review): source line 2528 (opening "LogCvmfs(") is missing here.
2529  "failed to resolve IP addresses for %s (%d - %s)",
2530  hosts[num_proxy].name().c_str(), hosts[num_proxy].status(),
2531  dns::Code2Ascii(hosts[num_proxy].status()));
2532  dns::Host failed_host =
2533  dns::Host::ExtendDeadline(hosts[num_proxy], resolver_->min_ttl());
2534  infos.push_back(ProxyInfo(failed_host, this_group[j]));
2535  continue;
2536  }
2537 
2538  // IPv4 addresses have precedence
// NOTE(review): the comment above may be stale — the addresses actually used
// are whatever ViewBestAddresses() selects for opt_ip_preference_.
2539  set<string> best_addresses =
2540  hosts[num_proxy].ViewBestAddresses(opt_ip_preference_);
2541  set<string>::const_iterator iter_ips = best_addresses.begin();
2542  for (; iter_ips != best_addresses.end(); ++iter_ips) {
2543  string url_ip = dns::RewriteUrl(this_group[j], *iter_ips);
2544  infos.push_back(ProxyInfo(hosts[num_proxy], url_ip));
2545  }
2546  }
2547  opt_proxy_groups_->push_back(infos);
2548  opt_num_proxies_ += infos.size();
2549  }
// NOTE(review): source line 2550 (opening "LogCvmfs(") is missing here.
2551  "installed %u proxies in %u load-balance groups",
2552  opt_num_proxies_, opt_proxy_groups_->size());
2553  opt_proxy_groups_current_ = 0;
2554  opt_proxy_groups_current_burned_ = 0;
2555 
2556  // Select random start proxy from the first group.
2557  if (opt_proxy_groups_->size() > 0) {
2558  // Select random start proxy from the first group.
2559  UpdateProxiesUnlocked("set proxies");
2560  }
2561 }
2562 
2563 
2570 void DownloadManager::GetProxyInfo(vector< vector<ProxyInfo> > *proxy_chain,
2571  unsigned *current_group,
2572  unsigned *fallback_group)
2573 {
2574  assert(proxy_chain != NULL);
2575  MutexLockGuard m(lock_options_);
2576 
2577 
2578  if (!opt_proxy_groups_) {
2579  vector< vector<ProxyInfo> > empty_chain;
2580  *proxy_chain = empty_chain;
2581  if (current_group != NULL)
2582  *current_group = 0;
2583  if (fallback_group != NULL)
2584  *fallback_group = 0;
2585  return;
2586  }
2587 
2588  *proxy_chain = *opt_proxy_groups_;
2589  if (current_group != NULL)
2590  *current_group = opt_proxy_groups_current_;
2591  if (fallback_group != NULL)
2592  *fallback_group = opt_proxy_groups_fallback_;
2593 }
2594 
2595 string DownloadManager::GetProxyList() {
2596  return opt_proxy_list_;
2597 }
2598 
2599 string DownloadManager::GetFallbackProxyList() {
2600  return opt_proxy_fallback_list_;
2601 }
2602 
// Picks the proxy for a request: returns NULL if no proxy groups are
// configured; otherwise returns the entry of opt_proxy_map_ found by
// lower_bound(key), where key is hash->Partial32() (0 if hash is NULL).
// Does not take lock_options_ itself; presumably the caller holds it.
//
// NOTE(review): the return type is not visible in this listing (source lines
// 2603-2606 are absent); given the returned value it is presumably
// "ProxyInfo *" — confirm.
// NOTE(review): if opt_proxy_map_ is empty while opt_proxy_groups_ is
// non-NULL, lower_bound() returns end() and it->second is undefined
// behavior.  UpdateProxiesUnlocked() inserts an entry at key max_key, so
// lower_bound() succeeds whenever the map is non-empty.
2607 DownloadManager::ChooseProxyUnlocked(const shash::Any *hash) {
2608  if (!opt_proxy_groups_)
2609  return NULL;
2610 
2611  uint32_t key = (hash ? hash->Partial32() : 0);
2612  map<uint32_t, ProxyInfo *>::iterator it = opt_proxy_map_.lower_bound(key);
2613  ProxyInfo *proxy = it->second;
2614 
2615  return proxy;
2616 }
2617 
// Rebuilds opt_proxy_map_ and opt_proxy_urls_ from the first
// (group->size() - opt_proxy_groups_current_burned_) proxies of the current
// load-balance group.  Presumably burned proxies are kept at the end of the
// group; confirm.  Logs the change when the joined, sorted URL list differs
// from the previous one.  Does not take lock_options_; SetProxyChain() and
// ProbeGeo() call it while holding that lock.
//
// With opt_proxy_shard_, each alive proxy gets kProxyMapScale keys drawn from
// a Prng seeded with the SHA-1 of its URL, so a given URL always yields the
// same keys.  An extra entry at max_key points to the proxy with the smallest
// key, so lower_bound() finds an entry for every key and wraps around.
// Without sharding, a single alive proxy is picked with prng_ and mapped at
// max_key.
//
// NOTE(review): if num_alive is 0, the sharded branch dereferences
// opt_proxy_map_.begin() on an empty map, and the non-sharded branch calls
// prng_.Next(0), which may divide by zero; check Prng::Next.  Confirm that
// callers guarantee num_alive > 0.
2621 void DownloadManager::UpdateProxiesUnlocked(const string &reason) {
2622  if (!opt_proxy_groups_)
2623  return;
2624 
2625  // Identify number of non-burned proxies within the current group
2626  vector<ProxyInfo> *group = current_proxy_group();
2627  unsigned num_alive = (group->size() - opt_proxy_groups_current_burned_);
2628  string old_proxy = JoinStrings(opt_proxy_urls_, "|");
2629 
2630  // Rebuild proxy map and URL list
2631  opt_proxy_map_.clear();
2632  opt_proxy_urls_.clear();
2633  const uint32_t max_key = 0xffffffffUL;
2634  if (opt_proxy_shard_) {
2635  // Build a consistent map with multiple entries for each proxy
2636  for (unsigned i = 0; i < num_alive; ++i) {
2637  ProxyInfo *proxy = &(*group)[i];
2638  shash::Any proxy_hash(shash::kSha1);
2639  HashString(proxy->url, &proxy_hash);
2640  Prng prng;
2641  prng.InitSeed(proxy_hash.Partial32());
2642  for (unsigned j = 0; j < kProxyMapScale; ++j) {
2643  const std::pair<uint32_t, ProxyInfo *> entry(prng.Next(max_key), proxy);
2644  opt_proxy_map_.insert(entry);
2645  }
2646  opt_proxy_urls_.push_back(proxy->url);
2647  }
2648  // Ensure lower_bound() finds a value for all keys
2649  ProxyInfo *first_proxy = opt_proxy_map_.begin()->second;
2650  const std::pair<uint32_t, ProxyInfo *> last_entry(max_key, first_proxy);
2651  opt_proxy_map_.insert(last_entry);
2652  } else {
2653  // Build a map with a single entry for one randomly selected proxy
2654  unsigned select = prng_.Next(num_alive);
2655  ProxyInfo *proxy = &(*group)[select];
2656  const std::pair<uint32_t, ProxyInfo *> entry(max_key, proxy);
2657  opt_proxy_map_.insert(entry);
2658  opt_proxy_urls_.push_back(proxy->url);
2659  }
2660  sort(opt_proxy_urls_.begin(), opt_proxy_urls_.end());
2661 
2662  // Report any change in proxy usage
2663  string new_proxy = JoinStrings(opt_proxy_urls_, "|");
2664  if (new_proxy != old_proxy) {
// NOTE(review): source line 2665 (opening "LogCvmfs(") is missing here.
2666  "switching proxy from %s to %s (%s)",
2667  (old_proxy.empty() ? "(none)" : old_proxy.c_str()),
2668  (new_proxy.empty() ? "(none)" : new_proxy.c_str()),
2669  reason.c_str());
2670  }
2671 }
2672 
2676 void DownloadManager::ShardProxies() {
2677  opt_proxy_shard_ = true;
2678  RebalanceProxiesUnlocked("enable sharding");
2679 }
2680 
2685 void DownloadManager::RebalanceProxiesUnlocked(const string &reason) {
2686  if (!opt_proxy_groups_)
2687  return;
2688 
2689  opt_timestamp_failover_proxies_ = 0;
2690  opt_proxy_groups_current_burned_ = 0;
2691  UpdateProxiesUnlocked(reason);
2692 }
2693 
2694 
2695 void DownloadManager::RebalanceProxies() {
2696  MutexLockGuard m(lock_options_);
2697  RebalanceProxiesUnlocked("rebalance");
2698 }
2699 
2700 
2704 void DownloadManager::SwitchProxyGroup() {
2705  MutexLockGuard m(lock_options_);
2706 
2707  if (!opt_proxy_groups_ || (opt_proxy_groups_->size() < 2)) {
2708  return;
2709  }
2710 
2711  opt_proxy_groups_current_ = (opt_proxy_groups_current_ + 1) %
2712  opt_proxy_groups_->size();
2713  opt_timestamp_backup_proxies_ = time(NULL);
2714  RebalanceProxiesUnlocked("switch proxy group");
2715 }
2716 
2717 
2718 void DownloadManager::SetProxyGroupResetDelay(const unsigned seconds) {
2719  MutexLockGuard m(lock_options_);
2720  opt_proxy_groups_reset_after_ = seconds;
2721  if (opt_proxy_groups_reset_after_ == 0) {
2722  opt_timestamp_backup_proxies_ = 0;
2723  opt_timestamp_failover_proxies_ = 0;
2724  }
2725 }
2726 
2727 
2728 void DownloadManager::SetHostResetDelay(const unsigned seconds)
2729 {
2730  MutexLockGuard m(lock_options_);
2731  opt_host_reset_after_ = seconds;
2732  if (opt_host_reset_after_ == 0)
2733  opt_timestamp_backup_host_ = 0;
2734 }
2735 
2736 
2737 void DownloadManager::SetRetryParameters(const unsigned max_retries,
2738  const unsigned backoff_init_ms,
2739  const unsigned backoff_max_ms)
2740 {
2741  MutexLockGuard m(lock_options_);
2742  opt_max_retries_ = max_retries;
2743  opt_backoff_init_ms_ = backoff_init_ms;
2744  opt_backoff_max_ms_ = backoff_max_ms;
2745 }
2746 
2747 
2748 void DownloadManager::SetMaxIpaddrPerProxy(unsigned limit) {
2749  MutexLockGuard m(lock_options_);
2750  resolver_->set_throttle(limit);
2751 }
2752 
2753 
2754 void DownloadManager::SetProxyTemplates(
2755  const std::string &direct,
2756  const std::string &forced)
2757 {
2758  MutexLockGuard m(lock_options_);
2759  proxy_template_direct_ = direct;
2760  proxy_template_forced_ = forced;
2761 }
2762 
2763 
2764 void DownloadManager::EnableInfoHeader() {
2765  enable_info_header_ = true;
2766 }
2767 
2768 
2769 void DownloadManager::EnableRedirects() {
2770  follow_redirects_ = true;
2771 }
2772 
2773 void DownloadManager::UseSystemCertificatePath() {
2774  ssl_certificate_store_.UseSystemCertificatePath();
2775 }
2776 
2781 DownloadManager *DownloadManager::Clone(
2782  const perf::StatisticsTemplate &statistics)
2783 {
2784  DownloadManager *clone = new DownloadManager();
2785  clone->Init(pool_max_handles_, statistics);
2786  if (resolver_) {
2787  clone->SetDnsParameters(resolver_->retries(), resolver_->timeout_ms());
2788  clone->SetDnsTtlLimits(resolver_->min_ttl(), resolver_->max_ttl());
2789  clone->SetMaxIpaddrPerProxy(resolver_->throttle());
2790  }
2791  if (!opt_dns_server_.empty())
2792  clone->SetDnsServer(opt_dns_server_);
2793  clone->opt_timeout_proxy_ = opt_timeout_proxy_;
2794  clone->opt_timeout_direct_ = opt_timeout_direct_;
2795  clone->opt_low_speed_limit_ = opt_low_speed_limit_;
2796  clone->opt_max_retries_ = opt_max_retries_;
2797  clone->opt_backoff_init_ms_ = opt_backoff_init_ms_;
2798  clone->opt_backoff_max_ms_ = opt_backoff_max_ms_;
2799  clone->enable_info_header_ = enable_info_header_;
2800  clone->follow_redirects_ = follow_redirects_;
2801  if (opt_host_chain_) {
2802  clone->opt_host_chain_ = new vector<string>(*opt_host_chain_);
2803  clone->opt_host_chain_rtt_ = new vector<int>(*opt_host_chain_rtt_);
2804  }
2805  CloneProxyConfig(clone);
2806  clone->opt_ip_preference_ = opt_ip_preference_;
2807  clone->proxy_template_direct_ = proxy_template_direct_;
2808  clone->proxy_template_forced_ = proxy_template_forced_;
2809  clone->opt_proxy_groups_reset_after_ = opt_proxy_groups_reset_after_;
2810  clone->opt_host_reset_after_ = opt_host_reset_after_;
2811  clone->credentials_attachment_ = credentials_attachment_;
2812  clone->ssl_certificate_store_ = ssl_certificate_store_;
2813 
2814  return clone;
2815 }
2816 
2817 
2818 void DownloadManager::CloneProxyConfig(DownloadManager *clone) {
2819  clone->opt_proxy_groups_current_ = opt_proxy_groups_current_;
2820  clone->opt_proxy_groups_current_burned_ = opt_proxy_groups_current_burned_;
2821  clone->opt_proxy_groups_fallback_ = opt_proxy_groups_fallback_;
2822  clone->opt_num_proxies_ = opt_num_proxies_;
2823  clone->opt_proxy_shard_ = opt_proxy_shard_;
2824  clone->opt_proxy_list_ = opt_proxy_list_;
2825  clone->opt_proxy_fallback_list_ = opt_proxy_fallback_list_;
2826  if (opt_proxy_groups_ == NULL)
2827  return;
2828 
2829  clone->opt_proxy_groups_ = new vector< vector<ProxyInfo> >(
2830  *opt_proxy_groups_);
2831  clone->UpdateProxiesUnlocked("cloned");
2832 }
2833 
2834 } // namespace download
unsigned opt_timeout_direct_
Definition: download.h:524
unsigned opt_low_speed_limit_
Definition: download.h:525
void HashString(const std::string &content, Any *any_digest)
Definition: hash.cc:268
Destination destination
Definition: download.h:166
#define LogCvmfs(source, mask,...)
Definition: logging.h:22
Definition: prng.h:28
std::vector< T > Shuffle(const std::vector< T > &input, Prng *prng)
Definition: algorithm.h:49
const char * Code2Ascii(const ObjectFetcherFailures::Failures error)
unsigned opt_backoff_init_ms_
Definition: download.h:527
static unsigned EscapeHeader(const string &header, char *escaped_buf, size_t buf_size)
Definition: download.cc:123
unsigned char num_used_hosts
Definition: download.h:297
static bool EscapeUrlChar(char input, char output[3])
Definition: download.cc:73
int64_t Xadd(class Counter *counter, const int64_t delta)
Definition: statistics.h:51
const char * Code2Ascii(const Failures error)
Definition: dns.h:53
int64_t id() const
Definition: dns.h:108
unsigned opt_proxy_groups_current_burned_
Definition: download.h:552
double DiffTimeSeconds(struct timeval start, struct timeval end)
Definition: algorithm.cc:31
unsigned opt_proxy_groups_reset_after_
Definition: download.h:614
virtual bool IsCanceled()
Definition: interrupt.h:16
void SetUrlOptions(JobInfo *info)
Definition: download.cc:905
StreamStates DecompressZStream2Sink(const void *buf, const int64_t size, z_stream *strm, cvmfs::Sink *sink)
Definition: compression.cc:234
unsigned backoff_ms
Definition: download.h:299
std::string opt_proxy_fallback_list_
Definition: download.h:569
static NormalResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
Definition: dns.cc:1259
unsigned opt_host_reset_after_
Definition: download.h:622
std::string proxy_template_direct_
Definition: download.h:598
#define PANIC(...)
Definition: exception.h:27
string ReplaceAll(const string &haystack, const string &needle, const string &replace_by)
Definition: string.cc:484
string JoinStrings(const vector< string > &strings, const string &joint)
Definition: string.cc:325
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:249
unsigned opt_proxy_groups_current_
Definition: download.h:547
off_t range_offset
Definition: download.h:179
shash::ContextPtr hash_context
Definition: download.h:290
void DecompressInit(z_stream *strm)
Definition: compression.cc:185
unsigned int current_host_chain_index
Definition: download.h:300
bool DecompressMem2Mem(const void *buf, const int64_t size, void **out_buf, uint64_t *out_size)
Definition: compression.cc:797
std::set< CURL * > * pool_handles_inuse_
Definition: download.h:503
void * cred_data
Definition: download.h:164
StreamStates DecompressZStream2File(const void *buf, const int64_t size, z_stream *strm, FILE *f)
Definition: compression.cc:275
std::string opt_proxy_list_
Definition: download.h:565
const std::string & name() const
Definition: dns.h:118
perf::Counter * sz_transfer_time
Definition: download.h:129
std::vector< std::vector< ProxyInfo > > * opt_proxy_groups_
Definition: download.h:543
assert((mem||(size==0))&&"Out Of Memory")
unsigned opt_proxy_groups_fallback_
Definition: download.h:557
void ReleaseCurlHandle(CURL *handle)
Definition: download.cc:819
StreamStates
Definition: compression.h:36
Algorithms algorithm
Definition: hash.h:125
z_stream zstream
Definition: download.h:289
const std::set< std::string > & ViewBestAddresses(IpPreference preference) const
Definition: dns.cc:205
void SetDnsServer(const std::string &address)
Definition: download.cc:1830
void DecompressFini(z_stream *strm)
Definition: compression.cc:201
void InitSeed(const uint64_t seed)
Definition: prng.h:35
void MakePipe(int pipe_fd[2])
Definition: posix.cc:488
char algorithm
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:182
void Init(ContextPtr context)
Definition: hash.cc:164
bool IsValid(const std::string &input) const
Definition: sanitizer.cc:114
Algorithms
Definition: hash.h:41
void SetDnsTtlLimits(const unsigned min_seconds, const unsigned max_seconds)
Definition: download.cc:1866
static Failures PrepareDownloadDestination(JobInfo *info)
Definition: download.cc:153
void Init(const unsigned max_pool_handles, const perf::StatisticsTemplate &statistics)
Definition: download.cc:1614
const char * Code2Ascii(const Failures error)
Definition: download.h:90
unsigned char num_retries
Definition: download.h:298
std::string AddDefaultScheme(const std::string &proxy)
Definition: dns.cc:187
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:290
static string EscapeUrl(const string &url)
Definition: download.cc:100
dns::IpPreference opt_ip_preference_
Definition: download.h:591
Algorithms algorithm
Definition: hash.h:500
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:287
Definition: dns.h:90
Failures Fetch(const std::string &base_url, const std::string &repository_name, const uint64_t minimum_timestamp, const shash::Any *base_catalog, signature::SignatureManager *signature_manager, download::DownloadManager *download_manager, ManifestEnsemble *ensemble)
void UpdateProxiesUnlocked(const std::string &reason)
Definition: download.cc:2621
bool IsEquivalent(const Host &other) const
Definition: dns.cc:269
bool IsProxyTransferError(const Failures error)
Definition: download.h:78
bool IsExpired() const
Definition: dns.cc:280
FILE * destination_file
Definition: download.h:172
bool follow_redirects
Definition: download.h:159
virtual int Reset()=0
perf::Counter * n_requests
Definition: download.h:130
static int Init(const loader::LoaderExports *loader_exports)
Definition: cvmfs.cc:2119
void Final(ContextPtr context, Any *any_digest)
Definition: hash.cc:221
string StringifyInt(const int64_t value)
Definition: string.cc:78
void SetMaxIpaddrPerProxy(unsigned limit)
Definition: download.cc:2748
const shash::Any * expected_hash
Definition: download.h:175
void Inc(class Counter *counter)
Definition: statistics.h:50
void * buffer
Definition: hash.h:501
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:267
const std::string * extra_info
Definition: download.h:176
std::string ExtractHost(const std::string &url)
Definition: dns.cc:110
std::vector< int > * opt_host_chain_rtt_
Definition: download.h:539
cvmfs::Sink * destination_sink
Definition: download.h:174
SslCertificateStore ssl_certificate_store_
Definition: download.h:635
uint32_t Partial32() const
Definition: hash.h:394
IpPreference
Definition: dns.h:46
unsigned opt_backoff_max_ms_
Definition: download.h:528
unsigned GetContextSize(const Algorithms algorithm)
Definition: hash.cc:148
CURL * curl_handle
Definition: download.h:286
CredentialsAttachment * credentials_attachment_
Definition: download.h:624
std::vector< std::string > * opt_host_chain_
Definition: download.h:534
struct pollfd * watch_fds_
Definition: download.h:515
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
Definition: hash.cc:190
uint64_t String2Uint64(const string &value)
Definition: string.cc:228
Failures error_code
Definition: download.h:294
static void Fini()
Definition: cvmfs.cc:2328
static void Spawn()
Definition: cvmfs.cc:2243
InterruptCue * interrupt_cue
Definition: download.h:165
struct download::JobInfo::@3 destination_mem
Definition: mutex.h:42
std::string Filter(const std::string &input) const
Definition: sanitizer.cc:107
std::string proxy_template_forced_
Definition: download.h:604
const std::string * destination_path
Definition: download.h:173
void SetDnsParameters(const unsigned retries, const unsigned timeout_ms)
Definition: download.cc:1848
static Host ExtendDeadline(const Host &original, unsigned seconds_from_now)
Definition: dns.cc:231
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:1899
bool IsHostTransferError(const Failures error)
Definition: download.h:66
void SortTeam(std::vector< T > *tractor, std::vector< U > *towed)
Definition: algorithm.h:67
curl_slist * headers
Definition: download.h:287
unsigned size
Definition: hash.h:502
unsigned char num_used_proxies
Definition: download.h:296
Failures status() const
Definition: dns.h:119
std::string proxy
Definition: download.h:292
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:497
static void size_t size
Definition: smalloc.h:47
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
Definition: download.cc:1228
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:509
const std::string * url
Definition: download.h:155
void InitializeRequest(JobInfo *info, CURL *handle)
Definition: download.cc:837
string RewriteUrl(const string &url, const string &ip)
Definition: dns.cc:156
uint32_t Next(const uint64_t boundary)
Definition: prng.h:49
virtual int64_t Write(const void *buf, uint64_t sz)=0
char * info_header
Definition: download.h:288