CernVM-FS  2.11.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
download.cc
Go to the documentation of this file.
1 
27 // TODO(jblomer): MS for time summing
28 // NOLINTNEXTLINE
29 #define __STDC_FORMAT_MACROS
30 
31 #include "cvmfs_config.h"
32 #include "download.h"
33 
34 #include <alloca.h>
35 #include <errno.h>
36 #include <inttypes.h>
37 #include <poll.h>
38 #include <pthread.h>
39 #include <signal.h>
40 #include <stdint.h>
41 #include <sys/time.h>
42 #include <unistd.h>
43 
44 #include <algorithm>
45 #include <cassert>
46 #include <cstdio>
47 #include <cstdlib>
48 #include <cstring>
49 #include <map>
50 #include <set>
51 #include <utility>
52 
53 #include "compression.h"
54 #include "crypto/hash.h"
55 #include "duplex_curl.h"
56 #include "interrupt.h"
57 #include "sanitizer.h"
58 #include "ssl.h"
59 #include "util/algorithm.h"
60 #include "util/atomic.h"
61 #include "util/concurrency.h"
62 #include "util/exception.h"
63 #include "util/logging.h"
64 #include "util/posix.h"
65 #include "util/prng.h"
66 #include "util/smalloc.h"
67 #include "util/string.h"
68 
69 using namespace std; // NOLINT
70 
71 namespace download {
72 
/**
 * Percent-encodes a single character for use in a URL or HTTP header.
 *
 * Characters from the allow-list below are copied unchanged into output[0]
 * and the function returns false.  Every other character is written as the
 * three characters '%', high hex digit, low hex digit (upper case) into
 * output[0..2] and the function returns true.
 *
 * @param input   character to encode
 * @param output  receives 1 (unescaped) or 3 (escaped) characters; it is not
 *                NUL-terminated
 * @return true if the character was escaped (output holds 3 characters)
 */
static inline bool EscapeUrlChar(char input, char output[3]) {
  if (((input >= '0') && (input <= '9')) ||
      ((input >= 'A') && (input <= 'Z')) ||
      ((input >= 'a') && (input <= 'z')) ||
      (input == '/') || (input == ':') || (input == '.') ||
      (input == '@') ||
      (input == '+') || (input == '-') ||
      (input == '_') || (input == '~') ||
      (input == '[') || (input == ']') || (input == ','))
  {
    output[0] = input;
    return false;
  }

  // Encode the unsigned byte value: plain char is signed on many platforms,
  // so bytes >= 0x80 (e.g. UTF-8 sequences) would otherwise yield negative
  // quotients/remainders and produce garbage instead of two hex digits.
  static const char kHexDigits[] = "0123456789ABCDEF";
  const unsigned char byte = static_cast<unsigned char>(input);
  output[0] = '%';
  output[1] = kHexDigits[byte >> 4];
  output[2] = kHexDigits[byte & 0x0F];
  return true;
}
94 
95 
100 static string EscapeUrl(const string &url) {
101  string escaped;
102  escaped.reserve(url.length());
103 
104  char escaped_char[3];
105  for (unsigned i = 0, s = url.length(); i < s; ++i) {
106  if (EscapeUrlChar(url[i], escaped_char)) {
107  escaped.append(escaped_char, 3);
108  } else {
109  escaped.push_back(escaped_char[0]);
110  }
111  }
112  LogCvmfs(kLogDownload, kLogDebug, "escaped %s to %s",
113  url.c_str(), escaped.c_str());
114 
115  return escaped;
116 }
117 
118 
123 static unsigned EscapeHeader(const string &header,
124  char *escaped_buf,
125  size_t buf_size)
126 {
127  unsigned esc_pos = 0;
128  char escaped_char[3];
129  for (unsigned i = 0, s = header.size(); i < s; ++i) {
130  if (EscapeUrlChar(header[i], escaped_char)) {
131  for (unsigned j = 0; j < 3; ++j) {
132  if (escaped_buf) {
133  if (esc_pos >= buf_size)
134  return esc_pos;
135  escaped_buf[esc_pos] = escaped_char[j];
136  }
137  esc_pos++;
138  }
139  } else {
140  if (escaped_buf) {
141  if (esc_pos >= buf_size)
142  return esc_pos;
143  escaped_buf[esc_pos] = escaped_char[0];
144  }
145  esc_pos++;
146  }
147  }
148 
149  return esc_pos;
150 }
151 
152 
154  info->destination_mem.size = 0;
155  info->destination_mem.pos = 0;
156  info->destination_mem.data = NULL;
157 
158  if (info->destination == kDestinationFile)
159  assert(info->destination_file != NULL);
160 
161  if (info->destination == kDestinationPath) {
162  assert(info->destination_path != NULL);
163  info->destination_file = fopen(info->destination_path->c_str(), "w");
164  if (info->destination_file == NULL) {
165  LogCvmfs(kLogDownload, kLogDebug, "Failed to open path %s: %s"
166  " (errno=%d).",
167  info->destination_path->c_str(), strerror(errno), errno);
168  return kFailLocalIO;
169  }
170  }
171 
172  if (info->destination == kDestinationSink)
173  assert(info->destination_sink != NULL);
174 
175  return kFailOk;
176 }
177 
178 
182 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
183  void *info_link)
184 {
185  const size_t num_bytes = size*nmemb;
186  const string header_line(static_cast<const char *>(ptr), num_bytes);
187  JobInfo *info = static_cast<JobInfo *>(info_link);
188 
189  // LogCvmfs(kLogDownload, kLogDebug, "REMOVE-ME: Header callback with %s",
190  // header_line.c_str());
191 
192  // Check http status codes
193  if (HasPrefix(header_line, "HTTP/1.", false)) {
194  if (header_line.length() < 10)
195  return 0;
196 
197  unsigned i;
198  for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {}
199 
200  // Code is initialized to -1
201  if (header_line.length() > i+2) {
202  info->http_code = DownloadManager::ParseHttpCode(&header_line[i]);
203  }
204 
205  if ((info->http_code / 100) == 2) {
206  return num_bytes;
207  } else if ((info->http_code == 301) ||
208  (info->http_code == 302) ||
209  (info->http_code == 303) ||
210  (info->http_code == 307))
211  {
212  if (!info->follow_redirects) {
213  LogCvmfs(kLogDownload, kLogDebug, "redirect support not enabled: %s",
214  header_line.c_str());
215  info->error_code = kFailHostHttp;
216  return 0;
217  }
218  LogCvmfs(kLogDownload, kLogDebug, "http redirect: %s",
219  header_line.c_str());
220  // libcurl will handle this because of CURLOPT_FOLLOWLOCATION
221  return num_bytes;
222  } else {
223  LogCvmfs(kLogDownload, kLogDebug, "http status error code: %s [%d]",
224  header_line.c_str(), info->http_code);
225  if (((info->http_code / 100) == 5) ||
226  (info->http_code == 400) || (info->http_code == 404))
227  {
228  // 5XX returned by host
229  // 400: error from the GeoAPI module
230  // 404: the stratum 1 does not have the newest files
231  info->error_code = kFailHostHttp;
232  } else if (info->http_code == 429) {
233  // 429: rate throttling (we ignore the backoff hint for the time being)
235  } else {
236  info->error_code = (info->proxy == "DIRECT") ? kFailHostHttp :
238  }
239  return 0;
240  }
241  }
242 
243  // Allocate memory for kDestinationMemory
244  if ((info->destination == kDestinationMem) &&
245  HasPrefix(header_line, "CONTENT-LENGTH:", true))
246  {
247  char *tmp = reinterpret_cast<char *>(alloca(num_bytes+1));
248  uint64_t length = 0;
249  sscanf(header_line.c_str(), "%s %" PRIu64, tmp, &length);
250  if (length > 0) {
251  if (length > DownloadManager::kMaxMemSize) {
253  "resource %s too large to store in memory (%" PRIu64 ")",
254  info->url->c_str(), length);
255  info->error_code = kFailTooBig;
256  return 0;
257  }
258  info->destination_mem.data = static_cast<char *>(smalloc(length));
259  } else {
260  // Empty resource
261  info->destination_mem.data = NULL;
262  }
263  info->destination_mem.size = length;
264  } else if (HasPrefix(header_line, "LOCATION:", true)) {
265  // This comes along with redirects
266  LogCvmfs(kLogDownload, kLogDebug, "%s", header_line.c_str());
267  } else if (HasPrefix(header_line, "X-SQUID-ERROR:", true)) {
268  // Reinterpret host error as proxy error
269  if (info->error_code == kFailHostHttp) {
270  info->error_code = kFailProxyHttp;
271  }
272  } else if (HasPrefix(header_line, "PROXY-STATUS:", true)) {
273  // Reinterpret host error as proxy error if applicable
274  if ((info->error_code == kFailHostHttp) &&
275  (header_line.find("error=") != string::npos)) {
276  info->error_code = kFailProxyHttp;
277  }
278  }
279 
280  return num_bytes;
281 }
282 
283 
287 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
288  void *info_link)
289 {
290  const size_t num_bytes = size*nmemb;
291  JobInfo *info = static_cast<JobInfo *>(info_link);
292 
293  // LogCvmfs(kLogDownload, kLogDebug, "Data callback, %d bytes", num_bytes);
294 
295  if (num_bytes == 0)
296  return 0;
297 
298  if (info->expected_hash) {
299  shash::Update(reinterpret_cast<unsigned char *>(ptr),
300  num_bytes, info->hash_context);
301  }
302 
303  if (info->destination == kDestinationSink) {
304  if (info->compressed) {
305  zlib::StreamStates retval =
306  zlib::DecompressZStream2Sink(ptr, static_cast<int64_t>(num_bytes),
307  &info->zstream, info->destination_sink);
308  if (retval == zlib::kStreamDataError) {
309  LogCvmfs(kLogDownload, kLogSyslogErr, "failed to decompress %s",
310  info->url->c_str());
311  info->error_code = kFailBadData;
312  return 0;
313  } else if (retval == zlib::kStreamIOError) {
315  "decompressing %s, local IO error", info->url->c_str());
316  info->error_code = kFailLocalIO;
317  return 0;
318  }
319  } else {
320  int64_t written = info->destination_sink->Write(ptr, num_bytes);
321  if ((written < 0) || (static_cast<uint64_t>(written) != num_bytes)) {
322  LogCvmfs(kLogDownload, kLogDebug, "Failed to perform write on %s (%"
323  PRId64 ")", info->url->c_str(), written);
324  info->error_code = kFailLocalIO;
325  return 0;
326  }
327  }
328  } else if (info->destination == kDestinationMem) {
329  // Write to memory
330  if (info->destination_mem.pos + num_bytes > info->destination_mem.size) {
331  if (info->destination_mem.size == 0) {
333  "Content-Length was missing or zero, but %zu bytes received",
334  info->destination_mem.pos + num_bytes);
335  } else {
336  LogCvmfs(kLogDownload, kLogDebug, "Callback had too much data: "
337  "start %zu, bytes %zu, expected %zu",
338  info->destination_mem.pos,
339  num_bytes,
340  info->destination_mem.size);
341  }
342  info->error_code = kFailBadData;
343  return 0;
344  }
345  memcpy(info->destination_mem.data + info->destination_mem.pos,
346  ptr, num_bytes);
347  info->destination_mem.pos += num_bytes;
348  } else {
349  // Write to file
350  if (info->compressed) {
351  // LogCvmfs(kLogDownload, kLogDebug, "REMOVE-ME: writing %d bytes for %s",
352  // num_bytes, info->url->c_str());
353  zlib::StreamStates retval =
354  zlib::DecompressZStream2File(ptr, static_cast<int64_t>(num_bytes),
355  &info->zstream, info->destination_file);
356  if (retval == zlib::kStreamDataError) {
357  LogCvmfs(kLogDownload, kLogSyslogErr, "failed to decompress %s",
358  info->url->c_str());
359  info->error_code = kFailBadData;
360  return 0;
361  } else if (retval == zlib::kStreamIOError) {
363  "decompressing %s, local IO error", info->url->c_str());
364  info->error_code = kFailLocalIO;
365  return 0;
366  }
367  } else {
368  if (fwrite(ptr, 1, num_bytes, info->destination_file) != num_bytes) {
370  "downloading %s, IO failure: %s (errno=%d)",
371  info->url->c_str(), strerror(errno), errno);
372  info->error_code = kFailLocalIO;
373  return 0;
374  }
375  }
376  }
377 
378  return num_bytes;
379 }
380 
381 
382 //------------------------------------------------------------------------------
383 
384 
385 bool JobInfo::IsFileNotFound() {
386  if (HasPrefix(*url, "file://", true /* ignore_case */))
387  return error_code == kFailHostConnection;
388 
389  return http_code == 404;
390 }
391 
392 
393 //------------------------------------------------------------------------------
394 
395 
396 const int DownloadManager::kProbeUnprobed = -1;
397 const int DownloadManager::kProbeDown = -2;
398 const int DownloadManager::kProbeGeo = -3;
399 const unsigned DownloadManager::kMaxMemSize = 1024*1024;
400 
401 
405 int DownloadManager::ParseHttpCode(const char digits[3]) {
406  int result = 0;
407  int factor = 100;
408  for (int i = 0; i < 3; ++i) {
409  if ((digits[i] < '0') || (digits[i] > '9'))
410  return -1;
411  result += (digits[i] - '0') * factor;
412  factor /= 10;
413  }
414  return result;
415 }
416 
417 
421 int DownloadManager::CallbackCurlSocket(CURL * /* easy */,
422  curl_socket_t s,
423  int action,
424  void *userp,
425  void * /* socketp */)
426 {
427  // LogCvmfs(kLogDownload, kLogDebug, "CallbackCurlSocket called with easy "
428  // "handle %p, socket %d, action %d", easy, s, action);
429  DownloadManager *download_mgr = static_cast<DownloadManager *>(userp);
430  if (action == CURL_POLL_NONE)
431  return 0;
432 
433  // Find s in watch_fds_
434  unsigned index;
435 
436  // TODO(heretherebedragons) why start at index = 0 and not 2?
437  // fd[0] and fd[1] are fixed?
438  for (index = 0; index < download_mgr->watch_fds_inuse_; ++index) {
439  if (download_mgr->watch_fds_[index].fd == s)
440  break;
441  }
442  // Or create newly
443  if (index == download_mgr->watch_fds_inuse_) {
444  // Extend array if necessary
445  if (download_mgr->watch_fds_inuse_ == download_mgr->watch_fds_size_)
446  {
447  assert(download_mgr->watch_fds_size_ > 0);
448  download_mgr->watch_fds_size_ *= 2;
449  download_mgr->watch_fds_ = static_cast<struct pollfd *>(
450  srealloc(download_mgr->watch_fds_,
451  download_mgr->watch_fds_size_ * sizeof(struct pollfd)));
452  }
453  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].fd = s;
454  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].events = 0;
455  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_].revents = 0;
456  download_mgr->watch_fds_inuse_++;
457  }
458 
459  switch (action) {
460  case CURL_POLL_IN:
461  download_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
462  break;
463  case CURL_POLL_OUT:
464  download_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
465  break;
466  case CURL_POLL_INOUT:
467  download_mgr->watch_fds_[index].events =
468  POLLIN | POLLPRI | POLLOUT | POLLWRBAND;
469  break;
470  case CURL_POLL_REMOVE:
471  if (index < download_mgr->watch_fds_inuse_-1) {
472  download_mgr->watch_fds_[index] =
473  download_mgr->watch_fds_[download_mgr->watch_fds_inuse_-1];
474  }
475  download_mgr->watch_fds_inuse_--;
476  // Shrink array if necessary
477  if ((download_mgr->watch_fds_inuse_ > download_mgr->watch_fds_max_) &&
478  (download_mgr->watch_fds_inuse_ < download_mgr->watch_fds_size_/2))
479  {
480  download_mgr->watch_fds_size_ /= 2;
481  // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ (%d)",
482  // watch_fds_size_);
483  download_mgr->watch_fds_ = static_cast<struct pollfd *>(
484  srealloc(download_mgr->watch_fds_,
485  download_mgr->watch_fds_size_*sizeof(struct pollfd)));
486  // LogCvmfs(kLogDownload, kLogDebug, "shrinking watch_fds_ done",
487  // watch_fds_size_);
488  }
489  break;
490  default:
491  break;
492  }
493 
494  return 0;
495 }
496 
497 
501 void *DownloadManager::MainDownload(void *data) {
502  LogCvmfs(kLogDownload, kLogDebug, "download I/O thread started");
503  DownloadManager *download_mgr = static_cast<DownloadManager *>(data);
504 
505  const int kIdxPipeTerminate = 0;
506  const int kIdxPipeJobs = 1;
507 
508  download_mgr->watch_fds_ =
509  static_cast<struct pollfd *>(smalloc(2 * sizeof(struct pollfd)));
510  download_mgr->watch_fds_size_ = 2;
511  download_mgr->watch_fds_[kIdxPipeTerminate].fd =
512  download_mgr->pipe_terminate_->GetReadFd();
513  download_mgr->watch_fds_[kIdxPipeTerminate].events = POLLIN | POLLPRI;
514  download_mgr->watch_fds_[kIdxPipeTerminate].revents = 0;
515  download_mgr->watch_fds_[kIdxPipeJobs].fd =
516  download_mgr->pipe_jobs_->GetReadFd();
517  download_mgr->watch_fds_[kIdxPipeJobs].events = POLLIN | POLLPRI;
518  download_mgr->watch_fds_[kIdxPipeJobs].revents = 0;
519  download_mgr->watch_fds_inuse_ = 2;
520 
521  int still_running = 0;
522  struct timeval timeval_start, timeval_stop;
523  gettimeofday(&timeval_start, NULL);
524  while (true) {
525  int timeout;
526  if (still_running) {
527  /* NOTE: The following might degrade the performance for many small files
528  * use case. TODO(jblomer): look into it.
529  // Specify a timeout for polling in ms; this allows us to return
530  // to libcurl once a second so it can look for internal operations
531  // which timed out. libcurl has a more elaborate mechanism
532  // (CURLMOPT_TIMERFUNCTION) that would inform us of the next potential
533  // timeout. TODO(bbockelm) we should switch to that in the future.
534  timeout = 100;
535  */
536  timeout = 1;
537  } else {
538  timeout = -1;
539  gettimeofday(&timeval_stop, NULL);
540  int64_t delta = static_cast<int64_t>(
541  1000 * DiffTimeSeconds(timeval_start, timeval_stop));
542  perf::Xadd(download_mgr->counters_->sz_transfer_time, delta);
543  }
544  int retval = poll(download_mgr->watch_fds_, download_mgr->watch_fds_inuse_,
545  timeout);
546  if (retval < 0) {
547  continue;
548  }
549 
550  // Handle timeout
551  if (retval == 0) {
552  curl_multi_socket_action(download_mgr->curl_multi_,
553  CURL_SOCKET_TIMEOUT,
554  0,
555  &still_running);
556  }
557 
558  // Terminate I/O thread
559  if (download_mgr->watch_fds_[kIdxPipeTerminate].revents)
560  break;
561 
562  // New job arrives
563  if (download_mgr->watch_fds_[kIdxPipeJobs].revents) {
564  download_mgr->watch_fds_[kIdxPipeJobs].revents = 0;
565  JobInfo *info;
566  download_mgr->pipe_jobs_->Read<JobInfo*>(&info);
567  if (!still_running) {
568  gettimeofday(&timeval_start, NULL);
569  }
570  CURL *handle = download_mgr->AcquireCurlHandle();
571  download_mgr->InitializeRequest(info, handle);
572  download_mgr->SetUrlOptions(info);
573  curl_multi_add_handle(download_mgr->curl_multi_, handle);
574  curl_multi_socket_action(download_mgr->curl_multi_,
575  CURL_SOCKET_TIMEOUT,
576  0,
577  &still_running);
578  }
579 
580  // Activity on curl sockets
581  // Within this loop the curl_multi_socket_action() may cause socket(s)
582  // to be removed from watch_fds_. If a socket is removed it is replaced
583  // by the socket at the end of the array and the inuse count is decreased.
584  // Therefore loop over the array in reverse order.
585  for (int64_t i = download_mgr->watch_fds_inuse_-1; i >= 2; --i) {
586  if (i >= download_mgr->watch_fds_inuse_) {
587  continue;
588  }
589  if (download_mgr->watch_fds_[i].revents) {
590  int ev_bitmask = 0;
591  if (download_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
592  ev_bitmask |= CURL_CSELECT_IN;
593  if (download_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
594  ev_bitmask |= CURL_CSELECT_OUT;
595  if (download_mgr->watch_fds_[i].revents &
596  (POLLERR | POLLHUP | POLLNVAL))
597  {
598  ev_bitmask |= CURL_CSELECT_ERR;
599  }
600  download_mgr->watch_fds_[i].revents = 0;
601 
602  curl_multi_socket_action(download_mgr->curl_multi_,
603  download_mgr->watch_fds_[i].fd,
604  ev_bitmask,
605  &still_running);
606  }
607  }
608 
609  // Check if transfers are completed
610  CURLMsg *curl_msg;
611  int msgs_in_queue;
612  while ((curl_msg = curl_multi_info_read(download_mgr->curl_multi_,
613  &msgs_in_queue)))
614  {
615  if (curl_msg->msg == CURLMSG_DONE) {
616  perf::Inc(download_mgr->counters_->n_requests);
617  JobInfo *info;
618  CURL *easy_handle = curl_msg->easy_handle;
619  int curl_error = curl_msg->data.result;
620  curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
621 
622  curl_multi_remove_handle(download_mgr->curl_multi_, easy_handle);
623  if (download_mgr->VerifyAndFinalize(curl_error, info)) {
624  curl_multi_add_handle(download_mgr->curl_multi_, easy_handle);
625  curl_multi_socket_action(download_mgr->curl_multi_,
626  CURL_SOCKET_TIMEOUT,
627  0,
628  &still_running);
629  } else {
630  // Return easy handle into pool and write result back
631  download_mgr->ReleaseCurlHandle(easy_handle);
632 
633  info->pipe_job_results->Write<download::Failures>(info->error_code);
634  }
635  }
636  }
637  }
638 
639  for (set<CURL *>::iterator i = download_mgr->pool_handles_inuse_->begin(),
640  iEnd = download_mgr->pool_handles_inuse_->end(); i != iEnd; ++i)
641  {
642  curl_multi_remove_handle(download_mgr->curl_multi_, *i);
643  curl_easy_cleanup(*i);
644  }
645  download_mgr->pool_handles_inuse_->clear();
646  free(download_mgr->watch_fds_);
647 
648  LogCvmfs(kLogDownload, kLogDebug, "download I/O thread terminated");
649  return NULL;
650 }
651 
652 
653 //------------------------------------------------------------------------------
654 
655 
656 HeaderLists::~HeaderLists() {
657  for (unsigned i = 0; i < blocks_.size(); ++i) {
658  delete[] blocks_[i];
659  }
660  blocks_.clear();
661 }
662 
663 
664 curl_slist *HeaderLists::GetList(const char *header) {
665  return Get(header);
666 }
667 
668 
669 curl_slist *HeaderLists::DuplicateList(curl_slist *slist) {
670  assert(slist);
671  curl_slist *copy = GetList(slist->data);
672  copy->next = slist->next;
673  curl_slist *prev = copy;
674  slist = slist->next;
675  while (slist) {
676  curl_slist *new_link = Get(slist->data);
677  new_link->next = slist->next;
678  prev->next = new_link;
679  prev = new_link;
680  slist = slist->next;
681  }
682  return copy;
683 }
684 
685 
686 void HeaderLists::AppendHeader(curl_slist *slist, const char *header) {
687  assert(slist);
688  curl_slist *new_link = Get(header);
689  new_link->next = NULL;
690 
691  while (slist->next)
692  slist = slist->next;
693  slist->next = new_link;
694 }
695 
696 
702 void HeaderLists::CutHeader(const char *header, curl_slist **slist) {
703  assert(slist);
704  curl_slist head;
705  head.next = *slist;
706  curl_slist *prev = &head;
707  curl_slist *rover = *slist;
708  while (rover) {
709  if (strcmp(rover->data, header) == 0) {
710  prev->next = rover->next;
711  Put(rover);
712  rover = prev;
713  }
714  prev = rover;
715  rover = rover->next;
716  }
717  *slist = head.next;
718 }
719 
720 
721 void HeaderLists::PutList(curl_slist *slist) {
722  while (slist) {
723  curl_slist *next = slist->next;
724  Put(slist);
725  slist = next;
726  }
727 }
728 
729 
730 string HeaderLists::Print(curl_slist *slist) {
731  string verbose;
732  while (slist) {
733  verbose += string(slist->data) + "\n";
734  slist = slist->next;
735  }
736  return verbose;
737 }
738 
739 
740 curl_slist *HeaderLists::Get(const char *header) {
741  for (unsigned i = 0; i < blocks_.size(); ++i) {
742  for (unsigned j = 0; j < kBlockSize; ++j) {
743  if (!IsUsed(&(blocks_[i][j]))) {
744  blocks_[i][j].data = const_cast<char *>(header);
745  return &(blocks_[i][j]);
746  }
747  }
748  }
749 
750  // All used, new block
751  AddBlock();
752  blocks_[blocks_.size()-1][0].data = const_cast<char *>(header);
753  return &(blocks_[blocks_.size()-1][0]);
754 }
755 
756 
757 void HeaderLists::Put(curl_slist *slist) {
758  slist->data = NULL;
759  slist->next = NULL;
760 }
761 
762 
763 void HeaderLists::AddBlock() {
764  curl_slist *new_block = new curl_slist[kBlockSize];
765  for (unsigned i = 0; i < kBlockSize; ++i) {
766  Put(&new_block[i]);
767  }
768  blocks_.push_back(new_block);
769 }
770 
771 
772 //------------------------------------------------------------------------------
773 
774 
775 string DownloadManager::ProxyInfo::Print() {
776  if (url == "DIRECT")
777  return url;
778 
779  string result = url;
780  int remaining =
781  static_cast<int>(host.deadline()) - static_cast<int>(time(NULL));
782  string expinfo = (remaining >= 0) ? "+" : "";
783  if (abs(remaining) >= 3600) {
784  expinfo += StringifyInt(remaining/3600) + "h";
785  } else if (abs(remaining) >= 60) {
786  expinfo += StringifyInt(remaining/60) + "m";
787  } else {
788  expinfo += StringifyInt(remaining) + "s";
789  }
790  if (host.status() == dns::kFailOk) {
791  result += " (" + host.name() + ", " + expinfo + ")";
792  } else {
793  result += " (:unresolved:, " + expinfo + ")";
794  }
795  return result;
796 }
797 
798 
803 CURL *DownloadManager::AcquireCurlHandle() {
804  CURL *handle;
805 
806  if (pool_handles_idle_->empty()) {
807  // Create a new handle
808  handle = curl_easy_init();
809  assert(handle != NULL);
810 
811  curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
812  // curl_easy_setopt(curl_default, CURLOPT_FAILONERROR, 1);
813  curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, CallbackCurlHeader);
814  curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlData);
815  } else {
816  handle = *(pool_handles_idle_->begin());
817  pool_handles_idle_->erase(pool_handles_idle_->begin());
818  }
819 
820  pool_handles_inuse_->insert(handle);
821 
822  return handle;
823 }
824 
825 
826 void DownloadManager::ReleaseCurlHandle(CURL *handle) {
827  set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
828  assert(elem != pool_handles_inuse_->end());
829 
830  if (pool_handles_idle_->size() > pool_max_handles_) {
831  curl_easy_cleanup(*elem);
832  } else {
833  pool_handles_idle_->insert(*elem);
834  }
835 
836  pool_handles_inuse_->erase(elem);
837 }
838 
839 
844 void DownloadManager::InitializeRequest(JobInfo *info, CURL *handle) {
845  // Initialize internal download state
846  info->curl_handle = handle;
847  info->error_code = kFailOk;
848  info->http_code = -1;
849  info->follow_redirects = follow_redirects_;
850  info->num_used_proxies = 1;
851  info->num_used_hosts = 1;
852  info->num_retries = 0;
853  info->backoff_ms = 0;
854  info->headers = header_lists_->DuplicateList(default_headers_);
855  if (info->info_header) {
856  header_lists_->AppendHeader(info->headers, info->info_header);
857  }
858  if (info->force_nocache) {
859  SetNocache(info);
860  } else {
861  info->nocache = false;
862  }
863  if (info->compressed) {
864  zlib::DecompressInit(&(info->zstream));
865  }
866  if (info->expected_hash) {
867  assert(info->hash_context.buffer != NULL);
868  shash::Init(info->hash_context);
869  }
870 
871  if ((info->range_offset != -1) && (info->range_size)) {
872  char byte_range_array[100];
873  const int64_t range_lower = static_cast<int64_t>(info->range_offset);
874  const int64_t range_upper = static_cast<int64_t>(
875  info->range_offset + info->range_size - 1);
876  if (snprintf(byte_range_array, sizeof(byte_range_array),
877  "%" PRId64 "-%" PRId64,
878  range_lower, range_upper) == 100)
879  {
880  PANIC(NULL); // Should be impossible given limits on offset size.
881  }
882  curl_easy_setopt(handle, CURLOPT_RANGE, byte_range_array);
883  } else {
884  curl_easy_setopt(handle, CURLOPT_RANGE, NULL);
885  }
886 
887  // Set curl parameters
888  curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
889  curl_easy_setopt(handle, CURLOPT_WRITEHEADER,
890  static_cast<void *>(info));
891  curl_easy_setopt(handle, CURLOPT_WRITEDATA, static_cast<void *>(info));
892  curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->headers);
893  if (info->head_request) {
894  curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
895  } else {
896  curl_easy_setopt(handle, CURLOPT_HTTPGET, 1);
897  }
898  if (opt_ipv4_only_) {
899  curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
900  }
901  if (follow_redirects_) {
902  curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1);
903  curl_easy_setopt(handle, CURLOPT_MAXREDIRS, 4);
904  }
905 }
906 
907 
912 void DownloadManager::SetUrlOptions(JobInfo *info) {
913  CURL *curl_handle = info->curl_handle;
914  string url_prefix;
915 
916  MutexLockGuard m(lock_options_);
917  // Check if proxy group needs to be reset from backup to primary
918  if (opt_timestamp_backup_proxies_ > 0) {
919  const time_t now = time(NULL);
920  if (static_cast<int64_t>(now) >
921  static_cast<int64_t>(opt_timestamp_backup_proxies_ +
922  opt_proxy_groups_reset_after_))
923  {
924  opt_proxy_groups_current_ = 0;
925  opt_timestamp_backup_proxies_ = 0;
926  RebalanceProxiesUnlocked("reset proxy group");
927  }
928  }
929  // Check if load-balanced proxies within the group need to be reset
930  if (opt_timestamp_failover_proxies_ > 0) {
931  const time_t now = time(NULL);
932  if (static_cast<int64_t>(now) >
933  static_cast<int64_t>(opt_timestamp_failover_proxies_ +
934  opt_proxy_groups_reset_after_))
935  {
936  RebalanceProxiesUnlocked("reset load-balanced proxies");
937  }
938  }
939  // Check if host needs to be reset
940  if (opt_timestamp_backup_host_ > 0) {
941  const time_t now = time(NULL);
942  if (static_cast<int64_t>(now) >
943  static_cast<int64_t>(opt_timestamp_backup_host_ +
944  opt_host_reset_after_))
945  {
947  "switching host from %s to %s (reset host)",
948  (*opt_host_chain_)[opt_host_chain_current_].c_str(),
949  (*opt_host_chain_)[0].c_str());
950  opt_host_chain_current_ = 0;
951  opt_timestamp_backup_host_ = 0;
952  }
953  }
954 
955  ProxyInfo *proxy = ChooseProxyUnlocked(info->expected_hash);
956  if (!proxy || (proxy->url == "DIRECT")) {
957  info->proxy = "DIRECT";
958  curl_easy_setopt(info->curl_handle, CURLOPT_PROXY, "");
959  } else {
960  // Note: inside ValidateProxyIpsUnlocked() we may change the proxy data
961  // structure, so we must not pass proxy->... (== current_proxy())
962  // parameters directly
963  std::string purl = proxy->url;
964  dns::Host phost = proxy->host;
965  const bool changed = ValidateProxyIpsUnlocked(purl, phost);
966  // Current proxy may have changed
967  if (changed)
968  proxy = ChooseProxyUnlocked(info->expected_hash);
969  info->proxy = proxy->url;
970  if (proxy->host.status() == dns::kFailOk) {
971  curl_easy_setopt(info->curl_handle, CURLOPT_PROXY, info->proxy.c_str());
972  } else {
973  // We know it can't work, don't even try to download
974  curl_easy_setopt(info->curl_handle, CURLOPT_PROXY, "0.0.0.0");
975  }
976  }
977  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, opt_low_speed_limit_);
978  if (info->proxy != "DIRECT") {
979  curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_proxy_);
980  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_proxy_);
981  } else {
982  curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_direct_);
983  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_direct_);
984  }
985  if (!opt_dns_server_.empty())
986  curl_easy_setopt(curl_handle, CURLOPT_DNS_SERVERS, opt_dns_server_.c_str());
987 
988  if (info->probe_hosts && opt_host_chain_) {
989  url_prefix = (*opt_host_chain_)[opt_host_chain_current_];
990  info->current_host_chain_index = opt_host_chain_current_;
991  }
992 
993  string url = url_prefix + *(info->url);
994 
995  curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 1L);
996  if (url.substr(0, 5) == "https") {
997  bool rvb = ssl_certificate_store_.ApplySslCertificatePath(curl_handle);
998  if (!rvb) {
1000  "Failed to set SSL certificate path %s",
1001  ssl_certificate_store_.GetCaPath().c_str());
1002  }
1003  if (info->pid != -1) {
1004  if (credentials_attachment_ == NULL) {
1006  "uses secure downloads but no credentials attachment set");
1007  } else {
1008  bool retval = credentials_attachment_->ConfigureCurlHandle(
1009  curl_handle, info->pid, &info->cred_data);
1010  if (!retval) {
1011  LogCvmfs(kLogDownload, kLogDebug, "failed attaching credentials");
1012  }
1013  }
1014  }
1015  // The download manager disables signal handling in the curl library;
1016  // as OpenSSL's implementation of TLS will generate a sigpipe in some
1017  // error paths, we must explicitly disable SIGPIPE here.
1018  // TODO(jblomer): it should be enough to do this once
1019  signal(SIGPIPE, SIG_IGN);
1020  }
1021 
1022  if (url.find("@proxy@") != string::npos) {
1023  // This is used in Geo-API requests (only), to replace a portion of the
1024  // URL with the current proxy name for the sake of caching the result.
1025  // Replace the @proxy@ either with a passed in "forced" template (which
1026  // is set from $CVMFS_PROXY_TEMPLATE) if there is one, or a "direct"
1027  // template (which is the uuid) if there's no proxy, or the name of the
1028  // proxy.
1029  string replacement;
1030  if (proxy_template_forced_ != "") {
1031  replacement = proxy_template_forced_;
1032  } else if (info->proxy == "DIRECT") {
1033  replacement = proxy_template_direct_;
1034  } else {
1035  if (opt_proxy_groups_current_ >= opt_proxy_groups_fallback_) {
1036  // It doesn't make sense to use the fallback proxies in Geo-API requests
1037  // since the fallback proxies are supposed to get sorted, too.
1038  info->proxy = "DIRECT";
1039  curl_easy_setopt(info->curl_handle, CURLOPT_PROXY, "");
1040  replacement = proxy_template_direct_;
1041  } else {
1042  replacement = ChooseProxyUnlocked(info->expected_hash)->host.name();
1043  }
1044  }
1045  replacement = (replacement == "") ? proxy_template_direct_ : replacement;
1046  LogCvmfs(kLogDownload, kLogDebug, "replacing @proxy@ by %s",
1047  replacement.c_str());
1048  url = ReplaceAll(url, "@proxy@", replacement);
1049  }
1050 
1051  if ((info->destination == kDestinationMem) &&
1052  (info->destination_mem.size == 0) &&
1053  HasPrefix(url, "file://", false))
1054  {
1055  info->destination_mem.size = 64*1024;
1056  info->destination_mem.data = static_cast<char *>(smalloc(64*1024));
1057  }
1058 
1059  curl_easy_setopt(curl_handle, CURLOPT_URL, EscapeUrl(url).c_str());
1060 }
1061 
1062 
1072 bool DownloadManager::ValidateProxyIpsUnlocked(
1073  const string &url,
1074  const dns::Host &host)
1075 {
1076  if (!host.IsExpired())
1077  return false;
1078  LogCvmfs(kLogDownload, kLogDebug, "validate DNS entry for %s",
1079  host.name().c_str());
1080 
1081  unsigned group_idx = opt_proxy_groups_current_;
1082  dns::Host new_host = resolver_->Resolve(host.name());
1083 
1084  bool update_only = true; // No changes to the list of IP addresses.
1085  if (new_host.status() != dns::kFailOk) {
1086  // Try again later in case resolving fails.
1088  "failed to resolve IP addresses for %s (%d - %s)",
1089  host.name().c_str(), new_host.status(),
1090  dns::Code2Ascii(new_host.status()));
1091  new_host = dns::Host::ExtendDeadline(host, resolver_->min_ttl());
1092  } else if (!host.IsEquivalent(new_host)) {
1093  update_only = false;
1094  }
1095 
1096  if (update_only) {
1097  for (unsigned i = 0; i < (*opt_proxy_groups_)[group_idx].size(); ++i) {
1098  if ((*opt_proxy_groups_)[group_idx][i].host.id() == host.id())
1099  (*opt_proxy_groups_)[group_idx][i].host = new_host;
1100  }
1101  return false;
1102  }
1103 
1104  assert(new_host.status() == dns::kFailOk);
1105 
1106  // Remove old host objects, insert new objects, and rebalance.
1108  "DNS entries for proxy %s changed, adjusting", host.name().c_str());
1109  vector<ProxyInfo> *group = current_proxy_group();
1110  opt_num_proxies_ -= group->size();
1111  for (unsigned i = 0; i < group->size(); ) {
1112  if ((*group)[i].host.id() == host.id()) {
1113  group->erase(group->begin() + i);
1114  } else {
1115  i++;
1116  }
1117  }
1118  vector<ProxyInfo> new_infos;
1119  set<string> best_addresses = new_host.ViewBestAddresses(opt_ip_preference_);
1120  set<string>::const_iterator iter_ips = best_addresses.begin();
1121  for (; iter_ips != best_addresses.end(); ++iter_ips) {
1122  string url_ip = dns::RewriteUrl(url, *iter_ips);
1123  new_infos.push_back(ProxyInfo(new_host, url_ip));
1124  }
1125  group->insert(group->end(), new_infos.begin(), new_infos.end());
1126  opt_num_proxies_ += new_infos.size();
1127 
1128  RebalanceProxiesUnlocked("DNS change");
1129  return true;
1130 }
1131 
1132 
1136 void DownloadManager::UpdateStatistics(CURL *handle) {
1137  double val;
1138  int retval;
1139  int64_t sum = 0;
1140 
1141  retval = curl_easy_getinfo(handle, CURLINFO_SIZE_DOWNLOAD, &val);
1142  assert(retval == CURLE_OK);
1143  sum += static_cast<int64_t>(val);
1144  /*retval = curl_easy_getinfo(handle, CURLINFO_HEADER_SIZE, &val);
1145  assert(retval == CURLE_OK);
1146  sum += static_cast<int64_t>(val);*/
1147  perf::Xadd(counters_->sz_transferred_bytes, sum);
1148 }
1149 
1150 
// CanRetry: whether a failed job may be retried against the same URL.
// Requires that the job is not in no-cache mode, that it has retries left
// (num_retries < opt_max_retries_, read under lock_options_), and that its
// error is a proxy transfer error or satisfies the second operand of the ||.
// NOTE(review): that second operand sits on listing line 1160, which is
// missing here (presumably IsHostTransferError(info->error_code)) — confirm.
1154 bool DownloadManager::CanRetry(const JobInfo *info) {
1155  MutexLockGuard m(lock_options_);
1156  unsigned max_retries = opt_max_retries_;
1157 
1158  return !info->nocache && (info->num_retries < max_retries) &&
1159  (IsProxyTransferError(info->error_code) ||
1161 }
1162 
1170 void DownloadManager::Backoff(JobInfo *info) {
1171  unsigned backoff_init_ms = 0;
1172  unsigned backoff_max_ms = 0;
1173  {
1174  MutexLockGuard m(lock_options_);
1175  backoff_init_ms = opt_backoff_init_ms_;
1176  backoff_max_ms = opt_backoff_max_ms_;
1177  }
1178 
1179  info->num_retries++;
1180  perf::Inc(counters_->n_retries);
1181  if (info->backoff_ms == 0) {
1182  info->backoff_ms = prng_.Next(backoff_init_ms + 1); // Must be != 0
1183  } else {
1184  info->backoff_ms *= 2;
1185  }
1186  if (info->backoff_ms > backoff_max_ms) info->backoff_ms = backoff_max_ms;
1187 
1188  LogCvmfs(kLogDownload, kLogDebug, "backing off for %d ms", info->backoff_ms);
1189  SafeSleepMs(info->backoff_ms);
1190 }
1191 
1192 void DownloadManager::SetNocache(JobInfo *info) {
1193  if (info->nocache)
1194  return;
1195  header_lists_->AppendHeader(info->headers, "Pragma: no-cache");
1196  header_lists_->AppendHeader(info->headers, "Cache-Control: no-cache");
1197  curl_easy_setopt(info->curl_handle, CURLOPT_HTTPHEADER, info->headers);
1198  info->nocache = true;
1199 }
1200 
1201 
1206 void DownloadManager::SetRegularCache(JobInfo *info) {
1207  if (info->nocache == false)
1208  return;
1209  header_lists_->CutHeader("Pragma: no-cache", &(info->headers));
1210  header_lists_->CutHeader("Cache-Control: no-cache", &(info->headers));
1211  curl_easy_setopt(info->curl_handle, CURLOPT_HTTPHEADER, info->headers);
1212  info->nocache = false;
1213 }
1214 
1215 
1219 void DownloadManager::ReleaseCredential(JobInfo *info) {
1220  if (info->cred_data) {
1221  assert(credentials_attachment_ != NULL); // Someone must have set it
1222  credentials_attachment_->ReleaseCurlHandle(info->curl_handle,
1223  info->cred_data);
1224  info->cred_data = NULL;
1225  }
1226 }
1227 
1228 
// VerifyAndFinalize: classifies the outcome of a finished curl transfer and
// decides whether the job is performed again on the same curl handle.
// - On CURLE_OK, verifies the content hash (if expected_hash is set) and, for
//   compressed in-memory downloads, decompresses the buffer in one run.
//   Either failure yields kFailBadData.
// - Other curl errors are mapped to a download failure code.  Several
//   mappings depend on whether the transfer went through a proxy
//   (info->proxy != "DIRECT").
// - Returns true when the caller should perform the transfer again.  In that
//   case the destination, hash and decompression state have been reset, and a
//   proxy/host switch or a backoff has been applied.
// - Returns false when the job is finished, successfully or not.  Before
//   returning it releases credentials, flushes (kDestinationFile) or closes
//   (kDestinationPath) the destination file, finalizes the decompression
//   stream, and returns the header list to header_lists_.
// NOTE(review): several lines of this function (mostly error-code
// assignments, e.g. listing lines 1282, 1295, 1301, 1307-1309) are missing
// from this listing.
1235 bool DownloadManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1237  "Verify downloaded url %s, proxy %s (curl error %d)",
1238  info->url->c_str(), info->proxy.c_str(), curl_error);
1239  UpdateStatistics(info->curl_handle);
1240 
1241  // Verification and error classification
1242  switch (curl_error) {
1243  case CURLE_OK:
1244  // Verify content hash
1245  if (info->expected_hash) {
1246  shash::Any match_hash;
1247  shash::Final(info->hash_context, &match_hash);
1248  if (match_hash != *(info->expected_hash)) {
1250  "hash verification of %s failed (expected %s, got %s)",
1251  info->url->c_str(), info->expected_hash->ToString().c_str(),
1252  match_hash.ToString().c_str());
1253  info->error_code = kFailBadData;
1254  break;
1255  }
1256  }
1257 
1258  // Decompress memory in a single run
// On success the compressed buffer is replaced by the decompressed one.
1259  if ((info->destination == kDestinationMem) && info->compressed) {
1260  void *buf;
1261  uint64_t size;
1262  bool retval = zlib::DecompressMem2Mem(
1263  info->destination_mem.data,
1264  static_cast<int64_t>(info->destination_mem.pos),
1265  &buf, &size);
1266  if (retval) {
1267  free(info->destination_mem.data);
1268  info->destination_mem.data = static_cast<char *>(buf);
1269  info->destination_mem.pos = info->destination_mem.size = size;
1270  } else {
1272  "decompression (memory) of url %s failed",
1273  info->url->c_str());
1274  info->error_code = kFailBadData;
1275  break;
1276  }
1277  }
1278 
1279  info->error_code = kFailOk;
1280  break;
1281  case CURLE_UNSUPPORTED_PROTOCOL:
1283  break;
1284  case CURLE_URL_MALFORMAT:
1285  info->error_code = kFailBadUrl;
1286  break;
1287  case CURLE_COULDNT_RESOLVE_PROXY:
1288  info->error_code = kFailProxyResolve;
1289  break;
1290  case CURLE_COULDNT_RESOLVE_HOST:
1291  info->error_code = kFailHostResolve;
1292  break;
1293  case CURLE_OPERATION_TIMEDOUT:
1294  info->error_code = (info->proxy == "DIRECT") ?
1296  break;
1297  case CURLE_PARTIAL_FILE:
1298  case CURLE_GOT_NOTHING:
1299  case CURLE_RECV_ERROR:
1300  info->error_code = (info->proxy == "DIRECT") ?
1302  break;
1303  case CURLE_FILE_COULDNT_READ_FILE:
1304  case CURLE_COULDNT_CONNECT:
1305  if (info->proxy != "DIRECT") {
1306  // This is a guess. Fail-over can still change to switching host
1308  } else {
1310  }
1311  break;
1312  case CURLE_TOO_MANY_REDIRECTS:
1314  break;
1315  case CURLE_SSL_CACERT_BADFILE:
1317  "Failed to load certificate bundle. "
1318  "X509_CERT_BUNDLE might point to the wrong location.");
1320  break;
1321  // As of curl 7.62.0, CURLE_SSL_CACERT is the same as
1322  // CURLE_PEER_FAILED_VERIFICATION
1323  case CURLE_PEER_FAILED_VERIFICATION:
1325  "invalid SSL certificate of remote host. "
1326  "X509_CERT_DIR and/or X509_CERT_BUNDLE might point to the wrong "
1327  "location.");
1329  break;
1330  case CURLE_ABORTED_BY_CALLBACK:
1331  case CURLE_WRITE_ERROR:
1332  // Error set by callback
1333  break;
1334  case CURLE_SEND_ERROR:
1335  // The curl error CURLE_SEND_ERROR can be seen when a cache is misbehaving
1336  // and closing connections before the http request send is completed.
1337  // Handle this error, treating it as a short transfer error.
1338  info->error_code = (info->proxy == "DIRECT") ?
1340  break;
1341  default:
1342  LogCvmfs(kLogDownload, kLogSyslogErr, "unexpected curl error (%d) while "
1343  "trying to fetch %s", curl_error, info->url->c_str());
1344  info->error_code = kFailOther;
1345  break;
1346  }
1347 
// NOTE(review): opt_host_chain_ is read here before lock_options_ is taken
// below.
1348  std::vector<std::string> *host_chain = opt_host_chain_;
1349 
1350  // Determination if download should be repeated
1351  bool try_again = false;
// Evaluated with the original error code, i.e. before a kFailBadData under
// no-cache is turned into kFailHostHttp below.
1352  bool same_url_retry = CanRetry(info);
1353  if (info->error_code != kFailOk) {
1354  MutexLockGuard m(lock_options_);
// Corrupted data is first retried with no-cache headers (SetNocache in the
// failure handling below).  If no-cache was already in effect, it is
// escalated to a host failure.
1355  if (info->error_code == kFailBadData) {
1356  if (!info->nocache) {
1357  try_again = true;
1358  } else {
1359  // Make it a host failure
1361  "data corruption with no-cache header, try another host");
1362 
1363  info->error_code = kFailHostHttp;
1364  }
1365  }
1366  if ( same_url_retry || (
1367  ( (info->error_code == kFailHostResolve) ||
1369  (info->error_code == kFailHostHttp)) &&
1370  info->probe_hosts &&
1371  host_chain && (info->num_used_hosts < host_chain->size()))
1372  )
1373  {
1374  try_again = true;
1375  }
1376  if ( same_url_retry || (
1377  ( (info->error_code == kFailProxyResolve) ||
1379  (info->error_code == kFailProxyHttp)) )
1380  )
1381  {
1382  try_again = true;
1383  // If all proxies failed, do a next round with the next host
1384  if (!same_url_retry && (info->num_used_proxies >= opt_num_proxies_)) {
1385  // Check if this can be made a host fail-over
1386  if (info->probe_hosts &&
1387  host_chain &&
1388  (info->num_used_hosts < host_chain->size()))
1389  {
1390  // reset proxy group if not already performed by other handle
1391  if (opt_proxy_groups_) {
1392  if ((opt_proxy_groups_current_ > 0) ||
1393  (opt_proxy_groups_current_burned_ > 0))
1394  {
1395  opt_proxy_groups_current_ = 0;
1396  opt_timestamp_backup_proxies_ = 0;
1397  RebalanceProxiesUnlocked("reset proxies for host failover");
1398  }
1399  }
1400 
1401  // Make it a host failure
1402  LogCvmfs(kLogDownload, kLogDebug, "make it a host failure");
1403  info->num_used_proxies = 1;
1405  } else {
1406  try_again = false;
1407  }
1408  } // Make a proxy failure a host failure
1409  } // Proxy failure assumed
1410  }
1411 
1412  if (try_again) {
1413  LogCvmfs(kLogDownload, kLogDebug, "Trying again on same curl handle, "
1414  "same url: %d, error code %d", same_url_retry, info->error_code);
1415  // Reset internal state and destination
1416  if ((info->destination == kDestinationMem) && info->destination_mem.data) {
1417  free(info->destination_mem.data);
1418  info->destination_mem.data = NULL;
1419  info->destination_mem.size = 0;
1420  info->destination_mem.pos = 0;
1421  }
// A canceled job is not retried; it is finalized with kFailCanceled.
1422  if (info->interrupt_cue && info->interrupt_cue->IsCanceled()) {
1423  info->error_code = kFailCanceled;
1424  goto verify_and_finalize_stop;
1425  }
// File destinations are emptied and reopened for writing from the start.
1426  if ((info->destination == kDestinationFile) ||
1427  (info->destination == kDestinationPath))
1428  {
1429  if ((fflush(info->destination_file) != 0) ||
1430  (ftruncate(fileno(info->destination_file), 0) != 0) ||
1431  (freopen(NULL, "w", info->destination_file) != info->destination_file)
1432  )
1433  {
1434  info->error_code = kFailLocalIO;
1435  goto verify_and_finalize_stop;
1436  }
1437  }
1438  if (info->destination == kDestinationSink) {
1439  if (info->destination_sink->Reset() != 0) {
1440  info->error_code = kFailLocalIO;
1441  goto verify_and_finalize_stop;
1442  }
1443  }
1444  if (info->expected_hash)
1445  shash::Init(info->hash_context);
1446  if (info->compressed)
1447  zlib::DecompressInit(&info->zstream);
1448  SetRegularCache(info);
1449 
1450  // Failure handling
1451  bool switch_proxy = false;
1452  bool switch_host = false;
1453  switch (info->error_code) {
1454  case kFailBadData:
1455  SetNocache(info);
1456  break;
1457  case kFailProxyResolve:
1458  case kFailProxyHttp:
1459  switch_proxy = true;
1460  break;
1461  case kFailHostResolve:
1462  case kFailHostHttp:
1463  case kFailHostAfterProxy:
1464  switch_host = true;
1465  break;
1466  default:
// Transfer errors: back off and retry the same URL while retries remain,
// otherwise fail over to the next proxy/host.
1467  if (IsProxyTransferError(info->error_code)) {
1468  if (same_url_retry) {
1469  Backoff(info);
1470  } else {
1471  switch_proxy = true;
1472  }
1473  } else if (IsHostTransferError(info->error_code)) {
1474  if (same_url_retry) {
1475  Backoff(info);
1476  } else {
1477  switch_host = true;
1478  }
1479  } else {
1480  // No other errors expected when retrying
1481  PANIC(NULL);
1482  }
1483  }
// Credentials are tied to the curl handle configuration, so they are released
// before SetUrlOptions() reconfigures the handle for the new proxy/host.
1484  if (switch_proxy) {
1485  ReleaseCredential(info);
1486  SwitchProxy(info);
1487  info->num_used_proxies++;
1488  SetUrlOptions(info);
1489  }
1490  if (switch_host) {
1491  ReleaseCredential(info);
1492  SwitchHost(info);
1493  info->num_used_hosts++;
1494  SetUrlOptions(info);
1495  }
1496 
1497  return true; // try again
1498  }
1499 
1500  verify_and_finalize_stop:
1501  // Finalize, flush destination file
1502  ReleaseCredential(info);
1503  if ((info->destination == kDestinationFile) &&
1504  fflush(info->destination_file) != 0)
1505  {
1506  info->error_code = kFailLocalIO;
1507  } else if (info->destination == kDestinationPath) {
1508  if (fclose(info->destination_file) != 0)
1509  info->error_code = kFailLocalIO;
1510  info->destination_file = NULL;
1511  }
1512 
1513  if (info->compressed)
1514  zlib::DecompressFini(&info->zstream);
1515 
1516  if (info->headers) {
1517  header_lists_->PutList(info->headers);
1518  info->headers = NULL;
1519  }
1520 
1521  return false; // stop transfer and return to Fetch()
1522 }
1523 
1524 
1525 DownloadManager::DownloadManager() {
1526  pool_handles_idle_ = NULL;
1527  pool_handles_inuse_ = NULL;
1528  pool_max_handles_ = 0;
1529  curl_multi_ = NULL;
1530  default_headers_ = NULL;
1531 
1532  atomic_init32(&multi_threaded_);
1533  pipe_terminate_ = NULL;
1534 
1535  pipe_jobs_ = NULL;
1536  watch_fds_ = NULL;
1537  watch_fds_size_ = 0;
1538  watch_fds_inuse_ = 0;
1539  watch_fds_max_ = 0;
1540 
1541  lock_options_ =
1542  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1543  int retval = pthread_mutex_init(lock_options_, NULL);
1544  assert(retval == 0);
1545  lock_synchronous_mode_ =
1546  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1547  retval = pthread_mutex_init(lock_synchronous_mode_, NULL);
1548  assert(retval == 0);
1549 
1550  opt_dns_server_ = "";
1551  opt_ip_preference_ = dns::kIpPreferSystem;
1552  opt_timeout_proxy_ = 0;
1553  opt_timeout_direct_ = 0;
1554  opt_low_speed_limit_ = 0;
1555  opt_host_chain_ = NULL;
1556  opt_host_chain_rtt_ = NULL;
1557  opt_host_chain_current_ = 0;
1558  opt_proxy_groups_ = NULL;
1559  opt_proxy_groups_current_ = 0;
1560  opt_proxy_groups_current_burned_ = 0;
1561  opt_num_proxies_ = 0;
1562  opt_proxy_shard_ = false;
1563  opt_max_retries_ = 0;
1564  opt_backoff_init_ms_ = 0;
1565  opt_backoff_max_ms_ = 0;
1566  enable_info_header_ = false;
1567  opt_ipv4_only_ = false;
1568  follow_redirects_ = false;
1569 
1570  resolver_ = NULL;
1571 
1572  opt_timestamp_backup_proxies_ = 0;
1573  opt_timestamp_failover_proxies_ = 0;
1574  opt_proxy_groups_reset_after_ = 0;
1575  opt_timestamp_backup_host_ = 0;
1576  opt_host_reset_after_ = 0;
1577 
1578  credentials_attachment_ = NULL;
1579 
1580  counters_ = NULL;
1581 }
1582 
1583 
1584 DownloadManager::~DownloadManager() {
1585  pthread_mutex_destroy(lock_options_);
1586  pthread_mutex_destroy(lock_synchronous_mode_);
1587  free(lock_options_);
1588  free(lock_synchronous_mode_);
1589 }
1590 
1591 void DownloadManager::InitHeaders() {
1592  // User-Agent
1593  string cernvm_id = "User-Agent: cvmfs ";
1594 #ifdef CVMFS_LIBCVMFS
1595  cernvm_id += "libcvmfs ";
1596 #else
1597  cernvm_id += "Fuse ";
1598 #endif
1599  cernvm_id += string(VERSION);
1600  if (getenv("CERNVM_UUID") != NULL) {
1601  cernvm_id += " " +
1602  sanitizer::InputSanitizer("az AZ 09 -").Filter(getenv("CERNVM_UUID"));
1603  }
1604  user_agent_ = strdup(cernvm_id.c_str());
1605 
1606  header_lists_ = new HeaderLists();
1607 
1608  default_headers_ = header_lists_->GetList("Connection: Keep-Alive");
1609  header_lists_->AppendHeader(default_headers_, "Pragma:");
1610  header_lists_->AppendHeader(default_headers_, user_agent_);
1611 }
1612 
1613 
1614 void DownloadManager::FiniHeaders() {
1615  delete header_lists_;
1616  header_lists_ = NULL;
1617  default_headers_ = NULL;
1618 }
1619 
1620 
1621 void DownloadManager::Init(const unsigned max_pool_handles,
1622  const perf::StatisticsTemplate &statistics)
1623 {
1624  atomic_init32(&multi_threaded_);
1625  int retval = curl_global_init(CURL_GLOBAL_ALL);
1626  assert(retval == CURLE_OK);
1627  pool_handles_idle_ = new set<CURL *>;
1628  pool_handles_inuse_ = new set<CURL *>;
1629  pool_max_handles_ = max_pool_handles;
1630  watch_fds_max_ = 4*pool_max_handles_;
1631 
1632  opt_timeout_proxy_ = 5;
1633  opt_timeout_direct_ = 10;
1634  opt_low_speed_limit_ = 1024;
1635  opt_proxy_groups_current_ = 0;
1636  opt_proxy_groups_current_burned_ = 0;
1637  opt_num_proxies_ = 0;
1638  opt_proxy_shard_ = false;
1639  opt_host_chain_current_ = 0;
1640  opt_ip_preference_ = dns::kIpPreferSystem;
1641 
1642  counters_ = new Counters(statistics);
1643 
1644  user_agent_ = NULL;
1645  InitHeaders();
1646 
1647  curl_multi_ = curl_multi_init();
1648  assert(curl_multi_ != NULL);
1649  curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION, CallbackCurlSocket);
1650  curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1651  static_cast<void *>(this));
1652  curl_multi_setopt(curl_multi_, CURLMOPT_MAXCONNECTS, watch_fds_max_);
1653  curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1654  pool_max_handles_);
1655 
1656  prng_.InitLocaltime();
1657 
1658  // Name resolving
1659  if ((getenv("CVMFS_IPV4_ONLY") != NULL) &&
1660  (strlen(getenv("CVMFS_IPV4_ONLY")) > 0))
1661  {
1662  opt_ipv4_only_ = true;
1663  }
1664  resolver_ = dns::NormalResolver::Create(opt_ipv4_only_,
1665  kDnsDefaultRetries, kDnsDefaultTimeoutMs);
1666  assert(resolver_);
1667 }
1668 
1669 
// Teardown sequence (presumably the body of DownloadManager::Fini(); the
// function header, listing line 1670, is missing from this listing).
// - If the I/O thread was spawned (multi_threaded_ == 1), signals it through
//   pipe_terminate_, joins it and destroys both pipes.
// - Cleans up the idle curl handles and the multi handle, then releases the
//   headers, user agent, counters, host chain, proxy configuration, the
//   global curl state and the resolver.
// NOTE(review): handles still in pool_handles_inuse_ are not passed to
// curl_easy_cleanup() here — confirm that none can remain at this point.
1671  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1672  // Shutdown I/O thread
1673  pipe_terminate_->Write(kPipeTerminateSignal);
1674  pthread_join(thread_download_, NULL);
1675  // All handles are removed from the multi stack
1676  pipe_terminate_.Destroy();
1677  pipe_jobs_.Destroy();
1678  }
1679 
1680  for (set<CURL *>::iterator i = pool_handles_idle_->begin(),
1681  iEnd = pool_handles_idle_->end(); i != iEnd; ++i)
1682  {
1683  curl_easy_cleanup(*i);
1684  }
1685  delete pool_handles_idle_;
1686  delete pool_handles_inuse_;
1687  curl_multi_cleanup(curl_multi_);
1688  pool_handles_idle_ = NULL;
1689  pool_handles_inuse_ = NULL;
1690  curl_multi_ = NULL;
1691 
1692  FiniHeaders();
1693  if (user_agent_)
1694  free(user_agent_);
1695  user_agent_ = NULL;
1696 
1697  delete counters_;
1698  counters_ = NULL;
1699 
1700  delete opt_host_chain_;
1701  delete opt_host_chain_rtt_;
1702  opt_proxy_map_.clear();
1703  delete opt_proxy_groups_;
1704  opt_host_chain_ = NULL;
1705  opt_host_chain_rtt_ = NULL;
1706  opt_proxy_groups_ = NULL;
1707 
// Curl's global state goes last, after all curl handles have been cleaned up.
1708  curl_global_cleanup();
1709 
1710  delete resolver_;
1711  resolver_ = NULL;
1712 }
1713 
1714 
// Switches the manager to multi-threaded mode.  NOTE(review): the function
// header (listing lines 1715-1719) is missing from this listing.  Creates the
// termination and job pipes and starts the I/O thread (MainDownload, with this
// object as argument).  Only afterwards is multi_threaded_ raised, which makes
// Fetch() hand jobs to that thread.
1720  pipe_terminate_ = new Pipe<kPipeThreadTerminator>();
1721  pipe_jobs_ = new Pipe<kPipeDownloadJobs>();
1722 
1723  int retval = pthread_create(&thread_download_, NULL, MainDownload,
1724  static_cast<void *>(this));
1725  assert(retval == 0);
1726 
1727  atomic_inc32(&multi_threaded_);
1728 }
1729 
1730 
// Runs one download job and returns its failure code (kFailOk on success).
// NOTE(review): the function header (listing lines 1731-1734, presumably
// DownloadManager::Fetch(JobInfo *info)) and lines 1744-1745 and 1766 are
// missing from this listing.
// - Prepares the destination.  The hash context and the optional cvmfs-info
//   header are allocated with alloca(), so the pointers stored in *info are
//   only valid until this function returns.
// - Multi-threaded mode: sends the job to the I/O thread via pipe_jobs_ and
//   blocks on the job's result pipe.
// - Otherwise: performs the transfer synchronously under
//   lock_synchronous_mode_, repeating curl_easy_perform() while
//   VerifyAndFinalize() asks for another attempt.
// - On failure, removes a kDestinationPath file and frees an in-memory buffer.
1735  assert(info != NULL);
1736  assert(info->url != NULL);
1737 
1738  Failures result;
1739  result = PrepareDownloadDestination(info);
1740  if (result != kFailOk)
1741  return result;
1742 
1743  if (info->expected_hash) {
1746  info->hash_context.size = shash::GetContextSize(algorithm);
1747  info->hash_context.buffer = alloca(info->hash_context.size);
1748  }
1749 
1750  // Prepare cvmfs-info: header, allocate string on the stack
// Buffer layout: "cvmfs-info: " + escaped extra_info + terminating '\0'.
// EscapeHeader(..., NULL, 0) is used to obtain the escaped length.
1751  info->info_header = NULL;
1752  if (enable_info_header_ && info->extra_info) {
1753  const char *header_name = "cvmfs-info: ";
1754  const size_t header_name_len = strlen(header_name);
1755  const unsigned header_size = 1 + header_name_len +
1756  EscapeHeader(*(info->extra_info), NULL, 0);
1757  info->info_header = static_cast<char *>(alloca(header_size));
1758  memcpy(info->info_header, header_name, header_name_len);
1759  EscapeHeader(*(info->extra_info), info->info_header + header_name_len,
1760  header_size - header_name_len);
1761  info->info_header[header_size-1] = '\0';
1762  }
1763 
1764  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1765  if (!info->pipe_job_results.IsValid()) {
1767  }
1768 
1769  // LogCvmfs(kLogDownload, kLogDebug, "send job to thread, pipe %d %d",
1770  // info->wait_at[0], info->wait_at[1]);
1771  // NOLINTNEXTLINE(bugprone-sizeof-expression)
1772  pipe_jobs_->Write<JobInfo*>(info);
1773  info->pipe_job_results->Read<download::Failures>(&result);
1774  // LogCvmfs(kLogDownload, kLogDebug, "got result %d", result);
1775  } else {
1776  MutexLockGuard l(lock_synchronous_mode_);
1777  CURL *handle = AcquireCurlHandle();
1778  InitializeRequest(info, handle);
1779  SetUrlOptions(info);
1780  // curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
1781  int retval;
1782  do {
1783  retval = curl_easy_perform(handle);
1784  perf::Inc(counters_->n_requests);
1785  double elapsed;
1786  if (curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &elapsed) == CURLE_OK)
1787  {
1788  perf::Xadd(counters_->sz_transfer_time,
1789  static_cast<int64_t>(elapsed * 1000));
1790  }
1791  } while (VerifyAndFinalize(retval, info));
1792  result = info->error_code;
1793  ReleaseCurlHandle(info->curl_handle);
1794  }
1795 
1796  if (result != kFailOk) {
1797  LogCvmfs(kLogDownload, kLogDebug, "download failed (error %d - %s)", result,
1798  Code2Ascii(result));
1799 
1800  if (info->destination == kDestinationPath)
1801  unlink(info->destination_path->c_str());
1802 
1803  if (info->destination_mem.data) {
1804  free(info->destination_mem.data);
1805  info->destination_mem.data = NULL;
1806  info->destination_mem.size = 0;
1807  }
1808  }
1809 
1810  return result;
1811 }
1812 
1813 
1818 void DownloadManager::SetCredentialsAttachment(CredentialsAttachment *ca) {
1819  MutexLockGuard m(lock_options_);
1820  credentials_attachment_ = ca;
1821 }
1822 
1826 std::string DownloadManager::GetDnsServer() const {
1827  return opt_dns_server_;
1828 }
1829 
1834 void DownloadManager::SetDnsServer(const string &address) {
1835  if (!address.empty()) {
1836  MutexLockGuard m(lock_options_);
1837  opt_dns_server_ = address;
1838  assert(!opt_dns_server_.empty());
1839 
1840  vector<string> servers;
1841  servers.push_back(address);
1842  bool retval = resolver_->SetResolvers(servers);
1843  assert(retval);
1844  }
1845  LogCvmfs(kLogDownload, kLogSyslog, "set nameserver to %s", address.c_str());
1846 }
1847 
1848 
1852 void DownloadManager::SetDnsParameters(
1853  const unsigned retries,
1854  const unsigned timeout_ms)
1855 {
1856  MutexLockGuard m(lock_options_);
1857  if ((resolver_->retries() == retries) &&
1858  (resolver_->timeout_ms() == timeout_ms))
1859  {
1860  return;
1861  }
1862  delete resolver_;
1863  resolver_ = NULL;
1864  resolver_ =
1865  dns::NormalResolver::Create(opt_ipv4_only_, retries, timeout_ms);
1866  assert(resolver_);
1867 }
1868 
1869 
1870 void DownloadManager::SetDnsTtlLimits(
1871  const unsigned min_seconds,
1872  const unsigned max_seconds)
1873 {
1874  MutexLockGuard m(lock_options_);
1875  resolver_->set_min_ttl(min_seconds);
1876  resolver_->set_max_ttl(max_seconds);
1877 }
1878 
1879 
1880 void DownloadManager::SetIpPreference(dns::IpPreference preference) {
1881  MutexLockGuard m(lock_options_);
1882  opt_ip_preference_ = preference;
1883 }
1884 
1885 
1891 void DownloadManager::SetTimeout(const unsigned seconds_proxy,
1892  const unsigned seconds_direct)
1893 {
1894  MutexLockGuard m(lock_options_);
1895  opt_timeout_proxy_ = seconds_proxy;
1896  opt_timeout_direct_ = seconds_direct;
1897 }
1898 
1899 
1905 void DownloadManager::SetLowSpeedLimit(const unsigned low_speed_limit) {
1906  MutexLockGuard m(lock_options_);
1907  opt_low_speed_limit_ = low_speed_limit;
1908 }
1909 
1910 
1914 void DownloadManager::GetTimeout(unsigned *seconds_proxy,
1915  unsigned *seconds_direct)
1916 {
1917  MutexLockGuard m(lock_options_);
1918  *seconds_proxy = opt_timeout_proxy_;
1919  *seconds_direct = opt_timeout_direct_;
1920 }
1921 
1922 
1927 void DownloadManager::SetHostChain(const string &host_list) {
1928  SetHostChain(SplitString(host_list, ';'));
1929 }
1930 
1931 
1932 void DownloadManager::SetHostChain(const std::vector<std::string> &host_list) {
1933  MutexLockGuard m(lock_options_);
1934  opt_timestamp_backup_host_ = 0;
1935  delete opt_host_chain_;
1936  delete opt_host_chain_rtt_;
1937  opt_host_chain_current_ = 0;
1938 
1939  if (host_list.empty()) {
1940  opt_host_chain_ = NULL;
1941  opt_host_chain_rtt_ = NULL;
1942  return;
1943  }
1944 
1945  opt_host_chain_ = new vector<string>(host_list);
1946  opt_host_chain_rtt_ =
1947  new vector<int>(opt_host_chain_->size(), kProbeUnprobed);
1948  // LogCvmfs(kLogDownload, kLogSyslog, "using host %s",
1949  // (*opt_host_chain_)[0].c_str());
1950 }
1951 
1952 
1953 
1958 void DownloadManager::GetHostInfo(vector<string> *host_chain, vector<int> *rtt,
1959  unsigned *current_host)
1960 {
1961  MutexLockGuard m(lock_options_);
1962  if (opt_host_chain_) {
1963  if (current_host) {*current_host = opt_host_chain_current_;}
1964  if (host_chain) {*host_chain = *opt_host_chain_;}
1965  if (rtt) {*rtt = *opt_host_chain_rtt_;}
1966  }
1967 }
1968 
1969 
1978 void DownloadManager::SwitchProxy(JobInfo *info) {
1979  MutexLockGuard m(lock_options_);
1980 
1981  if (!opt_proxy_groups_) {
1982  return;
1983  }
1984 
1985  // Fail any matching proxies within the current load-balancing group
1986  vector<ProxyInfo> *group = current_proxy_group();
1987  const unsigned group_size = group->size();
1988  unsigned failed = 0;
1989  for (unsigned i = 0; i < group_size - opt_proxy_groups_current_burned_; ++i) {
1990  if (info && (info->proxy == (*group)[i].url)) {
1991  // Move to list of failed proxies
1992  opt_proxy_groups_current_burned_++;
1993  swap((*group)[i],
1994  (*group)[group_size - opt_proxy_groups_current_burned_]);
1995  perf::Inc(counters_->n_proxy_failover);
1996  failed++;
1997  }
1998  }
1999 
2000  // Do nothing more unless at least one proxy was marked as failed
2001  if (!failed)
2002  return;
2003 
2004  // If all proxies from the current load-balancing group are burned, switch to
2005  // another group
2006  if (opt_proxy_groups_current_burned_ == group->size()) {
2007  opt_proxy_groups_current_burned_ = 0;
2008  if (opt_proxy_groups_->size() > 1) {
2009  opt_proxy_groups_current_ = (opt_proxy_groups_current_ + 1) %
2010  opt_proxy_groups_->size();
2011  // Remember the timestamp of switching to backup proxies
2012  if (opt_proxy_groups_reset_after_ > 0) {
2013  if (opt_proxy_groups_current_ > 0) {
2014  if (opt_timestamp_backup_proxies_ == 0)
2015  opt_timestamp_backup_proxies_ = time(NULL);
2016  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2017  // "switched to (another) backup proxy group");
2018  } else {
2019  opt_timestamp_backup_proxies_ = 0;
2020  // LogCvmfs(kLogDownload, kLogDebug | kLogSyslogWarn,
2021  // "switched back to primary proxy group");
2022  }
2023  opt_timestamp_failover_proxies_ = 0;
2024  }
2025  }
2026  } else {
2027  // Record failover time
2028  if (opt_proxy_groups_reset_after_ > 0) {
2029  if (opt_timestamp_failover_proxies_ == 0)
2030  opt_timestamp_failover_proxies_ = time(NULL);
2031  }
2032  }
2033 
2034  UpdateProxiesUnlocked("failed proxy");
2035  LogCvmfs(kLogDownload, kLogDebug, "%d proxies remain in group",
2036  current_proxy_group()->size() - opt_proxy_groups_current_burned_);
2037 }
2038 
2039 
// SwitchHost: advances opt_host_chain_current_ to the next host of the chain
// (round-robin) under lock_options_ and counts a host failover.
// - No-op without a host chain or with a single host.
// - If a job is given and its current_host_chain_index no longer matches the
//   current host, another job has already switched and nothing is done.
// - With opt_host_reset_after_ > 0, records the time of switching away from
//   the primary host (index 0), and clears it when returning there.
// NOTE(review): the function assumes a non-empty chain (it indexes it and
// takes its size as a modulus).  The first lines of two LogCvmfs calls
// (listing lines 2053, 2070) are missing from this listing.
2045 void DownloadManager::SwitchHost(JobInfo *info) {
2046  MutexLockGuard m(lock_options_);
2047 
2048  if (!opt_host_chain_ || (opt_host_chain_->size() == 1)) {
2049  return;
2050  }
2051 
2052  if (info && (info->current_host_chain_index != opt_host_chain_current_)) {
2054  "don't switch host, "
2055  "last used host: %s, current host: %s",
2056  (*opt_host_chain_)[info->current_host_chain_index].c_str(),
2057  (*opt_host_chain_)[opt_host_chain_current_].c_str());
2058  return;
2059  }
2060 
2061  string reason = "manually triggered";
2062  if (info) {
2063  reason = download::Code2Ascii(info->error_code);
2064  }
2065 
2066  string old_host = (*opt_host_chain_)[opt_host_chain_current_];
2067  opt_host_chain_current_ =
2068  (opt_host_chain_current_ + 1) % opt_host_chain_->size();
2069  perf::Inc(counters_->n_host_failover);
2071  "switching host from %s to %s (%s)", old_host.c_str(),
2072  (*opt_host_chain_)[opt_host_chain_current_].c_str(),
2073  reason.c_str());
2074 
2075  // Remember the timestamp of switching to backup host
2076  if (opt_host_reset_after_ > 0) {
2077  if (opt_host_chain_current_ != 0) {
2078  if (opt_timestamp_backup_host_ == 0)
2079  opt_timestamp_backup_host_ = time(NULL);
2080  } else {
2081  opt_timestamp_backup_host_ = 0;
2082  }
2083  }
2084 }
2085 
2086 void DownloadManager::SwitchHost() {
2087  SwitchHost(NULL);
2088 }
2089 
2090 
2097 void DownloadManager::ProbeHosts() {
2098  vector<string> host_chain;
2099  vector<int> host_rtt;
2100  unsigned current_host;
2101 
2102  GetHostInfo(&host_chain, &host_rtt, &current_host);
2103 
2104  // Stopwatch, two times to fill caches first
2105  unsigned i, retries;
2106  string url;
2107  JobInfo info(&url, false, false, NULL);
2108  for (retries = 0; retries < 2; ++retries) {
2109  for (i = 0; i < host_chain.size(); ++i) {
2110  url = host_chain[i] + "/.cvmfspublished";
2111 
2112  struct timeval tv_start, tv_end;
2113  gettimeofday(&tv_start, NULL);
2114  Failures result = Fetch(&info);
2115  gettimeofday(&tv_end, NULL);
2116  if (info.destination_mem.data)
2117  free(info.destination_mem.data);
2118  if (result == kFailOk) {
2119  host_rtt[i] = static_cast<int>(
2120  DiffTimeSeconds(tv_start, tv_end) * 1000);
2121  LogCvmfs(kLogDownload, kLogDebug, "probing host %s had %dms rtt",
2122  url.c_str(), host_rtt[i]);
2123  } else {
2124  LogCvmfs(kLogDownload, kLogDebug, "error while probing host %s: %d %s",
2125  url.c_str(), result, Code2Ascii(result));
2126  host_rtt[i] = INT_MAX;
2127  }
2128  }
2129  }
2130 
2131  SortTeam(&host_rtt, &host_chain);
2132  for (i = 0; i < host_chain.size(); ++i) {
2133  if (host_rtt[i] == INT_MAX) host_rtt[i] = kProbeDown;
2134  }
2135 
2136  MutexLockGuard m(lock_options_);
2137  delete opt_host_chain_;
2138  delete opt_host_chain_rtt_;
2139  opt_host_chain_ = new vector<string>(host_chain);
2140  opt_host_chain_rtt_ = new vector<int>(host_rtt);
2141  opt_host_chain_current_ = 0;
2142 }
2143 
// GeoSortServers: orders `servers` by geographic proximity using the Geo-API
// of the hosts in the host chain.
// - Up to three hosts, taken from the chain shuffled under lock_options_
//   (which guards prng_), are queried at
//   <host>/api/v1.0/geo/@proxy@/<comma-separated server names>.  The @proxy@
//   placeholder is substituted when the request URL is set up.
// - If output_order is given, it receives the permutation (indices into
//   *servers) and *servers is left unchanged; otherwise *servers is reordered
//   in place.
// - Returns false if servers is NULL or no valid reply was obtained.  A single
//   server trivially succeeds with order {0}.
// NOTE(review): the first lines of several LogCvmfs calls (listing lines
// 2179, 2188, 2192, 2200, 2206) are missing from this listing.
2144 bool DownloadManager::GeoSortServers(std::vector<std::string> *servers,
2145  std::vector<uint64_t> *output_order) {
2146  if (!servers) {return false;}
2147  if (servers->size() == 1) {
2148  if (output_order) {
2149  output_order->clear();
2150  output_order->push_back(0);
2151  }
2152  return true;
2153  }
2154 
2155  std::vector<std::string> host_chain;
2156  GetHostInfo(&host_chain, NULL, NULL);
2157 
// Servers are sent as DNS names; entries without an extractable host are sent
// verbatim.
2158  std::vector<std::string> server_dns_names;
2159  server_dns_names.reserve(servers->size());
2160  for (unsigned i = 0; i < servers->size(); ++i) {
2161  std::string host = dns::ExtractHost((*servers)[i]);
2162  server_dns_names.push_back(host.empty() ? (*servers)[i] : host);
2163  }
2164  std::string host_list = JoinStrings(server_dns_names, ",");
2165 
2166  vector<string> host_chain_shuffled;
2167  {
2168  // Protect against concurrent access to prng_
2169  MutexLockGuard m(lock_options_);
2170  // Determine random hosts for the Geo-API query
2171  host_chain_shuffled = Shuffle(host_chain, &prng_);
2172  }
2173  // Request ordered list via Geo-API
2174  bool success = false;
2175  unsigned max_attempts = std::min(host_chain_shuffled.size(), size_t(3));
2176  vector<uint64_t> geo_order(servers->size());
2177  for (unsigned i = 0; i < max_attempts; ++i) {
2178  string url = host_chain_shuffled[i] + "/api/v1.0/geo/@proxy@/" + host_list;
2180  "requesting ordered server list from %s", url.c_str());
2181  JobInfo info(&url, false, false, NULL);
2182  Failures result = Fetch(&info);
2183  if (result == kFailOk) {
2184  string order(info.destination_mem.data, info.destination_mem.size);
2185  free(info.destination_mem.data);
2186  bool retval = ValidateGeoReply(order, servers->size(), &geo_order);
2187  if (!retval) {
2189  "retrieved invalid GeoAPI reply from %s [%s]",
2190  url.c_str(), order.c_str());
2191  } else {
2193  "geographic order of servers retrieved from %s",
2194  dns::ExtractHost(host_chain_shuffled[i]).c_str());
2195  LogCvmfs(kLogDownload, kLogDebug, "order is %s", order.c_str());
2196  success = true;
2197  break;
2198  }
2199  } else {
2201  "GeoAPI request %s failed with error %d [%s]",
2202  url.c_str(), result, Code2Ascii(result));
2203  }
2204  }
2205  if (!success) {
2207  "failed to retrieve geographic order from stratum 1 servers");
2208  return false;
2209  }
2210 
2211  if (output_order) {
2212  output_order->swap(geo_order);
2213  } else {
2214  std::vector<std::string> sorted_servers;
2215  sorted_servers.reserve(geo_order.size());
2216  for (unsigned i = 0; i < geo_order.size(); ++i) {
2217  uint64_t orderval = geo_order[i];
2218  sorted_servers.push_back((*servers)[orderval]);
2219  }
2220  servers->swap(sorted_servers);
2221  }
2222  return true;
2223 }
2224 
2225 
2233 bool DownloadManager::ProbeGeo() {
2234  vector<string> host_chain;
2235  vector<int> host_rtt;
2236  unsigned current_host;
2237  vector< vector<ProxyInfo> > proxy_chain;
2238  unsigned fallback_group;
2239 
2240  GetHostInfo(&host_chain, &host_rtt, &current_host);
2241  GetProxyInfo(&proxy_chain, NULL, &fallback_group);
2242  if ((host_chain.size() < 2) && ((proxy_chain.size() - fallback_group) < 2))
2243  return true;
2244 
2245  vector<string> host_names;
2246  for (unsigned i = 0; i < host_chain.size(); ++i)
2247  host_names.push_back(dns::ExtractHost(host_chain[i]));
2248  SortTeam(&host_names, &host_chain);
2249  unsigned last_geo_host = host_names.size();
2250 
2251  if ((fallback_group == 0) && (last_geo_host > 1)) {
2252  // There are no non-fallback proxies, which means that the client
2253  // will always use the fallback proxies. Add a keyword separator
2254  // between the hosts and fallback proxies so the geosorting service
2255  // will know to sort the hosts based on the distance from the
2256  // closest fallback proxy rather than the distance from the client.
2257  host_names.push_back("+PXYSEP+");
2258  }
2259 
2260  // Add fallback proxy names to the end of the host list
2261  unsigned first_geo_fallback = host_names.size();
2262  for (unsigned i = fallback_group; i < proxy_chain.size(); ++i) {
2263  // We only take the first fallback proxy name from every group under the
2264  // assumption that load-balanced servers are at the same location
2265  host_names.push_back(proxy_chain[i][0].host.name());
2266  }
2267 
2268  std::vector<uint64_t> geo_order;
2269  bool success = GeoSortServers(&host_names, &geo_order);
2270  if (!success) {
2271  // GeoSortServers already logged a failure message.
2272  return false;
2273  }
2274 
2275  // Re-install host chain and proxy chain
2276  MutexLockGuard m(lock_options_);
2277  delete opt_host_chain_;
2278  opt_num_proxies_ = 0;
2279  opt_host_chain_ = new vector<string>(host_chain.size());
2280 
2281  // It's possible that opt_proxy_groups_fallback_ might have changed while
2282  // the lock wasn't held
2283  vector<vector<ProxyInfo> > *proxy_groups = new vector<vector<ProxyInfo> >(
2284  opt_proxy_groups_fallback_ + proxy_chain.size() - fallback_group);
2285  // First copy the non-fallback part of the current proxy chain
2286  for (unsigned i = 0; i < opt_proxy_groups_fallback_; ++i) {
2287  (*proxy_groups)[i] = (*opt_proxy_groups_)[i];
2288  opt_num_proxies_ += (*opt_proxy_groups_)[i].size();
2289  }
2290 
2291  // Copy the host chain and fallback proxies by geo order. Array indices
2292  // in geo_order that are smaller than last_geo_host refer to a stratum 1,
2293  // and those indices greater than or equal to first_geo_fallback refer to
2294  // a fallback proxy.
2295  unsigned hosti = 0;
2296  unsigned proxyi = opt_proxy_groups_fallback_;
2297  for (unsigned i = 0; i < geo_order.size(); ++i) {
2298  uint64_t orderval = geo_order[i];
2299  if (orderval < static_cast<uint64_t>(last_geo_host)) {
2300  // LogCvmfs(kLogCvmfs, kLogSyslog, "this is orderval %u at host index
2301  // %u", orderval, hosti);
2302  (*opt_host_chain_)[hosti++] = host_chain[orderval];
2303  } else if (orderval >= static_cast<uint64_t>(first_geo_fallback)) {
2304  // LogCvmfs(kLogCvmfs, kLogSyslog,
2305  // "this is orderval %u at proxy index %u, using proxy_chain index %u",
2306  // orderval, proxyi, fallback_group + orderval - first_geo_fallback);
2307  (*proxy_groups)[proxyi] =
2308  proxy_chain[fallback_group + orderval - first_geo_fallback];
2309  opt_num_proxies_ += (*proxy_groups)[proxyi].size();
2310  proxyi++;
2311  }
2312  }
2313 
2314  opt_proxy_map_.clear();
2315  delete opt_proxy_groups_;
2316  opt_proxy_groups_ = proxy_groups;
2317  // In pathological cases, opt_proxy_groups_current_ can be larger now when
2318  // proxies changed in-between.
2319  if (opt_proxy_groups_current_ > opt_proxy_groups_->size()) {
2320  if (opt_proxy_groups_->size() == 0) {
2321  opt_proxy_groups_current_ = 0;
2322  } else {
2323  opt_proxy_groups_current_ = opt_proxy_groups_->size() - 1;
2324  }
2325  opt_proxy_groups_current_burned_ = 0;
2326  }
2327 
2328  UpdateProxiesUnlocked("geosort");
2329 
2330  delete opt_host_chain_rtt_;
2331  opt_host_chain_rtt_ = new vector<int>(host_chain.size(), kProbeGeo);
2332  opt_host_chain_current_ = 0;
2333 
2334  return true;
2335 }
2336 
2337 
2345 bool DownloadManager::ValidateGeoReply(
2346  const string &reply_order,
2347  const unsigned expected_size,
2348  vector<uint64_t> *reply_vals)
2349 {
2350  if (reply_order.empty())
2351  return false;
2352  sanitizer::InputSanitizer sanitizer("09 , \n");
2353  if (!sanitizer.IsValid(reply_order))
2354  return false;
2355  sanitizer::InputSanitizer strip_newline("09 ,");
2356  vector<string> reply_strings =
2357  SplitString(strip_newline.Filter(reply_order), ',');
2358  vector<uint64_t> tmp_vals;
2359  for (unsigned i = 0; i < reply_strings.size(); ++i) {
2360  if (reply_strings[i].empty())
2361  return false;
2362  tmp_vals.push_back(String2Uint64(reply_strings[i]));
2363  }
2364  if (tmp_vals.size() != expected_size)
2365  return false;
2366 
2367  // Check if tmp_vals contains the number 1..n
2368  set<uint64_t> coverage(tmp_vals.begin(), tmp_vals.end());
2369  if (coverage.size() != tmp_vals.size())
2370  return false;
2371  if ((*coverage.begin() != 1) || (*coverage.rbegin() != coverage.size()))
2372  return false;
2373 
2374  for (unsigned i = 0; i < expected_size; ++i) {
2375  (*reply_vals)[i] = tmp_vals[i] - 1;
2376  }
2377  return true;
2378 }
2379 
2380 
2385 bool DownloadManager::StripDirect(
2386  const string &proxy_list,
2387  string *cleaned_list)
2388 {
2389  assert(cleaned_list);
2390  if (proxy_list == "") {
2391  *cleaned_list = "";
2392  return false;
2393  }
2394  bool result = false;
2395 
2396  vector<string> proxy_groups = SplitString(proxy_list, ';');
2397  vector<string> cleaned_groups;
2398  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2399  vector<string> group = SplitString(proxy_groups[i], '|');
2400  vector<string> cleaned;
2401  for (unsigned j = 0; j < group.size(); ++j) {
2402  if ((group[j] == "DIRECT") || (group[j] == "")) {
2403  result = true;
2404  } else {
2405  cleaned.push_back(group[j]);
2406  }
2407  }
2408  if (!cleaned.empty())
2409  cleaned_groups.push_back(JoinStrings(cleaned, "|"));
2410  }
2411 
2412  *cleaned_list = JoinStrings(cleaned_groups, ";");
2413  return result;
2414 }
2415 
2416 
2425 void DownloadManager::SetProxyChain(
2426  const string &proxy_list,
2427  const string &fallback_proxy_list,
2428  const ProxySetModes set_mode)
2429 {
2430  MutexLockGuard m(lock_options_);
2431 
2432  opt_timestamp_backup_proxies_ = 0;
2433  opt_timestamp_failover_proxies_ = 0;
2434  string set_proxy_list = opt_proxy_list_;
2435  string set_proxy_fallback_list = opt_proxy_fallback_list_;
2436  bool contains_direct;
2437  if ((set_mode == kSetProxyFallback) || (set_mode == kSetProxyBoth)) {
2438  opt_proxy_fallback_list_ = fallback_proxy_list;
2439  }
2440  if ((set_mode == kSetProxyRegular) || (set_mode == kSetProxyBoth)) {
2441  opt_proxy_list_ = proxy_list;
2442  }
2443  contains_direct =
2444  StripDirect(opt_proxy_fallback_list_, &set_proxy_fallback_list);
2445  if (contains_direct) {
2447  "fallback proxies do not support DIRECT, removing");
2448  }
2449  if (set_proxy_fallback_list == "") {
2450  set_proxy_list = opt_proxy_list_;
2451  } else {
2452  bool contains_direct = StripDirect(opt_proxy_list_, &set_proxy_list);
2453  if (contains_direct) {
2455  "skipping DIRECT proxy to use fallback proxy");
2456  }
2457  }
2458 
2459  // From this point on, use set_proxy_list and set_fallback_proxy_list as
2460  // effective proxy lists!
2461 
2462  opt_proxy_map_.clear();
2463  delete opt_proxy_groups_;
2464  if ((set_proxy_list == "") && (set_proxy_fallback_list == "")) {
2465  opt_proxy_groups_ = NULL;
2466  opt_proxy_groups_current_ = 0;
2467  opt_proxy_groups_current_burned_ = 0;
2468  opt_proxy_groups_fallback_ = 0;
2469  opt_num_proxies_ = 0;
2470  return;
2471  }
2472 
2473  // Determine number of regular proxy groups (== first fallback proxy group)
2474  opt_proxy_groups_fallback_ = 0;
2475  if (set_proxy_list != "") {
2476  opt_proxy_groups_fallback_ = SplitString(set_proxy_list, ';').size();
2477  }
2478  LogCvmfs(kLogDownload, kLogDebug, "first fallback proxy group %u",
2479  opt_proxy_groups_fallback_);
2480 
2481  // Concatenate regular proxies and fallback proxies, both of which can be
2482  // empty.
2483  string all_proxy_list = set_proxy_list;
2484  if (set_proxy_fallback_list != "") {
2485  if (all_proxy_list != "")
2486  all_proxy_list += ";";
2487  all_proxy_list += set_proxy_fallback_list;
2488  }
2489  LogCvmfs(kLogDownload, kLogDebug, "full proxy list %s",
2490  all_proxy_list.c_str());
2491 
2492  // Resolve server names in provided urls
2493  vector<string> hostnames; // All encountered hostnames
2494  vector<string> proxy_groups;
2495  if (all_proxy_list != "")
2496  proxy_groups = SplitString(all_proxy_list, ';');
2497  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2498  vector<string> this_group = SplitString(proxy_groups[i], '|');
2499  for (unsigned j = 0; j < this_group.size(); ++j) {
2500  this_group[j] = dns::AddDefaultScheme(this_group[j]);
2501  // Note: DIRECT strings will be "extracted" to an empty string.
2502  string hostname = dns::ExtractHost(this_group[j]);
2503  // Save the hostname. Leave empty (DIRECT) names so indexes will
2504  // match later.
2505  hostnames.push_back(hostname);
2506  }
2507  }
2508  vector<dns::Host> hosts;
2509  LogCvmfs(kLogDownload, kLogDebug, "resolving %u proxy addresses",
2510  hostnames.size());
2511  resolver_->ResolveMany(hostnames, &hosts);
2512 
2513  // Construct opt_proxy_groups_: traverse proxy list in same order and expand
2514  // names to resolved IP addresses.
2515  opt_proxy_groups_ = new vector< vector<ProxyInfo> >();
2516  opt_num_proxies_ = 0;
2517  unsigned num_proxy = 0; // Combined i, j counter
2518  for (unsigned i = 0; i < proxy_groups.size(); ++i) {
2519  vector<string> this_group = SplitString(proxy_groups[i], '|');
2520  // Construct ProxyInfo objects from proxy string and DNS resolver result for
2521  // every proxy in this_group. One URL can result in multiple ProxyInfo
2522  // objects, one for each IP address.
2523  vector<ProxyInfo> infos;
2524  for (unsigned j = 0; j < this_group.size(); ++j, ++num_proxy) {
2525  this_group[j] = dns::AddDefaultScheme(this_group[j]);
2526  if (this_group[j] == "DIRECT") {
2527  infos.push_back(ProxyInfo("DIRECT"));
2528  continue;
2529  }
2530 
2531  if (hosts[num_proxy].status() != dns::kFailOk) {
2533  "failed to resolve IP addresses for %s (%d - %s)",
2534  hosts[num_proxy].name().c_str(), hosts[num_proxy].status(),
2535  dns::Code2Ascii(hosts[num_proxy].status()));
2536  dns::Host failed_host =
2537  dns::Host::ExtendDeadline(hosts[num_proxy], resolver_->min_ttl());
2538  infos.push_back(ProxyInfo(failed_host, this_group[j]));
2539  continue;
2540  }
2541 
2542  // IPv4 addresses have precedence
2543  set<string> best_addresses =
2544  hosts[num_proxy].ViewBestAddresses(opt_ip_preference_);
2545  set<string>::const_iterator iter_ips = best_addresses.begin();
2546  for (; iter_ips != best_addresses.end(); ++iter_ips) {
2547  string url_ip = dns::RewriteUrl(this_group[j], *iter_ips);
2548  infos.push_back(ProxyInfo(hosts[num_proxy], url_ip));
2549  }
2550  }
2551  opt_proxy_groups_->push_back(infos);
2552  opt_num_proxies_ += infos.size();
2553  }
2555  "installed %u proxies in %u load-balance groups",
2556  opt_num_proxies_, opt_proxy_groups_->size());
2557  opt_proxy_groups_current_ = 0;
2558  opt_proxy_groups_current_burned_ = 0;
2559 
2560  // Select random start proxy from the first group.
2561  if (opt_proxy_groups_->size() > 0) {
2562  // Select random start proxy from the first group.
2563  UpdateProxiesUnlocked("set proxies");
2564  }
2565 }
2566 
2567 
2574 void DownloadManager::GetProxyInfo(vector< vector<ProxyInfo> > *proxy_chain,
2575  unsigned *current_group,
2576  unsigned *fallback_group)
2577 {
2578  assert(proxy_chain != NULL);
2579  MutexLockGuard m(lock_options_);
2580 
2581 
2582  if (!opt_proxy_groups_) {
2583  vector< vector<ProxyInfo> > empty_chain;
2584  *proxy_chain = empty_chain;
2585  if (current_group != NULL)
2586  *current_group = 0;
2587  if (fallback_group != NULL)
2588  *fallback_group = 0;
2589  return;
2590  }
2591 
2592  *proxy_chain = *opt_proxy_groups_;
2593  if (current_group != NULL)
2594  *current_group = opt_proxy_groups_current_;
2595  if (fallback_group != NULL)
2596  *fallback_group = opt_proxy_groups_fallback_;
2597 }
2598 
2599 string DownloadManager::GetProxyList() {
2600  return opt_proxy_list_;
2601 }
2602 
2603 string DownloadManager::GetFallbackProxyList() {
2604  return opt_proxy_fallback_list_;
2605 }
2606 
2611 DownloadManager::ChooseProxyUnlocked(const shash::Any *hash) {
2612  if (!opt_proxy_groups_)
2613  return NULL;
2614 
2615  uint32_t key = (hash ? hash->Partial32() : 0);
2616  map<uint32_t, ProxyInfo *>::iterator it = opt_proxy_map_.lower_bound(key);
2617  ProxyInfo *proxy = it->second;
2618 
2619  return proxy;
2620 }
2621 
2625 void DownloadManager::UpdateProxiesUnlocked(const string &reason) {
2626  if (!opt_proxy_groups_)
2627  return;
2628 
2629  // Identify number of non-burned proxies within the current group
2630  vector<ProxyInfo> *group = current_proxy_group();
2631  unsigned num_alive = (group->size() - opt_proxy_groups_current_burned_);
2632  string old_proxy = JoinStrings(opt_proxy_urls_, "|");
2633 
2634  // Rebuild proxy map and URL list
2635  opt_proxy_map_.clear();
2636  opt_proxy_urls_.clear();
2637  const uint32_t max_key = 0xffffffffUL;
2638  if (opt_proxy_shard_) {
2639  // Build a consistent map with multiple entries for each proxy
2640  for (unsigned i = 0; i < num_alive; ++i) {
2641  ProxyInfo *proxy = &(*group)[i];
2642  shash::Any proxy_hash(shash::kSha1);
2643  HashString(proxy->url, &proxy_hash);
2644  Prng prng;
2645  prng.InitSeed(proxy_hash.Partial32());
2646  for (unsigned j = 0; j < kProxyMapScale; ++j) {
2647  const std::pair<uint32_t, ProxyInfo *> entry(prng.Next(max_key), proxy);
2648  opt_proxy_map_.insert(entry);
2649  }
2650  opt_proxy_urls_.push_back(proxy->url);
2651  }
2652  // Ensure lower_bound() finds a value for all keys
2653  ProxyInfo *first_proxy = opt_proxy_map_.begin()->second;
2654  const std::pair<uint32_t, ProxyInfo *> last_entry(max_key, first_proxy);
2655  opt_proxy_map_.insert(last_entry);
2656  } else {
2657  // Build a map with a single entry for one randomly selected proxy
2658  unsigned select = prng_.Next(num_alive);
2659  ProxyInfo *proxy = &(*group)[select];
2660  const std::pair<uint32_t, ProxyInfo *> entry(max_key, proxy);
2661  opt_proxy_map_.insert(entry);
2662  opt_proxy_urls_.push_back(proxy->url);
2663  }
2664  sort(opt_proxy_urls_.begin(), opt_proxy_urls_.end());
2665 
2666  // Report any change in proxy usage
2667  string new_proxy = JoinStrings(opt_proxy_urls_, "|");
2668  if (new_proxy != old_proxy) {
2670  "switching proxy from %s to %s (%s)",
2671  (old_proxy.empty() ? "(none)" : old_proxy.c_str()),
2672  (new_proxy.empty() ? "(none)" : new_proxy.c_str()),
2673  reason.c_str());
2674  }
2675 }
2676 
2680 void DownloadManager::ShardProxies() {
2681  opt_proxy_shard_ = true;
2682  RebalanceProxiesUnlocked("enable sharding");
2683 }
2684 
2689 void DownloadManager::RebalanceProxiesUnlocked(const string &reason) {
2690  if (!opt_proxy_groups_)
2691  return;
2692 
2693  opt_timestamp_failover_proxies_ = 0;
2694  opt_proxy_groups_current_burned_ = 0;
2695  UpdateProxiesUnlocked(reason);
2696 }
2697 
2698 
2699 void DownloadManager::RebalanceProxies() {
2700  MutexLockGuard m(lock_options_);
2701  RebalanceProxiesUnlocked("rebalance");
2702 }
2703 
2704 
2708 void DownloadManager::SwitchProxyGroup() {
2709  MutexLockGuard m(lock_options_);
2710 
2711  if (!opt_proxy_groups_ || (opt_proxy_groups_->size() < 2)) {
2712  return;
2713  }
2714 
2715  opt_proxy_groups_current_ = (opt_proxy_groups_current_ + 1) %
2716  opt_proxy_groups_->size();
2717  opt_timestamp_backup_proxies_ = time(NULL);
2718  RebalanceProxiesUnlocked("switch proxy group");
2719 }
2720 
2721 
2722 void DownloadManager::SetProxyGroupResetDelay(const unsigned seconds) {
2723  MutexLockGuard m(lock_options_);
2724  opt_proxy_groups_reset_after_ = seconds;
2725  if (opt_proxy_groups_reset_after_ == 0) {
2726  opt_timestamp_backup_proxies_ = 0;
2727  opt_timestamp_failover_proxies_ = 0;
2728  }
2729 }
2730 
2731 
2732 void DownloadManager::SetHostResetDelay(const unsigned seconds)
2733 {
2734  MutexLockGuard m(lock_options_);
2735  opt_host_reset_after_ = seconds;
2736  if (opt_host_reset_after_ == 0)
2737  opt_timestamp_backup_host_ = 0;
2738 }
2739 
2740 
2741 void DownloadManager::SetRetryParameters(const unsigned max_retries,
2742  const unsigned backoff_init_ms,
2743  const unsigned backoff_max_ms)
2744 {
2745  MutexLockGuard m(lock_options_);
2746  opt_max_retries_ = max_retries;
2747  opt_backoff_init_ms_ = backoff_init_ms;
2748  opt_backoff_max_ms_ = backoff_max_ms;
2749 }
2750 
2751 
2752 void DownloadManager::SetMaxIpaddrPerProxy(unsigned limit) {
2753  MutexLockGuard m(lock_options_);
2754  resolver_->set_throttle(limit);
2755 }
2756 
2757 
2758 void DownloadManager::SetProxyTemplates(
2759  const std::string &direct,
2760  const std::string &forced)
2761 {
2762  MutexLockGuard m(lock_options_);
2763  proxy_template_direct_ = direct;
2764  proxy_template_forced_ = forced;
2765 }
2766 
2767 
2768 void DownloadManager::EnableInfoHeader() {
2769  enable_info_header_ = true;
2770 }
2771 
2772 
2773 void DownloadManager::EnableRedirects() {
2774  follow_redirects_ = true;
2775 }
2776 
2777 void DownloadManager::UseSystemCertificatePath() {
2778  ssl_certificate_store_.UseSystemCertificatePath();
2779 }
2780 
2785 DownloadManager *DownloadManager::Clone(
2786  const perf::StatisticsTemplate &statistics)
2787 {
2788  DownloadManager *clone = new DownloadManager();
2789  clone->Init(pool_max_handles_, statistics);
2790  if (resolver_) {
2791  clone->SetDnsParameters(resolver_->retries(), resolver_->timeout_ms());
2792  clone->SetDnsTtlLimits(resolver_->min_ttl(), resolver_->max_ttl());
2793  clone->SetMaxIpaddrPerProxy(resolver_->throttle());
2794  }
2795  if (!opt_dns_server_.empty())
2796  clone->SetDnsServer(opt_dns_server_);
2797  clone->opt_timeout_proxy_ = opt_timeout_proxy_;
2798  clone->opt_timeout_direct_ = opt_timeout_direct_;
2799  clone->opt_low_speed_limit_ = opt_low_speed_limit_;
2800  clone->opt_max_retries_ = opt_max_retries_;
2801  clone->opt_backoff_init_ms_ = opt_backoff_init_ms_;
2802  clone->opt_backoff_max_ms_ = opt_backoff_max_ms_;
2803  clone->enable_info_header_ = enable_info_header_;
2804  clone->follow_redirects_ = follow_redirects_;
2805  if (opt_host_chain_) {
2806  clone->opt_host_chain_ = new vector<string>(*opt_host_chain_);
2807  clone->opt_host_chain_rtt_ = new vector<int>(*opt_host_chain_rtt_);
2808  }
2809  CloneProxyConfig(clone);
2810  clone->opt_ip_preference_ = opt_ip_preference_;
2811  clone->proxy_template_direct_ = proxy_template_direct_;
2812  clone->proxy_template_forced_ = proxy_template_forced_;
2813  clone->opt_proxy_groups_reset_after_ = opt_proxy_groups_reset_after_;
2814  clone->opt_host_reset_after_ = opt_host_reset_after_;
2815  clone->credentials_attachment_ = credentials_attachment_;
2816  clone->ssl_certificate_store_ = ssl_certificate_store_;
2817 
2818  return clone;
2819 }
2820 
2821 
2822 void DownloadManager::CloneProxyConfig(DownloadManager *clone) {
2823  clone->opt_proxy_groups_current_ = opt_proxy_groups_current_;
2824  clone->opt_proxy_groups_current_burned_ = opt_proxy_groups_current_burned_;
2825  clone->opt_proxy_groups_fallback_ = opt_proxy_groups_fallback_;
2826  clone->opt_num_proxies_ = opt_num_proxies_;
2827  clone->opt_proxy_shard_ = opt_proxy_shard_;
2828  clone->opt_proxy_list_ = opt_proxy_list_;
2829  clone->opt_proxy_fallback_list_ = opt_proxy_fallback_list_;
2830  if (opt_proxy_groups_ == NULL)
2831  return;
2832 
2833  clone->opt_proxy_groups_ = new vector< vector<ProxyInfo> >(
2834  *opt_proxy_groups_);
2835  clone->UpdateProxiesUnlocked("cloned");
2836 }
2837 
2838 } // namespace download
unsigned opt_timeout_direct_
Definition: download.h:528
unsigned opt_low_speed_limit_
Definition: download.h:529
void HashString(const std::string &content, Any *any_digest)
Definition: hash.cc:268
Destination destination
Definition: download.h:168
#define LogCvmfs(source, mask,...)
Definition: logging.h:25
Definition: prng.h:28
std::vector< T > Shuffle(const std::vector< T > &input, Prng *prng)
Definition: algorithm.h:49
const char * Code2Ascii(const ObjectFetcherFailures::Failures error)
unsigned opt_backoff_init_ms_
Definition: download.h:531
static unsigned EscapeHeader(const string &header, char *escaped_buf, size_t buf_size)
Definition: download.cc:123
unsigned char num_used_hosts
Definition: download.h:301
static bool EscapeUrlChar(char input, char output[3])
Definition: download.cc:73
int64_t Xadd(class Counter *counter, const int64_t delta)
Definition: statistics.h:51
const char * Code2Ascii(const Failures error)
Definition: dns.h:53
bool Write(const T &data)
Definition: pipe.h:125
int64_t id() const
Definition: dns.h:108
unsigned opt_proxy_groups_current_burned_
Definition: download.h:556
double DiffTimeSeconds(struct timeval start, struct timeval end)
Definition: algorithm.cc:31
unsigned opt_proxy_groups_reset_after_
Definition: download.h:618
virtual bool IsCanceled()
Definition: interrupt.h:16
void SetUrlOptions(JobInfo *info)
Definition: download.cc:912
StreamStates DecompressZStream2Sink(const void *buf, const int64_t size, z_stream *strm, cvmfs::Sink *sink)
Definition: compression.cc:234
unsigned backoff_ms
Definition: download.h:303
std::string opt_proxy_fallback_list_
Definition: download.h:573
static NormalResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
Definition: dns.cc:1259
unsigned opt_host_reset_after_
Definition: download.h:626
std::string proxy_template_direct_
Definition: download.h:602
#define PANIC(...)
Definition: exception.h:29
string ReplaceAll(const string &haystack, const string &needle, const string &replace_by)
Definition: string.cc:484
string JoinStrings(const vector< string > &strings, const string &joint)
Definition: string.cc:325
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:249
unsigned opt_proxy_groups_current_
Definition: download.h:551
off_t range_offset
Definition: download.h:181
shash::ContextPtr hash_context
Definition: download.h:291
void DecompressInit(z_stream *strm)
Definition: compression.cc:185
unsigned int current_host_chain_index
Definition: download.h:304
bool DecompressMem2Mem(const void *buf, const int64_t size, void **out_buf, uint64_t *out_size)
Definition: compression.cc:797
std::set< CURL * > * pool_handles_inuse_
Definition: download.h:507
void * cred_data
Definition: download.h:166
StreamStates DecompressZStream2File(const void *buf, const int64_t size, z_stream *strm, FILE *f)
Definition: compression.cc:275
std::string opt_proxy_list_
Definition: download.h:569
const std::string & name() const
Definition: dns.h:118
perf::Counter * sz_transfer_time
Definition: download.h:131
std::vector< std::vector< ProxyInfo > > * opt_proxy_groups_
Definition: download.h:547
assert((mem||(size==0))&&"Out Of Memory")
unsigned opt_proxy_groups_fallback_
Definition: download.h:561
void ReleaseCurlHandle(CURL *handle)
Definition: download.cc:826
StreamStates
Definition: compression.h:36
Algorithms algorithm
Definition: hash.h:125
z_stream zstream
Definition: download.h:290
const std::set< std::string > & ViewBestAddresses(IpPreference preference) const
Definition: dns.cc:205
void SetDnsServer(const std::string &address)
Definition: download.cc:1834
void DecompressFini(z_stream *strm)
Definition: compression.cc:201
void InitSeed(const uint64_t seed)
Definition: prng.h:35
char algorithm
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:182
void Init(ContextPtr context)
Definition: hash.cc:164
bool IsValid(const std::string &input) const
Definition: sanitizer.cc:114
Algorithms
Definition: hash.h:41
void SetDnsTtlLimits(const unsigned min_seconds, const unsigned max_seconds)
Definition: download.cc:1870
static Failures PrepareDownloadDestination(JobInfo *info)
Definition: download.cc:153
void Init(const unsigned max_pool_handles, const perf::StatisticsTemplate &statistics)
Definition: download.cc:1621
const char * Code2Ascii(const Failures error)
Definition: download.h:92
unsigned char num_retries
Definition: download.h:302
std::string AddDefaultScheme(const std::string &proxy)
Definition: dns.cc:187
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:290
static string EscapeUrl(const string &url)
Definition: download.cc:100
dns::IpPreference opt_ip_preference_
Definition: download.h:595
Algorithms algorithm
Definition: hash.h:500
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:287
Definition: dns.h:90
Failures Fetch(const std::string &base_url, const std::string &repository_name, const uint64_t minimum_timestamp, const shash::Any *base_catalog, signature::SignatureManager *signature_manager, download::DownloadManager *download_manager, ManifestEnsemble *ensemble)
void UpdateProxiesUnlocked(const std::string &reason)
Definition: download.cc:2625
bool IsEquivalent(const Host &other) const
Definition: dns.cc:269
bool IsProxyTransferError(const Failures error)
Definition: download.h:80
bool IsExpired() const
Definition: dns.cc:280
FILE * destination_file
Definition: download.h:174
bool follow_redirects
Definition: download.h:161
virtual int Reset()=0
perf::Counter * n_requests
Definition: download.h:132
static int Init(const loader::LoaderExports *loader_exports)
Definition: cvmfs.cc:2187
void Final(ContextPtr context, Any *any_digest)
Definition: hash.cc:221
string StringifyInt(const int64_t value)
Definition: string.cc:78
bool IsValid() const
Definition: pointer.h:43
void SetMaxIpaddrPerProxy(unsigned limit)
Definition: download.cc:2752
const shash::Any * expected_hash
Definition: download.h:177
void Inc(class Counter *counter)
Definition: statistics.h:50
void * buffer
Definition: hash.h:501
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:267
const std::string * extra_info
Definition: download.h:178
std::string ExtractHost(const std::string &url)
Definition: dns.cc:110
std::vector< int > * opt_host_chain_rtt_
Definition: download.h:543
cvmfs::Sink * destination_sink
Definition: download.h:176
SslCertificateStore ssl_certificate_store_
Definition: download.h:639
uint32_t Partial32() const
Definition: hash.h:394
IpPreference
Definition: dns.h:46
unsigned opt_backoff_max_ms_
Definition: download.h:532
unsigned GetContextSize(const Algorithms algorithm)
Definition: hash.cc:148
CURL * curl_handle
Definition: download.h:287
CredentialsAttachment * credentials_attachment_
Definition: download.h:628
std::vector< std::string > * opt_host_chain_
Definition: download.h:538
struct pollfd * watch_fds_
Definition: download.h:519
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
Definition: hash.cc:190
uint64_t String2Uint64(const string &value)
Definition: string.cc:228
Failures error_code
Definition: download.h:298
UniquePtr< Pipe< kPipeDownloadJobs > > pipe_jobs_
Definition: download.h:518
static void Fini()
Definition: cvmfs.cc:2404
static void Spawn()
Definition: cvmfs.cc:2312
InterruptCue * interrupt_cue
Definition: download.h:167
Definition: mutex.h:42
std::string Filter(const std::string &input) const
Definition: sanitizer.cc:107
std::string proxy_template_forced_
Definition: download.h:608
const std::string * destination_path
Definition: download.h:175
void SetDnsParameters(const unsigned retries, const unsigned timeout_ms)
Definition: download.cc:1852
static Host ExtendDeadline(const Host &original, unsigned seconds_from_now)
Definition: dns.cc:231
struct download::JobInfo::@4 destination_mem
UniquePtr< Pipe< kPipeThreadTerminator > > pipe_terminate_
Definition: download.h:516
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:1969
bool IsHostTransferError(const Failures error)
Definition: download.h:68
void SortTeam(std::vector< T > *tractor, std::vector< U > *towed)
Definition: algorithm.h:67
curl_slist * headers
Definition: download.h:288
UniquePtr< Pipe< kPipeDownloadJobsResults > > pipe_job_results
Pipe used for the return value.
Definition: download.h:294
unsigned size
Definition: hash.h:502
unsigned char num_used_proxies
Definition: download.h:300
Failures status() const
Definition: dns.h:119
std::string proxy
Definition: download.h:296
static void size_t size
Definition: smalloc.h:54
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
Definition: download.cc:1235
const std::string * url
Definition: download.h:157
void InitializeRequest(JobInfo *info, CURL *handle)
Definition: download.cc:844
string RewriteUrl(const string &url, const string &ip)
Definition: dns.cc:156
uint32_t Next(const uint64_t boundary)
Definition: prng.h:49
virtual int64_t Write(const void *buf, uint64_t sz)=0
char * info_header
Definition: download.h:289