CernVM-FS  2.9.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
download.cc
Go to the documentation of this file.
1 
27 // TODO(jblomer): MS for time summing
28 #define __STDC_FORMAT_MACROS
29 
30 #include "cvmfs_config.h"
31 #include "download.h"
32 
33 #include <alloca.h>
34 #include <errno.h>
35 #include <inttypes.h>
36 #include <poll.h>
37 #include <pthread.h>
38 #include <signal.h>
39 #include <stdint.h>
40 #include <sys/time.h>
41 #include <unistd.h>
42 
43 #include <algorithm>
44 #include <cassert>
45 #include <cstdio>
46 #include <cstdlib>
47 #include <cstring>
48 #include <set>
49 
50 #include "atomic.h"
51 #include "compression.h"
52 #include "duplex_curl.h"
53 #include "hash.h"
54 #include "logging.h"
55 #include "prng.h"
56 #include "sanitizer.h"
57 #include "smalloc.h"
58 #include "util/algorithm.h"
59 #include "util/exception.h"
60 #include "util/posix.h"
61 #include "util/string.h"
62 #include "util_concurrency.h"
63 
64 using namespace std; // NOLINT
65 
66 namespace download {
67 
/**
 * Percent-encodes a single byte for use in a URL or an HTTP header value.
 *
 * Bytes from the allowed set (ASCII alphanumerics and / : . @ + - _ ~ [ ] ,)
 * are copied verbatim into output[0] and the function returns false.  Any
 * other byte is written as "%XY" (upper-case hex) into output[0..2] and the
 * function returns true.
 *
 * The byte is interpreted as unsigned: where plain char is signed, bytes
 * >= 0x80 (e.g. UTF-8 multi-byte sequences) are negative, and the previous
 * arithmetic on the signed value produced invalid hex digits.
 *
 * @param input  the byte to encode
 * @param output buffer of at least 3 chars; it is not NUL-terminated
 * @return true if output holds a 3-char escape, false if only output[0] is set
 */
static inline bool EscapeUrlChar(char input, char output[3]) {
  if (((input >= '0') && (input <= '9')) ||
      ((input >= 'A') && (input <= 'Z')) ||
      ((input >= 'a') && (input <= 'z')) ||
      (input == '/') || (input == ':') || (input == '.') ||
      (input == '@') ||
      (input == '+') || (input == '-') ||
      (input == '_') || (input == '~') ||
      (input == '[') || (input == ']') || (input == ','))
  {
    output[0] = input;
    return false;
  }

  static const char kHexDigits[] = "0123456789ABCDEF";
  const unsigned char byte = static_cast<unsigned char>(input);
  output[0] = '%';
  output[1] = kHexDigits[byte >> 4];
  output[2] = kHexDigits[byte & 0x0F];
  return true;
}
87 
88 
93 static string EscapeUrl(const string &url) {
94  string escaped;
95  escaped.reserve(url.length());
96 
97  char escaped_char[3];
98  for (unsigned i = 0, s = url.length(); i < s; ++i) {
99  if (EscapeUrlChar(url[i], escaped_char))
100  escaped.append(escaped_char, 3);
101  else
102  escaped.push_back(escaped_char[0]);
103  }
104  LogCvmfs(kLogDownload, kLogDebug, "escaped %s to %s",
105  url.c_str(), escaped.c_str());
106 
107  return escaped;
108 }
109 
110 
115 static unsigned EscapeHeader(const string &header,
116  char *escaped_buf,
117  size_t buf_size)
118 {
119  unsigned esc_pos = 0;
120  char escaped_char[3];
121  for (unsigned i = 0, s = header.size(); i < s; ++i) {
122  if (EscapeUrlChar(header[i], escaped_char)) {
123  for (unsigned j = 0; j < 3; ++j) {
124  if (escaped_buf) {
125  if (esc_pos >= buf_size)
126  return esc_pos;
127  escaped_buf[esc_pos] = escaped_char[j];
128  }
129  esc_pos++;
130  }
131  } else {
132  if (escaped_buf) {
133  if (esc_pos >= buf_size)
134  return esc_pos;
135  escaped_buf[esc_pos] = escaped_char[0];
136  }
137  esc_pos++;
138  }
139  }
140 
141  return esc_pos;
142 }
143 
144 
146  info->destination_mem.size = 0;
147  info->destination_mem.pos = 0;
148  info->destination_mem.data = NULL;
149 
150  if (info->destination == kDestinationFile)
151  assert(info->destination_file != NULL);
152 
153  if (info->destination == kDestinationPath) {
154  assert(info->destination_path != NULL);
155  info->destination_file = fopen(info->destination_path->c_str(), "w");
156  if (info->destination_file == NULL) {
157  LogCvmfs(kLogDownload, kLogDebug, "Failed to open path %s: %s"
158  " (errno=%d).",
159  info->destination_path->c_str(), strerror(errno), errno);
160  return kFailLocalIO;
161  }
162  }
163 
164  if (info->destination == kDestinationSink)
165  assert(info->destination_sink != NULL);
166 
167  return kFailOk;
168 }
169 
170 
174 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
175  void *info_link)
176 {
177  const size_t num_bytes = size*nmemb;
178  const string header_line(static_cast<const char *>(ptr), num_bytes);
179  JobInfo *info = static_cast<JobInfo *>(info_link);
180 
181  // LogCvmfs(kLogDownload, kLogDebug, "REMOVE-ME: Header callback with %s",
182  // header_line.c_str());
183 
184  // Check http status codes
185  if (HasPrefix(header_line, "HTTP/1.", false)) {
186  if (header_line.length() < 10)
187  return 0;
188 
189  unsigned i;
190  for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {}
191 
192  // Code is initialized to -1
193  if (header_line.length() > i+2) {
194  info->http_code = DownloadManager::ParseHttpCode(&header_line[i]);
195  }
196 
197  if ((info->http_code / 100) == 2) {
198  return num_bytes;
199  } else if ((info->http_code == 301) ||
200  (info->http_code == 302) ||
201  (info->http_code == 303) ||
202  (info->http_code == 307))
203  {
204  if (!info->follow_redirects) {
205  LogCvmfs(kLogDownload, kLogDebug, "redirect support not enabled: %s",
206  header_line.c_str());
207  info->error_code = kFailHostHttp;
208  return 0;
209  }
210  LogCvmfs(kLogDownload, kLogDebug, "http redirect: %s",
211  header_line.c_str());
212  // libcurl will handle this because of CURLOPT_FOLLOWLOCATION
213  return num_bytes;
214  } else {
215  LogCvmfs(kLogDownload, kLogDebug, "http status error code: %s",
216  header_line.c_str());
217  if ((info->http_code / 100) == 5) {
218  // 5XX returned by host
219  info->error_code = kFailHostHttp;
220  } else if ((info->http_code == 400) || (info->http_code == 404)) {
221  // 400: error from the GeoAPI module
222  // 404: the stratum 1 does not have the newest files
223  info->error_code = kFailHostHttp;
224  } else if (info->http_code == 429) {
225  // 429: rate throttling (we ignore the backoff hint for the time being)
227  } else {
228  info->error_code = (info->proxy == "DIRECT") ? kFailHostHttp :
230  }
231  return 0;
232  }
233  }
234 
235  // Allocate memory for kDestinationMemory
236  if ((info->destination == kDestinationMem) &&
237  HasPrefix(header_line, "CONTENT-LENGTH:", true))
238  {
239  char *tmp = reinterpret_cast<char *>(alloca(num_bytes+1));
240  uint64_t length = 0;
241  sscanf(header_line.c_str(), "%s %" PRIu64, tmp, &length);
242  if (length > 0) {
243  if (length > DownloadManager::kMaxMemSize) {
245  "resource %s too large to store in memory (%" PRIu64 ")",
246  info->url->c_str(), length);
247  info->error_code = kFailTooBig;
248  return 0;
249  }
250  info->destination_mem.data = static_cast<char *>(smalloc(length));
251  } else {
252  // Empty resource
253  info->destination_mem.data = NULL;
254  }
255  info->destination_mem.size = length;
256  } else if (HasPrefix(header_line, "LOCATION:", true)) {
257  // This comes along with redirects
258  LogCvmfs(kLogDownload, kLogDebug, "%s", header_line.c_str());
259  }
260 
261  return num_bytes;
262 }
263 
264 
268 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
269  void *info_link)
270 {
271  const size_t num_bytes = size*nmemb;
272  JobInfo *info = static_cast<JobInfo *>(info_link);
273 
274  // LogCvmfs(kLogDownload, kLogDebug, "Data callback, %d bytes", num_bytes);
275 
276  if (num_bytes == 0)
277  return 0;
278 
279  if (info->expected_hash)
280  shash::Update((unsigned char *)ptr, num_bytes, info->hash_context);
281 
282  if (info->destination == kDestinationSink) {
283  if (info->compressed) {
284  zlib::StreamStates retval =
285  zlib::DecompressZStream2Sink(ptr, num_bytes,
286  &info->zstream, info->destination_sink);
287  if (retval == zlib::kStreamDataError) {
288  LogCvmfs(kLogDownload, kLogSyslogErr, "failed to decompress %s",
289  info->url->c_str());
290  info->error_code = kFailBadData;
291  return 0;
292  } else if (retval == zlib::kStreamIOError) {
294  "decompressing %s, local IO error", info->url->c_str());
295  info->error_code = kFailLocalIO;
296  return 0;
297  }
298  } else {
299  int64_t written = info->destination_sink->Write(ptr, num_bytes);
300  if ((written < 0) || (static_cast<uint64_t>(written) != num_bytes)) {
301  LogCvmfs(kLogDownload, kLogDebug, "Failed to perform write on %s (%"
302  PRId64 ")", info->url->c_str(), written);
303  info->error_code = kFailLocalIO;
304  return 0;
305  }
306  }
307  } else if (info->destination == kDestinationMem) {
308  // Write to memory
309  if (info->destination_mem.pos + num_bytes > info->destination_mem.size) {
310  if (info->destination_mem.size == 0) {
312  "Content-Length was missing or zero, but %zu bytes received",
313  info->destination_mem.pos + num_bytes);
314  } else {
315  LogCvmfs(kLogDownload, kLogDebug, "Callback had too much data: "
316  "start %zu, bytes %zu, expected %zu",
317  info->destination_mem.pos,
318  num_bytes,
319  info->destination_mem.size);
320  }
321  info->error_code = kFailBadData;
322  return 0;
323  }
324  memcpy(info->destination_mem.data + info->destination_mem.pos,
325  ptr, num_bytes);
326  info->destination_mem.pos += num_bytes;
327  } else {
328  // Write to file
329  if (info->compressed) {
330  // LogCvmfs(kLogDownload, kLogDebug, "REMOVE-ME: writing %d bytes for %s",
331  // num_bytes, info->url->c_str());
332  zlib::StreamStates retval =
333  zlib::DecompressZStream2File(ptr, num_bytes,
334  &info->zstream, info->destination_file);
335  if (retval == zlib::kStreamDataError) {
336  LogCvmfs(kLogDownload, kLogSyslogErr, "failed to decompress %s",
337  info->url->c_str());
338  info->error_code = kFailBadData;
339  return 0;
340  } else if (retval == zlib::kStreamIOError) {
342  "decompressing %s, local IO error", info->url->c_str());
343  info->error_code = kFailLocalIO;
344  return 0;
345  }
346  } else {
347  if (fwrite(ptr, 1, num_bytes, info->destination_file) != num_bytes) {
349  "downloading %s, IO failure: %s (errno=%d)",
350  info->url->c_str(), strerror(errno), errno);
351  info->error_code = kFailLocalIO;
352  return 0;
353  }
354  }
355  }
356 
357  return num_bytes;
358 }
359 
360 
361 //------------------------------------------------------------------------------
362 
363 
364 bool JobInfo::IsFileNotFound() {
365  if (HasPrefix(*url, "file://", true /* ignore_case */))
366  return error_code == kFailHostConnection;
367 
368  return http_code == 404;
369 }
370 
371 
372 //------------------------------------------------------------------------------
373 
374 
375 const int DownloadManager::kProbeUnprobed = -1;
376 const int DownloadManager::kProbeDown = -2;
377 const int DownloadManager::kProbeGeo = -3;
378 const unsigned DownloadManager::kMaxMemSize = 1024*1024;
379 
380 
384 int DownloadManager::ParseHttpCode(const char digits[3]) {
385  int result = 0;
386  int factor = 100;
387  for (int i = 0; i < 3; ++i) {
388  if ((digits[i] < '0') || (digits[i] > '9'))
389  return -1;
390  result += (digits[i] - '0') * factor;
391  factor /= 10;
392  }
393  return result;
394 }
395 
396 
400 int DownloadManager::CallbackCurlSocket(CURL *easy,
401  curl_socket_t s,
402  int action,
403  void *userp,
404  void *socketp)
405 {
406  // LogCvmfs(kLogDownload, kLogDebug, "CallbackCurlSocket called with easy "
407  // "handle %p, socket %d, action %d", easy, s, action);
408  DownloadManager *download_mgr = static_cast<DownloadManager *>(userp);
409  if (action == CURL_POLL_NONE)
410  return 0;
411 
412  // Find s in watch_fds_
413  unsigned index;
414  for (index = 0; index < download_mgr->watch_fds_inuse_; ++index) {
415  if (download_mgr->watch_fds_[index].fd == s)
416  break;
417  }
418  // Or create newly
419  if (index == download_mgr->watch_fds_inuse_) {
420  // Extend array if necessary
421  if (download_mgr->watch_fds_inuse_ == download_mgr->watch_fds_size_)
422  {
423  download_mgr->watch_fds_size_ *= 2;
424  download_mgr->watch_fds_ = static_cast<struct pollfd *>(
425  srealloc(download_mgr->watch_fds_,
426  download_mgr->watch_fds_size_*sizeof(struct pollfd)));
427  }
428  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].fd = s;
429  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].events = 0;
430  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].revents = 0;
431  download_mgr->watch_fds_inuse_++;
432  }
433 
434  switch (action) {
435  case CURL_POLL_IN:
436  download_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
437  break;
438  case CURL_POLL_OUT:
439  download_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
440  break;
441  case CURL_POLL_INOUT:
442  download_mgr->watch_fds_[index].events =
443  POLLIN | POLLPRI | POLLOUT | POLLWRBAND;
444  break;
445  case CURL_POLL_REMOVE:
446  if (index < download_mgr->watch_fds_inuse_-1)
447  download_mgr->watch_fds_[index] =
448  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_-1];
449  download_mgr->watch_fds_inuse_--;
450  // Shrink array if necessary
451  if ((download_mgr->watch_fds_inuse_ > download_mgr->watch_fds_max_) &&
452  (download_mgr->watch_fds_inuse_ < download_mgr->watch_fds_size_/2))
453  {
454  download_mgr->watch_fds_size_ /= 2;
455  // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ (%d)",
456  // watch_fds_size_);
457  download_mgr->watch_fds_ = static_cast<struct pollfd *>(
458  srealloc(download_mgr->watch_fds_,
459  download_mgr->watch_fds_size_*sizeof(struct pollfd)));
460  // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ done",
461  // watch_fds_size_);
462  }
463  break;
464  default:
465  break;
466  }
467 
468  return 0;
469 }
470 
471 
// Entry point of the download I/O thread; data is the owning DownloadManager.
//
// The thread poll()s on download_mgr->watch_fds_, which it sets up as:
//   [0]   read end of pipe_terminate_ -- any event on it ends the thread
//   [1]   read end of pipe_jobs_      -- each read yields one JobInfo pointer
//   [2..] sockets added/removed by CallbackCurlSocket() on behalf of libcurl
// New jobs get a curl handle from AcquireCurlHandle(), are prepared with
// InitializeRequest() and SetUrlOptions(), and are added to curl_multi_.
// Completed transfers go through VerifyAndFinalize().  If it returns true,
// the same easy handle is re-added to the multi handle (presumably a retry
// or failover).  Otherwise the handle is released to the pool and
// info->error_code is written to info->wait_at[1] (presumably read by
// whoever waits for the job).
// On termination every in-use handle is removed and cleaned up, and
// watch_fds_ is freed.  Always returns NULL.
475 void *DownloadManager::MainDownload(void *data) {
476  LogCvmfs(kLogDownload, kLogDebug, "download I/O thread started");
477  DownloadManager *download_mgr = static_cast<DownloadManager *>(data);
478 
// Initial watch list: only the termination pipe and the job pipe.
479  download_mgr->watch_fds_ =
480  static_cast<struct pollfd *>(smalloc(2 * sizeof(struct pollfd)));
481  download_mgr->watch_fds_size_ = 2;
482  download_mgr->watch_fds_[0].fd = download_mgr->pipe_terminate_[0];
483  download_mgr->watch_fds_[0].events = POLLIN | POLLPRI;
484  download_mgr->watch_fds_[0].revents = 0;
485  download_mgr->watch_fds_[1].fd = download_mgr->pipe_jobs_[0];
486  download_mgr->watch_fds_[1].events = POLLIN | POLLPRI;
487  download_mgr->watch_fds_[1].revents = 0;
488  download_mgr->watch_fds_inuse_ = 2;
489 
490  int still_running = 0;
// timeval_start marks the beginning of the current busy period; it feeds
// the sz_transfer_time counter.
491  struct timeval timeval_start, timeval_stop;
492  gettimeofday(&timeval_start, NULL);
493  while (true) {
494  int timeout;
// While transfers are active, poll with a 1 ms timeout so that control
// returns to libcurl regularly.
495  if (still_running) {
496  /* NOTE: The following might degrade the performance for many small files
497  * use case. TODO(jblomer): look into it.
498  // Specify a timeout for polling in ms; this allows us to return
499  // to libcurl once a second so it can look for internal operations
500  // which timed out. libcurl has a more elaborate mechanism
501  // (CURLMOPT_TIMERFUNCTION) that would inform us of the next potential
502  // timeout. TODO(bbockelm) we should switch to that in the future.
503  timeout = 100;
504  */
505  timeout = 1;
506  } else {
// Idle: block until something happens, and add the busy period
// (1000 * DiffTimeSeconds(), presumably milliseconds) to sz_transfer_time.
// NOTE(review): timeval_start is only reset when a job arrives while idle.
// Every further idle iteration (e.g. after poll() returns -1, or after an
// event on a lingering curl socket) therefore adds the same interval again.
// Confirm whether this double counting is intended.
507  timeout = -1;
508  gettimeofday(&timeval_stop, NULL);
509  int64_t delta = static_cast<int64_t>(
510  1000 * DiffTimeSeconds(timeval_start, timeval_stop));
511  perf::Xadd(download_mgr->counters_->sz_transfer_time, delta);
512  }
513  int retval = poll(download_mgr->watch_fds_, download_mgr->watch_fds_inuse_,
514  timeout);
// poll() failed (for instance interrupted by a signal): simply try again.
515  if (retval < 0) {
516  continue;
517  }
518 
// Nothing became ready within the timeout: let libcurl process its timers.
519  // Handle timeout
520  if (retval == 0) {
521  retval = curl_multi_socket_action(download_mgr->curl_multi_,
522  CURL_SOCKET_TIMEOUT,
523  0,
524  &still_running);
525  }
526 
527  // Terminate I/O thread
528  if (download_mgr->watch_fds_[0].revents)
529  break;
530 
531  // New job arrives
532  if (download_mgr->watch_fds_[1].revents) {
533  download_mgr->watch_fds_[1].revents = 0;
534  JobInfo *info;
// The job pipe carries raw JobInfo pointers, sizeof(info) bytes per job.
535  ReadPipe(download_mgr->pipe_jobs_[0], &info, sizeof(info));
// A new busy period starts with the first job accepted while idle.
536  if (!still_running)
537  gettimeofday(&timeval_start, NULL);
538  CURL *handle = download_mgr->AcquireCurlHandle();
539  download_mgr->InitializeRequest(info, handle);
540  download_mgr->SetUrlOptions(info);
541  curl_multi_add_handle(download_mgr->curl_multi_, handle);
542  retval = curl_multi_socket_action(download_mgr->curl_multi_,
543  CURL_SOCKET_TIMEOUT,
544  0,
545  &still_running);
546  }
547 
548  // Activity on curl sockets
549  // Within this loop the curl_multi_socket_action() may cause socket(s)
550  // to be removed from watch_fds_. If a socket is removed it is replaced
551  // by the socket at the end of the array and the inuse count is decreased.
552  // Therefore loop over the array in reverse order.
553  for (int64_t i = download_mgr->watch_fds_inuse_-1; i >= 2; --i) {
// Skip slots beyond the current end: an earlier call in this loop may have
// shrunk the array below i.
554  if (i >= download_mgr->watch_fds_inuse_) {
555  continue;
556  }
// Translate poll() revents into libcurl's CURL_CSELECT_* bitmask.
557  if (download_mgr->watch_fds_[i].revents) {
558  int ev_bitmask = 0;
559  if (download_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
560  ev_bitmask |= CURL_CSELECT_IN;
561  if (download_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
562  ev_bitmask |= CURL_CSELECT_OUT;
563  if (download_mgr->watch_fds_[i].revents &
564  (POLLERR | POLLHUP | POLLNVAL))
565  {
566  ev_bitmask |= CURL_CSELECT_ERR;
567  }
568  download_mgr->watch_fds_[i].revents = 0;
569 
570  retval = curl_multi_socket_action(download_mgr->curl_multi_,
571  download_mgr->watch_fds_[i].fd,
572  ev_bitmask,
573  &still_running);
574  }
575  }
576 
577  // Check if transfers are completed
578  CURLMsg *curl_msg;
579  int msgs_in_queue;
580  while ((curl_msg = curl_multi_info_read(download_mgr->curl_multi_,
581  &msgs_in_queue)))
582  {
583  if (curl_msg->msg == CURLMSG_DONE) {
584  perf::Inc(download_mgr->counters_->n_requests);
585  JobInfo *info;
586  CURL *easy_handle = curl_msg->easy_handle;
587  int curl_error = curl_msg->data.result;
// The JobInfo was attached to the handle as CURLOPT_PRIVATE.
588  curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
589 
590  curl_multi_remove_handle(download_mgr->curl_multi_, easy_handle);
// true: VerifyAndFinalize() wants the same handle submitted again.
591  if (download_mgr->VerifyAndFinalize(curl_error, info)) {
592  curl_multi_add_handle(download_mgr->curl_multi_, easy_handle);
593  retval = curl_multi_socket_action(download_mgr->curl_multi_,
594  CURL_SOCKET_TIMEOUT,
595  0,
596  &still_running);
597  } else {
598  // Return easy handle into pool and write result back
599  download_mgr->ReleaseCurlHandle(easy_handle);
600 
601  WritePipe(info->wait_at[1], &info->error_code,
602  sizeof(info->error_code));
603  }
604  }
605  }
606  }
607 
// Shutdown: detach and destroy every handle that is still in use.  Handles
// in the idle pool are not touched here.
608  for (set<CURL *>::iterator i = download_mgr->pool_handles_inuse_->begin(),
609  iEnd = download_mgr->pool_handles_inuse_->end(); i != iEnd; ++i)
610  {
611  curl_multi_remove_handle(download_mgr->curl_multi_, *i);
612  curl_easy_cleanup(*i);
613  }
614  download_mgr->pool_handles_inuse_->clear();
615  free(download_mgr->watch_fds_);
616 
617  LogCvmfs(kLogDownload, kLogDebug, "download I/O thread terminated");
618  return NULL;
619 }
620 
621 
622 //------------------------------------------------------------------------------
623 
624 
625 HeaderLists::~HeaderLists() {
626  for (unsigned i = 0; i < blocks_.size(); ++i) {
627  delete[] blocks_[i];
628  }
629  blocks_.clear();
630 }
631 
632 
633 curl_slist *HeaderLists::GetList(const char *header) {
634  return Get(header);
635 }
636 
637 
638 curl_slist *HeaderLists::DuplicateList(curl_slist *slist) {
639  assert(slist);
640  curl_slist *copy = GetList(slist->data);
641  copy->next = slist->next;
642  curl_slist *prev = copy;
643  slist = slist->next;
644  while (slist) {
645  curl_slist *new_link = Get(slist->data);
646  new_link->next = slist->next;
647  prev->next = new_link;
648  prev = new_link;
649  slist = slist->next;
650  }
651  return copy;
652 }
653 
654 
655 void HeaderLists::AppendHeader(curl_slist *slist, const char *header) {
656  assert(slist);
657  curl_slist *new_link = Get(header);
658  new_link->next = NULL;
659 
660  while (slist->next)
661  slist = slist->next;
662  slist->next = new_link;
663 }
664 
665 
671 void HeaderLists::CutHeader(const char *header, curl_slist **slist) {
672  assert(slist);
673  curl_slist head;
674  head.next = *slist;
675  curl_slist *prev = &head;
676  curl_slist *rover = *slist;
677  while (rover) {
678  if (strcmp(rover->data, header) == 0) {
679  prev->next = rover->next;
680  Put(rover);
681  rover = prev;
682  }
683  prev = rover;
684  rover = rover->next;
685  }
686  *slist = head.next;
687 }
688 
689 
690 void HeaderLists::PutList(curl_slist *slist) {
691  while (slist) {
692  curl_slist *next = slist->next;
693  Put(slist);
694  slist = next;
695  }
696 }
697 
698 
699 string HeaderLists::Print(curl_slist *slist) {
700  string verbose;
701  while (slist) {
702  verbose += string(slist->data) + "\n";
703  slist = slist->next;
704  }
705  return verbose;
706 }
707 
708 
709 curl_slist *HeaderLists::Get(const char *header) {
710  for (unsigned i = 0; i < blocks_.size(); ++i) {
711  for (unsigned j = 0; j < kBlockSize; ++j) {
712  if (!IsUsed(&(blocks_[i][j]))) {
713  blocks_[i][j].data = const_cast<char *>(header);
714  return &(blocks_[i][j]);
715  }
716  }
717  }
718 
719  // All used, new block
720  AddBlock();
721  blocks_[blocks_.size()-1][0].data = const_cast<char *>(header);
722  return &(blocks_[blocks_.size()-1][0]);
723 }
724 
725 
726 void HeaderLists::Put(curl_slist *slist) {
727  slist->data = NULL;
728  slist->next = NULL;
729 }
730 
731 
732 void HeaderLists::AddBlock() {
733  curl_slist *new_block = new curl_slist[kBlockSize];
734  for (unsigned i = 0; i < kBlockSize; ++i) {
735  Put(&new_block[i]);
736  }
737  blocks_.push_back(new_block);
738 }
739 
740 
741 //------------------------------------------------------------------------------
742 
743 
744 string DownloadManager::ProxyInfo::Print() {
745  if (url == "DIRECT")
746  return url;
747 
748  string result = url;
749  int remaining =
750  static_cast<int>(host.deadline()) - static_cast<int>(time(NULL));
751  string expinfo = (remaining >= 0) ? "+" : "";
752  if (abs(remaining) >= 3600) {
753  expinfo += StringifyInt(remaining/3600) + "h";
754  } else if (abs(remaining) >= 60) {
755  expinfo += StringifyInt(remaining/60) + "m";
756  } else {
757  expinfo += StringifyInt(remaining) + "s";
758  }
759  if (host.status() == dns::kFailOk) {
760  result += " (" + host.name() + ", " + expinfo + ")";
761  } else {
762  result += " (:unresolved:, " + expinfo + ")";
763  }
764  return result;
765 }
766 
767 
772 CURL *DownloadManager::AcquireCurlHandle() {
773  CURL *handle;
774 
775  if (pool_handles_idle_->empty()) {
776  // Create a new handle
777  handle = curl_easy_init();
778  assert(handle != NULL);
779 
780  curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
781  // curl_easy_setopt(curl_default, CURLOPT_FAILONERROR, 1);
782  curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, CallbackCurlHeader);
783  curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlData);
784  } else {
785  handle = *(pool_handles_idle_->begin());
786  pool_handles_idle_->erase(pool_handles_idle_->begin());
787  }
788 
789  pool_handles_inuse_->insert(handle);
790 
791  return handle;
792 }
793 
794 
795 void DownloadManager::ReleaseCurlHandle(CURL *handle) {
796  set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
797  assert(elem != pool_handles_inuse_->end());
798 
799  if (pool_handles_idle_->size() > pool_max_handles_)
800  curl_easy_cleanup(*elem);
801  else
802  pool_handles_idle_->insert(*elem);
803 
804  pool_handles_inuse_->erase(elem);
805 }
806 
807 
812 void DownloadManager::InitializeRequest(JobInfo *info, CURL *handle) {
813  // Initialize internal download state
814  info->curl_handle = handle;
815  info->error_code = kFailOk;
816  info->http_code = -1;
817  info->follow_redirects = follow_redirects_;
818  info->num_used_proxies = 1;
819  info->num_used_hosts = 1;
820  info->num_retries = 0;
821  info->backoff_ms = 0;
822  info->headers = header_lists_->DuplicateList(default_headers_);
823  if (info->info_header) {
824  header_lists_->AppendHeader(info->headers, info->info_header);
825  }
826  if (info->force_nocache) {
827  SetNocache(info);
828  } else {
829  info->nocache = false;
830  }
831  if (info->compressed) {
832  zlib::DecompressInit(&(info->zstream));
833  }
834  if (info->expected_hash) {
835  assert(info->hash_context.buffer != NULL);
836  shash::Init(info->hash_context);
837  }
838 
839  if ((info->range_offset != -1) && (info->range_size)) {
840  char byte_range_array[100];
841  const int64_t range_lower = static_cast<int64_t>(info->range_offset);
842  const int64_t range_upper = static_cast<int64_t>(
843  info->range_offset + info->range_size - 1);
844  if (snprintf(byte_range_array, sizeof(byte_range_array),
845  "%" PRId64 "-%" PRId64,
846  range_lower, range_upper) == 100)
847  {
848  PANIC(NULL); // Should be impossible given limits on offset size.
849  }
850  curl_easy_setopt(handle, CURLOPT_RANGE, byte_range_array);
851  } else {
852  curl_easy_setopt(handle, CURLOPT_RANGE, NULL);
853  }
854 
855  // Set curl parameters
856  curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
857  curl_easy_setopt(handle, CURLOPT_WRITEHEADER,
858  static_cast<void *>(info));
859  curl_easy_setopt(handle, CURLOPT_WRITEDATA, static_cast<void *>(info));
860  curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->headers);
861  if (info->head_request)
862  curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
863  else
864  curl_easy_setopt(handle, CURLOPT_HTTPGET, 1);
865  if (opt_ipv4_only_)
866  curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
867  if (follow_redirects_) {
868  curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1);
869  curl_easy_setopt(handle, CURLOPT_MAXREDIRS, 4);
870  }
871 }
872 
873 
878 void DownloadManager::SetUrlOptions(JobInfo *info) {
879  CURL *curl_handle = info->curl_handle;
880  string url_prefix;
881 
882  MutexLockGuard m(lock_options_);
883  // Check if proxy group needs to be reset from backup to primary
884  if (opt_timestamp_backup_proxies_ > 0) {
885  const time_t now = time(NULL);
886  if (static_cast<int64_t>(now) >
887  static_cast<int64_t>(opt_timestamp_backup_proxies_ +
888  opt_proxy_groups_reset_after_))
889  {
890  string old_proxy;
891  if (opt_proxy_groups_)
892  old_proxy = (*opt_proxy_groups_)[opt_proxy_groups_current_][0].url;
893 
894  opt_proxy_groups_current_ = 0;
895  RebalanceProxiesUnlocked();
896  opt_timestamp_backup_proxies_ = 0;
897 
898  if (opt_proxy_groups_) {
900  "switching proxy from %s to %s (reset proxy group)",
901  old_proxy.c_str(), (*opt_proxy_groups_)[0][0].url.c_str());
902  }
903  }
904  }
905  // Check if load-balanced proxies within the group need to be reset
906  if (opt_timestamp_failover_proxies_ > 0) {
907  const time_t now = time(NULL);
908  if (static_cast<int64_t>(now) >
909  static_cast<int64_t>(opt_timestamp_failover_proxies_ +
910  opt_proxy_groups_reset_after_))
911  {
912  string old_proxy;
913  if (opt_proxy_groups_)
914  old_proxy = (*opt_proxy_groups_)[opt_proxy_groups_current_][0].url;
915  RebalanceProxiesUnlocked();
916  if (opt_proxy_groups_ && (old_proxy != (*opt_proxy_groups_)[0][0].url)) {
918  "switching proxy from %s to %s (reset load-balanced proxies)",
919  old_proxy.c_str(), (*opt_proxy_groups_)[0][0].url.c_str());
920  }
921  }
922  }
923  // Check if host needs to be reset
924  if (opt_timestamp_backup_host_ > 0) {
925  const time_t now = time(NULL);
926  if (static_cast<int64_t>(now) >
927  static_cast<int64_t>(opt_timestamp_backup_host_ +
928  opt_host_reset_after_))
929  {
931  "switching host from %s to %s (reset host)",
932  (*opt_host_chain_)[opt_host_chain_current_].c_str(),
933  (*opt_host_chain_)[0].c_str());
934  opt_host_chain_current_ = 0;
935  opt_timestamp_backup_host_ = 0;
936  }
937  }
938 
939  if (!opt_proxy_groups_ ||
940  ((*opt_proxy_groups_)[opt_proxy_groups_current_][0].url == "DIRECT"))
941  {
942  info->proxy = "DIRECT";
943  curl_easy_setopt(info->curl_handle, CURLOPT_PROXY, "");
944  } else {
945  ProxyInfo proxy = (*opt_proxy_groups_)[opt_proxy_groups_current_][0];
946  ValidateProxyIpsUnlocked(proxy.url, proxy.host);
947  ProxyInfo *proxy_ptr =
948  &((*opt_proxy_groups_)[opt_proxy_groups_current_][0]);
949  info->proxy = proxy_ptr->url;
950  if (proxy_ptr->host.status() == dns::kFailOk) {
951  curl_easy_setopt(info->curl_handle, CURLOPT_PROXY, info->proxy.c_str());
952  } else {
953  // We know it can't work, don't even try to download
954  curl_easy_setopt(info->curl_handle, CURLOPT_PROXY, "0.0.0.0");
955  }
956  }
957  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, opt_low_speed_limit_);
958  if (info->proxy != "DIRECT") {
959  curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_proxy_);
960  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_proxy_);
961  } else {
962  curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_direct_);
963  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_direct_);
964  }
965  if (!opt_dns_server_.empty())
966  curl_easy_setopt(curl_handle, CURLOPT_DNS_SERVERS, opt_dns_server_.c_str());
967 
968  if (info->probe_hosts && opt_host_chain_) {
969  url_prefix = (*opt_host_chain_)[opt_host_chain_current_];
970  info->current_host_chain_index = opt_host_chain_current_;
971  }
972 
973  string url = url_prefix + *(info->url);
974 
975  curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 1L);
976  if (url.substr(0, 5) == "https") {
977  const char *cadir = getenv("X509_CERT_DIR");
978  if (!cadir || !*cadir) {cadir = "/etc/grid-security/certificates";}
979  curl_easy_setopt(curl_handle, CURLOPT_CAPATH, cadir);
980  const char *cabundle = getenv("X509_CERT_BUNDLE");
981  if (cabundle && *cabundle) {
982  curl_easy_setopt(curl_handle, CURLOPT_CAINFO, cabundle);
983  }
984  if (info->pid != -1) {
985  if (credentials_attachment_ == NULL) {
987  "uses secure downloads but no credentials attachment set");
988  } else {
989  bool retval = credentials_attachment_->ConfigureCurlHandle(
990  curl_handle, info->pid, &info->cred_data);
991  if (!retval) {
992  LogCvmfs(kLogDownload, kLogDebug, "failed attaching credentials");
993  }
994  }
995  }
996  // The download manager disables signal handling in the curl library;
997  // as OpenSSL's implementation of TLS will generate a sigpipe in some
998  // error paths, we must explicitly disable SIGPIPE here.
999  // TODO(jblomer): it should be enough to do this once
1000  signal(SIGPIPE, SIG_IGN);
1001  }
1002 
1003  if (url.find("@proxy@") != string::npos) {
1004  // This is used in Geo-API requests (only), to replace a portion of the
1005  // URL with the current proxy name for the sake of caching the result.
1006  // Replace the @proxy@ either with a passed in "forced" template (which
1007  // is set from $CVMFS_PROXY_TEMPLATE) if there is one, or a "direct"
1008  // template (which is the uuid) if there's no proxy, or the name of the
1009  // proxy.
1010  string replacement;
1011  if (proxy_template_forced_ != "") {
1012  replacement = proxy_template_forced_;
1013  } else if (info->proxy == "DIRECT") {
1014  replacement = proxy_template_direct_;
1015  } else {
1016  if (opt_proxy_groups_current_ >= opt_proxy_groups_fallback_) {
1017  // It doesn't make sense to use the fallback proxies in Geo-API requests
1018  // since the fallback proxies are supposed to get sorted, too.
1019  info->proxy = "DIRECT";
1020  curl_easy_setopt(info->curl_handle, CURLOPT_PROXY, "");
1021  replacement = proxy_template_direct_;
1022  } else {
1023  replacement =
1024  (*opt_proxy_groups_)[opt_proxy_groups_current_][0].host.name();
1025  }
1026  }
1027  replacement = (replacement == "") ? proxy_template_direct_ : replacement;
1028  LogCvmfs(kLogDownload, kLogDebug, "replacing @proxy@ by %s",
1029  replacement.c_str());
1030  url = ReplaceAll(url, "@proxy@", replacement);
1031  }
1032 
1033  if ((info->destination == kDestinationMem) &&
1034  (info->destination_mem.size == 0) &&
1035  HasPrefix(url, "file://", false))
1036  {
1037  info->destination_mem.size = 64*1024;
1038  info->destination_mem.data = static_cast<char *>(smalloc(64*1024));
1039  }
1040 
1041  curl_easy_setopt(curl_handle, CURLOPT_URL, EscapeUrl(url).c_str());
1042 }
1043 
1044 
1052 void DownloadManager::ValidateProxyIpsUnlocked(
1053  const string &url,
1054  const dns::Host &host)
1055 {
1056  if (!host.IsExpired())
1057  return;
1058  LogCvmfs(kLogDownload, kLogDebug, "validate DNS entry for %s",
1059  host.name().c_str());
1060 
1061  unsigned group_idx = opt_proxy_groups_current_;
1062  dns::Host new_host = resolver_->Resolve(host.name());
1063 
1064  bool update_only = true; // No changes to the list of IP addresses.
1065  if (new_host.status() != dns::kFailOk) {
1066  // Try again later in case resolving fails.
1068  "failed to resolve IP addresses for %s (%d - %s)",
1069  host.name().c_str(), new_host.status(),
1070  dns::Code2Ascii(new_host.status()));
1071  new_host = dns::Host::ExtendDeadline(host, resolver_->min_ttl());
1072  } else if (!host.IsEquivalent(new_host)) {
1073  update_only = false;
1074  }
1075 
1076  if (update_only) {
1077  for (unsigned i = 0; i < (*opt_proxy_groups_)[group_idx].size(); ++i) {
1078  if ((*opt_proxy_groups_)[group_idx][i].host.id() == host.id())
1079  (*opt_proxy_groups_)[group_idx][i].host = new_host;
1080  }
1081  return;
1082  }
1083 
1084  assert(new_host.status() == dns::kFailOk);
1085 
1086  // Remove old host objects, insert new objects, and rebalance.
1088  "DNS entries for proxy %s changed, adjusting", host.name().c_str());
1089  vector<ProxyInfo> *group = &((*opt_proxy_groups_)[opt_proxy_groups_current_]);
1090  opt_num_proxies_ -= group->size();
1091  for (unsigned i = 0; i < group->size(); ) {
1092  if ((*group)[i].host.id() == host.id()) {
1093  group->erase(group->begin() + i);
1094  } else {
1095  i++;
1096  }
1097  }
1098  vector<ProxyInfo> new_infos;
1099  set<string> best_addresses = new_host.ViewBestAddresses(opt_ip_preference_);
1100  set<string>::const_iterator iter_ips = best_addresses.begin();
1101  for (; iter_ips != best_addresses.end(); ++iter_ips) {
1102  string url_ip = dns::RewriteUrl(url, *iter_ips);
1103  new_infos.push_back(ProxyInfo(new_host, url_ip));
1104  }
1105  group->insert(group->end(), new_infos.begin(), new_infos.end());
1106  opt_num_proxies_ += new_infos.size();
1107 
1108  RebalanceProxiesUnlocked();
1109 }
1110 
1111 
1115 void DownloadManager::UpdateStatistics(CURL *handle) {
1116  double val;
1117  int retval;
1118  int64_t sum = 0;
1119 
1120  retval = curl_easy_getinfo(handle, CURLINFO_SIZE_DOWNLOAD, &val);
1121  assert(retval == CURLE_OK);
1122  sum += static_cast<int64_t>(val);
1123  /*retval = curl_easy_getinfo(handle, CURLINFO_HEADER_SIZE, &val);
1124  assert(retval == CURLE_OK);
1125  sum += static_cast<int64_t>(val);*/
1126  perf::Xadd(counters_->sz_transferred_bytes, sum);
1127 }
1128 
1129 
1133 bool DownloadManager::CanRetry(const JobInfo *info) {
1134  MutexLockGuard m(lock_options_);
1135  unsigned max_retries = opt_max_retries_;
1136 
1137  return !info->nocache && (info->num_retries < max_retries) &&
1138  (IsProxyTransferError(info->error_code) ||
1140 }
1141 
1149 void DownloadManager::Backoff(JobInfo *info) {
1150  unsigned backoff_init_ms = 0;
1151  unsigned backoff_max_ms = 0;
1152  {
1153  MutexLockGuard m(lock_options_);
1154  backoff_init_ms = opt_backoff_init_ms_;
1155  backoff_max_ms = opt_backoff_max_ms_;
1156  }
1157 
1158  info->num_retries++;
1159  perf::Inc(counters_->n_retries);
1160  if (info->backoff_ms == 0) {
1161  info->backoff_ms = prng_.Next(backoff_init_ms + 1); // Must be != 0
1162  } else {
1163  info->backoff_ms *= 2;
1164  }
1165  if (info->backoff_ms > backoff_max_ms) info->backoff_ms = backoff_max_ms;
1166 
1167  LogCvmfs(kLogDownload, kLogDebug, "backing off for %d ms", info->backoff_ms);
1168  SafeSleepMs(info->backoff_ms);
1169 }
1170 
1171 void DownloadManager::SetNocache(JobInfo *info) {
1172  if (info->nocache)
1173  return;
1174  header_lists_->AppendHeader(info->headers, "Pragma: no-cache");
1175  header_lists_->AppendHeader(info->headers, "Cache-Control: no-cache");
1176  curl_easy_setopt(info->curl_handle, CURLOPT_HTTPHEADER, info->headers);
1177  info->nocache = true;
1178 }
1179 
1180 
1185 void DownloadManager::SetRegularCache(JobInfo *info) {
1186  if (info->nocache == false)
1187  return;
1188  header_lists_->CutHeader("Pragma: no-cache", &(info->headers));
1189  header_lists_->CutHeader("Cache-Control: no-cache", &(info->headers));
1190  curl_easy_setopt(info->curl_handle, CURLOPT_HTTPHEADER, info->headers);
1191  info->nocache = false;
1192 }
1193 
1194 
1198 void DownloadManager::ReleaseCredential(JobInfo *info) {
1199  if (info->cred_data) {
1200  assert(credentials_attachment_ != NULL); // Someone must have set it
1201  credentials_attachment_->ReleaseCurlHandle(info->curl_handle,
1202  info->cred_data);
1203  info->cred_data = NULL;
1204  }
1205 }
1206 
1207 
/**
 * Called after every curl transfer attempt of a job.
 *
 * - Verifies a successful transfer: the content hash and, for compressed
 *   memory destinations, in-memory decompression.
 * - Otherwise maps the curl error code to a failure code in
 *   info->error_code.
 * - Decides whether the transfer is repeated.  A repeat is either a
 *   same-URL retry with backoff, a no-cache retry, a proxy fail-over or a
 *   host fail-over.  In that case the destination and the hash/decompression
 *   state are reset first.
 * - Otherwise finalizes the job: flushes/closes the destination, finishes
 *   decompression, and releases credentials and the header list.
 *
 * @param curl_error  result of the curl transfer
 * @param info        the job; info->error_code is set as a side effect
 * @return true if the transfer should be started again on the same curl
 *         handle, false if the job is finished (successfully or not)
 *
 * NOTE(review): this listing is missing several source lines, among them:
 * - the opening "LogCvmfs(kLogDownload, ..." line of some log calls (their
 *   argument lines are still present);
 * - the info->error_code assignments of several curl cases (e.g.
 *   CURLE_UNSUPPORTED_PROTOCOL, CURLE_OPERATION_TIMEDOUT);
 * - some operands of the retry conditions.
 * Restore them from the original download.cc before compiling.
 */
1214 bool DownloadManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1216  "Verify downloaded url %s, proxy %s (curl error %d)",
1217  info->url->c_str(), info->proxy.c_str(), curl_error);
1218  UpdateStatistics(info->curl_handle);
1219 
1220  // Verification and error classification
1221  switch (curl_error) {
1222  case CURLE_OK:
1223  // Verify content hash
1224  if (info->expected_hash) {
1225  shash::Any match_hash;
1226  shash::Final(info->hash_context, &match_hash);
1227  if (match_hash != *(info->expected_hash)) {
1229  "hash verification of %s failed (expected %s, got %s)",
1230  info->url->c_str(), info->expected_hash->ToString().c_str(),
1231  match_hash.ToString().c_str());
1232  info->error_code = kFailBadData;
1233  break;
1234  }
1235  }
1236 
1237  // Decompress memory in a single run
// destination_mem.pos is the number of bytes received; after decompression
// pos and size both hold the decompressed length.
1238  if ((info->destination == kDestinationMem) && info->compressed) {
1239  void *buf;
1240  uint64_t size;
1241  bool retval = zlib::DecompressMem2Mem(info->destination_mem.data,
1242  info->destination_mem.pos,
1243  &buf, &size);
1244  if (retval) {
1245  free(info->destination_mem.data);
1246  info->destination_mem.data = static_cast<char *>(buf);
1247  info->destination_mem.pos = info->destination_mem.size = size;
1248  } else {
1250  "decompression (memory) of url %s failed",
1251  info->url->c_str());
1252  info->error_code = kFailBadData;
1253  break;
1254  }
1255  }
1256 
1257  info->error_code = kFailOk;
1258  break;
1259  case CURLE_UNSUPPORTED_PROTOCOL:
1261  break;
1262  case CURLE_URL_MALFORMAT:
1263  info->error_code = kFailBadUrl;
1264  break;
1265  case CURLE_COULDNT_RESOLVE_PROXY:
1266  info->error_code = kFailProxyResolve;
1267  break;
1268  case CURLE_COULDNT_RESOLVE_HOST:
1269  info->error_code = kFailHostResolve;
1270  break;
1271  case CURLE_OPERATION_TIMEDOUT:
// Timeouts and short transfers are blamed on the host for DIRECT
// connections and on the proxy otherwise.
1272  info->error_code = (info->proxy == "DIRECT") ?
1274  break;
1275  case CURLE_PARTIAL_FILE:
1276  case CURLE_GOT_NOTHING:
1277  case CURLE_RECV_ERROR:
1278  info->error_code = (info->proxy == "DIRECT") ?
1280  break;
1281  case CURLE_FILE_COULDNT_READ_FILE:
1282  case CURLE_COULDNT_CONNECT:
1283  if (info->proxy != "DIRECT")
1284  // This is a guess. Fail-over can still change to switching host
1286  else
1288  break;
1289  case CURLE_TOO_MANY_REDIRECTS:
1291  break;
1292  case CURLE_SSL_CACERT_BADFILE:
1294  "Failed to load certificate bundle. "
1295  "X509_CERT_BUNDLE might point to the wrong location.");
1297  break;
1298  // As of curl 7.62.0, CURLE_SSL_CACERT is the same as
1299  // CURLE_PEER_FAILED_VERIFICATION
1300  case CURLE_PEER_FAILED_VERIFICATION:
1302  "invalid SSL certificate of remote host. "
1303  "X509_CERT_DIR and/or X509_CERT_BUNDLE might point to the wrong "
1304  "location.");
1306  break;
1307  case CURLE_ABORTED_BY_CALLBACK:
1308  case CURLE_WRITE_ERROR:
1309  // Error set by callback
1310  break;
1311  default:
1312  LogCvmfs(kLogDownload, kLogSyslogErr, "unexpected curl error (%d) while "
1313  "trying to fetch %s", curl_error, info->url->c_str());
1314  info->error_code = kFailOther;
1315  break;
1316  }
1317 
// NOTE(review): opt_host_chain_ is read here without lock_options_, while
// SetHostChain()/ProbeHosts() delete and replace it under that lock.
// Consider reading it inside the locked section below.
1318  std::vector<std::string> *host_chain = opt_host_chain_;
1319 
1320  // Determination if download should be repeated
1321  bool try_again = false;
// Same-URL retry: a transfer error with retry budget left (see CanRetry()).
1322  bool same_url_retry = CanRetry(info);
1323  if (info->error_code != kFailOk) {
1324  MutexLockGuard m(lock_options_);
// Bad data is retried once with no-cache headers.  If it is still bad with
// no-cache, it is treated as an HTTP failure of the host.
1325  if (info->error_code == kFailBadData) {
1326  if (!info->nocache) {
1327  try_again = true;
1328  } else {
1329  // Make it a host failure
1331  "data corruption with no-cache header, try another host");
1332 
1333  info->error_code = kFailHostHttp;
1334  }
1335  }
// Host-level failures: retry via host fail-over if host probing is enabled
// and not all hosts of the chain have been used yet.
1336  if ( same_url_retry || (
1337  ( (info->error_code == kFailHostResolve) ||
1339  (info->error_code == kFailHostHttp)) &&
1340  info->probe_hosts &&
1341  host_chain && (info->num_used_hosts < host_chain->size()))
1342  )
1343  {
1344  try_again = true;
1345  }
// Proxy-level failures: retry via proxy fail-over.  Once every proxy has been
// used, turn it into a host fail-over (resetting to the first proxy group),
// or give up if no host is left.
1346  if ( same_url_retry || (
1347  ( (info->error_code == kFailProxyResolve) ||
1349  (info->error_code == kFailProxyHttp)) )
1350  )
1351  {
1352  try_again = true;
1353  // If all proxies failed, do a next round with the next host
1354  if (!same_url_retry && (info->num_used_proxies >= opt_num_proxies_)) {
1355  // Check if this can be made a host fail-over
1356  if (info->probe_hosts &&
1357  host_chain &&
1358  (info->num_used_hosts < host_chain->size()))
1359  {
1360  // reset proxy group if not already performed by other handle
1361  if (opt_proxy_groups_) {
1362  if ((opt_proxy_groups_current_ > 0) ||
1363  (opt_proxy_groups_current_burned_ > 1))
1364  {
1365  string old_proxy;
1366  old_proxy =
1367  (*opt_proxy_groups_)[opt_proxy_groups_current_][0].url;
1368  opt_proxy_groups_current_ = 0;
1369  RebalanceProxiesUnlocked();
1370  opt_timestamp_backup_proxies_ = 0;
1371  if (opt_proxy_groups_) {
1373  "switching proxy from %s to %s "
1374  "(reset proxies for host failover)",
1375  old_proxy.c_str(),
1376  (*opt_proxy_groups_)[0][0].url.c_str());
1377  }
1378  }
1379  }
1380 
1381  // Make it a host failure
1382  LogCvmfs(kLogDownload, kLogDebug, "make it a host failure");
1383  info->num_used_proxies = 1;
1385  } else {
1386  try_again = false;
1387  }
1388  } // Make a proxy failure a host failure
1389  } // Proxy failure assumed
1390  }
1391 
1392  if (try_again) {
1393  LogCvmfs(kLogDownload, kLogDebug, "Trying again on same curl handle, "
1394  "same url: %d, error code %d", same_url_retry, info->error_code);
1395  // Reset internal state and destination
1396  if ((info->destination == kDestinationMem) && info->destination_mem.data) {
1397  free(info->destination_mem.data);
1398  info->destination_mem.data = NULL;
1399  info->destination_mem.size = 0;
1400  info->destination_mem.pos = 0;
1401  }
1402  if ((info->destination == kDestinationFile) ||
1403  (info->destination == kDestinationPath))
1404  {
1405  if ((fflush(info->destination_file) != 0) ||
1406  (ftruncate(fileno(info->destination_file), 0) != 0))
1407  {
1408  info->error_code = kFailLocalIO;
1409  goto verify_and_finalize_stop;
1410  }
1411  rewind(info->destination_file);
1412  }
1413  if (info->destination == kDestinationSink) {
1414  if (info->destination_sink->Reset() != 0) {
1415  info->error_code = kFailLocalIO;
1416  goto verify_and_finalize_stop;
1417  }
1418  }
1419  if (info->expected_hash)
1420  shash::Init(info->hash_context);
1421  if (info->compressed)
1422  zlib::DecompressInit(&info->zstream);
1423  SetRegularCache(info);
1424 
1425  // Failure handling
1426  bool switch_proxy = false;
1427  bool switch_host = false;
1428  switch (info->error_code) {
1429  case kFailBadData:
1430  SetNocache(info);
1431  break;
1432  case kFailProxyResolve:
1433  case kFailProxyHttp:
1434  switch_proxy = true;
1435  break;
1436  case kFailHostResolve:
1437  case kFailHostHttp:
1438  case kFailHostAfterProxy:
1439  switch_host = true;
1440  break;
1441  default:
// Transfer errors: back off and retry the same URL while the retry
// budget lasts, otherwise fail over.
1442  if (IsProxyTransferError(info->error_code)) {
1443  if (same_url_retry)
1444  Backoff(info);
1445  else
1446  switch_proxy = true;
1447  } else if (IsHostTransferError(info->error_code)) {
1448  if (same_url_retry)
1449  Backoff(info);
1450  else
1451  switch_host = true;
1452  } else {
1453  // No other errors expected when retrying
1454  PANIC(NULL);
1455  }
1456  }
// Credentials are tied to the current proxy/host, so release them before
// SetUrlOptions() configures the new target.
1457  if (switch_proxy) {
1458  ReleaseCredential(info);
1459  SwitchProxy(info);
1460  info->num_used_proxies++;
1461  SetUrlOptions(info);
1462  }
1463  if (switch_host) {
1464  ReleaseCredential(info);
1465  SwitchHost(info);
1466  info->num_used_hosts++;
1467  SetUrlOptions(info);
1468  }
1469 
1470  return true; // try again
1471  }
1472 
// Reached when no retry is attempted, or when resetting the destination for
// a retry failed with a local I/O error.
1473  verify_and_finalize_stop:
1474  // Finalize, flush destination file
1475  ReleaseCredential(info);
1476  if ((info->destination == kDestinationFile) &&
1477  fflush(info->destination_file) != 0)
1478  {
1479  info->error_code = kFailLocalIO;
1480  } else if (info->destination == kDestinationPath) {
1481  if (fclose(info->destination_file) != 0)
1482  info->error_code = kFailLocalIO;
1483  info->destination_file = NULL;
1484  }
1485 
1486  if (info->compressed)
1487  zlib::DecompressFini(&info->zstream);
1488 
1489  if (info->headers) {
1490  header_lists_->PutList(info->headers);
1491  info->headers = NULL;
1492  }
1493 
1494  return false; // stop transfer and return to Fetch()
1495 }
1496 
1497 
1498 DownloadManager::DownloadManager() {
1499  pool_handles_idle_ = NULL;
1500  pool_handles_inuse_ = NULL;
1501  pool_max_handles_ = 0;
1502  curl_multi_ = NULL;
1503  default_headers_ = NULL;
1504 
1505  atomic_init32(&multi_threaded_);
1506  pipe_terminate_[0] = pipe_terminate_[1] = -1;
1507 
1508  pipe_jobs_[0] = pipe_jobs_[1] = -1;
1509  watch_fds_ = NULL;
1510  watch_fds_size_ = 0;
1511  watch_fds_inuse_ = 0;
1512  watch_fds_max_ = 0;
1513 
1514  lock_options_ =
1515  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1516  int retval = pthread_mutex_init(lock_options_, NULL);
1517  assert(retval == 0);
1518  lock_synchronous_mode_ =
1519  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1520  retval = pthread_mutex_init(lock_synchronous_mode_, NULL);
1521  assert(retval == 0);
1522 
1523  opt_dns_server_ = "";
1524  opt_ip_preference_ = dns::kIpPreferSystem;
1525  opt_timeout_proxy_ = 0;
1526  opt_timeout_direct_ = 0;
1527  opt_low_speed_limit_ = 0;
1528  opt_host_chain_ = NULL;
1529  opt_host_chain_rtt_ = NULL;
1530  opt_host_chain_current_ = 0;
1531  opt_proxy_groups_ = NULL;
1532  opt_proxy_groups_current_ = 0;
1533  opt_proxy_groups_current_burned_ = 0;
1534  opt_num_proxies_ = 0;
1535  opt_max_retries_ = 0;
1536  opt_backoff_init_ms_ = 0;
1537  opt_backoff_max_ms_ = 0;
1538  enable_info_header_ = false;
1539  opt_ipv4_only_ = false;
1540  follow_redirects_ = false;
1541  use_system_proxy_ = false;
1542 
1543  resolver_ = NULL;
1544 
1545  opt_timestamp_backup_proxies_ = 0;
1546  opt_timestamp_failover_proxies_ = 0;
1547  opt_proxy_groups_reset_after_ = 0;
1548  opt_timestamp_backup_host_ = 0;
1549  opt_host_reset_after_ = 0;
1550 
1551  credentials_attachment_ = NULL;
1552 
1553  counters_ = NULL;
1554 }
1555 
1556 
1557 DownloadManager::~DownloadManager() {
1558  pthread_mutex_destroy(lock_options_);
1559  pthread_mutex_destroy(lock_synchronous_mode_);
1560  free(lock_options_);
1561  free(lock_synchronous_mode_);
1562 }
1563 
1564 void DownloadManager::InitHeaders() {
1565  // User-Agent
1566  string cernvm_id = "User-Agent: cvmfs ";
1567 #ifdef CVMFS_LIBCVMFS
1568  cernvm_id += "libcvmfs ";
1569 #else
1570  cernvm_id += "Fuse ";
1571 #endif
1572  cernvm_id += string(VERSION);
1573  if (getenv("CERNVM_UUID") != NULL) {
1574  cernvm_id += " " +
1575  sanitizer::InputSanitizer("az AZ 09 -").Filter(getenv("CERNVM_UUID"));
1576  }
1577  user_agent_ = strdup(cernvm_id.c_str());
1578 
1579  header_lists_ = new HeaderLists();
1580 
1581  default_headers_ = header_lists_->GetList("Connection: Keep-Alive");
1582  header_lists_->AppendHeader(default_headers_, "Pragma:");
1583  header_lists_->AppendHeader(default_headers_, user_agent_);
1584 }
1585 
1586 
1587 void DownloadManager::FiniHeaders() {
1588  delete header_lists_;
1589  header_lists_ = NULL;
1590  default_headers_ = NULL;
1591 }
1592 
1593 
1594 void DownloadManager::Init(const unsigned max_pool_handles,
1595  const bool use_system_proxy,
1596  perf::StatisticsTemplate statistics)
1597 {
1598  atomic_init32(&multi_threaded_);
1599  int retval = curl_global_init(CURL_GLOBAL_ALL);
1600  assert(retval == CURLE_OK);
1601  pool_handles_idle_ = new set<CURL *>;
1602  pool_handles_inuse_ = new set<CURL *>;
1603  pool_max_handles_ = max_pool_handles;
1604  watch_fds_max_ = 4*pool_max_handles_;
1605 
1606  opt_timeout_proxy_ = 5;
1607  opt_timeout_direct_ = 10;
1608  opt_low_speed_limit_ = 1024;
1609  opt_proxy_groups_current_ = 0;
1610  opt_proxy_groups_current_burned_ = 0;
1611  opt_num_proxies_ = 0;
1612  opt_host_chain_current_ = 0;
1613  opt_ip_preference_ = dns::kIpPreferSystem;
1614 
1615  counters_ = new Counters(statistics);
1616 
1617  user_agent_ = NULL;
1618  InitHeaders();
1619 
1620  curl_multi_ = curl_multi_init();
1621  assert(curl_multi_ != NULL);
1622  curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION, CallbackCurlSocket);
1623  curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1624  static_cast<void *>(this));
1625  curl_multi_setopt(curl_multi_, CURLMOPT_MAXCONNECTS, watch_fds_max_);
1626  curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1627  pool_max_handles_);
1628 
1629  prng_.InitLocaltime();
1630 
1631  // Name resolving
1632  if ((getenv("CVMFS_IPV4_ONLY") != NULL) &&
1633  (strlen(getenv("CVMFS_IPV4_ONLY")) > 0))
1634  {
1635  opt_ipv4_only_ = true;
1636  }
1637  resolver_ = dns::NormalResolver::Create(opt_ipv4_only_,
1638  kDnsDefaultRetries, kDnsDefaultTimeoutMs);
1639  assert(resolver_);
1640 
1641  // Parsing environment variables
1642  if (use_system_proxy) {
1643  use_system_proxy_ = true;
1644  if (getenv("http_proxy") == NULL) {
1645  SetProxyChain("", "", kSetProxyRegular);
1646  } else {
1647  SetProxyChain(string(getenv("http_proxy")), "", kSetProxyRegular);
1648  }
1649  }
1650 }
1651 
1652 
1654  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1655  // Shutdown I/O thread
1656  char buf = 'T';
1657  WritePipe(pipe_terminate_[1], &buf, 1);
1658  pthread_join(thread_download_, NULL);
1659  // All handles are removed from the multi stack
1660  close(pipe_terminate_[1]);
1661  close(pipe_terminate_[0]);
1662  close(pipe_jobs_[1]);
1663  close(pipe_jobs_[0]);
1664  }
1665 
1666  for (set<CURL *>::iterator i = pool_handles_idle_->begin(),
1667  iEnd = pool_handles_idle_->end(); i != iEnd; ++i)
1668  {
1669  curl_easy_cleanup(*i);
1670  }
1671  delete pool_handles_idle_;
1672  delete pool_handles_inuse_;
1673  curl_multi_cleanup(curl_multi_);
1674  pool_handles_idle_ = NULL;
1675  pool_handles_inuse_ = NULL;
1676  curl_multi_ = NULL;
1677 
1678  FiniHeaders();
1679  if (user_agent_)
1680  free(user_agent_);
1681  user_agent_ = NULL;
1682 
1683  delete counters_;
1684  counters_ = NULL;
1685 
1686  delete opt_host_chain_;
1687  delete opt_host_chain_rtt_;
1688  delete opt_proxy_groups_;
1689  opt_host_chain_ = NULL;
1690  opt_host_chain_rtt_ = NULL;
1691  opt_proxy_groups_ = NULL;
1692 
1693  curl_global_cleanup();
1694 
1695  delete resolver_;
1696  resolver_ = NULL;
1697 }
1698 
1699 
1705  MakePipe(pipe_terminate_);
1706  MakePipe(pipe_jobs_);
1707 
1708  int retval = pthread_create(&thread_download_, NULL, MainDownload,
1709  static_cast<void *>(this));
1710  assert(retval == 0);
1711 
1712  atomic_inc32(&multi_threaded_);
1713 }
1714 
1715 
1720  assert(info != NULL);
1721  assert(info->url != NULL);
1722 
1723  Failures result;
1724  result = PrepareDownloadDestination(info);
1725  if (result != kFailOk)
1726  return result;
1727 
1728  if (info->expected_hash) {
1731  info->hash_context.size = shash::GetContextSize(algorithm);
1732  info->hash_context.buffer = alloca(info->hash_context.size);
1733  }
1734 
1735  // Prepare cvmfs-info: header, allocate string on the stack
1736  info->info_header = NULL;
1737  if (enable_info_header_ && info->extra_info) {
1738  const char *header_name = "cvmfs-info: ";
1739  const size_t header_name_len = strlen(header_name);
1740  const unsigned header_size = 1 + header_name_len +
1741  EscapeHeader(*(info->extra_info), NULL, 0);
1742  info->info_header = static_cast<char *>(alloca(header_size));
1743  memcpy(info->info_header, header_name, header_name_len);
1744  EscapeHeader(*(info->extra_info), info->info_header + header_name_len,
1745  header_size - header_name_len);
1746  info->info_header[header_size-1] = '\0';
1747  }
1748 
1749  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1750  if (info->wait_at[0] == -1) {
1751  MakePipe(info->wait_at);
1752  }
1753 
1754  // LogCvmfs(kLogDownload, kLogDebug, "send job to thread, pipe %d %d",
1755  // info->wait_at[0], info->wait_at[1]);
1756  WritePipe(pipe_jobs_[1], &info, sizeof(info));
1757  ReadPipe(info->wait_at[0], &result, sizeof(result));
1758  // LogCvmfs(kLogDownload, kLogDebug, "got result %d", result);
1759  } else {
1760  MutexLockGuard l(lock_synchronous_mode_);
1761  CURL *handle = AcquireCurlHandle();
1762  InitializeRequest(info, handle);
1763  SetUrlOptions(info);
1764  // curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
1765  int retval;
1766  do {
1767  retval = curl_easy_perform(handle);
1768  perf::Inc(counters_->n_requests);
1769  double elapsed;
1770  if (curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &elapsed) == CURLE_OK)
1771  perf::Xadd(counters_->sz_transfer_time, (int64_t)(elapsed * 1000));
1772  } while (VerifyAndFinalize(retval, info));
1773  result = info->error_code;
1774  ReleaseCurlHandle(info->curl_handle);
1775  }
1776 
1777  if (result != kFailOk) {
1778  LogCvmfs(kLogDownload, kLogDebug, "download failed (error %d - %s)", result,
1779  Code2Ascii(result));
1780 
1781  if (info->destination == kDestinationPath)
1782  unlink(info->destination_path->c_str());
1783 
1784  if (info->destination_mem.data) {
1785  free(info->destination_mem.data);
1786  info->destination_mem.data = NULL;
1787  info->destination_mem.size = 0;
1788  }
1789  }
1790 
1791  return result;
1792 }
1793 
1794 
1799 void DownloadManager::SetCredentialsAttachment(CredentialsAttachment *ca) {
1800  MutexLockGuard m(lock_options_);
1801  credentials_attachment_ = ca;
1802 }
1803 
1807 std::string DownloadManager::GetDnsServer() const {
1808  return opt_dns_server_;
1809 }
1810 
1815 void DownloadManager::SetDnsServer(const string &address) {
1816  if (!address.empty()) {
1817  MutexLockGuard m(lock_options_);
1818  opt_dns_server_ = address;
1819  assert(!opt_dns_server_.empty());
1820 
1821  vector<string> servers;
1822  servers.push_back(address);
1823  bool retval = resolver_->SetResolvers(servers);
1824  assert(retval);
1825  }
1826  LogCvmfs(kLogDownload, kLogSyslog, "set nameserver to %s", address.c_str());
1827 }
1828 
1829 
1833 void DownloadManager::SetDnsParameters(
1834  const unsigned retries,
1835  const unsigned timeout_ms)
1836 {
1837  MutexLockGuard m(lock_options_);
1838  if ((resolver_->retries() == retries) &&
1839  (resolver_->timeout_ms() == timeout_ms))
1840  {
1841  return;
1842  }
1843  delete resolver_;
1844  resolver_ = NULL;
1845  resolver_ =
1846  dns::NormalResolver::Create(opt_ipv4_only_, retries, timeout_ms);
1847  assert(resolver_);
1848 }
1849 
1850 
1851 void DownloadManager::SetDnsTtlLimits(
1852  const unsigned min_seconds,
1853  const unsigned max_seconds)
1854 {
1855  MutexLockGuard m(lock_options_);
1856  resolver_->set_min_ttl(min_seconds);
1857  resolver_->set_max_ttl(max_seconds);
1858 }
1859 
1860 
1861 void DownloadManager::SetIpPreference(dns::IpPreference preference) {
1862  MutexLockGuard m(lock_options_);
1863  opt_ip_preference_ = preference;
1864 }
1865 
1866 
1872 void DownloadManager::SetTimeout(const unsigned seconds_proxy,
1873  const unsigned seconds_direct)
1874 {
1875  MutexLockGuard m(lock_options_);
1876  opt_timeout_proxy_ = seconds_proxy;
1877  opt_timeout_direct_ = seconds_direct;
1878 }
1879 
1880 
1886 void DownloadManager::SetLowSpeedLimit(const unsigned low_speed_limit) {
1887  MutexLockGuard m(lock_options_);
1888  opt_low_speed_limit_ = low_speed_limit;
1889 }
1890 
1891 
1895 void DownloadManager::GetTimeout(unsigned *seconds_proxy,
1896  unsigned *seconds_direct)
1897 {
1898  MutexLockGuard m(lock_options_);
1899  *seconds_proxy = opt_timeout_proxy_;
1900  *seconds_direct = opt_timeout_direct_;
1901 }
1902 
1903 
1908 void DownloadManager::SetHostChain(const string &host_list) {
1909  SetHostChain(SplitString(host_list, ';'));
1910 }
1911 
1912 
1913 void DownloadManager::SetHostChain(const std::vector<std::string> &host_list) {
1914  MutexLockGuard m(lock_options_);
1915  opt_timestamp_backup_host_ = 0;
1916  delete opt_host_chain_;
1917  delete opt_host_chain_rtt_;
1918  opt_host_chain_current_ = 0;
1919 
1920  if (host_list.empty()) {
1921  opt_host_chain_ = NULL;
1922  opt_host_chain_rtt_ = NULL;
1923  return;
1924  }
1925 
1926  opt_host_chain_ = new vector<string>(host_list);
1927  opt_host_chain_rtt_ =
1928  new vector<int>(opt_host_chain_->size(), kProbeUnprobed);
1929  // LogCvmfs(kLogDownload, kLogSyslog, "using host %s",
1930  // (*opt_host_chain_)[0].c_str());
1931 }
1932 
1933 
1934 
1939 void DownloadManager::GetHostInfo(vector<string> *host_chain, vector<int> *rtt,
1940  unsigned *current_host)
1941 {
1942  MutexLockGuard m(lock_options_);
1943  if (opt_host_chain_) {
1944  if (current_host) {*current_host = opt_host_chain_current_;}
1945  if (host_chain) {*host_chain = *opt_host_chain_;}
1946  if (rtt) {*rtt = *opt_host_chain_rtt_;}
1947  }
1948 }
1949 
1950 
/**
 * Fails over to another proxy of the current load-balancing group.
 *
 * Does nothing without proxy groups.  If info is given, nothing happens
 * unless info->proxy is still the front proxy of the current group, i.e. no
 * other job has switched in the meantime.
 *
 * Once opt_proxy_groups_current_burned_ equals the group size, the next group
 * is selected (wrapping around).  When opt_proxy_groups_reset_after_ > 0, the
 * timestamps later used to reset to the primary group are maintained.
 *
 * The front proxy is swapped towards the back, and a randomly selected entry
 * of the front part of the group is moved to position 0.
 *
 * NOTE(review): the opening "LogCvmfs(kLogDownload, ..." line of the
 * "switching proxy from %s to %s" message (original line 2026) is missing
 * from this listing.
 */
1958 void DownloadManager::SwitchProxy(JobInfo *info) {
1959  MutexLockGuard m(lock_options_);
1960 
1961  if (!opt_proxy_groups_) {
1962  return;
1963  }
// Another job already switched away from the proxy this job used.
1964  if (info &&
1965  ((*opt_proxy_groups_)[opt_proxy_groups_current_][0].url != info->proxy))
1966  {
1967  return;
1968  }
1969 
1970  perf::Inc(counters_->n_proxy_failover);
1971  string old_proxy = (*opt_proxy_groups_)[opt_proxy_groups_current_][0].url;
1972 
1973  // If all proxies from the current load-balancing group are burned, switch to
1974  // another group
1975  if (opt_proxy_groups_current_burned_ ==
1976  (*opt_proxy_groups_)[opt_proxy_groups_current_].size())
1977  {
1978  opt_proxy_groups_current_burned_ = 0;
1979  if (opt_proxy_groups_->size() > 1) {
1980  opt_proxy_groups_current_ = (opt_proxy_groups_current_ + 1) %
1981  opt_proxy_groups_->size();
1982  // Remeber the timestamp of switching to backup proxies
1983  if (opt_proxy_groups_reset_after_ > 0) {
1984  if (opt_proxy_groups_current_ > 0) {
1985  if (opt_timestamp_backup_proxies_ == 0)
1986  opt_timestamp_backup_proxies_ = time(NULL);
1987  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
1988  // "switched to (another) backup proxy group");
1989  } else {
1990  opt_timestamp_backup_proxies_ = 0;
1991  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
1992  // "switched back to primary proxy group");
1993  }
1994  opt_timestamp_failover_proxies_ = 0;
1995  }
1996  }
1997  } else {
1998  // failover within the same group
1999  if (opt_proxy_groups_reset_after_ > 0) {
2000  if (opt_timestamp_failover_proxies_ == 0)
2001  opt_timestamp_failover_proxies_ = time(NULL);
2002  }
2003  }
2004 
2005  vector<ProxyInfo> *group = &((*opt_proxy_groups_)[opt_proxy_groups_current_]);
2006  const unsigned group_size = group->size();
2007 
2008  // Move active proxy to the back
2009  if (opt_proxy_groups_current_burned_) {
2010  const ProxyInfo swap = (*group)[0];
2011  (*group)[0] = (*group)[group_size - opt_proxy_groups_current_burned_];
2012  (*group)[group_size - opt_proxy_groups_current_burned_] = swap;
2013  }
2014  opt_proxy_groups_current_burned_++;
2015 
2016  // Select new one
// Unsigned arithmetic: the condition is true unless burned == group_size.
// The random index lies in [0, group_size - burned].
2017  if ((group_size - opt_proxy_groups_current_burned_) > 0) {
2018  int select = prng_.Next(group_size - opt_proxy_groups_current_burned_ + 1);
2019 
2020  // Move selected proxy to front
2021  const ProxyInfo swap = (*group)[select];
2022  (*group)[select] = (*group)[0];
2023  (*group)[0] = swap;
2024  }
2025 
2027  "switching proxy from %s to %s",
2028  old_proxy.c_str(), (*group)[0].url.c_str());
2029  LogCvmfs(kLogDownload, kLogDebug, "%d proxies remain in group",
2030  group_size - opt_proxy_groups_current_burned_);
2031 }
2032 
2033 
/**
 * Fails over to the next host of the host chain (round-robin) and counts the
 * fail-over in n_host_failover.
 *
 * Does nothing without a host chain or with a single host.  If info is given,
 * the switch only happens when info->current_host_chain_index is still the
 * active host, i.e. no other job has switched in the meantime.  The job's
 * failure code is then logged as the reason; without info the reason is
 * "manually triggered".  When opt_host_reset_after_ > 0, the timestamp used
 * to reset to the primary host is maintained.
 *
 * NOTE(review): an empty but non-NULL host chain is not guarded against; the
 * modulo below would then divide by zero.
 *
 * NOTE(review): the opening "LogCvmfs(kLogDownload, ..." lines of the two log
 * messages (original lines 2047 and 2064) are missing from this listing.
 */
2039 void DownloadManager::SwitchHost(JobInfo *info) {
2040  MutexLockGuard m(lock_options_);
2041 
2042  if (!opt_host_chain_ || (opt_host_chain_->size() == 1)) {
2043  return;
2044  }
2045 
// NOTE(review): if the host chain was replaced by a shorter one since the job
// started, current_host_chain_index may be out of range in the log below.
2046  if (info && (info->current_host_chain_index != opt_host_chain_current_)) {
2048  "don't switch host, "
2049  "last used host: %s, current host: %s",
2050  (*opt_host_chain_)[info->current_host_chain_index].c_str(),
2051  (*opt_host_chain_)[opt_host_chain_current_].c_str());
2052  return;
2053  }
2054 
2055  string reason = "manually triggered";
2056  if (info) {
2057  reason = download::Code2Ascii(info->error_code);
2058  }
2059 
2060  string old_host = (*opt_host_chain_)[opt_host_chain_current_];
2061  opt_host_chain_current_ =
2062  (opt_host_chain_current_ + 1) % opt_host_chain_->size();
2063  perf::Inc(counters_->n_host_failover);
2065  "switching host from %s to %s (%s)", old_host.c_str(),
2066  (*opt_host_chain_)[opt_host_chain_current_].c_str(),
2067  reason.c_str());
2068 
2069  // Remember the timestamp of switching to backup host
2070  if (opt_host_reset_after_ > 0) {
2071  if (opt_host_chain_current_ != 0) {
2072  if (opt_timestamp_backup_host_ == 0)
2073  opt_timestamp_backup_host_ = time(NULL);
2074  } else {
2075  opt_timestamp_backup_host_ = 0;
2076  }
2077  }
2078 }
2079 
2080 void DownloadManager::SwitchHost() {
2081  SwitchHost(NULL);
2082 }
2083 
2084 
2091 void DownloadManager::ProbeHosts() {
2092  vector<string> host_chain;
2093  vector<int> host_rtt;
2094  unsigned current_host;
2095 
2096  GetHostInfo(&host_chain, &host_rtt, &current_host);
2097 
2098  // Stopwatch, two times to fill caches first
2099  unsigned i, retries;
2100  string url;
2101  JobInfo info(&url, false, false, NULL);
2102  for (retries = 0; retries < 2; ++retries) {
2103  for (i = 0; i < host_chain.size(); ++i) {
2104  url = host_chain[i] + "/.cvmfspublished";
2105 
2106  struct timeval tv_start, tv_end;
2107  gettimeofday(&tv_start, NULL);
2108  Failures result = Fetch(&info);
2109  gettimeofday(&tv_end, NULL);
2110  if (info.destination_mem.data)
2111  free(info.destination_mem.data);
2112  if (result == kFailOk) {
2113  host_rtt[i] = static_cast<int>(
2114  DiffTimeSeconds(tv_start, tv_end) * 1000);
2115  LogCvmfs(kLogDownload, kLogDebug, "probing host %s had %dms rtt",
2116  url.c_str(), host_rtt[i]);
2117  } else {
2118  LogCvmfs(kLogDownload, kLogDebug, "error while probing host %s: %d %s",
2119  url.c_str(), result, Code2Ascii(result));
2120  host_rtt[i] = INT_MAX;
2121  }
2122  }
2123  }
2124 
2125  SortTeam(&host_rtt, &host_chain);
2126  for (i = 0; i < host_chain.size(); ++i) {
2127  if (host_rtt[i] == INT_MAX) host_rtt[i] = kProbeDown;
2128  }
2129 
2130  MutexLockGuard m(lock_options_);
2131  delete opt_host_chain_;
2132  delete opt_host_chain_rtt_;
2133  opt_host_chain_ = new vector<string>(host_chain);
2134  opt_host_chain_rtt_ = new vector<int>(host_rtt);
2135  opt_host_chain_current_ = 0;
2136 }
2137 
2138 bool DownloadManager::GeoSortServers(std::vector<std::string> *servers,
2139  std::vector<uint64_t> *output_order) {
2140  if (!servers) {return false;}
2141  if (servers->size() == 1) {
2142  if (output_order) {
2143  output_order->clear();
2144  output_order->push_back(0);
2145  }
2146  return true;
2147  }
2148 
2149  std::vector<std::string> host_chain;
2150  GetHostInfo(&host_chain, NULL, NULL);
2151 
2152  std::vector<std::string> server_dns_names;
2153  server_dns_names.reserve(servers->size());
2154  for (unsigned i = 0; i < servers->size(); ++i) {
2155  std::string host = dns::ExtractHost((*servers)[i]);
2156  server_dns_names.push_back(host.empty() ? (*servers)[i] : host);
2157  }
2158  std::string host_list = JoinStrings(server_dns_names, ",");
2159 
2160  vector<string> host_chain_shuffled;
2161  {
2162  // Protect against concurrent access to prng_
2163  MutexLockGuard m(lock_options_);
2164  // Determine random hosts for the Geo-API query
2165  host_chain_shuffled = Shuffle(host_chain, &prng_);
2166  }
2167  // Request ordered list via Geo-API
2168  bool success = false;
2169  unsigned max_attempts = std::min(host_chain_shuffled.size(), size_t(3));
2170  vector<uint64_t> geo_order(servers->size());
2171  for (unsigned i = 0; i < max_attempts; ++i) {
2172  string url = host_chain_shuffled[i] + "/api/v1.0/geo/@proxy@/" + host_list;
2174  "requesting ordered server list from %s", url.c_str());
2175  JobInfo info(&url, false, false, NULL);
2176  Failures result = Fetch(&info);
2177  if (result == kFailOk) {
2178  string order(info.destination_mem.data, info.destination_mem.size);
2179  free(info.destination_mem.data);
2180  bool retval = ValidateGeoReply(order, servers->size(), &geo_order);
2181  if (!retval) {
2183  "retrieved invalid GeoAPI reply from %s [%s]",
2184  url.c_str(), order.c_str());
2185  } else {
2187  "geographic order of servers retrieved from %s",
2188  dns::ExtractHost(host_chain_shuffled[i]).c_str());
2189  LogCvmfs(kLogDownload, kLogDebug, "order is %s", order.c_str());
2190  success = true;
2191  break;
2192  }
2193  } else {
2195  "GeoAPI request %s failed with error %d [%s]",
2196  url.c_str(), result, Code2Ascii(result));
2197  }
2198  }
2199  if (!success) {
2201  "failed to retrieve geographic order from stratum 1 servers");
2202  return false;
2203  }
2204 
2205  if (output_order) {
2206  output_order->swap(geo_order);
2207  } else {
2208  std::vector<std::string> sorted_servers;
2209  sorted_servers.reserve(geo_order.size());
2210  for (unsigned i = 0; i < geo_order.size(); ++i) {
2211  uint64_t orderval = geo_order[i];
2212  sorted_servers.push_back((*servers)[orderval]);
2213  }
2214  servers->swap(sorted_servers);
2215  }
2216  return true;
2217 }
2218 
2219 
2227 bool DownloadManager::ProbeGeo() {
2228  vector<string> host_chain;
2229  vector<int> host_rtt;
2230  unsigned current_host;
2231  vector< vector<ProxyInfo> > proxy_chain;
2232  unsigned fallback_group;
2233 
2234  GetHostInfo(&host_chain, &host_rtt, &current_host);
2235  GetProxyInfo(&proxy_chain, NULL, &fallback_group);
2236  if ((host_chain.size() < 2) && ((proxy_chain.size() - fallback_group) < 2))
2237  return true;
2238 
2239  vector<string> host_names;
2240  for (unsigned i = 0; i < host_chain.size(); ++i)
2241  host_names.push_back(dns::ExtractHost(host_chain[i]));
2242  SortTeam(&host_names, &host_chain);
2243  unsigned last_geo_host = host_names.size();
2244 
2245  if ((fallback_group == 0) && (last_geo_host > 1)) {
2246  // There are no non-fallback proxies, which means that the client
2247  // will always use the fallback proxies. Add a keyword separator
2248  // between the hosts and fallback proxies so the geosorting service
2249  // will know to sort the hosts based on the distance from the
2250  // closest fallback proxy rather than the distance from the client.
2251  host_names.push_back("+PXYSEP+");
2252  }
2253 
2254  // Add fallback proxy names to the end of the host list
2255  unsigned first_geo_fallback = host_names.size();
2256  for (unsigned i = fallback_group; i < proxy_chain.size(); ++i) {
2257  // We only take the first fallback proxy name from every group under the
2258  // assumption that load-balanced servers are at the same location
2259  host_names.push_back(proxy_chain[i][0].host.name());
2260  }
2261 
2262  std::vector<uint64_t> geo_order;
2263  bool success = GeoSortServers(&host_names, &geo_order);
2264  if (!success) {
2265  // GeoSortServers already logged a failure message.
2266  return false;
2267  }
2268 
2269  // Re-install host chain and proxy chain
2270  MutexLockGuard m(lock_options_);
2271  delete opt_host_chain_;
2272  opt_num_proxies_ = 0;
2273  opt_host_chain_ = new vector<string>(host_chain.size());
2274  string old_proxy;
2275  if (opt_proxy_groups_ != NULL) {
2276  old_proxy = (*opt_proxy_groups_)[opt_proxy_groups_current_][0].url;
2277  }
2278 
2279  // It's possible that opt_proxy_groups_fallback_ might have changed while
2280  // the lock wasn't held
2281  vector<vector<ProxyInfo> > *proxy_groups = new vector<vector<ProxyInfo> >(
2282  opt_proxy_groups_fallback_ + proxy_chain.size() - fallback_group);
2283  // First copy the non-fallback part of the current proxy chain
2284  for (unsigned i = 0; i < opt_proxy_groups_fallback_; ++i) {
2285  (*proxy_groups)[i] = (*opt_proxy_groups_)[i];
2286  opt_num_proxies_ += (*opt_proxy_groups_)[i].size();
2287  }
2288 
2289  // Copy the host chain and fallback proxies by geo order. Array indices
2290  // in geo_order that are smaller than last_geo_host refer to a stratum 1,
2291  // and those indices greater than or equal to first_geo_fallback refer to
2292  // a fallback proxy.
2293  unsigned hosti = 0;
2294  unsigned proxyi = opt_proxy_groups_fallback_;
2295  for (unsigned i = 0; i < geo_order.size(); ++i) {
2296  uint64_t orderval = geo_order[i];
2297  if (orderval < (uint64_t)last_geo_host) {
2298  // LogCvmfs(kLogCvmfs, kLogSyslog, "this is orderval %u at host index
2299  // %u", orderval, hosti);
2300  (*opt_host_chain_)[hosti++] = host_chain[orderval];
2301  } else if (orderval >= (uint64_t)first_geo_fallback) {
2302  // LogCvmfs(kLogCvmfs, kLogSyslog,
2303  // "this is orderval %u at proxy index %u, using proxy_chain index %u",
2304  // orderval, proxyi, fallback_group + orderval - first_geo_fallback);
2305  (*proxy_groups)[proxyi] =
2306  proxy_chain[fallback_group + orderval - first_geo_fallback];
2307  opt_num_proxies_ += (*proxy_groups)[proxyi].size();
2308  proxyi++;
2309  }
2310  }
2311 
2312  delete opt_proxy_groups_;
2313  opt_proxy_groups_ = proxy_groups;
2314  // In pathological cases, opt_proxy_groups_current_ can be larger now when
2315  // proxies changed in-between.
2316  if (opt_proxy_groups_current_ > opt_proxy_groups_->size()) {
2317  if (opt_proxy_groups_->size() == 0) {
2318  opt_proxy_groups_current_ = 0;
2319  } else {
2320  opt_proxy_groups_current_ = opt_proxy_groups_->size() - 1;
2321  }
2322  opt_proxy_groups_current_burned_ = 0;
2323  }
2324 
2325  string new_proxy;
2326  if (opt_proxy_groups_ != NULL) {
2327  new_proxy = (*opt_proxy_groups_)[opt_proxy_groups_current_][0].url;
2328  }
2329  if (old_proxy != new_proxy) {
2331  "switching proxy from %s to %s (geosort)",
2332  old_proxy.c_str(), new_proxy.c_str());
2333  }
2334 
2335  delete opt_host_chain_rtt_;
2336  opt_host_chain_rtt_ = new vector<int>(host_chain.size(), kProbeGeo);
2337  opt_host_chain_current_ = 0;
2338 
2339  return true;
2340 }
2341 
2342 
2350 bool DownloadManager::ValidateGeoReply(
2351  const string &reply_order,
2352  const unsigned expected_size,
2353  vector<uint64_t> *reply_vals)
2354 {
2355  if (reply_order.empty())
2356  return false;
2357  sanitizer::InputSanitizer sanitizer("09 , \n");
2358  if (!sanitizer.IsValid(reply_order))
2359  return false;
2360  sanitizer::InputSanitizer strip_newline("09 ,");
2361  vector<string> reply_strings =
2362  SplitString(strip_newline.Filter(reply_order), ',');
2363  vector<uint64_t> tmp_vals;
2364  for (unsigned i = 0; i < reply_strings.size(); ++i) {
2365  if (reply_strings[i].empty())
2366  return false;
2367  tmp_vals.push_back(String2Uint64(reply_strings[i]));
2368  }
2369  if (tmp_vals.size() != expected_size)
2370  return false;
2371 
2372  // Check if tmp_vals contains the number 1..n
2373  set<uint64_t> coverage(tmp_vals.begin(), tmp_vals.end());
2374  if (coverage.size() != tmp_vals.size())
2375  return false;
2376  if ((*coverage.begin() != 1) || (*coverage.rbegin() != coverage.size()))
2377  return false;
2378 
2379  for (unsigned i = 0; i < expected_size; ++i) {
2380  (*reply_vals)[i] = tmp_vals[i] - 1;
2381  }
2382  return true;
2383 }
2384 
2385 
2390 bool DownloadManager::StripDirect(
2391  const string &proxy_list,
2392  string *cleaned_list)
2393 {
2394  assert(cleaned_list);
2395  if (proxy_list == "") {
2396  *cleaned_list = "";
2397  return false;
2398  }
2399  bool result = false;
2400 
2401  vector<string> proxy_groups = SplitString(proxy_list, ';');
2402  vector<string> cleaned_groups;
2403  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2404  vector<string> group = SplitString(proxy_groups[i], '|');
2405  vector<string> cleaned;
2406  for (unsigned j = 0; j < group.size(); ++j) {
2407  if ((group[j] == "DIRECT") || (group[j] == "")) {
2408  result = true;
2409  } else {
2410  cleaned.push_back(group[j]);
2411  }
2412  }
2413  if (!cleaned.empty())
2414  cleaned_groups.push_back(JoinStrings(cleaned, "|"));
2415  }
2416 
2417  *cleaned_list = JoinStrings(cleaned_groups, ";");
2418  return result;
2419 }
2420 
2421 
2430 void DownloadManager::SetProxyChain(
2431  const string &proxy_list,
2432  const string &fallback_proxy_list,
2433  const ProxySetModes set_mode)
2434 {
2435  MutexLockGuard m(lock_options_);
2436 
2437  opt_timestamp_backup_proxies_ = 0;
2438  opt_timestamp_failover_proxies_ = 0;
2439  string set_proxy_list = opt_proxy_list_;
2440  string set_proxy_fallback_list = opt_proxy_fallback_list_;
2441  bool contains_direct;
2442  if ((set_mode == kSetProxyFallback) || (set_mode == kSetProxyBoth)) {
2443  opt_proxy_fallback_list_ = fallback_proxy_list;
2444  }
2445  if ((set_mode == kSetProxyRegular) || (set_mode == kSetProxyBoth)) {
2446  opt_proxy_list_ = proxy_list;
2447  }
2448  contains_direct =
2449  StripDirect(opt_proxy_fallback_list_, &set_proxy_fallback_list);
2450  if (contains_direct) {
2452  "fallback proxies do not support DIRECT, removing");
2453  }
2454  if (set_proxy_fallback_list == "") {
2455  set_proxy_list = opt_proxy_list_;
2456  } else {
2457  bool contains_direct = StripDirect(opt_proxy_list_, &set_proxy_list);
2458  if (contains_direct) {
2460  "skipping DIRECT proxy to use fallback proxy");
2461  }
2462  }
2463 
2464  // From this point on, use set_proxy_list and set_fallback_proxy_list as
2465  // effective proxy lists!
2466 
2467  delete opt_proxy_groups_;
2468  if ((set_proxy_list == "") && (set_proxy_fallback_list == "")) {
2469  opt_proxy_groups_ = NULL;
2470  opt_proxy_groups_current_ = 0;
2471  opt_proxy_groups_current_burned_ = 0;
2472  opt_proxy_groups_fallback_ = 0;
2473  opt_num_proxies_ = 0;
2474  return;
2475  }
2476 
2477  // Determine number of regular proxy groups (== first fallback proxy group)
2478  opt_proxy_groups_fallback_ = 0;
2479  if (set_proxy_list != "") {
2480  opt_proxy_groups_fallback_ = SplitString(set_proxy_list, ';').size();
2481  }
2482  LogCvmfs(kLogDownload, kLogDebug, "first fallback proxy group %u",
2483  opt_proxy_groups_fallback_);
2484 
2485  // Concatenate regular proxies and fallback proxies, both of which can be
2486  // empty.
2487  string all_proxy_list = set_proxy_list;
2488  if (set_proxy_fallback_list != "") {
2489  if (all_proxy_list != "")
2490  all_proxy_list += ";";
2491  all_proxy_list += set_proxy_fallback_list;
2492  }
2493  LogCvmfs(kLogDownload, kLogDebug, "full proxy list %s",
2494  all_proxy_list.c_str());
2495 
2496  // Resolve server names in provided urls
2497  vector<string> hostnames; // All encountered hostnames
2498  vector<string> proxy_groups;
2499  if (all_proxy_list != "")
2500  proxy_groups = SplitString(all_proxy_list, ';');
2501  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2502  vector<string> this_group = SplitString(proxy_groups[i], '|');
2503  for (unsigned j = 0; j < this_group.size(); ++j) {
2504  this_group[j] = dns::AddDefaultScheme(this_group[j]);
2505  // Note: DIRECT strings will be "extracted" to an empty string.
2506  string hostname = dns::ExtractHost(this_group[j]);
2507  // Save the hostname. Leave empty (DIRECT) names so indexes will
2508  // match later.
2509  hostnames.push_back(hostname);
2510  }
2511  }
2512  vector<dns::Host> hosts;
2513  LogCvmfs(kLogDownload, kLogDebug, "resolving %u proxy addresses",
2514  hostnames.size());
2515  resolver_->ResolveMany(hostnames, &hosts);
2516 
2517  // Construct opt_proxy_groups_: traverse proxy list in same order and expand
2518  // names to resolved IP addresses.
2519  opt_proxy_groups_ = new vector< vector<ProxyInfo> >();
2520  opt_num_proxies_ = 0;
2521  unsigned num_proxy = 0; // Combined i, j counter
2522  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2523  vector<string> this_group = SplitString(proxy_groups[i], '|');
2524  // Construct ProxyInfo objects from proxy string and DNS resolver result for
2525  // every proxy in this_group. One URL can result in multiple ProxyInfo
2526  // objects, one for each IP address.
2527  vector<ProxyInfo> infos;
2528  for (unsigned j = 0; j < this_group.size(); ++j, ++num_proxy) {
2529  this_group[j] = dns::AddDefaultScheme(this_group[j]);
2530  if (this_group[j] == "DIRECT") {
2531  infos.push_back(ProxyInfo("DIRECT"));
2532  continue;
2533  }
2534 
2535  if (hosts[num_proxy].status() != dns::kFailOk) {
2537  "failed to resolve IP addresses for %s (%d - %s)",
2538  hosts[num_proxy].name().c_str(), hosts[num_proxy].status(),
2539  dns::Code2Ascii(hosts[num_proxy].status()));
2540  dns::Host failed_host =
2541  dns::Host::ExtendDeadline(hosts[num_proxy], resolver_->min_ttl());
2542  infos.push_back(ProxyInfo(failed_host, this_group[j]));
2543  continue;
2544  }
2545 
2546  // IPv4 addresses have precedence
2547  set<string> best_addresses =
2548  hosts[num_proxy].ViewBestAddresses(opt_ip_preference_);
2549  set<string>::const_iterator iter_ips = best_addresses.begin();
2550  for (; iter_ips != best_addresses.end(); ++iter_ips) {
2551  string url_ip = dns::RewriteUrl(this_group[j], *iter_ips);
2552  infos.push_back(ProxyInfo(hosts[num_proxy], url_ip));
2553  }
2554  }
2555  opt_proxy_groups_->push_back(infos);
2556  opt_num_proxies_ += infos.size();
2557  }
2559  "installed %u proxies in %u load-balance groups",
2560  opt_num_proxies_, opt_proxy_groups_->size());
2561  opt_proxy_groups_current_ = 0;
2562  opt_proxy_groups_current_burned_ = 1;
2563 
2564  // Select random start proxy from the first group.
2565  if (opt_proxy_groups_->size() > 0) {
2566  // Select random start proxy from the first group.
2567  if ((*opt_proxy_groups_)[0].size() > 1) {
2568  int random_index = prng_.Next((*opt_proxy_groups_)[0].size());
2569  swap((*opt_proxy_groups_)[0][0], (*opt_proxy_groups_)[0][random_index]);
2570  }
2571  // LogCvmfs(kLogDownload, kLogSyslog, "using proxy %s",
2572  // (*opt_proxy_groups_)[0][0].c_str());
2573  }
2574 }
2575 
2576 
2583 void DownloadManager::GetProxyInfo(vector< vector<ProxyInfo> > *proxy_chain,
2584  unsigned *current_group,
2585  unsigned *fallback_group)
2586 {
2587  assert(proxy_chain != NULL);
2588  MutexLockGuard m(lock_options_);
2589 
2590 
2591  if (!opt_proxy_groups_) {
2592  vector< vector<ProxyInfo> > empty_chain;
2593  *proxy_chain = empty_chain;
2594  if (current_group != NULL)
2595  *current_group = 0;
2596  if (fallback_group != NULL)
2597  *fallback_group = 0;
2598  return;
2599  }
2600 
2601  *proxy_chain = *opt_proxy_groups_;
2602  if (current_group != NULL)
2603  *current_group = opt_proxy_groups_current_;
2604  if (fallback_group != NULL)
2605  *fallback_group = opt_proxy_groups_fallback_;
2606 }
2607 
2608 string DownloadManager::GetProxyList() {
2609  return opt_proxy_list_;
2610 }
2611 
2612 string DownloadManager::GetFallbackProxyList() {
2613  return opt_proxy_fallback_list_;
2614 }
2615 
2620 void DownloadManager::RebalanceProxiesUnlocked() {
2621  if (!opt_proxy_groups_)
2622  return;
2623 
2624  opt_timestamp_failover_proxies_ = 0;
2625  opt_proxy_groups_current_burned_ = 1;
2626  vector<ProxyInfo> *group = &((*opt_proxy_groups_)[opt_proxy_groups_current_]);
2627  int select = prng_.Next(group->size());
2628  swap((*group)[select], (*group)[0]);
2629  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslog,
2630  // "switching proxy from %s to %s (rebalance)",
2631  // (*group)[select].c_str(), swap.c_str());
2632 }
2633 
2634 
2635 void DownloadManager::RebalanceProxies() {
2636  MutexLockGuard m(lock_options_);
2637  RebalanceProxiesUnlocked();
2638 }
2639 
2640 
2644 void DownloadManager::SwitchProxyGroup() {
2645  MutexLockGuard m(lock_options_);
2646 
2647  if (!opt_proxy_groups_ || (opt_proxy_groups_->size() < 2)) {
2648  return;
2649  }
2650 
2651  // string old_proxy = (*opt_proxy_groups_)[opt_proxy_groups_current_][0];
2652  opt_proxy_groups_current_ = (opt_proxy_groups_current_ + 1) %
2653  opt_proxy_groups_->size();
2654  opt_proxy_groups_current_burned_ = 1;
2655  opt_timestamp_backup_proxies_ = time(NULL);
2656  opt_timestamp_failover_proxies_ = 0;
2657  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslog,
2658  // "switching proxy from %s to %s (manual group change)",
2659  // old_proxy.c_str(),
2660  // (*opt_proxy_groups_)[opt_proxy_groups_current_][0].c_str());
2661 }
2662 
2663 
2664 void DownloadManager::SetProxyGroupResetDelay(const unsigned seconds) {
2665  MutexLockGuard m(lock_options_);
2666  opt_proxy_groups_reset_after_ = seconds;
2667  if (opt_proxy_groups_reset_after_ == 0) {
2668  opt_timestamp_backup_proxies_ = 0;
2669  opt_timestamp_failover_proxies_ = 0;
2670  }
2671 }
2672 
2673 
2674 void DownloadManager::SetHostResetDelay(const unsigned seconds)
2675 {
2676  MutexLockGuard m(lock_options_);
2677  opt_host_reset_after_ = seconds;
2678  if (opt_host_reset_after_ == 0)
2679  opt_timestamp_backup_host_ = 0;
2680 }
2681 
2682 
2683 void DownloadManager::SetRetryParameters(const unsigned max_retries,
2684  const unsigned backoff_init_ms,
2685  const unsigned backoff_max_ms)
2686 {
2687  MutexLockGuard m(lock_options_);
2688  opt_max_retries_ = max_retries;
2689  opt_backoff_init_ms_ = backoff_init_ms;
2690  opt_backoff_max_ms_ = backoff_max_ms;
2691 }
2692 
2693 
2694 void DownloadManager::SetMaxIpaddrPerProxy(unsigned limit) {
2695  MutexLockGuard m(lock_options_);
2696  resolver_->set_throttle(limit);
2697 }
2698 
2699 
2700 void DownloadManager::SetProxyTemplates(
2701  const std::string &direct,
2702  const std::string &forced)
2703 {
2704  MutexLockGuard m(lock_options_);
2705  proxy_template_direct_ = direct;
2706  proxy_template_forced_ = forced;
2707 }
2708 
2709 
2710 void DownloadManager::EnableInfoHeader() {
2711  enable_info_header_ = true;
2712 }
2713 
2714 
2715 void DownloadManager::EnableRedirects() {
2716  follow_redirects_ = true;
2717 }
2718 
2719 
2724 DownloadManager *DownloadManager::Clone(perf::StatisticsTemplate statistics) {
2725  DownloadManager *clone = new DownloadManager();
2726  clone->Init(pool_max_handles_, use_system_proxy_, statistics);
2727  if (resolver_) {
2728  clone->SetDnsParameters(resolver_->retries(), resolver_->timeout_ms());
2729  clone->SetDnsTtlLimits(resolver_->min_ttl(), resolver_->max_ttl());
2730  clone->SetMaxIpaddrPerProxy(resolver_->throttle());
2731  }
2732  if (!opt_dns_server_.empty())
2733  clone->SetDnsServer(opt_dns_server_);
2734  clone->opt_timeout_proxy_ = opt_timeout_proxy_;
2735  clone->opt_timeout_direct_ = opt_timeout_direct_;
2736  clone->opt_low_speed_limit_ = opt_low_speed_limit_;
2737  clone->opt_max_retries_ = opt_max_retries_;
2738  clone->opt_backoff_init_ms_ = opt_backoff_init_ms_;
2739  clone->opt_backoff_max_ms_ = opt_backoff_max_ms_;
2740  clone->enable_info_header_ = enable_info_header_;
2741  clone->follow_redirects_ = follow_redirects_;
2742  if (opt_host_chain_) {
2743  clone->opt_host_chain_ = new vector<string>(*opt_host_chain_);
2744  clone->opt_host_chain_rtt_ = new vector<int>(*opt_host_chain_rtt_);
2745  }
2746  CloneProxyConfig(clone);
2747  clone->opt_ip_preference_ = opt_ip_preference_;
2748  clone->proxy_template_direct_ = proxy_template_direct_;
2749  clone->proxy_template_forced_ = proxy_template_forced_;
2750  clone->opt_proxy_groups_reset_after_ = opt_proxy_groups_reset_after_;
2751  clone->opt_host_reset_after_ = opt_host_reset_after_;
2752  clone->credentials_attachment_ = credentials_attachment_;
2753 
2754  return clone;
2755 }
2756 
2757 
2758 void DownloadManager::CloneProxyConfig(DownloadManager *clone) {
2759  clone->opt_proxy_groups_current_ = opt_proxy_groups_current_;
2760  clone->opt_proxy_groups_current_burned_ = opt_proxy_groups_current_burned_;
2761  clone->opt_proxy_groups_fallback_ = opt_proxy_groups_fallback_;
2762  clone->opt_num_proxies_ = opt_num_proxies_;
2763  clone->opt_proxy_list_ = opt_proxy_list_;
2764  clone->opt_proxy_fallback_list_ = opt_proxy_fallback_list_;
2765  if (opt_proxy_groups_ == NULL)
2766  return;
2767 
2768  clone->opt_proxy_groups_ = new vector< vector<ProxyInfo> >(
2769  *opt_proxy_groups_);
2770 }
2771 
2772 } // namespace download
unsigned opt_timeout_direct_
Definition: download.h:507
unsigned opt_low_speed_limit_
Definition: download.h:508
Destination destination
Definition: download.h:160
#define LogCvmfs(source, mask,...)
Definition: logging.h:20
std::vector< T > Shuffle(const std::vector< T > &input, Prng *prng)
Definition: algorithm.h:33
const char * Code2Ascii(const ObjectFetcherFailures::Failures error)
unsigned opt_backoff_init_ms_
Definition: download.h:510
static unsigned EscapeHeader(const string &header, char *escaped_buf, size_t buf_size)
Definition: download.cc:115
unsigned char num_used_hosts
Definition: download.h:290
static bool EscapeUrlChar(char input, char output[3])
Definition: download.cc:68
int64_t Xadd(class Counter *counter, const int64_t delta)
Definition: statistics.h:51
const char * Code2Ascii(const Failures error)
Definition: dns.h:53
int64_t id() const
Definition: dns.h:108
unsigned opt_proxy_groups_current_burned_
Definition: download.h:536
double DiffTimeSeconds(struct timeval start, struct timeval end)
Definition: algorithm.cc:30
unsigned opt_proxy_groups_reset_after_
Definition: download.h:586
void SetUrlOptions(JobInfo *info)
Definition: download.cc:878
StreamStates DecompressZStream2Sink(const void *buf, const int64_t size, z_stream *strm, cvmfs::Sink *sink)
Definition: compression.cc:234
unsigned backoff_ms
Definition: download.h:292
std::string opt_proxy_fallback_list_
Definition: download.h:553
static NormalResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
Definition: dns.cc:1259
vector< string > SplitString(const string &str, const char delim, const unsigned max_chunks)
Definition: string.cc:287
unsigned opt_host_reset_after_
Definition: download.h:594
std::string proxy_template_direct_
Definition: download.h:570
#define PANIC(...)
Definition: exception.h:26
string ReplaceAll(const string &haystack, const string &needle, const string &replace_by)
Definition: string.cc:453
string JoinStrings(const vector< string > &strings, const string &joint)
Definition: string.cc:317
void Init(ContextPtr context)
Definition: hash.cc:164
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:245
unsigned opt_proxy_groups_current_
Definition: download.h:531
off_t range_offset
Definition: download.h:173
shash::ContextPtr hash_context
Definition: download.h:283
void DecompressInit(z_stream *strm)
Definition: compression.cc:185
unsigned int current_host_chain_index
Definition: download.h:293
bool DecompressMem2Mem(const void *buf, const int64_t size, void **out_buf, uint64_t *out_size)
Definition: compression.cc:797
std::set< CURL * > * pool_handles_inuse_
Definition: download.h:486
void * cred_data
Definition: download.h:159
StreamStates DecompressZStream2File(const void *buf, const int64_t size, z_stream *strm, FILE *f)
Definition: compression.cc:275
std::string opt_proxy_list_
Definition: download.h:549
const std::string & name() const
Definition: dns.h:118
perf::Counter * sz_transfer_time
Definition: download.h:124
std::vector< std::vector< ProxyInfo > > * opt_proxy_groups_
Definition: download.h:527
assert((mem||(size==0))&&"Out Of Memory")
unsigned opt_proxy_groups_fallback_
Definition: download.h:541
void ReleaseCurlHandle(CURL *handle)
Definition: download.cc:795
StreamStates
Definition: compression.h:36
Algorithms algorithm
Definition: hash.h:123
z_stream zstream
Definition: download.h:282
const std::set< std::string > & ViewBestAddresses(IpPreference preference) const
Definition: dns.cc:205
void SetDnsServer(const std::string &address)
Definition: download.cc:1815
void DecompressFini(z_stream *strm)
Definition: compression.cc:201
void MakePipe(int pipe_fd[2])
Definition: posix.cc:520
char algorithm
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:174
bool IsValid(const std::string &input) const
Definition: sanitizer.cc:114
Algorithms
Definition: hash.h:39
void SetDnsTtlLimits(const unsigned min_seconds, const unsigned max_seconds)
Definition: download.cc:1851
static Failures PrepareDownloadDestination(JobInfo *info)
Definition: download.cc:145
const char * Code2Ascii(const Failures error)
Definition: download.h:86
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
Definition: hash.cc:190
unsigned char num_retries
Definition: download.h:291
void Final(ContextPtr context, Any *any_digest)
Definition: hash.cc:221
std::string AddDefaultScheme(const std::string &proxy)
Definition: dns.cc:187
static string EscapeUrl(const string &url)
Definition: download.cc:93
dns::IpPreference opt_ip_preference_
Definition: download.h:563
Algorithms algorithm
Definition: hash.h:486
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:268
Definition: dns.h:90
Failures Fetch(const std::string &base_url, const std::string &repository_name, const uint64_t minimum_timestamp, const shash::Any *base_catalog, signature::SignatureManager *signature_manager, download::DownloadManager *download_manager, ManifestEnsemble *ensemble)
bool IsEquivalent(const Host &other) const
Definition: dns.cc:269
bool IsProxyTransferError(const Failures error)
Definition: download.h:74
bool IsExpired() const
Definition: dns.cc:280
FILE * destination_file
Definition: download.h:166
bool follow_redirects
Definition: download.h:154
unsigned GetContextSize(const Algorithms algorithm)
Definition: hash.cc:148
virtual int Reset()=0
perf::Counter * n_requests
Definition: download.h:125
static int Init(const loader::LoaderExports *loader_exports)
Definition: cvmfs.cc:1755
string StringifyInt(const int64_t value)
Definition: string.cc:77
void SetMaxIpaddrPerProxy(unsigned limit)
Definition: download.cc:2694
const shash::Any * expected_hash
Definition: download.h:169
void Inc(class Counter *counter)
Definition: statistics.h:50
void * buffer
Definition: hash.h:487
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:264
const std::string * extra_info
Definition: download.h:170
std::string ExtractHost(const std::string &url)
Definition: dns.cc:110
std::vector< int > * opt_host_chain_rtt_
Definition: download.h:523
cvmfs::Sink * destination_sink
Definition: download.h:168
IpPreference
Definition: dns.h:46
unsigned opt_backoff_max_ms_
Definition: download.h:511
CURL * curl_handle
Definition: download.h:279
CredentialsAttachment * credentials_attachment_
Definition: download.h:596
std::vector< std::string > * opt_host_chain_
Definition: download.h:518
struct pollfd * watch_fds_
Definition: download.h:498
uint64_t String2Uint64(const string &value)
Definition: string.cc:227
Failures error_code
Definition: download.h:287
static void Fini()
Definition: cvmfs.cc:1961
static void Spawn()
Definition: cvmfs.cc:1876
struct download::JobInfo::@3 destination_mem
std::string Filter(const std::string &input) const
Definition: sanitizer.cc:107
std::string proxy_template_forced_
Definition: download.h:576
const std::string * destination_path
Definition: download.h:167
void SetDnsParameters(const unsigned retries, const unsigned timeout_ms)
Definition: download.cc:1833
static Host ExtendDeadline(const Host &original, unsigned seconds_from_now)
Definition: dns.cc:231
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:1908
bool IsHostTransferError(const Failures error)
Definition: download.h:62
void Init(const unsigned max_pool_handles, const bool use_system_proxy, perf::StatisticsTemplate statistics)
Definition: download.cc:1594
void SortTeam(std::vector< T > *tractor, std::vector< U > *towed)
Definition: algorithm.h:51
curl_slist * headers
Definition: download.h:280
unsigned size
Definition: hash.h:488
unsigned char num_used_proxies
Definition: download.h:289
Failures status() const
Definition: dns.h:119
std::string proxy
Definition: download.h:285
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:529
static void size_t size
Definition: smalloc.h:47
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
Definition: download.cc:1214
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:541
const std::string * url
Definition: download.h:150
void InitializeRequest(JobInfo *info, CURL *handle)
Definition: download.cc:812
string RewriteUrl(const string &url, const string &ip)
Definition: dns.cc:156
virtual int64_t Write(const void *buf, uint64_t sz)=0
char * info_header
Definition: download.h:281