29 #define __STDC_FORMAT_MACROS
31 #include "cvmfs_config.h"
74 if (((input >=
'0') && (input <=
'9')) ||
75 ((input >=
'A') && (input <=
'Z')) ||
76 ((input >=
'a') && (input <=
'z')) ||
77 (input ==
'/') || (input ==
':') || (input ==
'.') ||
79 (input ==
'+') || (input ==
'-') ||
80 (input ==
'_') || (input ==
'~') ||
81 (input ==
'[') || (input ==
']') || (input ==
','))
88 output[1] =
static_cast<char>(
89 (input / 16) + ((input / 16 <= 9) ?
'0' :
'A'-10));
90 output[2] =
static_cast<char>(
91 (input % 16) + ((input % 16 <= 9) ?
'0' :
'A'-10));
102 escaped.reserve(url.length());
104 char escaped_char[3];
105 for (
unsigned i = 0, s = url.length(); i < s; ++i) {
107 escaped.append(escaped_char, 3);
109 escaped.push_back(escaped_char[0]);
113 url.c_str(), escaped.c_str());
127 unsigned esc_pos = 0;
128 char escaped_char[3];
129 for (
unsigned i = 0, s = header.size(); i < s; ++i) {
131 for (
unsigned j = 0; j < 3; ++j) {
133 if (esc_pos >= buf_size)
135 escaped_buf[esc_pos] = escaped_char[j];
141 if (esc_pos >= buf_size)
143 escaped_buf[esc_pos] = escaped_char[0];
185 const size_t num_bytes = size*nmemb;
186 const string header_line(static_cast<const char *>(ptr), num_bytes);
193 if (
HasPrefix(header_line,
"HTTP/1.",
false)) {
194 if (header_line.length() < 10)
198 for (i = 8; (i < header_line.length()) && (header_line[i] ==
' '); ++i) {}
201 if (header_line.length() > i+2) {
202 info->
http_code = DownloadManager::ParseHttpCode(&header_line[i]);
214 header_line.c_str());
219 header_line.c_str());
245 HasPrefix(header_line,
"CONTENT-LENGTH:",
true))
247 char *tmp =
reinterpret_cast<char *
>(alloca(num_bytes+1));
249 sscanf(header_line.c_str(),
"%s %" PRIu64, tmp, &length);
251 if (length > DownloadManager::kMaxMemSize) {
253 "resource %s too large to store in memory (%" PRIu64
")",
254 info->
url->c_str(), length);
264 }
else if (
HasPrefix(header_line,
"LOCATION:",
true)) {
267 }
else if (
HasPrefix(header_line,
"X-SQUID-ERROR:",
true)) {
272 }
else if (
HasPrefix(header_line,
"PROXY-STATUS:",
true)) {
275 (header_line.find(
"error=") != string::npos)) {
290 const size_t num_bytes = size*nmemb;
315 "decompressing %s, local IO error", info->
url->c_str());
321 if ((written < 0) || (static_cast<uint64_t>(written) != num_bytes)) {
323 PRId64
")", info->
url->c_str(), written);
333 "Content-Length was missing or zero, but %zu bytes received",
337 "start %zu, bytes %zu, expected %zu",
363 "decompressing %s, local IO error", info->
url->c_str());
370 "downloading %s, IO failure: %s (errno=%d)",
371 info->
url->c_str(), strerror(errno), errno);
/**
 * Reports whether the job's outcome counts as "file not found".
 * On the return path visible here that is the case when the recorded
 * HTTP status code equals 404.
 */
385 bool JobInfo::IsFileNotFound() {
389 return http_code == 404;
// Negative sentinels stored in the host RTT vector in place of a measured
// round trip time: kProbeUnprobed is the initial fill value when a host chain
// is set, kProbeDown replaces INT_MAX after probing, and kProbeGeo is the fill
// value after geographic sorting.
396 const int DownloadManager::kProbeUnprobed = -1;
397 const int DownloadManager::kProbeDown = -2;
398 const int DownloadManager::kProbeGeo = -3;
// Limit compared against the parsed Content-Length value before a resource is
// stored in memory (1024*1024).
399 const unsigned DownloadManager::kMaxMemSize = 1024*1024;
/**
 * Turns the first three characters of digits into an integer by adding up
 * each decimal digit multiplied by factor.
 *
 * @param digits  At least three characters; only indices 0..2 are read.
 */
405 int DownloadManager::ParseHttpCode(
const char digits[3]) {
// Exactly three characters are inspected.
408 for (
int i = 0; i < 3; ++i) {
// Guard for characters outside '0'..'9'; the action taken is on a line that
// is not visible here.
409 if ((digits[i] <
'0') || (digits[i] >
'9'))
411 result += (digits[i] -
'0') * factor;
/**
 * Socket callback installed on the multi handle through
 * CURLMOPT_SOCKETFUNCTION.  Maps the poll action requested by curl onto the
 * events mask of the matching entry in download_mgr->watch_fds_.
 */
421 int DownloadManager::CallbackCurlSocket(CURL * ,
// CURL_POLL_NONE is special-cased.
430 if (action == CURL_POLL_NONE)
// watch_fds_ is reassigned here; the allocation expression continues on lines
// that are not visible.
449 download_mgr->
watch_fds_ =
static_cast<struct pollfd *
>(
// Read interest.
461 download_mgr->
watch_fds_[index].events = POLLIN | POLLPRI;
// Write interest.
464 download_mgr->
watch_fds_[index].events = POLLOUT | POLLWRBAND;
// Interest in both directions.
466 case CURL_POLL_INOUT:
468 POLLIN | POLLPRI | POLLOUT | POLLWRBAND;
// Removal: the guard holds when index is not the last in-use slot.
470 case CURL_POLL_REMOVE:
471 if (index < download_mgr->watch_fds_inuse_-1) {
// Second reassignment of watch_fds_ (expression continues on lines that are
// not visible).
483 download_mgr->
watch_fds_ =
static_cast<struct pollfd *
>(
/**
 * Thread entry point handed to pthread_create.  Watches two pipe slots
 * (terminate and jobs) in watch_fds_, feeds socket events into the curl multi
 * handle and processes completed transfers.
 *
 * @param data  Opaque thread argument; the creating code passes the
 *              DownloadManager instance.
 */
501 void *DownloadManager::MainDownload(
void *data) {
// Fixed positions of the two control pipes inside watch_fds_.
505 const int kIdxPipeTerminate = 0;
506 const int kIdxPipeJobs = 1;
// Room for two pollfd entries is allocated.
509 static_cast<struct pollfd *
>(smalloc(2 *
sizeof(
struct pollfd)));
511 download_mgr->
watch_fds_[kIdxPipeTerminate].fd =
513 download_mgr->
watch_fds_[kIdxPipeTerminate].events = POLLIN | POLLPRI;
514 download_mgr->
watch_fds_[kIdxPipeTerminate].revents = 0;
517 download_mgr->
watch_fds_[kIdxPipeJobs].events = POLLIN | POLLPRI;
518 download_mgr->
watch_fds_[kIdxPipeJobs].revents = 0;
// still_running tracks whether transfers are active; the two timevals bracket
// a time measurement.
521 int still_running = 0;
522 struct timeval timeval_start, timeval_stop;
523 gettimeofday(&timeval_start, NULL);
539 gettimeofday(&timeval_stop, NULL);
540 int64_t delta =
static_cast<int64_t
>(
552 curl_multi_socket_action(download_mgr->
curl_multi_,
// Activity on the terminate pipe.
559 if (download_mgr->
watch_fds_[kIdxPipeTerminate].revents)
// Activity on the jobs pipe: the flag is cleared and a handle is added to the
// multi handle.
563 if (download_mgr->
watch_fds_[kIdxPipeJobs].revents) {
564 download_mgr->
watch_fds_[kIdxPipeJobs].revents = 0;
// The start timestamp is refreshed when nothing was running.
567 if (!still_running) {
568 gettimeofday(&timeval_start, NULL);
573 curl_multi_add_handle(download_mgr->
curl_multi_, handle);
574 curl_multi_socket_action(download_mgr->
curl_multi_,
// Translate poll() result flags into the CURL_CSELECT_* bitmask.
591 if (download_mgr->
watch_fds_[i].revents & (POLLIN | POLLPRI))
592 ev_bitmask |= CURL_CSELECT_IN;
593 if (download_mgr->
watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
594 ev_bitmask |= CURL_CSELECT_OUT;
596 (POLLERR | POLLHUP | POLLNVAL))
598 ev_bitmask |= CURL_CSELECT_ERR;
602 curl_multi_socket_action(download_mgr->
curl_multi_,
// Drain the multi handle's message queue.
612 while ((curl_msg = curl_multi_info_read(download_mgr->
curl_multi_,
615 if (curl_msg->msg == CURLMSG_DONE) {
// The job is recovered from the easy handle's CURLINFO_PRIVATE pointer.
618 CURL *easy_handle = curl_msg->easy_handle;
619 int curl_error = curl_msg->
data.result;
620 curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
// The finished handle is removed; the re-add that follows is presumably the
// retry path (its condition is on a line that is not visible - TODO confirm).
622 curl_multi_remove_handle(download_mgr->
curl_multi_, easy_handle);
624 curl_multi_add_handle(download_mgr->
curl_multi_, easy_handle);
625 curl_multi_socket_action(download_mgr->
curl_multi_,
// Each handle *i is detached from the multi handle and cleaned up.
642 curl_multi_remove_handle(download_mgr->
curl_multi_, *i);
643 curl_easy_cleanup(*i);
/**
 * Destructor: visits every entry of blocks_.  The per-block action is on
 * lines that are not visible here.
 */
656 HeaderLists::~HeaderLists() {
657 for (
unsigned i = 0; i < blocks_.size(); ++i) {
/**
 * Returns a list node for header.  DuplicateList uses the returned node as
 * the head of its copy, and InitHeaders uses it as the head of the default
 * header list.
 */
664 curl_slist *HeaderLists::GetList(
const char *header) {
/**
 * Builds a copy of slist out of nodes obtained from GetList() and Get().
 * The data pointers of the source nodes are reused; only the nodes are new.
 */
669 curl_slist *HeaderLists::DuplicateList(curl_slist *slist) {
// Head of the copy.
671 curl_slist *copy = GetList(slist->data);
672 copy->next = slist->next;
673 curl_slist *prev = copy;
// For each further source node a fresh node is taken and linked behind prev.
676 curl_slist *new_link = Get(slist->data);
677 new_link->next = slist->next;
678 prev->next = new_link;
/**
 * Attaches a node for header to slist.  The new node comes from Get() and is
 * terminated with a NULL next pointer before it is linked in.
 */
686 void HeaderLists::AppendHeader(curl_slist *slist,
const char *header) {
688 curl_slist *new_link = Get(header);
689 new_link->next = NULL;
// The lines that position slist before this assignment are not visible.
693 slist->next = new_link;
/**
 * Unlinks nodes from *slist whose data compares equal (strcmp) to header.
 *
 * @param header  Header line to look for.
 * @param slist   In/out pointer to the list head.
 */
702 void HeaderLists::CutHeader(
const char *header, curl_slist **slist) {
// prev starts at head, which is declared on a line that is not visible;
// presumably a local placeholder node in front of the list - TODO confirm.
706 curl_slist *prev = &head;
707 curl_slist *rover = *slist;
// On a match the predecessor is linked past rover.
709 if (strcmp(rover->data, header) == 0) {
710 prev->next = rover->next;
/**
 * Processes the nodes of slist.  The successor is read into next before the
 * per-node work, which is on lines that are not visible here.
 */
721 void HeaderLists::PutList(curl_slist *slist) {
723 curl_slist *next = slist->next;
/**
 * Renders slist as text: each node's data followed by a newline is appended
 * to verbose.
 */
730 string HeaderLists::Print(curl_slist *slist) {
733 verbose += string(slist->data) +
"\n";
/**
 * Hands out a node whose data points at header.  The pointer itself is
 * stored (no copy is made in the visible code), so header must stay valid
 * for as long as the node is in use.
 */
740 curl_slist *HeaderLists::Get(
const char *header) {
// Scan all blocks for the first slot that IsUsed() reports as free.
741 for (
unsigned i = 0; i < blocks_.size(); ++i) {
742 for (
unsigned j = 0; j < kBlockSize; ++j) {
743 if (!IsUsed(&(blocks_[i][j]))) {
744 blocks_[i][j].data =
const_cast<char *
>(header);
745 return &(blocks_[i][j]);
// Otherwise the first slot of the last block is used; the lines between the
// scan and this point are not visible.
752 blocks_[blocks_.size()-1][0].data =
const_cast<char *
>(header);
753 return &(blocks_[blocks_.size()-1][0]);
757 void HeaderLists::Put(curl_slist *slist) {
/**
 * Allocates an array of kBlockSize curl_slist nodes with new[], visits each
 * node (the per-node statement is not visible) and appends the array to
 * blocks_.
 */
763 void HeaderLists::AddBlock() {
764 curl_slist *new_block =
new curl_slist[kBlockSize];
765 for (
unsigned i = 0; i < kBlockSize; ++i) {
768 blocks_.push_back(new_block);
/**
 * Produces a printable description of the proxy that ends in either
 * " (<host name>, <expiry info>)" or " (:unresolved:, <expiry info>)".
 */
775 string DownloadManager::ProxyInfo::Print() {
// Difference between the host's deadline and the current time.
781 static_cast<int>(host.deadline()) - static_cast<int>(time(NULL));
// A leading "+" marks a non-negative remainder.
782 string expinfo = (remaining >= 0) ?
"+" :
"";
// The magnitude selects the formatting branch at the 3600 and 60 thresholds;
// the formatting statements themselves are not visible.
783 if (abs(remaining) >= 3600) {
785 }
else if (abs(remaining) >= 60) {
791 result +=
" (" + host.name() +
", " + expinfo +
")";
793 result +=
" (:unresolved:, " + expinfo +
")";
/**
 * Provides an easy handle and records it in pool_handles_inuse_.  A new
 * handle is created when the idle pool is empty; otherwise the first idle
 * handle is taken out of the idle pool.
 */
803 CURL *DownloadManager::AcquireCurlHandle() {
806 if (pool_handles_idle_->empty()) {
// Fresh handle; CURLOPT_NOSIGNAL is switched on for it.
808 handle = curl_easy_init();
811 curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
// Reuse: take the first element of the idle set.
816 handle = *(pool_handles_idle_->begin());
817 pool_handles_idle_->erase(pool_handles_idle_->begin());
820 pool_handles_inuse_->insert(handle);
/**
 * Takes handle out of pool_handles_inuse_.  The handle must be registered as
 * in use (asserted).
 */
826 void DownloadManager::ReleaseCurlHandle(CURL *handle) {
827 set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
828 assert(elem != pool_handles_inuse_->end());
// When the idle pool already holds more than pool_max_handles_ entries the
// handle is destroyed; the insert below is presumably the alternative branch
// (the line joining the two is not visible - TODO confirm).
830 if (pool_handles_idle_->size() > pool_max_handles_) {
831 curl_easy_cleanup(*elem);
833 pool_handles_idle_->insert(*elem);
836 pool_handles_inuse_->erase(elem);
/**
 * Prepares handle for the transfer described by info: duplicates the default
 * header list into info->headers, configures an optional byte range, wires
 * info into the handle as private/header/data pointer and applies the
 * IPv4-only and redirect settings.
 */
844 void DownloadManager::InitializeRequest(
JobInfo *info, CURL *handle) {
854 info->
headers = header_lists_->DuplicateList(default_headers_);
// Byte range formatted as "<lower>-<upper>" into a 100 byte buffer.
872 char byte_range_array[100];
873 const int64_t range_lower =
static_cast<int64_t
>(info->
range_offset);
874 const int64_t range_upper =
static_cast<int64_t
>(
// NOTE(review): snprintf returns the length the complete string would need,
// so truncation shows up as any value >= sizeof(byte_range_array); the
// comparison with == 100 only matches one of those values.
876 if (snprintf(byte_range_array,
sizeof(byte_range_array),
877 "%" PRId64
"-%" PRId64,
878 range_lower, range_upper) == 100)
882 curl_easy_setopt(handle, CURLOPT_RANGE, byte_range_array);
// A NULL range is set on the other path.
884 curl_easy_setopt(handle, CURLOPT_RANGE, NULL);
// The job is the user pointer for the private slot and for the header and
// data callbacks.
888 curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
889 curl_easy_setopt(handle, CURLOPT_WRITEHEADER,
890 static_cast<void *>(info));
891 curl_easy_setopt(handle, CURLOPT_WRITEDATA, static_cast<void *>(info));
892 curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->
headers);
// Either a body-less request or a GET; the selecting condition is not
// visible.
894 curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
896 curl_easy_setopt(handle, CURLOPT_HTTPGET, 1);
898 if (opt_ipv4_only_) {
899 curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
// Redirects are followed up to 4 hops when enabled.
901 if (follow_redirects_) {
902 curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1);
903 curl_easy_setopt(handle, CURLOPT_MAXREDIRS, 4);
/**
 * Applies everything that depends on the currently selected proxy and host
 * to the job's curl handle: timed resets of the proxy group and host, the
 * proxy setting, timeouts, DNS servers, SSL options, "@proxy@" substitution
 * and finally the escaped URL.
 */
912 void DownloadManager::SetUrlOptions(
JobInfo *info) {
// Return to proxy group 0 once the backup timestamp plus the reset interval
// lies in the past.
918 if (opt_timestamp_backup_proxies_ > 0) {
919 const time_t now = time(NULL);
920 if (static_cast<int64_t>(now) >
921 static_cast<int64_t>(opt_timestamp_backup_proxies_ +
922 opt_proxy_groups_reset_after_))
924 opt_proxy_groups_current_ = 0;
925 opt_timestamp_backup_proxies_ = 0;
926 RebalanceProxiesUnlocked(
"reset proxy group");
// Same interval, applied to the fail-over timestamp inside a group.
930 if (opt_timestamp_failover_proxies_ > 0) {
931 const time_t now = time(NULL);
932 if (static_cast<int64_t>(now) >
933 static_cast<int64_t>(opt_timestamp_failover_proxies_ +
934 opt_proxy_groups_reset_after_))
936 RebalanceProxiesUnlocked(
"reset load-balanced proxies");
// Return to host 0 once the host timestamp plus opt_host_reset_after_ lies in
// the past.
940 if (opt_timestamp_backup_host_ > 0) {
941 const time_t now = time(NULL);
942 if (static_cast<int64_t>(now) >
943 static_cast<int64_t>(opt_timestamp_backup_host_ +
944 opt_host_reset_after_))
947 "switching host from %s to %s (reset host)",
948 (*opt_host_chain_)[opt_host_chain_current_].c_str(),
949 (*opt_host_chain_)[0].c_str());
950 opt_host_chain_current_ = 0;
951 opt_timestamp_backup_host_ = 0;
// No proxy, or a proxy whose url is "DIRECT": the job is marked DIRECT and
// CURLOPT_PROXY is set to the empty string.
956 if (!proxy || (proxy->
url ==
"DIRECT")) {
957 info->
proxy =
"DIRECT";
958 curl_easy_setopt(info->
curl_handle, CURLOPT_PROXY,
"");
// Proxy case: the proxy's addresses are validated first.
963 std::string purl = proxy->
url;
965 const bool changed = ValidateProxyIpsUnlocked(purl, phost);
// The condition under which "0.0.0.0" is used as proxy is not visible.
974 curl_easy_setopt(info->
curl_handle, CURLOPT_PROXY,
"0.0.0.0");
// Low speed limit is common; connect timeout and low speed time use the
// proxy value unless the job goes DIRECT.
977 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, opt_low_speed_limit_);
978 if (info->
proxy !=
"DIRECT") {
979 curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_proxy_);
980 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_proxy_);
982 curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_direct_);
983 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_direct_);
985 if (!opt_dns_server_.empty())
986 curl_easy_setopt(curl_handle, CURLOPT_DNS_SERVERS, opt_dns_server_.c_str());
// The current host is prepended to the job's URL.
989 url_prefix = (*opt_host_chain_)[opt_host_chain_current_];
993 string url = url_prefix + *(info->
url);
// Peer verification is switched on; for URLs starting with "https" the
// certificate path is applied as well.
995 curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 1L);
996 if (url.substr(0, 5) ==
"https") {
997 bool rvb = ssl_certificate_store_.ApplySslCertificatePath(curl_handle);
1000 "Failed to set SSL certificate path %s",
1001 ssl_certificate_store_.GetCaPath().c_str());
// Jobs with a pid other than -1 go through the credentials attachment.
1003 if (info->
pid != -1) {
1004 if (credentials_attachment_ == NULL) {
1006 "uses secure downloads but no credentials attachment set");
1008 bool retval = credentials_attachment_->ConfigureCurlHandle(
1019 signal(SIGPIPE, SIG_IGN);
// "@proxy@" placeholder: forced template first, then the direct template,
// otherwise the host name of the chosen proxy.
1022 if (url.find(
"@proxy@") != string::npos) {
1030 if (proxy_template_forced_ !=
"") {
1031 replacement = proxy_template_forced_;
1032 }
else if (info->
proxy ==
"DIRECT") {
1033 replacement = proxy_template_direct_;
// From the fallback groups onwards the request is switched to DIRECT.
1035 if (opt_proxy_groups_current_ >= opt_proxy_groups_fallback_) {
1038 info->
proxy =
"DIRECT";
1039 curl_easy_setopt(info->
curl_handle, CURLOPT_PROXY,
"");
1040 replacement = proxy_template_direct_;
1042 replacement = ChooseProxyUnlocked(info->
expected_hash)->host.name();
// An empty replacement falls back to the direct template.
1045 replacement = (replacement ==
"") ? proxy_template_direct_ : replacement;
1047 replacement.c_str());
1048 url =
ReplaceAll(url,
"@proxy@", replacement);
// The URL is escaped before it is handed to curl.
1059 curl_easy_setopt(curl_handle, CURLOPT_URL,
EscapeUrl(url).c_str());
/**
 * Checks the DNS information of a proxy host and updates the current proxy
 * group when it changed.  SetUrlOptions stores the result in a flag named
 * "changed".
 */
1072 bool DownloadManager::ValidateProxyIpsUnlocked(
1079 host.
name().c_str());
1081 unsigned group_idx = opt_proxy_groups_current_;
// update_only starts out true and is cleared on a path whose condition is
// not visible.
1084 bool update_only =
true;
1088 "failed to resolve IP addresses for %s (%d - %s)",
1093 update_only =
false;
// Entries of the current group that carry the same host id receive new_host.
1097 for (
unsigned i = 0; i < (*opt_proxy_groups_)[group_idx].size(); ++i) {
1098 if ((*opt_proxy_groups_)[group_idx][i].host.id() == host.
id())
1099 (*opt_proxy_groups_)[group_idx][i].host = new_host;
// Rebuild path: the group's size is subtracted from the proxy count, entries
// of this host are erased and replacements are appended.
1108 "DNS entries for proxy %s changed, adjusting", host.
name().c_str());
1109 vector<ProxyInfo> *group = current_proxy_group();
1110 opt_num_proxies_ -= group->size();
// No increment in the loop header: erasing shifts the next element into
// position i.
1111 for (
unsigned i = 0; i < group->size(); ) {
1112 if ((*group)[i].host.id() == host.
id()) {
1113 group->erase(group->begin() + i);
// One ProxyInfo per entry of best_addresses.
1118 vector<ProxyInfo> new_infos;
1120 set<string>::const_iterator iter_ips = best_addresses.begin();
1121 for (; iter_ips != best_addresses.end(); ++iter_ips) {
1123 new_infos.push_back(
ProxyInfo(new_host, url_ip));
1125 group->insert(group->end(), new_infos.begin(), new_infos.end());
1126 opt_num_proxies_ += new_infos.size();
1128 RebalanceProxiesUnlocked(
"DNS change");
/**
 * Adds transfer sizes reported by curl for handle to the
 * sz_transferred_bytes counter.
 */
1136 void DownloadManager::UpdateStatistics(CURL *handle) {
// CURLINFO_SIZE_DOWNLOAD must succeed (asserted); the value is added to sum.
1141 retval = curl_easy_getinfo(handle, CURLINFO_SIZE_DOWNLOAD, &val);
1142 assert(retval == CURLE_OK);
1143 sum +=
static_cast<int64_t
>(val);
1147 perf::Xadd(counters_->sz_transferred_bytes, sum);
/**
 * Decides whether info may be retried against the same URL.  The limit is
 * read from opt_max_retries_; the comparison itself is on lines that are not
 * visible here.
 */
1154 bool DownloadManager::CanRetry(
const JobInfo *info) {
1156 unsigned max_retries = opt_max_retries_;
1171 unsigned backoff_init_ms = 0;
1172 unsigned backoff_max_ms = 0;
1175 backoff_init_ms = opt_backoff_init_ms_;
1176 backoff_max_ms = opt_backoff_max_ms_;
1182 info->
backoff_ms = prng_.Next(backoff_init_ms + 1);
1195 header_lists_->AppendHeader(info->
headers,
"Pragma: no-cache");
1196 header_lists_->AppendHeader(info->
headers,
"Cache-Control: no-cache");
/**
 * Removes the "Pragma: no-cache" and "Cache-Control: no-cache" request
 * headers from the job's header list.
 */
1206 void DownloadManager::SetRegularCache(
JobInfo *info) {
1209 header_lists_->CutHeader(
"Pragma: no-cache", &(info->
headers));
1210 header_lists_->CutHeader(
"Cache-Control: no-cache", &(info->
headers));
/**
 * Hands the job's curl handle back to the credentials attachment.  On this
 * path an attachment must be set (asserted).
 */
1219 void DownloadManager::ReleaseCredential(
JobInfo *info) {
1221 assert(credentials_attachment_ != NULL);
1222 credentials_attachment_->ReleaseCurlHandle(info->
curl_handle,
/**
 * Evaluates the outcome of a finished transfer.  The curl result is
 * classified in a switch, after which the code decides between a retry
 * (possibly on another proxy or host) and finishing the job.  The caller
 * loops while this function returns true.
 *
 * @param curl_error  Result code of the transfer.
 * @param info        Job that was transferred.
 */
1235 bool DownloadManager::VerifyAndFinalize(
const int curl_error,
JobInfo *info) {
1237 "Verify downloaded url %s, proxy %s (curl error %d)",
1238 info->
url->c_str(), info->
proxy.c_str(), curl_error);
// Classification of the curl result; the statements inside the cases are
// mostly not visible.
1242 switch (curl_error) {
1250 "hash verification of %s failed (expected %s, got %s)",
1272 "decompression (memory) of url %s failed",
1273 info->
url->c_str());
1281 case CURLE_UNSUPPORTED_PROTOCOL:
1284 case CURLE_URL_MALFORMAT:
1287 case CURLE_COULDNT_RESOLVE_PROXY:
1290 case CURLE_COULDNT_RESOLVE_HOST:
1293 case CURLE_OPERATION_TIMEDOUT:
1297 case CURLE_PARTIAL_FILE:
1298 case CURLE_GOT_NOTHING:
1299 case CURLE_RECV_ERROR:
1303 case CURLE_FILE_COULDNT_READ_FILE:
// Connection failures are treated differently with and without a proxy.
1304 case CURLE_COULDNT_CONNECT:
1305 if (info->
proxy !=
"DIRECT") {
1312 case CURLE_TOO_MANY_REDIRECTS:
1315 case CURLE_SSL_CACERT_BADFILE:
1317 "Failed to load certificate bundle. "
1318 "X509_CERT_BUNDLE might point to the wrong location.");
1323 case CURLE_PEER_FAILED_VERIFICATION:
1325 "invalid SSL certificate of remote host. "
1326 "X509_CERT_DIR and/or X509_CERT_BUNDLE might point to the wrong "
1330 case CURLE_ABORTED_BY_CALLBACK:
1331 case CURLE_WRITE_ERROR:
1334 case CURLE_SEND_ERROR:
1343 "trying to fetch %s", curl_error, info->
url->c_str());
// Retry decision; same_url_retry comes from CanRetry().
1348 std::vector<std::string> *host_chain = opt_host_chain_;
1351 bool try_again =
false;
1352 bool same_url_retry = CanRetry(info);
1361 "data corruption with no-cache header, try another host");
1366 if ( same_url_retry || (
1376 if ( same_url_retry || (
// The proxy selection is reset to group 0 when a later group or burned
// proxies are in use.
1391 if (opt_proxy_groups_) {
1392 if ((opt_proxy_groups_current_ > 0) ||
1393 (opt_proxy_groups_current_burned_ > 0))
1395 opt_proxy_groups_current_ = 0;
1396 opt_timestamp_backup_proxies_ = 0;
1397 RebalanceProxiesUnlocked(
"reset proxies for host failover");
1414 "same url: %d, error code %d", same_url_retry, info->
error_code);
// Several paths jump straight to the common exit label.
1424 goto verify_and_finalize_stop;
1435 goto verify_and_finalize_stop;
1441 goto verify_and_finalize_stop;
1448 SetRegularCache(info);
// Flags recording whether the retry should move to another proxy or host.
1451 bool switch_proxy =
false;
1452 bool switch_host =
false;
1459 switch_proxy =
true;
1468 if (same_url_retry) {
1471 switch_proxy =
true;
1474 if (same_url_retry) {
// Credentials are released before the URL options are applied again.
1485 ReleaseCredential(info);
1488 SetUrlOptions(info);
1491 ReleaseCredential(info);
1494 SetUrlOptions(info);
// Common exit: credentials are released and the job's header list is passed
// to PutList.
1500 verify_and_finalize_stop:
1502 ReleaseCredential(info);
1517 header_lists_->PutList(info->
headers);
/**
 * Puts every member into a neutral state (NULL, 0, false or empty) and
 * creates the two mutexes lock_options_ and lock_synchronous_mode_.
 */
1525 DownloadManager::DownloadManager() {
1526 pool_handles_idle_ = NULL;
1527 pool_handles_inuse_ = NULL;
1528 pool_max_handles_ = 0;
1530 default_headers_ = NULL;
1532 atomic_init32(&multi_threaded_);
1533 pipe_terminate_ = NULL;
1537 watch_fds_size_ = 0;
1538 watch_fds_inuse_ = 0;
// Both mutexes live in smalloc'ed storage; the destructor destroys and frees
// them.
1542 reinterpret_cast<pthread_mutex_t *
>(smalloc(
sizeof(pthread_mutex_t)));
1543 int retval = pthread_mutex_init(lock_options_, NULL);
1545 lock_synchronous_mode_ =
1546 reinterpret_cast<pthread_mutex_t *
>(smalloc(
sizeof(pthread_mutex_t)));
1547 retval = pthread_mutex_init(lock_synchronous_mode_, NULL);
1550 opt_dns_server_ =
"";
1552 opt_timeout_proxy_ = 0;
1553 opt_timeout_direct_ = 0;
1554 opt_low_speed_limit_ = 0;
1555 opt_host_chain_ = NULL;
1556 opt_host_chain_rtt_ = NULL;
1557 opt_host_chain_current_ = 0;
1558 opt_proxy_groups_ = NULL;
1559 opt_proxy_groups_current_ = 0;
1560 opt_proxy_groups_current_burned_ = 0;
1561 opt_num_proxies_ = 0;
1562 opt_proxy_shard_ =
false;
1563 opt_max_retries_ = 0;
1564 opt_backoff_init_ms_ = 0;
1565 opt_backoff_max_ms_ = 0;
1566 enable_info_header_ =
false;
1567 opt_ipv4_only_ =
false;
1568 follow_redirects_ =
false;
// Timestamps and reset intervals used by the proxy/host reset logic.
1572 opt_timestamp_backup_proxies_ = 0;
1573 opt_timestamp_failover_proxies_ = 0;
1574 opt_proxy_groups_reset_after_ = 0;
1575 opt_timestamp_backup_host_ = 0;
1576 opt_host_reset_after_ = 0;
1578 credentials_attachment_ = NULL;
/**
 * Destroys the two mutexes created in the constructor and frees their
 * storage.
 */
1584 DownloadManager::~DownloadManager() {
1585 pthread_mutex_destroy(lock_options_);
1586 pthread_mutex_destroy(lock_synchronous_mode_);
1587 free(lock_options_);
1588 free(lock_synchronous_mode_);
/**
 * Builds the User-Agent line and the default request header list
 * ("Connection: Keep-Alive", "Pragma:" and the User-Agent line).
 */
1591 void DownloadManager::InitHeaders() {
// "User-Agent: cvmfs " followed by a build-dependent tag and VERSION.
1593 string cernvm_id =
"User-Agent: cvmfs ";
1594 #ifdef CVMFS_LIBCVMFS
1595 cernvm_id +=
"libcvmfs ";
1597 cernvm_id +=
"Fuse ";
1599 cernvm_id += string(VERSION);
// The CERNVM_UUID environment variable is consulted; what is appended is on
// lines that are not visible.
1600 if (getenv(
"CERNVM_UUID") != NULL) {
// strdup gives user_agent_ its own heap copy of the string.
1604 user_agent_ = strdup(cernvm_id.c_str());
1608 default_headers_ = header_lists_->GetList(
"Connection: Keep-Alive");
1609 header_lists_->AppendHeader(default_headers_,
"Pragma:");
1610 header_lists_->AppendHeader(default_headers_, user_agent_);
/**
 * Deletes header_lists_ and clears both header_lists_ and default_headers_.
 */
1614 void DownloadManager::FiniHeaders() {
1615 delete header_lists_;
1616 header_lists_ = NULL;
1617 default_headers_ = NULL;
1624 atomic_init32(&multi_threaded_);
1625 int retval = curl_global_init(CURL_GLOBAL_ALL);
1626 assert(retval == CURLE_OK);
1627 pool_handles_idle_ =
new set<CURL *>;
1628 pool_handles_inuse_ =
new set<CURL *>;
1629 pool_max_handles_ = max_pool_handles;
1630 watch_fds_max_ = 4*pool_max_handles_;
1632 opt_timeout_proxy_ = 5;
1633 opt_timeout_direct_ = 10;
1634 opt_low_speed_limit_ = 1024;
1635 opt_proxy_groups_current_ = 0;
1636 opt_proxy_groups_current_burned_ = 0;
1637 opt_num_proxies_ = 0;
1638 opt_proxy_shard_ =
false;
1639 opt_host_chain_current_ = 0;
1642 counters_ =
new Counters(statistics);
1647 curl_multi_ = curl_multi_init();
1648 assert(curl_multi_ != NULL);
1649 curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION, CallbackCurlSocket);
1650 curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1651 static_cast<void *>(
this));
1652 curl_multi_setopt(curl_multi_, CURLMOPT_MAXCONNECTS, watch_fds_max_);
1653 curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1656 prng_.InitLocaltime();
1659 if ((getenv(
"CVMFS_IPV4_ONLY") != NULL) &&
1660 (strlen(getenv(
"CVMFS_IPV4_ONLY")) > 0))
1662 opt_ipv4_only_ =
true;
1665 kDnsDefaultRetries, kDnsDefaultTimeoutMs);
1671 if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1674 pthread_join(thread_download_, NULL);
1676 pipe_terminate_.Destroy();
1677 pipe_jobs_.Destroy();
1680 for (set<CURL *>::iterator i = pool_handles_idle_->begin(),
1681 iEnd = pool_handles_idle_->end(); i != iEnd; ++i)
1683 curl_easy_cleanup(*i);
1685 delete pool_handles_idle_;
1686 delete pool_handles_inuse_;
1687 curl_multi_cleanup(curl_multi_);
1688 pool_handles_idle_ = NULL;
1689 pool_handles_inuse_ = NULL;
1700 delete opt_host_chain_;
1701 delete opt_host_chain_rtt_;
1702 opt_proxy_map_.clear();
1703 delete opt_proxy_groups_;
1704 opt_host_chain_ = NULL;
1705 opt_host_chain_rtt_ = NULL;
1706 opt_proxy_groups_ = NULL;
1708 curl_global_cleanup();
1723 int retval = pthread_create(&thread_download_, NULL, MainDownload,
1724 static_cast<void *>(
this));
1727 atomic_inc32(&multi_threaded_);
1752 if (enable_info_header_ && info->
extra_info) {
1753 const char *header_name =
"cvmfs-info: ";
1754 const size_t header_name_len = strlen(header_name);
1755 const unsigned header_size = 1 + header_name_len +
1757 info->
info_header =
static_cast<char *
>(alloca(header_size));
1758 memcpy(info->
info_header, header_name, header_name_len);
1760 header_size - header_name_len);
1764 if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1777 CURL *handle = AcquireCurlHandle();
1778 InitializeRequest(info, handle);
1779 SetUrlOptions(info);
1783 retval = curl_easy_perform(handle);
1786 if (curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &elapsed) == CURLE_OK)
1789 static_cast<int64_t>(elapsed * 1000));
1791 }
while (VerifyAndFinalize(retval, info));
1820 credentials_attachment_ = ca;
/**
 * Returns a copy of the configured DNS server string (opt_dns_server_).
 */
1826 std::string DownloadManager::GetDnsServer()
const {
1827 return opt_dns_server_;
/**
 * Configures a DNS server.  For a non-empty address the value is stored in
 * opt_dns_server_; address is also installed as the single entry of the
 * resolver's server list.
 */
1834 void DownloadManager::SetDnsServer(
const string &address) {
1835 if (!address.empty()) {
1837 opt_dns_server_ = address;
1838 assert(!opt_dns_server_.empty());
1840 vector<string> servers;
1841 servers.push_back(address);
1842 bool retval = resolver_->SetResolvers(servers);
/**
 * Sets the resolver's retry count and timeout.  The visible guard detects
 * the case where both values already match the resolver's current settings.
 */
1852 void DownloadManager::SetDnsParameters(
1854 const unsigned timeout_ms)
1857 if ((resolver_->retries() ==
retries) &&
1858 (resolver_->timeout_ms() == timeout_ms))
/**
 * Forwards the lower and upper TTL bound to the resolver.
 *
 * @param min_seconds  Passed to set_min_ttl().
 * @param max_seconds  Passed to set_max_ttl().
 */
1870 void DownloadManager::SetDnsTtlLimits(
1871 const unsigned min_seconds,
1872 const unsigned max_seconds)
1875 resolver_->set_min_ttl(min_seconds);
1876 resolver_->set_max_ttl(max_seconds);
1882 opt_ip_preference_ = preference;
/**
 * Stores the timeouts used for proxied and for direct connections.  Both
 * values are later passed to curl as CURLOPT_CONNECTTIMEOUT and
 * CURLOPT_LOW_SPEED_TIME.
 */
1891 void DownloadManager::SetTimeout(
const unsigned seconds_proxy,
1892 const unsigned seconds_direct)
1895 opt_timeout_proxy_ = seconds_proxy;
1896 opt_timeout_direct_ = seconds_direct;
/**
 * Stores the value that is later passed to curl as CURLOPT_LOW_SPEED_LIMIT.
 */
1905 void DownloadManager::SetLowSpeedLimit(
const unsigned low_speed_limit) {
1907 opt_low_speed_limit_ = low_speed_limit;
/**
 * Copies the configured proxy and direct timeouts into the two output
 * parameters.  Both pointers are dereferenced without a NULL check in the
 * visible code.
 */
1914 void DownloadManager::GetTimeout(
unsigned *seconds_proxy,
1915 unsigned *seconds_direct)
1918 *seconds_proxy = opt_timeout_proxy_;
1919 *seconds_direct = opt_timeout_direct_;
1927 void DownloadManager::SetHostChain(
const string &host_list) {
/**
 * Replaces the host chain with host_list.  The previous chain and RTT vector
 * are deleted, the current host index and the backup-host timestamp are
 * reset.  An empty list leaves both pointers NULL; otherwise the RTT vector
 * is filled with kProbeUnprobed for every host.
 */
1932 void DownloadManager::SetHostChain(
const std::vector<std::string> &host_list) {
1934 opt_timestamp_backup_host_ = 0;
1935 delete opt_host_chain_;
1936 delete opt_host_chain_rtt_;
1937 opt_host_chain_current_ = 0;
1939 if (host_list.empty()) {
1940 opt_host_chain_ = NULL;
1941 opt_host_chain_rtt_ = NULL;
1945 opt_host_chain_ =
new vector<string>(host_list);
1946 opt_host_chain_rtt_ =
1947 new vector<int>(opt_host_chain_->size(), kProbeUnprobed);
/**
 * Copies the host chain, the RTT vector and the current host index into the
 * output parameters.  Every output pointer is optional: NULL pointers are
 * skipped.  Nothing is copied on the visible path when no host chain is set.
 */
1958 void DownloadManager::GetHostInfo(vector<string> *host_chain, vector<int> *rtt,
1959 unsigned *current_host)
1962 if (opt_host_chain_) {
1963 if (current_host) {*current_host = opt_host_chain_current_;}
1964 if (host_chain) {*host_chain = *opt_host_chain_;}
1965 if (rtt) {*rtt = *opt_host_chain_rtt_;}
/**
 * Marks the proxy used by info as failed ("burned") and, once every proxy of
 * the current group is burned, advances to the next proxy group.
 *
 * @param info  Job whose proxy failed; a NULL pointer is tolerated by the
 *              comparison in the loop.
 */
1978 void DownloadManager::SwitchProxy(
JobInfo *info) {
1981 if (!opt_proxy_groups_) {
1986 vector<ProxyInfo> *group = current_proxy_group();
1987 const unsigned group_size = group->size();
1988 unsigned failed = 0;
// Only the not-yet-burned front part of the group is searched; burned
// entries occupy the tail.
1989 for (
unsigned i = 0; i < group_size - opt_proxy_groups_current_burned_; ++i) {
1990 if (info && (info->
proxy == (*group)[i].url)) {
1992 opt_proxy_groups_current_burned_++;
1994 (*group)[group_size - opt_proxy_groups_current_burned_]);
// All proxies of the group burned: start over, moving cyclically to the next
// group when there is more than one.
2006 if (opt_proxy_groups_current_burned_ == group->size()) {
2007 opt_proxy_groups_current_burned_ = 0;
2008 if (opt_proxy_groups_->size() > 1) {
2009 opt_proxy_groups_current_ = (opt_proxy_groups_current_ + 1) %
2010 opt_proxy_groups_->size();
// With a reset interval configured, the backup timestamp is set the first
// time a group other than 0 is selected.
2012 if (opt_proxy_groups_reset_after_ > 0) {
2013 if (opt_proxy_groups_current_ > 0) {
2014 if (opt_timestamp_backup_proxies_ == 0)
2015 opt_timestamp_backup_proxies_ = time(NULL);
2019 opt_timestamp_backup_proxies_ = 0;
2023 opt_timestamp_failover_proxies_ = 0;
// Fail-over inside the group: the fail-over timestamp is set once.
2028 if (opt_proxy_groups_reset_after_ > 0) {
2029 if (opt_timestamp_failover_proxies_ == 0)
2030 opt_timestamp_failover_proxies_ = time(NULL);
2034 UpdateProxiesUnlocked(
"failed proxy");
2036 current_proxy_group()->
size() - opt_proxy_groups_current_burned_);
2048 if (!opt_host_chain_ || (opt_host_chain_->size() == 1)) {
2054 "don't switch host, "
2055 "last used host: %s, current host: %s",
2057 (*opt_host_chain_)[opt_host_chain_current_].c_str());
2061 string reason =
"manually triggered";
2066 string old_host = (*opt_host_chain_)[opt_host_chain_current_];
2067 opt_host_chain_current_ =
2068 (opt_host_chain_current_ + 1) % opt_host_chain_->size();
2071 "switching host from %s to %s (%s)", old_host.c_str(),
2072 (*opt_host_chain_)[opt_host_chain_current_].c_str(),
2076 if (opt_host_reset_after_ > 0) {
2077 if (opt_host_chain_current_ != 0) {
2078 if (opt_timestamp_backup_host_ == 0)
2079 opt_timestamp_backup_host_ = time(NULL);
2081 opt_timestamp_backup_host_ = 0;
2086 void DownloadManager::SwitchHost() {
/**
 * Measures each host of the chain by requesting "/.cvmfspublished" and
 * installs a new host chain and RTT vector built from the results.  The
 * current host index is reset to 0.
 */
2097 void DownloadManager::ProbeHosts() {
2098 vector<string> host_chain;
2099 vector<int> host_rtt;
2100 unsigned current_host;
// NOTE(review): the third argument "¤t_host" looks like a mangled
// "&current_host" (an "&curren" sequence decoded as an HTML entity) -
// confirm against the upstream file.
2102 GetHostInfo(&host_chain, &host_rtt, ¤t_host);
2107 JobInfo info(&url,
false,
false, NULL);
// Two passes over all hosts.
2108 for (retries = 0; retries < 2; ++
retries) {
2109 for (i = 0; i < host_chain.size(); ++i) {
2110 url = host_chain[i] +
"/.cvmfspublished";
// The request is bracketed by two gettimeofday() calls.
2112 struct timeval tv_start, tv_end;
2113 gettimeofday(&tv_start, NULL);
2115 gettimeofday(&tv_end, NULL);
2119 host_rtt[i] =
static_cast<int>(
2122 url.c_str(), host_rtt[i]);
// INT_MAX is the temporary marker that is converted to kProbeDown below.
2126 host_rtt[i] = INT_MAX;
2132 for (i = 0; i < host_chain.size(); ++i) {
2133 if (host_rtt[i] == INT_MAX) host_rtt[i] = kProbeDown;
// Replace the stored chain and RTT vector.
2137 delete opt_host_chain_;
2138 delete opt_host_chain_rtt_;
2139 opt_host_chain_ =
new vector<string>(host_chain);
2140 opt_host_chain_rtt_ =
new vector<int>(host_rtt);
2141 opt_host_chain_current_ = 0;
/**
 * Asks up to three hosts of the (shuffled) host chain for a geographic
 * ordering of servers via the "/api/v1.0/geo/@proxy@/<list>" endpoint and
 * reorders *servers accordingly.
 *
 * @param servers       In/out list of servers; swapped with the sorted list.
 * @param output_order  Receives the order as indices into the original list.
 * @return false when servers is NULL; other return paths are on lines that
 *         are not visible.
 */
2144 bool DownloadManager::GeoSortServers(std::vector<std::string> *servers,
2145 std::vector<uint64_t> *output_order) {
2146 if (!servers) {
return false;}
// A single server is trivially ordered: the order is just index 0.
2147 if (servers->size() == 1) {
2149 output_order->clear();
2150 output_order->push_back(0);
2155 std::vector<std::string> host_chain;
2156 GetHostInfo(&host_chain, NULL, NULL);
// The request lists one name per server; the original entry is kept when
// host is empty.
2158 std::vector<std::string> server_dns_names;
2159 server_dns_names.reserve(servers->size());
2160 for (
unsigned i = 0; i < servers->size(); ++i) {
2162 server_dns_names.push_back(host.empty() ? (*servers)[i] : host);
2164 std::string host_list =
JoinStrings(server_dns_names,
",");
2166 vector<string> host_chain_shuffled;
2171 host_chain_shuffled =
Shuffle(host_chain, &prng_);
// At most three hosts are tried.
2174 bool success =
false;
2175 unsigned max_attempts = std::min(host_chain_shuffled.size(), size_t(3));
2176 vector<uint64_t> geo_order(servers->size());
2177 for (
unsigned i = 0; i < max_attempts; ++i) {
2178 string url = host_chain_shuffled[i] +
"/api/v1.0/geo/@proxy@/" + host_list;
2180 "requesting ordered server list from %s", url.c_str());
2181 JobInfo info(&url,
false,
false, NULL);
// The reply is copied out of the job's memory buffer, which is then freed.
2184 string order(info.destination_mem.data, info.destination_mem.size);
2185 free(info.destination_mem.data);
2186 bool retval = ValidateGeoReply(order, servers->size(), &geo_order);
2189 "retrieved invalid GeoAPI reply from %s [%s]",
2190 url.c_str(), order.c_str());
2193 "geographic order of servers retrieved from %s",
2201 "GeoAPI request %s failed with error %d [%s]",
2207 "failed to retrieve geographic order from stratum 1 servers");
// output_order takes over the contents of geo_order here, so the loop below
// iterates over whatever output_order held before the swap.
// NOTE(review): confirm that this ordering of statements is intended.
2212 output_order->swap(geo_order);
2214 std::vector<std::string> sorted_servers;
2215 sorted_servers.reserve(geo_order.size());
2216 for (
unsigned i = 0; i < geo_order.size(); ++i) {
2217 uint64_t orderval = geo_order[i];
2218 sorted_servers.push_back((*servers)[orderval]);
2220 servers->swap(sorted_servers);
/**
 * Sorts the host chain and the fallback proxy groups geographically with
 * GeoSortServers() and installs the reordered host chain, proxy groups and
 * an RTT vector filled with kProbeGeo.
 */
2233 bool DownloadManager::ProbeGeo() {
2234 vector<string> host_chain;
2235 vector<int> host_rtt;
2236 unsigned current_host;
2237 vector< vector<ProxyInfo> > proxy_chain;
2238 unsigned fallback_group;
// NOTE(review): "¤t_host" looks like a mangled "&current_host" (an
// "&curren" sequence decoded as an HTML entity) - confirm against the
// upstream file.
2240 GetHostInfo(&host_chain, &host_rtt, ¤t_host);
2241 GetProxyInfo(&proxy_chain, NULL, &fallback_group);
// Guard for the case of fewer than two hosts and fewer than two fallback
// proxy groups.
2242 if ((host_chain.size() < 2) && ((proxy_chain.size() - fallback_group) < 2))
2245 vector<string> host_names;
2246 for (
unsigned i = 0; i < host_chain.size(); ++i)
2248 SortTeam(&host_names, &host_chain);
2249 unsigned last_geo_host = host_names.size();
// A "+PXYSEP+" marker entry is added to the list in this case.
2251 if ((fallback_group == 0) && (last_geo_host > 1)) {
2257 host_names.push_back(
"+PXYSEP+");
// Fallback proxy groups follow the hosts in the list, represented by the
// host name of each group's first proxy.
2261 unsigned first_geo_fallback = host_names.size();
2262 for (
unsigned i = fallback_group; i < proxy_chain.size(); ++i) {
2265 host_names.push_back(proxy_chain[i][0].host.name());
2268 std::vector<uint64_t> geo_order;
2269 bool success = GeoSortServers(&host_names, &geo_order);
// Rebuild the stored host chain and proxy groups.
2277 delete opt_host_chain_;
2278 opt_num_proxies_ = 0;
2279 opt_host_chain_ =
new vector<string>(host_chain.size());
2283 vector<vector<ProxyInfo> > *proxy_groups =
new vector<vector<ProxyInfo> >(
2284 opt_proxy_groups_fallback_ + proxy_chain.size() - fallback_group);
// Groups in front of the fallback groups are copied unchanged.
2286 for (
unsigned i = 0; i < opt_proxy_groups_fallback_; ++i) {
2287 (*proxy_groups)[i] = (*opt_proxy_groups_)[i];
2288 opt_num_proxies_ += (*opt_proxy_groups_)[i].size();
// Distribute the geo order: indices below last_geo_host are hosts, indices
// from first_geo_fallback onwards are fallback proxy groups.
2296 unsigned proxyi = opt_proxy_groups_fallback_;
2297 for (
unsigned i = 0; i < geo_order.size(); ++i) {
2298 uint64_t orderval = geo_order[i];
2299 if (orderval < static_cast<uint64_t>(last_geo_host)) {
2302 (*opt_host_chain_)[hosti++] = host_chain[orderval];
2303 }
else if (orderval >= static_cast<uint64_t>(first_geo_fallback)) {
2307 (*proxy_groups)[proxyi] =
2308 proxy_chain[fallback_group + orderval - first_geo_fallback];
2309 opt_num_proxies_ += (*proxy_groups)[proxyi].size();
2314 opt_proxy_map_.clear();
2315 delete opt_proxy_groups_;
2316 opt_proxy_groups_ = proxy_groups;
// NOTE(review): an index equal to size() is also past the last group but is
// not caught by ">"; confirm whether ">=" was intended.
2319 if (opt_proxy_groups_current_ > opt_proxy_groups_->size()) {
2320 if (opt_proxy_groups_->size() == 0) {
2321 opt_proxy_groups_current_ = 0;
2323 opt_proxy_groups_current_ = opt_proxy_groups_->size() - 1;
2325 opt_proxy_groups_current_burned_ = 0;
2328 UpdateProxiesUnlocked(
"geosort");
2330 delete opt_host_chain_rtt_;
2331 opt_host_chain_rtt_ =
new vector<int>(host_chain.size(), kProbeGeo);
2332 opt_host_chain_current_ = 0;
/**
 * Checks a GeoAPI reply and converts it into zero-based indices.
 *
 * @param reply_order    Raw reply text.
 * @param expected_size  Number of values the reply must contain.
 * @param reply_vals     Output; written by index, so it must already hold at
 *                       least expected_size elements.
 */
2345 bool DownloadManager::ValidateGeoReply(
2346 const string &reply_order,
2347 const unsigned expected_size,
2348 vector<uint64_t> *reply_vals)
2350 if (reply_order.empty())
2353 if (!sanitizer.
IsValid(reply_order))
2356 vector<string> reply_strings =
2358 vector<uint64_t> tmp_vals;
2359 for (
unsigned i = 0; i < reply_strings.size(); ++i) {
2360 if (reply_strings[i].empty())
2364 if (tmp_vals.size() != expected_size)
// The values must be distinct and span exactly 1..N: the set has as many
// members as the vector, its smallest is 1 and its largest equals its size.
2368 set<uint64_t> coverage(tmp_vals.begin(), tmp_vals.end());
2369 if (coverage.size() != tmp_vals.size())
2371 if ((*coverage.begin() != 1) || (*coverage.rbegin() != coverage.size()))
// One-based reply values become zero-based indices.
2374 for (
unsigned i = 0; i < expected_size; ++i) {
2375 (*reply_vals)[i] = tmp_vals[i] - 1;
/**
 * Walks a proxy list of the form "a|b;c|d" (groups separated by ';', entries
 * inside a group separated by '|') and looks for entries that are "DIRECT"
 * or empty.
 *
 * proxy_list:   proxy list to inspect
 * cleaned_list: output string
 *
 * NOTE(review): the lines that assign *cleaned_list, update `result` and
 * return are not part of this listing; presumably the result reports whether
 * anything was stripped and *cleaned_list receives the re-joined groups --
 * confirm against the full source.
 */
2385 bool DownloadManager::StripDirect(
2386 const string &proxy_list,
2387 string *cleaned_list)
// The empty list is special-cased before any splitting happens.
2390 if (proxy_list ==
"") {
2394 bool result =
false;
2396 vector<string> proxy_groups =
SplitString(proxy_list,
';');
2397 vector<string> cleaned_groups;
2398 for (
unsigned i = 0; i < proxy_groups.size(); ++i) {
2399 vector<string> group =
SplitString(proxy_groups[i],
'|');
2400 vector<string> cleaned;
2401 for (
unsigned j = 0; j < group.size(); ++j) {
// "DIRECT" and empty entries are singled out here.
// NOTE(review): the body of this branch is not visible; presumably the
// push_back below only runs for the remaining entries -- confirm.
2402 if ((group[j] ==
"DIRECT") || (group[j] ==
"")) {
2405 cleaned.push_back(group[j]);
// A group that has no entries left is not added to cleaned_groups.
2408 if (!cleaned.empty())
2409 cleaned_groups.push_back(
JoinStrings(cleaned,
"|"));
/**
 * Installs a new proxy configuration: stores the regular and/or fallback
 * proxy list, resolves the proxy host names and rebuilds opt_proxy_groups_,
 * the fallback group index and the proxy counters.
 *
 * proxy_list:          regular proxies (groups separated by ';', entries
 *                      inside a group separated by '|')
 * fallback_proxy_list: fallback proxies in the same format
 *
 * NOTE(review): this listing omits lines of the function, among them the
 * declaration of `set_mode` (used below), log call openers and closing
 * braces. Comments describe only the visible statements.
 */
2425 void DownloadManager::SetProxyChain(
2426 const string &proxy_list,
2427 const string &fallback_proxy_list,
// Both proxy timestamps are cleared when a new chain is set.
2432 opt_timestamp_backup_proxies_ = 0;
2433 opt_timestamp_failover_proxies_ = 0;
2434 string set_proxy_list = opt_proxy_list_;
2435 string set_proxy_fallback_list = opt_proxy_fallback_list_;
2436 bool contains_direct;
// set_mode selects which of the two stored lists gets overwritten.
2437 if ((set_mode == kSetProxyFallback) || (set_mode == kSetProxyBoth)) {
2438 opt_proxy_fallback_list_ = fallback_proxy_list;
2440 if ((set_mode == kSetProxyRegular) || (set_mode == kSetProxyBoth)) {
2441 opt_proxy_list_ = proxy_list;
// NOTE(review): the line preceding this call is not visible; presumably the
// return value is assigned to contains_direct there -- confirm.
2444 StripDirect(opt_proxy_fallback_list_, &set_proxy_fallback_list);
2445 if (contains_direct) {
2447 "fallback proxies do not support DIRECT, removing");
// Without fallback proxies the regular list is used as stored.
2449 if (set_proxy_fallback_list ==
"") {
2450 set_proxy_list = opt_proxy_list_;
// NOTE(review): this declares contains_direct a second time; it has to live
// in a nested scope and hides the variable declared further up.
2452 bool contains_direct = StripDirect(opt_proxy_list_, &set_proxy_list);
2453 if (contains_direct) {
2455 "skipping DIRECT proxy to use fallback proxy");
// Drop the previous proxy state before building the new one.
2462 opt_proxy_map_.clear();
2463 delete opt_proxy_groups_;
// No proxies at all: reset every proxy-related member.
2464 if ((set_proxy_list ==
"") && (set_proxy_fallback_list ==
"")) {
2465 opt_proxy_groups_ = NULL;
2466 opt_proxy_groups_current_ = 0;
2467 opt_proxy_groups_current_burned_ = 0;
2468 opt_proxy_groups_fallback_ = 0;
2469 opt_num_proxies_ = 0;
// opt_proxy_groups_fallback_ is set to the number of ';'-separated groups in
// the regular list; the fallback groups are appended after them below.
2474 opt_proxy_groups_fallback_ = 0;
2475 if (set_proxy_list !=
"") {
2476 opt_proxy_groups_fallback_ =
SplitString(set_proxy_list,
';').size();
2479 opt_proxy_groups_fallback_);
// Regular groups first, then the fallback groups, joined by ';'.
2483 string all_proxy_list = set_proxy_list;
2484 if (set_proxy_fallback_list !=
"") {
2485 if (all_proxy_list !=
"")
2486 all_proxy_list +=
";";
2487 all_proxy_list += set_proxy_fallback_list;
2490 all_proxy_list.c_str());
// First pass over all groups: collect the host names to resolve.
2493 vector<string> hostnames;
2494 vector<string> proxy_groups;
2495 if (all_proxy_list !=
"")
2497 for (
unsigned i = 0; i < proxy_groups.size(); ++i) {
2498 vector<string> this_group =
SplitString(proxy_groups[i],
'|');
2499 for (
unsigned j = 0; j < this_group.size(); ++j) {
2505 hostnames.push_back(hostname);
// All names are handed to the resolver in a single call.
2508 vector<dns::Host> hosts;
2511 resolver_->ResolveMany(hostnames, &hosts);
// Second pass: build the ProxyInfo groups. num_proxy advances together with
// j and is used as the index into hosts.
// NOTE(review): this presumes hosts has one entry per proxy in the same order
// as the first pass -- confirm.
2515 opt_proxy_groups_ =
new vector< vector<ProxyInfo> >();
2516 opt_num_proxies_ = 0;
2517 unsigned num_proxy = 0;
2518 for (
unsigned i = 0; i < proxy_groups.size(); ++i) {
2519 vector<string> this_group =
SplitString(proxy_groups[i],
'|');
2523 vector<ProxyInfo> infos;
2524 for (
unsigned j = 0; j < this_group.size(); ++j, ++num_proxy) {
// "DIRECT" entries get their own branch (body not visible in this listing).
2526 if (this_group[j] ==
"DIRECT") {
2533 "failed to resolve IP addresses for %s (%d - %s)",
2534 hosts[num_proxy].name().c_str(), hosts[num_proxy].status(),
// After a failed resolution the proxy is still recorded, using failed_host
// and the entry text as given.
2538 infos.push_back(
ProxyInfo(failed_host, this_group[j]));
// One ProxyInfo is added per address in best_addresses.
2543 set<string> best_addresses =
2545 set<string>::const_iterator iter_ips = best_addresses.begin();
2546 for (; iter_ips != best_addresses.end(); ++iter_ips) {
2548 infos.push_back(
ProxyInfo(hosts[num_proxy], url_ip));
2551 opt_proxy_groups_->push_back(infos);
2552 opt_num_proxies_ += infos.size();
2555 "installed %u proxies in %u load-balance groups",
2556 opt_num_proxies_, opt_proxy_groups_->size());
// Start again from the first group with no burned proxies.
2557 opt_proxy_groups_current_ = 0;
2558 opt_proxy_groups_current_burned_ = 0;
// The proxy map is only rebuilt when at least one group exists.
2561 if (opt_proxy_groups_->size() > 0) {
2563 UpdateProxiesUnlocked(
"set proxies");
/**
 * Copies the current proxy configuration into the caller's variables.
 *
 * proxy_chain:    must not be NULL (asserted); receives a copy of the proxy
 *                 groups, or an empty chain if none are configured
 * current_group:  optional (may be NULL); receives opt_proxy_groups_current_
 *                 when proxy groups exist
 * fallback_group: optional (may be NULL); receives
 *                 opt_proxy_groups_fallback_, or 0 if no groups exist
 *
 * NOTE(review): some lines are missing from this listing, e.g. the statement
 * guarded by the first current_group check and the end of the no-groups
 * branch.
 */
2574 void DownloadManager::GetProxyInfo(vector< vector<ProxyInfo> > *proxy_chain,
2575 unsigned *current_group,
2576 unsigned *fallback_group)
2578 assert(proxy_chain != NULL);
// No proxy groups configured: hand back an empty chain.
2582 if (!opt_proxy_groups_) {
2583 vector< vector<ProxyInfo> > empty_chain;
2584 *proxy_chain = empty_chain;
2585 if (current_group != NULL)
2587 if (fallback_group != NULL)
2588 *fallback_group = 0;
// Proxy groups exist: copy them and the two group indices.
2592 *proxy_chain = *opt_proxy_groups_;
2593 if (current_group != NULL)
2594 *current_group = opt_proxy_groups_current_;
2595 if (fallback_group != NULL)
2596 *fallback_group = opt_proxy_groups_fallback_;
/**
 * Returns a copy of the stored regular proxy list (opt_proxy_list_).
 */
2599 string DownloadManager::GetProxyList() {
2600 return opt_proxy_list_;
/**
 * Returns a copy of the stored fallback proxy list
 * (opt_proxy_fallback_list_).
 */
2603 string DownloadManager::GetFallbackProxyList() {
2604 return opt_proxy_fallback_list_;
2612 if (!opt_proxy_groups_)
2615 uint32_t key = (hash ? hash->
Partial32() : 0);
2616 map<uint32_t, ProxyInfo *>::iterator it = opt_proxy_map_.lower_bound(key);
/**
 * Rebuilds opt_proxy_map_ and opt_proxy_urls_ from the current proxy group
 * and logs a message if the resulting set of proxy URLs changed.
 *
 * reason: free text; NOTE(review): presumably the last argument of the
 * "switching proxy" log message, that line is not visible -- confirm.
 *
 * NOTE(review): this listing omits lines of the function (e.g. how `proxy`
 * is obtained inside the loops and where `select` is used). Comments describe
 * only the visible statements.
 */
2625 void DownloadManager::UpdateProxiesUnlocked(
const string &reason) {
2626 if (!opt_proxy_groups_)
// Proxies in the current group that are not counted as burned.
2630 vector<ProxyInfo> *group = current_proxy_group();
2631 unsigned num_alive = (group->size() - opt_proxy_groups_current_burned_);
// Remember the previous URL set so that a change can be detected below.
2632 string old_proxy =
JoinStrings(opt_proxy_urls_,
"|");
2635 opt_proxy_map_.clear();
2636 opt_proxy_urls_.clear();
2637 const uint32_t max_key = 0xffffffffUL;
// With sharding enabled, every alive proxy is inserted kProxyMapScale times
// under keys drawn from prng.
2638 if (opt_proxy_shard_) {
2640 for (
unsigned i = 0; i < num_alive; ++i) {
2646 for (
unsigned j = 0; j < kProxyMapScale; ++j) {
2647 const std::pair<uint32_t, ProxyInfo *> entry(prng.
Next(max_key), proxy);
2648 opt_proxy_map_.insert(entry);
2650 opt_proxy_urls_.push_back(proxy->
url);
// An extra entry at the largest 32-bit key points at the proxy of the first
// map entry, so a lower_bound() lookup finds a match for every key.
2653 ProxyInfo *first_proxy = opt_proxy_map_.begin()->second;
2654 const std::pair<uint32_t, ProxyInfo *> last_entry(max_key, first_proxy);
2655 opt_proxy_map_.insert(last_entry);
// Otherwise a single entry is stored under max_key.
// NOTE(review): presumably `proxy` is the alive proxy at index `select`; the
// line establishing that is not visible -- confirm.
2658 unsigned select = prng_.Next(num_alive);
2660 const std::pair<uint32_t, ProxyInfo *> entry(max_key, proxy);
2661 opt_proxy_map_.insert(entry);
2662 opt_proxy_urls_.push_back(proxy->
url);
// Sorting makes the joined URL string independent of insertion order.
2664 sort(opt_proxy_urls_.begin(), opt_proxy_urls_.end());
2667 string new_proxy =
JoinStrings(opt_proxy_urls_,
"|");
2668 if (new_proxy != old_proxy) {
2670 "switching proxy from %s to %s (%s)",
2671 (old_proxy.empty() ?
"(none)" : old_proxy.c_str()),
2672 (new_proxy.empty() ?
"(none)" : new_proxy.c_str()),
/**
 * Turns on proxy sharding (opt_proxy_shard_) and rebalances the proxies so
 * that the proxy map is rebuilt with the new setting.
 */
2680 void DownloadManager::ShardProxies() {
2681 opt_proxy_shard_ =
true;
2682 RebalanceProxiesUnlocked(
"enable sharding");
/**
 * Clears the failover timestamp and the burned-proxy counter of the current
 * group, then rebuilds the proxy map via UpdateProxiesUnlocked().
 *
 * reason: passed through unchanged to UpdateProxiesUnlocked()
 *
 * NOTE(review): the statement guarded by the opt_proxy_groups_ check is not
 * part of this listing; presumably an early return -- confirm.
 */
2689 void DownloadManager::RebalanceProxiesUnlocked(
const string &reason) {
2690 if (!opt_proxy_groups_)
2693 opt_timestamp_failover_proxies_ = 0;
2694 opt_proxy_groups_current_burned_ = 0;
2695 UpdateProxiesUnlocked(reason);
/**
 * Calls RebalanceProxiesUnlocked() with the reason "rebalance".
 *
 * NOTE(review): one line between the signature and the call is missing from
 * this listing; given the "Unlocked" suffix of the callee it presumably takes
 * a lock -- confirm.
 */
2699 void DownloadManager::RebalanceProxies() {
2701 RebalanceProxiesUnlocked(
"rebalance");
/**
 * Advances to the next proxy group, wrapping around after the last one,
 * records the time of the switch and rebalances the proxies.
 *
 * NOTE(review): the body of the guard below and the lines before it are not
 * part of this listing; presumably the function returns early when fewer
 * than two groups exist -- confirm.
 */
2708 void DownloadManager::SwitchProxyGroup() {
// Switching only makes sense with at least two groups.
2711 if (!opt_proxy_groups_ || (opt_proxy_groups_->size() < 2)) {
2715 opt_proxy_groups_current_ = (opt_proxy_groups_current_ + 1) %
2716 opt_proxy_groups_->size();
2717 opt_timestamp_backup_proxies_ = time(NULL);
2718 RebalanceProxiesUnlocked(
"switch proxy group");
/**
 * Stores the proxy group reset delay. A value of 0 additionally clears the
 * backup and failover proxy timestamps.
 *
 * seconds: new value for opt_proxy_groups_reset_after_
 *
 * NOTE(review): one line after the signature is missing from this listing.
 */
2722 void DownloadManager::SetProxyGroupResetDelay(
const unsigned seconds) {
2724 opt_proxy_groups_reset_after_ = seconds;
2725 if (opt_proxy_groups_reset_after_ == 0) {
2726 opt_timestamp_backup_proxies_ = 0;
2727 opt_timestamp_failover_proxies_ = 0;
/**
 * Stores the host reset delay. A value of 0 additionally clears the backup
 * host timestamp.
 *
 * seconds: new value for opt_host_reset_after_
 *
 * NOTE(review): lines between the signature and the first assignment are
 * missing from this listing.
 */
2732 void DownloadManager::SetHostResetDelay(
const unsigned seconds)
2735 opt_host_reset_after_ = seconds;
2736 if (opt_host_reset_after_ == 0)
2737 opt_timestamp_backup_host_ = 0;
/**
 * Stores the retry settings in the corresponding option members.
 *
 * max_retries:     new value for opt_max_retries_
 * backoff_init_ms: new value for opt_backoff_init_ms_
 * backoff_max_ms:  new value for opt_backoff_max_ms_
 *
 * NOTE(review): lines between the signature and the first assignment are
 * missing from this listing.
 */
2741 void DownloadManager::SetRetryParameters(
const unsigned max_retries,
2742 const unsigned backoff_init_ms,
2743 const unsigned backoff_max_ms)
2746 opt_max_retries_ = max_retries;
2747 opt_backoff_init_ms_ = backoff_init_ms;
2748 opt_backoff_max_ms_ = backoff_max_ms;
/**
 * Forwards the limit to the resolver's set_throttle().
 *
 * limit: value handed to resolver_->set_throttle()
 *
 * NOTE(review): one line after the signature is missing from this listing.
 */
2752 void DownloadManager::SetMaxIpaddrPerProxy(
unsigned limit) {
2754 resolver_->set_throttle(limit);
/**
 * Stores the two proxy template strings.
 *
 * direct: new value for proxy_template_direct_
 * forced: new value for proxy_template_forced_
 *
 * NOTE(review): lines between the signature and the first assignment are
 * missing from this listing.
 */
2758 void DownloadManager::SetProxyTemplates(
2759 const std::string &direct,
2760 const std::string &forced)
2763 proxy_template_direct_ = direct;
2764 proxy_template_forced_ = forced;
/**
 * Sets the enable_info_header_ flag to true.
 */
2768 void DownloadManager::EnableInfoHeader() {
2769 enable_info_header_ =
true;
/**
 * Sets the follow_redirects_ flag to true.
 */
2773 void DownloadManager::EnableRedirects() {
2774 follow_redirects_ =
true;
/**
 * Delegates to ssl_certificate_store_.UseSystemCertificatePath().
 */
2777 void DownloadManager::UseSystemCertificatePath() {
2778 ssl_certificate_store_.UseSystemCertificatePath();
2789 clone->
Init(pool_max_handles_, statistics);
2795 if (!opt_dns_server_.empty())
2805 if (opt_host_chain_) {
2809 CloneProxyConfig(clone);
2830 if (opt_proxy_groups_ == NULL)
2834 *opt_proxy_groups_);
unsigned opt_timeout_direct_
unsigned opt_low_speed_limit_
void HashString(const std::string &content, Any *any_digest)
#define LogCvmfs(source, mask,...)
std::vector< T > Shuffle(const std::vector< T > &input, Prng *prng)
const char * Code2Ascii(const ObjectFetcherFailures::Failures error)
unsigned opt_backoff_init_ms_
static unsigned EscapeHeader(const string &header, char *escaped_buf, size_t buf_size)
unsigned char num_used_hosts
static bool EscapeUrlChar(char input, char output[3])
int64_t Xadd(class Counter *counter, const int64_t delta)
const char * Code2Ascii(const Failures error)
bool Write(const T &data)
unsigned opt_proxy_groups_current_burned_
double DiffTimeSeconds(struct timeval start, struct timeval end)
unsigned opt_proxy_groups_reset_after_
virtual bool IsCanceled()
void SetUrlOptions(JobInfo *info)
StreamStates DecompressZStream2Sink(const void *buf, const int64_t size, z_stream *strm, cvmfs::Sink *sink)
std::string opt_proxy_fallback_list_
static NormalResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
unsigned opt_host_reset_after_
std::string proxy_template_direct_
string ReplaceAll(const string &haystack, const string &needle, const string &replace_by)
string JoinStrings(const vector< string > &strings, const string &joint)
std::string ToString(const bool with_suffix=false) const
unsigned opt_proxy_groups_current_
shash::ContextPtr hash_context
void DecompressInit(z_stream *strm)
unsigned int current_host_chain_index
bool DecompressMem2Mem(const void *buf, const int64_t size, void **out_buf, uint64_t *out_size)
std::set< CURL * > * pool_handles_inuse_
StreamStates DecompressZStream2File(const void *buf, const int64_t size, z_stream *strm, FILE *f)
std::string opt_proxy_list_
const std::string & name() const
perf::Counter * sz_transfer_time
std::vector< std::vector< ProxyInfo > > * opt_proxy_groups_
assert((mem||(size==0))&&"Out Of Memory")
unsigned opt_proxy_groups_fallback_
void ReleaseCurlHandle(CURL *handle)
const std::set< std::string > & ViewBestAddresses(IpPreference preference) const
void SetDnsServer(const std::string &address)
void DecompressFini(z_stream *strm)
void InitSeed(const uint64_t seed)
uint32_t watch_fds_inuse_
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
void Init(ContextPtr context)
void SetDnsTtlLimits(const unsigned min_seconds, const unsigned max_seconds)
static Failures PrepareDownloadDestination(JobInfo *info)
void Init(const unsigned max_pool_handles, const perf::StatisticsTemplate &statistics)
const char * Code2Ascii(const Failures error)
unsigned char num_retries
std::string AddDefaultScheme(const std::string &proxy)
vector< string > SplitString(const string &str, char delim)
static string EscapeUrl(const string &url)
dns::IpPreference opt_ip_preference_
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
Failures Fetch(const std::string &base_url, const std::string &repository_name, const uint64_t minimum_timestamp, const shash::Any *base_catalog, signature::SignatureManager *signature_manager, download::DownloadManager *download_manager, ManifestEnsemble *ensemble)
void UpdateProxiesUnlocked(const std::string &reason)
bool IsEquivalent(const Host &other) const
bool IsProxyTransferError(const Failures error)
perf::Counter * n_requests
static int Init(const loader::LoaderExports *loader_exports)
void Final(ContextPtr context, Any *any_digest)
string StringifyInt(const int64_t value)
void SetMaxIpaddrPerProxy(unsigned limit)
const shash::Any * expected_hash
CURL * AcquireCurlHandle()
void Inc(class Counter *counter)
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
const std::string * extra_info
std::string ExtractHost(const std::string &url)
std::vector< int > * opt_host_chain_rtt_
cvmfs::Sink * destination_sink
SslCertificateStore ssl_certificate_store_
uint32_t Partial32() const
unsigned opt_backoff_max_ms_
unsigned GetContextSize(const Algorithms algorithm)
unsigned opt_num_proxies_
CredentialsAttachment * credentials_attachment_
std::vector< std::string > * opt_host_chain_
struct pollfd * watch_fds_
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
uint64_t String2Uint64(const string &value)
UniquePtr< Pipe< kPipeDownloadJobs > > pipe_jobs_
InterruptCue * interrupt_cue
unsigned opt_max_retries_
std::string proxy_template_forced_
const std::string * destination_path
void SetDnsParameters(const unsigned retries, const unsigned timeout_ms)
static Host ExtendDeadline(const Host &original, unsigned seconds_from_now)
struct download::JobInfo::@4 destination_mem
UniquePtr< Pipe< kPipeThreadTerminator > > pipe_terminate_
void SafeSleepMs(const unsigned ms)
bool IsHostTransferError(const Failures error)
void SortTeam(std::vector< T > *tractor, std::vector< U > *towed)
UniquePtr< Pipe< kPipeDownloadJobsResults > > pipe_job_results
Pipe used for the return value.
unsigned char num_used_proxies
unsigned opt_timeout_proxy_
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
void InitializeRequest(JobInfo *info, CURL *handle)
string RewriteUrl(const string &url, const string &ip)
uint32_t Next(const uint64_t boundary)
virtual int64_t Write(const void *buf, uint64_t sz)=0