CernVM-FS  2.10.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
s3fanout.cc
Go to the documentation of this file.
1 
7 #include <pthread.h>
8 
9 #include <algorithm>
10 #include <cassert>
11 #include <cerrno>
12 #include <utility>
13 
14 #include "cvmfs_config.h"
15 #include "platform.h"
16 #include "s3fanout.h"
17 #include "upload_facility.h"
18 #include "util/exception.h"
19 #include "util/posix.h"
20 #include "util/string.h"
21 #include "util_concurrency.h"
22 
23 using namespace std; // NOLINT
24 
25 namespace s3fanout {
26 
27 const char *S3FanoutManager::kCacheControlCas = "Cache-Control: max-age=259200";
28 const char *S3FanoutManager::kCacheControlDotCvmfs =
29  "Cache-Control: max-age=61";
30 const unsigned S3FanoutManager::kDefault429ThrottleMs = 250;
31 const unsigned S3FanoutManager::kMax429ThrottleMs = 10000;
32 const unsigned S3FanoutManager::kThrottleReportIntervalSec = 10;
33 const unsigned S3FanoutManager::kDefaultHTTPPort = 80;
34 const unsigned S3FanoutManager::kDefaultHTTPSPort = 443;
35 
36 
40 void S3FanoutManager::DetectThrottleIndicator(
41  const std::string &header,
42  JobInfo *info)
43 {
44  std::string value_str;
45  if (HasPrefix(header, "retry-after:", true))
46  value_str = header.substr(12);
47  if (HasPrefix(header, "x-retry-in:", true))
48  value_str = header.substr(11);
49 
50  value_str = Trim(value_str, true /* trim_newline */);
51  if (!value_str.empty()) {
52  unsigned value_numeric = String2Uint64(value_str);
53  unsigned value_ms =
54  HasSuffix(value_str, "ms", true /* ignore_case */) ?
55  value_numeric : (value_numeric * 1000);
56  if (value_ms > 0)
57  info->throttle_ms = std::min(value_ms, kMax429ThrottleMs);
58  }
59 }
60 
61 
// Libcurl header callback (installed as CURLOPT_HEADERFUNCTION): called once
// per response header line with info_link pointing to the request's JobInfo.
// For a status line starting with "HTTP/1.": a 2xx status is accepted; other
// statuses are stored in info->http_error and classified into
// info->error_code. 429 and 404 keep the transfer going (return num_bytes);
// the other error statuses return 0, which libcurl treats as a failed
// callback and aborts the transfer. While the request is marked kFailRetry,
// later header lines are scanned for a retry hint via
// DetectThrottleIndicator().
// NOTE(review): HTTP/2 status lines look like "HTTP/2 403" and do not match
// the "HTTP/1." prefix, so their status would go unchecked here — confirm
// whether HTTP/2 can be negotiated with this handle configuration.
65 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
66  void *info_link) {
67  const size_t num_bytes = size*nmemb;
68  const string header_line(static_cast<const char *>(ptr), num_bytes);
69  JobInfo *info = static_cast<JobInfo *>(info_link);
70 
71  // Check for http status code errors
72  if (HasPrefix(header_line, "HTTP/1.", false)) {
73  if (header_line.length() < 10)
74  return 0;
75 
// Skip the blanks after "HTTP/1.x" to reach the first status digit.
76  unsigned i;
77  for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {}
78 
79  if (header_line[i] == '2') {
80  return num_bytes;
81  } else {
82  LogCvmfs(kLogS3Fanout, kLogDebug, "http status error code [info %p]: %s",
83  info, header_line.c_str());
84  if (header_line.length() < i+3) {
85  LogCvmfs(kLogS3Fanout, kLogStderr, "S3: invalid HTTP response '%s'",
86  header_line.c_str());
87  info->error_code = kFailOther;
88  return 0;
89  }
// Parse the three-digit status code.
90  info->http_error = String2Int64(string(&header_line[i], 3));
91 
92  switch (info->http_error) {
93  case 429:
// Too Many Requests: mark for retry with the default throttle; a
// Retry-After / X-Retry-In header later in this response may refine it.
94  info->error_code = kFailRetry;
95  info->throttle_ms = S3FanoutManager::kDefault429ThrottleMs;
// NOTE(review): original line 96 is missing from this listing. Backoff()
// compares info->throttle_timestamp against the current time, so this line
// presumably recorded the throttle timestamp — restore from upstream.
97  return num_bytes;
98  case 503:
99  case 502: // Can happen if the S3 gateway-backend connection breaks
// NOTE(review): original line 100 is missing from this listing; as shown,
// 502/503 leave info->error_code unchanged before aborting. It presumably
// assigned a retryable error code — restore from upstream.
101  break;
102  case 501:
103  case 400:
104  info->error_code = kFailBadRequest;
105  break;
106  case 403:
107  info->error_code = kFailForbidden;
108  break;
109  case 404:
// Not an abort: the caller distinguishes "not found" via error_code.
110  info->error_code = kFailNotFound;
111  return num_bytes;
112  default:
113  info->error_code = kFailOther;
114  }
115  return 0;
116  }
117  }
118 
// Non-status header lines: look for retry hints on throttled requests.
119  if (info->error_code == kFailRetry) {
120  S3FanoutManager::DetectThrottleIndicator(header_line, info);
121  }
122 
123  return num_bytes;
124 }
125 
126 
130 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
131  void *info_link) {
132  const size_t num_bytes = size*nmemb;
133  JobInfo *info = static_cast<JobInfo *>(info_link);
134 
135  LogCvmfs(kLogS3Fanout, kLogDebug, "Data callback with %d bytes", num_bytes);
136 
137  if (num_bytes == 0)
138  return 0;
139 
140  uint64_t read_bytes = info->origin->Read(ptr, num_bytes);
141 
143  "source buffer pushed out %d bytes", read_bytes);
144 
145  return read_bytes;
146 }
147 
148 
/**
 * Libcurl write callback (CURLOPT_WRITEFUNCTION) for response bodies.
 * The body content is not needed and is dropped; every byte is reported as
 * consumed so that libcurl does not treat the write as failed.
 */
static size_t CallbackCurlBody(
  char * /*ptr*/, size_t size, size_t nmemb, void * /*userdata*/)
{
  const size_t consumed = nmemb * size;
  return consumed;
}
157 
158 
162 int S3FanoutManager::CallbackCurlSocket(CURL *easy, curl_socket_t s, int action,
163  void *userp, void *socketp) {
164  S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(userp);
165  LogCvmfs(kLogS3Fanout, kLogDebug, "CallbackCurlSocket called with easy "
166  "handle %p, socket %d, action %d, up %d, "
167  "sp %d, fds_inuse %d, jobs %d",
168  easy, s, action, userp,
169  socketp, s3fanout_mgr->watch_fds_inuse_,
170  s3fanout_mgr->available_jobs_->Get());
171  if (action == CURL_POLL_NONE)
172  return 0;
173 
174  // Find s in watch_fds_
175  // First 2 fds are job and terminate pipes (not curl related)
176  unsigned index;
177  for (index = 2; index < s3fanout_mgr->watch_fds_inuse_; ++index) {
178  if (s3fanout_mgr->watch_fds_[index].fd == s)
179  break;
180  }
181  // Or create newly
182  if (index == s3fanout_mgr->watch_fds_inuse_) {
183  // Extend array if necessary
184  if (s3fanout_mgr->watch_fds_inuse_ == s3fanout_mgr->watch_fds_size_) {
185  s3fanout_mgr->watch_fds_size_ *= 2;
186  s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
187  srealloc(s3fanout_mgr->watch_fds_,
188  s3fanout_mgr->watch_fds_size_*sizeof(struct pollfd)));
189  }
190  s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].fd = s;
191  s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].events = 0;
192  s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].revents = 0;
193  s3fanout_mgr->watch_fds_inuse_++;
194  }
195 
196  switch (action) {
197  case CURL_POLL_IN:
198  s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
199  break;
200  case CURL_POLL_OUT:
201  s3fanout_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
202  break;
203  case CURL_POLL_INOUT:
204  s3fanout_mgr->watch_fds_[index].events =
205  POLLIN | POLLPRI | POLLOUT | POLLWRBAND;
206  break;
207  case CURL_POLL_REMOVE:
208  if (index < s3fanout_mgr->watch_fds_inuse_-1)
209  s3fanout_mgr->watch_fds_[index] =
210  s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_-1];
211  s3fanout_mgr->watch_fds_inuse_--;
212  // Shrink array if necessary
213  if ((s3fanout_mgr->watch_fds_inuse_ > s3fanout_mgr->watch_fds_max_) &&
214  (s3fanout_mgr->watch_fds_inuse_ < s3fanout_mgr->watch_fds_size_/2)) {
215  s3fanout_mgr->watch_fds_size_ /= 2;
216  s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
217  srealloc(s3fanout_mgr->watch_fds_,
218  s3fanout_mgr->watch_fds_size_*sizeof(struct pollfd)));
219  }
220  break;
221  default:
222  PANIC(NULL);
223  }
224 
225  return 0;
226 }
227 
228 
// Body of the upload I/O thread; data is the owning S3FanoutManager.
// Runs a poll() loop over watch_fds_: slot 0 (terminate pipe) ends the loop,
// slot 1 (job pipe) delivers JobInfo pointers that get an easy handle and are
// added to curl_multi_, and the remaining slots are curl sockets whose
// activity is forwarded to curl_multi_socket_action(). Finished transfers are
// either re-added on the same easy handle (VerifyAndFinalize() returns true)
// or released and handed to PushCompletedJob(). On exit, every in-use handle
// is removed from the multi handle and cleaned up, and watch_fds_ is freed.
232 void *S3FanoutManager::MainUpload(void *data) {
233  LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread started");
234  S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(data);
235 
236  s3fanout_mgr->InitPipeWatchFds();
237 
238  // Don't schedule more jobs into the multi handle than the maximum number of
239  // parallel connections. This should prevent starvation and thus a timeout
240  // of the authorization header (CVM-1339).
// NOTE(review): jobs_in_flight is incremented and decremented below but never
// read in this function, so no limit is enforced here — confirm where the
// limit actually lives (possibly the available_jobs_ counter).
241  unsigned jobs_in_flight = 0;
242 
243  while (true) {
244  // Check events with 100ms timeout
245  int timeout_ms = 100;
246  int retval = poll(s3fanout_mgr->watch_fds_, s3fanout_mgr->watch_fds_inuse_,
247  timeout_ms);
248  if (retval == 0) {
249  // Handle timeout
250  int still_running = 0;
251  retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
252  CURL_SOCKET_TIMEOUT,
253  0,
254  &still_running);
255  if (retval != CURLM_OK) {
256  LogCvmfs(kLogS3Fanout, kLogStderr, "Error, timeout due to: %d", retval);
257  assert(retval == CURLM_OK);
258  }
259  } else if (retval < 0) {
// Only an interrupted poll() is tolerated; it is simply retried.
260  assert(errno == EINTR);
261  continue;
262  }
263 
264  // Terminate I/O thread
265  if (s3fanout_mgr->watch_fds_[0].revents)
266  break;
267 
268  // New job incoming
269  if (s3fanout_mgr->watch_fds_[1].revents) {
270  s3fanout_mgr->watch_fds_[1].revents = 0;
// The job pipe transports raw JobInfo pointers.
271  JobInfo *info;
272  ReadPipe(s3fanout_mgr->pipe_jobs_[0], &info, sizeof(info));
273  CURL *handle = s3fanout_mgr->AcquireCurlHandle();
274  if (handle == NULL) {
275  PANIC(kLogStderr, "Failed to acquire CURL handle.");
276  }
277  s3fanout::Failures init_failure =
278  s3fanout_mgr->InitializeRequest(info, handle);
279  if (init_failure != s3fanout::kFailOk) {
// NOTE(review): original line 280 is missing from this listing, leaving the
// argument list below without its call (presumably LogCvmfs(...) or
// PANIC(...)) — restore from upstream before building.
281  "Failed to initialize CURL handle (error: %d - %s | errno: %d)",
282  init_failure, Code2Ascii(init_failure), errno);
283  }
284  s3fanout_mgr->SetUrlOptions(info);
285 
286  curl_multi_add_handle(s3fanout_mgr->curl_multi_, handle);
287  s3fanout_mgr->active_requests_->insert(info);
288  jobs_in_flight++;
289  int still_running = 0, retval = 0;
290  retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
291  CURL_SOCKET_TIMEOUT,
292  0,
293  &still_running);
294 
// NOTE(review): original line 295 (the opening of the logging call whose
// arguments follow) is missing from this listing — restore from upstream.
296  "curl_multi_socket_action: %d - %d",
297  retval, still_running);
298  }
299 
300 
301  // Activity on curl sockets
302  // Within this loop the curl_multi_socket_action() may cause socket(s)
303  // to be removed from watch_fds_. If a socket is removed it is replaced
304  // by the socket at the end of the array and the inuse count is decreased.
305  // Therefore loop over the array in reverse order.
306  // First 2 fds are job and terminate pipes (not curl related)
307  for (int32_t i = s3fanout_mgr->watch_fds_inuse_ - 1; i >= 2; --i) {
// The array may have shrunk during a previous iteration.
308  if (static_cast<uint32_t>(i) >= s3fanout_mgr->watch_fds_inuse_) {
309  continue;
310  }
311  if (s3fanout_mgr->watch_fds_[i].revents) {
// Translate poll() results into libcurl's CURL_CSELECT_* bits.
312  int ev_bitmask = 0;
313  if (s3fanout_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
314  ev_bitmask |= CURL_CSELECT_IN;
315  if (s3fanout_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
316  ev_bitmask |= CURL_CSELECT_OUT;
317  if (s3fanout_mgr->watch_fds_[i].revents &
318  (POLLERR | POLLHUP | POLLNVAL))
319  ev_bitmask |= CURL_CSELECT_ERR;
320  s3fanout_mgr->watch_fds_[i].revents = 0;
321 
322  int still_running = 0;
323  retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
324  s3fanout_mgr->watch_fds_[i].fd,
325  ev_bitmask,
326  &still_running);
327  }
328  }
329 
330  // Check if transfers are completed
331  CURLMsg *curl_msg;
332  int msgs_in_queue;
333  while ((curl_msg = curl_multi_info_read(s3fanout_mgr->curl_multi_,
334  &msgs_in_queue)))
335  {
336  assert(curl_msg->msg == CURLMSG_DONE);
337 
338  s3fanout_mgr->statistics_->num_requests++;
339  JobInfo *info;
340  CURL *easy_handle = curl_msg->easy_handle;
341  int curl_error = curl_msg->data.result;
// The JobInfo was attached via CURLOPT_PRIVATE in InitializeRequest().
342  curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
343 
344  curl_multi_remove_handle(s3fanout_mgr->curl_multi_, easy_handle);
// true: the request is to be repeated with the same easy handle.
345  if (s3fanout_mgr->VerifyAndFinalize(curl_error, info)) {
346  curl_multi_add_handle(s3fanout_mgr->curl_multi_, easy_handle);
347  int still_running = 0;
348  curl_multi_socket_action(s3fanout_mgr->curl_multi_,
349  CURL_SOCKET_TIMEOUT,
350  0,
351  &still_running);
352  } else {
353  // Return easy handle into pool and write result back
354  jobs_in_flight--;
355  s3fanout_mgr->active_requests_->erase(info);
356  s3fanout_mgr->ReleaseCurlHandle(info, easy_handle);
357  s3fanout_mgr->available_jobs_->Decrement();
358 
359  // Add to list of completed jobs
360  s3fanout_mgr->PushCompletedJob(info);
361  }
362  }
363  }
364 
// Shutdown: tear down every handle that is still in use.
365  set<CURL *>::iterator i = s3fanout_mgr->pool_handles_inuse_->begin();
366  const set<CURL *>::const_iterator i_end =
367  s3fanout_mgr->pool_handles_inuse_->end();
368  for (; i != i_end; ++i) {
369  curl_multi_remove_handle(s3fanout_mgr->curl_multi_, *i);
370  curl_easy_cleanup(*i);
371  }
372  s3fanout_mgr->pool_handles_inuse_->clear();
// NOTE(review): watch_fds_ is freed but not reset to NULL; verify that no
// other code path frees or dereferences it afterwards.
373  free(s3fanout_mgr->watch_fds_);
374 
375  LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread terminated");
376  return NULL;
377 }
378 
379 
384 CURL *S3FanoutManager::AcquireCurlHandle() const {
385  CURL *handle;
386 
387  MutexLockGuard guard(curl_handle_lock_);
388 
389  if (pool_handles_idle_->empty()) {
390  CURLcode retval;
391 
392  // Create a new handle
393  handle = curl_easy_init();
394  assert(handle != NULL);
395 
396  // Other settings
397  retval = curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
398  assert(retval == CURLE_OK);
399  retval = curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION,
401  assert(retval == CURLE_OK);
402  retval = curl_easy_setopt(handle, CURLOPT_READFUNCTION, CallbackCurlData);
403  assert(retval == CURLE_OK);
404  retval = curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlBody);
405  assert(retval == CURLE_OK);
406  } else {
407  handle = *(pool_handles_idle_->begin());
408  pool_handles_idle_->erase(pool_handles_idle_->begin());
409  }
410 
411  pool_handles_inuse_->insert(handle);
412 
413  return handle;
414 }
415 
416 
417 void S3FanoutManager::ReleaseCurlHandle(JobInfo *info, CURL *handle) const {
418  if (info->http_headers) {
419  curl_slist_free_all(info->http_headers);
420  info->http_headers = NULL;
421  }
422 
423  MutexLockGuard guard(curl_handle_lock_);
424 
425  set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
426  assert(elem != pool_handles_inuse_->end());
427 
428  if (pool_handles_idle_->size() > config_.pool_max_handles) {
429  CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, NULL);
430  assert(retval == CURLE_OK);
431  curl_easy_cleanup(handle);
432  std::map<CURL *, S3FanOutDnsEntry *>::size_type retitems =
433  curl_sharehandles_->erase(handle);
434  assert(retitems == 1);
435  } else {
436  pool_handles_idle_->insert(handle);
437  }
438 
439  pool_handles_inuse_->erase(elem);
440 }
441 
442 void S3FanoutManager::InitPipeWatchFds() {
443  assert(watch_fds_inuse_ == 0);
444  assert(watch_fds_size_ >= 2);
445  watch_fds_[0].fd = pipe_terminate_[0];
446  watch_fds_[0].events = POLLIN | POLLPRI;
447  watch_fds_[0].revents = 0;
448  ++watch_fds_inuse_;
449  watch_fds_[1].fd = pipe_jobs_[0];
450  watch_fds_[1].events = POLLIN | POLLPRI;
451  watch_fds_[1].revents = 0;
452  ++watch_fds_inuse_;
453 }
454 
459 bool S3FanoutManager::MkV2Authz(const JobInfo &info, vector<string> *headers)
460  const
461 {
462  string payload_hash;
463  bool retval = MkPayloadHash(info, &payload_hash);
464  if (!retval)
465  return false;
466  string content_type = GetContentType(info);
467  string request = GetRequestString(info);
468 
469  string timestamp = RfcTimestamp();
470  string to_sign = request + "\n" +
471  payload_hash + "\n" +
472  content_type + "\n" +
473  timestamp + "\n" +
474  "x-amz-acl:public-read" + "\n" + // default ACL
475  "/" + config_.bucket + "/" + info.object_key;
476  LogCvmfs(kLogS3Fanout, kLogDebug, "%s string to sign for: %s",
477  request.c_str(), info.object_key.c_str());
478 
479  shash::Any hmac;
480  hmac.algorithm = shash::kSha1;
481  shash::Hmac(config_.secret_key,
482  reinterpret_cast<const unsigned char *>(to_sign.data()),
483  to_sign.length(), &hmac);
484 
485  headers->push_back("Authorization: AWS " + config_.access_key + ":" +
486  Base64(string(reinterpret_cast<char *>(hmac.digest),
487  hmac.GetDigestSize())));
488  headers->push_back("Date: " + timestamp);
489  headers->push_back("X-Amz-Acl: public-read");
490  if (!payload_hash.empty())
491  headers->push_back("Content-MD5: " + payload_hash);
492  if (!content_type.empty())
493  headers->push_back("Content-Type: " + content_type);
494  return true;
495 }
496 
497 
498 string S3FanoutManager::GetUriEncode(const string &val, bool encode_slash)
499  const
500 {
501  string result;
502  const unsigned len = val.length();
503  result.reserve(len);
504  for (unsigned i = 0; i < len; ++i) {
505  char c = val[i];
506  if ((c >= 'A' && c <= 'Z') ||
507  (c >= 'a' && c <= 'z') ||
508  (c >= '0' && c <= '9') ||
509  c == '_' || c == '-' || c == '~' || c == '.')
510  {
511  result.push_back(c);
512  } else if (c == '/') {
513  if (encode_slash) {
514  result += "%2F";
515  } else {
516  result.push_back(c);
517  }
518  } else {
519  result.push_back('%');
520  result.push_back((c / 16) + ((c / 16 <= 9) ? '0' : 'A'-10));
521  result.push_back((c % 16) + ((c % 16 <= 9) ? '0' : 'A'-10));
522  }
523  }
524  return result;
525 }
526 
527 
528 string S3FanoutManager::GetAwsV4SigningKey(const string &date) const
529 {
530  if (last_signing_key_.first == date)
531  return last_signing_key_.second;
532 
533  string date_key = shash::Hmac256("AWS4" + config_.secret_key, date, true);
534  string date_region_key = shash::Hmac256(date_key, config_.region, true);
535  string date_region_service_key = shash::Hmac256(date_region_key, "s3", true);
536  string signing_key =
537  shash::Hmac256(date_region_service_key, "aws4_request", true);
538  last_signing_key_.first = date;
539  last_signing_key_.second = signing_key;
540  return signing_key;
541 }
542 
543 
548 bool S3FanoutManager::MkV4Authz(const JobInfo &info, vector<string> *headers)
549  const
550 {
551  string payload_hash;
552  bool retval = MkPayloadHash(info, &payload_hash);
553  if (!retval)
554  return false;
555  string content_type = GetContentType(info);
556  string timestamp = IsoTimestamp();
557  string date = timestamp.substr(0, 8);
558  vector<string> tokens = SplitString(complete_hostname_, ':');
559  assert(tokens.size() <= 2);
560  string canonical_hostname = tokens[0];
561 
562  // if we could split the hostname in two and if the port is *NOT* a default
563  // one
564  if (tokens.size() == 2 && !((String2Uint64(tokens[1]) == kDefaultHTTPPort) ||
565  (String2Uint64(tokens[1]) == kDefaultHTTPSPort)))
566  canonical_hostname += ":" + tokens[1];
567 
568  string signed_headers;
569  string canonical_headers;
570  if (!content_type.empty()) {
571  signed_headers += "content-type;";
572  headers->push_back("Content-Type: " + content_type);
573  canonical_headers += "content-type:" + content_type + "\n";
574  }
575  signed_headers += "host;x-amz-acl;x-amz-content-sha256;x-amz-date";
576  canonical_headers +=
577  "host:" + canonical_hostname + "\n" +
578  "x-amz-acl:public-read\n"
579  "x-amz-content-sha256:" + payload_hash + "\n" +
580  "x-amz-date:" + timestamp + "\n";
581 
582  string scope = date + "/" + config_.region + "/s3/aws4_request";
583  string uri = config_.dns_buckets ?
584  (string("/") + info.object_key) :
585  (string("/") + config_.bucket + "/" + info.object_key);
586 
587  string canonical_request =
588  GetRequestString(info) + "\n" +
589  GetUriEncode(uri, false) + "\n" +
590  "\n" +
591  canonical_headers + "\n" +
592  signed_headers + "\n" +
593  payload_hash;
594 
595  string hash_request = shash::Sha256String(canonical_request.c_str());
596 
597  string string_to_sign =
598  "AWS4-HMAC-SHA256\n" +
599  timestamp + "\n" +
600  scope + "\n" +
601  hash_request;
602 
603  string signing_key = GetAwsV4SigningKey(date);
604  string signature = shash::Hmac256(signing_key, string_to_sign);
605 
606  headers->push_back("X-Amz-Acl: public-read");
607  headers->push_back("X-Amz-Content-Sha256: " + payload_hash);
608  headers->push_back("X-Amz-Date: " + timestamp);
609  headers->push_back(
610  "Authorization: AWS4-HMAC-SHA256 "
611  "Credential=" + config_.access_key + "/" + scope + ","
612  "SignedHeaders=" + signed_headers + ","
613  "Signature=" + signature);
614  return true;
615 }
616 
621 bool S3FanoutManager::MkAzureAuthz(const JobInfo &info, vector<string> *headers)
622  const
623 {
624  string timestamp = RfcTimestamp();
625  string canonical_headers =
626  "x-ms-blob-type:BlockBlob\nx-ms-date:" +
627  timestamp +
628  "\nx-ms-version:2011-08-18";
629  string canonical_resource =
630  "/" + config_.access_key + "/" + config_.bucket + "/" + info.object_key;
631 
632  string string_to_sign;
633  if ((info.request == JobInfo::kReqHeadOnly) ||
634  (info.request == JobInfo::kReqHeadPut) ||
635  (info.request == JobInfo::kReqDelete)) {
636  string_to_sign =
637  GetRequestString(info) +
638  string("\n\n\n") +
639  "\n\n\n\n\n\n\n\n\n" +
640  canonical_headers + "\n" +
641  canonical_resource;
642  } else {
643  string_to_sign =
644  GetRequestString(info) +
645  string("\n\n\n") +
646  string(StringifyInt(info.origin->GetSize())) + "\n\n\n\n\n\n\n\n\n" +
647  canonical_headers + "\n" +
648  canonical_resource;
649  }
650 
651  string signing_key;
652  int retval = Debase64(config_.secret_key, &signing_key);
653  if (!retval)
654  return false;
655 
656  string signature = shash::Hmac256(signing_key, string_to_sign, true);
657 
658  headers->push_back("x-ms-date: " + timestamp);
659  headers->push_back("x-ms-version: 2011-08-18");
660  headers->push_back(
661  "Authorization: SharedKey " + config_.access_key + ":" + Base64(signature));
662  headers->push_back("x-ms-blob-type: BlockBlob");
663  return true;
664 }
665 
666 void S3FanoutManager::InitializeDnsSettingsCurl(
667  CURL *handle,
668  CURLSH *sharehandle,
669  curl_slist *clist) const
670 {
671  CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, sharehandle);
672  assert(retval == CURLE_OK);
673  retval = curl_easy_setopt(handle, CURLOPT_RESOLVE, clist);
674  assert(retval == CURLE_OK);
675 }
676 
677 
// Attaches DNS settings (a curl share handle plus a CURLOPT_RESOLVE entry
// "host:port:ip") to handle for host_with_port.
// Order of attempts:
//  1. Reuse the handle's existing association in curl_sharehandles_, if any.
//  2. Otherwise pick, among known entries for the same host, the one with the
//     smallest counter.
//  3. Otherwise resolve the host and create one entry per IPv4 address,
//     associating handle with the last entry created.
// Returns 0 on success. Returns -1 if resolution yields no IPv4 address; the
// assert in that branch always fails, so -1 is only returned when assertions
// are disabled.
678 int S3FanoutManager::InitializeDnsSettings(
679  CURL *handle,
680  std::string host_with_port) const
681 {
682  // Use existing handle
683  std::map<CURL *, S3FanOutDnsEntry *>::const_iterator it =
684  curl_sharehandles_->find(handle);
685  if (it != curl_sharehandles_->end()) {
686  InitializeDnsSettingsCurl(handle, it->second->sharehandle,
687  it->second->clist);
688  return 0;
689  }
690 
691  // Add protocol information for extraction of fields for DNS
692  if (!IsHttpUrl(host_with_port))
693  host_with_port = config_.protocol + "://" + host_with_port;
694  std::string remote_host = dns::ExtractHost(host_with_port);
695  std::string remote_port = dns::ExtractPort(host_with_port);
696 
697  // If we have the name already resolved, use the least used IP
// counter is incremented per handle association; ties go to the entry
// iterated last (>=).
698  S3FanOutDnsEntry *useme = NULL;
699  unsigned int usemin = UINT_MAX;
700  std::set<S3FanOutDnsEntry *>::iterator its3 = sharehandles_->begin();
701  for (; its3 != sharehandles_->end(); ++its3) {
702  if ((*its3)->dns_name == remote_host) {
703  if (usemin >= (*its3)->counter) {
704  usemin = (*its3)->counter;
705  useme = (*its3);
706  }
707  }
708  }
709  if (useme != NULL) {
710  curl_sharehandles_->insert(std::pair<CURL *,
711  S3FanOutDnsEntry *>(handle, useme));
712  useme->counter++;
713  InitializeDnsSettingsCurl(handle, useme->sharehandle, useme->clist);
714  return 0;
715  }
716 
717  // We need to resolve the hostname
718  // TODO(ssheikki): support ipv6 also... if (opt_ipv4_only_)
719  dns::Host host = resolver_->Resolve(remote_host);
720  set<string> ipv4_addresses = host.ipv4_addresses();
721  std::set<string>::iterator its = ipv4_addresses.begin();
722  S3FanOutDnsEntry *dnse = NULL;
723  for ( ; its != ipv4_addresses.end(); ++its) {
724  dnse = new S3FanOutDnsEntry();
725  dnse->counter = 0;
726  dnse->dns_name = remote_host;
// NOTE(review): without an explicit port this pins port "80" even when
// config_.protocol is https. A CURLOPT_RESOLVE entry only applies to
// connections to that port, so for https on 443 the pin likely has no effect —
// confirm whether the protocol's default port was intended.
727  dnse->port = remote_port.size() == 0 ? "80" : remote_port;
728  dnse->ip = *its;
729  dnse->clist = NULL;
730  dnse->clist = curl_slist_append(dnse->clist,
731  (dnse->dns_name+":"+
732  dnse->port+":"+
733  dnse->ip).c_str());
// One share handle per address, sharing DNS cache data between the easy
// handles attached to it.
734  dnse->sharehandle = curl_share_init();
735  assert(dnse->sharehandle != NULL);
736  CURLSHcode share_retval = curl_share_setopt(dnse->sharehandle,
737  CURLSHOPT_SHARE,
738  CURL_LOCK_DATA_DNS);
739  assert(share_retval == CURLSHE_OK);
740  sharehandles_->insert(dnse);
741  }
742  if (dnse == NULL) {
// NOTE(review): original line 743 (the opening of the logging call whose
// arguments follow) is missing from this listing — restore from upstream.
744  "Error: DNS resolve failed for address '%s'.",
745  remote_host.c_str());
746  assert(dnse != NULL);
747  return -1;
748  }
749  curl_sharehandles_->insert(
750  std::pair<CURL *, S3FanOutDnsEntry *>(handle, dnse));
751  dnse->counter++;
752  InitializeDnsSettingsCurl(handle, dnse->sharehandle, dnse->clist);
753 
754  return 0;
755 }
756 
757 
758 bool S3FanoutManager::MkPayloadHash(const JobInfo &info, string *hex_hash)
759  const
760 {
761  if ((info.request == JobInfo::kReqHeadOnly) ||
762  (info.request == JobInfo::kReqHeadPut) ||
763  (info.request == JobInfo::kReqDelete))
764  {
765  switch (config_.authz_method) {
766  case kAuthzAwsV2:
767  hex_hash->clear();
768  break;
769  case kAuthzAwsV4:
770  // Sha256 over empty string
771  *hex_hash =
772  "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
773  break;
774  case kAuthzAzure:
775  // no payload hash required for Azure signature
776  hex_hash->clear();
777  break;
778  default:
779  PANIC(NULL);
780  }
781  return true;
782  }
783 
784  // PUT, there is actually payload
785  shash::Any payload_hash(shash::kMd5);
786 
787  unsigned char *data;
788  unsigned int nbytes =
789  info.origin->Data(reinterpret_cast<void **>(&data),
790  info.origin->GetSize(), 0);
791  assert(nbytes == info.origin->GetSize());
792 
793  switch (config_.authz_method) {
794  case kAuthzAwsV2:
795  shash::HashMem(data, nbytes, &payload_hash);
796  *hex_hash =
797  Base64(string(reinterpret_cast<char *>(payload_hash.digest),
798  payload_hash.GetDigestSize()));
799  return true;
800  case kAuthzAwsV4:
801  *hex_hash =
802  shash::Sha256Mem(data, nbytes);
803  return true;
804  case kAuthzAzure:
805  // no payload hash required for Azure signature
806  hex_hash->clear();
807  return true;
808  default:
809  PANIC(NULL);
810  }
811 }
812 
813 string S3FanoutManager::GetRequestString(const JobInfo &info) const {
814  switch (info.request) {
815  case JobInfo::kReqHeadOnly:
816  case JobInfo::kReqHeadPut:
817  return "HEAD";
818  case JobInfo::kReqPutCas:
819  case JobInfo::kReqPutDotCvmfs:
820  case JobInfo::kReqPutHtml:
821  case JobInfo::kReqPutBucket:
822  return "PUT";
823  case JobInfo::kReqDelete:
824  return "DELETE";
825  default:
826  PANIC(NULL);
827  }
828 }
829 
830 
831 string S3FanoutManager::GetContentType(const JobInfo &info) const {
832  switch (info.request) {
833  case JobInfo::kReqHeadOnly:
834  case JobInfo::kReqHeadPut:
835  case JobInfo::kReqDelete:
836  return "";
837  case JobInfo::kReqPutCas:
838  return "application/octet-stream";
839  case JobInfo::kReqPutDotCvmfs:
840  return "application/x-cvmfs";
841  case JobInfo::kReqPutHtml:
842  return "text/html";
843  case JobInfo::kReqPutBucket:
844  return "text/xml";
845  default:
846  PANIC(NULL);
847  }
848 }
849 
850 
855 Failures S3FanoutManager::InitializeRequest(JobInfo *info, CURL *handle) const {
856  // Initialize internal download state
857  info->curl_handle = handle;
858  info->error_code = kFailOk;
859  info->http_error = 0;
860  info->num_retries = 0;
861  info->backoff_ms = 0;
862  info->throttle_ms = 0;
863  info->throttle_timestamp = 0;
864  info->http_headers = NULL;
865  // info->payload_size is needed in S3Uploader::MainCollectResults,
866  // where info->origin is already destroyed.
867  info->payload_size = info->origin->GetSize();
868 
869  InitializeDnsSettings(handle, complete_hostname_);
870 
871  CURLcode retval;
872  if ((info->request == JobInfo::kReqHeadOnly) ||
873  (info->request == JobInfo::kReqHeadPut) ||
874  (info->request == JobInfo::kReqDelete))
875  {
876  retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 0);
877  assert(retval == CURLE_OK);
878  retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
879  assert(retval == CURLE_OK);
880 
881  if (info->request == JobInfo::kReqDelete)
882  {
883  retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST,
884  GetRequestString(*info).c_str());
885  assert(retval == CURLE_OK);
886  } else {
887  retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
888  assert(retval == CURLE_OK);
889  }
890  } else {
891  retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
892  assert(retval == CURLE_OK);
893  retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 1);
894  assert(retval == CURLE_OK);
895  retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 0);
896  assert(retval == CURLE_OK);
897  retval = curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE,
898  static_cast<curl_off_t>(info->origin->GetSize()));
899  assert(retval == CURLE_OK);
900 
901  if (info->request == JobInfo::kReqPutDotCvmfs) {
902  info->http_headers =
903  curl_slist_append(info->http_headers, kCacheControlDotCvmfs);
904  } else if (info->request == JobInfo::kReqPutCas) {
905  info->http_headers =
906  curl_slist_append(info->http_headers, kCacheControlCas);
907  }
908  }
909 
910  bool retval_b;
911 
912  // Authorization
913  vector<string> authz_headers;
914  switch (config_.authz_method) {
915  case kAuthzAwsV2:
916  retval_b = MkV2Authz(*info, &authz_headers);
917  break;
918  case kAuthzAwsV4:
919  retval_b = MkV4Authz(*info, &authz_headers);
920  break;
921  case kAuthzAzure:
922  retval_b = MkAzureAuthz(*info, &authz_headers);
923  break;
924  default:
925  PANIC(NULL);
926  }
927  if (!retval_b)
928  return kFailLocalIO;
929  for (unsigned i = 0; i < authz_headers.size(); ++i) {
930  info->http_headers =
931  curl_slist_append(info->http_headers, authz_headers[i].c_str());
932  }
933 
934  // Common headers
935  info->http_headers =
936  curl_slist_append(info->http_headers, "Connection: Keep-Alive");
937  info->http_headers = curl_slist_append(info->http_headers, "Pragma:");
938  // No 100-continue
939  info->http_headers = curl_slist_append(info->http_headers, "Expect:");
940  // Strip unnecessary header
941  info->http_headers = curl_slist_append(info->http_headers, "Accept:");
942  info->http_headers = curl_slist_append(info->http_headers,
943  user_agent_->c_str());
944 
945  // Set curl parameters
946  retval = curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
947  assert(retval == CURLE_OK);
948  retval = curl_easy_setopt(handle, CURLOPT_HEADERDATA,
949  static_cast<void *>(info));
950  assert(retval == CURLE_OK);
951  retval = curl_easy_setopt(handle, CURLOPT_READDATA,
952  static_cast<void *>(info));
953  assert(retval == CURLE_OK);
954  retval = curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->http_headers);
955  assert(retval == CURLE_OK);
956  if (opt_ipv4_only_) {
957  retval = curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
958  assert(retval == CURLE_OK);
959  }
960  // Follow HTTP redirects
961  retval = curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
962  assert(retval == CURLE_OK);
963 
964  retval = curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, info->errorbuffer);
965  assert(retval == CURLE_OK);
966 
967  if (config_.protocol == "https") {
968  retval = curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 1L);
969  assert(retval == CURLE_OK);
970  retval = curl_easy_setopt(handle, CURLOPT_PROXY_SSL_VERIFYPEER, 1L);
971  assert(retval == CURLE_OK);
972  bool add_cert = ssl_certificate_store_.ApplySslCertificatePath(handle);
973  assert(add_cert);
974  }
975 
976  return kFailOk;
977 }
978 
979 
983 void S3FanoutManager::SetUrlOptions(JobInfo *info) const {
984  CURL *curl_handle = info->curl_handle;
985  CURLcode retval;
986 
987  retval = curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT,
988  config_.opt_timeout_sec);
989  assert(retval == CURLE_OK);
990  retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT,
991  kLowSpeedLimit);
992  assert(retval == CURLE_OK);
993  retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME,
994  config_.opt_timeout_sec);
995  assert(retval == CURLE_OK);
996 
997  if (is_curl_debug_) {
998  retval = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 1);
999  assert(retval == CURLE_OK);
1000  }
1001 
1002  string url = MkUrl(info->object_key);
1003  retval = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
1004  assert(retval == CURLE_OK);
1005 
1006  retval = curl_easy_setopt(curl_handle, CURLOPT_PROXY, config_.proxy.c_str());
1007  assert(retval == CURLE_OK);
1008 }
1009 
1010 
1014 void S3FanoutManager::UpdateStatistics(CURL *handle) {
1015  double val;
1016 
1017  if (curl_easy_getinfo(handle, CURLINFO_SIZE_UPLOAD, &val) == CURLE_OK)
1018  statistics_->transferred_bytes += val;
1019 }
1020 
1021 
// Decides whether a failed request may be attempted again: its error code
// must be one of the listed transient failures and info->num_retries must
// still be below config_.opt_max_retries.
1025 bool S3FanoutManager::CanRetry(const JobInfo *info) {
1026  return
1027  (info->error_code == kFailHostConnection ||
1028  info->error_code == kFailHostResolve ||
// NOTE(review): original line 1029 is missing from this listing (judging by
// the surrounding ||-chain, one more retryable error code) — restore from
// upstream.
1030  info->error_code == kFailRetry) &&
1031  (info->num_retries < config_.opt_max_retries);
1032 }
1033 
1034 
// Sleeps before a job is retried and updates the retry counters.
//  - info->num_retries is incremented unless the failure was kFailRetry;
//    statistics_->num_retries is always incremented.
//  - If the server sent a throttle hint (info->throttle_ms > 0), the job
//    sleeps for that many milliseconds. It only sleeps while
//    throttle_timestamp plus the hint (integer-divided to seconds) is not
//    yet past the current monotonic time. The comparison treats
//    platform_monotonic_time() as seconds. A warning is logged at most once
//    per kThrottleReportIntervalSec, and the sleep time is added to
//    statistics_->ms_throttled.
//  - Otherwise it uses exponential backoff. When info->backoff_ms is 0, it
//    picks a random initial delay from prng_ bounded by
//    config_.opt_backoff_init_ms; otherwise it doubles the delay. The delay
//    is always capped at config_.opt_backoff_max_ms.
// NOTE(review): listing line 1052 (the start of the warning LogCvmfs call
// whose arguments follow) is missing here.
// NOTE(review): throttle_ms and backoff_ms are `unsigned`, but the debug
// messages below format them with %d -- %u would match.
1040 void S3FanoutManager::Backoff(JobInfo *info) {
1041  if (info->error_code != kFailRetry)
1042  info->num_retries++;
1043  statistics_->num_retries++;
1044 
1045  if (info->throttle_ms > 0) {
1046  LogCvmfs(kLogS3Fanout, kLogDebug, "throttling for %d ms",
1047  info->throttle_ms);
1048  uint64_t now = platform_monotonic_time();
// Skip the sleep entirely once the throttle window has already elapsed
1049  if ((info->throttle_timestamp + (info->throttle_ms / 1000)) >= now) {
// Rate-limit the user-visible warning
1050  if ((now - timestamp_last_throttle_report_) > kThrottleReportIntervalSec)
1051  {
1053  "Warning: S3 backend throttling %ums "
1054  "(total backoff time so far %ums)",
1055  info->throttle_ms,
1056  statistics_->ms_throttled);
1057  timestamp_last_throttle_report_ = now;
1058  }
1059  statistics_->ms_throttled += info->throttle_ms;
1060  SafeSleepMs(info->throttle_ms);
1061  }
1062  } else {
1063  if (info->backoff_ms == 0) {
1064  // Must be != 0
// NOTE(review): whether prng_.Next() can return 0 depends on Prng, which is
// not shown here. If it can, the comment above does not hold -- confirm.
1065  info->backoff_ms = prng_.Next(config_.opt_backoff_init_ms + 1);
1066  } else {
1067  info->backoff_ms *= 2;
1068  }
1069  if (info->backoff_ms > config_.opt_backoff_max_ms)
1070  info->backoff_ms = config_.opt_backoff_max_ms;
1071 
1072  LogCvmfs(kLogS3Fanout, kLogDebug, "backing off for %d ms",
1073  info->backoff_ms);
1074  SafeSleepMs(info->backoff_ms);
1075  }
1076 }
1077 
1078 
// Classifies the outcome of a finished transfer and decides whether the job
// must run again.
//
// @param curl_error  result code libcurl reported for the transfer
// @param info        job whose error_code / request / retry state is updated
// @return true if the job was re-armed and should run again. This happens
//         when a HEAD was turned into a PUT, or after a retriable failure
//         once Backoff() has run. Returns false if the job is finished
//         (successfully or not); in that case info->origin has been
//         destroyed.
//
// A kReqHeadPut job whose HEAD ends in kFailNotFound is converted into a
// kReqPutCas upload on the same CURL handle. A failure to re-initialize the
// handle PANICs.
//
// NOTE(review): listing lines 1112 and 1119 are missing here. 1112
// presumably assigns the error code for the connection-level curl errors,
// and 1119 presumably opens the LogCvmfs call whose arguments follow --
// confirm against upstream.
// NOTE(review): info->backoff_ms is reset to 0 right after every Backoff()
// call below. As a result, the doubling branch in Backoff() appears never to
// run for retries issued from here -- confirm whether exponential backoff is
// intended.
1085 bool S3FanoutManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1086  LogCvmfs(kLogS3Fanout, kLogDebug, "Verify uploaded/tested object %s "
1087  "(curl error %d, info error %d, info request %d)",
1088  info->object_key.c_str(),
1089  curl_error, info->error_code, info->request);
1090  UpdateStatistics(info->curl_handle);
1091 
1092  // Verification and error classification
1093  switch (curl_error) {
1094  case CURLE_OK:
// A successful transfer keeps kFailRetry / kFailNotFound if a callback
// already recorded them; anything else becomes kFailOk.
1095  if ((info->error_code != kFailRetry) &&
1096  (info->error_code != kFailNotFound))
1097  {
1098  info->error_code = kFailOk;
1099  }
1100  break;
1101  case CURLE_UNSUPPORTED_PROTOCOL:
1102  case CURLE_URL_MALFORMAT:
1103  info->error_code = kFailBadRequest;
1104  break;
1105  case CURLE_COULDNT_RESOLVE_HOST:
1106  info->error_code = kFailHostResolve;
1107  break;
1108  case CURLE_COULDNT_CONNECT:
1109  case CURLE_OPERATION_TIMEDOUT:
1110  case CURLE_SEND_ERROR:
1111  case CURLE_RECV_ERROR:
1113  break;
1114  case CURLE_ABORTED_BY_CALLBACK:
1115  case CURLE_WRITE_ERROR:
1116  // Error set by callback
1117  break;
1118  default:
1120  "unexpected curl error (%d) while trying to upload %s: %s",
1121  curl_error, info->object_key.c_str(), info->errorbuffer);
1122  info->error_code = kFailOther;
1123  break;
1124  }
1125 
1126  // Transform HEAD to PUT request
1127  if ((info->error_code == kFailNotFound) &&
1128  (info->request == JobInfo::kReqHeadPut))
1129  {
1130  LogCvmfs(kLogS3Fanout, kLogDebug, "not found: %s, uploading",
1131  info->object_key.c_str());
1132  info->request = JobInfo::kReqPutCas;
// Headers of the HEAD request are discarded; InitializeRequest() sets up
// the handle again for the new request type.
1133  curl_slist_free_all(info->http_headers);
1134  info->http_headers = NULL;
1135  s3fanout::Failures init_failure = InitializeRequest(info,
1136  info->curl_handle);
1137 
1138  if (init_failure != s3fanout::kFailOk) {
1139  PANIC(kLogStderr,
1140  "Failed to initialize CURL handle "
1141  "(error: %d - %s | errno: %d)",
1142  init_failure, Code2Ascii(init_failure), errno);
1143  }
1144  SetUrlOptions(info);
1145  // Reset origin
1146  info->origin->Rewind();
1147  return true; // Again, Put
1148  }
1149 
1150  // Determination if failed request should be repeated
1151  bool try_again = false;
1152  if (info->error_code != kFailOk) {
1153  try_again = CanRetry(info);
1154  }
1155  if (try_again) {
// Uploads must re-send the payload from the beginning
1156  if (info->request == JobInfo::kReqPutCas ||
1157  info->request == JobInfo::kReqPutDotCvmfs ||
1158  info->request == JobInfo::kReqPutHtml) {
1159  LogCvmfs(kLogS3Fanout, kLogDebug, "Trying again to upload %s",
1160  info->object_key.c_str());
1161  // Reset origin
1162  info->origin->Rewind();
1163  }
1164  Backoff(info);
// Clear the per-attempt state before the job runs again
1165  info->error_code = kFailOk;
1166  info->http_error = 0;
1167  info->throttle_ms = 0;
1168  info->backoff_ms = 0;
1169  info->throttle_timestamp = 0;
1170  return true; // try again
1171  }
1172 
1173  // Cleanup opened resources
1174  info->origin.Destroy();
1175 
// 404 is not reported here: a missing object is an expected outcome for
// HEAD probes.
1176  if ((info->error_code != kFailOk) &&
1177  (info->http_error != 0) && (info->http_error != 404))
1178  {
1179  LogCvmfs(kLogS3Fanout, kLogStderr, "S3: HTTP failure %d", info->http_error);
1180  }
1181  return false; // stop transfer
1182 }
1183 
// Sets up the manager's state; the upload thread is not started here.
// The constructor:
//  - allocates and initializes the two mutexes (jobs_todo_lock_,
//    curl_handle_lock_) with smalloc,
//  - allocates the bookkeeping containers, statistics_ and the User-Agent
//    header string,
//  - initializes libcurl globally and creates the multi handle, with this
//    object as the socket-callback user data,
//  - seeds prng_,
//  - reads _CVMFS_CURL_DEBUG (any value enables verbose curl) and
//    CVMFS_IPV4_ONLY (non-empty value sets opt_ipv4_only_),
//  - allocates an initial array of 4 pollfd entries.
// NOTE(review): listing lines 1186-1188, 1195, 1205-1207, 1213, 1221, 1227,
// 1233, 1244 and 1250 are missing. For example, 1195 presumably begins the
// `curl_handle_lock_ =` assignment, and available_jobs_ (asserted at 1208)
// is presumably allocated in 1205-1207 -- confirm against upstream.
1184 S3FanoutManager::S3FanoutManager(const S3Config &config) : config_(config) {
1185  atomic_init32(&multi_threaded_);
1189 
1190  int retval;
1191  jobs_todo_lock_ =
1192  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1193  retval = pthread_mutex_init(jobs_todo_lock_, NULL);
1194  assert(retval == 0);
1196  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1197  retval = pthread_mutex_init(curl_handle_lock_, NULL);
1198  assert(retval == 0);
1199 
1200  active_requests_ = new set<JobInfo *>;
1201  pool_handles_idle_ = new set<CURL *>;
1202  pool_handles_inuse_ = new set<CURL *>;
1203  curl_sharehandles_ = new map<CURL *, S3FanOutDnsEntry *>;
1204  sharehandles_ = new set<S3FanOutDnsEntry *>;
1208  assert(NULL != available_jobs_);
1209 
1210  statistics_ = new Statistics();
1211  user_agent_ = new string();
1212  *user_agent_ = "User-Agent: cvmfs " + string(VERSION);
1214 
1215  CURLcode cretval = curl_global_init(CURL_GLOBAL_ALL);
1216  assert(cretval == CURLE_OK);
1217  curl_multi_ = curl_multi_init();
1218  assert(curl_multi_ != NULL);
1219  CURLMcode mretval;
// NOTE(review): the callback argument (listing line 1221) is missing; the
// tooltip index lists CallbackCurlSocket as the socket callback candidate.
1220  mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION,
1222  assert(mretval == CURLM_OK);
1223  mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1224  static_cast<void *>(this));
1225  assert(mretval == CURLM_OK);
1226  mretval = curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1228  assert(mretval == CURLM_OK);
1229 
1230  prng_.InitLocaltime();
1231 
// 0 means "no upload thread yet"; the thread is created later by pthread_create
1232  thread_upload_ = 0;
1234  is_curl_debug_ = (getenv("_CVMFS_CURL_DEBUG") != NULL);
1235 
1236  // Parsing environment variables
1237  if ((getenv("CVMFS_IPV4_ONLY") != NULL) &&
1238  (strlen(getenv("CVMFS_IPV4_ONLY")) > 0)) {
1239  opt_ipv4_only_ = true;
1240  } else {
1241  opt_ipv4_only_ = false;
1242  }
1243 
1245 
// Initial capacity of 4 poll entries; watch_fds_size_ tracks the capacity
// and watch_fds_inuse_ tracks how many entries are occupied.
1246  watch_fds_ = static_cast<struct pollfd *>(smalloc(4 * sizeof(struct pollfd)));
1247  watch_fds_size_ = 4;
1248  watch_fds_inuse_ = 0;
1249 
1251 }
1252 
// Destructor body; the signature (listing line 1253) is missing from this
// listing. It releases everything the constructor allocated:
//  - the mutexes,
//  - the upload thread, if Spawn() started one (multi_threaded_ == 1),
//  - idle CURL handles and DNS share handles,
//  - the containers, user agent, multi handle, statistics and job
//    semaphore,
//  - finally, libcurl's global state.
// NOTE(review): the mutexes are destroyed and freed BEFORE the I/O thread is
// told to terminate and joined. If that thread still locks jobs_todo_lock_ or
// curl_handle_lock_ while shutting down, this is use-after-free. Consider
// moving the destroy/free calls after pthread_join.
// NOTE(review): only handles in pool_handles_idle_ get curl_easy_cleanup();
// pool_handles_inuse_ is deleted without cleaning up its handles. This is
// presumably fine only if it is empty at this point -- confirm.
// NOTE(review): listing lines 1265-1267 are missing here.
1254  pthread_mutex_destroy(jobs_todo_lock_);
1255  free(jobs_todo_lock_);
1256  pthread_mutex_destroy(curl_handle_lock_);
1257  free(curl_handle_lock_);
1258 
// Atomic read (add 0) of the flag Spawn() sets with atomic_inc32
1259  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1260  // Shutdown I/O thread
1261  char buf = 'T';
1262  WritePipe(pipe_terminate_[1], &buf, 1);
1263  pthread_join(thread_upload_, NULL);
1264  }
1268 
1269  set<CURL *>::iterator i = pool_handles_idle_->begin();
1270  const set<CURL *>::const_iterator iEnd = pool_handles_idle_->end();
1271  for (; i != iEnd; ++i) {
1272  curl_easy_cleanup(*i);
1273  }
1274 
1275  set<S3FanOutDnsEntry *>::iterator is = sharehandles_->begin();
1276  const set<S3FanOutDnsEntry *>::const_iterator isEnd = sharehandles_->end();
1277  for (; is != isEnd; ++is) {
1278  curl_share_cleanup((*is)->sharehandle);
1279  curl_slist_free_all((*is)->clist);
1280  delete *is;
1281  }
1282  pool_handles_idle_->clear();
1283  curl_sharehandles_->clear();
1284  sharehandles_->clear();
1285  delete active_requests_;
1286  delete pool_handles_idle_;
1287  delete pool_handles_inuse_;
1288  delete curl_sharehandles_;
1289  delete sharehandles_;
1290  delete user_agent_;
1291  curl_multi_cleanup(curl_multi_);
1292 
1293  delete statistics_;
1294 
1295  delete available_jobs_;
1296 
1297  curl_global_cleanup();
1298 }
1299 
// Body of the spawning function; its signature and opening lines are missing
// from this listing. It starts the upload thread, which runs MainUpload with
// `this` as its argument. It then increments multi_threaded_, so the
// destructor knows to signal and join that thread.
// NOTE(review): pthread_create's result is checked only by assert, so under
// NDEBUG a failure to start the thread goes unnoticed.
1304  LogCvmfs(kLogS3Fanout, kLogDebug, "S3FanoutManager spawned");
1305 
1306  int retval = pthread_create(&thread_upload_, NULL, MainUpload,
1307  static_cast<void *>(this));
1308  assert(retval == 0);
1309 
1310  atomic_inc32(&multi_threaded_);
1311 }
1312 
// Body of GetStatistics() (signature missing from this listing). It returns
// a reference to the manager-owned Statistics object, which the constructor
// allocates and the destructor deletes.
1314  return *statistics_;
1315 }
1316 
// Body of PushNewJob(JobInfo *info) (signature missing from this listing).
// It enqueues a job by writing the JobInfo pointer value itself, not the
// object, into pipe_jobs_. The reader is presumably the upload thread, and
// the caller must keep *info alive until the job completes -- confirm.
1322  WritePipe(pipe_jobs_[1], &info, sizeof(info));
1323 }
1324 
// Body of PushCompletedJob(JobInfo *info) (signature missing from this
// listing). It hands a finished job back by writing its pointer value into
// pipe_completed_, where PopCompletedJob() reads it.
1329  WritePipe(pipe_completed_[1], &info, sizeof(info));
1330 }
1331 
// Body of PopCompletedJob() (signature missing from this listing). It reads
// one JobInfo pointer from pipe_completed_, as written by PushCompletedJob(),
// and returns it. Presumably ReadPipe blocks until a pointer is available --
// confirm against util/posix.
1336  JobInfo *info;
1337  ReadPipe(pipe_completed_[0], &info, sizeof(info));
1338  return info;
1339 }
1340 
1341 //------------------------------------------------------------------------------
1342 
1343 
1344 string Statistics::Print() const {
1345  return
1346  "Transferred Bytes: " +
1347  StringifyInt(uint64_t(transferred_bytes)) + "\n" +
1348  "Transfer duration: " +
1349  StringifyInt(uint64_t(transfer_time)) + " s\n" +
1350  "Number of requests: " +
1351  StringifyInt(num_requests) + "\n" +
1352  "Number of retries: " +
1353  StringifyInt(num_retries) + "\n";
1354 }
1355 
1356 } // namespace s3fanout
#define LogCvmfs(source, mask,...)
Definition: logging.h:20
unsigned throttle_ms
Definition: s3fanout.h:152
const S3Config config_
Definition: s3fanout.h:279
const char * Code2Ascii(const ObjectFetcherFailures::Failures error)
pthread_mutex_t * jobs_todo_lock_
Definition: s3fanout.h:236
atomic_int32 multi_threaded_
Definition: s3fanout.h:303
std::string ip
Definition: s3fanout.h:163
vector< string > SplitString(const string &str, const char delim, const unsigned max_chunks)
Definition: string.cc:288
pthread_mutex_t * curl_handle_lock_
Definition: s3fanout.h:237
string Sha256String(const string &content)
Definition: hash.cc:452
std::set< CURL * > * pool_handles_idle_
Definition: s3fanout.h:288
struct curl_slist * http_headers
Definition: s3fanout.h:143
static size_t CallbackCurlBody(char *, size_t size, size_t nmemb, void *)
Definition: s3fanout.cc:152
#define PANIC(...)
Definition: exception.h:26
std::string IsoTimestamp()
Definition: string.cc:149
dns::CaresResolver * resolver_
Definition: s3fanout.h:292
string Sha256Mem(const unsigned char *buffer, const unsigned buffer_size)
Definition: hash.cc:442
string Trim(const string &raw, bool trim_newline)
Definition: string.cc:421
CURLSH * sharehandle
Definition: s3fanout.h:166
double transferred_bytes
Definition: s3fanout.h:76
void InitLocaltime()
Definition: prng.h:36
bool IsHttpUrl(const std::string &path)
Definition: posix.cc:204
void Hmac(const string &key, const unsigned char *buffer, const unsigned buffer_size, Any *any_digest)
Definition: hash.cc:274
perf::Statistics * statistics_
Definition: repository.h:138
static void * MainUpload(void *data)
Definition: s3fanout.cc:232
std::string Hmac256(const std::string &key, const std::string &content, bool raw_output)
Definition: hash.cc:458
std::string dns_name
Definition: s3fanout.h:162
const std::string object_key
Definition: s3fanout.h:108
assert((mem||(size==0))&&"Out Of Memory")
const std::set< std::string > & ipv4_addresses() const
Definition: dns.h:111
std::string MkCompleteHostname()
Definition: s3fanout.h:271
void PushNewJob(JobInfo *info)
Definition: s3fanout.cc:1320
Algorithms algorithm
Definition: hash.h:124
SynchronizingCounter< uint32_t > Semaphore
Definition: s3fanout.h:172
unsigned int counter
Definition: s3fanout.h:161
std::string Print() const
Definition: s3fanout.cc:1344
bool Debase64(const string &data, string *decoded)
Definition: string.cc:534
void MakePipe(int pipe_fd[2])
Definition: posix.cc:525
unsigned char digest[digest_size_]
Definition: hash.h:123
struct curl_slist * clist
Definition: s3fanout.h:165
std::string * user_agent_
Definition: s3fanout.h:294
Failures error_code
Definition: s3fanout.h:146
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:180
unsigned backoff_ms
Definition: s3fanout.h:150
unsigned int max_available_jobs_
Definition: s3fanout.h:324
std::string RfcTimestamp()
Definition: string.cc:127
int64_t String2Int64(const string &value)
Definition: string.cc:222
unsigned GetDigestSize() const
Definition: hash.h:167
std::set< JobInfo * > * active_requests_
Definition: s3fanout.h:286
void PushCompletedJob(JobInfo *info)
Definition: s3fanout.cc:1328
uint64_t payload_size
Definition: s3fanout.h:144
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:285
bool HasSuffix(const std::string &str, const std::string &suffix, const bool ignore_case)
Definition: string.cc:279
void UseSystemCertificatePath()
Definition: ssl.cc:68
void SetUrlOptions(JobInfo *info) const
Definition: s3fanout.cc:983
Definition: dns.h:90
CURL * AcquireCurlHandle() const
Definition: s3fanout.cc:384
uint64_t throttle_timestamp
Definition: s3fanout.h:154
Semaphore * available_jobs_
Definition: s3fanout.h:325
static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp)
Definition: s3fanout.cc:162
UniquePtr< FileBackedBuffer > origin
Definition: s3fanout.h:110
void ReleaseCurlHandle(JobInfo *info, CURL *handle) const
Definition: s3fanout.cc:417
char * errorbuffer
Definition: s3fanout.h:155
string StringifyInt(const int64_t value)
Definition: string.cc:78
struct pollfd * watch_fds_
Definition: s3fanout.h:305
uint64_t platform_monotonic_time()
std::string ExtractPort(const std::string &url)
Definition: dns.cc:125
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:265
JobInfo * PopCompletedJob()
Definition: s3fanout.cc:1335
std::string ExtractHost(const std::string &url)
Definition: dns.cc:110
unsigned char num_retries
Definition: s3fanout.h:148
std::string port
Definition: s3fanout.h:164
std::string complete_hostname_
Definition: s3fanout.h:280
uint64_t num_requests
Definition: s3fanout.h:78
void HashMem(const unsigned char *buffer, const unsigned buffer_size, Any *any_digest)
Definition: hash.cc:255
string Base64(const string &data)
Definition: string.cc:474
RequestType request
Definition: s3fanout.h:145
uint64_t String2Uint64(const string &value)
Definition: string.cc:228
double transfer_time
Definition: s3fanout.h:77
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
Definition: s3fanout.cc:1085
uint64_t num_retries
Definition: s3fanout.h:79
Definition: mutex.h:42
static CaresResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
Definition: dns.cc:733
std::set< S3FanOutDnsEntry * > * sharehandles_
Definition: s3fanout.h:290
SslCertificateStore ssl_certificate_store_
Definition: s3fanout.h:339
const Statistics & GetStatistics()
Definition: s3fanout.cc:1313
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:1918
Statistics * statistics_
Definition: s3fanout.h:329
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:534
uint64_t timestamp_last_throttle_report_
Definition: s3fanout.h:332
static void size_t size
Definition: smalloc.h:47
std::set< CURL * > * pool_handles_inuse_
Definition: s3fanout.h:289
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:546
Definition: s3fanout.h:158
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:584
std::map< CURL *, S3FanOutDnsEntry * > * curl_sharehandles_
Definition: s3fanout.h:291
CURL * curl_handle
Definition: s3fanout.h:142
void Destroy()
Definition: pointer.h:45
Failures InitializeRequest(JobInfo *info, CURL *handle) const
Definition: s3fanout.cc:855