29 #define __STDC_FORMAT_MACROS
94 std::string pause_file = std::string(
"/var/run/cvmfs/interrupt.") + fqrn;
97 "(id %" PRId64
") Interrupted(): checking for existence of %s",
98 info->
id(), pause_file.c_str());
101 "(id %" PRId64
") Interrupt marker found - "
102 "Interrupting current download, this will EIO outstanding IO.",
104 if (0 != unlink(pause_file.c_str())) {
106 "(id %" PRId64
") Couldn't delete interrupt marker: errno=%d",
120 "(id %" PRId64
") Failed to open path %s: %s (errno=%d).",
121 info->
id(), psink->
path().c_str(), strerror(errno), errno);
126 "Failed to create a valid sink: \n %s",
141 const size_t num_bytes = size * nmemb;
142 const string header_line(static_cast<const char *>(ptr), num_bytes);
149 if (
HasPrefix(header_line,
"HTTP/1.",
false)) {
150 if (header_line.length() < 10) {
155 for (i = 8; (i < header_line.length()) && (header_line[i] ==
' '); ++i) {
159 if (header_line.length() > i + 2) {
160 info->
SetHttpCode(DownloadManager::ParseHttpCode(&header_line[i]));
169 "(id %" PRId64
") redirect support not enabled: %s",
170 info->
id(), header_line.c_str());
175 info->
id(), header_line.c_str());
180 "(id %" PRId64
") http status error code: %s [%d]", info->
id(),
201 &&
HasPrefix(header_line,
"CONTENT-LENGTH:",
true)) {
202 char *tmp =
reinterpret_cast<char *
>(alloca(num_bytes + 1));
204 sscanf(header_line.c_str(),
"%s %" PRIu64, tmp, &length);
209 "resource %s too large to store in memory (%" PRIu64
")",
210 info->
id(), info->
url()->c_str(), length);
218 }
else if (
HasPrefix(header_line,
"LOCATION:",
true)) {
221 header_line.c_str());
222 }
else if (
HasPrefix(header_line,
"LINK:",
true)) {
225 header_line.c_str());
226 std::string link = info->
link();
227 if (link.size() != 0) {
229 link = link +
", " + header_line.substr(5);
231 link = header_line.substr(5);
234 }
else if (
HasPrefix(header_line,
"X-SQUID-ERROR:",
true)) {
239 }
else if (
HasPrefix(header_line,
"PROXY-STATUS:",
true)) {
242 && (header_line.find(
"error=") != string::npos)) {
256 const size_t num_bytes = size * nmemb;
270 shash::Update(reinterpret_cast<unsigned char *>(ptr), num_bytes,
280 "(id %" PRId64
") failed to decompress %s", info->
id(),
281 info->
url()->c_str());
286 "(id %" PRId64
") decompressing %s, local IO error", info->
id(),
287 info->
url()->c_str());
292 int64_t written = info->
sink()->
Write(ptr, num_bytes);
293 if (written < 0 || static_cast<uint64_t>(written) != num_bytes) {
296 "Failed to perform write of %zu bytes to sink %s with errno %ld",
306 static int CallbackCurlDebug(CURL *handle,
312 curl_easy_getinfo(handle, CURLINFO_PRIVATE, &info);
314 std::string prefix =
"(id " +
StringifyInt(info->id()) +
") ";
319 case CURLINFO_HEADER_IN:
320 prefix +=
"{header/recv} ";
322 case CURLINFO_HEADER_OUT:
323 prefix +=
"{header/sent} ";
325 case CURLINFO_DATA_IN:
327 prefix +=
"{data/recv} ";
333 case CURLINFO_DATA_OUT:
335 prefix +=
"{data/sent} ";
341 case CURLINFO_SSL_DATA_IN:
343 prefix +=
"{ssldata/recv} ";
350 case CURLINFO_SSL_DATA_OUT:
352 prefix +=
"{ssldata/sent} ";
364 bool valid_char =
true;
365 std::string msg(data, size);
366 for (
size_t i = 0; i < msg.length(); ++i) {
367 if (msg[i] ==
'\0') {
372 if ((msg[i] <
' ' || msg[i] >
'~')
380 msg =
"<Non-plaintext sequence>";
384 Trim(msg,
true ).c_str());
// Out-of-class definitions of three negative integer constants of
// DownloadManager.
// NOTE(review): judging by the names, these are presumably sentinel values
// stored in place of a measured host round-trip time (host not yet probed /
// host found down / order taken from a geo lookup). Their uses are not
// visible in this part of the file -- confirm against the class declaration.
392 const int DownloadManager::kProbeUnprobed = -1;
393 const int DownloadManager::kProbeDown = -2;
394 const int DownloadManager::kProbeGeo = -3;
396 bool DownloadManager::EscapeUrlChar(
unsigned char input,
char output[3]) {
397 if (((input >=
'0') && (input <=
'9')) || ((input >=
'A') && (input <=
'Z'))
398 || ((input >=
'a') && (input <=
'z')) || (input ==
'/') || (input ==
':')
399 || (input ==
'.') || (input ==
'@') || (input ==
'+') || (input ==
'-')
400 || (input ==
'_') || (input ==
'~') || (input ==
'[') || (input ==
']')
402 output[0] =
static_cast<char>(input);
407 output[1] =
static_cast<char>((input / 16)
408 + ((input / 16 <= 9) ?
'0' :
'A' - 10));
409 output[2] =
static_cast<char>((input % 16)
410 + ((input % 16 <= 9) ?
'0' :
'A' - 10));
419 string DownloadManager::EscapeUrl(
const int64_t jobinfo_id,
const string &url) {
421 escaped.reserve(url.length());
423 char escaped_char[3];
424 for (
unsigned i = 0, s = url.length(); i < s; ++i) {
425 if (EscapeUrlChar(url[i], escaped_char)) {
426 escaped.append(escaped_char, 3);
428 escaped.push_back(escaped_char[0]);
432 jobinfo_id, url.c_str(), escaped.c_str());
441 unsigned DownloadManager::EscapeHeader(
const string &header,
444 unsigned esc_pos = 0;
445 char escaped_char[3];
446 for (
unsigned i = 0, s = header.size(); i < s; ++i) {
447 if (EscapeUrlChar(header[i], escaped_char)) {
448 for (
unsigned j = 0; j < 3; ++j) {
450 if (esc_pos >= buf_size)
452 escaped_buf[esc_pos] = escaped_char[j];
458 if (esc_pos >= buf_size)
460 escaped_buf[esc_pos] = escaped_char[0];
472 int DownloadManager::ParseHttpCode(
const char digits[3]) {
475 for (
int i = 0; i < 3; ++i) {
476 if ((digits[i] <
'0') || (digits[i] >
'9'))
478 result += (digits[i] -
'0') * factor;
488 int DownloadManager::CallbackCurlSocket(CURL * ,
496 if (action == CURL_POLL_NONE)
514 download_mgr->
watch_fds_ =
static_cast<struct pollfd *
>(
526 download_mgr->
watch_fds_[index].events = POLLIN | POLLPRI;
529 download_mgr->
watch_fds_[index].events = POLLOUT | POLLWRBAND;
531 case CURL_POLL_INOUT:
532 download_mgr->
watch_fds_[index].events = POLLIN | POLLPRI | POLLOUT
535 case CURL_POLL_REMOVE:
536 if (index < download_mgr->watch_fds_inuse_ - 1) {
549 download_mgr->
watch_fds_ =
static_cast<struct pollfd *
>(
567 void *DownloadManager::MainDownload(
void *data) {
570 "download I/O thread of DownloadManager '%s' started",
571 download_mgr->
name_.c_str());
573 const int kIdxPipeTerminate = 0;
574 const int kIdxPipeJobs = 1;
576 download_mgr->
watch_fds_ =
static_cast<struct pollfd *
>(
577 smalloc(2 *
sizeof(
struct pollfd)));
581 download_mgr->
watch_fds_[kIdxPipeTerminate].events = POLLIN | POLLPRI;
582 download_mgr->
watch_fds_[kIdxPipeTerminate].revents = 0;
585 download_mgr->
watch_fds_[kIdxPipeJobs].events = POLLIN | POLLPRI;
586 download_mgr->
watch_fds_[kIdxPipeJobs].revents = 0;
589 int still_running = 0;
590 struct timeval timeval_start, timeval_stop;
591 gettimeofday(&timeval_start, NULL);
607 gettimeofday(&timeval_stop, NULL);
608 int64_t delta =
static_cast<int64_t
>(
620 curl_multi_socket_action(
621 download_mgr->
curl_multi_, CURL_SOCKET_TIMEOUT, 0, &still_running);
625 if (download_mgr->
watch_fds_[kIdxPipeTerminate].revents)
629 if (download_mgr->
watch_fds_[kIdxPipeJobs].revents) {
630 download_mgr->
watch_fds_[kIdxPipeJobs].revents = 0;
633 if (!still_running) {
634 gettimeofday(&timeval_start, NULL);
639 curl_multi_add_handle(download_mgr->
curl_multi_, handle);
640 curl_multi_socket_action(
641 download_mgr->
curl_multi_, CURL_SOCKET_TIMEOUT, 0, &still_running);
655 if (download_mgr->
watch_fds_[i].revents & (POLLIN | POLLPRI))
656 ev_bitmask |= CURL_CSELECT_IN;
657 if (download_mgr->
watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
658 ev_bitmask |= CURL_CSELECT_OUT;
660 & (POLLERR | POLLHUP | POLLNVAL)) {
661 ev_bitmask |= CURL_CSELECT_ERR;
665 curl_multi_socket_action(download_mgr->
curl_multi_,
675 while ((curl_msg = curl_multi_info_read(download_mgr->
curl_multi_,
677 if (curl_msg->msg == CURLMSG_DONE) {
680 CURL *easy_handle = curl_msg->easy_handle;
681 int curl_error = curl_msg->data.result;
682 curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
685 curl_easy_getinfo(easy_handle, CURLINFO_REDIRECT_COUNT, &redir_count);
687 "(manager '%s' - id %" PRId64
") "
688 "Number of CURL redirects %" PRId64,
689 download_mgr->
name_.c_str(), info->
id(), redir_count);
691 curl_multi_remove_handle(download_mgr->
curl_multi_, easy_handle);
693 curl_multi_add_handle(download_mgr->
curl_multi_, easy_handle);
694 curl_multi_socket_action(download_mgr->
curl_multi_,
715 curl_multi_remove_handle(download_mgr->
curl_multi_, *i);
716 curl_easy_cleanup(*i);
722 "download I/O thread of DownloadManager '%s' terminated",
723 download_mgr->
name_.c_str());
731 HeaderLists::~HeaderLists() {
732 for (
unsigned i = 0; i < blocks_.size(); ++i) {
// Returns a curl_slist node for the given header string by delegating
// directly to Get(); callers use the returned node as the first element of a
// header list.
//
// header: C string for the node. The visible part of Get() assigns this
//         pointer to the node's data field as-is (via const_cast, no copy),
//         so the string presumably has to stay valid for as long as the node
//         is in use -- confirm against the callers.
// Returns the pointer obtained from Get().
739 curl_slist *HeaderLists::GetList(
const char *header) {
return Get(header); }
742 curl_slist *HeaderLists::DuplicateList(curl_slist *slist) {
744 curl_slist *copy = GetList(slist->data);
745 copy->next = slist->next;
746 curl_slist *prev = copy;
749 curl_slist *new_link = Get(slist->data);
750 new_link->next = slist->next;
751 prev->next = new_link;
759 void HeaderLists::AppendHeader(curl_slist *slist,
const char *header) {
761 curl_slist *new_link = Get(header);
762 new_link->next = NULL;
766 slist->next = new_link;
775 void HeaderLists::CutHeader(
const char *header, curl_slist **slist) {
779 curl_slist *prev = &head;
780 curl_slist *rover = *slist;
782 if (strcmp(rover->data, header) == 0) {
783 prev->next = rover->next;
794 void HeaderLists::PutList(curl_slist *slist) {
796 curl_slist *next = slist->next;
803 string HeaderLists::Print(curl_slist *slist) {
806 verbose += string(slist->data) +
"\n";
813 curl_slist *HeaderLists::Get(
const char *header) {
814 for (
unsigned i = 0; i < blocks_.size(); ++i) {
815 for (
unsigned j = 0; j < kBlockSize; ++j) {
816 if (!IsUsed(&(blocks_[i][j]))) {
817 blocks_[i][j].data =
const_cast<char *
>(header);
818 return &(blocks_[i][j]);
825 blocks_[blocks_.size() - 1][0].data =
const_cast<char *
>(header);
826 return &(blocks_[blocks_.size() - 1][0]);
830 void HeaderLists::Put(curl_slist *slist) {
836 void HeaderLists::AddBlock() {
837 curl_slist *new_block =
new curl_slist[kBlockSize];
838 for (
unsigned i = 0; i < kBlockSize; ++i) {
841 blocks_.push_back(new_block);
848 string DownloadManager::ProxyInfo::Print() {
853 int remaining =
static_cast<int>(host.deadline())
854 - static_cast<int>(time(NULL));
855 string expinfo = (remaining >= 0) ?
"+" :
"";
856 if (abs(remaining) >= 3600) {
858 }
else if (abs(remaining) >= 60) {
864 result +=
" (" + host.name() +
", " + expinfo +
")";
866 result +=
" (:unresolved:, " + expinfo +
")";
876 CURL *DownloadManager::AcquireCurlHandle() {
879 if (pool_handles_idle_->empty()) {
881 handle = curl_easy_init();
884 curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
889 handle = *(pool_handles_idle_->begin());
890 pool_handles_idle_->erase(pool_handles_idle_->begin());
893 pool_handles_inuse_->insert(handle);
899 void DownloadManager::ReleaseCurlHandle(CURL *handle) {
900 set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
901 assert(elem != pool_handles_inuse_->end());
903 if (pool_handles_idle_->size() > pool_max_handles_) {
904 curl_easy_cleanup(*elem);
906 pool_handles_idle_->insert(*elem);
909 pool_handles_inuse_->erase(elem);
917 void DownloadManager::InitializeRequest(
JobInfo *info, CURL *handle) {
928 info->
SetHeaders(header_lists_->DuplicateList(default_headers_));
932 if (enable_http_tracing_) {
933 for (
unsigned int i = 0; i < http_tracing_headers_.size(); i++) {
934 header_lists_->AppendHeader(info->
headers(),
935 (http_tracing_headers_)[i].c_str());
943 "(manager '%s' - id %" PRId64
") "
944 "CURL Header for URL: %s is:\n %s",
945 name_.c_str(), info->
id(), info->
url()->c_str(),
946 header_lists_->Print(info->
headers()).c_str());
963 char byte_range_array[100];
964 const int64_t range_lower =
static_cast<int64_t
>(info->
range_offset());
965 const int64_t range_upper =
static_cast<int64_t
>(info->
range_offset()
967 if (snprintf(byte_range_array,
sizeof(byte_range_array),
968 "%" PRId64
"-%" PRId64, range_lower, range_upper)
972 curl_easy_setopt(handle, CURLOPT_RANGE, byte_range_array);
974 curl_easy_setopt(handle, CURLOPT_RANGE, NULL);
978 curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
979 curl_easy_setopt(handle, CURLOPT_WRITEHEADER, static_cast<void *>(info));
980 curl_easy_setopt(handle, CURLOPT_WRITEDATA, static_cast<void *>(info));
981 curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->
headers());
983 curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
985 curl_easy_setopt(handle, CURLOPT_HTTPGET, 1);
987 if (opt_ipv4_only_) {
988 curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
990 if (follow_redirects_) {
991 curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1);
992 curl_easy_setopt(handle, CURLOPT_MAXREDIRS, 4);
995 curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
996 curl_easy_setopt(handle, CURLOPT_DEBUGFUNCTION, CallbackCurlDebug);
1000 void DownloadManager::CheckHostInfoReset(
const std::string &typ,
1007 if (static_cast<int64_t>(now)
1010 "(manager %s - id %" PRId64
") "
1011 "switching %s from %s to %s (reset %s)",
1012 name_.c_str(), jobinfo->
id(), typ.c_str(),
1026 void DownloadManager::SetUrlOptions(
JobInfo *info) {
1034 if (sharding_policy_.UseCount() > 0) {
1035 if (info->
proxy() !=
"") {
1039 info->
SetProxy(sharding_policy_->GetNextProxy(
1043 curl_easy_setopt(info->
curl_handle(), CURLOPT_PROXY, info->
proxy().c_str());
1046 if (opt_timestamp_backup_proxies_ > 0) {
1048 if (static_cast<int64_t>(now) > static_cast<int64_t>(
1049 opt_timestamp_backup_proxies_ + opt_proxy_groups_reset_after_)) {
1050 opt_proxy_groups_current_ = 0;
1051 opt_timestamp_backup_proxies_ = 0;
1052 RebalanceProxiesUnlocked(
"Reset proxy group from backup to primary");
1056 if (opt_timestamp_failover_proxies_ > 0) {
1059 if (static_cast<int64_t>(now)
1060 > static_cast<int64_t>(opt_timestamp_failover_proxies_
1061 + opt_proxy_groups_reset_after_)) {
1062 RebalanceProxiesUnlocked(
1063 "Reset load-balanced proxies within the active group");
1068 if (!proxy || (proxy->
url ==
"DIRECT")) {
1070 curl_easy_setopt(info->
curl_handle(), CURLOPT_PROXY,
"");
1075 std::string purl = proxy->
url;
1077 const bool changed = ValidateProxyIpsUnlocked(purl, phost);
1084 curl_easy_setopt(info->
curl_handle(), CURLOPT_PROXY,
1085 info->
proxy().c_str());
1088 curl_easy_setopt(info->
curl_handle(), CURLOPT_PROXY,
"0.0.0.0");
1094 CheckHostInfoReset(
"metalink", opt_metalink_, info, now);
1095 CheckHostInfoReset(
"host", opt_metalink_, info, now);
1097 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, opt_low_speed_limit_);
1098 if (info->
proxy() !=
"DIRECT") {
1099 curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_proxy_);
1100 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_proxy_);
1102 curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_direct_);
1103 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_direct_);
1105 if (!opt_dns_server_.empty())
1106 curl_easy_setopt(curl_handle, CURLOPT_DNS_SERVERS, opt_dns_server_.c_str());
1109 if (CheckMetalinkChain(now)) {
1110 url_prefix = (*opt_metalink_.chain)[opt_metalink_.current];
1113 "(manager %s - id %" PRId64
") "
1114 "reading from metalink %d",
1115 name_.c_str(), info->
id(), opt_metalink_.current);
1116 }
else if (opt_host_.chain) {
1117 url_prefix = (*opt_host_.chain)[opt_host_.current];
1120 "(manager %s - id %" PRId64
") "
1121 "reading from host %d",
1122 name_.c_str(), info->
id(), opt_host_.current);
1126 string url = url_prefix + *(info->
url());
1128 curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 1L);
1129 if (url.substr(0, 5) ==
"https") {
1130 bool rvb = ssl_certificate_store_.ApplySslCertificatePath(curl_handle);
1133 "(manager %s - id %" PRId64
") "
1134 "Failed to set SSL certificate path %s",
1135 name_.c_str(), info->
id(),
1136 ssl_certificate_store_.GetCaPath().c_str());
1138 if (info->
pid() != -1) {
1139 if (credentials_attachment_ == NULL) {
1141 "(manager %s - id %" PRId64
") "
1142 "uses secure downloads but no credentials attachment set",
1143 name_.c_str(), info->
id());
1145 bool retval = credentials_attachment_->ConfigureCurlHandle(
1149 "(manager %s - id %" PRId64
") "
1150 "failed attaching credentials",
1151 name_.c_str(), info->
id());
1159 signal(SIGPIPE, SIG_IGN);
1162 if (url.find(
"@proxy@") != string::npos) {
1170 if (proxy_template_forced_ !=
"") {
1171 replacement = proxy_template_forced_;
1172 }
else if (info->
proxy() ==
"DIRECT") {
1173 replacement = proxy_template_direct_;
1175 if (opt_proxy_groups_current_ >= opt_proxy_groups_fallback_) {
1179 curl_easy_setopt(info->
curl_handle(), CURLOPT_PROXY,
"");
1180 replacement = proxy_template_direct_;
1182 replacement = ChooseProxyUnlocked(info->
expected_hash())->host.name();
1185 replacement = (replacement ==
"") ? proxy_template_direct_ : replacement;
1187 "(manager %s - id %" PRId64
") "
1188 "replacing @proxy@ by %s",
1189 name_.c_str(), info->
id(), replacement.c_str());
1190 url =
ReplaceAll(url,
"@proxy@", replacement);
1211 curl_easy_setopt(curl_handle, CURLOPT_URL,
1212 EscapeUrl(info->
id(), url).c_str());
1225 bool DownloadManager::ValidateProxyIpsUnlocked(
const string &url,
1230 name_.c_str(), host.
name().c_str());
1232 unsigned group_idx = opt_proxy_groups_current_;
1235 bool update_only =
true;
1239 "(manager '%s') failed to resolve IP addresses for %s (%d - %s)",
1240 name_.c_str(), host.
name().c_str(), new_host.
status(),
1244 update_only =
false;
1248 for (
unsigned i = 0; i < (*opt_proxy_groups_)[group_idx].size(); ++i) {
1249 if ((*opt_proxy_groups_)[group_idx][i].host.id() == host.
id())
1250 (*opt_proxy_groups_)[group_idx][i].host = new_host;
1259 "(manager '%s') DNS entries for proxy %s changed, adjusting",
1260 name_.c_str(), host.
name().c_str());
1261 vector<ProxyInfo> *group = current_proxy_group();
1262 opt_num_proxies_ -= group->size();
1263 for (
unsigned i = 0; i < group->size();) {
1264 if ((*group)[i].host.id() == host.
id()) {
1265 group->erase(group->begin() + i);
1270 vector<ProxyInfo> new_infos;
1272 set<string>::const_iterator iter_ips = best_addresses.begin();
1273 for (; iter_ips != best_addresses.end(); ++iter_ips) {
1275 new_infos.push_back(
ProxyInfo(new_host, url_ip));
1277 group->insert(group->end(), new_infos.begin(), new_infos.end());
1278 opt_num_proxies_ += new_infos.size();
1280 std::string msg =
"DNS entries for proxy " + host.
name() +
" changed";
1282 RebalanceProxiesUnlocked(msg);
1290 void DownloadManager::UpdateStatistics(CURL *handle) {
1295 retval = curl_easy_getinfo(handle, CURLINFO_SIZE_DOWNLOAD, &val);
1296 assert(retval == CURLE_OK);
1297 sum +=
static_cast<int64_t
>(val);
1301 perf::Xadd(counters_->sz_transferred_bytes, sum);
1308 bool DownloadManager::CanRetry(
const JobInfo *info) {
1310 unsigned max_retries = opt_max_retries_;
1325 unsigned backoff_init_ms = 0;
1326 unsigned backoff_max_ms = 0;
1329 backoff_init_ms = opt_backoff_init_ms_;
1330 backoff_max_ms = opt_backoff_max_ms_;
1345 "(manager '%s' - id %" PRId64
") backing off for %d ms",
1353 header_lists_->AppendHeader(info->
headers(),
"Pragma: no-cache");
1354 header_lists_->AppendHeader(info->
headers(),
"Cache-Control: no-cache");
1364 void DownloadManager::SetRegularCache(
JobInfo *info) {
1367 header_lists_->CutHeader(
"Pragma: no-cache", info->
GetHeadersPtr());
1368 header_lists_->CutHeader(
"Cache-Control: no-cache", info->
GetHeadersPtr());
1377 void DownloadManager::ReleaseCredential(
JobInfo *info) {
1379 assert(credentials_attachment_ != NULL);
1380 credentials_attachment_->ReleaseCurlHandle(info->
curl_handle(),
1388 static bool sortlinks(
const std::string &s1,
const std::string &s2) {
1389 const size_t pos1 = s1.find(
"; pri=");
1390 const size_t pos2 = s2.find(
"; pri=");
1392 if ((pos1 != std::string::npos) && (pos2 != std::string::npos)
1393 && (sscanf(s1.substr(pos1 + 6).c_str(),
"%d", &pri1) == 1)
1394 && (sscanf(s2.substr(pos2 + 6).c_str(),
"%d", &pri2) == 1)) {
1404 void DownloadManager::ProcessLink(
JobInfo *info) {
1406 if (info->
link().find(
"; pri=") != std::string::npos)
1407 std::sort(links.begin(), links.end(),
sortlinks);
1409 std::vector<std::string> host_list;
1411 std::vector<std::string>::const_iterator il = links.begin();
1412 for (; il != links.end(); ++il) {
1413 const std::string &link = *il;
1414 if ((link.find(
"; rel=duplicate") == std::string::npos)
1415 && (link.find(
"; rel=\"duplicate\"") == std::string::npos)) {
1417 "skipping link '%s' because it does not contain rel=duplicate",
1423 size_t start = link.find(
'<');
1424 if (start == std::string::npos) {
1427 "skipping link '%s' because it does not have a left angle bracket",
1433 if ((link.substr(start, 7) !=
"http://")
1434 && (link.substr(start, 8) !=
"https://")) {
1436 "skipping link '%s' of unrecognized url protocol", link.c_str());
1440 size_t end = link.find(
'/', start + 8);
1441 if (end == std::string::npos)
1442 end = link.find(
'>');
1443 if (end == std::string::npos) {
1445 "skipping link '%s' because no slash in url and no right angle "
1450 const std::string host = link.substr(start, end - start);
1452 host_list.push_back(host);
1455 if (host_list.size() > 0) {
1456 SetHostChain(host_list);
1457 opt_metalink_timestamp_link_ = time(NULL);
1468 bool DownloadManager::VerifyAndFinalize(
const int curl_error,
JobInfo *info) {
1470 "(manager '%s' - id %" PRId64
") "
1471 "Verify downloaded url %s, proxy %s (curl error %d)",
1472 name_.c_str(), info->
id(), info->
url()->c_str(),
1473 info->
proxy().c_str(), curl_error);
1479 was_metalink =
true;
1481 if (info->
link() !=
"") {
1486 was_metalink =
false;
1492 switch (curl_error) {
1499 if (ignore_signature_failures_) {
1502 "(manager '%s' - id %" PRId64
") "
1503 "ignoring failed hash verification of %s (expected %s, got %s)",
1504 name_.c_str(), info->
id(), info->
url()->c_str(),
1509 "(manager '%s' - id %" PRId64
") "
1510 "hash verification of %s failed (expected %s, got %s)",
1511 name_.c_str(), info->
id(), info->
url()->c_str(),
1522 case CURLE_UNSUPPORTED_PROTOCOL:
1525 case CURLE_URL_MALFORMAT:
1528 case CURLE_COULDNT_RESOLVE_PROXY:
1531 case CURLE_COULDNT_RESOLVE_HOST:
1534 case CURLE_OPERATION_TIMEDOUT:
1538 case CURLE_PARTIAL_FILE:
1539 case CURLE_GOT_NOTHING:
1540 case CURLE_RECV_ERROR:
1544 case CURLE_FILE_COULDNT_READ_FILE:
1545 case CURLE_COULDNT_CONNECT:
1546 if (info->
proxy() !=
"DIRECT") {
1553 case CURLE_TOO_MANY_REDIRECTS:
1556 case CURLE_SSL_CACERT_BADFILE:
1558 "(manager '%s' -id %" PRId64
") "
1559 "Failed to load certificate bundle. "
1560 "X509_CERT_BUNDLE might point to the wrong location.",
1561 name_.c_str(), info->
id());
1566 case CURLE_PEER_FAILED_VERIFICATION:
1568 "(manager '%s' - id %" PRId64
") "
1569 "invalid SSL certificate of remote host. "
1570 "X509_CERT_DIR and/or X509_CERT_BUNDLE might point to the wrong "
1572 name_.c_str(), info->
id());
1575 case CURLE_ABORTED_BY_CALLBACK:
1576 case CURLE_WRITE_ERROR:
1579 case CURLE_SEND_ERROR:
1584 : kFailProxyShortTransfer);
1588 "(manager '%s' - id %" PRId64
") "
1589 "unexpected curl error (%d) while trying to fetch %s",
1590 name_.c_str(), info->
id(), curl_error, info->
url()->c_str());
1595 std::vector<std::string> *host_chain;
1596 unsigned char num_used_hosts;
1598 host_chain = opt_metalink_.chain;
1601 host_chain = opt_host_.chain;
1606 bool try_again =
false;
1607 bool same_url_retry = CanRetry(info);
1616 "(manager '%s' - id %" PRId64
") "
1617 "data corruption with no-cache header, try another %s",
1618 name_.c_str(), info->
id(), typ.c_str());
1628 && (num_used_hosts < host_chain->size()))) {
1635 if (sharding_policy_.UseCount() > 0) {
1637 same_url_retry =
false;
1644 && (num_used_hosts < host_chain->size())) {
1646 if (opt_proxy_groups_) {
1647 if ((opt_proxy_groups_current_ > 0)
1648 || (opt_proxy_groups_current_burned_ > 0)) {
1649 opt_proxy_groups_current_ = 0;
1650 opt_timestamp_backup_proxies_ = 0;
1651 const std::string msg =
"reset proxies for " + typ
1653 RebalanceProxiesUnlocked(msg);
1659 "(manager '%s' - id %" PRId64
") make it a %s failure",
1660 name_.c_str(), info->
id(), typ.c_str());
1664 if (failover_indefinitely_) {
1668 "(manager '%s' - id %" PRId64
") "
1669 "VerifyAndFinalize() would fail the download here. "
1670 "Instead switch proxy and retry download. "
1672 "info->probe_hosts=%d host_chain=%p num_used_hosts=%d "
1673 "host_chain->size()=%lu same_url_retry=%d "
1674 "info->num_used_proxies=%d opt_num_proxies_=%d",
1675 name_.c_str(), info->
id(), typ.c_str(),
1676 static_cast<int>(info->
probe_hosts()), host_chain,
1677 num_used_hosts, host_chain ? host_chain->size() : -1,
1678 static_cast<int>(same_url_retry),
1681 RebalanceProxiesUnlocked(
1682 "download failed - failover indefinitely");
1695 "(manager '%s' - id %" PRId64
") "
1696 "Trying again on same curl handle, same url: %d, "
1697 "error code %d no-cache %d",
1698 name_.c_str(), info->
id(), same_url_retry, info->
error_code(),
1703 goto verify_and_finalize_stop;
1707 goto verify_and_finalize_stop;
1717 if (sharding_policy_.UseCount() > 0) {
1718 ReleaseCredential(info);
1719 SetUrlOptions(info);
1721 SetRegularCache(info);
1724 bool switch_proxy =
false;
1725 bool switch_host =
false;
1732 switch_proxy =
true;
1741 if (same_url_retry) {
1744 switch_proxy =
true;
1747 if (same_url_retry) {
1758 ReleaseCredential(info);
1761 SetUrlOptions(info);
1764 ReleaseCredential(info);
1766 SwitchMetalink(info);
1772 SetUrlOptions(info);
1776 if (failover_indefinitely_) {
1784 verify_and_finalize_stop:
1786 ReleaseCredential(info);
1795 header_lists_->PutList(info->
headers());
1802 DownloadManager::~DownloadManager() {
1804 if (sharding_policy_.UseCount() > 0) {
1805 sharding_policy_.Reset();
1807 if (health_check_.UseCount() > 0) {
1808 if (health_check_.Unique()) {
1810 "(manager '%s') Stopping healthcheck thread", name_.c_str());
1811 health_check_->StopHealthcheck();
1813 health_check_.Reset();
1816 if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1819 pthread_join(thread_download_, NULL);
1821 pipe_terminate_.Destroy();
1822 pipe_jobs_.Destroy();
1825 for (set<CURL *>::iterator i = pool_handles_idle_->begin(),
1826 iEnd = pool_handles_idle_->end();
1829 curl_easy_cleanup(*i);
1832 delete pool_handles_idle_;
1833 delete pool_handles_inuse_;
1834 curl_multi_cleanup(curl_multi_);
1836 delete header_lists_;
1841 delete opt_host_.chain;
1842 delete opt_host_chain_rtt_;
1843 delete opt_proxy_groups_;
1845 curl_global_cleanup();
1849 pthread_mutex_destroy(lock_options_);
1850 pthread_mutex_destroy(lock_synchronous_mode_);
1851 free(lock_options_);
1852 free(lock_synchronous_mode_);
1855 void DownloadManager::InitHeaders() {
1857 string cernvm_id =
"User-Agent: cvmfs ";
1858 #ifdef CVMFS_LIBCVMFS
1859 cernvm_id +=
"libcvmfs ";
1861 cernvm_id +=
"Fuse ";
1863 cernvm_id += string(CVMFS_VERSION);
1864 if (getenv(
"CERNVM_UUID") != NULL) {
1867 .
Filter(getenv(
"CERNVM_UUID"));
1869 user_agent_ = strdup(cernvm_id.c_str());
1873 default_headers_ = header_lists_->GetList(
"Connection: Keep-Alive");
1874 header_lists_->AppendHeader(default_headers_,
"Pragma:");
1875 header_lists_->AppendHeader(default_headers_, user_agent_);
1878 DownloadManager::DownloadManager(
const unsigned max_pool_handles,
1880 const std::string &name)
1882 , pool_handles_idle_(new set<CURL *>)
1883 , pool_handles_inuse_(new set<CURL *>)
1884 , pool_max_handles_(max_pool_handles)
1885 , pipe_terminate_(NULL)
1888 , watch_fds_size_(0)
1889 , watch_fds_inuse_(0)
1890 , watch_fds_max_(4 * max_pool_handles)
1891 , opt_timeout_proxy_(5)
1892 , opt_timeout_direct_(10)
1893 , opt_low_speed_limit_(1024)
1894 , opt_max_retries_(0)
1895 , opt_backoff_init_ms_(0)
1896 , opt_backoff_max_ms_(0)
1897 , enable_info_header_(false)
1898 , opt_ipv4_only_(false)
1899 , follow_redirects_(false)
1900 , ignore_signature_failures_(false)
1901 , enable_http_tracing_(false)
1902 , opt_metalink_(NULL, 0, 0, 0)
1903 , opt_metalink_timestamp_link_(0)
1904 , opt_host_(NULL, 0, 0, 0)
1905 , opt_host_chain_rtt_(NULL)
1906 , opt_proxy_groups_(NULL)
1907 , opt_proxy_groups_current_(0)
1908 , opt_proxy_groups_current_burned_(0)
1909 , opt_proxy_groups_fallback_(0)
1910 , opt_num_proxies_(0)
1911 , opt_proxy_shard_(false)
1912 , failover_indefinitely_(false)
1915 , opt_timestamp_backup_proxies_(0)
1916 , opt_timestamp_failover_proxies_(0)
1917 , opt_proxy_groups_reset_after_(0)
1918 , credentials_attachment_(NULL)
1919 , counters_(new
Counters(statistics)) {
1923 smalloc(
sizeof(pthread_mutex_t)));
1927 smalloc(
sizeof(pthread_mutex_t)));
1931 retval = curl_global_init(CURL_GLOBAL_ALL);
1932 assert(retval == CURLE_OK);
1939 curl_multi_setopt(
curl_multi_, CURLMOPT_SOCKETDATA,
1940 static_cast<void *>(
this));
1942 curl_multi_setopt(
curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1948 if ((getenv(
"CVMFS_IPV4_ONLY") != NULL)
1949 && (strlen(getenv(
"CVMFS_IPV4_ONLY")) > 0)) {
1966 static_cast<void *>(
this));
1973 "(manager '%s') Starting healthcheck thread",
name_.c_str());
2004 const char *header_name =
"cvmfs-info: ";
2005 const size_t header_name_len = strlen(header_name);
2006 const unsigned header_size = 1 + header_name_len
2008 info->
SetInfoHeader(static_cast<char *>(alloca(header_size)));
2009 memcpy(info->
info_header(), header_name, header_name_len);
2011 header_size - header_name_len);
2016 const std::string str_pid =
"X-CVMFS-PID: " +
StringifyInt(info->
pid());
2062 retval = curl_easy_perform(handle);
2065 if (curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &elapsed)
2068 static_cast<int64_t>(elapsed * 1000));
2077 "(manager '%s' - id %" PRId64
") "
2078 "download failed (error %d - %s)",
2081 if (info->
sink() != NULL) {
2109 if (!address.empty()) {
2114 vector<string> servers;
2115 servers.push_back(address);
2120 name_.c_str(), address.c_str());
2128 const unsigned timeout_ms) {
2142 const unsigned max_seconds) {
2161 const unsigned seconds_direct) {
2183 unsigned *seconds_direct) {
2200 const std::vector<std::string> &metalink_list) {
2206 if (metalink_list.empty()) {
2220 unsigned *current_metalink) {
2223 if (current_metalink) {
2226 if (metalink_chain) {
2249 if (host_list.empty()) {
2268 unsigned *current_host) {
2301 const unsigned group_size = group->size();
2302 unsigned failed = 0;
2304 if (info && (info->
proxy() == (*group)[i].url)) {
2306 opt_proxy_groups_current_burned_++;
2308 (*group)[group_size - opt_proxy_groups_current_burned_]);
2320 if (opt_proxy_groups_current_burned_ == group->size()) {
2321 opt_proxy_groups_current_burned_ = 0;
2350 "(manager '%s' - id %" PRId64
") "
2351 "%lu proxies remain in group",
2367 if (!info.
chain || (info.
chain->size() == 1)) {
2373 if (typ ==
"host") {
2378 if (lastused != info.
current) {
2380 "(manager '%s' - id %" PRId64
")"
2382 "last used %s: %s, current %s: %s",
2383 name_.c_str(), jobinfo->
id(), typ.c_str(), typ.c_str(),
2384 (*info.
chain)[lastused].c_str(), typ.c_str(),
2390 string reason =
"manually triggered";
2391 string info_id =
"(manager " +
name_;
2398 const std::string old_host = (*info.
chain)[info.
current];
2400 if (typ ==
"host") {
2406 "%s switching %s from %s to %s (%s)", info_id.c_str(), typ.c_str(),
2438 || (static_cast<int64_t>((now == 0) ? time(NULL) : now)
2451 vector<string> host_chain;
2452 vector<int> host_rtt;
2453 unsigned current_host;
2455 GetHostInfo(&host_chain, &host_rtt, ¤t_host);
2462 JobInfo info(&url,
false,
false, NULL, &memsink);
2463 for (retries = 0; retries < 2; ++
retries) {
2464 for (i = 0; i < host_chain.size(); ++i) {
2465 url = host_chain[i] +
"/.cvmfspublished";
2467 struct timeval tv_start, tv_end;
2468 gettimeofday(&tv_start, NULL);
2470 gettimeofday(&tv_end, NULL);
2476 "(manager '%s' - id %" PRId64
") "
2477 "probing host %s had %dms rtt",
2478 name_.c_str(), info.
id(), url.c_str(), host_rtt[i]);
2481 "(manager '%s' - id %" PRId64
") "
2482 "error while probing host %s: %d %s",
2483 name_.c_str(), info.
id(), url.c_str(), result,
2485 host_rtt[i] = INT_MAX;
2491 for (i = 0; i < host_chain.size(); ++i) {
2492 if (host_rtt[i] == INT_MAX)
2505 std::vector<uint64_t> *output_order) {
2509 if (servers->size() == 1) {
2511 output_order->clear();
2512 output_order->push_back(0);
2517 std::vector<std::string> host_chain;
2520 std::vector<std::string> server_dns_names;
2521 server_dns_names.reserve(servers->size());
2522 for (
unsigned i = 0; i < servers->size(); ++i) {
2524 server_dns_names.push_back(host.empty() ? (*servers)[i] : host);
2526 std::string host_list =
JoinStrings(server_dns_names,
",");
2528 vector<string> host_chain_shuffled;
2536 bool success =
false;
2537 unsigned max_attempts = std::min(host_chain_shuffled.size(), size_t(3));
2538 vector<uint64_t> geo_order(servers->size());
2539 for (
unsigned i = 0; i < max_attempts; ++i) {
2540 string url = host_chain_shuffled[i] +
"/api/v1.0/geo/@proxy@/" + host_list;
2542 "(manager '%s') requesting ordered server list from %s",
2543 name_.c_str(), url.c_str());
2545 JobInfo info(&url,
false,
false, NULL, &memsink);
2548 string order(reinterpret_cast<char *>(memsink.data()), memsink.pos());
2553 "(manager '%s') retrieved invalid GeoAPI reply from %s [%s]",
2554 name_.c_str(), url.c_str(), order.c_str());
2558 "geographic order of servers retrieved from %s",
2563 Trim(order,
true ).c_str());
2569 "(manager '%s') GeoAPI request for %s failed with error %d [%s]",
2576 "failed to retrieve geographic order from stratum 1 servers",
2582 output_order->swap(geo_order);
2584 std::vector<std::string> sorted_servers;
2585 sorted_servers.reserve(geo_order.size());
2586 for (
unsigned i = 0; i < geo_order.size(); ++i) {
2587 uint64_t orderval = geo_order[i];
2588 sorted_servers.push_back((*servers)[orderval]);
2590 servers->swap(sorted_servers);
2604 vector<string> host_chain;
2605 vector<int> host_rtt;
2606 unsigned current_host;
2607 vector<vector<ProxyInfo> > proxy_chain;
2608 unsigned fallback_group;
2610 GetHostInfo(&host_chain, &host_rtt, ¤t_host);
2612 if ((host_chain.size() < 2) && ((proxy_chain.size() - fallback_group) < 2))
2615 vector<string> host_names;
2616 for (
unsigned i = 0; i < host_chain.size(); ++i)
2618 SortTeam(&host_names, &host_chain);
2619 unsigned last_geo_host = host_names.size();
2621 if ((fallback_group == 0) && (last_geo_host > 1)) {
2627 host_names.push_back(
"+PXYSEP+");
2631 unsigned first_geo_fallback = host_names.size();
2632 for (
unsigned i = fallback_group; i < proxy_chain.size(); ++i) {
2635 host_names.push_back(proxy_chain[i][0].host.name());
2638 std::vector<uint64_t> geo_order;
2653 vector<vector<ProxyInfo> > *proxy_groups =
new vector<vector<ProxyInfo> >(
2657 (*proxy_groups)[i] = (*opt_proxy_groups_)[i];
2667 for (
unsigned i = 0; i < geo_order.size(); ++i) {
2668 uint64_t orderval = geo_order[i];
2669 if (orderval < static_cast<uint64_t>(last_geo_host)) {
2673 }
else if (orderval >= static_cast<uint64_t>(first_geo_fallback)) {
2677 (*proxy_groups)[proxyi] = proxy_chain[fallback_group + orderval
2678 - first_geo_fallback];
2716 const unsigned expected_size,
2717 vector<uint64_t> *reply_vals) {
2718 if (reply_order.empty())
2721 if (!sanitizer.
IsValid(reply_order))
2726 vector<uint64_t> tmp_vals;
2727 for (
unsigned i = 0; i < reply_strings.size(); ++i) {
2728 if (reply_strings[i].empty())
2732 if (tmp_vals.size() != expected_size)
2736 set<uint64_t> coverage(tmp_vals.begin(), tmp_vals.end());
2737 if (coverage.size() != tmp_vals.size())
2739 if ((*coverage.begin() != 1) || (*coverage.rbegin() != coverage.size()))
2742 for (
unsigned i = 0; i < expected_size; ++i) {
2743 (*reply_vals)[i] = tmp_vals[i] - 1;
2754 string *cleaned_list) {
2756 if (proxy_list ==
"") {
2760 bool result =
false;
2762 vector<string> proxy_groups =
SplitString(proxy_list,
';');
2763 vector<string> cleaned_groups;
2764 for (
unsigned i = 0; i < proxy_groups.size(); ++i) {
2765 vector<string> group =
SplitString(proxy_groups[i],
'|');
2766 vector<string> cleaned;
2767 for (
unsigned j = 0; j < group.size(); ++j) {
2768 if ((group[j] ==
"DIRECT") || (group[j] ==
"")) {
2771 cleaned.push_back(group[j]);
2774 if (!cleaned.empty())
2775 cleaned_groups.push_back(
JoinStrings(cleaned,
"|"));
2792 const string &fallback_proxy_list,
2800 bool contains_direct;
2808 &set_proxy_fallback_list);
2809 if (contains_direct) {
2811 "(manager '%s') fallback proxies do not support DIRECT, removing",
2814 if (set_proxy_fallback_list ==
"") {
2818 if (contains_direct) {
2820 "(manager '%s') skipping DIRECT proxy to use fallback proxy",
2830 if ((set_proxy_list ==
"") && (set_proxy_fallback_list ==
"")) {
2841 if (set_proxy_list !=
"") {
2846 "first fallback proxy group %u",
2851 string all_proxy_list = set_proxy_list;
2852 if (set_proxy_fallback_list !=
"") {
2853 if (all_proxy_list !=
"")
2854 all_proxy_list +=
";";
2855 all_proxy_list += set_proxy_fallback_list;
2858 name_.c_str(), all_proxy_list.c_str());
2861 vector<string> hostnames;
2862 vector<string> proxy_groups;
2863 if (all_proxy_list !=
"")
2865 for (
unsigned i = 0; i < proxy_groups.size(); ++i) {
2866 vector<string> this_group =
SplitString(proxy_groups[i],
'|');
2867 for (
unsigned j = 0; j < this_group.size(); ++j) {
2873 hostnames.push_back(hostname);
2876 vector<dns::Host> hosts;
2879 "resolving %lu proxy addresses",
2880 name_.c_str(), hostnames.size());
2887 unsigned num_proxy = 0;
2888 for (
unsigned i = 0; i < proxy_groups.size(); ++i) {
2889 vector<string> this_group =
SplitString(proxy_groups[i],
'|');
2893 vector<ProxyInfo> infos;
2894 for (
unsigned j = 0; j < this_group.size(); ++j, ++num_proxy) {
2896 if (this_group[j] ==
"DIRECT") {
2904 "failed to resolve IP addresses for %s (%d - %s)",
2905 name_.c_str(), hosts[num_proxy].name().c_str(),
2906 hosts[num_proxy].status(),
2910 infos.push_back(
ProxyInfo(failed_host, this_group[j]));
2917 set<string>::const_iterator iter_ips = best_addresses.begin();
2918 for (; iter_ips != best_addresses.end(); ++iter_ips) {
2920 infos.push_back(
ProxyInfo(hosts[num_proxy], url_ip));
2928 opt_num_proxies_ += infos.size();
2931 "(manager '%s') installed %u proxies in %lu load-balance groups",
2951 unsigned *current_group,
2952 unsigned *fallback_group) {
2953 assert(proxy_chain != NULL);
2958 vector<vector<ProxyInfo> > empty_chain;
2959 *proxy_chain = empty_chain;
2960 if (current_group != NULL)
2962 if (fallback_group != NULL)
2963 *fallback_group = 0;
2968 if (current_group != NULL)
2970 if (fallback_group != NULL)
2988 uint32_t key = (hash ? hash->
Partial32() : 0);
2989 map<uint32_t, ProxyInfo *>::iterator it =
opt_proxy_map_.lower_bound(key);
3010 const uint32_t max_key = 0xffffffffUL;
3013 for (
unsigned i = 0; i < num_alive; ++i) {
3020 const std::pair<uint32_t, ProxyInfo *> entry(prng.
Next(max_key), proxy);
3023 std::string proxy_name = proxy->
host.
name().empty()
3030 const std::pair<uint32_t, ProxyInfo *> last_entry(max_key, first_proxy);
3034 unsigned select =
prng_.
Next(num_alive);
3036 const std::pair<uint32_t, ProxyInfo *> entry(max_key, proxy);
3038 std::string proxy_name = proxy->
host.
name().empty()
3047 const string curr_host =
"Current host: "
3051 if (new_proxy != old_proxy) {
3053 "(manager '%s') switching proxy from %s to %s. Reason: %s [%s]",
3054 name_.c_str(), (old_proxy.empty() ?
"(none)" : old_proxy.c_str()),
3055 (new_proxy.empty() ?
"(none)" : new_proxy.c_str()), reason.c_str(),
3102 std::string msg =
"switch to proxy group "
3135 const unsigned backoff_init_ms,
3136 const unsigned backoff_max_ms) {
3151 const std::string &forced) {
3178 bool success =
false;
3184 "Proposed sharding policy does not exist. Falling back to default",
3200 const std::string &cloned_name) {
unsigned opt_timeout_direct_
std::vector< std::string > http_tracing_headers_
bool StripDirect(const std::string &proxy_list, std::string *cleaned_list)
unsigned opt_low_speed_limit_
void HashString(const std::string &content, Any *any_digest)
static const unsigned kDnsDefaultTimeoutMs
bool ignore_signature_failures_
std::vector< T > Shuffle(const std::vector< T > &input, Prng *prng)
unsigned throttle() const
z_stream * GetZstreamPtr()
unsigned opt_backoff_init_ms_
void SetInfoHeader(char *info_header)
bool enable_http_tracing_
shash::ContextPtr * GetHashContextPtr()
int64_t Xadd(class Counter *counter, const int64_t delta)
const char * Code2Ascii(const Failures error)
bool Write(const T &data)
unsigned opt_proxy_groups_current_burned_
double DiffTimeSeconds(struct timeval start, struct timeval end)
unsigned opt_proxy_groups_reset_after_
virtual bool IsCanceled()
void SetUrlOptions(JobInfo *info)
SharedPtr< ShardingPolicy > sharding_policy_
StreamStates DecompressZStream2Sink(const void *buf, const int64_t size, z_stream *strm, cvmfs::Sink *sink)
void ResolveMany(const std::vector< std::string > &names, std::vector< Host > *hosts)
std::string opt_proxy_fallback_list_
void SetHostChain(const std::string &host_list)
bool CheckMetalinkChain(const time_t now)
static NormalResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
void SetMetalinkChain(const std::string &metalink_list)
void SetLowSpeedLimit(const unsigned low_speed_limit)
DownloadManager(const unsigned max_pool_handles, const perf::StatisticsTemplate &statistics, const std::string &name="standard")
std::string proxy_template_direct_
std::vector< std::string > opt_proxies_
curl_slist ** GetHeadersPtr()
static const int kProbeGeo
string Trim(const string &raw, bool trim_newline)
string ReplaceAll(const string &haystack, const string &needle, const string &replace_by)
void set_min_ttl(unsigned seconds)
string JoinStrings(const vector< string > &strings, const string &joint)
std::string ToString(const bool with_suffix=false) const
unsigned opt_proxy_groups_current_
void SetCurrentHostChainIndex(int current_host_chain_index)
virtual bool RequiresReserve()=0
bool ValidateGeoReply(const std::string &reply_order, const unsigned expected_size, std::vector< uint64_t > *reply_vals)
std::vector< ProxyInfo > * current_proxy_group() const
void DecompressInit(z_stream *strm)
const std::string * url() const
time_t opt_timestamp_backup_proxies_
void SetProxyChain(const std::string &proxy_list, const std::string &fallback_proxy_list, const ProxySetModes set_mode)
std::string GetProxyList()
bool allow_failure() const
void GetMetalinkInfo(std::vector< std::string > *metalink_chain, unsigned *current_metalink)
std::set< CURL * > * pool_handles_inuse_
CURL * curl_handle() const
pthread_mutex_t * lock_options_
ProxyInfo * ChooseProxyUnlocked(const shash::Any *hash)
pthread_t thread_download_
std::string opt_proxy_list_
const std::string & name() const
perf::Counter * sz_transfer_time
void SetTracingHeaderGid(char *tracing_header_gid)
assert((mem||(size==0))&&"Out Of Memory")
void SetNocache(bool nocache)
unsigned opt_proxy_groups_fallback_
void ReleaseCurlHandle(CURL *handle)
static bool sortlinks(const std::string &s1, const std::string &s2)
void SetTracingHeaderPid(char *tracing_header_pid)
void set_max_ttl(unsigned seconds)
Tube< DataTubeElement > * GetDataTubePtr()
char * tracing_header_gid() const
const std::set< std::string > & ViewBestAddresses(IpPreference preference) const
char * info_header() const
void SetDnsServer(const std::string &address)
DownloadManager * Clone(const perf::StatisticsTemplate &statistics, const std::string &cloned_name)
bool force_nocache() const
std::string StringifyUint(const uint64_t value)
void DecompressFini(z_stream *strm)
virtual std::string Describe()=0
static void * MainDownload(void *data)
void InitSeed(const uint64_t seed)
void SetTimeout(const unsigned seconds_proxy, const unsigned seconds_direct)
void SetLink(const std::string &link)
std::string opt_dns_server_
uint32_t watch_fds_inuse_
void SwitchHostInfo(const std::string &typ, HostInfo &info, JobInfo *jobinfo)
off_t range_offset() const
unsigned char num_used_hosts() const
bool follow_redirects() const
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
void Init(ContextPtr context)
void SetNumUsedMetalinks(unsigned char num_used_metalinks)
bool FileExists(const std::string &path)
Pipe< kPipeDownloadJobsResults > * GetPipeJobResultPtr()
void SetDnsTtlLimits(const unsigned min_seconds, const unsigned max_seconds)
static Failures PrepareDownloadDestination(JobInfo *info)
perf::Counter * n_metalink_failover
void SetHttpCode(int http_code)
const char * Code2Ascii(const Failures error)
uint32_t pool_max_handles_
bool head_request() const
unsigned char num_retries() const
std::vector< std::string > * chain
bool IsValidPipeJobResults()
std::vector< std::vector< ProxyInfo > > * opt_proxy_groups_
void GetProxyInfo(std::vector< std::vector< ProxyInfo > > *proxy_chain, unsigned *current_group, unsigned *fallback_group)
void SetProxyGroupResetDelay(const unsigned seconds)
void SetCurrentMetalinkChainIndex(int current_metalink_chain_index)
atomic_int32 multi_threaded_
std::string AddDefaultScheme(const std::string &proxy)
vector< string > SplitString(const string &str, char delim)
cvmfs::Sink * sink() const
dns::NormalResolver * resolver_
bool Interrupted(const std::string &fqrn, JobInfo *info)
std::string proxy() const
dns::IpPreference opt_ip_preference_
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
void UseSystemCertificatePath()
bool SetShardingPolicy(const ShardingPolicySelector type)
perf::Counter * n_host_failover
void UpdateProxiesUnlocked(const std::string &reason)
time_t opt_metalink_timestamp_link_
bool IsEquivalent(const Host &other) const
bool IsProxyTransferError(const Failures error)
void SetNumRetries(unsigned char num_retries)
void set_throttle(const unsigned throttle)
InterruptCue * interrupt_cue() const
void SetIpPreference(const dns::IpPreference preference)
bool failover_indefinitely_
shash::ContextPtr hash_context() const
perf::Counter * n_requests
void Final(ContextPtr context, Any *any_digest)
void SetRetryParameters(const unsigned max_retries, const unsigned backoff_init_ms, const unsigned backoff_max_ms)
string StringifyInt(const int64_t value)
char * tracing_header_uid() const
Failures error_code() const
void CloneProxyConfig(DownloadManager *clone)
void SetMaxIpaddrPerProxy(unsigned limit)
void EnableIgnoreSignatureFailures()
CURL * AcquireCurlHandle()
void Inc(class Counter *counter)
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
std::string ExtractHost(const std::string &url)
std::vector< int > * opt_host_chain_rtt_
SslCertificateStore ssl_certificate_store_
std::string GetFallbackProxyList()
uint32_t Partial32() const
void SetProxyTemplates(const std::string &direct, const std::string &forced)
unsigned opt_backoff_max_ms_
void SetErrorCode(Failures error_code)
void SetNumUsedProxies(unsigned char num_used_proxies)
unsigned GetContextSize(const Algorithms algorithm)
std::string GetDnsServer() const
unsigned opt_num_proxies_
int current_metalink_chain_index() const
CredentialsAttachment * credentials_attachment_
struct pollfd * watch_fds_
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
std::map< uint32_t, ProxyInfo * > opt_proxy_map_
uint64_t String2Uint64(const string &value)
UniquePtr< Pipe< kPipeDownloadJobs > > pipe_jobs_
void UseSystemCertificatePath()
void CreatePipeJobResults()
Failures Fetch(JobInfo *info)
unsigned char num_used_metalinks() const
void SetCurlHandle(CURL *curl_handle)
unsigned opt_max_retries_
void SetTracingHeaderUid(char *tracing_header_uid)
curl_slist * headers() const
const shash::Any * expected_hash() const
perf::Counter * n_proxy_failover
void GetTimeout(unsigned *seconds_proxy, unsigned *seconds_direct)
void SetCredData(void *cred_data)
void SetFailoverIndefinitely()
std::string proxy_template_forced_
time_t opt_timestamp_failover_proxies_
void SetDnsParameters(const unsigned retries, const unsigned timeout_ms)
unsigned EscapeHeader(const std::string &header, char *escaped_buf, size_t buf_size)
const std::string * extra_info() const
static Host ExtendDeadline(const Host &original, unsigned seconds_from_now)
UniquePtr< Pipe< kPipeThreadTerminator > > pipe_terminate_
void SetProxy(const std::string &proxy)
static const int kProbeUnprobed
unsigned backoff_ms() const
unsigned char num_used_proxies() const
int current_host_chain_index() const
void SafeSleepMs(const unsigned ms)
void SetMetalinkResetDelay(const unsigned seconds)
bool IsHostTransferError(const Failures error)
static const unsigned kDnsDefaultRetries
void SortTeam(std::vector< T > *tractor, std::vector< U > *towed)
bool GeoSortServers(std::vector< std::string > *servers, std::vector< uint64_t > *output_order=NULL)
virtual bool Reserve(size_t size)=0
static const int kProbeDown
void SetNumUsedHosts(unsigned char num_used_hosts)
void SetHeaders(curl_slist *headers)
static const unsigned kProxyMapScale
void GetHostInfo(std::vector< std::string > *host_chain, std::vector< int > *rtt, unsigned *current_host)
unsigned opt_timeout_proxy_
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
char * tracing_header_pid() const
void SetBackoffMs(unsigned backoff_ms)
SharedPtr< HealthCheck > health_check_
void SwitchProxy(JobInfo *info)
void AddHTTPTracingHeader(const std::string &header)
void SetCredentialsAttachment(CredentialsAttachment *ca)
void SetFollowRedirects(bool follow_redirects)
void RebalanceProxiesUnlocked(const std::string &reason)
pthread_mutex_t * lock_synchronous_mode_
virtual bool SetResolvers(const std::vector< std::string > &resolvers)
void InitializeRequest(JobInfo *info, CURL *handle)
static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp)
string RewriteUrl(const string &url, const string &ip)
void SetHostResetDelay(const unsigned seconds)
uint32_t Next(const uint64_t boundary)
virtual int64_t Write(const void *buf, uint64_t sz)=0
unsigned timeout_ms() const
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)