CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
s3fanout.cc
Go to the documentation of this file.
1 
7 #include "s3fanout.h"
8 
9 #include <pthread.h>
10 
11 #include <algorithm>
12 #include <cassert>
13 #include <cerrno>
14 #include <utility>
15 
16 #include "upload_facility.h"
17 #include "util/concurrency.h"
18 #include "util/exception.h"
19 #include "util/platform.h"
20 #include "util/posix.h"
21 #include "util/string.h"
22 
23 using namespace std; // NOLINT
24 
25 namespace s3fanout {
26 
27 const char *S3FanoutManager::kCacheControlCas = "Cache-Control: max-age=259200";
28 const char
29  *S3FanoutManager::kCacheControlDotCvmfs = "Cache-Control: max-age=61";
30 const unsigned S3FanoutManager::kDefault429ThrottleMs = 250;
31 const unsigned S3FanoutManager::kMax429ThrottleMs = 10000;
32 const unsigned S3FanoutManager::kThrottleReportIntervalSec = 10;
33 const unsigned S3FanoutManager::kDefaultHTTPPort = 80;
34 const unsigned S3FanoutManager::kDefaultHTTPSPort = 443;
35 
36 
40 void S3FanoutManager::DetectThrottleIndicator(const std::string &header,
41  JobInfo *info) {
42  std::string value_str;
43  if (HasPrefix(header, "retry-after:", true))
44  value_str = header.substr(12);
45  if (HasPrefix(header, "x-retry-in:", true))
46  value_str = header.substr(11);
47 
48  value_str = Trim(value_str, true /* trim_newline */);
49  if (!value_str.empty()) {
50  unsigned value_numeric = String2Uint64(value_str);
51  unsigned value_ms = HasSuffix(value_str, "ms", true /* ignore_case */)
52  ? value_numeric
53  : (value_numeric * 1000);
54  if (value_ms > 0)
55  info->throttle_ms = std::min(value_ms, kMax429ThrottleMs);
56  }
57 }
58 
59 
63 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
64  void *info_link) {
65  const size_t num_bytes = size * nmemb;
66  const string header_line(static_cast<const char *>(ptr), num_bytes);
67  JobInfo *info = static_cast<JobInfo *>(info_link);
68 
69  // Check for http status code errors
70  if (HasPrefix(header_line, "HTTP/1.", false)) {
71  if (header_line.length() < 10)
72  return 0;
73 
74  unsigned i;
75  for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {
76  }
77 
78  if (header_line[i] == '2') {
79  return num_bytes;
80  } else {
81  LogCvmfs(kLogS3Fanout, kLogDebug, "http status error code [info %p]: %s",
82  info, header_line.c_str());
83  if (header_line.length() < i + 3) {
84  LogCvmfs(kLogS3Fanout, kLogStderr, "S3: invalid HTTP response '%s'",
85  header_line.c_str());
86  info->error_code = kFailOther;
87  return 0;
88  }
89  info->http_error = String2Int64(string(&header_line[i], 3));
90 
91  switch (info->http_error) {
92  case 429:
93  info->error_code = kFailRetry;
94  info->throttle_ms = S3FanoutManager::kDefault429ThrottleMs;
96  return num_bytes;
97  case 503:
98  case 502: // Can happen if the S3 gateway-backend connection breaks
99  case 500: // sometimes see this as a transient error from S3
101  break;
102  case 501:
103  case 400:
104  info->error_code = kFailBadRequest;
105  break;
106  case 403:
107  info->error_code = kFailForbidden;
108  break;
109  case 404:
110  info->error_code = kFailNotFound;
111  return num_bytes;
112  default:
113  info->error_code = kFailOther;
114  }
115  return 0;
116  }
117  }
118 
119  if (info->error_code == kFailRetry) {
120  S3FanoutManager::DetectThrottleIndicator(header_line, info);
121  }
122 
123  return num_bytes;
124 }
125 
126 
130 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
131  void *info_link) {
132  const size_t num_bytes = size * nmemb;
133  JobInfo *info = static_cast<JobInfo *>(info_link);
134 
135  LogCvmfs(kLogS3Fanout, kLogDebug, "Data callback with %zu bytes", num_bytes);
136 
137  if (num_bytes == 0)
138  return 0;
139 
140  uint64_t read_bytes = info->origin->Read(ptr, num_bytes);
141 
142  LogCvmfs(kLogS3Fanout, kLogDebug, "source buffer pushed out %lu bytes",
143  read_bytes);
144 
145  return read_bytes;
146 }
147 
148 
/**
 * libcurl write callback that discards the response body: it reports every
 * byte as consumed without looking at the data.
 */
static size_t CallbackCurlBody(char * /*ptr*/, size_t size, size_t nmemb,
                               void * /*userdata*/) {
  const size_t consumed_bytes = size * nmemb;
  return consumed_bytes;
}
156 
157 
161 int S3FanoutManager::CallbackCurlSocket(CURL *easy, curl_socket_t s, int action,
162  void *userp, void *socketp) {
163  S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(userp);
165  "CallbackCurlSocket called with easy "
166  "handle %p, socket %d, action %d, up %p, "
167  "sp %p, fds_inuse %d, jobs %d",
168  easy, s, action, userp, socketp, s3fanout_mgr->watch_fds_inuse_,
169  s3fanout_mgr->available_jobs_->Get());
170  if (action == CURL_POLL_NONE)
171  return 0;
172 
173  // Find s in watch_fds_
174  // First 2 fds are job and terminate pipes (not curl related)
175  unsigned index;
176  for (index = 2; index < s3fanout_mgr->watch_fds_inuse_; ++index) {
177  if (s3fanout_mgr->watch_fds_[index].fd == s)
178  break;
179  }
180  // Or create newly
181  if (index == s3fanout_mgr->watch_fds_inuse_) {
182  // Extend array if necessary
183  if (s3fanout_mgr->watch_fds_inuse_ == s3fanout_mgr->watch_fds_size_) {
184  s3fanout_mgr->watch_fds_size_ *= 2;
185  s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
186  srealloc(s3fanout_mgr->watch_fds_,
187  s3fanout_mgr->watch_fds_size_ * sizeof(struct pollfd)));
188  }
189  s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].fd = s;
190  s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].events = 0;
191  s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].revents = 0;
192  s3fanout_mgr->watch_fds_inuse_++;
193  }
194 
195  switch (action) {
196  case CURL_POLL_IN:
197  s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
198  break;
199  case CURL_POLL_OUT:
200  s3fanout_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
201  break;
202  case CURL_POLL_INOUT:
203  s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI | POLLOUT
204  | POLLWRBAND;
205  break;
206  case CURL_POLL_REMOVE:
207  if (index < s3fanout_mgr->watch_fds_inuse_ - 1)
208  s3fanout_mgr
209  ->watch_fds_[index] = s3fanout_mgr->watch_fds_
210  [s3fanout_mgr->watch_fds_inuse_ - 1];
211  s3fanout_mgr->watch_fds_inuse_--;
212  // Shrink array if necessary
213  if ((s3fanout_mgr->watch_fds_inuse_ > s3fanout_mgr->watch_fds_max_)
214  && (s3fanout_mgr->watch_fds_inuse_
215  < s3fanout_mgr->watch_fds_size_ / 2)) {
216  s3fanout_mgr->watch_fds_size_ /= 2;
217  s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
218  srealloc(s3fanout_mgr->watch_fds_,
219  s3fanout_mgr->watch_fds_size_ * sizeof(struct pollfd)));
220  }
221  break;
222  default:
223  PANIC(NULL);
224  }
225 
226  return 0;
227 }
228 
229 
233 void *S3FanoutManager::MainUpload(void *data) {
234  LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread started");
235  S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(data);
236 
237  s3fanout_mgr->InitPipeWatchFds();
238 
239  // Don't schedule more jobs into the multi handle than the maximum number of
240  // parallel connections. This should prevent starvation and thus a timeout
241  // of the authorization header (CVM-1339).
242  unsigned jobs_in_flight = 0;
243 
244  while (true) {
245  // Check events with 100ms timeout
246  int timeout_ms = 100;
247  int retval = poll(s3fanout_mgr->watch_fds_, s3fanout_mgr->watch_fds_inuse_,
248  timeout_ms);
249  if (retval == 0) {
250  // Handle timeout
251  int still_running = 0;
252  retval = curl_multi_socket_action(
253  s3fanout_mgr->curl_multi_, CURL_SOCKET_TIMEOUT, 0, &still_running);
254  if (retval != CURLM_OK) {
255  LogCvmfs(kLogS3Fanout, kLogStderr, "Error, timeout due to: %d", retval);
256  assert(retval == CURLM_OK);
257  }
258  } else if (retval < 0) {
259  assert(errno == EINTR);
260  continue;
261  }
262 
263  // Terminate I/O thread
264  if (s3fanout_mgr->watch_fds_[0].revents)
265  break;
266 
267  // New job incoming
268  if (s3fanout_mgr->watch_fds_[1].revents) {
269  s3fanout_mgr->watch_fds_[1].revents = 0;
270  JobInfo *info;
271  ReadPipe(s3fanout_mgr->pipe_jobs_[0], &info, sizeof(info));
272  CURL *handle = s3fanout_mgr->AcquireCurlHandle();
273  if (handle == NULL) {
274  PANIC(kLogStderr, "Failed to acquire CURL handle.");
275  }
276  s3fanout::Failures init_failure = s3fanout_mgr->InitializeRequest(info,
277  handle);
278  if (init_failure != s3fanout::kFailOk) {
280  "Failed to initialize CURL handle (error: %d - %s | errno: %d)",
281  init_failure, Code2Ascii(init_failure), errno);
282  }
283  s3fanout_mgr->SetUrlOptions(info);
284 
285  curl_multi_add_handle(s3fanout_mgr->curl_multi_, handle);
286  s3fanout_mgr->active_requests_->insert(info);
287  jobs_in_flight++;
288  int still_running = 0, retval = 0;
289  retval = curl_multi_socket_action(
290  s3fanout_mgr->curl_multi_, CURL_SOCKET_TIMEOUT, 0, &still_running);
291 
292  LogCvmfs(kLogS3Fanout, kLogDebug, "curl_multi_socket_action: %d - %d",
293  retval, still_running);
294  }
295 
296 
297  // Activity on curl sockets
298  // Within this loop the curl_multi_socket_action() may cause socket(s)
299  // to be removed from watch_fds_. If a socket is removed it is replaced
300  // by the socket at the end of the array and the inuse count is decreased.
301  // Therefore loop over the array in reverse order.
302  // First 2 fds are job and terminate pipes (not curl related)
303  for (int32_t i = s3fanout_mgr->watch_fds_inuse_ - 1; i >= 2; --i) {
304  if (static_cast<uint32_t>(i) >= s3fanout_mgr->watch_fds_inuse_) {
305  continue;
306  }
307  if (s3fanout_mgr->watch_fds_[i].revents) {
308  int ev_bitmask = 0;
309  if (s3fanout_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
310  ev_bitmask |= CURL_CSELECT_IN;
311  if (s3fanout_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
312  ev_bitmask |= CURL_CSELECT_OUT;
313  if (s3fanout_mgr->watch_fds_[i].revents
314  & (POLLERR | POLLHUP | POLLNVAL))
315  ev_bitmask |= CURL_CSELECT_ERR;
316  s3fanout_mgr->watch_fds_[i].revents = 0;
317 
318  int still_running = 0;
319  retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
320  s3fanout_mgr->watch_fds_[i].fd,
321  ev_bitmask,
322  &still_running);
323  }
324  }
325 
326  // Check if transfers are completed
327  CURLMsg *curl_msg;
328  int msgs_in_queue;
329  while ((curl_msg = curl_multi_info_read(s3fanout_mgr->curl_multi_,
330  &msgs_in_queue))) {
331  assert(curl_msg->msg == CURLMSG_DONE);
332 
333  s3fanout_mgr->statistics_->num_requests++;
334  JobInfo *info;
335  CURL *easy_handle = curl_msg->easy_handle;
336  int curl_error = curl_msg->data.result;
337  curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
338 
339  curl_multi_remove_handle(s3fanout_mgr->curl_multi_, easy_handle);
340  if (s3fanout_mgr->VerifyAndFinalize(curl_error, info)) {
341  curl_multi_add_handle(s3fanout_mgr->curl_multi_, easy_handle);
342  int still_running = 0;
343  curl_multi_socket_action(
344  s3fanout_mgr->curl_multi_, CURL_SOCKET_TIMEOUT, 0, &still_running);
345  } else {
346  // Return easy handle into pool and write result back
347  jobs_in_flight--;
348  s3fanout_mgr->active_requests_->erase(info);
349  s3fanout_mgr->ReleaseCurlHandle(info, easy_handle);
350  s3fanout_mgr->available_jobs_->Decrement();
351 
352  // Add to list of completed jobs
353  s3fanout_mgr->PushCompletedJob(info);
354  }
355  }
356  }
357 
358  set<CURL *>::iterator i = s3fanout_mgr->pool_handles_inuse_->begin();
359  const set<CURL *>::const_iterator i_end = s3fanout_mgr->pool_handles_inuse_
360  ->end();
361  for (; i != i_end; ++i) {
362  curl_multi_remove_handle(s3fanout_mgr->curl_multi_, *i);
363  curl_easy_cleanup(*i);
364  }
365  s3fanout_mgr->pool_handles_inuse_->clear();
366  free(s3fanout_mgr->watch_fds_);
367 
368  LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread terminated");
369  return NULL;
370 }
371 
372 
377 CURL *S3FanoutManager::AcquireCurlHandle() const {
378  CURL *handle;
379 
380  MutexLockGuard guard(curl_handle_lock_);
381 
382  if (pool_handles_idle_->empty()) {
383  CURLcode retval;
384 
385  // Create a new handle
386  handle = curl_easy_init();
387  assert(handle != NULL);
388 
389  // Other settings
390  retval = curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
391  assert(retval == CURLE_OK);
392  retval = curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION,
394  assert(retval == CURLE_OK);
395  retval = curl_easy_setopt(handle, CURLOPT_READFUNCTION, CallbackCurlData);
396  assert(retval == CURLE_OK);
397  retval = curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlBody);
398  assert(retval == CURLE_OK);
399  } else {
400  handle = *(pool_handles_idle_->begin());
401  pool_handles_idle_->erase(pool_handles_idle_->begin());
402  }
403 
404  pool_handles_inuse_->insert(handle);
405 
406  return handle;
407 }
408 
409 
410 void S3FanoutManager::ReleaseCurlHandle(JobInfo *info, CURL *handle) const {
411  if (info->http_headers) {
412  curl_slist_free_all(info->http_headers);
413  info->http_headers = NULL;
414  }
415 
416  MutexLockGuard guard(curl_handle_lock_);
417 
418  set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
419  assert(elem != pool_handles_inuse_->end());
420 
421  if (pool_handles_idle_->size() > config_.pool_max_handles) {
422  CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, NULL);
423  assert(retval == CURLE_OK);
424  curl_easy_cleanup(handle);
425  std::map<CURL *, S3FanOutDnsEntry *>::size_type
426  retitems = curl_sharehandles_->erase(handle);
427  assert(retitems == 1);
428  } else {
429  pool_handles_idle_->insert(handle);
430  }
431 
432  pool_handles_inuse_->erase(elem);
433 }
434 
435 void S3FanoutManager::InitPipeWatchFds() {
436  assert(watch_fds_inuse_ == 0);
437  assert(watch_fds_size_ >= 2);
438  watch_fds_[0].fd = pipe_terminate_[0];
439  watch_fds_[0].events = POLLIN | POLLPRI;
440  watch_fds_[0].revents = 0;
441  ++watch_fds_inuse_;
442  watch_fds_[1].fd = pipe_jobs_[0];
443  watch_fds_[1].events = POLLIN | POLLPRI;
444  watch_fds_[1].revents = 0;
445  ++watch_fds_inuse_;
446 }
447 
452 bool S3FanoutManager::MkV2Authz(const JobInfo &info,
453  vector<string> *headers) const {
454  string payload_hash;
455  bool retval = MkPayloadHash(info, &payload_hash);
456  if (!retval)
457  return false;
458  string content_type = GetContentType(info);
459  string request = GetRequestString(info);
460 
461  string timestamp = RfcTimestamp();
462  string to_sign = request + "\n" + payload_hash + "\n" + content_type + "\n"
463  + timestamp + "\n";
464  if (config_.x_amz_acl != "") {
465  to_sign += "x-amz-acl:" + config_.x_amz_acl + "\n" + // default ACL
466  "/" + config_.bucket + "/" + info.object_key;
467  }
468  LogCvmfs(kLogS3Fanout, kLogDebug, "%s string to sign for: %s",
469  request.c_str(), info.object_key.c_str());
470 
471  shash::Any hmac;
472  hmac.algorithm = shash::kSha1;
473  shash::Hmac(config_.secret_key,
474  reinterpret_cast<const unsigned char *>(to_sign.data()),
475  to_sign.length(), &hmac);
476 
477  headers->push_back("Authorization: AWS " + config_.access_key + ":"
478  + Base64(string(reinterpret_cast<char *>(hmac.digest),
479  hmac.GetDigestSize())));
480  headers->push_back("Date: " + timestamp);
481  headers->push_back("X-Amz-Acl: " + config_.x_amz_acl);
482  if (!payload_hash.empty())
483  headers->push_back("Content-MD5: " + payload_hash);
484  if (!content_type.empty())
485  headers->push_back("Content-Type: " + content_type);
486  return true;
487 }
488 
489 
490 string S3FanoutManager::GetUriEncode(const string &val,
491  bool encode_slash) const {
492  string result;
493  const unsigned len = val.length();
494  result.reserve(len);
495  for (unsigned i = 0; i < len; ++i) {
496  char c = val[i];
497  if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
498  || (c >= '0' && c <= '9') || c == '_' || c == '-' || c == '~'
499  || c == '.') {
500  result.push_back(c);
501  } else if (c == '/') {
502  if (encode_slash) {
503  result += "%2F";
504  } else {
505  result.push_back(c);
506  }
507  } else {
508  result.push_back('%');
509  result.push_back((c / 16) + ((c / 16 <= 9) ? '0' : 'A' - 10));
510  result.push_back((c % 16) + ((c % 16 <= 9) ? '0' : 'A' - 10));
511  }
512  }
513  return result;
514 }
515 
516 
517 string S3FanoutManager::GetAwsV4SigningKey(const string &date) const {
518  if (last_signing_key_.first == date)
519  return last_signing_key_.second;
520 
521  string date_key = shash::Hmac256("AWS4" + config_.secret_key, date, true);
522  string date_region_key = shash::Hmac256(date_key, config_.region, true);
523  string date_region_service_key = shash::Hmac256(date_region_key, "s3", true);
524  string signing_key = shash::Hmac256(date_region_service_key, "aws4_request",
525  true);
526  last_signing_key_.first = date;
527  last_signing_key_.second = signing_key;
528  return signing_key;
529 }
530 
531 
536 bool S3FanoutManager::MkV4Authz(const JobInfo &info,
537  vector<string> *headers) const {
538  string payload_hash;
539  bool retval = MkPayloadHash(info, &payload_hash);
540  if (!retval)
541  return false;
542  string content_type = GetContentType(info);
543  string timestamp = IsoTimestamp();
544  string date = timestamp.substr(0, 8);
545  vector<string> tokens = SplitString(complete_hostname_, ':');
546  assert(tokens.size() <= 2);
547  string canonical_hostname = tokens[0];
548 
549  // if we could split the hostname in two and if the port is *NOT* a default
550  // one
551  if (tokens.size() == 2
552  && !((String2Uint64(tokens[1]) == kDefaultHTTPPort)
553  || (String2Uint64(tokens[1]) == kDefaultHTTPSPort)))
554  canonical_hostname += ":" + tokens[1];
555 
556  string signed_headers;
557  string canonical_headers;
558  if (!content_type.empty()) {
559  signed_headers += "content-type;";
560  headers->push_back("Content-Type: " + content_type);
561  canonical_headers += "content-type:" + content_type + "\n";
562  }
563  if (config_.x_amz_acl != "") {
564  signed_headers += "host;x-amz-acl;x-amz-content-sha256;x-amz-date";
565  } else {
566  signed_headers += "host;x-amz-content-sha256;x-amz-date";
567  }
568  canonical_headers += "host:" + canonical_hostname + "\n";
569  if (config_.x_amz_acl != "") {
570  canonical_headers += "x-amz-acl:" + config_.x_amz_acl + "\n";
571  }
572  canonical_headers += "x-amz-content-sha256:" + payload_hash + "\n"
573  + "x-amz-date:" + timestamp + "\n";
574 
575  string scope = date + "/" + config_.region + "/s3/aws4_request";
576  string uri = config_.dns_buckets
577  ? (string("/") + info.object_key)
578  : (string("/") + config_.bucket + "/" + info.object_key);
579 
580  string canonical_request = GetRequestString(info) + "\n"
581  + GetUriEncode(uri, false) + "\n" + "\n"
582  + canonical_headers + "\n" + signed_headers + "\n"
583  + payload_hash;
584 
585  string hash_request = shash::Sha256String(canonical_request.c_str());
586 
587  string string_to_sign = "AWS4-HMAC-SHA256\n" + timestamp + "\n" + scope + "\n"
588  + hash_request;
589 
590  string signing_key = GetAwsV4SigningKey(date);
591  string signature = shash::Hmac256(signing_key, string_to_sign);
592 
593  headers->push_back("X-Amz-Acl: " + config_.x_amz_acl);
594  headers->push_back("X-Amz-Content-Sha256: " + payload_hash);
595  headers->push_back("X-Amz-Date: " + timestamp);
596  headers->push_back("Authorization: AWS4-HMAC-SHA256 "
597  "Credential="
598  + config_.access_key + "/" + scope
599  + ","
600  "SignedHeaders="
601  + signed_headers
602  + ","
603  "Signature="
604  + signature);
605  return true;
606 }
607 
612 bool S3FanoutManager::MkAzureAuthz(const JobInfo &info,
613  vector<string> *headers) const {
614  string timestamp = RfcTimestamp();
615  string canonical_headers = "x-ms-blob-type:BlockBlob\nx-ms-date:" + timestamp
616  + "\nx-ms-version:2011-08-18";
617  string canonical_resource = "/" + config_.access_key + "/" + config_.bucket
618  + "/" + info.object_key;
619 
620  string string_to_sign;
621  if ((info.request == JobInfo::kReqHeadOnly)
622  || (info.request == JobInfo::kReqHeadPut)
623  || (info.request == JobInfo::kReqDelete)) {
624  string_to_sign = GetRequestString(info) + string("\n\n\n")
625  + "\n\n\n\n\n\n\n\n\n" + canonical_headers + "\n"
626  + canonical_resource;
627  } else {
628  string_to_sign = GetRequestString(info) + string("\n\n\n")
629  + string(StringifyInt(info.origin->GetSize()))
630  + "\n\n\n\n\n\n\n\n\n" + canonical_headers + "\n"
631  + canonical_resource;
632  }
633 
634  string signing_key;
635  int retval = Debase64(config_.secret_key, &signing_key);
636  if (!retval)
637  return false;
638 
639  string signature = shash::Hmac256(signing_key, string_to_sign, true);
640 
641  headers->push_back("x-ms-date: " + timestamp);
642  headers->push_back("x-ms-version: 2011-08-18");
643  headers->push_back("Authorization: SharedKey " + config_.access_key + ":"
644  + Base64(signature));
645  headers->push_back("x-ms-blob-type: BlockBlob");
646  return true;
647 }
648 
649 void S3FanoutManager::InitializeDnsSettingsCurl(CURL *handle,
650  CURLSH *sharehandle,
651  curl_slist *clist) const {
652  CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, sharehandle);
653  assert(retval == CURLE_OK);
654  retval = curl_easy_setopt(handle, CURLOPT_RESOLVE, clist);
655  assert(retval == CURLE_OK);
656 }
657 
658 
659 int S3FanoutManager::InitializeDnsSettings(CURL *handle,
660  std::string host_with_port) const {
661  // Use existing handle
662  std::map<CURL *, S3FanOutDnsEntry *>::const_iterator it = curl_sharehandles_
663  ->find(handle);
664  if (it != curl_sharehandles_->end()) {
665  InitializeDnsSettingsCurl(handle, it->second->sharehandle,
666  it->second->clist);
667  return 0;
668  }
669 
670  // Add protocol information for extraction of fields for DNS
671  if (!IsHttpUrl(host_with_port))
672  host_with_port = config_.protocol + "://" + host_with_port;
673  std::string remote_host = dns::ExtractHost(host_with_port);
674  std::string remote_port = dns::ExtractPort(host_with_port);
675 
676  // If we have the name already resolved, use the least used IP
677  S3FanOutDnsEntry *useme = NULL;
678  unsigned int usemin = UINT_MAX;
679  std::set<S3FanOutDnsEntry *>::iterator its3 = sharehandles_->begin();
680  for (; its3 != sharehandles_->end(); ++its3) {
681  if ((*its3)->dns_name == remote_host) {
682  if (usemin >= (*its3)->counter) {
683  usemin = (*its3)->counter;
684  useme = (*its3);
685  }
686  }
687  }
688  if (useme != NULL) {
689  curl_sharehandles_->insert(
690  std::pair<CURL *, S3FanOutDnsEntry *>(handle, useme));
691  useme->counter++;
692  InitializeDnsSettingsCurl(handle, useme->sharehandle, useme->clist);
693  return 0;
694  }
695 
696  // We need to resolve the hostname
697  // TODO(ssheikki): support ipv6 also... if (opt_ipv4_only_)
698  dns::Host host = resolver_->Resolve(remote_host);
699  set<string> ipv4_addresses = host.ipv4_addresses();
700  std::set<string>::iterator its = ipv4_addresses.begin();
701  S3FanOutDnsEntry *dnse = NULL;
702  for (; its != ipv4_addresses.end(); ++its) {
703  dnse = new S3FanOutDnsEntry();
704  dnse->counter = 0;
705  dnse->dns_name = remote_host;
706  dnse->port = remote_port.size() == 0 ? "80" : remote_port;
707  dnse->ip = *its;
708  dnse->clist = NULL;
709  dnse->clist = curl_slist_append(
710  dnse->clist,
711  (dnse->dns_name + ":" + dnse->port + ":" + dnse->ip).c_str());
712  dnse->sharehandle = curl_share_init();
713  assert(dnse->sharehandle != NULL);
714  CURLSHcode share_retval = curl_share_setopt(
715  dnse->sharehandle, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS);
716  assert(share_retval == CURLSHE_OK);
717  sharehandles_->insert(dnse);
718  }
719  if (dnse == NULL) {
721  "Error: DNS resolve failed for address '%s'.",
722  remote_host.c_str());
723  assert(dnse != NULL);
724  return -1;
725  }
726  curl_sharehandles_->insert(
727  std::pair<CURL *, S3FanOutDnsEntry *>(handle, dnse));
728  dnse->counter++;
729  InitializeDnsSettingsCurl(handle, dnse->sharehandle, dnse->clist);
730 
731  return 0;
732 }
733 
734 
735 bool S3FanoutManager::MkPayloadHash(const JobInfo &info,
736  string *hex_hash) const {
737  if ((info.request == JobInfo::kReqHeadOnly)
738  || (info.request == JobInfo::kReqHeadPut)
739  || (info.request == JobInfo::kReqDelete)) {
740  switch (config_.authz_method) {
741  case kAuthzAwsV2:
742  hex_hash->clear();
743  break;
744  case kAuthzAwsV4:
745  // Sha256 over empty string
746  *hex_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b78"
747  "52b855";
748  break;
749  case kAuthzAzure:
750  // no payload hash required for Azure signature
751  hex_hash->clear();
752  break;
753  default:
754  PANIC(NULL);
755  }
756  return true;
757  }
758 
759  // PUT, there is actually payload
760  shash::Any payload_hash(shash::kMd5);
761 
762  unsigned char *data;
763  unsigned int nbytes = info.origin->Data(reinterpret_cast<void **>(&data),
764  info.origin->GetSize(), 0);
765  assert(nbytes == info.origin->GetSize());
766 
767  switch (config_.authz_method) {
768  case kAuthzAwsV2:
769  shash::HashMem(data, nbytes, &payload_hash);
770  *hex_hash = Base64(string(reinterpret_cast<char *>(payload_hash.digest),
771  payload_hash.GetDigestSize()));
772  return true;
773  case kAuthzAwsV4:
774  *hex_hash = shash::Sha256Mem(data, nbytes);
775  return true;
776  case kAuthzAzure:
777  // no payload hash required for Azure signature
778  hex_hash->clear();
779  return true;
780  default:
781  PANIC(NULL);
782  }
783 }
784 
785 string S3FanoutManager::GetRequestString(const JobInfo &info) const {
786  switch (info.request) {
787  case JobInfo::kReqHeadOnly:
788  case JobInfo::kReqHeadPut:
789  return "HEAD";
790  case JobInfo::kReqPutCas:
791  case JobInfo::kReqPutDotCvmfs:
792  case JobInfo::kReqPutHtml:
793  case JobInfo::kReqPutBucket:
794  return "PUT";
795  case JobInfo::kReqDelete:
796  return "DELETE";
797  default:
798  PANIC(NULL);
799  }
800 }
801 
802 
803 string S3FanoutManager::GetContentType(const JobInfo &info) const {
804  switch (info.request) {
805  case JobInfo::kReqHeadOnly:
806  case JobInfo::kReqHeadPut:
807  case JobInfo::kReqDelete:
808  return "";
809  case JobInfo::kReqPutCas:
810  return "application/octet-stream";
811  case JobInfo::kReqPutDotCvmfs:
812  return "application/x-cvmfs";
813  case JobInfo::kReqPutHtml:
814  return "text/html";
815  case JobInfo::kReqPutBucket:
816  return "text/xml";
817  default:
818  PANIC(NULL);
819  }
820 }
821 
822 
827 Failures S3FanoutManager::InitializeRequest(JobInfo *info, CURL *handle) const {
828  // Initialize internal download state
829  info->curl_handle = handle;
830  info->error_code = kFailOk;
831  info->http_error = 0;
832  info->num_retries = 0;
833  info->backoff_ms = 0;
834  info->throttle_ms = 0;
835  info->throttle_timestamp = 0;
836  info->http_headers = NULL;
837  // info->payload_size is needed in S3Uploader::MainCollectResults,
838  // where info->origin is already destroyed.
839  info->payload_size = info->origin->GetSize();
840 
841  InitializeDnsSettings(handle, complete_hostname_);
842 
843  CURLcode retval;
844  if ((info->request == JobInfo::kReqHeadOnly)
845  || (info->request == JobInfo::kReqHeadPut)
846  || (info->request == JobInfo::kReqDelete)) {
847  retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 0);
848  assert(retval == CURLE_OK);
849  retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
850  assert(retval == CURLE_OK);
851 
852  if (info->request == JobInfo::kReqDelete) {
853  retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST,
854  GetRequestString(*info).c_str());
855  assert(retval == CURLE_OK);
856  } else {
857  retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
858  assert(retval == CURLE_OK);
859  }
860  } else {
861  retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
862  assert(retval == CURLE_OK);
863  retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 1);
864  assert(retval == CURLE_OK);
865  retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 0);
866  assert(retval == CURLE_OK);
867  retval = curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE,
868  static_cast<curl_off_t>(info->origin->GetSize()));
869  assert(retval == CURLE_OK);
870 
871  if (info->request == JobInfo::kReqPutDotCvmfs) {
872  info->http_headers = curl_slist_append(info->http_headers,
873  kCacheControlDotCvmfs);
874  } else if (info->request == JobInfo::kReqPutCas) {
875  info->http_headers = curl_slist_append(info->http_headers,
876  kCacheControlCas);
877  }
878  }
879 
880  bool retval_b;
881 
882  // Authorization
883  vector<string> authz_headers;
884  switch (config_.authz_method) {
885  case kAuthzAwsV2:
886  retval_b = MkV2Authz(*info, &authz_headers);
887  break;
888  case kAuthzAwsV4:
889  retval_b = MkV4Authz(*info, &authz_headers);
890  break;
891  case kAuthzAzure:
892  retval_b = MkAzureAuthz(*info, &authz_headers);
893  break;
894  default:
895  PANIC(NULL);
896  }
897  if (!retval_b)
898  return kFailLocalIO;
899  for (unsigned i = 0; i < authz_headers.size(); ++i) {
900  info->http_headers = curl_slist_append(info->http_headers,
901  authz_headers[i].c_str());
902  }
903 
904  // Common headers
905  info->http_headers = curl_slist_append(info->http_headers,
906  "Connection: Keep-Alive");
907  info->http_headers = curl_slist_append(info->http_headers, "Pragma:");
908  // No 100-continue
909  info->http_headers = curl_slist_append(info->http_headers, "Expect:");
910  // Strip unnecessary header
911  info->http_headers = curl_slist_append(info->http_headers, "Accept:");
912  info->http_headers = curl_slist_append(info->http_headers,
913  user_agent_->c_str());
914 
915  // Set curl parameters
916  retval = curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
917  assert(retval == CURLE_OK);
918  retval = curl_easy_setopt(handle, CURLOPT_HEADERDATA,
919  static_cast<void *>(info));
920  assert(retval == CURLE_OK);
921  retval = curl_easy_setopt(handle, CURLOPT_READDATA,
922  static_cast<void *>(info));
923  assert(retval == CURLE_OK);
924  retval = curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->http_headers);
925  assert(retval == CURLE_OK);
926  if (opt_ipv4_only_) {
927  retval = curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
928  assert(retval == CURLE_OK);
929  }
930  // Follow HTTP redirects
931  retval = curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
932  assert(retval == CURLE_OK);
933 
934  retval = curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, info->errorbuffer);
935  assert(retval == CURLE_OK);
936 
937  if (config_.protocol == "https") {
938  retval = curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 1L);
939  assert(retval == CURLE_OK);
940  retval = curl_easy_setopt(handle, CURLOPT_PROXY_SSL_VERIFYPEER, 1L);
941  assert(retval == CURLE_OK);
942  bool add_cert = ssl_certificate_store_.ApplySslCertificatePath(handle);
943  assert(add_cert);
944  }
945 
946  return kFailOk;
947 }
948 
949 
953 void S3FanoutManager::SetUrlOptions(JobInfo *info) const {
954  CURL *curl_handle = info->curl_handle;
955  CURLcode retval;
956 
957  retval = curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT,
958  config_.opt_timeout_sec);
959  assert(retval == CURLE_OK);
960  retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT,
961  kLowSpeedLimit);
962  assert(retval == CURLE_OK);
963  retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME,
964  config_.opt_timeout_sec);
965  assert(retval == CURLE_OK);
966 
967  if (is_curl_debug_) {
968  retval = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 1);
969  assert(retval == CURLE_OK);
970  }
971 
972  string url = MkUrl(info->object_key);
973  retval = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
974  assert(retval == CURLE_OK);
975 
976  retval = curl_easy_setopt(curl_handle, CURLOPT_PROXY, config_.proxy.c_str());
977  assert(retval == CURLE_OK);
978 }
979 
980 
984 void S3FanoutManager::UpdateStatistics(CURL *handle) {
985  double val;
986 
987  if (curl_easy_getinfo(handle, CURLINFO_SIZE_UPLOAD, &val) == CURLE_OK)
988  statistics_->transferred_bytes += val;
989 }
990 
991 
// Tells whether a failed job may be attempted again: the job's error code
// must be one of the retryable codes compared against below and the job must
// have used fewer than config_.opt_max_retries retries so far.
// NOTE(review): this listing jumps from source line 997 to 999, so one line of
// the condition may not be shown here -- confirm against the original file.
995 bool S3FanoutManager::CanRetry(const JobInfo *info) {
996  return (info->error_code == kFailHostConnection
997  || info->error_code == kFailHostResolve
999  || info->error_code == kFailRetry)
1000  && (info->num_retries < config_.opt_max_retries);
1001 }
1002 
1003 
// Books a retry for the job and sleeps before the request is repeated.
// If the job carries a throttle time (info->throttle_ms > 0), that time is
// used; otherwise a backoff time is drawn on first use and doubled on each
// further call, capped at config_.opt_backoff_max_ms.
1009 void S3FanoutManager::Backoff(JobInfo *info) {
// A job that failed with kFailRetry does not use up its own retry budget;
// the global retry statistic is incremented in every case.
1010  if (info->error_code != kFailRetry)
1011  info->num_retries++;
1012  statistics_->num_retries++;
1013 
1014  if (info->throttle_ms > 0) {
1015  LogCvmfs(kLogS3Fanout, kLogDebug, "throttling for %d ms",
1016  info->throttle_ms);
1017  uint64_t now = platform_monotonic_time();
// throttle_ms is converted to seconds (integer division) before it is added
// to throttle_timestamp and compared with the current time; the sleep is
// skipped when that sum is already smaller than `now`.
1018  if ((info->throttle_timestamp + (info->throttle_ms / 1000)) >= now) {
// The throttle report is emitted only if more than
// kThrottleReportIntervalSec has passed since the last report.
// NOTE(review): the line that opens the reporting call (source line 1021) is
// not shown in this listing; only its arguments follow.
1019  if ((now - timestamp_last_throttle_report_)
1020  > kThrottleReportIntervalSec) {
1022  "Warning: S3 backend throttling %ums "
1023  "(total backoff time so far %lums)",
1024  info->throttle_ms, statistics_->ms_throttled);
1025  timestamp_last_throttle_report_ = now;
1026  }
1027  statistics_->ms_throttled += info->throttle_ms;
1028  SafeSleepMs(info->throttle_ms);
1029  }
1030  } else {
1031  if (info->backoff_ms == 0) {
1032  // Must be != 0
1033  info->backoff_ms = prng_.Next(config_.opt_backoff_init_ms + 1);
1034  } else {
1035  info->backoff_ms *= 2;
1036  }
// Cap the backoff time at the configured maximum
1037  if (info->backoff_ms > config_.opt_backoff_max_ms)
1038  info->backoff_ms = config_.opt_backoff_max_ms;
1039 
1040  LogCvmfs(kLogS3Fanout, kLogDebug, "backing off for %d ms",
1041  info->backoff_ms);
1042  SafeSleepMs(info->backoff_ms);
1043  }
1044 }
1045 
1046 
// Evaluates the outcome of a finished transfer.  The curl result is mapped to
// info->error_code, a kReqHeadPut job whose object was not found is turned
// into an upload, and retryable failures are scheduled again after a backoff.
//
// Returns true if the request has to be executed again (either as the upload
// following the HEAD request or as a retry), false if the job is finished; in
// the latter case info->origin has been destroyed.
1053 bool S3FanoutManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
// NOTE(review): the line that opens this logging call (source line 1054) is
// not shown in this listing; only its arguments follow.
1055  "Verify uploaded/tested object %s "
1056  "(curl error %d, info error %d, info request %d)",
1057  info->object_key.c_str(), curl_error, info->error_code,
1058  info->request);
1059  UpdateStatistics(info->curl_handle);
1060 
1061  // Verification and error classification
1062  switch (curl_error) {
1063  case CURLE_OK:
// kFailRetry and kFailNotFound are kept on a successful transfer; every
// other value of error_code is reset to kFailOk.
1064  if ((info->error_code != kFailRetry)
1065  && (info->error_code != kFailNotFound)) {
1066  info->error_code = kFailOk;
1067  }
1068  break;
1069  case CURLE_UNSUPPORTED_PROTOCOL:
1070  case CURLE_URL_MALFORMAT:
1071  info->error_code = kFailBadRequest;
1072  break;
1073  case CURLE_COULDNT_RESOLVE_HOST:
1074  info->error_code = kFailHostResolve;
1075  break;
// NOTE(review): source line 1080 is not shown in this listing; presumably it
// assigns the error code for these connection-type failures -- confirm
// against the original file.
1076  case CURLE_COULDNT_CONNECT:
1077  case CURLE_OPERATION_TIMEDOUT:
1078  case CURLE_SEND_ERROR:
1079  case CURLE_RECV_ERROR:
1081  break;
1082  case CURLE_ABORTED_BY_CALLBACK:
1083  case CURLE_WRITE_ERROR:
1084  // Error set by callback
1085  break;
1086  default:
// NOTE(review): the line that opens this logging call (source line 1087) is
// not shown in this listing; only its arguments follow.
1088  "unexpected curl error (%d) while trying to upload %s: %s",
1089  curl_error, info->object_key.c_str(), info->errorbuffer);
1090  info->error_code = kFailOther;
1091  break;
1092  }
1093 
1094  // Transform HEAD to PUT request
1095  if ((info->error_code == kFailNotFound)
1096  && (info->request == JobInfo::kReqHeadPut)) {
1097  LogCvmfs(kLogS3Fanout, kLogDebug, "not found: %s, uploading",
1098  info->object_key.c_str());
1099  info->request = JobInfo::kReqPutCas;
// The header list of the HEAD request is released before the handle is
// initialized again for the upload.
1100  curl_slist_free_all(info->http_headers);
1101  info->http_headers = NULL;
1102  s3fanout::Failures init_failure = InitializeRequest(info,
1103  info->curl_handle);
1104 
1105  if (init_failure != s3fanout::kFailOk) {
1106  PANIC(kLogStderr,
1107  "Failed to initialize CURL handle "
1108  "(error: %d - %s | errno: %d)",
1109  init_failure, Code2Ascii(init_failure), errno);
1110  }
1111  SetUrlOptions(info);
1112  // Reset origin
1113  info->origin->Rewind();
1114  return true; // Again, Put
1115  }
1116 
1117  // Determination if failed request should be repeated
1118  bool try_again = false;
1119  if (info->error_code != kFailOk) {
1120  try_again = CanRetry(info);
1121  }
1122  if (try_again) {
// Only the PUT request types rewind the origin before the retry
1123  if (info->request == JobInfo::kReqPutCas
1124  || info->request == JobInfo::kReqPutDotCvmfs
1125  || info->request == JobInfo::kReqPutHtml) {
1126  LogCvmfs(kLogS3Fanout, kLogDebug, "Trying again to upload %s",
1127  info->object_key.c_str());
1128  // Reset origin
1129  info->origin->Rewind();
1130  }
// Backoff() sleeps; afterwards the per-attempt error and timing state of the
// job is cleared for the next attempt.
1131  Backoff(info);
1132  info->error_code = kFailOk;
1133  info->http_error = 0;
1134  info->throttle_ms = 0;
1135  info->backoff_ms = 0;
1136  info->throttle_timestamp = 0;
1137  return true; // try again
1138  }
1139 
1140  // Cleanup opened resources
1141  info->origin.Destroy();
1142 
// HTTP failures are reported on stderr, except for 404
1143  if ((info->error_code != kFailOk) && (info->http_error != 0)
1144  && (info->http_error != 404)) {
1145  LogCvmfs(kLogS3Fanout, kLogStderr, "S3: HTTP failure %d", info->http_error);
1146  }
1147  return false; // stop transfer
1148 }
1149 
// Constructs the manager from a copy of the given configuration: creates the
// locks and bookkeeping containers, initializes libcurl and the curl multi
// handle, seeds the random number generator and reads the environment
// switches.
// NOTE(review): several source lines of this constructor are not shown in this
// listing (the inner numbering has gaps), so the comments below cover only
// the visible statements.
1150 S3FanoutManager::S3FanoutManager(const S3Config &config) : config_(config) {
1151  atomic_init32(&multi_threaded_);
1155 
1156  int retval;
// Both mutexes are allocated with smalloc and initialized with default
// attributes
1157  jobs_todo_lock_ = reinterpret_cast<pthread_mutex_t *>(
1158  smalloc(sizeof(pthread_mutex_t)));
1159  retval = pthread_mutex_init(jobs_todo_lock_, NULL);
1160  assert(retval == 0);
1161  curl_handle_lock_ = reinterpret_cast<pthread_mutex_t *>(
1162  smalloc(sizeof(pthread_mutex_t)));
1163  retval = pthread_mutex_init(curl_handle_lock_, NULL);
1164  assert(retval == 0);
1165 
// Bookkeeping containers for requests, curl handles and share handles
1166  active_requests_ = new set<JobInfo *>;
1167  pool_handles_idle_ = new set<CURL *>;
1168  pool_handles_inuse_ = new set<CURL *>;
1169  curl_sharehandles_ = new map<CURL *, S3FanOutDnsEntry *>;
1170  sharehandles_ = new set<S3FanOutDnsEntry *>;
1174  assert(NULL != available_jobs_);
1175 
1176  statistics_ = new Statistics();
1177  user_agent_ = new string();
1178  *user_agent_ = "User-Agent: cvmfs " + string(CVMFS_VERSION);
1180 
// Global libcurl initialization and creation of the multi handle
1181  CURLcode cretval = curl_global_init(CURL_GLOBAL_ALL);
1182  assert(cretval == CURLE_OK);
1183  curl_multi_ = curl_multi_init();
1184  assert(curl_multi_ != NULL);
1185  CURLMcode mretval;
// NOTE(review): the argument lines of the CURLMOPT_SOCKETFUNCTION and
// CURLMOPT_MAX_TOTAL_CONNECTIONS calls are not shown in this listing.
1186  mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION,
1188  assert(mretval == CURLM_OK);
// The manager itself is registered as the socket callback's user data
1189  mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1190  static_cast<void *>(this));
1191  assert(mretval == CURLM_OK);
1192  mretval = curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1194  assert(mretval == CURLM_OK);
1195 
1196  prng_.InitLocaltime();
1197 
1198  thread_upload_ = 0;
// Curl debugging is enabled if _CVMFS_CURL_DEBUG is present in the environment
1200  is_curl_debug_ = (getenv("_CVMFS_CURL_DEBUG") != NULL);
1201 
1202  // Parsing environment variables
// IPv4-only mode requires CVMFS_IPV4_ONLY to be set to a non-empty value
1203  if ((getenv("CVMFS_IPV4_ONLY") != NULL)
1204  && (strlen(getenv("CVMFS_IPV4_ONLY")) > 0)) {
1205  opt_ipv4_only_ = true;
1206  } else {
1207  opt_ipv4_only_ = false;
1208  }
1209 
1211 
// The poll descriptor array starts with room for 4 entries, none of them in use
1212  watch_fds_ = static_cast<struct pollfd *>(smalloc(4 * sizeof(struct pollfd)));
1213  watch_fds_size_ = 4;
1214  watch_fds_inuse_ = 0;
1215 
1217 }
1218 
// Teardown of the manager: destroys the locks, stops the upload thread if one
// is running, releases the idle curl handles and the share handles, deletes
// the bookkeeping containers and shuts libcurl down.
// NOTE(review): the header line of this function and source lines 1231-1233
// are not shown in this listing; presumably this is the destructor -- confirm
// against the original file.
1220  pthread_mutex_destroy(jobs_todo_lock_);
1221  free(jobs_todo_lock_);
1222  pthread_mutex_destroy(curl_handle_lock_);
1223  free(curl_handle_lock_);
1224 
// atomic_xadd32 with an addend of 0 is used to read multi_threaded_ without
// changing it
1225  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1226  // Shutdown I/O thread
1227  char buf = 'T';
1228  WritePipe(pipe_terminate_[1], &buf, 1);
1229  pthread_join(thread_upload_, NULL);
1230  }
1234 
// Release every curl easy handle in the idle pool
1235  set<CURL *>::iterator i = pool_handles_idle_->begin();
1236  const set<CURL *>::const_iterator iEnd = pool_handles_idle_->end();
1237  for (; i != iEnd; ++i) {
1238  curl_easy_cleanup(*i);
1239  }
1240 
// Release the curl share handle, the string list and the entry object of
// every S3FanOutDnsEntry
1241  set<S3FanOutDnsEntry *>::iterator is = sharehandles_->begin();
1242  const set<S3FanOutDnsEntry *>::const_iterator isEnd = sharehandles_->end();
1243  for (; is != isEnd; ++is) {
1244  curl_share_cleanup((*is)->sharehandle);
1245  curl_slist_free_all((*is)->clist);
1246  delete *is;
1247  }
1248  pool_handles_idle_->clear();
1249  curl_sharehandles_->clear();
1250  sharehandles_->clear();
1251  delete active_requests_;
1252  delete pool_handles_idle_;
1253  delete pool_handles_inuse_;
1254  delete curl_sharehandles_;
1255  delete sharehandles_;
1256  delete user_agent_;
1257  curl_multi_cleanup(curl_multi_);
1258 
1259  delete statistics_;
1260 
1261  delete available_jobs_;
1262 
// Counterpart of the curl_global_init() call made during construction
1263  curl_global_cleanup();
1264 }
1265 
// Starts the upload thread: MainUpload runs in a new thread with this manager
// as its argument, and multi_threaded_ is incremented afterwards.
// NOTE(review): the header line of this function is not shown in this listing.
1270  LogCvmfs(kLogS3Fanout, kLogDebug, "S3FanoutManager spawned");
1271 
1272  int retval = pthread_create(&thread_upload_, NULL, MainUpload,
1273  static_cast<void *>(this));
1274  assert(retval == 0);
1275 
1276  atomic_inc32(&multi_threaded_);
1277 }
1278 
1280 
// Submits a job by writing the value of `info` itself (sizeof(info) bytes)
// to the write end of pipe_jobs_.
// NOTE(review): the header line of this function is not shown in this listing.
1286  WritePipe(pipe_jobs_[1], &info, sizeof(info));
1287 }
1288 
// Reports a finished job by writing the value of `info` itself (sizeof(info)
// bytes) to the write end of pipe_completed_.
// NOTE(review): the header line of this function is not shown in this listing.
1293  WritePipe(pipe_completed_[1], &info, sizeof(info));
1294 }
1295 
// Fetches the next finished job: reads one JobInfo pointer from the read end
// of pipe_completed_ and returns it.
// NOTE(review): the header line of this function is not shown in this listing.
1300  JobInfo *info;
1301  ReadPipe(pipe_completed_[0], &info, sizeof(info));
1302  return info;
1303 }
1304 
1305 //------------------------------------------------------------------------------
1306 
1307 
1308 string Statistics::Print() const {
1309  return "Transferred Bytes: " + StringifyInt(uint64_t(transferred_bytes))
1310  + "\n" + "Transfer duration: " + StringifyInt(uint64_t(transfer_time))
1311  + " s\n" + "Number of requests: " + StringifyInt(num_requests) + "\n"
1312  + "Number of retries: " + StringifyInt(num_retries) + "\n";
1313 }
1314 
1315 } // namespace s3fanout
unsigned throttle_ms
Definition: s3fanout.h:146
const S3Config config_
Definition: s3fanout.h:277
const char * Code2Ascii(const ObjectFetcherFailures::Failures error)
pthread_mutex_t * jobs_todo_lock_
Definition: s3fanout.h:237
atomic_int32 multi_threaded_
Definition: s3fanout.h:301
std::string ip
Definition: s3fanout.h:162
pthread_mutex_t * curl_handle_lock_
Definition: s3fanout.h:238
string Sha256String(const string &content)
Definition: hash.cc:451
std::set< CURL * > * pool_handles_idle_
Definition: s3fanout.h:286
struct curl_slist * http_headers
Definition: s3fanout.h:137
static size_t CallbackCurlBody(char *, size_t size, size_t nmemb, void *)
Definition: s3fanout.cc:152
#define PANIC(...)
Definition: exception.h:29
std::string IsoTimestamp()
Definition: string.cc:167
dns::CaresResolver * resolver_
Definition: s3fanout.h:290
string Trim(const string &raw, bool trim_newline)
Definition: string.cc:466
CURLSH * sharehandle
Definition: s3fanout.h:165
double transferred_bytes
Definition: s3fanout.h:75
void InitLocaltime()
Definition: prng.h:34
bool IsHttpUrl(const std::string &path)
Definition: posix.cc:167
void Hmac(const string &key, const unsigned char *buffer, const unsigned buffer_size, Any *any_digest)
Definition: hash.cc:275
perf::Statistics * statistics_
Definition: repository.h:138
static void * MainUpload(void *data)
Definition: s3fanout.cc:233
std::string dns_name
Definition: s3fanout.h:161
const std::string object_key
Definition: s3fanout.h:107
assert((mem||(size==0))&&"Out Of Memory")
const std::set< std::string > & ipv4_addresses() const
Definition: dns.h:111
std::string MkCompleteHostname()
Definition: s3fanout.h:269
void PushNewJob(JobInfo *info)
Definition: s3fanout.cc:1284
Algorithms algorithm
Definition: hash.h:122
SynchronizingCounter< uint32_t > Semaphore
Definition: s3fanout.h:171
unsigned int counter
Definition: s3fanout.h:160
std::string Print() const
Definition: s3fanout.cc:1308
bool Debase64(const string &data, string *decoded)
Definition: string.cc:598
void MakePipe(int pipe_fd[2])
Definition: posix.cc:487
unsigned char digest[digest_size_]
Definition: hash.h:121
struct curl_slist * clist
Definition: s3fanout.h:164
std::string * user_agent_
Definition: s3fanout.h:292
Failures error_code
Definition: s3fanout.h:140
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:139
unsigned backoff_ms
Definition: s3fanout.h:144
unsigned int max_available_jobs_
Definition: s3fanout.h:322
std::string RfcTimestamp()
Definition: string.cc:145
int64_t String2Int64(const string &value)
Definition: string.cc:234
unsigned GetDigestSize() const
Definition: hash.h:164
std::set< JobInfo * > * active_requests_
Definition: s3fanout.h:284
void PushCompletedJob(JobInfo *info)
Definition: s3fanout.cc:1292
string Sha256Mem(const unsigned char *buffer, const unsigned buffer_size)
Definition: hash.cc:441
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:306
uint64_t payload_size
Definition: s3fanout.h:138
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:254
bool HasSuffix(const std::string &str, const std::string &suffix, const bool ignore_case)
Definition: string.cc:296
void UseSystemCertificatePath()
Definition: ssl.cc:71
void SetUrlOptions(JobInfo *info) const
Definition: s3fanout.cc:953
Definition: dns.h:90
CURL * AcquireCurlHandle() const
Definition: s3fanout.cc:377
uint64_t throttle_timestamp
Definition: s3fanout.h:148
Semaphore * available_jobs_
Definition: s3fanout.h:323
static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp)
Definition: s3fanout.cc:161
UniquePtr< FileBackedBuffer > origin
Definition: s3fanout.h:109
void ReleaseCurlHandle(JobInfo *info, CURL *handle) const
Definition: s3fanout.cc:410
char * errorbuffer
Definition: s3fanout.h:149
string StringifyInt(const int64_t value)
Definition: string.cc:77
struct pollfd * watch_fds_
Definition: s3fanout.h:303
uint64_t platform_monotonic_time()
std::string ExtractPort(const std::string &url)
Definition: dns.cc:123
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:279
JobInfo * PopCompletedJob()
Definition: s3fanout.cc:1299
std::string ExtractHost(const std::string &url)
Definition: dns.cc:108
unsigned char num_retries
Definition: s3fanout.h:142
std::string port
Definition: s3fanout.h:163
void HashMem(const unsigned char *buffer, const unsigned buffer_size, Any *any_digest)
Definition: hash.cc:257
std::string complete_hostname_
Definition: s3fanout.h:278
uint64_t num_requests
Definition: s3fanout.h:77
string Base64(const string &data)
Definition: string.cc:537
RequestType request
Definition: s3fanout.h:139
uint64_t String2Uint64(const string &value)
Definition: string.cc:240
double transfer_time
Definition: s3fanout.h:76
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
Definition: s3fanout.cc:1053
uint64_t num_retries
Definition: s3fanout.h:78
Definition: mutex.h:42
static CaresResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
Definition: dns.cc:710
std::set< S3FanOutDnsEntry * > * sharehandles_
Definition: s3fanout.h:288
SslCertificateStore ssl_certificate_store_
Definition: s3fanout.h:337
const Statistics & GetStatistics()
Definition: s3fanout.cc:1279
std::string Hmac256(const std::string &key, const std::string &content, bool raw_output)
Definition: hash.cc:457
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:2024
Statistics * statistics_
Definition: s3fanout.h:327
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:496
uint64_t timestamp_last_throttle_report_
Definition: s3fanout.h:330
static void size_t size
Definition: smalloc.h:54
std::set< CURL * > * pool_handles_inuse_
Definition: s3fanout.h:287
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:508
Definition: s3fanout.h:152
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:559
std::map< CURL *, S3FanOutDnsEntry * > * curl_sharehandles_
Definition: s3fanout.h:289
CURL * curl_handle
Definition: s3fanout.h:136
void Destroy()
Definition: pointer.h:53
Failures InitializeRequest(JobInfo *info, CURL *handle) const
Definition: s3fanout.cc:827
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545