29 #define __STDC_FORMAT_MACROS
94 const std::string pause_file =
95 std::string(
"/var/run/cvmfs/interrupt.") + fqrn;
98 "(id %" PRId64
") Interrupted(): checking for existence of %s",
99 info->
id(), pause_file.c_str());
102 "(id %" PRId64
") Interrupt marker found - "
103 "Interrupting current download, this will EIO outstanding IO.",
105 if (0 != unlink(pause_file.c_str())) {
107 "(id %" PRId64
") Couldn't delete interrupt marker: errno=%d",
121 "(id %" PRId64
") Failed to open path %s: %s (errno=%d).",
122 info->
id(), psink->
path().c_str(), strerror(errno), errno);
127 "Failed to create a valid sink: \n %s",
142 const size_t num_bytes = size * nmemb;
143 const string header_line(static_cast<const char *>(ptr), num_bytes);
150 if (
HasPrefix(header_line,
"HTTP/1.",
false)) {
151 if (header_line.length() < 10) {
156 for (i = 8; (i < header_line.length()) && (header_line[i] ==
' '); ++i) {
160 if (header_line.length() > i + 2) {
161 info->
SetHttpCode(DownloadManager::ParseHttpCode(&header_line[i]));
170 "(id %" PRId64
") redirect support not enabled: %s",
171 info->
id(), header_line.c_str());
176 info->
id(), header_line.c_str());
181 "(id %" PRId64
") http status error code: %s [%d]", info->
id(),
202 &&
HasPrefix(header_line,
"CONTENT-LENGTH:",
true)) {
203 char *tmp =
reinterpret_cast<char *
>(alloca(num_bytes + 1));
205 sscanf(header_line.c_str(),
"%s %" PRIu64, tmp, &length);
210 "resource %s too large to store in memory (%" PRIu64
")",
211 info->
id(), info->
url()->c_str(), length);
219 }
else if (
HasPrefix(header_line,
"LOCATION:",
true)) {
222 header_line.c_str());
223 }
else if (
HasPrefix(header_line,
"LINK:",
true)) {
226 header_line.c_str());
227 std::string link = info->
link();
228 if (link.size() != 0) {
230 link = link +
", " + header_line.substr(5);
232 link = header_line.substr(5);
235 }
else if (
HasPrefix(header_line,
"X-SQUID-ERROR:",
true)) {
240 }
else if (
HasPrefix(header_line,
"PROXY-STATUS:",
true)) {
243 && (header_line.find(
"error=") != string::npos)) {
257 const size_t num_bytes = size * nmemb;
271 shash::Update(reinterpret_cast<unsigned char *>(ptr), num_bytes,
281 "(id %" PRId64
") failed to decompress %s", info->
id(),
282 info->
url()->c_str());
287 "(id %" PRId64
") decompressing %s, local IO error", info->
id(),
288 info->
url()->c_str());
293 const int64_t written = info->
sink()->
Write(ptr, num_bytes);
294 if (written < 0 || static_cast<uint64_t>(written) != num_bytes) {
297 "Failed to perform write of %zu bytes to sink %s with errno %ld",
307 static int CallbackCurlDebug(CURL *handle,
313 curl_easy_getinfo(handle, CURLINFO_PRIVATE, &info);
315 std::string prefix =
"(id " +
StringifyInt(info->id()) +
") ";
320 case CURLINFO_HEADER_IN:
321 prefix +=
"{header/recv} ";
323 case CURLINFO_HEADER_OUT:
324 prefix +=
"{header/sent} ";
326 case CURLINFO_DATA_IN:
328 prefix +=
"{data/recv} ";
334 case CURLINFO_DATA_OUT:
336 prefix +=
"{data/sent} ";
342 case CURLINFO_SSL_DATA_IN:
344 prefix +=
"{ssldata/recv} ";
351 case CURLINFO_SSL_DATA_OUT:
353 prefix +=
"{ssldata/sent} ";
365 bool valid_char =
true;
366 std::string msg(data, size);
367 for (
size_t i = 0; i < msg.length(); ++i) {
368 if (msg[i] ==
'\0') {
373 if ((msg[i] <
' ' || msg[i] >
'~')
381 msg =
"<Non-plaintext sequence>";
385 Trim(msg,
true ).c_str());
393 const int DownloadManager::kProbeUnprobed = -1;
394 const int DownloadManager::kProbeDown = -2;
395 const int DownloadManager::kProbeGeo = -3;
397 bool DownloadManager::EscapeUrlChar(
unsigned char input,
char output[3]) {
398 if (((input >=
'0') && (input <=
'9')) || ((input >=
'A') && (input <=
'Z'))
399 || ((input >=
'a') && (input <=
'z')) || (input ==
'/') || (input ==
':')
400 || (input ==
'.') || (input ==
'@') || (input ==
'+') || (input ==
'-')
401 || (input ==
'_') || (input ==
'~') || (input ==
'[') || (input ==
']')
403 output[0] =
static_cast<char>(input);
408 output[1] =
static_cast<char>((input / 16)
409 + ((input / 16 <= 9) ?
'0' :
'A' - 10));
410 output[2] =
static_cast<char>((input % 16)
411 + ((input % 16 <= 9) ?
'0' :
'A' - 10));
420 string DownloadManager::EscapeUrl(
const int64_t jobinfo_id,
const string &url) {
422 escaped.reserve(url.length());
424 char escaped_char[3];
425 for (
unsigned i = 0, s = url.length(); i < s; ++i) {
426 if (EscapeUrlChar(url[i], escaped_char)) {
427 escaped.append(escaped_char, 3);
429 escaped.push_back(escaped_char[0]);
433 jobinfo_id, url.c_str(), escaped.c_str());
442 unsigned DownloadManager::EscapeHeader(
const string &header,
445 unsigned esc_pos = 0;
446 char escaped_char[3];
447 for (
unsigned i = 0, s = header.size(); i < s; ++i) {
448 if (EscapeUrlChar(header[i], escaped_char)) {
449 for (
unsigned j = 0; j < 3; ++j) {
451 if (esc_pos >= buf_size)
453 escaped_buf[esc_pos] = escaped_char[j];
459 if (esc_pos >= buf_size)
461 escaped_buf[esc_pos] = escaped_char[0];
473 int DownloadManager::ParseHttpCode(
const char digits[3]) {
476 for (
int i = 0; i < 3; ++i) {
477 if ((digits[i] <
'0') || (digits[i] >
'9'))
479 result += (digits[i] -
'0') * factor;
489 int DownloadManager::CallbackCurlSocket(CURL * ,
497 if (action == CURL_POLL_NONE)
515 download_mgr->
watch_fds_ =
static_cast<struct pollfd *
>(
527 download_mgr->
watch_fds_[index].events = POLLIN | POLLPRI;
530 download_mgr->
watch_fds_[index].events = POLLOUT | POLLWRBAND;
532 case CURL_POLL_INOUT:
533 download_mgr->
watch_fds_[index].events = POLLIN | POLLPRI | POLLOUT
536 case CURL_POLL_REMOVE:
537 if (index < download_mgr->watch_fds_inuse_ - 1) {
550 download_mgr->
watch_fds_ =
static_cast<struct pollfd *
>(
568 void *DownloadManager::MainDownload(
void *data) {
571 "download I/O thread of DownloadManager '%s' started",
572 download_mgr->
name_.c_str());
574 const int kIdxPipeTerminate = 0;
575 const int kIdxPipeJobs = 1;
577 download_mgr->
watch_fds_ =
static_cast<struct pollfd *
>(
578 smalloc(2 *
sizeof(
struct pollfd)));
582 download_mgr->
watch_fds_[kIdxPipeTerminate].events = POLLIN | POLLPRI;
583 download_mgr->
watch_fds_[kIdxPipeTerminate].revents = 0;
586 download_mgr->
watch_fds_[kIdxPipeJobs].events = POLLIN | POLLPRI;
587 download_mgr->
watch_fds_[kIdxPipeJobs].revents = 0;
590 int still_running = 0;
591 struct timeval timeval_start, timeval_stop;
592 gettimeofday(&timeval_start, NULL);
608 gettimeofday(&timeval_stop, NULL);
609 const int64_t delta =
static_cast<int64_t
>(
621 curl_multi_socket_action(
622 download_mgr->
curl_multi_, CURL_SOCKET_TIMEOUT, 0, &still_running);
626 if (download_mgr->
watch_fds_[kIdxPipeTerminate].revents)
630 if (download_mgr->
watch_fds_[kIdxPipeJobs].revents) {
631 download_mgr->
watch_fds_[kIdxPipeJobs].revents = 0;
634 if (!still_running) {
635 gettimeofday(&timeval_start, NULL);
640 curl_multi_add_handle(download_mgr->
curl_multi_, handle);
641 curl_multi_socket_action(
642 download_mgr->
curl_multi_, CURL_SOCKET_TIMEOUT, 0, &still_running);
656 if (download_mgr->
watch_fds_[i].revents & (POLLIN | POLLPRI))
657 ev_bitmask |= CURL_CSELECT_IN;
658 if (download_mgr->
watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
659 ev_bitmask |= CURL_CSELECT_OUT;
661 & (POLLERR | POLLHUP | POLLNVAL)) {
662 ev_bitmask |= CURL_CSELECT_ERR;
666 curl_multi_socket_action(download_mgr->
curl_multi_,
676 while ((curl_msg = curl_multi_info_read(download_mgr->
curl_multi_,
678 if (curl_msg->msg == CURLMSG_DONE) {
681 CURL *easy_handle = curl_msg->easy_handle;
682 const int curl_error = curl_msg->data.result;
683 curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
686 curl_easy_getinfo(easy_handle, CURLINFO_REDIRECT_COUNT, &redir_count);
688 "(manager '%s' - id %" PRId64
") "
689 "Number of CURL redirects %" PRId64,
690 download_mgr->
name_.c_str(), info->
id(), redir_count);
692 curl_multi_remove_handle(download_mgr->
curl_multi_, easy_handle);
694 curl_multi_add_handle(download_mgr->
curl_multi_, easy_handle);
695 curl_multi_socket_action(download_mgr->
curl_multi_,
716 curl_multi_remove_handle(download_mgr->
curl_multi_, *i);
717 curl_easy_cleanup(*i);
723 "download I/O thread of DownloadManager '%s' terminated",
724 download_mgr->
name_.c_str());
732 HeaderLists::~HeaderLists() {
733 for (
unsigned i = 0; i < blocks_.size(); ++i) {
740 curl_slist *HeaderLists::GetList(
const char *header) {
return Get(header); }
743 curl_slist *HeaderLists::DuplicateList(curl_slist *slist) {
745 curl_slist *copy = GetList(slist->data);
746 copy->next = slist->next;
747 curl_slist *prev = copy;
750 curl_slist *new_link = Get(slist->data);
751 new_link->next = slist->next;
752 prev->next = new_link;
760 void HeaderLists::AppendHeader(curl_slist *slist,
const char *header) {
762 curl_slist *new_link = Get(header);
763 new_link->next = NULL;
767 slist->next = new_link;
776 void HeaderLists::CutHeader(
const char *header, curl_slist **slist) {
780 curl_slist *prev = &head;
781 curl_slist *rover = *slist;
783 if (strcmp(rover->data, header) == 0) {
784 prev->next = rover->next;
795 void HeaderLists::PutList(curl_slist *slist) {
797 curl_slist *next = slist->next;
804 string HeaderLists::Print(curl_slist *slist) {
807 verbose += string(slist->data) +
"\n";
814 curl_slist *HeaderLists::Get(
const char *header) {
815 for (
unsigned i = 0; i < blocks_.size(); ++i) {
816 for (
unsigned j = 0; j < kBlockSize; ++j) {
817 if (!IsUsed(&(blocks_[i][j]))) {
818 blocks_[i][j].data =
const_cast<char *
>(header);
819 return &(blocks_[i][j]);
826 blocks_[blocks_.size() - 1][0].data =
const_cast<char *
>(header);
827 return &(blocks_[blocks_.size() - 1][0]);
831 void HeaderLists::Put(curl_slist *slist) {
837 void HeaderLists::AddBlock() {
838 curl_slist *new_block =
new curl_slist[kBlockSize];
839 for (
unsigned i = 0; i < kBlockSize; ++i) {
842 blocks_.push_back(new_block);
849 string DownloadManager::ProxyInfo::Print() {
854 const int remaining =
855 static_cast<int>(host.deadline()) - static_cast<int>(time(NULL));
856 string expinfo = (remaining >= 0) ?
"+" :
"";
857 if (abs(remaining) >= 3600) {
859 }
else if (abs(remaining) >= 60) {
865 result +=
" (" + host.name() +
", " + expinfo +
")";
867 result +=
" (:unresolved:, " + expinfo +
")";
877 CURL *DownloadManager::AcquireCurlHandle() {
880 if (pool_handles_idle_->empty()) {
882 handle = curl_easy_init();
885 curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
890 handle = *(pool_handles_idle_->begin());
891 pool_handles_idle_->erase(pool_handles_idle_->begin());
894 pool_handles_inuse_->insert(handle);
900 void DownloadManager::ReleaseCurlHandle(CURL *handle) {
901 const set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
902 assert(elem != pool_handles_inuse_->end());
904 if (pool_handles_idle_->size() > pool_max_handles_) {
905 curl_easy_cleanup(*elem);
907 pool_handles_idle_->insert(*elem);
910 pool_handles_inuse_->erase(elem);
918 void DownloadManager::InitializeRequest(
JobInfo *info, CURL *handle) {
929 info->
SetHeaders(header_lists_->DuplicateList(default_headers_));
933 if (enable_http_tracing_) {
934 for (
unsigned int i = 0; i < http_tracing_headers_.size(); i++) {
935 header_lists_->AppendHeader(info->
headers(),
936 (http_tracing_headers_)[i].c_str());
944 "(manager '%s' - id %" PRId64
") "
945 "CURL Header for URL: %s is:\n %s",
946 name_.c_str(), info->
id(), info->
url()->c_str(),
947 header_lists_->Print(info->
headers()).c_str());
964 char byte_range_array[100];
965 const int64_t range_lower =
static_cast<int64_t
>(info->
range_offset());
966 const int64_t range_upper =
static_cast<int64_t
>(info->
range_offset()
968 if (snprintf(byte_range_array,
sizeof(byte_range_array),
969 "%" PRId64
"-%" PRId64, range_lower, range_upper)
973 curl_easy_setopt(handle, CURLOPT_RANGE, byte_range_array);
975 curl_easy_setopt(handle, CURLOPT_RANGE, NULL);
979 curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
980 curl_easy_setopt(handle, CURLOPT_WRITEHEADER, static_cast<void *>(info));
981 curl_easy_setopt(handle, CURLOPT_WRITEDATA, static_cast<void *>(info));
982 curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->
headers());
984 curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
986 curl_easy_setopt(handle, CURLOPT_HTTPGET, 1);
988 if (opt_ipv4_only_) {
989 curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
991 if (follow_redirects_) {
992 curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1);
993 curl_easy_setopt(handle, CURLOPT_MAXREDIRS, 4);
996 curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
997 curl_easy_setopt(handle, CURLOPT_DEBUGFUNCTION, CallbackCurlDebug);
1001 void DownloadManager::CheckHostInfoReset(
const std::string &typ,
1008 if (static_cast<int64_t>(now)
1011 "(manager %s - id %" PRId64
") "
1012 "switching %s from %s to %s (reset %s)",
1013 name_.c_str(), jobinfo->
id(), typ.c_str(),
1027 void DownloadManager::SetUrlOptions(
JobInfo *info) {
1035 if (sharding_policy_.UseCount() > 0) {
1036 if (info->
proxy() !=
"") {
1040 info->
SetProxy(sharding_policy_->GetNextProxy(
1044 curl_easy_setopt(info->
curl_handle(), CURLOPT_PROXY, info->
proxy().c_str());
1047 if (opt_timestamp_backup_proxies_ > 0) {
1049 if (static_cast<int64_t>(now) > static_cast<int64_t>(
1050 opt_timestamp_backup_proxies_ + opt_proxy_groups_reset_after_)) {
1051 opt_proxy_groups_current_ = 0;
1052 opt_timestamp_backup_proxies_ = 0;
1053 RebalanceProxiesUnlocked(
"Reset proxy group from backup to primary");
1057 if (opt_timestamp_failover_proxies_ > 0) {
1060 if (static_cast<int64_t>(now)
1061 > static_cast<int64_t>(opt_timestamp_failover_proxies_
1062 + opt_proxy_groups_reset_after_)) {
1063 RebalanceProxiesUnlocked(
1064 "Reset load-balanced proxies within the active group");
1069 if (!proxy || (proxy->
url ==
"DIRECT")) {
1071 curl_easy_setopt(info->
curl_handle(), CURLOPT_PROXY,
"");
1076 const std::string purl = proxy->
url;
1078 const bool changed = ValidateProxyIpsUnlocked(purl, phost);
1085 curl_easy_setopt(info->
curl_handle(), CURLOPT_PROXY,
1086 info->
proxy().c_str());
1089 curl_easy_setopt(info->
curl_handle(), CURLOPT_PROXY,
"0.0.0.0");
1095 CheckHostInfoReset(
"metalink", opt_metalink_, info, now);
1096 CheckHostInfoReset(
"host", opt_metalink_, info, now);
1098 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, opt_low_speed_limit_);
1099 if (info->
proxy() !=
"DIRECT") {
1100 curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_proxy_);
1101 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_proxy_);
1103 curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_direct_);
1104 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_direct_);
1106 if (!opt_dns_server_.empty())
1107 curl_easy_setopt(curl_handle, CURLOPT_DNS_SERVERS, opt_dns_server_.c_str());
1110 if (CheckMetalinkChain(now)) {
1111 url_prefix = (*opt_metalink_.chain)[opt_metalink_.current];
1114 "(manager %s - id %" PRId64
") "
1115 "reading from metalink %d",
1116 name_.c_str(), info->
id(), opt_metalink_.current);
1117 }
else if (opt_host_.chain) {
1118 url_prefix = (*opt_host_.chain)[opt_host_.current];
1121 "(manager %s - id %" PRId64
") "
1122 "reading from host %d",
1123 name_.c_str(), info->
id(), opt_host_.current);
1127 string url = url_prefix + *(info->
url());
1129 curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 1L);
1130 if (url.substr(0, 5) ==
"https") {
1132 ssl_certificate_store_.ApplySslCertificatePath(curl_handle);
1135 "(manager %s - id %" PRId64
") "
1136 "Failed to set SSL certificate path %s",
1137 name_.c_str(), info->
id(),
1138 ssl_certificate_store_.GetCaPath().c_str());
1140 if (info->
pid() != -1) {
1141 if (credentials_attachment_ == NULL) {
1143 "(manager %s - id %" PRId64
") "
1144 "uses secure downloads but no credentials attachment set",
1145 name_.c_str(), info->
id());
1147 const bool retval = credentials_attachment_->ConfigureCurlHandle(
1151 "(manager %s - id %" PRId64
") "
1152 "failed attaching credentials",
1153 name_.c_str(), info->
id());
1161 signal(SIGPIPE, SIG_IGN);
1164 if (url.find(
"@proxy@") != string::npos) {
1172 if (proxy_template_forced_ !=
"") {
1173 replacement = proxy_template_forced_;
1174 }
else if (info->
proxy() ==
"DIRECT") {
1175 replacement = proxy_template_direct_;
1177 if (opt_proxy_groups_current_ >= opt_proxy_groups_fallback_) {
1181 curl_easy_setopt(info->
curl_handle(), CURLOPT_PROXY,
"");
1182 replacement = proxy_template_direct_;
1184 replacement = ChooseProxyUnlocked(info->
expected_hash())->host.name();
1187 replacement = (replacement ==
"") ? proxy_template_direct_ : replacement;
1189 "(manager %s - id %" PRId64
") "
1190 "replacing @proxy@ by %s",
1191 name_.c_str(), info->
id(), replacement.c_str());
1192 url =
ReplaceAll(url,
"@proxy@", replacement);
1213 curl_easy_setopt(curl_handle, CURLOPT_URL,
1214 EscapeUrl(info->
id(), url).c_str());
1227 bool DownloadManager::ValidateProxyIpsUnlocked(
const string &url,
1232 name_.c_str(), host.
name().c_str());
1234 const unsigned group_idx = opt_proxy_groups_current_;
1237 bool update_only =
true;
1241 "(manager '%s') failed to resolve IP addresses for %s (%d - %s)",
1242 name_.c_str(), host.
name().c_str(), new_host.
status(),
1246 update_only =
false;
1250 for (
unsigned i = 0; i < (*opt_proxy_groups_)[group_idx].size(); ++i) {
1251 if ((*opt_proxy_groups_)[group_idx][i].host.id() == host.
id())
1252 (*opt_proxy_groups_)[group_idx][i].host = new_host;
1261 "(manager '%s') DNS entries for proxy %s changed, adjusting",
1262 name_.c_str(), host.
name().c_str());
1263 vector<ProxyInfo> *group = current_proxy_group();
1264 opt_num_proxies_ -= group->size();
1265 for (
unsigned i = 0; i < group->size();) {
1266 if ((*group)[i].host.id() == host.
id()) {
1267 group->erase(group->begin() + i);
1272 vector<ProxyInfo> new_infos;
1274 set<string>::const_iterator iter_ips = best_addresses.begin();
1275 for (; iter_ips != best_addresses.end(); ++iter_ips) {
1277 new_infos.push_back(
ProxyInfo(new_host, url_ip));
1279 group->insert(group->end(), new_infos.begin(), new_infos.end());
1280 opt_num_proxies_ += new_infos.size();
1282 const std::string msg =
"DNS entries for proxy " + host.
name() +
" changed";
1284 RebalanceProxiesUnlocked(msg);
1292 void DownloadManager::UpdateStatistics(CURL *handle) {
1297 retval = curl_easy_getinfo(handle, CURLINFO_SIZE_DOWNLOAD, &val);
1298 assert(retval == CURLE_OK);
1299 sum +=
static_cast<int64_t
>(val);
1303 perf::Xadd(counters_->sz_transferred_bytes, sum);
1310 bool DownloadManager::CanRetry(
const JobInfo *info) {
1312 const unsigned max_retries = opt_max_retries_;
1327 unsigned backoff_init_ms = 0;
1328 unsigned backoff_max_ms = 0;
1331 backoff_init_ms = opt_backoff_init_ms_;
1332 backoff_max_ms = opt_backoff_max_ms_;
1347 "(manager '%s' - id %" PRId64
") backing off for %d ms",
1355 header_lists_->AppendHeader(info->
headers(),
"Pragma: no-cache");
1356 header_lists_->AppendHeader(info->
headers(),
"Cache-Control: no-cache");
1366 void DownloadManager::SetRegularCache(
JobInfo *info) {
1369 header_lists_->CutHeader(
"Pragma: no-cache", info->
GetHeadersPtr());
1370 header_lists_->CutHeader(
"Cache-Control: no-cache", info->
GetHeadersPtr());
1379 void DownloadManager::ReleaseCredential(
JobInfo *info) {
1381 assert(credentials_attachment_ != NULL);
1382 credentials_attachment_->ReleaseCurlHandle(info->
curl_handle(),
1390 static bool sortlinks(
const std::string &s1,
const std::string &s2) {
1391 const size_t pos1 = s1.find(
"; pri=");
1392 const size_t pos2 = s2.find(
"; pri=");
1394 if ((pos1 != std::string::npos) && (pos2 != std::string::npos)
1395 && (sscanf(s1.substr(pos1 + 6).c_str(),
"%d", &pri1) == 1)
1396 && (sscanf(s2.substr(pos2 + 6).c_str(),
"%d", &pri2) == 1)) {
1406 void DownloadManager::ProcessLink(
JobInfo *info) {
1408 if (info->
link().find(
"; pri=") != std::string::npos)
1409 std::sort(links.begin(), links.end(),
sortlinks);
1411 std::vector<std::string> host_list;
1413 std::vector<std::string>::const_iterator il = links.begin();
1414 for (; il != links.end(); ++il) {
1415 const std::string &link = *il;
1416 if ((link.find(
"; rel=duplicate") == std::string::npos)
1417 && (link.find(
"; rel=\"duplicate\"") == std::string::npos)) {
1419 "skipping link '%s' because it does not contain rel=duplicate",
1425 size_t start = link.find(
'<');
1426 if (start == std::string::npos) {
1429 "skipping link '%s' because it does not have a left angle bracket",
1435 if ((link.substr(start, 7) !=
"http://")
1436 && (link.substr(start, 8) !=
"https://")) {
1438 "skipping link '%s' of unrecognized url protocol", link.c_str());
1442 size_t end = link.find(
'/', start + 8);
1443 if (end == std::string::npos)
1444 end = link.find(
'>');
1445 if (end == std::string::npos) {
1447 "skipping link '%s' because no slash in url and no right angle "
1452 const std::string host = link.substr(start, end - start);
1454 host_list.push_back(host);
1457 if (host_list.size() > 0) {
1458 SetHostChain(host_list);
1459 opt_metalink_timestamp_link_ = time(NULL);
1470 bool DownloadManager::VerifyAndFinalize(
const int curl_error,
JobInfo *info) {
1472 "(manager '%s' - id %" PRId64
") "
1473 "Verify downloaded url %s, proxy %s (curl error %d)",
1474 name_.c_str(), info->
id(), info->
url()->c_str(),
1475 info->
proxy().c_str(), curl_error);
1481 was_metalink =
true;
1483 if (info->
link() !=
"") {
1488 was_metalink =
false;
1494 switch (curl_error) {
1501 if (ignore_signature_failures_) {
1504 "(manager '%s' - id %" PRId64
") "
1505 "ignoring failed hash verification of %s (expected %s, got %s)",
1506 name_.c_str(), info->
id(), info->
url()->c_str(),
1511 "(manager '%s' - id %" PRId64
") "
1512 "hash verification of %s failed (expected %s, got %s)",
1513 name_.c_str(), info->
id(), info->
url()->c_str(),
1524 case CURLE_UNSUPPORTED_PROTOCOL:
1527 case CURLE_URL_MALFORMAT:
1530 case CURLE_COULDNT_RESOLVE_PROXY:
1533 case CURLE_COULDNT_RESOLVE_HOST:
1536 case CURLE_OPERATION_TIMEDOUT:
1540 case CURLE_PARTIAL_FILE:
1541 case CURLE_GOT_NOTHING:
1542 case CURLE_RECV_ERROR:
1546 case CURLE_FILE_COULDNT_READ_FILE:
1547 case CURLE_COULDNT_CONNECT:
1548 if (info->
proxy() !=
"DIRECT") {
1555 case CURLE_TOO_MANY_REDIRECTS:
1558 case CURLE_SSL_CACERT_BADFILE:
1560 "(manager '%s' -id %" PRId64
") "
1561 "Failed to load certificate bundle. "
1562 "X509_CERT_BUNDLE might point to the wrong location.",
1563 name_.c_str(), info->
id());
1568 case CURLE_PEER_FAILED_VERIFICATION:
1570 "(manager '%s' - id %" PRId64
") "
1571 "invalid SSL certificate of remote host. "
1572 "X509_CERT_DIR and/or X509_CERT_BUNDLE might point to the wrong "
1574 name_.c_str(), info->
id());
1577 case CURLE_ABORTED_BY_CALLBACK:
1578 case CURLE_WRITE_ERROR:
1581 case CURLE_SEND_ERROR:
1586 : kFailProxyShortTransfer);
1590 "(manager '%s' - id %" PRId64
") "
1591 "unexpected curl error (%d) while trying to fetch %s",
1592 name_.c_str(), info->
id(), curl_error, info->
url()->c_str());
1597 std::vector<std::string> *host_chain;
1598 unsigned char num_used_hosts;
1600 host_chain = opt_metalink_.chain;
1603 host_chain = opt_host_.chain;
1608 bool try_again =
false;
1609 bool same_url_retry = CanRetry(info);
1618 "(manager '%s' - id %" PRId64
") "
1619 "data corruption with no-cache header, try another %s",
1620 name_.c_str(), info->
id(), typ.c_str());
1630 && (num_used_hosts < host_chain->size()))) {
1637 if (sharding_policy_.UseCount() > 0) {
1639 same_url_retry =
false;
1646 && (num_used_hosts < host_chain->size())) {
1648 if (opt_proxy_groups_) {
1649 if ((opt_proxy_groups_current_ > 0)
1650 || (opt_proxy_groups_current_burned_ > 0)) {
1651 opt_proxy_groups_current_ = 0;
1652 opt_timestamp_backup_proxies_ = 0;
1653 const std::string msg =
"reset proxies for " + typ
1655 RebalanceProxiesUnlocked(msg);
1661 "(manager '%s' - id %" PRId64
") make it a %s failure",
1662 name_.c_str(), info->
id(), typ.c_str());
1666 if (failover_indefinitely_) {
1670 "(manager '%s' - id %" PRId64
") "
1671 "VerifyAndFinalize() would fail the download here. "
1672 "Instead switch proxy and retry download. "
1674 "info->probe_hosts=%d host_chain=%p num_used_hosts=%d "
1675 "host_chain->size()=%lu same_url_retry=%d "
1676 "info->num_used_proxies=%d opt_num_proxies_=%d",
1677 name_.c_str(), info->
id(), typ.c_str(),
1678 static_cast<int>(info->
probe_hosts()), host_chain,
1679 num_used_hosts, host_chain ? host_chain->size() : -1,
1680 static_cast<int>(same_url_retry),
1683 RebalanceProxiesUnlocked(
1684 "download failed - failover indefinitely");
1697 "(manager '%s' - id %" PRId64
") "
1698 "Trying again on same curl handle, same url: %d, "
1699 "error code %d no-cache %d",
1700 name_.c_str(), info->
id(), same_url_retry, info->
error_code(),
1705 goto verify_and_finalize_stop;
1709 goto verify_and_finalize_stop;
1719 if (sharding_policy_.UseCount() > 0) {
1720 ReleaseCredential(info);
1721 SetUrlOptions(info);
1723 SetRegularCache(info);
1726 bool switch_proxy =
false;
1727 bool switch_host =
false;
1734 switch_proxy =
true;
1743 if (same_url_retry) {
1746 switch_proxy =
true;
1749 if (same_url_retry) {
1760 ReleaseCredential(info);
1763 SetUrlOptions(info);
1766 ReleaseCredential(info);
1768 SwitchMetalink(info);
1774 SetUrlOptions(info);
1778 if (failover_indefinitely_) {
1786 verify_and_finalize_stop:
1788 ReleaseCredential(info);
1797 header_lists_->PutList(info->
headers());
1804 DownloadManager::~DownloadManager() {
1806 if (sharding_policy_.UseCount() > 0) {
1807 sharding_policy_.Reset();
1809 if (health_check_.UseCount() > 0) {
1810 if (health_check_.Unique()) {
1812 "(manager '%s') Stopping healthcheck thread", name_.c_str());
1813 health_check_->StopHealthcheck();
1815 health_check_.Reset();
1818 if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1821 pthread_join(thread_download_, NULL);
1823 pipe_terminate_.Destroy();
1824 pipe_jobs_.Destroy();
1827 for (set<CURL *>::iterator i = pool_handles_idle_->begin(),
1828 iEnd = pool_handles_idle_->end();
1831 curl_easy_cleanup(*i);
1834 delete pool_handles_idle_;
1835 delete pool_handles_inuse_;
1836 curl_multi_cleanup(curl_multi_);
1838 delete header_lists_;
1843 delete opt_host_.chain;
1844 delete opt_host_chain_rtt_;
1845 delete opt_proxy_groups_;
1847 curl_global_cleanup();
1851 pthread_mutex_destroy(lock_options_);
1852 pthread_mutex_destroy(lock_synchronous_mode_);
1853 free(lock_options_);
1854 free(lock_synchronous_mode_);
1857 void DownloadManager::InitHeaders() {
1859 string cernvm_id =
"User-Agent: cvmfs ";
1860 #ifdef CVMFS_LIBCVMFS
1861 cernvm_id +=
"libcvmfs ";
1863 cernvm_id +=
"Fuse ";
1865 cernvm_id += string(CVMFS_VERSION);
1866 if (getenv(
"CERNVM_UUID") != NULL) {
1869 .
Filter(getenv(
"CERNVM_UUID"));
1871 user_agent_ = strdup(cernvm_id.c_str());
1875 default_headers_ = header_lists_->GetList(
"Connection: Keep-Alive");
1876 header_lists_->AppendHeader(default_headers_,
"Pragma:");
1877 header_lists_->AppendHeader(default_headers_, user_agent_);
1880 DownloadManager::DownloadManager(
const unsigned max_pool_handles,
1882 const std::string &name)
1884 , pool_handles_idle_(new set<CURL *>)
1885 , pool_handles_inuse_(new set<CURL *>)
1886 , pool_max_handles_(max_pool_handles)
1887 , pipe_terminate_(NULL)
1890 , watch_fds_size_(0)
1891 , watch_fds_inuse_(0)
1892 , watch_fds_max_(4 * max_pool_handles)
1893 , opt_timeout_proxy_(5)
1894 , opt_timeout_direct_(10)
1895 , opt_low_speed_limit_(1024)
1896 , opt_max_retries_(0)
1897 , opt_backoff_init_ms_(0)
1898 , opt_backoff_max_ms_(0)
1899 , enable_info_header_(false)
1900 , opt_ipv4_only_(false)
1901 , follow_redirects_(false)
1902 , ignore_signature_failures_(false)
1903 , enable_http_tracing_(false)
1904 , opt_metalink_(NULL, 0, 0, 0)
1905 , opt_metalink_timestamp_link_(0)
1906 , opt_host_(NULL, 0, 0, 0)
1907 , opt_host_chain_rtt_(NULL)
1908 , opt_proxy_groups_(NULL)
1909 , opt_proxy_groups_current_(0)
1910 , opt_proxy_groups_current_burned_(0)
1911 , opt_proxy_groups_fallback_(0)
1912 , opt_num_proxies_(0)
1913 , opt_proxy_shard_(false)
1914 , failover_indefinitely_(false)
1917 , opt_timestamp_backup_proxies_(0)
1918 , opt_timestamp_failover_proxies_(0)
1919 , opt_proxy_groups_reset_after_(0)
1920 , credentials_attachment_(NULL)
1921 , counters_(new
Counters(statistics)) {
1925 smalloc(
sizeof(pthread_mutex_t)));
1929 smalloc(
sizeof(pthread_mutex_t)));
1933 retval = curl_global_init(CURL_GLOBAL_ALL);
1934 assert(retval == CURLE_OK);
1941 curl_multi_setopt(
curl_multi_, CURLMOPT_SOCKETDATA,
1942 static_cast<void *>(
this));
1944 curl_multi_setopt(
curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1950 if ((getenv(
"CVMFS_IPV4_ONLY") != NULL)
1951 && (strlen(getenv(
"CVMFS_IPV4_ONLY")) > 0)) {
1968 static_cast<void *>(
this));
1975 "(manager '%s') Starting healthcheck thread",
name_.c_str());
2006 const char *header_name =
"cvmfs-info: ";
2007 const size_t header_name_len = strlen(header_name);
2008 const unsigned header_size = 1 + header_name_len
2010 info->
SetInfoHeader(static_cast<char *>(alloca(header_size)));
2011 memcpy(info->
info_header(), header_name, header_name_len);
2013 header_size - header_name_len);
2018 const std::string str_pid =
"X-CVMFS-PID: " +
StringifyInt(info->
pid());
2064 retval = curl_easy_perform(handle);
2067 if (curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &elapsed)
2070 static_cast<int64_t>(elapsed * 1000));
2079 "(manager '%s' - id %" PRId64
") "
2080 "download failed (error %d - %s)",
2083 if (info->
sink() != NULL) {
2111 if (!address.empty()) {
2116 vector<string> servers;
2117 servers.push_back(address);
2122 name_.c_str(), address.c_str());
2130 const unsigned timeout_ms) {
2144 const unsigned max_seconds) {
2163 const unsigned seconds_direct) {
2185 unsigned *seconds_direct) {
2202 const std::vector<std::string> &metalink_list) {
2208 if (metalink_list.empty()) {
2222 unsigned *current_metalink) {
2225 if (current_metalink) {
2228 if (metalink_chain) {
2251 if (host_list.empty()) {
2270 unsigned *current_host) {
2303 const unsigned group_size = group->size();
2304 unsigned failed = 0;
2306 if (info && (info->
proxy() == (*group)[i].url)) {
2308 opt_proxy_groups_current_burned_++;
2310 (*group)[group_size - opt_proxy_groups_current_burned_]);
2322 if (opt_proxy_groups_current_burned_ == group->size()) {
2323 opt_proxy_groups_current_burned_ = 0;
2352 "(manager '%s' - id %" PRId64
") "
2353 "%lu proxies remain in group",
2369 if (!info.
chain || (info.
chain->size() == 1)) {
2375 if (typ ==
"host") {
2380 if (lastused != info.
current) {
2382 "(manager '%s' - id %" PRId64
")"
2384 "last used %s: %s, current %s: %s",
2385 name_.c_str(), jobinfo->
id(), typ.c_str(), typ.c_str(),
2386 (*info.
chain)[lastused].c_str(), typ.c_str(),
2392 string reason =
"manually triggered";
2393 string info_id =
"(manager " +
name_;
2400 const std::string old_host = (*info.
chain)[info.
current];
2402 if (typ ==
"host") {
2408 "%s switching %s from %s to %s (%s)", info_id.c_str(), typ.c_str(),
2440 || (static_cast<int64_t>((now == 0) ? time(NULL) : now)
2453 vector<string> host_chain;
2454 vector<int> host_rtt;
2455 unsigned current_host;
2457 GetHostInfo(&host_chain, &host_rtt, ¤t_host);
2464 JobInfo info(&url,
false,
false, NULL, &memsink);
2465 for (retries = 0; retries < 2; ++
retries) {
2466 for (i = 0; i < host_chain.size(); ++i) {
2467 url = host_chain[i] +
"/.cvmfspublished";
2469 struct timeval tv_start, tv_end;
2470 gettimeofday(&tv_start, NULL);
2472 gettimeofday(&tv_end, NULL);
2478 "(manager '%s' - id %" PRId64
") "
2479 "probing host %s had %dms rtt",
2480 name_.c_str(), info.
id(), url.c_str(), host_rtt[i]);
2483 "(manager '%s' - id %" PRId64
") "
2484 "error while probing host %s: %d %s",
2485 name_.c_str(), info.
id(), url.c_str(), result,
2487 host_rtt[i] = INT_MAX;
2493 for (i = 0; i < host_chain.size(); ++i) {
2494 if (host_rtt[i] == INT_MAX)
2507 std::vector<uint64_t> *output_order) {
2511 if (servers->size() == 1) {
2513 output_order->clear();
2514 output_order->push_back(0);
2519 std::vector<std::string> host_chain;
2522 std::vector<std::string> server_dns_names;
2523 server_dns_names.reserve(servers->size());
2524 for (
unsigned i = 0; i < servers->size(); ++i) {
2526 server_dns_names.push_back(host.empty() ? (*servers)[i] : host);
2528 const std::string host_list =
JoinStrings(server_dns_names,
",");
2530 vector<string> host_chain_shuffled;
2538 bool success =
false;
2539 const unsigned max_attempts = std::min(host_chain_shuffled.size(), size_t(3));
2540 vector<uint64_t> geo_order(servers->size());
2541 for (
unsigned i = 0; i < max_attempts; ++i) {
2543 host_chain_shuffled[i] +
"/api/v1.0/geo/@proxy@/" + host_list;
2545 "(manager '%s') requesting ordered server list from %s",
2546 name_.c_str(), url.c_str());
2548 JobInfo info(&url,
false,
false, NULL, &memsink);
2551 const string order(reinterpret_cast<char *>(memsink.data()),
2557 "(manager '%s') retrieved invalid GeoAPI reply from %s [%s]",
2558 name_.c_str(), url.c_str(), order.c_str());
2562 "geographic order of servers retrieved from %s",
2567 Trim(order,
true ).c_str());
2573 "(manager '%s') GeoAPI request for %s failed with error %d [%s]",
2580 "failed to retrieve geographic order from stratum 1 servers",
2586 output_order->swap(geo_order);
2588 std::vector<std::string> sorted_servers;
2589 sorted_servers.reserve(geo_order.size());
2590 for (
unsigned i = 0; i < geo_order.size(); ++i) {
2591 const uint64_t orderval = geo_order[i];
2592 sorted_servers.push_back((*servers)[orderval]);
2594 servers->swap(sorted_servers);
2608 vector<string> host_chain;
2609 vector<int> host_rtt;
2610 unsigned current_host;
2611 vector<vector<ProxyInfo> > proxy_chain;
2612 unsigned fallback_group;
2614 GetHostInfo(&host_chain, &host_rtt, ¤t_host);
2616 if ((host_chain.size() < 2) && ((proxy_chain.size() - fallback_group) < 2))
2619 vector<string> host_names;
2620 for (
unsigned i = 0; i < host_chain.size(); ++i)
2622 SortTeam(&host_names, &host_chain);
2623 const unsigned last_geo_host = host_names.size();
2625 if ((fallback_group == 0) && (last_geo_host > 1)) {
2631 host_names.push_back(
"+PXYSEP+");
2635 const unsigned first_geo_fallback = host_names.size();
2636 for (
unsigned i = fallback_group; i < proxy_chain.size(); ++i) {
2639 host_names.push_back(proxy_chain[i][0].host.name());
2642 std::vector<uint64_t> geo_order;
2657 vector<vector<ProxyInfo> > *proxy_groups =
new vector<vector<ProxyInfo> >(
2661 (*proxy_groups)[i] = (*opt_proxy_groups_)[i];
2671 for (
unsigned i = 0; i < geo_order.size(); ++i) {
2672 const uint64_t orderval = geo_order[i];
2673 if (orderval < static_cast<uint64_t>(last_geo_host)) {
2677 }
else if (orderval >= static_cast<uint64_t>(first_geo_fallback)) {
2681 (*proxy_groups)[proxyi] = proxy_chain[fallback_group + orderval
2682 - first_geo_fallback];
2720 const unsigned expected_size,
2721 vector<uint64_t> *reply_vals) {
2722 if (reply_order.empty())
2725 if (!sanitizer.
IsValid(reply_order))
2730 vector<uint64_t> tmp_vals;
2731 for (
unsigned i = 0; i < reply_strings.size(); ++i) {
2732 if (reply_strings[i].empty())
2736 if (tmp_vals.size() != expected_size)
2740 set<uint64_t> coverage(tmp_vals.begin(), tmp_vals.end());
2741 if (coverage.size() != tmp_vals.size())
2743 if ((*coverage.begin() != 1) || (*coverage.rbegin() != coverage.size()))
2746 for (
unsigned i = 0; i < expected_size; ++i) {
2747 (*reply_vals)[i] = tmp_vals[i] - 1;
2758 string *cleaned_list) {
2760 if (proxy_list ==
"") {
2764 bool result =
false;
2766 vector<string> proxy_groups =
SplitString(proxy_list,
';');
2767 vector<string> cleaned_groups;
2768 for (
unsigned i = 0; i < proxy_groups.size(); ++i) {
2769 vector<string> group =
SplitString(proxy_groups[i],
'|');
2770 vector<string> cleaned;
2771 for (
unsigned j = 0; j < group.size(); ++j) {
2772 if ((group[j] ==
"DIRECT") || (group[j] ==
"")) {
2775 cleaned.push_back(group[j]);
2778 if (!cleaned.empty())
2779 cleaned_groups.push_back(
JoinStrings(cleaned,
"|"));
2796 const string &fallback_proxy_list,
2804 bool contains_direct;
2812 &set_proxy_fallback_list);
2813 if (contains_direct) {
2815 "(manager '%s') fallback proxies do not support DIRECT, removing",
2818 if (set_proxy_fallback_list ==
"") {
2822 if (contains_direct) {
2824 "(manager '%s') skipping DIRECT proxy to use fallback proxy",
2834 if ((set_proxy_list ==
"") && (set_proxy_fallback_list ==
"")) {
2845 if (set_proxy_list !=
"") {
2850 "first fallback proxy group %u",
2855 string all_proxy_list = set_proxy_list;
2856 if (set_proxy_fallback_list !=
"") {
2857 if (all_proxy_list !=
"")
2858 all_proxy_list +=
";";
2859 all_proxy_list += set_proxy_fallback_list;
2862 name_.c_str(), all_proxy_list.c_str());
2865 vector<string> hostnames;
2866 vector<string> proxy_groups;
2867 if (all_proxy_list !=
"")
2869 for (
unsigned i = 0; i < proxy_groups.size(); ++i) {
2870 vector<string> this_group =
SplitString(proxy_groups[i],
'|');
2871 for (
unsigned j = 0; j < this_group.size(); ++j) {
2877 hostnames.push_back(hostname);
2880 vector<dns::Host> hosts;
2883 "resolving %lu proxy addresses",
2884 name_.c_str(), hostnames.size());
2891 unsigned num_proxy = 0;
2892 for (
unsigned i = 0; i < proxy_groups.size(); ++i) {
2893 vector<string> this_group =
SplitString(proxy_groups[i],
'|');
2897 vector<ProxyInfo> infos;
2898 for (
unsigned j = 0; j < this_group.size(); ++j, ++num_proxy) {
2900 if (this_group[j] ==
"DIRECT") {
2908 "failed to resolve IP addresses for %s (%d - %s)",
2909 name_.c_str(), hosts[num_proxy].name().c_str(),
2910 hosts[num_proxy].status(),
2914 infos.push_back(
ProxyInfo(failed_host, this_group[j]));
2921 set<string>::const_iterator iter_ips = best_addresses.begin();
2922 for (; iter_ips != best_addresses.end(); ++iter_ips) {
2924 infos.push_back(
ProxyInfo(hosts[num_proxy], url_ip));
2932 opt_num_proxies_ += infos.size();
2935 "(manager '%s') installed %u proxies in %lu load-balance groups",
2955 unsigned *current_group,
2956 unsigned *fallback_group) {
2957 assert(proxy_chain != NULL);
2961 const vector< vector<ProxyInfo> > empty_chain;
2962 *proxy_chain = empty_chain;
2963 if (current_group != NULL)
2965 if (fallback_group != NULL)
2966 *fallback_group = 0;
2971 if (current_group != NULL)
2973 if (fallback_group != NULL)
2991 const uint32_t key = (hash ? hash->
Partial32() : 0);
2992 const map<uint32_t, ProxyInfo *>::iterator it =
3014 const uint32_t max_key = 0xffffffffUL;
3017 for (
unsigned i = 0; i < num_alive; ++i) {
3024 const std::pair<uint32_t, ProxyInfo *> entry(prng.
Next(max_key), proxy);
3027 const std::string proxy_name =
3033 const std::pair<uint32_t, ProxyInfo *> last_entry(max_key, first_proxy);
3037 const unsigned select =
prng_.
Next(num_alive);
3039 const std::pair<uint32_t, ProxyInfo *> entry(max_key, proxy);
3041 const std::string proxy_name =
3049 const string curr_host =
"Current host: "
3053 if (new_proxy != old_proxy) {
3055 "(manager '%s') switching proxy from %s to %s. Reason: %s [%s]",
3056 name_.c_str(), (old_proxy.empty() ?
"(none)" : old_proxy.c_str()),
3057 (new_proxy.empty() ?
"(none)" : new_proxy.c_str()), reason.c_str(),
3104 const std::string msg =
3137 const unsigned backoff_init_ms,
3138 const unsigned backoff_max_ms) {
3153 const std::string &forced) {
3180 const bool success =
false;
3186 "Proposed sharding policy does not exist. Falling back to default",
3202 const std::string &cloned_name) {
unsigned opt_timeout_direct_
std::vector< std::string > http_tracing_headers_
bool StripDirect(const std::string &proxy_list, std::string *cleaned_list)
unsigned opt_low_speed_limit_
void HashString(const std::string &content, Any *any_digest)
static const unsigned kDnsDefaultTimeoutMs
bool ignore_signature_failures_
std::vector< T > Shuffle(const std::vector< T > &input, Prng *prng)
unsigned throttle() const
z_stream * GetZstreamPtr()
unsigned opt_backoff_init_ms_
void SetInfoHeader(char *info_header)
bool enable_http_tracing_
shash::ContextPtr * GetHashContextPtr()
int64_t Xadd(class Counter *counter, const int64_t delta)
const char * Code2Ascii(const Failures error)
bool Write(const T &data)
unsigned opt_proxy_groups_current_burned_
double DiffTimeSeconds(struct timeval start, struct timeval end)
unsigned opt_proxy_groups_reset_after_
virtual bool IsCanceled()
void SetUrlOptions(JobInfo *info)
SharedPtr< ShardingPolicy > sharding_policy_
StreamStates DecompressZStream2Sink(const void *buf, const int64_t size, z_stream *strm, cvmfs::Sink *sink)
void ResolveMany(const std::vector< std::string > &names, std::vector< Host > *hosts)
std::string opt_proxy_fallback_list_
void SetHostChain(const std::string &host_list)
bool CheckMetalinkChain(const time_t now)
static NormalResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
void SetMetalinkChain(const std::string &metalink_list)
void SetLowSpeedLimit(const unsigned low_speed_limit)
DownloadManager(const unsigned max_pool_handles, const perf::StatisticsTemplate &statistics, const std::string &name="standard")
std::string proxy_template_direct_
std::vector< std::string > opt_proxies_
curl_slist ** GetHeadersPtr()
static const int kProbeGeo
string Trim(const string &raw, bool trim_newline)
string ReplaceAll(const string &haystack, const string &needle, const string &replace_by)
void set_min_ttl(unsigned seconds)
string JoinStrings(const vector< string > &strings, const string &joint)
std::string ToString(const bool with_suffix=false) const
unsigned opt_proxy_groups_current_
void SetCurrentHostChainIndex(int current_host_chain_index)
virtual bool RequiresReserve()=0
bool ValidateGeoReply(const std::string &reply_order, const unsigned expected_size, std::vector< uint64_t > *reply_vals)
std::vector< ProxyInfo > * current_proxy_group() const
void DecompressInit(z_stream *strm)
const std::string * url() const
time_t opt_timestamp_backup_proxies_
void SetProxyChain(const std::string &proxy_list, const std::string &fallback_proxy_list, const ProxySetModes set_mode)
std::string GetProxyList()
bool allow_failure() const
void GetMetalinkInfo(std::vector< std::string > *metalink_chain, unsigned *current_metalink)
std::set< CURL * > * pool_handles_inuse_
CURL * curl_handle() const
pthread_mutex_t * lock_options_
ProxyInfo * ChooseProxyUnlocked(const shash::Any *hash)
pthread_t thread_download_
std::string opt_proxy_list_
const std::string & name() const
perf::Counter * sz_transfer_time
void SetTracingHeaderGid(char *tracing_header_gid)
assert((mem||(size==0))&&"Out Of Memory")
void SetNocache(bool nocache)
unsigned opt_proxy_groups_fallback_
void ReleaseCurlHandle(CURL *handle)
static bool sortlinks(const std::string &s1, const std::string &s2)
void SetTracingHeaderPid(char *tracing_header_pid)
void set_max_ttl(unsigned seconds)
Tube< DataTubeElement > * GetDataTubePtr()
char * tracing_header_gid() const
const std::set< std::string > & ViewBestAddresses(IpPreference preference) const
char * info_header() const
void SetDnsServer(const std::string &address)
DownloadManager * Clone(const perf::StatisticsTemplate &statistics, const std::string &cloned_name)
bool force_nocache() const
std::string StringifyUint(const uint64_t value)
void DecompressFini(z_stream *strm)
virtual std::string Describe()=0
static void * MainDownload(void *data)
void InitSeed(const uint64_t seed)
void SetTimeout(const unsigned seconds_proxy, const unsigned seconds_direct)
void SetLink(const std::string &link)
std::string opt_dns_server_
uint32_t watch_fds_inuse_
void SwitchHostInfo(const std::string &typ, HostInfo &info, JobInfo *jobinfo)
off_t range_offset() const
unsigned char num_used_hosts() const
bool follow_redirects() const
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
void Init(ContextPtr context)
void SetNumUsedMetalinks(unsigned char num_used_metalinks)
bool FileExists(const std::string &path)
Pipe< kPipeDownloadJobsResults > * GetPipeJobResultPtr()
void SetDnsTtlLimits(const unsigned min_seconds, const unsigned max_seconds)
static Failures PrepareDownloadDestination(JobInfo *info)
perf::Counter * n_metalink_failover
void SetHttpCode(int http_code)
const char * Code2Ascii(const Failures error)
uint32_t pool_max_handles_
bool head_request() const
unsigned char num_retries() const
std::vector< std::string > * chain
bool IsValidPipeJobResults()
std::vector< std::vector< ProxyInfo > > * opt_proxy_groups_
void GetProxyInfo(std::vector< std::vector< ProxyInfo > > *proxy_chain, unsigned *current_group, unsigned *fallback_group)
void SetProxyGroupResetDelay(const unsigned seconds)
void SetCurrentMetalinkChainIndex(int current_metalink_chain_index)
atomic_int32 multi_threaded_
std::string AddDefaultScheme(const std::string &proxy)
vector< string > SplitString(const string &str, char delim)
cvmfs::Sink * sink() const
dns::NormalResolver * resolver_
bool Interrupted(const std::string &fqrn, JobInfo *info)
std::string proxy() const
dns::IpPreference opt_ip_preference_
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
void UseSystemCertificatePath()
bool SetShardingPolicy(const ShardingPolicySelector type)
perf::Counter * n_host_failover
void UpdateProxiesUnlocked(const std::string &reason)
time_t opt_metalink_timestamp_link_
bool IsEquivalent(const Host &other) const
bool IsProxyTransferError(const Failures error)
void SetNumRetries(unsigned char num_retries)
void set_throttle(const unsigned throttle)
InterruptCue * interrupt_cue() const
void SetIpPreference(const dns::IpPreference preference)
bool failover_indefinitely_
shash::ContextPtr hash_context() const
perf::Counter * n_requests
void Final(ContextPtr context, Any *any_digest)
void SetRetryParameters(const unsigned max_retries, const unsigned backoff_init_ms, const unsigned backoff_max_ms)
string StringifyInt(const int64_t value)
char * tracing_header_uid() const
Failures error_code() const
void CloneProxyConfig(DownloadManager *clone)
void SetMaxIpaddrPerProxy(unsigned limit)
void EnableIgnoreSignatureFailures()
CURL * AcquireCurlHandle()
void Inc(class Counter *counter)
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
std::string ExtractHost(const std::string &url)
std::vector< int > * opt_host_chain_rtt_
SslCertificateStore ssl_certificate_store_
std::string GetFallbackProxyList()
uint32_t Partial32() const
void SetProxyTemplates(const std::string &direct, const std::string &forced)
unsigned opt_backoff_max_ms_
void SetErrorCode(Failures error_code)
void SetNumUsedProxies(unsigned char num_used_proxies)
unsigned GetContextSize(const Algorithms algorithm)
std::string GetDnsServer() const
unsigned opt_num_proxies_
int current_metalink_chain_index() const
CredentialsAttachment * credentials_attachment_
struct pollfd * watch_fds_
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
std::map< uint32_t, ProxyInfo * > opt_proxy_map_
uint64_t String2Uint64(const string &value)
UniquePtr< Pipe< kPipeDownloadJobs > > pipe_jobs_
void UseSystemCertificatePath()
void CreatePipeJobResults()
Failures Fetch(JobInfo *info)
unsigned char num_used_metalinks() const
void SetCurlHandle(CURL *curl_handle)
unsigned opt_max_retries_
void SetTracingHeaderUid(char *tracing_header_uid)
curl_slist * headers() const
const shash::Any * expected_hash() const
perf::Counter * n_proxy_failover
void GetTimeout(unsigned *seconds_proxy, unsigned *seconds_direct)
void SetCredData(void *cred_data)
void SetFailoverIndefinitely()
std::string proxy_template_forced_
time_t opt_timestamp_failover_proxies_
void SetDnsParameters(const unsigned retries, const unsigned timeout_ms)
unsigned EscapeHeader(const std::string &header, char *escaped_buf, size_t buf_size)
const std::string * extra_info() const
static Host ExtendDeadline(const Host &original, unsigned seconds_from_now)
UniquePtr< Pipe< kPipeThreadTerminator > > pipe_terminate_
void SetProxy(const std::string &proxy)
static const int kProbeUnprobed
unsigned backoff_ms() const
unsigned char num_used_proxies() const
int current_host_chain_index() const
void SafeSleepMs(const unsigned ms)
void SetMetalinkResetDelay(const unsigned seconds)
bool IsHostTransferError(const Failures error)
static const unsigned kDnsDefaultRetries
void SortTeam(std::vector< T > *tractor, std::vector< U > *towed)
bool GeoSortServers(std::vector< std::string > *servers, std::vector< uint64_t > *output_order=NULL)
virtual bool Reserve(size_t size)=0
static const int kProbeDown
void SetNumUsedHosts(unsigned char num_used_hosts)
void SetHeaders(curl_slist *headers)
static const unsigned kProxyMapScale
void GetHostInfo(std::vector< std::string > *host_chain, std::vector< int > *rtt, unsigned *current_host)
unsigned opt_timeout_proxy_
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
char * tracing_header_pid() const
void SetBackoffMs(unsigned backoff_ms)
SharedPtr< HealthCheck > health_check_
void SwitchProxy(JobInfo *info)
void AddHTTPTracingHeader(const std::string &header)
void SetCredentialsAttachment(CredentialsAttachment *ca)
void SetFollowRedirects(bool follow_redirects)
void RebalanceProxiesUnlocked(const std::string &reason)
pthread_mutex_t * lock_synchronous_mode_
virtual bool SetResolvers(const std::vector< std::string > &resolvers)
void InitializeRequest(JobInfo *info, CURL *handle)
static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp)
string RewriteUrl(const string &url, const string &ip)
void SetHostResetDelay(const unsigned seconds)
uint32_t Next(const uint64_t boundary)
virtual int64_t Write(const void *buf, uint64_t sz)=0
unsigned timeout_ms() const
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)