CernVM-FS  2.9.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
s3fanout.cc
Go to the documentation of this file.
1 
7 #include <pthread.h>
8 
9 #include <algorithm>
10 #include <cassert>
11 #include <cerrno>
12 #include <utility>
13 
14 #include "cvmfs_config.h"
15 #include "platform.h"
16 #include "s3fanout.h"
17 #include "upload_facility.h"
18 #include "util/exception.h"
19 #include "util/posix.h"
20 #include "util/string.h"
21 #include "util_concurrency.h"
22 
23 using namespace std; // NOLINT
24 
25 namespace s3fanout {
26 
27 const char *S3FanoutManager::kCacheControlCas = "Cache-Control: max-age=259200";
28 const char *S3FanoutManager::kCacheControlDotCvmfs =
29  "Cache-Control: max-age=61";
30 const unsigned S3FanoutManager::kDefault429ThrottleMs = 250;
31 const unsigned S3FanoutManager::kMax429ThrottleMs = 10000;
32 const unsigned S3FanoutManager::kThrottleReportIntervalSec = 10;
33 const unsigned S3FanoutManager::kDefaultHTTPPort = 80;
34 
35 
39 void S3FanoutManager::DetectThrottleIndicator(
40  const std::string &header,
41  JobInfo *info)
42 {
43  std::string value_str;
44  if (HasPrefix(header, "retry-after:", true))
45  value_str = header.substr(12);
46  if (HasPrefix(header, "x-retry-in:", true))
47  value_str = header.substr(11);
48 
49  value_str = Trim(value_str, true /* trim_newline */);
50  if (!value_str.empty()) {
51  unsigned value_numeric = String2Uint64(value_str);
52  unsigned value_ms =
53  HasSuffix(value_str, "ms", true /* ignore_case */) ?
54  value_numeric : (value_numeric * 1000);
55  if (value_ms > 0)
56  info->throttle_ms = std::min(value_ms, kMax429ThrottleMs);
57  }
58 }
59 
60 
// Header callback: receives one response header line per invocation.
// info_link is cast to the JobInfo of the transfer.  For a status line
// ("HTTP/1.", matched case-sensitively) the status code is classified into
// info->error_code.  Returns num_bytes to let the transfer continue; any other
// return value (0 here) makes curl stop the transfer with an error.
//
// NOTE(review): the listing this text was taken from skips source lines 95
// and 99 inside the switch below (the numbering jumps), so one statement of
// the 429 case and one of the 503/502 case are not visible here.  Check the
// upstream file before changing this function.
64 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
65  void *info_link) {
66  const size_t num_bytes = size*nmemb;
67  const string header_line(static_cast<const char *>(ptr), num_bytes);
68  JobInfo *info = static_cast<JobInfo *>(info_link);
69 
70  // Check for http status code errors
71  if (HasPrefix(header_line, "HTTP/1.", false)) {
72  if (header_line.length() < 10)
73  return 0;
74 
// Skip the blanks between the protocol version and the status code
75  unsigned i;
76  for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {}
77 
// Any 2xx status counts as success
78  if (header_line[i] == '2') {
79  return num_bytes;
80  } else {
81  LogCvmfs(kLogS3Fanout, kLogDebug, "http status error code [info %p]: %s",
82  info, header_line.c_str());
// The three status digits must be present before they are parsed
83  if (header_line.length() < i+3) {
84  LogCvmfs(kLogS3Fanout, kLogStderr, "S3: invalid HTTP response '%s'",
85  header_line.c_str());
86  info->error_code = kFailOther;
87  return 0;
88  }
89  info->http_error = String2Int64(string(&header_line[i], 3));
90 
91  switch (info->http_error) {
92  case 429:
// Keep reading headers: a later line may carry a throttle hint that is
// picked up by DetectThrottleIndicator() at the end of this function
93  info->error_code = kFailRetry;
94  info->throttle_ms = S3FanoutManager::kDefault429ThrottleMs;
96  return num_bytes;
97  case 503:
98  case 502: // Can happen if the S3 gateway-backend connection breaks
100  break;
101  case 501:
102  case 400:
103  info->error_code = kFailBadRequest;
104  break;
105  case 403:
106  info->error_code = kFailForbidden;
107  break;
108  case 404:
// Not found does not stop the transfer; the result is evaluated later
109  info->error_code = kFailNotFound;
110  return num_bytes;
111  default:
112  info->error_code = kFailOther;
113  }
// All remaining error classes stop the transfer
114  return 0;
115  }
116  }
117 
// Non-status header line: only inspected while the job is marked for retry
118  if (info->error_code == kFailRetry) {
119  S3FanoutManager::DetectThrottleIndicator(header_line, info);
120  }
121 
122  return num_bytes;
123 }
124 
125 
129 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
130  void *info_link) {
131  const size_t num_bytes = size*nmemb;
132  JobInfo *info = static_cast<JobInfo *>(info_link);
133 
134  LogCvmfs(kLogS3Fanout, kLogDebug, "Data callback with %d bytes", num_bytes);
135 
136  if (num_bytes == 0)
137  return 0;
138 
139  uint64_t read_bytes = info->origin->Read(ptr, num_bytes);
140 
142  "source buffer pushed out %d bytes", read_bytes);
143 
144  return read_bytes;
145 }
146 
147 
/**
 * Write callback (registered as CURLOPT_WRITEFUNCTION): discards the response
 * body by reporting every chunk as fully consumed.
 */
static size_t CallbackCurlBody(
  char * /*ptr*/, size_t size, size_t nmemb, void * /*userdata*/)
{
  const size_t num_bytes = size * nmemb;
  return num_bytes;
}
156 
157 
161 int S3FanoutManager::CallbackCurlSocket(CURL *easy, curl_socket_t s, int action,
162  void *userp, void *socketp) {
163  S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(userp);
164  const int ajobs = *s3fanout_mgr->available_jobs_;
165  LogCvmfs(kLogS3Fanout, kLogDebug, "CallbackCurlSocket called with easy "
166  "handle %p, socket %d, action %d, up %d, "
167  "sp %d, fds_inuse %d, jobs %d",
168  easy, s, action, userp,
169  socketp, s3fanout_mgr->watch_fds_inuse_, ajobs);
170  if (action == CURL_POLL_NONE)
171  return 0;
172 
173  // Find s in watch_fds_
174  // First 2 fds are job and terminate pipes (not curl related)
175  unsigned index;
176  for (index = 2; index < s3fanout_mgr->watch_fds_inuse_; ++index) {
177  if (s3fanout_mgr->watch_fds_[index].fd == s)
178  break;
179  }
180  // Or create newly
181  if (index == s3fanout_mgr->watch_fds_inuse_) {
182  // Extend array if necessary
183  if (s3fanout_mgr->watch_fds_inuse_ == s3fanout_mgr->watch_fds_size_) {
184  s3fanout_mgr->watch_fds_size_ *= 2;
185  s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
186  srealloc(s3fanout_mgr->watch_fds_,
187  s3fanout_mgr->watch_fds_size_*sizeof(struct pollfd)));
188  }
189  s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].fd = s;
190  s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].events = 0;
191  s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].revents = 0;
192  s3fanout_mgr->watch_fds_inuse_++;
193  }
194 
195  switch (action) {
196  case CURL_POLL_IN:
197  s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
198  break;
199  case CURL_POLL_OUT:
200  s3fanout_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
201  break;
202  case CURL_POLL_INOUT:
203  s3fanout_mgr->watch_fds_[index].events =
204  POLLIN | POLLPRI | POLLOUT | POLLWRBAND;
205  break;
206  case CURL_POLL_REMOVE:
207  if (index < s3fanout_mgr->watch_fds_inuse_-1)
208  s3fanout_mgr->watch_fds_[index] =
209  s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_-1];
210  s3fanout_mgr->watch_fds_inuse_--;
211  // Shrink array if necessary
212  if ((s3fanout_mgr->watch_fds_inuse_ > s3fanout_mgr->watch_fds_max_) &&
213  (s3fanout_mgr->watch_fds_inuse_ < s3fanout_mgr->watch_fds_size_/2)) {
214  s3fanout_mgr->watch_fds_size_ /= 2;
215  s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
216  srealloc(s3fanout_mgr->watch_fds_,
217  s3fanout_mgr->watch_fds_size_*sizeof(struct pollfd)));
218  }
219  break;
220  default:
221  PANIC(NULL);
222  }
223 
224  return 0;
225 }
226 
227 
// Entry point of the upload I/O thread.  data is the S3FanoutManager.
// The loop polls watch_fds_ with a 100ms timeout and then, in this order,
//   1. leaves the loop if the terminate pipe (watch_fds_[0]) has an event,
//   2. reads one JobInfo pointer from the jobs pipe (watch_fds_[1]), prepares
//      a curl handle for it and adds it to the multi handle,
//   3. forwards events on curl's sockets to curl_multi_socket_action(),
//   4. drains curl's message queue: a finished transfer is either re-added
//      (VerifyAndFinalize() returned true) or its handle is released and the
//      job is pushed to the completed jobs.
// After the loop, all handles still in use are cleaned up and watch_fds_ is
// freed.  Always returns NULL.
//
// NOTE(review): the listing this text was taken from skips source lines 279
// and 294, i.e. the opening lines of the two calls whose argument lists start
// at 280 and 295.  They are not reconstructed here.
231 void *S3FanoutManager::MainUpload(void *data) {
232  LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread started");
233  S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(data);
234 
235  s3fanout_mgr->InitPipeWatchFds();
236 
237  // Don't schedule more jobs into the multi handle than the maximum number of
238  // parallel connections. This should prevent starvation and thus a timeout
239  // of the authorization header (CVM-1339).
// NOTE(review): within this function the counter is only incremented and
// decremented, never read.
240  unsigned jobs_in_flight = 0;
241 
242  while (true) {
243  // Check events with 100ms timeout
244  int timeout_ms = 100;
245  int retval = poll(s3fanout_mgr->watch_fds_, s3fanout_mgr->watch_fds_inuse_,
246  timeout_ms);
247  if (retval == 0) {
248  // Handle timeout
249  int still_running = 0;
250  retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
251  CURL_SOCKET_TIMEOUT,
252  0,
253  &still_running);
254  if (retval != CURLM_OK) {
255  LogCvmfs(kLogS3Fanout, kLogStderr, "Error, timeout due to: %d", retval);
256  assert(retval == CURLM_OK);
257  }
258  } else if (retval < 0) {
// An interrupted poll() is the only tolerated failure; poll again
259  assert(errno == EINTR);
260  continue;
261  }
262 
263  // Terminate I/O thread
264  if (s3fanout_mgr->watch_fds_[0].revents)
265  break;
266 
267  // New job incoming
268  if (s3fanout_mgr->watch_fds_[1].revents) {
269  s3fanout_mgr->watch_fds_[1].revents = 0;
// The pipe transports the pointer value itself, not the JobInfo object
270  JobInfo *info;
271  ReadPipe(s3fanout_mgr->pipe_jobs_[0], &info, sizeof(info));
272  CURL *handle = s3fanout_mgr->AcquireCurlHandle();
273  if (handle == NULL) {
274  PANIC(kLogStderr, "Failed to acquire CURL handle.");
275  }
276  s3fanout::Failures init_failure =
277  s3fanout_mgr->InitializeRequest(info, handle);
278  if (init_failure != s3fanout::kFailOk) {
280  "Failed to initialize CURL handle (error: %d - %s | errno: %d)",
281  init_failure, Code2Ascii(init_failure), errno);
282  }
283  s3fanout_mgr->SetUrlOptions(info);
284 
285  curl_multi_add_handle(s3fanout_mgr->curl_multi_, handle);
286  s3fanout_mgr->active_requests_->insert(info);
287  jobs_in_flight++;
// This retval shadows the poll() result of the enclosing loop body
288  int still_running = 0, retval = 0;
289  retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
290  CURL_SOCKET_TIMEOUT,
291  0,
292  &still_running);
293 
295  "curl_multi_socket_action: %d - %d",
296  retval, still_running);
297  }
298 
299 
300  // Activity on curl sockets
301  // Within this loop the curl_multi_socket_action() may cause socket(s)
302  // to be removed from watch_fds_. If a socket is removed it is replaced
303  // by the socket at the end of the array and the inuse count is decreased.
304  // Therefore loop over the array in reverse order.
305  // First 2 fds are job and terminate pipes (not curl related)
306  for (int32_t i = s3fanout_mgr->watch_fds_inuse_ - 1; i >= 2; --i) {
// The in-use count may have dropped below i during an earlier iteration
307  if (static_cast<uint32_t>(i) >= s3fanout_mgr->watch_fds_inuse_) {
308  continue;
309  }
310  if (s3fanout_mgr->watch_fds_[i].revents) {
// Translate poll() event bits into curl's CURL_CSELECT_* bits
311  int ev_bitmask = 0;
312  if (s3fanout_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
313  ev_bitmask |= CURL_CSELECT_IN;
314  if (s3fanout_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
315  ev_bitmask |= CURL_CSELECT_OUT;
316  if (s3fanout_mgr->watch_fds_[i].revents &
317  (POLLERR | POLLHUP | POLLNVAL))
318  ev_bitmask |= CURL_CSELECT_ERR;
319  s3fanout_mgr->watch_fds_[i].revents = 0;
320 
321  int still_running = 0;
322  retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
323  s3fanout_mgr->watch_fds_[i].fd,
324  ev_bitmask,
325  &still_running);
326  }
327  }
328 
329  // Check if transfers are completed
330  CURLMsg *curl_msg;
331  int msgs_in_queue;
332  while ((curl_msg = curl_multi_info_read(s3fanout_mgr->curl_multi_,
333  &msgs_in_queue)))
334  {
335  assert(curl_msg->msg == CURLMSG_DONE);
336 
337  s3fanout_mgr->statistics_->num_requests++;
// The JobInfo was attached to the handle as CURLOPT_PRIVATE in
// InitializeRequest()
338  JobInfo *info;
339  CURL *easy_handle = curl_msg->easy_handle;
340  int curl_error = curl_msg->data.result;
341  curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
342 
343  curl_multi_remove_handle(s3fanout_mgr->curl_multi_, easy_handle);
// true: run the transfer again on the same easy handle
344  if (s3fanout_mgr->VerifyAndFinalize(curl_error, info)) {
345  curl_multi_add_handle(s3fanout_mgr->curl_multi_, easy_handle);
346  int still_running = 0;
347  curl_multi_socket_action(s3fanout_mgr->curl_multi_,
348  CURL_SOCKET_TIMEOUT,
349  0,
350  &still_running);
351  } else {
352  // Return easy handle into pool and write result back
353  jobs_in_flight--;
354  s3fanout_mgr->active_requests_->erase(info);
355  s3fanout_mgr->ReleaseCurlHandle(info, easy_handle);
356  s3fanout_mgr->available_jobs_->Decrement();
357 
358  // Add to list of completed jobs
359  s3fanout_mgr->PushCompletedJob(info);
360  }
361  }
362  }
363 
// Shutdown: detach and destroy every handle that is still in use
364  set<CURL *>::iterator i = s3fanout_mgr->pool_handles_inuse_->begin();
365  const set<CURL *>::const_iterator i_end =
366  s3fanout_mgr->pool_handles_inuse_->end();
367  for (; i != i_end; ++i) {
368  curl_multi_remove_handle(s3fanout_mgr->curl_multi_, *i);
369  curl_easy_cleanup(*i);
370  }
371  s3fanout_mgr->pool_handles_inuse_->clear();
372  free(s3fanout_mgr->watch_fds_);
373 
374  LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread terminated");
375  return NULL;
376 }
377 
378 
383 CURL *S3FanoutManager::AcquireCurlHandle() const {
384  CURL *handle;
385 
386  MutexLockGuard guard(curl_handle_lock_);
387 
388  if (pool_handles_idle_->empty()) {
389  CURLcode retval;
390 
391  // Create a new handle
392  handle = curl_easy_init();
393  assert(handle != NULL);
394 
395  // Other settings
396  retval = curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
397  assert(retval == CURLE_OK);
398  retval = curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION,
400  assert(retval == CURLE_OK);
401  retval = curl_easy_setopt(handle, CURLOPT_READFUNCTION, CallbackCurlData);
402  assert(retval == CURLE_OK);
403  retval = curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlBody);
404  assert(retval == CURLE_OK);
405  } else {
406  handle = *(pool_handles_idle_->begin());
407  pool_handles_idle_->erase(pool_handles_idle_->begin());
408  }
409 
410  pool_handles_inuse_->insert(handle);
411 
412  return handle;
413 }
414 
415 
416 void S3FanoutManager::ReleaseCurlHandle(JobInfo *info, CURL *handle) const {
417  if (info->http_headers) {
418  curl_slist_free_all(info->http_headers);
419  info->http_headers = NULL;
420  }
421 
422  MutexLockGuard guard(curl_handle_lock_);
423 
424  set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
425  assert(elem != pool_handles_inuse_->end());
426 
427  if (pool_handles_idle_->size() > config_.pool_max_handles) {
428  CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, NULL);
429  assert(retval == CURLE_OK);
430  curl_easy_cleanup(handle);
431  std::map<CURL *, S3FanOutDnsEntry *>::size_type retitems =
432  curl_sharehandles_->erase(handle);
433  assert(retitems == 1);
434  } else {
435  pool_handles_idle_->insert(handle);
436  }
437 
438  pool_handles_inuse_->erase(elem);
439 }
440 
441 void S3FanoutManager::InitPipeWatchFds() {
442  assert(watch_fds_inuse_ == 0);
443  assert(watch_fds_size_ >= 2);
444  watch_fds_[0].fd = pipe_terminate_[0];
445  watch_fds_[0].events = POLLIN | POLLPRI;
446  watch_fds_[0].revents = 0;
447  ++watch_fds_inuse_;
448  watch_fds_[1].fd = pipe_jobs_[0];
449  watch_fds_[1].events = POLLIN | POLLPRI;
450  watch_fds_[1].revents = 0;
451  ++watch_fds_inuse_;
452 }
453 
458 bool S3FanoutManager::MkV2Authz(const JobInfo &info, vector<string> *headers)
459  const
460 {
461  string payload_hash;
462  bool retval = MkPayloadHash(info, &payload_hash);
463  if (!retval)
464  return false;
465  string content_type = GetContentType(info);
466  string request = GetRequestString(info);
467 
468  string timestamp = RfcTimestamp();
469  string to_sign = request + "\n" +
470  payload_hash + "\n" +
471  content_type + "\n" +
472  timestamp + "\n" +
473  "x-amz-acl:public-read" + "\n" + // default ACL
474  "/" + config_.bucket + "/" + info.object_key;
475  LogCvmfs(kLogS3Fanout, kLogDebug, "%s string to sign for: %s",
476  request.c_str(), info.object_key.c_str());
477 
478  shash::Any hmac;
479  hmac.algorithm = shash::kSha1;
480  shash::Hmac(config_.secret_key,
481  reinterpret_cast<const unsigned char *>(to_sign.data()),
482  to_sign.length(), &hmac);
483 
484  headers->push_back("Authorization: AWS " + config_.access_key + ":" +
485  Base64(string(reinterpret_cast<char *>(hmac.digest),
486  hmac.GetDigestSize())));
487  headers->push_back("Date: " + timestamp);
488  headers->push_back("X-Amz-Acl: public-read");
489  if (!payload_hash.empty())
490  headers->push_back("Content-MD5: " + payload_hash);
491  if (!content_type.empty())
492  headers->push_back("Content-Type: " + content_type);
493  return true;
494 }
495 
496 
497 string S3FanoutManager::GetUriEncode(const string &val, bool encode_slash)
498  const
499 {
500  string result;
501  const unsigned len = val.length();
502  result.reserve(len);
503  for (unsigned i = 0; i < len; ++i) {
504  char c = val[i];
505  if ((c >= 'A' && c <= 'Z') ||
506  (c >= 'a' && c <= 'z') ||
507  (c >= '0' && c <= '9') ||
508  c == '_' || c == '-' || c == '~' || c == '.')
509  {
510  result.push_back(c);
511  } else if (c == '/') {
512  if (encode_slash) {
513  result += "%2F";
514  } else {
515  result.push_back(c);
516  }
517  } else {
518  result.push_back('%');
519  result.push_back((c / 16) + ((c / 16 <= 9) ? '0' : 'A'-10));
520  result.push_back((c % 16) + ((c % 16 <= 9) ? '0' : 'A'-10));
521  }
522  }
523  return result;
524 }
525 
526 
527 string S3FanoutManager::GetAwsV4SigningKey(const string &date) const
528 {
529  if (last_signing_key_.first == date)
530  return last_signing_key_.second;
531 
532  string date_key = shash::Hmac256("AWS4" + config_.secret_key, date, true);
533  string date_region_key = shash::Hmac256(date_key, config_.region, true);
534  string date_region_service_key = shash::Hmac256(date_region_key, "s3", true);
535  string signing_key =
536  shash::Hmac256(date_region_service_key, "aws4_request", true);
537  last_signing_key_.first = date;
538  last_signing_key_.second = signing_key;
539  return signing_key;
540 }
541 
542 
547 bool S3FanoutManager::MkV4Authz(const JobInfo &info, vector<string> *headers)
548  const
549 {
550  string payload_hash;
551  bool retval = MkPayloadHash(info, &payload_hash);
552  if (!retval)
553  return false;
554  string content_type = GetContentType(info);
555  string timestamp = IsoTimestamp();
556  string date = timestamp.substr(0, 8);
557  vector<string> tokens = SplitString(complete_hostname_, ':');
558  assert(tokens.size() <= 2);
559  string canonical_hostname = tokens[0];
560  if (tokens.size() == 2 && String2Uint64(tokens[1]) != kDefaultHTTPPort)
561  canonical_hostname += ":" + tokens[1];
562 
563  string signed_headers;
564  string canonical_headers;
565  if (!content_type.empty()) {
566  signed_headers += "content-type;";
567  headers->push_back("Content-Type: " + content_type);
568  canonical_headers += "content-type:" + content_type + "\n";
569  }
570  signed_headers += "host;x-amz-acl;x-amz-content-sha256;x-amz-date";
571  canonical_headers +=
572  "host:" + canonical_hostname + "\n" +
573  "x-amz-acl:public-read\n"
574  "x-amz-content-sha256:" + payload_hash + "\n" +
575  "x-amz-date:" + timestamp + "\n";
576 
577  string scope = date + "/" + config_.region + "/s3/aws4_request";
578  string uri = config_.dns_buckets ?
579  (string("/") + info.object_key) :
580  (string("/") + config_.bucket + "/" + info.object_key);
581 
582  string canonical_request =
583  GetRequestString(info) + "\n" +
584  GetUriEncode(uri, false) + "\n" +
585  "\n" +
586  canonical_headers + "\n" +
587  signed_headers + "\n" +
588  payload_hash;
589 
590  string hash_request = shash::Sha256String(canonical_request.c_str());
591 
592  string string_to_sign =
593  "AWS4-HMAC-SHA256\n" +
594  timestamp + "\n" +
595  scope + "\n" +
596  hash_request;
597 
598  string signing_key = GetAwsV4SigningKey(date);
599  string signature = shash::Hmac256(signing_key, string_to_sign);
600 
601  headers->push_back("X-Amz-Acl: public-read");
602  headers->push_back("X-Amz-Content-Sha256: " + payload_hash);
603  headers->push_back("X-Amz-Date: " + timestamp);
604  headers->push_back(
605  "Authorization: AWS4-HMAC-SHA256 "
606  "Credential=" + config_.access_key + "/" + scope + ","
607  "SignedHeaders=" + signed_headers + ","
608  "Signature=" + signature);
609  return true;
610 }
611 
616 bool S3FanoutManager::MkAzureAuthz(const JobInfo &info, vector<string> *headers)
617  const
618 {
619  string timestamp = RfcTimestamp();
620  string canonical_headers =
621  "x-ms-blob-type:BlockBlob\nx-ms-date:" +
622  timestamp +
623  "\nx-ms-version:2011-08-18";
624  string canonical_resource =
625  "/" + config_.access_key + "/" + config_.bucket + "/" + info.object_key;
626 
627  string string_to_sign;
628  if ((info.request == JobInfo::kReqHeadOnly) ||
629  (info.request == JobInfo::kReqHeadPut) ||
630  (info.request == JobInfo::kReqDelete)) {
631  string_to_sign =
632  GetRequestString(info) +
633  string("\n\n\n") +
634  "\n\n\n\n\n\n\n\n\n" +
635  canonical_headers + "\n" +
636  canonical_resource;
637  } else {
638  string_to_sign =
639  GetRequestString(info) +
640  string("\n\n\n") +
641  string(StringifyInt(info.origin->GetSize())) + "\n\n\n\n\n\n\n\n\n" +
642  canonical_headers + "\n" +
643  canonical_resource;
644  }
645 
646  string signing_key;
647  int retval = Debase64(config_.secret_key, &signing_key);
648  if (!retval)
649  return false;
650 
651  string signature = shash::Hmac256(signing_key, string_to_sign, true);
652 
653  headers->push_back("x-ms-date: " + timestamp);
654  headers->push_back("x-ms-version: 2011-08-18");
655  headers->push_back(
656  "Authorization: SharedKey " + config_.access_key + ":" + Base64(signature));
657  headers->push_back("x-ms-blob-type: BlockBlob");
658  return true;
659 }
660 
661 void S3FanoutManager::InitializeDnsSettingsCurl(
662  CURL *handle,
663  CURLSH *sharehandle,
664  curl_slist *clist) const
665 {
666  CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, sharehandle);
667  assert(retval == CURLE_OK);
668  retval = curl_easy_setopt(handle, CURLOPT_RESOLVE, clist);
669  assert(retval == CURLE_OK);
670 }
671 
672 
// Binds a DNS entry (share handle + pre-resolved host list) to a curl easy
// handle.  Three cases, tried in this order:
//   1. the handle already has an entry in curl_sharehandles_: reuse it,
//   2. an entry for the same host name exists in sharehandles_: take the one
//      with the lowest use counter,
//   3. otherwise resolve the host name and create one entry per IPv4 address.
// Returns 0 on success and -1 if the resolver delivered no IPv4 address (this
// case is also asserted).
//
// NOTE(review): the listing this text was taken from skips source line 738,
// i.e. the opening line of the call whose arguments start at 739.
673 int S3FanoutManager::InitializeDnsSettings(
674  CURL *handle,
675  std::string host_with_port) const
676 {
677  // Use existing handle
678  std::map<CURL *, S3FanOutDnsEntry *>::const_iterator it =
679  curl_sharehandles_->find(handle);
680  if (it != curl_sharehandles_->end()) {
681  InitializeDnsSettingsCurl(handle, it->second->sharehandle,
682  it->second->clist);
683  return 0;
684  }
685 
686  // Remove port number if such exists
// The scheme is prepended first if missing, before host and port are
// extracted
687  if (!HasPrefix(host_with_port, "http://", false /*ignore_case*/))
688  host_with_port = "http://" + host_with_port;
689  std::string remote_host = dns::ExtractHost(host_with_port);
690  std::string remote_port = dns::ExtractPort(host_with_port);
691 
692  // If we have the name already resolved, use the least used IP
693  S3FanOutDnsEntry *useme = NULL;
694  unsigned int usemin = UINT_MAX;
695  std::set<S3FanOutDnsEntry *>::iterator its3 = sharehandles_->begin();
696  for (; its3 != sharehandles_->end(); ++its3) {
697  if ((*its3)->dns_name == remote_host) {
// ">=" means that among equally used entries the last one visited wins
698  if (usemin >= (*its3)->counter) {
699  usemin = (*its3)->counter;
700  useme = (*its3);
701  }
702  }
703  }
704  if (useme != NULL) {
705  curl_sharehandles_->insert(std::pair<CURL *,
706  S3FanOutDnsEntry *>(handle, useme));
707  useme->counter++;
708  InitializeDnsSettingsCurl(handle, useme->sharehandle, useme->clist);
709  return 0;
710  }
711 
712  // We need to resolve the hostname
713  // TODO(ssheikki): support ipv6 also... if (opt_ipv4_only_)
714  dns::Host host = resolver_->Resolve(remote_host);
715  set<string> ipv4_addresses = host.ipv4_addresses();
716  std::set<string>::iterator its = ipv4_addresses.begin();
717  S3FanOutDnsEntry *dnse = NULL;
// One entry per resolved address; each gets its own share handle and a
// one-element "host:port:ip" list
718  for ( ; its != ipv4_addresses.end(); ++its) {
719  dnse = new S3FanOutDnsEntry();
720  dnse->counter = 0;
721  dnse->dns_name = remote_host;
722  dnse->port = remote_port.size() == 0 ? "80" : remote_port;
723  dnse->ip = *its;
724  dnse->clist = NULL;
725  dnse->clist = curl_slist_append(dnse->clist,
726  (dnse->dns_name+":"+
727  dnse->port+":"+
728  dnse->ip).c_str());
729  dnse->sharehandle = curl_share_init();
730  assert(dnse->sharehandle != NULL);
731  CURLSHcode share_retval = curl_share_setopt(dnse->sharehandle,
732  CURLSHOPT_SHARE,
733  CURL_LOCK_DATA_DNS);
734  assert(share_retval == CURLSHE_OK);
735  sharehandles_->insert(dnse);
736  }
// dnse is still NULL only if the address set was empty
737  if (dnse == NULL) {
739  "Error: DNS resolve failed for address '%s'.",
740  remote_host.c_str());
741  assert(dnse != NULL);
742  return -1;
743  }
// The handle is bound to the entry created last in the loop above
744  curl_sharehandles_->insert(
745  std::pair<CURL *, S3FanOutDnsEntry *>(handle, dnse));
746  dnse->counter++;
747  InitializeDnsSettingsCurl(handle, dnse->sharehandle, dnse->clist);
748 
749  return 0;
750 }
751 
752 
753 bool S3FanoutManager::MkPayloadHash(const JobInfo &info, string *hex_hash)
754  const
755 {
756  if ((info.request == JobInfo::kReqHeadOnly) ||
757  (info.request == JobInfo::kReqHeadPut) ||
758  (info.request == JobInfo::kReqDelete))
759  {
760  switch (config_.authz_method) {
761  case kAuthzAwsV2:
762  hex_hash->clear();
763  break;
764  case kAuthzAwsV4:
765  // Sha256 over empty string
766  *hex_hash =
767  "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
768  break;
769  case kAuthzAzure:
770  // no payload hash required for Azure signature
771  hex_hash->clear();
772  break;
773  default:
774  PANIC(NULL);
775  }
776  return true;
777  }
778 
779  // PUT, there is actually payload
780  shash::Any payload_hash(shash::kMd5);
781 
782  unsigned char *data;
783  unsigned int nbytes =
784  info.origin->Data(reinterpret_cast<void **>(&data),
785  info.origin->GetSize(), 0);
786  assert(nbytes == info.origin->GetSize());
787 
788  switch (config_.authz_method) {
789  case kAuthzAwsV2:
790  shash::HashMem(data, nbytes, &payload_hash);
791  *hex_hash =
792  Base64(string(reinterpret_cast<char *>(payload_hash.digest),
793  payload_hash.GetDigestSize()));
794  return true;
795  case kAuthzAwsV4:
796  *hex_hash =
797  shash::Sha256Mem(data, nbytes);
798  return true;
799  case kAuthzAzure:
800  // no payload hash required for Azure signature
801  hex_hash->clear();
802  return true;
803  default:
804  PANIC(NULL);
805  }
806 }
807 
808 string S3FanoutManager::GetRequestString(const JobInfo &info) const {
809  switch (info.request) {
810  case JobInfo::kReqHeadOnly:
811  case JobInfo::kReqHeadPut:
812  return "HEAD";
813  case JobInfo::kReqPutCas:
814  case JobInfo::kReqPutDotCvmfs:
815  case JobInfo::kReqPutHtml:
816  case JobInfo::kReqPutBucket:
817  return "PUT";
818  case JobInfo::kReqDelete:
819  return "DELETE";
820  default:
821  PANIC(NULL);
822  }
823 }
824 
825 
826 string S3FanoutManager::GetContentType(const JobInfo &info) const {
827  switch (info.request) {
828  case JobInfo::kReqHeadOnly:
829  case JobInfo::kReqHeadPut:
830  case JobInfo::kReqDelete:
831  return "";
832  case JobInfo::kReqPutCas:
833  return "application/octet-stream";
834  case JobInfo::kReqPutDotCvmfs:
835  return "application/x-cvmfs";
836  case JobInfo::kReqPutHtml:
837  return "text/html";
838  case JobInfo::kReqPutBucket:
839  return "text/xml";
840  default:
841  PANIC(NULL);
842  }
843 }
844 
845 
850 Failures S3FanoutManager::InitializeRequest(JobInfo *info, CURL *handle) const {
851  // Initialize internal download state
852  info->curl_handle = handle;
853  info->error_code = kFailOk;
854  info->http_error = 0;
855  info->num_retries = 0;
856  info->backoff_ms = 0;
857  info->throttle_ms = 0;
858  info->throttle_timestamp = 0;
859  info->http_headers = NULL;
860  // info->payload_size is needed in S3Uploader::MainCollectResults,
861  // where info->origin is already destroyed.
862  info->payload_size = info->origin->GetSize();
863 
864  InitializeDnsSettings(handle, complete_hostname_);
865 
866  CURLcode retval;
867  if ((info->request == JobInfo::kReqHeadOnly) ||
868  (info->request == JobInfo::kReqHeadPut) ||
869  (info->request == JobInfo::kReqDelete))
870  {
871  retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 0);
872  assert(retval == CURLE_OK);
873  retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
874  assert(retval == CURLE_OK);
875 
876  if (info->request == JobInfo::kReqDelete)
877  {
878  retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST,
879  GetRequestString(*info).c_str());
880  assert(retval == CURLE_OK);
881  } else {
882  retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
883  assert(retval == CURLE_OK);
884  }
885  } else {
886  retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
887  assert(retval == CURLE_OK);
888  retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 1);
889  assert(retval == CURLE_OK);
890  retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 0);
891  assert(retval == CURLE_OK);
892  retval = curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE,
893  static_cast<curl_off_t>(info->origin->GetSize()));
894  assert(retval == CURLE_OK);
895 
896  if (info->request == JobInfo::kReqPutDotCvmfs) {
897  info->http_headers =
898  curl_slist_append(info->http_headers, kCacheControlDotCvmfs);
899  } else if (info->request == JobInfo::kReqPutCas) {
900  info->http_headers =
901  curl_slist_append(info->http_headers, kCacheControlCas);
902  }
903  }
904 
905  bool retval_b;
906 
907  // Authorization
908  vector<string> authz_headers;
909  switch (config_.authz_method) {
910  case kAuthzAwsV2:
911  retval_b = MkV2Authz(*info, &authz_headers);
912  break;
913  case kAuthzAwsV4:
914  retval_b = MkV4Authz(*info, &authz_headers);
915  break;
916  case kAuthzAzure:
917  retval_b = MkAzureAuthz(*info, &authz_headers);
918  break;
919  default:
920  PANIC(NULL);
921  }
922  if (!retval_b)
923  return kFailLocalIO;
924  for (unsigned i = 0; i < authz_headers.size(); ++i) {
925  info->http_headers =
926  curl_slist_append(info->http_headers, authz_headers[i].c_str());
927  }
928 
929  // Common headers
930  info->http_headers =
931  curl_slist_append(info->http_headers, "Connection: Keep-Alive");
932  info->http_headers = curl_slist_append(info->http_headers, "Pragma:");
933  // No 100-continue
934  info->http_headers = curl_slist_append(info->http_headers, "Expect:");
935  // Strip unnecessary header
936  info->http_headers = curl_slist_append(info->http_headers, "Accept:");
937  info->http_headers = curl_slist_append(info->http_headers,
938  user_agent_->c_str());
939 
940  // Set curl parameters
941  retval = curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
942  assert(retval == CURLE_OK);
943  retval = curl_easy_setopt(handle, CURLOPT_HEADERDATA,
944  static_cast<void *>(info));
945  assert(retval == CURLE_OK);
946  retval = curl_easy_setopt(handle, CURLOPT_READDATA,
947  static_cast<void *>(info));
948  assert(retval == CURLE_OK);
949  retval = curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->http_headers);
950  assert(retval == CURLE_OK);
951  if (opt_ipv4_only_) {
952  retval = curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
953  assert(retval == CURLE_OK);
954  }
955  // Follow HTTP redirects
956  retval = curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
957  assert(retval == CURLE_OK);
958 
959  return kFailOk;
960 }
961 
962 
966 void S3FanoutManager::SetUrlOptions(JobInfo *info) const {
967  CURL *curl_handle = info->curl_handle;
968  CURLcode retval;
969 
970  retval = curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT,
971  config_.opt_timeout_sec);
972  assert(retval == CURLE_OK);
973  retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT,
974  kLowSpeedLimit);
975  assert(retval == CURLE_OK);
976  retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME,
977  config_.opt_timeout_sec);
978  assert(retval == CURLE_OK);
979 
980  if (is_curl_debug_) {
981  retval = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 1);
982  assert(retval == CURLE_OK);
983  }
984 
985  string url = MkUrl(info->object_key);
986  retval = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
987  assert(retval == CURLE_OK);
988 }
989 
990 
994 void S3FanoutManager::UpdateStatistics(CURL *handle) {
995  double val;
996 
997  if (curl_easy_getinfo(handle, CURLINFO_SIZE_UPLOAD, &val) == CURLE_OK)
998  statistics_->transferred_bytes += val;
999 }
1000 
1001 
// Tells whether a failed job should be tried again: its error code must be
// one of the retryable ones listed below and the job must not have used up
// config_.opt_max_retries yet.
//
// NOTE(review): the listing this text was taken from skips source line 1009,
// so the list of retryable error codes shown here is probably incomplete.
// Check the upstream file before changing the condition.
1005 bool S3FanoutManager::CanRetry(const JobInfo *info) {
1006  return
1007  (info->error_code == kFailHostConnection ||
1008  info->error_code == kFailHostResolve ||
1010  info->error_code == kFailRetry) &&
1011  (info->num_retries < config_.opt_max_retries);
1012 }
1013 
1014 
// Delays the calling thread before a job is retried.
//   - If the job carries a throttle delay (info->throttle_ms > 0), sleeps for
//     that delay, provided the throttle timestamp plus the delay (in whole
//     seconds) has not passed yet.  A warning is emitted at most every
//     kThrottleReportIntervalSec seconds.
//   - Otherwise sleeps for the job's backoff time, which starts at a random
//     value and doubles with every call up to config_.opt_backoff_max_ms.
// Retries caused by kFailRetry do not count towards info->num_retries.
//
// NOTE(review): the listing this text was taken from skips source line 1032,
// i.e. the opening line of the call whose arguments start at 1033.
1020 void S3FanoutManager::Backoff(JobInfo *info) {
1021  if (info->error_code != kFailRetry)
1022  info->num_retries++;
1023  statistics_->num_retries++;
1024 
1025  if (info->throttle_ms > 0) {
1026  LogCvmfs(kLogS3Fanout, kLogDebug, "throttling for %d ms",
1027  info->throttle_ms);
1028  uint64_t now = platform_monotonic_time();
// NOTE(review): integer division, a delay below 1000ms adds nothing to the
// timestamp -- presumably the timestamp is in seconds; verify
1029  if ((info->throttle_timestamp + (info->throttle_ms / 1000)) >= now) {
1030  if ((now - timestamp_last_throttle_report_) > kThrottleReportIntervalSec)
1031  {
1033  "Warning: S3 backend throttling %ums "
1034  "(total backoff time so far %ums)",
1035  info->throttle_ms,
1036  statistics_->ms_throttled);
1037  timestamp_last_throttle_report_ = now;
1038  }
1039  statistics_->ms_throttled += info->throttle_ms;
1040  SafeSleepMs(info->throttle_ms);
1041  }
1042  } else {
1043  if (info->backoff_ms == 0) {
1044  // Must be != 0
1045  info->backoff_ms = prng_.Next(config_.opt_backoff_init_ms + 1);
1046  } else {
1047  info->backoff_ms *= 2;
1048  }
// Cap the exponential growth
1049  if (info->backoff_ms > config_.opt_backoff_max_ms)
1050  info->backoff_ms = config_.opt_backoff_max_ms;
1051 
1052  LogCvmfs(kLogS3Fanout, kLogDebug, "backing off for %d ms",
1053  info->backoff_ms);
1054  SafeSleepMs(info->backoff_ms);
1055  }
1056 }
1057 
1058 
/**
 * Evaluates a finished curl transfer for the given job and decides what
 * happens next.
 *
 * The upload statistics are updated and curl_error is mapped onto
 * info->error_code.  After that, one of three things happens:
 *  - a kReqHeadPut job whose object was not found is re-initialized as a
 *    kReqPutCas upload (returns true),
 *  - a failure accepted by CanRetry() goes through Backoff(), the per-attempt
 *    state of the job is reset (returns true),
 *  - otherwise info->origin is destroyed and the job is over (returns false).
 *
 * NOTE(review): the embedded listing numbering skips 1092 and 1099.  As shown,
 * the CURLE_COULDNT_CONNECT .. CURLE_RECV_ERROR labels lead straight to
 * `break`, and the call that takes the "unexpected curl error" format string
 * has lost its opening line -- restore both from the upstream file.
 *
 * @param curl_error  result code curl reported for the transfer
 * @param info        job that was processed; modified in place
 * @return true if the request has to be run again, false if it is finished
 */
1065 bool S3FanoutManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1066  LogCvmfs(kLogS3Fanout, kLogDebug, "Verify uploaded/tested object %s "
1067  "(curl error %d, info error %d, info request %d)",
1068  info->object_key.c_str(),
1069  curl_error, info->error_code, info->request);
1070  UpdateStatistics(info->curl_handle);
1071 
1072  // Verification and error classification
1073  switch (curl_error) {
1074  case CURLE_OK:
  // A kFailRetry or kFailNotFound recorded before this point is kept even
  // though the transfer itself succeeded; anything else counts as success
1075  if ((info->error_code != kFailRetry) &&
1076  (info->error_code != kFailNotFound))
1077  {
1078  info->error_code = kFailOk;
1079  }
1080  break;
1081  case CURLE_UNSUPPORTED_PROTOCOL:
1082  case CURLE_URL_MALFORMAT:
1083  info->error_code = kFailBadRequest;
1084  break;
1085  case CURLE_COULDNT_RESOLVE_HOST:
1086  info->error_code = kFailHostResolve;
1087  break;
1088  case CURLE_COULDNT_CONNECT:
1089  case CURLE_OPERATION_TIMEDOUT:
1090  case CURLE_SEND_ERROR:
1091  case CURLE_RECV_ERROR:
1093  break;
1094  case CURLE_ABORTED_BY_CALLBACK:
1095  case CURLE_WRITE_ERROR:
1096  // Error set by callback
1097  break;
1098  default:
1100  "unexpected curl error (%d) while trying to upload %s",
1101  curl_error, info->object_key.c_str());
1102  info->error_code = kFailOther;
1103  break;
1104  }
1105 
1106  // Transform HEAD to PUT request
1107  if ((info->error_code == kFailNotFound) &&
1108  (info->request == JobInfo::kReqHeadPut))
1109  {
1110  LogCvmfs(kLogS3Fanout, kLogDebug, "not found: %s, uploading",
1111  info->object_key.c_str());
1112  info->request = JobInfo::kReqPutCas;
  // Drop the header list of the HEAD request before the handle is set up
  // again for the PUT
1113  curl_slist_free_all(info->http_headers);
1114  info->http_headers = NULL;
1115  s3fanout::Failures init_failure = InitializeRequest(info,
1116  info->curl_handle);
1117 
1118  if (init_failure != s3fanout::kFailOk) {
1119  PANIC(kLogStderr,
1120  "Failed to initialize CURL handle "
1121  "(error: %d - %s | errno: %d)",
1122  init_failure, Code2Ascii(init_failure), errno);
1123  }
1124  SetUrlOptions(info);
1125  // Reset origin
1126  info->origin->Rewind();
1127  return true; // Again, Put
1128  }
1129 
1130  // Determination if failed request should be repeated
1131  bool try_again = false;
1132  if (info->error_code != kFailOk) {
1133  try_again = CanRetry(info);
1134  }
1135  if (try_again) {
  // Only the PUT request types rewind their input before the retry
1136  if (info->request == JobInfo::kReqPutCas ||
1137  info->request == JobInfo::kReqPutDotCvmfs ||
1138  info->request == JobInfo::kReqPutHtml) {
1139  LogCvmfs(kLogS3Fanout, kLogDebug, "Trying again to upload %s",
1140  info->object_key.c_str());
1141  // Reset origin
1142  info->origin->Rewind();
1143  }
  // Backoff() reads error_code, throttle_ms and backoff_ms, so the
  // per-attempt state is cleared only after it returns.
  // NOTE(review): backoff_ms is zeroed here, so on the next retry of this job
  // Backoff() takes its initial-value branch again and the doubling branch is
  // not reached via this path -- confirm that this is intended.
1144  Backoff(info);
1145  info->error_code = kFailOk;
1146  info->http_error = 0;
1147  info->throttle_ms = 0;
1148  info->backoff_ms = 0;
1149  info->throttle_timestamp = 0;
1150  return true; // try again
1151  }
1152 
1153  // Cleanup opened resources
1154  info->origin.Destroy();
1155 
  // Report the HTTP status of a failed request, except for 404
1156  if ((info->error_code != kFailOk) &&
1157  (info->http_error != 0) && (info->http_error != 404))
1158  {
1159  LogCvmfs(kLogS3Fanout, kLogStderr, "S3: HTTP failure %d", info->http_error);
1160  }
1161  return false; // stop transfer
1162 }
1163 
/**
 * Sets up a manager for the given S3 configuration: allocates and initializes
 * the mutexes, creates the bookkeeping containers, the statistics object and
 * the user agent string, initializes libcurl (global state plus one multi
 * handle), calls prng_.InitLocaltime() and reads two switches from the
 * environment.
 *
 * NOTE(review): the embedded listing numbering has gaps inside this body
 * (1166-1168, 1175, 1185-1187, 1193, 1201, 1207, 1213, 1224).  Several
 * statements are therefore incomplete or absent as shown, e.g. the assignment
 * target in front of the second reinterpret_cast, the creation of
 * available_jobs_ and the last argument of two curl_multi_setopt() calls --
 * restore them from the upstream file.
 *
 * @param config  S3 settings, copied into config_
 */
1164 S3FanoutManager::S3FanoutManager(const S3Config &config) : config_(config) {
1165  atomic_init32(&multi_threaded_);
1169 
1170  int retval;
1171  jobs_todo_lock_ =
1172  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1173  retval = pthread_mutex_init(jobs_todo_lock_, NULL);
1174  assert(retval == 0);
1176  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1177  retval = pthread_mutex_init(curl_handle_lock_, NULL);
1178  assert(retval == 0);
1179 
  // Containers are heap-allocated; the matching deletes are in the destructor
1180  active_requests_ = new set<JobInfo *>;
1181  pool_handles_idle_ = new set<CURL *>;
1182  pool_handles_inuse_ = new set<CURL *>;
1183  curl_sharehandles_ = new map<CURL *, S3FanOutDnsEntry *>;
1184  sharehandles_ = new set<S3FanOutDnsEntry *>;
1188  assert(NULL != available_jobs_);
1189 
1190  statistics_ = new Statistics();
1191  user_agent_ = new string();
1192  *user_agent_ = "User-Agent: cvmfs " + string(VERSION);
1194 
1195  CURLcode cretval = curl_global_init(CURL_GLOBAL_ALL);
1196  assert(cretval == CURLE_OK);
1197  curl_multi_ = curl_multi_init();
1198  assert(curl_multi_ != NULL);
1199  CURLMcode mretval;
1200  mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION,
1202  assert(mretval == CURLM_OK);
  // The socket callback receives this manager as its user pointer
1203  mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1204  static_cast<void *>(this));
1205  assert(mretval == CURLM_OK);
1206  mretval = curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1208  assert(mretval == CURLM_OK);
1209 
1210  prng_.InitLocaltime();
1211 
1212  thread_upload_ = 0;
  // Debug mode is on as soon as the variable exists, even if it is empty
1214  is_curl_debug_ = (getenv("_CVMFS_CURL_DEBUG") != NULL);
1215 
1216  // Parsing environment variables
  // CVMFS_IPV4_ONLY must be set to a non-empty value to take effect
1217  if ((getenv("CVMFS_IPV4_ONLY") != NULL) &&
1218  (strlen(getenv("CVMFS_IPV4_ONLY")) > 0)) {
1219  opt_ipv4_only_ = true;
1220  } else {
1221  opt_ipv4_only_ = false;
1222  }
1223 
1225 
  // Start with room for 4 pollfd entries, none of them in use
1226  watch_fds_ = static_cast<struct pollfd *>(smalloc(4 * sizeof(struct pollfd)));
1227  watch_fds_size_ = 4;
1228  watch_fds_inuse_ = 0;
1229 }
1230 
// Tear-down of the manager: destroys the mutexes, stops the upload thread if
// one was started, cleans up the idle curl easy handles and the DNS share
// entries, deletes the heap-allocated members and shuts libcurl down.
//
// NOTE(review): the signature line (listing number 1231) is missing from this
// listing; judging by the members released here this is presumably the body
// of S3FanoutManager::~S3FanoutManager() -- confirm against the upstream file.
// Listing numbers 1243-1245 are missing as well.
//
// NOTE(review): both mutexes are destroyed and freed before the upload thread
// is joined -- confirm that the thread cannot still use them at that point.
1232  pthread_mutex_destroy(jobs_todo_lock_);
1233  free(jobs_todo_lock_);
1234  pthread_mutex_destroy(curl_handle_lock_);
1235  free(curl_handle_lock_);
1236 
  // multi_threaded_ is read atomically by adding 0 to it
1237  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1238  // Shutdown I/O thread
1239  char buf = 'T';
1240  WritePipe(pipe_terminate_[1], &buf, 1);
1241  pthread_join(thread_upload_, NULL);
1242  }
1246 
  // Release every idle easy handle.
  // NOTE(review): handles in pool_handles_inuse_ get no curl_easy_cleanup()
  // in the visible code -- confirm none can be left at this point.
1247  set<CURL *>::iterator i = pool_handles_idle_->begin();
1248  const set<CURL *>::const_iterator iEnd = pool_handles_idle_->end();
1249  for (; i != iEnd; ++i) {
1250  curl_easy_cleanup(*i);
1251  }
1252 
  // Each DNS entry owns a curl share handle and an slist
1253  set<S3FanOutDnsEntry *>::iterator is = sharehandles_->begin();
1254  const set<S3FanOutDnsEntry *>::const_iterator isEnd = sharehandles_->end();
1255  for (; is != isEnd; ++is) {
1256  curl_share_cleanup((*is)->sharehandle);
1257  curl_slist_free_all((*is)->clist);
1258  delete *is;
1259  }
1260  pool_handles_idle_->clear();
1261  curl_sharehandles_->clear();
1262  sharehandles_->clear();
1263  delete active_requests_;
1264  delete pool_handles_idle_;
1265  delete pool_handles_inuse_;
1266  delete curl_sharehandles_;
1267  delete sharehandles_;
1268  delete user_agent_;
1269  curl_multi_cleanup(curl_multi_);
1270 
1271  delete statistics_;
1272 
1273  delete available_jobs_;
1274 
1275  curl_global_cleanup();
1276 }
1277 
// Starts the upload thread: runs MainUpload with this manager as its argument
// and afterwards increments multi_threaded_, the flag the tear-down code
// checks before joining the thread.
//
// NOTE(review): the signature line is missing from this listing (numbers
// 1278-1281 are absent); the log message suggests this is the body of a
// Spawn() method -- confirm against the upstream file.
1282  LogCvmfs(kLogS3Fanout, kLogDebug, "S3FanoutManager spawned");
1283 
1284  int retval = pthread_create(&thread_upload_, NULL, MainUpload,
1285  static_cast<void *>(this));
1286  assert(retval == 0);
1287 
1288  atomic_inc32(&multi_threaded_);
1289 }
1290 
// Returns the object statistics_ points to; no copy is made here.
// NOTE(review): the signature line (listing number 1291) is missing from this
// listing; presumably this is the body of GetStatistics() -- confirm.
1292  return *statistics_;
1293 }
1294 
// Writes the job pointer itself (sizeof(info) bytes), not the JobInfo object,
// into the write end of pipe_jobs_.
// NOTE(review): the listing numbers in front of 1300 are missing, i.e. the
// signature line (presumably PushNewJob(JobInfo *info)) and possibly a
// statement preceding this one -- confirm against the upstream file.
1300  WritePipe(pipe_jobs_[1], &info, sizeof(info));
1301 }
1302 
// Writes the job pointer itself (sizeof(info) bytes), not the JobInfo object,
// into the write end of pipe_completed_.
// NOTE(review): the signature line (listing number 1306) is missing from this
// listing; presumably this is the body of PushCompletedJob(JobInfo *info) --
// confirm against the upstream file.
1307  WritePipe(pipe_completed_[1], &info, sizeof(info));
1308 }
1309 
// Reads one job pointer from the read end of pipe_completed_ and returns it.
// NOTE(review): presumably ReadPipe blocks until a pointer is available --
// confirm.  The signature line (listing number 1313) is missing from this
// listing; presumably this is the body of PopCompletedJob().
1314  JobInfo *info;
1315  ReadPipe(pipe_completed_[0], &info, sizeof(info));
1316  return info;
1317 }
1318 
1319 //------------------------------------------------------------------------------
1320 
1321 
1322 string Statistics::Print() const {
1323  return
1324  "Transferred Bytes: " +
1325  StringifyInt(uint64_t(transferred_bytes)) + "\n" +
1326  "Transfer duration: " +
1327  StringifyInt(uint64_t(transfer_time)) + " s\n" +
1328  "Number of requests: " +
1329  StringifyInt(num_requests) + "\n" +
1330  "Number of retries: " +
1331  StringifyInt(num_retries) + "\n";
1332 }
1333 
1334 } // namespace s3fanout
#define LogCvmfs(source, mask,...)
Definition: logging.h:20
unsigned throttle_ms
Definition: s3fanout.h:144
const S3Config config_
Definition: s3fanout.h:266
const char * Code2Ascii(const ObjectFetcherFailures::Failures error)
pthread_mutex_t * jobs_todo_lock_
Definition: s3fanout.h:223
atomic_int32 multi_threaded_
Definition: s3fanout.h:290
std::string ip
Definition: s3fanout.h:154
vector< string > SplitString(const string &str, const char delim, const unsigned max_chunks)
Definition: string.cc:287
pthread_mutex_t * curl_handle_lock_
Definition: s3fanout.h:224
string Sha256String(const string &content)
Definition: hash.cc:452
std::set< CURL * > * pool_handles_idle_
Definition: s3fanout.h:275
struct curl_slist * http_headers
Definition: s3fanout.h:135
static size_t CallbackCurlBody(char *, size_t size, size_t nmemb, void *)
Definition: s3fanout.cc:151
#define PANIC(...)
Definition: exception.h:26
std::string IsoTimestamp()
Definition: string.cc:148
dns::CaresResolver * resolver_
Definition: s3fanout.h:279
string Sha256Mem(const unsigned char *buffer, const unsigned buffer_size)
Definition: hash.cc:442
string Trim(const string &raw, bool trim_newline)
Definition: string.cc:420
CURLSH * sharehandle
Definition: s3fanout.h:157
double transferred_bytes
Definition: s3fanout.h:72
void InitLocaltime()
Definition: prng.h:35
void Hmac(const string &key, const unsigned char *buffer, const unsigned buffer_size, Any *any_digest)
Definition: hash.cc:274
perf::Statistics * statistics_
Definition: repository.h:138
static void * MainUpload(void *data)
Definition: s3fanout.cc:231
std::string Hmac256(const std::string &key, const std::string &content, bool raw_output)
Definition: hash.cc:458
std::string dns_name
Definition: s3fanout.h:153
const std::string object_key
Definition: s3fanout.h:104
assert((mem||(size==0))&&"Out Of Memory")
const std::set< std::string > & ipv4_addresses() const
Definition: dns.h:111
std::string MkCompleteHostname()
Definition: s3fanout.h:258
void PushNewJob(JobInfo *info)
Definition: s3fanout.cc:1298
Algorithms algorithm
Definition: hash.h:123
SynchronizingCounter< uint32_t > Semaphore
Definition: s3fanout.h:163
unsigned int counter
Definition: s3fanout.h:152
std::string Print() const
Definition: s3fanout.cc:1322
bool Debase64(const string &data, string *decoded)
Definition: string.cc:532
void MakePipe(int pipe_fd[2])
Definition: posix.cc:520
unsigned char digest[digest_size_]
Definition: hash.h:122
struct curl_slist * clist
Definition: s3fanout.h:156
std::string * user_agent_
Definition: s3fanout.h:281
Failures error_code
Definition: s3fanout.h:138
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:174
unsigned backoff_ms
Definition: s3fanout.h:142
unsigned int max_available_jobs_
Definition: s3fanout.h:311
std::string RfcTimestamp()
Definition: string.cc:126
int64_t String2Int64(const string &value)
Definition: string.cc:221
unsigned GetDigestSize() const
Definition: hash.h:164
std::set< JobInfo * > * active_requests_
Definition: s3fanout.h:273
void PushCompletedJob(JobInfo *info)
Definition: s3fanout.cc:1306
uint64_t payload_size
Definition: s3fanout.h:136
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:268
bool HasSuffix(const std::string &str, const std::string &suffix, const bool ignore_case)
Definition: string.cc:278
void SetUrlOptions(JobInfo *info) const
Definition: s3fanout.cc:966
Definition: dns.h:90
CURL * AcquireCurlHandle() const
Definition: s3fanout.cc:383
uint64_t throttle_timestamp
Definition: s3fanout.h:146
Semaphore * available_jobs_
Definition: s3fanout.h:312
static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp)
Definition: s3fanout.cc:161
UniquePtr< FileBackedBuffer > origin
Definition: s3fanout.h:106
void ReleaseCurlHandle(JobInfo *info, CURL *handle) const
Definition: s3fanout.cc:416
string StringifyInt(const int64_t value)
Definition: string.cc:77
struct pollfd * watch_fds_
Definition: s3fanout.h:292
uint64_t platform_monotonic_time()
std::string ExtractPort(const std::string &url)
Definition: dns.cc:125
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:264
JobInfo * PopCompletedJob()
Definition: s3fanout.cc:1313
std::string ExtractHost(const std::string &url)
Definition: dns.cc:110
unsigned char num_retries
Definition: s3fanout.h:140
std::string port
Definition: s3fanout.h:155
std::string complete_hostname_
Definition: s3fanout.h:267
uint64_t num_requests
Definition: s3fanout.h:74
void HashMem(const unsigned char *buffer, const unsigned buffer_size, Any *any_digest)
Definition: hash.cc:255
string Base64(const string &data)
Definition: string.cc:473
RequestType request
Definition: s3fanout.h:137
uint64_t String2Uint64(const string &value)
Definition: string.cc:227
double transfer_time
Definition: s3fanout.h:73
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
Definition: s3fanout.cc:1065
uint64_t num_retries
Definition: s3fanout.h:75
static CaresResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
Definition: dns.cc:733
std::set< S3FanOutDnsEntry * > * sharehandles_
Definition: s3fanout.h:277
const Statistics & GetStatistics()
Definition: s3fanout.cc:1291
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:1908
Statistics * statistics_
Definition: s3fanout.h:316
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:529
uint64_t timestamp_last_throttle_report_
Definition: s3fanout.h:319
static void size_t size
Definition: smalloc.h:47
std::set< CURL * > * pool_handles_inuse_
Definition: s3fanout.h:276
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:541
Definition: s3fanout.h:149
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:579
std::map< CURL *, S3FanOutDnsEntry * > * curl_sharehandles_
Definition: s3fanout.h:278
CURL * curl_handle
Definition: s3fanout.h:134
void Destroy()
Definition: pointer.h:46
Failures InitializeRequest(JobInfo *info, CURL *handle) const
Definition: s3fanout.cc:850