29 #define __STDC_FORMAT_MACROS
31 #include "cvmfs_config.h"
73 if (((input >=
'0') && (input <=
'9')) ||
74 ((input >=
'A') && (input <=
'Z')) ||
75 ((input >=
'a') && (input <=
'z')) ||
76 (input ==
'/') || (input ==
':') || (input ==
'.') ||
78 (input ==
'+') || (input ==
'-') ||
79 (input ==
'_') || (input ==
'~') ||
80 (input ==
'[') || (input ==
']') || (input ==
','))
87 output[1] =
static_cast<char>(
88 (input / 16) + ((input / 16 <= 9) ?
'0' :
'A'-10));
89 output[2] =
static_cast<char>(
90 (input % 16) + ((input % 16 <= 9) ?
'0' :
'A'-10));
101 escaped.reserve(url.length());
103 char escaped_char[3];
104 for (
unsigned i = 0, s = url.length(); i < s; ++i) {
106 escaped.append(escaped_char, 3);
108 escaped.push_back(escaped_char[0]);
112 url.c_str(), escaped.c_str());
126 unsigned esc_pos = 0;
127 char escaped_char[3];
128 for (
unsigned i = 0, s = header.size(); i < s; ++i) {
130 for (
unsigned j = 0; j < 3; ++j) {
132 if (esc_pos >= buf_size)
134 escaped_buf[esc_pos] = escaped_char[j];
140 if (esc_pos >= buf_size)
142 escaped_buf[esc_pos] = escaped_char[0];
184 const size_t num_bytes = size*nmemb;
185 const string header_line(static_cast<const char *>(ptr), num_bytes);
192 if (
HasPrefix(header_line,
"HTTP/1.",
false)) {
193 if (header_line.length() < 10)
197 for (i = 8; (i < header_line.length()) && (header_line[i] ==
' '); ++i) {}
200 if (header_line.length() > i+2) {
201 info->
http_code = DownloadManager::ParseHttpCode(&header_line[i]);
213 header_line.c_str());
218 header_line.c_str());
244 HasPrefix(header_line,
"CONTENT-LENGTH:",
true))
246 char *tmp =
reinterpret_cast<char *
>(alloca(num_bytes+1));
248 sscanf(header_line.c_str(),
"%s %" PRIu64, tmp, &length);
250 if (length > DownloadManager::kMaxMemSize) {
252 "resource %s too large to store in memory (%" PRIu64
")",
253 info->
url->c_str(), length);
263 }
else if (
HasPrefix(header_line,
"LOCATION:",
true)) {
266 }
else if (
HasPrefix(header_line,
"X-SQUID-ERROR:",
true)) {
271 }
else if (
HasPrefix(header_line,
"PROXY-STATUS:",
true)) {
274 (header_line.find(
"error=") != string::npos)) {
289 const size_t num_bytes = size*nmemb;
314 "decompressing %s, local IO error", info->
url->c_str());
320 if ((written < 0) || (static_cast<uint64_t>(written) != num_bytes)) {
322 PRId64
")", info->
url->c_str(), written);
332 "Content-Length was missing or zero, but %zu bytes received",
336 "start %zu, bytes %zu, expected %zu",
362 "decompressing %s, local IO error", info->
url->c_str());
369 "downloading %s, IO failure: %s (errno=%d)",
370 info->
url->c_str(), strerror(errno), errno);
/**
 * Tells whether the job failed because the requested object does not exist.
 * The visible return path reports true exactly when http_code is 404.
 * NOTE(review): the lines between the signature and this return are not
 * visible here; earlier return paths may exist -- confirm in the full source.
 */
384 bool JobInfo::IsFileNotFound() {
388 return http_code == 404;
// Negative sentinels stored in the per-host RTT vector in place of a measured
// round trip time: SetHostChain() fills new chains with kProbeUnprobed,
// ProbeHosts() writes kProbeDown for hosts that could not be measured and
// ProbeGeo() fills the vector with kProbeGeo.
395 const int DownloadManager::kProbeUnprobed = -1;
396 const int DownloadManager::kProbeDown = -2;
397 const int DownloadManager::kProbeGeo = -3;
// Limit (1024*1024) that the header callback compares a parsed Content-Length
// against before a resource is stored in memory.
398 const unsigned DownloadManager::kMaxMemSize = 1024*1024;
/**
 * Converts the first three characters of digits into an integer status code.
 * Every character is checked to lie in '0'..'9'; its numeric value is
 * multiplied by factor and added to result.
 * NOTE(review): what happens on a non-digit character and how factor changes
 * between iterations is on lines not visible here.
 * The caller must provide at least three readable characters; the header
 * callback in this file checks header_line.length() > i+2 before calling.
 */
404 int DownloadManager::ParseHttpCode(
const char digits[3]) {
407 for (
int i = 0; i < 3; ++i) {
408 if ((digits[i] <
'0') || (digits[i] >
'9'))
410 result += (digits[i] -
'0') * factor;
/**
 * Socket callback; this file registers it on the multi handle through
 * CURLMOPT_SOCKETFUNCTION with the DownloadManager as CURLMOPT_SOCKETDATA.
 * It keeps download_mgr->watch_fds_ (the pollfd array) in line with the
 * sockets curl wants watched: the events mask of the entry at index is
 * rewritten according to action, and the array is reallocated in two places.
 * NOTE(review): most of the parameter list, the lookup of index and the
 * conditions guarding the reallocations are on lines not visible here.
 */
420 int DownloadManager::CallbackCurlSocket(CURL * ,
// Nothing to poll for.  NOTE(review): the statement executed here is elided.
429 if (action == CURL_POLL_NONE)
445 download_mgr->
watch_fds_ =
static_cast<struct pollfd *
>(
// Readable events.  NOTE(review): presumably under "case CURL_POLL_IN" -- the
// case label is not visible.
457 download_mgr->
watch_fds_[index].events = POLLIN | POLLPRI;
// Writable events.  NOTE(review): presumably under "case CURL_POLL_OUT".
460 download_mgr->
watch_fds_[index].events = POLLOUT | POLLWRBAND;
462 case CURL_POLL_INOUT:
464 POLLIN | POLLPRI | POLLOUT | POLLWRBAND;
466 case CURL_POLL_REMOVE:
// Only an entry that is not the last one in use needs extra handling.
// NOTE(review): the body (likely moving the last entry into the gap) is elided.
467 if (index < download_mgr->watch_fds_inuse_-1) {
479 download_mgr->
watch_fds_ =
static_cast<struct pollfd *
>(
/**
 * Entry point of the download thread (passed to pthread_create elsewhere in
 * this file with the DownloadManager as data).
 *
 * Visible structure:
 *  - allocates a pollfd array with two entries via smalloc and arms both for
 *    POLLIN | POLLPRI.  NOTE(review): presumably entry 0 is the terminate pipe
 *    and entry 1 the jobs pipe -- the fd assignments are not visible.
 *  - measures elapsed time with gettimeofday() around the wait
 *  - drives libcurl with curl_multi_socket_action(), adds new easy handles,
 *    and translates poll revents into a CURL_CSELECT_* bitmask per socket
 *  - drains curl_multi_info_read(); for CURLMSG_DONE the JobInfo is fetched
 *    through CURLINFO_PRIVATE, the handle is removed and may be re-added
 *  - finally removes and cleans up easy handles
 * NOTE(review): the loop header, the poll() call and the retry decision are on
 * lines not visible here.
 */
497 void *DownloadManager::MainDownload(
void *data) {
502 static_cast<struct pollfd *
>(smalloc(2 *
sizeof(
struct pollfd)));
505 download_mgr->
watch_fds_[0].events = POLLIN | POLLPRI;
508 download_mgr->
watch_fds_[1].events = POLLIN | POLLPRI;
512 int still_running = 0;
513 struct timeval timeval_start, timeval_stop;
514 gettimeofday(&timeval_start, NULL);
// Time spent since timeval_start.  NOTE(review): how delta is used (probably
// to adjust the poll timeout) is elided.
530 gettimeofday(&timeval_stop, NULL);
531 int64_t delta =
static_cast<int64_t
>(
543 curl_multi_socket_action(download_mgr->
curl_multi_,
560 gettimeofday(&timeval_start, NULL);
// Hand a new transfer to the multi handle and kick it off.
564 curl_multi_add_handle(download_mgr->
curl_multi_, handle);
565 curl_multi_socket_action(download_mgr->
curl_multi_,
// Map poll() results of watched socket i onto curl's event bitmask.
582 if (download_mgr->
watch_fds_[i].revents & (POLLIN | POLLPRI))
583 ev_bitmask |= CURL_CSELECT_IN;
584 if (download_mgr->
watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
585 ev_bitmask |= CURL_CSELECT_OUT;
587 (POLLERR | POLLHUP | POLLNVAL))
589 ev_bitmask |= CURL_CSELECT_ERR;
593 curl_multi_socket_action(download_mgr->
curl_multi_,
// Collect finished transfers.
603 while ((curl_msg = curl_multi_info_read(download_mgr->
curl_multi_,
606 if (curl_msg->msg == CURLMSG_DONE) {
609 CURL *easy_handle = curl_msg->easy_handle;
610 int curl_error = curl_msg->
data.result;
611 curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
613 curl_multi_remove_handle(download_mgr->
curl_multi_, easy_handle);
// NOTE(review): re-adding the same handle is presumably the retry branch taken
// when VerifyAndFinalize() asks for another attempt; the condition is elided.
615 curl_multi_add_handle(download_mgr->
curl_multi_, easy_handle);
616 curl_multi_socket_action(download_mgr->
curl_multi_,
// Shutdown: detach and destroy remaining easy handles.
634 curl_multi_remove_handle(download_mgr->
curl_multi_, *i);
635 curl_easy_cleanup(*i);
/**
 * Walks over all entries of blocks_.
 * NOTE(review): the loop body is not visible; since AddBlock() allocates each
 * block with new[], it presumably releases them here -- confirm.
 */
648 HeaderLists::~HeaderLists() {
649 for (
unsigned i = 0; i < blocks_.size(); ++i) {
/**
 * Returns the head node of a header list for the given header string.
 * DuplicateList() and InitHeaders() use the result as the first element of a
 * list.  NOTE(review): the body is not visible here.
 */
656 curl_slist *HeaderLists::GetList(
const char *header) {
/**
 * Creates a copy of the list slist out of nodes managed by this object.  The
 * head is obtained from GetList(), every further node from Get(); the data
 * pointers are shared with the source list.  Each new node is chained behind
 * the previously created one through prev->next.
 * NOTE(review): the iteration over slist and the return statement are elided.
 */
661 curl_slist *HeaderLists::DuplicateList(curl_slist *slist) {
663 curl_slist *copy = GetList(slist->data);
664 copy->next = slist->next;
665 curl_slist *prev = copy;
668 curl_slist *new_link = Get(slist->data);
669 new_link->next = slist->next;
670 prev->next = new_link;
/**
 * Adds a node for header to the list slist.  The node comes from Get(), is
 * terminated with next = NULL and then linked through slist->next.
 * NOTE(review): presumably slist is first advanced to the last node of the
 * list; those lines are not visible.
 */
678 void HeaderLists::AppendHeader(curl_slist *slist,
const char *header) {
680 curl_slist *new_link = Get(header);
681 new_link->next = NULL;
685 slist->next = new_link;
/**
 * Unlinks nodes whose data compares equal (strcmp) to header from the list
 * *slist.  A local node head serves as predecessor of the first element so
 * that removing the first element needs no special case.
 * NOTE(review): the loop, the return of unlinked nodes to the pool and the
 * update of *slist are on lines not visible here.
 */
694 void HeaderLists::CutHeader(
const char *header, curl_slist **slist) {
698 curl_slist *prev = &head;
699 curl_slist *rover = *slist;
701 if (strcmp(rover->data, header) == 0) {
702 prev->next = rover->next;
/**
 * Walks the list slist; the successor is saved in next before the current
 * node is processed.  VerifyAndFinalize() hands info->headers to this method
 * once a job is finished.
 * NOTE(review): presumably every node is given back via Put(); not visible.
 */
713 void HeaderLists::PutList(curl_slist *slist) {
715 curl_slist *next = slist->next;
/**
 * Renders a header list as text: the data string of a node followed by a
 * newline is appended to verbose.
 * NOTE(review): the traversal and the return statement are not visible.
 */
722 string HeaderLists::Print(curl_slist *slist) {
725 verbose += string(slist->data) +
"\n";
/**
 * Hands out a list node for header.  All kBlockSize slots of every block in
 * blocks_ are scanned; the first slot for which IsUsed() is false gets its
 * data pointer set to header and is returned.  The string is not copied, the
 * node merely points to it, so header must outlive the node.
 * If no free slot exists, slot 0 of the last block is used.
 * NOTE(review): presumably AddBlock() is called right before that fallback;
 * the call is on a line not visible here.
 */
732 curl_slist *HeaderLists::Get(
const char *header) {
733 for (
unsigned i = 0; i < blocks_.size(); ++i) {
734 for (
unsigned j = 0; j < kBlockSize; ++j) {
735 if (!IsUsed(&(blocks_[i][j]))) {
736 blocks_[i][j].data =
const_cast<char *
>(header);
737 return &(blocks_[i][j]);
744 blocks_[blocks_.size()-1][0].data =
const_cast<char *
>(header);
745 return &(blocks_[blocks_.size()-1][0]);
749 void HeaderLists::Put(curl_slist *slist) {
/**
 * Grows the node pool by one block: an array of kBlockSize curl_slist nodes
 * is allocated with new[], every node is visited once, and the block is
 * appended to blocks_.
 * NOTE(review): the per-node initialisation (probably marking the node as
 * unused) is on a line not visible here.
 */
755 void HeaderLists::AddBlock() {
756 curl_slist *new_block =
new curl_slist[kBlockSize];
757 for (
unsigned i = 0; i < kBlockSize; ++i) {
760 blocks_.push_back(new_block);
/**
 * Builds a human readable description of the proxy including the time left
 * until host.deadline().  The remaining time is deadline minus time(NULL),
 * both truncated to int; a non-negative value is prefixed with "+".
 * Values of at least 3600 and of at least 60 (absolute) get their own
 * formatting branches.  NOTE(review): presumably hours and minutes; the
 * formatting lines are not visible.
 * The result is extended with " (<host name>, <expiry>)" or, in the other
 * branch, with " (:unresolved:, <expiry>)".
 */
767 string DownloadManager::ProxyInfo::Print() {
773 static_cast<int>(host.deadline()) - static_cast<int>(time(NULL));
774 string expinfo = (remaining >= 0) ?
"+" :
"";
775 if (abs(remaining) >= 3600) {
777 }
else if (abs(remaining) >= 60) {
783 result +=
" (" + host.name() +
", " + expinfo +
")";
785 result +=
" (:unresolved:, " + expinfo +
")";
/**
 * Provides an easy handle for a transfer.  With an empty idle pool a fresh
 * handle is created by curl_easy_init() and configured (CURLOPT_NOSIGNAL is
 * among the options set); otherwise the first idle handle is taken out of
 * pool_handles_idle_.  In both cases the handle is recorded in
 * pool_handles_inuse_.
 * NOTE(review): further options set on new handles and the return statement
 * are on lines not visible here.
 */
795 CURL *DownloadManager::AcquireCurlHandle() {
798 if (pool_handles_idle_->empty()) {
800 handle = curl_easy_init();
803 curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
808 handle = *(pool_handles_idle_->begin());
809 pool_handles_idle_->erase(pool_handles_idle_->begin());
812 pool_handles_inuse_->insert(handle);
/**
 * Gives back a handle obtained from AcquireCurlHandle().  The handle must be
 * registered in pool_handles_inuse_ (asserted).  If the idle pool already
 * holds more than pool_max_handles_ entries the handle is destroyed,
 * otherwise it is parked in pool_handles_idle_ for reuse.  Finally it is
 * removed from the in-use set.
 */
818 void DownloadManager::ReleaseCurlHandle(CURL *handle) {
819 set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
820 assert(elem != pool_handles_inuse_->end());
822 if (pool_handles_idle_->size() > pool_max_handles_) {
823 curl_easy_cleanup(*elem);
825 pool_handles_idle_->insert(*elem);
828 pool_handles_inuse_->erase(elem);
/**
 * Prepares info and the easy handle for a new transfer.
 *
 * Visible steps:
 *  - info->headers becomes a private copy of default_headers_
 *  - an optional byte range "<lower>-<upper>" is formatted into a local buffer
 *    and installed with CURLOPT_RANGE; otherwise the range is cleared
 *  - info is attached to the handle as CURLOPT_PRIVATE and as user data of the
 *    header and data callbacks; info->headers is set as CURLOPT_HTTPHEADER
 *  - either CURLOPT_NOBODY or CURLOPT_HTTPGET is selected
 *  - IPv4-only resolution and redirect following (at most 4 redirects) are
 *    enabled according to opt_ipv4_only_ and follow_redirects_
 * NOTE(review): the conditions choosing between the alternatives and further
 * initialisation of info are on lines not visible here.
 */
836 void DownloadManager::InitializeRequest(
JobInfo *info, CURL *handle) {
846 info->
headers = header_lists_->DuplicateList(default_headers_);
864 char byte_range_array[100];
865 const int64_t range_lower =
static_cast<int64_t
>(info->
range_offset);
866 const int64_t range_upper =
static_cast<int64_t
>(
// snprintf returns the length the full output would have had, so truncation
// shows up as any value >= the buffer size, not only as exactly 100.
868 if (snprintf(byte_range_array,
sizeof(byte_range_array),
869 "%" PRId64
"-%" PRId64,
870 range_lower, range_upper) >= static_cast<int>(sizeof(byte_range_array)))
874 curl_easy_setopt(handle, CURLOPT_RANGE, byte_range_array);
876 curl_easy_setopt(handle, CURLOPT_RANGE, NULL);
// The callbacks and the multi loop find their JobInfo through these pointers.
880 curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
881 curl_easy_setopt(handle, CURLOPT_WRITEHEADER,
882 static_cast<void *>(info));
883 curl_easy_setopt(handle, CURLOPT_WRITEDATA, static_cast<void *>(info));
884 curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->
headers);
886 curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
888 curl_easy_setopt(handle, CURLOPT_HTTPGET, 1);
890 if (opt_ipv4_only_) {
891 curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
893 if (follow_redirects_) {
894 curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1);
895 curl_easy_setopt(handle, CURLOPT_MAXREDIRS, 4);
/**
 * Applies all URL related settings to info->curl_handle: proxy, timeouts,
 * DNS servers, TLS options and the final, escaped URL.
 *
 * Before choosing proxy and host, three timers are checked.  Each one fires
 * when time(NULL) is past its timestamp plus the configured reset interval:
 *  - backup proxy group    -> back to group 0, rebalance
 *  - failover proxies      -> rebalance the load-balanced proxies
 *  - backup host           -> back to host 0 of the host chain
 * NOTE(review): locking, the lookup of proxy/phost and several branch
 * conditions are on lines not visible here.
 */
904 void DownloadManager::SetUrlOptions(
JobInfo *info) {
// Timer 1: return from a backup proxy group to the primary group.
910 if (opt_timestamp_backup_proxies_ > 0) {
911 const time_t now = time(NULL);
912 if (static_cast<int64_t>(now) >
913 static_cast<int64_t>(opt_timestamp_backup_proxies_ +
914 opt_proxy_groups_reset_after_))
916 opt_proxy_groups_current_ = 0;
917 opt_timestamp_backup_proxies_ = 0;
918 RebalanceProxiesUnlocked(
"reset proxy group");
// Timer 2: undo fail-over inside the load-balanced group.
922 if (opt_timestamp_failover_proxies_ > 0) {
923 const time_t now = time(NULL);
924 if (static_cast<int64_t>(now) >
925 static_cast<int64_t>(opt_timestamp_failover_proxies_ +
926 opt_proxy_groups_reset_after_))
928 RebalanceProxiesUnlocked(
"reset load-balanced proxies");
// Timer 3: return from a backup host to the first host of the chain.
932 if (opt_timestamp_backup_host_ > 0) {
933 const time_t now = time(NULL);
934 if (static_cast<int64_t>(now) >
935 static_cast<int64_t>(opt_timestamp_backup_host_ +
936 opt_host_reset_after_))
939 "switching host from %s to %s (reset host)",
940 (*opt_host_chain_)[opt_host_chain_current_].c_str(),
941 (*opt_host_chain_)[0].c_str());
942 opt_host_chain_current_ = 0;
943 opt_timestamp_backup_host_ = 0;
// No proxy or the literal "DIRECT": connect directly; an empty CURLOPT_PROXY
// string is passed to curl.
948 if (!proxy || (proxy->
url ==
"DIRECT")) {
949 info->
proxy =
"DIRECT";
950 curl_easy_setopt(info->
curl_handle, CURLOPT_PROXY,
"");
955 std::string purl = proxy->
url;
957 const bool changed = ValidateProxyIpsUnlocked(purl, phost);
// NOTE(review): "0.0.0.0" is presumably installed when the proxy host has no
// usable address so that the transfer fails fast; the condition is elided.
966 curl_easy_setopt(info->
curl_handle, CURLOPT_PROXY,
"0.0.0.0");
// Stall detection: separate time limits for proxied and direct connections.
969 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, opt_low_speed_limit_);
970 if (info->
proxy !=
"DIRECT") {
971 curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_proxy_);
972 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_proxy_);
974 curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_direct_);
975 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_direct_);
977 if (!opt_dns_server_.empty())
978 curl_easy_setopt(curl_handle, CURLOPT_DNS_SERVERS, opt_dns_server_.c_str());
// The current host of the chain is prepended to the job's URL.
981 url_prefix = (*opt_host_chain_)[opt_host_chain_current_];
985 string url = url_prefix + *(info->
url);
987 curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 1L);
// https URLs additionally get the certificate path and, for jobs that carry
// a pid, the credentials attachment.
988 if (url.substr(0, 5) ==
"https") {
989 bool rvb = ssl_certificate_store_.ApplySslCertificatePath(curl_handle);
992 "Failed to set SSL certificate path %s",
993 ssl_certificate_store_.GetCaPath().c_str());
995 if (info->
pid != -1) {
996 if (credentials_attachment_ == NULL) {
998 "uses secure downloads but no credentials attachment set");
1000 bool retval = credentials_attachment_->ConfigureCurlHandle(
1011 signal(SIGPIPE, SIG_IGN);
// Substitute the "@proxy@" placeholder in the URL.  Precedence: forced
// template, then the direct template for DIRECT connections or when already
// on a fallback proxy group, else the host name of the chosen proxy; an empty
// replacement falls back to the direct template.
1014 if (url.find(
"@proxy@") != string::npos) {
1022 if (proxy_template_forced_ !=
"") {
1023 replacement = proxy_template_forced_;
1024 }
else if (info->
proxy ==
"DIRECT") {
1025 replacement = proxy_template_direct_;
1027 if (opt_proxy_groups_current_ >= opt_proxy_groups_fallback_) {
1030 info->
proxy =
"DIRECT";
1031 curl_easy_setopt(info->
curl_handle, CURLOPT_PROXY,
"");
1032 replacement = proxy_template_direct_;
1034 replacement = ChooseProxyUnlocked(info->
expected_hash)->host.name();
1037 replacement = (replacement ==
"") ? proxy_template_direct_ : replacement;
1039 replacement.c_str());
1040 url =
ReplaceAll(url,
"@proxy@", replacement);
// curl receives the URL in escaped form.
1051 curl_easy_setopt(curl_handle, CURLOPT_URL,
EscapeUrl(url).c_str());
/**
 * Re-checks the DNS information of a proxy host and adjusts the current
 * proxy group if it changed.  SetUrlOptions() stores the result in a flag
 * named changed.
 *
 * Visible behaviour:
 *  - a failed resolution is logged; update_only starts as true and is set to
 *    false on one path
 *  - in one branch the host object of all entries of the current group with
 *    the same host id is replaced by new_host
 *  - in the other branch all entries with that host id are erased from the
 *    group, one ProxyInfo per address in best_addresses is appended,
 *    opt_num_proxies_ is corrected accordingly and the proxies are rebalanced
 * NOTE(review): the parameter list, the resolver call and the conditions
 * selecting the branches are on lines not visible here.  The suffix
 * "Unlocked" suggests the caller holds the options lock -- confirm.
 */
1064 bool DownloadManager::ValidateProxyIpsUnlocked(
1071 host.
name().c_str());
1073 unsigned group_idx = opt_proxy_groups_current_;
1076 bool update_only =
true;
1080 "failed to resolve IP addresses for %s (%d - %s)",
1085 update_only =
false;
// Same addresses: only swap in the refreshed host object.
1089 for (
unsigned i = 0; i < (*opt_proxy_groups_)[group_idx].size(); ++i) {
1090 if ((*opt_proxy_groups_)[group_idx][i].host.id() == host.
id())
1091 (*opt_proxy_groups_)[group_idx][i].host = new_host;
1100 "DNS entries for proxy %s changed, adjusting", host.
name().c_str());
1101 vector<ProxyInfo> *group = current_proxy_group();
// The group size is subtracted up front; the matching addition for the new
// entries is below.  NOTE(review): the re-addition of the remaining old
// entries is presumably on an elided line.
1102 opt_num_proxies_ -= group->size();
// No ++i in the loop header: erase() shifts the next element into slot i.
1103 for (
unsigned i = 0; i < group->size(); ) {
1104 if ((*group)[i].host.id() == host.
id()) {
1105 group->erase(group->begin() + i);
1110 vector<ProxyInfo> new_infos;
1112 set<string>::const_iterator iter_ips = best_addresses.begin();
1113 for (; iter_ips != best_addresses.end(); ++iter_ips) {
1115 new_infos.push_back(
ProxyInfo(new_host, url_ip));
1117 group->insert(group->end(), new_infos.begin(), new_infos.end());
1118 opt_num_proxies_ += new_infos.size();
1120 RebalanceProxiesUnlocked(
"DNS change");
/**
 * Adds the traffic of a finished transfer to the statistics counters.  The
 * value of CURLINFO_SIZE_DOWNLOAD is queried (the call must succeed), added
 * to sum and sum is added to counters_->sz_transferred_bytes.
 * NOTE(review): further curl_easy_getinfo() queries contributing to sum may
 * be on lines not visible here.
 */
1128 void DownloadManager::UpdateStatistics(CURL *handle) {
1133 retval = curl_easy_getinfo(handle, CURLINFO_SIZE_DOWNLOAD, &val);
1134 assert(retval == CURLE_OK);
1135 sum +=
static_cast<int64_t
>(val);
1139 perf::Xadd(counters_->sz_transferred_bytes, sum);
/**
 * Decides whether the same URL may be tried again for info.
 * VerifyAndFinalize() stores the result as same_url_retry.  The retry limit
 * is read from opt_max_retries_ into a local copy.
 * NOTE(review): the actual decision is on lines not visible here.
 */
1146 bool DownloadManager::CanRetry(
const JobInfo *info) {
1148 unsigned max_retries = opt_max_retries_;
1163 unsigned backoff_init_ms = 0;
1164 unsigned backoff_max_ms = 0;
1167 backoff_init_ms = opt_backoff_init_ms_;
1168 backoff_max_ms = opt_backoff_max_ms_;
1174 info->
backoff_ms = prng_.Next(backoff_init_ms + 1);
1187 header_lists_->AppendHeader(info->
headers,
"Pragma: no-cache");
1188 header_lists_->AppendHeader(info->
headers,
"Cache-Control: no-cache");
/**
 * Removes the cache-bypassing request headers "Pragma: no-cache" and
 * "Cache-Control: no-cache" from info->headers.
 * NOTE(review): additional statements (e.g. resetting a no-cache flag or
 * re-applying the header list to the handle) may be on elided lines.
 */
1198 void DownloadManager::SetRegularCache(
JobInfo *info) {
1201 header_lists_->CutHeader(
"Pragma: no-cache", &(info->
headers));
1202 header_lists_->CutHeader(
"Cache-Control: no-cache", &(info->
headers));
/**
 * Detaches credentials from the easy handle of info by calling
 * credentials_attachment_->ReleaseCurlHandle().  A credentials attachment
 * must be set at that point (asserted).
 * NOTE(review): the guard deciding whether info carries credentials at all
 * and the remaining call arguments are on lines not visible here.
 */
1211 void DownloadManager::ReleaseCredential(
JobInfo *info) {
1213 assert(credentials_attachment_ != NULL);
1214 credentials_attachment_->ReleaseCurlHandle(info->
curl_handle,
/**
 * Evaluates the outcome of a transfer and either prepares another attempt or
 * finishes the job.  Both callers in this file (the synchronous fetch loop
 * and the download thread) treat a true result as "run the transfer again".
 *
 * Visible structure:
 *  1. the curl result code is classified in a switch (success with hash and
 *     decompression checks, protocol/URL/DNS/timeout/connection/TLS/write
 *     errors)
 *  2. CanRetry() decides on retrying the same URL; otherwise proxy or host
 *     fail-over is considered.  A host fail-over resets the proxy groups.
 *  3. on retry the regular cache headers are restored, proxy and/or host are
 *     switched, credentials are released and SetUrlOptions() is run again
 *  4. at label verify_and_finalize_stop credentials are released and the
 *     header list is returned to the pool
 * NOTE(review): the error codes assigned in the switch, the exact retry
 * conditions and all return statements are on lines not visible here.
 */
1227 bool DownloadManager::VerifyAndFinalize(
const int curl_error,
JobInfo *info) {
1229 "Verify downloaded url %s, proxy %s (curl error %d)",
1230 info->
url->c_str(), info->
proxy.c_str(), curl_error);
// Step 1: translate the curl result into the job's error state.
1234 switch (curl_error) {
1242 "hash verification of %s failed (expected %s, got %s)",
1264 "decompression (memory) of url %s failed",
1265 info->
url->c_str());
1273 case CURLE_UNSUPPORTED_PROTOCOL:
1276 case CURLE_URL_MALFORMAT:
1279 case CURLE_COULDNT_RESOLVE_PROXY:
1282 case CURLE_COULDNT_RESOLVE_HOST:
1285 case CURLE_OPERATION_TIMEDOUT:
1289 case CURLE_PARTIAL_FILE:
1290 case CURLE_GOT_NOTHING:
1291 case CURLE_RECV_ERROR:
1295 case CURLE_FILE_COULDNT_READ_FILE:
1296 case CURLE_COULDNT_CONNECT:
// Connection level failures are attributed differently depending on whether
// a proxy was involved.
1297 if (info->
proxy !=
"DIRECT") {
1304 case CURLE_TOO_MANY_REDIRECTS:
1307 case CURLE_SSL_CACERT_BADFILE:
1309 "Failed to load certificate bundle. "
1310 "X509_CERT_BUNDLE might point to the wrong location.");
1315 case CURLE_PEER_FAILED_VERIFICATION:
1317 "invalid SSL certificate of remote host. "
1318 "X509_CERT_DIR and/or X509_CERT_BUNDLE might point to the wrong "
1322 case CURLE_ABORTED_BY_CALLBACK:
1323 case CURLE_WRITE_ERROR:
1328 "trying to fetch %s", curl_error, info->
url->c_str());
// Step 2: decide whether and how to try again.
1333 std::vector<std::string> *host_chain = opt_host_chain_;
1336 bool try_again =
false;
1337 bool same_url_retry = CanRetry(info);
1346 "data corruption with no-cache header, try another host");
1351 if ( same_url_retry || (
1361 if ( same_url_retry || (
// A host fail-over starts over with the primary proxy group.
1376 if (opt_proxy_groups_) {
1377 if ((opt_proxy_groups_current_ > 0) ||
1378 (opt_proxy_groups_current_burned_ > 0))
1380 opt_proxy_groups_current_ = 0;
1381 opt_timestamp_backup_proxies_ = 0;
1382 RebalanceProxiesUnlocked(
"reset proxies for host failover");
1399 "same url: %d, error code %d", same_url_retry, info->
error_code);
1414 goto verify_and_finalize_stop;
1421 goto verify_and_finalize_stop;
// Step 3: prepare the next attempt.
1428 SetRegularCache(info);
1431 bool switch_proxy =
false;
1432 bool switch_host =
false;
1439 switch_proxy =
true;
1448 if (same_url_retry) {
1451 switch_proxy =
true;
1454 if (same_url_retry) {
// Credentials are detached before the handle is re-configured.
1465 ReleaseCredential(info);
1468 SetUrlOptions(info);
1471 ReleaseCredential(info);
1474 SetUrlOptions(info);
// Step 4: common exit for finished jobs.
1480 verify_and_finalize_stop:
1482 ReleaseCredential(info);
1497 header_lists_->PutList(info->
headers);
/**
 * Puts the object into an inert state: pointers are NULL, pipe file
 * descriptors are -1, counters, timeouts and timestamps are 0 and boolean
 * options are false.  The two mutexes lock_options_ and
 * lock_synchronous_mode_ are allocated with smalloc() and initialised with
 * pthread_mutex_init(); the destructor destroys and frees them.
 * NOTE(review): a few member initialisations are on lines not visible here.
 */
1505 DownloadManager::DownloadManager() {
1506 pool_handles_idle_ = NULL;
1507 pool_handles_inuse_ = NULL;
1508 pool_max_handles_ = 0;
1510 default_headers_ = NULL;
1512 atomic_init32(&multi_threaded_);
1513 pipe_terminate_[0] = pipe_terminate_[1] = -1;
1515 pipe_jobs_[0] = pipe_jobs_[1] = -1;
1517 watch_fds_size_ = 0;
1518 watch_fds_inuse_ = 0;
// Heap allocated mutexes.
1522 reinterpret_cast<pthread_mutex_t *
>(smalloc(
sizeof(pthread_mutex_t)));
1523 int retval = pthread_mutex_init(lock_options_, NULL);
1525 lock_synchronous_mode_ =
1526 reinterpret_cast<pthread_mutex_t *
>(smalloc(
sizeof(pthread_mutex_t)));
1527 retval = pthread_mutex_init(lock_synchronous_mode_, NULL);
// Options guarded by lock_options_ elsewhere in the class.
1530 opt_dns_server_ =
"";
1532 opt_timeout_proxy_ = 0;
1533 opt_timeout_direct_ = 0;
1534 opt_low_speed_limit_ = 0;
1535 opt_host_chain_ = NULL;
1536 opt_host_chain_rtt_ = NULL;
1537 opt_host_chain_current_ = 0;
1538 opt_proxy_groups_ = NULL;
1539 opt_proxy_groups_current_ = 0;
1540 opt_proxy_groups_current_burned_ = 0;
1541 opt_num_proxies_ = 0;
1542 opt_proxy_shard_ =
false;
1543 opt_max_retries_ = 0;
1544 opt_backoff_init_ms_ = 0;
1545 opt_backoff_max_ms_ = 0;
1546 enable_info_header_ =
false;
1547 opt_ipv4_only_ =
false;
1548 follow_redirects_ =
false;
// A timestamp of 0 means "timer not armed" (see the > 0 checks in
// SetUrlOptions()).
1552 opt_timestamp_backup_proxies_ = 0;
1553 opt_timestamp_failover_proxies_ = 0;
1554 opt_proxy_groups_reset_after_ = 0;
1555 opt_timestamp_backup_host_ = 0;
1556 opt_host_reset_after_ = 0;
1558 credentials_attachment_ = NULL;
/**
 * Destroys the two mutexes created in the constructor and frees their
 * memory.  No other resource is released on the visible lines.
 */
1564 DownloadManager::~DownloadManager() {
1565 pthread_mutex_destroy(lock_options_);
1566 pthread_mutex_destroy(lock_synchronous_mode_);
1567 free(lock_options_);
1568 free(lock_synchronous_mode_);
/**
 * Builds the default request headers.  The User-Agent string starts with
 * "User-Agent: cvmfs ", followed by "libcvmfs " or "Fuse " depending on
 * CVMFS_LIBCVMFS, followed by VERSION; it is extended when the environment
 * variable CERNVM_UUID is set.  A heap copy (strdup) is kept in user_agent_.
 * default_headers_ then consists of "Connection: Keep-Alive", "Pragma:" and
 * the User-Agent header.
 * NOTE(review): the creation of header_lists_ and the text appended for
 * CERNVM_UUID are on lines not visible here.
 */
1571 void DownloadManager::InitHeaders() {
1573 string cernvm_id =
"User-Agent: cvmfs ";
1574 #ifdef CVMFS_LIBCVMFS
1575 cernvm_id +=
"libcvmfs ";
1577 cernvm_id +=
"Fuse ";
1579 cernvm_id += string(VERSION);
1580 if (getenv(
"CERNVM_UUID") != NULL) {
// The header nodes store pointers only, so the string must stay alive.
1584 user_agent_ = strdup(cernvm_id.c_str());
1588 default_headers_ = header_lists_->GetList(
"Connection: Keep-Alive");
1589 header_lists_->AppendHeader(default_headers_,
"Pragma:");
1590 header_lists_->AppendHeader(default_headers_, user_agent_);
/**
 * Deletes the header list pool and clears the pointers to it and to the
 * default header list, which lives inside that pool.
 */
1594 void DownloadManager::FiniHeaders() {
1595 delete header_lists_;
1596 header_lists_ = NULL;
1597 default_headers_ = NULL;
1604 atomic_init32(&multi_threaded_);
1605 int retval = curl_global_init(CURL_GLOBAL_ALL);
1606 assert(retval == CURLE_OK);
1607 pool_handles_idle_ =
new set<CURL *>;
1608 pool_handles_inuse_ =
new set<CURL *>;
1609 pool_max_handles_ = max_pool_handles;
1610 watch_fds_max_ = 4*pool_max_handles_;
1612 opt_timeout_proxy_ = 5;
1613 opt_timeout_direct_ = 10;
1614 opt_low_speed_limit_ = 1024;
1615 opt_proxy_groups_current_ = 0;
1616 opt_proxy_groups_current_burned_ = 0;
1617 opt_num_proxies_ = 0;
1618 opt_proxy_shard_ =
false;
1619 opt_host_chain_current_ = 0;
1622 counters_ =
new Counters(statistics);
1627 curl_multi_ = curl_multi_init();
1628 assert(curl_multi_ != NULL);
1629 curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION, CallbackCurlSocket);
1630 curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1631 static_cast<void *>(
this));
1632 curl_multi_setopt(curl_multi_, CURLMOPT_MAXCONNECTS, watch_fds_max_);
1633 curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1636 prng_.InitLocaltime();
1639 if ((getenv(
"CVMFS_IPV4_ONLY") != NULL) &&
1640 (strlen(getenv(
"CVMFS_IPV4_ONLY")) > 0))
1642 opt_ipv4_only_ =
true;
1645 kDnsDefaultRetries, kDnsDefaultTimeoutMs);
1651 if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1655 pthread_join(thread_download_, NULL);
1657 close(pipe_terminate_[1]);
1658 close(pipe_terminate_[0]);
1659 close(pipe_jobs_[1]);
1660 close(pipe_jobs_[0]);
1663 for (set<CURL *>::iterator i = pool_handles_idle_->begin(),
1664 iEnd = pool_handles_idle_->end(); i != iEnd; ++i)
1666 curl_easy_cleanup(*i);
1668 delete pool_handles_idle_;
1669 delete pool_handles_inuse_;
1670 curl_multi_cleanup(curl_multi_);
1671 pool_handles_idle_ = NULL;
1672 pool_handles_inuse_ = NULL;
1683 delete opt_host_chain_;
1684 delete opt_host_chain_rtt_;
1685 opt_proxy_map_.clear();
1686 delete opt_proxy_groups_;
1687 opt_host_chain_ = NULL;
1688 opt_host_chain_rtt_ = NULL;
1689 opt_proxy_groups_ = NULL;
1691 curl_global_cleanup();
1706 int retval = pthread_create(&thread_download_, NULL, MainDownload,
1707 static_cast<void *>(
this));
1710 atomic_inc32(&multi_threaded_);
1735 if (enable_info_header_ && info->
extra_info) {
1736 const char *header_name =
"cvmfs-info: ";
1737 const size_t header_name_len = strlen(header_name);
1738 const unsigned header_size = 1 + header_name_len +
1740 info->
info_header =
static_cast<char *
>(alloca(header_size));
1741 memcpy(info->
info_header, header_name, header_name_len);
1743 header_size - header_name_len);
1747 if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1755 WritePipe(pipe_jobs_[1], &info,
sizeof(info));
1760 CURL *handle = AcquireCurlHandle();
1761 InitializeRequest(info, handle);
1762 SetUrlOptions(info);
1766 retval = curl_easy_perform(handle);
1769 if (curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &elapsed) == CURLE_OK)
1772 static_cast<int64_t>(elapsed * 1000));
1774 }
while (VerifyAndFinalize(retval, info));
1803 credentials_attachment_ = ca;
/**
 * Returns a copy of the configured DNS server string (empty if none is set).
 */
1809 std::string DownloadManager::GetDnsServer()
const {
1810 return opt_dns_server_;
/**
 * Sets the DNS server used for name resolution.  An empty address is
 * ignored.  Otherwise the address is stored in opt_dns_server_ (used for
 * CURLOPT_DNS_SERVERS in SetUrlOptions()) and passed as the only entry to
 * resolver_->SetResolvers().
 * NOTE(review): locking and the handling of retval are not visible here.
 */
1817 void DownloadManager::SetDnsServer(
const string &address) {
1818 if (!address.empty()) {
1820 opt_dns_server_ = address;
1821 assert(!opt_dns_server_.empty());
1823 vector<string> servers;
1824 servers.push_back(address);
1825 bool retval = resolver_->SetResolvers(servers);
/**
 * Changes retries and timeout of the DNS resolver.  The current resolver
 * settings are compared with the requested ones first.
 * NOTE(review): presumably the method returns early when both already match
 * and re-creates the resolver otherwise; those lines are not visible.
 */
1835 void DownloadManager::SetDnsParameters(
1837 const unsigned timeout_ms)
1840 if ((resolver_->retries() ==
retries) &&
1841 (resolver_->timeout_ms() == timeout_ms))
/**
 * Forwards the lower and upper TTL bounds (in seconds, going by the
 * parameter names) to the resolver.
 */
1853 void DownloadManager::SetDnsTtlLimits(
1854 const unsigned min_seconds,
1855 const unsigned max_seconds)
1858 resolver_->set_min_ttl(min_seconds);
1859 resolver_->set_max_ttl(max_seconds);
1865 opt_ip_preference_ = preference;
/**
 * Stores the timeouts for proxied and for direct connections.
 * SetUrlOptions() applies them as CURLOPT_CONNECTTIMEOUT and
 * CURLOPT_LOW_SPEED_TIME.
 */
1874 void DownloadManager::SetTimeout(
const unsigned seconds_proxy,
1875 const unsigned seconds_direct)
1878 opt_timeout_proxy_ = seconds_proxy;
1879 opt_timeout_direct_ = seconds_direct;
/**
 * Stores the transfer rate threshold that SetUrlOptions() passes to curl as
 * CURLOPT_LOW_SPEED_LIMIT.
 */
1888 void DownloadManager::SetLowSpeedLimit(
const unsigned low_speed_limit) {
1890 opt_low_speed_limit_ = low_speed_limit;
/**
 * Writes the configured proxy and direct timeouts to the given locations.
 * Both pointers are dereferenced unconditionally and must not be NULL.
 */
1897 void DownloadManager::GetTimeout(
unsigned *seconds_proxy,
1898 unsigned *seconds_direct)
1901 *seconds_proxy = opt_timeout_proxy_;
1902 *seconds_direct = opt_timeout_direct_;
1910 void DownloadManager::SetHostChain(
const string &host_list) {
/**
 * Replaces the host chain.  The previous chain and its RTT vector are
 * deleted, the backup-host timer is disarmed and the current host index is
 * reset to 0.  An empty host_list leaves both pointers NULL; otherwise the
 * list is copied and every host starts with the RTT marker kProbeUnprobed.
 * NOTE(review): locking is on lines not visible here.
 */
1915 void DownloadManager::SetHostChain(
const std::vector<std::string> &host_list) {
1917 opt_timestamp_backup_host_ = 0;
1918 delete opt_host_chain_;
1919 delete opt_host_chain_rtt_;
1920 opt_host_chain_current_ = 0;
1922 if (host_list.empty()) {
1923 opt_host_chain_ = NULL;
1924 opt_host_chain_rtt_ = NULL;
1928 opt_host_chain_ =
new vector<string>(host_list);
1929 opt_host_chain_rtt_ =
1930 new vector<int>(opt_host_chain_->size(), kProbeUnprobed);
/**
 * Copies the host chain, the RTT vector and the index of the current host to
 * the caller.  Every output pointer may be NULL, in which case that piece of
 * information is skipped.  The copies are only made if a host chain is set.
 * NOTE(review): what is reported without a host chain is not visible here.
 */
1941 void DownloadManager::GetHostInfo(vector<string> *host_chain, vector<int> *rtt,
1942 unsigned *current_host)
1945 if (opt_host_chain_) {
1946 if (current_host) {*current_host = opt_host_chain_current_;}
1947 if (host_chain) {*host_chain = *opt_host_chain_;}
1948 if (rtt) {*rtt = *opt_host_chain_rtt_;}
/**
 * Marks the proxy used by info as failed and selects another one.
 *
 * The current group is treated as two regions: the first
 * (size - opt_proxy_groups_current_burned_) entries are still usable, the
 * tail holds "burned" proxies.  Every usable entry whose URL equals
 * info->proxy increases the burned counter; the expression on the following
 * line refers to the element at the new boundary.
 * NOTE(review): presumably the failed entry is swapped with that boundary
 * element -- the call itself is elided.
 * Once all proxies of a group are burned, the counter is reset and, if there
 * is more than one group, the next group becomes current (wrapping around).
 * The backup and fail-over timestamps are armed or cleared so that
 * SetUrlOptions() can return to the preferred proxies later.
 * Does nothing useful without proxy groups.
 * NOTE(review): locking, the early return and the use of failed are on lines
 * not visible here.
 */
1961 void DownloadManager::SwitchProxy(
JobInfo *info) {
1964 if (!opt_proxy_groups_) {
1969 vector<ProxyInfo> *group = current_proxy_group();
1970 const unsigned group_size = group->size();
1971 unsigned failed = 0;
1972 for (
unsigned i = 0; i < group_size - opt_proxy_groups_current_burned_; ++i) {
1973 if (info && (info->
proxy == (*group)[i].url)) {
1975 opt_proxy_groups_current_burned_++;
1977 (*group)[group_size - opt_proxy_groups_current_burned_]);
// Whole group exhausted: move on to the next group.
1989 if (opt_proxy_groups_current_burned_ == group->size()) {
1990 opt_proxy_groups_current_burned_ = 0;
1991 if (opt_proxy_groups_->size() > 1) {
1992 opt_proxy_groups_current_ = (opt_proxy_groups_current_ + 1) %
1993 opt_proxy_groups_->size();
// Remember since when a backup group is in use (only the first time).
1995 if (opt_proxy_groups_reset_after_ > 0) {
1996 if (opt_proxy_groups_current_ > 0) {
1997 if (opt_timestamp_backup_proxies_ == 0)
1998 opt_timestamp_backup_proxies_ = time(NULL);
2002 opt_timestamp_backup_proxies_ = 0;
2006 opt_timestamp_failover_proxies_ = 0;
// Fail-over inside the group: remember since when.
2011 if (opt_proxy_groups_reset_after_ > 0) {
2012 if (opt_timestamp_failover_proxies_ == 0)
2013 opt_timestamp_failover_proxies_ = time(NULL);
2017 UpdateProxiesUnlocked(
"failed proxy");
2019 current_proxy_group()->
size() - opt_proxy_groups_current_burned_);
2031 if (!opt_host_chain_ || (opt_host_chain_->size() == 1)) {
2037 "don't switch host, "
2038 "last used host: %s, current host: %s",
2040 (*opt_host_chain_)[opt_host_chain_current_].c_str());
2044 string reason =
"manually triggered";
2049 string old_host = (*opt_host_chain_)[opt_host_chain_current_];
2050 opt_host_chain_current_ =
2051 (opt_host_chain_current_ + 1) % opt_host_chain_->size();
2054 "switching host from %s to %s (%s)", old_host.c_str(),
2055 (*opt_host_chain_)[opt_host_chain_current_].c_str(),
2059 if (opt_host_reset_after_ > 0) {
2060 if (opt_host_chain_current_ != 0) {
2061 if (opt_timestamp_backup_host_ == 0)
2062 opt_timestamp_backup_host_ = time(NULL);
2064 opt_timestamp_backup_host_ = 0;
2069 void DownloadManager::SwitchHost() {
/**
 * Measures the hosts of the host chain and installs the result as the new
 * host chain.
 *
 * A snapshot of the chain is taken with GetHostInfo().  In up to two rounds
 * the file "/.cvmfspublished" is requested from every host while the wall
 * clock time is taken with gettimeofday().  A host gets its measured value
 * or INT_MAX; INT_MAX is converted to kProbeDown afterwards.  Finally the
 * stored chain and RTT vector are replaced by the local copies and the first
 * host becomes current.
 * NOTE(review): the fetch call, the RTT computation, the reordering of the
 * hosts and the locking around the final update are on lines not visible
 * here.
 */
2080 void DownloadManager::ProbeHosts() {
2081 vector<string> host_chain;
2082 vector<int> host_rtt;
2083 unsigned current_host;
// Fixed: the third argument had been mangled into an invalid token; it is the
// address of the local current_host declared above.
2085 GetHostInfo(&host_chain, &host_rtt, &current_host);
// info refers to url by pointer, so assigning url below retargets the job.
2090 JobInfo info(&url,
false,
false, NULL);
2091 for (retries = 0; retries < 2; ++
retries) {
2092 for (i = 0; i < host_chain.size(); ++i) {
2093 url = host_chain[i] +
"/.cvmfspublished";
2095 struct timeval tv_start, tv_end;
2096 gettimeofday(&tv_start, NULL);
2098 gettimeofday(&tv_end, NULL);
2102 host_rtt[i] =
static_cast<int>(
2105 url.c_str(), host_rtt[i]);
// INT_MAX is the temporary marker for a failed probe.
2109 host_rtt[i] = INT_MAX;
2115 for (i = 0; i < host_chain.size(); ++i) {
2116 if (host_rtt[i] == INT_MAX) host_rtt[i] = kProbeDown;
2120 delete opt_host_chain_;
2121 delete opt_host_chain_rtt_;
2122 opt_host_chain_ =
new vector<string>(host_chain);
2123 opt_host_chain_rtt_ =
new vector<int>(host_rtt);
2124 opt_host_chain_current_ = 0;
/**
 * Orders servers by geographic proximity as reported by the Geo-API of the
 * configured hosts.
 *
 * @param servers       list to sort; replaced by the sorted list on success
 * @param output_order  receives the permutation (index into the original
 *                      list for every position of the sorted list)
 * @return false if servers is NULL.  NOTE(review): the remaining return
 *         statements are on lines not visible here.
 *
 * A single server needs no request.  Otherwise the DNS names of the servers
 * are joined with "," and up to three hosts of the (shuffled) host chain are
 * asked at "<host>/api/v1.0/geo/@proxy@/<names>".  The reply is checked with
 * ValidateGeoReply(); failed or invalid replies are logged.
 * NOTE(review): the conditions under which the host chain is shuffled and
 * the loop exit are on lines not visible here.
 */
2127 bool DownloadManager::GeoSortServers(std::vector<std::string> *servers,
2128 std::vector<uint64_t> *output_order) {
2129 if (!servers) {
return false;}
2130 if (servers->size() == 1) {
2132 output_order->clear();
2133 output_order->push_back(0);
2138 std::vector<std::string> host_chain;
2139 GetHostInfo(&host_chain, NULL, NULL);
// The Geo-API is queried with host names; entries for which no host name
// could be extracted are passed on unchanged.
2141 std::vector<std::string> server_dns_names;
2142 server_dns_names.reserve(servers->size());
2143 for (
unsigned i = 0; i < servers->size(); ++i) {
2145 server_dns_names.push_back(host.empty() ? (*servers)[i] : host);
2147 std::string host_list =
JoinStrings(server_dns_names,
",");
2149 vector<string> host_chain_shuffled;
2154 host_chain_shuffled =
Shuffle(host_chain, &prng_);
2157 bool success =
false;
// At most three hosts are tried.
2158 unsigned max_attempts = std::min(host_chain_shuffled.size(), size_t(3));
// Pre-sized because ValidateGeoReply() writes by index.
2159 vector<uint64_t> geo_order(servers->size());
2160 for (
unsigned i = 0; i < max_attempts; ++i) {
2161 string url = host_chain_shuffled[i] +
"/api/v1.0/geo/@proxy@/" + host_list;
2163 "requesting ordered server list from %s", url.c_str());
2164 JobInfo info(&url,
false,
false, NULL);
// The reply was downloaded into memory; it is copied and the buffer released.
2167 string order(info.destination_mem.data, info.destination_mem.size);
2168 free(info.destination_mem.data);
2169 bool retval = ValidateGeoReply(order, servers->size(), &geo_order);
2172 "retrieved invalid GeoAPI reply from %s [%s]",
2173 url.c_str(), order.c_str());
2176 "geographic order of servers retrieved from %s",
2184 "GeoAPI request %s failed with error %d [%s]",
2190 "failed to retrieve geographic order from stratum 1 servers");
// NOTE(review): after this swap geo_order holds the previous content of
// *output_order, yet it is used for the reordering below.  The guard around
// the swap is elided -- verify this in the full source.
2195 output_order->swap(geo_order);
2197 std::vector<std::string> sorted_servers;
2198 sorted_servers.reserve(geo_order.size());
2199 for (
unsigned i = 0; i < geo_order.size(); ++i) {
2200 uint64_t orderval = geo_order[i];
2201 sorted_servers.push_back((*servers)[orderval]);
2203 servers->swap(sorted_servers);
/**
 * Reorders the host chain and the fallback proxy groups according to the
 * Geo-API.
 *
 * Host names and the first proxy of every fallback group are collected in
 * one list (optionally separated by the marker "+PXYSEP+") and sorted with
 * GeoSortServers().  The resulting permutation is split again: indices below
 * last_geo_host refer to hosts, indices from first_geo_fallback on refer to
 * fallback proxy groups.  Non-fallback proxy groups keep their position.
 * Afterwards the RTT vector is filled with kProbeGeo and host 0 is current.
 * NOTE(review): return statements, locking and the handling of a failed
 * GeoSortServers() call are on lines not visible here.
 */
2216 bool DownloadManager::ProbeGeo() {
2217 vector<string> host_chain;
2218 vector<int> host_rtt;
2219 unsigned current_host;
2220 vector< vector<ProxyInfo> > proxy_chain;
2221 unsigned fallback_group;
// Fixed: the third argument had been mangled into an invalid token; it is the
// address of the local current_host declared above.
2223 GetHostInfo(&host_chain, &host_rtt, &current_host);
2224 GetProxyInfo(&proxy_chain, NULL, &fallback_group);
// Nothing to sort with fewer than two hosts and fewer than two fallback groups.
2225 if ((host_chain.size() < 2) && ((proxy_chain.size() - fallback_group) < 2))
2228 vector<string> host_names;
2229 for (
unsigned i = 0; i < host_chain.size(); ++i)
2231 SortTeam(&host_names, &host_chain);
2232 unsigned last_geo_host = host_names.size();
2234 if ((fallback_group == 0) && (last_geo_host > 1)) {
2240 host_names.push_back(
"+PXYSEP+");
2244 unsigned first_geo_fallback = host_names.size();
2245 for (
unsigned i = fallback_group; i < proxy_chain.size(); ++i) {
2248 host_names.push_back(proxy_chain[i][0].host.name());
2251 std::vector<uint64_t> geo_order;
2252 bool success = GeoSortServers(&host_names, &geo_order);
2260 delete opt_host_chain_;
2261 opt_num_proxies_ = 0;
2262 opt_host_chain_ =
new vector<string>(host_chain.size());
2266 vector<vector<ProxyInfo> > *proxy_groups =
new vector<vector<ProxyInfo> >(
2267 opt_proxy_groups_fallback_ + proxy_chain.size() - fallback_group);
// Regular (non-fallback) groups are taken over unchanged.
2269 for (
unsigned i = 0; i < opt_proxy_groups_fallback_; ++i) {
2270 (*proxy_groups)[i] = (*opt_proxy_groups_)[i];
2271 opt_num_proxies_ += (*opt_proxy_groups_)[i].size();
2279 unsigned proxyi = opt_proxy_groups_fallback_;
2280 for (
unsigned i = 0; i < geo_order.size(); ++i) {
2281 uint64_t orderval = geo_order[i];
2282 if (orderval < static_cast<uint64_t>(last_geo_host)) {
2285 (*opt_host_chain_)[hosti++] = host_chain[orderval];
2286 }
else if (orderval >= static_cast<uint64_t>(first_geo_fallback)) {
2290 (*proxy_groups)[proxyi] =
2291 proxy_chain[fallback_group + orderval - first_geo_fallback];
2292 opt_num_proxies_ += (*proxy_groups)[proxyi].size();
2297 opt_proxy_map_.clear();
2298 delete opt_proxy_groups_;
2299 opt_proxy_groups_ = proxy_groups;
// Clamp the current group index to the new number of groups.  Valid indices
// end at size()-1, so an index equal to size() has to be corrected as well
// (the original comparison used '>' and let that case through).
2302 if (opt_proxy_groups_current_ >= opt_proxy_groups_->size()) {
2303 if (opt_proxy_groups_->size() == 0) {
2304 opt_proxy_groups_current_ = 0;
2306 opt_proxy_groups_current_ = opt_proxy_groups_->size() - 1;
2308 opt_proxy_groups_current_burned_ = 0;
2311 UpdateProxiesUnlocked(
"geosort");
2313 delete opt_host_chain_rtt_;
2314 opt_host_chain_rtt_ =
new vector<int>(host_chain.size(), kProbeGeo);
2315 opt_host_chain_current_ = 0;
/**
 * Checks a Geo-API reply and converts it into a zero-based permutation.
 *
 * The reply must be non-empty, pass the sanitizer, consist of non-empty
 * items, contain exactly expected_size values, contain no duplicates, and
 * have 1 as smallest and the number of values as largest value -- i.e. be a
 * permutation of 1..expected_size.  On success value i minus one is written
 * to (*reply_vals)[i].
 *
 * @param reply_vals  must already hold at least expected_size elements, the
 *                    method assigns by index and does not resize
 * NOTE(review): the sanitizer definition, the split call, the number
 * conversion and all return statements are on lines not visible here.
 */
2328 bool DownloadManager::ValidateGeoReply(
2329 const string &reply_order,
2330 const unsigned expected_size,
2331 vector<uint64_t> *reply_vals)
2333 if (reply_order.empty())
2336 if (!sanitizer.
IsValid(reply_order))
2339 vector<string> reply_strings =
2341 vector<uint64_t> tmp_vals;
2342 for (
unsigned i = 0; i < reply_strings.size(); ++i) {
2343 if (reply_strings[i].empty())
2347 if (tmp_vals.size() != expected_size)
// A set drops duplicates, so a smaller set means repeated values.
2351 set<uint64_t> coverage(tmp_vals.begin(), tmp_vals.end());
2352 if (coverage.size() != tmp_vals.size())
// Distinct values with minimum 1 and maximum N are exactly 1..N.
2354 if ((*coverage.begin() != 1) || (*coverage.rbegin() != coverage.size()))
2357 for (
unsigned i = 0; i < expected_size; ++i) {
2358 (*reply_vals)[i] = tmp_vals[i] - 1;
/**
 * Removes "DIRECT" and empty entries from a proxy list.  The list is split
 * into groups at ';' and every group into proxies at '|'.  The remaining
 * proxies of a group are joined with "|" again; groups that end up empty are
 * dropped.
 *
 * NOTE(review): the callers name the result contains_direct, so presumably
 * true is returned when at least one entry was removed; the assignment to
 * result, the handling of an empty proxy_list, the final join into
 * *cleaned_list and the return statement are on lines not visible here.
 */
2368 bool DownloadManager::StripDirect(
2369 const string &proxy_list,
2370 string *cleaned_list)
2373 if (proxy_list ==
"") {
2377 bool result =
false;
2379 vector<string> proxy_groups =
SplitString(proxy_list,
';');
2380 vector<string> cleaned_groups;
2381 for (
unsigned i = 0; i < proxy_groups.size(); ++i) {
2382 vector<string> group =
SplitString(proxy_groups[i],
'|');
2383 vector<string> cleaned;
2384 for (
unsigned j = 0; j < group.size(); ++j) {
2385 if ((group[j] ==
"DIRECT") || (group[j] ==
"")) {
2388 cleaned.push_back(group[j]);
2391 if (!cleaned.empty())
2392 cleaned_groups.push_back(
JoinStrings(cleaned,
"|"));
/**
 * Installs a new proxy configuration from a regular and a fallback proxy
 * list.
 *
 * Visible steps: reset the backup/failover proxy timestamps, store the new
 * list(s) selected by set_mode, strip DIRECT entries where fallback proxies
 * are involved, rebuild opt_proxy_groups_ (regular groups first, fallback
 * groups after them), resolve the proxy host names through resolver_, and
 * finally call UpdateProxiesUnlocked("set proxies") if at least one group
 * exists.
 *
 * \param proxy_list          regular proxies; groups are split on ';' and
 *                            the members of a group on '|'
 * \param fallback_proxy_list fallback proxies, same syntax
 *
 * NOTE(review): the rest of the parameter list (set_mode is used below),
 * the heads of the log calls, the else keywords, and several statements are
 * not visible in this listing; the comments below describe only the lines
 * shown.
 */
2408 void DownloadManager::SetProxyChain(
2409 const string &proxy_list,
2410 const string &fallback_proxy_list,
2415 opt_timestamp_backup_proxies_ = 0;
2416 opt_timestamp_failover_proxies_ = 0;
// Start from the currently configured lists; only the list(s) selected by
// set_mode are overwritten.
2417 string set_proxy_list = opt_proxy_list_;
2418 string set_proxy_fallback_list = opt_proxy_fallback_list_;
2419 bool contains_direct;
2420 if ((set_mode == kSetProxyFallback) || (set_mode == kSetProxyBoth)) {
2421 opt_proxy_fallback_list_ = fallback_proxy_list;
2423 if ((set_mode == kSetProxyRegular) || (set_mode == kSetProxyBoth)) {
2424 opt_proxy_list_ = proxy_list;
// The fallback list is always filtered through StripDirect.
2427 StripDirect(opt_proxy_fallback_list_, &set_proxy_fallback_list);
2428 if (contains_direct) {
2430 "fallback proxies do not support DIRECT, removing");
// With no fallback proxies left, the regular list is used unmodified.
2432 if (set_proxy_fallback_list ==
"") {
2433 set_proxy_list = opt_proxy_list_;
// This declaration shadows the outer contains_direct for the regular list.
2435 bool contains_direct = StripDirect(opt_proxy_list_, &set_proxy_list);
2436 if (contains_direct) {
2438 "skipping DIRECT proxy to use fallback proxy");
// Discard the previous proxy state before building the new one.
2445 opt_proxy_map_.clear();
2446 delete opt_proxy_groups_;
// Nothing configured at all: reset every proxy-related field.
2447 if ((set_proxy_list ==
"") && (set_proxy_fallback_list ==
"")) {
2448 opt_proxy_groups_ = NULL;
2449 opt_proxy_groups_current_ = 0;
2450 opt_proxy_groups_current_burned_ = 0;
2451 opt_proxy_groups_fallback_ = 0;
2452 opt_num_proxies_ = 0;
// opt_proxy_groups_fallback_ is set to the number of regular groups, which
// is the index of the first fallback group in the combined list built below.
2457 opt_proxy_groups_fallback_ = 0;
2458 if (set_proxy_list !=
"") {
2459 opt_proxy_groups_fallback_ =
SplitString(set_proxy_list,
';').size();
2462 opt_proxy_groups_fallback_);
// Combined list: regular groups followed by the fallback groups.
2466 string all_proxy_list = set_proxy_list;
2467 if (set_proxy_fallback_list !=
"") {
2468 if (all_proxy_list !=
"")
2469 all_proxy_list +=
";";
2470 all_proxy_list += set_proxy_fallback_list;
2473 all_proxy_list.c_str());
// First pass: collect one host name per proxy entry, in list order.
2476 vector<string> hostnames;
2477 vector<string> proxy_groups;
2478 if (all_proxy_list !=
"")
2480 for (
unsigned i = 0; i < proxy_groups.size(); ++i) {
2481 vector<string> this_group =
SplitString(proxy_groups[i],
'|');
2482 for (
unsigned j = 0; j < this_group.size(); ++j) {
2488 hostnames.push_back(hostname);
2491 vector<dns::Host> hosts;
2494 resolver_->ResolveMany(hostnames, &hosts);
// Second pass: build the ProxyInfo groups.  num_proxy advances together
// with j and is used to index `hosts`, which mirrors the order in which
// `hostnames` was filled above.
2498 opt_proxy_groups_ =
new vector< vector<ProxyInfo> >();
2499 opt_num_proxies_ = 0;
2500 unsigned num_proxy = 0;
2501 for (
unsigned i = 0; i < proxy_groups.size(); ++i) {
2502 vector<string> this_group =
SplitString(proxy_groups[i],
'|');
2506 vector<ProxyInfo> infos;
2507 for (
unsigned j = 0; j < this_group.size(); ++j, ++num_proxy) {
2509 if (this_group[j] ==
"DIRECT") {
2516 "failed to resolve IP addresses for %s (%d - %s)",
2517 hosts[num_proxy].name().c_str(), hosts[num_proxy].status(),
2521 infos.push_back(
ProxyInfo(failed_host, this_group[j]));
// One ProxyInfo is added per address in best_addresses.
2526 set<string> best_addresses =
2528 set<string>::const_iterator iter_ips = best_addresses.begin();
2529 for (; iter_ips != best_addresses.end(); ++iter_ips) {
2531 infos.push_back(
ProxyInfo(hosts[num_proxy], url_ip));
2534 opt_proxy_groups_->push_back(infos);
2535 opt_num_proxies_ += infos.size();
// NOTE(review): size() returns size_t but is paired with %u here; confirm
// the format against the log-call head, which is not visible.
2538 "installed %u proxies in %u load-balance groups",
2539 opt_num_proxies_, opt_proxy_groups_->size());
2540 opt_proxy_groups_current_ = 0;
2541 opt_proxy_groups_current_burned_ = 0;
2544 if (opt_proxy_groups_->size() > 0) {
2546 UpdateProxiesUnlocked(
"set proxies");
/**
 * Copies the current proxy configuration into the caller's variables.
 *
 * \param proxy_chain    required (asserted non-NULL); receives a copy of the
 *                       proxy groups, or an empty chain if none are set
 * \param current_group  optional, may be NULL; receives
 *                       opt_proxy_groups_current_ when groups are set
 * \param fallback_group optional, may be NULL; receives
 *                       opt_proxy_groups_fallback_, or 0 if no groups are set
 *
 * NOTE(review): the value stored in *current_group when no groups are set
 * is on a line not visible in this listing.
 */
2557 void DownloadManager::GetProxyInfo(vector< vector<ProxyInfo> > *proxy_chain,
2558 unsigned *current_group,
2559 unsigned *fallback_group)
2561 assert(proxy_chain != NULL);
// No proxy groups configured: hand back an empty chain.
2565 if (!opt_proxy_groups_) {
2566 vector< vector<ProxyInfo> > empty_chain;
2567 *proxy_chain = empty_chain;
2568 if (current_group != NULL)
2570 if (fallback_group != NULL)
2571 *fallback_group = 0;
// Proxy groups are configured: copy them and the requested indices.
2575 *proxy_chain = *opt_proxy_groups_;
2576 if (current_group != NULL)
2577 *current_group = opt_proxy_groups_current_;
2578 if (fallback_group != NULL)
2579 *fallback_group = opt_proxy_groups_fallback_;
/**
 * Returns a copy of opt_proxy_list_, the stored regular proxy list string.
 */
2582 string DownloadManager::GetProxyList() {
2583 return opt_proxy_list_;
/**
 * Returns a copy of opt_proxy_fallback_list_, the stored fallback proxy list
 * string.
 */
2586 string DownloadManager::GetFallbackProxyList() {
2587 return opt_proxy_fallback_list_;
2595 if (!opt_proxy_groups_)
2598 uint32_t key = (hash ? hash->
Partial32() : 0);
2599 map<uint32_t, ProxyInfo *>::iterator it = opt_proxy_map_.lower_bound(key);
/**
 * Rebuilds opt_proxy_map_ and opt_proxy_urls_ from the current proxy group.
 *
 * The number of proxies considered is the size of the current group minus
 * opt_proxy_groups_current_burned_.  With opt_proxy_shard_ set, each of
 * them gets kProxyMapScale pseudo-random keys in the map; the other visible
 * path picks a single proxy at random and registers it under the maximum
 * key.  If the joined URL list differs from the previous one, a
 * "switching proxy" message is logged with the given reason.
 *
 * \param reason text included in the "switching proxy" log message
 *
 * NOTE(review): the "Unlocked" suffix suggests the caller is expected to
 * hold the relevant lock -- no locking is visible here; confirm.  The
 * action taken when opt_proxy_groups_ is NULL, the creation of the local
 * `prng`, the assignments to `proxy` and the else keyword are on lines not
 * visible in this listing.
 */
2608 void DownloadManager::UpdateProxiesUnlocked(
const string &reason) {
2609 if (!opt_proxy_groups_)
2613 vector<ProxyInfo> *group = current_proxy_group();
// NOTE(review): no guard for num_alive == 0 is visible; verify that the
// burned count can never reach the group size on this path.
2614 unsigned num_alive = (group->size() - opt_proxy_groups_current_burned_);
// Remember the previous URL list so that a change can be reported below.
2615 string old_proxy =
JoinStrings(opt_proxy_urls_,
"|");
2618 opt_proxy_map_.clear();
2619 opt_proxy_urls_.clear();
2620 const uint32_t max_key = 0xffffffffUL;
2621 if (opt_proxy_shard_) {
2623 for (
unsigned i = 0; i < num_alive; ++i) {
// Each proxy is inserted under kProxyMapScale pseudo-random keys.
2629 for (
unsigned j = 0; j < kProxyMapScale; ++j) {
2630 const std::pair<uint32_t, ProxyInfo *> entry(prng.
Next(max_key), proxy);
2631 opt_proxy_map_.insert(entry);
2633 opt_proxy_urls_.push_back(proxy->
url);
// Map the maximum key to the proxy with the smallest key, so that a
// lower_bound() lookup with any 32-bit key finds an entry.
2636 ProxyInfo *first_proxy = opt_proxy_map_.begin()->second;
2637 const std::pair<uint32_t, ProxyInfo *> last_entry(max_key, first_proxy);
2638 opt_proxy_map_.insert(last_entry);
// Other path: one randomly selected proxy, stored under the maximum key.
2641 unsigned select = prng_.Next(num_alive);
2643 const std::pair<uint32_t, ProxyInfo *> entry(max_key, proxy);
2644 opt_proxy_map_.insert(entry);
2645 opt_proxy_urls_.push_back(proxy->
url);
// Sorting makes the joined URL string independent of insertion order.
2647 sort(opt_proxy_urls_.begin(), opt_proxy_urls_.end());
2650 string new_proxy =
JoinStrings(opt_proxy_urls_,
"|");
2651 if (new_proxy != old_proxy) {
2653 "switching proxy from %s to %s (%s)",
2654 (old_proxy.empty() ?
"(none)" : old_proxy.c_str()),
2655 (new_proxy.empty() ?
"(none)" : new_proxy.c_str()),
/**
 * Sets opt_proxy_shard_ to true and rebalances the proxies with the reason
 * "enable sharding", so that the new setting takes effect.
 */
2663 void DownloadManager::ShardProxies() {
2664 opt_proxy_shard_ =
true;
2665 RebalanceProxiesUnlocked(
"enable sharding");
/**
 * Resets the failover timestamp and the burned-proxy counter of the current
 * group to zero, then calls UpdateProxiesUnlocked() with the given reason.
 *
 * \param reason passed through to UpdateProxiesUnlocked()
 *
 * NOTE(review): the action taken when opt_proxy_groups_ is NULL is on a
 * line not visible in this listing; presumably an early return.
 */
2672 void DownloadManager::RebalanceProxiesUnlocked(
const string &reason) {
2673 if (!opt_proxy_groups_)
2676 opt_timestamp_failover_proxies_ = 0;
2677 opt_proxy_groups_current_burned_ = 0;
2678 UpdateProxiesUnlocked(reason);
/**
 * Public entry point that calls RebalanceProxiesUnlocked("rebalance").
 *
 * NOTE(review): embedded line 2683 is absent from this listing; presumably
 * it acquires the lock that the "Unlocked" callee expects -- confirm.
 */
2682 void DownloadManager::RebalanceProxies() {
2684 RebalanceProxiesUnlocked(
"rebalance");
/**
 * Advances to the next proxy group, wrapping around to the first group
 * after the last one, records the current time in
 * opt_timestamp_backup_proxies_, and rebalances with the reason
 * "switch proxy group".
 *
 * The switch only happens when at least two proxy groups exist.
 *
 * NOTE(review): the body of the guard branch is not visible in this
 * listing; presumably it returns without switching.
 */
2691 void DownloadManager::SwitchProxyGroup() {
2694 if (!opt_proxy_groups_ || (opt_proxy_groups_->size() < 2)) {
// The modulo wraps the index back to group 0 after the last group.
2698 opt_proxy_groups_current_ = (opt_proxy_groups_current_ + 1) %
2699 opt_proxy_groups_->size();
2700 opt_timestamp_backup_proxies_ = time(NULL);
2701 RebalanceProxiesUnlocked(
"switch proxy group");
/**
 * Stores the proxy group reset delay in opt_proxy_groups_reset_after_.
 *
 * A value of 0 additionally zeroes the backup and failover proxy
 * timestamps.
 *
 * \param seconds new value for opt_proxy_groups_reset_after_
 */
2705 void DownloadManager::SetProxyGroupResetDelay(
const unsigned seconds) {
2707 opt_proxy_groups_reset_after_ = seconds;
2708 if (opt_proxy_groups_reset_after_ == 0) {
2709 opt_timestamp_backup_proxies_ = 0;
2710 opt_timestamp_failover_proxies_ = 0;
/**
 * Stores the host reset delay in opt_host_reset_after_.
 *
 * A value of 0 additionally zeroes opt_timestamp_backup_host_.
 *
 * \param seconds new value for opt_host_reset_after_
 */
2715 void DownloadManager::SetHostResetDelay(
const unsigned seconds)
2718 opt_host_reset_after_ = seconds;
2719 if (opt_host_reset_after_ == 0)
2720 opt_timestamp_backup_host_ = 0;
/**
 * Stores the retry settings in the corresponding opt_* members.
 *
 * \param max_retries     stored in opt_max_retries_
 * \param backoff_init_ms stored in opt_backoff_init_ms_
 * \param backoff_max_ms  stored in opt_backoff_max_ms_
 */
2724 void DownloadManager::SetRetryParameters(
const unsigned max_retries,
2725 const unsigned backoff_init_ms,
2726 const unsigned backoff_max_ms)
2729 opt_max_retries_ = max_retries;
2730 opt_backoff_init_ms_ = backoff_init_ms;
2731 opt_backoff_max_ms_ = backoff_max_ms;
/**
 * Forwards the limit to resolver_->set_throttle().
 *
 * \param limit value passed to the resolver's throttle setting; going by
 *              the method name, presumably the maximum number of IP
 *              addresses used per proxy -- confirm against the resolver
 */
2735 void DownloadManager::SetMaxIpaddrPerProxy(
unsigned limit) {
2737 resolver_->set_throttle(limit);
/**
 * Stores the two proxy template strings.
 *
 * \param direct stored in proxy_template_direct_
 * \param forced stored in proxy_template_forced_
 */
2741 void DownloadManager::SetProxyTemplates(
2742 const std::string &direct,
2743 const std::string &forced)
2746 proxy_template_direct_ = direct;
2747 proxy_template_forced_ = forced;
/**
 * Sets the enable_info_header_ flag to true.
 */
2751 void DownloadManager::EnableInfoHeader() {
2752 enable_info_header_ =
true;
/**
 * Sets the follow_redirects_ flag to true.
 */
2756 void DownloadManager::EnableRedirects() {
2757 follow_redirects_ =
true;
/**
 * Delegates to ssl_certificate_store_.UseSystemCertificatePath().
 */
2760 void DownloadManager::UseSystemCertificatePath() {
2761 ssl_certificate_store_.UseSystemCertificatePath();
2772 clone->
Init(pool_max_handles_, statistics);
2778 if (!opt_dns_server_.empty())
2788 if (opt_host_chain_) {
2792 CloneProxyConfig(clone);
2813 if (opt_proxy_groups_ == NULL)
2817 *opt_proxy_groups_);
unsigned opt_timeout_direct_
unsigned opt_low_speed_limit_
#define LogCvmfs(source, mask,...)
std::vector< T > Shuffle(const std::vector< T > &input, Prng *prng)
const char * Code2Ascii(const ObjectFetcherFailures::Failures error)
unsigned opt_backoff_init_ms_
static unsigned EscapeHeader(const string &header, char *escaped_buf, size_t buf_size)
unsigned char num_used_hosts
static bool EscapeUrlChar(char input, char output[3])
int64_t Xadd(class Counter *counter, const int64_t delta)
const char * Code2Ascii(const Failures error)
unsigned opt_proxy_groups_current_burned_
double DiffTimeSeconds(struct timeval start, struct timeval end)
unsigned opt_proxy_groups_reset_after_
void SetUrlOptions(JobInfo *info)
StreamStates DecompressZStream2Sink(const void *buf, const int64_t size, z_stream *strm, cvmfs::Sink *sink)
std::string opt_proxy_fallback_list_
static NormalResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
vector< string > SplitString(const string &str, const char delim, const unsigned max_chunks)
unsigned opt_host_reset_after_
std::string proxy_template_direct_
string ReplaceAll(const string &haystack, const string &needle, const string &replace_by)
string JoinStrings(const vector< string > &strings, const string &joint)
void Init(ContextPtr context)
std::string ToString(const bool with_suffix=false) const
unsigned opt_proxy_groups_current_
shash::ContextPtr hash_context
void DecompressInit(z_stream *strm)
unsigned int current_host_chain_index
bool DecompressMem2Mem(const void *buf, const int64_t size, void **out_buf, uint64_t *out_size)
std::set< CURL * > * pool_handles_inuse_
StreamStates DecompressZStream2File(const void *buf, const int64_t size, z_stream *strm, FILE *f)
std::string opt_proxy_list_
const std::string & name() const
perf::Counter * sz_transfer_time
std::vector< std::vector< ProxyInfo > > * opt_proxy_groups_
assert((mem||(size==0))&&"Out Of Memory")
unsigned opt_proxy_groups_fallback_
void ReleaseCurlHandle(CURL *handle)
const std::set< std::string > & ViewBestAddresses(IpPreference preference) const
void SetDnsServer(const std::string &address)
void DecompressFini(z_stream *strm)
void InitSeed(const uint64_t seed)
void MakePipe(int pipe_fd[2])
uint32_t watch_fds_inuse_
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
void SetDnsTtlLimits(const unsigned min_seconds, const unsigned max_seconds)
static Failures PrepareDownloadDestination(JobInfo *info)
void Init(const unsigned max_pool_handles, const perf::StatisticsTemplate &statistics)
const char * Code2Ascii(const Failures error)
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
unsigned char num_retries
void Final(ContextPtr context, Any *any_digest)
std::string AddDefaultScheme(const std::string &proxy)
static string EscapeUrl(const string &url)
dns::IpPreference opt_ip_preference_
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
Failures Fetch(const std::string &base_url, const std::string &repository_name, const uint64_t minimum_timestamp, const shash::Any *base_catalog, signature::SignatureManager *signature_manager, download::DownloadManager *download_manager, ManifestEnsemble *ensemble)
void UpdateProxiesUnlocked(const std::string &reason)
bool IsEquivalent(const Host &other) const
bool IsProxyTransferError(const Failures error)
unsigned GetContextSize(const Algorithms algorithm)
perf::Counter * n_requests
static int Init(const loader::LoaderExports *loader_exports)
string StringifyInt(const int64_t value)
void SetMaxIpaddrPerProxy(unsigned limit)
const shash::Any * expected_hash
CURL * AcquireCurlHandle()
void Inc(class Counter *counter)
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
const std::string * extra_info
std::string ExtractHost(const std::string &url)
std::vector< int > * opt_host_chain_rtt_
cvmfs::Sink * destination_sink
SslCertificateStore ssl_certificate_store_
uint32_t Partial32() const
unsigned opt_backoff_max_ms_
unsigned opt_num_proxies_
CredentialsAttachment * credentials_attachment_
std::vector< std::string > * opt_host_chain_
struct pollfd * watch_fds_
uint64_t String2Uint64(const string &value)
unsigned opt_max_retries_
struct download::JobInfo::@3 destination_mem
std::string proxy_template_forced_
const std::string * destination_path
void SetDnsParameters(const unsigned retries, const unsigned timeout_ms)
static Host ExtendDeadline(const Host &original, unsigned seconds_from_now)
void SafeSleepMs(const unsigned ms)
bool IsHostTransferError(const Failures error)
void SortTeam(std::vector< T > *tractor, std::vector< U > *towed)
unsigned char num_used_proxies
void WritePipe(int fd, const void *buf, size_t nbyte)
unsigned opt_timeout_proxy_
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
void ReadPipe(int fd, void *buf, size_t nbyte)
void HashString(const std::string &content, Any *any_digest)
void InitializeRequest(JobInfo *info, CURL *handle)
string RewriteUrl(const string &url, const string &ip)
uint32_t Next(const uint64_t boundary)
virtual int64_t Write(const void *buf, uint64_t sz)=0