29 #define __STDC_FORMAT_MACROS
31 #include "cvmfs_config.h"
74 if (((input >=
'0') && (input <=
'9')) ||
75 ((input >=
'A') && (input <=
'Z')) ||
76 ((input >=
'a') && (input <=
'z')) ||
77 (input ==
'/') || (input ==
':') || (input ==
'.') ||
79 (input ==
'+') || (input ==
'-') ||
80 (input ==
'_') || (input ==
'~') ||
81 (input ==
'[') || (input ==
']') || (input ==
','))
88 output[1] =
static_cast<char>(
89 (input / 16) + ((input / 16 <= 9) ?
'0' :
'A'-10));
90 output[2] =
static_cast<char>(
91 (input % 16) + ((input % 16 <= 9) ?
'0' :
'A'-10));
102 escaped.reserve(url.length());
104 char escaped_char[3];
105 for (
unsigned i = 0, s = url.length(); i < s; ++i) {
107 escaped.append(escaped_char, 3);
109 escaped.push_back(escaped_char[0]);
113 url.c_str(), escaped.c_str());
127 unsigned esc_pos = 0;
128 char escaped_char[3];
129 for (
unsigned i = 0, s = header.size(); i < s; ++i) {
131 for (
unsigned j = 0; j < 3; ++j) {
133 if (esc_pos >= buf_size)
135 escaped_buf[esc_pos] = escaped_char[j];
141 if (esc_pos >= buf_size)
143 escaped_buf[esc_pos] = escaped_char[0];
185 const size_t num_bytes = size*nmemb;
186 const string header_line(static_cast<const char *>(ptr), num_bytes);
193 if (
HasPrefix(header_line,
"HTTP/1.",
false)) {
194 if (header_line.length() < 10)
198 for (i = 8; (i < header_line.length()) && (header_line[i] ==
' '); ++i) {}
201 if (header_line.length() > i+2) {
202 info->
http_code = DownloadManager::ParseHttpCode(&header_line[i]);
214 header_line.c_str());
219 header_line.c_str());
245 HasPrefix(header_line,
"CONTENT-LENGTH:",
true))
247 char *tmp =
reinterpret_cast<char *
>(alloca(num_bytes+1));
249 sscanf(header_line.c_str(),
"%s %" PRIu64, tmp, &length);
251 if (length > DownloadManager::kMaxMemSize) {
253 "resource %s too large to store in memory (%" PRIu64
")",
254 info->
url->c_str(), length);
264 }
else if (
HasPrefix(header_line,
"LOCATION:",
true)) {
267 }
else if (
HasPrefix(header_line,
"X-SQUID-ERROR:",
true)) {
272 }
else if (
HasPrefix(header_line,
"PROXY-STATUS:",
true)) {
275 (header_line.find(
"error=") != string::npos)) {
290 const size_t num_bytes = size*nmemb;
315 "decompressing %s, local IO error", info->
url->c_str());
321 if ((written < 0) || (static_cast<uint64_t>(written) != num_bytes)) {
323 PRId64
")", info->
url->c_str(), written);
333 "Content-Length was missing or zero, but %zu bytes received",
337 "start %zu, bytes %zu, expected %zu",
363 "decompressing %s, local IO error", info->
url->c_str());
370 "downloading %s, IO failure: %s (errno=%d)",
371 info->
url->c_str(), strerror(errno), errno);
385 bool JobInfo::IsFileNotFound() {
389 return http_code == 404;
// Sentinel values stored in the per-host round-trip-time vector
// (opt_host_chain_rtt_) in place of a measured time.
// kProbeUnprobed: value every entry gets when SetHostChain() installs a new
// host chain.
396 const int DownloadManager::kProbeUnprobed = -1;
// kProbeDown: written by ProbeHosts() for hosts whose rtt is still INT_MAX
// once probing has finished.
397 const int DownloadManager::kProbeDown = -2;
// kProbeGeo: value every entry gets after ProbeGeo() has reordered the hosts.
398 const int DownloadManager::kProbeGeo = -3;
// Limit the header callback compares the parsed Content-Length against; a
// larger value is reported as "too large to store in memory".
399 const unsigned DownloadManager::kMaxMemSize = 1024*1024;
// Converts the three characters digits[0..2] into a decimal number.
// The header callback passes a pointer into the status line and only does so
// when at least three characters remain after that position.
// NOTE(review): presumably a non-digit character aborts the parse with an
// error value and factor steps through 100/10/1 -- confirm against the full
// function body.
405 int DownloadManager::ParseHttpCode(
const char digits[3]) {
408 for (
int i = 0; i < 3; ++i) {
// Accept ASCII '0'..'9' only.
409 if ((digits[i] <
'0') || (digits[i] >
'9'))
// Add this digit's contribution, weighted by its decimal position.
411 result += (digits[i] -
'0') * factor;
421 int DownloadManager::CallbackCurlSocket(CURL * ,
430 if (action == CURL_POLL_NONE)
446 download_mgr->
watch_fds_ =
static_cast<struct pollfd *
>(
458 download_mgr->
watch_fds_[index].events = POLLIN | POLLPRI;
461 download_mgr->
watch_fds_[index].events = POLLOUT | POLLWRBAND;
463 case CURL_POLL_INOUT:
465 POLLIN | POLLPRI | POLLOUT | POLLWRBAND;
467 case CURL_POLL_REMOVE:
468 if (index < download_mgr->watch_fds_inuse_-1) {
480 download_mgr->
watch_fds_ =
static_cast<struct pollfd *
>(
498 void *DownloadManager::MainDownload(
void *data) {
503 static_cast<struct pollfd *
>(smalloc(2 *
sizeof(
struct pollfd)));
506 download_mgr->
watch_fds_[0].events = POLLIN | POLLPRI;
509 download_mgr->
watch_fds_[1].events = POLLIN | POLLPRI;
513 int still_running = 0;
514 struct timeval timeval_start, timeval_stop;
515 gettimeofday(&timeval_start, NULL);
531 gettimeofday(&timeval_stop, NULL);
532 int64_t delta =
static_cast<int64_t
>(
544 curl_multi_socket_action(download_mgr->
curl_multi_,
561 gettimeofday(&timeval_start, NULL);
565 curl_multi_add_handle(download_mgr->
curl_multi_, handle);
566 curl_multi_socket_action(download_mgr->
curl_multi_,
583 if (download_mgr->
watch_fds_[i].revents & (POLLIN | POLLPRI))
584 ev_bitmask |= CURL_CSELECT_IN;
585 if (download_mgr->
watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
586 ev_bitmask |= CURL_CSELECT_OUT;
588 (POLLERR | POLLHUP | POLLNVAL))
590 ev_bitmask |= CURL_CSELECT_ERR;
594 curl_multi_socket_action(download_mgr->
curl_multi_,
604 while ((curl_msg = curl_multi_info_read(download_mgr->
curl_multi_,
607 if (curl_msg->msg == CURLMSG_DONE) {
610 CURL *easy_handle = curl_msg->easy_handle;
611 int curl_error = curl_msg->
data.result;
612 curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
614 curl_multi_remove_handle(download_mgr->
curl_multi_, easy_handle);
616 curl_multi_add_handle(download_mgr->
curl_multi_, easy_handle);
617 curl_multi_socket_action(download_mgr->
curl_multi_,
635 curl_multi_remove_handle(download_mgr->
curl_multi_, *i);
636 curl_easy_cleanup(*i);
649 HeaderLists::~HeaderLists() {
650 for (
unsigned i = 0; i < blocks_.size(); ++i) {
657 curl_slist *HeaderLists::GetList(
const char *header) {
// Builds a second list with the same header entries as slist.
// The head node comes from GetList(), every following node from Get().
// Get() stores the passed pointer as is, so the copy refers to the same
// header strings as the source list; the strings are not duplicated.
// slist->data is read right away, so slist is expected to be non-NULL here
// (TODO confirm whether an assertion guards this).
662 curl_slist *HeaderLists::DuplicateList(curl_slist *slist) {
664 curl_slist *copy = GetList(slist->data);
// Temporarily points into the source list; overwritten as soon as the next
// node gets copied.
665 copy->next = slist->next;
666 curl_slist *prev = copy;
669 curl_slist *new_link = Get(slist->data);
670 new_link->next = slist->next;
// Hook the freshly copied node behind the previously copied one.
671 prev->next = new_link;
679 void HeaderLists::AppendHeader(curl_slist *slist,
const char *header) {
681 curl_slist *new_link = Get(header);
682 new_link->next = NULL;
686 slist->next = new_link;
695 void HeaderLists::CutHeader(
const char *header, curl_slist **slist) {
699 curl_slist *prev = &head;
700 curl_slist *rover = *slist;
702 if (strcmp(rover->data, header) == 0) {
703 prev->next = rover->next;
714 void HeaderLists::PutList(curl_slist *slist) {
716 curl_slist *next = slist->next;
723 string HeaderLists::Print(curl_slist *slist) {
726 verbose += string(slist->data) +
"\n";
// Hands out one curl_slist node that carries the given header.
// Only the pointer is stored (the const is cast away for curl's struct); the
// string itself is not copied, so it has to stay valid while the node is used.
733 curl_slist *HeaderLists::Get(
const char *header) {
// First choice: the first slot in any existing block that IsUsed() reports as
// free.
734 for (
unsigned i = 0; i < blocks_.size(); ++i) {
735 for (
unsigned j = 0; j < kBlockSize; ++j) {
736 if (!IsUsed(&(blocks_[i][j]))) {
737 blocks_[i][j].data =
const_cast<char *
>(header);
738 return &(blocks_[i][j]);
// No free slot was found: use slot 0 of the last block.
// NOTE(review): presumably AddBlock() has just appended a fresh block at this
// point -- confirm.
745 blocks_[blocks_.size()-1][0].data =
const_cast<char *
>(header);
746 return &(blocks_[blocks_.size()-1][0]);
750 void HeaderLists::Put(curl_slist *slist) {
756 void HeaderLists::AddBlock() {
757 curl_slist *new_block =
new curl_slist[kBlockSize];
758 for (
unsigned i = 0; i < kBlockSize; ++i) {
761 blocks_.push_back(new_block);
768 string DownloadManager::ProxyInfo::Print() {
774 static_cast<int>(host.deadline()) - static_cast<int>(time(NULL));
775 string expinfo = (remaining >= 0) ?
"+" :
"";
776 if (abs(remaining) >= 3600) {
778 }
else if (abs(remaining) >= 60) {
784 result +=
" (" + host.name() +
", " + expinfo +
")";
786 result +=
" (:unresolved:, " + expinfo +
")";
// Provides a curl easy handle, reusing an idle one when available.
796 CURL *DownloadManager::AcquireCurlHandle() {
// Idle pool exhausted: create a new handle.
799 if (pool_handles_idle_->empty()) {
801 handle = curl_easy_init();
// Tell libcurl not to use signals for this handle.
804 curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
// Otherwise take the first handle out of the idle set.
809 handle = *(pool_handles_idle_->begin());
810 pool_handles_idle_->erase(pool_handles_idle_->begin());
// Either way the handle is tracked as in use from now on.
813 pool_handles_inuse_->insert(handle);
// Gives a handle back that was obtained from AcquireCurlHandle().
// The handle must be in the in-use set; this is enforced with an assertion.
819 void DownloadManager::ReleaseCurlHandle(CURL *handle) {
820 set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
821 assert(elem != pool_handles_inuse_->end());
// Destroy the handle if the idle pool is over its limit, otherwise keep it
// for reuse.
// NOTE(review): with '>' the idle pool can grow to pool_max_handles_ + 1
// entries before handles get destroyed -- confirm this is intended.
823 if (pool_handles_idle_->size() > pool_max_handles_) {
824 curl_easy_cleanup(*elem);
826 pool_handles_idle_->insert(*elem);
// In both cases the handle is no longer in use.
829 pool_handles_inuse_->erase(elem);
837 void DownloadManager::InitializeRequest(
JobInfo *info, CURL *handle) {
847 info->
headers = header_lists_->DuplicateList(default_headers_);
865 char byte_range_array[100];
866 const int64_t range_lower =
static_cast<int64_t
>(info->
range_offset);
867 const int64_t range_upper =
static_cast<int64_t
>(
869 if (snprintf(byte_range_array,
sizeof(byte_range_array),
870 "%" PRId64
"-%" PRId64,
871 range_lower, range_upper) == 100)
875 curl_easy_setopt(handle, CURLOPT_RANGE, byte_range_array);
877 curl_easy_setopt(handle, CURLOPT_RANGE, NULL);
881 curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
882 curl_easy_setopt(handle, CURLOPT_WRITEHEADER,
883 static_cast<void *>(info));
884 curl_easy_setopt(handle, CURLOPT_WRITEDATA, static_cast<void *>(info));
885 curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->
headers);
887 curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
889 curl_easy_setopt(handle, CURLOPT_HTTPGET, 1);
891 if (opt_ipv4_only_) {
892 curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
894 if (follow_redirects_) {
895 curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1);
896 curl_easy_setopt(handle, CURLOPT_MAXREDIRS, 4);
905 void DownloadManager::SetUrlOptions(
JobInfo *info) {
911 if (opt_timestamp_backup_proxies_ > 0) {
912 const time_t now = time(NULL);
913 if (static_cast<int64_t>(now) >
914 static_cast<int64_t>(opt_timestamp_backup_proxies_ +
915 opt_proxy_groups_reset_after_))
917 opt_proxy_groups_current_ = 0;
918 opt_timestamp_backup_proxies_ = 0;
919 RebalanceProxiesUnlocked(
"reset proxy group");
923 if (opt_timestamp_failover_proxies_ > 0) {
924 const time_t now = time(NULL);
925 if (static_cast<int64_t>(now) >
926 static_cast<int64_t>(opt_timestamp_failover_proxies_ +
927 opt_proxy_groups_reset_after_))
929 RebalanceProxiesUnlocked(
"reset load-balanced proxies");
933 if (opt_timestamp_backup_host_ > 0) {
934 const time_t now = time(NULL);
935 if (static_cast<int64_t>(now) >
936 static_cast<int64_t>(opt_timestamp_backup_host_ +
937 opt_host_reset_after_))
940 "switching host from %s to %s (reset host)",
941 (*opt_host_chain_)[opt_host_chain_current_].c_str(),
942 (*opt_host_chain_)[0].c_str());
943 opt_host_chain_current_ = 0;
944 opt_timestamp_backup_host_ = 0;
949 if (!proxy || (proxy->
url ==
"DIRECT")) {
950 info->
proxy =
"DIRECT";
951 curl_easy_setopt(info->
curl_handle, CURLOPT_PROXY,
"");
956 std::string purl = proxy->
url;
958 const bool changed = ValidateProxyIpsUnlocked(purl, phost);
967 curl_easy_setopt(info->
curl_handle, CURLOPT_PROXY,
"0.0.0.0");
970 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, opt_low_speed_limit_);
971 if (info->
proxy !=
"DIRECT") {
972 curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_proxy_);
973 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_proxy_);
975 curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_direct_);
976 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_direct_);
978 if (!opt_dns_server_.empty())
979 curl_easy_setopt(curl_handle, CURLOPT_DNS_SERVERS, opt_dns_server_.c_str());
982 url_prefix = (*opt_host_chain_)[opt_host_chain_current_];
986 string url = url_prefix + *(info->
url);
988 curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 1L);
989 if (url.substr(0, 5) ==
"https") {
990 bool rvb = ssl_certificate_store_.ApplySslCertificatePath(curl_handle);
993 "Failed to set SSL certificate path %s",
994 ssl_certificate_store_.GetCaPath().c_str());
996 if (info->
pid != -1) {
997 if (credentials_attachment_ == NULL) {
999 "uses secure downloads but no credentials attachment set");
1001 bool retval = credentials_attachment_->ConfigureCurlHandle(
1012 signal(SIGPIPE, SIG_IGN);
1015 if (url.find(
"@proxy@") != string::npos) {
1023 if (proxy_template_forced_ !=
"") {
1024 replacement = proxy_template_forced_;
1025 }
else if (info->
proxy ==
"DIRECT") {
1026 replacement = proxy_template_direct_;
1028 if (opt_proxy_groups_current_ >= opt_proxy_groups_fallback_) {
1031 info->
proxy =
"DIRECT";
1032 curl_easy_setopt(info->
curl_handle, CURLOPT_PROXY,
"");
1033 replacement = proxy_template_direct_;
1035 replacement = ChooseProxyUnlocked(info->
expected_hash)->host.name();
1038 replacement = (replacement ==
"") ? proxy_template_direct_ : replacement;
1040 replacement.c_str());
1041 url =
ReplaceAll(url,
"@proxy@", replacement);
1052 curl_easy_setopt(curl_handle, CURLOPT_URL,
EscapeUrl(url).c_str());
1065 bool DownloadManager::ValidateProxyIpsUnlocked(
1072 host.
name().c_str());
1074 unsigned group_idx = opt_proxy_groups_current_;
1077 bool update_only =
true;
1081 "failed to resolve IP addresses for %s (%d - %s)",
1086 update_only =
false;
1090 for (
unsigned i = 0; i < (*opt_proxy_groups_)[group_idx].size(); ++i) {
1091 if ((*opt_proxy_groups_)[group_idx][i].host.id() == host.
id())
1092 (*opt_proxy_groups_)[group_idx][i].host = new_host;
1101 "DNS entries for proxy %s changed, adjusting", host.
name().c_str());
1102 vector<ProxyInfo> *group = current_proxy_group();
1103 opt_num_proxies_ -= group->size();
1104 for (
unsigned i = 0; i < group->size(); ) {
1105 if ((*group)[i].host.id() == host.
id()) {
1106 group->erase(group->begin() + i);
1111 vector<ProxyInfo> new_infos;
1113 set<string>::const_iterator iter_ips = best_addresses.begin();
1114 for (; iter_ips != best_addresses.end(); ++iter_ips) {
1116 new_infos.push_back(
ProxyInfo(new_host, url_ip));
1118 group->insert(group->end(), new_infos.begin(), new_infos.end());
1119 opt_num_proxies_ += new_infos.size();
1121 RebalanceProxiesUnlocked(
"DNS change");
1129 void DownloadManager::UpdateStatistics(CURL *handle) {
1134 retval = curl_easy_getinfo(handle, CURLINFO_SIZE_DOWNLOAD, &val);
1135 assert(retval == CURLE_OK);
1136 sum +=
static_cast<int64_t
>(val);
1140 perf::Xadd(counters_->sz_transferred_bytes, sum);
1147 bool DownloadManager::CanRetry(
const JobInfo *info) {
1149 unsigned max_retries = opt_max_retries_;
1164 unsigned backoff_init_ms = 0;
1165 unsigned backoff_max_ms = 0;
1168 backoff_init_ms = opt_backoff_init_ms_;
1169 backoff_max_ms = opt_backoff_max_ms_;
1175 info->
backoff_ms = prng_.Next(backoff_init_ms + 1);
1188 header_lists_->AppendHeader(info->
headers,
"Pragma: no-cache");
1189 header_lists_->AppendHeader(info->
headers,
"Cache-Control: no-cache");
1199 void DownloadManager::SetRegularCache(
JobInfo *info) {
1202 header_lists_->CutHeader(
"Pragma: no-cache", &(info->
headers));
1203 header_lists_->CutHeader(
"Cache-Control: no-cache", &(info->
headers));
1212 void DownloadManager::ReleaseCredential(
JobInfo *info) {
1214 assert(credentials_attachment_ != NULL);
1215 credentials_attachment_->ReleaseCurlHandle(info->
curl_handle,
1228 bool DownloadManager::VerifyAndFinalize(
const int curl_error,
JobInfo *info) {
1230 "Verify downloaded url %s, proxy %s (curl error %d)",
1231 info->
url->c_str(), info->
proxy.c_str(), curl_error);
1235 switch (curl_error) {
1243 "hash verification of %s failed (expected %s, got %s)",
1265 "decompression (memory) of url %s failed",
1266 info->
url->c_str());
1274 case CURLE_UNSUPPORTED_PROTOCOL:
1277 case CURLE_URL_MALFORMAT:
1280 case CURLE_COULDNT_RESOLVE_PROXY:
1283 case CURLE_COULDNT_RESOLVE_HOST:
1286 case CURLE_OPERATION_TIMEDOUT:
1290 case CURLE_PARTIAL_FILE:
1291 case CURLE_GOT_NOTHING:
1292 case CURLE_RECV_ERROR:
1296 case CURLE_FILE_COULDNT_READ_FILE:
1297 case CURLE_COULDNT_CONNECT:
1298 if (info->
proxy !=
"DIRECT") {
1305 case CURLE_TOO_MANY_REDIRECTS:
1308 case CURLE_SSL_CACERT_BADFILE:
1310 "Failed to load certificate bundle. "
1311 "X509_CERT_BUNDLE might point to the wrong location.");
1316 case CURLE_PEER_FAILED_VERIFICATION:
1318 "invalid SSL certificate of remote host. "
1319 "X509_CERT_DIR and/or X509_CERT_BUNDLE might point to the wrong "
1323 case CURLE_ABORTED_BY_CALLBACK:
1324 case CURLE_WRITE_ERROR:
1327 case CURLE_SEND_ERROR:
1336 "trying to fetch %s", curl_error, info->
url->c_str());
1341 std::vector<std::string> *host_chain = opt_host_chain_;
1344 bool try_again =
false;
1345 bool same_url_retry = CanRetry(info);
1354 "data corruption with no-cache header, try another host");
1359 if ( same_url_retry || (
1369 if ( same_url_retry || (
1384 if (opt_proxy_groups_) {
1385 if ((opt_proxy_groups_current_ > 0) ||
1386 (opt_proxy_groups_current_burned_ > 0))
1388 opt_proxy_groups_current_ = 0;
1389 opt_timestamp_backup_proxies_ = 0;
1390 RebalanceProxiesUnlocked(
"reset proxies for host failover");
1407 "same url: %d, error code %d", same_url_retry, info->
error_code);
1417 goto verify_and_finalize_stop;
1428 goto verify_and_finalize_stop;
1434 goto verify_and_finalize_stop;
1441 SetRegularCache(info);
1444 bool switch_proxy =
false;
1445 bool switch_host =
false;
1452 switch_proxy =
true;
1461 if (same_url_retry) {
1464 switch_proxy =
true;
1467 if (same_url_retry) {
1478 ReleaseCredential(info);
1481 SetUrlOptions(info);
1484 ReleaseCredential(info);
1487 SetUrlOptions(info);
1493 verify_and_finalize_stop:
1495 ReleaseCredential(info);
1510 header_lists_->PutList(info->
headers);
1518 DownloadManager::DownloadManager() {
1519 pool_handles_idle_ = NULL;
1520 pool_handles_inuse_ = NULL;
1521 pool_max_handles_ = 0;
1523 default_headers_ = NULL;
1525 atomic_init32(&multi_threaded_);
1526 pipe_terminate_[0] = pipe_terminate_[1] = -1;
1528 pipe_jobs_[0] = pipe_jobs_[1] = -1;
1530 watch_fds_size_ = 0;
1531 watch_fds_inuse_ = 0;
1535 reinterpret_cast<pthread_mutex_t *
>(smalloc(
sizeof(pthread_mutex_t)));
1536 int retval = pthread_mutex_init(lock_options_, NULL);
1538 lock_synchronous_mode_ =
1539 reinterpret_cast<pthread_mutex_t *
>(smalloc(
sizeof(pthread_mutex_t)));
1540 retval = pthread_mutex_init(lock_synchronous_mode_, NULL);
1543 opt_dns_server_ =
"";
1545 opt_timeout_proxy_ = 0;
1546 opt_timeout_direct_ = 0;
1547 opt_low_speed_limit_ = 0;
1548 opt_host_chain_ = NULL;
1549 opt_host_chain_rtt_ = NULL;
1550 opt_host_chain_current_ = 0;
1551 opt_proxy_groups_ = NULL;
1552 opt_proxy_groups_current_ = 0;
1553 opt_proxy_groups_current_burned_ = 0;
1554 opt_num_proxies_ = 0;
1555 opt_proxy_shard_ =
false;
1556 opt_max_retries_ = 0;
1557 opt_backoff_init_ms_ = 0;
1558 opt_backoff_max_ms_ = 0;
1559 enable_info_header_ =
false;
1560 opt_ipv4_only_ =
false;
1561 follow_redirects_ =
false;
1565 opt_timestamp_backup_proxies_ = 0;
1566 opt_timestamp_failover_proxies_ = 0;
1567 opt_proxy_groups_reset_after_ = 0;
1568 opt_timestamp_backup_host_ = 0;
1569 opt_host_reset_after_ = 0;
1571 credentials_attachment_ = NULL;
// Tears down the two mutexes set up in the constructor: each one is destroyed
// first and its memory released afterwards.  free() is used because
// lock_synchronous_mode_ is allocated with smalloc() in the constructor;
// lock_options_ is presumably allocated the same way (TODO confirm).
1577 DownloadManager::~DownloadManager() {
1578 pthread_mutex_destroy(lock_options_);
1579 pthread_mutex_destroy(lock_synchronous_mode_);
1580 free(lock_options_);
1581 free(lock_synchronous_mode_);
1584 void DownloadManager::InitHeaders() {
1586 string cernvm_id =
"User-Agent: cvmfs ";
1587 #ifdef CVMFS_LIBCVMFS
1588 cernvm_id +=
"libcvmfs ";
1590 cernvm_id +=
"Fuse ";
1592 cernvm_id += string(VERSION);
1593 if (getenv(
"CERNVM_UUID") != NULL) {
1597 user_agent_ = strdup(cernvm_id.c_str());
1601 default_headers_ = header_lists_->GetList(
"Connection: Keep-Alive");
1602 header_lists_->AppendHeader(default_headers_,
"Pragma:");
1603 header_lists_->AppendHeader(default_headers_, user_agent_);
// Counterpart of InitHeaders(): deletes the HeaderLists object and resets the
// members that referred to it.  default_headers_ was obtained from
// header_lists_ in InitHeaders(), so it is only set to NULL here.
1607 void DownloadManager::FiniHeaders() {
1608 delete header_lists_;
1609 header_lists_ = NULL;
1610 default_headers_ = NULL;
1617 atomic_init32(&multi_threaded_);
1618 int retval = curl_global_init(CURL_GLOBAL_ALL);
1619 assert(retval == CURLE_OK);
1620 pool_handles_idle_ =
new set<CURL *>;
1621 pool_handles_inuse_ =
new set<CURL *>;
1622 pool_max_handles_ = max_pool_handles;
1623 watch_fds_max_ = 4*pool_max_handles_;
1625 opt_timeout_proxy_ = 5;
1626 opt_timeout_direct_ = 10;
1627 opt_low_speed_limit_ = 1024;
1628 opt_proxy_groups_current_ = 0;
1629 opt_proxy_groups_current_burned_ = 0;
1630 opt_num_proxies_ = 0;
1631 opt_proxy_shard_ =
false;
1632 opt_host_chain_current_ = 0;
1635 counters_ =
new Counters(statistics);
1640 curl_multi_ = curl_multi_init();
1641 assert(curl_multi_ != NULL);
1642 curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION, CallbackCurlSocket);
1643 curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1644 static_cast<void *>(
this));
1645 curl_multi_setopt(curl_multi_, CURLMOPT_MAXCONNECTS, watch_fds_max_);
1646 curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1649 prng_.InitLocaltime();
1652 if ((getenv(
"CVMFS_IPV4_ONLY") != NULL) &&
1653 (strlen(getenv(
"CVMFS_IPV4_ONLY")) > 0))
1655 opt_ipv4_only_ =
true;
1658 kDnsDefaultRetries, kDnsDefaultTimeoutMs);
1664 if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1668 pthread_join(thread_download_, NULL);
1670 close(pipe_terminate_[1]);
1671 close(pipe_terminate_[0]);
1672 close(pipe_jobs_[1]);
1673 close(pipe_jobs_[0]);
1676 for (set<CURL *>::iterator i = pool_handles_idle_->begin(),
1677 iEnd = pool_handles_idle_->end(); i != iEnd; ++i)
1679 curl_easy_cleanup(*i);
1681 delete pool_handles_idle_;
1682 delete pool_handles_inuse_;
1683 curl_multi_cleanup(curl_multi_);
1684 pool_handles_idle_ = NULL;
1685 pool_handles_inuse_ = NULL;
1696 delete opt_host_chain_;
1697 delete opt_host_chain_rtt_;
1698 opt_proxy_map_.clear();
1699 delete opt_proxy_groups_;
1700 opt_host_chain_ = NULL;
1701 opt_host_chain_rtt_ = NULL;
1702 opt_proxy_groups_ = NULL;
1704 curl_global_cleanup();
1719 int retval = pthread_create(&thread_download_, NULL, MainDownload,
1720 static_cast<void *>(
this));
1723 atomic_inc32(&multi_threaded_);
1748 if (enable_info_header_ && info->
extra_info) {
1749 const char *header_name =
"cvmfs-info: ";
1750 const size_t header_name_len = strlen(header_name);
1751 const unsigned header_size = 1 + header_name_len +
1753 info->
info_header =
static_cast<char *
>(alloca(header_size));
1754 memcpy(info->
info_header, header_name, header_name_len);
1756 header_size - header_name_len);
1760 if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1768 WritePipe(pipe_jobs_[1], &info,
sizeof(info));
1773 CURL *handle = AcquireCurlHandle();
1774 InitializeRequest(info, handle);
1775 SetUrlOptions(info);
1779 retval = curl_easy_perform(handle);
1782 if (curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &elapsed) == CURLE_OK)
1785 static_cast<int64_t>(elapsed * 1000));
1787 }
while (VerifyAndFinalize(retval, info));
1816 credentials_attachment_ = ca;
// Returns a copy of the currently configured DNS server string
// (opt_dns_server_), which is empty when none has been set.
1822 std::string DownloadManager::GetDnsServer()
const {
1823 return opt_dns_server_;
1830 void DownloadManager::SetDnsServer(
const string &address) {
1831 if (!address.empty()) {
1833 opt_dns_server_ = address;
1834 assert(!opt_dns_server_.empty());
1836 vector<string> servers;
1837 servers.push_back(address);
1838 bool retval = resolver_->SetResolvers(servers);
1848 void DownloadManager::SetDnsParameters(
1850 const unsigned timeout_ms)
1853 if ((resolver_->retries() ==
retries) &&
1854 (resolver_->timeout_ms() == timeout_ms))
1866 void DownloadManager::SetDnsTtlLimits(
1867 const unsigned min_seconds,
1868 const unsigned max_seconds)
1871 resolver_->set_min_ttl(min_seconds);
1872 resolver_->set_max_ttl(max_seconds);
1878 opt_ip_preference_ = preference;
// Stores the timeouts used for connections through a proxy and for direct
// connections.  SetUrlOptions() later passes the matching value to curl as
// both CURLOPT_CONNECTTIMEOUT and CURLOPT_LOW_SPEED_TIME.
1887 void DownloadManager::SetTimeout(
const unsigned seconds_proxy,
1888 const unsigned seconds_direct)
1891 opt_timeout_proxy_ = seconds_proxy;
1892 opt_timeout_direct_ = seconds_direct;
// Stores the value that SetUrlOptions() hands to curl as
// CURLOPT_LOW_SPEED_LIMIT.
1901 void DownloadManager::SetLowSpeedLimit(
const unsigned low_speed_limit) {
1903 opt_low_speed_limit_ = low_speed_limit;
// Reports the timeouts stored by SetTimeout() through the two out-parameters.
// Both pointers are dereferenced without a visible NULL check, so callers are
// expected to pass valid addresses.
1910 void DownloadManager::GetTimeout(
unsigned *seconds_proxy,
1911 unsigned *seconds_direct)
1914 *seconds_proxy = opt_timeout_proxy_;
1915 *seconds_direct = opt_timeout_direct_;
1923 void DownloadManager::SetHostChain(
const string &host_list) {
// Replaces the host chain with a copy of host_list.
// The previous chain and its rtt vector are deleted, the current-host index
// and the backup-host timestamp are reset to 0.
1928 void DownloadManager::SetHostChain(
const std::vector<std::string> &host_list) {
1930 opt_timestamp_backup_host_ = 0;
1931 delete opt_host_chain_;
1932 delete opt_host_chain_rtt_;
1933 opt_host_chain_current_ = 0;
// An empty list means "no hosts": both vectors stay unset.
1935 if (host_list.empty()) {
1936 opt_host_chain_ = NULL;
1937 opt_host_chain_rtt_ = NULL;
// Otherwise copy the list and mark every host as not yet probed.
1941 opt_host_chain_ =
new vector<string>(host_list);
1942 opt_host_chain_rtt_ =
1943 new vector<int>(opt_host_chain_->size(), kProbeUnprobed);
// Copies the current host configuration into the caller's variables.
// Every out-parameter is optional: a NULL pointer skips that piece of
// information.  The copies below are only made when a host chain is set.
1954 void DownloadManager::GetHostInfo(vector<string> *host_chain, vector<int> *rtt,
1955 unsigned *current_host)
1958 if (opt_host_chain_) {
1959 if (current_host) {*current_host = opt_host_chain_current_;}
1960 if (host_chain) {*host_chain = *opt_host_chain_;}
1961 if (rtt) {*rtt = *opt_host_chain_rtt_;}
1974 void DownloadManager::SwitchProxy(
JobInfo *info) {
1977 if (!opt_proxy_groups_) {
1982 vector<ProxyInfo> *group = current_proxy_group();
1983 const unsigned group_size = group->size();
1984 unsigned failed = 0;
1985 for (
unsigned i = 0; i < group_size - opt_proxy_groups_current_burned_; ++i) {
1986 if (info && (info->
proxy == (*group)[i].url)) {
1988 opt_proxy_groups_current_burned_++;
1990 (*group)[group_size - opt_proxy_groups_current_burned_]);
2002 if (opt_proxy_groups_current_burned_ == group->size()) {
2003 opt_proxy_groups_current_burned_ = 0;
2004 if (opt_proxy_groups_->size() > 1) {
2005 opt_proxy_groups_current_ = (opt_proxy_groups_current_ + 1) %
2006 opt_proxy_groups_->size();
2008 if (opt_proxy_groups_reset_after_ > 0) {
2009 if (opt_proxy_groups_current_ > 0) {
2010 if (opt_timestamp_backup_proxies_ == 0)
2011 opt_timestamp_backup_proxies_ = time(NULL);
2015 opt_timestamp_backup_proxies_ = 0;
2019 opt_timestamp_failover_proxies_ = 0;
2024 if (opt_proxy_groups_reset_after_ > 0) {
2025 if (opt_timestamp_failover_proxies_ == 0)
2026 opt_timestamp_failover_proxies_ = time(NULL);
2030 UpdateProxiesUnlocked(
"failed proxy");
2032 current_proxy_group()->
size() - opt_proxy_groups_current_burned_);
2044 if (!opt_host_chain_ || (opt_host_chain_->size() == 1)) {
2050 "don't switch host, "
2051 "last used host: %s, current host: %s",
2053 (*opt_host_chain_)[opt_host_chain_current_].c_str());
2057 string reason =
"manually triggered";
2062 string old_host = (*opt_host_chain_)[opt_host_chain_current_];
2063 opt_host_chain_current_ =
2064 (opt_host_chain_current_ + 1) % opt_host_chain_->size();
2067 "switching host from %s to %s (%s)", old_host.c_str(),
2068 (*opt_host_chain_)[opt_host_chain_current_].c_str(),
2072 if (opt_host_reset_after_ > 0) {
2073 if (opt_host_chain_current_ != 0) {
2074 if (opt_timestamp_backup_host_ == 0)
2075 opt_timestamp_backup_host_ = time(NULL);
2077 opt_timestamp_backup_host_ = 0;
2082 void DownloadManager::SwitchHost() {
2093 void DownloadManager::ProbeHosts() {
2094 vector<string> host_chain;
2095 vector<int> host_rtt;
2096 unsigned current_host;
2098 GetHostInfo(&host_chain, &host_rtt, ¤t_host);
2103 JobInfo info(&url,
false,
false, NULL);
2104 for (retries = 0; retries < 2; ++
retries) {
2105 for (i = 0; i < host_chain.size(); ++i) {
2106 url = host_chain[i] +
"/.cvmfspublished";
2108 struct timeval tv_start, tv_end;
2109 gettimeofday(&tv_start, NULL);
2111 gettimeofday(&tv_end, NULL);
2115 host_rtt[i] =
static_cast<int>(
2118 url.c_str(), host_rtt[i]);
2122 host_rtt[i] = INT_MAX;
2128 for (i = 0; i < host_chain.size(); ++i) {
2129 if (host_rtt[i] == INT_MAX) host_rtt[i] = kProbeDown;
2133 delete opt_host_chain_;
2134 delete opt_host_chain_rtt_;
2135 opt_host_chain_ =
new vector<string>(host_chain);
2136 opt_host_chain_rtt_ =
new vector<int>(host_rtt);
2137 opt_host_chain_current_ = 0;
2140 bool DownloadManager::GeoSortServers(std::vector<std::string> *servers,
2141 std::vector<uint64_t> *output_order) {
2142 if (!servers) {
return false;}
2143 if (servers->size() == 1) {
2145 output_order->clear();
2146 output_order->push_back(0);
2151 std::vector<std::string> host_chain;
2152 GetHostInfo(&host_chain, NULL, NULL);
2154 std::vector<std::string> server_dns_names;
2155 server_dns_names.reserve(servers->size());
2156 for (
unsigned i = 0; i < servers->size(); ++i) {
2158 server_dns_names.push_back(host.empty() ? (*servers)[i] : host);
2160 std::string host_list =
JoinStrings(server_dns_names,
",");
2162 vector<string> host_chain_shuffled;
2167 host_chain_shuffled =
Shuffle(host_chain, &prng_);
2170 bool success =
false;
2171 unsigned max_attempts = std::min(host_chain_shuffled.size(), size_t(3));
2172 vector<uint64_t> geo_order(servers->size());
2173 for (
unsigned i = 0; i < max_attempts; ++i) {
2174 string url = host_chain_shuffled[i] +
"/api/v1.0/geo/@proxy@/" + host_list;
2176 "requesting ordered server list from %s", url.c_str());
2177 JobInfo info(&url,
false,
false, NULL);
2180 string order(info.destination_mem.data, info.destination_mem.size);
2181 free(info.destination_mem.data);
2182 bool retval = ValidateGeoReply(order, servers->size(), &geo_order);
2185 "retrieved invalid GeoAPI reply from %s [%s]",
2186 url.c_str(), order.c_str());
2189 "geographic order of servers retrieved from %s",
2197 "GeoAPI request %s failed with error %d [%s]",
2203 "failed to retrieve geographic order from stratum 1 servers");
2208 output_order->swap(geo_order);
2210 std::vector<std::string> sorted_servers;
2211 sorted_servers.reserve(geo_order.size());
2212 for (
unsigned i = 0; i < geo_order.size(); ++i) {
2213 uint64_t orderval = geo_order[i];
2214 sorted_servers.push_back((*servers)[orderval]);
2216 servers->swap(sorted_servers);
2229 bool DownloadManager::ProbeGeo() {
2230 vector<string> host_chain;
2231 vector<int> host_rtt;
2232 unsigned current_host;
2233 vector< vector<ProxyInfo> > proxy_chain;
2234 unsigned fallback_group;
2236 GetHostInfo(&host_chain, &host_rtt, ¤t_host);
2237 GetProxyInfo(&proxy_chain, NULL, &fallback_group);
2238 if ((host_chain.size() < 2) && ((proxy_chain.size() - fallback_group) < 2))
2241 vector<string> host_names;
2242 for (
unsigned i = 0; i < host_chain.size(); ++i)
2244 SortTeam(&host_names, &host_chain);
2245 unsigned last_geo_host = host_names.size();
2247 if ((fallback_group == 0) && (last_geo_host > 1)) {
2253 host_names.push_back(
"+PXYSEP+");
2257 unsigned first_geo_fallback = host_names.size();
2258 for (
unsigned i = fallback_group; i < proxy_chain.size(); ++i) {
2261 host_names.push_back(proxy_chain[i][0].host.name());
2264 std::vector<uint64_t> geo_order;
2265 bool success = GeoSortServers(&host_names, &geo_order);
2273 delete opt_host_chain_;
2274 opt_num_proxies_ = 0;
2275 opt_host_chain_ =
new vector<string>(host_chain.size());
2279 vector<vector<ProxyInfo> > *proxy_groups =
new vector<vector<ProxyInfo> >(
2280 opt_proxy_groups_fallback_ + proxy_chain.size() - fallback_group);
2282 for (
unsigned i = 0; i < opt_proxy_groups_fallback_; ++i) {
2283 (*proxy_groups)[i] = (*opt_proxy_groups_)[i];
2284 opt_num_proxies_ += (*opt_proxy_groups_)[i].size();
2292 unsigned proxyi = opt_proxy_groups_fallback_;
2293 for (
unsigned i = 0; i < geo_order.size(); ++i) {
2294 uint64_t orderval = geo_order[i];
2295 if (orderval < static_cast<uint64_t>(last_geo_host)) {
2298 (*opt_host_chain_)[hosti++] = host_chain[orderval];
2299 }
else if (orderval >= static_cast<uint64_t>(first_geo_fallback)) {
2303 (*proxy_groups)[proxyi] =
2304 proxy_chain[fallback_group + orderval - first_geo_fallback];
2305 opt_num_proxies_ += (*proxy_groups)[proxyi].size();
2310 opt_proxy_map_.clear();
2311 delete opt_proxy_groups_;
2312 opt_proxy_groups_ = proxy_groups;
2315 if (opt_proxy_groups_current_ > opt_proxy_groups_->size()) {
2316 if (opt_proxy_groups_->size() == 0) {
2317 opt_proxy_groups_current_ = 0;
2319 opt_proxy_groups_current_ = opt_proxy_groups_->size() - 1;
2321 opt_proxy_groups_current_burned_ = 0;
2324 UpdateProxiesUnlocked(
"geosort");
2326 delete opt_host_chain_rtt_;
2327 opt_host_chain_rtt_ =
new vector<int>(host_chain.size(), kProbeGeo);
2328 opt_host_chain_current_ = 0;
// Checks a GeoAPI reply and converts it into zero-based indices.
// reply_order:   the reply text as received.
// expected_size: number of values the reply has to contain.
// reply_vals:    receives the values minus one; it is written by index, so it
//                must already hold at least expected_size elements.
// NOTE(review): presumably every failed check below returns false and the
// function returns true at the end -- confirm.
2341 bool DownloadManager::ValidateGeoReply(
2342 const string &reply_order,
2343 const unsigned expected_size,
2344 vector<uint64_t> *reply_vals)
// An empty reply is rejected.
2346 if (reply_order.empty())
// So is a reply the sanitizer does not accept.
2349 if (!sanitizer.
IsValid(reply_order))
2352 vector<string> reply_strings =
2354 vector<uint64_t> tmp_vals;
2355 for (
unsigned i = 0; i < reply_strings.size(); ++i) {
// Empty fields get special treatment.
2356 if (reply_strings[i].empty())
// The number of parsed values has to match the number of servers asked for.
2360 if (tmp_vals.size() != expected_size)
// Putting the values into a set reveals duplicates: the sizes differ then.
2364 set<uint64_t> coverage(tmp_vals.begin(), tmp_vals.end());
2365 if (coverage.size() != tmp_vals.size())
// Without duplicates, smallest == 1 and largest == count means the values are
// exactly 1..count.
2367 if ((*coverage.begin() != 1) || (*coverage.rbegin() != coverage.size()))
// The reply counts from 1; store indices counting from 0.
2370 for (
unsigned i = 0; i < expected_size; ++i) {
2371 (*reply_vals)[i] = tmp_vals[i] - 1;
/**
 * Produces a copy of a proxy list without DIRECT entries.
 *
 * The list is split into groups at ';' and each group into entries at '|'.
 * Entries equal to "DIRECT" or "" are singled out; presumably every other
 * entry is kept (the branch structure is only partly visible in this
 * listing).  Groups that end up with no kept entries are not re-emitted.
 *
 * @param proxy_list    list to clean
 * @param cleaned_list  output string; its assignment is not visible in this
 *                      listing, presumably the ';'-joined cleaned groups
 * @return presumably true if a DIRECT entry was encountered (SetProxyChain
 *         stores the result in a variable named contains_direct) -- TODO
 *         confirm
 */
2381 bool DownloadManager::StripDirect(
2382 const string &proxy_list,
2383 string *cleaned_list)
// Empty input is handled up front; the body of this branch is not shown.
2386 if (proxy_list ==
"") {
2390 bool result =
false;
2392 vector<string> proxy_groups =
SplitString(proxy_list,
';');
2393 vector<string> cleaned_groups;
2394 for (
unsigned i = 0; i < proxy_groups.size(); ++i) {
2395 vector<string> group =
SplitString(proxy_groups[i],
'|');
2396 vector<string> cleaned;
2397 for (
unsigned j = 0; j < group.size(); ++j) {
// Empty entries are treated the same way as "DIRECT".
2398 if ((group[j] ==
"DIRECT") || (group[j] ==
"")) {
2401 cleaned.push_back(group[j]);
// Only groups that still contain at least one entry are re-joined with '|'.
2404 if (!cleaned.empty())
2405 cleaned_groups.push_back(
JoinStrings(cleaned,
"|"));
/**
 * Installs a new proxy configuration: stores the proxy list strings, resolves
 * the proxy host names and rebuilds opt_proxy_groups_.
 *
 * Visible steps: reset the proxy timestamps, overwrite the stored regular
 * and/or fallback list depending on set_mode, strip DIRECT from the fallback
 * list (and from the regular list when fallback proxies remain), drop the old
 * proxy map and groups, resolve all host names with a single ResolveMany()
 * call, and create one vector<ProxyInfo> per ';'-separated group.
 *
 * @param proxy_list           regular proxies; groups separated by ';',
 *                             members of a group by '|'
 * @param fallback_proxy_list  fallback proxies in the same syntax
 *
 * A set_mode value is compared against kSetProxyFallback, kSetProxyRegular and
 * kSetProxyBoth below; its declaration is not part of this listing
 * (presumably a third parameter).
 */
2421 void DownloadManager::SetProxyChain(
2422 const string &proxy_list,
2423 const string &fallback_proxy_list,
2428 opt_timestamp_backup_proxies_ = 0;
2429 opt_timestamp_failover_proxies_ = 0;
// Start from the currently stored lists; depending on set_mode only one of
// the two stored lists may be replaced.
2430 string set_proxy_list = opt_proxy_list_;
2431 string set_proxy_fallback_list = opt_proxy_fallback_list_;
2432 bool contains_direct;
2433 if ((set_mode == kSetProxyFallback) || (set_mode == kSetProxyBoth)) {
2434 opt_proxy_fallback_list_ = fallback_proxy_list;
2436 if ((set_mode == kSetProxyRegular) || (set_mode == kSetProxyBoth)) {
2437 opt_proxy_list_ = proxy_list;
// The stored fallback list stays as given; only the working copy
// set_proxy_fallback_list receives the DIRECT-free version.
2440 StripDirect(opt_proxy_fallback_list_, &set_proxy_fallback_list);
2441 if (contains_direct) {
2443 "fallback proxies do not support DIRECT, removing");
// Without fallback proxies the regular list is used unmodified.
2445 if (set_proxy_fallback_list ==
"") {
2446 set_proxy_list = opt_proxy_list_;
// With fallback proxies DIRECT is removed from the regular list as well, as
// the log message below states (the else keyword is not visible in this
// listing).
// NOTE(review): contains_direct is declared a second time here; presumably
// this sits in a nested scope and shadows the variable declared above --
// confirm.
2448 bool contains_direct = StripDirect(opt_proxy_list_, &set_proxy_list);
2449 if (contains_direct) {
2451 "skipping DIRECT proxy to use fallback proxy");
// Discard the previous proxy state before building the new one.
2458 opt_proxy_map_.clear();
2459 delete opt_proxy_groups_;
// No proxies at all: reset every piece of proxy state to NULL / 0.
2460 if ((set_proxy_list ==
"") && (set_proxy_fallback_list ==
"")) {
2461 opt_proxy_groups_ = NULL;
2462 opt_proxy_groups_current_ = 0;
2463 opt_proxy_groups_current_burned_ = 0;
2464 opt_proxy_groups_fallback_ = 0;
2465 opt_num_proxies_ = 0;
// opt_proxy_groups_fallback_ is set to the number of regular groups.  Since
// the fallback groups are appended after the regular ones below, this equals
// the index of the first fallback group.
2470 opt_proxy_groups_fallback_ = 0;
2471 if (set_proxy_list !=
"") {
2472 opt_proxy_groups_fallback_ =
SplitString(set_proxy_list,
';').size();
2475 opt_proxy_groups_fallback_);
// Concatenate regular and fallback groups into a single ';'-separated list.
2479 string all_proxy_list = set_proxy_list;
2480 if (set_proxy_fallback_list !=
"") {
2481 if (all_proxy_list !=
"")
2482 all_proxy_list +=
";";
2483 all_proxy_list += set_proxy_fallback_list;
2486 all_proxy_list.c_str());
// First pass: collect one host name per proxy entry so that all of them can
// be resolved with a single ResolveMany() call.
2489 vector<string> hostnames;
2490 vector<string> proxy_groups;
2491 if (all_proxy_list !=
"")
2493 for (
unsigned i = 0; i < proxy_groups.size(); ++i) {
2494 vector<string> this_group =
SplitString(proxy_groups[i],
'|');
2495 for (
unsigned j = 0; j < this_group.size(); ++j) {
2501 hostnames.push_back(hostname);
2504 vector<dns::Host> hosts;
2507 resolver_->ResolveMany(hostnames, &hosts);
// Second pass: build the ProxyInfo groups.  num_proxy advances together with
// the inner loop, so hosts[num_proxy] is used as the resolution result for
// this_group[j]; this assumes the first pass pushed exactly one host name per
// entry in the same order -- TODO confirm.
2511 opt_proxy_groups_ =
new vector< vector<ProxyInfo> >();
2512 opt_num_proxies_ = 0;
2513 unsigned num_proxy = 0;
2514 for (
unsigned i = 0; i < proxy_groups.size(); ++i) {
2515 vector<string> this_group =
SplitString(proxy_groups[i],
'|');
2519 vector<ProxyInfo> infos;
2520 for (
unsigned j = 0; j < this_group.size(); ++j, ++num_proxy) {
2522 if (this_group[j] ==
"DIRECT") {
2529 "failed to resolve IP addresses for %s (%d - %s)",
2530 hosts[num_proxy].name().c_str(), hosts[num_proxy].status(),
2534 infos.push_back(
ProxyInfo(failed_host, this_group[j]));
// One ProxyInfo is added per address in best_addresses.
2539 set<string> best_addresses =
2541 set<string>::const_iterator iter_ips = best_addresses.begin();
2542 for (; iter_ips != best_addresses.end(); ++iter_ips) {
2544 infos.push_back(
ProxyInfo(hosts[num_proxy], url_ip));
2547 opt_proxy_groups_->push_back(infos);
2548 opt_num_proxies_ += infos.size();
// NOTE(review): the second %u receives opt_proxy_groups_->size(), which is a
// size_type rather than unsigned; consider a cast or %zu -- confirm against
// LogCvmfs' format handling.
2551 "installed %u proxies in %u load-balance groups",
2552 opt_num_proxies_, opt_proxy_groups_->size());
// Start over with the first group and no burned proxies.
2553 opt_proxy_groups_current_ = 0;
2554 opt_proxy_groups_current_burned_ = 0;
2557 if (opt_proxy_groups_->size() > 0) {
2559 UpdateProxiesUnlocked(
"set proxies");
/**
 * Copies the current proxy configuration into caller-provided variables.
 *
 * @param proxy_chain     output, must not be NULL (asserted); receives a copy
 *                        of the proxy groups or an empty chain if none are set
 * @param current_group   optional output (may be NULL); index of the proxy
 *                        group currently in use
 * @param fallback_group  optional output (may be NULL); receives
 *                        opt_proxy_groups_fallback_, or 0 if no groups are set
 */
2570 void DownloadManager::GetProxyInfo(vector< vector<ProxyInfo> > *proxy_chain,
2571 unsigned *current_group,
2572 unsigned *fallback_group)
2574 assert(proxy_chain != NULL);
// No proxies configured: hand out an empty chain.  The value written to
// *current_group in this case is on a line not shown in this listing.
2578 if (!opt_proxy_groups_) {
2579 vector< vector<ProxyInfo> > empty_chain;
2580 *proxy_chain = empty_chain;
2581 if (current_group != NULL)
2583 if (fallback_group != NULL)
2584 *fallback_group = 0;
// Regular case: deep copy of the groups plus the two indices.
2588 *proxy_chain = *opt_proxy_groups_;
2589 if (current_group != NULL)
2590 *current_group = opt_proxy_groups_current_;
2591 if (fallback_group != NULL)
2592 *fallback_group = opt_proxy_groups_fallback_;
/**
 * Returns a copy of opt_proxy_list_, the stored regular proxy list string.
 */
2595 string DownloadManager::GetProxyList() {
2596 return opt_proxy_list_;
/**
 * Returns a copy of opt_proxy_fallback_list_, the stored fallback proxy list
 * string.
 */
2599 string DownloadManager::GetFallbackProxyList() {
2600 return opt_proxy_fallback_list_;
2608 if (!opt_proxy_groups_)
2611 uint32_t key = (hash ? hash->
Partial32() : 0);
2612 map<uint32_t, ProxyInfo *>::iterator it = opt_proxy_map_.lower_bound(key);
/**
 * Rebuilds opt_proxy_map_ and opt_proxy_urls_ from the current proxy group and
 * logs a message when the resulting set of proxy URLs changed.
 *
 * Only the first (group size - opt_proxy_groups_current_burned_) proxies of
 * the group are considered.  With opt_proxy_shard_ set, each of them is
 * entered into the map under kProxyMapScale pseudo-random keys; otherwise a
 * single proxy is entered under the largest key.
 *
 * The "Unlocked" suffix suggests the caller is responsible for locking --
 * TODO confirm.
 *
 * @param reason  short text; presumably included in the log message as its
 *                third %s (the argument line is not shown in this listing)
 */
2621 void DownloadManager::UpdateProxiesUnlocked(
const string &reason) {
2622 if (!opt_proxy_groups_)
2626 vector<ProxyInfo> *group = current_proxy_group();
// NOTE(review): unsigned arithmetic; confirm the burned counter can never
// exceed group->size().
2627 unsigned num_alive = (group->size() - opt_proxy_groups_current_burned_);
// Remember the previous URL list so that a change can be reported below.
2628 string old_proxy =
JoinStrings(opt_proxy_urls_,
"|");
2631 opt_proxy_map_.clear();
2632 opt_proxy_urls_.clear();
2633 const uint32_t max_key = 0xffffffffUL;
// Sharding: every alive proxy gets kProxyMapScale map entries whose keys come
// from prng.Next(max_key).
2634 if (opt_proxy_shard_) {
2636 for (
unsigned i = 0; i < num_alive; ++i) {
2642 for (
unsigned j = 0; j < kProxyMapScale; ++j) {
2643 const std::pair<uint32_t, ProxyInfo *> entry(prng.
Next(max_key), proxy);
2644 opt_proxy_map_.insert(entry);
2646 opt_proxy_urls_.push_back(proxy->
url);
// Additionally map the largest key to the proxy holding the smallest key,
// presumably so that a lower_bound() lookup finds an entry for every 32 bit
// key.
2649 ProxyInfo *first_proxy = opt_proxy_map_.begin()->second;
2650 const std::pair<uint32_t, ProxyInfo *> last_entry(max_key, first_proxy);
2651 opt_proxy_map_.insert(last_entry);
// Presumably the non-sharded branch (the else is not visible here): a single
// proxy is entered under max_key.  prng_.Next(num_alive) presumably yields an
// index below num_alive -- TODO confirm.
2654 unsigned select = prng_.Next(num_alive);
2656 const std::pair<uint32_t, ProxyInfo *> entry(max_key, proxy);
2657 opt_proxy_map_.insert(entry);
2658 opt_proxy_urls_.push_back(proxy->
url);
// Sorting makes the joined URL string independent of insertion order, so the
// comparison with old_proxy only reacts to real changes.
2660 sort(opt_proxy_urls_.begin(), opt_proxy_urls_.end());
2663 string new_proxy =
JoinStrings(opt_proxy_urls_,
"|");
2664 if (new_proxy != old_proxy) {
2666 "switching proxy from %s to %s (%s)",
2667 (old_proxy.empty() ?
"(none)" : old_proxy.c_str()),
2668 (new_proxy.empty() ?
"(none)" : new_proxy.c_str()),
/**
 * Turns on proxy sharding (opt_proxy_shard_) and rebalances the proxies so
 * that the proxy map is rebuilt with the new setting.
 */
2676 void DownloadManager::ShardProxies() {
2677 opt_proxy_shard_ =
true;
2678 RebalanceProxiesUnlocked(
"enable sharding");
/**
 * Clears the failover timestamp and the burned-proxy counter of the current
 * group, then rebuilds the proxy selection via UpdateProxiesUnlocked().
 * The early check on opt_proxy_groups_ presumably returns when no proxies are
 * configured (its body is not shown in this listing).
 *
 * @param reason  passed through to UpdateProxiesUnlocked()
 */
2685 void DownloadManager::RebalanceProxiesUnlocked(
const string &reason) {
2686 if (!opt_proxy_groups_)
2689 opt_timestamp_failover_proxies_ = 0;
2690 opt_proxy_groups_current_burned_ = 0;
2691 UpdateProxiesUnlocked(reason);
/**
 * Public entry point for RebalanceProxiesUnlocked() with the reason
 * "rebalance".  One line of the body is missing from this listing, presumably
 * the lock acquisition -- TODO confirm.
 */
2695 void DownloadManager::RebalanceProxies() {
2697 RebalanceProxiesUnlocked(
"rebalance");
/**
 * Advances to the next proxy group, wrapping around after the last one,
 * records the time of the switch and rebalances the proxies.
 */
2704 void DownloadManager::SwitchProxyGroup() {
// With no groups or only a single group there is nothing to switch to; the
// body of this branch is not shown, presumably an early return.
2707 if (!opt_proxy_groups_ || (opt_proxy_groups_->size() < 2)) {
2711 opt_proxy_groups_current_ = (opt_proxy_groups_current_ + 1) %
2712 opt_proxy_groups_->size();
// Store the current time in opt_timestamp_backup_proxies_.
2713 opt_timestamp_backup_proxies_ = time(NULL);
2714 RebalanceProxiesUnlocked(
"switch proxy group");
/**
 * Stores the proxy group reset delay in opt_proxy_groups_reset_after_.
 *
 * @param seconds  new delay; a value of 0 additionally clears the backup and
 *                 failover proxy timestamps (presumably this disables the
 *                 automatic reset -- TODO confirm)
 */
2718 void DownloadManager::SetProxyGroupResetDelay(
const unsigned seconds) {
2720 opt_proxy_groups_reset_after_ = seconds;
2721 if (opt_proxy_groups_reset_after_ == 0) {
2722 opt_timestamp_backup_proxies_ = 0;
2723 opt_timestamp_failover_proxies_ = 0;
/**
 * Stores the host reset delay in opt_host_reset_after_.
 *
 * @param seconds  new delay; a value of 0 additionally clears
 *                 opt_timestamp_backup_host_ (presumably this disables the
 *                 automatic reset -- TODO confirm)
 */
2728 void DownloadManager::SetHostResetDelay(
const unsigned seconds)
2731 opt_host_reset_after_ = seconds;
2732 if (opt_host_reset_after_ == 0)
2733 opt_timestamp_backup_host_ = 0;
/**
 * Stores the retry settings in the corresponding opt_* members.
 *
 * @param max_retries      stored in opt_max_retries_
 * @param backoff_init_ms  stored in opt_backoff_init_ms_ (milliseconds,
 *                         judging by the name)
 * @param backoff_max_ms   stored in opt_backoff_max_ms_ (milliseconds,
 *                         judging by the name)
 */
2737 void DownloadManager::SetRetryParameters(
const unsigned max_retries,
2738 const unsigned backoff_init_ms,
2739 const unsigned backoff_max_ms)
2742 opt_max_retries_ = max_retries;
2743 opt_backoff_init_ms_ = backoff_init_ms;
2744 opt_backoff_max_ms_ = backoff_max_ms;
/**
 * Forwards the limit to the resolver's set_throttle(); judging by the method
 * name this caps the number of IP addresses used per proxy -- TODO confirm
 * against the resolver.
 *
 * @param limit  value passed to resolver_->set_throttle()
 */
2748 void DownloadManager::SetMaxIpaddrPerProxy(
unsigned limit) {
2750 resolver_->set_throttle(limit);
/**
 * Stores the two proxy template strings.
 *
 * @param direct  stored in proxy_template_direct_
 * @param forced  stored in proxy_template_forced_
 */
2754 void DownloadManager::SetProxyTemplates(
2755 const std::string &direct,
2756 const std::string &forced)
2759 proxy_template_direct_ = direct;
2760 proxy_template_forced_ = forced;
/**
 * Sets the enable_info_header_ flag.  There is no counterpart to switch it
 * off again in the visible part of the file.
 */
2764 void DownloadManager::EnableInfoHeader() {
2765 enable_info_header_ =
true;
/**
 * Sets the follow_redirects_ flag; presumably this makes downloads follow HTTP
 * redirects -- TODO confirm where the flag is consumed.
 */
2769 void DownloadManager::EnableRedirects() {
2770 follow_redirects_ =
true;
/**
 * Delegates to ssl_certificate_store_.UseSystemCertificatePath().
 */
2773 void DownloadManager::UseSystemCertificatePath() {
2774 ssl_certificate_store_.UseSystemCertificatePath();
2785 clone->
Init(pool_max_handles_, statistics);
2791 if (!opt_dns_server_.empty())
2801 if (opt_host_chain_) {
2805 CloneProxyConfig(clone);
2826 if (opt_proxy_groups_ == NULL)
2830 *opt_proxy_groups_);
unsigned opt_timeout_direct_
unsigned opt_low_speed_limit_
void HashString(const std::string &content, Any *any_digest)
#define LogCvmfs(source, mask,...)
std::vector< T > Shuffle(const std::vector< T > &input, Prng *prng)
const char * Code2Ascii(const ObjectFetcherFailures::Failures error)
unsigned opt_backoff_init_ms_
static unsigned EscapeHeader(const string &header, char *escaped_buf, size_t buf_size)
unsigned char num_used_hosts
static bool EscapeUrlChar(char input, char output[3])
int64_t Xadd(class Counter *counter, const int64_t delta)
const char * Code2Ascii(const Failures error)
unsigned opt_proxy_groups_current_burned_
double DiffTimeSeconds(struct timeval start, struct timeval end)
unsigned opt_proxy_groups_reset_after_
virtual bool IsCanceled()
void SetUrlOptions(JobInfo *info)
StreamStates DecompressZStream2Sink(const void *buf, const int64_t size, z_stream *strm, cvmfs::Sink *sink)
std::string opt_proxy_fallback_list_
static NormalResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
unsigned opt_host_reset_after_
std::string proxy_template_direct_
string ReplaceAll(const string &haystack, const string &needle, const string &replace_by)
string JoinStrings(const vector< string > &strings, const string &joint)
std::string ToString(const bool with_suffix=false) const
unsigned opt_proxy_groups_current_
shash::ContextPtr hash_context
void DecompressInit(z_stream *strm)
unsigned int current_host_chain_index
bool DecompressMem2Mem(const void *buf, const int64_t size, void **out_buf, uint64_t *out_size)
std::set< CURL * > * pool_handles_inuse_
StreamStates DecompressZStream2File(const void *buf, const int64_t size, z_stream *strm, FILE *f)
std::string opt_proxy_list_
const std::string & name() const
perf::Counter * sz_transfer_time
std::vector< std::vector< ProxyInfo > > * opt_proxy_groups_
assert((mem||(size==0))&&"Out Of Memory")
unsigned opt_proxy_groups_fallback_
void ReleaseCurlHandle(CURL *handle)
const std::set< std::string > & ViewBestAddresses(IpPreference preference) const
void SetDnsServer(const std::string &address)
void DecompressFini(z_stream *strm)
void InitSeed(const uint64_t seed)
void MakePipe(int pipe_fd[2])
uint32_t watch_fds_inuse_
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
void Init(ContextPtr context)
void SetDnsTtlLimits(const unsigned min_seconds, const unsigned max_seconds)
static Failures PrepareDownloadDestination(JobInfo *info)
void Init(const unsigned max_pool_handles, const perf::StatisticsTemplate &statistics)
const char * Code2Ascii(const Failures error)
unsigned char num_retries
std::string AddDefaultScheme(const std::string &proxy)
vector< string > SplitString(const string &str, char delim)
static string EscapeUrl(const string &url)
dns::IpPreference opt_ip_preference_
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
Failures Fetch(const std::string &base_url, const std::string &repository_name, const uint64_t minimum_timestamp, const shash::Any *base_catalog, signature::SignatureManager *signature_manager, download::DownloadManager *download_manager, ManifestEnsemble *ensemble)
void UpdateProxiesUnlocked(const std::string &reason)
bool IsEquivalent(const Host &other) const
bool IsProxyTransferError(const Failures error)
perf::Counter * n_requests
static int Init(const loader::LoaderExports *loader_exports)
void Final(ContextPtr context, Any *any_digest)
string StringifyInt(const int64_t value)
void SetMaxIpaddrPerProxy(unsigned limit)
const shash::Any * expected_hash
CURL * AcquireCurlHandle()
void Inc(class Counter *counter)
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
const std::string * extra_info
std::string ExtractHost(const std::string &url)
std::vector< int > * opt_host_chain_rtt_
cvmfs::Sink * destination_sink
SslCertificateStore ssl_certificate_store_
uint32_t Partial32() const
unsigned opt_backoff_max_ms_
unsigned GetContextSize(const Algorithms algorithm)
unsigned opt_num_proxies_
CredentialsAttachment * credentials_attachment_
std::vector< std::string > * opt_host_chain_
struct pollfd * watch_fds_
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
uint64_t String2Uint64(const string &value)
InterruptCue * interrupt_cue
unsigned opt_max_retries_
struct download::JobInfo::@3 destination_mem
std::string proxy_template_forced_
const std::string * destination_path
void SetDnsParameters(const unsigned retries, const unsigned timeout_ms)
static Host ExtendDeadline(const Host &original, unsigned seconds_from_now)
void SafeSleepMs(const unsigned ms)
bool IsHostTransferError(const Failures error)
void SortTeam(std::vector< T > *tractor, std::vector< U > *towed)
unsigned char num_used_proxies
void WritePipe(int fd, const void *buf, size_t nbyte)
unsigned opt_timeout_proxy_
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
void ReadPipe(int fd, void *buf, size_t nbyte)
void InitializeRequest(JobInfo *info, CURL *handle)
string RewriteUrl(const string &url, const string &ip)
uint32_t Next(const uint64_t boundary)
virtual int64_t Write(const void *buf, uint64_t sz)=0