29 #define __STDC_FORMAT_MACROS
31 #include "cvmfs_config.h"
94 std::string pause_file = std::string(
"/var/run/cvmfs/interrupt.") + fqrn;
97 "(id %" PRId64
") Interrupted(): checking for existence of %s",
98 info->
id(), pause_file.c_str());
101 "(id %" PRId64
") Interrupt marker found - "
102 "Interrupting current download, this will EIO outstanding IO.",
104 if (0 != unlink(pause_file.c_str())) {
106 "(id %" PRId64
") Couldn't delete interrupt marker: errno=%d",
120 "(id %" PRId64
") Failed to open path %s: %s (errno=%d).",
121 info->
id(), psink->
path().c_str(), strerror(errno), errno);
125 "Failed to create a valid sink: \n %s",
141 const size_t num_bytes = size*nmemb;
142 const string header_line(static_cast<const char *>(ptr), num_bytes);
149 if (
HasPrefix(header_line,
"HTTP/1.",
false)) {
150 if (header_line.length() < 10) {
155 for (i = 8; (i < header_line.length()) && (header_line[i] ==
' '); ++i) {}
158 if (header_line.length() > i+2) {
159 info->
SetHttpCode(DownloadManager::ParseHttpCode(&header_line[i]));
171 "(id %" PRId64
") redirect support not enabled: %s",
172 info->
id(), header_line.c_str());
177 info->
id(), header_line.c_str());
182 "(id %" PRId64
") http status error code: %s [%d]",
183 info->
id(), header_line.c_str(), info->
http_code());
204 HasPrefix(header_line,
"CONTENT-LENGTH:",
true))
206 char *tmp =
reinterpret_cast<char *
>(alloca(num_bytes+1));
208 sscanf(header_line.c_str(),
"%s %" PRIu64, tmp, &length);
212 "resource %s too large to store in memory (%" PRIu64
")",
213 info->
id(), info->
url()->c_str(), length);
221 }
else if (
HasPrefix(header_line,
"LOCATION:",
true)) {
224 info->
id(), header_line.c_str());
225 }
else if (
HasPrefix(header_line,
"X-SQUID-ERROR:",
true)) {
230 }
else if (
HasPrefix(header_line,
"PROXY-STATUS:",
true)) {
233 (header_line.find(
"error=") != string::npos)) {
248 const size_t num_bytes = size*nmemb;
272 "(id %" PRId64
") failed to decompress %s",
273 info->
id(), info->
url()->c_str());
278 "(id %" PRId64
") decompressing %s, local IO error",
279 info->
id(), info->
url()->c_str());
284 int64_t written = info->
sink()->
Write(ptr, num_bytes);
285 if (written < 0 || static_cast<uint64_t>(written) != num_bytes) {
287 "Failed to perform write of %zu bytes to sink %s with errno %ld",
288 info->
id(), num_bytes, info->
sink()->
Describe().c_str(), written);
296 static int CallbackCurlDebug(
304 curl_easy_getinfo(handle, CURLINFO_PRIVATE, &info);
306 std::string prefix =
"(id " +
StringifyInt(info->id()) +
") ";
311 case CURLINFO_HEADER_IN:
312 prefix +=
"{header/recv} ";
314 case CURLINFO_HEADER_OUT:
315 prefix +=
"{header/sent} ";
317 case CURLINFO_DATA_IN:
319 prefix +=
"{data/recv} ";
325 case CURLINFO_DATA_OUT:
327 prefix +=
"{data/sent} ";
333 case CURLINFO_SSL_DATA_IN:
335 prefix +=
"{ssldata/recv} ";
342 case CURLINFO_SSL_DATA_OUT:
344 prefix +=
"{ssldata/sent} ";
356 bool valid_char =
true;
357 std::string msg(data, size);
358 for (
size_t i = 0; i < msg.length(); ++i) {
359 if (msg[i] ==
'\0') {
364 if ((msg[i] <
' ' || msg[i] >
'~')
372 msg =
"<Non-plaintext sequence>";
376 prefix.c_str(),
Trim(msg,
true ).c_str());
384 const int DownloadManager::kProbeUnprobed = -1;
385 const int DownloadManager::kProbeDown = -2;
386 const int DownloadManager::kProbeGeo = -3;
388 bool DownloadManager::EscapeUrlChar(
unsigned char input,
char output[3]) {
389 if (((input >=
'0') && (input <=
'9')) ||
390 ((input >=
'A') && (input <=
'Z')) ||
391 ((input >=
'a') && (input <=
'z')) ||
392 (input ==
'/') || (input ==
':') || (input ==
'.') ||
394 (input ==
'+') || (input ==
'-') ||
395 (input ==
'_') || (input ==
'~') ||
396 (input ==
'[') || (input ==
']') || (input ==
','))
398 output[0] =
static_cast<char>(input);
403 output[1] =
static_cast<char>(
404 (input / 16) + ((input / 16 <= 9) ?
'0' :
'A'-10));
405 output[2] =
static_cast<char>(
406 (input % 16) + ((input % 16 <= 9) ?
'0' :
'A'-10));
415 string DownloadManager::EscapeUrl(
const int64_t jobinfo_id,
const string &url) {
417 escaped.reserve(url.length());
419 char escaped_char[3];
420 for (
unsigned i = 0, s = url.length(); i < s; ++i) {
421 if (EscapeUrlChar(url[i], escaped_char)) {
422 escaped.append(escaped_char, 3);
424 escaped.push_back(escaped_char[0]);
428 jobinfo_id, url.c_str(), escaped.c_str());
437 unsigned DownloadManager::EscapeHeader(
const string &header,
441 unsigned esc_pos = 0;
442 char escaped_char[3];
443 for (
unsigned i = 0, s = header.size(); i < s; ++i) {
444 if (EscapeUrlChar(header[i], escaped_char)) {
445 for (
unsigned j = 0; j < 3; ++j) {
447 if (esc_pos >= buf_size)
449 escaped_buf[esc_pos] = escaped_char[j];
455 if (esc_pos >= buf_size)
457 escaped_buf[esc_pos] = escaped_char[0];
469 int DownloadManager::ParseHttpCode(
const char digits[3]) {
472 for (
int i = 0; i < 3; ++i) {
473 if ((digits[i] <
'0') || (digits[i] >
'9'))
475 result += (digits[i] -
'0') * factor;
485 int DownloadManager::CallbackCurlSocket(CURL * ,
494 if (action == CURL_POLL_NONE)
513 download_mgr->
watch_fds_ =
static_cast<struct pollfd *
>(
525 download_mgr->
watch_fds_[index].events = POLLIN | POLLPRI;
528 download_mgr->
watch_fds_[index].events = POLLOUT | POLLWRBAND;
530 case CURL_POLL_INOUT:
532 POLLIN | POLLPRI | POLLOUT | POLLWRBAND;
534 case CURL_POLL_REMOVE:
535 if (index < download_mgr->watch_fds_inuse_-1) {
547 download_mgr->
watch_fds_ =
static_cast<struct pollfd *
>(
565 void *DownloadManager::MainDownload(
void *data) {
568 "download I/O thread of DownloadManager '%s' started",
569 download_mgr->
name_.c_str());
571 const int kIdxPipeTerminate = 0;
572 const int kIdxPipeJobs = 1;
575 static_cast<struct pollfd *
>(smalloc(2 *
sizeof(
struct pollfd)));
577 download_mgr->
watch_fds_[kIdxPipeTerminate].fd =
579 download_mgr->
watch_fds_[kIdxPipeTerminate].events = POLLIN | POLLPRI;
580 download_mgr->
watch_fds_[kIdxPipeTerminate].revents = 0;
583 download_mgr->
watch_fds_[kIdxPipeJobs].events = POLLIN | POLLPRI;
584 download_mgr->
watch_fds_[kIdxPipeJobs].revents = 0;
587 int still_running = 0;
588 struct timeval timeval_start, timeval_stop;
589 gettimeofday(&timeval_start, NULL);
605 gettimeofday(&timeval_stop, NULL);
606 int64_t delta =
static_cast<int64_t
>(
618 curl_multi_socket_action(download_mgr->
curl_multi_,
625 if (download_mgr->
watch_fds_[kIdxPipeTerminate].revents)
629 if (download_mgr->
watch_fds_[kIdxPipeJobs].revents) {
630 download_mgr->
watch_fds_[kIdxPipeJobs].revents = 0;
633 if (!still_running) {
634 gettimeofday(&timeval_start, NULL);
639 curl_multi_add_handle(download_mgr->
curl_multi_, handle);
640 curl_multi_socket_action(download_mgr->
curl_multi_,
657 if (download_mgr->
watch_fds_[i].revents & (POLLIN | POLLPRI))
658 ev_bitmask |= CURL_CSELECT_IN;
659 if (download_mgr->
watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
660 ev_bitmask |= CURL_CSELECT_OUT;
662 (POLLERR | POLLHUP | POLLNVAL))
664 ev_bitmask |= CURL_CSELECT_ERR;
668 curl_multi_socket_action(download_mgr->
curl_multi_,
678 while ((curl_msg = curl_multi_info_read(download_mgr->
curl_multi_,
681 if (curl_msg->msg == CURLMSG_DONE) {
684 CURL *easy_handle = curl_msg->easy_handle;
685 int curl_error = curl_msg->data.result;
686 curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
689 curl_easy_getinfo(easy_handle, CURLINFO_REDIRECT_COUNT, &redir_count);
691 "Number of CURL redirects %" PRId64 ,
692 download_mgr->
name_.c_str(), info->
id(),
695 curl_multi_remove_handle(download_mgr->
curl_multi_, easy_handle);
697 curl_multi_add_handle(download_mgr->
curl_multi_, easy_handle);
698 curl_multi_socket_action(download_mgr->
curl_multi_,
709 Write<download::Failures>(info->
error_code());
718 curl_multi_remove_handle(download_mgr->
curl_multi_, *i);
719 curl_easy_cleanup(*i);
725 "download I/O thread of DownloadManager '%s' terminated",
726 download_mgr->
name_.c_str());
734 HeaderLists::~HeaderLists() {
735 for (
unsigned i = 0; i < blocks_.size(); ++i) {
742 curl_slist *HeaderLists::GetList(
const char *header) {
747 curl_slist *HeaderLists::DuplicateList(curl_slist *slist) {
749 curl_slist *copy = GetList(slist->data);
750 copy->next = slist->next;
751 curl_slist *prev = copy;
754 curl_slist *new_link = Get(slist->data);
755 new_link->next = slist->next;
756 prev->next = new_link;
764 void HeaderLists::AppendHeader(curl_slist *slist,
const char *header) {
766 curl_slist *new_link = Get(header);
767 new_link->next = NULL;
771 slist->next = new_link;
780 void HeaderLists::CutHeader(
const char *header, curl_slist **slist) {
784 curl_slist *prev = &head;
785 curl_slist *rover = *slist;
787 if (strcmp(rover->data, header) == 0) {
788 prev->next = rover->next;
799 void HeaderLists::PutList(curl_slist *slist) {
801 curl_slist *next = slist->next;
808 string HeaderLists::Print(curl_slist *slist) {
811 verbose += string(slist->data) +
"\n";
818 curl_slist *HeaderLists::Get(
const char *header) {
819 for (
unsigned i = 0; i < blocks_.size(); ++i) {
820 for (
unsigned j = 0; j < kBlockSize; ++j) {
821 if (!IsUsed(&(blocks_[i][j]))) {
822 blocks_[i][j].data =
const_cast<char *
>(header);
823 return &(blocks_[i][j]);
830 blocks_[blocks_.size()-1][0].data =
const_cast<char *
>(header);
831 return &(blocks_[blocks_.size()-1][0]);
835 void HeaderLists::Put(curl_slist *slist) {
841 void HeaderLists::AddBlock() {
842 curl_slist *new_block =
new curl_slist[kBlockSize];
843 for (
unsigned i = 0; i < kBlockSize; ++i) {
846 blocks_.push_back(new_block);
853 string DownloadManager::ProxyInfo::Print() {
859 static_cast<int>(host.deadline()) - static_cast<int>(time(NULL));
860 string expinfo = (remaining >= 0) ?
"+" :
"";
861 if (abs(remaining) >= 3600) {
863 }
else if (abs(remaining) >= 60) {
869 result +=
" (" + host.name() +
", " + expinfo +
")";
871 result +=
" (:unresolved:, " + expinfo +
")";
881 CURL *DownloadManager::AcquireCurlHandle() {
884 if (pool_handles_idle_->empty()) {
886 handle = curl_easy_init();
889 curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
894 handle = *(pool_handles_idle_->begin());
895 pool_handles_idle_->erase(pool_handles_idle_->begin());
898 pool_handles_inuse_->insert(handle);
904 void DownloadManager::ReleaseCurlHandle(CURL *handle) {
905 set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
906 assert(elem != pool_handles_inuse_->end());
908 if (pool_handles_idle_->size() > pool_max_handles_) {
909 curl_easy_cleanup(*elem);
911 pool_handles_idle_->insert(*elem);
914 pool_handles_inuse_->erase(elem);
922 void DownloadManager::InitializeRequest(
JobInfo *info, CURL *handle) {
932 info->
SetHeaders(header_lists_->DuplicateList(default_headers_));
936 if (enable_http_tracing_) {
937 for (
unsigned int i = 0; i < http_tracing_headers_.size(); i++) {
938 header_lists_->AppendHeader(info->
headers(),
939 (http_tracing_headers_)[i].c_str());
947 "CURL Header for URL: %s is:\n %s",
948 name_.c_str(), info->
id(), info->
url()->c_str(),
949 header_lists_->Print(info->
headers()).c_str());
966 char byte_range_array[100];
967 const int64_t range_lower =
static_cast<int64_t
>(info->
range_offset());
968 const int64_t range_upper =
static_cast<int64_t
>(
970 if (snprintf(byte_range_array,
sizeof(byte_range_array),
971 "%" PRId64
"-%" PRId64,
972 range_lower, range_upper) == 100)
976 curl_easy_setopt(handle, CURLOPT_RANGE, byte_range_array);
978 curl_easy_setopt(handle, CURLOPT_RANGE, NULL);
982 curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
983 curl_easy_setopt(handle, CURLOPT_WRITEHEADER,
984 static_cast<void *>(info));
985 curl_easy_setopt(handle, CURLOPT_WRITEDATA, static_cast<void *>(info));
986 curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->
headers());
988 curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
990 curl_easy_setopt(handle, CURLOPT_HTTPGET, 1);
992 if (opt_ipv4_only_) {
993 curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
995 if (follow_redirects_) {
996 curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1);
997 curl_easy_setopt(handle, CURLOPT_MAXREDIRS, 4);
1000 curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
1001 curl_easy_setopt(handle, CURLOPT_DEBUGFUNCTION, CallbackCurlDebug);
1010 void DownloadManager::SetUrlOptions(
JobInfo *info) {
1017 if (sharding_policy_.UseCount() > 0) {
1018 if (info->
proxy() !=
"") {
1025 curl_easy_setopt(info->
curl_handle(), CURLOPT_PROXY, info->
proxy().c_str());
1028 if (opt_timestamp_backup_proxies_ > 0) {
1029 const time_t now = time(NULL);
1030 if (static_cast<int64_t>(now) >
1031 static_cast<int64_t>(opt_timestamp_backup_proxies_ +
1032 opt_proxy_groups_reset_after_))
1034 opt_proxy_groups_current_ = 0;
1035 opt_timestamp_backup_proxies_ = 0;
1036 RebalanceProxiesUnlocked(
"Reset proxy group from backup to primary");
1040 if (opt_timestamp_failover_proxies_ > 0) {
1041 const time_t now = time(NULL);
1042 if (static_cast<int64_t>(now) >
1043 static_cast<int64_t>(opt_timestamp_failover_proxies_ +
1044 opt_proxy_groups_reset_after_))
1046 RebalanceProxiesUnlocked(
1047 "Reset load-balanced proxies within the active group");
1051 if (opt_timestamp_backup_host_ > 0) {
1052 const time_t now = time(NULL);
1053 if (static_cast<int64_t>(now) >
1054 static_cast<int64_t>(opt_timestamp_backup_host_ +
1055 opt_host_reset_after_))
1058 "(manager %s - id %" PRId64
") "
1059 "switching host from %s to %s (reset host)", name_.c_str(),
1060 info->
id(), (*opt_host_chain_)[opt_host_chain_current_].c_str(),
1061 (*opt_host_chain_)[0].c_str());
1062 opt_host_chain_current_ = 0;
1063 opt_timestamp_backup_host_ = 0;
1068 if (!proxy || (proxy->
url ==
"DIRECT")) {
1070 curl_easy_setopt(info->
curl_handle(), CURLOPT_PROXY,
"");
1075 std::string purl = proxy->
url;
1077 const bool changed = ValidateProxyIpsUnlocked(purl, phost);
1084 curl_easy_setopt(info->
curl_handle(), CURLOPT_PROXY,
1085 info->
proxy().c_str());
1088 curl_easy_setopt(info->
curl_handle(), CURLOPT_PROXY,
"0.0.0.0");
1093 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, opt_low_speed_limit_);
1094 if (info->
proxy() !=
"DIRECT") {
1095 curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_proxy_);
1096 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_proxy_);
1098 curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_direct_);
1099 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_direct_);
1101 if (!opt_dns_server_.empty())
1102 curl_easy_setopt(curl_handle, CURLOPT_DNS_SERVERS, opt_dns_server_.c_str());
1105 url_prefix = (*opt_host_chain_)[opt_host_chain_current_];
1109 string url = url_prefix + *(info->
url());
1111 curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 1L);
1112 if (url.substr(0, 5) ==
"https") {
1113 bool rvb = ssl_certificate_store_.ApplySslCertificatePath(curl_handle);
1116 "(manager %s - id %" PRId64
") "
1117 "Failed to set SSL certificate path %s", name_.c_str(),
1118 info->
id(), ssl_certificate_store_.GetCaPath().c_str());
1120 if (info->
pid() != -1) {
1121 if (credentials_attachment_ == NULL) {
1123 "uses secure downloads but no credentials attachment set",
1124 name_.c_str(), info->
id());
1126 bool retval = credentials_attachment_->ConfigureCurlHandle(
1130 "failed attaching credentials",
1131 name_.c_str(), info->
id());
1139 signal(SIGPIPE, SIG_IGN);
1142 if (url.find(
"@proxy@") != string::npos) {
1150 if (proxy_template_forced_ !=
"") {
1151 replacement = proxy_template_forced_;
1152 }
else if (info->
proxy() ==
"DIRECT") {
1153 replacement = proxy_template_direct_;
1155 if (opt_proxy_groups_current_ >= opt_proxy_groups_fallback_) {
1159 curl_easy_setopt(info->
curl_handle(), CURLOPT_PROXY,
"");
1160 replacement = proxy_template_direct_;
1162 replacement = ChooseProxyUnlocked(info->
expected_hash())->host.name();
1165 replacement = (replacement ==
"") ? proxy_template_direct_ : replacement;
1167 "replacing @proxy@ by %s",
1168 name_.c_str(), info->
id(), replacement.c_str());
1169 url =
ReplaceAll(url,
"@proxy@", replacement);
1190 curl_easy_setopt(curl_handle, CURLOPT_URL,
1191 EscapeUrl(info->
id(), url).c_str());
1204 bool DownloadManager::ValidateProxyIpsUnlocked(
1211 name_.c_str(), host.
name().c_str());
1213 unsigned group_idx = opt_proxy_groups_current_;
1216 bool update_only =
true;
1220 "(manager '%s') failed to resolve IP addresses for %s (%d - %s)",
1221 name_.c_str(), host.
name().c_str(), new_host.
status(),
1225 update_only =
false;
1229 for (
unsigned i = 0; i < (*opt_proxy_groups_)[group_idx].size(); ++i) {
1230 if ((*opt_proxy_groups_)[group_idx][i].host.id() == host.
id())
1231 (*opt_proxy_groups_)[group_idx][i].host = new_host;
1240 "(manager '%s') DNS entries for proxy %s changed, adjusting",
1241 name_.c_str(), host.
name().c_str());
1242 vector<ProxyInfo> *group = current_proxy_group();
1243 opt_num_proxies_ -= group->size();
1244 for (
unsigned i = 0; i < group->size(); ) {
1245 if ((*group)[i].host.id() == host.
id()) {
1246 group->erase(group->begin() + i);
1251 vector<ProxyInfo> new_infos;
1253 set<string>::const_iterator iter_ips = best_addresses.begin();
1254 for (; iter_ips != best_addresses.end(); ++iter_ips) {
1256 new_infos.push_back(
ProxyInfo(new_host, url_ip));
1258 group->insert(group->end(), new_infos.begin(), new_infos.end());
1259 opt_num_proxies_ += new_infos.size();
1261 std::string msg =
"DNS entries for proxy " + host.
name() +
" changed";
1263 RebalanceProxiesUnlocked(msg);
1271 void DownloadManager::UpdateStatistics(CURL *handle) {
1276 retval = curl_easy_getinfo(handle, CURLINFO_SIZE_DOWNLOAD, &val);
1277 assert(retval == CURLE_OK);
1278 sum +=
static_cast<int64_t
>(val);
1282 perf::Xadd(counters_->sz_transferred_bytes, sum);
1289 bool DownloadManager::CanRetry(
const JobInfo *info) {
1291 unsigned max_retries = opt_max_retries_;
1306 unsigned backoff_init_ms = 0;
1307 unsigned backoff_max_ms = 0;
1310 backoff_init_ms = opt_backoff_init_ms_;
1311 backoff_max_ms = opt_backoff_max_ms_;
1326 "(manager '%s' - id %" PRId64
") backing off for %d ms",
1334 header_lists_->AppendHeader(info->
headers(),
"Pragma: no-cache");
1335 header_lists_->AppendHeader(info->
headers(),
"Cache-Control: no-cache");
1345 void DownloadManager::SetRegularCache(
JobInfo *info) {
1348 header_lists_->CutHeader(
"Pragma: no-cache", info->
GetHeadersPtr());
1349 header_lists_->CutHeader(
"Cache-Control: no-cache", info->
GetHeadersPtr());
1358 void DownloadManager::ReleaseCredential(
JobInfo *info) {
1360 assert(credentials_attachment_ != NULL);
1361 credentials_attachment_->ReleaseCurlHandle(info->
curl_handle(),
1374 bool DownloadManager::VerifyAndFinalize(
const int curl_error,
JobInfo *info) {
1376 "Verify downloaded url %s, proxy %s (curl error %d)",
1377 name_.c_str(), info->
id(), info->
url()->c_str(),
1378 info->
proxy().c_str(), curl_error);
1382 switch (curl_error) {
1389 if (ignore_signature_failures_) {
1391 "(manager '%s' - id %" PRId64
") "
1392 "ignoring failed hash verification of %s (expected %s, got %s)",
1393 name_.c_str(), info->
id(), info->
url()->c_str(),
1398 "hash verification of %s failed (expected %s, got %s)",
1399 name_.c_str(), info->
id(), info->
url()->c_str(),
1410 case CURLE_UNSUPPORTED_PROTOCOL:
1413 case CURLE_URL_MALFORMAT:
1416 case CURLE_COULDNT_RESOLVE_PROXY:
1419 case CURLE_COULDNT_RESOLVE_HOST:
1422 case CURLE_OPERATION_TIMEDOUT:
1426 case CURLE_PARTIAL_FILE:
1427 case CURLE_GOT_NOTHING:
1428 case CURLE_RECV_ERROR:
1432 case CURLE_FILE_COULDNT_READ_FILE:
1433 case CURLE_COULDNT_CONNECT:
1434 if (info->
proxy() !=
"DIRECT") {
1441 case CURLE_TOO_MANY_REDIRECTS:
1444 case CURLE_SSL_CACERT_BADFILE:
1446 "(manager '%s' -id %" PRId64
") "
1447 "Failed to load certificate bundle. "
1448 "X509_CERT_BUNDLE might point to the wrong location.",
1449 name_.c_str(), info->
id());
1454 case CURLE_PEER_FAILED_VERIFICATION:
1456 "(manager '%s' - id %" PRId64
") "
1457 "invalid SSL certificate of remote host. "
1458 "X509_CERT_DIR and/or X509_CERT_BUNDLE might point to the wrong "
1459 "location.", name_.c_str(), info->
id());
1462 case CURLE_ABORTED_BY_CALLBACK:
1463 case CURLE_WRITE_ERROR:
1466 case CURLE_SEND_ERROR:
1475 "unexpected curl error (%d) while trying to fetch %s",
1476 name_.c_str(), info->
id(), curl_error, info->
url()->c_str());
1481 std::vector<std::string> *host_chain = opt_host_chain_;
1484 bool try_again =
false;
1485 bool same_url_retry = CanRetry(info);
1494 "(manager '%s' - id %" PRId64
") "
1495 "data corruption with no-cache header, try another host",
1496 name_.c_str(), info->
id());
1501 if ( same_url_retry || (
1511 if ( same_url_retry || (
1517 if (sharding_policy_.UseCount() > 0) {
1519 same_url_retry =
false;
1530 if (opt_proxy_groups_) {
1531 if ((opt_proxy_groups_current_ > 0) ||
1532 (opt_proxy_groups_current_burned_ > 0))
1534 opt_proxy_groups_current_ = 0;
1535 opt_timestamp_backup_proxies_ = 0;
1536 RebalanceProxiesUnlocked(
"reset proxies for host failover");
1542 "(manager '%s' - id %" PRId64
") make it a host failure",
1543 name_.c_str(), info->
id());
1547 if (failover_indefinitely_) {
1551 "(manager '%s' - id %" PRId64
") "
1552 "VerifyAndFinalize() would fail the download here. "
1553 "Instead switch proxy and retry download. "
1554 "info->probe_hosts=%d host_chain=%p info->num_used_hosts=%d "
1555 "host_chain->size()=%lu same_url_retry=%d "
1556 "info->num_used_proxies=%d opt_num_proxies_=%d",
1557 name_.c_str(), info->
id(),
1561 host_chain->size() : -1,
static_cast<int>(same_url_retry),
1564 RebalanceProxiesUnlocked(
1565 "download failed - failover indefinitely");
1578 "Trying again on same curl handle, same url: %d, "
1579 "error code %d no-cache %d",
1580 name_.c_str(), info->
id(), same_url_retry,
1585 goto verify_and_finalize_stop;
1589 goto verify_and_finalize_stop;
1599 if (sharding_policy_.UseCount() > 0) {
1600 ReleaseCredential(info);
1601 SetUrlOptions(info);
1603 SetRegularCache(info);
1606 bool switch_proxy =
false;
1607 bool switch_host =
false;
1614 switch_proxy =
true;
1623 if (same_url_retry) {
1626 switch_proxy =
true;
1629 if (same_url_retry) {
1640 ReleaseCredential(info);
1643 SetUrlOptions(info);
1646 ReleaseCredential(info);
1649 SetUrlOptions(info);
1653 if (failover_indefinitely_) {
1661 verify_and_finalize_stop:
1663 ReleaseCredential(info);
1672 header_lists_->PutList(info->
headers());
1679 DownloadManager::~DownloadManager() {
1681 if (sharding_policy_.UseCount() > 0) {
1682 sharding_policy_.Reset();
1684 if (health_check_.UseCount() > 0) {
1685 if (health_check_.Unique()) {
1687 "(manager '%s') Stopping healthcheck thread", name_.c_str());
1688 health_check_->StopHealthcheck();
1690 health_check_.Reset();
1693 if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1696 pthread_join(thread_download_, NULL);
1698 pipe_terminate_.Destroy();
1699 pipe_jobs_.Destroy();
1702 for (set<CURL *>::iterator i = pool_handles_idle_->begin(),
1703 iEnd = pool_handles_idle_->end(); i != iEnd; ++i)
1705 curl_easy_cleanup(*i);
1708 delete pool_handles_idle_;
1709 delete pool_handles_inuse_;
1710 curl_multi_cleanup(curl_multi_);
1712 delete header_lists_;
1717 delete opt_host_chain_;
1718 delete opt_host_chain_rtt_;
1719 delete opt_proxy_groups_;
1721 curl_global_cleanup();
1725 pthread_mutex_destroy(lock_options_);
1726 pthread_mutex_destroy(lock_synchronous_mode_);
1727 free(lock_options_);
1728 free(lock_synchronous_mode_);
1731 void DownloadManager::InitHeaders() {
1733 string cernvm_id =
"User-Agent: cvmfs ";
1734 #ifdef CVMFS_LIBCVMFS
1735 cernvm_id +=
"libcvmfs ";
1737 cernvm_id +=
"Fuse ";
1739 cernvm_id += string(VERSION);
1740 if (getenv(
"CERNVM_UUID") != NULL) {
1744 user_agent_ = strdup(cernvm_id.c_str());
1748 default_headers_ = header_lists_->GetList(
"Connection: Keep-Alive");
1749 header_lists_->AppendHeader(default_headers_,
"Pragma:");
1750 header_lists_->AppendHeader(default_headers_, user_agent_);
1753 DownloadManager::DownloadManager(
const unsigned max_pool_handles,
1755 const std::string &name) :
1757 pool_handles_idle_(new set<CURL *>),
1758 pool_handles_inuse_(new set<CURL *>),
1759 pool_max_handles_(max_pool_handles),
1760 pipe_terminate_(NULL),
1764 watch_fds_inuse_(0),
1765 watch_fds_max_(4 * max_pool_handles),
1766 opt_timeout_proxy_(5),
1767 opt_timeout_direct_(10),
1768 opt_low_speed_limit_(1024),
1769 opt_max_retries_(0),
1770 opt_backoff_init_ms_(0),
1771 opt_backoff_max_ms_(0),
1772 enable_info_header_(false),
1773 opt_ipv4_only_(false),
1774 follow_redirects_(false),
1775 ignore_signature_failures_(false),
1776 enable_http_tracing_(false),
1777 opt_host_chain_(NULL),
1778 opt_host_chain_rtt_(NULL),
1779 opt_host_chain_current_(0),
1780 opt_proxy_groups_(NULL),
1781 opt_proxy_groups_current_(0),
1782 opt_proxy_groups_current_burned_(0),
1783 opt_proxy_groups_fallback_(0),
1784 opt_num_proxies_(0),
1785 opt_proxy_shard_(false),
1786 failover_indefinitely_(false),
1789 opt_timestamp_backup_proxies_(0),
1790 opt_timestamp_failover_proxies_(0),
1791 opt_proxy_groups_reset_after_(0),
1792 opt_timestamp_backup_host_(0),
1793 opt_host_reset_after_(0),
1794 credentials_attachment_(NULL),
1795 counters_(new
Counters(statistics))
1800 reinterpret_cast<pthread_mutex_t *
>(smalloc(
sizeof(pthread_mutex_t)));
1804 reinterpret_cast<pthread_mutex_t *
>(smalloc(
sizeof(pthread_mutex_t)));
1808 retval = curl_global_init(CURL_GLOBAL_ALL);
1809 assert(retval == CURLE_OK);
1816 curl_multi_setopt(
curl_multi_, CURLMOPT_SOCKETDATA,
1817 static_cast<void *>(
this));
1819 curl_multi_setopt(
curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1825 if ((getenv(
"CVMFS_IPV4_ONLY") != NULL) &&
1826 (strlen(getenv(
"CVMFS_IPV4_ONLY")) > 0))
1844 static_cast<void *>(
this));
1851 "(manager '%s') Starting healthcheck thread",
name_.c_str());
1879 const char *header_name =
"cvmfs-info: ";
1880 const size_t header_name_len = strlen(header_name);
1881 const unsigned header_size = 1 + header_name_len +
1883 info->
SetInfoHeader(static_cast<char *>(alloca(header_size)));
1884 memcpy(info->
info_header(), header_name, header_name_len);
1886 header_size - header_name_len);
1891 const std::string str_pid =
"X-CVMFS-PID: " +
StringifyInt(info->
pid());
1937 retval = curl_easy_perform(handle);
1940 if (curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &elapsed) == CURLE_OK)
1943 static_cast<int64_t>(elapsed * 1000));
1952 "download failed (error %d - %s)",
1956 if (info->
sink() != NULL) {
1986 if (!address.empty()) {
1991 vector<string> servers;
1992 servers.push_back(address);
1997 name_.c_str(), address.c_str());
2006 const unsigned timeout_ms)
2023 const unsigned min_seconds,
2024 const unsigned max_seconds)
2044 const unsigned seconds_direct)
2067 unsigned *seconds_direct)
2091 if (host_list.empty()) {
2111 unsigned *current_host)
2139 const unsigned group_size = group->size();
2140 unsigned failed = 0;
2142 if (info && (info->
proxy() == (*group)[i].url)) {
2144 opt_proxy_groups_current_burned_++;
2146 (*group)[group_size - opt_proxy_groups_current_burned_]);
2158 if (opt_proxy_groups_current_burned_ == group->size()) {
2159 opt_proxy_groups_current_burned_ = 0;
2188 "%lu proxies remain in group",
name_.c_str(), info->
id(),
2207 "(manager '%s' - id %" PRId64
")"
2208 "don't switch host, "
2209 "last used host: %s, current host: %s",
name_.c_str(), info->
id(),
2215 string reason =
"manually triggered";
2216 string info_id =
"(manager " +
name_;
2228 "%s switching host from %s to %s (%s)", info_id.c_str(),
2255 vector<string> host_chain;
2256 vector<int> host_rtt;
2257 unsigned current_host;
2259 GetHostInfo(&host_chain, &host_rtt, ¤t_host);
2266 JobInfo info(&url,
false,
false, NULL, &memsink);
2267 for (retries = 0; retries < 2; ++
retries) {
2268 for (i = 0; i < host_chain.size(); ++i) {
2269 url = host_chain[i] +
"/.cvmfspublished";
2271 struct timeval tv_start, tv_end;
2272 gettimeofday(&tv_start, NULL);
2274 gettimeofday(&tv_end, NULL);
2277 host_rtt[i] =
static_cast<int>(
2280 "probing host %s had %dms rtt",
2282 url.c_str(), host_rtt[i]);
2285 "error while probing host %s: %d %s",
2288 host_rtt[i] = INT_MAX;
2294 for (i = 0; i < host_chain.size(); ++i) {
2295 if (host_rtt[i] == INT_MAX) host_rtt[i] =
kProbeDown;
2307 std::vector<uint64_t> *output_order) {
2308 if (!servers) {
return false;}
2309 if (servers->size() == 1) {
2311 output_order->clear();
2312 output_order->push_back(0);
2317 std::vector<std::string> host_chain;
2320 std::vector<std::string> server_dns_names;
2321 server_dns_names.reserve(servers->size());
2322 for (
unsigned i = 0; i < servers->size(); ++i) {
2324 server_dns_names.push_back(host.empty() ? (*servers)[i] : host);
2326 std::string host_list =
JoinStrings(server_dns_names,
",");
2328 vector<string> host_chain_shuffled;
2336 bool success =
false;
2337 unsigned max_attempts = std::min(host_chain_shuffled.size(), size_t(3));
2338 vector<uint64_t> geo_order(servers->size());
2339 for (
unsigned i = 0; i < max_attempts; ++i) {
2340 string url = host_chain_shuffled[i] +
"/api/v1.0/geo/@proxy@/" + host_list;
2342 "(manager '%s') requesting ordered server list from %s",
2343 name_.c_str(), url.c_str());
2345 JobInfo info(&url,
false,
false, NULL, &memsink);
2348 string order(reinterpret_cast<char*>(memsink.data()), memsink.pos());
2353 "(manager '%s') retrieved invalid GeoAPI reply from %s [%s]",
2354 name_.c_str(), url.c_str(), order.c_str());
2357 "geographic order of servers retrieved from %s",
2362 Trim(order,
true ).c_str());
2368 "(manager '%s') GeoAPI request %s failed with error %d [%s]",
2374 "failed to retrieve geographic order from stratum 1 servers",
2380 output_order->swap(geo_order);
2382 std::vector<std::string> sorted_servers;
2383 sorted_servers.reserve(geo_order.size());
2384 for (
unsigned i = 0; i < geo_order.size(); ++i) {
2385 uint64_t orderval = geo_order[i];
2386 sorted_servers.push_back((*servers)[orderval]);
2388 servers->swap(sorted_servers);
2402 vector<string> host_chain;
2403 vector<int> host_rtt;
2404 unsigned current_host;
2405 vector< vector<ProxyInfo> > proxy_chain;
2406 unsigned fallback_group;
2408 GetHostInfo(&host_chain, &host_rtt, ¤t_host);
2410 if ((host_chain.size() < 2) && ((proxy_chain.size() - fallback_group) < 2))
2413 vector<string> host_names;
2414 for (
unsigned i = 0; i < host_chain.size(); ++i)
2416 SortTeam(&host_names, &host_chain);
2417 unsigned last_geo_host = host_names.size();
2419 if ((fallback_group == 0) && (last_geo_host > 1)) {
2425 host_names.push_back(
"+PXYSEP+");
2429 unsigned first_geo_fallback = host_names.size();
2430 for (
unsigned i = fallback_group; i < proxy_chain.size(); ++i) {
2433 host_names.push_back(proxy_chain[i][0].host.name());
2436 std::vector<uint64_t> geo_order;
2451 vector<vector<ProxyInfo> > *proxy_groups =
new vector<vector<ProxyInfo> >(
2455 (*proxy_groups)[i] = (*opt_proxy_groups_)[i];
2465 for (
unsigned i = 0; i < geo_order.size(); ++i) {
2466 uint64_t orderval = geo_order[i];
2467 if (orderval < static_cast<uint64_t>(last_geo_host)) {
2470 (*opt_host_chain_)[hosti++] = host_chain[orderval];
2471 }
else if (orderval >= static_cast<uint64_t>(first_geo_fallback)) {
2475 (*proxy_groups)[proxyi] =
2476 proxy_chain[fallback_group + orderval - first_geo_fallback];
2514 const string &reply_order,
2515 const unsigned expected_size,
2516 vector<uint64_t> *reply_vals)
2518 if (reply_order.empty())
2521 if (!sanitizer.
IsValid(reply_order))
2524 vector<string> reply_strings =
2526 vector<uint64_t> tmp_vals;
2527 for (
unsigned i = 0; i < reply_strings.size(); ++i) {
2528 if (reply_strings[i].empty())
2532 if (tmp_vals.size() != expected_size)
2536 set<uint64_t> coverage(tmp_vals.begin(), tmp_vals.end());
2537 if (coverage.size() != tmp_vals.size())
2539 if ((*coverage.begin() != 1) || (*coverage.rbegin() != coverage.size()))
2542 for (
unsigned i = 0; i < expected_size; ++i) {
2543 (*reply_vals)[i] = tmp_vals[i] - 1;
2554 const string &proxy_list,
2555 string *cleaned_list)
2558 if (proxy_list ==
"") {
2562 bool result =
false;
2564 vector<string> proxy_groups =
SplitString(proxy_list,
';');
2565 vector<string> cleaned_groups;
2566 for (
unsigned i = 0; i < proxy_groups.size(); ++i) {
2567 vector<string> group =
SplitString(proxy_groups[i],
'|');
2568 vector<string> cleaned;
2569 for (
unsigned j = 0; j < group.size(); ++j) {
2570 if ((group[j] ==
"DIRECT") || (group[j] ==
"")) {
2573 cleaned.push_back(group[j]);
2576 if (!cleaned.empty())
2577 cleaned_groups.push_back(
JoinStrings(cleaned,
"|"));
2594 const string &proxy_list,
2595 const string &fallback_proxy_list,
2604 bool contains_direct;
2613 if (contains_direct) {
2615 "(manager '%s') fallback proxies do not support DIRECT, removing",
2618 if (set_proxy_fallback_list ==
"") {
2622 if (contains_direct) {
2624 "(manager '%s') skipping DIRECT proxy to use fallback proxy",
2634 if ((set_proxy_list ==
"") && (set_proxy_fallback_list ==
"")) {
2645 if (set_proxy_list !=
"") {
2649 "first fallback proxy group %u",
2654 string all_proxy_list = set_proxy_list;
2655 if (set_proxy_fallback_list !=
"") {
2656 if (all_proxy_list !=
"")
2657 all_proxy_list +=
";";
2658 all_proxy_list += set_proxy_fallback_list;
2661 name_.c_str(), all_proxy_list.c_str());
2664 vector<string> hostnames;
2665 vector<string> proxy_groups;
2666 if (all_proxy_list !=
"")
2668 for (
unsigned i = 0; i < proxy_groups.size(); ++i) {
2669 vector<string> this_group =
SplitString(proxy_groups[i],
'|');
2670 for (
unsigned j = 0; j < this_group.size(); ++j) {
2676 hostnames.push_back(hostname);
2679 vector<dns::Host> hosts;
2681 "resolving %lu proxy addresses",
2682 name_.c_str(), hostnames.size());
2689 unsigned num_proxy = 0;
2690 for (
unsigned i = 0; i < proxy_groups.size(); ++i) {
2691 vector<string> this_group =
SplitString(proxy_groups[i],
'|');
2695 vector<ProxyInfo> infos;
2696 for (
unsigned j = 0; j < this_group.size(); ++j, ++num_proxy) {
2698 if (this_group[j] ==
"DIRECT") {
2705 "failed to resolve IP addresses for %s (%d - %s)",
name_.c_str(),
2706 hosts[num_proxy].name().c_str(), hosts[num_proxy].status(),
2710 infos.push_back(
ProxyInfo(failed_host, this_group[j]));
2715 set<string> best_addresses =
2717 set<string>::const_iterator iter_ips = best_addresses.begin();
2718 for (; iter_ips != best_addresses.end(); ++iter_ips) {
2720 infos.push_back(
ProxyInfo(hosts[num_proxy], url_ip));
2728 opt_num_proxies_ += infos.size();
2731 "(manager '%s') installed %u proxies in %lu load-balance groups",
2751 unsigned *current_group,
2752 unsigned *fallback_group)
2754 assert(proxy_chain != NULL);
2759 vector< vector<ProxyInfo> > empty_chain;
2760 *proxy_chain = empty_chain;
2761 if (current_group != NULL)
2763 if (fallback_group != NULL)
2764 *fallback_group = 0;
2769 if (current_group != NULL)
2771 if (fallback_group != NULL)
2791 uint32_t key = (hash ? hash->
Partial32() : 0);
2792 map<uint32_t, ProxyInfo *>::iterator it =
opt_proxy_map_.lower_bound(key);
2813 const uint32_t max_key = 0xffffffffUL;
2816 for (
unsigned i = 0; i < num_alive; ++i) {
2823 const std::pair<uint32_t, ProxyInfo *> entry(prng.
Next(max_key), proxy);
2830 const std::pair<uint32_t, ProxyInfo *> last_entry(max_key, first_proxy);
2834 unsigned select =
prng_.
Next(num_alive);
2836 const std::pair<uint32_t, ProxyInfo *> entry(max_key, proxy);
2844 if (new_proxy != old_proxy) {
2846 "(manager '%s') switching proxy from %s to %s. Reason: %s",
2847 name_.c_str(), (old_proxy.empty() ?
"(none)" : old_proxy.c_str()),
2848 (new_proxy.empty() ?
"(none)" : new_proxy.c_str()),
2895 std::string msg =
"switch to proxy group " +
2921 const unsigned backoff_init_ms,
2922 const unsigned backoff_max_ms)
2938 const std::string &direct,
2939 const std::string &forced)
2973 bool success =
false;
2977 "Proposed sharding policy does not exist. Falling back to default",
unsigned opt_timeout_direct_
std::vector< std::string > http_tracing_headers_
bool StripDirect(const std::string &proxy_list, std::string *cleaned_list)
unsigned opt_low_speed_limit_
void HashString(const std::string &content, Any *any_digest)
static const unsigned kDnsDefaultTimeoutMs
bool ignore_signature_failures_
std::vector< T > Shuffle(const std::vector< T > &input, Prng *prng)
unsigned throttle() const
z_stream * GetZstreamPtr()
unsigned opt_backoff_init_ms_
void SetInfoHeader(char *info_header)
bool enable_http_tracing_
shash::ContextPtr * GetHashContextPtr()
int64_t Xadd(class Counter *counter, const int64_t delta)
const char * Code2Ascii(const Failures error)
unsigned opt_proxy_groups_current_burned_
double DiffTimeSeconds(struct timeval start, struct timeval end)
unsigned opt_proxy_groups_reset_after_
virtual bool IsCanceled()
void SetUrlOptions(JobInfo *info)
SharedPtr< ShardingPolicy > sharding_policy_
StreamStates DecompressZStream2Sink(const void *buf, const int64_t size, z_stream *strm, cvmfs::Sink *sink)
void ResolveMany(const std::vector< std::string > &names, std::vector< Host > *hosts)
std::string opt_proxy_fallback_list_
void SetHostChain(const std::string &host_list)
static NormalResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
unsigned opt_host_reset_after_
void SetLowSpeedLimit(const unsigned low_speed_limit)
DownloadManager(const unsigned max_pool_handles, const perf::StatisticsTemplate &statistics, const std::string &name="standard")
std::string proxy_template_direct_
curl_slist ** GetHeadersPtr()
static const int kProbeGeo
string Trim(const string &raw, bool trim_newline)
string ReplaceAll(const string &haystack, const string &needle, const string &replace_by)
void set_min_ttl(unsigned seconds)
string JoinStrings(const vector< string > &strings, const string &joint)
std::string ToString(const bool with_suffix=false) const
unsigned opt_proxy_groups_current_
virtual bool RequiresReserve()=0
bool ValidateGeoReply(const std::string &reply_order, const unsigned expected_size, std::vector< uint64_t > *reply_vals)
std::vector< ProxyInfo > * current_proxy_group() const
void DecompressInit(z_stream *strm)
const std::string * url() const
time_t opt_timestamp_backup_proxies_
void SetProxyChain(const std::string &proxy_list, const std::string &fallback_proxy_list, const ProxySetModes set_mode)
std::string GetProxyList()
bool allow_failure() const
std::set< CURL * > * pool_handles_inuse_
CURL * curl_handle() const
pthread_mutex_t * lock_options_
ProxyInfo * ChooseProxyUnlocked(const shash::Any *hash)
pthread_t thread_download_
std::string opt_proxy_list_
const std::string & name() const
perf::Counter * sz_transfer_time
void SetTracingHeaderGid(char *tracing_header_gid)
std::vector< std::vector< ProxyInfo > > * opt_proxy_groups_
assert((mem||(size==0))&&"Out Of Memory")
void SetNocache(bool nocache)
unsigned opt_proxy_groups_fallback_
void ReleaseCurlHandle(CURL *handle)
void SetTracingHeaderPid(char *tracing_header_pid)
void set_max_ttl(unsigned seconds)
Tube< DataTubeElement > * GetDataTubePtr()
char * tracing_header_gid() const
const std::set< std::string > & ViewBestAddresses(IpPreference preference) const
char * info_header() const
void SetDnsServer(const std::string &address)
DownloadManager * Clone(const perf::StatisticsTemplate &statistics, const std::string &cloned_name)
bool force_nocache() const
std::string StringifyUint(const uint64_t value)
void DecompressFini(z_stream *strm)
virtual std::string Describe()=0
static void * MainDownload(void *data)
void InitSeed(const uint64_t seed)
void SetTimeout(const unsigned seconds_proxy, const unsigned seconds_direct)
std::string opt_dns_server_
uint32_t watch_fds_inuse_
off_t range_offset() const
unsigned char num_used_hosts() const
bool follow_redirects() const
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
void Init(ContextPtr context)
bool FileExists(const std::string &path)
Pipe< kPipeDownloadJobsResults > * GetPipeJobResultPtr()
void SetDnsTtlLimits(const unsigned min_seconds, const unsigned max_seconds)
static Failures PrepareDownloadDestination(JobInfo *info)
void SetHttpCode(int http_code)
std::vector< std::string > opt_proxy_urls_
const char * Code2Ascii(const Failures error)
uint32_t pool_max_handles_
bool head_request() const
unsigned char num_retries() const
bool IsValidPipeJobResults()
void GetProxyInfo(std::vector< std::vector< ProxyInfo > > *proxy_chain, unsigned *current_group, unsigned *fallback_group)
void SetProxyGroupResetDelay(const unsigned seconds)
atomic_int32 multi_threaded_
void SetCurrentHostChainIndex(unsigned int current_host_chain_index)
std::string AddDefaultScheme(const std::string &proxy)
vector< string > SplitString(const string &str, char delim)
cvmfs::Sink * sink() const
dns::NormalResolver * resolver_
bool Interrupted(const std::string &fqrn, JobInfo *info)
std::string proxy() const
dns::IpPreference opt_ip_preference_
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
unsigned int current_host_chain_index() const
void UseSystemCertificatePath()
bool SetShardingPolicy(const ShardingPolicySelector type)
perf::Counter * n_host_failover
void UpdateProxiesUnlocked(const std::string &reason)
bool IsEquivalent(const Host &other) const
bool IsProxyTransferError(const Failures error)
void SetNumRetries(unsigned char num_retries)
void set_throttle(const unsigned throttle)
InterruptCue * interrupt_cue() const
void SetIpPreference(const dns::IpPreference preference)
bool failover_indefinitely_
shash::ContextPtr hash_context() const
perf::Counter * n_requests
void Final(ContextPtr context, Any *any_digest)
void SetRetryParameters(const unsigned max_retries, const unsigned backoff_init_ms, const unsigned backoff_max_ms)
string StringifyInt(const int64_t value)
char * tracing_header_uid() const
Failures error_code() const
void CloneProxyConfig(DownloadManager *clone)
void SetMaxIpaddrPerProxy(unsigned limit)
void EnableIgnoreSignatureFailures()
CURL * AcquireCurlHandle()
void Inc(class Counter *counter)
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
std::string ExtractHost(const std::string &url)
std::vector< int > * opt_host_chain_rtt_
SslCertificateStore ssl_certificate_store_
time_t opt_timestamp_backup_host_
std::string GetFallbackProxyList()
uint32_t Partial32() const
void SetProxyTemplates(const std::string &direct, const std::string &forced)
unsigned opt_backoff_max_ms_
void SetErrorCode(Failures error_code)
void SetNumUsedProxies(unsigned char num_used_proxies)
unsigned GetContextSize(const Algorithms algorithm)
std::string GetDnsServer() const
unsigned opt_num_proxies_
unsigned opt_host_chain_current_
CredentialsAttachment * credentials_attachment_
std::vector< std::string > * opt_host_chain_
struct pollfd * watch_fds_
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
std::map< uint32_t, ProxyInfo * > opt_proxy_map_
uint64_t String2Uint64(const string &value)
UniquePtr< Pipe< kPipeDownloadJobs > > pipe_jobs_
void UseSystemCertificatePath()
void CreatePipeJobResults()
Failures Fetch(JobInfo *info)
void SetCurlHandle(CURL *curl_handle)
unsigned opt_max_retries_
void SetTracingHeaderUid(char *tracing_header_uid)
curl_slist * headers() const
const shash::Any * expected_hash() const
perf::Counter * n_proxy_failover
void GetTimeout(unsigned *seconds_proxy, unsigned *seconds_direct)
void SetCredData(void *cred_data)
void SetFailoverIndefinitely()
std::string proxy_template_forced_
time_t opt_timestamp_failover_proxies_
void SetDnsParameters(const unsigned retries, const unsigned timeout_ms)
unsigned EscapeHeader(const std::string &header, char *escaped_buf, size_t buf_size)
const std::string * extra_info() const
static Host ExtendDeadline(const Host &original, unsigned seconds_from_now)
UniquePtr< Pipe< kPipeThreadTerminator > > pipe_terminate_
void SetProxy(const std::string &proxy)
static const int kProbeUnprobed
unsigned backoff_ms() const
unsigned char num_used_proxies() const
void SafeSleepMs(const unsigned ms)
bool IsHostTransferError(const Failures error)
static const unsigned kDnsDefaultRetries
void SortTeam(std::vector< T > *tractor, std::vector< U > *towed)
bool GeoSortServers(std::vector< std::string > *servers, std::vector< uint64_t > *output_order=NULL)
virtual bool Reserve(size_t size)=0
static const int kProbeDown
void SetNumUsedHosts(unsigned char num_used_hosts)
void SetHeaders(curl_slist *headers)
static const unsigned kProxyMapScale
void GetHostInfo(std::vector< std::string > *host_chain, std::vector< int > *rtt, unsigned *current_host)
unsigned opt_timeout_proxy_
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
char * tracing_header_pid() const
void SetBackoffMs(unsigned backoff_ms)
SharedPtr< HealthCheck > health_check_
void SwitchProxy(JobInfo *info)
void AddHTTPTracingHeader(const std::string &header)
void SetCredentialsAttachment(CredentialsAttachment *ca)
void SetFollowRedirects(bool follow_redirects)
void RebalanceProxiesUnlocked(const std::string &reason)
pthread_mutex_t * lock_synchronous_mode_
virtual bool SetResolvers(const std::vector< std::string > &resolvers)
void InitializeRequest(JobInfo *info, CURL *handle)
static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp)
string RewriteUrl(const string &url, const string &ip)
void SetHostResetDelay(const unsigned seconds)
uint32_t Next(const uint64_t boundary)
virtual int64_t Write(const void *buf, uint64_t sz)=0
unsigned timeout_ms() const
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)