14 #include "cvmfs_config.h"
27 const char *S3FanoutManager::kCacheControlCas =
"Cache-Control: max-age=259200";
28 const char *S3FanoutManager::kCacheControlDotCvmfs =
29 "Cache-Control: max-age=61";
30 const unsigned S3FanoutManager::kDefault429ThrottleMs = 250;
31 const unsigned S3FanoutManager::kMax429ThrottleMs = 10000;
32 const unsigned S3FanoutManager::kThrottleReportIntervalSec = 10;
33 const unsigned S3FanoutManager::kDefaultHTTPPort = 80;
34 const unsigned S3FanoutManager::kDefaultHTTPSPort = 443;
40 void S3FanoutManager::DetectThrottleIndicator(
41 const std::string &header,
44 std::string value_str;
45 if (
HasPrefix(header,
"retry-after:",
true))
46 value_str = header.substr(12);
47 if (
HasPrefix(header,
"x-retry-in:",
true))
48 value_str = header.substr(11);
50 value_str =
Trim(value_str,
true );
51 if (!value_str.empty()) {
55 value_numeric : (value_numeric * 1000);
57 info->
throttle_ms = std::min(value_ms, kMax429ThrottleMs);
67 const size_t num_bytes = size*nmemb;
68 const string header_line(static_cast<const char *>(ptr), num_bytes);
72 if (
HasPrefix(header_line,
"HTTP/1.",
false)) {
73 if (header_line.length() < 10)
77 for (i = 8; (i < header_line.length()) && (header_line[i] ==
' '); ++i) {}
79 if (header_line[i] ==
'2') {
83 info, header_line.c_str());
84 if (header_line.length() < i+3) {
95 info->
throttle_ms = S3FanoutManager::kDefault429ThrottleMs;
121 S3FanoutManager::DetectThrottleIndicator(header_line, info);
133 const size_t num_bytes = size*nmemb;
141 uint64_t read_bytes = info->
origin->Read(ptr, num_bytes);
144 "source buffer pushed out %lu bytes", read_bytes);
154 char * ,
size_t size,
size_t nmemb,
void * )
163 int S3FanoutManager::CallbackCurlSocket(CURL *easy, curl_socket_t s,
int action,
164 void *userp,
void *socketp) {
167 "handle %p, socket %d, action %d, up %p, "
168 "sp %p, fds_inuse %d, jobs %d",
169 easy, s, action, userp,
172 if (action == CURL_POLL_NONE)
187 s3fanout_mgr->
watch_fds_ =
static_cast<struct pollfd *
>(
199 s3fanout_mgr->
watch_fds_[index].events = POLLIN | POLLPRI;
202 s3fanout_mgr->
watch_fds_[index].events = POLLOUT | POLLWRBAND;
204 case CURL_POLL_INOUT:
206 POLLIN | POLLPRI | POLLOUT | POLLWRBAND;
208 case CURL_POLL_REMOVE:
209 if (index < s3fanout_mgr->watch_fds_inuse_-1)
217 s3fanout_mgr->
watch_fds_ =
static_cast<struct pollfd *
>(
233 void *S3FanoutManager::MainUpload(
void *data) {
242 unsigned jobs_in_flight = 0;
246 int timeout_ms = 100;
251 int still_running = 0;
252 retval = curl_multi_socket_action(s3fanout_mgr->
curl_multi_,
256 if (retval != CURLM_OK) {
258 assert(retval == CURLM_OK);
260 }
else if (retval < 0) {
275 if (handle == NULL) {
282 "Failed to initialize CURL handle (error: %d - %s | errno: %d)",
283 init_failure,
Code2Ascii(init_failure), errno);
287 curl_multi_add_handle(s3fanout_mgr->
curl_multi_, handle);
290 int still_running = 0, retval = 0;
291 retval = curl_multi_socket_action(s3fanout_mgr->
curl_multi_,
297 "curl_multi_socket_action: %d - %d",
298 retval, still_running);
314 if (s3fanout_mgr->
watch_fds_[i].revents & (POLLIN | POLLPRI))
315 ev_bitmask |= CURL_CSELECT_IN;
316 if (s3fanout_mgr->
watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
317 ev_bitmask |= CURL_CSELECT_OUT;
319 (POLLERR | POLLHUP | POLLNVAL))
320 ev_bitmask |= CURL_CSELECT_ERR;
323 int still_running = 0;
324 retval = curl_multi_socket_action(s3fanout_mgr->
curl_multi_,
334 while ((curl_msg = curl_multi_info_read(s3fanout_mgr->
curl_multi_,
337 assert(curl_msg->msg == CURLMSG_DONE);
341 CURL *easy_handle = curl_msg->easy_handle;
342 int curl_error = curl_msg->data.result;
343 curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
345 curl_multi_remove_handle(s3fanout_mgr->
curl_multi_, easy_handle);
347 curl_multi_add_handle(s3fanout_mgr->
curl_multi_, easy_handle);
348 int still_running = 0;
349 curl_multi_socket_action(s3fanout_mgr->
curl_multi_,
367 const set<CURL *>::const_iterator i_end =
369 for (; i != i_end; ++i) {
370 curl_multi_remove_handle(s3fanout_mgr->
curl_multi_, *i);
371 curl_easy_cleanup(*i);
385 CURL *S3FanoutManager::AcquireCurlHandle()
const {
390 if (pool_handles_idle_->empty()) {
394 handle = curl_easy_init();
398 retval = curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
399 assert(retval == CURLE_OK);
400 retval = curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION,
402 assert(retval == CURLE_OK);
404 assert(retval == CURLE_OK);
406 assert(retval == CURLE_OK);
408 handle = *(pool_handles_idle_->begin());
409 pool_handles_idle_->erase(pool_handles_idle_->begin());
412 pool_handles_inuse_->insert(handle);
418 void S3FanoutManager::ReleaseCurlHandle(
JobInfo *info, CURL *handle)
const {
426 set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
427 assert(elem != pool_handles_inuse_->end());
429 if (pool_handles_idle_->size() > config_.pool_max_handles) {
430 CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, NULL);
431 assert(retval == CURLE_OK);
432 curl_easy_cleanup(handle);
433 std::map<CURL *, S3FanOutDnsEntry *>::size_type retitems =
434 curl_sharehandles_->erase(handle);
437 pool_handles_idle_->insert(handle);
440 pool_handles_inuse_->erase(elem);
443 void S3FanoutManager::InitPipeWatchFds() {
444 assert(watch_fds_inuse_ == 0);
445 assert(watch_fds_size_ >= 2);
446 watch_fds_[0].fd = pipe_terminate_[0];
447 watch_fds_[0].events = POLLIN | POLLPRI;
448 watch_fds_[0].revents = 0;
450 watch_fds_[1].fd = pipe_jobs_[0];
451 watch_fds_[1].events = POLLIN | POLLPRI;
452 watch_fds_[1].revents = 0;
460 bool S3FanoutManager::MkV2Authz(
const JobInfo &info, vector<string> *headers)
464 bool retval = MkPayloadHash(info, &payload_hash);
467 string content_type = GetContentType(info);
468 string request = GetRequestString(info);
471 string to_sign = request +
"\n" +
472 payload_hash +
"\n" +
473 content_type +
"\n" +
475 if (config_.x_amz_acl !=
"") {
476 to_sign +=
"x-amz-acl:" + config_.x_amz_acl +
"\n" +
485 reinterpret_cast<const unsigned char *>(to_sign.data()),
486 to_sign.length(), &hmac);
488 headers->push_back(
"Authorization: AWS " + config_.access_key +
":" +
489 Base64(
string(reinterpret_cast<char *>(hmac.digest),
490 hmac.GetDigestSize())));
491 headers->push_back(
"Date: " + timestamp);
492 headers->push_back(
"X-Amz-Acl: " + config_.x_amz_acl);
493 if (!payload_hash.empty())
494 headers->push_back(
"Content-MD5: " + payload_hash);
495 if (!content_type.empty())
496 headers->push_back(
"Content-Type: " + content_type);
501 string S3FanoutManager::GetUriEncode(
const string &val,
bool encode_slash)
505 const unsigned len = val.length();
507 for (
unsigned i = 0; i < len; ++i) {
509 if ((c >=
'A' && c <=
'Z') ||
510 (c >=
'a' && c <=
'z') ||
511 (c >=
'0' && c <=
'9') ||
512 c ==
'_' || c ==
'-' || c ==
'~' || c ==
'.')
515 }
else if (c ==
'/') {
522 result.push_back(
'%');
523 result.push_back((c / 16) + ((c / 16 <= 9) ?
'0' :
'A'-10));
524 result.push_back((c % 16) + ((c % 16 <= 9) ?
'0' :
'A'-10));
531 string S3FanoutManager::GetAwsV4SigningKey(
const string &date)
const
533 if (last_signing_key_.first == date)
534 return last_signing_key_.second;
536 string date_key =
shash::Hmac256(
"AWS4" + config_.secret_key, date,
true);
537 string date_region_key =
shash::Hmac256(date_key, config_.region,
true);
538 string date_region_service_key =
shash::Hmac256(date_region_key,
"s3",
true);
541 last_signing_key_.first = date;
542 last_signing_key_.second = signing_key;
551 bool S3FanoutManager::MkV4Authz(
const JobInfo &info, vector<string> *headers)
555 bool retval = MkPayloadHash(info, &payload_hash);
558 string content_type = GetContentType(info);
560 string date = timestamp.substr(0, 8);
561 vector<string> tokens =
SplitString(complete_hostname_,
':');
562 assert(tokens.size() <= 2);
563 string canonical_hostname = tokens[0];
567 if (tokens.size() == 2 && !((
String2Uint64(tokens[1]) == kDefaultHTTPPort) ||
569 canonical_hostname +=
":" + tokens[1];
571 string signed_headers;
572 string canonical_headers;
573 if (!content_type.empty()) {
574 signed_headers +=
"content-type;";
575 headers->push_back(
"Content-Type: " + content_type);
576 canonical_headers +=
"content-type:" + content_type +
"\n";
578 if (config_.x_amz_acl !=
"") {
579 signed_headers +=
"host;x-amz-acl;x-amz-content-sha256;x-amz-date";
581 signed_headers +=
"host;x-amz-content-sha256;x-amz-date";
584 "host:" + canonical_hostname +
"\n";
585 if (config_.x_amz_acl !=
"") {
586 canonical_headers +=
"x-amz-acl:" + config_.x_amz_acl +
"\n";
588 canonical_headers +=
"x-amz-content-sha256:" + payload_hash +
"\n" +
589 "x-amz-date:" + timestamp +
"\n";
591 string scope = date +
"/" + config_.region +
"/s3/aws4_request";
592 string uri = config_.dns_buckets ?
594 (
string(
"/") + config_.bucket +
"/" + info.
object_key);
596 string canonical_request =
597 GetRequestString(info) +
"\n" +
598 GetUriEncode(uri,
false) +
"\n" +
600 canonical_headers +
"\n" +
601 signed_headers +
"\n" +
606 string string_to_sign =
607 "AWS4-HMAC-SHA256\n" +
612 string signing_key = GetAwsV4SigningKey(date);
615 headers->push_back(
"X-Amz-Acl: " + config_.x_amz_acl);
616 headers->push_back(
"X-Amz-Content-Sha256: " + payload_hash);
617 headers->push_back(
"X-Amz-Date: " + timestamp);
619 "Authorization: AWS4-HMAC-SHA256 "
620 "Credential=" + config_.access_key +
"/" + scope +
","
621 "SignedHeaders=" + signed_headers +
","
622 "Signature=" + signature);
630 bool S3FanoutManager::MkAzureAuthz(
const JobInfo &info, vector<string> *headers)
634 string canonical_headers =
635 "x-ms-blob-type:BlockBlob\nx-ms-date:" +
637 "\nx-ms-version:2011-08-18";
638 string canonical_resource =
639 "/" + config_.access_key +
"/" + config_.bucket +
"/" + info.
object_key;
641 string string_to_sign;
642 if ((info.
request == JobInfo::kReqHeadOnly) ||
643 (info.
request == JobInfo::kReqHeadPut) ||
644 (info.
request == JobInfo::kReqDelete)) {
646 GetRequestString(info) +
648 "\n\n\n\n\n\n\n\n\n" +
649 canonical_headers +
"\n" +
653 GetRequestString(info) +
656 canonical_headers +
"\n" +
661 int retval =
Debase64(config_.secret_key, &signing_key);
665 string signature =
shash::Hmac256(signing_key, string_to_sign,
true);
667 headers->push_back(
"x-ms-date: " + timestamp);
668 headers->push_back(
"x-ms-version: 2011-08-18");
670 "Authorization: SharedKey " + config_.access_key +
":" +
Base64(signature));
671 headers->push_back(
"x-ms-blob-type: BlockBlob");
675 void S3FanoutManager::InitializeDnsSettingsCurl(
678 curl_slist *clist)
const
680 CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, sharehandle);
681 assert(retval == CURLE_OK);
682 retval = curl_easy_setopt(handle, CURLOPT_RESOLVE, clist);
683 assert(retval == CURLE_OK);
687 int S3FanoutManager::InitializeDnsSettings(
689 std::string host_with_port)
const
692 std::map<CURL *, S3FanOutDnsEntry *>::const_iterator it =
693 curl_sharehandles_->find(handle);
694 if (it != curl_sharehandles_->end()) {
695 InitializeDnsSettingsCurl(handle, it->second->sharehandle,
702 host_with_port = config_.protocol +
"://" + host_with_port;
708 unsigned int usemin = UINT_MAX;
709 std::set<S3FanOutDnsEntry *>::iterator its3 = sharehandles_->begin();
710 for (; its3 != sharehandles_->end(); ++its3) {
711 if ((*its3)->dns_name == remote_host) {
712 if (usemin >= (*its3)->counter) {
719 curl_sharehandles_->insert(std::pair<CURL *,
728 dns::Host host = resolver_->Resolve(remote_host);
730 std::set<string>::iterator its = ipv4_addresses.begin();
732 for ( ; its != ipv4_addresses.end(); ++its) {
736 dnse->
port = remote_port.size() == 0 ?
"80" : remote_port;
745 CURLSHcode share_retval = curl_share_setopt(dnse->
sharehandle,
748 assert(share_retval == CURLSHE_OK);
749 sharehandles_->insert(dnse);
753 "Error: DNS resolve failed for address '%s'.",
754 remote_host.c_str());
758 curl_sharehandles_->insert(
759 std::pair<CURL *, S3FanOutDnsEntry *>(handle, dnse));
767 bool S3FanoutManager::MkPayloadHash(
const JobInfo &info,
string *hex_hash)
770 if ((info.
request == JobInfo::kReqHeadOnly) ||
771 (info.
request == JobInfo::kReqHeadPut) ||
772 (info.
request == JobInfo::kReqDelete))
774 switch (config_.authz_method) {
781 "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
797 unsigned int nbytes =
798 info.
origin->Data(reinterpret_cast<void **>(&data),
799 info.
origin->GetSize(), 0);
802 switch (config_.authz_method) {
806 Base64(
string(reinterpret_cast<char *>(payload_hash.
digest),
822 string S3FanoutManager::GetRequestString(
const JobInfo &info)
const {
824 case JobInfo::kReqHeadOnly:
825 case JobInfo::kReqHeadPut:
827 case JobInfo::kReqPutCas:
828 case JobInfo::kReqPutDotCvmfs:
829 case JobInfo::kReqPutHtml:
830 case JobInfo::kReqPutBucket:
832 case JobInfo::kReqDelete:
840 string S3FanoutManager::GetContentType(
const JobInfo &info)
const {
842 case JobInfo::kReqHeadOnly:
843 case JobInfo::kReqHeadPut:
844 case JobInfo::kReqDelete:
846 case JobInfo::kReqPutCas:
847 return "application/octet-stream";
848 case JobInfo::kReqPutDotCvmfs:
849 return "application/x-cvmfs";
850 case JobInfo::kReqPutHtml:
852 case JobInfo::kReqPutBucket:
878 InitializeDnsSettings(handle, complete_hostname_);
881 if ((info->
request == JobInfo::kReqHeadOnly) ||
882 (info->
request == JobInfo::kReqHeadPut) ||
883 (info->
request == JobInfo::kReqDelete))
885 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 0);
886 assert(retval == CURLE_OK);
887 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
888 assert(retval == CURLE_OK);
890 if (info->
request == JobInfo::kReqDelete)
892 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST,
893 GetRequestString(*info).c_str());
894 assert(retval == CURLE_OK);
896 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
897 assert(retval == CURLE_OK);
900 retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
901 assert(retval == CURLE_OK);
902 retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 1);
903 assert(retval == CURLE_OK);
904 retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 0);
905 assert(retval == CURLE_OK);
906 retval = curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE,
907 static_cast<curl_off_t>(info->
origin->GetSize()));
908 assert(retval == CURLE_OK);
910 if (info->
request == JobInfo::kReqPutDotCvmfs) {
912 curl_slist_append(info->
http_headers, kCacheControlDotCvmfs);
913 }
else if (info->
request == JobInfo::kReqPutCas) {
915 curl_slist_append(info->
http_headers, kCacheControlCas);
922 vector<string> authz_headers;
923 switch (config_.authz_method) {
925 retval_b = MkV2Authz(*info, &authz_headers);
928 retval_b = MkV4Authz(*info, &authz_headers);
931 retval_b = MkAzureAuthz(*info, &authz_headers);
938 for (
unsigned i = 0; i < authz_headers.size(); ++i) {
940 curl_slist_append(info->
http_headers, authz_headers[i].c_str());
945 curl_slist_append(info->
http_headers,
"Connection: Keep-Alive");
952 user_agent_->c_str());
955 retval = curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
956 assert(retval == CURLE_OK);
957 retval = curl_easy_setopt(handle, CURLOPT_HEADERDATA,
958 static_cast<void *>(info));
959 assert(retval == CURLE_OK);
960 retval = curl_easy_setopt(handle, CURLOPT_READDATA,
961 static_cast<void *>(info));
962 assert(retval == CURLE_OK);
963 retval = curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->
http_headers);
964 assert(retval == CURLE_OK);
965 if (opt_ipv4_only_) {
966 retval = curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
967 assert(retval == CURLE_OK);
970 retval = curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
971 assert(retval == CURLE_OK);
973 retval = curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, info->
errorbuffer);
974 assert(retval == CURLE_OK);
976 if (config_.protocol ==
"https") {
977 retval = curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 1L);
978 assert(retval == CURLE_OK);
979 retval = curl_easy_setopt(handle, CURLOPT_PROXY_SSL_VERIFYPEER, 1L);
980 assert(retval == CURLE_OK);
981 bool add_cert = ssl_certificate_store_.ApplySslCertificatePath(handle);
992 void S3FanoutManager::SetUrlOptions(
JobInfo *info)
const {
996 retval = curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT,
997 config_.opt_timeout_sec);
998 assert(retval == CURLE_OK);
999 retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT,
1001 assert(retval == CURLE_OK);
1002 retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME,
1003 config_.opt_timeout_sec);
1004 assert(retval == CURLE_OK);
1006 if (is_curl_debug_) {
1007 retval = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 1);
1008 assert(retval == CURLE_OK);
1012 retval = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
1013 assert(retval == CURLE_OK);
1015 retval = curl_easy_setopt(curl_handle, CURLOPT_PROXY, config_.proxy.c_str());
1016 assert(retval == CURLE_OK);
1023 void S3FanoutManager::UpdateStatistics(CURL *handle) {
1026 if (curl_easy_getinfo(handle, CURLINFO_SIZE_UPLOAD, &val) == CURLE_OK)
1034 bool S3FanoutManager::CanRetry(
const JobInfo *info) {
1059 if ((now - timestamp_last_throttle_report_) > kThrottleReportIntervalSec)
1062 "Warning: S3 backend throttling %ums "
1063 "(total backoff time so far %lums)",
1066 timestamp_last_throttle_report_ = now;
1074 info->
backoff_ms = prng_.Next(config_.opt_backoff_init_ms + 1);
1078 if (info->
backoff_ms > config_.opt_backoff_max_ms)
1079 info->
backoff_ms = config_.opt_backoff_max_ms;
1094 bool S3FanoutManager::VerifyAndFinalize(
const int curl_error,
JobInfo *info) {
1096 "(curl error %d, info error %d, info request %d)",
1102 switch (curl_error) {
1110 case CURLE_UNSUPPORTED_PROTOCOL:
1111 case CURLE_URL_MALFORMAT:
1114 case CURLE_COULDNT_RESOLVE_HOST:
1117 case CURLE_COULDNT_CONNECT:
1118 case CURLE_OPERATION_TIMEDOUT:
1119 case CURLE_SEND_ERROR:
1120 case CURLE_RECV_ERROR:
1123 case CURLE_ABORTED_BY_CALLBACK:
1124 case CURLE_WRITE_ERROR:
1129 "unexpected curl error (%d) while trying to upload %s: %s",
1137 (info->
request == JobInfo::kReqHeadPut))
1141 info->
request = JobInfo::kReqPutCas;
1149 "Failed to initialize CURL handle "
1150 "(error: %d - %s | errno: %d)",
1151 init_failure,
Code2Ascii(init_failure), errno);
1153 SetUrlOptions(info);
1160 bool try_again =
false;
1162 try_again = CanRetry(info);
1165 if (info->
request == JobInfo::kReqPutCas ||
1166 info->
request == JobInfo::kReqPutDotCvmfs ||
1167 info->
request == JobInfo::kReqPutHtml) {
1193 S3FanoutManager::S3FanoutManager(
const S3Config &config) : config_(config) {
1201 reinterpret_cast<pthread_mutex_t *
>(smalloc(
sizeof(pthread_mutex_t)));
1205 reinterpret_cast<pthread_mutex_t *
>(smalloc(
sizeof(pthread_mutex_t)));
1221 *
user_agent_ =
"User-Agent: cvmfs " + string(VERSION);
1224 CURLcode cretval = curl_global_init(CURL_GLOBAL_ALL);
1225 assert(cretval == CURLE_OK);
1229 mretval = curl_multi_setopt(
curl_multi_, CURLMOPT_SOCKETFUNCTION,
1231 assert(mretval == CURLM_OK);
1232 mretval = curl_multi_setopt(
curl_multi_, CURLMOPT_SOCKETDATA,
1233 static_cast<void *>(
this));
1234 assert(mretval == CURLM_OK);
1235 mretval = curl_multi_setopt(
curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1237 assert(mretval == CURLM_OK);
1246 if ((getenv(
"CVMFS_IPV4_ONLY") != NULL) &&
1247 (strlen(getenv(
"CVMFS_IPV4_ONLY")) > 0)) {
1255 watch_fds_ =
static_cast<struct pollfd *
>(smalloc(4 *
sizeof(
struct pollfd)));
1280 for (; i != iEnd; ++i) {
1281 curl_easy_cleanup(*i);
1284 set<S3FanOutDnsEntry *>::iterator is =
sharehandles_->begin();
1285 const set<S3FanOutDnsEntry *>::const_iterator isEnd =
sharehandles_->end();
1286 for (; is != isEnd; ++is) {
1287 curl_share_cleanup((*is)->sharehandle);
1288 curl_slist_free_all((*is)->clist);
1306 curl_global_cleanup();
1316 static_cast<void *>(
this));
1355 "Transferred Bytes: " +
1357 "Transfer duration: " +
1359 "Number of requests: " +
1361 "Number of retries: " +
const char * Code2Ascii(const ObjectFetcherFailures::Failures error)
pthread_mutex_t * jobs_todo_lock_
atomic_int32 multi_threaded_
pthread_mutex_t * curl_handle_lock_
string Sha256String(const string &content)
std::set< CURL * > * pool_handles_idle_
struct curl_slist * http_headers
static size_t CallbackCurlBody(char *, size_t size, size_t nmemb, void *)
std::string IsoTimestamp()
dns::CaresResolver * resolver_
string Trim(const string &raw, bool trim_newline)
bool IsHttpUrl(const std::string &path)
void Hmac(const string &key, const unsigned char *buffer, const unsigned buffer_size, Any *any_digest)
perf::Statistics * statistics_
static void * MainUpload(void *data)
const std::string object_key
assert((mem||(size==0))&&"Out Of Memory")
const std::set< std::string > & ipv4_addresses() const
std::string MkCompleteHostname()
void PushNewJob(JobInfo *info)
SynchronizingCounter< uint32_t > Semaphore
std::string Print() const
bool Debase64(const string &data, string *decoded)
void MakePipe(int pipe_fd[2])
unsigned char digest[digest_size_]
struct curl_slist * clist
std::string * user_agent_
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
unsigned int max_available_jobs_
std::string RfcTimestamp()
int64_t String2Int64(const string &value)
unsigned GetDigestSize() const
std::set< JobInfo * > * active_requests_
void PushCompletedJob(JobInfo *info)
string Sha256Mem(const unsigned char *buffer, const unsigned buffer_size)
vector< string > SplitString(const string &str, char delim)
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
bool HasSuffix(const std::string &str, const std::string &suffix, const bool ignore_case)
void UseSystemCertificatePath()
void SetUrlOptions(JobInfo *info) const
CURL * AcquireCurlHandle() const
uint64_t throttle_timestamp
Semaphore * available_jobs_
static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp)
UniquePtr< FileBackedBuffer > origin
void ReleaseCurlHandle(JobInfo *info, CURL *handle) const
uint32_t pool_max_handles
string StringifyInt(const int64_t value)
struct pollfd * watch_fds_
std::string ExtractPort(const std::string &url)
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
JobInfo * PopCompletedJob()
std::string ExtractHost(const std::string &url)
uint32_t watch_fds_inuse_
unsigned char num_retries
void HashMem(const unsigned char *buffer, const unsigned buffer_size, Any *any_digest)
std::string complete_hostname_
string Base64(const string &data)
uint64_t String2Uint64(const string &value)
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
static CaresResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
std::set< S3FanOutDnsEntry * > * sharehandles_
SslCertificateStore ssl_certificate_store_
const Statistics & GetStatistics()
std::string Hmac256(const std::string &key, const std::string &content, bool raw_output)
void SafeSleepMs(const unsigned ms)
void WritePipe(int fd, const void *buf, size_t nbyte)
uint64_t timestamp_last_throttle_report_
std::set< CURL * > * pool_handles_inuse_
void ReadPipe(int fd, void *buf, size_t nbyte)
void ClosePipe(int pipe_fd[2])
std::map< CURL *, S3FanOutDnsEntry * > * curl_sharehandles_
Failures InitializeRequest(JobInfo *info, CURL *handle) const
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)