29 #define __STDC_FORMAT_MACROS
94 std::string pause_file = std::string(
"/var/run/cvmfs/interrupt.") + fqrn;
97 "(id %" PRId64
") Interrupted(): checking for existence of %s",
98 info->
id(), pause_file.c_str());
101 "(id %" PRId64
") Interrupt marker found - "
102 "Interrupting current download, this will EIO outstanding IO.",
104 if (0 != unlink(pause_file.c_str())) {
106 "(id %" PRId64
") Couldn't delete interrupt marker: errno=%d",
120 "(id %" PRId64
") Failed to open path %s: %s (errno=%d).",
121 info->
id(), psink->
path().c_str(), strerror(errno), errno);
125 "Failed to create a valid sink: \n %s",
141 const size_t num_bytes = size*nmemb;
142 const string header_line(static_cast<const char *>(ptr), num_bytes);
149 if (
HasPrefix(header_line,
"HTTP/1.",
false)) {
150 if (header_line.length() < 10) {
155 for (i = 8; (i < header_line.length()) && (header_line[i] ==
' '); ++i) {}
158 if (header_line.length() > i+2) {
159 info->
SetHttpCode(DownloadManager::ParseHttpCode(&header_line[i]));
171 "(id %" PRId64
") redirect support not enabled: %s",
172 info->
id(), header_line.c_str());
177 info->
id(), header_line.c_str());
182 "(id %" PRId64
") http status error code: %s [%d]",
183 info->
id(), header_line.c_str(), info->
http_code());
204 HasPrefix(header_line,
"CONTENT-LENGTH:",
true))
206 char *tmp =
reinterpret_cast<char *
>(alloca(num_bytes+1));
208 sscanf(header_line.c_str(),
"%s %" PRIu64, tmp, &length);
212 "resource %s too large to store in memory (%" PRIu64
")",
213 info->
id(), info->
url()->c_str(), length);
221 }
else if (
HasPrefix(header_line,
"LOCATION:",
true)) {
224 info->
id(), header_line.c_str());
225 }
else if (
HasPrefix(header_line,
"LINK:",
true)) {
228 info->
id(), header_line.c_str());
229 std::string link = info->
link();
230 if (link.size() != 0) {
232 link = link +
", " + header_line.substr(5);
234 link = header_line.substr(5);
237 }
else if (
HasPrefix(header_line,
"X-SQUID-ERROR:",
true)) {
242 }
else if (
HasPrefix(header_line,
"PROXY-STATUS:",
true)) {
245 (header_line.find(
"error=") != string::npos)) {
260 const size_t num_bytes = size*nmemb;
284 "(id %" PRId64
") failed to decompress %s",
285 info->
id(), info->
url()->c_str());
290 "(id %" PRId64
") decompressing %s, local IO error",
291 info->
id(), info->
url()->c_str());
296 int64_t written = info->
sink()->
Write(ptr, num_bytes);
297 if (written < 0 || static_cast<uint64_t>(written) != num_bytes) {
299 "Failed to perform write of %zu bytes to sink %s with errno %ld",
300 info->
id(), num_bytes, info->
sink()->
Describe().c_str(), written);
308 static int CallbackCurlDebug(
316 curl_easy_getinfo(handle, CURLINFO_PRIVATE, &info);
318 std::string prefix =
"(id " +
StringifyInt(info->id()) +
") ";
323 case CURLINFO_HEADER_IN:
324 prefix +=
"{header/recv} ";
326 case CURLINFO_HEADER_OUT:
327 prefix +=
"{header/sent} ";
329 case CURLINFO_DATA_IN:
331 prefix +=
"{data/recv} ";
337 case CURLINFO_DATA_OUT:
339 prefix +=
"{data/sent} ";
345 case CURLINFO_SSL_DATA_IN:
347 prefix +=
"{ssldata/recv} ";
354 case CURLINFO_SSL_DATA_OUT:
356 prefix +=
"{ssldata/sent} ";
368 bool valid_char =
true;
369 std::string msg(data, size);
370 for (
size_t i = 0; i < msg.length(); ++i) {
371 if (msg[i] ==
'\0') {
376 if ((msg[i] <
' ' || msg[i] >
'~')
384 msg =
"<Non-plaintext sequence>";
388 prefix.c_str(),
Trim(msg,
true ).c_str());
// Negative sentinel values, defined here as static members of DownloadManager.
// kProbeDown is what a host's RTT slot is rewritten to when probing left it at
// INT_MAX, which is the value stored after a probe reported an error.
// NOTE(review): kProbeUnprobed and kProbeGeo presumably mark "not probed yet"
// and "ordered by geo lookup" in the same RTT vector -- confirm against the
// class declaration.
396 const int DownloadManager::kProbeUnprobed = -1;
397 const int DownloadManager::kProbeDown = -2;
398 const int DownloadManager::kProbeGeo = -3;
400 bool DownloadManager::EscapeUrlChar(
unsigned char input,
char output[3]) {
401 if (((input >=
'0') && (input <=
'9')) ||
402 ((input >=
'A') && (input <=
'Z')) ||
403 ((input >=
'a') && (input <=
'z')) ||
404 (input ==
'/') || (input ==
':') || (input ==
'.') ||
406 (input ==
'+') || (input ==
'-') ||
407 (input ==
'_') || (input ==
'~') ||
408 (input ==
'[') || (input ==
']') || (input ==
','))
410 output[0] =
static_cast<char>(input);
415 output[1] =
static_cast<char>(
416 (input / 16) + ((input / 16 <= 9) ?
'0' :
'A'-10));
417 output[2] =
static_cast<char>(
418 (input % 16) + ((input % 16 <= 9) ?
'0' :
'A'-10));
427 string DownloadManager::EscapeUrl(
const int64_t jobinfo_id,
const string &url) {
429 escaped.reserve(url.length());
431 char escaped_char[3];
432 for (
unsigned i = 0, s = url.length(); i < s; ++i) {
433 if (EscapeUrlChar(url[i], escaped_char)) {
434 escaped.append(escaped_char, 3);
436 escaped.push_back(escaped_char[0]);
440 jobinfo_id, url.c_str(), escaped.c_str());
449 unsigned DownloadManager::EscapeHeader(
const string &header,
453 unsigned esc_pos = 0;
454 char escaped_char[3];
455 for (
unsigned i = 0, s = header.size(); i < s; ++i) {
456 if (EscapeUrlChar(header[i], escaped_char)) {
457 for (
unsigned j = 0; j < 3; ++j) {
459 if (esc_pos >= buf_size)
461 escaped_buf[esc_pos] = escaped_char[j];
467 if (esc_pos >= buf_size)
469 escaped_buf[esc_pos] = escaped_char[0];
481 int DownloadManager::ParseHttpCode(
const char digits[3]) {
484 for (
int i = 0; i < 3; ++i) {
485 if ((digits[i] <
'0') || (digits[i] >
'9'))
487 result += (digits[i] -
'0') * factor;
497 int DownloadManager::CallbackCurlSocket(CURL * ,
506 if (action == CURL_POLL_NONE)
525 download_mgr->
watch_fds_ =
static_cast<struct pollfd *
>(
537 download_mgr->
watch_fds_[index].events = POLLIN | POLLPRI;
540 download_mgr->
watch_fds_[index].events = POLLOUT | POLLWRBAND;
542 case CURL_POLL_INOUT:
544 POLLIN | POLLPRI | POLLOUT | POLLWRBAND;
546 case CURL_POLL_REMOVE:
547 if (index < download_mgr->watch_fds_inuse_-1) {
559 download_mgr->
watch_fds_ =
static_cast<struct pollfd *
>(
577 void *DownloadManager::MainDownload(
void *data) {
580 "download I/O thread of DownloadManager '%s' started",
581 download_mgr->
name_.c_str());
// Fixed indices of the first two entries of download_mgr->watch_fds_, which is
// allocated with room for exactly two pollfd structs right after this.  Both
// entries are set to poll for POLLIN | POLLPRI and their revents fields are
// inspected in the main loop.
// NOTE(review): going by the names, slot 0 is the terminate pipe and slot 1 is
// the pipe that delivers new jobs -- confirm against the fd assignments.
583 const int kIdxPipeTerminate = 0;
584 const int kIdxPipeJobs = 1;
587 static_cast<struct pollfd *
>(smalloc(2 *
sizeof(
struct pollfd)));
589 download_mgr->
watch_fds_[kIdxPipeTerminate].fd =
591 download_mgr->
watch_fds_[kIdxPipeTerminate].events = POLLIN | POLLPRI;
592 download_mgr->
watch_fds_[kIdxPipeTerminate].revents = 0;
595 download_mgr->
watch_fds_[kIdxPipeJobs].events = POLLIN | POLLPRI;
596 download_mgr->
watch_fds_[kIdxPipeJobs].revents = 0;
599 int still_running = 0;
600 struct timeval timeval_start, timeval_stop;
601 gettimeofday(&timeval_start, NULL);
617 gettimeofday(&timeval_stop, NULL);
618 int64_t delta =
static_cast<int64_t
>(
630 curl_multi_socket_action(download_mgr->
curl_multi_,
637 if (download_mgr->
watch_fds_[kIdxPipeTerminate].revents)
641 if (download_mgr->
watch_fds_[kIdxPipeJobs].revents) {
642 download_mgr->
watch_fds_[kIdxPipeJobs].revents = 0;
645 if (!still_running) {
646 gettimeofday(&timeval_start, NULL);
651 curl_multi_add_handle(download_mgr->
curl_multi_, handle);
652 curl_multi_socket_action(download_mgr->
curl_multi_,
669 if (download_mgr->
watch_fds_[i].revents & (POLLIN | POLLPRI))
670 ev_bitmask |= CURL_CSELECT_IN;
671 if (download_mgr->
watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
672 ev_bitmask |= CURL_CSELECT_OUT;
674 (POLLERR | POLLHUP | POLLNVAL))
676 ev_bitmask |= CURL_CSELECT_ERR;
680 curl_multi_socket_action(download_mgr->
curl_multi_,
690 while ((curl_msg = curl_multi_info_read(download_mgr->
curl_multi_,
693 if (curl_msg->msg == CURLMSG_DONE) {
696 CURL *easy_handle = curl_msg->easy_handle;
697 int curl_error = curl_msg->data.result;
698 curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
701 curl_easy_getinfo(easy_handle, CURLINFO_REDIRECT_COUNT, &redir_count);
703 "Number of CURL redirects %" PRId64 ,
704 download_mgr->
name_.c_str(), info->
id(),
707 curl_multi_remove_handle(download_mgr->
curl_multi_, easy_handle);
709 curl_multi_add_handle(download_mgr->
curl_multi_, easy_handle);
710 curl_multi_socket_action(download_mgr->
curl_multi_,
721 Write<download::Failures>(info->
error_code());
730 curl_multi_remove_handle(download_mgr->
curl_multi_, *i);
731 curl_easy_cleanup(*i);
737 "download I/O thread of DownloadManager '%s' terminated",
738 download_mgr->
name_.c_str());
746 HeaderLists::~HeaderLists() {
747 for (
unsigned i = 0; i < blocks_.size(); ++i) {
754 curl_slist *HeaderLists::GetList(
const char *header) {
759 curl_slist *HeaderLists::DuplicateList(curl_slist *slist) {
761 curl_slist *copy = GetList(slist->data);
762 copy->next = slist->next;
763 curl_slist *prev = copy;
766 curl_slist *new_link = Get(slist->data);
767 new_link->next = slist->next;
768 prev->next = new_link;
776 void HeaderLists::AppendHeader(curl_slist *slist,
const char *header) {
778 curl_slist *new_link = Get(header);
779 new_link->next = NULL;
783 slist->next = new_link;
792 void HeaderLists::CutHeader(
const char *header, curl_slist **slist) {
796 curl_slist *prev = &head;
797 curl_slist *rover = *slist;
799 if (strcmp(rover->data, header) == 0) {
800 prev->next = rover->next;
811 void HeaderLists::PutList(curl_slist *slist) {
813 curl_slist *next = slist->next;
820 string HeaderLists::Print(curl_slist *slist) {
823 verbose += string(slist->data) +
"\n";
830 curl_slist *HeaderLists::Get(
const char *header) {
831 for (
unsigned i = 0; i < blocks_.size(); ++i) {
832 for (
unsigned j = 0; j < kBlockSize; ++j) {
833 if (!IsUsed(&(blocks_[i][j]))) {
834 blocks_[i][j].data =
const_cast<char *
>(header);
835 return &(blocks_[i][j]);
842 blocks_[blocks_.size()-1][0].data =
const_cast<char *
>(header);
843 return &(blocks_[blocks_.size()-1][0]);
847 void HeaderLists::Put(curl_slist *slist) {
853 void HeaderLists::AddBlock() {
854 curl_slist *new_block =
new curl_slist[kBlockSize];
855 for (
unsigned i = 0; i < kBlockSize; ++i) {
858 blocks_.push_back(new_block);
865 string DownloadManager::ProxyInfo::Print() {
871 static_cast<int>(host.deadline()) - static_cast<int>(time(NULL));
872 string expinfo = (remaining >= 0) ?
"+" :
"";
873 if (abs(remaining) >= 3600) {
875 }
else if (abs(remaining) >= 60) {
881 result +=
" (" + host.name() +
", " + expinfo +
")";
883 result +=
" (:unresolved:, " + expinfo +
")";
893 CURL *DownloadManager::AcquireCurlHandle() {
896 if (pool_handles_idle_->empty()) {
898 handle = curl_easy_init();
901 curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
906 handle = *(pool_handles_idle_->begin());
907 pool_handles_idle_->erase(pool_handles_idle_->begin());
910 pool_handles_inuse_->insert(handle);
916 void DownloadManager::ReleaseCurlHandle(CURL *handle) {
917 set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
918 assert(elem != pool_handles_inuse_->end());
920 if (pool_handles_idle_->size() > pool_max_handles_) {
921 curl_easy_cleanup(*elem);
923 pool_handles_idle_->insert(*elem);
926 pool_handles_inuse_->erase(elem);
934 void DownloadManager::InitializeRequest(
JobInfo *info, CURL *handle) {
945 info->
SetHeaders(header_lists_->DuplicateList(default_headers_));
949 if (enable_http_tracing_) {
950 for (
unsigned int i = 0; i < http_tracing_headers_.size(); i++) {
951 header_lists_->AppendHeader(info->
headers(),
952 (http_tracing_headers_)[i].c_str());
960 "CURL Header for URL: %s is:\n %s",
961 name_.c_str(), info->
id(), info->
url()->c_str(),
962 header_lists_->Print(info->
headers()).c_str());
979 char byte_range_array[100];
980 const int64_t range_lower =
static_cast<int64_t
>(info->
range_offset());
981 const int64_t range_upper =
static_cast<int64_t
>(
983 if (snprintf(byte_range_array,
sizeof(byte_range_array),
984 "%" PRId64
"-%" PRId64,
985 range_lower, range_upper) == 100)
989 curl_easy_setopt(handle, CURLOPT_RANGE, byte_range_array);
991 curl_easy_setopt(handle, CURLOPT_RANGE, NULL);
995 curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
996 curl_easy_setopt(handle, CURLOPT_WRITEHEADER,
997 static_cast<void *>(info));
998 curl_easy_setopt(handle, CURLOPT_WRITEDATA, static_cast<void *>(info));
999 curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->
headers());
1001 curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
1003 curl_easy_setopt(handle, CURLOPT_HTTPGET, 1);
1005 if (opt_ipv4_only_) {
1006 curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
1008 if (follow_redirects_) {
1009 curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1);
1010 curl_easy_setopt(handle, CURLOPT_MAXREDIRS, 4);
1013 curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
1014 curl_easy_setopt(handle, CURLOPT_DEBUGFUNCTION, CallbackCurlDebug);
1018 void DownloadManager::CheckHostInfoReset(
1019 const std::string &typ,
1027 if (static_cast<int64_t>(now) >
1031 "(manager %s - id %" PRId64
") "
1032 "switching %s from %s to %s (reset %s)", name_.c_str(),
1034 (*info.
chain)[0].c_str(), typ.c_str());
1046 void DownloadManager::SetUrlOptions(
JobInfo *info) {
1054 if (sharding_policy_.UseCount() > 0) {
1055 if (info->
proxy() !=
"") {
1062 curl_easy_setopt(info->
curl_handle(), CURLOPT_PROXY, info->
proxy().c_str());
1065 if (opt_timestamp_backup_proxies_ > 0) {
1067 if (static_cast<int64_t>(now) >
1068 static_cast<int64_t>(opt_timestamp_backup_proxies_ +
1069 opt_proxy_groups_reset_after_))
1071 opt_proxy_groups_current_ = 0;
1072 opt_timestamp_backup_proxies_ = 0;
1073 RebalanceProxiesUnlocked(
"Reset proxy group from backup to primary");
1077 if (opt_timestamp_failover_proxies_ > 0) {
1080 if (static_cast<int64_t>(now) >
1081 static_cast<int64_t>(opt_timestamp_failover_proxies_ +
1082 opt_proxy_groups_reset_after_))
1084 RebalanceProxiesUnlocked(
1085 "Reset load-balanced proxies within the active group");
1090 if (!proxy || (proxy->
url ==
"DIRECT")) {
1092 curl_easy_setopt(info->
curl_handle(), CURLOPT_PROXY,
"");
1097 std::string purl = proxy->
url;
1099 const bool changed = ValidateProxyIpsUnlocked(purl, phost);
1106 curl_easy_setopt(info->
curl_handle(), CURLOPT_PROXY,
1107 info->
proxy().c_str());
1110 curl_easy_setopt(info->
curl_handle(), CURLOPT_PROXY,
"0.0.0.0");
1116 CheckHostInfoReset(
"metalink", opt_metalink_, info, now);
1117 CheckHostInfoReset(
"host", opt_metalink_, info, now);
1119 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, opt_low_speed_limit_);
1120 if (info->
proxy() !=
"DIRECT") {
1121 curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_proxy_);
1122 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_proxy_);
1124 curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_direct_);
1125 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_direct_);
1127 if (!opt_dns_server_.empty())
1128 curl_easy_setopt(curl_handle, CURLOPT_DNS_SERVERS, opt_dns_server_.c_str());
1131 if (CheckMetalinkChain(now)) {
1132 url_prefix = (*opt_metalink_.chain)[opt_metalink_.current];
1135 "reading from metalink %d",
1136 name_.c_str(), info->
id(), opt_metalink_.current);
1137 }
else if (opt_host_.chain) {
1138 url_prefix = (*opt_host_.chain)[opt_host_.current];
1141 "reading from host %d",
1142 name_.c_str(), info->
id(), opt_host_.current);
1146 string url = url_prefix + *(info->
url());
1148 curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 1L);
1149 if (url.substr(0, 5) ==
"https") {
1150 bool rvb = ssl_certificate_store_.ApplySslCertificatePath(curl_handle);
1153 "(manager %s - id %" PRId64
") "
1154 "Failed to set SSL certificate path %s", name_.c_str(),
1155 info->
id(), ssl_certificate_store_.GetCaPath().c_str());
1157 if (info->
pid() != -1) {
1158 if (credentials_attachment_ == NULL) {
1160 "uses secure downloads but no credentials attachment set",
1161 name_.c_str(), info->
id());
1163 bool retval = credentials_attachment_->ConfigureCurlHandle(
1167 "failed attaching credentials",
1168 name_.c_str(), info->
id());
1176 signal(SIGPIPE, SIG_IGN);
1179 if (url.find(
"@proxy@") != string::npos) {
1187 if (proxy_template_forced_ !=
"") {
1188 replacement = proxy_template_forced_;
1189 }
else if (info->
proxy() ==
"DIRECT") {
1190 replacement = proxy_template_direct_;
1192 if (opt_proxy_groups_current_ >= opt_proxy_groups_fallback_) {
1196 curl_easy_setopt(info->
curl_handle(), CURLOPT_PROXY,
"");
1197 replacement = proxy_template_direct_;
1199 replacement = ChooseProxyUnlocked(info->
expected_hash())->host.name();
1202 replacement = (replacement ==
"") ? proxy_template_direct_ : replacement;
1204 "replacing @proxy@ by %s",
1205 name_.c_str(), info->
id(), replacement.c_str());
1206 url =
ReplaceAll(url,
"@proxy@", replacement);
1227 curl_easy_setopt(curl_handle, CURLOPT_URL,
1228 EscapeUrl(info->
id(), url).c_str());
1241 bool DownloadManager::ValidateProxyIpsUnlocked(
1248 name_.c_str(), host.
name().c_str());
1250 unsigned group_idx = opt_proxy_groups_current_;
1253 bool update_only =
true;
1257 "(manager '%s') failed to resolve IP addresses for %s (%d - %s)",
1258 name_.c_str(), host.
name().c_str(), new_host.
status(),
1262 update_only =
false;
1266 for (
unsigned i = 0; i < (*opt_proxy_groups_)[group_idx].size(); ++i) {
1267 if ((*opt_proxy_groups_)[group_idx][i].host.id() == host.
id())
1268 (*opt_proxy_groups_)[group_idx][i].host = new_host;
1277 "(manager '%s') DNS entries for proxy %s changed, adjusting",
1278 name_.c_str(), host.
name().c_str());
1279 vector<ProxyInfo> *group = current_proxy_group();
1280 opt_num_proxies_ -= group->size();
1281 for (
unsigned i = 0; i < group->size(); ) {
1282 if ((*group)[i].host.id() == host.
id()) {
1283 group->erase(group->begin() + i);
1288 vector<ProxyInfo> new_infos;
1290 set<string>::const_iterator iter_ips = best_addresses.begin();
1291 for (; iter_ips != best_addresses.end(); ++iter_ips) {
1293 new_infos.push_back(
ProxyInfo(new_host, url_ip));
1295 group->insert(group->end(), new_infos.begin(), new_infos.end());
1296 opt_num_proxies_ += new_infos.size();
1298 std::string msg =
"DNS entries for proxy " + host.
name() +
" changed";
1300 RebalanceProxiesUnlocked(msg);
1308 void DownloadManager::UpdateStatistics(CURL *handle) {
1313 retval = curl_easy_getinfo(handle, CURLINFO_SIZE_DOWNLOAD, &val);
1314 assert(retval == CURLE_OK);
1315 sum +=
static_cast<int64_t
>(val);
1319 perf::Xadd(counters_->sz_transferred_bytes, sum);
1326 bool DownloadManager::CanRetry(
const JobInfo *info) {
1328 unsigned max_retries = opt_max_retries_;
1343 unsigned backoff_init_ms = 0;
1344 unsigned backoff_max_ms = 0;
1347 backoff_init_ms = opt_backoff_init_ms_;
1348 backoff_max_ms = opt_backoff_max_ms_;
1363 "(manager '%s' - id %" PRId64
") backing off for %d ms",
1371 header_lists_->AppendHeader(info->
headers(),
"Pragma: no-cache");
1372 header_lists_->AppendHeader(info->
headers(),
"Cache-Control: no-cache");
1382 void DownloadManager::SetRegularCache(
JobInfo *info) {
1385 header_lists_->CutHeader(
"Pragma: no-cache", info->
GetHeadersPtr());
1386 header_lists_->CutHeader(
"Cache-Control: no-cache", info->
GetHeadersPtr());
1395 void DownloadManager::ReleaseCredential(
JobInfo *info) {
1397 assert(credentials_attachment_ != NULL);
1398 credentials_attachment_->ReleaseCurlHandle(info->
curl_handle(),
1406 static bool sortlinks(
const std::string &s1,
const std::string &s2) {
1407 const size_t pos1 = s1.find(
"; pri=");
1408 const size_t pos2 = s2.find(
"; pri=");
1410 if ((pos1 != std::string::npos) &&
1411 (pos2 != std::string::npos) &&
1412 (sscanf(s1.substr(pos1+6).c_str(),
"%d", &pri1) == 1) &&
1413 (sscanf(s2.substr(pos2+6).c_str(),
"%d", &pri2) == 1)) {
1423 void DownloadManager::ProcessLink(
JobInfo *info) {
1426 if (info->
link().find(
"; pri=") != std::string::npos)
1427 std::sort(links.begin(), links.end(),
sortlinks);
1429 std::vector<std::string> host_list;
1431 std::vector<std::string>::const_iterator il = links.begin();
1432 for (; il != links.end(); ++il) {
1433 const std::string &link = *il;
1434 if ((link.find(
"; rel=duplicate") == std::string::npos) &&
1435 (link.find(
"; rel=\"duplicate\"") == std::string::npos)) {
1437 "skipping link '%s' because it does not contain rel=duplicate",
1443 size_t start = link.find(
'<');
1444 if (start == std::string::npos) {
1446 "skipping link '%s' because it does not have a left angle bracket",
1452 if ((link.substr(start, 7) !=
"http://") &&
1453 (link.substr(start, 8) !=
"https://")) {
1455 "skipping link '%s' of unrecognized url protocol", link.c_str());
1459 size_t end = link.find(
'/', start+8);
1460 if (end == std::string::npos)
1461 end = link.find(
'>');
1462 if (end == std::string::npos) {
1464 "skipping link '%s' because no slash in url and no right angle bracket",
1468 const std::string host = link.substr(start, end-start);
1470 host_list.push_back(host);
1473 if (host_list.size() > 0) {
1474 SetHostChain(host_list);
1475 opt_metalink_timestamp_link_ = time(NULL);
1486 bool DownloadManager::VerifyAndFinalize(
const int curl_error,
JobInfo *info) {
1488 "Verify downloaded url %s, proxy %s (curl error %d)",
1489 name_.c_str(), info->
id(), info->
url()->c_str(),
1490 info->
proxy().c_str(), curl_error);
1496 was_metalink =
true;
1498 if (info->
link() !=
"") {
1503 was_metalink =
false;
1509 switch (curl_error) {
1516 if (ignore_signature_failures_) {
1518 "(manager '%s' - id %" PRId64
") "
1519 "ignoring failed hash verification of %s (expected %s, got %s)",
1520 name_.c_str(), info->
id(), info->
url()->c_str(),
1525 "hash verification of %s failed (expected %s, got %s)",
1526 name_.c_str(), info->
id(), info->
url()->c_str(),
1537 case CURLE_UNSUPPORTED_PROTOCOL:
1540 case CURLE_URL_MALFORMAT:
1543 case CURLE_COULDNT_RESOLVE_PROXY:
1546 case CURLE_COULDNT_RESOLVE_HOST:
1549 case CURLE_OPERATION_TIMEDOUT:
1553 case CURLE_PARTIAL_FILE:
1554 case CURLE_GOT_NOTHING:
1555 case CURLE_RECV_ERROR:
1559 case CURLE_FILE_COULDNT_READ_FILE:
1560 case CURLE_COULDNT_CONNECT:
1561 if (info->
proxy() !=
"DIRECT") {
1568 case CURLE_TOO_MANY_REDIRECTS:
1571 case CURLE_SSL_CACERT_BADFILE:
1573 "(manager '%s' -id %" PRId64
") "
1574 "Failed to load certificate bundle. "
1575 "X509_CERT_BUNDLE might point to the wrong location.",
1576 name_.c_str(), info->
id());
1581 case CURLE_PEER_FAILED_VERIFICATION:
1583 "(manager '%s' - id %" PRId64
") "
1584 "invalid SSL certificate of remote host. "
1585 "X509_CERT_DIR and/or X509_CERT_BUNDLE might point to the wrong "
1586 "location.", name_.c_str(), info->
id());
1589 case CURLE_ABORTED_BY_CALLBACK:
1590 case CURLE_WRITE_ERROR:
1593 case CURLE_SEND_ERROR:
1602 "unexpected curl error (%d) while trying to fetch %s",
1603 name_.c_str(), info->
id(), curl_error, info->
url()->c_str());
1608 std::vector<std::string> *host_chain;
1609 unsigned char num_used_hosts;
1611 host_chain = opt_metalink_.chain;
1614 host_chain = opt_host_.chain;
1619 bool try_again =
false;
1620 bool same_url_retry = CanRetry(info);
1629 "(manager '%s' - id %" PRId64
") "
1630 "data corruption with no-cache header, try another %s",
1631 name_.c_str(), info->
id(), typ.c_str());
1636 if ( same_url_retry || (
1641 host_chain && (num_used_hosts < host_chain->size()))
1646 if ( same_url_retry || (
1652 if (sharding_policy_.UseCount() > 0) {
1654 same_url_retry =
false;
1661 host_chain && (num_used_hosts < host_chain->size()))
1664 if (opt_proxy_groups_) {
1665 if ((opt_proxy_groups_current_ > 0) ||
1666 (opt_proxy_groups_current_burned_ > 0))
1668 opt_proxy_groups_current_ = 0;
1669 opt_timestamp_backup_proxies_ = 0;
1670 const std::string msg =
1671 "reset proxies for " + typ +
" failover";
1672 RebalanceProxiesUnlocked(msg);
1678 "(manager '%s' - id %" PRId64
") make it a %s failure",
1679 name_.c_str(), info->
id(), typ.c_str());
1683 if (failover_indefinitely_) {
1687 "(manager '%s' - id %" PRId64
") "
1688 "VerifyAndFinalize() would fail the download here. "
1689 "Instead switch proxy and retry download. "
1691 "info->probe_hosts=%d host_chain=%p num_used_hosts=%d "
1692 "host_chain->size()=%lu same_url_retry=%d "
1693 "info->num_used_proxies=%d opt_num_proxies_=%d",
1694 name_.c_str(), info->
id(), typ.c_str(),
1696 host_chain, num_used_hosts,
1698 host_chain->size() : -1,
static_cast<int>(same_url_retry),
1701 RebalanceProxiesUnlocked(
1702 "download failed - failover indefinitely");
1715 "Trying again on same curl handle, same url: %d, "
1716 "error code %d no-cache %d",
1717 name_.c_str(), info->
id(), same_url_retry,
1722 goto verify_and_finalize_stop;
1726 goto verify_and_finalize_stop;
1736 if (sharding_policy_.UseCount() > 0) {
1737 ReleaseCredential(info);
1738 SetUrlOptions(info);
1740 SetRegularCache(info);
1743 bool switch_proxy =
false;
1744 bool switch_host =
false;
1751 switch_proxy =
true;
1760 if (same_url_retry) {
1763 switch_proxy =
true;
1766 if (same_url_retry) {
1777 ReleaseCredential(info);
1780 SetUrlOptions(info);
1783 ReleaseCredential(info);
1785 SwitchMetalink(info);
1791 SetUrlOptions(info);
1795 if (failover_indefinitely_) {
1803 verify_and_finalize_stop:
1805 ReleaseCredential(info);
1814 header_lists_->PutList(info->
headers());
1821 DownloadManager::~DownloadManager() {
1823 if (sharding_policy_.UseCount() > 0) {
1824 sharding_policy_.Reset();
1826 if (health_check_.UseCount() > 0) {
1827 if (health_check_.Unique()) {
1829 "(manager '%s') Stopping healthcheck thread", name_.c_str());
1830 health_check_->StopHealthcheck();
1832 health_check_.Reset();
1835 if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1838 pthread_join(thread_download_, NULL);
1840 pipe_terminate_.Destroy();
1841 pipe_jobs_.Destroy();
1844 for (set<CURL *>::iterator i = pool_handles_idle_->begin(),
1845 iEnd = pool_handles_idle_->end(); i != iEnd; ++i)
1847 curl_easy_cleanup(*i);
1850 delete pool_handles_idle_;
1851 delete pool_handles_inuse_;
1852 curl_multi_cleanup(curl_multi_);
1854 delete header_lists_;
1859 delete opt_host_.chain;
1860 delete opt_host_chain_rtt_;
1861 delete opt_proxy_groups_;
1863 curl_global_cleanup();
1867 pthread_mutex_destroy(lock_options_);
1868 pthread_mutex_destroy(lock_synchronous_mode_);
1869 free(lock_options_);
1870 free(lock_synchronous_mode_);
1873 void DownloadManager::InitHeaders() {
1875 string cernvm_id =
"User-Agent: cvmfs ";
1876 #ifdef CVMFS_LIBCVMFS
1877 cernvm_id +=
"libcvmfs ";
1879 cernvm_id +=
"Fuse ";
1881 cernvm_id += string(CVMFS_VERSION);
1882 if (getenv(
"CERNVM_UUID") != NULL) {
1886 user_agent_ = strdup(cernvm_id.c_str());
1890 default_headers_ = header_lists_->GetList(
"Connection: Keep-Alive");
1891 header_lists_->AppendHeader(default_headers_,
"Pragma:");
1892 header_lists_->AppendHeader(default_headers_, user_agent_);
1895 DownloadManager::DownloadManager(
const unsigned max_pool_handles,
1897 const std::string &name) :
1899 pool_handles_idle_(new set<CURL *>),
1900 pool_handles_inuse_(new set<CURL *>),
1901 pool_max_handles_(max_pool_handles),
1902 pipe_terminate_(NULL),
1906 watch_fds_inuse_(0),
1907 watch_fds_max_(4 * max_pool_handles),
1908 opt_timeout_proxy_(5),
1909 opt_timeout_direct_(10),
1910 opt_low_speed_limit_(1024),
1911 opt_max_retries_(0),
1912 opt_backoff_init_ms_(0),
1913 opt_backoff_max_ms_(0),
1914 enable_info_header_(false),
1915 opt_ipv4_only_(false),
1916 follow_redirects_(false),
1917 ignore_signature_failures_(false),
1918 enable_http_tracing_(false),
1919 opt_metalink_(NULL, 0, 0, 0),
1920 opt_metalink_timestamp_link_(0),
1921 opt_host_(NULL, 0, 0, 0),
1922 opt_host_chain_rtt_(NULL),
1923 opt_proxy_groups_(NULL),
1924 opt_proxy_groups_current_(0),
1925 opt_proxy_groups_current_burned_(0),
1926 opt_proxy_groups_fallback_(0),
1927 opt_num_proxies_(0),
1928 opt_proxy_shard_(false),
1929 failover_indefinitely_(false),
1932 opt_timestamp_backup_proxies_(0),
1933 opt_timestamp_failover_proxies_(0),
1934 opt_proxy_groups_reset_after_(0),
1935 credentials_attachment_(NULL),
1936 counters_(new
Counters(statistics))
1941 reinterpret_cast<pthread_mutex_t *
>(smalloc(
sizeof(pthread_mutex_t)));
1945 reinterpret_cast<pthread_mutex_t *
>(smalloc(
sizeof(pthread_mutex_t)));
1949 retval = curl_global_init(CURL_GLOBAL_ALL);
1950 assert(retval == CURLE_OK);
1957 curl_multi_setopt(
curl_multi_, CURLMOPT_SOCKETDATA,
1958 static_cast<void *>(
this));
1960 curl_multi_setopt(
curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1966 if ((getenv(
"CVMFS_IPV4_ONLY") != NULL) &&
1967 (strlen(getenv(
"CVMFS_IPV4_ONLY")) > 0))
1985 static_cast<void *>(
this));
1992 "(manager '%s') Starting healthcheck thread",
name_.c_str());
2023 const char *header_name =
"cvmfs-info: ";
2024 const size_t header_name_len = strlen(header_name);
2025 const unsigned header_size = 1 + header_name_len +
2027 info->
SetInfoHeader(static_cast<char *>(alloca(header_size)));
2028 memcpy(info->
info_header(), header_name, header_name_len);
2030 header_size - header_name_len);
2035 const std::string str_pid =
"X-CVMFS-PID: " +
StringifyInt(info->
pid());
2081 retval = curl_easy_perform(handle);
2084 if (curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &elapsed) == CURLE_OK)
2087 static_cast<int64_t>(elapsed * 1000));
2096 "download failed (error %d - %s)",
2100 if (info->
sink() != NULL) {
2130 if (!address.empty()) {
2135 vector<string> servers;
2136 servers.push_back(address);
2141 name_.c_str(), address.c_str());
2150 const unsigned timeout_ms)
2167 const unsigned min_seconds,
2168 const unsigned max_seconds)
2188 const unsigned seconds_direct)
2211 unsigned *seconds_direct)
2229 const std::vector<std::string> &metalink_list) {
2235 if (metalink_list.empty()) {
2249 unsigned *current_metalink)
2275 if (host_list.empty()) {
2294 unsigned *current_host)
2322 const unsigned group_size = group->size();
2323 unsigned failed = 0;
2325 if (info && (info->
proxy() == (*group)[i].url)) {
2327 opt_proxy_groups_current_burned_++;
2329 (*group)[group_size - opt_proxy_groups_current_burned_]);
2341 if (opt_proxy_groups_current_burned_ == group->size()) {
2342 opt_proxy_groups_current_burned_ = 0;
2371 "%lu proxies remain in group",
name_.c_str(), info->
id(),
2386 if (!info.
chain || (info.
chain->size() == 1)) {
2392 if (typ ==
"host") {
2397 if (lastused != info.
current) {
2399 "(manager '%s' - id %" PRId64
")"
2401 "last used %s: %s, current %s: %s",
2402 name_.c_str(), jobinfo->
id(), typ.c_str(),
2403 typ.c_str(), (*info.
chain)[lastused].c_str(),
2409 string reason =
"manually triggered";
2410 string info_id =
"(manager " +
name_;
2417 const std::string old_host = (*info.
chain)[info.
current];
2419 if (typ ==
"host") {
2425 "%s switching %s from %s to %s (%s)", info_id.c_str(), typ.c_str(),
2461 (static_cast<int64_t>((now == 0) ? time(NULL) : now) >
2474 vector<string> host_chain;
2475 vector<int> host_rtt;
2476 unsigned current_host;
2478 GetHostInfo(&host_chain, &host_rtt, ¤t_host);
2485 JobInfo info(&url,
false,
false, NULL, &memsink);
2486 for (retries = 0; retries < 2; ++
retries) {
2487 for (i = 0; i < host_chain.size(); ++i) {
2488 url = host_chain[i] +
"/.cvmfspublished";
2490 struct timeval tv_start, tv_end;
2491 gettimeofday(&tv_start, NULL);
2493 gettimeofday(&tv_end, NULL);
2496 host_rtt[i] =
static_cast<int>(
2499 "probing host %s had %dms rtt",
2501 url.c_str(), host_rtt[i]);
2504 "error while probing host %s: %d %s",
2507 host_rtt[i] = INT_MAX;
2513 for (i = 0; i < host_chain.size(); ++i) {
2514 if (host_rtt[i] == INT_MAX) host_rtt[i] =
kProbeDown;
2526 std::vector<uint64_t> *output_order) {
2527 if (!servers) {
return false;}
2528 if (servers->size() == 1) {
2530 output_order->clear();
2531 output_order->push_back(0);
2536 std::vector<std::string> host_chain;
2539 std::vector<std::string> server_dns_names;
2540 server_dns_names.reserve(servers->size());
2541 for (
unsigned i = 0; i < servers->size(); ++i) {
2543 server_dns_names.push_back(host.empty() ? (*servers)[i] : host);
2545 std::string host_list =
JoinStrings(server_dns_names,
",");
2547 vector<string> host_chain_shuffled;
2555 bool success =
false;
2556 unsigned max_attempts = std::min(host_chain_shuffled.size(), size_t(3));
2557 vector<uint64_t> geo_order(servers->size());
2558 for (
unsigned i = 0; i < max_attempts; ++i) {
2559 string url = host_chain_shuffled[i] +
"/api/v1.0/geo/@proxy@/" + host_list;
2561 "(manager '%s') requesting ordered server list from %s",
2562 name_.c_str(), url.c_str());
2564 JobInfo info(&url,
false,
false, NULL, &memsink);
2567 string order(reinterpret_cast<char*>(memsink.data()), memsink.pos());
2572 "(manager '%s') retrieved invalid GeoAPI reply from %s [%s]",
2573 name_.c_str(), url.c_str(), order.c_str());
2576 "geographic order of servers retrieved from %s",
2581 Trim(order,
true ).c_str());
2587 "(manager '%s') GeoAPI request for %s failed with error %d [%s]",
2593 "failed to retrieve geographic order from stratum 1 servers",
2599 output_order->swap(geo_order);
2601 std::vector<std::string> sorted_servers;
2602 sorted_servers.reserve(geo_order.size());
2603 for (
unsigned i = 0; i < geo_order.size(); ++i) {
2604 uint64_t orderval = geo_order[i];
2605 sorted_servers.push_back((*servers)[orderval]);
2607 servers->swap(sorted_servers);
2621 vector<string> host_chain;
2622 vector<int> host_rtt;
2623 unsigned current_host;
2624 vector< vector<ProxyInfo> > proxy_chain;
2625 unsigned fallback_group;
2627 GetHostInfo(&host_chain, &host_rtt, ¤t_host);
2629 if ((host_chain.size() < 2) && ((proxy_chain.size() - fallback_group) < 2))
2632 vector<string> host_names;
2633 for (
unsigned i = 0; i < host_chain.size(); ++i)
2635 SortTeam(&host_names, &host_chain);
2636 unsigned last_geo_host = host_names.size();
2638 if ((fallback_group == 0) && (last_geo_host > 1)) {
2644 host_names.push_back(
"+PXYSEP+");
2648 unsigned first_geo_fallback = host_names.size();
2649 for (
unsigned i = fallback_group; i < proxy_chain.size(); ++i) {
2652 host_names.push_back(proxy_chain[i][0].host.name());
2655 std::vector<uint64_t> geo_order;
2670 vector<vector<ProxyInfo> > *proxy_groups =
new vector<vector<ProxyInfo> >(
2674 (*proxy_groups)[i] = (*opt_proxy_groups_)[i];
2684 for (
unsigned i = 0; i < geo_order.size(); ++i) {
2685 uint64_t orderval = geo_order[i];
2686 if (orderval < static_cast<uint64_t>(last_geo_host)) {
2690 }
else if (orderval >= static_cast<uint64_t>(first_geo_fallback)) {
2694 (*proxy_groups)[proxyi] =
2695 proxy_chain[fallback_group + orderval - first_geo_fallback];
2733 const string &reply_order,
2734 const unsigned expected_size,
2735 vector<uint64_t> *reply_vals)
2737 if (reply_order.empty())
2740 if (!sanitizer.
IsValid(reply_order))
2743 vector<string> reply_strings =
2745 vector<uint64_t> tmp_vals;
2746 for (
unsigned i = 0; i < reply_strings.size(); ++i) {
2747 if (reply_strings[i].empty())
2751 if (tmp_vals.size() != expected_size)
2755 set<uint64_t> coverage(tmp_vals.begin(), tmp_vals.end());
2756 if (coverage.size() != tmp_vals.size())
2758 if ((*coverage.begin() != 1) || (*coverage.rbegin() != coverage.size()))
2761 for (
unsigned i = 0; i < expected_size; ++i) {
2762 (*reply_vals)[i] = tmp_vals[i] - 1;
2773 const string &proxy_list,
2774 string *cleaned_list)
2777 if (proxy_list ==
"") {
2781 bool result =
false;
2783 vector<string> proxy_groups =
SplitString(proxy_list,
';');
2784 vector<string> cleaned_groups;
2785 for (
unsigned i = 0; i < proxy_groups.size(); ++i) {
2786 vector<string> group =
SplitString(proxy_groups[i],
'|');
2787 vector<string> cleaned;
2788 for (
unsigned j = 0; j < group.size(); ++j) {
2789 if ((group[j] ==
"DIRECT") || (group[j] ==
"")) {
2792 cleaned.push_back(group[j]);
2795 if (!cleaned.empty())
2796 cleaned_groups.push_back(
JoinStrings(cleaned,
"|"));
2813 const string &proxy_list,
2814 const string &fallback_proxy_list,
2823 bool contains_direct;
2832 if (contains_direct) {
2834 "(manager '%s') fallback proxies do not support DIRECT, removing",
2837 if (set_proxy_fallback_list ==
"") {
2841 if (contains_direct) {
2843 "(manager '%s') skipping DIRECT proxy to use fallback proxy",
2853 if ((set_proxy_list ==
"") && (set_proxy_fallback_list ==
"")) {
2864 if (set_proxy_list !=
"") {
2868 "first fallback proxy group %u",
2873 string all_proxy_list = set_proxy_list;
2874 if (set_proxy_fallback_list !=
"") {
2875 if (all_proxy_list !=
"")
2876 all_proxy_list +=
";";
2877 all_proxy_list += set_proxy_fallback_list;
2880 name_.c_str(), all_proxy_list.c_str());
2883 vector<string> hostnames;
2884 vector<string> proxy_groups;
2885 if (all_proxy_list !=
"")
2887 for (
unsigned i = 0; i < proxy_groups.size(); ++i) {
2888 vector<string> this_group =
SplitString(proxy_groups[i],
'|');
2889 for (
unsigned j = 0; j < this_group.size(); ++j) {
2895 hostnames.push_back(hostname);
2898 vector<dns::Host> hosts;
2900 "resolving %lu proxy addresses",
2901 name_.c_str(), hostnames.size());
2908 unsigned num_proxy = 0;
2909 for (
unsigned i = 0; i < proxy_groups.size(); ++i) {
2910 vector<string> this_group =
SplitString(proxy_groups[i],
'|');
2914 vector<ProxyInfo> infos;
2915 for (
unsigned j = 0; j < this_group.size(); ++j, ++num_proxy) {
2917 if (this_group[j] ==
"DIRECT") {
2924 "failed to resolve IP addresses for %s (%d - %s)",
name_.c_str(),
2925 hosts[num_proxy].name().c_str(), hosts[num_proxy].status(),
2929 infos.push_back(
ProxyInfo(failed_host, this_group[j]));
2934 set<string> best_addresses =
2936 set<string>::const_iterator iter_ips = best_addresses.begin();
2937 for (; iter_ips != best_addresses.end(); ++iter_ips) {
2939 infos.push_back(
ProxyInfo(hosts[num_proxy], url_ip));
2947 opt_num_proxies_ += infos.size();
2950 "(manager '%s') installed %u proxies in %lu load-balance groups",
2970 unsigned *current_group,
2971 unsigned *fallback_group)
2973 assert(proxy_chain != NULL);
2978 vector< vector<ProxyInfo> > empty_chain;
2979 *proxy_chain = empty_chain;
2980 if (current_group != NULL)
2982 if (fallback_group != NULL)
2983 *fallback_group = 0;
2988 if (current_group != NULL)
2990 if (fallback_group != NULL)
3010 uint32_t key = (hash ? hash->
Partial32() : 0);
3011 map<uint32_t, ProxyInfo *>::iterator it =
opt_proxy_map_.lower_bound(key);
3032 const uint32_t max_key = 0xffffffffUL;
3035 for (
unsigned i = 0; i < num_alive; ++i) {
3042 const std::pair<uint32_t, ProxyInfo *> entry(prng.
Next(max_key), proxy);
3045 std::string proxy_name = proxy->
host.
name().empty() ?
3046 "" :
" (" + proxy->
host.
name() +
")";
3051 const std::pair<uint32_t, ProxyInfo *> last_entry(max_key, first_proxy);
3055 unsigned select =
prng_.
Next(num_alive);
3057 const std::pair<uint32_t, ProxyInfo *> entry(max_key, proxy);
3059 std::string proxy_name = proxy->
host.
name().empty() ?
3060 "" :
" (" + proxy->
host.
name() +
")";
3069 if (new_proxy != old_proxy) {
3071 "(manager '%s') switching proxy from %s to %s. Reason: %s [%s]",
3072 name_.c_str(), (old_proxy.empty() ?
"(none)" : old_proxy.c_str()),
3073 (new_proxy.empty() ?
"(none)" : new_proxy.c_str()),
3074 reason.c_str(), curr_host.c_str());
3120 std::string msg =
"switch to proxy group " +
3155 const unsigned backoff_init_ms,
3156 const unsigned backoff_max_ms)
3172 const std::string &direct,
3173 const std::string &forced)
3207 bool success =
false;
3211 "Proposed sharding policy does not exist. Falling back to default",
unsigned opt_timeout_direct_
std::vector< std::string > http_tracing_headers_
bool StripDirect(const std::string &proxy_list, std::string *cleaned_list)
unsigned opt_low_speed_limit_
void HashString(const std::string &content, Any *any_digest)
static const unsigned kDnsDefaultTimeoutMs
bool ignore_signature_failures_
std::vector< T > Shuffle(const std::vector< T > &input, Prng *prng)
unsigned throttle() const
z_stream * GetZstreamPtr()
unsigned opt_backoff_init_ms_
void SetInfoHeader(char *info_header)
bool enable_http_tracing_
shash::ContextPtr * GetHashContextPtr()
int64_t Xadd(class Counter *counter, const int64_t delta)
const char * Code2Ascii(const Failures error)
unsigned opt_proxy_groups_current_burned_
double DiffTimeSeconds(struct timeval start, struct timeval end)
unsigned opt_proxy_groups_reset_after_
virtual bool IsCanceled()
void SetUrlOptions(JobInfo *info)
SharedPtr< ShardingPolicy > sharding_policy_
StreamStates DecompressZStream2Sink(const void *buf, const int64_t size, z_stream *strm, cvmfs::Sink *sink)
void ResolveMany(const std::vector< std::string > &names, std::vector< Host > *hosts)
std::string opt_proxy_fallback_list_
void SetHostChain(const std::string &host_list)
bool CheckMetalinkChain(const time_t now)
static NormalResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
void SetMetalinkChain(const std::string &metalink_list)
void SetLowSpeedLimit(const unsigned low_speed_limit)
DownloadManager(const unsigned max_pool_handles, const perf::StatisticsTemplate &statistics, const std::string &name="standard")
std::string proxy_template_direct_
std::vector< std::string > opt_proxies_
curl_slist ** GetHeadersPtr()
static const int kProbeGeo
string Trim(const string &raw, bool trim_newline)
string ReplaceAll(const string &haystack, const string &needle, const string &replace_by)
void set_min_ttl(unsigned seconds)
string JoinStrings(const vector< string > &strings, const string &joint)
std::string ToString(const bool with_suffix=false) const
unsigned opt_proxy_groups_current_
void SetCurrentHostChainIndex(int current_host_chain_index)
virtual bool RequiresReserve()=0
bool ValidateGeoReply(const std::string &reply_order, const unsigned expected_size, std::vector< uint64_t > *reply_vals)
std::vector< ProxyInfo > * current_proxy_group() const
void DecompressInit(z_stream *strm)
const std::string * url() const
time_t opt_timestamp_backup_proxies_
void SetProxyChain(const std::string &proxy_list, const std::string &fallback_proxy_list, const ProxySetModes set_mode)
std::string GetProxyList()
bool allow_failure() const
void GetMetalinkInfo(std::vector< std::string > *metalink_chain, unsigned *current_metalink)
std::set< CURL * > * pool_handles_inuse_
CURL * curl_handle() const
pthread_mutex_t * lock_options_
ProxyInfo * ChooseProxyUnlocked(const shash::Any *hash)
pthread_t thread_download_
std::string opt_proxy_list_
const std::string & name() const
perf::Counter * sz_transfer_time
void SetTracingHeaderGid(char *tracing_header_gid)
std::vector< std::vector< ProxyInfo > > * opt_proxy_groups_
assert((mem||(size==0))&&"Out Of Memory")
void SetNocache(bool nocache)
unsigned opt_proxy_groups_fallback_
void ReleaseCurlHandle(CURL *handle)
static bool sortlinks(const std::string &s1, const std::string &s2)
void SetTracingHeaderPid(char *tracing_header_pid)
void set_max_ttl(unsigned seconds)
Tube< DataTubeElement > * GetDataTubePtr()
char * tracing_header_gid() const
const std::set< std::string > & ViewBestAddresses(IpPreference preference) const
char * info_header() const
void SetDnsServer(const std::string &address)
DownloadManager * Clone(const perf::StatisticsTemplate &statistics, const std::string &cloned_name)
bool force_nocache() const
std::string StringifyUint(const uint64_t value)
void DecompressFini(z_stream *strm)
virtual std::string Describe()=0
static void * MainDownload(void *data)
void InitSeed(const uint64_t seed)
void SetTimeout(const unsigned seconds_proxy, const unsigned seconds_direct)
void SetLink(const std::string &link)
std::string opt_dns_server_
uint32_t watch_fds_inuse_
void SwitchHostInfo(const std::string &typ, HostInfo &info, JobInfo *jobinfo)
off_t range_offset() const
unsigned char num_used_hosts() const
bool follow_redirects() const
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
void Init(ContextPtr context)
void SetNumUsedMetalinks(unsigned char num_used_metalinks)
bool FileExists(const std::string &path)
Pipe< kPipeDownloadJobsResults > * GetPipeJobResultPtr()
void SetDnsTtlLimits(const unsigned min_seconds, const unsigned max_seconds)
static Failures PrepareDownloadDestination(JobInfo *info)
perf::Counter * n_metalink_failover
void SetHttpCode(int http_code)
const char * Code2Ascii(const Failures error)
uint32_t pool_max_handles_
bool head_request() const
unsigned char num_retries() const
std::vector< std::string > * chain
bool IsValidPipeJobResults()
void GetProxyInfo(std::vector< std::vector< ProxyInfo > > *proxy_chain, unsigned *current_group, unsigned *fallback_group)
void SetProxyGroupResetDelay(const unsigned seconds)
void SetCurrentMetalinkChainIndex(int current_metalink_chain_index)
atomic_int32 multi_threaded_
std::string AddDefaultScheme(const std::string &proxy)
vector< string > SplitString(const string &str, char delim)
cvmfs::Sink * sink() const
dns::NormalResolver * resolver_
bool Interrupted(const std::string &fqrn, JobInfo *info)
std::string proxy() const
dns::IpPreference opt_ip_preference_
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
void UseSystemCertificatePath()
bool SetShardingPolicy(const ShardingPolicySelector type)
perf::Counter * n_host_failover
void UpdateProxiesUnlocked(const std::string &reason)
time_t opt_metalink_timestamp_link_
bool IsEquivalent(const Host &other) const
bool IsProxyTransferError(const Failures error)
void SetNumRetries(unsigned char num_retries)
void set_throttle(const unsigned throttle)
InterruptCue * interrupt_cue() const
void SetIpPreference(const dns::IpPreference preference)
bool failover_indefinitely_
shash::ContextPtr hash_context() const
perf::Counter * n_requests
void Final(ContextPtr context, Any *any_digest)
void SetRetryParameters(const unsigned max_retries, const unsigned backoff_init_ms, const unsigned backoff_max_ms)
string StringifyInt(const int64_t value)
char * tracing_header_uid() const
Failures error_code() const
void CloneProxyConfig(DownloadManager *clone)
void SetMaxIpaddrPerProxy(unsigned limit)
void EnableIgnoreSignatureFailures()
CURL * AcquireCurlHandle()
void Inc(class Counter *counter)
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
std::string ExtractHost(const std::string &url)
std::vector< int > * opt_host_chain_rtt_
SslCertificateStore ssl_certificate_store_
std::string GetFallbackProxyList()
uint32_t Partial32() const
void SetProxyTemplates(const std::string &direct, const std::string &forced)
unsigned opt_backoff_max_ms_
void SetErrorCode(Failures error_code)
void SetNumUsedProxies(unsigned char num_used_proxies)
unsigned GetContextSize(const Algorithms algorithm)
std::string GetDnsServer() const
unsigned opt_num_proxies_
int current_metalink_chain_index() const
CredentialsAttachment * credentials_attachment_
struct pollfd * watch_fds_
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
std::map< uint32_t, ProxyInfo * > opt_proxy_map_
uint64_t String2Uint64(const string &value)
UniquePtr< Pipe< kPipeDownloadJobs > > pipe_jobs_
void UseSystemCertificatePath()
void CreatePipeJobResults()
Failures Fetch(JobInfo *info)
unsigned char num_used_metalinks() const
void SetCurlHandle(CURL *curl_handle)
unsigned opt_max_retries_
void SetTracingHeaderUid(char *tracing_header_uid)
curl_slist * headers() const
const shash::Any * expected_hash() const
perf::Counter * n_proxy_failover
void GetTimeout(unsigned *seconds_proxy, unsigned *seconds_direct)
void SetCredData(void *cred_data)
void SetFailoverIndefinitely()
std::string proxy_template_forced_
time_t opt_timestamp_failover_proxies_
void SetDnsParameters(const unsigned retries, const unsigned timeout_ms)
unsigned EscapeHeader(const std::string &header, char *escaped_buf, size_t buf_size)
const std::string * extra_info() const
static Host ExtendDeadline(const Host &original, unsigned seconds_from_now)
UniquePtr< Pipe< kPipeThreadTerminator > > pipe_terminate_
void SetProxy(const std::string &proxy)
static const int kProbeUnprobed
unsigned backoff_ms() const
unsigned char num_used_proxies() const
int current_host_chain_index() const
void SafeSleepMs(const unsigned ms)
void SetMetalinkResetDelay(const unsigned seconds)
bool IsHostTransferError(const Failures error)
static const unsigned kDnsDefaultRetries
void SortTeam(std::vector< T > *tractor, std::vector< U > *towed)
bool GeoSortServers(std::vector< std::string > *servers, std::vector< uint64_t > *output_order=NULL)
virtual bool Reserve(size_t size)=0
static const int kProbeDown
void SetNumUsedHosts(unsigned char num_used_hosts)
void SetHeaders(curl_slist *headers)
static const unsigned kProxyMapScale
void GetHostInfo(std::vector< std::string > *host_chain, std::vector< int > *rtt, unsigned *current_host)
unsigned opt_timeout_proxy_
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
char * tracing_header_pid() const
void SetBackoffMs(unsigned backoff_ms)
SharedPtr< HealthCheck > health_check_
void SwitchProxy(JobInfo *info)
void AddHTTPTracingHeader(const std::string &header)
void SetCredentialsAttachment(CredentialsAttachment *ca)
void SetFollowRedirects(bool follow_redirects)
void RebalanceProxiesUnlocked(const std::string &reason)
pthread_mutex_t * lock_synchronous_mode_
virtual bool SetResolvers(const std::vector< std::string > &resolvers)
void InitializeRequest(JobInfo *info, CURL *handle)
static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp)
string RewriteUrl(const string &url, const string &ip)
void SetHostResetDelay(const unsigned seconds)
uint32_t Next(const uint64_t boundary)
virtual int64_t Write(const void *buf, uint64_t sz)=0
unsigned timeout_ms() const
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)