CernVM-FS  2.10.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
download.cc
Go to the documentation of this file.
1 
27 // TODO(jblomer): MS for time summing
28 // NOLINTNEXTLINE
29 #define __STDC_FORMAT_MACROS
30 
31 #include "cvmfs_config.h"
32 #include "download.h"
33 
34 #include <alloca.h>
35 #include <errno.h>
36 #include <inttypes.h>
37 #include <poll.h>
38 #include <pthread.h>
39 #include <signal.h>
40 #include <stdint.h>
41 #include <sys/time.h>
42 #include <unistd.h>
43 
44 #include <algorithm>
45 #include <cassert>
46 #include <cstdio>
47 #include <cstdlib>
48 #include <cstring>
49 #include <map>
50 #include <set>
51 
52 #include "atomic.h"
53 #include "compression.h"
54 #include "duplex_curl.h"
55 #include "hash.h"
56 #include "logging.h"
57 #include "prng.h"
58 #include "sanitizer.h"
59 #include "smalloc.h"
60 #include "ssl.h"
61 #include "util/algorithm.h"
62 #include "util/exception.h"
63 #include "util/posix.h"
64 #include "util/string.h"
65 #include "util_concurrency.h"
66 
67 using namespace std; // NOLINT
68 
69 namespace download {
70 
/**
 * Percent-encodes a single character for use in a URL or HTTP header value.
 *
 * Unreserved characters (alphanumerics and "/:.@+-_~[],") are copied
 * verbatim into output[0] and the function returns false.  Every other byte
 * is written as "%XY" into output[0..2], where XY are the two upper-case hex
 * digits of the byte value, and the function returns true.
 *
 * The byte is interpreted as unsigned: with a signed `char` (the default on
 * x86), bytes >= 0x80 -- e.g. UTF-8 encoded characters -- would otherwise
 * yield a negative quotient/remainder and thus a garbage escape sequence.
 *
 * @param input  the character to encode
 * @param output buffer of at least 3 chars; only output[0] is written when
 *               no escaping is needed
 * @return true if output holds a 3-character escape sequence, false if
 *         output[0] holds the unmodified character
 */
static inline bool EscapeUrlChar(char input, char output[3]) {
  if (((input >= '0') && (input <= '9')) ||
      ((input >= 'A') && (input <= 'Z')) ||
      ((input >= 'a') && (input <= 'z')) ||
      (input == '/') || (input == ':') || (input == '.') ||
      (input == '@') ||
      (input == '+') || (input == '-') ||
      (input == '_') || (input == '~') ||
      (input == '[') || (input == ']') || (input == ','))
  {
    output[0] = input;
    return false;
  }

  static const char kHexDigits[] = "0123456789ABCDEF";
  const unsigned char byte = static_cast<unsigned char>(input);
  output[0] = '%';
  output[1] = kHexDigits[byte >> 4];
  output[2] = kHexDigits[byte & 0x0F];
  return true;
}
92 
93 
98 static string EscapeUrl(const string &url) {
99  string escaped;
100  escaped.reserve(url.length());
101 
102  char escaped_char[3];
103  for (unsigned i = 0, s = url.length(); i < s; ++i) {
104  if (EscapeUrlChar(url[i], escaped_char)) {
105  escaped.append(escaped_char, 3);
106  } else {
107  escaped.push_back(escaped_char[0]);
108  }
109  }
110  LogCvmfs(kLogDownload, kLogDebug, "escaped %s to %s",
111  url.c_str(), escaped.c_str());
112 
113  return escaped;
114 }
115 
116 
121 static unsigned EscapeHeader(const string &header,
122  char *escaped_buf,
123  size_t buf_size)
124 {
125  unsigned esc_pos = 0;
126  char escaped_char[3];
127  for (unsigned i = 0, s = header.size(); i < s; ++i) {
128  if (EscapeUrlChar(header[i], escaped_char)) {
129  for (unsigned j = 0; j < 3; ++j) {
130  if (escaped_buf) {
131  if (esc_pos >= buf_size)
132  return esc_pos;
133  escaped_buf[esc_pos] = escaped_char[j];
134  }
135  esc_pos++;
136  }
137  } else {
138  if (escaped_buf) {
139  if (esc_pos >= buf_size)
140  return esc_pos;
141  escaped_buf[esc_pos] = escaped_char[0];
142  }
143  esc_pos++;
144  }
145  }
146 
147  return esc_pos;
148 }
149 
150 
152  info->destination_mem.size = 0;
153  info->destination_mem.pos = 0;
154  info->destination_mem.data = NULL;
155 
156  if (info->destination == kDestinationFile)
157  assert(info->destination_file != NULL);
158 
159  if (info->destination == kDestinationPath) {
160  assert(info->destination_path != NULL);
161  info->destination_file = fopen(info->destination_path->c_str(), "w");
162  if (info->destination_file == NULL) {
163  LogCvmfs(kLogDownload, kLogDebug, "Failed to open path %s: %s"
164  " (errno=%d).",
165  info->destination_path->c_str(), strerror(errno), errno);
166  return kFailLocalIO;
167  }
168  }
169 
170  if (info->destination == kDestinationSink)
171  assert(info->destination_sink != NULL);
172 
173  return kFailOk;
174 }
175 
176 
180 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
181  void *info_link)
182 {
183  const size_t num_bytes = size*nmemb;
184  const string header_line(static_cast<const char *>(ptr), num_bytes);
185  JobInfo *info = static_cast<JobInfo *>(info_link);
186 
187  // LogCvmfs(kLogDownload, kLogDebug, "REMOVE-ME: Header callback with %s",
188  // header_line.c_str());
189 
190  // Check http status codes
191  if (HasPrefix(header_line, "HTTP/1.", false)) {
192  if (header_line.length() < 10)
193  return 0;
194 
195  unsigned i;
196  for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {}
197 
198  // Code is initialized to -1
199  if (header_line.length() > i+2) {
200  info->http_code = DownloadManager::ParseHttpCode(&header_line[i]);
201  }
202 
203  if ((info->http_code / 100) == 2) {
204  return num_bytes;
205  } else if ((info->http_code == 301) ||
206  (info->http_code == 302) ||
207  (info->http_code == 303) ||
208  (info->http_code == 307))
209  {
210  if (!info->follow_redirects) {
211  LogCvmfs(kLogDownload, kLogDebug, "redirect support not enabled: %s",
212  header_line.c_str());
213  info->error_code = kFailHostHttp;
214  return 0;
215  }
216  LogCvmfs(kLogDownload, kLogDebug, "http redirect: %s",
217  header_line.c_str());
218  // libcurl will handle this because of CURLOPT_FOLLOWLOCATION
219  return num_bytes;
220  } else {
221  LogCvmfs(kLogDownload, kLogDebug, "http status error code: %s [%d]",
222  header_line.c_str(), info->http_code);
223  if (((info->http_code / 100) == 5) ||
224  (info->http_code == 400) || (info->http_code == 404))
225  {
226  // 5XX returned by host
227  // 400: error from the GeoAPI module
228  // 404: the stratum 1 does not have the newest files
229  info->error_code = kFailHostHttp;
230  } else if (info->http_code == 429) {
231  // 429: rate throttling (we ignore the backoff hint for the time being)
233  } else {
234  info->error_code = (info->proxy == "DIRECT") ? kFailHostHttp :
236  }
237  return 0;
238  }
239  }
240 
241  // Allocate memory for kDestinationMemory
242  if ((info->destination == kDestinationMem) &&
243  HasPrefix(header_line, "CONTENT-LENGTH:", true))
244  {
245  char *tmp = reinterpret_cast<char *>(alloca(num_bytes+1));
246  uint64_t length = 0;
247  sscanf(header_line.c_str(), "%s %" PRIu64, tmp, &length);
248  if (length > 0) {
249  if (length > DownloadManager::kMaxMemSize) {
251  "resource %s too large to store in memory (%" PRIu64 ")",
252  info->url->c_str(), length);
253  info->error_code = kFailTooBig;
254  return 0;
255  }
256  info->destination_mem.data = static_cast<char *>(smalloc(length));
257  } else {
258  // Empty resource
259  info->destination_mem.data = NULL;
260  }
261  info->destination_mem.size = length;
262  } else if (HasPrefix(header_line, "LOCATION:", true)) {
263  // This comes along with redirects
264  LogCvmfs(kLogDownload, kLogDebug, "%s", header_line.c_str());
265  } else if (HasPrefix(header_line, "X-SQUID-ERROR:", true)) {
266  // Reinterpret host error as proxy error
267  if (info->error_code == kFailHostHttp) {
268  info->error_code = kFailProxyHttp;
269  }
270  } else if (HasPrefix(header_line, "PROXY-STATUS:", true)) {
271  // Reinterpret host error as proxy error if applicable
272  if ((info->error_code == kFailHostHttp) &&
273  (header_line.find("error=") != string::npos)) {
274  info->error_code = kFailProxyHttp;
275  }
276  }
277 
278  return num_bytes;
279 }
280 
281 
285 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
286  void *info_link)
287 {
288  const size_t num_bytes = size*nmemb;
289  JobInfo *info = static_cast<JobInfo *>(info_link);
290 
291  // LogCvmfs(kLogDownload, kLogDebug, "Data callback, %d bytes", num_bytes);
292 
293  if (num_bytes == 0)
294  return 0;
295 
296  if (info->expected_hash) {
297  shash::Update(reinterpret_cast<unsigned char *>(ptr),
298  num_bytes, info->hash_context);
299  }
300 
301  if (info->destination == kDestinationSink) {
302  if (info->compressed) {
303  zlib::StreamStates retval =
304  zlib::DecompressZStream2Sink(ptr, static_cast<int64_t>(num_bytes),
305  &info->zstream, info->destination_sink);
306  if (retval == zlib::kStreamDataError) {
307  LogCvmfs(kLogDownload, kLogSyslogErr, "failed to decompress %s",
308  info->url->c_str());
309  info->error_code = kFailBadData;
310  return 0;
311  } else if (retval == zlib::kStreamIOError) {
313  "decompressing %s, local IO error", info->url->c_str());
314  info->error_code = kFailLocalIO;
315  return 0;
316  }
317  } else {
318  int64_t written = info->destination_sink->Write(ptr, num_bytes);
319  if ((written < 0) || (static_cast<uint64_t>(written) != num_bytes)) {
320  LogCvmfs(kLogDownload, kLogDebug, "Failed to perform write on %s (%"
321  PRId64 ")", info->url->c_str(), written);
322  info->error_code = kFailLocalIO;
323  return 0;
324  }
325  }
326  } else if (info->destination == kDestinationMem) {
327  // Write to memory
328  if (info->destination_mem.pos + num_bytes > info->destination_mem.size) {
329  if (info->destination_mem.size == 0) {
331  "Content-Length was missing or zero, but %zu bytes received",
332  info->destination_mem.pos + num_bytes);
333  } else {
334  LogCvmfs(kLogDownload, kLogDebug, "Callback had too much data: "
335  "start %zu, bytes %zu, expected %zu",
336  info->destination_mem.pos,
337  num_bytes,
338  info->destination_mem.size);
339  }
340  info->error_code = kFailBadData;
341  return 0;
342  }
343  memcpy(info->destination_mem.data + info->destination_mem.pos,
344  ptr, num_bytes);
345  info->destination_mem.pos += num_bytes;
346  } else {
347  // Write to file
348  if (info->compressed) {
349  // LogCvmfs(kLogDownload, kLogDebug, "REMOVE-ME: writing %d bytes for %s",
350  // num_bytes, info->url->c_str());
351  zlib::StreamStates retval =
352  zlib::DecompressZStream2File(ptr, static_cast<int64_t>(num_bytes),
353  &info->zstream, info->destination_file);
354  if (retval == zlib::kStreamDataError) {
355  LogCvmfs(kLogDownload, kLogSyslogErr, "failed to decompress %s",
356  info->url->c_str());
357  info->error_code = kFailBadData;
358  return 0;
359  } else if (retval == zlib::kStreamIOError) {
361  "decompressing %s, local IO error", info->url->c_str());
362  info->error_code = kFailLocalIO;
363  return 0;
364  }
365  } else {
366  if (fwrite(ptr, 1, num_bytes, info->destination_file) != num_bytes) {
368  "downloading %s, IO failure: %s (errno=%d)",
369  info->url->c_str(), strerror(errno), errno);
370  info->error_code = kFailLocalIO;
371  return 0;
372  }
373  }
374  }
375 
376  return num_bytes;
377 }
378 
379 
380 //------------------------------------------------------------------------------
381 
382 
383 bool JobInfo::IsFileNotFound() {
384  if (HasPrefix(*url, "file://", true /* ignore_case */))
385  return error_code == kFailHostConnection;
386 
387  return http_code == 404;
388 }
389 
390 
391 //------------------------------------------------------------------------------
392 
393 
394 const int DownloadManager::kProbeUnprobed = -1;
395 const int DownloadManager::kProbeDown = -2;
396 const int DownloadManager::kProbeGeo = -3;
397 const unsigned DownloadManager::kMaxMemSize = 1024*1024;
398 
399 
403 int DownloadManager::ParseHttpCode(const char digits[3]) {
404  int result = 0;
405  int factor = 100;
406  for (int i = 0; i < 3; ++i) {
407  if ((digits[i] < '0') || (digits[i] > '9'))
408  return -1;
409  result += (digits[i] - '0') * factor;
410  factor /= 10;
411  }
412  return result;
413 }
414 
415 
419 int DownloadManager::CallbackCurlSocket(CURL * /* easy */,
420  curl_socket_t s,
421  int action,
422  void *userp,
423  void * /* socketp */)
424 {
425  // LogCvmfs(kLogDownload, kLogDebug, "CallbackCurlSocket called with easy "
426  // "handle %p, socket %d, action %d", easy, s, action);
427  DownloadManager *download_mgr = static_cast<DownloadManager *>(userp);
428  if (action == CURL_POLL_NONE)
429  return 0;
430 
431  // Find s in watch_fds_
432  unsigned index;
433  for (index = 0; index < download_mgr->watch_fds_inuse_; ++index) {
434  if (download_mgr->watch_fds_[index].fd == s)
435  break;
436  }
437  // Or create newly
438  if (index == download_mgr->watch_fds_inuse_) {
439  // Extend array if necessary
440  if (download_mgr->watch_fds_inuse_ == download_mgr->watch_fds_size_)
441  {
442  assert(download_mgr->watch_fds_size_ > 0);
443  download_mgr->watch_fds_size_ *= 2;
444  download_mgr->watch_fds_ = static_cast<struct pollfd *>(
445  srealloc(download_mgr->watch_fds_,
446  download_mgr->watch_fds_size_ * sizeof(struct pollfd)));
447  }
448  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].fd = s;
449  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].events = 0;
450  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].revents = 0;
451  download_mgr->watch_fds_inuse_++;
452  }
453 
454  switch (action) {
455  case CURL_POLL_IN:
456  download_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
457  break;
458  case CURL_POLL_OUT:
459  download_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
460  break;
461  case CURL_POLL_INOUT:
462  download_mgr->watch_fds_[index].events =
463  POLLIN | POLLPRI | POLLOUT | POLLWRBAND;
464  break;
465  case CURL_POLL_REMOVE:
466  if (index < download_mgr->watch_fds_inuse_-1) {
467  download_mgr->watch_fds_[index] =
468  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_-1];
469  }
470  download_mgr->watch_fds_inuse_--;
471  // Shrink array if necessary
472  if ((download_mgr->watch_fds_inuse_ > download_mgr->watch_fds_max_) &&
473  (download_mgr->watch_fds_inuse_ < download_mgr->watch_fds_size_/2))
474  {
475  download_mgr->watch_fds_size_ /= 2;
476  // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ (%d)",
477  // watch_fds_size_);
478  download_mgr->watch_fds_ = static_cast<struct pollfd *>(
479  srealloc(download_mgr->watch_fds_,
480  download_mgr->watch_fds_size_*sizeof(struct pollfd)));
481  // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ done",
482  // watch_fds_size_);
483  }
484  break;
485  default:
486  break;
487  }
488 
489  return 0;
490 }
491 
492 
496 void *DownloadManager::MainDownload(void *data) {
497  LogCvmfs(kLogDownload, kLogDebug, "download I/O thread started");
498  DownloadManager *download_mgr = static_cast<DownloadManager *>(data);
499 
500  download_mgr->watch_fds_ =
501  static_cast<struct pollfd *>(smalloc(2 * sizeof(struct pollfd)));
502  download_mgr->watch_fds_size_ = 2;
503  download_mgr->watch_fds_[0].fd = download_mgr->pipe_terminate_[0];
504  download_mgr->watch_fds_[0].events = POLLIN | POLLPRI;
505  download_mgr->watch_fds_[0].revents = 0;
506  download_mgr->watch_fds_[1].fd = download_mgr->pipe_jobs_[0];
507  download_mgr->watch_fds_[1].events = POLLIN | POLLPRI;
508  download_mgr->watch_fds_[1].revents = 0;
509  download_mgr->watch_fds_inuse_ = 2;
510 
511  int still_running = 0;
512  struct timeval timeval_start, timeval_stop;
513  gettimeofday(&timeval_start, NULL);
514  while (true) {
515  int timeout;
516  if (still_running) {
517  /* NOTE: The following might degrade the performance for many small files
518  * use case. TODO(jblomer): look into it.
519  // Specify a timeout for polling in ms; this allows us to return
520  // to libcurl once a second so it can look for internal operations
521  // which timed out. libcurl has a more elaborate mechanism
522  // (CURLMOPT_TIMERFUNCTION) that would inform us of the next potential
523  // timeout. TODO(bbockelm) we should switch to that in the future.
524  timeout = 100;
525  */
526  timeout = 1;
527  } else {
528  timeout = -1;
529  gettimeofday(&timeval_stop, NULL);
530  int64_t delta = static_cast<int64_t>(
531  1000 * DiffTimeSeconds(timeval_start, timeval_stop));
532  perf::Xadd(download_mgr->counters_->sz_transfer_time, delta);
533  }
534  int retval = poll(download_mgr->watch_fds_, download_mgr->watch_fds_inuse_,
535  timeout);
536  if (retval < 0) {
537  continue;
538  }
539 
540  // Handle timeout
541  if (retval == 0) {
542  curl_multi_socket_action(download_mgr->curl_multi_,
543  CURL_SOCKET_TIMEOUT,
544  0,
545  &still_running);
546  }
547 
548  // Terminate I/O thread
549  if (download_mgr->watch_fds_[0].revents)
550  break;
551 
552  // New job arrives
553  if (download_mgr->watch_fds_[1].revents) {
554  download_mgr->watch_fds_[1].revents = 0;
555  JobInfo *info;
556  // NOLINTNEXTLINE(bugprone-sizeof-expression)
557  ReadPipe(download_mgr->pipe_jobs_[0], &info, sizeof(info));
558  if (!still_running)
559  gettimeofday(&timeval_start, NULL);
560  CURL *handle = download_mgr->AcquireCurlHandle();
561  download_mgr->InitializeRequest(info, handle);
562  download_mgr->SetUrlOptions(info);
563  curl_multi_add_handle(download_mgr->curl_multi_, handle);
564  curl_multi_socket_action(download_mgr->curl_multi_,
565  CURL_SOCKET_TIMEOUT,
566  0,
567  &still_running);
568  }
569 
570  // Activity on curl sockets
571  // Within this loop the curl_multi_socket_action() may cause socket(s)
572  // to be removed from watch_fds_. If a socket is removed it is replaced
573  // by the socket at the end of the array and the inuse count is decreased.
574  // Therefore loop over the array in reverse order.
575  for (int64_t i = download_mgr->watch_fds_inuse_-1; i >= 2; --i) {
576  if (i >= download_mgr->watch_fds_inuse_) {
577  continue;
578  }
579  if (download_mgr->watch_fds_[i].revents) {
580  int ev_bitmask = 0;
581  if (download_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
582  ev_bitmask |= CURL_CSELECT_IN;
583  if (download_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
584  ev_bitmask |= CURL_CSELECT_OUT;
585  if (download_mgr->watch_fds_[i].revents &
586  (POLLERR | POLLHUP | POLLNVAL))
587  {
588  ev_bitmask |= CURL_CSELECT_ERR;
589  }
590  download_mgr->watch_fds_[i].revents = 0;
591 
592  curl_multi_socket_action(download_mgr->curl_multi_,
593  download_mgr->watch_fds_[i].fd,
594  ev_bitmask,
595  &still_running);
596  }
597  }
598 
599  // Check if transfers are completed
600  CURLMsg *curl_msg;
601  int msgs_in_queue;
602  while ((curl_msg = curl_multi_info_read(download_mgr->curl_multi_,
603  &msgs_in_queue)))
604  {
605  if (curl_msg->msg == CURLMSG_DONE) {
606  perf::Inc(download_mgr->counters_->n_requests);
607  JobInfo *info;
608  CURL *easy_handle = curl_msg->easy_handle;
609  int curl_error = curl_msg->data.result;
610  curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
611 
612  curl_multi_remove_handle(download_mgr->curl_multi_, easy_handle);
613  if (download_mgr->VerifyAndFinalize(curl_error, info)) {
614  curl_multi_add_handle(download_mgr->curl_multi_, easy_handle);
615  curl_multi_socket_action(download_mgr->curl_multi_,
616  CURL_SOCKET_TIMEOUT,
617  0,
618  &still_running);
619  } else {
620  // Return easy handle into pool and write result back
621  download_mgr->ReleaseCurlHandle(easy_handle);
622 
623  WritePipe(info->wait_at[1], &info->error_code,
624  sizeof(info->error_code));
625  }
626  }
627  }
628  }
629 
630  for (set<CURL *>::iterator i = download_mgr->pool_handles_inuse_->begin(),
631  iEnd = download_mgr->pool_handles_inuse_->end(); i != iEnd; ++i)
632  {
633  curl_multi_remove_handle(download_mgr->curl_multi_, *i);
634  curl_easy_cleanup(*i);
635  }
636  download_mgr->pool_handles_inuse_->clear();
637  free(download_mgr->watch_fds_);
638 
639  LogCvmfs(kLogDownload, kLogDebug, "download I/O thread terminated");
640  return NULL;
641 }
642 
643 
644 //------------------------------------------------------------------------------
645 
646 
647 HeaderLists::~HeaderLists() {
648  for (unsigned i = 0; i < blocks_.size(); ++i) {
649  delete[] blocks_[i];
650  }
651  blocks_.clear();
652 }
653 
654 
655 curl_slist *HeaderLists::GetList(const char *header) {
656  return Get(header);
657 }
658 
659 
660 curl_slist *HeaderLists::DuplicateList(curl_slist *slist) {
661  assert(slist);
662  curl_slist *copy = GetList(slist->data);
663  copy->next = slist->next;
664  curl_slist *prev = copy;
665  slist = slist->next;
666  while (slist) {
667  curl_slist *new_link = Get(slist->data);
668  new_link->next = slist->next;
669  prev->next = new_link;
670  prev = new_link;
671  slist = slist->next;
672  }
673  return copy;
674 }
675 
676 
677 void HeaderLists::AppendHeader(curl_slist *slist, const char *header) {
678  assert(slist);
679  curl_slist *new_link = Get(header);
680  new_link->next = NULL;
681 
682  while (slist->next)
683  slist = slist->next;
684  slist->next = new_link;
685 }
686 
687 
693 void HeaderLists::CutHeader(const char *header, curl_slist **slist) {
694  assert(slist);
695  curl_slist head;
696  head.next = *slist;
697  curl_slist *prev = &head;
698  curl_slist *rover = *slist;
699  while (rover) {
700  if (strcmp(rover->data, header) == 0) {
701  prev->next = rover->next;
702  Put(rover);
703  rover = prev;
704  }
705  prev = rover;
706  rover = rover->next;
707  }
708  *slist = head.next;
709 }
710 
711 
712 void HeaderLists::PutList(curl_slist *slist) {
713  while (slist) {
714  curl_slist *next = slist->next;
715  Put(slist);
716  slist = next;
717  }
718 }
719 
720 
721 string HeaderLists::Print(curl_slist *slist) {
722  string verbose;
723  while (slist) {
724  verbose += string(slist->data) + "\n";
725  slist = slist->next;
726  }
727  return verbose;
728 }
729 
730 
731 curl_slist *HeaderLists::Get(const char *header) {
732  for (unsigned i = 0; i < blocks_.size(); ++i) {
733  for (unsigned j = 0; j < kBlockSize; ++j) {
734  if (!IsUsed(&(blocks_[i][j]))) {
735  blocks_[i][j].data = const_cast<char *>(header);
736  return &(blocks_[i][j]);
737  }
738  }
739  }
740 
741  // All used, new block
742  AddBlock();
743  blocks_[blocks_.size()-1][0].data = const_cast<char *>(header);
744  return &(blocks_[blocks_.size()-1][0]);
745 }
746 
747 
748 void HeaderLists::Put(curl_slist *slist) {
749  slist->data = NULL;
750  slist->next = NULL;
751 }
752 
753 
754 void HeaderLists::AddBlock() {
755  curl_slist *new_block = new curl_slist[kBlockSize];
756  for (unsigned i = 0; i < kBlockSize; ++i) {
757  Put(&new_block[i]);
758  }
759  blocks_.push_back(new_block);
760 }
761 
762 
763 //------------------------------------------------------------------------------
764 
765 
766 string DownloadManager::ProxyInfo::Print() {
767  if (url == "DIRECT")
768  return url;
769 
770  string result = url;
771  int remaining =
772  static_cast<int>(host.deadline()) - static_cast<int>(time(NULL));
773  string expinfo = (remaining >= 0) ? "+" : "";
774  if (abs(remaining) >= 3600) {
775  expinfo += StringifyInt(remaining/3600) + "h";
776  } else if (abs(remaining) >= 60) {
777  expinfo += StringifyInt(remaining/60) + "m";
778  } else {
779  expinfo += StringifyInt(remaining) + "s";
780  }
781  if (host.status() == dns::kFailOk) {
782  result += " (" + host.name() + ", " + expinfo + ")";
783  } else {
784  result += " (:unresolved:, " + expinfo + ")";
785  }
786  return result;
787 }
788 
789 
794 CURL *DownloadManager::AcquireCurlHandle() {
795  CURL *handle;
796 
797  if (pool_handles_idle_->empty()) {
798  // Create a new handle
799  handle = curl_easy_init();
800  assert(handle != NULL);
801 
802  curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
803  // curl_easy_setopt(curl_default, CURLOPT_FAILONERROR, 1);
804  curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, CallbackCurlHeader);
805  curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlData);
806  } else {
807  handle = *(pool_handles_idle_->begin());
808  pool_handles_idle_->erase(pool_handles_idle_->begin());
809  }
810 
811  pool_handles_inuse_->insert(handle);
812 
813  return handle;
814 }
815 
816 
817 void DownloadManager::ReleaseCurlHandle(CURL *handle) {
818  set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
819  assert(elem != pool_handles_inuse_->end());
820 
821  if (pool_handles_idle_->size() > pool_max_handles_) {
822  curl_easy_cleanup(*elem);
823  } else {
824  pool_handles_idle_->insert(*elem);
825  }
826 
827  pool_handles_inuse_->erase(elem);
828 }
829 
830 
835 void DownloadManager::InitializeRequest(JobInfo *info, CURL *handle) {
836  // Initialize internal download state
837  info->curl_handle = handle;
838  info->error_code = kFailOk;
839  info->http_code = -1;
840  info->follow_redirects = follow_redirects_;
841  info->num_used_proxies = 1;
842  info->num_used_hosts = 1;
843  info->num_retries = 0;
844  info->backoff_ms = 0;
845  info->headers = header_lists_->DuplicateList(default_headers_);
846  if (info->info_header) {
847  header_lists_->AppendHeader(info->headers, info->info_header);
848  }
849  if (info->force_nocache) {
850  SetNocache(info);
851  } else {
852  info->nocache = false;
853  }
854  if (info->compressed) {
855  zlib::DecompressInit(&(info->zstream));
856  }
857  if (info->expected_hash) {
858  assert(info->hash_context.buffer != NULL);
859  shash::Init(info->hash_context);
860  }
861 
862  if ((info->range_offset != -1) && (info->range_size)) {
863  char byte_range_array[100];
864  const int64_t range_lower = static_cast<int64_t>(info->range_offset);
865  const int64_t range_upper = static_cast<int64_t>(
866  info->range_offset + info->range_size - 1);
867  if (snprintf(byte_range_array, sizeof(byte_range_array),
868  "%" PRId64 "-%" PRId64,
869  range_lower, range_upper) == 100)
870  {
871  PANIC(NULL); // Should be impossible given limits on offset size.
872  }
873  curl_easy_setopt(handle, CURLOPT_RANGE, byte_range_array);
874  } else {
875  curl_easy_setopt(handle, CURLOPT_RANGE, NULL);
876  }
877 
878  // Set curl parameters
879  curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
880  curl_easy_setopt(handle, CURLOPT_WRITEHEADER,
881  static_cast<void *>(info));
882  curl_easy_setopt(handle, CURLOPT_WRITEDATA, static_cast<void *>(info));
883  curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->headers);
884  if (info->head_request) {
885  curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
886  } else {
887  curl_easy_setopt(handle, CURLOPT_HTTPGET, 1);
888  }
889  if (opt_ipv4_only_) {
890  curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
891  }
892  if (follow_redirects_) {
893  curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1);
894  curl_easy_setopt(handle, CURLOPT_MAXREDIRS, 4);
895  }
896 }
897 
898 
903 void DownloadManager::SetUrlOptions(JobInfo *info) {
904  CURL *curl_handle = info->curl_handle;
905  string url_prefix;
906 
907  MutexLockGuard m(lock_options_);
908  // Check if proxy group needs to be reset from backup to primary
909  if (opt_timestamp_backup_proxies_ > 0) {
910  const time_t now = time(NULL);
911  if (static_cast<int64_t>(now) >
912  static_cast<int64_t>(opt_timestamp_backup_proxies_ +
913  opt_proxy_groups_reset_after_))
914  {
915  opt_proxy_groups_current_ = 0;
916  opt_timestamp_backup_proxies_ = 0;
917  RebalanceProxiesUnlocked("reset proxy group");
918  }
919  }
920  // Check if load-balanced proxies within the group need to be reset
921  if (opt_timestamp_failover_proxies_ > 0) {
922  const time_t now = time(NULL);
923  if (static_cast<int64_t>(now) >
924  static_cast<int64_t>(opt_timestamp_failover_proxies_ +
925  opt_proxy_groups_reset_after_))
926  {
927  RebalanceProxiesUnlocked("reset load-balanced proxies");
928  }
929  }
930  // Check if host needs to be reset
931  if (opt_timestamp_backup_host_ > 0) {
932  const time_t now = time(NULL);
933  if (static_cast<int64_t>(now) >
934  static_cast<int64_t>(opt_timestamp_backup_host_ +
935  opt_host_reset_after_))
936  {
938  "switching host from %s to %s (reset host)",
939  (*opt_host_chain_)[opt_host_chain_current_].c_str(),
940  (*opt_host_chain_)[0].c_str());
941  opt_host_chain_current_ = 0;
942  opt_timestamp_backup_host_ = 0;
943  }
944  }
945 
946  ProxyInfo *proxy = ChooseProxyUnlocked(info->expected_hash);
947  if (!proxy || (proxy->url == "DIRECT")) {
948  info->proxy = "DIRECT";
949  curl_easy_setopt(info->curl_handle, CURLOPT_PROXY, "");
950  } else {
951  // Note: inside ValidateProxyIpsUnlocked() we may change the proxy data
952  // structure, so we must not pass proxy->... (== current_proxy())
953  // parameters directly
954  std::string purl = proxy->url;
955  dns::Host phost = proxy->host;
956  const bool changed = ValidateProxyIpsUnlocked(purl, phost);
957  // Current proxy may have changed
958  if (changed)
959  proxy = ChooseProxyUnlocked(info->expected_hash);
960  info->proxy = proxy->url;
961  if (proxy->host.status() == dns::kFailOk) {
962  curl_easy_setopt(info->curl_handle, CURLOPT_PROXY, info->proxy.c_str());
963  } else {
964  // We know it can't work, don't even try to download
965  curl_easy_setopt(info->curl_handle, CURLOPT_PROXY, "0.0.0.0");
966  }
967  }
968  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, opt_low_speed_limit_);
969  if (info->proxy != "DIRECT") {
970  curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_proxy_);
971  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_proxy_);
972  } else {
973  curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_direct_);
974  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_direct_);
975  }
976  if (!opt_dns_server_.empty())
977  curl_easy_setopt(curl_handle, CURLOPT_DNS_SERVERS, opt_dns_server_.c_str());
978 
979  if (info->probe_hosts && opt_host_chain_) {
980  url_prefix = (*opt_host_chain_)[opt_host_chain_current_];
981  info->current_host_chain_index = opt_host_chain_current_;
982  }
983 
984  string url = url_prefix + *(info->url);
985 
986  curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 1L);
987  if (url.substr(0, 5) == "https") {
988  bool rvb = ssl_certificate_store_.ApplySslCertificatePath(curl_handle);
989  if (!rvb) {
991  "Failed to set SSL certificate path %s",
992  ssl_certificate_store_.GetCaPath().c_str());
993  }
994  if (info->pid != -1) {
995  if (credentials_attachment_ == NULL) {
997  "uses secure downloads but no credentials attachment set");
998  } else {
999  bool retval = credentials_attachment_->ConfigureCurlHandle(
1000  curl_handle, info->pid, &info->cred_data);
1001  if (!retval) {
1002  LogCvmfs(kLogDownload, kLogDebug, "failed attaching credentials");
1003  }
1004  }
1005  }
1006  // The download manager disables signal handling in the curl library;
1007  // as OpenSSL's implementation of TLS will generate a sigpipe in some
1008  // error paths, we must explicitly disable SIGPIPE here.
1009  // TODO(jblomer): it should be enough to do this once
1010  signal(SIGPIPE, SIG_IGN);
1011  }
1012 
1013  if (url.find("@proxy@") != string::npos) {
1014  // This is used in Geo-API requests (only), to replace a portion of the
1015  // URL with the current proxy name for the sake of caching the result.
1016  // Replace the @proxy@ either with a passed in "forced" template (which
1017  // is set from $CVMFS_PROXY_TEMPLATE) if there is one, or a "direct"
1018  // template (which is the uuid) if there's no proxy, or the name of the
1019  // proxy.
1020  string replacement;
1021  if (proxy_template_forced_ != "") {
1022  replacement = proxy_template_forced_;
1023  } else if (info->proxy == "DIRECT") {
1024  replacement = proxy_template_direct_;
1025  } else {
1026  if (opt_proxy_groups_current_ >= opt_proxy_groups_fallback_) {
1027  // It doesn't make sense to use the fallback proxies in Geo-API requests
1028  // since the fallback proxies are supposed to get sorted, too.
1029  info->proxy = "DIRECT";
1030  curl_easy_setopt(info->curl_handle, CURLOPT_PROXY, "");
1031  replacement = proxy_template_direct_;
1032  } else {
1033  replacement = ChooseProxyUnlocked(info->expected_hash)->host.name();
1034  }
1035  }
1036  replacement = (replacement == "") ? proxy_template_direct_ : replacement;
1037  LogCvmfs(kLogDownload, kLogDebug, "replacing @proxy@ by %s",
1038  replacement.c_str());
1039  url = ReplaceAll(url, "@proxy@", replacement);
1040  }
1041 
1042  if ((info->destination == kDestinationMem) &&
1043  (info->destination_mem.size == 0) &&
1044  HasPrefix(url, "file://", false))
1045  {
1046  info->destination_mem.size = 64*1024;
1047  info->destination_mem.data = static_cast<char *>(smalloc(64*1024));
1048  }
1049 
1050  curl_easy_setopt(curl_handle, CURLOPT_URL, EscapeUrl(url).c_str());
1051 }
1052 
1053 
1063 bool DownloadManager::ValidateProxyIpsUnlocked(
1064  const string &url,
1065  const dns::Host &host)
1066 {
1067  if (!host.IsExpired())
1068  return false;
1069  LogCvmfs(kLogDownload, kLogDebug, "validate DNS entry for %s",
1070  host.name().c_str());
1071 
1072  unsigned group_idx = opt_proxy_groups_current_;
1073  dns::Host new_host = resolver_->Resolve(host.name());
1074 
1075  bool update_only = true; // No changes to the list of IP addresses.
1076  if (new_host.status() != dns::kFailOk) {
1077  // Try again later in case resolving fails.
1079  "failed to resolve IP addresses for %s (%d - %s)",
1080  host.name().c_str(), new_host.status(),
1081  dns::Code2Ascii(new_host.status()));
1082  new_host = dns::Host::ExtendDeadline(host, resolver_->min_ttl());
1083  } else if (!host.IsEquivalent(new_host)) {
1084  update_only = false;
1085  }
1086 
1087  if (update_only) {
1088  for (unsigned i = 0; i < (*opt_proxy_groups_)[group_idx].size(); ++i) {
1089  if ((*opt_proxy_groups_)[group_idx][i].host.id() == host.id())
1090  (*opt_proxy_groups_)[group_idx][i].host = new_host;
1091  }
1092  return false;
1093  }
1094 
1095  assert(new_host.status() == dns::kFailOk);
1096 
1097  // Remove old host objects, insert new objects, and rebalance.
1099  "DNS entries for proxy %s changed, adjusting", host.name().c_str());
1100  vector<ProxyInfo> *group = current_proxy_group();
1101  opt_num_proxies_ -= group->size();
1102  for (unsigned i = 0; i < group->size(); ) {
1103  if ((*group)[i].host.id() == host.id()) {
1104  group->erase(group->begin() + i);
1105  } else {
1106  i++;
1107  }
1108  }
1109  vector<ProxyInfo> new_infos;
1110  set<string> best_addresses = new_host.ViewBestAddresses(opt_ip_preference_);
1111  set<string>::const_iterator iter_ips = best_addresses.begin();
1112  for (; iter_ips != best_addresses.end(); ++iter_ips) {
1113  string url_ip = dns::RewriteUrl(url, *iter_ips);
1114  new_infos.push_back(ProxyInfo(new_host, url_ip));
1115  }
1116  group->insert(group->end(), new_infos.begin(), new_infos.end());
1117  opt_num_proxies_ += new_infos.size();
1118 
1119  RebalanceProxiesUnlocked("DNS change");
1120  return true;
1121 }
1122 
1123 
1127 void DownloadManager::UpdateStatistics(CURL *handle) {
1128  double val;
1129  int retval;
1130  int64_t sum = 0;
1131 
1132  retval = curl_easy_getinfo(handle, CURLINFO_SIZE_DOWNLOAD, &val);
1133  assert(retval == CURLE_OK);
1134  sum += static_cast<int64_t>(val);
1135  /*retval = curl_easy_getinfo(handle, CURLINFO_HEADER_SIZE, &val);
1136  assert(retval == CURLE_OK);
1137  sum += static_cast<int64_t>(val);*/
1138  perf::Xadd(counters_->sz_transferred_bytes, sum);
1139 }
1140 
1141 
// Decides whether a failed transfer may be retried on the same URL, i.e.
// without switching proxy or host.  All of the following must hold:
//  - the job is not in no-cache mode,
//  - the job has used fewer than opt_max_retries_ retries,
//  - the error is a proxy transfer error (IsProxyTransferError) or satisfies
//    the second operand of the `||` below.
// NOTE(review): that second operand (line 1151) is missing from this listing.
// It is presumably IsHostTransferError(info->error_code); confirm against the
// actual download.cc.
1145 bool DownloadManager::CanRetry(const JobInfo *info) {
// opt_max_retries_ is read under lock_options_, like the other options.
1146  MutexLockGuard m(lock_options_);
1147  unsigned max_retries = opt_max_retries_;
1148 
1149  return !info->nocache && (info->num_retries < max_retries) &&
1150  (IsProxyTransferError(info->error_code) ||
1152 }
1153 
1161 void DownloadManager::Backoff(JobInfo *info) {
1162  unsigned backoff_init_ms = 0;
1163  unsigned backoff_max_ms = 0;
1164  {
1165  MutexLockGuard m(lock_options_);
1166  backoff_init_ms = opt_backoff_init_ms_;
1167  backoff_max_ms = opt_backoff_max_ms_;
1168  }
1169 
1170  info->num_retries++;
1171  perf::Inc(counters_->n_retries);
1172  if (info->backoff_ms == 0) {
1173  info->backoff_ms = prng_.Next(backoff_init_ms + 1); // Must be != 0
1174  } else {
1175  info->backoff_ms *= 2;
1176  }
1177  if (info->backoff_ms > backoff_max_ms) info->backoff_ms = backoff_max_ms;
1178 
1179  LogCvmfs(kLogDownload, kLogDebug, "backing off for %d ms", info->backoff_ms);
1180  SafeSleepMs(info->backoff_ms);
1181 }
1182 
1183 void DownloadManager::SetNocache(JobInfo *info) {
1184  if (info->nocache)
1185  return;
1186  header_lists_->AppendHeader(info->headers, "Pragma: no-cache");
1187  header_lists_->AppendHeader(info->headers, "Cache-Control: no-cache");
1188  curl_easy_setopt(info->curl_handle, CURLOPT_HTTPHEADER, info->headers);
1189  info->nocache = true;
1190 }
1191 
1192 
1197 void DownloadManager::SetRegularCache(JobInfo *info) {
1198  if (info->nocache == false)
1199  return;
1200  header_lists_->CutHeader("Pragma: no-cache", &(info->headers));
1201  header_lists_->CutHeader("Cache-Control: no-cache", &(info->headers));
1202  curl_easy_setopt(info->curl_handle, CURLOPT_HTTPHEADER, info->headers);
1203  info->nocache = false;
1204 }
1205 
1206 
1210 void DownloadManager::ReleaseCredential(JobInfo *info) {
1211  if (info->cred_data) {
1212  assert(credentials_attachment_ != NULL); // Someone must have set it
1213  credentials_attachment_->ReleaseCurlHandle(info->curl_handle,
1214  info->cred_data);
1215  info->cred_data = NULL;
1216  }
1217 }
1218 
1219 
// Classifies the outcome of one curl transfer of `info` and decides whether
// the job is tried again on the same curl handle.
//  - Maps `curl_error` to info->error_code.  For CURLE_OK this also covers
//    the content hash check and the in-memory decompression.
//  - If the job is retried, it resets the destination, hash and zlib state,
//    then either backs off or switches proxy/host, and returns true.
//  - Otherwise it releases credentials, flushes (kDestinationFile) or closes
//    (kDestinationPath) the destination file, finalizes decompression,
//    returns the header list to header_lists_, and returns false.
// NOTE(review): several lines are missing from this listing, e.g. the
// LogCvmfs(...) opener before line 1228 and some error-code assignments
// (1273, 1286, 1292, ...).  The comments below describe only what is shown.
1226 bool DownloadManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1228  "Verify downloaded url %s, proxy %s (curl error %d)",
1229  info->url->c_str(), info->proxy.c_str(), curl_error);
1230  UpdateStatistics(info->curl_handle);
1231 
1232  // Verification and error classification
1233  switch (curl_error) {
1234  case CURLE_OK:
1235  // Verify content hash
1236  if (info->expected_hash) {
1237  shash::Any match_hash;
1238  shash::Final(info->hash_context, &match_hash);
1239  if (match_hash != *(info->expected_hash)) {
1241  "hash verification of %s failed (expected %s, got %s)",
1242  info->url->c_str(), info->expected_hash->ToString().c_str(),
1243  match_hash.ToString().c_str());
1244  info->error_code = kFailBadData;
// This `break` leaves the switch and skips the kFailOk assignment below.
1245  break;
1246  }
1247  }
1248 
1249  // Decompress memory in a single run
1250  if ((info->destination == kDestinationMem) && info->compressed) {
1251  void *buf;
1252  uint64_t size;
1253  bool retval = zlib::DecompressMem2Mem(
1254  info->destination_mem.data,
1255  static_cast<int64_t>(info->destination_mem.pos),
1256  &buf, &size);
1257  if (retval) {
// Replace the compressed buffer with the decompressed one.
1258  free(info->destination_mem.data);
1259  info->destination_mem.data = static_cast<char *>(buf);
1260  info->destination_mem.pos = info->destination_mem.size = size;
1261  } else {
1263  "decompression (memory) of url %s failed",
1264  info->url->c_str());
1265  info->error_code = kFailBadData;
1266  break;
1267  }
1268  }
1269 
1270  info->error_code = kFailOk;
1271  break;
1272  case CURLE_UNSUPPORTED_PROTOCOL:
1274  break;
1275  case CURLE_URL_MALFORMAT:
1276  info->error_code = kFailBadUrl;
1277  break;
1278  case CURLE_COULDNT_RESOLVE_PROXY:
1279  info->error_code = kFailProxyResolve;
1280  break;
1281  case CURLE_COULDNT_RESOLVE_HOST:
1282  info->error_code = kFailHostResolve;
1283  break;
1284  case CURLE_OPERATION_TIMEDOUT:
// Blame the host for direct connections and the proxy otherwise (the
// concrete codes are on lines missing from this listing).
1285  info->error_code = (info->proxy == "DIRECT") ?
1287  break;
1288  case CURLE_PARTIAL_FILE:
1289  case CURLE_GOT_NOTHING:
1290  case CURLE_RECV_ERROR:
1291  info->error_code = (info->proxy == "DIRECT") ?
1293  break;
1294  case CURLE_FILE_COULDNT_READ_FILE:
1295  case CURLE_COULDNT_CONNECT:
1296  if (info->proxy != "DIRECT") {
1297  // This is a guess. Fail-over can still change to switching host
1299  } else {
1301  }
1302  break;
1303  case CURLE_TOO_MANY_REDIRECTS:
1305  break;
1306  case CURLE_SSL_CACERT_BADFILE:
1308  "Failed to load certificate bundle. "
1309  "X509_CERT_BUNDLE might point to the wrong location.");
1311  break;
1312  // As of curl 7.62.0, CURLE_SSL_CACERT is the same as
1313  // CURLE_PEER_FAILED_VERIFICATION
1314  case CURLE_PEER_FAILED_VERIFICATION:
1316  "invalid SSL certificate of remote host. "
1317  "X509_CERT_DIR and/or X509_CERT_BUNDLE might point to the wrong "
1318  "location.");
1320  break;
1321  case CURLE_ABORTED_BY_CALLBACK:
1322  case CURLE_WRITE_ERROR:
1323  // Error set by callback
1324  break;
1325  default:
1326  LogCvmfs(kLogDownload, kLogSyslogErr, "unexpected curl error (%d) while "
1327  "trying to fetch %s", curl_error, info->url->c_str());
1328  info->error_code = kFailOther;
1329  break;
1330  }
1331 
// NOTE(review): opt_host_chain_ is copied here, before lock_options_ is taken
// at line 1338.  SetHostChain() and ProbeHosts() delete and replace it under
// that lock, so this pointer may dangle when it is dereferenced below.
// Consider reading it after acquiring the lock.
1332  std::vector<std::string> *host_chain = opt_host_chain_;
1333 
1334  // Determination if download should be repeated
1335  bool try_again = false;
1336  bool same_url_retry = CanRetry(info);
1337  if (info->error_code != kFailOk) {
1338  MutexLockGuard m(lock_options_);
1339  if (info->error_code == kFailBadData) {
1340  if (!info->nocache) {
// Retry once more, this time bypassing caches (see SetNocache below).
1341  try_again = true;
1342  } else {
1343  // Make it a host failure
1345  "data corruption with no-cache header, try another host");
1346 
1347  info->error_code = kFailHostHttp;
1348  }
1349  }
// Retry if the same URL may be retried, or if a host error happened while
// probing hosts and untried hosts remain in the chain.
1350  if ( same_url_retry || (
1351  ( (info->error_code == kFailHostResolve) ||
1353  (info->error_code == kFailHostHttp)) &&
1354  info->probe_hosts &&
1355  host_chain && (info->num_used_hosts < host_chain->size()))
1356  )
1357  {
1358  try_again = true;
1359  }
1360  if ( same_url_retry || (
1361  ( (info->error_code == kFailProxyResolve) ||
1363  (info->error_code == kFailProxyHttp)) )
1364  )
1365  {
1366  try_again = true;
1367  // If all proxies failed, do a next round with the next host
1368  if (!same_url_retry && (info->num_used_proxies >= opt_num_proxies_)) {
1369  // Check if this can be made a host fail-over
1370  if (info->probe_hosts &&
1371  host_chain &&
1372  (info->num_used_hosts < host_chain->size()))
1373  {
1374  // reset proxy group if not already performed by other handle
1375  if (opt_proxy_groups_) {
1376  if ((opt_proxy_groups_current_ > 0) ||
1377  (opt_proxy_groups_current_burned_ > 0))
1378  {
1379  opt_proxy_groups_current_ = 0;
1380  opt_timestamp_backup_proxies_ = 0;
1381  RebalanceProxiesUnlocked("reset proxies for host failover");
1382  }
1383  }
1384 
1385  // Make it a host failure
1386  LogCvmfs(kLogDownload, kLogDebug, "make it a host failure");
1387  info->num_used_proxies = 1;
1389  } else {
1390  try_again = false;
1391  }
1392  } // Make a proxy failure a host failure
1393  } // Proxy failure assumed
1394  }
1395 
// Retrying: reset all per-attempt state so the next transfer starts clean.
// A local I/O error while resetting aborts the retry via the goto.
1396  if (try_again) {
1397  LogCvmfs(kLogDownload, kLogDebug, "Trying again on same curl handle, "
1398  "same url: %d, error code %d", same_url_retry, info->error_code);
1399  // Reset internal state and destination
1400  if ((info->destination == kDestinationMem) && info->destination_mem.data) {
1401  free(info->destination_mem.data);
1402  info->destination_mem.data = NULL;
1403  info->destination_mem.size = 0;
1404  info->destination_mem.pos = 0;
1405  }
1406  if ((info->destination == kDestinationFile) ||
1407  (info->destination == kDestinationPath))
1408  {
1409  if ((fflush(info->destination_file) != 0) ||
1410  (ftruncate(fileno(info->destination_file), 0) != 0))
1411  {
1412  info->error_code = kFailLocalIO;
1413  goto verify_and_finalize_stop;
1414  }
1415  rewind(info->destination_file);
1416  }
1417  if (info->destination == kDestinationSink) {
1418  if (info->destination_sink->Reset() != 0) {
1419  info->error_code = kFailLocalIO;
1420  goto verify_and_finalize_stop;
1421  }
1422  }
1423  if (info->expected_hash)
1424  shash::Init(info->hash_context);
1425  if (info->compressed)
1426  zlib::DecompressInit(&info->zstream);
1427  SetRegularCache(info);
1428 
1429  // Failure handling
1430  bool switch_proxy = false;
1431  bool switch_host = false;
1432  switch (info->error_code) {
1433  case kFailBadData:
1434  SetNocache(info);
1435  break;
1436  case kFailProxyResolve:
1437  case kFailProxyHttp:
1438  switch_proxy = true;
1439  break;
1440  case kFailHostResolve:
1441  case kFailHostHttp:
1442  case kFailHostAfterProxy:
1443  switch_host = true;
1444  break;
1445  default:
// Transfer errors: back off and retry the same URL if allowed, otherwise
// fail over to the next proxy or host.
1446  if (IsProxyTransferError(info->error_code)) {
1447  if (same_url_retry) {
1448  Backoff(info);
1449  } else {
1450  switch_proxy = true;
1451  }
1452  } else if (IsHostTransferError(info->error_code)) {
1453  if (same_url_retry) {
1454  Backoff(info);
1455  } else {
1456  switch_host = true;
1457  }
1458  } else {
1459  // No other errors expected when retrying
1460  PANIC(NULL);
1461  }
1462  }
1463  if (switch_proxy) {
1464  ReleaseCredential(info);
1465  SwitchProxy(info);
1466  info->num_used_proxies++;
1467  SetUrlOptions(info);
1468  }
1469  if (switch_host) {
1470  ReleaseCredential(info);
1471  SwitchHost(info);
1472  info->num_used_hosts++;
1473  SetUrlOptions(info);
1474  }
1475 
1476  return true; // try again
1477  }
1478 
// Not retrying (or the retry reset failed): finalize the job.
1479  verify_and_finalize_stop:
1480  // Finalize, flush destination file
1481  ReleaseCredential(info);
1482  if ((info->destination == kDestinationFile) &&
1483  fflush(info->destination_file) != 0)
1484  {
1485  info->error_code = kFailLocalIO;
1486  } else if (info->destination == kDestinationPath) {
1487  if (fclose(info->destination_file) != 0)
1488  info->error_code = kFailLocalIO;
1489  info->destination_file = NULL;
1490  }
1491 
1492  if (info->compressed)
1493  zlib::DecompressFini(&info->zstream);
1494 
1495  if (info->headers) {
1496  header_lists_->PutList(info->headers);
1497  info->headers = NULL;
1498  }
1499 
1500  return false; // stop transfer and return to Fetch()
1501 }
1502 
1503 
1504 DownloadManager::DownloadManager() {
1505  pool_handles_idle_ = NULL;
1506  pool_handles_inuse_ = NULL;
1507  pool_max_handles_ = 0;
1508  curl_multi_ = NULL;
1509  default_headers_ = NULL;
1510 
1511  atomic_init32(&multi_threaded_);
1512  pipe_terminate_[0] = pipe_terminate_[1] = -1;
1513 
1514  pipe_jobs_[0] = pipe_jobs_[1] = -1;
1515  watch_fds_ = NULL;
1516  watch_fds_size_ = 0;
1517  watch_fds_inuse_ = 0;
1518  watch_fds_max_ = 0;
1519 
1520  lock_options_ =
1521  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1522  int retval = pthread_mutex_init(lock_options_, NULL);
1523  assert(retval == 0);
1524  lock_synchronous_mode_ =
1525  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1526  retval = pthread_mutex_init(lock_synchronous_mode_, NULL);
1527  assert(retval == 0);
1528 
1529  opt_dns_server_ = "";
1530  opt_ip_preference_ = dns::kIpPreferSystem;
1531  opt_timeout_proxy_ = 0;
1532  opt_timeout_direct_ = 0;
1533  opt_low_speed_limit_ = 0;
1534  opt_host_chain_ = NULL;
1535  opt_host_chain_rtt_ = NULL;
1536  opt_host_chain_current_ = 0;
1537  opt_proxy_groups_ = NULL;
1538  opt_proxy_groups_current_ = 0;
1539  opt_proxy_groups_current_burned_ = 0;
1540  opt_num_proxies_ = 0;
1541  opt_proxy_shard_ = false;
1542  opt_max_retries_ = 0;
1543  opt_backoff_init_ms_ = 0;
1544  opt_backoff_max_ms_ = 0;
1545  enable_info_header_ = false;
1546  opt_ipv4_only_ = false;
1547  follow_redirects_ = false;
1548 
1549  resolver_ = NULL;
1550 
1551  opt_timestamp_backup_proxies_ = 0;
1552  opt_timestamp_failover_proxies_ = 0;
1553  opt_proxy_groups_reset_after_ = 0;
1554  opt_timestamp_backup_host_ = 0;
1555  opt_host_reset_after_ = 0;
1556 
1557  credentials_attachment_ = NULL;
1558 
1559  counters_ = NULL;
1560 }
1561 
1562 
1563 DownloadManager::~DownloadManager() {
1564  pthread_mutex_destroy(lock_options_);
1565  pthread_mutex_destroy(lock_synchronous_mode_);
1566  free(lock_options_);
1567  free(lock_synchronous_mode_);
1568 }
1569 
1570 void DownloadManager::InitHeaders() {
1571  // User-Agent
1572  string cernvm_id = "User-Agent: cvmfs ";
1573 #ifdef CVMFS_LIBCVMFS
1574  cernvm_id += "libcvmfs ";
1575 #else
1576  cernvm_id += "Fuse ";
1577 #endif
1578  cernvm_id += string(VERSION);
1579  if (getenv("CERNVM_UUID") != NULL) {
1580  cernvm_id += " " +
1581  sanitizer::InputSanitizer("az AZ 09 -").Filter(getenv("CERNVM_UUID"));
1582  }
1583  user_agent_ = strdup(cernvm_id.c_str());
1584 
1585  header_lists_ = new HeaderLists();
1586 
1587  default_headers_ = header_lists_->GetList("Connection: Keep-Alive");
1588  header_lists_->AppendHeader(default_headers_, "Pragma:");
1589  header_lists_->AppendHeader(default_headers_, user_agent_);
1590 }
1591 
1592 
1593 void DownloadManager::FiniHeaders() {
1594  delete header_lists_;
1595  header_lists_ = NULL;
1596  default_headers_ = NULL;
1597 }
1598 
1599 
1600 void DownloadManager::Init(const unsigned max_pool_handles,
1601  const perf::StatisticsTemplate &statistics)
1602 {
1603  atomic_init32(&multi_threaded_);
1604  int retval = curl_global_init(CURL_GLOBAL_ALL);
1605  assert(retval == CURLE_OK);
1606  pool_handles_idle_ = new set<CURL *>;
1607  pool_handles_inuse_ = new set<CURL *>;
1608  pool_max_handles_ = max_pool_handles;
1609  watch_fds_max_ = 4*pool_max_handles_;
1610 
1611  opt_timeout_proxy_ = 5;
1612  opt_timeout_direct_ = 10;
1613  opt_low_speed_limit_ = 1024;
1614  opt_proxy_groups_current_ = 0;
1615  opt_proxy_groups_current_burned_ = 0;
1616  opt_num_proxies_ = 0;
1617  opt_proxy_shard_ = false;
1618  opt_host_chain_current_ = 0;
1619  opt_ip_preference_ = dns::kIpPreferSystem;
1620 
1621  counters_ = new Counters(statistics);
1622 
1623  user_agent_ = NULL;
1624  InitHeaders();
1625 
1626  curl_multi_ = curl_multi_init();
1627  assert(curl_multi_ != NULL);
1628  curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION, CallbackCurlSocket);
1629  curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1630  static_cast<void *>(this));
1631  curl_multi_setopt(curl_multi_, CURLMOPT_MAXCONNECTS, watch_fds_max_);
1632  curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1633  pool_max_handles_);
1634 
1635  prng_.InitLocaltime();
1636 
1637  // Name resolving
1638  if ((getenv("CVMFS_IPV4_ONLY") != NULL) &&
1639  (strlen(getenv("CVMFS_IPV4_ONLY")) > 0))
1640  {
1641  opt_ipv4_only_ = true;
1642  }
1643  resolver_ = dns::NormalResolver::Create(opt_ipv4_only_,
1644  kDnsDefaultRetries, kDnsDefaultTimeoutMs);
1645  assert(resolver_);
1646 }
1647 
1648 
// Teardown body.  The signature line is not part of this listing; this is
// presumably DownloadManager::Fini().
// It stops the download thread if one was spawned.  It then releases the
// curl handles, the multi handle, the headers and user agent, the counters,
// the host/proxy configuration, the global curl state and the resolver.
1650  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1651  // Shutdown I/O thread
1652  char buf = 'T';
1653  WritePipe(pipe_terminate_[1], &buf, 1);
1654  pthread_join(thread_download_, NULL);
1655  // All handles are removed from the multi stack
1656  close(pipe_terminate_[1]);
1657  close(pipe_terminate_[0]);
1658  close(pipe_jobs_[1]);
1659  close(pipe_jobs_[0]);
1660  }
1661 
// NOTE(review): only idle handles are curl_easy_cleanup()'d.  Presumably no
// handle is still in pool_handles_inuse_ at this point; confirm.
1662  for (set<CURL *>::iterator i = pool_handles_idle_->begin(),
1663  iEnd = pool_handles_idle_->end(); i != iEnd; ++i)
1664  {
1665  curl_easy_cleanup(*i);
1666  }
1667  delete pool_handles_idle_;
1668  delete pool_handles_inuse_;
1669  curl_multi_cleanup(curl_multi_);
1670  pool_handles_idle_ = NULL;
1671  pool_handles_inuse_ = NULL;
1672  curl_multi_ = NULL;
1673 
1674  FiniHeaders();
// user_agent_ was strdup()'d by InitHeaders().
1675  if (user_agent_)
1676  free(user_agent_);
1677  user_agent_ = NULL;
1678 
1679  delete counters_;
1680  counters_ = NULL;
1681 
1682  delete opt_host_chain_;
1683  delete opt_host_chain_rtt_;
1684  opt_proxy_map_.clear();
1685  delete opt_proxy_groups_;
1686  opt_host_chain_ = NULL;
1687  opt_host_chain_rtt_ = NULL;
1688  opt_proxy_groups_ = NULL;
1689 
1690  curl_global_cleanup();
1691 
1692  delete resolver_;
1693  resolver_ = NULL;
1694 }
1695 
1696 
// Body of the routine that switches to multi-threaded mode.  The signature
// lines are not part of this listing; this is presumably
// DownloadManager::Spawn().
// It creates the termination and job pipes and starts the MainDownload
// thread.  Only then does it set multi_threaded_; from that point the
// blocking fetch path hands jobs to the thread through pipe_jobs_.
// NOTE(review): a pthread_create() failure is only caught by assert().  In an
// NDEBUG build multi_threaded_ would still be set, and callers would block
// waiting for a result that never comes.
1702  MakePipe(pipe_terminate_);
1703  MakePipe(pipe_jobs_);
1704 
1705  int retval = pthread_create(&thread_download_, NULL, MainDownload,
1706  static_cast<void *>(this));
1707  assert(retval == 0);
1708 
1709  atomic_inc32(&multi_threaded_);
1710 }
1711 
1712 
// Body of the blocking download entry point.  The signature lines are not
// part of this listing; this is presumably
// DownloadManager::Fetch(JobInfo *info).
// It prepares the destination, the hash context and the optional cvmfs-info
// header.  Then, in multi-threaded mode, it hands the job to the download
// thread and waits on info->wait_at for the result.  Otherwise it runs the
// transfer inline on a pooled curl handle, looping until VerifyAndFinalize()
// says stop.
// On failure, a kDestinationPath file is unlinked and a memory buffer is
// freed.  Returns the final error code.
1717  assert(info != NULL);
1718  assert(info->url != NULL);
1719 
1720  Failures result;
1721  result = PrepareDownloadDestination(info);
1722  if (result != kFailOk)
1723  return result;
1724 
// The hash context and the info header are alloca()'d in this stack frame.
// This is safe because both branches below block until the job is finished.
// NOTE(review): lines 1726-1727 (which presumably define `algorithm`) are
// missing from this listing.
1725  if (info->expected_hash) {
1728  info->hash_context.size = shash::GetContextSize(algorithm);
1729  info->hash_context.buffer = alloca(info->hash_context.size);
1730  }
1731 
1732  // Prepare cvmfs-info: header, allocate string on the stack
1733  info->info_header = NULL;
1734  if (enable_info_header_ && info->extra_info) {
1735  const char *header_name = "cvmfs-info: ";
1736  const size_t header_name_len = strlen(header_name);
// EscapeHeader(..., NULL, 0) is used to size the buffer; + 1 for the '\0'.
1737  const unsigned header_size = 1 + header_name_len +
1738  EscapeHeader(*(info->extra_info), NULL, 0);
1739  info->info_header = static_cast<char *>(alloca(header_size));
1740  memcpy(info->info_header, header_name, header_name_len);
1741  EscapeHeader(*(info->extra_info), info->info_header + header_name_len,
1742  header_size - header_name_len);
1743  info->info_header[header_size-1] = '\0';
1744  }
1745 
// Multi-threaded mode: send the JobInfo pointer to the download thread and
// block until it writes the result back on the job's own pipe.
1746  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1747  if (info->wait_at[0] == -1) {
1748  MakePipe(info->wait_at);
1749  }
1750 
1751  // LogCvmfs(kLogDownload, kLogDebug, "send job to thread, pipe %d %d",
1752  // info->wait_at[0], info->wait_at[1]);
1753  // NOLINTNEXTLINE(bugprone-sizeof-expression)
1754  WritePipe(pipe_jobs_[1], &info, sizeof(info));
1755  ReadPipe(info->wait_at[0], &result, sizeof(result));
1756  // LogCvmfs(kLogDownload, kLogDebug, "got result %d", result);
1757  } else {
// Synchronous mode: one transfer at a time, serialized by
// lock_synchronous_mode_.
1758  MutexLockGuard l(lock_synchronous_mode_);
1759  CURL *handle = AcquireCurlHandle();
1760  InitializeRequest(info, handle);
1761  SetUrlOptions(info);
1762  // curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
1763  int retval;
1764  do {
1765  retval = curl_easy_perform(handle);
1766  perf::Inc(counters_->n_requests);
1767  double elapsed;
1768  if (curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &elapsed) == CURLE_OK)
1769  {
1770  perf::Xadd(counters_->sz_transfer_time,
1771  static_cast<int64_t>(elapsed * 1000));
1772  }
1773  } while (VerifyAndFinalize(retval, info));
1774  result = info->error_code;
1775  ReleaseCurlHandle(info->curl_handle);
1776  }
1777 
1778  if (result != kFailOk) {
1779  LogCvmfs(kLogDownload, kLogDebug, "download failed (error %d - %s)", result,
1780  Code2Ascii(result));
1781 
1782  if (info->destination == kDestinationPath)
1783  unlink(info->destination_path->c_str());
1784 
1785  if (info->destination_mem.data) {
1786  free(info->destination_mem.data);
1787  info->destination_mem.data = NULL;
1788  info->destination_mem.size = 0;
1789  }
1790  }
1791 
1792  return result;
1793 }
1794 
1795 
1800 void DownloadManager::SetCredentialsAttachment(CredentialsAttachment *ca) {
1801  MutexLockGuard m(lock_options_);
1802  credentials_attachment_ = ca;
1803 }
1804 
1808 std::string DownloadManager::GetDnsServer() const {
1809  return opt_dns_server_;
1810 }
1811 
1816 void DownloadManager::SetDnsServer(const string &address) {
1817  if (!address.empty()) {
1818  MutexLockGuard m(lock_options_);
1819  opt_dns_server_ = address;
1820  assert(!opt_dns_server_.empty());
1821 
1822  vector<string> servers;
1823  servers.push_back(address);
1824  bool retval = resolver_->SetResolvers(servers);
1825  assert(retval);
1826  }
1827  LogCvmfs(kLogDownload, kLogSyslog, "set nameserver to %s", address.c_str());
1828 }
1829 
1830 
1834 void DownloadManager::SetDnsParameters(
1835  const unsigned retries,
1836  const unsigned timeout_ms)
1837 {
1838  MutexLockGuard m(lock_options_);
1839  if ((resolver_->retries() == retries) &&
1840  (resolver_->timeout_ms() == timeout_ms))
1841  {
1842  return;
1843  }
1844  delete resolver_;
1845  resolver_ = NULL;
1846  resolver_ =
1847  dns::NormalResolver::Create(opt_ipv4_only_, retries, timeout_ms);
1848  assert(resolver_);
1849 }
1850 
1851 
1852 void DownloadManager::SetDnsTtlLimits(
1853  const unsigned min_seconds,
1854  const unsigned max_seconds)
1855 {
1856  MutexLockGuard m(lock_options_);
1857  resolver_->set_min_ttl(min_seconds);
1858  resolver_->set_max_ttl(max_seconds);
1859 }
1860 
1861 
1862 void DownloadManager::SetIpPreference(dns::IpPreference preference) {
1863  MutexLockGuard m(lock_options_);
1864  opt_ip_preference_ = preference;
1865 }
1866 
1867 
1873 void DownloadManager::SetTimeout(const unsigned seconds_proxy,
1874  const unsigned seconds_direct)
1875 {
1876  MutexLockGuard m(lock_options_);
1877  opt_timeout_proxy_ = seconds_proxy;
1878  opt_timeout_direct_ = seconds_direct;
1879 }
1880 
1881 
1887 void DownloadManager::SetLowSpeedLimit(const unsigned low_speed_limit) {
1888  MutexLockGuard m(lock_options_);
1889  opt_low_speed_limit_ = low_speed_limit;
1890 }
1891 
1892 
1896 void DownloadManager::GetTimeout(unsigned *seconds_proxy,
1897  unsigned *seconds_direct)
1898 {
1899  MutexLockGuard m(lock_options_);
1900  *seconds_proxy = opt_timeout_proxy_;
1901  *seconds_direct = opt_timeout_direct_;
1902 }
1903 
1904 
1909 void DownloadManager::SetHostChain(const string &host_list) {
1910  SetHostChain(SplitString(host_list, ';'));
1911 }
1912 
1913 
1914 void DownloadManager::SetHostChain(const std::vector<std::string> &host_list) {
1915  MutexLockGuard m(lock_options_);
1916  opt_timestamp_backup_host_ = 0;
1917  delete opt_host_chain_;
1918  delete opt_host_chain_rtt_;
1919  opt_host_chain_current_ = 0;
1920 
1921  if (host_list.empty()) {
1922  opt_host_chain_ = NULL;
1923  opt_host_chain_rtt_ = NULL;
1924  return;
1925  }
1926 
1927  opt_host_chain_ = new vector<string>(host_list);
1928  opt_host_chain_rtt_ =
1929  new vector<int>(opt_host_chain_->size(), kProbeUnprobed);
1930  // LogCvmfs(kLogDownload, kLogSyslog, "using host %s",
1931  // (*opt_host_chain_)[0].c_str());
1932 }
1933 
1934 
1935 
1940 void DownloadManager::GetHostInfo(vector<string> *host_chain, vector<int> *rtt,
1941  unsigned *current_host)
1942 {
1943  MutexLockGuard m(lock_options_);
1944  if (opt_host_chain_) {
1945  if (current_host) {*current_host = opt_host_chain_current_;}
1946  if (host_chain) {*host_chain = *opt_host_chain_;}
1947  if (rtt) {*rtt = *opt_host_chain_rtt_;}
1948  }
1949 }
1950 
1951 
1960 void DownloadManager::SwitchProxy(JobInfo *info) {
1961  MutexLockGuard m(lock_options_);
1962 
1963  if (!opt_proxy_groups_) {
1964  return;
1965  }
1966 
1967  // Fail any matching proxies within the current load-balancing group
1968  vector<ProxyInfo> *group = current_proxy_group();
1969  const unsigned group_size = group->size();
1970  unsigned failed = 0;
1971  for (unsigned i = 0; i < group_size - opt_proxy_groups_current_burned_; ++i) {
1972  if (info && (info->proxy == (*group)[i].url)) {
1973  // Move to list of failed proxies
1974  opt_proxy_groups_current_burned_++;
1975  swap((*group)[i],
1976  (*group)[group_size - opt_proxy_groups_current_burned_]);
1977  perf::Inc(counters_->n_proxy_failover);
1978  failed++;
1979  }
1980  }
1981 
1982  // Do nothing more unless at least one proxy was marked as failed
1983  if (!failed)
1984  return;
1985 
1986  // If all proxies from the current load-balancing group are burned, switch to
1987  // another group
1988  if (opt_proxy_groups_current_burned_ == group->size()) {
1989  opt_proxy_groups_current_burned_ = 0;
1990  if (opt_proxy_groups_->size() > 1) {
1991  opt_proxy_groups_current_ = (opt_proxy_groups_current_ + 1) %
1992  opt_proxy_groups_->size();
1993  // Remeber the timestamp of switching to backup proxies
1994  if (opt_proxy_groups_reset_after_ > 0) {
1995  if (opt_proxy_groups_current_ > 0) {
1996  if (opt_timestamp_backup_proxies_ == 0)
1997  opt_timestamp_backup_proxies_ = time(NULL);
1998  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
1999  // "switched to (another) backup proxy group");
2000  } else {
2001  opt_timestamp_backup_proxies_ = 0;
2002  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2003  // "switched back to primary proxy group");
2004  }
2005  opt_timestamp_failover_proxies_ = 0;
2006  }
2007  }
2008  } else {
2009  // Record failover time
2010  if (opt_proxy_groups_reset_after_ > 0) {
2011  if (opt_timestamp_failover_proxies_ == 0)
2012  opt_timestamp_failover_proxies_ = time(NULL);
2013  }
2014  }
2015 
2016  UpdateProxiesUnlocked("failed proxy");
2017  LogCvmfs(kLogDownload, kLogDebug, "%d proxies remain in group",
2018  current_proxy_group()->size() - opt_proxy_groups_current_burned_);
2019 }
2020 
2021 
// Advances opt_host_chain_current_ to the next host in the chain, wrapping
// around, and counts a host fail-over.
//  - With info == NULL the switch is logged as "manually triggered".
//  - Otherwise the job's error code is the logged reason.  Nothing happens if
//    the current host is no longer the one this job used, i.e. another job
//    already switched.
// When opt_host_reset_after_ > 0 it remembers when a backup host was first
// selected, and clears that timestamp on returning to host 0.  It is a no-op
// without a chain or with a single host.
// NOTE(review): the LogCvmfs(...) openers before lines 2036 and 2053 are
// missing from this listing.
2027 void DownloadManager::SwitchHost(JobInfo *info) {
2028  MutexLockGuard m(lock_options_);
2029 
2030  if (!opt_host_chain_ || (opt_host_chain_->size() == 1)) {
2031  return;
2032  }
2033 
// NOTE(review): info->current_host_chain_index refers to the chain that was
// current when the job started.  If SetHostChain() installed a shorter chain
// meanwhile, indexing with it below could be out of range; confirm.
2034  if (info && (info->current_host_chain_index != opt_host_chain_current_)) {
2036  "don't switch host, "
2037  "last used host: %s, current host: %s",
2038  (*opt_host_chain_)[info->current_host_chain_index].c_str(),
2039  (*opt_host_chain_)[opt_host_chain_current_].c_str());
2040  return;
2041  }
2042 
2043  string reason = "manually triggered";
2044  if (info) {
2045  reason = download::Code2Ascii(info->error_code);
2046  }
2047 
2048  string old_host = (*opt_host_chain_)[opt_host_chain_current_];
2049  opt_host_chain_current_ =
2050  (opt_host_chain_current_ + 1) % opt_host_chain_->size();
2051  perf::Inc(counters_->n_host_failover);
2053  "switching host from %s to %s (%s)", old_host.c_str(),
2054  (*opt_host_chain_)[opt_host_chain_current_].c_str(),
2055  reason.c_str());
2056 
2057  // Remember the timestamp of switching to backup host
2058  if (opt_host_reset_after_ > 0) {
2059  if (opt_host_chain_current_ != 0) {
2060  if (opt_timestamp_backup_host_ == 0)
2061  opt_timestamp_backup_host_ = time(NULL);
2062  } else {
2063  opt_timestamp_backup_host_ = 0;
2064  }
2065  }
2066 }
2067 
2068 void DownloadManager::SwitchHost() {
2069  SwitchHost(NULL);
2070 }
2071 
2072 
2079 void DownloadManager::ProbeHosts() {
2080  vector<string> host_chain;
2081  vector<int> host_rtt;
2082  unsigned current_host;
2083 
2084  GetHostInfo(&host_chain, &host_rtt, &current_host);
2085 
2086  // Stopwatch, two times to fill caches first
2087  unsigned i, retries;
2088  string url;
2089  JobInfo info(&url, false, false, NULL);
2090  for (retries = 0; retries < 2; ++retries) {
2091  for (i = 0; i < host_chain.size(); ++i) {
2092  url = host_chain[i] + "/.cvmfspublished";
2093 
2094  struct timeval tv_start, tv_end;
2095  gettimeofday(&tv_start, NULL);
2096  Failures result = Fetch(&info);
2097  gettimeofday(&tv_end, NULL);
2098  if (info.destination_mem.data)
2099  free(info.destination_mem.data);
2100  if (result == kFailOk) {
2101  host_rtt[i] = static_cast<int>(
2102  DiffTimeSeconds(tv_start, tv_end) * 1000);
2103  LogCvmfs(kLogDownload, kLogDebug, "probing host %s had %dms rtt",
2104  url.c_str(), host_rtt[i]);
2105  } else {
2106  LogCvmfs(kLogDownload, kLogDebug, "error while probing host %s: %d %s",
2107  url.c_str(), result, Code2Ascii(result));
2108  host_rtt[i] = INT_MAX;
2109  }
2110  }
2111  }
2112 
2113  SortTeam(&host_rtt, &host_chain);
2114  for (i = 0; i < host_chain.size(); ++i) {
2115  if (host_rtt[i] == INT_MAX) host_rtt[i] = kProbeDown;
2116  }
2117 
2118  MutexLockGuard m(lock_options_);
2119  delete opt_host_chain_;
2120  delete opt_host_chain_rtt_;
2121  opt_host_chain_ = new vector<string>(host_chain);
2122  opt_host_chain_rtt_ = new vector<int>(host_rtt);
2123  opt_host_chain_current_ = 0;
2124 }
2125 
// Asks the Geo-API of up to three randomly chosen hosts from the host chain
// to order `servers` by proximity.
//  - If `output_order` is non-NULL, it receives the permutation (indices into
//    *servers) and *servers is left unchanged.
//  - Otherwise *servers is reordered in place.
// A single server trivially yields order {0}.  Returns false if `servers` is
// NULL or no Geo-API host returned a valid reply.
// NOTE(review): several LogCvmfs(...) openers (2161, 2170, 2174, 2182, 2188)
// are missing from this listing.
2126 bool DownloadManager::GeoSortServers(std::vector<std::string> *servers,
2127  std::vector<uint64_t> *output_order) {
2128  if (!servers) {return false;}
2129  if (servers->size() == 1) {
2130  if (output_order) {
2131  output_order->clear();
2132  output_order->push_back(0);
2133  }
2134  return true;
2135  }
2136 
2137  std::vector<std::string> host_chain;
2138  GetHostInfo(&host_chain, NULL, NULL);
2139 
// The Geo-API expects bare DNS names.  Fall back to the full entry if no host
// can be extracted from it.
2140  std::vector<std::string> server_dns_names;
2141  server_dns_names.reserve(servers->size());
2142  for (unsigned i = 0; i < servers->size(); ++i) {
2143  std::string host = dns::ExtractHost((*servers)[i]);
2144  server_dns_names.push_back(host.empty() ? (*servers)[i] : host);
2145  }
2146  std::string host_list = JoinStrings(server_dns_names, ",");
2147 
2148  vector<string> host_chain_shuffled;
2149  {
2150  // Protect against concurrent access to prng_
2151  MutexLockGuard m(lock_options_);
2152  // Determine random hosts for the Geo-API query
2153  host_chain_shuffled = Shuffle(host_chain, &prng_);
2154  }
2155  // Request ordered list via Geo-API
2156  bool success = false;
2157  unsigned max_attempts = std::min(host_chain_shuffled.size(), size_t(3));
2158  vector<uint64_t> geo_order(servers->size());
2159  for (unsigned i = 0; i < max_attempts; ++i) {
// "@proxy@" is replaced with the proxy name when the request's URL options
// are set (see the @proxy@ handling earlier in this file).
2160  string url = host_chain_shuffled[i] + "/api/v1.0/geo/@proxy@/" + host_list;
2162  "requesting ordered server list from %s", url.c_str());
2163  JobInfo info(&url, false, false, NULL);
2164  Failures result = Fetch(&info);
2165  if (result == kFailOk) {
2166  string order(info.destination_mem.data, info.destination_mem.size);
2167  free(info.destination_mem.data);
2168  bool retval = ValidateGeoReply(order, servers->size(), &geo_order);
2169  if (!retval) {
2171  "retrieved invalid GeoAPI reply from %s [%s]",
2172  url.c_str(), order.c_str());
2173  } else {
2175  "geographic order of servers retrieved from %s",
2176  dns::ExtractHost(host_chain_shuffled[i]).c_str());
2177  LogCvmfs(kLogDownload, kLogDebug, "order is %s", order.c_str());
2178  success = true;
2179  break;
2180  }
2181  } else {
2183  "GeoAPI request %s failed with error %d [%s]",
2184  url.c_str(), result, Code2Ascii(result));
2185  }
2186  }
2187  if (!success) {
2189  "failed to retrieve geographic order from stratum 1 servers");
2190  return false;
2191  }
2192 
2193  if (output_order) {
2194  output_order->swap(geo_order);
2195  } else {
2196  std::vector<std::string> sorted_servers;
2197  sorted_servers.reserve(geo_order.size());
2198  for (unsigned i = 0; i < geo_order.size(); ++i) {
2199  uint64_t orderval = geo_order[i];
2200  sorted_servers.push_back((*servers)[orderval]);
2201  }
2202  servers->swap(sorted_servers);
2203  }
2204  return true;
2205 }
2206 
2207 
2215 bool DownloadManager::ProbeGeo() {
2216  vector<string> host_chain;
2217  vector<int> host_rtt;
2218  unsigned current_host;
2219  vector< vector<ProxyInfo> > proxy_chain;
2220  unsigned fallback_group;
2221 
2222  GetHostInfo(&host_chain, &host_rtt, &current_host);
2223  GetProxyInfo(&proxy_chain, NULL, &fallback_group);
2224  if ((host_chain.size() < 2) && ((proxy_chain.size() - fallback_group) < 2))
2225  return true;
2226 
2227  vector<string> host_names;
2228  for (unsigned i = 0; i < host_chain.size(); ++i)
2229  host_names.push_back(dns::ExtractHost(host_chain[i]));
2230  SortTeam(&host_names, &host_chain);
2231  unsigned last_geo_host = host_names.size();
2232 
2233  if ((fallback_group == 0) && (last_geo_host > 1)) {
2234  // There are no non-fallback proxies, which means that the client
2235  // will always use the fallback proxies. Add a keyword separator
2236  // between the hosts and fallback proxies so the geosorting service
2237  // will know to sort the hosts based on the distance from the
2238  // closest fallback proxy rather than the distance from the client.
2239  host_names.push_back("+PXYSEP+");
2240  }
2241 
2242  // Add fallback proxy names to the end of the host list
2243  unsigned first_geo_fallback = host_names.size();
2244  for (unsigned i = fallback_group; i < proxy_chain.size(); ++i) {
2245  // We only take the first fallback proxy name from every group under the
2246  // assumption that load-balanced servers are at the same location
2247  host_names.push_back(proxy_chain[i][0].host.name());
2248  }
2249 
2250  std::vector<uint64_t> geo_order;
2251  bool success = GeoSortServers(&host_names, &geo_order);
2252  if (!success) {
2253  // GeoSortServers already logged a failure message.
2254  return false;
2255  }
2256 
2257  // Re-install host chain and proxy chain
2258  MutexLockGuard m(lock_options_);
2259  delete opt_host_chain_;
2260  opt_num_proxies_ = 0;
2261  opt_host_chain_ = new vector<string>(host_chain.size());
2262 
2263  // It's possible that opt_proxy_groups_fallback_ might have changed while
2264  // the lock wasn't held
2265  vector<vector<ProxyInfo> > *proxy_groups = new vector<vector<ProxyInfo> >(
2266  opt_proxy_groups_fallback_ + proxy_chain.size() - fallback_group);
2267  // First copy the non-fallback part of the current proxy chain
2268  for (unsigned i = 0; i < opt_proxy_groups_fallback_; ++i) {
2269  (*proxy_groups)[i] = (*opt_proxy_groups_)[i];
2270  opt_num_proxies_ += (*opt_proxy_groups_)[i].size();
2271  }
2272 
2273  // Copy the host chain and fallback proxies by geo order. Array indices
2274  // in geo_order that are smaller than last_geo_host refer to a stratum 1,
2275  // and those indices greater than or equal to first_geo_fallback refer to
2276  // a fallback proxy.
2277  unsigned hosti = 0;
2278  unsigned proxyi = opt_proxy_groups_fallback_;
2279  for (unsigned i = 0; i < geo_order.size(); ++i) {
2280  uint64_t orderval = geo_order[i];
2281  if (orderval < static_cast<uint64_t>(last_geo_host)) {
2282  // LogCvmfs(kLogCvmfs, kLogSyslog, "this is orderval %u at host index
2283  // %u", orderval, hosti);
2284  (*opt_host_chain_)[hosti++] = host_chain[orderval];
2285  } else if (orderval >= static_cast<uint64_t>(first_geo_fallback)) {
2286  // LogCvmfs(kLogCvmfs, kLogSyslog,
2287  // "this is orderval %u at proxy index %u, using proxy_chain index %u",
2288  // orderval, proxyi, fallback_group + orderval - first_geo_fallback);
2289  (*proxy_groups)[proxyi] =
2290  proxy_chain[fallback_group + orderval - first_geo_fallback];
2291  opt_num_proxies_ += (*proxy_groups)[proxyi].size();
2292  proxyi++;
2293  }
2294  }
2295 
2296  opt_proxy_map_.clear();
2297  delete opt_proxy_groups_;
2298  opt_proxy_groups_ = proxy_groups;
2299  // In pathological cases, opt_proxy_groups_current_ can be larger now when
2300  // proxies changed in-between.
2301  if (opt_proxy_groups_current_ > opt_proxy_groups_->size()) {
2302  if (opt_proxy_groups_->size() == 0) {
2303  opt_proxy_groups_current_ = 0;
2304  } else {
2305  opt_proxy_groups_current_ = opt_proxy_groups_->size() - 1;
2306  }
2307  opt_proxy_groups_current_burned_ = 0;
2308  }
2309 
2310  UpdateProxiesUnlocked("geosort");
2311 
2312  delete opt_host_chain_rtt_;
2313  opt_host_chain_rtt_ = new vector<int>(host_chain.size(), kProbeGeo);
2314  opt_host_chain_current_ = 0;
2315 
2316  return true;
2317 }
2318 
2319 
2327 bool DownloadManager::ValidateGeoReply(
2328  const string &reply_order,
2329  const unsigned expected_size,
2330  vector<uint64_t> *reply_vals)
2331 {
2332  if (reply_order.empty())
2333  return false;
2334  sanitizer::InputSanitizer sanitizer("09 , \n");
2335  if (!sanitizer.IsValid(reply_order))
2336  return false;
2337  sanitizer::InputSanitizer strip_newline("09 ,");
2338  vector<string> reply_strings =
2339  SplitString(strip_newline.Filter(reply_order), ',');
2340  vector<uint64_t> tmp_vals;
2341  for (unsigned i = 0; i < reply_strings.size(); ++i) {
2342  if (reply_strings[i].empty())
2343  return false;
2344  tmp_vals.push_back(String2Uint64(reply_strings[i]));
2345  }
2346  if (tmp_vals.size() != expected_size)
2347  return false;
2348 
2349  // Check if tmp_vals contains the number 1..n
2350  set<uint64_t> coverage(tmp_vals.begin(), tmp_vals.end());
2351  if (coverage.size() != tmp_vals.size())
2352  return false;
2353  if ((*coverage.begin() != 1) || (*coverage.rbegin() != coverage.size()))
2354  return false;
2355 
2356  for (unsigned i = 0; i < expected_size; ++i) {
2357  (*reply_vals)[i] = tmp_vals[i] - 1;
2358  }
2359  return true;
2360 }
2361 
2362 
2367 bool DownloadManager::StripDirect(
2368  const string &proxy_list,
2369  string *cleaned_list)
2370 {
2371  assert(cleaned_list);
2372  if (proxy_list == "") {
2373  *cleaned_list = "";
2374  return false;
2375  }
2376  bool result = false;
2377 
2378  vector<string> proxy_groups = SplitString(proxy_list, ';');
2379  vector<string> cleaned_groups;
2380  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2381  vector<string> group = SplitString(proxy_groups[i], '|');
2382  vector<string> cleaned;
2383  for (unsigned j = 0; j < group.size(); ++j) {
2384  if ((group[j] == "DIRECT") || (group[j] == "")) {
2385  result = true;
2386  } else {
2387  cleaned.push_back(group[j]);
2388  }
2389  }
2390  if (!cleaned.empty())
2391  cleaned_groups.push_back(JoinStrings(cleaned, "|"));
2392  }
2393 
2394  *cleaned_list = JoinStrings(cleaned_groups, ";");
2395  return result;
2396 }
2397 
2398 
2407 void DownloadManager::SetProxyChain(
2408  const string &proxy_list,
2409  const string &fallback_proxy_list,
2410  const ProxySetModes set_mode)
2411 {
2412  MutexLockGuard m(lock_options_);
2413 
2414  opt_timestamp_backup_proxies_ = 0;
2415  opt_timestamp_failover_proxies_ = 0;
2416  string set_proxy_list = opt_proxy_list_;
2417  string set_proxy_fallback_list = opt_proxy_fallback_list_;
2418  bool contains_direct;
2419  if ((set_mode == kSetProxyFallback) || (set_mode == kSetProxyBoth)) {
2420  opt_proxy_fallback_list_ = fallback_proxy_list;
2421  }
2422  if ((set_mode == kSetProxyRegular) || (set_mode == kSetProxyBoth)) {
2423  opt_proxy_list_ = proxy_list;
2424  }
2425  contains_direct =
2426  StripDirect(opt_proxy_fallback_list_, &set_proxy_fallback_list);
2427  if (contains_direct) {
2429  "fallback proxies do not support DIRECT, removing");
2430  }
2431  if (set_proxy_fallback_list == "") {
2432  set_proxy_list = opt_proxy_list_;
2433  } else {
2434  bool contains_direct = StripDirect(opt_proxy_list_, &set_proxy_list);
2435  if (contains_direct) {
2437  "skipping DIRECT proxy to use fallback proxy");
2438  }
2439  }
2440 
2441  // From this point on, use set_proxy_list and set_fallback_proxy_list as
2442  // effective proxy lists!
2443 
2444  opt_proxy_map_.clear();
2445  delete opt_proxy_groups_;
2446  if ((set_proxy_list == "") && (set_proxy_fallback_list == "")) {
2447  opt_proxy_groups_ = NULL;
2448  opt_proxy_groups_current_ = 0;
2449  opt_proxy_groups_current_burned_ = 0;
2450  opt_proxy_groups_fallback_ = 0;
2451  opt_num_proxies_ = 0;
2452  return;
2453  }
2454 
2455  // Determine number of regular proxy groups (== first fallback proxy group)
2456  opt_proxy_groups_fallback_ = 0;
2457  if (set_proxy_list != "") {
2458  opt_proxy_groups_fallback_ = SplitString(set_proxy_list, ';').size();
2459  }
2460  LogCvmfs(kLogDownload, kLogDebug, "first fallback proxy group %u",
2461  opt_proxy_groups_fallback_);
2462 
2463  // Concatenate regular proxies and fallback proxies, both of which can be
2464  // empty.
2465  string all_proxy_list = set_proxy_list;
2466  if (set_proxy_fallback_list != "") {
2467  if (all_proxy_list != "")
2468  all_proxy_list += ";";
2469  all_proxy_list += set_proxy_fallback_list;
2470  }
2471  LogCvmfs(kLogDownload, kLogDebug, "full proxy list %s",
2472  all_proxy_list.c_str());
2473 
2474  // Resolve server names in provided urls
2475  vector<string> hostnames; // All encountered hostnames
2476  vector<string> proxy_groups;
2477  if (all_proxy_list != "")
2478  proxy_groups = SplitString(all_proxy_list, ';');
2479  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2480  vector<string> this_group = SplitString(proxy_groups[i], '|');
2481  for (unsigned j = 0; j < this_group.size(); ++j) {
2482  this_group[j] = dns::AddDefaultScheme(this_group[j]);
2483  // Note: DIRECT strings will be "extracted" to an empty string.
2484  string hostname = dns::ExtractHost(this_group[j]);
2485  // Save the hostname. Leave empty (DIRECT) names so indexes will
2486  // match later.
2487  hostnames.push_back(hostname);
2488  }
2489  }
2490  vector<dns::Host> hosts;
2491  LogCvmfs(kLogDownload, kLogDebug, "resolving %u proxy addresses",
2492  hostnames.size());
2493  resolver_->ResolveMany(hostnames, &hosts);
2494 
2495  // Construct opt_proxy_groups_: traverse proxy list in same order and expand
2496  // names to resolved IP addresses.
2497  opt_proxy_groups_ = new vector< vector<ProxyInfo> >();
2498  opt_num_proxies_ = 0;
2499  unsigned num_proxy = 0; // Combined i, j counter
2500  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2501  vector<string> this_group = SplitString(proxy_groups[i], '|');
2502  // Construct ProxyInfo objects from proxy string and DNS resolver result for
2503  // every proxy in this_group. One URL can result in multiple ProxyInfo
2504  // objects, one for each IP address.
2505  vector<ProxyInfo> infos;
2506  for (unsigned j = 0; j < this_group.size(); ++j, ++num_proxy) {
2507  this_group[j] = dns::AddDefaultScheme(this_group[j]);
2508  if (this_group[j] == "DIRECT") {
2509  infos.push_back(ProxyInfo("DIRECT"));
2510  continue;
2511  }
2512 
2513  if (hosts[num_proxy].status() != dns::kFailOk) {
2515  "failed to resolve IP addresses for %s (%d - %s)",
2516  hosts[num_proxy].name().c_str(), hosts[num_proxy].status(),
2517  dns::Code2Ascii(hosts[num_proxy].status()));
2518  dns::Host failed_host =
2519  dns::Host::ExtendDeadline(hosts[num_proxy], resolver_->min_ttl());
2520  infos.push_back(ProxyInfo(failed_host, this_group[j]));
2521  continue;
2522  }
2523 
2524  // IPv4 addresses have precedence
2525  set<string> best_addresses =
2526  hosts[num_proxy].ViewBestAddresses(opt_ip_preference_);
2527  set<string>::const_iterator iter_ips = best_addresses.begin();
2528  for (; iter_ips != best_addresses.end(); ++iter_ips) {
2529  string url_ip = dns::RewriteUrl(this_group[j], *iter_ips);
2530  infos.push_back(ProxyInfo(hosts[num_proxy], url_ip));
2531  }
2532  }
2533  opt_proxy_groups_->push_back(infos);
2534  opt_num_proxies_ += infos.size();
2535  }
2537  "installed %u proxies in %u load-balance groups",
2538  opt_num_proxies_, opt_proxy_groups_->size());
2539  opt_proxy_groups_current_ = 0;
2540  opt_proxy_groups_current_burned_ = 0;
2541 
2542  // Select random start proxy from the first group.
2543  if (opt_proxy_groups_->size() > 0) {
2544  // Select random start proxy from the first group.
2545  UpdateProxiesUnlocked("set proxies");
2546  }
2547 }
2548 
2549 
2556 void DownloadManager::GetProxyInfo(vector< vector<ProxyInfo> > *proxy_chain,
2557  unsigned *current_group,
2558  unsigned *fallback_group)
2559 {
2560  assert(proxy_chain != NULL);
2561  MutexLockGuard m(lock_options_);
2562 
2563 
2564  if (!opt_proxy_groups_) {
2565  vector< vector<ProxyInfo> > empty_chain;
2566  *proxy_chain = empty_chain;
2567  if (current_group != NULL)
2568  *current_group = 0;
2569  if (fallback_group != NULL)
2570  *fallback_group = 0;
2571  return;
2572  }
2573 
2574  *proxy_chain = *opt_proxy_groups_;
2575  if (current_group != NULL)
2576  *current_group = opt_proxy_groups_current_;
2577  if (fallback_group != NULL)
2578  *fallback_group = opt_proxy_groups_fallback_;
2579 }
2580 
2581 string DownloadManager::GetProxyList() {
2582  return opt_proxy_list_;
2583 }
2584 
2585 string DownloadManager::GetFallbackProxyList() {
2586  return opt_proxy_fallback_list_;
2587 }
2588 
2593 DownloadManager::ChooseProxyUnlocked(const shash::Any *hash) {
2594  if (!opt_proxy_groups_)
2595  return NULL;
2596 
2597  uint32_t key = (hash ? hash->Partial32() : 0);
2598  map<uint32_t, ProxyInfo *>::iterator it = opt_proxy_map_.lower_bound(key);
2599  ProxyInfo *proxy = it->second;
2600 
2601  return proxy;
2602 }
2603 
2607 void DownloadManager::UpdateProxiesUnlocked(const string &reason) {
2608  if (!opt_proxy_groups_)
2609  return;
2610 
2611  // Identify number of non-burned proxies within the current group
2612  vector<ProxyInfo> *group = current_proxy_group();
2613  unsigned num_alive = (group->size() - opt_proxy_groups_current_burned_);
2614  string old_proxy = JoinStrings(opt_proxy_urls_, "|");
2615 
2616  // Rebuild proxy map and URL list
2617  opt_proxy_map_.clear();
2618  opt_proxy_urls_.clear();
2619  const uint32_t max_key = 0xffffffffUL;
2620  if (opt_proxy_shard_) {
2621  // Build a consistent map with multiple entries for each proxy
2622  for (unsigned i = 0; i < num_alive; ++i) {
2623  ProxyInfo *proxy = &(*group)[i];
2624  shash::Any proxy_hash(shash::kSha1);
2625  HashString(proxy->url, &proxy_hash);
2626  Prng prng;
2627  prng.InitSeed(proxy_hash.Partial32());
2628  for (unsigned j = 0; j < kProxyMapScale; ++j) {
2629  const std::pair<uint32_t, ProxyInfo *> entry(prng.Next(max_key), proxy);
2630  opt_proxy_map_.insert(entry);
2631  }
2632  opt_proxy_urls_.push_back(proxy->url);
2633  }
2634  // Ensure lower_bound() finds a value for all keys
2635  ProxyInfo *first_proxy = opt_proxy_map_.begin()->second;
2636  const std::pair<uint32_t, ProxyInfo *> last_entry(max_key, first_proxy);
2637  opt_proxy_map_.insert(last_entry);
2638  } else {
2639  // Build a map with a single entry for one randomly selected proxy
2640  unsigned select = prng_.Next(num_alive);
2641  ProxyInfo *proxy = &(*group)[select];
2642  const std::pair<uint32_t, ProxyInfo *> entry(max_key, proxy);
2643  opt_proxy_map_.insert(entry);
2644  opt_proxy_urls_.push_back(proxy->url);
2645  }
2646  sort(opt_proxy_urls_.begin(), opt_proxy_urls_.end());
2647 
2648  // Report any change in proxy usage
2649  string new_proxy = JoinStrings(opt_proxy_urls_, "|");
2650  if (new_proxy != old_proxy) {
2652  "switching proxy from %s to %s (%s)",
2653  (old_proxy.empty() ? "(none)" : old_proxy.c_str()),
2654  (new_proxy.empty() ? "(none)" : new_proxy.c_str()),
2655  reason.c_str());
2656  }
2657 }
2658 
2662 void DownloadManager::ShardProxies() {
2663  opt_proxy_shard_ = true;
2664  RebalanceProxiesUnlocked("enable sharding");
2665 }
2666 
2671 void DownloadManager::RebalanceProxiesUnlocked(const string &reason) {
2672  if (!opt_proxy_groups_)
2673  return;
2674 
2675  opt_timestamp_failover_proxies_ = 0;
2676  opt_proxy_groups_current_burned_ = 0;
2677  UpdateProxiesUnlocked(reason);
2678 }
2679 
2680 
2681 void DownloadManager::RebalanceProxies() {
2682  MutexLockGuard m(lock_options_);
2683  RebalanceProxiesUnlocked("rebalance");
2684 }
2685 
2686 
2690 void DownloadManager::SwitchProxyGroup() {
2691  MutexLockGuard m(lock_options_);
2692 
2693  if (!opt_proxy_groups_ || (opt_proxy_groups_->size() < 2)) {
2694  return;
2695  }
2696 
2697  opt_proxy_groups_current_ = (opt_proxy_groups_current_ + 1) %
2698  opt_proxy_groups_->size();
2699  opt_timestamp_backup_proxies_ = time(NULL);
2700  RebalanceProxiesUnlocked("switch proxy group");
2701 }
2702 
2703 
2704 void DownloadManager::SetProxyGroupResetDelay(const unsigned seconds) {
2705  MutexLockGuard m(lock_options_);
2706  opt_proxy_groups_reset_after_ = seconds;
2707  if (opt_proxy_groups_reset_after_ == 0) {
2708  opt_timestamp_backup_proxies_ = 0;
2709  opt_timestamp_failover_proxies_ = 0;
2710  }
2711 }
2712 
2713 
2714 void DownloadManager::SetHostResetDelay(const unsigned seconds)
2715 {
2716  MutexLockGuard m(lock_options_);
2717  opt_host_reset_after_ = seconds;
2718  if (opt_host_reset_after_ == 0)
2719  opt_timestamp_backup_host_ = 0;
2720 }
2721 
2722 
2723 void DownloadManager::SetRetryParameters(const unsigned max_retries,
2724  const unsigned backoff_init_ms,
2725  const unsigned backoff_max_ms)
2726 {
2727  MutexLockGuard m(lock_options_);
2728  opt_max_retries_ = max_retries;
2729  opt_backoff_init_ms_ = backoff_init_ms;
2730  opt_backoff_max_ms_ = backoff_max_ms;
2731 }
2732 
2733 
2734 void DownloadManager::SetMaxIpaddrPerProxy(unsigned limit) {
2735  MutexLockGuard m(lock_options_);
2736  resolver_->set_throttle(limit);
2737 }
2738 
2739 
2740 void DownloadManager::SetProxyTemplates(
2741  const std::string &direct,
2742  const std::string &forced)
2743 {
2744  MutexLockGuard m(lock_options_);
2745  proxy_template_direct_ = direct;
2746  proxy_template_forced_ = forced;
2747 }
2748 
2749 
2750 void DownloadManager::EnableInfoHeader() {
2751  enable_info_header_ = true;
2752 }
2753 
2754 
2755 void DownloadManager::EnableRedirects() {
2756  follow_redirects_ = true;
2757 }
2758 
2759 void DownloadManager::UseSystemCertificatePath() {
2760  ssl_certificate_store_.UseSystemCertificatePath();
2761 }
2762 
2767 DownloadManager *DownloadManager::Clone(
2768  const perf::StatisticsTemplate &statistics)
2769 {
2770  DownloadManager *clone = new DownloadManager();
2771  clone->Init(pool_max_handles_, statistics);
2772  if (resolver_) {
2773  clone->SetDnsParameters(resolver_->retries(), resolver_->timeout_ms());
2774  clone->SetDnsTtlLimits(resolver_->min_ttl(), resolver_->max_ttl());
2775  clone->SetMaxIpaddrPerProxy(resolver_->throttle());
2776  }
2777  if (!opt_dns_server_.empty())
2778  clone->SetDnsServer(opt_dns_server_);
2779  clone->opt_timeout_proxy_ = opt_timeout_proxy_;
2780  clone->opt_timeout_direct_ = opt_timeout_direct_;
2781  clone->opt_low_speed_limit_ = opt_low_speed_limit_;
2782  clone->opt_max_retries_ = opt_max_retries_;
2783  clone->opt_backoff_init_ms_ = opt_backoff_init_ms_;
2784  clone->opt_backoff_max_ms_ = opt_backoff_max_ms_;
2785  clone->enable_info_header_ = enable_info_header_;
2786  clone->follow_redirects_ = follow_redirects_;
2787  if (opt_host_chain_) {
2788  clone->opt_host_chain_ = new vector<string>(*opt_host_chain_);
2789  clone->opt_host_chain_rtt_ = new vector<int>(*opt_host_chain_rtt_);
2790  }
2791  CloneProxyConfig(clone);
2792  clone->opt_ip_preference_ = opt_ip_preference_;
2793  clone->proxy_template_direct_ = proxy_template_direct_;
2794  clone->proxy_template_forced_ = proxy_template_forced_;
2795  clone->opt_proxy_groups_reset_after_ = opt_proxy_groups_reset_after_;
2796  clone->opt_host_reset_after_ = opt_host_reset_after_;
2797  clone->credentials_attachment_ = credentials_attachment_;
2798  clone->ssl_certificate_store_ = ssl_certificate_store_;
2799 
2800  return clone;
2801 }
2802 
2803 
2804 void DownloadManager::CloneProxyConfig(DownloadManager *clone) {
2805  clone->opt_proxy_groups_current_ = opt_proxy_groups_current_;
2806  clone->opt_proxy_groups_current_burned_ = opt_proxy_groups_current_burned_;
2807  clone->opt_proxy_groups_fallback_ = opt_proxy_groups_fallback_;
2808  clone->opt_num_proxies_ = opt_num_proxies_;
2809  clone->opt_proxy_shard_ = opt_proxy_shard_;
2810  clone->opt_proxy_list_ = opt_proxy_list_;
2811  clone->opt_proxy_fallback_list_ = opt_proxy_fallback_list_;
2812  if (opt_proxy_groups_ == NULL)
2813  return;
2814 
2815  clone->opt_proxy_groups_ = new vector< vector<ProxyInfo> >(
2816  *opt_proxy_groups_);
2817  clone->UpdateProxiesUnlocked("cloned");
2818 }
2819 
2820 } // namespace download
unsigned opt_timeout_direct_
Definition: download.h:519
unsigned opt_low_speed_limit_
Definition: download.h:520
Destination destination
Definition: download.h:162
#define LogCvmfs(source, mask,...)
Definition: logging.h:20
Definition: prng.h:25
std::vector< T > Shuffle(const std::vector< T > &input, Prng *prng)
Definition: algorithm.h:33
const char * Code2Ascii(const ObjectFetcherFailures::Failures error)
unsigned opt_backoff_init_ms_
Definition: download.h:522
static unsigned EscapeHeader(const string &header, char *escaped_buf, size_t buf_size)
Definition: download.cc:121
unsigned char num_used_hosts
Definition: download.h:292
static bool EscapeUrlChar(char input, char output[3])
Definition: download.cc:71
int64_t Xadd(class Counter *counter, const int64_t delta)
Definition: statistics.h:51
const char * Code2Ascii(const Failures error)
Definition: dns.h:53
int64_t id() const
Definition: dns.h:108
unsigned opt_proxy_groups_current_burned_
Definition: download.h:547
double DiffTimeSeconds(struct timeval start, struct timeval end)
Definition: algorithm.cc:31
unsigned opt_proxy_groups_reset_after_
Definition: download.h:609
void SetUrlOptions(JobInfo *info)
Definition: download.cc:903
StreamStates DecompressZStream2Sink(const void *buf, const int64_t size, z_stream *strm, cvmfs::Sink *sink)
Definition: compression.cc:234
unsigned backoff_ms
Definition: download.h:294
std::string opt_proxy_fallback_list_
Definition: download.h:564
static NormalResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
Definition: dns.cc:1259
vector< string > SplitString(const string &str, const char delim, const unsigned max_chunks)
Definition: string.cc:288
unsigned opt_host_reset_after_
Definition: download.h:617
std::string proxy_template_direct_
Definition: download.h:593
#define PANIC(...)
Definition: exception.h:26
string ReplaceAll(const string &haystack, const string &needle, const string &replace_by)
Definition: string.cc:454
string JoinStrings(const vector< string > &strings, const string &joint)
Definition: string.cc:318
void Init(ContextPtr context)
Definition: hash.cc:164
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:248
unsigned opt_proxy_groups_current_
Definition: download.h:542
off_t range_offset
Definition: download.h:175
shash::ContextPtr hash_context
Definition: download.h:285
void DecompressInit(z_stream *strm)
Definition: compression.cc:185
unsigned int current_host_chain_index
Definition: download.h:295
bool DecompressMem2Mem(const void *buf, const int64_t size, void **out_buf, uint64_t *out_size)
Definition: compression.cc:797
std::set< CURL * > * pool_handles_inuse_
Definition: download.h:498
void * cred_data
Definition: download.h:161
StreamStates DecompressZStream2File(const void *buf, const int64_t size, z_stream *strm, FILE *f)
Definition: compression.cc:275
std::string opt_proxy_list_
Definition: download.h:560
const std::string & name() const
Definition: dns.h:118
perf::Counter * sz_transfer_time
Definition: download.h:126
std::vector< std::vector< ProxyInfo > > * opt_proxy_groups_
Definition: download.h:538
assert((mem||(size==0))&&"Out Of Memory")
unsigned opt_proxy_groups_fallback_
Definition: download.h:552
void ReleaseCurlHandle(CURL *handle)
Definition: download.cc:817
StreamStates
Definition: compression.h:36
Algorithms algorithm
Definition: hash.h:124
z_stream zstream
Definition: download.h:284
const std::set< std::string > & ViewBestAddresses(IpPreference preference) const
Definition: dns.cc:205
void SetDnsServer(const std::string &address)
Definition: download.cc:1816
void DecompressFini(z_stream *strm)
Definition: compression.cc:201
void InitSeed(const uint64_t seed)
Definition: prng.h:32
void MakePipe(int pipe_fd[2])
Definition: posix.cc:525
char algorithm
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:180
bool IsValid(const std::string &input) const
Definition: sanitizer.cc:114
Algorithms
Definition: hash.h:40
void SetDnsTtlLimits(const unsigned min_seconds, const unsigned max_seconds)
Definition: download.cc:1852
static Failures PrepareDownloadDestination(JobInfo *info)
Definition: download.cc:151
void Init(const unsigned max_pool_handles, const perf::StatisticsTemplate &statistics)
Definition: download.cc:1600
const char * Code2Ascii(const Failures error)
Definition: download.h:88
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
Definition: hash.cc:190
unsigned char num_retries
Definition: download.h:293
void Final(ContextPtr context, Any *any_digest)
Definition: hash.cc:221
std::string AddDefaultScheme(const std::string &proxy)
Definition: dns.cc:187
static string EscapeUrl(const string &url)
Definition: download.cc:98
dns::IpPreference opt_ip_preference_
Definition: download.h:586
Algorithms algorithm
Definition: hash.h:499
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:285
Definition: dns.h:90
Failures Fetch(const std::string &base_url, const std::string &repository_name, const uint64_t minimum_timestamp, const shash::Any *base_catalog, signature::SignatureManager *signature_manager, download::DownloadManager *download_manager, ManifestEnsemble *ensemble)
void UpdateProxiesUnlocked(const std::string &reason)
Definition: download.cc:2607
bool IsEquivalent(const Host &other) const
Definition: dns.cc:269
bool IsProxyTransferError(const Failures error)
Definition: download.h:76
bool IsExpired() const
Definition: dns.cc:280
FILE * destination_file
Definition: download.h:168
bool follow_redirects
Definition: download.h:156
unsigned GetContextSize(const Algorithms algorithm)
Definition: hash.cc:148
virtual int Reset()=0
perf::Counter * n_requests
Definition: download.h:127
static int Init(const loader::LoaderExports *loader_exports)
Definition: cvmfs.cc:1755
string StringifyInt(const int64_t value)
Definition: string.cc:78
void SetMaxIpaddrPerProxy(unsigned limit)
Definition: download.cc:2734
const shash::Any * expected_hash
Definition: download.h:171
void Inc(class Counter *counter)
Definition: statistics.h:50
void * buffer
Definition: hash.h:500
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:265
const std::string * extra_info
Definition: download.h:172
std::string ExtractHost(const std::string &url)
Definition: dns.cc:110
std::vector< int > * opt_host_chain_rtt_
Definition: download.h:534
cvmfs::Sink * destination_sink
Definition: download.h:170
SslCertificateStore ssl_certificate_store_
Definition: download.h:630
uint32_t Partial32() const
Definition: hash.h:393
IpPreference
Definition: dns.h:46
unsigned opt_backoff_max_ms_
Definition: download.h:523
CURL * curl_handle
Definition: download.h:281
CredentialsAttachment * credentials_attachment_
Definition: download.h:619
std::vector< std::string > * opt_host_chain_
Definition: download.h:529
struct pollfd * watch_fds_
Definition: download.h:510
uint64_t String2Uint64(const string &value)
Definition: string.cc:228
Failures error_code
Definition: download.h:289
static void Fini()
Definition: cvmfs.cc:1961
static void Spawn()
Definition: cvmfs.cc:1876
struct download::JobInfo::@3 destination_mem
Definition: mutex.h:42
std::string Filter(const std::string &input) const
Definition: sanitizer.cc:107
std::string proxy_template_forced_
Definition: download.h:599
const std::string * destination_path
Definition: download.h:169
void SetDnsParameters(const unsigned retries, const unsigned timeout_ms)
Definition: download.cc:1834
static Host ExtendDeadline(const Host &original, unsigned seconds_from_now)
Definition: dns.cc:231
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:1918
bool IsHostTransferError(const Failures error)
Definition: download.h:64
void SortTeam(std::vector< T > *tractor, std::vector< U > *towed)
Definition: algorithm.h:51
curl_slist * headers
Definition: download.h:282
unsigned size
Definition: hash.h:501
unsigned char num_used_proxies
Definition: download.h:291
Failures status() const
Definition: dns.h:119
std::string proxy
Definition: download.h:287
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:534
static void size_t size
Definition: smalloc.h:47
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
Definition: download.cc:1226
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:546
const std::string * url
Definition: download.h:152
void HashString(const std::string &content, Any *any_digest)
Definition: hash.cc:268
void InitializeRequest(JobInfo *info, CURL *handle)
Definition: download.cc:835
string RewriteUrl(const string &url, const string &ip)
Definition: dns.cc:156
uint32_t Next(const uint64_t boundary)
Definition: prng.h:46
virtual int64_t Write(const void *buf, uint64_t sz)=0
char * info_header
Definition: download.h:283