CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
s3fanout.cc
Go to the documentation of this file.
1 
7 #include "s3fanout.h"
8 
9 #include <pthread.h>
10 
11 #include <algorithm>
12 #include <cassert>
13 #include <cerrno>
14 #include <utility>
15 
16 #include "upload_facility.h"
17 #include "util/concurrency.h"
18 #include "util/exception.h"
19 #include "util/platform.h"
20 #include "util/posix.h"
21 #include "util/string.h"
22 
23 using namespace std; // NOLINT
24 
25 namespace s3fanout {
26 
// Cache-Control header appended to kReqPutCas uploads; 259200 s equals 3 days.
27 const char *S3FanoutManager::kCacheControlCas = "Cache-Control: max-age=259200";
// Cache-Control header appended to kReqPutDotCvmfs uploads; 61 s.
28 const char
29  *S3FanoutManager::kCacheControlDotCvmfs = "Cache-Control: max-age=61";
// Throttle time (ms) assumed on an HTTP 429 response before any hint header
// has been parsed.
30 const unsigned S3FanoutManager::kDefault429ThrottleMs = 250;
// Upper bound (ms) applied to throttle hints parsed from response headers.
31 const unsigned S3FanoutManager::kMax429ThrottleMs = 10000;
// Minimum number of seconds between two throttling reports in Backoff().
32 const unsigned S3FanoutManager::kThrottleReportIntervalSec = 10;
// Ports that MkV4Authz() leaves out of the canonical host name.
33 const unsigned S3FanoutManager::kDefaultHTTPPort = 80;
34 const unsigned S3FanoutManager::kDefaultHTTPSPort = 443;
35 
36 
40 void S3FanoutManager::DetectThrottleIndicator(const std::string &header,
41  JobInfo *info) {
42  std::string value_str;
43  if (HasPrefix(header, "retry-after:", true))
44  value_str = header.substr(12);
45  if (HasPrefix(header, "x-retry-in:", true))
46  value_str = header.substr(11);
47 
48  value_str = Trim(value_str, true /* trim_newline */);
49  if (!value_str.empty()) {
50  const unsigned value_numeric = String2Uint64(value_str);
51  const unsigned value_ms = HasSuffix(value_str, "ms", true /* ignore_case */)
52  ? value_numeric
53  : (value_numeric * 1000);
54  if (value_ms > 0)
55  info->throttle_ms = std::min(value_ms, kMax429ThrottleMs);
56  }
57 }
58 
59 
63 static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
64  void *info_link) {
65  const size_t num_bytes = size * nmemb;
66  const string header_line(static_cast<const char *>(ptr), num_bytes);
67  JobInfo *info = static_cast<JobInfo *>(info_link);
68 
69  // Check for http status code errors
70  if (HasPrefix(header_line, "HTTP/1.", false)) {
71  if (header_line.length() < 10)
72  return 0;
73 
74  unsigned i;
75  for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {
76  }
77 
78  if (header_line[i] == '2') {
79  return num_bytes;
80  } else {
81  LogCvmfs(kLogS3Fanout, kLogDebug, "http status error code [info %p]: %s",
82  info, header_line.c_str());
83  if (header_line.length() < i + 3) {
84  LogCvmfs(kLogS3Fanout, kLogStderr, "S3: invalid HTTP response '%s'",
85  header_line.c_str());
86  info->error_code = kFailOther;
87  return 0;
88  }
89  info->http_error = String2Int64(string(&header_line[i], 3));
90 
91  switch (info->http_error) {
92  case 429:
93  info->error_code = kFailRetry;
94  info->throttle_ms = S3FanoutManager::kDefault429ThrottleMs;
96  return num_bytes;
97  case 503:
98  case 502: // Can happen if the S3 gateway-backend connection breaks
99  case 500: // sometimes see this as a transient error from S3
101  break;
102  case 501:
103  case 400:
104  info->error_code = kFailBadRequest;
105  break;
106  case 403:
107  info->error_code = kFailForbidden;
108  break;
109  case 404:
110  info->error_code = kFailNotFound;
111  return num_bytes;
112  default:
113  info->error_code = kFailOther;
114  }
115  return 0;
116  }
117  }
118 
119  if (info->error_code == kFailRetry) {
120  S3FanoutManager::DetectThrottleIndicator(header_line, info);
121  }
122 
123  return num_bytes;
124 }
125 
126 
130 static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
131  void *info_link) {
132  const size_t num_bytes = size * nmemb;
133  JobInfo *info = static_cast<JobInfo *>(info_link);
134 
135  LogCvmfs(kLogS3Fanout, kLogDebug, "Data callback with %zu bytes", num_bytes);
136 
137  if (num_bytes == 0)
138  return 0;
139 
140  const uint64_t read_bytes = info->origin->Read(ptr, num_bytes);
141 
142  LogCvmfs(kLogS3Fanout, kLogDebug, "source buffer pushed out %lu bytes",
143  read_bytes);
144 
145  return read_bytes;
146 }
147 
148 
/**
 * libcurl write callback that discards the response body: it reports every
 * byte as consumed without looking at the data.
 */
static size_t CallbackCurlBody(char * /*ptr*/, size_t size, size_t nmemb,
                               void * /*userdata*/) {
  const size_t consumed = size * nmemb;
  return consumed;
}
156 
157 
161 int S3FanoutManager::CallbackCurlSocket(CURL *easy, curl_socket_t s, int action,
162  void *userp, void *socketp) {
163  S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(userp);
165  "CallbackCurlSocket called with easy "
166  "handle %p, socket %d, action %d, up %p, "
167  "sp %p, fds_inuse %d, jobs %d",
168  easy, s, action, userp, socketp, s3fanout_mgr->watch_fds_inuse_,
169  s3fanout_mgr->available_jobs_->Get());
170  if (action == CURL_POLL_NONE)
171  return 0;
172 
173  // Find s in watch_fds_
174  // First 2 fds are job and terminate pipes (not curl related)
175  unsigned index;
176  for (index = 2; index < s3fanout_mgr->watch_fds_inuse_; ++index) {
177  if (s3fanout_mgr->watch_fds_[index].fd == s)
178  break;
179  }
180  // Or create newly
181  if (index == s3fanout_mgr->watch_fds_inuse_) {
182  // Extend array if necessary
183  if (s3fanout_mgr->watch_fds_inuse_ == s3fanout_mgr->watch_fds_size_) {
184  s3fanout_mgr->watch_fds_size_ *= 2;
185  s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
186  srealloc(s3fanout_mgr->watch_fds_,
187  s3fanout_mgr->watch_fds_size_ * sizeof(struct pollfd)));
188  }
189  s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].fd = s;
190  s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].events = 0;
191  s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].revents = 0;
192  s3fanout_mgr->watch_fds_inuse_++;
193  }
194 
195  switch (action) {
196  case CURL_POLL_IN:
197  s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
198  break;
199  case CURL_POLL_OUT:
200  s3fanout_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
201  break;
202  case CURL_POLL_INOUT:
203  s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI | POLLOUT
204  | POLLWRBAND;
205  break;
206  case CURL_POLL_REMOVE:
207  if (index < s3fanout_mgr->watch_fds_inuse_ - 1)
208  s3fanout_mgr
209  ->watch_fds_[index] = s3fanout_mgr->watch_fds_
210  [s3fanout_mgr->watch_fds_inuse_ - 1];
211  s3fanout_mgr->watch_fds_inuse_--;
212  // Shrink array if necessary
213  if ((s3fanout_mgr->watch_fds_inuse_ > s3fanout_mgr->watch_fds_max_)
214  && (s3fanout_mgr->watch_fds_inuse_
215  < s3fanout_mgr->watch_fds_size_ / 2)) {
216  s3fanout_mgr->watch_fds_size_ /= 2;
217  s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
218  srealloc(s3fanout_mgr->watch_fds_,
219  s3fanout_mgr->watch_fds_size_ * sizeof(struct pollfd)));
220  }
221  break;
222  default:
223  PANIC(NULL);
224  }
225 
226  return 0;
227 }
228 
229 
233 void *S3FanoutManager::MainUpload(void *data) {
234  LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread started");
235  S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(data);
236 
237  s3fanout_mgr->InitPipeWatchFds();
238 
239  // Don't schedule more jobs into the multi handle than the maximum number of
240  // parallel connections. This should prevent starvation and thus a timeout
241  // of the authorization header (CVM-1339).
242  unsigned jobs_in_flight = 0;
243 
244  while (true) {
245  // Check events with 100ms timeout
246  const int timeout_ms = 100;
247  int retval = poll(s3fanout_mgr->watch_fds_, s3fanout_mgr->watch_fds_inuse_,
248  timeout_ms);
249  if (retval == 0) {
250  // Handle timeout
251  int still_running = 0;
252  retval = curl_multi_socket_action(
253  s3fanout_mgr->curl_multi_, CURL_SOCKET_TIMEOUT, 0, &still_running);
254  if (retval != CURLM_OK) {
255  LogCvmfs(kLogS3Fanout, kLogStderr, "Error, timeout due to: %d", retval);
256  assert(retval == CURLM_OK);
257  }
258  } else if (retval < 0) {
259  assert(errno == EINTR);
260  continue;
261  }
262 
263  // Terminate I/O thread
264  if (s3fanout_mgr->watch_fds_[0].revents)
265  break;
266 
267  // New job incoming
268  if (s3fanout_mgr->watch_fds_[1].revents) {
269  s3fanout_mgr->watch_fds_[1].revents = 0;
270  JobInfo *info;
271  ReadPipe(s3fanout_mgr->pipe_jobs_[0], &info, sizeof(info));
272  CURL *handle = s3fanout_mgr->AcquireCurlHandle();
273  if (handle == NULL) {
274  PANIC(kLogStderr, "Failed to acquire CURL handle.");
275  }
276  const s3fanout::Failures init_failure =
277  s3fanout_mgr->InitializeRequest(info, handle);
278  if (init_failure != s3fanout::kFailOk) {
280  "Failed to initialize CURL handle (error: %d - %s | errno: %d)",
281  init_failure, Code2Ascii(init_failure), errno);
282  }
283  s3fanout_mgr->SetUrlOptions(info);
284 
285  curl_multi_add_handle(s3fanout_mgr->curl_multi_, handle);
286  s3fanout_mgr->active_requests_->insert(info);
287  jobs_in_flight++;
288  int still_running = 0, retval = 0;
289  retval = curl_multi_socket_action(
290  s3fanout_mgr->curl_multi_, CURL_SOCKET_TIMEOUT, 0, &still_running);
291 
292  LogCvmfs(kLogS3Fanout, kLogDebug, "curl_multi_socket_action: %d - %d",
293  retval, still_running);
294  }
295 
296 
297  // Activity on curl sockets
298  // Within this loop the curl_multi_socket_action() may cause socket(s)
299  // to be removed from watch_fds_. If a socket is removed it is replaced
300  // by the socket at the end of the array and the inuse count is decreased.
301  // Therefore loop over the array in reverse order.
302  // First 2 fds are job and terminate pipes (not curl related)
303  for (int32_t i = s3fanout_mgr->watch_fds_inuse_ - 1; i >= 2; --i) {
304  if (static_cast<uint32_t>(i) >= s3fanout_mgr->watch_fds_inuse_) {
305  continue;
306  }
307  if (s3fanout_mgr->watch_fds_[i].revents) {
308  int ev_bitmask = 0;
309  if (s3fanout_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
310  ev_bitmask |= CURL_CSELECT_IN;
311  if (s3fanout_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
312  ev_bitmask |= CURL_CSELECT_OUT;
313  if (s3fanout_mgr->watch_fds_[i].revents
314  & (POLLERR | POLLHUP | POLLNVAL))
315  ev_bitmask |= CURL_CSELECT_ERR;
316  s3fanout_mgr->watch_fds_[i].revents = 0;
317 
318  int still_running = 0;
319  retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
320  s3fanout_mgr->watch_fds_[i].fd,
321  ev_bitmask,
322  &still_running);
323  }
324  }
325 
326  // Check if transfers are completed
327  CURLMsg *curl_msg;
328  int msgs_in_queue;
329  while ((curl_msg = curl_multi_info_read(s3fanout_mgr->curl_multi_,
330  &msgs_in_queue))) {
331  assert(curl_msg->msg == CURLMSG_DONE);
332 
333  s3fanout_mgr->statistics_->num_requests++;
334  JobInfo *info;
335  CURL *easy_handle = curl_msg->easy_handle;
336  const int curl_error = curl_msg->data.result;
337  curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
338 
339  curl_multi_remove_handle(s3fanout_mgr->curl_multi_, easy_handle);
340  if (s3fanout_mgr->VerifyAndFinalize(curl_error, info)) {
341  curl_multi_add_handle(s3fanout_mgr->curl_multi_, easy_handle);
342  int still_running = 0;
343  curl_multi_socket_action(
344  s3fanout_mgr->curl_multi_, CURL_SOCKET_TIMEOUT, 0, &still_running);
345  } else {
346  // Return easy handle into pool and write result back
347  jobs_in_flight--;
348  s3fanout_mgr->active_requests_->erase(info);
349  s3fanout_mgr->ReleaseCurlHandle(info, easy_handle);
350  s3fanout_mgr->available_jobs_->Decrement();
351 
352  // Add to list of completed jobs
353  s3fanout_mgr->PushCompletedJob(info);
354  }
355  }
356  }
357 
358  set<CURL *>::iterator i = s3fanout_mgr->pool_handles_inuse_->begin();
359  const set<CURL *>::const_iterator i_end = s3fanout_mgr->pool_handles_inuse_
360  ->end();
361  for (; i != i_end; ++i) {
362  curl_multi_remove_handle(s3fanout_mgr->curl_multi_, *i);
363  curl_easy_cleanup(*i);
364  }
365  s3fanout_mgr->pool_handles_inuse_->clear();
366  free(s3fanout_mgr->watch_fds_);
367 
368  LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread terminated");
369  return NULL;
370 }
371 
372 
377 CURL *S3FanoutManager::AcquireCurlHandle() const {
378  CURL *handle;
379 
380  const MutexLockGuard guard(curl_handle_lock_);
381 
382  if (pool_handles_idle_->empty()) {
383  CURLcode retval;
384 
385  // Create a new handle
386  handle = curl_easy_init();
387  assert(handle != NULL);
388 
389  // Other settings
390  retval = curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
391  assert(retval == CURLE_OK);
392  retval = curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION,
394  assert(retval == CURLE_OK);
395  retval = curl_easy_setopt(handle, CURLOPT_READFUNCTION, CallbackCurlData);
396  assert(retval == CURLE_OK);
397  retval = curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlBody);
398  assert(retval == CURLE_OK);
399  } else {
400  handle = *(pool_handles_idle_->begin());
401  pool_handles_idle_->erase(pool_handles_idle_->begin());
402  }
403 
404  pool_handles_inuse_->insert(handle);
405 
406  return handle;
407 }
408 
409 
410 void S3FanoutManager::ReleaseCurlHandle(JobInfo *info, CURL *handle) const {
411  if (info->http_headers) {
412  curl_slist_free_all(info->http_headers);
413  info->http_headers = NULL;
414  }
415 
416  const MutexLockGuard guard(curl_handle_lock_);
417 
418  const set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
419  assert(elem != pool_handles_inuse_->end());
420 
421  if (pool_handles_idle_->size() > config_.pool_max_handles) {
422  const CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, NULL);
423  assert(retval == CURLE_OK);
424  curl_easy_cleanup(handle);
425  const std::map<CURL *, S3FanOutDnsEntry *>::size_type retitems =
426  curl_sharehandles_->erase(handle);
427  assert(retitems == 1);
428  } else {
429  pool_handles_idle_->insert(handle);
430  }
431 
432  pool_handles_inuse_->erase(elem);
433 }
434 
435 void S3FanoutManager::InitPipeWatchFds() {
436  assert(watch_fds_inuse_ == 0);
437  assert(watch_fds_size_ >= 2);
438  watch_fds_[0].fd = pipe_terminate_[0];
439  watch_fds_[0].events = POLLIN | POLLPRI;
440  watch_fds_[0].revents = 0;
441  ++watch_fds_inuse_;
442  watch_fds_[1].fd = pipe_jobs_[0];
443  watch_fds_[1].events = POLLIN | POLLPRI;
444  watch_fds_[1].revents = 0;
445  ++watch_fds_inuse_;
446 }
447 
452 bool S3FanoutManager::MkV2Authz(const JobInfo &info,
453  vector<string> *headers) const {
454  string payload_hash;
455  const bool retval = MkPayloadHash(info, &payload_hash);
456  if (!retval)
457  return false;
458  const string content_type = GetContentType(info);
459  const string request = GetRequestString(info);
460 
461  const string timestamp = RfcTimestamp();
462  string to_sign = request + "\n" + payload_hash + "\n" + content_type + "\n"
463  + timestamp + "\n";
464  if (config_.x_amz_acl != "") {
465  to_sign += "x-amz-acl:" + config_.x_amz_acl + "\n" + // default ACL
466  "/" + config_.bucket + "/" + info.object_key;
467  }
468  LogCvmfs(kLogS3Fanout, kLogDebug, "%s string to sign for: %s",
469  request.c_str(), info.object_key.c_str());
470 
471  shash::Any hmac;
472  hmac.algorithm = shash::kSha1;
473  shash::Hmac(config_.secret_key,
474  reinterpret_cast<const unsigned char *>(to_sign.data()),
475  to_sign.length(), &hmac);
476 
477  headers->push_back("Authorization: AWS " + config_.access_key + ":"
478  + Base64(string(reinterpret_cast<char *>(hmac.digest),
479  hmac.GetDigestSize())));
480  headers->push_back("Date: " + timestamp);
481  headers->push_back("X-Amz-Acl: " + config_.x_amz_acl);
482  if (!payload_hash.empty())
483  headers->push_back("Content-MD5: " + payload_hash);
484  if (!content_type.empty())
485  headers->push_back("Content-Type: " + content_type);
486  return true;
487 }
488 
489 
490 string S3FanoutManager::GetUriEncode(const string &val,
491  bool encode_slash) const {
492  string result;
493  const unsigned len = val.length();
494  result.reserve(len);
495  for (unsigned i = 0; i < len; ++i) {
496  const char c = val[i];
497  if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
498  || (c >= '0' && c <= '9') || c == '_' || c == '-' || c == '~'
499  || c == '.') {
500  result.push_back(c);
501  } else if (c == '/') {
502  if (encode_slash) {
503  result += "%2F";
504  } else {
505  result.push_back(c);
506  }
507  } else {
508  result.push_back('%');
509  result.push_back((c / 16) + ((c / 16 <= 9) ? '0' : 'A' - 10));
510  result.push_back((c % 16) + ((c % 16 <= 9) ? '0' : 'A' - 10));
511  }
512  }
513  return result;
514 }
515 
516 
517 string S3FanoutManager::GetAwsV4SigningKey(const string &date) const {
518  if (last_signing_key_.first == date)
519  return last_signing_key_.second;
520 
521  const string date_key =
522  shash::Hmac256("AWS4" + config_.secret_key, date, true);
523  const string date_region_key = shash::Hmac256(date_key, config_.region, true);
524  const string date_region_service_key =
525  shash::Hmac256(date_region_key, "s3", true);
526  string signing_key = shash::Hmac256(date_region_service_key, "aws4_request",
527  true);
528  last_signing_key_.first = date;
529  last_signing_key_.second = signing_key;
530  return signing_key;
531 }
532 
533 
538 bool S3FanoutManager::MkV4Authz(const JobInfo &info,
539  vector<string> *headers) const {
540  string payload_hash;
541  const bool retval = MkPayloadHash(info, &payload_hash);
542  if (!retval)
543  return false;
544  const string content_type = GetContentType(info);
545  const string timestamp = IsoTimestamp();
546  const string date = timestamp.substr(0, 8);
547  vector<string> tokens = SplitString(complete_hostname_, ':');
548  assert(tokens.size() <= 2);
549  string canonical_hostname = tokens[0];
550 
551  // if we could split the hostname in two and if the port is *NOT* a default
552  // one
553  if (tokens.size() == 2
554  && !((String2Uint64(tokens[1]) == kDefaultHTTPPort)
555  || (String2Uint64(tokens[1]) == kDefaultHTTPSPort)))
556  canonical_hostname += ":" + tokens[1];
557 
558  string signed_headers;
559  string canonical_headers;
560  if (!content_type.empty()) {
561  signed_headers += "content-type;";
562  headers->push_back("Content-Type: " + content_type);
563  canonical_headers += "content-type:" + content_type + "\n";
564  }
565  if (config_.x_amz_acl != "") {
566  signed_headers += "host;x-amz-acl;x-amz-content-sha256;x-amz-date";
567  } else {
568  signed_headers += "host;x-amz-content-sha256;x-amz-date";
569  }
570  canonical_headers += "host:" + canonical_hostname + "\n";
571  if (config_.x_amz_acl != "") {
572  canonical_headers += "x-amz-acl:" + config_.x_amz_acl + "\n";
573  }
574  canonical_headers += "x-amz-content-sha256:" + payload_hash + "\n"
575  + "x-amz-date:" + timestamp + "\n";
576 
577  const string scope = date + "/" + config_.region + "/s3/aws4_request";
578  const string uri =
579  config_.dns_buckets
580  ? (string("/") + info.object_key)
581  : (string("/") + config_.bucket + "/" + info.object_key);
582 
583  const string canonical_request =
584  GetRequestString(info) + "\n" + GetUriEncode(uri, false) + "\n" + "\n" +
585  canonical_headers + "\n" + signed_headers + "\n" + payload_hash;
586 
587  const string hash_request = shash::Sha256String(canonical_request.c_str());
588 
589  const string string_to_sign =
590  "AWS4-HMAC-SHA256\n" + timestamp + "\n" + scope + "\n" + hash_request;
591 
592  const string signing_key = GetAwsV4SigningKey(date);
593  const string signature = shash::Hmac256(signing_key, string_to_sign);
594 
595  headers->push_back("X-Amz-Acl: " + config_.x_amz_acl);
596  headers->push_back("X-Amz-Content-Sha256: " + payload_hash);
597  headers->push_back("X-Amz-Date: " + timestamp);
598  headers->push_back("Authorization: AWS4-HMAC-SHA256 "
599  "Credential="
600  + config_.access_key + "/" + scope
601  + ","
602  "SignedHeaders="
603  + signed_headers
604  + ","
605  "Signature="
606  + signature);
607  return true;
608 }
609 
614 bool S3FanoutManager::MkAzureAuthz(const JobInfo &info,
615  vector<string> *headers) const {
616  const string timestamp = RfcTimestamp();
617  const string canonical_headers =
618  "x-ms-blob-type:BlockBlob\nx-ms-date:" + timestamp +
619  "\nx-ms-version:2011-08-18";
620  const string canonical_resource =
621  "/" + config_.access_key + "/" + config_.bucket + "/" + info.object_key;
622 
623  string string_to_sign;
624  if ((info.request == JobInfo::kReqHeadOnly)
625  || (info.request == JobInfo::kReqHeadPut)
626  || (info.request == JobInfo::kReqDelete)) {
627  string_to_sign = GetRequestString(info) + string("\n\n\n")
628  + "\n\n\n\n\n\n\n\n\n" + canonical_headers + "\n"
629  + canonical_resource;
630  } else {
631  string_to_sign = GetRequestString(info) + string("\n\n\n")
632  + string(StringifyInt(info.origin->GetSize()))
633  + "\n\n\n\n\n\n\n\n\n" + canonical_headers + "\n"
634  + canonical_resource;
635  }
636 
637  string signing_key;
638  const int retval = Debase64(config_.secret_key, &signing_key);
639  if (!retval)
640  return false;
641 
642  const string signature = shash::Hmac256(signing_key, string_to_sign, true);
643 
644  headers->push_back("x-ms-date: " + timestamp);
645  headers->push_back("x-ms-version: 2011-08-18");
646  headers->push_back("Authorization: SharedKey " + config_.access_key + ":"
647  + Base64(signature));
648  headers->push_back("x-ms-blob-type: BlockBlob");
649  return true;
650 }
651 
652 void S3FanoutManager::InitializeDnsSettingsCurl(CURL *handle,
653  CURLSH *sharehandle,
654  curl_slist *clist) const {
655  CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, sharehandle);
656  assert(retval == CURLE_OK);
657  retval = curl_easy_setopt(handle, CURLOPT_RESOLVE, clist);
658  assert(retval == CURLE_OK);
659 }
660 
661 
662 int S3FanoutManager::InitializeDnsSettings(CURL *handle,
663  std::string host_with_port) const {
664  // Use existing handle
665  const std::map<CURL *, S3FanOutDnsEntry *>::const_iterator it =
666  curl_sharehandles_->find(handle);
667  if (it != curl_sharehandles_->end()) {
668  InitializeDnsSettingsCurl(handle, it->second->sharehandle,
669  it->second->clist);
670  return 0;
671  }
672 
673  // Add protocol information for extraction of fields for DNS
674  if (!IsHttpUrl(host_with_port))
675  host_with_port = config_.protocol + "://" + host_with_port;
676  const std::string remote_host = dns::ExtractHost(host_with_port);
677  const std::string remote_port = dns::ExtractPort(host_with_port);
678 
679  // If we have the name already resolved, use the least used IP
680  S3FanOutDnsEntry *useme = NULL;
681  unsigned int usemin = UINT_MAX;
682  std::set<S3FanOutDnsEntry *>::iterator its3 = sharehandles_->begin();
683  for (; its3 != sharehandles_->end(); ++its3) {
684  if ((*its3)->dns_name == remote_host) {
685  if (usemin >= (*its3)->counter) {
686  usemin = (*its3)->counter;
687  useme = (*its3);
688  }
689  }
690  }
691  if (useme != NULL) {
692  curl_sharehandles_->insert(
693  std::pair<CURL *, S3FanOutDnsEntry *>(handle, useme));
694  useme->counter++;
695  InitializeDnsSettingsCurl(handle, useme->sharehandle, useme->clist);
696  return 0;
697  }
698 
699  // We need to resolve the hostname
700  // TODO(ssheikki): support ipv6 also... if (opt_ipv4_only_)
701  const dns::Host host = resolver_->Resolve(remote_host);
702  set<string> ipv4_addresses = host.ipv4_addresses();
703  std::set<string>::iterator its = ipv4_addresses.begin();
704  S3FanOutDnsEntry *dnse = NULL;
705  for (; its != ipv4_addresses.end(); ++its) {
706  dnse = new S3FanOutDnsEntry();
707  dnse->counter = 0;
708  dnse->dns_name = remote_host;
709  dnse->port = remote_port.size() == 0 ? "80" : remote_port;
710  dnse->ip = *its;
711  dnse->clist = NULL;
712  dnse->clist = curl_slist_append(
713  dnse->clist,
714  (dnse->dns_name + ":" + dnse->port + ":" + dnse->ip).c_str());
715  dnse->sharehandle = curl_share_init();
716  assert(dnse->sharehandle != NULL);
717  const CURLSHcode share_retval = curl_share_setopt(
718  dnse->sharehandle, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS);
719  assert(share_retval == CURLSHE_OK);
720  sharehandles_->insert(dnse);
721  }
722  if (dnse == NULL) {
724  "Error: DNS resolve failed for address '%s'.",
725  remote_host.c_str());
726  assert(dnse != NULL);
727  return -1;
728  }
729  curl_sharehandles_->insert(
730  std::pair<CURL *, S3FanOutDnsEntry *>(handle, dnse));
731  dnse->counter++;
732  InitializeDnsSettingsCurl(handle, dnse->sharehandle, dnse->clist);
733 
734  return 0;
735 }
736 
737 
738 bool S3FanoutManager::MkPayloadHash(const JobInfo &info,
739  string *hex_hash) const {
740  if ((info.request == JobInfo::kReqHeadOnly)
741  || (info.request == JobInfo::kReqHeadPut)
742  || (info.request == JobInfo::kReqDelete)) {
743  switch (config_.authz_method) {
744  case kAuthzAwsV2:
745  hex_hash->clear();
746  break;
747  case kAuthzAwsV4:
748  // Sha256 over empty string
749  *hex_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b78"
750  "52b855";
751  break;
752  case kAuthzAzure:
753  // no payload hash required for Azure signature
754  hex_hash->clear();
755  break;
756  default:
757  PANIC(NULL);
758  }
759  return true;
760  }
761 
762  // PUT, there is actually payload
763  shash::Any payload_hash(shash::kMd5);
764 
765  unsigned char *data;
766  const unsigned int nbytes = info.origin->Data(
767  reinterpret_cast<void **>(&data), info.origin->GetSize(), 0);
768  assert(nbytes == info.origin->GetSize());
769 
770  switch (config_.authz_method) {
771  case kAuthzAwsV2:
772  shash::HashMem(data, nbytes, &payload_hash);
773  *hex_hash = Base64(string(reinterpret_cast<char *>(payload_hash.digest),
774  payload_hash.GetDigestSize()));
775  return true;
776  case kAuthzAwsV4:
777  *hex_hash = shash::Sha256Mem(data, nbytes);
778  return true;
779  case kAuthzAzure:
780  // no payload hash required for Azure signature
781  hex_hash->clear();
782  return true;
783  default:
784  PANIC(NULL);
785  }
786 }
787 
788 string S3FanoutManager::GetRequestString(const JobInfo &info) const {
789  switch (info.request) {
790  case JobInfo::kReqHeadOnly:
791  case JobInfo::kReqHeadPut:
792  return "HEAD";
793  case JobInfo::kReqPutCas:
794  case JobInfo::kReqPutDotCvmfs:
795  case JobInfo::kReqPutHtml:
796  case JobInfo::kReqPutBucket:
797  return "PUT";
798  case JobInfo::kReqDelete:
799  return "DELETE";
800  default:
801  PANIC(NULL);
802  }
803 }
804 
805 
806 string S3FanoutManager::GetContentType(const JobInfo &info) const {
807  switch (info.request) {
808  case JobInfo::kReqHeadOnly:
809  case JobInfo::kReqHeadPut:
810  case JobInfo::kReqDelete:
811  return "";
812  case JobInfo::kReqPutCas:
813  return "application/octet-stream";
814  case JobInfo::kReqPutDotCvmfs:
815  return "application/x-cvmfs";
816  case JobInfo::kReqPutHtml:
817  return "text/html";
818  case JobInfo::kReqPutBucket:
819  return "text/xml";
820  default:
821  PANIC(NULL);
822  }
823 }
824 
825 
830 Failures S3FanoutManager::InitializeRequest(JobInfo *info, CURL *handle) const {
831  // Initialize internal download state
832  info->curl_handle = handle;
833  info->error_code = kFailOk;
834  info->http_error = 0;
835  info->num_retries = 0;
836  info->backoff_ms = 0;
837  info->throttle_ms = 0;
838  info->throttle_timestamp = 0;
839  info->http_headers = NULL;
840  // info->payload_size is needed in S3Uploader::MainCollectResults,
841  // where info->origin is already destroyed.
842  info->payload_size = info->origin->GetSize();
843 
844  InitializeDnsSettings(handle, complete_hostname_);
845 
846  CURLcode retval;
847  if ((info->request == JobInfo::kReqHeadOnly)
848  || (info->request == JobInfo::kReqHeadPut)
849  || (info->request == JobInfo::kReqDelete)) {
850  retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 0);
851  assert(retval == CURLE_OK);
852  retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
853  assert(retval == CURLE_OK);
854 
855  if (info->request == JobInfo::kReqDelete) {
856  retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST,
857  GetRequestString(*info).c_str());
858  assert(retval == CURLE_OK);
859  } else {
860  retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
861  assert(retval == CURLE_OK);
862  }
863  } else {
864  retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
865  assert(retval == CURLE_OK);
866  retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 1);
867  assert(retval == CURLE_OK);
868  retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 0);
869  assert(retval == CURLE_OK);
870  retval = curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE,
871  static_cast<curl_off_t>(info->origin->GetSize()));
872  assert(retval == CURLE_OK);
873 
874  if (info->request == JobInfo::kReqPutDotCvmfs) {
875  info->http_headers = curl_slist_append(info->http_headers,
876  kCacheControlDotCvmfs);
877  } else if (info->request == JobInfo::kReqPutCas) {
878  info->http_headers = curl_slist_append(info->http_headers,
879  kCacheControlCas);
880  }
881  }
882 
883  bool retval_b;
884 
885  // Authorization
886  vector<string> authz_headers;
887  switch (config_.authz_method) {
888  case kAuthzAwsV2:
889  retval_b = MkV2Authz(*info, &authz_headers);
890  break;
891  case kAuthzAwsV4:
892  retval_b = MkV4Authz(*info, &authz_headers);
893  break;
894  case kAuthzAzure:
895  retval_b = MkAzureAuthz(*info, &authz_headers);
896  break;
897  default:
898  PANIC(NULL);
899  }
900  if (!retval_b)
901  return kFailLocalIO;
902  for (unsigned i = 0; i < authz_headers.size(); ++i) {
903  info->http_headers = curl_slist_append(info->http_headers,
904  authz_headers[i].c_str());
905  }
906 
907  // Common headers
908  info->http_headers = curl_slist_append(info->http_headers,
909  "Connection: Keep-Alive");
910  info->http_headers = curl_slist_append(info->http_headers, "Pragma:");
911  // No 100-continue
912  info->http_headers = curl_slist_append(info->http_headers, "Expect:");
913  // Strip unnecessary header
914  info->http_headers = curl_slist_append(info->http_headers, "Accept:");
915  info->http_headers = curl_slist_append(info->http_headers,
916  user_agent_->c_str());
917 
918  // Set curl parameters
919  retval = curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
920  assert(retval == CURLE_OK);
921  retval = curl_easy_setopt(handle, CURLOPT_HEADERDATA,
922  static_cast<void *>(info));
923  assert(retval == CURLE_OK);
924  retval = curl_easy_setopt(handle, CURLOPT_READDATA,
925  static_cast<void *>(info));
926  assert(retval == CURLE_OK);
927  retval = curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->http_headers);
928  assert(retval == CURLE_OK);
929  if (opt_ipv4_only_) {
930  retval = curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
931  assert(retval == CURLE_OK);
932  }
933  // Follow HTTP redirects
934  retval = curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
935  assert(retval == CURLE_OK);
936 
937  retval = curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, info->errorbuffer);
938  assert(retval == CURLE_OK);
939 
940  if (config_.protocol == "https") {
941  retval = curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 1L);
942  assert(retval == CURLE_OK);
943  retval = curl_easy_setopt(handle, CURLOPT_PROXY_SSL_VERIFYPEER, 1L);
944  assert(retval == CURLE_OK);
945  const bool add_cert =
946  ssl_certificate_store_.ApplySslCertificatePath(handle);
947  assert(add_cert);
948  }
949 
950  return kFailOk;
951 }
952 
953 
957 void S3FanoutManager::SetUrlOptions(JobInfo *info) const {
958  CURL *curl_handle = info->curl_handle;
959  CURLcode retval;
960 
961  retval = curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT,
962  config_.opt_timeout_sec);
963  assert(retval == CURLE_OK);
964  retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT,
965  kLowSpeedLimit);
966  assert(retval == CURLE_OK);
967  retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME,
968  config_.opt_timeout_sec);
969  assert(retval == CURLE_OK);
970 
971  if (is_curl_debug_) {
972  retval = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 1);
973  assert(retval == CURLE_OK);
974  }
975 
976  const string url = MkUrl(info->object_key);
977  retval = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
978  assert(retval == CURLE_OK);
979 
980  retval = curl_easy_setopt(curl_handle, CURLOPT_PROXY, config_.proxy.c_str());
981  assert(retval == CURLE_OK);
982 }
983 
984 
988 void S3FanoutManager::UpdateStatistics(CURL *handle) {
989  double val;
990 
991  if (curl_easy_getinfo(handle, CURLINFO_SIZE_UPLOAD, &val) == CURLE_OK)
992  statistics_->transferred_bytes += val;
993 }
994 
995 
/**
 * Tells whether a failed job may be attempted again: its error code has to be
 * one of the retryable codes listed in the condition and its retry counter
 * has to be below config_.opt_max_retries.
 *
 * NOTE(review): listing line 1002 is absent from this extract, so the set of
 * retryable error codes shown here may be incomplete -- confirm against the
 * repository before relying on it.
 *
 * @param info  job whose error_code and num_retries are inspected
 * @return true if another attempt is allowed
 */
999 bool S3FanoutManager::CanRetry(const JobInfo *info) {
1000  return (info->error_code == kFailHostConnection
1001  || info->error_code == kFailHostResolve
1003  || info->error_code == kFailRetry)
1004  && (info->num_retries < config_.opt_max_retries);
1005 }
1006 
1007 
/**
 * Sleeps before a request is repeated.  If the job carries a throttle
 * interval (throttle_ms > 0), that interval is used; otherwise a backoff
 * interval is computed that starts at a random value, doubles on every
 * further call and is capped at config_.opt_backoff_max_ms.
 *
 * NOTE(review): listing line 1025 is absent from this extract; the block is
 * documented as far as it is visible.
 *
 * @param info  job to be delayed; num_retries and backoff_ms are updated
 */
1013 void S3FanoutManager::Backoff(JobInfo *info) {
 // A kFailRetry error does not consume the job's retry budget; the global
 // retry statistic counts every call.
1014  if (info->error_code != kFailRetry)
1015  info->num_retries++;
1016  statistics_->num_retries++;
1017 
1018  if (info->throttle_ms > 0) {
1019  LogCvmfs(kLogS3Fanout, kLogDebug, "throttling for %d ms",
1020  info->throttle_ms);
1021  const uint64_t now = platform_monotonic_time();
 // Sleep only while the throttle window is still open.  throttle_ms is
 // divided by 1000 before the comparison -- presumably both timestamps are in
 // seconds; confirm against platform_monotonic_time().
1022  if ((info->throttle_timestamp + (info->throttle_ms / 1000)) >= now) {
 // Emit the throttle warning only if more than kThrottleReportIntervalSec
 // have passed since the previous one.
1023  if ((now - timestamp_last_throttle_report_)
1024  > kThrottleReportIntervalSec) {
 // NOTE(review): listing line 1025 is missing here; presumably it opens the
 // logging call that takes the format string and arguments below.
1026  "Warning: S3 backend throttling %ums "
1027  "(total backoff time so far %lums)",
1028  info->throttle_ms, statistics_->ms_throttled);
1029  timestamp_last_throttle_report_ = now;
1030  }
1031  statistics_->ms_throttled += info->throttle_ms;
1032  SafeSleepMs(info->throttle_ms);
1033  }
1034  } else {
 // First backoff of this job: draw the initial interval from the PRNG.
 // Later calls double the previous interval.
1035  if (info->backoff_ms == 0) {
1036  // Must be != 0
1037  info->backoff_ms = prng_.Next(config_.opt_backoff_init_ms + 1);
1038  } else {
1039  info->backoff_ms *= 2;
1040  }
 // Never wait longer than the configured maximum
1041  if (info->backoff_ms > config_.opt_backoff_max_ms)
1042  info->backoff_ms = config_.opt_backoff_max_ms;
1043 
1044  LogCvmfs(kLogS3Fanout, kLogDebug, "backing off for %d ms",
1045  info->backoff_ms);
1046  SafeSleepMs(info->backoff_ms);
1047  }
1048 }
1049 
1050 
/**
 * Evaluates a finished curl transfer: maps the curl result onto the job's
 * error_code, turns a "not found" HEAD of a kReqHeadPut job into a PUT, and
 * decides whether a failed request is repeated.
 *
 * NOTE(review): listing lines 1058, 1084 and 1091 are absent from this
 * extract; the affected places are marked below.
 *
 * @param curl_error  result code of the transfer (a CURLcode value)
 * @param info        job that was processed; its state is updated in place
 * @return true if the job has to be executed again (PUT after HEAD, or a
 *         retry), false if the transfer is finished
 */
1057 bool S3FanoutManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
 // NOTE(review): listing line 1058 is missing; presumably it opens the
 // logging call that takes the format string and arguments below.
1059  "Verify uploaded/tested object %s "
1060  "(curl error %d, info error %d, info request %d)",
1061  info->object_key.c_str(), curl_error, info->error_code,
1062  info->request);
1063  UpdateStatistics(info->curl_handle);
1064 
1065  // Verification and error classification
1066  switch (curl_error) {
1067  case CURLE_OK:
 // Keep kFailRetry and kFailNotFound as they are when they are already set;
 // every other state counts as success.
1068  if ((info->error_code != kFailRetry)
1069  && (info->error_code != kFailNotFound)) {
1070  info->error_code = kFailOk;
1071  }
1072  break;
1073  case CURLE_UNSUPPORTED_PROTOCOL:
1074  case CURLE_URL_MALFORMAT:
1075  info->error_code = kFailBadRequest;
1076  break;
1077  case CURLE_COULDNT_RESOLVE_HOST:
1078  info->error_code = kFailHostResolve;
1079  break;
1080  case CURLE_COULDNT_CONNECT:
1081  case CURLE_OPERATION_TIMEDOUT:
1082  case CURLE_SEND_ERROR:
1083  case CURLE_RECV_ERROR:
 // NOTE(review): listing line 1084 is missing; presumably it assigns the
 // error code for these connection-level failures.
1085  break;
1086  case CURLE_ABORTED_BY_CALLBACK:
1087  case CURLE_WRITE_ERROR:
1088  // Error set by callback
1089  break;
1090  default:
 // NOTE(review): listing line 1091 is missing; presumably it opens the
 // logging call that takes the format string and arguments below.
1092  "unexpected curl error (%d) while trying to upload %s: %s",
1093  curl_error, info->object_key.c_str(), info->errorbuffer);
1094  info->error_code = kFailOther;
1095  break;
1096  }
1097 
1098  // Transform HEAD to PUT request
1099  if ((info->error_code == kFailNotFound)
1100  && (info->request == JobInfo::kReqHeadPut)) {
1101  LogCvmfs(kLogS3Fanout, kLogDebug, "not found: %s, uploading",
1102  info->object_key.c_str());
1103  info->request = JobInfo::kReqPutCas;
 // Drop the header list of the HEAD request before InitializeRequest() is
 // run again on the same curl handle for the PUT.
1104  curl_slist_free_all(info->http_headers);
1105  info->http_headers = NULL;
1106  const s3fanout::Failures init_failure =
1107  InitializeRequest(info, info->curl_handle);
1108 
1109  if (init_failure != s3fanout::kFailOk) {
1110  PANIC(kLogStderr,
1111  "Failed to initialize CURL handle "
1112  "(error: %d - %s | errno: %d)",
1113  init_failure, Code2Ascii(init_failure), errno);
1114  }
1115  SetUrlOptions(info);
1116  // Reset origin
1117  info->origin->Rewind();
1118  return true; // Again, Put
1119  }
1120 
1121  // Determination if failed request should be repeated
1122  bool try_again = false;
1123  if (info->error_code != kFailOk) {
1124  try_again = CanRetry(info);
1125  }
1126  if (try_again) {
 // Only the PUT request types rewind the origin before the next attempt
1127  if (info->request == JobInfo::kReqPutCas
1128  || info->request == JobInfo::kReqPutDotCvmfs
1129  || info->request == JobInfo::kReqPutHtml) {
1130  LogCvmfs(kLogS3Fanout, kLogDebug, "Trying again to upload %s",
1131  info->object_key.c_str());
1132  // Reset origin
1133  info->origin->Rewind();
1134  }
1135  Backoff(info);
 // Clear the per-attempt state for the next run.
 // NOTE(review): backoff_ms is reset to 0 right after Backoff(), so on this
 // path Backoff() always starts from a fresh random interval and its doubling
 // branch is not reached -- confirm whether that is intended.
1136  info->error_code = kFailOk;
1137  info->http_error = 0;
1138  info->throttle_ms = 0;
1139  info->backoff_ms = 0;
1140  info->throttle_timestamp = 0;
1141  return true; // try again
1142  }
1143 
1144  // Cleanup opened resources
1145  info->origin.Destroy();
1146 
 // Report HTTP-level failures on stderr, except for 404
1147  if ((info->error_code != kFailOk) && (info->http_error != 0)
1148  && (info->http_error != 404)) {
1149  LogCvmfs(kLogS3Fanout, kLogStderr, "S3: HTTP failure %d", info->http_error);
1150  }
1151  return false; // stop transfer
1152 }
1153 
/**
 * Sets up the manager from the given configuration: creates the two mutexes,
 * the bookkeeping containers, the statistics object and the user agent
 * string, initializes libcurl and the multi handle, seeds the PRNG, reads the
 * environment switches and allocates the initial poll array.
 *
 * NOTE(review): listing lines 1156-1158, 1175-1177, 1183, 1191, 1197, 1203,
 * 1214 and 1220 are absent from this extract, so this listing of the
 * constructor is incomplete; the gaps are marked below.
 *
 * @param config  S3 configuration, copied into config_
 */
1154 S3FanoutManager::S3FanoutManager(const S3Config &config) : config_(config) {
1155  atomic_init32(&multi_threaded_);
 // NOTE(review): listing lines 1156-1158 are missing here.
1159 
1160  int retval;
1161  jobs_todo_lock_ = reinterpret_cast<pthread_mutex_t *>(
1162  smalloc(sizeof(pthread_mutex_t)));
1163  retval = pthread_mutex_init(jobs_todo_lock_, NULL);
1164  assert(retval == 0);
1165  curl_handle_lock_ = reinterpret_cast<pthread_mutex_t *>(
1166  smalloc(sizeof(pthread_mutex_t)));
1167  retval = pthread_mutex_init(curl_handle_lock_, NULL);
1168  assert(retval == 0);
1169 
1170  active_requests_ = new set<JobInfo *>;
1171  pool_handles_idle_ = new set<CURL *>;
1172  pool_handles_inuse_ = new set<CURL *>;
1173  curl_sharehandles_ = new map<CURL *, S3FanOutDnsEntry *>;
1174  sharehandles_ = new set<S3FanOutDnsEntry *>;
 // NOTE(review): listing lines 1175-1177 are missing; presumably
 // available_jobs_ is created there, since it is checked right below.
1178  assert(NULL != available_jobs_);
1179 
1180  statistics_ = new Statistics();
1181  user_agent_ = new string();
1182  *user_agent_ = "User-Agent: cvmfs " + string(CVMFS_VERSION);
 // NOTE(review): listing line 1183 is missing here.
1184 
1185  const CURLcode cretval = curl_global_init(CURL_GLOBAL_ALL);
1186  assert(cretval == CURLE_OK);
1187  curl_multi_ = curl_multi_init();
1188  assert(curl_multi_ != NULL);
1189  CURLMcode mretval;
1190  mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION,
 // NOTE(review): listing line 1191 (the callback argument of the call above)
 // is missing.
1192  assert(mretval == CURLM_OK);
 // The manager itself is handed to the socket callback as user data
1193  mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1194  static_cast<void *>(this));
1195  assert(mretval == CURLM_OK);
1196  mretval = curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
 // NOTE(review): listing line 1197 (the connection limit argument) is missing.
1198  assert(mretval == CURLM_OK);
1199 
1200  prng_.InitLocaltime();
1201 
1202  thread_upload_ = 0;
 // NOTE(review): listing line 1203 is missing here.
 // Verbose curl tracing is switched on by the mere presence of the variable
1204  is_curl_debug_ = (getenv("_CVMFS_CURL_DEBUG") != NULL);
1205 
1206  // Parsing environment variables
 // IPv4-only mode requires the variable to be set to a non-empty value
1207  if ((getenv("CVMFS_IPV4_ONLY") != NULL)
1208  && (strlen(getenv("CVMFS_IPV4_ONLY")) > 0)) {
1209  opt_ipv4_only_ = true;
1210  } else {
1211  opt_ipv4_only_ = false;
1212  }
1213 
 // NOTE(review): listing line 1214 is missing here.
1215 
 // Start with room for four poll descriptors, none of them in use yet
1216  watch_fds_ = static_cast<struct pollfd *>(smalloc(4 * sizeof(struct pollfd)));
1217  watch_fds_size_ = 4;
1218  watch_fds_inuse_ = 0;
1219 
 // NOTE(review): listing line 1220 is missing here.
1221 }
1222 
 // NOTE(review): the signature line (listing line 1223) is absent from this
 // extract.  Judging by its contents this is presumably the body of
 // S3FanoutManager::~S3FanoutManager() -- confirm against the repository.
 //
 // Tears the manager down: destroys the mutexes, stops the I/O thread if one
 // was started, cleans up the idle curl handles and the share handles, frees
 // the containers and finally shuts libcurl down.
 //
 // NOTE(review): both mutexes are destroyed and freed before the upload thread
 // is joined below; confirm that the thread no longer uses them at this point.
1224  pthread_mutex_destroy(jobs_todo_lock_);
1225  free(jobs_todo_lock_);
1226  pthread_mutex_destroy(curl_handle_lock_);
1227  free(curl_handle_lock_);
1228 
 // atomic_xadd32(..., 0) reads the flag; it is 1 once the thread was spawned
1229  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1230  // Shutdown I/O thread
1231  char buf = 'T';
1232  WritePipe(pipe_terminate_[1], &buf, 1);
1233  pthread_join(thread_upload_, NULL);
1234  }
 // NOTE(review): listing lines 1235-1237 are missing here.
1238 
 // Only the handles in the idle pool are cleaned up here
1239  set<CURL *>::iterator i = pool_handles_idle_->begin();
1240  const set<CURL *>::const_iterator iEnd = pool_handles_idle_->end();
1241  for (; i != iEnd; ++i) {
1242  curl_easy_cleanup(*i);
1243  }
1244 
 // Each share entry holds a curl share handle and a curl_slist
1245  set<S3FanOutDnsEntry *>::iterator is = sharehandles_->begin();
1246  const set<S3FanOutDnsEntry *>::const_iterator isEnd = sharehandles_->end();
1247  for (; is != isEnd; ++is) {
1248  curl_share_cleanup((*is)->sharehandle);
1249  curl_slist_free_all((*is)->clist);
1250  delete *is;
1251  }
1252  pool_handles_idle_->clear();
1253  curl_sharehandles_->clear();
1254  sharehandles_->clear();
1255  delete active_requests_;
1256  delete pool_handles_idle_;
1257  delete pool_handles_inuse_;
1258  delete curl_sharehandles_;
1259  delete sharehandles_;
1260  delete user_agent_;
1261  curl_multi_cleanup(curl_multi_);
1262 
1263  delete statistics_;
1264 
1265  delete available_jobs_;
1266 
1267  curl_global_cleanup();
1268 }
1269 
 // NOTE(review): the signature line (listing line 1273) is absent from this
 // extract; presumably this is the body of S3FanoutManager::Spawn() --
 // confirm against the repository.
 //
 // Starts the upload thread running MainUpload with this manager as its
 // argument and then raises multi_threaded_.
1274  LogCvmfs(kLogS3Fanout, kLogDebug, "S3FanoutManager spawned");
1275 
1276  const int retval = pthread_create(&thread_upload_, NULL, MainUpload,
1277  static_cast<void *>(this));
1278  assert(retval == 0);
1279 
1280  atomic_inc32(&multi_threaded_);
1281 }
1282 
1284 
 // NOTE(review): the signature (listing line 1288, PushNewJob(JobInfo *info)
 // according to the cross-reference index) and listing line 1289 are absent
 // from this extract.
 //
 // Hands a job over by writing the JobInfo pointer itself (not the object it
 // points to) into the write end of pipe_jobs_.
1290  WritePipe(pipe_jobs_[1], &info, sizeof(info));
1291 }
1292 
 // NOTE(review): the signature (listing line 1296, PushCompletedJob(JobInfo
 // *info) according to the cross-reference index) is absent from this extract.
 //
 // Publishes a finished job by writing the JobInfo pointer itself into the
 // write end of pipe_completed_.
1297  WritePipe(pipe_completed_[1], &info, sizeof(info));
1298 }
1299 
 // NOTE(review): the signature (listing line 1303, JobInfo *PopCompletedJob()
 // according to the cross-reference index) is absent from this extract.
 //
 // Reads one JobInfo pointer from the read end of pipe_completed_ and returns
 // it.  Presumably ReadPipe() blocks until a pointer is available -- confirm.
1304  JobInfo *info;
1305  ReadPipe(pipe_completed_[0], &info, sizeof(info));
1306  return info;
1307 }
1308 
1309 //------------------------------------------------------------------------------
1310 
1311 
1312 string Statistics::Print() const {
1313  return "Transferred Bytes: " + StringifyInt(uint64_t(transferred_bytes))
1314  + "\n" + "Transfer duration: " + StringifyInt(uint64_t(transfer_time))
1315  + " s\n" + "Number of requests: " + StringifyInt(num_requests) + "\n"
1316  + "Number of retries: " + StringifyInt(num_retries) + "\n";
1317 }
1318 
1319 } // namespace s3fanout
unsigned throttle_ms
Definition: s3fanout.h:146
const S3Config config_
Definition: s3fanout.h:277
const char * Code2Ascii(const ObjectFetcherFailures::Failures error)
pthread_mutex_t * jobs_todo_lock_
Definition: s3fanout.h:237
atomic_int32 multi_threaded_
Definition: s3fanout.h:301
std::string ip
Definition: s3fanout.h:162
pthread_mutex_t * curl_handle_lock_
Definition: s3fanout.h:238
string Sha256String(const string &content)
Definition: hash.cc:449
std::set< CURL * > * pool_handles_idle_
Definition: s3fanout.h:286
struct curl_slist * http_headers
Definition: s3fanout.h:137
static size_t CallbackCurlBody(char *, size_t size, size_t nmemb, void *)
Definition: s3fanout.cc:152
#define PANIC(...)
Definition: exception.h:29
std::string IsoTimestamp()
Definition: string.cc:167
dns::CaresResolver * resolver_
Definition: s3fanout.h:290
string Trim(const string &raw, bool trim_newline)
Definition: string.cc:466
CURLSH * sharehandle
Definition: s3fanout.h:165
double transferred_bytes
Definition: s3fanout.h:75
void InitLocaltime()
Definition: prng.h:34
bool IsHttpUrl(const std::string &path)
Definition: posix.cc:167
void Hmac(const string &key, const unsigned char *buffer, const unsigned buffer_size, Any *any_digest)
Definition: hash.cc:273
perf::Statistics * statistics_
Definition: repository.h:138
static void * MainUpload(void *data)
Definition: s3fanout.cc:233
std::string dns_name
Definition: s3fanout.h:161
const std::string object_key
Definition: s3fanout.h:107
assert((mem||(size==0))&&"Out Of Memory")
const std::set< std::string > & ipv4_addresses() const
Definition: dns.h:111
std::string MkCompleteHostname()
Definition: s3fanout.h:269
void PushNewJob(JobInfo *info)
Definition: s3fanout.cc:1288
Algorithms algorithm
Definition: hash.h:122
SynchronizingCounter< uint32_t > Semaphore
Definition: s3fanout.h:171
unsigned int counter
Definition: s3fanout.h:160
std::string Print() const
Definition: s3fanout.cc:1312
bool Debase64(const string &data, string *decoded)
Definition: string.cc:598
void MakePipe(int pipe_fd[2])
Definition: posix.cc:487
unsigned char digest[digest_size_]
Definition: hash.h:121
struct curl_slist * clist
Definition: s3fanout.h:164
std::string * user_agent_
Definition: s3fanout.h:292
Failures error_code
Definition: s3fanout.h:140
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:140
unsigned backoff_ms
Definition: s3fanout.h:144
unsigned int max_available_jobs_
Definition: s3fanout.h:322
std::string RfcTimestamp()
Definition: string.cc:145
int64_t String2Int64(const string &value)
Definition: string.cc:234
unsigned GetDigestSize() const
Definition: hash.h:164
std::set< JobInfo * > * active_requests_
Definition: s3fanout.h:284
void PushCompletedJob(JobInfo *info)
Definition: s3fanout.cc:1296
string Sha256Mem(const unsigned char *buffer, const unsigned buffer_size)
Definition: hash.cc:439
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:306
uint64_t payload_size
Definition: s3fanout.h:138
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:255
bool HasSuffix(const std::string &str, const std::string &suffix, const bool ignore_case)
Definition: string.cc:296
void UseSystemCertificatePath()
Definition: ssl.cc:72
void SetUrlOptions(JobInfo *info) const
Definition: s3fanout.cc:957
Definition: dns.h:90
CURL * AcquireCurlHandle() const
Definition: s3fanout.cc:377
uint64_t throttle_timestamp
Definition: s3fanout.h:148
Semaphore * available_jobs_
Definition: s3fanout.h:323
static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp)
Definition: s3fanout.cc:161
UniquePtr< FileBackedBuffer > origin
Definition: s3fanout.h:109
void ReleaseCurlHandle(JobInfo *info, CURL *handle) const
Definition: s3fanout.cc:410
char * errorbuffer
Definition: s3fanout.h:149
string StringifyInt(const int64_t value)
Definition: string.cc:77
struct pollfd * watch_fds_
Definition: s3fanout.h:303
uint64_t platform_monotonic_time()
std::string ExtractPort(const std::string &url)
Definition: dns.cc:123
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:279
JobInfo * PopCompletedJob()
Definition: s3fanout.cc:1303
std::string ExtractHost(const std::string &url)
Definition: dns.cc:108
unsigned char num_retries
Definition: s3fanout.h:142
std::string port
Definition: s3fanout.h:163
void HashMem(const unsigned char *buffer, const unsigned buffer_size, Any *any_digest)
Definition: hash.cc:255
std::string complete_hostname_
Definition: s3fanout.h:278
uint64_t num_requests
Definition: s3fanout.h:77
string Base64(const string &data)
Definition: string.cc:537
RequestType request
Definition: s3fanout.h:139
uint64_t String2Uint64(const string &value)
Definition: string.cc:240
double transfer_time
Definition: s3fanout.h:76
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
Definition: s3fanout.cc:1057
uint64_t num_retries
Definition: s3fanout.h:78
Definition: mutex.h:42
static CaresResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
Definition: dns.cc:711
std::set< S3FanOutDnsEntry * > * sharehandles_
Definition: s3fanout.h:288
SslCertificateStore ssl_certificate_store_
Definition: s3fanout.h:337
const Statistics & GetStatistics()
Definition: s3fanout.cc:1283
std::string Hmac256(const std::string &key, const std::string &content, bool raw_output)
Definition: hash.cc:455
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:2025
Statistics * statistics_
Definition: s3fanout.h:327
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:496
uint64_t timestamp_last_throttle_report_
Definition: s3fanout.h:330
static void size_t size
Definition: smalloc.h:54
std::set< CURL * > * pool_handles_inuse_
Definition: s3fanout.h:287
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:508
Definition: s3fanout.h:152
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:559
std::map< CURL *, S3FanOutDnsEntry * > * curl_sharehandles_
Definition: s3fanout.h:289
CURL * curl_handle
Definition: s3fanout.h:136
void Destroy()
Definition: pointer.h:53
Failures InitializeRequest(JobInfo *info, CURL *handle) const
Definition: s3fanout.cc:830
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545