// Static constants of S3FanoutManager: two Cache-Control header lines, the
// bounds used for request throttling, the throttle report interval and two
// port numbers.
// NOTE(review): every line of this listing carries a leading number and some
// tokens appear to be missing (the second definition below has no type) --
// confirm against the original file.
// Cache-Control header line with a max-age of 259200 seconds.
27 const char *S3FanoutManager::kCacheControlCas =
"Cache-Control: max-age=259200";
// Cache-Control header line with a max-age of 61 seconds.
29 *S3FanoutManager::kCacheControlDotCvmfs =
"Cache-Control: max-age=61";
// Throttle delay (milliseconds) assigned to a job's throttle_ms as a default.
30 const unsigned S3FanoutManager::kDefault429ThrottleMs = 250;
// Upper bound (milliseconds) applied to throttle_ms by
// DetectThrottleIndicator.
31 const unsigned S3FanoutManager::kMax429ThrottleMs = 10000;
// Compared in CanRetry against the time elapsed since
// timestamp_last_throttle_report_.
32 const unsigned S3FanoutManager::kThrottleReportIntervalSec = 10;
// Port numbers 80 and 443; their use is outside the visible lines.
33 const unsigned S3FanoutManager::kDefaultHTTPPort = 80;
34 const unsigned S3FanoutManager::kDefaultHTTPSPort = 443;
// Looks for a throttling hint in a single response header line and, when one
// is found, stores the resulting delay in info->throttle_ms.
// NOTE(review): the listing skips source lines here (the rest of the
// parameter list, the computation of value_numeric, one branch of the
// conditional expression); the comments cover only what is visible.
40 void S3FanoutManager::DetectThrottleIndicator(
const std::string &header,
42 std::string value_str;
// "retry-after:" is 12 characters long; keep everything behind the prefix.
// The third HasPrefix argument is its ignore_case flag.
43 if (
HasPrefix(header,
"retry-after:",
true))
44 value_str = header.substr(12);
// Same for "x-retry-in:" (11 characters). If both prefixes matched, this
// assignment would win because it runs last.
45 if (
HasPrefix(header,
"x-retry-in:",
true))
46 value_str = header.substr(11);
// Strip surrounding whitespace; the second argument is Trim's trim_newline
// flag.
48 value_str =
Trim(value_str,
true );
// An empty value means neither prefix matched or the header had no value.
49 if (!value_str.empty()) {
// A value ending in "ms" (suffix compared with ignore_case set) selects the
// branch that is not visible here; otherwise value_numeric is multiplied by
// 1000 (presumably seconds to milliseconds -- confirm).
51 const unsigned value_ms =
HasSuffix(value_str,
"ms",
true )
53 : (value_numeric * 1000);
// Never throttle for longer than kMax429ThrottleMs.
55 info->
throttle_ms = std::min(value_ms, kMax429ThrottleMs);
65 const size_t num_bytes = size * nmemb;
66 const string header_line(static_cast<const char *>(ptr), num_bytes);
70 if (
HasPrefix(header_line,
"HTTP/1.",
false)) {
71 if (header_line.length() < 10)
75 for (i = 8; (i < header_line.length()) && (header_line[i] ==
' '); ++i) {
78 if (header_line[i] ==
'2') {
82 info, header_line.c_str());
83 if (header_line.length() < i + 3) {
94 info->
throttle_ms = S3FanoutManager::kDefault429ThrottleMs;
120 S3FanoutManager::DetectThrottleIndicator(header_line, info);
132 const size_t num_bytes = size * nmemb;
140 const uint64_t read_bytes = info->
origin->Read(ptr, num_bytes);
// Socket callback with the parameter list curl expects for
// CURLMOPT_SOCKETFUNCTION. It maintains the watch_fds_ pollfd array of the
// S3FanoutManager according to the action curl requests for socket s.
// NOTE(review): large parts of the body are missing from this listing
// (lookup of index, the case labels of the first two assignments, the
// arguments of the two casts); comments cover only the visible lines.
161 int S3FanoutManager::CallbackCurlSocket(CURL *easy, curl_socket_t s,
int action,
162 void *userp,
void *socketp) {
// Format string of a log message reporting the callback arguments together
// with the number of fds in use and the number of jobs.
165 "CallbackCurlSocket called with easy "
166 "handle %p, socket %d, action %d, up %p, "
167 "sp %p, fds_inuse %d, jobs %d",
170 if (action == CURL_POLL_NONE)
// watch_fds_ is replaced by the result of a call whose name is not visible;
// presumably the array is grown here -- confirm.
185 s3fanout_mgr->
watch_fds_ =
static_cast<struct pollfd *
>(
// Watch for readable data (normal and priority).
197 s3fanout_mgr->
watch_fds_[index].events = POLLIN | POLLPRI;
// Watch for writability.
200 s3fanout_mgr->
watch_fds_[index].events = POLLOUT | POLLWRBAND;
// Watch for both directions.
202 case CURL_POLL_INOUT:
203 s3fanout_mgr->
watch_fds_[index].events = POLLIN | POLLPRI | POLLOUT
// The socket is no longer of interest. The check below distinguishes an entry
// that is not the last one in use; what happens to it is not visible.
206 case CURL_POLL_REMOVE:
207 if (index < s3fanout_mgr->watch_fds_inuse_ - 1)
// Second reassignment of watch_fds_; presumably the array is shrunk here --
// confirm.
217 s3fanout_mgr->
watch_fds_ =
static_cast<struct pollfd *
>(
// Thread-style entry point (void * in, void * out) that drives the curl multi
// handle of an S3FanoutManager: it starts transfers, translates poll results
// into curl socket actions, collects finished transfers and finally cleans up
// the remaining easy handles.
// NOTE(review): most of the loop structure is missing from this listing;
// comments cover only the visible lines.
233 void *S3FanoutManager::MainUpload(
void *data) {
242 unsigned jobs_in_flight = 0;
// Named as a timeout in milliseconds; its use is outside the visible lines.
246 const int timeout_ms = 100;
// Let curl handle timeouts that are not tied to a particular socket.
251 int still_running = 0;
252 retval = curl_multi_socket_action(
253 s3fanout_mgr->
curl_multi_, CURL_SOCKET_TIMEOUT, 0, &still_running);
254 if (retval != CURLM_OK) {
256 assert(retval == CURLM_OK);
258 }
else if (retval < 0) {
// A NULL handle is reported with the failure code and errno.
273 if (handle == NULL) {
280 "Failed to initialize CURL handle (error: %d - %s | errno: %d)",
281 init_failure,
Code2Ascii(init_failure), errno);
// Hand the prepared easy handle to the multi handle and kick curl once.
285 curl_multi_add_handle(s3fanout_mgr->
curl_multi_, handle);
288 int still_running = 0, retval = 0;
289 retval = curl_multi_socket_action(
290 s3fanout_mgr->
curl_multi_, CURL_SOCKET_TIMEOUT, 0, &still_running);
293 retval, still_running);
// Translate the poll() result bits of watch_fds_[i] into curl's event bitmask.
309 if (s3fanout_mgr->
watch_fds_[i].revents & (POLLIN | POLLPRI))
310 ev_bitmask |= CURL_CSELECT_IN;
311 if (s3fanout_mgr->
watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
312 ev_bitmask |= CURL_CSELECT_OUT;
314 & (POLLERR | POLLHUP | POLLNVAL))
315 ev_bitmask |= CURL_CSELECT_ERR;
318 int still_running = 0;
319 retval = curl_multi_socket_action(s3fanout_mgr->
curl_multi_,
// Drain curl's message queue; only CURLMSG_DONE messages are expected.
329 while ((curl_msg = curl_multi_info_read(s3fanout_mgr->
curl_multi_,
331 assert(curl_msg->msg == CURLMSG_DONE);
// The value stored under CURLOPT_PRIVATE is read back into info.
335 CURL *easy_handle = curl_msg->easy_handle;
336 const int curl_error = curl_msg->data.result;
337 curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
339 curl_multi_remove_handle(s3fanout_mgr->
curl_multi_, easy_handle);
// The same easy handle is added again and curl is kicked; presumably this is
// the retry path -- the guarding condition is not visible, confirm.
341 curl_multi_add_handle(s3fanout_mgr->
curl_multi_, easy_handle);
342 int still_running = 0;
343 curl_multi_socket_action(
344 s3fanout_mgr->
curl_multi_, CURL_SOCKET_TIMEOUT, 0, &still_running);
// Detach every handle in the iterated range from the multi handle and
// destroy it.
361 for (; i != i_end; ++i) {
362 curl_multi_remove_handle(s3fanout_mgr->
curl_multi_, *i);
363 curl_easy_cleanup(*i);
// Returns an easy handle for a new request: a fresh one if the idle pool is
// empty, otherwise one taken from the idle pool. The handle is recorded in
// pool_handles_inuse_.
// NOTE(review): the listing omits several lines (option arguments, the
// branch separator, the return statement).
377 CURL *S3FanoutManager::AcquireCurlHandle()
const {
382 if (pool_handles_idle_->empty()) {
// Create a new handle and apply the options that are set once per handle.
386 handle = curl_easy_init();
390 retval = curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
391 assert(retval == CURLE_OK);
392 retval = curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION,
394 assert(retval == CURLE_OK);
396 assert(retval == CURLE_OK);
398 assert(retval == CURLE_OK);
// Reuse path: take the first element of the idle set and remove it there.
400 handle = *(pool_handles_idle_->begin());
401 pool_handles_idle_->erase(pool_handles_idle_->begin());
404 pool_handles_inuse_->insert(handle);
// Gives an easy handle back after use. The handle must be registered in
// pool_handles_inuse_ (asserted). Depending on the size of the idle pool the
// handle is destroyed or stored in pool_handles_idle_; in the end it is
// removed from pool_handles_inuse_.
// NOTE(review): the listing omits lines between the visible statements, e.g.
// the separator between the two branches.
410 void S3FanoutManager::ReleaseCurlHandle(
JobInfo *info, CURL *handle)
const {
418 const set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
419 assert(elem != pool_handles_inuse_->end());
// Idle pool already holds more than pool_max_handles entries: detach the
// share handle, destroy the easy handle and drop its entry from
// curl_sharehandles_.
421 if (pool_handles_idle_->size() > config_.pool_max_handles) {
422 const CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, NULL);
423 assert(retval == CURLE_OK);
424 curl_easy_cleanup(handle);
425 const std::map<CURL *, S3FanOutDnsEntry *>::size_type retitems =
426 curl_sharehandles_->erase(handle);
// Keep the handle for reuse (presumably the else branch -- confirm).
429 pool_handles_idle_->insert(handle);
432 pool_handles_inuse_->erase(elem);
// Fills the first two entries of watch_fds_ with the descriptors at index 0
// of pipe_terminate_ and pipe_jobs_ (presumably the read ends -- confirm),
// both watched for POLLIN | POLLPRI.
// Preconditions (asserted): no entry is in use yet and the array has room for
// at least two entries.
// NOTE(review): lines are missing from this listing after each entry; any
// update of watch_fds_inuse_ is not visible.
435 void S3FanoutManager::InitPipeWatchFds() {
436 assert(watch_fds_inuse_ == 0);
437 assert(watch_fds_size_ >= 2);
438 watch_fds_[0].fd = pipe_terminate_[0];
439 watch_fds_[0].events = POLLIN | POLLPRI;
440 watch_fds_[0].revents = 0;
442 watch_fds_[1].fd = pipe_jobs_[0];
443 watch_fds_[1].events = POLLIN | POLLPRI;
444 watch_fds_[1].revents = 0;
// Appends the request headers for the "Authorization: AWS ..." scheme to
// *headers: the authorization line itself plus Date, X-Amz-Acl and, when
// non-empty, Content-MD5 and Content-Type.
// NOTE(review): the listing omits lines (declaration of payload_hash and
// timestamp, the end of the string to sign, the HMAC call name, the return
// statements); comments cover only the visible lines.
452 bool S3FanoutManager::MkV2Authz(
const JobInfo &info,
453 vector<string> *headers)
const {
455 const bool retval = MkPayloadHash(info, &payload_hash);
458 const string content_type = GetContentType(info);
459 const string request = GetRequestString(info);
// The string to sign starts with request, payload hash and content type, one
// per line.
462 string to_sign = request +
"\n" + payload_hash +
"\n" + content_type +
"\n"
// The ACL line is only part of the string to sign when an ACL is configured.
464 if (config_.x_amz_acl !=
"") {
465 to_sign +=
"x-amz-acl:" + config_.x_amz_acl +
"\n" +
// The bytes of to_sign are passed to a call that fills hmac.
474 reinterpret_cast<const unsigned char *>(to_sign.data()),
475 to_sign.length(), &hmac);
// The signature is the Base64 encoding of the raw digest bytes.
477 headers->push_back(
"Authorization: AWS " + config_.access_key +
":"
478 +
Base64(
string(reinterpret_cast<char *>(hmac.digest),
479 hmac.GetDigestSize())));
480 headers->push_back(
"Date: " + timestamp);
481 headers->push_back(
"X-Amz-Acl: " + config_.x_amz_acl);
482 if (!payload_hash.empty())
483 headers->push_back(
"Content-MD5: " + payload_hash);
484 if (!content_type.empty())
485 headers->push_back(
"Content-Type: " + content_type);
// Percent-encodes val character by character. Letters, digits, '_', '-' and
// '~' belong to the first condition (its remainder and its body are not
// visible), '/' is treated separately, and the visible fallback emits '%'
// followed by two upper-case hexadecimal digits.
// NOTE(review): the listing omits lines, including the declaration of result,
// the handling of encode_slash and the return statement.
490 string S3FanoutManager::GetUriEncode(
const string &val,
491 bool encode_slash)
const {
493 const unsigned len = val.length();
495 for (
unsigned i = 0; i < len; ++i) {
496 const char c = val[i];
497 if ((c >=
'A' && c <=
'Z') || (c >=
'a' && c <=
'z')
498 || (c >=
'0' && c <=
'9') || c ==
'_' || c ==
'-' || c ==
'~'
501 }
else if (c ==
'/') {
// High nibble first, then low nibble; values 10..15 map to 'A'..'F'.
// NOTE(review): c is a plain char. Where char is signed, a byte >= 0x80
// makes c / 16 and c % 16 negative and the emitted characters are not hex
// digits -- confirm that only ASCII input reaches this point.
508 result.push_back(
'%');
509 result.push_back((c / 16) + ((c / 16 <= 9) ?
'0' :
'A' - 10));
510 result.push_back((c % 16) + ((c % 16 <= 9) ?
'0' :
'A' - 10));
// Returns the signing key for the given date. The most recently derived key
// is cached in last_signing_key_ (date, key) and reused while the date is
// unchanged; otherwise the key is derived through a chain of Hmac256 calls.
// NOTE(review): the method is const but assigns to last_signing_key_, so that
// member is presumably declared mutable -- confirm. Several right-hand sides
// are missing from this listing.
517 string S3FanoutManager::GetAwsV4SigningKey(
const string &date)
const {
// Cache hit: same date as last time.
518 if (last_signing_key_.first == date)
519 return last_signing_key_.second;
// Derivation chain: date_key -> date_region_key (keyed with date_key over
// config_.region) -> date_region_service_key -> signing_key (over
// "aws4_request").
521 const string date_key =
523 const string date_region_key =
shash::Hmac256(date_key, config_.region,
true);
524 const string date_region_service_key =
526 string signing_key =
shash::Hmac256(date_region_service_key,
"aws4_request",
// Remember the result for the next call with the same date.
528 last_signing_key_.first = date;
529 last_signing_key_.second = signing_key;
// Appends the request headers for the "AWS4-HMAC-SHA256" authorization scheme
// to *headers. The visible steps are: canonical hostname, signed and
// canonical header lists, credential scope, canonical request, string to
// sign, signature, and finally the header lines.
// NOTE(review): many lines are missing from this listing (declarations of
// payload_hash, timestamp, uri and hash_request, parts of conditions, the end
// of the Authorization header, the return statements).
538 bool S3FanoutManager::MkV4Authz(
const JobInfo &info,
539 vector<string> *headers)
const {
541 const bool retval = MkPayloadHash(info, &payload_hash);
544 const string content_type = GetContentType(info);
// The date used for scope and signing key is the first 8 characters of the
// timestamp.
546 const string date = timestamp.substr(0, 8);
// complete_hostname_ is split at ':' into at most two tokens; the first one
// starts the canonical hostname.
547 vector<string> tokens =
SplitString(complete_hostname_,
':');
548 assert(tokens.size() <= 2);
549 string canonical_hostname = tokens[0];
// The second token is appended only under a condition whose remainder is not
// visible.
553 if (tokens.size() == 2
556 canonical_hostname +=
":" + tokens[1];
558 string signed_headers;
559 string canonical_headers;
// A non-empty content type is sent as a header and takes part in the
// signature.
560 if (!content_type.empty()) {
561 signed_headers +=
"content-type;";
562 headers->push_back(
"Content-Type: " + content_type);
563 canonical_headers +=
"content-type:" + content_type +
"\n";
// x-amz-acl is listed among the signed headers only when an ACL is
// configured.
565 if (config_.x_amz_acl !=
"") {
566 signed_headers +=
"host;x-amz-acl;x-amz-content-sha256;x-amz-date";
568 signed_headers +=
"host;x-amz-content-sha256;x-amz-date";
570 canonical_headers +=
"host:" + canonical_hostname +
"\n";
571 if (config_.x_amz_acl !=
"") {
572 canonical_headers +=
"x-amz-acl:" + config_.x_amz_acl +
"\n";
574 canonical_headers +=
"x-amz-content-sha256:" + payload_hash +
"\n"
575 +
"x-amz-date:" + timestamp +
"\n";
// Credential scope: <date>/<region>/s3/aws4_request.
577 const string scope = date +
"/" + config_.region +
"/s3/aws4_request";
// Second alternative of a conditional expression: "/<bucket>/<object_key>".
// The condition and the first alternative are not visible.
581 : (
string(
"/") + config_.bucket +
"/" + info.
object_key);
// Canonical request, one component per line: request string, URI-encoded uri
// (encode_slash is false), an empty component, canonical headers, signed
// headers, payload hash.
583 const string canonical_request =
584 GetRequestString(info) +
"\n" + GetUriEncode(uri,
false) +
"\n" +
"\n" +
585 canonical_headers +
"\n" + signed_headers +
"\n" + payload_hash;
// hash_request is computed on a line that is not visible.
589 const string string_to_sign =
590 "AWS4-HMAC-SHA256\n" + timestamp +
"\n" + scope +
"\n" + hash_request;
592 const string signing_key = GetAwsV4SigningKey(date);
593 const string signature =
shash::Hmac256(signing_key, string_to_sign);
595 headers->push_back(
"X-Amz-Acl: " + config_.x_amz_acl);
596 headers->push_back(
"X-Amz-Content-Sha256: " + payload_hash);
597 headers->push_back(
"X-Amz-Date: " + timestamp);
598 headers->push_back(
"Authorization: AWS4-HMAC-SHA256 "
600 + config_.access_key +
"/" + scope
// Appends the request headers for the "Authorization: SharedKey ..." scheme
// to *headers, together with x-ms-date, x-ms-version and x-ms-blob-type.
// NOTE(review): lines are missing from this listing (declaration of timestamp
// and signing_key, one line of the second string_to_sign expression, the end
// of the Authorization header, the return statements).
614 bool S3FanoutManager::MkAzureAuthz(
const JobInfo &info,
615 vector<string> *headers)
const {
// Canonical x-ms-* header lines that enter the string to sign.
617 const string canonical_headers =
618 "x-ms-blob-type:BlockBlob\nx-ms-date:" + timestamp +
619 "\nx-ms-version:2011-08-18";
// Canonical resource: /<access_key>/<bucket>/<object_key>.
620 const string canonical_resource =
621 "/" + config_.access_key +
"/" + config_.bucket +
"/" + info.
object_key;
623 string string_to_sign;
// Head and delete requests: the request string is followed by empty fields
// (newlines only), the canonical headers and the canonical resource.
624 if ((info.
request == JobInfo::kReqHeadOnly)
625 || (info.
request == JobInfo::kReqHeadPut)
626 || (info.
request == JobInfo::kReqDelete)) {
627 string_to_sign = GetRequestString(info) + string(
"\n\n\n")
628 +
"\n\n\n\n\n\n\n\n\n" + canonical_headers +
"\n"
629 + canonical_resource;
// Other requests: same layout, with one additional line in between that is
// not visible in this listing.
631 string_to_sign = GetRequestString(info) + string(
"\n\n\n")
633 +
"\n\n\n\n\n\n\n\n\n" + canonical_headers +
"\n"
634 + canonical_resource;
// The configured secret key is Base64-decoded into signing_key.
638 const int retval =
Debase64(config_.secret_key, &signing_key);
// The third Hmac256 argument is its raw_output flag.
642 const string signature =
shash::Hmac256(signing_key, string_to_sign,
true);
644 headers->push_back(
"x-ms-date: " + timestamp);
645 headers->push_back(
"x-ms-version: 2011-08-18");
646 headers->push_back(
"Authorization: SharedKey " + config_.access_key +
":"
648 headers->push_back(
"x-ms-blob-type: BlockBlob");
// Applies DNS related settings to an easy handle: sharehandle is installed
// via CURLOPT_SHARE and clist via CURLOPT_RESOLVE. Both calls are asserted to
// return CURLE_OK.
// NOTE(review): the parameter line declaring sharehandle is missing from this
// listing.
652 void S3FanoutManager::InitializeDnsSettingsCurl(CURL *handle,
654 curl_slist *clist)
const {
655 CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, sharehandle);
656 assert(retval == CURLE_OK);
657 retval = curl_easy_setopt(handle, CURLOPT_RESOLVE, clist);
658 assert(retval == CURLE_OK);
// Associates an easy handle with an S3FanOutDnsEntry for the given host. The
// visible lines show three stages: reuse of an entry already mapped to the
// handle, a search through sharehandles_ for an entry of the same host, and
// creation of new entries from a DNS lookup.
// NOTE(review): most control flow, the derivation of remote_host and
// remote_port, and all return statements are missing from this listing.
662 int S3FanoutManager::InitializeDnsSettings(CURL *handle,
663 std::string host_with_port)
const {
// Stage 1: the handle already has an entry in curl_sharehandles_; apply that
// entry's settings again.
665 const std::map<CURL *, S3FanOutDnsEntry *>::const_iterator it =
666 curl_sharehandles_->find(handle);
667 if (it != curl_sharehandles_->end()) {
668 InitializeDnsSettingsCurl(handle, it->second->sharehandle,
// Prefix the host with "<protocol>://"; the guarding condition is not
// visible.
675 host_with_port = config_.protocol +
"://" + host_with_port;
// Stage 2: among the known entries for remote_host, entries are compared by
// their counter against the running minimum usemin.
681 unsigned int usemin = UINT_MAX;
682 std::set<S3FanOutDnsEntry *>::iterator its3 = sharehandles_->begin();
683 for (; its3 != sharehandles_->end(); ++its3) {
684 if ((*its3)->dns_name == remote_host) {
685 if (usemin >= (*its3)->counter) {
// Remember which entry this handle uses.
692 curl_sharehandles_->insert(
693 std::pair<CURL *, S3FanOutDnsEntry *>(handle, useme));
// Stage 3: resolve the host and walk over its IPv4 addresses.
701 const dns::Host host = resolver_->Resolve(remote_host);
703 std::set<string>::iterator its = ipv4_addresses.begin();
705 for (; its != ipv4_addresses.end(); ++its) {
// An empty remote_port falls back to "80".
709 dnse->
port = remote_port.size() == 0 ?
"80" : remote_port;
712 dnse->
clist = curl_slist_append(
// The share handle of the entry is configured to share DNS data.
717 const CURLSHcode share_retval = curl_share_setopt(
718 dnse->
sharehandle, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS);
719 assert(share_retval == CURLSHE_OK);
720 sharehandles_->insert(dnse);
// Error message used when the lookup fails for remote_host.
724 "Error: DNS resolve failed for address '%s'.",
725 remote_host.c_str());
729 curl_sharehandles_->insert(
730 std::pair<CURL *, S3FanOutDnsEntry *>(handle, dnse));
// Writes the payload hash for a job to *hex_hash, in a form that depends on
// config_.authz_method. Head and delete requests are handled separately from
// requests whose data is obtained through info.origin.
// NOTE(review): the case labels, most assignments and the return statements
// are missing from this listing.
738 bool S3FanoutManager::MkPayloadHash(
const JobInfo &info,
739 string *hex_hash)
const {
// Head and delete requests.
740 if ((info.
request == JobInfo::kReqHeadOnly)
741 || (info.
request == JobInfo::kReqHeadPut)
742 || (info.
request == JobInfo::kReqDelete)) {
743 switch (config_.authz_method) {
// Fixed hash literal (continued on a line that is not visible); the visible
// part looks like the start of the SHA-256 digest of empty input -- confirm.
749 *hex_hash =
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b78"
// Remaining requests: info.origin->Data is asked for GetSize() bytes at
// offset argument 0 and reports the byte count in nbytes.
766 const unsigned int nbytes = info.
origin->Data(
767 reinterpret_cast<void **>(&data), info.
origin->GetSize(), 0);
770 switch (config_.authz_method) {
// One method stores the Base64 encoding of the raw digest bytes.
773 *hex_hash =
Base64(
string(reinterpret_cast<char *>(payload_hash.
digest),
// Maps the request kind of a job to a request string. The case labels form
// three groups: head requests (kReqHeadOnly, kReqHeadPut), put requests
// (kReqPutCas, kReqPutDotCvmfs, kReqPutHtml, kReqPutBucket) and kReqDelete.
// NOTE(review): the switch statement itself and the returned strings are
// missing from this listing.
788 string S3FanoutManager::GetRequestString(
const JobInfo &info)
const {
790 case JobInfo::kReqHeadOnly:
791 case JobInfo::kReqHeadPut:
793 case JobInfo::kReqPutCas:
794 case JobInfo::kReqPutDotCvmfs:
795 case JobInfo::kReqPutHtml:
796 case JobInfo::kReqPutBucket:
798 case JobInfo::kReqDelete:
// Maps the request kind of a job to a content type string: kReqPutCas yields
// "application/octet-stream" and kReqPutDotCvmfs yields "application/x-cvmfs".
// NOTE(review): the switch statement and the values returned for the head and
// delete group, kReqPutHtml and kReqPutBucket are missing from this listing.
806 string S3FanoutManager::GetContentType(
const JobInfo &info)
const {
808 case JobInfo::kReqHeadOnly:
809 case JobInfo::kReqHeadPut:
810 case JobInfo::kReqDelete:
812 case JobInfo::kReqPutCas:
813 return "application/octet-stream";
814 case JobInfo::kReqPutDotCvmfs:
815 return "application/x-cvmfs";
816 case JobInfo::kReqPutHtml:
818 case JobInfo::kReqPutBucket:
844 InitializeDnsSettings(handle, complete_hostname_);
847 if ((info->
request == JobInfo::kReqHeadOnly)
848 || (info->
request == JobInfo::kReqHeadPut)
849 || (info->
request == JobInfo::kReqDelete)) {
850 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 0);
851 assert(retval == CURLE_OK);
852 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
853 assert(retval == CURLE_OK);
855 if (info->
request == JobInfo::kReqDelete) {
856 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST,
857 GetRequestString(*info).c_str());
858 assert(retval == CURLE_OK);
860 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
861 assert(retval == CURLE_OK);
864 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
865 assert(retval == CURLE_OK);
866 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 1);
867 assert(retval == CURLE_OK);
868 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 0);
869 assert(retval == CURLE_OK);
870 retval = curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE,
871 static_cast<curl_off_t>(info->
origin->GetSize()));
872 assert(retval == CURLE_OK);
874 if (info->
request == JobInfo::kReqPutDotCvmfs) {
876 kCacheControlDotCvmfs);
877 }
else if (info->
request == JobInfo::kReqPutCas) {
886 vector<string> authz_headers;
887 switch (config_.authz_method) {
889 retval_b = MkV2Authz(*info, &authz_headers);
892 retval_b = MkV4Authz(*info, &authz_headers);
895 retval_b = MkAzureAuthz(*info, &authz_headers);
902 for (
unsigned i = 0; i < authz_headers.size(); ++i) {
904 authz_headers[i].c_str());
909 "Connection: Keep-Alive");
916 user_agent_->c_str());
919 retval = curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
920 assert(retval == CURLE_OK);
921 retval = curl_easy_setopt(handle, CURLOPT_HEADERDATA,
922 static_cast<void *>(info));
923 assert(retval == CURLE_OK);
924 retval = curl_easy_setopt(handle, CURLOPT_READDATA,
925 static_cast<void *>(info));
926 assert(retval == CURLE_OK);
927 retval = curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->
http_headers);
928 assert(retval == CURLE_OK);
929 if (opt_ipv4_only_) {
930 retval = curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
931 assert(retval == CURLE_OK);
934 retval = curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
935 assert(retval == CURLE_OK);
937 retval = curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, info->
errorbuffer);
938 assert(retval == CURLE_OK);
940 if (config_.protocol ==
"https") {
941 retval = curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 1L);
942 assert(retval == CURLE_OK);
943 retval = curl_easy_setopt(handle, CURLOPT_PROXY_SSL_VERIFYPEER, 1L);
944 assert(retval == CURLE_OK);
945 const bool add_cert =
946 ssl_certificate_store_.ApplySslCertificatePath(handle);
// Sets connection related options on the easy handle of a job: connect
// timeout, low-speed limit and time, optional verbose output, the URL and the
// proxy. Every curl_easy_setopt call is asserted to return CURLE_OK.
// NOTE(review): the declarations of curl_handle, retval and url and the
// low-speed limit argument are missing from this listing.
957 void S3FanoutManager::SetUrlOptions(
JobInfo *info)
const {
// config_.opt_timeout_sec is used for the connect timeout and the low-speed
// time alike.
961 retval = curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT,
962 config_.opt_timeout_sec);
963 assert(retval == CURLE_OK);
964 retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT,
966 assert(retval == CURLE_OK);
967 retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME,
968 config_.opt_timeout_sec);
969 assert(retval == CURLE_OK);
// Verbose curl output only when is_curl_debug_ is set.
971 if (is_curl_debug_) {
972 retval = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 1);
973 assert(retval == CURLE_OK);
977 retval = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
978 assert(retval == CURLE_OK);
980 retval = curl_easy_setopt(curl_handle, CURLOPT_PROXY, config_.proxy.c_str());
981 assert(retval == CURLE_OK);
// Reads transfer information from a finished easy handle. The visible line
// queries CURLINFO_SIZE_UPLOAD into val and continues only if the query
// succeeded; what is done with val is not visible in this listing.
988 void S3FanoutManager::UpdateStatistics(CURL *handle) {
991 if (curl_easy_getinfo(handle, CURLINFO_SIZE_UPLOAD, &val) == CURLE_OK)
// Decides whether a job may be retried. The visible lines show two parts:
// rate-limited reporting of backend throttling and the maintenance of the
// job's backoff_ms.
// NOTE(review): the decision logic itself and the return statements are
// missing from this listing. info is a pointer to const JobInfo while
// info->backoff_ms is assigned, so that member is presumably mutable --
// confirm.
999 bool S3FanoutManager::CanRetry(
const JobInfo *info) {
// Emit the throttle warning only if more than kThrottleReportIntervalSec
// have passed since the last report.
1023 if ((now - timestamp_last_throttle_report_)
1024 > kThrottleReportIntervalSec) {
1026 "Warning: S3 backend throttling %ums "
1027 "(total backoff time so far %lums)",
1029 timestamp_last_throttle_report_ = now;
// Backoff value drawn from prng_ with config_.opt_backoff_init_ms + 1 as
// the argument to Next.
1037 info->
backoff_ms = prng_.Next(config_.opt_backoff_init_ms + 1);
// Clamp the backoff to the configured maximum.
1041 if (info->
backoff_ms > config_.opt_backoff_max_ms)
1042 info->
backoff_ms = config_.opt_backoff_max_ms;
// Evaluates the outcome of a finished transfer. The visible lines show a
// switch over curl_error that groups related curl error codes, the switch of
// a kReqHeadPut job to kReqPutCas, a retry check through CanRetry and a final
// condition on the put request kinds.
// NOTE(review): the statements inside the case groups, the conditions around
// the request switch and the return statements are missing from this listing.
1057 bool S3FanoutManager::VerifyAndFinalize(
const int curl_error,
JobInfo *info) {
1059 "Verify uploaded/tested object %s "
1060 "(curl error %d, info error %d, info request %d)",
1066 switch (curl_error) {
// Protocol / URL problems.
1073 case CURLE_UNSUPPORTED_PROTOCOL:
1074 case CURLE_URL_MALFORMAT:
// Host name could not be resolved.
1077 case CURLE_COULDNT_RESOLVE_HOST:
// Connection and transfer problems.
1080 case CURLE_COULDNT_CONNECT:
1081 case CURLE_OPERATION_TIMEDOUT:
1082 case CURLE_SEND_ERROR:
1083 case CURLE_RECV_ERROR:
// Transfer stopped by a callback or a write error.
1086 case CURLE_ABORTED_BY_CALLBACK:
1087 case CURLE_WRITE_ERROR:
// Message for any other curl error code.
1092 "unexpected curl error (%d) while trying to upload %s: %s",
// A kReqHeadPut job is turned into a kReqPutCas job; the first part of the
// condition is not visible.
1100 && (info->
request == JobInfo::kReqHeadPut)) {
1103 info->
request = JobInfo::kReqPutCas;
1111 "Failed to initialize CURL handle "
1112 "(error: %d - %s | errno: %d)",
1113 init_failure,
Code2Ascii(init_failure), errno);
1115 SetUrlOptions(info);
// Retry decision; the condition guarding the CanRetry call is not visible.
1122 bool try_again =
false;
1124 try_again = CanRetry(info);
// Requests that carry a payload: kReqPutCas, kReqPutDotCvmfs, kReqPutHtml.
1127 if (info->
request == JobInfo::kReqPutCas
1128 || info->
request == JobInfo::kReqPutDotCvmfs
1129 || info->
request == JobInfo::kReqPutHtml) {
// Constructor: copies the configuration into config_ and sets up the curl
// environment. The visible lines allocate two pthread mutexes with smalloc,
// set the user agent header line, initialise curl globally, configure the
// multi handle, check the CVMFS_IPV4_ONLY environment variable and allocate
// the initial watch_fds_ array.
// NOTE(review): many lines are missing from this listing, including the
// targets of the two smalloc calls and several option arguments.
1154 S3FanoutManager::S3FanoutManager(
const S3Config &config) : config_(config) {
1162 smalloc(
sizeof(pthread_mutex_t)));
1166 smalloc(
sizeof(pthread_mutex_t)));
// User agent header line built from the CVMFS_VERSION macro.
1182 *
user_agent_ =
"User-Agent: cvmfs " + string(CVMFS_VERSION);
1185 const CURLcode cretval = curl_global_init(CURL_GLOBAL_ALL);
1186 assert(cretval == CURLE_OK);
// Multi handle options: socket callback, this object as the callback's user
// data, and a limit on the total number of connections.
1190 mretval = curl_multi_setopt(
curl_multi_, CURLMOPT_SOCKETFUNCTION,
1192 assert(mretval == CURLM_OK);
1193 mretval = curl_multi_setopt(
curl_multi_, CURLMOPT_SOCKETDATA,
1194 static_cast<void *>(
this));
1195 assert(mretval == CURLM_OK);
1196 mretval = curl_multi_setopt(
curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1198 assert(mretval == CURLM_OK);
// True when CVMFS_IPV4_ONLY is set to a non-empty value.
1207 if ((getenv(
"CVMFS_IPV4_ONLY") != NULL)
1208 && (strlen(getenv(
"CVMFS_IPV4_ONLY")) > 0)) {
// Initial room for 4 pollfd entries.
1216 watch_fds_ =
static_cast<struct pollfd *
>(smalloc(4 *
sizeof(
struct pollfd)));
1241 for (; i != iEnd; ++i) {
1242 curl_easy_cleanup(*i);
1245 set<S3FanOutDnsEntry *>::iterator is =
sharehandles_->begin();
1246 const set<S3FanOutDnsEntry *>::const_iterator isEnd =
sharehandles_->end();
1247 for (; is != isEnd; ++is) {
1248 curl_share_cleanup((*is)->sharehandle);
1249 curl_slist_free_all((*is)->clist);
1267 curl_global_cleanup();
1277 static_cast<void *>(
this));
const char * Code2Ascii(const ObjectFetcherFailures::Failures error)
pthread_mutex_t * jobs_todo_lock_
atomic_int32 multi_threaded_
pthread_mutex_t * curl_handle_lock_
string Sha256String(const string &content)
std::set< CURL * > * pool_handles_idle_
struct curl_slist * http_headers
static size_t CallbackCurlBody(char *, size_t size, size_t nmemb, void *)
std::string IsoTimestamp()
dns::CaresResolver * resolver_
string Trim(const string &raw, bool trim_newline)
bool IsHttpUrl(const std::string &path)
void Hmac(const string &key, const unsigned char *buffer, const unsigned buffer_size, Any *any_digest)
perf::Statistics * statistics_
static void * MainUpload(void *data)
const std::string object_key
assert((mem||(size==0))&&"Out Of Memory")
const std::set< std::string > & ipv4_addresses() const
std::string MkCompleteHostname()
void PushNewJob(JobInfo *info)
SynchronizingCounter< uint32_t > Semaphore
std::string Print() const
bool Debase64(const string &data, string *decoded)
void MakePipe(int pipe_fd[2])
unsigned char digest[digest_size_]
struct curl_slist * clist
std::string * user_agent_
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
unsigned int max_available_jobs_
std::string RfcTimestamp()
int64_t String2Int64(const string &value)
unsigned GetDigestSize() const
std::set< JobInfo * > * active_requests_
void PushCompletedJob(JobInfo *info)
string Sha256Mem(const unsigned char *buffer, const unsigned buffer_size)
vector< string > SplitString(const string &str, char delim)
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
bool HasSuffix(const std::string &str, const std::string &suffix, const bool ignore_case)
void UseSystemCertificatePath()
void SetUrlOptions(JobInfo *info) const
CURL * AcquireCurlHandle() const
uint64_t throttle_timestamp
Semaphore * available_jobs_
static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp)
UniquePtr< FileBackedBuffer > origin
void ReleaseCurlHandle(JobInfo *info, CURL *handle) const
uint32_t pool_max_handles
string StringifyInt(const int64_t value)
struct pollfd * watch_fds_
std::string ExtractPort(const std::string &url)
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
JobInfo * PopCompletedJob()
std::string ExtractHost(const std::string &url)
uint32_t watch_fds_inuse_
unsigned char num_retries
void HashMem(const unsigned char *buffer, const unsigned buffer_size, Any *any_digest)
std::string complete_hostname_
string Base64(const string &data)
uint64_t String2Uint64(const string &value)
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
static CaresResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
std::set< S3FanOutDnsEntry * > * sharehandles_
SslCertificateStore ssl_certificate_store_
const Statistics & GetStatistics()
std::string Hmac256(const std::string &key, const std::string &content, bool raw_output)
void SafeSleepMs(const unsigned ms)
void WritePipe(int fd, const void *buf, size_t nbyte)
uint64_t timestamp_last_throttle_report_
std::set< CURL * > * pool_handles_inuse_
void ReadPipe(int fd, void *buf, size_t nbyte)
void ClosePipe(int pipe_fd[2])
std::map< CURL *, S3FanOutDnsEntry * > * curl_sharehandles_
Failures InitializeRequest(JobInfo *info, CURL *handle) const
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)