CernVM-FS  2.10.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
download.cc
Go to the documentation of this file.
1 
27 // TODO(jblomer): MS for time summing
28 // NOLINTNEXTLINE
29 #define __STDC_FORMAT_MACROS
30 
31 #include "cvmfs_config.h"
32 #include "download.h"
33 
34 #include <alloca.h>
35 #include <errno.h>
36 #include <inttypes.h>
37 #include <poll.h>
38 #include <pthread.h>
39 #include <signal.h>
40 #include <stdint.h>
41 #include <sys/time.h>
42 #include <unistd.h>
43 
44 #include <algorithm>
45 #include <cassert>
46 #include <cstdio>
47 #include <cstdlib>
48 #include <cstring>
49 #include <map>
50 #include <set>
51 #include <utility>
52 
53 #include "atomic.h"
54 #include "compression.h"
55 #include "duplex_curl.h"
56 #include "hash.h"
57 #include "logging.h"
58 #include "prng.h"
59 #include "sanitizer.h"
60 #include "smalloc.h"
61 #include "ssl.h"
62 #include "util/algorithm.h"
63 #include "util/exception.h"
64 #include "util/posix.h"
65 #include "util/string.h"
66 #include "util_concurrency.h"
67 
68 using namespace std; // NOLINT
69 
70 namespace download {
71 
/**
 * Percent-encodes a single character for use in a URL or an HTTP header.
 *
 * Unreserved characters (alphanumerics and "/:.@+-_~[],") are copied verbatim
 * into output[0] and false is returned.  Every other byte is written as
 * "%XY" (upper-case hex) into output[0..2] and true is returned.
 *
 * The byte is converted to unsigned char before it is split into nibbles.
 * On platforms where plain char is signed, bytes >= 0x80 (e.g. UTF-8
 * sequences) would otherwise produce negative quotients/remainders and
 * therefore characters that are not hex digits.
 *
 * \param input  the character to encode
 * \param output receives 1 (not escaped) or 3 (escaped) characters; the
 *               result is not NUL-terminated
 * \return true if the character was escaped into three output characters
 */
static inline bool EscapeUrlChar(char input, char output[3]) {
  if (((input >= '0') && (input <= '9')) ||
      ((input >= 'A') && (input <= 'Z')) ||
      ((input >= 'a') && (input <= 'z')) ||
      (input == '/') || (input == ':') || (input == '.') ||
      (input == '@') ||
      (input == '+') || (input == '-') ||
      (input == '_') || (input == '~') ||
      (input == '[') || (input == ']') || (input == ','))
  {
    output[0] = input;
    return false;
  }

  static const char kHexDigits[] = "0123456789ABCDEF";
  const unsigned char byte = static_cast<unsigned char>(input);
  output[0] = '%';
  output[1] = kHexDigits[byte >> 4];
  output[2] = kHexDigits[byte & 0x0F];
  return true;
}
93 
94 
99 static string EscapeUrl(const string &url) {
100  string escaped;
101  escaped.reserve(url.length());
102 
103  char escaped_char[3];
104  for (unsigned i = 0, s = url.length(); i < s; ++i) {
105  if (EscapeUrlChar(url[i], escaped_char)) {
106  escaped.append(escaped_char, 3);
107  } else {
108  escaped.push_back(escaped_char[0]);
109  }
110  }
111  LogCvmfs(kLogDownload, kLogDebug, "escaped %s to %s",
112  url.c_str(), escaped.c_str());
113 
114  return escaped;
115 }
116 
117 
122 static unsigned EscapeHeader(const string &header,
123  char *escaped_buf,
124  size_t buf_size)
125 {
126  unsigned esc_pos = 0;
127  char escaped_char[3];
128  for (unsigned i = 0, s = header.size(); i < s; ++i) {
129  if (EscapeUrlChar(header[i], escaped_char)) {
130  for (unsigned j = 0; j < 3; ++j) {
131  if (escaped_buf) {
132  if (esc_pos >= buf_size)
133  return esc_pos;
134  escaped_buf[esc_pos] = escaped_char[j];
135  }
136  esc_pos++;
137  }
138  } else {
139  if (escaped_buf) {
140  if (esc_pos >= buf_size)
141  return esc_pos;
142  escaped_buf[esc_pos] = escaped_char[0];
143  }
144  esc_pos++;
145  }
146  }
147 
148  return esc_pos;
149 }
150 
151 
153  info->destination_mem.size = 0;
154  info->destination_mem.pos = 0;
155  info->destination_mem.data = NULL;
156 
157  if (info->destination == kDestinationFile)
158  assert(info->destination_file != NULL);
159 
160  if (info->destination == kDestinationPath) {
161  assert(info->destination_path != NULL);
162  info->destination_file = fopen(info->destination_path->c_str(), "w");
163  if (info->destination_file == NULL) {
164  LogCvmfs(kLogDownload, kLogDebug, "Failed to open path %s: %s"
165  " (errno=%d).",
166  info->destination_path->c_str(), strerror(errno), errno);
167  return kFailLocalIO;
168  }
169  }
170 
171  if (info->destination == kDestinationSink)
172  assert(info->destination_sink != NULL);
173 
174  return kFailOk;
175 }
176 
177 
// Header callback installed via CURLOPT_HEADERFUNCTION (see
// AcquireCurlHandle()); info_link is the JobInfo registered with
// CURLOPT_WRITEHEADER in InitializeRequest().
//
// - Status lines ("HTTP/1.x <code> ..."): the three-digit code is parsed into
//   info->http_code.  2xx passes.  301/302/303/307 pass only if
//   info->follow_redirects is set (libcurl then follows the redirect).  Any
//   other code records an error in info->error_code and aborts the transfer.
// - "Content-Length:" (kDestinationMem only): allocates the in-memory
//   destination buffer.  Sizes above DownloadManager::kMaxMemSize are refused
//   with kFailTooBig.
// - "X-Squid-Error:" or "Proxy-Status: ...error=...": turns a previously
//   recorded kFailHostHttp into kFailProxyHttp.
// Header names other than the status line are matched case-insensitively.
//
// Returns num_bytes to continue.  Returning 0 (any value other than the number
// of bytes passed in) makes libcurl abort the transfer.
181 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
182  void *info_link)
183 {
184  const size_t num_bytes = size*nmemb;
185  const string header_line(static_cast<const char *>(ptr), num_bytes);
186  JobInfo *info = static_cast<JobInfo *>(info_link);
187 
188  // LogCvmfs(kLogDownload, kLogDebug, "REMOVE-ME: Header callback with %s",
189  // header_line.c_str());
190 
191  // Check http status codes
192  if (HasPrefix(header_line, "HTTP/1.", false)) {
// Too short to hold "HTTP/1.x" plus a status code: treat as malformed and
// abort the transfer.
193  if (header_line.length() < 10)
194  return 0;
195 
// Skip the blanks after "HTTP/1.x" (which ends at index 8) to reach the code.
196  unsigned i;
197  for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {}
198 
199  // Code is initialized to -1
// Only parse if at least three characters follow; otherwise http_code keeps
// its previous value.
200  if (header_line.length() > i+2) {
201  info->http_code = DownloadManager::ParseHttpCode(&header_line[i]);
202  }
203 
204  if ((info->http_code / 100) == 2) {
205  return num_bytes;
206  } else if ((info->http_code == 301) ||
207  (info->http_code == 302) ||
208  (info->http_code == 303) ||
209  (info->http_code == 307))
210  {
211  if (!info->follow_redirects) {
212  LogCvmfs(kLogDownload, kLogDebug, "redirect support not enabled: %s",
213  header_line.c_str());
214  info->error_code = kFailHostHttp;
215  return 0;
216  }
217  LogCvmfs(kLogDownload, kLogDebug, "http redirect: %s",
218  header_line.c_str());
219  // libcurl will handle this because of CURLOPT_FOLLOWLOCATION
220  return num_bytes;
221  } else {
222  LogCvmfs(kLogDownload, kLogDebug, "http status error code: %s [%d]",
223  header_line.c_str(), info->http_code);
224  if (((info->http_code / 100) == 5) ||
225  (info->http_code == 400) || (info->http_code == 404))
226  {
227  // 5XX returned by host
228  // 400: error from the GeoAPI module
229  // 404: the stratum 1 does not have the newest files
230  info->error_code = kFailHostHttp;
231  } else if (info->http_code == 429) {
232  // 429: rate throttling (we ignore the backoff hint for the time being)
234  } else {
// Other errors are blamed on the host for direct connections and on the
// proxy otherwise.
235  info->error_code = (info->proxy == "DIRECT") ? kFailHostHttp :
237  }
238  return 0;
239  }
240  }
241 
242  // Allocate memory for kDestinationMemory
243  if ((info->destination == kDestinationMem) &&
244  HasPrefix(header_line, "CONTENT-LENGTH:", true))
245  {
// tmp is as long as the whole header line, so the "%s" token read by
// sscanf() cannot overflow it.
246  char *tmp = reinterpret_cast<char *>(alloca(num_bytes+1));
247  uint64_t length = 0;
248  sscanf(header_line.c_str(), "%s %" PRIu64, tmp, &length);
249  if (length > 0) {
250  if (length > DownloadManager::kMaxMemSize) {
252  "resource %s too large to store in memory (%" PRIu64 ")",
253  info->url->c_str(), length);
254  info->error_code = kFailTooBig;
255  return 0;
256  }
257  info->destination_mem.data = static_cast<char *>(smalloc(length));
258  } else {
259  // Empty resource
260  info->destination_mem.data = NULL;
261  }
// CallbackCurlData() rejects any body data beyond this size.
262  info->destination_mem.size = length;
263  } else if (HasPrefix(header_line, "LOCATION:", true)) {
264  // This comes along with redirects
265  LogCvmfs(kLogDownload, kLogDebug, "%s", header_line.c_str());
266  } else if (HasPrefix(header_line, "X-SQUID-ERROR:", true)) {
267  // Reinterpret host error as proxy error
268  if (info->error_code == kFailHostHttp) {
269  info->error_code = kFailProxyHttp;
270  }
271  } else if (HasPrefix(header_line, "PROXY-STATUS:", true)) {
272  // Reinterpret host error as proxy error if applicable
273  if ((info->error_code == kFailHostHttp) &&
274  (header_line.find("error=") != string::npos)) {
275  info->error_code = kFailProxyHttp;
276  }
277  }
278 
279  return num_bytes;
280 }
281 
282 
// Write callback installed via CURLOPT_WRITEFUNCTION (see
// AcquireCurlHandle()); info_link is the JobInfo registered with
// CURLOPT_WRITEDATA in InitializeRequest().
//
// If info->expected_hash is set, the bytes are hashed exactly as received,
// i.e. before any decompression.  The data then goes to one of:
// - kDestinationSink: decompressed into the sink, or written to it raw;
// - kDestinationMem: copied raw into the preallocated info->destination_mem
//   buffer (this branch does not decompress).  Receiving more than
//   destination_mem.size bytes fails with kFailBadData;
// - any other destination: decompressed into, or written raw to,
//   info->destination_file.
// Corrupt compressed data sets kFailBadData; local write failures set
// kFailLocalIO.
//
// Returns num_bytes on success.  Returning 0 (a short count) makes libcurl
// abort the transfer.
286 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
287  void *info_link)
288 {
289  const size_t num_bytes = size*nmemb;
290  JobInfo *info = static_cast<JobInfo *>(info_link);
291 
292  // LogCvmfs(kLogDownload, kLogDebug, "Data callback, %d bytes", num_bytes);
293 
// Nothing to store.  Returning 0 equals num_bytes here, so this is not
// treated as an error.
294  if (num_bytes == 0)
295  return 0;
296 
297  if (info->expected_hash) {
298  shash::Update(reinterpret_cast<unsigned char *>(ptr),
299  num_bytes, info->hash_context);
300  }
301 
302  if (info->destination == kDestinationSink) {
303  if (info->compressed) {
304  zlib::StreamStates retval =
305  zlib::DecompressZStream2Sink(ptr, static_cast<int64_t>(num_bytes),
306  &info->zstream, info->destination_sink);
307  if (retval == zlib::kStreamDataError) {
308  LogCvmfs(kLogDownload, kLogSyslogErr, "failed to decompress %s",
309  info->url->c_str());
310  info->error_code = kFailBadData;
311  return 0;
312  } else if (retval == zlib::kStreamIOError) {
314  "decompressing %s, local IO error", info->url->c_str());
315  info->error_code = kFailLocalIO;
316  return 0;
317  }
318  } else {
// A partial write is treated like a failed one.
319  int64_t written = info->destination_sink->Write(ptr, num_bytes);
320  if ((written < 0) || (static_cast<uint64_t>(written) != num_bytes)) {
321  LogCvmfs(kLogDownload, kLogDebug, "Failed to perform write on %s (%"
322  PRId64 ")", info->url->c_str(), written);
323  info->error_code = kFailLocalIO;
324  return 0;
325  }
326  }
327  } else if (info->destination == kDestinationMem) {
328  // Write to memory
// The buffer is sized in advance (from Content-Length in CallbackCurlHeader()
// or 64 KiB for file:// URLs in SetUrlOptions()); it is never grown here.
329  if (info->destination_mem.pos + num_bytes > info->destination_mem.size) {
330  if (info->destination_mem.size == 0) {
332  "Content-Length was missing or zero, but %zu bytes received",
333  info->destination_mem.pos + num_bytes);
334  } else {
335  LogCvmfs(kLogDownload, kLogDebug, "Callback had too much data: "
336  "start %zu, bytes %zu, expected %zu",
337  info->destination_mem.pos,
338  num_bytes,
339  info->destination_mem.size);
340  }
341  info->error_code = kFailBadData;
342  return 0;
343  }
344  memcpy(info->destination_mem.data + info->destination_mem.pos,
345  ptr, num_bytes);
346  info->destination_mem.pos += num_bytes;
347  } else {
348  // Write to file
349  if (info->compressed) {
350  // LogCvmfs(kLogDownload, kLogDebug, "REMOVE-ME: writing %d bytes for %s",
351  // num_bytes, info->url->c_str());
352  zlib::StreamStates retval =
353  zlib::DecompressZStream2File(ptr, static_cast<int64_t>(num_bytes),
354  &info->zstream, info->destination_file);
355  if (retval == zlib::kStreamDataError) {
356  LogCvmfs(kLogDownload, kLogSyslogErr, "failed to decompress %s",
357  info->url->c_str());
358  info->error_code = kFailBadData;
359  return 0;
360  } else if (retval == zlib::kStreamIOError) {
362  "decompressing %s, local IO error", info->url->c_str());
363  info->error_code = kFailLocalIO;
364  return 0;
365  }
366  } else {
367  if (fwrite(ptr, 1, num_bytes, info->destination_file) != num_bytes) {
369  "downloading %s, IO failure: %s (errno=%d)",
370  info->url->c_str(), strerror(errno), errno);
371  info->error_code = kFailLocalIO;
372  return 0;
373  }
374  }
375  }
376 
377  return num_bytes;
378 }
379 
380 
381 //------------------------------------------------------------------------------
382 
383 
384 bool JobInfo::IsFileNotFound() {
385  if (HasPrefix(*url, "file://", true /* ignore_case */))
386  return error_code == kFailHostConnection;
387 
388  return http_code == 404;
389 }
390 
391 
392 //------------------------------------------------------------------------------
393 
394 
395 const int DownloadManager::kProbeUnprobed = -1;
396 const int DownloadManager::kProbeDown = -2;
397 const int DownloadManager::kProbeGeo = -3;
398 const unsigned DownloadManager::kMaxMemSize = 1024*1024;
399 
400 
404 int DownloadManager::ParseHttpCode(const char digits[3]) {
405  int result = 0;
406  int factor = 100;
407  for (int i = 0; i < 3; ++i) {
408  if ((digits[i] < '0') || (digits[i] > '9'))
409  return -1;
410  result += (digits[i] - '0') * factor;
411  factor /= 10;
412  }
413  return result;
414 }
415 
416 
420 int DownloadManager::CallbackCurlSocket(CURL * /* easy */,
421  curl_socket_t s,
422  int action,
423  void *userp,
424  void * /* socketp */)
425 {
426  // LogCvmfs(kLogDownload, kLogDebug, "CallbackCurlSocket called with easy "
427  // "handle %p, socket %d, action %d", easy, s, action);
428  DownloadManager *download_mgr = static_cast<DownloadManager *>(userp);
429  if (action == CURL_POLL_NONE)
430  return 0;
431 
432  // Find s in watch_fds_
433  unsigned index;
434  for (index = 0; index < download_mgr->watch_fds_inuse_; ++index) {
435  if (download_mgr->watch_fds_[index].fd == s)
436  break;
437  }
438  // Or create newly
439  if (index == download_mgr->watch_fds_inuse_) {
440  // Extend array if necessary
441  if (download_mgr->watch_fds_inuse_ == download_mgr->watch_fds_size_)
442  {
443  assert(download_mgr->watch_fds_size_ > 0);
444  download_mgr->watch_fds_size_ *= 2;
445  download_mgr->watch_fds_ = static_cast<struct pollfd *>(
446  srealloc(download_mgr->watch_fds_,
447  download_mgr->watch_fds_size_ * sizeof(struct pollfd)));
448  }
449  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].fd = s;
450  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].events = 0;
451  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].revents = 0;
452  download_mgr->watch_fds_inuse_++;
453  }
454 
455  switch (action) {
456  case CURL_POLL_IN:
457  download_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
458  break;
459  case CURL_POLL_OUT:
460  download_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
461  break;
462  case CURL_POLL_INOUT:
463  download_mgr->watch_fds_[index].events =
464  POLLIN | POLLPRI | POLLOUT | POLLWRBAND;
465  break;
466  case CURL_POLL_REMOVE:
467  if (index < download_mgr->watch_fds_inuse_-1) {
468  download_mgr->watch_fds_[index] =
469  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_-1];
470  }
471  download_mgr->watch_fds_inuse_--;
472  // Shrink array if necessary
473  if ((download_mgr->watch_fds_inuse_ > download_mgr->watch_fds_max_) &&
474  (download_mgr->watch_fds_inuse_ < download_mgr->watch_fds_size_/2))
475  {
476  download_mgr->watch_fds_size_ /= 2;
477  // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ (%d)",
478  // watch_fds_size_);
479  download_mgr->watch_fds_ = static_cast<struct pollfd *>(
480  srealloc(download_mgr->watch_fds_,
481  download_mgr->watch_fds_size_*sizeof(struct pollfd)));
482  // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ done",
483  // watch_fds_size_);
484  }
485  break;
486  default:
487  break;
488  }
489 
490  return 0;
491 }
492 
493 
// Main loop of the download I/O thread; data is the owning DownloadManager.
// The void *(void *) signature matches a pthread start routine (presumably
// started via pthread_create elsewhere -- confirm).
//
// watch_fds_[0] watches pipe_terminate_[0]; any event on it ends the loop.
// watch_fds_[1] watches pipe_jobs_[0], through which JobInfo pointers arrive.
// Entries from index 2 on are curl sockets maintained by CallbackCurlSocket().
// Each new job gets an easy handle from AcquireCurlHandle(), is configured by
// InitializeRequest() and SetUrlOptions(), and is added to curl_multi_.
// Finished transfers are passed to VerifyAndFinalize(): on true the same
// handle is re-added to the multi handle; otherwise it is released and
// info->error_code is written to info->wait_at[1].
//
// While transfers are running, poll() uses a 1 ms timeout.  When idle it
// blocks indefinitely, and the elapsed transfer time
// (1000 * DiffTimeSeconds(), presumably milliseconds) is added to the
// sz_transfer_time counter.
497 void *DownloadManager::MainDownload(void *data) {
498  LogCvmfs(kLogDownload, kLogDebug, "download I/O thread started");
499  DownloadManager *download_mgr = static_cast<DownloadManager *>(data);
500 
// Slots 0 and 1 are reserved for the termination and job pipes.
501  download_mgr->watch_fds_ =
502  static_cast<struct pollfd *>(smalloc(2 * sizeof(struct pollfd)));
503  download_mgr->watch_fds_size_ = 2;
504  download_mgr->watch_fds_[0].fd = download_mgr->pipe_terminate_[0];
505  download_mgr->watch_fds_[0].events = POLLIN | POLLPRI;
506  download_mgr->watch_fds_[0].revents = 0;
507  download_mgr->watch_fds_[1].fd = download_mgr->pipe_jobs_[0];
508  download_mgr->watch_fds_[1].events = POLLIN | POLLPRI;
509  download_mgr->watch_fds_[1].revents = 0;
510  download_mgr->watch_fds_inuse_ = 2;
511 
512  int still_running = 0;
513  struct timeval timeval_start, timeval_stop;
514  gettimeofday(&timeval_start, NULL);
515  while (true) {
516  int timeout;
517  if (still_running) {
518  /* NOTE: The following might degrade the performance for many small files
519  * use case. TODO(jblomer): look into it.
520  // Specify a timeout for polling in ms; this allows us to return
521  // to libcurl once a second so it can look for internal operations
522  // which timed out. libcurl has a more elaborate mechanism
523  // (CURLMOPT_TIMERFUNCTION) that would inform us of the next potential
524  // timeout. TODO(bbockelm) we should switch to that in the future.
525  timeout = 100;
526  */
527  timeout = 1;
528  } else {
// Idle: block until a job or the termination request arrives.
// NOTE(review): if poll() below fails (e.g. EINTR) while idle, the next
// iteration adds the interval since the same timeval_start again, so transfer
// time may be counted twice.
529  timeout = -1;
530  gettimeofday(&timeval_stop, NULL);
531  int64_t delta = static_cast<int64_t>(
532  1000 * DiffTimeSeconds(timeval_start, timeval_stop));
533  perf::Xadd(download_mgr->counters_->sz_transfer_time, delta);
534  }
535  int retval = poll(download_mgr->watch_fds_, download_mgr->watch_fds_inuse_,
536  timeout);
537  if (retval < 0) {
538  continue;
539  }
540 
541  // Handle timeout
542  if (retval == 0) {
543  curl_multi_socket_action(download_mgr->curl_multi_,
544  CURL_SOCKET_TIMEOUT,
545  0,
546  &still_running);
547  }
548 
549  // Terminate I/O thread
550  if (download_mgr->watch_fds_[0].revents)
551  break;
552 
553  // New job arrives
554  if (download_mgr->watch_fds_[1].revents) {
555  download_mgr->watch_fds_[1].revents = 0;
// The pipe carries the JobInfo pointer value itself.
556  JobInfo *info;
557  // NOLINTNEXTLINE(bugprone-sizeof-expression)
558  ReadPipe(download_mgr->pipe_jobs_[0], &info, sizeof(info));
559  if (!still_running)
560  gettimeofday(&timeval_start, NULL);
561  CURL *handle = download_mgr->AcquireCurlHandle();
562  download_mgr->InitializeRequest(info, handle);
563  download_mgr->SetUrlOptions(info);
564  curl_multi_add_handle(download_mgr->curl_multi_, handle);
565  curl_multi_socket_action(download_mgr->curl_multi_,
566  CURL_SOCKET_TIMEOUT,
567  0,
568  &still_running);
569  }
570 
571  // Activity on curl sockets
572  // Within this loop the curl_multi_socket_action() may cause socket(s)
573  // to be removed from watch_fds_. If a socket is removed it is replaced
574  // by the socket at the end of the array and the inuse count is decreased.
575  // Therefore loop over the array in reverse order.
576  for (int64_t i = download_mgr->watch_fds_inuse_-1; i >= 2; --i) {
577  if (i >= download_mgr->watch_fds_inuse_) {
578  continue;
579  }
580  if (download_mgr->watch_fds_[i].revents) {
// Translate poll() results into libcurl's CURL_CSELECT_* bitmask.
581  int ev_bitmask = 0;
582  if (download_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
583  ev_bitmask |= CURL_CSELECT_IN;
584  if (download_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
585  ev_bitmask |= CURL_CSELECT_OUT;
586  if (download_mgr->watch_fds_[i].revents &
587  (POLLERR | POLLHUP | POLLNVAL))
588  {
589  ev_bitmask |= CURL_CSELECT_ERR;
590  }
591  download_mgr->watch_fds_[i].revents = 0;
592 
593  curl_multi_socket_action(download_mgr->curl_multi_,
594  download_mgr->watch_fds_[i].fd,
595  ev_bitmask,
596  &still_running);
597  }
598  }
599 
600  // Check if transfers are completed
601  CURLMsg *curl_msg;
602  int msgs_in_queue;
603  while ((curl_msg = curl_multi_info_read(download_mgr->curl_multi_,
604  &msgs_in_queue)))
605  {
606  if (curl_msg->msg == CURLMSG_DONE) {
607  perf::Inc(download_mgr->counters_->n_requests);
// The JobInfo was attached as CURLOPT_PRIVATE in InitializeRequest().
608  JobInfo *info;
609  CURL *easy_handle = curl_msg->easy_handle;
610  int curl_error = curl_msg->data.result;
611  curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
612 
613  curl_multi_remove_handle(download_mgr->curl_multi_, easy_handle);
// true: run the request again on the same handle (presumably a retry or
// failover decided by VerifyAndFinalize -- confirm there).
614  if (download_mgr->VerifyAndFinalize(curl_error, info)) {
615  curl_multi_add_handle(download_mgr->curl_multi_, easy_handle);
616  curl_multi_socket_action(download_mgr->curl_multi_,
617  CURL_SOCKET_TIMEOUT,
618  0,
619  &still_running);
620  } else {
621  // Return easy handle into pool and write result back
622  download_mgr->ReleaseCurlHandle(easy_handle);
623 
624  WritePipe(info->wait_at[1], &info->error_code,
625  sizeof(info->error_code));
626  }
627  }
628  }
629  }
630 
// Teardown: only handles still in pool_handles_inuse_ are detached and
// destroyed here.
631  for (set<CURL *>::iterator i = download_mgr->pool_handles_inuse_->begin(),
632  iEnd = download_mgr->pool_handles_inuse_->end(); i != iEnd; ++i)
633  {
634  curl_multi_remove_handle(download_mgr->curl_multi_, *i);
635  curl_easy_cleanup(*i);
636  }
637  download_mgr->pool_handles_inuse_->clear();
638  free(download_mgr->watch_fds_);
639 
640  LogCvmfs(kLogDownload, kLogDebug, "download I/O thread terminated");
641  return NULL;
642 }
643 
644 
645 //------------------------------------------------------------------------------
646 
647 
648 HeaderLists::~HeaderLists() {
649  for (unsigned i = 0; i < blocks_.size(); ++i) {
650  delete[] blocks_[i];
651  }
652  blocks_.clear();
653 }
654 
655 
656 curl_slist *HeaderLists::GetList(const char *header) {
657  return Get(header);
658 }
659 
660 
661 curl_slist *HeaderLists::DuplicateList(curl_slist *slist) {
662  assert(slist);
663  curl_slist *copy = GetList(slist->data);
664  copy->next = slist->next;
665  curl_slist *prev = copy;
666  slist = slist->next;
667  while (slist) {
668  curl_slist *new_link = Get(slist->data);
669  new_link->next = slist->next;
670  prev->next = new_link;
671  prev = new_link;
672  slist = slist->next;
673  }
674  return copy;
675 }
676 
677 
678 void HeaderLists::AppendHeader(curl_slist *slist, const char *header) {
679  assert(slist);
680  curl_slist *new_link = Get(header);
681  new_link->next = NULL;
682 
683  while (slist->next)
684  slist = slist->next;
685  slist->next = new_link;
686 }
687 
688 
694 void HeaderLists::CutHeader(const char *header, curl_slist **slist) {
695  assert(slist);
696  curl_slist head;
697  head.next = *slist;
698  curl_slist *prev = &head;
699  curl_slist *rover = *slist;
700  while (rover) {
701  if (strcmp(rover->data, header) == 0) {
702  prev->next = rover->next;
703  Put(rover);
704  rover = prev;
705  }
706  prev = rover;
707  rover = rover->next;
708  }
709  *slist = head.next;
710 }
711 
712 
713 void HeaderLists::PutList(curl_slist *slist) {
714  while (slist) {
715  curl_slist *next = slist->next;
716  Put(slist);
717  slist = next;
718  }
719 }
720 
721 
722 string HeaderLists::Print(curl_slist *slist) {
723  string verbose;
724  while (slist) {
725  verbose += string(slist->data) + "\n";
726  slist = slist->next;
727  }
728  return verbose;
729 }
730 
731 
732 curl_slist *HeaderLists::Get(const char *header) {
733  for (unsigned i = 0; i < blocks_.size(); ++i) {
734  for (unsigned j = 0; j < kBlockSize; ++j) {
735  if (!IsUsed(&(blocks_[i][j]))) {
736  blocks_[i][j].data = const_cast<char *>(header);
737  return &(blocks_[i][j]);
738  }
739  }
740  }
741 
742  // All used, new block
743  AddBlock();
744  blocks_[blocks_.size()-1][0].data = const_cast<char *>(header);
745  return &(blocks_[blocks_.size()-1][0]);
746 }
747 
748 
749 void HeaderLists::Put(curl_slist *slist) {
750  slist->data = NULL;
751  slist->next = NULL;
752 }
753 
754 
755 void HeaderLists::AddBlock() {
756  curl_slist *new_block = new curl_slist[kBlockSize];
757  for (unsigned i = 0; i < kBlockSize; ++i) {
758  Put(&new_block[i]);
759  }
760  blocks_.push_back(new_block);
761 }
762 
763 
764 //------------------------------------------------------------------------------
765 
766 
767 string DownloadManager::ProxyInfo::Print() {
768  if (url == "DIRECT")
769  return url;
770 
771  string result = url;
772  int remaining =
773  static_cast<int>(host.deadline()) - static_cast<int>(time(NULL));
774  string expinfo = (remaining >= 0) ? "+" : "";
775  if (abs(remaining) >= 3600) {
776  expinfo += StringifyInt(remaining/3600) + "h";
777  } else if (abs(remaining) >= 60) {
778  expinfo += StringifyInt(remaining/60) + "m";
779  } else {
780  expinfo += StringifyInt(remaining) + "s";
781  }
782  if (host.status() == dns::kFailOk) {
783  result += " (" + host.name() + ", " + expinfo + ")";
784  } else {
785  result += " (:unresolved:, " + expinfo + ")";
786  }
787  return result;
788 }
789 
790 
795 CURL *DownloadManager::AcquireCurlHandle() {
796  CURL *handle;
797 
798  if (pool_handles_idle_->empty()) {
799  // Create a new handle
800  handle = curl_easy_init();
801  assert(handle != NULL);
802 
803  curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
804  // curl_easy_setopt(curl_default, CURLOPT_FAILONERROR, 1);
805  curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, CallbackCurlHeader);
806  curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlData);
807  } else {
808  handle = *(pool_handles_idle_->begin());
809  pool_handles_idle_->erase(pool_handles_idle_->begin());
810  }
811 
812  pool_handles_inuse_->insert(handle);
813 
814  return handle;
815 }
816 
817 
818 void DownloadManager::ReleaseCurlHandle(CURL *handle) {
819  set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
820  assert(elem != pool_handles_inuse_->end());
821 
822  if (pool_handles_idle_->size() > pool_max_handles_) {
823  curl_easy_cleanup(*elem);
824  } else {
825  pool_handles_idle_->insert(*elem);
826  }
827 
828  pool_handles_inuse_->erase(elem);
829 }
830 
831 
836 void DownloadManager::InitializeRequest(JobInfo *info, CURL *handle) {
837  // Initialize internal download state
838  info->curl_handle = handle;
839  info->error_code = kFailOk;
840  info->http_code = -1;
841  info->follow_redirects = follow_redirects_;
842  info->num_used_proxies = 1;
843  info->num_used_hosts = 1;
844  info->num_retries = 0;
845  info->backoff_ms = 0;
846  info->headers = header_lists_->DuplicateList(default_headers_);
847  if (info->info_header) {
848  header_lists_->AppendHeader(info->headers, info->info_header);
849  }
850  if (info->force_nocache) {
851  SetNocache(info);
852  } else {
853  info->nocache = false;
854  }
855  if (info->compressed) {
856  zlib::DecompressInit(&(info->zstream));
857  }
858  if (info->expected_hash) {
859  assert(info->hash_context.buffer != NULL);
860  shash::Init(info->hash_context);
861  }
862 
863  if ((info->range_offset != -1) && (info->range_size)) {
864  char byte_range_array[100];
865  const int64_t range_lower = static_cast<int64_t>(info->range_offset);
866  const int64_t range_upper = static_cast<int64_t>(
867  info->range_offset + info->range_size - 1);
868  if (snprintf(byte_range_array, sizeof(byte_range_array),
869  "%" PRId64 "-%" PRId64,
870  range_lower, range_upper) == 100)
871  {
872  PANIC(NULL); // Should be impossible given limits on offset size.
873  }
874  curl_easy_setopt(handle, CURLOPT_RANGE, byte_range_array);
875  } else {
876  curl_easy_setopt(handle, CURLOPT_RANGE, NULL);
877  }
878 
879  // Set curl parameters
880  curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
881  curl_easy_setopt(handle, CURLOPT_WRITEHEADER,
882  static_cast<void *>(info));
883  curl_easy_setopt(handle, CURLOPT_WRITEDATA, static_cast<void *>(info));
884  curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->headers);
885  if (info->head_request) {
886  curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
887  } else {
888  curl_easy_setopt(handle, CURLOPT_HTTPGET, 1);
889  }
890  if (opt_ipv4_only_) {
891  curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
892  }
893  if (follow_redirects_) {
894  curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1);
895  curl_easy_setopt(handle, CURLOPT_MAXREDIRS, 4);
896  }
897 }
898 
899 
// Configures the proxy, timeouts, TLS settings and final URL of info's curl
// handle.  The whole function runs under lock_options_.
//
// Before choosing a proxy, it undoes expired failovers.  After
// opt_proxy_groups_reset_after_ seconds the primary proxy group is restored
// (or the load-balanced proxies are rebalanced).  After opt_host_reset_after_
// seconds the first host of the chain is used again.  The chosen proxy's DNS
// entry is revalidated; a proxy whose host did not resolve is set to
// "0.0.0.0" so the request is not even attempted.  The URL is
// (*opt_host_chain_)[current] + *info->url when info->probe_hosts is set.
// "@proxy@" is substituted for Geo-API requests, and the result is escaped
// with EscapeUrl() before being handed to CURLOPT_URL.
904 void DownloadManager::SetUrlOptions(JobInfo *info) {
905  CURL *curl_handle = info->curl_handle;
906  string url_prefix;
907 
908  MutexLockGuard m(lock_options_);
909  // Check if proxy group needs to be reset from backup to primary
910  if (opt_timestamp_backup_proxies_ > 0) {
911  const time_t now = time(NULL);
912  if (static_cast<int64_t>(now) >
913  static_cast<int64_t>(opt_timestamp_backup_proxies_ +
914  opt_proxy_groups_reset_after_))
915  {
916  opt_proxy_groups_current_ = 0;
917  opt_timestamp_backup_proxies_ = 0;
918  RebalanceProxiesUnlocked("reset proxy group");
919  }
920  }
921  // Check if load-balanced proxies within the group need to be reset
922  if (opt_timestamp_failover_proxies_ > 0) {
923  const time_t now = time(NULL);
924  if (static_cast<int64_t>(now) >
925  static_cast<int64_t>(opt_timestamp_failover_proxies_ +
926  opt_proxy_groups_reset_after_))
927  {
928  RebalanceProxiesUnlocked("reset load-balanced proxies");
929  }
930  }
931  // Check if host needs to be reset
// NOTE(review): opt_host_chain_ is dereferenced here without the NULL check
// used further below; presumably a backup-host timestamp implies a host
// chain -- confirm.
932  if (opt_timestamp_backup_host_ > 0) {
933  const time_t now = time(NULL);
934  if (static_cast<int64_t>(now) >
935  static_cast<int64_t>(opt_timestamp_backup_host_ +
936  opt_host_reset_after_))
937  {
939  "switching host from %s to %s (reset host)",
940  (*opt_host_chain_)[opt_host_chain_current_].c_str(),
941  (*opt_host_chain_)[0].c_str());
942  opt_host_chain_current_ = 0;
943  opt_timestamp_backup_host_ = 0;
944  }
945  }
946 
947  ProxyInfo *proxy = ChooseProxyUnlocked(info->expected_hash);
948  if (!proxy || (proxy->url == "DIRECT")) {
949  info->proxy = "DIRECT";
950  curl_easy_setopt(info->curl_handle, CURLOPT_PROXY, "");
951  } else {
952  // Note: inside ValidateProxyIpsUnlocked() we may change the proxy data
953  // structure, so we must not pass proxy->... (== current_proxy())
954  // parameters directly
955  std::string purl = proxy->url;
956  dns::Host phost = proxy->host;
957  const bool changed = ValidateProxyIpsUnlocked(purl, phost);
958  // Current proxy may have changed
// NOTE(review): unlike the first ChooseProxyUnlocked() call above, this
// result is not checked for NULL before it is dereferenced.
959  if (changed)
960  proxy = ChooseProxyUnlocked(info->expected_hash);
961  info->proxy = proxy->url;
962  if (proxy->host.status() == dns::kFailOk) {
963  curl_easy_setopt(info->curl_handle, CURLOPT_PROXY, info->proxy.c_str());
964  } else {
965  // We know it can't work, don't even try to download
966  curl_easy_setopt(info->curl_handle, CURLOPT_PROXY, "0.0.0.0");
967  }
968  }
// Connection and low-speed timeouts differ for proxied and direct requests.
969  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, opt_low_speed_limit_);
970  if (info->proxy != "DIRECT") {
971  curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_proxy_);
972  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_proxy_);
973  } else {
974  curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_direct_);
975  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_direct_);
976  }
977  if (!opt_dns_server_.empty())
978  curl_easy_setopt(curl_handle, CURLOPT_DNS_SERVERS, opt_dns_server_.c_str());
979 
// Relative URLs are prefixed with the currently selected host of the chain;
// the index is remembered in info->current_host_chain_index.
980  if (info->probe_hosts && opt_host_chain_) {
981  url_prefix = (*opt_host_chain_)[opt_host_chain_current_];
982  info->current_host_chain_index = opt_host_chain_current_;
983  }
984 
985  string url = url_prefix + *(info->url);
986 
987  curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 1L);
// NOTE(review): this scheme check is case-sensitive ("HTTPS://" does not
// match).
988  if (url.substr(0, 5) == "https") {
989  bool rvb = ssl_certificate_store_.ApplySslCertificatePath(curl_handle);
990  if (!rvb) {
992  "Failed to set SSL certificate path %s",
993  ssl_certificate_store_.GetCaPath().c_str());
994  }
995  if (info->pid != -1) {
996  if (credentials_attachment_ == NULL) {
998  "uses secure downloads but no credentials attachment set");
999  } else {
1000  bool retval = credentials_attachment_->ConfigureCurlHandle(
1001  curl_handle, info->pid, &info->cred_data);
1002  if (!retval) {
1003  LogCvmfs(kLogDownload, kLogDebug, "failed attaching credentials");
1004  }
1005  }
1006  }
1007  // The download manager disables signal handling in the curl library;
1008  // as OpenSSL's implementation of TLS will generate a sigpipe in some
1009  // error paths, we must explicitly disable SIGPIPE here.
1010  // TODO(jblomer): it should be enough to do this once
1011  signal(SIGPIPE, SIG_IGN);
1012  }
1013 
1014  if (url.find("@proxy@") != string::npos) {
1015  // This is used in Geo-API requests (only), to replace a portion of the
1016  // URL with the current proxy name for the sake of caching the result.
1017  // Replace the @proxy@ either with a passed in "forced" template (which
1018  // is set from $CVMFS_PROXY_TEMPLATE) if there is one, or a "direct"
1019  // template (which is the uuid) if there's no proxy, or the name of the
1020  // proxy.
1021  string replacement;
1022  if (proxy_template_forced_ != "") {
1023  replacement = proxy_template_forced_;
1024  } else if (info->proxy == "DIRECT") {
1025  replacement = proxy_template_direct_;
1026  } else {
1027  if (opt_proxy_groups_current_ >= opt_proxy_groups_fallback_) {
1028  // It doesn't make sense to use the fallback proxies in Geo-API requests
1029  // since the fallback proxies are supposed to get sorted, too.
1030  info->proxy = "DIRECT";
1031  curl_easy_setopt(info->curl_handle, CURLOPT_PROXY, "");
1032  replacement = proxy_template_direct_;
1033  } else {
1034  replacement = ChooseProxyUnlocked(info->expected_hash)->host.name();
1035  }
1036  }
// An empty replacement (e.g. an empty host name) falls back to the direct
// template.
1037  replacement = (replacement == "") ? proxy_template_direct_ : replacement;
1038  LogCvmfs(kLogDownload, kLogDebug, "replacing @proxy@ by %s",
1039  replacement.c_str());
1040  url = ReplaceAll(url, "@proxy@", replacement);
1041  }
1042 
// In-memory downloads of file:// URLs without a known size get a fixed
// 64 KiB buffer (presumably because no Content-Length header arrives for
// them -- confirm).  CallbackCurlData() rejects anything larger.
1043  if ((info->destination == kDestinationMem) &&
1044  (info->destination_mem.size == 0) &&
1045  HasPrefix(url, "file://", false))
1046  {
1047  info->destination_mem.size = 64*1024;
1048  info->destination_mem.data = static_cast<char *>(smalloc(64*1024));
1049  }
1050 
// libcurl copies string options, so passing the temporary's c_str() is safe.
1051  curl_easy_setopt(curl_handle, CURLOPT_URL, EscapeUrl(url).c_str());
1052 }
1053 
1054 
// Re-validates the DNS entry of a proxy host once it has expired.
//
// Returns false, leaving the proxy groups structurally unchanged, when:
//   - the entry has not expired yet;
//   - re-resolving fails;
//   - the new addresses are equivalent to the old ones.
// In the last two cases, the refreshed host object is stored into every
// ProxyInfo with a matching host id, in the group that
// opt_proxy_groups_current_ pointed to on entry. On resolve failure, the
// refreshed object is the old host with its deadline extended by
// resolver_->min_ttl().
//
// Returns true when the address set changed. In that case:
//   - all ProxyInfo entries of that host are removed from the current proxy
//     group;
//   - one new entry per best address (per opt_ip_preference_) is appended;
//   - the proxies are rebalanced.
//
// @param url   proxy URL; each new address is substituted into it via
//              dns::RewriteUrl()
// @param host  the currently known (possibly expired) host entry
//
// NOTE(review): the "Unlocked" suffix suggests the caller must already hold
// lock_options_, since opt_proxy_groups_ is modified here — confirm.
// NOTE(review): source lines 1079 and 1099 (the opening lines of two
// LogCvmfs() calls) are absent from this listing.
1064 bool DownloadManager::ValidateProxyIpsUnlocked(
1065  const string &url,
1066  const dns::Host &host)
1067 {
1068  if (!host.IsExpired())
1069  return false;
1070  LogCvmfs(kLogDownload, kLogDebug, "validate DNS entry for %s",
1071  host.name().c_str());
1072 
1073  unsigned group_idx = opt_proxy_groups_current_;
1074  dns::Host new_host = resolver_->Resolve(host.name());
1075 
1076  bool update_only = true; // No changes to the list of IP addresses.
1077  if (new_host.status() != dns::kFailOk) {
1078  // Try again later in case resolving fails.
1080  "failed to resolve IP addresses for %s (%d - %s)",
1081  host.name().c_str(), new_host.status(),
1082  dns::Code2Ascii(new_host.status()));
// Keep the old addresses, but postpone the next validation by the
// resolver's minimum TTL.
1083  new_host = dns::Host::ExtendDeadline(host, resolver_->min_ttl());
1084  } else if (!host.IsEquivalent(new_host)) {
1085  update_only = false;
1086  }
1087 
1088  if (update_only) {
1089  for (unsigned i = 0; i < (*opt_proxy_groups_)[group_idx].size(); ++i) {
1090  if ((*opt_proxy_groups_)[group_idx][i].host.id() == host.id())
1091  (*opt_proxy_groups_)[group_idx][i].host = new_host;
1092  }
1093  return false;
1094  }
1095 
1096  assert(new_host.status() == dns::kFailOk);
1097 
1098  // Remove old host objects, insert new objects, and rebalance.
1100  "DNS entries for proxy %s changed, adjusting", host.name().c_str());
1101  vector<ProxyInfo> *group = current_proxy_group();
// NOTE(review): this subtracts the size of the whole group, but only the
// number of newly added entries is added back below. If the group also
// holds entries of other hosts, opt_num_proxies_ ends up too small;
// "opt_num_proxies_ += group->size()" after the insert may have been
// intended — confirm.
1102  opt_num_proxies_ -= group->size();
1103  for (unsigned i = 0; i < group->size(); ) {
1104  if ((*group)[i].host.id() == host.id()) {
1105  group->erase(group->begin() + i);
1106  } else {
1107  i++;
1108  }
1109  }
1110  vector<ProxyInfo> new_infos;
1111  set<string> best_addresses = new_host.ViewBestAddresses(opt_ip_preference_);
1112  set<string>::const_iterator iter_ips = best_addresses.begin();
1113  for (; iter_ips != best_addresses.end(); ++iter_ips) {
1114  string url_ip = dns::RewriteUrl(url, *iter_ips);
1115  new_infos.push_back(ProxyInfo(new_host, url_ip));
1116  }
1117  group->insert(group->end(), new_infos.begin(), new_infos.end());
1118  opt_num_proxies_ += new_infos.size();
1119 
1120  RebalanceProxiesUnlocked("DNS change");
1121  return true;
1122 }
1123 
1124 
1128 void DownloadManager::UpdateStatistics(CURL *handle) {
1129  double val;
1130  int retval;
1131  int64_t sum = 0;
1132 
1133  retval = curl_easy_getinfo(handle, CURLINFO_SIZE_DOWNLOAD, &val);
1134  assert(retval == CURLE_OK);
1135  sum += static_cast<int64_t>(val);
1136  /*retval = curl_easy_getinfo(handle, CURLINFO_HEADER_SIZE, &val);
1137  assert(retval == CURLE_OK);
1138  sum += static_cast<int64_t>(val);*/
1139  perf::Xadd(counters_->sz_transferred_bytes, sum);
1140 }
1141 
1142 
// Decides whether a failed job may be retried on the very same URL.
//
// All of the following must hold:
//   - the job is not in no-cache mode yet;
//   - its num_retries is below opt_max_retries_ (read under lock_options_);
//   - its error is a proxy transfer error, or presumably a host transfer
//     error — the continuation line (source line 1152) is absent from this
//     listing; confirm.
1146 bool DownloadManager::CanRetry(const JobInfo *info) {
1147  MutexLockGuard m(lock_options_);
1148  unsigned max_retries = opt_max_retries_;
1149 
1150  return !info->nocache && (info->num_retries < max_retries) &&
1151  (IsProxyTransferError(info->error_code) ||
1153 }
1154 
1162 void DownloadManager::Backoff(JobInfo *info) {
1163  unsigned backoff_init_ms = 0;
1164  unsigned backoff_max_ms = 0;
1165  {
1166  MutexLockGuard m(lock_options_);
1167  backoff_init_ms = opt_backoff_init_ms_;
1168  backoff_max_ms = opt_backoff_max_ms_;
1169  }
1170 
1171  info->num_retries++;
1172  perf::Inc(counters_->n_retries);
1173  if (info->backoff_ms == 0) {
1174  info->backoff_ms = prng_.Next(backoff_init_ms + 1); // Must be != 0
1175  } else {
1176  info->backoff_ms *= 2;
1177  }
1178  if (info->backoff_ms > backoff_max_ms) info->backoff_ms = backoff_max_ms;
1179 
1180  LogCvmfs(kLogDownload, kLogDebug, "backing off for %d ms", info->backoff_ms);
1181  SafeSleepMs(info->backoff_ms);
1182 }
1183 
1184 void DownloadManager::SetNocache(JobInfo *info) {
1185  if (info->nocache)
1186  return;
1187  header_lists_->AppendHeader(info->headers, "Pragma: no-cache");
1188  header_lists_->AppendHeader(info->headers, "Cache-Control: no-cache");
1189  curl_easy_setopt(info->curl_handle, CURLOPT_HTTPHEADER, info->headers);
1190  info->nocache = true;
1191 }
1192 
1193 
1198 void DownloadManager::SetRegularCache(JobInfo *info) {
1199  if (info->nocache == false)
1200  return;
1201  header_lists_->CutHeader("Pragma: no-cache", &(info->headers));
1202  header_lists_->CutHeader("Cache-Control: no-cache", &(info->headers));
1203  curl_easy_setopt(info->curl_handle, CURLOPT_HTTPHEADER, info->headers);
1204  info->nocache = false;
1205 }
1206 
1207 
1211 void DownloadManager::ReleaseCredential(JobInfo *info) {
1212  if (info->cred_data) {
1213  assert(credentials_attachment_ != NULL); // Someone must have set it
1214  credentials_attachment_->ReleaseCurlHandle(info->curl_handle,
1215  info->cred_data);
1216  info->cred_data = NULL;
1217  }
1218 }
1219 
1220 
// Evaluates the outcome of one curl transfer of job `info` and decides
// whether the transfer has to be repeated.
//
// @param curl_error  result code of the finished curl transfer
// @param info        the job. info->error_code is derived from curl_error,
//                    or left as set by a callback for
//                    CURLE_ABORTED_BY_CALLBACK / CURLE_WRITE_ERROR.
// @return true if the caller should run the transfer again on the same
//         curl handle. In that case the destination, hash context and
//         decompression state have been reset, and proxy or host may have
//         been switched.
//         false when the job is done. In that case:
//           - the destination file has been flushed or closed;
//           - the credential has been released;
//           - decompression has been finalized;
//           - the header list has been returned to header_lists_.
//
// On CURLE_OK, the content hash is checked (if expected_hash is set) and
// compressed in-memory downloads are inflated in one go. Either failure
// yields kFailBadData. Such a job is retried once with no-cache headers;
// a bad-data failure with no-cache already set becomes a host failure.
//
// NOTE(review): source lines 1228, 1241, 1263, 1274, 1287, 1293, 1299, 1301,
// 1305, 1308, 1311, 1316, 1320, 1345, 1353, 1363 and 1389 are absent from
// this listing. They presumably hold LogCvmfs() openers and some
// error_code assignments.
1227 bool DownloadManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1229  "Verify downloaded url %s, proxy %s (curl error %d)",
1230  info->url->c_str(), info->proxy.c_str(), curl_error);
1231  UpdateStatistics(info->curl_handle);
1232 
1233  // Verification and error classification
1234  switch (curl_error) {
1235  case CURLE_OK:
1236  // Verify content hash
1237  if (info->expected_hash) {
1238  shash::Any match_hash;
1239  shash::Final(info->hash_context, &match_hash);
1240  if (match_hash != *(info->expected_hash)) {
1242  "hash verification of %s failed (expected %s, got %s)",
1243  info->url->c_str(), info->expected_hash->ToString().c_str(),
1244  match_hash.ToString().c_str());
1245  info->error_code = kFailBadData;
1246  break;
1247  }
1248  }
1249 
1250  // Decompress memory in a single run
1251  if ((info->destination == kDestinationMem) && info->compressed) {
1252  void *buf;
1253  uint64_t size;
1254  bool retval = zlib::DecompressMem2Mem(
1255  info->destination_mem.data,
1256  static_cast<int64_t>(info->destination_mem.pos),
1257  &buf, &size);
1258  if (retval) {
1259  free(info->destination_mem.data);
1260  info->destination_mem.data = static_cast<char *>(buf);
1261  info->destination_mem.pos = info->destination_mem.size = size;
1262  } else {
1264  "decompression (memory) of url %s failed",
1265  info->url->c_str());
1266  info->error_code = kFailBadData;
1267  break;
1268  }
1269  }
1270 
1271  info->error_code = kFailOk;
1272  break;
1273  case CURLE_UNSUPPORTED_PROTOCOL:
1275  break;
1276  case CURLE_URL_MALFORMAT:
1277  info->error_code = kFailBadUrl;
1278  break;
1279  case CURLE_COULDNT_RESOLVE_PROXY:
1280  info->error_code = kFailProxyResolve;
1281  break;
1282  case CURLE_COULDNT_RESOLVE_HOST:
1283  info->error_code = kFailHostResolve;
1284  break;
1285  case CURLE_OPERATION_TIMEDOUT:
1286  info->error_code = (info->proxy == "DIRECT") ?
1288  break;
1289  case CURLE_PARTIAL_FILE:
1290  case CURLE_GOT_NOTHING:
1291  case CURLE_RECV_ERROR:
1292  info->error_code = (info->proxy == "DIRECT") ?
1294  break;
1295  case CURLE_FILE_COULDNT_READ_FILE:
1296  case CURLE_COULDNT_CONNECT:
1297  if (info->proxy != "DIRECT") {
1298  // This is a guess. Fail-over can still change to switching host
1300  } else {
1302  }
1303  break;
1304  case CURLE_TOO_MANY_REDIRECTS:
1306  break;
1307  case CURLE_SSL_CACERT_BADFILE:
1309  "Failed to load certificate bundle. "
1310  "X509_CERT_BUNDLE might point to the wrong location.");
1312  break;
1313  // As of curl 7.62.0, CURLE_SSL_CACERT is the same as
1314  // CURLE_PEER_FAILED_VERIFICATION
1315  case CURLE_PEER_FAILED_VERIFICATION:
1317  "invalid SSL certificate of remote host. "
1318  "X509_CERT_DIR and/or X509_CERT_BUNDLE might point to the wrong "
1319  "location.");
1321  break;
1322  case CURLE_ABORTED_BY_CALLBACK:
1323  case CURLE_WRITE_ERROR:
1324  // Error set by callback
1325  break;
1326  default:
1327  LogCvmfs(kLogDownload, kLogSyslogErr, "unexpected curl error (%d) while "
1328  "trying to fetch %s", curl_error, info->url->c_str());
1329  info->error_code = kFailOther;
1330  break;
1331  }
1332 
// NOTE(review): opt_host_chain_ is read here without lock_options_, while
// SetHostChain() and ProbeHosts() delete and replace it under that lock.
// Confirm this cannot race.
1333  std::vector<std::string> *host_chain = opt_host_chain_;
1334 
1335  // Determination if download should be repeated
1336  bool try_again = false;
// same_url_retry: repeat the identical URL (with backoff) instead of failing
// over. CanRetry() takes lock_options_ itself, so it is evaluated before
// the lock below is acquired.
1337  bool same_url_retry = CanRetry(info);
1338  if (info->error_code != kFailOk) {
1339  MutexLockGuard m(lock_options_);
1340  if (info->error_code == kFailBadData) {
1341  if (!info->nocache) {
1342  try_again = true;
1343  } else {
1344  // Make it a host failure
1346  "data corruption with no-cache header, try another host");
1347 
1348  info->error_code = kFailHostHttp;
1349  }
1350  }
// Host errors are retried while there are still unused hosts in the chain
// (only for jobs with probe_hosts set).
1351  if ( same_url_retry || (
1352  ( (info->error_code == kFailHostResolve) ||
1354  (info->error_code == kFailHostHttp)) &&
1355  info->probe_hosts &&
1356  host_chain && (info->num_used_hosts < host_chain->size()))
1357  )
1358  {
1359  try_again = true;
1360  }
1361  if ( same_url_retry || (
1362  ( (info->error_code == kFailProxyResolve) ||
1364  (info->error_code == kFailProxyHttp)) )
1365  )
1366  {
1367  try_again = true;
1368  // If all proxies failed, do a next round with the next host
1369  if (!same_url_retry && (info->num_used_proxies >= opt_num_proxies_)) {
1370  // Check if this can be made a host fail-over
1371  if (info->probe_hosts &&
1372  host_chain &&
1373  (info->num_used_hosts < host_chain->size()))
1374  {
1375  // reset proxy group if not already performed by other handle
1376  if (opt_proxy_groups_) {
1377  if ((opt_proxy_groups_current_ > 0) ||
1378  (opt_proxy_groups_current_burned_ > 0))
1379  {
1380  opt_proxy_groups_current_ = 0;
1381  opt_timestamp_backup_proxies_ = 0;
1382  RebalanceProxiesUnlocked("reset proxies for host failover");
1383  }
1384  }
1385 
1386  // Make it a host failure
1387  LogCvmfs(kLogDownload, kLogDebug, "make it a host failure");
1388  info->num_used_proxies = 1;
1390  } else {
1391  try_again = false;
1392  }
1393  } // Make a proxy failure a host failure
1394  } // Proxy failure assumed
1395  }
1396 
1397  if (try_again) {
1398  LogCvmfs(kLogDownload, kLogDebug, "Trying again on same curl handle, "
1399  "same url: %d, error code %d", same_url_retry, info->error_code);
1400  // Reset internal state and destination
// Memory destinations are dropped entirely. File/path destinations are
// truncated and rewound. Sinks are reset. A failure to reset local state
// ends the job with kFailLocalIO instead of retrying.
1401  if ((info->destination == kDestinationMem) && info->destination_mem.data) {
1402  free(info->destination_mem.data);
1403  info->destination_mem.data = NULL;
1404  info->destination_mem.size = 0;
1405  info->destination_mem.pos = 0;
1406  }
1407  if ((info->destination == kDestinationFile) ||
1408  (info->destination == kDestinationPath))
1409  {
1410  if ((fflush(info->destination_file) != 0) ||
1411  (ftruncate(fileno(info->destination_file), 0) != 0))
1412  {
1413  info->error_code = kFailLocalIO;
1414  goto verify_and_finalize_stop;
1415  }
1416  rewind(info->destination_file);
1417  }
1418  if (info->destination == kDestinationSink) {
1419  if (info->destination_sink->Reset() != 0) {
1420  info->error_code = kFailLocalIO;
1421  goto verify_and_finalize_stop;
1422  }
1423  }
1424  if (info->expected_hash)
1425  shash::Init(info->hash_context);
1426  if (info->compressed)
1427  zlib::DecompressInit(&info->zstream);
1428  SetRegularCache(info);
1429 
1430  // Failure handling
// Bad data: retry with no-cache headers. Proxy/host errors: fail over.
// Transfer errors: back off and retry the same URL if allowed, otherwise
// fail over.
1431  bool switch_proxy = false;
1432  bool switch_host = false;
1433  switch (info->error_code) {
1434  case kFailBadData:
1435  SetNocache(info);
1436  break;
1437  case kFailProxyResolve:
1438  case kFailProxyHttp:
1439  switch_proxy = true;
1440  break;
1441  case kFailHostResolve:
1442  case kFailHostHttp:
1443  case kFailHostAfterProxy:
1444  switch_host = true;
1445  break;
1446  default:
1447  if (IsProxyTransferError(info->error_code)) {
1448  if (same_url_retry) {
1449  Backoff(info);
1450  } else {
1451  switch_proxy = true;
1452  }
1453  } else if (IsHostTransferError(info->error_code)) {
1454  if (same_url_retry) {
1455  Backoff(info);
1456  } else {
1457  switch_host = true;
1458  }
1459  } else {
1460  // No other errors expected when retrying
1461  PANIC(NULL);
1462  }
1463  }
1464  if (switch_proxy) {
1465  ReleaseCredential(info);
1466  SwitchProxy(info);
1467  info->num_used_proxies++;
1468  SetUrlOptions(info);
1469  }
1470  if (switch_host) {
1471  ReleaseCredential(info);
1472  SwitchHost(info);
1473  info->num_used_hosts++;
1474  SetUrlOptions(info);
1475  }
1476 
1477  return true; // try again
1478  }
1479 
1480  verify_and_finalize_stop:
1481  // Finalize, flush destination file
// kDestinationFile streams stay open (owned by the caller) and are only
// flushed. kDestinationPath files were opened for this job and are closed.
1482  ReleaseCredential(info);
1483  if ((info->destination == kDestinationFile) &&
1484  fflush(info->destination_file) != 0)
1485  {
1486  info->error_code = kFailLocalIO;
1487  } else if (info->destination == kDestinationPath) {
1488  if (fclose(info->destination_file) != 0)
1489  info->error_code = kFailLocalIO;
1490  info->destination_file = NULL;
1491  }
1492 
1493  if (info->compressed)
1494  zlib::DecompressFini(&info->zstream);
1495 
1496  if (info->headers) {
1497  header_lists_->PutList(info->headers);
1498  info->headers = NULL;
1499  }
1500 
1501  return false; // stop transfer and return to Fetch()
1502 }
1503 
1504 
1505 DownloadManager::DownloadManager() {
1506  pool_handles_idle_ = NULL;
1507  pool_handles_inuse_ = NULL;
1508  pool_max_handles_ = 0;
1509  curl_multi_ = NULL;
1510  default_headers_ = NULL;
1511 
1512  atomic_init32(&multi_threaded_);
1513  pipe_terminate_[0] = pipe_terminate_[1] = -1;
1514 
1515  pipe_jobs_[0] = pipe_jobs_[1] = -1;
1516  watch_fds_ = NULL;
1517  watch_fds_size_ = 0;
1518  watch_fds_inuse_ = 0;
1519  watch_fds_max_ = 0;
1520 
1521  lock_options_ =
1522  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1523  int retval = pthread_mutex_init(lock_options_, NULL);
1524  assert(retval == 0);
1525  lock_synchronous_mode_ =
1526  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1527  retval = pthread_mutex_init(lock_synchronous_mode_, NULL);
1528  assert(retval == 0);
1529 
1530  opt_dns_server_ = "";
1531  opt_ip_preference_ = dns::kIpPreferSystem;
1532  opt_timeout_proxy_ = 0;
1533  opt_timeout_direct_ = 0;
1534  opt_low_speed_limit_ = 0;
1535  opt_host_chain_ = NULL;
1536  opt_host_chain_rtt_ = NULL;
1537  opt_host_chain_current_ = 0;
1538  opt_proxy_groups_ = NULL;
1539  opt_proxy_groups_current_ = 0;
1540  opt_proxy_groups_current_burned_ = 0;
1541  opt_num_proxies_ = 0;
1542  opt_proxy_shard_ = false;
1543  opt_max_retries_ = 0;
1544  opt_backoff_init_ms_ = 0;
1545  opt_backoff_max_ms_ = 0;
1546  enable_info_header_ = false;
1547  opt_ipv4_only_ = false;
1548  follow_redirects_ = false;
1549 
1550  resolver_ = NULL;
1551 
1552  opt_timestamp_backup_proxies_ = 0;
1553  opt_timestamp_failover_proxies_ = 0;
1554  opt_proxy_groups_reset_after_ = 0;
1555  opt_timestamp_backup_host_ = 0;
1556  opt_host_reset_after_ = 0;
1557 
1558  credentials_attachment_ = NULL;
1559 
1560  counters_ = NULL;
1561 }
1562 
1563 
1564 DownloadManager::~DownloadManager() {
1565  pthread_mutex_destroy(lock_options_);
1566  pthread_mutex_destroy(lock_synchronous_mode_);
1567  free(lock_options_);
1568  free(lock_synchronous_mode_);
1569 }
1570 
1571 void DownloadManager::InitHeaders() {
1572  // User-Agent
1573  string cernvm_id = "User-Agent: cvmfs ";
1574 #ifdef CVMFS_LIBCVMFS
1575  cernvm_id += "libcvmfs ";
1576 #else
1577  cernvm_id += "Fuse ";
1578 #endif
1579  cernvm_id += string(VERSION);
1580  if (getenv("CERNVM_UUID") != NULL) {
1581  cernvm_id += " " +
1582  sanitizer::InputSanitizer("az AZ 09 -").Filter(getenv("CERNVM_UUID"));
1583  }
1584  user_agent_ = strdup(cernvm_id.c_str());
1585 
1586  header_lists_ = new HeaderLists();
1587 
1588  default_headers_ = header_lists_->GetList("Connection: Keep-Alive");
1589  header_lists_->AppendHeader(default_headers_, "Pragma:");
1590  header_lists_->AppendHeader(default_headers_, user_agent_);
1591 }
1592 
1593 
1594 void DownloadManager::FiniHeaders() {
1595  delete header_lists_;
1596  header_lists_ = NULL;
1597  default_headers_ = NULL;
1598 }
1599 
1600 
1601 void DownloadManager::Init(const unsigned max_pool_handles,
1602  const perf::StatisticsTemplate &statistics)
1603 {
1604  atomic_init32(&multi_threaded_);
1605  int retval = curl_global_init(CURL_GLOBAL_ALL);
1606  assert(retval == CURLE_OK);
1607  pool_handles_idle_ = new set<CURL *>;
1608  pool_handles_inuse_ = new set<CURL *>;
1609  pool_max_handles_ = max_pool_handles;
1610  watch_fds_max_ = 4*pool_max_handles_;
1611 
1612  opt_timeout_proxy_ = 5;
1613  opt_timeout_direct_ = 10;
1614  opt_low_speed_limit_ = 1024;
1615  opt_proxy_groups_current_ = 0;
1616  opt_proxy_groups_current_burned_ = 0;
1617  opt_num_proxies_ = 0;
1618  opt_proxy_shard_ = false;
1619  opt_host_chain_current_ = 0;
1620  opt_ip_preference_ = dns::kIpPreferSystem;
1621 
1622  counters_ = new Counters(statistics);
1623 
1624  user_agent_ = NULL;
1625  InitHeaders();
1626 
1627  curl_multi_ = curl_multi_init();
1628  assert(curl_multi_ != NULL);
1629  curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION, CallbackCurlSocket);
1630  curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1631  static_cast<void *>(this));
1632  curl_multi_setopt(curl_multi_, CURLMOPT_MAXCONNECTS, watch_fds_max_);
1633  curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1634  pool_max_handles_);
1635 
1636  prng_.InitLocaltime();
1637 
1638  // Name resolving
1639  if ((getenv("CVMFS_IPV4_ONLY") != NULL) &&
1640  (strlen(getenv("CVMFS_IPV4_ONLY")) > 0))
1641  {
1642  opt_ipv4_only_ = true;
1643  }
1644  resolver_ = dns::NormalResolver::Create(opt_ipv4_only_,
1645  kDnsDefaultRetries, kDnsDefaultTimeoutMs);
1646  assert(resolver_);
1647 }
1648 
1649 
// Teardown counterpart of Init(). The function's header line (source 1650)
// is absent from this listing; presumably DownloadManager::Fini().
//
// Steps:
//   - stops the I/O thread if one was spawned (multi_threaded_ == 1);
//   - cleans up the idle curl handles and the multi handle;
//   - releases the headers and user agent, the counters, and the
//     host/proxy configuration;
//   - calls curl_global_cleanup();
//   - deletes the resolver.
1651  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1652  // Shutdown I/O thread
// A byte on pipe_terminate_ presumably tells the download thread to exit —
// confirm. pthread_join() then waits for it, before the pipes are closed.
1653  char buf = 'T';
1654  WritePipe(pipe_terminate_[1], &buf, 1);
1655  pthread_join(thread_download_, NULL);
1656  // All handles are removed from the multi stack
1657  close(pipe_terminate_[1]);
1658  close(pipe_terminate_[0]);
1659  close(pipe_jobs_[1]);
1660  close(pipe_jobs_[0]);
1661  }
1662 
// NOTE(review): only idle handles get curl_easy_cleanup(). Handles still in
// pool_handles_inuse_ are not cleaned up — presumably none are in use at
// this point; confirm.
1663  for (set<CURL *>::iterator i = pool_handles_idle_->begin(),
1664  iEnd = pool_handles_idle_->end(); i != iEnd; ++i)
1665  {
1666  curl_easy_cleanup(*i);
1667  }
1668  delete pool_handles_idle_;
1669  delete pool_handles_inuse_;
1670  curl_multi_cleanup(curl_multi_);
1671  pool_handles_idle_ = NULL;
1672  pool_handles_inuse_ = NULL;
1673  curl_multi_ = NULL;
1674 
1675  FiniHeaders();
1676  if (user_agent_)
1677  free(user_agent_);
1678  user_agent_ = NULL;
1679 
1680  delete counters_;
1681  counters_ = NULL;
1682 
1683  delete opt_host_chain_;
1684  delete opt_host_chain_rtt_;
1685  opt_proxy_map_.clear();
1686  delete opt_proxy_groups_;
1687  opt_host_chain_ = NULL;
1688  opt_host_chain_rtt_ = NULL;
1689  opt_proxy_groups_ = NULL;
1690 
1691  curl_global_cleanup();
1692 
1693  delete resolver_;
1694  resolver_ = NULL;
1695 }
1696 
1697 
// Starts the background download thread. The function's header lines
// (source 1698-1702) are absent from this listing; presumably
// DownloadManager::Spawn().
//
// Order of operations:
//   1. create the termination and job pipes;
//   2. create the MainDownload thread;
//   3. only then increment multi_threaded_.
// The fetch routine checks that flag (== 1) to decide whether jobs are
// handed to the thread through pipe_jobs_.
1703  MakePipe(pipe_terminate_);
1704  MakePipe(pipe_jobs_);
1705 
1706  int retval = pthread_create(&thread_download_, NULL, MainDownload,
1707  static_cast<void *>(this));
1708  assert(retval == 0);
1709 
1710  atomic_inc32(&multi_threaded_);
1711 }
1712 
1713 
// Body of the download entry point. Its header lines (source 1714-1717)
// are absent from this listing; presumably DownloadManager::Fetch(JobInfo *)
// returning Failures.
//
// Steps:
//   1. Prepare the destination.
//   2. Set up the hash context and the optional "cvmfs-info:" header, both
//      in alloca() memory of this function.
//   3. Run the transfer. In multi-threaded mode the job goes to the I/O
//      thread; otherwise it runs synchronously on the calling thread for as
//      long as VerifyAndFinalize() asks for another attempt.
//   4. On failure, unlink a kDestinationPath file and free any in-memory
//      buffer.
1718  assert(info != NULL);
1719  assert(info->url != NULL);
1720 
1721  Failures result;
1722  result = PrepareDownloadDestination(info);
1723  if (result != kFailOk)
1724  return result;
1725 
1726  if (info->expected_hash) {
// (Source lines 1727-1728 are absent from this listing; 'algorithm' used
// below is declared there.)
1729  info->hash_context.size = shash::GetContextSize(algorithm);
1730  info->hash_context.buffer = alloca(info->hash_context.size);
1731  }
1732 
// alloca() memory lives until this function returns. That also holds for
// the multi-threaded branch, because the caller blocks in ReadPipe() until
// the I/O thread has reported the result.
1733  // Prepare cvmfs-info: header, allocate string on the stack
// The first EscapeHeader() call with a NULL buffer is used to size the
// escaped payload; one extra byte holds the terminating '\0'.
1734  info->info_header = NULL;
1735  if (enable_info_header_ && info->extra_info) {
1736  const char *header_name = "cvmfs-info: ";
1737  const size_t header_name_len = strlen(header_name);
1738  const unsigned header_size = 1 + header_name_len +
1739  EscapeHeader(*(info->extra_info), NULL, 0);
1740  info->info_header = static_cast<char *>(alloca(header_size));
1741  memcpy(info->info_header, header_name, header_name_len);
1742  EscapeHeader(*(info->extra_info), info->info_header + header_name_len,
1743  header_size - header_name_len);
1744  info->info_header[header_size-1] = '\0';
1745  }
1746 
1747  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1748  if (info->wait_at[0] == -1) {
1749  MakePipe(info->wait_at);
1750  }
1751 
// Only the JobInfo pointer travels through the job pipe (sizeof(info) is
// the pointer size). The result comes back on the job's own wait_at pipe.
1752  // LogCvmfs(kLogDownload, kLogDebug, "send job to thread, pipe %d %d",
1753  // info->wait_at[0], info->wait_at[1]);
1754  // NOLINTNEXTLINE(bugprone-sizeof-expression)
1755  WritePipe(pipe_jobs_[1], &info, sizeof(info));
1756  ReadPipe(info->wait_at[0], &result, sizeof(result));
1757  // LogCvmfs(kLogDownload, kLogDebug, "got result %d", result);
1758  } else {
// Synchronous mode: lock_synchronous_mode_ serializes concurrent callers,
// and the transfer is repeated while VerifyAndFinalize() returns true.
1759  MutexLockGuard l(lock_synchronous_mode_);
1760  CURL *handle = AcquireCurlHandle();
1761  InitializeRequest(info, handle);
1762  SetUrlOptions(info);
1763  // curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
1764  int retval;
1765  do {
1766  retval = curl_easy_perform(handle);
1767  perf::Inc(counters_->n_requests);
1768  double elapsed;
1769  if (curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &elapsed) == CURLE_OK)
1770  {
1771  perf::Xadd(counters_->sz_transfer_time,
1772  static_cast<int64_t>(elapsed * 1000));
1773  }
1774  } while (VerifyAndFinalize(retval, info));
1775  result = info->error_code;
1776  ReleaseCurlHandle(info->curl_handle);
1777  }
1778 
1779  if (result != kFailOk) {
1780  LogCvmfs(kLogDownload, kLogDebug, "download failed (error %d - %s)", result,
1781  Code2Ascii(result));
1782 
1783  if (info->destination == kDestinationPath)
1784  unlink(info->destination_path->c_str());
1785 
1786  if (info->destination_mem.data) {
1787  free(info->destination_mem.data);
1788  info->destination_mem.data = NULL;
1789  info->destination_mem.size = 0;
1790  }
1791  }
1792 
1793  return result;
1794 }
1795 
1796 
1801 void DownloadManager::SetCredentialsAttachment(CredentialsAttachment *ca) {
1802  MutexLockGuard m(lock_options_);
1803  credentials_attachment_ = ca;
1804 }
1805 
1809 std::string DownloadManager::GetDnsServer() const {
1810  return opt_dns_server_;
1811 }
1812 
1817 void DownloadManager::SetDnsServer(const string &address) {
1818  if (!address.empty()) {
1819  MutexLockGuard m(lock_options_);
1820  opt_dns_server_ = address;
1821  assert(!opt_dns_server_.empty());
1822 
1823  vector<string> servers;
1824  servers.push_back(address);
1825  bool retval = resolver_->SetResolvers(servers);
1826  assert(retval);
1827  }
1828  LogCvmfs(kLogDownload, kLogSyslog, "set nameserver to %s", address.c_str());
1829 }
1830 
1831 
1835 void DownloadManager::SetDnsParameters(
1836  const unsigned retries,
1837  const unsigned timeout_ms)
1838 {
1839  MutexLockGuard m(lock_options_);
1840  if ((resolver_->retries() == retries) &&
1841  (resolver_->timeout_ms() == timeout_ms))
1842  {
1843  return;
1844  }
1845  delete resolver_;
1846  resolver_ = NULL;
1847  resolver_ =
1848  dns::NormalResolver::Create(opt_ipv4_only_, retries, timeout_ms);
1849  assert(resolver_);
1850 }
1851 
1852 
1853 void DownloadManager::SetDnsTtlLimits(
1854  const unsigned min_seconds,
1855  const unsigned max_seconds)
1856 {
1857  MutexLockGuard m(lock_options_);
1858  resolver_->set_min_ttl(min_seconds);
1859  resolver_->set_max_ttl(max_seconds);
1860 }
1861 
1862 
1863 void DownloadManager::SetIpPreference(dns::IpPreference preference) {
1864  MutexLockGuard m(lock_options_);
1865  opt_ip_preference_ = preference;
1866 }
1867 
1868 
1874 void DownloadManager::SetTimeout(const unsigned seconds_proxy,
1875  const unsigned seconds_direct)
1876 {
1877  MutexLockGuard m(lock_options_);
1878  opt_timeout_proxy_ = seconds_proxy;
1879  opt_timeout_direct_ = seconds_direct;
1880 }
1881 
1882 
1888 void DownloadManager::SetLowSpeedLimit(const unsigned low_speed_limit) {
1889  MutexLockGuard m(lock_options_);
1890  opt_low_speed_limit_ = low_speed_limit;
1891 }
1892 
1893 
1897 void DownloadManager::GetTimeout(unsigned *seconds_proxy,
1898  unsigned *seconds_direct)
1899 {
1900  MutexLockGuard m(lock_options_);
1901  *seconds_proxy = opt_timeout_proxy_;
1902  *seconds_direct = opt_timeout_direct_;
1903 }
1904 
1905 
1910 void DownloadManager::SetHostChain(const string &host_list) {
1911  SetHostChain(SplitString(host_list, ';'));
1912 }
1913 
1914 
1915 void DownloadManager::SetHostChain(const std::vector<std::string> &host_list) {
1916  MutexLockGuard m(lock_options_);
1917  opt_timestamp_backup_host_ = 0;
1918  delete opt_host_chain_;
1919  delete opt_host_chain_rtt_;
1920  opt_host_chain_current_ = 0;
1921 
1922  if (host_list.empty()) {
1923  opt_host_chain_ = NULL;
1924  opt_host_chain_rtt_ = NULL;
1925  return;
1926  }
1927 
1928  opt_host_chain_ = new vector<string>(host_list);
1929  opt_host_chain_rtt_ =
1930  new vector<int>(opt_host_chain_->size(), kProbeUnprobed);
1931  // LogCvmfs(kLogDownload, kLogSyslog, "using host %s",
1932  // (*opt_host_chain_)[0].c_str());
1933 }
1934 
1935 
1936 
1941 void DownloadManager::GetHostInfo(vector<string> *host_chain, vector<int> *rtt,
1942  unsigned *current_host)
1943 {
1944  MutexLockGuard m(lock_options_);
1945  if (opt_host_chain_) {
1946  if (current_host) {*current_host = opt_host_chain_current_;}
1947  if (host_chain) {*host_chain = *opt_host_chain_;}
1948  if (rtt) {*rtt = *opt_host_chain_rtt_;}
1949  }
1950 }
1951 
1952 
1961 void DownloadManager::SwitchProxy(JobInfo *info) {
1962  MutexLockGuard m(lock_options_);
1963 
1964  if (!opt_proxy_groups_) {
1965  return;
1966  }
1967 
1968  // Fail any matching proxies within the current load-balancing group
1969  vector<ProxyInfo> *group = current_proxy_group();
1970  const unsigned group_size = group->size();
1971  unsigned failed = 0;
1972  for (unsigned i = 0; i < group_size - opt_proxy_groups_current_burned_; ++i) {
1973  if (info && (info->proxy == (*group)[i].url)) {
1974  // Move to list of failed proxies
1975  opt_proxy_groups_current_burned_++;
1976  swap((*group)[i],
1977  (*group)[group_size - opt_proxy_groups_current_burned_]);
1978  perf::Inc(counters_->n_proxy_failover);
1979  failed++;
1980  }
1981  }
1982 
1983  // Do nothing more unless at least one proxy was marked as failed
1984  if (!failed)
1985  return;
1986 
1987  // If all proxies from the current load-balancing group are burned, switch to
1988  // another group
1989  if (opt_proxy_groups_current_burned_ == group->size()) {
1990  opt_proxy_groups_current_burned_ = 0;
1991  if (opt_proxy_groups_->size() > 1) {
1992  opt_proxy_groups_current_ = (opt_proxy_groups_current_ + 1) %
1993  opt_proxy_groups_->size();
1994  // Remeber the timestamp of switching to backup proxies
1995  if (opt_proxy_groups_reset_after_ > 0) {
1996  if (opt_proxy_groups_current_ > 0) {
1997  if (opt_timestamp_backup_proxies_ == 0)
1998  opt_timestamp_backup_proxies_ = time(NULL);
1999  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2000  // "switched to (another) backup proxy group");
2001  } else {
2002  opt_timestamp_backup_proxies_ = 0;
2003  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2004  // "switched back to primary proxy group");
2005  }
2006  opt_timestamp_failover_proxies_ = 0;
2007  }
2008  }
2009  } else {
2010  // Record failover time
2011  if (opt_proxy_groups_reset_after_ > 0) {
2012  if (opt_timestamp_failover_proxies_ == 0)
2013  opt_timestamp_failover_proxies_ = time(NULL);
2014  }
2015  }
2016 
2017  UpdateProxiesUnlocked("failed proxy");
2018  LogCvmfs(kLogDownload, kLogDebug, "%d proxies remain in group",
2019  current_proxy_group()->size() - opt_proxy_groups_current_burned_);
2020 }
2021 
2022 
// Advances the host chain to its next entry, wrapping around. Runs under
// lock_options_.
//
// @param info  the failed job, or NULL for a manual switch. If the job used
//              a host other than the current one, another job has already
//              switched and nothing is done. The job's error code is logged
//              as the reason.
//
// If opt_host_reset_after_ > 0, the time of moving away from the first
// (primary) host is recorded; moving back to it clears the timestamp.
//
// NOTE(review): source lines 2036 and 2053 (LogCvmfs() openers) are absent
// from this listing.
2028 void DownloadManager::SwitchHost(JobInfo *info) {
2029  MutexLockGuard m(lock_options_);
2030 
// Nothing to switch to without a chain or with a single host.
// NOTE(review): an empty but non-NULL chain is not caught here; the modulo
// below would then divide by zero. Confirm that no caller installs an
// empty chain.
2031  if (!opt_host_chain_ || (opt_host_chain_->size() == 1)) {
2032  return;
2033  }
2034 
// NOTE(review): info->current_host_chain_index is used to index the current
// chain. This assumes the chain was not replaced with a shorter one since
// the job started — confirm.
2035  if (info && (info->current_host_chain_index != opt_host_chain_current_)) {
2037  "don't switch host, "
2038  "last used host: %s, current host: %s",
2039  (*opt_host_chain_)[info->current_host_chain_index].c_str(),
2040  (*opt_host_chain_)[opt_host_chain_current_].c_str());
2041  return;
2042  }
2043 
2044  string reason = "manually triggered";
2045  if (info) {
2046  reason = download::Code2Ascii(info->error_code);
2047  }
2048 
2049  string old_host = (*opt_host_chain_)[opt_host_chain_current_];
2050  opt_host_chain_current_ =
2051  (opt_host_chain_current_ + 1) % opt_host_chain_->size();
2052  perf::Inc(counters_->n_host_failover);
2054  "switching host from %s to %s (%s)", old_host.c_str(),
2055  (*opt_host_chain_)[opt_host_chain_current_].c_str(),
2056  reason.c_str());
2057 
2058  // Remember the timestamp of switching to backup host
2059  if (opt_host_reset_after_ > 0) {
2060  if (opt_host_chain_current_ != 0) {
2061  if (opt_timestamp_backup_host_ == 0)
2062  opt_timestamp_backup_host_ = time(NULL);
2063  } else {
2064  opt_timestamp_backup_host_ = 0;
2065  }
2066  }
2067 }
2068 
2069 void DownloadManager::SwitchHost() {
2070  SwitchHost(NULL);
2071 }
2072 
2073 
2080 void DownloadManager::ProbeHosts() {
2081  vector<string> host_chain;
2082  vector<int> host_rtt;
2083  unsigned current_host;
2084 
2085  GetHostInfo(&host_chain, &host_rtt, &current_host);
2086 
2087  // Stopwatch, two times to fill caches first
2088  unsigned i, retries;
2089  string url;
2090  JobInfo info(&url, false, false, NULL);
2091  for (retries = 0; retries < 2; ++retries) {
2092  for (i = 0; i < host_chain.size(); ++i) {
2093  url = host_chain[i] + "/.cvmfspublished";
2094 
2095  struct timeval tv_start, tv_end;
2096  gettimeofday(&tv_start, NULL);
2097  Failures result = Fetch(&info);
2098  gettimeofday(&tv_end, NULL);
2099  if (info.destination_mem.data)
2100  free(info.destination_mem.data);
2101  if (result == kFailOk) {
2102  host_rtt[i] = static_cast<int>(
2103  DiffTimeSeconds(tv_start, tv_end) * 1000);
2104  LogCvmfs(kLogDownload, kLogDebug, "probing host %s had %dms rtt",
2105  url.c_str(), host_rtt[i]);
2106  } else {
2107  LogCvmfs(kLogDownload, kLogDebug, "error while probing host %s: %d %s",
2108  url.c_str(), result, Code2Ascii(result));
2109  host_rtt[i] = INT_MAX;
2110  }
2111  }
2112  }
2113 
2114  SortTeam(&host_rtt, &host_chain);
2115  for (i = 0; i < host_chain.size(); ++i) {
2116  if (host_rtt[i] == INT_MAX) host_rtt[i] = kProbeDown;
2117  }
2118 
2119  MutexLockGuard m(lock_options_);
2120  delete opt_host_chain_;
2121  delete opt_host_chain_rtt_;
2122  opt_host_chain_ = new vector<string>(host_chain);
2123  opt_host_chain_rtt_ = new vector<int>(host_rtt);
2124  opt_host_chain_current_ = 0;
2125 }
2126 
// Asks the Geo-API of up to three randomly chosen hosts of the host chain
// to order `servers` by proximity.
//
// @param servers       server URLs or host names. The host part of each
//                      entry (dns::ExtractHost(), falling back to the entry
//                      itself) is sent as a comma-separated list. If
//                      output_order is NULL, the vector is reordered in
//                      place on success.
// @param output_order  if non-NULL, receives the permutation (indexes into
//                      *servers) instead, and *servers is left untouched
// @return false if servers is NULL or no queried host gave a valid reply;
//         true otherwise. A single server is trivially "sorted".
//
// The "@proxy@" placeholder in the request URL is substituted with the
// proxy/direct template when the request's URL is set up (see the
// "@proxy@" handling earlier in this file).
//
// NOTE(review): source lines 2162, 2171, 2175, 2183 and 2189 (LogCvmfs()
// openers) are absent from this listing.
// NOTE(review): an empty *servers is not special-cased and still triggers
// Geo-API queries — confirm whether callers can pass one.
2127 bool DownloadManager::GeoSortServers(std::vector<std::string> *servers,
2128  std::vector<uint64_t> *output_order) {
2129  if (!servers) {return false;}
2130  if (servers->size() == 1) {
2131  if (output_order) {
2132  output_order->clear();
2133  output_order->push_back(0);
2134  }
2135  return true;
2136  }
2137 
2138  std::vector<std::string> host_chain;
2139  GetHostInfo(&host_chain, NULL, NULL);
2140 
2141  std::vector<std::string> server_dns_names;
2142  server_dns_names.reserve(servers->size());
2143  for (unsigned i = 0; i < servers->size(); ++i) {
2144  std::string host = dns::ExtractHost((*servers)[i]);
2145  server_dns_names.push_back(host.empty() ? (*servers)[i] : host);
2146  }
2147  std::string host_list = JoinStrings(server_dns_names, ",");
2148 
2149  vector<string> host_chain_shuffled;
2150  {
2151  // Protect against concurrent access to prng_
2152  MutexLockGuard m(lock_options_);
2153  // Determine random hosts for the Geo-API query
2154  host_chain_shuffled = Shuffle(host_chain, &prng_);
2155  }
2156  // Request ordered list via Geo-API
2157  bool success = false;
2158  unsigned max_attempts = std::min(host_chain_shuffled.size(), size_t(3));
2159  vector<uint64_t> geo_order(servers->size());
2160  for (unsigned i = 0; i < max_attempts; ++i) {
2161  string url = host_chain_shuffled[i] + "/api/v1.0/geo/@proxy@/" + host_list;
2163  "requesting ordered server list from %s", url.c_str());
2164  JobInfo info(&url, false, false, NULL);
2165  Failures result = Fetch(&info);
// On success the reply buffer belongs to us and is freed right after it
// is copied. On failure the fetch path has already released it.
2166  if (result == kFailOk) {
2167  string order(info.destination_mem.data, info.destination_mem.size);
2168  free(info.destination_mem.data);
2169  bool retval = ValidateGeoReply(order, servers->size(), &geo_order);
2170  if (!retval) {
2172  "retrieved invalid GeoAPI reply from %s [%s]",
2173  url.c_str(), order.c_str());
2174  } else {
2176  "geographic order of servers retrieved from %s",
2177  dns::ExtractHost(host_chain_shuffled[i]).c_str());
2178  LogCvmfs(kLogDownload, kLogDebug, "order is %s", order.c_str());
2179  success = true;
2180  break;
2181  }
2182  } else {
2184  "GeoAPI request %s failed with error %d [%s]",
2185  url.c_str(), result, Code2Ascii(result));
2186  }
2187  }
2188  if (!success) {
2190  "failed to retrieve geographic order from stratum 1 servers");
2191  return false;
2192  }
2193 
// NOTE(review): (*servers)[orderval] below relies on ValidateGeoReply()
// having range-checked every index — confirm.
2194  if (output_order) {
2195  output_order->swap(geo_order);
2196  } else {
2197  std::vector<std::string> sorted_servers;
2198  sorted_servers.reserve(geo_order.size());
2199  for (unsigned i = 0; i < geo_order.size(); ++i) {
2200  uint64_t orderval = geo_order[i];
2201  sorted_servers.push_back((*servers)[orderval]);
2202  }
2203  servers->swap(sorted_servers);
2204  }
2205  return true;
2206 }
2207 
2208 
2216 bool DownloadManager::ProbeGeo() {
2217  vector<string> host_chain;
2218  vector<int> host_rtt;
2219  unsigned current_host;
2220  vector< vector<ProxyInfo> > proxy_chain;
2221  unsigned fallback_group;
2222 
2223  GetHostInfo(&host_chain, &host_rtt, &current_host);
2224  GetProxyInfo(&proxy_chain, NULL, &fallback_group);
2225  if ((host_chain.size() < 2) && ((proxy_chain.size() - fallback_group) < 2))
2226  return true;
2227 
2228  vector<string> host_names;
2229  for (unsigned i = 0; i < host_chain.size(); ++i)
2230  host_names.push_back(dns::ExtractHost(host_chain[i]));
2231  SortTeam(&host_names, &host_chain);
2232  unsigned last_geo_host = host_names.size();
2233 
2234  if ((fallback_group == 0) && (last_geo_host > 1)) {
2235  // There are no non-fallback proxies, which means that the client
2236  // will always use the fallback proxies. Add a keyword separator
2237  // between the hosts and fallback proxies so the geosorting service
2238  // will know to sort the hosts based on the distance from the
2239  // closest fallback proxy rather than the distance from the client.
2240  host_names.push_back("+PXYSEP+");
2241  }
2242 
2243  // Add fallback proxy names to the end of the host list
2244  unsigned first_geo_fallback = host_names.size();
2245  for (unsigned i = fallback_group; i < proxy_chain.size(); ++i) {
2246  // We only take the first fallback proxy name from every group under the
2247  // assumption that load-balanced servers are at the same location
2248  host_names.push_back(proxy_chain[i][0].host.name());
2249  }
2250 
2251  std::vector<uint64_t> geo_order;
2252  bool success = GeoSortServers(&host_names, &geo_order);
2253  if (!success) {
2254  // GeoSortServers already logged a failure message.
2255  return false;
2256  }
2257 
2258  // Re-install host chain and proxy chain
2259  MutexLockGuard m(lock_options_);
2260  delete opt_host_chain_;
2261  opt_num_proxies_ = 0;
2262  opt_host_chain_ = new vector<string>(host_chain.size());
2263 
2264  // It's possible that opt_proxy_groups_fallback_ might have changed while
2265  // the lock wasn't held
2266  vector<vector<ProxyInfo> > *proxy_groups = new vector<vector<ProxyInfo> >(
2267  opt_proxy_groups_fallback_ + proxy_chain.size() - fallback_group);
2268  // First copy the non-fallback part of the current proxy chain
2269  for (unsigned i = 0; i < opt_proxy_groups_fallback_; ++i) {
2270  (*proxy_groups)[i] = (*opt_proxy_groups_)[i];
2271  opt_num_proxies_ += (*opt_proxy_groups_)[i].size();
2272  }
2273 
2274  // Copy the host chain and fallback proxies by geo order. Array indices
2275  // in geo_order that are smaller than last_geo_host refer to a stratum 1,
2276  // and those indices greater than or equal to first_geo_fallback refer to
2277  // a fallback proxy.
2278  unsigned hosti = 0;
2279  unsigned proxyi = opt_proxy_groups_fallback_;
2280  for (unsigned i = 0; i < geo_order.size(); ++i) {
2281  uint64_t orderval = geo_order[i];
2282  if (orderval < static_cast<uint64_t>(last_geo_host)) {
2283  // LogCvmfs(kLogCvmfs, kLogSyslog, "this is orderval %u at host index
2284  // %u", orderval, hosti);
2285  (*opt_host_chain_)[hosti++] = host_chain[orderval];
2286  } else if (orderval >= static_cast<uint64_t>(first_geo_fallback)) {
2287  // LogCvmfs(kLogCvmfs, kLogSyslog,
2288  // "this is orderval %u at proxy index %u, using proxy_chain index %u",
2289  // orderval, proxyi, fallback_group + orderval - first_geo_fallback);
2290  (*proxy_groups)[proxyi] =
2291  proxy_chain[fallback_group + orderval - first_geo_fallback];
2292  opt_num_proxies_ += (*proxy_groups)[proxyi].size();
2293  proxyi++;
2294  }
2295  }
2296 
2297  opt_proxy_map_.clear();
2298  delete opt_proxy_groups_;
2299  opt_proxy_groups_ = proxy_groups;
2300  // In pathological cases, opt_proxy_groups_current_ can be larger now when
2301  // proxies changed in-between.
2302  if (opt_proxy_groups_current_ > opt_proxy_groups_->size()) {
2303  if (opt_proxy_groups_->size() == 0) {
2304  opt_proxy_groups_current_ = 0;
2305  } else {
2306  opt_proxy_groups_current_ = opt_proxy_groups_->size() - 1;
2307  }
2308  opt_proxy_groups_current_burned_ = 0;
2309  }
2310 
2311  UpdateProxiesUnlocked("geosort");
2312 
2313  delete opt_host_chain_rtt_;
2314  opt_host_chain_rtt_ = new vector<int>(host_chain.size(), kProbeGeo);
2315  opt_host_chain_current_ = 0;
2316 
2317  return true;
2318 }
2319 
2320 
2328 bool DownloadManager::ValidateGeoReply(
2329  const string &reply_order,
2330  const unsigned expected_size,
2331  vector<uint64_t> *reply_vals)
2332 {
2333  if (reply_order.empty())
2334  return false;
2335  sanitizer::InputSanitizer sanitizer("09 , \n");
2336  if (!sanitizer.IsValid(reply_order))
2337  return false;
2338  sanitizer::InputSanitizer strip_newline("09 ,");
2339  vector<string> reply_strings =
2340  SplitString(strip_newline.Filter(reply_order), ',');
2341  vector<uint64_t> tmp_vals;
2342  for (unsigned i = 0; i < reply_strings.size(); ++i) {
2343  if (reply_strings[i].empty())
2344  return false;
2345  tmp_vals.push_back(String2Uint64(reply_strings[i]));
2346  }
2347  if (tmp_vals.size() != expected_size)
2348  return false;
2349 
2350  // Check if tmp_vals contains the number 1..n
2351  set<uint64_t> coverage(tmp_vals.begin(), tmp_vals.end());
2352  if (coverage.size() != tmp_vals.size())
2353  return false;
2354  if ((*coverage.begin() != 1) || (*coverage.rbegin() != coverage.size()))
2355  return false;
2356 
2357  for (unsigned i = 0; i < expected_size; ++i) {
2358  (*reply_vals)[i] = tmp_vals[i] - 1;
2359  }
2360  return true;
2361 }
2362 
2363 
2368 bool DownloadManager::StripDirect(
2369  const string &proxy_list,
2370  string *cleaned_list)
2371 {
2372  assert(cleaned_list);
2373  if (proxy_list == "") {
2374  *cleaned_list = "";
2375  return false;
2376  }
2377  bool result = false;
2378 
2379  vector<string> proxy_groups = SplitString(proxy_list, ';');
2380  vector<string> cleaned_groups;
2381  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2382  vector<string> group = SplitString(proxy_groups[i], '|');
2383  vector<string> cleaned;
2384  for (unsigned j = 0; j < group.size(); ++j) {
2385  if ((group[j] == "DIRECT") || (group[j] == "")) {
2386  result = true;
2387  } else {
2388  cleaned.push_back(group[j]);
2389  }
2390  }
2391  if (!cleaned.empty())
2392  cleaned_groups.push_back(JoinStrings(cleaned, "|"));
2393  }
2394 
2395  *cleaned_list = JoinStrings(cleaned_groups, ";");
2396  return result;
2397 }
2398 
2399 
// Installs new regular and/or fallback proxy lists, depending on set_mode
// (kSetProxyRegular, kSetProxyFallback or kSetProxyBoth), resolves all proxy
// host names and rebuilds opt_proxy_groups_ under lock_options_.
//
// Lists use ';' to separate load-balance groups and '|' to separate proxies
// within a group. The lists are stored verbatim in opt_proxy_list_ /
// opt_proxy_fallback_list_. DIRECT is always removed from the effective
// fallback list. It is also removed from the effective regular list whenever
// fallback proxies remain. Regular groups come first in opt_proxy_groups_.
// opt_proxy_groups_fallback_ is the index of the first fallback group.
// The backup/failover timestamps are reset and selection restarts at group 0.
// If both effective lists are empty, opt_proxy_groups_ becomes NULL.
2408 void DownloadManager::SetProxyChain(
2409  const string &proxy_list,
2410  const string &fallback_proxy_list,
2411  const ProxySetModes set_mode)
2412 {
2413  MutexLockGuard m(lock_options_);
2414 
2415  opt_timestamp_backup_proxies_ = 0;
2416  opt_timestamp_failover_proxies_ = 0;
2417  string set_proxy_list = opt_proxy_list_;
2418  string set_proxy_fallback_list = opt_proxy_fallback_list_;
2419  bool contains_direct;
2420  if ((set_mode == kSetProxyFallback) || (set_mode == kSetProxyBoth)) {
2421  opt_proxy_fallback_list_ = fallback_proxy_list;
2422  }
2423  if ((set_mode == kSetProxyRegular) || (set_mode == kSetProxyBoth)) {
2424  opt_proxy_list_ = proxy_list;
2425  }
2426  contains_direct =
2427  StripDirect(opt_proxy_fallback_list_, &set_proxy_fallback_list);
2428  if (contains_direct) {
2430  "fallback proxies do not support DIRECT, removing");
2431  }
2432  if (set_proxy_fallback_list == "") {
2433  set_proxy_list = opt_proxy_list_;
2434  } else {
2435  bool contains_direct = StripDirect(opt_proxy_list_, &set_proxy_list);
2436  if (contains_direct) {
2438  "skipping DIRECT proxy to use fallback proxy");
2439  }
2440  }
2441 
2442  // From this point on, use set_proxy_list and set_proxy_fallback_list as
2443  // effective proxy lists!
2444 
2445  opt_proxy_map_.clear();
2446  delete opt_proxy_groups_;
2447  if ((set_proxy_list == "") && (set_proxy_fallback_list == "")) {
2448  opt_proxy_groups_ = NULL;
2449  opt_proxy_groups_current_ = 0;
2450  opt_proxy_groups_current_burned_ = 0;
2451  opt_proxy_groups_fallback_ = 0;
2452  opt_num_proxies_ = 0;
2453  return;
2454  }
2455 
2456  // Determine number of regular proxy groups (== first fallback proxy group)
2457  opt_proxy_groups_fallback_ = 0;
2458  if (set_proxy_list != "") {
2459  opt_proxy_groups_fallback_ = SplitString(set_proxy_list, ';').size();
2460  }
2461  LogCvmfs(kLogDownload, kLogDebug, "first fallback proxy group %u",
2462  opt_proxy_groups_fallback_);
2463 
2464  // Concatenate regular proxies and fallback proxies, both of which can be
2465  // empty.
2466  string all_proxy_list = set_proxy_list;
2467  if (set_proxy_fallback_list != "") {
2468  if (all_proxy_list != "")
2469  all_proxy_list += ";";
2470  all_proxy_list += set_proxy_fallback_list;
2471  }
2472  LogCvmfs(kLogDownload, kLogDebug, "full proxy list %s",
2473  all_proxy_list.c_str());
2474 
2475  // Resolve server names in provided urls
2476  vector<string> hostnames; // All encountered hostnames
2477  vector<string> proxy_groups;
2478  if (all_proxy_list != "")
2479  proxy_groups = SplitString(all_proxy_list, ';');
2480  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2481  vector<string> this_group = SplitString(proxy_groups[i], '|');
2482  for (unsigned j = 0; j < this_group.size(); ++j) {
2483  this_group[j] = dns::AddDefaultScheme(this_group[j]);
2484  // Note: DIRECT strings will be "extracted" to an empty string.
2485  string hostname = dns::ExtractHost(this_group[j]);
2486  // Save the hostname. Leave empty (DIRECT) names so indexes will
2487  // match later.
2488  hostnames.push_back(hostname);
2489  }
2490  }
2491  vector<dns::Host> hosts;
2492  LogCvmfs(kLogDownload, kLogDebug, "resolving %u proxy addresses",
2493  hostnames.size());
  // NOTE(review): the loop below indexes hosts[num_proxy] in the same order as
  // hostnames; this assumes ResolveMany() returns one Host per input name, in
  // input order -- confirm against dns.cc.
2494  resolver_->ResolveMany(hostnames, &hosts);
2495 
2496  // Construct opt_proxy_groups_: traverse proxy list in same order and expand
2497  // names to resolved IP addresses.
2498  opt_proxy_groups_ = new vector< vector<ProxyInfo> >();
2499  opt_num_proxies_ = 0;
2500  unsigned num_proxy = 0; // Combined i, j counter
2501  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2502  vector<string> this_group = SplitString(proxy_groups[i], '|');
2503  // Construct ProxyInfo objects from proxy string and DNS resolver result for
2504  // every proxy in this_group. One URL can result in multiple ProxyInfo
2505  // objects, one for each IP address.
2506  vector<ProxyInfo> infos;
2507  for (unsigned j = 0; j < this_group.size(); ++j, ++num_proxy) {
2508  this_group[j] = dns::AddDefaultScheme(this_group[j]);
2509  if (this_group[j] == "DIRECT") {
2510  infos.push_back(ProxyInfo("DIRECT"));
2511  continue;
2512  }
2513 
  // A proxy whose name did not resolve is still installed (with its deadline
  // extended by resolver_->min_ttl()) so the group keeps one entry for it.
2514  if (hosts[num_proxy].status() != dns::kFailOk) {
2516  "failed to resolve IP addresses for %s (%d - %s)",
2517  hosts[num_proxy].name().c_str(), hosts[num_proxy].status(),
2518  dns::Code2Ascii(hosts[num_proxy].status()));
2519  dns::Host failed_host =
2520  dns::Host::ExtendDeadline(hosts[num_proxy], resolver_->min_ttl());
2521  infos.push_back(ProxyInfo(failed_host, this_group[j]));
2522  continue;
2523  }
2524 
2525  // Expand to the addresses preferred according to opt_ip_preference_
2526  set<string> best_addresses =
2527  hosts[num_proxy].ViewBestAddresses(opt_ip_preference_);
2528  set<string>::const_iterator iter_ips = best_addresses.begin();
2529  for (; iter_ips != best_addresses.end(); ++iter_ips) {
2530  string url_ip = dns::RewriteUrl(this_group[j], *iter_ips);
2531  infos.push_back(ProxyInfo(hosts[num_proxy], url_ip));
2532  }
2533  }
2534  opt_proxy_groups_->push_back(infos);
2535  opt_num_proxies_ += infos.size();
2536  }
2538  "installed %u proxies in %u load-balance groups",
2539  opt_num_proxies_, opt_proxy_groups_->size());
2540  opt_proxy_groups_current_ = 0;
2541  opt_proxy_groups_current_burned_ = 0;
2542 
2543  // Select random start proxy from the first group.
2544  if (opt_proxy_groups_->size() > 0) {
2545  // Rebuilds opt_proxy_map_ for group 0 (see UpdateProxiesUnlocked()).
2546  UpdateProxiesUnlocked("set proxies");
2547  }
2548 }
2549 
2550 
2557 void DownloadManager::GetProxyInfo(vector< vector<ProxyInfo> > *proxy_chain,
2558  unsigned *current_group,
2559  unsigned *fallback_group)
2560 {
2561  assert(proxy_chain != NULL);
2562  MutexLockGuard m(lock_options_);
2563 
2564 
2565  if (!opt_proxy_groups_) {
2566  vector< vector<ProxyInfo> > empty_chain;
2567  *proxy_chain = empty_chain;
2568  if (current_group != NULL)
2569  *current_group = 0;
2570  if (fallback_group != NULL)
2571  *fallback_group = 0;
2572  return;
2573  }
2574 
2575  *proxy_chain = *opt_proxy_groups_;
2576  if (current_group != NULL)
2577  *current_group = opt_proxy_groups_current_;
2578  if (fallback_group != NULL)
2579  *fallback_group = opt_proxy_groups_fallback_;
2580 }
2581 
2582 string DownloadManager::GetProxyList() {
2583  return opt_proxy_list_;
2584 }
2585 
2586 string DownloadManager::GetFallbackProxyList() {
2587  return opt_proxy_fallback_list_;
2588 }
2589 
2594 DownloadManager::ChooseProxyUnlocked(const shash::Any *hash) {
  // Picks the proxy for a request from opt_proxy_map_, or returns NULL if no
  // proxies are configured. The key is the 32-bit prefix of the object hash,
  // or 0 without a hash (which selects the entry with the smallest key).
  // UpdateProxiesUnlocked() always inserts an entry at the maximum key, so
  // lower_bound() finds an entry for every key as long as the map is not empty.
  // NOTE(review): if opt_proxy_groups_ is set but opt_proxy_map_ is empty,
  // `it` is end() and `it->second` is undefined behavior.
2595  if (!opt_proxy_groups_)
2596  return NULL;
2597 
2598  uint32_t key = (hash ? hash->Partial32() : 0);
2599  map<uint32_t, ProxyInfo *>::iterator it = opt_proxy_map_.lower_bound(key);
2600  ProxyInfo *proxy = it->second;
2601 
2602  return proxy;
2603 }
2604 
// Rebuilds opt_proxy_map_ and opt_proxy_urls_ from the non-burned proxies of
// the current proxy group, and logs when the set of used proxy URLs changes.
// No-op if no proxy groups are configured. Does not lock by itself.
// SetProxyChain() and ProbeGeo() call it with lock_options_ held.
//
// With sharding enabled, every alive proxy gets kProxyMapScale map entries at
// pseudo-random keys seeded by the SHA-1 of its URL, so the key -> proxy
// assignment depends only on the set of alive proxy URLs. Otherwise a single
// proxy, chosen with prng_, is mapped at the maximum key.
//
// @param reason  included in the log message when the proxy set changes
2608 void DownloadManager::UpdateProxiesUnlocked(const string &reason) {
2609  if (!opt_proxy_groups_)
2610  return;
2611 
2612  // Identify number of non-burned proxies within the current group
  // Only indices [0, num_alive) are used below, which presumes burned proxies
  // are kept at the end of the group -- confirm with the failover code.
  // NOTE(review): num_alive is unsigned; if opt_proxy_groups_current_burned_
  // is not smaller than the group size it becomes 0 or wraps around, and
  // begin() below would be dereferenced on an empty map.
2613  vector<ProxyInfo> *group = current_proxy_group();
2614  unsigned num_alive = (group->size() - opt_proxy_groups_current_burned_);
2615  string old_proxy = JoinStrings(opt_proxy_urls_, "|");
2616 
2617  // Rebuild proxy map and URL list
2618  opt_proxy_map_.clear();
2619  opt_proxy_urls_.clear();
2620  const uint32_t max_key = 0xffffffffUL;
2621  if (opt_proxy_shard_) {
2622  // Build a consistent map with multiple entries for each proxy
2623  for (unsigned i = 0; i < num_alive; ++i) {
2624  ProxyInfo *proxy = &(*group)[i];
2625  shash::Any proxy_hash(shash::kSha1);
2626  HashString(proxy->url, &proxy_hash);
2627  Prng prng;
2628  prng.InitSeed(proxy_hash.Partial32());
2629  for (unsigned j = 0; j < kProxyMapScale; ++j) {
2630  const std::pair<uint32_t, ProxyInfo *> entry(prng.Next(max_key), proxy);
2631  opt_proxy_map_.insert(entry);
2632  }
2633  opt_proxy_urls_.push_back(proxy->url);
2634  }
2635  // Ensure lower_bound() finds a value for all keys
2636  ProxyInfo *first_proxy = opt_proxy_map_.begin()->second;
2637  const std::pair<uint32_t, ProxyInfo *> last_entry(max_key, first_proxy);
2638  opt_proxy_map_.insert(last_entry);
2639  } else {
2640  // Build a map with a single entry for one randomly selected proxy
2641  unsigned select = prng_.Next(num_alive);
2642  ProxyInfo *proxy = &(*group)[select];
2643  const std::pair<uint32_t, ProxyInfo *> entry(max_key, proxy);
2644  opt_proxy_map_.insert(entry);
2645  opt_proxy_urls_.push_back(proxy->url);
2646  }
  // Sorted so that the old/new comparison below ignores insertion order.
2647  sort(opt_proxy_urls_.begin(), opt_proxy_urls_.end());
2648 
2649  // Report any change in proxy usage
2650  string new_proxy = JoinStrings(opt_proxy_urls_, "|");
2651  if (new_proxy != old_proxy) {
2653  "switching proxy from %s to %s (%s)",
2654  (old_proxy.empty() ? "(none)" : old_proxy.c_str()),
2655  (new_proxy.empty() ? "(none)" : new_proxy.c_str()),
2656  reason.c_str());
2657  }
2658 }
2659 
2663 void DownloadManager::ShardProxies() {
2664  opt_proxy_shard_ = true;
2665  RebalanceProxiesUnlocked("enable sharding");
2666 }
2667 
2672 void DownloadManager::RebalanceProxiesUnlocked(const string &reason) {
2673  if (!opt_proxy_groups_)
2674  return;
2675 
2676  opt_timestamp_failover_proxies_ = 0;
2677  opt_proxy_groups_current_burned_ = 0;
2678  UpdateProxiesUnlocked(reason);
2679 }
2680 
2681 
2682 void DownloadManager::RebalanceProxies() {
2683  MutexLockGuard m(lock_options_);
2684  RebalanceProxiesUnlocked("rebalance");
2685 }
2686 
2687 
2691 void DownloadManager::SwitchProxyGroup() {
2692  MutexLockGuard m(lock_options_);
2693 
2694  if (!opt_proxy_groups_ || (opt_proxy_groups_->size() < 2)) {
2695  return;
2696  }
2697 
2698  opt_proxy_groups_current_ = (opt_proxy_groups_current_ + 1) %
2699  opt_proxy_groups_->size();
2700  opt_timestamp_backup_proxies_ = time(NULL);
2701  RebalanceProxiesUnlocked("switch proxy group");
2702 }
2703 
2704 
2705 void DownloadManager::SetProxyGroupResetDelay(const unsigned seconds) {
2706  MutexLockGuard m(lock_options_);
2707  opt_proxy_groups_reset_after_ = seconds;
2708  if (opt_proxy_groups_reset_after_ == 0) {
2709  opt_timestamp_backup_proxies_ = 0;
2710  opt_timestamp_failover_proxies_ = 0;
2711  }
2712 }
2713 
2714 
2715 void DownloadManager::SetHostResetDelay(const unsigned seconds)
2716 {
2717  MutexLockGuard m(lock_options_);
2718  opt_host_reset_after_ = seconds;
2719  if (opt_host_reset_after_ == 0)
2720  opt_timestamp_backup_host_ = 0;
2721 }
2722 
2723 
2724 void DownloadManager::SetRetryParameters(const unsigned max_retries,
2725  const unsigned backoff_init_ms,
2726  const unsigned backoff_max_ms)
2727 {
2728  MutexLockGuard m(lock_options_);
2729  opt_max_retries_ = max_retries;
2730  opt_backoff_init_ms_ = backoff_init_ms;
2731  opt_backoff_max_ms_ = backoff_max_ms;
2732 }
2733 
2734 
2735 void DownloadManager::SetMaxIpaddrPerProxy(unsigned limit) {
2736  MutexLockGuard m(lock_options_);
2737  resolver_->set_throttle(limit);
2738 }
2739 
2740 
2741 void DownloadManager::SetProxyTemplates(
2742  const std::string &direct,
2743  const std::string &forced)
2744 {
2745  MutexLockGuard m(lock_options_);
2746  proxy_template_direct_ = direct;
2747  proxy_template_forced_ = forced;
2748 }
2749 
2750 
2751 void DownloadManager::EnableInfoHeader() {
2752  enable_info_header_ = true;
2753 }
2754 
2755 
2756 void DownloadManager::EnableRedirects() {
2757  follow_redirects_ = true;
2758 }
2759 
2760 void DownloadManager::UseSystemCertificatePath() {
2761  ssl_certificate_store_.UseSystemCertificatePath();
2762 }
2763 
2768 DownloadManager *DownloadManager::Clone(
2769  const perf::StatisticsTemplate &statistics)
2770 {
2771  DownloadManager *clone = new DownloadManager();
2772  clone->Init(pool_max_handles_, statistics);
2773  if (resolver_) {
2774  clone->SetDnsParameters(resolver_->retries(), resolver_->timeout_ms());
2775  clone->SetDnsTtlLimits(resolver_->min_ttl(), resolver_->max_ttl());
2776  clone->SetMaxIpaddrPerProxy(resolver_->throttle());
2777  }
2778  if (!opt_dns_server_.empty())
2779  clone->SetDnsServer(opt_dns_server_);
2780  clone->opt_timeout_proxy_ = opt_timeout_proxy_;
2781  clone->opt_timeout_direct_ = opt_timeout_direct_;
2782  clone->opt_low_speed_limit_ = opt_low_speed_limit_;
2783  clone->opt_max_retries_ = opt_max_retries_;
2784  clone->opt_backoff_init_ms_ = opt_backoff_init_ms_;
2785  clone->opt_backoff_max_ms_ = opt_backoff_max_ms_;
2786  clone->enable_info_header_ = enable_info_header_;
2787  clone->follow_redirects_ = follow_redirects_;
2788  if (opt_host_chain_) {
2789  clone->opt_host_chain_ = new vector<string>(*opt_host_chain_);
2790  clone->opt_host_chain_rtt_ = new vector<int>(*opt_host_chain_rtt_);
2791  }
2792  CloneProxyConfig(clone);
2793  clone->opt_ip_preference_ = opt_ip_preference_;
2794  clone->proxy_template_direct_ = proxy_template_direct_;
2795  clone->proxy_template_forced_ = proxy_template_forced_;
2796  clone->opt_proxy_groups_reset_after_ = opt_proxy_groups_reset_after_;
2797  clone->opt_host_reset_after_ = opt_host_reset_after_;
2798  clone->credentials_attachment_ = credentials_attachment_;
2799  clone->ssl_certificate_store_ = ssl_certificate_store_;
2800 
2801  return clone;
2802 }
2803 
2804 
2805 void DownloadManager::CloneProxyConfig(DownloadManager *clone) {
2806  clone->opt_proxy_groups_current_ = opt_proxy_groups_current_;
2807  clone->opt_proxy_groups_current_burned_ = opt_proxy_groups_current_burned_;
2808  clone->opt_proxy_groups_fallback_ = opt_proxy_groups_fallback_;
2809  clone->opt_num_proxies_ = opt_num_proxies_;
2810  clone->opt_proxy_shard_ = opt_proxy_shard_;
2811  clone->opt_proxy_list_ = opt_proxy_list_;
2812  clone->opt_proxy_fallback_list_ = opt_proxy_fallback_list_;
2813  if (opt_proxy_groups_ == NULL)
2814  return;
2815 
2816  clone->opt_proxy_groups_ = new vector< vector<ProxyInfo> >(
2817  *opt_proxy_groups_);
2818  clone->UpdateProxiesUnlocked("cloned");
2819 }
2820 
2821 } // namespace download
unsigned opt_timeout_direct_
Definition: download.h:519
unsigned opt_low_speed_limit_
Definition: download.h:520
Destination destination
Definition: download.h:162
#define LogCvmfs(source, mask,...)
Definition: logging.h:20
Definition: prng.h:28
std::vector< T > Shuffle(const std::vector< T > &input, Prng *prng)
Definition: algorithm.h:49
const char * Code2Ascii(const ObjectFetcherFailures::Failures error)
unsigned opt_backoff_init_ms_
Definition: download.h:522
static unsigned EscapeHeader(const string &header, char *escaped_buf, size_t buf_size)
Definition: download.cc:122
unsigned char num_used_hosts
Definition: download.h:292
static bool EscapeUrlChar(char input, char output[3])
Definition: download.cc:72
int64_t Xadd(class Counter *counter, const int64_t delta)
Definition: statistics.h:51
const char * Code2Ascii(const Failures error)
Definition: dns.h:53
int64_t id() const
Definition: dns.h:108
unsigned opt_proxy_groups_current_burned_
Definition: download.h:547
double DiffTimeSeconds(struct timeval start, struct timeval end)
Definition: algorithm.cc:31
unsigned opt_proxy_groups_reset_after_
Definition: download.h:609
void SetUrlOptions(JobInfo *info)
Definition: download.cc:904
StreamStates DecompressZStream2Sink(const void *buf, const int64_t size, z_stream *strm, cvmfs::Sink *sink)
Definition: compression.cc:234
unsigned backoff_ms
Definition: download.h:294
std::string opt_proxy_fallback_list_
Definition: download.h:564
static NormalResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
Definition: dns.cc:1259
vector< string > SplitString(const string &str, const char delim, const unsigned max_chunks)
Definition: string.cc:288
unsigned opt_host_reset_after_
Definition: download.h:617
std::string proxy_template_direct_
Definition: download.h:593
#define PANIC(...)
Definition: exception.h:26
string ReplaceAll(const string &haystack, const string &needle, const string &replace_by)
Definition: string.cc:477
string JoinStrings(const vector< string > &strings, const string &joint)
Definition: string.cc:318
void Init(ContextPtr context)
Definition: hash.cc:164
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:248
unsigned opt_proxy_groups_current_
Definition: download.h:542
off_t range_offset
Definition: download.h:175
shash::ContextPtr hash_context
Definition: download.h:285
void DecompressInit(z_stream *strm)
Definition: compression.cc:185
unsigned int current_host_chain_index
Definition: download.h:295
bool DecompressMem2Mem(const void *buf, const int64_t size, void **out_buf, uint64_t *out_size)
Definition: compression.cc:797
std::set< CURL * > * pool_handles_inuse_
Definition: download.h:498
void * cred_data
Definition: download.h:161
StreamStates DecompressZStream2File(const void *buf, const int64_t size, z_stream *strm, FILE *f)
Definition: compression.cc:275
std::string opt_proxy_list_
Definition: download.h:560
const std::string & name() const
Definition: dns.h:118
perf::Counter * sz_transfer_time
Definition: download.h:126
std::vector< std::vector< ProxyInfo > > * opt_proxy_groups_
Definition: download.h:538
assert((mem||(size==0))&&"Out Of Memory")
unsigned opt_proxy_groups_fallback_
Definition: download.h:552
void ReleaseCurlHandle(CURL *handle)
Definition: download.cc:818
StreamStates
Definition: compression.h:36
Algorithms algorithm
Definition: hash.h:124
z_stream zstream
Definition: download.h:284
const std::set< std::string > & ViewBestAddresses(IpPreference preference) const
Definition: dns.cc:205
void SetDnsServer(const std::string &address)
Definition: download.cc:1817
void DecompressFini(z_stream *strm)
Definition: compression.cc:201
void InitSeed(const uint64_t seed)
Definition: prng.h:35
void MakePipe(int pipe_fd[2])
Definition: posix.cc:524
char algorithm
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:181
bool IsValid(const std::string &input) const
Definition: sanitizer.cc:114
Algorithms
Definition: hash.h:40
void SetDnsTtlLimits(const unsigned min_seconds, const unsigned max_seconds)
Definition: download.cc:1853
static Failures PrepareDownloadDestination(JobInfo *info)
Definition: download.cc:152
void Init(const unsigned max_pool_handles, const perf::StatisticsTemplate &statistics)
Definition: download.cc:1601
const char * Code2Ascii(const Failures error)
Definition: download.h:88
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
Definition: hash.cc:190
unsigned char num_retries
Definition: download.h:293
void Final(ContextPtr context, Any *any_digest)
Definition: hash.cc:221
std::string AddDefaultScheme(const std::string &proxy)
Definition: dns.cc:187
static string EscapeUrl(const string &url)
Definition: download.cc:99
dns::IpPreference opt_ip_preference_
Definition: download.h:586
Algorithms algorithm
Definition: hash.h:499
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:286
Definition: dns.h:90
Failures Fetch(const std::string &base_url, const std::string &repository_name, const uint64_t minimum_timestamp, const shash::Any *base_catalog, signature::SignatureManager *signature_manager, download::DownloadManager *download_manager, ManifestEnsemble *ensemble)
void UpdateProxiesUnlocked(const std::string &reason)
Definition: download.cc:2608
bool IsEquivalent(const Host &other) const
Definition: dns.cc:269
bool IsProxyTransferError(const Failures error)
Definition: download.h:76
bool IsExpired() const
Definition: dns.cc:280
FILE * destination_file
Definition: download.h:168
bool follow_redirects
Definition: download.h:156
unsigned GetContextSize(const Algorithms algorithm)
Definition: hash.cc:148
virtual int Reset()=0
perf::Counter * n_requests
Definition: download.h:127
static int Init(const loader::LoaderExports *loader_exports)
Definition: cvmfs.cc:1801
string StringifyInt(const int64_t value)
Definition: string.cc:78
void SetMaxIpaddrPerProxy(unsigned limit)
Definition: download.cc:2735
const shash::Any * expected_hash
Definition: download.h:171
void Inc(class Counter *counter)
Definition: statistics.h:50
void * buffer
Definition: hash.h:500
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:265
const std::string * extra_info
Definition: download.h:172
std::string ExtractHost(const std::string &url)
Definition: dns.cc:110
std::vector< int > * opt_host_chain_rtt_
Definition: download.h:534
cvmfs::Sink * destination_sink
Definition: download.h:170
SslCertificateStore ssl_certificate_store_
Definition: download.h:630
uint32_t Partial32() const
Definition: hash.h:393
IpPreference
Definition: dns.h:46
unsigned opt_backoff_max_ms_
Definition: download.h:523
CURL * curl_handle
Definition: download.h:281
CredentialsAttachment * credentials_attachment_
Definition: download.h:619
std::vector< std::string > * opt_host_chain_
Definition: download.h:529
struct pollfd * watch_fds_
Definition: download.h:510
uint64_t String2Uint64(const string &value)
Definition: string.cc:228
Failures error_code
Definition: download.h:289
static void Fini()
Definition: cvmfs.cc:2007
static void Spawn()
Definition: cvmfs.cc:1922
struct download::JobInfo::@3 destination_mem
Definition: mutex.h:42
std::string Filter(const std::string &input) const
Definition: sanitizer.cc:107
std::string proxy_template_forced_
Definition: download.h:599
const std::string * destination_path
Definition: download.h:169
void SetDnsParameters(const unsigned retries, const unsigned timeout_ms)
Definition: download.cc:1835
static Host ExtendDeadline(const Host &original, unsigned seconds_from_now)
Definition: dns.cc:231
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:1917
bool IsHostTransferError(const Failures error)
Definition: download.h:64
void SortTeam(std::vector< T > *tractor, std::vector< U > *towed)
Definition: algorithm.h:67
curl_slist * headers
Definition: download.h:282
unsigned size
Definition: hash.h:501
unsigned char num_used_proxies
Definition: download.h:291
Failures status() const
Definition: dns.h:119
std::string proxy
Definition: download.h:287
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:533
static void size_t size
Definition: smalloc.h:47
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
Definition: download.cc:1227
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:545
const std::string * url
Definition: download.h:152
void HashString(const std::string &content, Any *any_digest)
Definition: hash.cc:268
void InitializeRequest(JobInfo *info, CURL *handle)
Definition: download.cc:836
string RewriteUrl(const string &url, const string &ip)
Definition: dns.cc:156
uint32_t Next(const uint64_t boundary)
Definition: prng.h:49
virtual int64_t Write(const void *buf, uint64_t sz)=0
char * info_header
Definition: download.h:283