// Cache-Control header lines. max-age is expressed in seconds, so 259200 is
// three days and 61 is just over one minute.
27 const char *S3FanoutManager::kCacheControlCas =
"Cache-Control: max-age=259200";
29 *S3FanoutManager::kCacheControlDotCvmfs =
"Cache-Control: max-age=61";
// kDefault429ThrottleMs is the delay that the header-parsing code assigns to
// JobInfo::throttle_ms; kMax429ThrottleMs is the cap applied to a delay taken
// from a response header in DetectThrottleIndicator().
30 const unsigned S3FanoutManager::kDefault429ThrottleMs = 250;
31 const unsigned S3FanoutManager::kMax429ThrottleMs = 10000;
// CanRetry() reports throttling only when more than this interval has passed
// since timestamp_last_throttle_report_.
32 const unsigned S3FanoutManager::kThrottleReportIntervalSec = 10;
// Standard TCP ports for HTTP and HTTPS.
33 const unsigned S3FanoutManager::kDefaultHTTPPort = 80;
34 const unsigned S3FanoutManager::kDefaultHTTPSPort = 443;
// Looks for a throttling delay in a single response header line and, when one
// is present, stores it in info->throttle_ms.
//   header - one header line; it is matched case-insensitively against the
//            "retry-after:" and "x-retry-in:" prefixes.
// The stored delay never exceeds kMax429ThrottleMs.
40 void S3FanoutManager::DetectThrottleIndicator(
const std::string &header,
42 std::string value_str;
// 12 is the length of "retry-after:", so substr() keeps only the header value.
43 if (
HasPrefix(header,
"retry-after:",
true))
44 value_str = header.substr(12);
// 11 is the length of "x-retry-in:".
45 if (
HasPrefix(header,
"x-retry-in:",
true))
46 value_str = header.substr(11);
// Strip surrounding whitespace; the second argument is Trim's trim_newline
// flag.
48 value_str =
Trim(value_str,
true );
49 if (!value_str.empty()) {
// A value that does not end in "ms" (case-insensitive) is multiplied by 1000.
// The branch taken for values with the "ms" suffix is not visible in this
// excerpt.
51 unsigned value_ms =
HasSuffix(value_str,
"ms",
true )
53 : (value_numeric * 1000);
// Never throttle for longer than kMax429ThrottleMs.
55 info->
throttle_ms = std::min(value_ms, kMax429ThrottleMs);
// NOTE(review): the signature of the enclosing function is not part of this
// excerpt; the statements below look like the body of the curl header
// callback (CallbackCurlHeader) -- confirm against the full file.
// The raw buffer of size * nmemb bytes is copied into a string so that it can
// be inspected as one header line.
65 const size_t num_bytes = size * nmemb;
66 const string header_line(static_cast<const char *>(ptr), num_bytes);
// Only lines that start with "HTTP/1." (case-sensitive match) are treated as
// a status line.
70 if (
HasPrefix(header_line,
"HTTP/1.",
false)) {
71 if (header_line.length() < 10)
// Skip the blanks that follow the first 8 characters of the status line.
75 for (i = 8; (i < header_line.length()) && (header_line[i] ==
' '); ++i) {
// header_line[i] is the first non-blank character after the protocol version;
// '2' is checked here as the leading digit of the status code.
78 if (header_line[i] ==
'2') {
82 info, header_line.c_str());
// At least three characters are required from position i onwards.
83 if (header_line.length() < i + 3) {
// Assign the default throttle delay.
94 info->
throttle_ms = S3FanoutManager::kDefault429ThrottleMs;
// Let DetectThrottleIndicator() pick up a delay announced by this header line.
120 S3FanoutManager::DetectThrottleIndicator(header_line, info);
// NOTE(review): the enclosing signature is not visible in this excerpt;
// presumably this is the curl read callback (CallbackCurlData) -- confirm.
// The transfer buffer holds size * nmemb bytes.
132 const size_t num_bytes = size * nmemb;
// Read from the job's origin buffer into ptr, asking for num_bytes; the
// result of Read() is kept in read_bytes.
140 uint64_t read_bytes = info->
origin->Read(ptr, num_bytes);
// Socket callback with the parameter list libcurl's multi interface expects:
// the easy handle, the socket, the requested action (CURL_POLL_*), and two
// user pointers. It maintains the watch_fds_ pollfd array of the manager.
// NOTE(review): presumably this is the function installed with
// CURLMOPT_SOCKETFUNCTION in the constructor -- the argument is not visible
// in this excerpt.
161 int S3FanoutManager::CallbackCurlSocket(CURL *easy, curl_socket_t s,
int action,
162 void *userp,
void *socketp) {
// Debug message listing all callback arguments plus the number of fds in use
// and the number of jobs.
165 "CallbackCurlSocket called with easy "
166 "handle %p, socket %d, action %d, up %p, "
167 "sp %p, fds_inuse %d, jobs %d",
170 if (action == CURL_POLL_NONE)
// watch_fds_ is re-assigned from an allocation call whose arguments are not
// visible here (presumably the array is resized).
185 s3fanout_mgr->
watch_fds_ =
static_cast<struct pollfd *
>(
// Poll for readable data (normal and priority).
197 s3fanout_mgr->
watch_fds_[index].events = POLLIN | POLLPRI;
// Poll for writability (normal and priority band).
200 s3fanout_mgr->
watch_fds_[index].events = POLLOUT | POLLWRBAND;
// CURL_POLL_INOUT: poll for both directions.
202 case CURL_POLL_INOUT:
203 s3fanout_mgr->
watch_fds_[index].events = POLLIN | POLLPRI | POLLOUT
// CURL_POLL_REMOVE: the condition is true when the entry is not the last one
// in use.
206 case CURL_POLL_REMOVE:
207 if (index < s3fanout_mgr->watch_fds_inuse_ - 1)
// Second re-assignment of watch_fds_; the allocation arguments are not
// visible here.
217 s3fanout_mgr->
watch_fds_ =
static_cast<struct pollfd *
>(
// Static worker routine taking and returning void *. It drives the curl multi
// handle of the manager: it triggers libcurl processing, adds easy handles
// for jobs, translates poll() results into libcurl socket events, and
// collects finished transfers.
// NOTE(review): data presumably points to the S3FanoutManager instance --
// the cast is not visible in this excerpt.
233 void *S3FanoutManager::MainUpload(
void *data) {
242 unsigned jobs_in_flight = 0;
246 int timeout_ms = 100;
// CURL_SOCKET_TIMEOUT with event mask 0 lets libcurl do its processing
// without reporting activity on a particular socket.
251 int still_running = 0;
252 retval = curl_multi_socket_action(
253 s3fanout_mgr->
curl_multi_, CURL_SOCKET_TIMEOUT, 0, &still_running);
// An error from the multi interface is not recoverable here: the assert fails.
254 if (retval != CURLM_OK) {
256 assert(retval == CURLM_OK);
258 }
else if (retval < 0) {
// No easy handle could be obtained; the failure code is logged together with
// errno.
273 if (handle == NULL) {
280 "Failed to initialize CURL handle (error: %d - %s | errno: %d)",
281 init_failure,
Code2Ascii(init_failure), errno);
// Hand the prepared easy handle over to the multi handle ...
285 curl_multi_add_handle(s3fanout_mgr->
curl_multi_, handle);
// ... and trigger libcurl processing again.
288 int still_running = 0, retval = 0;
289 retval = curl_multi_socket_action(
290 s3fanout_mgr->
curl_multi_, CURL_SOCKET_TIMEOUT, 0, &still_running);
293 retval, still_running);
// Map the poll() result flags of entry i to libcurl's CURL_CSELECT_* bits.
309 if (s3fanout_mgr->
watch_fds_[i].revents & (POLLIN | POLLPRI))
310 ev_bitmask |= CURL_CSELECT_IN;
311 if (s3fanout_mgr->
watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
312 ev_bitmask |= CURL_CSELECT_OUT;
314 & (POLLERR | POLLHUP | POLLNVAL))
315 ev_bitmask |= CURL_CSELECT_ERR;
318 int still_running = 0;
319 retval = curl_multi_socket_action(s3fanout_mgr->
curl_multi_,
// Drain the message queue of the multi handle; every message is expected to
// be a completion notice (CURLMSG_DONE).
329 while ((curl_msg = curl_multi_info_read(s3fanout_mgr->
curl_multi_,
331 assert(curl_msg->msg == CURLMSG_DONE);
335 CURL *easy_handle = curl_msg->easy_handle;
336 int curl_error = curl_msg->data.result;
// CURLINFO_PRIVATE returns the pointer that was attached to the easy handle
// with CURLOPT_PRIVATE (the JobInfo of the transfer).
337 curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
339 curl_multi_remove_handle(s3fanout_mgr->
curl_multi_, easy_handle);
// The same easy handle is added again and processing is triggered.
// NOTE(review): presumably this is the retry path -- the surrounding
// condition is not visible in this excerpt.
341 curl_multi_add_handle(s3fanout_mgr->
curl_multi_, easy_handle);
342 int still_running = 0;
343 curl_multi_socket_action(
344 s3fanout_mgr->
curl_multi_, CURL_SOCKET_TIMEOUT, 0, &still_running);
// Detach each handle in the iterated range from the multi handle and destroy
// it.
361 for (; i != i_end; ++i) {
362 curl_multi_remove_handle(s3fanout_mgr->
curl_multi_, *i);
363 curl_easy_cleanup(*i);
// Provides an easy handle for a transfer. When the idle pool is empty a new
// handle is created with curl_easy_init() and basic options are set on it;
// the statement taking the first handle of the idle pool presumably belongs
// to the else branch (the branch line is not visible in this excerpt).
// The handle is recorded in pool_handles_inuse_.
377 CURL *S3FanoutManager::AcquireCurlHandle()
const {
382 if (pool_handles_idle_->empty()) {
386 handle = curl_easy_init();
// CURLOPT_NOSIGNAL = 1: libcurl must not install signal handlers.
390 retval = curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
391 assert(retval == CURLE_OK);
// Install the callback that receives the response headers.
392 retval = curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION,
394 assert(retval == CURLE_OK);
396 assert(retval == CURLE_OK);
398 assert(retval == CURLE_OK);
// Take the first handle out of the idle pool.
400 handle = *(pool_handles_idle_->begin());
401 pool_handles_idle_->erase(pool_handles_idle_->begin());
404 pool_handles_inuse_->insert(handle);
// Gives an easy handle back after a transfer.
//   handle - must be a member of pool_handles_inuse_ (asserted).
// If the idle pool already holds more than config_.pool_max_handles handles,
// the handle is destroyed; the insert into the idle pool presumably belongs
// to the else branch (not visible in this excerpt). Finally the handle is
// removed from the in-use set.
410 void S3FanoutManager::ReleaseCurlHandle(
JobInfo *info, CURL *handle)
const {
418 set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
419 assert(elem != pool_handles_inuse_->end());
421 if (pool_handles_idle_->size() > config_.pool_max_handles) {
// Detach the share object before the handle is destroyed.
422 CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, NULL);
423 assert(retval == CURLE_OK);
424 curl_easy_cleanup(handle);
// Drop the handle's entry from the handle -> DNS entry map; retitems is the
// number of erased elements.
425 std::map<CURL *, S3FanOutDnsEntry *>::size_type
426 retitems = curl_sharehandles_->erase(handle);
429 pool_handles_idle_->insert(handle);
432 pool_handles_inuse_->erase(elem);
// Fills the first two slots of watch_fds_ with descriptor 0 of
// pipe_terminate_ and of pipe_jobs_ (index 0 is the read end under the
// pipe(2) convention). Both are polled for POLLIN | POLLPRI.
// Preconditions (asserted): no slot is in use yet and the array has room for
// at least two entries.
435 void S3FanoutManager::InitPipeWatchFds() {
436 assert(watch_fds_inuse_ == 0);
437 assert(watch_fds_size_ >= 2);
// Slot 0: termination pipe.
438 watch_fds_[0].fd = pipe_terminate_[0];
439 watch_fds_[0].events = POLLIN | POLLPRI;
440 watch_fds_[0].revents = 0;
// Slot 1: jobs pipe.
442 watch_fds_[1].fd = pipe_jobs_[0];
443 watch_fds_[1].events = POLLIN | POLLPRI;
444 watch_fds_[1].revents = 0;
// Produces the request headers for the "Authorization: AWS <key>:<signature>"
// scheme and appends them to headers.
//   info    - the job; it determines payload hash, content type and request
//             string.
//   headers - output list; receives Authorization, Date, X-Amz-Acl and, when
//             non-empty, Content-MD5 and Content-Type.
// The signature is the Base64 encoding of an HMAC digest over to_sign.
452 bool S3FanoutManager::MkV2Authz(
const JobInfo &info,
453 vector<string> *headers)
const {
455 bool retval = MkPayloadHash(info, &payload_hash);
458 string content_type = GetContentType(info);
459 string request = GetRequestString(info);
// The string to sign starts with request, payload hash and content type, one
// per line.
462 string to_sign = request +
"\n" + payload_hash +
"\n" + content_type +
"\n"
// The ACL line is part of the signed string only when an ACL is configured.
464 if (config_.x_amz_acl !=
"") {
465 to_sign +=
"x-amz-acl:" + config_.x_amz_acl +
"\n" +
474 reinterpret_cast<const unsigned char *>(to_sign.data()),
475 to_sign.length(), &hmac);
477 headers->push_back(
"Authorization: AWS " + config_.access_key +
":"
478 +
Base64(
string(reinterpret_cast<char *>(hmac.digest),
479 hmac.GetDigestSize())));
480 headers->push_back(
"Date: " + timestamp);
// NOTE(review): this header is added even when config_.x_amz_acl is empty,
// whereas the string to sign includes the ACL only when it is non-empty --
// confirm that this is intended.
481 headers->push_back(
"X-Amz-Acl: " + config_.x_amz_acl);
482 if (!payload_hash.empty())
483 headers->push_back(
"Content-MD5: " + payload_hash);
484 if (!content_type.empty())
485 headers->push_back(
"Content-Type: " + content_type);
// Percent-encodes val and returns the result.
//   val          - the string to encode
//   encode_slash - presumably selects whether '/' is encoded as well; the
//                  body of that branch is not visible in this excerpt
// Letters, digits, '_', '-' and '~' are part of the set tested in the first
// condition (its last line and its body are not visible here). Other
// characters are written as '%' followed by two uppercase hexadecimal digits.
490 string S3FanoutManager::GetUriEncode(
const string &val,
491 bool encode_slash)
const {
493 const unsigned len = val.length();
495 for (
unsigned i = 0; i < len; ++i) {
497 if ((c >=
'A' && c <=
'Z') || (c >=
'a' && c <=
'z')
498 || (c >=
'0' && c <=
'9') || c ==
'_' || c ==
'-' || c ==
'~'
501 }
else if (c ==
'/') {
// Emit %XX: the high nibble first, then the low nibble. A nibble of 0..9 maps
// to '0'..'9', a nibble of 10..15 maps to 'A'..'F'.
// NOTE(review): the declaration of c is not visible; if c is a signed char,
// bytes >= 0x80 give negative quotients and remainders here -- confirm the
// type of c.
508 result.push_back(
'%');
509 result.push_back((c / 16) + ((c / 16 <= 9) ?
'0' :
'A' - 10));
510 result.push_back((c % 16) + ((c % 16 <= 9) ?
'0' :
'A' - 10));
// Returns the signing key for the given date. The key is derived by a chain
// of Hmac256() calls: "AWS4" + secret key -> date -> region -> "s3" ->
// "aws4_request". The most recent (date, key) pair is remembered in
// last_signing_key_, so a repeated call with the same date returns the stored
// key without recomputation.
517 string S3FanoutManager::GetAwsV4SigningKey(
const string &date)
const {
// Cache hit: the key for this date was computed before.
518 if (last_signing_key_.first == date)
519 return last_signing_key_.second;
// The third argument of Hmac256() is its raw_output flag.
521 string date_key =
shash::Hmac256(
"AWS4" + config_.secret_key, date,
true);
522 string date_region_key =
shash::Hmac256(date_key, config_.region,
true);
523 string date_region_service_key =
shash::Hmac256(date_region_key,
"s3",
true);
524 string signing_key =
shash::Hmac256(date_region_service_key,
"aws4_request",
// Remember the result for later calls with the same date.
526 last_signing_key_.first = date;
527 last_signing_key_.second = signing_key;
// Produces the request headers for the "AWS4-HMAC-SHA256" authorization
// scheme and appends them to headers.
//   info    - the job; it determines payload hash, content type, request
//             string and object key.
//   headers - output list; receives Content-Type (when non-empty), X-Amz-Acl,
//             X-Amz-Content-Sha256, X-Amz-Date and Authorization.
536 bool S3FanoutManager::MkV4Authz(
const JobInfo &info,
537 vector<string> *headers)
const {
539 bool retval = MkPayloadHash(info, &payload_hash);
542 string content_type = GetContentType(info);
// The date used in the scope is the first 8 characters of the timestamp.
544 string date = timestamp.substr(0, 8);
// Split complete_hostname_ into host and optional port.
545 vector<string> tokens =
SplitString(complete_hostname_,
':');
546 assert(tokens.size() <= 2);
547 string canonical_hostname = tokens[0];
// The port is appended again under a condition that is only partly visible in
// this excerpt.
551 if (tokens.size() == 2
554 canonical_hostname +=
":" + tokens[1];
// signed_headers and canonical_headers are built in the same order:
// content-type (optional), host, x-amz-acl (optional), x-amz-content-sha256,
// x-amz-date.
556 string signed_headers;
557 string canonical_headers;
558 if (!content_type.empty()) {
559 signed_headers +=
"content-type;";
560 headers->push_back(
"Content-Type: " + content_type);
561 canonical_headers +=
"content-type:" + content_type +
"\n";
563 if (config_.x_amz_acl !=
"") {
564 signed_headers +=
"host;x-amz-acl;x-amz-content-sha256;x-amz-date";
566 signed_headers +=
"host;x-amz-content-sha256;x-amz-date";
568 canonical_headers +=
"host:" + canonical_hostname +
"\n";
569 if (config_.x_amz_acl !=
"") {
570 canonical_headers +=
"x-amz-acl:" + config_.x_amz_acl +
"\n";
572 canonical_headers +=
"x-amz-content-sha256:" + payload_hash +
"\n"
573 +
"x-amz-date:" + timestamp +
"\n";
// Credential scope: <date>/<region>/s3/aws4_request.
575 string scope = date +
"/" + config_.region +
"/s3/aws4_request";
// The URI depends on config_.dns_buckets; the visible alternative is the
// path-style form "/<bucket>/<object key>".
576 string uri = config_.dns_buckets
578 : (
string(
"/") + config_.bucket +
"/" + info.
object_key);
// Canonical request: request string, encoded URI (slashes are not encoded),
// an empty line, the canonical headers and the signed header list.
580 string canonical_request = GetRequestString(info) +
"\n"
581 + GetUriEncode(uri,
false) +
"\n" +
"\n"
582 + canonical_headers +
"\n" + signed_headers +
"\n"
587 string string_to_sign =
"AWS4-HMAC-SHA256\n" + timestamp +
"\n" + scope +
"\n"
590 string signing_key = GetAwsV4SigningKey(date);
593 headers->push_back(
"X-Amz-Acl: " + config_.x_amz_acl);
594 headers->push_back(
"X-Amz-Content-Sha256: " + payload_hash);
595 headers->push_back(
"X-Amz-Date: " + timestamp);
596 headers->push_back(
"Authorization: AWS4-HMAC-SHA256 "
598 + config_.access_key +
"/" + scope
// Produces the request headers for the "Authorization: SharedKey" scheme and
// appends them to headers.
//   info    - the job; its request kind selects how the string to sign is
//             assembled.
//   headers - output list; receives x-ms-date, x-ms-version, Authorization
//             and x-ms-blob-type.
612 bool S3FanoutManager::MkAzureAuthz(
const JobInfo &info,
613 vector<string> *headers)
const {
// Canonicalized x-ms-* headers, one per line.
615 string canonical_headers =
"x-ms-blob-type:BlockBlob\nx-ms-date:" + timestamp
616 +
"\nx-ms-version:2011-08-18";
// The canonical resource starts with "/<access key>/<bucket>"; the remainder
// is not visible in this excerpt.
617 string canonical_resource =
"/" + config_.access_key +
"/" + config_.bucket
620 string string_to_sign;
// Requests of kind kReqHeadOnly, kReqHeadPut and kReqDelete are handled in
// the first branch.
621 if ((info.
request == JobInfo::kReqHeadOnly)
622 || (info.
request == JobInfo::kReqHeadPut)
623 || (info.
request == JobInfo::kReqDelete)) {
624 string_to_sign = GetRequestString(info) + string(
"\n\n\n")
625 +
"\n\n\n\n\n\n\n\n\n" + canonical_headers +
"\n"
626 + canonical_resource;
// Second assignment for the remaining request kinds; one line between the
// two newline blocks is not visible in this excerpt.
628 string_to_sign = GetRequestString(info) + string(
"\n\n\n")
630 +
"\n\n\n\n\n\n\n\n\n" + canonical_headers +
"\n"
631 + canonical_resource;
// The configured secret key is Base64-decoded to obtain the signing key.
635 int retval =
Debase64(config_.secret_key, &signing_key);
// The third argument of Hmac256() is its raw_output flag.
639 string signature =
shash::Hmac256(signing_key, string_to_sign,
true);
641 headers->push_back(
"x-ms-date: " + timestamp);
642 headers->push_back(
"x-ms-version: 2011-08-18");
643 headers->push_back(
"Authorization: SharedKey " + config_.access_key +
":"
645 headers->push_back(
"x-ms-blob-type: BlockBlob");
// Applies DNS-related options to an easy handle: the share object is attached
// with CURLOPT_SHARE and clist is installed as the CURLOPT_RESOLVE list.
// Both calls are expected to succeed (asserted).
649 void S3FanoutManager::InitializeDnsSettingsCurl(CURL *handle,
651 curl_slist *clist)
const {
652 CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, sharehandle);
653 assert(retval == CURLE_OK);
654 retval = curl_easy_setopt(handle, CURLOPT_RESOLVE, clist);
655 assert(retval == CURLE_OK);
// Associates an easy handle with a DNS entry (share handle plus resolve list)
// for the given host.
//   handle         - the easy handle to configure
//   host_with_port - host name, optionally with port; passed by value because
//                    the protocol prefix is prepended locally
// Visible steps: reuse the entry already registered for the handle, else pick
// an existing entry for the same host name, else resolve the host and create
// new entries.
659 int S3FanoutManager::InitializeDnsSettings(CURL *handle,
660 std::string host_with_port)
const {
// Is there already an entry for this handle?
662 std::map<CURL *, S3FanOutDnsEntry *>::const_iterator it = curl_sharehandles_
664 if (it != curl_sharehandles_->end()) {
665 InitializeDnsSettingsCurl(handle, it->second->sharehandle,
672 host_with_port = config_.protocol +
"://" + host_with_port;
// Scan the known entries for this host name; usemin tracks the smallest
// counter seen so far.
678 unsigned int usemin = UINT_MAX;
679 std::set<S3FanOutDnsEntry *>::iterator its3 = sharehandles_->begin();
680 for (; its3 != sharehandles_->end(); ++its3) {
681 if ((*its3)->dns_name == remote_host) {
682 if (usemin >= (*its3)->counter) {
// Register the selected entry for this handle.
689 curl_sharehandles_->insert(
690 std::pair<CURL *, S3FanOutDnsEntry *>(handle, useme));
// No usable entry: resolve the host and walk its IPv4 addresses.
698 dns::Host host = resolver_->Resolve(remote_host);
700 std::set<string>::iterator its = ipv4_addresses.begin();
702 for (; its != ipv4_addresses.end(); ++its) {
// NOTE(review): the fallback port is "80" regardless of config_.protocol --
// confirm that this is intended for https endpoints.
706 dnse->
port = remote_port.size() == 0 ?
"80" : remote_port;
709 dnse->
clist = curl_slist_append(
// The share handle shares the DNS cache between easy handles.
714 CURLSHcode share_retval = curl_share_setopt(
715 dnse->
sharehandle, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS);
716 assert(share_retval == CURLSHE_OK);
717 sharehandles_->insert(dnse);
721 "Error: DNS resolve failed for address '%s'.",
722 remote_host.c_str());
726 curl_sharehandles_->insert(
727 std::pair<CURL *, S3FanOutDnsEntry *>(handle, dnse));
// Computes the payload hash string for a job and stores it in *hex_hash.
//   info     - the job; its request kind and origin buffer are used
//   hex_hash - output string
// The result format depends on config_.authz_method.
735 bool S3FanoutManager::MkPayloadHash(
const JobInfo &info,
736 string *hex_hash)
const {
// Requests of kind kReqHeadOnly, kReqHeadPut and kReqDelete are handled
// without reading the origin buffer.
737 if ((info.
request == JobInfo::kReqHeadOnly)
738 || (info.
request == JobInfo::kReqHeadPut)
739 || (info.
request == JobInfo::kReqDelete)) {
740 switch (config_.authz_method) {
// The literal starts like the SHA-256 digest of empty input; its continuation
// is not visible in this excerpt.
746 *hex_hash =
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b78"
// Obtain a pointer to the payload: GetSize() bytes are requested from
// offset 0.
763 unsigned int nbytes = info.
origin->Data(reinterpret_cast<void **>(&data),
764 info.
origin->GetSize(), 0);
767 switch (config_.authz_method) {
// In this case the digest bytes are Base64-encoded.
770 *hex_hash =
Base64(
string(reinterpret_cast<char *>(payload_hash.
digest),
// Maps the request kind of a job to a string. The case labels form three
// groups: kReqHeadOnly/kReqHeadPut, the four kReqPut* kinds, and kReqDelete.
// NOTE(review): the returned strings are not visible in this excerpt;
// presumably they are the HTTP verbs HEAD, PUT and DELETE -- confirm.
785 string S3FanoutManager::GetRequestString(
const JobInfo &info)
const {
787 case JobInfo::kReqHeadOnly:
788 case JobInfo::kReqHeadPut:
790 case JobInfo::kReqPutCas:
791 case JobInfo::kReqPutDotCvmfs:
792 case JobInfo::kReqPutHtml:
793 case JobInfo::kReqPutBucket:
795 case JobInfo::kReqDelete:
// Maps the request kind of a job to a content type string.
// Visible results: kReqPutCas -> "application/octet-stream" and
// kReqPutDotCvmfs -> "application/x-cvmfs". The values returned for the
// other kinds are not visible in this excerpt.
803 string S3FanoutManager::GetContentType(
const JobInfo &info)
const {
// kReqHeadOnly, kReqHeadPut and kReqDelete share one result.
805 case JobInfo::kReqHeadOnly:
806 case JobInfo::kReqHeadPut:
807 case JobInfo::kReqDelete:
809 case JobInfo::kReqPutCas:
810 return "application/octet-stream";
811 case JobInfo::kReqPutDotCvmfs:
812 return "application/x-cvmfs";
813 case JobInfo::kReqPutHtml:
815 case JobInfo::kReqPutBucket:
// NOTE(review): the signature of the enclosing function is not part of this
// excerpt; the statements below look like the body of InitializeRequest(),
// which prepares an easy handle for one job -- confirm against the full file.
// Set up the DNS entry / share handle for the target host.
841 InitializeDnsSettings(handle, complete_hostname_);
// Requests of kind kReqHeadOnly, kReqHeadPut and kReqDelete: upload mode off
// and CURLOPT_NOBODY on.
844 if ((info->
request == JobInfo::kReqHeadOnly)
845 || (info->
request == JobInfo::kReqHeadPut)
846 || (info->
request == JobInfo::kReqDelete)) {
847 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 0);
848 assert(retval == CURLE_OK);
849 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
850 assert(retval == CURLE_OK);
// Deletes use a custom request string; for the other kinds a custom request
// left on a reused handle is cleared with NULL.
852 if (info->
request == JobInfo::kReqDelete) {
853 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST,
854 GetRequestString(*info).c_str());
855 assert(retval == CURLE_OK);
857 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
858 assert(retval == CURLE_OK);
// Upload path: no custom request, upload mode on, body enabled, and the
// upload size taken from the origin buffer.
861 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
862 assert(retval == CURLE_OK);
863 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 1);
864 assert(retval == CURLE_OK);
865 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 0);
866 assert(retval == CURLE_OK);
867 retval = curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE,
868 static_cast<curl_off_t>(info->
origin->GetSize()));
869 assert(retval == CURLE_OK);
// The Cache-Control header depends on the request kind.
871 if (info->
request == JobInfo::kReqPutDotCvmfs) {
873 kCacheControlDotCvmfs);
874 }
else if (info->
request == JobInfo::kReqPutCas) {
// Authorization headers are produced by the function that matches
// config_.authz_method.
883 vector<string> authz_headers;
884 switch (config_.authz_method) {
886 retval_b = MkV2Authz(*info, &authz_headers);
889 retval_b = MkV4Authz(*info, &authz_headers);
892 retval_b = MkAzureAuthz(*info, &authz_headers);
899 for (
unsigned i = 0; i < authz_headers.size(); ++i) {
901 authz_headers[i].c_str());
906 "Connection: Keep-Alive");
913 user_agent_->c_str());
// The JobInfo pointer is attached to the handle three times: as private data
// (read back with CURLINFO_PRIVATE), as header callback data and as read
// callback data.
916 retval = curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
917 assert(retval == CURLE_OK);
918 retval = curl_easy_setopt(handle, CURLOPT_HEADERDATA,
919 static_cast<void *>(info));
920 assert(retval == CURLE_OK);
921 retval = curl_easy_setopt(handle, CURLOPT_READDATA,
922 static_cast<void *>(info));
923 assert(retval == CURLE_OK);
// Install the header list of the job.
924 retval = curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->
http_headers);
925 assert(retval == CURLE_OK);
// Restrict name resolution to IPv4 when requested.
926 if (opt_ipv4_only_) {
927 retval = curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
928 assert(retval == CURLE_OK);
// Follow HTTP redirects.
931 retval = curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
932 assert(retval == CURLE_OK);
// libcurl writes error messages into the buffer of the job.
934 retval = curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, info->
errorbuffer);
935 assert(retval == CURLE_OK);
// HTTPS only: verify the certificate of the server and of an HTTPS proxy,
// then apply the certificate path from the certificate store.
937 if (config_.protocol ==
"https") {
938 retval = curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 1L);
939 assert(retval == CURLE_OK);
940 retval = curl_easy_setopt(handle, CURLOPT_PROXY_SSL_VERIFYPEER, 1L);
941 assert(retval == CURLE_OK);
942 bool add_cert = ssl_certificate_store_.ApplySslCertificatePath(handle);
// Sets timeout, debugging, URL and proxy options on the easy handle of a job.
// Every curl_easy_setopt() call is expected to succeed (asserted).
953 void S3FanoutManager::SetUrlOptions(
JobInfo *info)
const {
// config_.opt_timeout_sec is used both as the connect timeout and as the
// low-speed time window; the low-speed limit value is not visible in this
// excerpt.
957 retval = curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT,
958 config_.opt_timeout_sec);
959 assert(retval == CURLE_OK);
960 retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT,
962 assert(retval == CURLE_OK);
963 retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME,
964 config_.opt_timeout_sec);
965 assert(retval == CURLE_OK);
// Verbose libcurl output only in curl debug mode.
967 if (is_curl_debug_) {
968 retval = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 1);
969 assert(retval == CURLE_OK);
973 retval = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
974 assert(retval == CURLE_OK);
// The proxy is taken from the configuration.
976 retval = curl_easy_setopt(curl_handle, CURLOPT_PROXY, config_.proxy.c_str());
977 assert(retval == CURLE_OK);
// Reads transfer information from a finished easy handle. The visible query
// asks libcurl for the number of uploaded bytes (CURLINFO_SIZE_UPLOAD); what
// is done with the value is not visible in this excerpt.
984 void S3FanoutManager::UpdateStatistics(CURL *handle) {
987 if (curl_easy_getinfo(handle, CURLINFO_SIZE_UPLOAD, &val) == CURLE_OK)
// Decides whether a failed job may be tried again and maintains its backoff
// state. Only the throttle reporting and the backoff computation are visible
// in this excerpt.
995 bool S3FanoutManager::CanRetry(
const JobInfo *info) {
// Rate-limit the throttle warning: it is emitted only when more than
// kThrottleReportIntervalSec has passed since the previous report.
1019 if ((now - timestamp_last_throttle_report_)
1020 > kThrottleReportIntervalSec) {
1022 "Warning: S3 backend throttling %ums "
1023 "(total backoff time so far %lums)",
1025 timestamp_last_throttle_report_ = now;
// The backoff is drawn from the random number generator with
// config_.opt_backoff_init_ms + 1 as argument.
1033 info->
backoff_ms = prng_.Next(config_.opt_backoff_init_ms + 1);
// The backoff never exceeds config_.opt_backoff_max_ms.
1037 if (info->
backoff_ms > config_.opt_backoff_max_ms)
1038 info->
backoff_ms = config_.opt_backoff_max_ms;
// Evaluates the outcome of a finished transfer.
//   curl_error - the libcurl result code of the transfer
//   info       - the job; its request kind may be changed here
// The libcurl result code is classified in a switch; the actions taken per
// group are not visible in this excerpt.
1053 bool S3FanoutManager::VerifyAndFinalize(
const int curl_error,
JobInfo *info) {
1055 "Verify uploaded/tested object %s "
1056 "(curl error %d, info error %d, info request %d)",
1062 switch (curl_error) {
// Protocol and URL problems.
1069 case CURLE_UNSUPPORTED_PROTOCOL:
1070 case CURLE_URL_MALFORMAT:
// Host name resolution failure.
1073 case CURLE_COULDNT_RESOLVE_HOST:
// Connection and transfer failures.
1076 case CURLE_COULDNT_CONNECT:
1077 case CURLE_OPERATION_TIMEDOUT:
1078 case CURLE_SEND_ERROR:
1079 case CURLE_RECV_ERROR:
// Transfer stopped by a callback or failed while writing.
1082 case CURLE_ABORTED_BY_CALLBACK:
1083 case CURLE_WRITE_ERROR:
1088 "unexpected curl error (%d) while trying to upload %s: %s",
// A kReqHeadPut job is turned into a kReqPutCas job when the condition holds;
// the first part of the condition is not visible in this excerpt.
1096 && (info->
request == JobInfo::kReqHeadPut)) {
1099 info->
request = JobInfo::kReqPutCas;
1107 "Failed to initialize CURL handle "
1108 "(error: %d - %s | errno: %d)",
1109 init_failure,
Code2Ascii(init_failure), errno);
1111 SetUrlOptions(info);
// try_again starts as false; CanRetry() sets it.
1118 bool try_again =
false;
1120 try_again = CanRetry(info);
// The upload kinds kReqPutCas, kReqPutDotCvmfs and kReqPutHtml get extra
// handling here; its body is not visible in this excerpt.
1123 if (info->
request == JobInfo::kReqPutCas
1124 || info->
request == JobInfo::kReqPutDotCvmfs
1125 || info->
request == JobInfo::kReqPutHtml) {
// Constructs the manager from a copy of the given configuration. Visible
// steps: allocation of two pthread mutexes, setup of the User-Agent header
// line, global libcurl initialization, configuration of the multi handle,
// evaluation of the CVMFS_IPV4_ONLY environment variable, and allocation of
// the initial pollfd array.
1150 S3FanoutManager::S3FanoutManager(
const S3Config &config) : config_(config) {
1158 smalloc(
sizeof(pthread_mutex_t)));
1162 smalloc(
sizeof(pthread_mutex_t)));
// The User-Agent header line carries the cvmfs version.
1178 *
user_agent_ =
"User-Agent: cvmfs " + string(CVMFS_VERSION);
1181 CURLcode cretval = curl_global_init(CURL_GLOBAL_ALL);
1182 assert(cretval == CURLE_OK);
// Multi handle: a socket callback is installed, it receives this object as
// user data, and the total number of connections is limited (the callback
// and the limit are not visible in this excerpt).
1186 mretval = curl_multi_setopt(
curl_multi_, CURLMOPT_SOCKETFUNCTION,
1188 assert(mretval == CURLM_OK);
1189 mretval = curl_multi_setopt(
curl_multi_, CURLMOPT_SOCKETDATA,
1190 static_cast<void *>(
this));
1191 assert(mretval == CURLM_OK);
1192 mretval = curl_multi_setopt(
curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1194 assert(mretval == CURLM_OK);
// The condition holds when CVMFS_IPV4_ONLY is set to a non-empty value.
1203 if ((getenv(
"CVMFS_IPV4_ONLY") != NULL)
1204 && (strlen(getenv(
"CVMFS_IPV4_ONLY")) > 0)) {
// Initial room for four pollfd entries.
1212 watch_fds_ =
static_cast<struct pollfd *
>(smalloc(4 *
sizeof(
struct pollfd)));
// NOTE(review): the signature of the enclosing function is not part of this
// excerpt; the statements below release libcurl resources and presumably
// belong to the destructor -- confirm against the full file.
// Destroy every easy handle in the iterated range.
1237 for (; i != iEnd; ++i) {
1238 curl_easy_cleanup(*i);
// For every DNS entry, release its share handle and its resolve list.
1241 set<S3FanOutDnsEntry *>::iterator is =
sharehandles_->begin();
1242 const set<S3FanOutDnsEntry *>::const_iterator isEnd =
sharehandles_->end();
1243 for (; is != isEnd; ++is) {
1244 curl_share_cleanup((*is)->sharehandle);
1245 curl_slist_free_all((*is)->clist);
// Counterpart of curl_global_init() in the constructor.
1263 curl_global_cleanup();
1273 static_cast<void *>(
this));
const char * Code2Ascii(const ObjectFetcherFailures::Failures error)
pthread_mutex_t * jobs_todo_lock_
atomic_int32 multi_threaded_
pthread_mutex_t * curl_handle_lock_
string Sha256String(const string &content)
std::set< CURL * > * pool_handles_idle_
struct curl_slist * http_headers
static size_t CallbackCurlBody(char *, size_t size, size_t nmemb, void *)
std::string IsoTimestamp()
dns::CaresResolver * resolver_
string Trim(const string &raw, bool trim_newline)
bool IsHttpUrl(const std::string &path)
void Hmac(const string &key, const unsigned char *buffer, const unsigned buffer_size, Any *any_digest)
perf::Statistics * statistics_
static void * MainUpload(void *data)
const std::string object_key
assert((mem||(size==0))&&"Out Of Memory")
const std::set< std::string > & ipv4_addresses() const
std::string MkCompleteHostname()
void PushNewJob(JobInfo *info)
SynchronizingCounter< uint32_t > Semaphore
std::string Print() const
bool Debase64(const string &data, string *decoded)
void MakePipe(int pipe_fd[2])
unsigned char digest[digest_size_]
struct curl_slist * clist
std::string * user_agent_
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
unsigned int max_available_jobs_
std::string RfcTimestamp()
int64_t String2Int64(const string &value)
unsigned GetDigestSize() const
std::set< JobInfo * > * active_requests_
void PushCompletedJob(JobInfo *info)
string Sha256Mem(const unsigned char *buffer, const unsigned buffer_size)
vector< string > SplitString(const string &str, char delim)
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
bool HasSuffix(const std::string &str, const std::string &suffix, const bool ignore_case)
void UseSystemCertificatePath()
void SetUrlOptions(JobInfo *info) const
CURL * AcquireCurlHandle() const
uint64_t throttle_timestamp
Semaphore * available_jobs_
static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp)
UniquePtr< FileBackedBuffer > origin
void ReleaseCurlHandle(JobInfo *info, CURL *handle) const
uint32_t pool_max_handles
string StringifyInt(const int64_t value)
struct pollfd * watch_fds_
std::string ExtractPort(const std::string &url)
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
JobInfo * PopCompletedJob()
std::string ExtractHost(const std::string &url)
uint32_t watch_fds_inuse_
unsigned char num_retries
void HashMem(const unsigned char *buffer, const unsigned buffer_size, Any *any_digest)
std::string complete_hostname_
string Base64(const string &data)
uint64_t String2Uint64(const string &value)
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
static CaresResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
std::set< S3FanOutDnsEntry * > * sharehandles_
SslCertificateStore ssl_certificate_store_
const Statistics & GetStatistics()
std::string Hmac256(const std::string &key, const std::string &content, bool raw_output)
void SafeSleepMs(const unsigned ms)
void WritePipe(int fd, const void *buf, size_t nbyte)
uint64_t timestamp_last_throttle_report_
std::set< CURL * > * pool_handles_inuse_
void ReadPipe(int fd, void *buf, size_t nbyte)
void ClosePipe(int pipe_fd[2])
std::map< CURL *, S3FanOutDnsEntry * > * curl_sharehandles_
Failures InitializeRequest(JobInfo *info, CURL *handle) const
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)