29 #define __STDC_FORMAT_MACROS
31 #include "cvmfs_config.h"
94 std::string pause_file = std::string(
"/var/run/cvmfs/interrupt.") + fqrn;
97 "Interrupted(): checking for existence of %s", pause_file.c_str());
100 "Interrupting current download, this will EIO outstanding IO.");
101 if (0 != unlink(pause_file.c_str())) {
103 "Couldn't delete interrupt marker: errno=%d", errno);
116 " (errno=%d).", psink->
path().c_str(),
117 strerror(errno), errno);
136 const size_t num_bytes = size*nmemb;
137 const string header_line(static_cast<const char *>(ptr), num_bytes);
144 if (
HasPrefix(header_line,
"HTTP/1.",
false)) {
145 if (header_line.length() < 10) {
150 for (i = 8; (i < header_line.length()) && (header_line[i] ==
' '); ++i) {}
153 if (header_line.length() > i+2) {
154 info->
SetHttpCode(DownloadManager::ParseHttpCode(&header_line[i]));
166 header_line.c_str());
171 header_line.c_str());
197 HasPrefix(header_line,
"CONTENT-LENGTH:",
true))
199 char *tmp =
reinterpret_cast<char *
>(alloca(num_bytes+1));
201 sscanf(header_line.c_str(),
"%s %" PRIu64, tmp, &length);
205 "resource %s too large to store in memory (%" PRIu64
")",
206 info->
url()->c_str(), length);
214 }
else if (
HasPrefix(header_line,
"LOCATION:",
true)) {
217 }
else if (
HasPrefix(header_line,
"X-SQUID-ERROR:",
true)) {
222 }
else if (
HasPrefix(header_line,
"PROXY-STATUS:",
true)) {
225 (header_line.find(
"error=") != string::npos)) {
240 const size_t num_bytes = size*nmemb;
264 info->
url()->c_str());
269 "decompressing %s, local IO error", info->
url()->c_str());
274 int64_t written = info->
sink()->
Write(ptr, num_bytes);
275 if (written < 0 || static_cast<uint64_t>(written) != num_bytes) {
277 "Failed to perform write of %zu bytes to sink %s with errno %d",
286 static int CallbackCurlDebug(
293 const char *prefix =
"";
298 case CURLINFO_HEADER_IN:
299 prefix =
"{header/in} ";
301 case CURLINFO_HEADER_OUT:
302 prefix =
"{header/out} ";
304 case CURLINFO_DATA_IN:
307 case CURLINFO_DATA_OUT:
310 case CURLINFO_SSL_DATA_IN:
313 case CURLINFO_SSL_DATA_OUT:
320 std::string msg(data, size);
321 for (
size_t i = 0; i < msg.length(); ++i) {
327 prefix,
Trim(msg,
true ).c_str());
335 const int DownloadManager::kProbeUnprobed = -1;
336 const int DownloadManager::kProbeDown = -2;
337 const int DownloadManager::kProbeGeo = -3;
339 bool DownloadManager::EscapeUrlChar(
unsigned char input,
char output[3]) {
340 if (((input >=
'0') && (input <=
'9')) ||
341 ((input >=
'A') && (input <=
'Z')) ||
342 ((input >=
'a') && (input <=
'z')) ||
343 (input ==
'/') || (input ==
':') || (input ==
'.') ||
345 (input ==
'+') || (input ==
'-') ||
346 (input ==
'_') || (input ==
'~') ||
347 (input ==
'[') || (input ==
']') || (input ==
','))
349 output[0] =
static_cast<char>(input);
354 output[1] =
static_cast<char>(
355 (input / 16) + ((input / 16 <= 9) ?
'0' :
'A'-10));
356 output[2] =
static_cast<char>(
357 (input % 16) + ((input % 16 <= 9) ?
'0' :
'A'-10));
366 string DownloadManager::EscapeUrl(
const string &url) {
368 escaped.reserve(url.length());
370 char escaped_char[3];
371 for (
unsigned i = 0, s = url.length(); i < s; ++i) {
372 if (EscapeUrlChar(url[i], escaped_char)) {
373 escaped.append(escaped_char, 3);
375 escaped.push_back(escaped_char[0]);
379 url.c_str(), escaped.c_str());
388 unsigned DownloadManager::EscapeHeader(
const string &header,
392 unsigned esc_pos = 0;
393 char escaped_char[3];
394 for (
unsigned i = 0, s = header.size(); i < s; ++i) {
395 if (EscapeUrlChar(header[i], escaped_char)) {
396 for (
unsigned j = 0; j < 3; ++j) {
398 if (esc_pos >= buf_size)
400 escaped_buf[esc_pos] = escaped_char[j];
406 if (esc_pos >= buf_size)
408 escaped_buf[esc_pos] = escaped_char[0];
420 int DownloadManager::ParseHttpCode(
const char digits[3]) {
423 for (
int i = 0; i < 3; ++i) {
424 if ((digits[i] <
'0') || (digits[i] >
'9'))
426 result += (digits[i] -
'0') * factor;
436 int DownloadManager::CallbackCurlSocket(CURL * ,
445 if (action == CURL_POLL_NONE)
464 download_mgr->
watch_fds_ =
static_cast<struct pollfd *
>(
476 download_mgr->
watch_fds_[index].events = POLLIN | POLLPRI;
479 download_mgr->
watch_fds_[index].events = POLLOUT | POLLWRBAND;
481 case CURL_POLL_INOUT:
483 POLLIN | POLLPRI | POLLOUT | POLLWRBAND;
485 case CURL_POLL_REMOVE:
486 if (index < download_mgr->watch_fds_inuse_-1) {
498 download_mgr->
watch_fds_ =
static_cast<struct pollfd *
>(
516 void *DownloadManager::MainDownload(
void *data) {
520 const int kIdxPipeTerminate = 0;
521 const int kIdxPipeJobs = 1;
524 static_cast<struct pollfd *
>(smalloc(2 *
sizeof(
struct pollfd)));
526 download_mgr->
watch_fds_[kIdxPipeTerminate].fd =
528 download_mgr->
watch_fds_[kIdxPipeTerminate].events = POLLIN | POLLPRI;
529 download_mgr->
watch_fds_[kIdxPipeTerminate].revents = 0;
532 download_mgr->
watch_fds_[kIdxPipeJobs].events = POLLIN | POLLPRI;
533 download_mgr->
watch_fds_[kIdxPipeJobs].revents = 0;
536 int still_running = 0;
537 struct timeval timeval_start, timeval_stop;
538 gettimeofday(&timeval_start, NULL);
554 gettimeofday(&timeval_stop, NULL);
555 int64_t delta =
static_cast<int64_t
>(
567 curl_multi_socket_action(download_mgr->
curl_multi_,
574 if (download_mgr->
watch_fds_[kIdxPipeTerminate].revents)
578 if (download_mgr->
watch_fds_[kIdxPipeJobs].revents) {
579 download_mgr->
watch_fds_[kIdxPipeJobs].revents = 0;
582 if (!still_running) {
583 gettimeofday(&timeval_start, NULL);
588 curl_multi_add_handle(download_mgr->
curl_multi_, handle);
589 curl_multi_socket_action(download_mgr->
curl_multi_,
606 if (download_mgr->
watch_fds_[i].revents & (POLLIN | POLLPRI))
607 ev_bitmask |= CURL_CSELECT_IN;
608 if (download_mgr->
watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
609 ev_bitmask |= CURL_CSELECT_OUT;
611 (POLLERR | POLLHUP | POLLNVAL))
613 ev_bitmask |= CURL_CSELECT_ERR;
617 curl_multi_socket_action(download_mgr->
curl_multi_,
627 while ((curl_msg = curl_multi_info_read(download_mgr->
curl_multi_,
630 if (curl_msg->msg == CURLMSG_DONE) {
633 CURL *easy_handle = curl_msg->easy_handle;
634 int curl_error = curl_msg->data.result;
635 curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
637 curl_multi_remove_handle(download_mgr->
curl_multi_, easy_handle);
639 curl_multi_add_handle(download_mgr->
curl_multi_, easy_handle);
640 curl_multi_socket_action(download_mgr->
curl_multi_,
649 Write<download::Failures>(info->
error_code());
658 curl_multi_remove_handle(download_mgr->
curl_multi_, *i);
659 curl_easy_cleanup(*i);
672 HeaderLists::~HeaderLists() {
673 for (
unsigned i = 0; i < blocks_.size(); ++i) {
680 curl_slist *HeaderLists::GetList(
const char *header) {
685 curl_slist *HeaderLists::DuplicateList(curl_slist *slist) {
687 curl_slist *copy = GetList(slist->data);
688 copy->next = slist->next;
689 curl_slist *prev = copy;
692 curl_slist *new_link = Get(slist->data);
693 new_link->next = slist->next;
694 prev->next = new_link;
702 void HeaderLists::AppendHeader(curl_slist *slist,
const char *header) {
704 curl_slist *new_link = Get(header);
705 new_link->next = NULL;
709 slist->next = new_link;
718 void HeaderLists::CutHeader(
const char *header, curl_slist **slist) {
722 curl_slist *prev = &head;
723 curl_slist *rover = *slist;
725 if (strcmp(rover->data, header) == 0) {
726 prev->next = rover->next;
737 void HeaderLists::PutList(curl_slist *slist) {
739 curl_slist *next = slist->next;
746 string HeaderLists::Print(curl_slist *slist) {
749 verbose += string(slist->data) +
"\n";
756 curl_slist *HeaderLists::Get(
const char *header) {
757 for (
unsigned i = 0; i < blocks_.size(); ++i) {
758 for (
unsigned j = 0; j < kBlockSize; ++j) {
759 if (!IsUsed(&(blocks_[i][j]))) {
760 blocks_[i][j].data =
const_cast<char *
>(header);
761 return &(blocks_[i][j]);
768 blocks_[blocks_.size()-1][0].data =
const_cast<char *
>(header);
769 return &(blocks_[blocks_.size()-1][0]);
773 void HeaderLists::Put(curl_slist *slist) {
779 void HeaderLists::AddBlock() {
780 curl_slist *new_block =
new curl_slist[kBlockSize];
781 for (
unsigned i = 0; i < kBlockSize; ++i) {
784 blocks_.push_back(new_block);
791 string DownloadManager::ProxyInfo::Print() {
797 static_cast<int>(host.deadline()) - static_cast<int>(time(NULL));
798 string expinfo = (remaining >= 0) ?
"+" :
"";
799 if (abs(remaining) >= 3600) {
801 }
else if (abs(remaining) >= 60) {
807 result +=
" (" + host.name() +
", " + expinfo +
")";
809 result +=
" (:unresolved:, " + expinfo +
")";
819 CURL *DownloadManager::AcquireCurlHandle() {
822 if (pool_handles_idle_->empty()) {
824 handle = curl_easy_init();
827 curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
832 handle = *(pool_handles_idle_->begin());
833 pool_handles_idle_->erase(pool_handles_idle_->begin());
836 pool_handles_inuse_->insert(handle);
842 void DownloadManager::ReleaseCurlHandle(CURL *handle) {
843 set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
844 assert(elem != pool_handles_inuse_->end());
846 if (pool_handles_idle_->size() > pool_max_handles_) {
847 curl_easy_cleanup(*elem);
849 pool_handles_idle_->insert(*elem);
852 pool_handles_inuse_->erase(elem);
860 void DownloadManager::InitializeRequest(
JobInfo *info, CURL *handle) {
870 info->
SetHeaders(header_lists_->DuplicateList(default_headers_));
874 if (enable_http_tracing_) {
875 for (
unsigned int i = 0; i < http_tracing_headers_.size(); i++) {
876 header_lists_->AppendHeader(info->
headers(),
877 (http_tracing_headers_)[i].c_str());
885 info->
url()->c_str(), header_lists_->Print(info->
headers()).c_str());
902 char byte_range_array[100];
903 const int64_t range_lower =
static_cast<int64_t
>(info->
range_offset());
904 const int64_t range_upper =
static_cast<int64_t
>(
906 if (snprintf(byte_range_array,
sizeof(byte_range_array),
907 "%" PRId64
"-%" PRId64,
908 range_lower, range_upper) == 100)
912 curl_easy_setopt(handle, CURLOPT_RANGE, byte_range_array);
914 curl_easy_setopt(handle, CURLOPT_RANGE, NULL);
918 curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
919 curl_easy_setopt(handle, CURLOPT_WRITEHEADER,
920 static_cast<void *>(info));
921 curl_easy_setopt(handle, CURLOPT_WRITEDATA, static_cast<void *>(info));
922 curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->
headers());
924 curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
926 curl_easy_setopt(handle, CURLOPT_HTTPGET, 1);
928 if (opt_ipv4_only_) {
929 curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
931 if (follow_redirects_) {
932 curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1);
933 curl_easy_setopt(handle, CURLOPT_MAXREDIRS, 4);
936 curl_easy_setopt(handle, CURLOPT_VERBOSE, 1);
937 curl_easy_setopt(handle, CURLOPT_DEBUGFUNCTION, CallbackCurlDebug);
946 void DownloadManager::SetUrlOptions(
JobInfo *info) {
953 if (sharding_policy_.UseCount() > 0) {
954 if (info->
proxy() !=
"") {
961 curl_easy_setopt(info->
curl_handle(), CURLOPT_PROXY, info->
proxy().c_str());
964 if (opt_timestamp_backup_proxies_ > 0) {
965 const time_t now = time(NULL);
966 if (static_cast<int64_t>(now) >
967 static_cast<int64_t>(opt_timestamp_backup_proxies_ +
968 opt_proxy_groups_reset_after_))
970 opt_proxy_groups_current_ = 0;
971 opt_timestamp_backup_proxies_ = 0;
972 RebalanceProxiesUnlocked(
"reset proxy group");
976 if (opt_timestamp_failover_proxies_ > 0) {
977 const time_t now = time(NULL);
978 if (static_cast<int64_t>(now) >
979 static_cast<int64_t>(opt_timestamp_failover_proxies_ +
980 opt_proxy_groups_reset_after_))
982 RebalanceProxiesUnlocked(
"reset load-balanced proxies");
986 if (opt_timestamp_backup_host_ > 0) {
987 const time_t now = time(NULL);
988 if (static_cast<int64_t>(now) >
989 static_cast<int64_t>(opt_timestamp_backup_host_ +
990 opt_host_reset_after_))
993 "switching host from %s to %s (reset host)",
994 (*opt_host_chain_)[opt_host_chain_current_].c_str(),
995 (*opt_host_chain_)[0].c_str());
996 opt_host_chain_current_ = 0;
997 opt_timestamp_backup_host_ = 0;
1002 if (!proxy || (proxy->
url ==
"DIRECT")) {
1004 curl_easy_setopt(info->
curl_handle(), CURLOPT_PROXY,
"");
1009 std::string purl = proxy->
url;
1011 const bool changed = ValidateProxyIpsUnlocked(purl, phost);
1018 curl_easy_setopt(info->
curl_handle(), CURLOPT_PROXY,
1019 info->
proxy().c_str());
1022 curl_easy_setopt(info->
curl_handle(), CURLOPT_PROXY,
"0.0.0.0");
1027 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, opt_low_speed_limit_);
1028 if (info->
proxy() !=
"DIRECT") {
1029 curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_proxy_);
1030 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_proxy_);
1032 curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_direct_);
1033 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, opt_timeout_direct_);
1035 if (!opt_dns_server_.empty())
1036 curl_easy_setopt(curl_handle, CURLOPT_DNS_SERVERS, opt_dns_server_.c_str());
1039 url_prefix = (*opt_host_chain_)[opt_host_chain_current_];
1043 string url = url_prefix + *(info->
url());
1045 curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 1L);
1046 if (url.substr(0, 5) ==
"https") {
1047 bool rvb = ssl_certificate_store_.ApplySslCertificatePath(curl_handle);
1050 "Failed to set SSL certificate path %s",
1051 ssl_certificate_store_.GetCaPath().c_str());
1053 if (info->
pid() != -1) {
1054 if (credentials_attachment_ == NULL) {
1056 "uses secure downloads but no credentials attachment set");
1058 bool retval = credentials_attachment_->ConfigureCurlHandle(
1069 signal(SIGPIPE, SIG_IGN);
1072 if (url.find(
"@proxy@") != string::npos) {
1080 if (proxy_template_forced_ !=
"") {
1081 replacement = proxy_template_forced_;
1082 }
else if (info->
proxy() ==
"DIRECT") {
1083 replacement = proxy_template_direct_;
1085 if (opt_proxy_groups_current_ >= opt_proxy_groups_fallback_) {
1089 curl_easy_setopt(info->
curl_handle(), CURLOPT_PROXY,
"");
1090 replacement = proxy_template_direct_;
1092 replacement = ChooseProxyUnlocked(info->
expected_hash())->host.name();
1095 replacement = (replacement ==
"") ? proxy_template_direct_ : replacement;
1097 replacement.c_str());
1098 url =
ReplaceAll(url,
"@proxy@", replacement);
1119 curl_easy_setopt(curl_handle, CURLOPT_URL, EscapeUrl(url).c_str());
1132 bool DownloadManager::ValidateProxyIpsUnlocked(
1139 host.
name().c_str());
1141 unsigned group_idx = opt_proxy_groups_current_;
1144 bool update_only =
true;
1148 "failed to resolve IP addresses for %s (%d - %s)",
1153 update_only =
false;
1157 for (
unsigned i = 0; i < (*opt_proxy_groups_)[group_idx].size(); ++i) {
1158 if ((*opt_proxy_groups_)[group_idx][i].host.id() == host.
id())
1159 (*opt_proxy_groups_)[group_idx][i].host = new_host;
1168 "DNS entries for proxy %s changed, adjusting", host.
name().c_str());
1169 vector<ProxyInfo> *group = current_proxy_group();
1170 opt_num_proxies_ -= group->size();
1171 for (
unsigned i = 0; i < group->size(); ) {
1172 if ((*group)[i].host.id() == host.
id()) {
1173 group->erase(group->begin() + i);
1178 vector<ProxyInfo> new_infos;
1180 set<string>::const_iterator iter_ips = best_addresses.begin();
1181 for (; iter_ips != best_addresses.end(); ++iter_ips) {
1183 new_infos.push_back(
ProxyInfo(new_host, url_ip));
1185 group->insert(group->end(), new_infos.begin(), new_infos.end());
1186 opt_num_proxies_ += new_infos.size();
1188 RebalanceProxiesUnlocked(
"DNS change");
1196 void DownloadManager::UpdateStatistics(CURL *handle) {
1201 retval = curl_easy_getinfo(handle, CURLINFO_SIZE_DOWNLOAD, &val);
1202 assert(retval == CURLE_OK);
1203 sum +=
static_cast<int64_t
>(val);
1207 perf::Xadd(counters_->sz_transferred_bytes, sum);
1214 bool DownloadManager::CanRetry(
const JobInfo *info) {
1216 unsigned max_retries = opt_max_retries_;
1231 unsigned backoff_init_ms = 0;
1232 unsigned backoff_max_ms = 0;
1235 backoff_init_ms = opt_backoff_init_ms_;
1236 backoff_max_ms = opt_backoff_max_ms_;
1258 header_lists_->AppendHeader(info->
headers(),
"Pragma: no-cache");
1259 header_lists_->AppendHeader(info->
headers(),
"Cache-Control: no-cache");
1269 void DownloadManager::SetRegularCache(
JobInfo *info) {
1272 header_lists_->CutHeader(
"Pragma: no-cache", info->
GetHeadersPtr());
1273 header_lists_->CutHeader(
"Cache-Control: no-cache", info->
GetHeadersPtr());
1282 void DownloadManager::ReleaseCredential(
JobInfo *info) {
1284 assert(credentials_attachment_ != NULL);
1285 credentials_attachment_->ReleaseCurlHandle(info->
curl_handle(),
1298 bool DownloadManager::VerifyAndFinalize(
const int curl_error,
JobInfo *info) {
1300 "Verify downloaded url %s, proxy %s (curl error %d)",
1301 info->
url()->c_str(), info->
proxy().c_str(), curl_error);
1305 switch (curl_error) {
1312 if (ignore_signature_failures_) {
1314 "ignoring failed hash verification of %s "
1315 "(expected %s, got %s)",
1316 info->
url()->c_str(),
1321 "hash verification of %s failed (expected %s, got %s)",
1322 info->
url()->c_str(),
1333 case CURLE_UNSUPPORTED_PROTOCOL:
1336 case CURLE_URL_MALFORMAT:
1339 case CURLE_COULDNT_RESOLVE_PROXY:
1342 case CURLE_COULDNT_RESOLVE_HOST:
1345 case CURLE_OPERATION_TIMEDOUT:
1349 case CURLE_PARTIAL_FILE:
1350 case CURLE_GOT_NOTHING:
1351 case CURLE_RECV_ERROR:
1355 case CURLE_FILE_COULDNT_READ_FILE:
1356 case CURLE_COULDNT_CONNECT:
1357 if (info->
proxy() !=
"DIRECT") {
1364 case CURLE_TOO_MANY_REDIRECTS:
1367 case CURLE_SSL_CACERT_BADFILE:
1369 "Failed to load certificate bundle. "
1370 "X509_CERT_BUNDLE might point to the wrong location.");
1375 case CURLE_PEER_FAILED_VERIFICATION:
1377 "invalid SSL certificate of remote host. "
1378 "X509_CERT_DIR and/or X509_CERT_BUNDLE might point to the wrong "
1382 case CURLE_ABORTED_BY_CALLBACK:
1383 case CURLE_WRITE_ERROR:
1386 case CURLE_SEND_ERROR:
1395 "trying to fetch %s", curl_error, info->
url()->c_str());
1400 std::vector<std::string> *host_chain = opt_host_chain_;
1403 bool try_again =
false;
1404 bool same_url_retry = CanRetry(info);
1413 "data corruption with no-cache header, try another host");
1418 if ( same_url_retry || (
1428 if ( same_url_retry || (
1434 if (sharding_policy_.UseCount() > 0) {
1436 same_url_retry =
false;
1447 if (opt_proxy_groups_) {
1448 if ((opt_proxy_groups_current_ > 0) ||
1449 (opt_proxy_groups_current_burned_ > 0))
1451 opt_proxy_groups_current_ = 0;
1452 opt_timestamp_backup_proxies_ = 0;
1453 RebalanceProxiesUnlocked(
"reset proxies for host failover");
1462 if (failover_indefinitely_) {
1466 "VerifyAndFinalize() would fail the download here. "
1467 "Instead switch proxy and retry download. "
1468 "info->probe_hosts=%d host_chain=%x info->num_used_hosts=%d "
1469 "host_chain->size()=%d same_url_retry=%d "
1470 "info->num_used_proxies=%d opt_num_proxies_=%d",
1474 host_chain->size() : -1,
static_cast<int>(same_url_retry),
1477 RebalanceProxiesUnlocked(
"failover indefinitely");
1490 "same url: %d, error code %d no-cache %d",
1495 goto verify_and_finalize_stop;
1499 goto verify_and_finalize_stop;
1509 if (sharding_policy_.UseCount() > 0) {
1510 ReleaseCredential(info);
1511 SetUrlOptions(info);
1513 SetRegularCache(info);
1516 bool switch_proxy =
false;
1517 bool switch_host =
false;
1524 switch_proxy =
true;
1533 if (same_url_retry) {
1536 switch_proxy =
true;
1539 if (same_url_retry) {
1550 ReleaseCredential(info);
1553 SetUrlOptions(info);
1556 ReleaseCredential(info);
1559 SetUrlOptions(info);
1563 if (failover_indefinitely_) {
1571 verify_and_finalize_stop:
1573 ReleaseCredential(info);
1582 header_lists_->PutList(info->
headers());
1589 DownloadManager::~DownloadManager() {
1591 if (sharding_policy_.UseCount() > 0) {
1592 sharding_policy_.Reset();
1594 if (health_check_.UseCount() > 0) {
1595 if (health_check_.Unique()) {
1597 health_check_->StopHealthcheck();
1599 health_check_.Reset();
1602 if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1605 pthread_join(thread_download_, NULL);
1607 pipe_terminate_.Destroy();
1608 pipe_jobs_.Destroy();
1611 for (set<CURL *>::iterator i = pool_handles_idle_->begin(),
1612 iEnd = pool_handles_idle_->end(); i != iEnd; ++i)
1614 curl_easy_cleanup(*i);
1617 delete pool_handles_idle_;
1618 delete pool_handles_inuse_;
1619 curl_multi_cleanup(curl_multi_);
1621 delete header_lists_;
1626 delete opt_host_chain_;
1627 delete opt_host_chain_rtt_;
1628 delete opt_proxy_groups_;
1630 curl_global_cleanup();
1634 pthread_mutex_destroy(lock_options_);
1635 pthread_mutex_destroy(lock_synchronous_mode_);
1636 free(lock_options_);
1637 free(lock_synchronous_mode_);
1640 void DownloadManager::InitHeaders() {
1642 string cernvm_id =
"User-Agent: cvmfs ";
1643 #ifdef CVMFS_LIBCVMFS
1644 cernvm_id +=
"libcvmfs ";
1646 cernvm_id +=
"Fuse ";
1648 cernvm_id += string(VERSION);
1649 if (getenv(
"CERNVM_UUID") != NULL) {
1653 user_agent_ = strdup(cernvm_id.c_str());
1657 default_headers_ = header_lists_->GetList(
"Connection: Keep-Alive");
1658 header_lists_->AppendHeader(default_headers_,
"Pragma:");
1659 header_lists_->AppendHeader(default_headers_, user_agent_);
1662 DownloadManager::DownloadManager(
const unsigned max_pool_handles,
1665 pool_handles_idle_(new set<CURL *>),
1666 pool_handles_inuse_(new set<CURL *>),
1667 pool_max_handles_(max_pool_handles),
1668 pipe_terminate_(NULL),
1672 watch_fds_inuse_(0),
1673 watch_fds_max_(4 * max_pool_handles),
1674 opt_timeout_proxy_(5),
1675 opt_timeout_direct_(10),
1676 opt_low_speed_limit_(1024),
1677 opt_max_retries_(0),
1678 opt_backoff_init_ms_(0),
1679 opt_backoff_max_ms_(0),
1680 enable_info_header_(false),
1681 opt_ipv4_only_(false),
1682 follow_redirects_(false),
1683 ignore_signature_failures_(false),
1684 enable_http_tracing_(false),
1685 opt_host_chain_(NULL),
1686 opt_host_chain_rtt_(NULL),
1687 opt_host_chain_current_(0),
1688 opt_proxy_groups_(NULL),
1689 opt_proxy_groups_current_(0),
1690 opt_proxy_groups_current_burned_(0),
1691 opt_proxy_groups_fallback_(0),
1692 opt_num_proxies_(0),
1693 opt_proxy_shard_(false),
1694 failover_indefinitely_(false),
1696 opt_timestamp_backup_proxies_(0),
1697 opt_timestamp_failover_proxies_(0),
1698 opt_proxy_groups_reset_after_(0),
1699 opt_timestamp_backup_host_(0),
1700 opt_host_reset_after_(0),
1701 credentials_attachment_(NULL),
1702 counters_(new
Counters(statistics))
1707 reinterpret_cast<pthread_mutex_t *
>(smalloc(
sizeof(pthread_mutex_t)));
1711 reinterpret_cast<pthread_mutex_t *
>(smalloc(
sizeof(pthread_mutex_t)));
1715 retval = curl_global_init(CURL_GLOBAL_ALL);
1716 assert(retval == CURLE_OK);
1723 curl_multi_setopt(
curl_multi_, CURLMOPT_SOCKETDATA,
1724 static_cast<void *>(
this));
1726 curl_multi_setopt(
curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1732 if ((getenv(
"CVMFS_IPV4_ONLY") != NULL) &&
1733 (strlen(getenv(
"CVMFS_IPV4_ONLY")) > 0))
1751 static_cast<void *>(
this));
1785 const char *header_name =
"cvmfs-info: ";
1786 const size_t header_name_len = strlen(header_name);
1787 const unsigned header_size = 1 + header_name_len +
1789 info->
SetInfoHeader(static_cast<char *>(alloca(header_size)));
1790 memcpy(info->
info_header(), header_name, header_name_len);
1792 header_size - header_name_len);
1797 const std::string str_pid =
"X-CVMFS-PID: " +
StringifyInt(info->
pid());
1829 retval = curl_easy_perform(handle);
1832 if (curl_easy_getinfo(handle, CURLINFO_TOTAL_TIME, &elapsed) == CURLE_OK)
1835 static_cast<int64_t>(elapsed * 1000));
1846 if (info->
sink() != NULL) {
1876 if (!address.empty()) {
1881 vector<string> servers;
1882 servers.push_back(address);
1895 const unsigned timeout_ms)
1912 const unsigned min_seconds,
1913 const unsigned max_seconds)
1933 const unsigned seconds_direct)
1956 unsigned *seconds_direct)
1980 if (host_list.empty()) {
2000 unsigned *current_host)
2028 const unsigned group_size = group->size();
2029 unsigned failed = 0;
2031 if (info && (info->
proxy() == (*group)[i].url)) {
2033 opt_proxy_groups_current_burned_++;
2035 (*group)[group_size - opt_proxy_groups_current_burned_]);
2047 if (opt_proxy_groups_current_burned_ == group->size()) {
2048 opt_proxy_groups_current_burned_ = 0;
2095 "don't switch host, "
2096 "last used host: %s, current host: %s",
2102 string reason =
"manually triggered";
2112 "switching host from %s to %s (%s)", old_host.c_str(),
2139 vector<string> host_chain;
2140 vector<int> host_rtt;
2141 unsigned current_host;
2143 GetHostInfo(&host_chain, &host_rtt, ¤t_host);
2150 JobInfo info(&url,
false,
false, NULL, &memsink);
2151 for (retries = 0; retries < 2; ++
retries) {
2152 for (i = 0; i < host_chain.size(); ++i) {
2153 url = host_chain[i] +
"/.cvmfspublished";
2155 struct timeval tv_start, tv_end;
2156 gettimeofday(&tv_start, NULL);
2158 gettimeofday(&tv_end, NULL);
2161 host_rtt[i] =
static_cast<int>(
2164 url.c_str(), host_rtt[i]);
2168 host_rtt[i] = INT_MAX;
2174 for (i = 0; i < host_chain.size(); ++i) {
2175 if (host_rtt[i] == INT_MAX) host_rtt[i] =
kProbeDown;
2187 std::vector<uint64_t> *output_order) {
2188 if (!servers) {
return false;}
2189 if (servers->size() == 1) {
2191 output_order->clear();
2192 output_order->push_back(0);
2197 std::vector<std::string> host_chain;
2200 std::vector<std::string> server_dns_names;
2201 server_dns_names.reserve(servers->size());
2202 for (
unsigned i = 0; i < servers->size(); ++i) {
2204 server_dns_names.push_back(host.empty() ? (*servers)[i] : host);
2206 std::string host_list =
JoinStrings(server_dns_names,
",");
2208 vector<string> host_chain_shuffled;
2216 bool success =
false;
2217 unsigned max_attempts = std::min(host_chain_shuffled.size(), size_t(3));
2218 vector<uint64_t> geo_order(servers->size());
2219 for (
unsigned i = 0; i < max_attempts; ++i) {
2220 string url = host_chain_shuffled[i] +
"/api/v1.0/geo/@proxy@/" + host_list;
2222 "requesting ordered server list from %s", url.c_str());
2224 JobInfo info(&url,
false,
false, NULL, &memsink);
2227 string order(reinterpret_cast<char*>(memsink.data()), memsink.pos());
2232 "retrieved invalid GeoAPI reply from %s [%s]",
2233 url.c_str(), order.c_str());
2236 "geographic order of servers retrieved from %s",
2244 "GeoAPI request %s failed with error %d [%s]",
2250 "failed to retrieve geographic order from stratum 1 servers");
2255 output_order->swap(geo_order);
2257 std::vector<std::string> sorted_servers;
2258 sorted_servers.reserve(geo_order.size());
2259 for (
unsigned i = 0; i < geo_order.size(); ++i) {
2260 uint64_t orderval = geo_order[i];
2261 sorted_servers.push_back((*servers)[orderval]);
2263 servers->swap(sorted_servers);
2277 vector<string> host_chain;
2278 vector<int> host_rtt;
2279 unsigned current_host;
2280 vector< vector<ProxyInfo> > proxy_chain;
2281 unsigned fallback_group;
2283 GetHostInfo(&host_chain, &host_rtt, ¤t_host);
2285 if ((host_chain.size() < 2) && ((proxy_chain.size() - fallback_group) < 2))
2288 vector<string> host_names;
2289 for (
unsigned i = 0; i < host_chain.size(); ++i)
2291 SortTeam(&host_names, &host_chain);
2292 unsigned last_geo_host = host_names.size();
2294 if ((fallback_group == 0) && (last_geo_host > 1)) {
2300 host_names.push_back(
"+PXYSEP+");
2304 unsigned first_geo_fallback = host_names.size();
2305 for (
unsigned i = fallback_group; i < proxy_chain.size(); ++i) {
2308 host_names.push_back(proxy_chain[i][0].host.name());
2311 std::vector<uint64_t> geo_order;
2326 vector<vector<ProxyInfo> > *proxy_groups =
new vector<vector<ProxyInfo> >(
2330 (*proxy_groups)[i] = (*opt_proxy_groups_)[i];
2340 for (
unsigned i = 0; i < geo_order.size(); ++i) {
2341 uint64_t orderval = geo_order[i];
2342 if (orderval < static_cast<uint64_t>(last_geo_host)) {
2345 (*opt_host_chain_)[hosti++] = host_chain[orderval];
2346 }
else if (orderval >= static_cast<uint64_t>(first_geo_fallback)) {
2350 (*proxy_groups)[proxyi] =
2351 proxy_chain[fallback_group + orderval - first_geo_fallback];
2389 const string &reply_order,
2390 const unsigned expected_size,
2391 vector<uint64_t> *reply_vals)
2393 if (reply_order.empty())
2396 if (!sanitizer.
IsValid(reply_order))
2399 vector<string> reply_strings =
2401 vector<uint64_t> tmp_vals;
2402 for (
unsigned i = 0; i < reply_strings.size(); ++i) {
2403 if (reply_strings[i].empty())
2407 if (tmp_vals.size() != expected_size)
2411 set<uint64_t> coverage(tmp_vals.begin(), tmp_vals.end());
2412 if (coverage.size() != tmp_vals.size())
2414 if ((*coverage.begin() != 1) || (*coverage.rbegin() != coverage.size()))
2417 for (
unsigned i = 0; i < expected_size; ++i) {
2418 (*reply_vals)[i] = tmp_vals[i] - 1;
2429 const string &proxy_list,
2430 string *cleaned_list)
2433 if (proxy_list ==
"") {
2437 bool result =
false;
2439 vector<string> proxy_groups =
SplitString(proxy_list,
';');
2440 vector<string> cleaned_groups;
2441 for (
unsigned i = 0; i < proxy_groups.size(); ++i) {
2442 vector<string> group =
SplitString(proxy_groups[i],
'|');
2443 vector<string> cleaned;
2444 for (
unsigned j = 0; j < group.size(); ++j) {
2445 if ((group[j] ==
"DIRECT") || (group[j] ==
"")) {
2448 cleaned.push_back(group[j]);
2451 if (!cleaned.empty())
2452 cleaned_groups.push_back(
JoinStrings(cleaned,
"|"));
2469 const string &proxy_list,
2470 const string &fallback_proxy_list,
2479 bool contains_direct;
2488 if (contains_direct) {
2490 "fallback proxies do not support DIRECT, removing");
2492 if (set_proxy_fallback_list ==
"") {
2496 if (contains_direct) {
2498 "skipping DIRECT proxy to use fallback proxy");
2507 if ((set_proxy_list ==
"") && (set_proxy_fallback_list ==
"")) {
2518 if (set_proxy_list !=
"") {
2526 string all_proxy_list = set_proxy_list;
2527 if (set_proxy_fallback_list !=
"") {
2528 if (all_proxy_list !=
"")
2529 all_proxy_list +=
";";
2530 all_proxy_list += set_proxy_fallback_list;
2533 all_proxy_list.c_str());
2536 vector<string> hostnames;
2537 vector<string> proxy_groups;
2538 if (all_proxy_list !=
"")
2540 for (
unsigned i = 0; i < proxy_groups.size(); ++i) {
2541 vector<string> this_group =
SplitString(proxy_groups[i],
'|');
2542 for (
unsigned j = 0; j < this_group.size(); ++j) {
2548 hostnames.push_back(hostname);
2551 vector<dns::Host> hosts;
2560 unsigned num_proxy = 0;
2561 for (
unsigned i = 0; i < proxy_groups.size(); ++i) {
2562 vector<string> this_group =
SplitString(proxy_groups[i],
'|');
2566 vector<ProxyInfo> infos;
2567 for (
unsigned j = 0; j < this_group.size(); ++j, ++num_proxy) {
2569 if (this_group[j] ==
"DIRECT") {
2576 "failed to resolve IP addresses for %s (%d - %s)",
2577 hosts[num_proxy].name().c_str(), hosts[num_proxy].status(),
2581 infos.push_back(
ProxyInfo(failed_host, this_group[j]));
2586 set<string> best_addresses =
2588 set<string>::const_iterator iter_ips = best_addresses.begin();
2589 for (; iter_ips != best_addresses.end(); ++iter_ips) {
2591 infos.push_back(
ProxyInfo(hosts[num_proxy], url_ip));
2599 opt_num_proxies_ += infos.size();
2602 "installed %u proxies in %u load-balance groups",
2622 unsigned *current_group,
2623 unsigned *fallback_group)
2625 assert(proxy_chain != NULL);
2630 vector< vector<ProxyInfo> > empty_chain;
2631 *proxy_chain = empty_chain;
2632 if (current_group != NULL)
2634 if (fallback_group != NULL)
2635 *fallback_group = 0;
2640 if (current_group != NULL)
2642 if (fallback_group != NULL)
2662 uint32_t key = (hash ? hash->
Partial32() : 0);
2663 map<uint32_t, ProxyInfo *>::iterator it =
opt_proxy_map_.lower_bound(key);
2684 const uint32_t max_key = 0xffffffffUL;
2687 for (
unsigned i = 0; i < num_alive; ++i) {
2694 const std::pair<uint32_t, ProxyInfo *> entry(prng.
Next(max_key), proxy);
2701 const std::pair<uint32_t, ProxyInfo *> last_entry(max_key, first_proxy);
2705 unsigned select =
prng_.
Next(num_alive);
2707 const std::pair<uint32_t, ProxyInfo *> entry(max_key, proxy);
2715 if (new_proxy != old_proxy) {
2717 "switching proxy from %s to %s (%s)",
2718 (old_proxy.empty() ?
"(none)" : old_proxy.c_str()),
2719 (new_proxy.empty() ?
"(none)" : new_proxy.c_str()),
2789 const unsigned backoff_init_ms,
2790 const unsigned backoff_max_ms)
2806 const std::string &direct,
2807 const std::string &forced)
2841 bool success =
false;
2845 "Proposed sharding policy does not exist. Falling back to default");
unsigned opt_timeout_direct_
std::vector< std::string > http_tracing_headers_
bool StripDirect(const std::string &proxy_list, std::string *cleaned_list)
unsigned opt_low_speed_limit_
void HashString(const std::string &content, Any *any_digest)
#define LogCvmfs(source, mask,...)
static const unsigned kDnsDefaultTimeoutMs
bool ignore_signature_failures_
std::vector< T > Shuffle(const std::vector< T > &input, Prng *prng)
unsigned throttle() const
z_stream * GetZstreamPtr()
unsigned opt_backoff_init_ms_
void SetInfoHeader(char *info_header)
bool enable_http_tracing_
shash::ContextPtr * GetHashContextPtr()
int64_t Xadd(class Counter *counter, const int64_t delta)
const char * Code2Ascii(const Failures error)
unsigned opt_proxy_groups_current_burned_
double DiffTimeSeconds(struct timeval start, struct timeval end)
unsigned opt_proxy_groups_reset_after_
virtual bool IsCanceled()
void SetUrlOptions(JobInfo *info)
SharedPtr< ShardingPolicy > sharding_policy_
StreamStates DecompressZStream2Sink(const void *buf, const int64_t size, z_stream *strm, cvmfs::Sink *sink)
void ResolveMany(const std::vector< std::string > &names, std::vector< Host > *hosts)
std::string opt_proxy_fallback_list_
void SetHostChain(const std::string &host_list)
static NormalResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
unsigned opt_host_reset_after_
void SetLowSpeedLimit(const unsigned low_speed_limit)
std::string proxy_template_direct_
curl_slist ** GetHeadersPtr()
static const int kProbeGeo
string Trim(const string &raw, bool trim_newline)
string ReplaceAll(const string &haystack, const string &needle, const string &replace_by)
void set_min_ttl(unsigned seconds)
string JoinStrings(const vector< string > &strings, const string &joint)
std::string ToString(const bool with_suffix=false) const
unsigned opt_proxy_groups_current_
virtual bool RequiresReserve()=0
Pipe< kPipeDownloadJobsResults > * GetPipeJobResultWeakRef()
bool ValidateGeoReply(const std::string &reply_order, const unsigned expected_size, std::vector< uint64_t > *reply_vals)
std::vector< ProxyInfo > * current_proxy_group() const
void DecompressInit(z_stream *strm)
const std::string * url() const
time_t opt_timestamp_backup_proxies_
void SetProxyChain(const std::string &proxy_list, const std::string &fallback_proxy_list, const ProxySetModes set_mode)
std::string GetProxyList()
bool allow_failure() const
std::set< CURL * > * pool_handles_inuse_
CURL * curl_handle() const
pthread_mutex_t * lock_options_
ProxyInfo * ChooseProxyUnlocked(const shash::Any *hash)
pthread_t thread_download_
DownloadManager(const unsigned max_pool_handles, const perf::StatisticsTemplate &statistics)
std::string opt_proxy_list_
const std::string & name() const
perf::Counter * sz_transfer_time
void SetTracingHeaderGid(char *tracing_header_gid)
std::vector< std::vector< ProxyInfo > > * opt_proxy_groups_
assert((mem||(size==0))&&"Out Of Memory")
void SetNocache(bool nocache)
unsigned opt_proxy_groups_fallback_
void ReleaseCurlHandle(CURL *handle)
void SetTracingHeaderPid(char *tracing_header_pid)
void set_max_ttl(unsigned seconds)
char * tracing_header_gid() const
const std::set< std::string > & ViewBestAddresses(IpPreference preference) const
char * info_header() const
void SetDnsServer(const std::string &address)
bool force_nocache() const
std::string StringifyUint(const uint64_t value)
void DecompressFini(z_stream *strm)
virtual std::string Describe()=0
static void * MainDownload(void *data)
void InitSeed(const uint64_t seed)
void SetTimeout(const unsigned seconds_proxy, const unsigned seconds_direct)
std::string opt_dns_server_
uint32_t watch_fds_inuse_
off_t range_offset() const
unsigned char num_used_hosts() const
bool follow_redirects() const
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
void Init(ContextPtr context)
bool FileExists(const std::string &path)
void SetDnsTtlLimits(const unsigned min_seconds, const unsigned max_seconds)
static Failures PrepareDownloadDestination(JobInfo *info)
void SetHttpCode(int http_code)
std::vector< std::string > opt_proxy_urls_
const char * Code2Ascii(const Failures error)
uint32_t pool_max_handles_
bool head_request() const
unsigned char num_retries() const
bool IsValidPipeJobResults()
void GetProxyInfo(std::vector< std::vector< ProxyInfo > > *proxy_chain, unsigned *current_group, unsigned *fallback_group)
void SetProxyGroupResetDelay(const unsigned seconds)
atomic_int32 multi_threaded_
void SetCurrentHostChainIndex(unsigned int current_host_chain_index)
std::string AddDefaultScheme(const std::string &proxy)
vector< string > SplitString(const string &str, char delim)
cvmfs::Sink * sink() const
dns::NormalResolver * resolver_
bool Interrupted(const std::string &fqrn, JobInfo *info)
std::string proxy() const
dns::IpPreference opt_ip_preference_
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
unsigned int current_host_chain_index() const
void UseSystemCertificatePath()
bool SetShardingPolicy(const ShardingPolicySelector type)
perf::Counter * n_host_failover
void UpdateProxiesUnlocked(const std::string &reason)
bool IsEquivalent(const Host &other) const
bool IsProxyTransferError(const Failures error)
void SetNumRetries(unsigned char num_retries)
void set_throttle(const unsigned throttle)
InterruptCue * interrupt_cue() const
void SetIpPreference(const dns::IpPreference preference)
bool failover_indefinitely_
shash::ContextPtr hash_context() const
perf::Counter * n_requests
void Final(ContextPtr context, Any *any_digest)
void SetRetryParameters(const unsigned max_retries, const unsigned backoff_init_ms, const unsigned backoff_max_ms)
string StringifyInt(const int64_t value)
char * tracing_header_uid() const
Failures error_code() const
void CloneProxyConfig(DownloadManager *clone)
void SetMaxIpaddrPerProxy(unsigned limit)
void EnableIgnoreSignatureFailures()
DownloadManager * Clone(const perf::StatisticsTemplate &statistics)
CURL * AcquireCurlHandle()
void Inc(class Counter *counter)
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
std::string ExtractHost(const std::string &url)
std::vector< int > * opt_host_chain_rtt_
SslCertificateStore ssl_certificate_store_
time_t opt_timestamp_backup_host_
std::string GetFallbackProxyList()
uint32_t Partial32() const
void SetProxyTemplates(const std::string &direct, const std::string &forced)
unsigned opt_backoff_max_ms_
void SetErrorCode(Failures error_code)
void SetNumUsedProxies(unsigned char num_used_proxies)
unsigned GetContextSize(const Algorithms algorithm)
std::string GetDnsServer() const
unsigned opt_num_proxies_
unsigned opt_host_chain_current_
CredentialsAttachment * credentials_attachment_
std::vector< std::string > * opt_host_chain_
struct pollfd * watch_fds_
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
std::map< uint32_t, ProxyInfo * > opt_proxy_map_
uint64_t String2Uint64(const string &value)
UniquePtr< Pipe< kPipeDownloadJobs > > pipe_jobs_
void UseSystemCertificatePath()
void CreatePipeJobResults()
Failures Fetch(JobInfo *info)
void SetCurlHandle(CURL *curl_handle)
unsigned opt_max_retries_
void SetTracingHeaderUid(char *tracing_header_uid)
curl_slist * headers() const
const shash::Any * expected_hash() const
perf::Counter * n_proxy_failover
void GetTimeout(unsigned *seconds_proxy, unsigned *seconds_direct)
void SetCredData(void *cred_data)
void SetFailoverIndefinitely()
std::string proxy_template_forced_
time_t opt_timestamp_failover_proxies_
void SetDnsParameters(const unsigned retries, const unsigned timeout_ms)
unsigned EscapeHeader(const std::string &header, char *escaped_buf, size_t buf_size)
const std::string * extra_info() const
static Host ExtendDeadline(const Host &original, unsigned seconds_from_now)
UniquePtr< Pipe< kPipeThreadTerminator > > pipe_terminate_
void SetProxy(const std::string &proxy)
static const int kProbeUnprobed
unsigned backoff_ms() const
unsigned char num_used_proxies() const
void SafeSleepMs(const unsigned ms)
bool IsHostTransferError(const Failures error)
static const unsigned kDnsDefaultRetries
void SortTeam(std::vector< T > *tractor, std::vector< U > *towed)
bool GeoSortServers(std::vector< std::string > *servers, std::vector< uint64_t > *output_order=NULL)
virtual bool Reserve(size_t size)=0
static const int kProbeDown
void SetNumUsedHosts(unsigned char num_used_hosts)
void SetHeaders(curl_slist *headers)
static const unsigned kProxyMapScale
void GetHostInfo(std::vector< std::string > *host_chain, std::vector< int > *rtt, unsigned *current_host)
unsigned opt_timeout_proxy_
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
char * tracing_header_pid() const
void SetBackoffMs(unsigned backoff_ms)
SharedPtr< HealthCheck > health_check_
void SwitchProxy(JobInfo *info)
void AddHTTPTracingHeader(const std::string &header)
void SetCredentialsAttachment(CredentialsAttachment *ca)
void SetFollowRedirects(bool follow_redirects)
void RebalanceProxiesUnlocked(const std::string &reason)
pthread_mutex_t * lock_synchronous_mode_
virtual bool SetResolvers(const std::vector< std::string > &resolvers)
void InitializeRequest(JobInfo *info, CURL *handle)
static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp)
string RewriteUrl(const string &url, const string &ip)
void SetHostResetDelay(const unsigned seconds)
uint32_t Next(const uint64_t boundary)
virtual int64_t Write(const void *buf, uint64_t sz)=0
unsigned timeout_ms() const