CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
s3fanout.cc
Go to the documentation of this file.
1 
7 #include <pthread.h>
8 
9 #include <algorithm>
10 #include <cassert>
11 #include <cerrno>
12 #include <utility>
13 
14 
15 #include "s3fanout.h"
16 #include "upload_facility.h"
17 #include "util/concurrency.h"
18 #include "util/exception.h"
19 #include "util/platform.h"
20 #include "util/posix.h"
21 #include "util/string.h"
22 
23 using namespace std; // NOLINT
24 
25 namespace s3fanout {
26 
27 const char *S3FanoutManager::kCacheControlCas = "Cache-Control: max-age=259200";
28 const char *S3FanoutManager::kCacheControlDotCvmfs =
29  "Cache-Control: max-age=61";
30 const unsigned S3FanoutManager::kDefault429ThrottleMs = 250;
31 const unsigned S3FanoutManager::kMax429ThrottleMs = 10000;
32 const unsigned S3FanoutManager::kThrottleReportIntervalSec = 10;
33 const unsigned S3FanoutManager::kDefaultHTTPPort = 80;
34 const unsigned S3FanoutManager::kDefaultHTTPSPort = 443;
35 
36 
40 void S3FanoutManager::DetectThrottleIndicator(
41  const std::string &header,
42  JobInfo *info)
43 {
44  std::string value_str;
45  if (HasPrefix(header, "retry-after:", true))
46  value_str = header.substr(12);
47  if (HasPrefix(header, "x-retry-in:", true))
48  value_str = header.substr(11);
49 
50  value_str = Trim(value_str, true /* trim_newline */);
51  if (!value_str.empty()) {
52  unsigned value_numeric = String2Uint64(value_str);
53  unsigned value_ms =
54  HasSuffix(value_str, "ms", true /* ignore_case */) ?
55  value_numeric : (value_numeric * 1000);
56  if (value_ms > 0)
57  info->throttle_ms = std::min(value_ms, kMax429ThrottleMs);
58  }
59 }
60 
61 
/**
 * libcurl header callback. `info_link` is the JobInfo of the transfer
 * (registered as CURLOPT_HEADERDATA in InitializeRequest).
 *
 * For a status line starting with "HTTP/1.":
 *  - lines shorter than 10 characters abort the transfer (return 0);
 *  - a status code starting with '2' is accepted;
 *  - otherwise the three-digit code is stored in info->http_error and mapped
 *    to info->error_code:
 *      429      -> kFailRetry with the default throttle; the transfer
 *                  continues so that later headers can carry a throttle hint
 *      500/502/503 -> see the note in the switch below
 *      400/501  -> kFailBadRequest
 *      403      -> kFailForbidden
 *      404      -> kFailNotFound; the transfer continues
 *      other    -> kFailOther
 * Per the CURLOPT_HEADERFUNCTION contract, returning a value other than
 * num_bytes makes libcurl abort the transfer.
 *
 * Every non-status header line is passed to DetectThrottleIndicator() while
 * the job is in state kFailRetry.
 *
 * NOTE(review): only "HTTP/1." status lines are inspected. If libcurl ever
 * negotiates HTTP/2 for these handles, the "HTTP/2 <code>" status line skips
 * this mapping entirely. Confirm that HTTP/2 cannot be used here.
 * NOTE(review): 3xx responses fall into the default branch (kFailOther,
 * transfer aborted), so the CURLOPT_FOLLOWLOCATION set in InitializeRequest
 * presumably never gets to follow a redirect. Confirm this is intended.
 */
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
                                 void *info_link) {
  const size_t num_bytes = size*nmemb;
  const string header_line(static_cast<const char *>(ptr), num_bytes);
  JobInfo *info = static_cast<JobInfo *>(info_link);

  // Check for http status code errors
  if (HasPrefix(header_line, "HTTP/1.", false)) {
    if (header_line.length() < 10)
      return 0;

    // Skip the blanks after "HTTP/1.x" (8 characters) to reach the code
    unsigned i;
    for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {}

    if (header_line[i] == '2') {
      return num_bytes;
    } else {
      LogCvmfs(kLogS3Fanout, kLogDebug, "http status error code [info %p]: %s",
               info, header_line.c_str());
      // Need three digits starting at i for the status code
      if (header_line.length() < i+3) {
        LogCvmfs(kLogS3Fanout, kLogStderr, "S3: invalid HTTP response '%s'",
                 header_line.c_str());
        info->error_code = kFailOther;
        return 0;
      }
      info->http_error = String2Int64(string(&header_line[i], 3));

      switch (info->http_error) {
        case 429:
          info->error_code = kFailRetry;
          info->throttle_ms = S3FanoutManager::kDefault429ThrottleMs;
          // NOTE(review): one source line is missing from this listing here;
          // restore it from upstream before building.
          return num_bytes;
        case 503:
        case 502:  // Can happen if the S3 gateway-backend connection breaks
        case 500:  // sometimes see this as a transient error from S3
          // NOTE(review): one source line is missing from this listing here.
          // As shown, these codes leave error_code unchanged and abort.
          break;
        case 501:
        case 400:
          info->error_code = kFailBadRequest;
          break;
        case 403:
          info->error_code = kFailForbidden;
          break;
        case 404:
          info->error_code = kFailNotFound;
          return num_bytes;
        default:
          info->error_code = kFailOther;
      }
      return 0;
    }
  }

  // Look for Retry-After / X-Retry-In only after a 429 was seen
  if (info->error_code == kFailRetry) {
    S3FanoutManager::DetectThrottleIndicator(header_line, info);
  }

  return num_bytes;
}
126 
127 
/**
 * libcurl read callback for uploads (installed as CURLOPT_READFUNCTION in
 * AcquireCurlHandle). `info_link` is the JobInfo registered as
 * CURLOPT_READDATA in InitializeRequest.
 *
 * Copies up to size*nmemb bytes from info->origin into `ptr` and returns the
 * number of bytes produced. Per the libcurl read-callback contract, returning
 * 0 signals the end of the upload data.
 */
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
                               void *info_link) {
  const size_t num_bytes = size*nmemb;
  JobInfo *info = static_cast<JobInfo *>(info_link);

  LogCvmfs(kLogS3Fanout, kLogDebug, "Data callback with %zu bytes", num_bytes);

  if (num_bytes == 0)
    return 0;

  uint64_t read_bytes = info->origin->Read(ptr, num_bytes);

  // NOTE(review): the opening line of the call below (function name and
  // leading arguments) is missing from this listing; restore it from
  // upstream. Also note that "%lu" is not portable for uint64_t.
           "source buffer pushed out %lu bytes", read_bytes);

  return read_bytes;
}
148 
149 
/**
 * libcurl write callback for response bodies (CURLOPT_WRITEFUNCTION).
 * The body is not needed, so every chunk is reported as fully consumed and
 * then discarded.
 *
 * @return size * nmemb, i.e. the whole chunk
 */
static size_t CallbackCurlBody(
  char * /*ptr*/, size_t size, size_t nmemb, void * /*userdata*/)
{
  const size_t consumed_bytes = nmemb * size;
  return consumed_bytes;
}
158 
159 
163 int S3FanoutManager::CallbackCurlSocket(CURL *easy, curl_socket_t s, int action,
164  void *userp, void *socketp) {
165  S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(userp);
166  LogCvmfs(kLogS3Fanout, kLogDebug, "CallbackCurlSocket called with easy "
167  "handle %p, socket %d, action %d, up %p, "
168  "sp %p, fds_inuse %d, jobs %d",
169  easy, s, action, userp,
170  socketp, s3fanout_mgr->watch_fds_inuse_,
171  s3fanout_mgr->available_jobs_->Get());
172  if (action == CURL_POLL_NONE)
173  return 0;
174 
175  // Find s in watch_fds_
176  // First 2 fds are job and terminate pipes (not curl related)
177  unsigned index;
178  for (index = 2; index < s3fanout_mgr->watch_fds_inuse_; ++index) {
179  if (s3fanout_mgr->watch_fds_[index].fd == s)
180  break;
181  }
182  // Or create newly
183  if (index == s3fanout_mgr->watch_fds_inuse_) {
184  // Extend array if necessary
185  if (s3fanout_mgr->watch_fds_inuse_ == s3fanout_mgr->watch_fds_size_) {
186  s3fanout_mgr->watch_fds_size_ *= 2;
187  s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
188  srealloc(s3fanout_mgr->watch_fds_,
189  s3fanout_mgr->watch_fds_size_*sizeof(struct pollfd)));
190  }
191  s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].fd = s;
192  s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].events = 0;
193  s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].revents = 0;
194  s3fanout_mgr->watch_fds_inuse_++;
195  }
196 
197  switch (action) {
198  case CURL_POLL_IN:
199  s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
200  break;
201  case CURL_POLL_OUT:
202  s3fanout_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
203  break;
204  case CURL_POLL_INOUT:
205  s3fanout_mgr->watch_fds_[index].events =
206  POLLIN | POLLPRI | POLLOUT | POLLWRBAND;
207  break;
208  case CURL_POLL_REMOVE:
209  if (index < s3fanout_mgr->watch_fds_inuse_-1)
210  s3fanout_mgr->watch_fds_[index] =
211  s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_-1];
212  s3fanout_mgr->watch_fds_inuse_--;
213  // Shrink array if necessary
214  if ((s3fanout_mgr->watch_fds_inuse_ > s3fanout_mgr->watch_fds_max_) &&
215  (s3fanout_mgr->watch_fds_inuse_ < s3fanout_mgr->watch_fds_size_/2)) {
216  s3fanout_mgr->watch_fds_size_ /= 2;
217  s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
218  srealloc(s3fanout_mgr->watch_fds_,
219  s3fanout_mgr->watch_fds_size_*sizeof(struct pollfd)));
220  }
221  break;
222  default:
223  PANIC(NULL);
224  }
225 
226  return 0;
227 }
228 
229 
/**
 * Entry point of the upload I/O thread; `data` is the S3FanoutManager.
 *
 * Loop:
 *  1. poll() the two pipes (slot 0: terminate, slot 1: jobs) and all curl
 *     sockets with a 100 ms timeout; on timeout, let curl process its timers.
 *     A negative poll() result is only tolerated for EINTR (asserted).
 *  2. Any event on the terminate pipe ends the loop.
 *  3. A JobInfo pointer read from the job pipe is bound to a pooled curl
 *     handle (InitializeRequest + SetUrlOptions) and added to the multi
 *     handle.
 *  4. Socket events are forwarded to curl_multi_socket_action().
 *  5. Finished transfers are passed to VerifyAndFinalize(). If it returns
 *     true, the handle is re-added to the multi handle (retry). Otherwise the
 *     handle is returned to the pool and the job is pushed as completed.
 * On exit, all in-use handles are removed from the multi handle and cleaned
 * up, and watch_fds_ is freed (the pointer itself is not reset).
 *
 * @return NULL
 */
void *S3FanoutManager::MainUpload(void *data) {
  LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread started");
  S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(data);

  s3fanout_mgr->InitPipeWatchFds();

  // Don't schedule more jobs into the multi handle than the maximum number of
  // parallel connections. This should prevent starvation and thus a timeout
  // of the authorization header (CVM-1339).
  // NOTE(review): in this function jobs_in_flight is only incremented and
  // decremented, never compared against a limit. Confirm where (or whether)
  // the limit described above is enforced.
  unsigned jobs_in_flight = 0;

  while (true) {
    // Check events with 100ms timeout
    int timeout_ms = 100;
    int retval = poll(s3fanout_mgr->watch_fds_, s3fanout_mgr->watch_fds_inuse_,
                      timeout_ms);
    if (retval == 0) {
      // Handle timeout: drive curl's internal timers
      int still_running = 0;
      retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
                                        CURL_SOCKET_TIMEOUT,
                                        0,
                                        &still_running);
      if (retval != CURLM_OK) {
        LogCvmfs(kLogS3Fanout, kLogStderr, "Error, timeout due to: %d", retval);
        assert(retval == CURLM_OK);
      }
    } else if (retval < 0) {
      assert(errno == EINTR);
      continue;
    }

    // Terminate I/O thread
    if (s3fanout_mgr->watch_fds_[0].revents)
      break;

    // New job incoming
    if (s3fanout_mgr->watch_fds_[1].revents) {
      s3fanout_mgr->watch_fds_[1].revents = 0;
      JobInfo *info;
      ReadPipe(s3fanout_mgr->pipe_jobs_[0], &info, sizeof(info));
      CURL *handle = s3fanout_mgr->AcquireCurlHandle();
      if (handle == NULL) {
        PANIC(kLogStderr, "Failed to acquire CURL handle.");
      }
      s3fanout::Failures init_failure =
        s3fanout_mgr->InitializeRequest(info, handle);
      if (init_failure != s3fanout::kFailOk) {
        // NOTE(review): the opening line of the call below (function name and
        // leading arguments) is missing from this listing; restore it from
        // upstream before building.
              "Failed to initialize CURL handle (error: %d - %s | errno: %d)",
              init_failure, Code2Ascii(init_failure), errno);
      }
      s3fanout_mgr->SetUrlOptions(info);

      curl_multi_add_handle(s3fanout_mgr->curl_multi_, handle);
      s3fanout_mgr->active_requests_->insert(info);
      jobs_in_flight++;
      // Note: this `retval` shadows the one declared at the top of the loop
      int still_running = 0, retval = 0;
      retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
                                        CURL_SOCKET_TIMEOUT,
                                        0,
                                        &still_running);

      // NOTE(review): the opening line of the call below is missing from this
      // listing; restore it from upstream before building.
               "curl_multi_socket_action: %d - %d",
               retval, still_running);
    }


    // Activity on curl sockets
    // Within this loop the curl_multi_socket_action() may cause socket(s)
    // to be removed from watch_fds_. If a socket is removed it is replaced
    // by the socket at the end of the array and the inuse count is decreased.
    // Therefore loop over the array in reverse order.
    // First 2 fds are job and terminate pipes (not curl related)
    for (int32_t i = s3fanout_mgr->watch_fds_inuse_ - 1; i >= 2; --i) {
      // Several sockets may have been removed in one step; skip stale slots
      if (static_cast<uint32_t>(i) >= s3fanout_mgr->watch_fds_inuse_) {
        continue;
      }
      if (s3fanout_mgr->watch_fds_[i].revents) {
        // Translate poll() results into curl's event bitmask
        int ev_bitmask = 0;
        if (s3fanout_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
          ev_bitmask |= CURL_CSELECT_IN;
        if (s3fanout_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
          ev_bitmask |= CURL_CSELECT_OUT;
        if (s3fanout_mgr->watch_fds_[i].revents &
            (POLLERR | POLLHUP | POLLNVAL))
          ev_bitmask |= CURL_CSELECT_ERR;
        s3fanout_mgr->watch_fds_[i].revents = 0;

        // NOTE(review): the result stored in retval is not checked here
        int still_running = 0;
        retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
                                          s3fanout_mgr->watch_fds_[i].fd,
                                          ev_bitmask,
                                          &still_running);
      }
    }

    // Check if transfers are completed
    CURLMsg *curl_msg;
    int msgs_in_queue;
    while ((curl_msg = curl_multi_info_read(s3fanout_mgr->curl_multi_,
                                            &msgs_in_queue)))
    {
      assert(curl_msg->msg == CURLMSG_DONE);

      s3fanout_mgr->statistics_->num_requests++;
      JobInfo *info;
      CURL *easy_handle = curl_msg->easy_handle;
      int curl_error = curl_msg->data.result;
      // The JobInfo was registered as CURLOPT_PRIVATE in InitializeRequest
      curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);

      curl_multi_remove_handle(s3fanout_mgr->curl_multi_, easy_handle);
      if (s3fanout_mgr->VerifyAndFinalize(curl_error, info)) {
        // Retry: put the same handle back into the multi handle
        curl_multi_add_handle(s3fanout_mgr->curl_multi_, easy_handle);
        int still_running = 0;
        curl_multi_socket_action(s3fanout_mgr->curl_multi_,
                                 CURL_SOCKET_TIMEOUT,
                                 0,
                                 &still_running);
      } else {
        // Return easy handle into pool and write result back
        jobs_in_flight--;
        s3fanout_mgr->active_requests_->erase(info);
        s3fanout_mgr->ReleaseCurlHandle(info, easy_handle);
        s3fanout_mgr->available_jobs_->Decrement();

        // Add to list of completed jobs
        s3fanout_mgr->PushCompletedJob(info);
      }
    }
  }

  // Shutdown: drop all handles that are still attached to transfers
  set<CURL *>::iterator i = s3fanout_mgr->pool_handles_inuse_->begin();
  const set<CURL *>::const_iterator i_end =
    s3fanout_mgr->pool_handles_inuse_->end();
  for (; i != i_end; ++i) {
    curl_multi_remove_handle(s3fanout_mgr->curl_multi_, *i);
    curl_easy_cleanup(*i);
  }
  s3fanout_mgr->pool_handles_inuse_->clear();
  free(s3fanout_mgr->watch_fds_);

  LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread terminated");
  return NULL;
}
379 
380 
/**
 * Takes a CURL easy handle from the idle pool, or creates a new one if the
 * pool is empty, and records it in pool_handles_inuse_. Runs under
 * curl_handle_lock_.
 *
 * New handles get CURLOPT_NOSIGNAL plus the header, read (CallbackCurlData)
 * and write (CallbackCurlBody) callbacks. Pooled handles are returned as-is,
 * i.e. with whatever options their previous request left set; MainUpload
 * re-initializes them via InitializeRequest() and SetUrlOptions().
 *
 * @return the handle (never NULL in debug builds because of the assert)
 */
CURL *S3FanoutManager::AcquireCurlHandle() const {
  CURL *handle;

  MutexLockGuard guard(curl_handle_lock_);

  if (pool_handles_idle_->empty()) {
    CURLcode retval;

    // Create a new handle
    handle = curl_easy_init();
    assert(handle != NULL);

    // Other settings
    // NOTE(review): libcurl documents CURLOPT_NOSIGNAL as taking a long (1L)
    retval = curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
    assert(retval == CURLE_OK);
    // NOTE(review): the line holding the callback argument (and closing
    // parenthesis) of the next call is missing from this listing; restore it
    // from upstream before building.
    retval = curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION,
    assert(retval == CURLE_OK);
    retval = curl_easy_setopt(handle, CURLOPT_READFUNCTION, CallbackCurlData);
    assert(retval == CURLE_OK);
    retval = curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CallbackCurlBody);
    assert(retval == CURLE_OK);
  } else {
    // Reuse an idle handle
    handle = *(pool_handles_idle_->begin());
    pool_handles_idle_->erase(pool_handles_idle_->begin());
  }

  pool_handles_inuse_->insert(handle);

  return handle;
}
416 
417 
418 void S3FanoutManager::ReleaseCurlHandle(JobInfo *info, CURL *handle) const {
419  if (info->http_headers) {
420  curl_slist_free_all(info->http_headers);
421  info->http_headers = NULL;
422  }
423 
424  MutexLockGuard guard(curl_handle_lock_);
425 
426  set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
427  assert(elem != pool_handles_inuse_->end());
428 
429  if (pool_handles_idle_->size() > config_.pool_max_handles) {
430  CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, NULL);
431  assert(retval == CURLE_OK);
432  curl_easy_cleanup(handle);
433  std::map<CURL *, S3FanOutDnsEntry *>::size_type retitems =
434  curl_sharehandles_->erase(handle);
435  assert(retitems == 1);
436  } else {
437  pool_handles_idle_->insert(handle);
438  }
439 
440  pool_handles_inuse_->erase(elem);
441 }
442 
443 void S3FanoutManager::InitPipeWatchFds() {
444  assert(watch_fds_inuse_ == 0);
445  assert(watch_fds_size_ >= 2);
446  watch_fds_[0].fd = pipe_terminate_[0];
447  watch_fds_[0].events = POLLIN | POLLPRI;
448  watch_fds_[0].revents = 0;
449  ++watch_fds_inuse_;
450  watch_fds_[1].fd = pipe_jobs_[0];
451  watch_fds_[1].events = POLLIN | POLLPRI;
452  watch_fds_[1].revents = 0;
453  ++watch_fds_inuse_;
454 }
455 
460 bool S3FanoutManager::MkV2Authz(const JobInfo &info, vector<string> *headers)
461  const
462 {
463  string payload_hash;
464  bool retval = MkPayloadHash(info, &payload_hash);
465  if (!retval)
466  return false;
467  string content_type = GetContentType(info);
468  string request = GetRequestString(info);
469 
470  string timestamp = RfcTimestamp();
471  string to_sign = request + "\n" +
472  payload_hash + "\n" +
473  content_type + "\n" +
474  timestamp + "\n";
475  if (config_.x_amz_acl != "") {
476  to_sign += "x-amz-acl:" + config_.x_amz_acl + "\n" + // default ACL
477  "/" + config_.bucket + "/" + info.object_key;
478  }
479  LogCvmfs(kLogS3Fanout, kLogDebug, "%s string to sign for: %s",
480  request.c_str(), info.object_key.c_str());
481 
482  shash::Any hmac;
483  hmac.algorithm = shash::kSha1;
484  shash::Hmac(config_.secret_key,
485  reinterpret_cast<const unsigned char *>(to_sign.data()),
486  to_sign.length(), &hmac);
487 
488  headers->push_back("Authorization: AWS " + config_.access_key + ":" +
489  Base64(string(reinterpret_cast<char *>(hmac.digest),
490  hmac.GetDigestSize())));
491  headers->push_back("Date: " + timestamp);
492  headers->push_back("X-Amz-Acl: " + config_.x_amz_acl);
493  if (!payload_hash.empty())
494  headers->push_back("Content-MD5: " + payload_hash);
495  if (!content_type.empty())
496  headers->push_back("Content-Type: " + content_type);
497  return true;
498 }
499 
500 
501 string S3FanoutManager::GetUriEncode(const string &val, bool encode_slash)
502  const
503 {
504  string result;
505  const unsigned len = val.length();
506  result.reserve(len);
507  for (unsigned i = 0; i < len; ++i) {
508  char c = val[i];
509  if ((c >= 'A' && c <= 'Z') ||
510  (c >= 'a' && c <= 'z') ||
511  (c >= '0' && c <= '9') ||
512  c == '_' || c == '-' || c == '~' || c == '.')
513  {
514  result.push_back(c);
515  } else if (c == '/') {
516  if (encode_slash) {
517  result += "%2F";
518  } else {
519  result.push_back(c);
520  }
521  } else {
522  result.push_back('%');
523  result.push_back((c / 16) + ((c / 16 <= 9) ? '0' : 'A'-10));
524  result.push_back((c % 16) + ((c % 16 <= 9) ? '0' : 'A'-10));
525  }
526  }
527  return result;
528 }
529 
530 
531 string S3FanoutManager::GetAwsV4SigningKey(const string &date) const
532 {
533  if (last_signing_key_.first == date)
534  return last_signing_key_.second;
535 
536  string date_key = shash::Hmac256("AWS4" + config_.secret_key, date, true);
537  string date_region_key = shash::Hmac256(date_key, config_.region, true);
538  string date_region_service_key = shash::Hmac256(date_region_key, "s3", true);
539  string signing_key =
540  shash::Hmac256(date_region_service_key, "aws4_request", true);
541  last_signing_key_.first = date;
542  last_signing_key_.second = signing_key;
543  return signing_key;
544 }
545 
546 
551 bool S3FanoutManager::MkV4Authz(const JobInfo &info, vector<string> *headers)
552  const
553 {
554  string payload_hash;
555  bool retval = MkPayloadHash(info, &payload_hash);
556  if (!retval)
557  return false;
558  string content_type = GetContentType(info);
559  string timestamp = IsoTimestamp();
560  string date = timestamp.substr(0, 8);
561  vector<string> tokens = SplitString(complete_hostname_, ':');
562  assert(tokens.size() <= 2);
563  string canonical_hostname = tokens[0];
564 
565  // if we could split the hostname in two and if the port is *NOT* a default
566  // one
567  if (tokens.size() == 2 && !((String2Uint64(tokens[1]) == kDefaultHTTPPort) ||
568  (String2Uint64(tokens[1]) == kDefaultHTTPSPort)))
569  canonical_hostname += ":" + tokens[1];
570 
571  string signed_headers;
572  string canonical_headers;
573  if (!content_type.empty()) {
574  signed_headers += "content-type;";
575  headers->push_back("Content-Type: " + content_type);
576  canonical_headers += "content-type:" + content_type + "\n";
577  }
578  if (config_.x_amz_acl != "") {
579  signed_headers += "host;x-amz-acl;x-amz-content-sha256;x-amz-date";
580  } else {
581  signed_headers += "host;x-amz-content-sha256;x-amz-date";
582  }
583  canonical_headers +=
584  "host:" + canonical_hostname + "\n";
585  if (config_.x_amz_acl != "") {
586  canonical_headers += "x-amz-acl:" + config_.x_amz_acl +"\n";
587  }
588  canonical_headers += "x-amz-content-sha256:" + payload_hash + "\n" +
589  "x-amz-date:" + timestamp + "\n";
590 
591  string scope = date + "/" + config_.region + "/s3/aws4_request";
592  string uri = config_.dns_buckets ?
593  (string("/") + info.object_key) :
594  (string("/") + config_.bucket + "/" + info.object_key);
595 
596  string canonical_request =
597  GetRequestString(info) + "\n" +
598  GetUriEncode(uri, false) + "\n" +
599  "\n" +
600  canonical_headers + "\n" +
601  signed_headers + "\n" +
602  payload_hash;
603 
604  string hash_request = shash::Sha256String(canonical_request.c_str());
605 
606  string string_to_sign =
607  "AWS4-HMAC-SHA256\n" +
608  timestamp + "\n" +
609  scope + "\n" +
610  hash_request;
611 
612  string signing_key = GetAwsV4SigningKey(date);
613  string signature = shash::Hmac256(signing_key, string_to_sign);
614 
615  headers->push_back("X-Amz-Acl: " + config_.x_amz_acl);
616  headers->push_back("X-Amz-Content-Sha256: " + payload_hash);
617  headers->push_back("X-Amz-Date: " + timestamp);
618  headers->push_back(
619  "Authorization: AWS4-HMAC-SHA256 "
620  "Credential=" + config_.access_key + "/" + scope + ","
621  "SignedHeaders=" + signed_headers + ","
622  "Signature=" + signature);
623  return true;
624 }
625 
630 bool S3FanoutManager::MkAzureAuthz(const JobInfo &info, vector<string> *headers)
631  const
632 {
633  string timestamp = RfcTimestamp();
634  string canonical_headers =
635  "x-ms-blob-type:BlockBlob\nx-ms-date:" +
636  timestamp +
637  "\nx-ms-version:2011-08-18";
638  string canonical_resource =
639  "/" + config_.access_key + "/" + config_.bucket + "/" + info.object_key;
640 
641  string string_to_sign;
642  if ((info.request == JobInfo::kReqHeadOnly) ||
643  (info.request == JobInfo::kReqHeadPut) ||
644  (info.request == JobInfo::kReqDelete)) {
645  string_to_sign =
646  GetRequestString(info) +
647  string("\n\n\n") +
648  "\n\n\n\n\n\n\n\n\n" +
649  canonical_headers + "\n" +
650  canonical_resource;
651  } else {
652  string_to_sign =
653  GetRequestString(info) +
654  string("\n\n\n") +
655  string(StringifyInt(info.origin->GetSize())) + "\n\n\n\n\n\n\n\n\n" +
656  canonical_headers + "\n" +
657  canonical_resource;
658  }
659 
660  string signing_key;
661  int retval = Debase64(config_.secret_key, &signing_key);
662  if (!retval)
663  return false;
664 
665  string signature = shash::Hmac256(signing_key, string_to_sign, true);
666 
667  headers->push_back("x-ms-date: " + timestamp);
668  headers->push_back("x-ms-version: 2011-08-18");
669  headers->push_back(
670  "Authorization: SharedKey " + config_.access_key + ":" + Base64(signature));
671  headers->push_back("x-ms-blob-type: BlockBlob");
672  return true;
673 }
674 
675 void S3FanoutManager::InitializeDnsSettingsCurl(
676  CURL *handle,
677  CURLSH *sharehandle,
678  curl_slist *clist) const
679 {
680  CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, sharehandle);
681  assert(retval == CURLE_OK);
682  retval = curl_easy_setopt(handle, CURLOPT_RESOLVE, clist);
683  assert(retval == CURLE_OK);
684 }
685 
686 
/**
 * Binds `handle` to a shared DNS cache and a pinned "host:port:ip" resolve
 * entry (see InitializeDnsSettingsCurl) for the host in `host_with_port`.
 *
 * 1. A handle that already has an entry in curl_sharehandles_ reuses it.
 * 2. Otherwise, if entries for the same host exist in sharehandles_, the one
 *    with the smallest `counter` is chosen (on ties the last one in iteration
 *    order wins because of the >= comparison) and its counter is incremented.
 * 3. Otherwise the host is resolved (IPv4 addresses only). One entry is
 *    created per address, and the handle is bound to the last entry created.
 *
 * @return 0 on success; -1 if resolution yielded no IPv4 address (debug
 *         builds assert before returning)
 */
int S3FanoutManager::InitializeDnsSettings(
  CURL *handle,
  std::string host_with_port) const
{
  // Use existing handle
  std::map<CURL *, S3FanOutDnsEntry *>::const_iterator it =
    curl_sharehandles_->find(handle);
  if (it != curl_sharehandles_->end()) {
    InitializeDnsSettingsCurl(handle, it->second->sharehandle,
                              it->second->clist);
    return 0;
  }

  // Add protocol information for extraction of fields for DNS
  if (!IsHttpUrl(host_with_port))
    host_with_port = config_.protocol + "://" + host_with_port;
  std::string remote_host = dns::ExtractHost(host_with_port);
  std::string remote_port = dns::ExtractPort(host_with_port);

  // If we have the name already resolved, use the least used IP
  S3FanOutDnsEntry *useme = NULL;
  unsigned int usemin = UINT_MAX;
  std::set<S3FanOutDnsEntry *>::iterator its3 = sharehandles_->begin();
  for (; its3 != sharehandles_->end(); ++its3) {
    if ((*its3)->dns_name == remote_host) {
      if (usemin >= (*its3)->counter) {
        usemin = (*its3)->counter;
        useme = (*its3);
      }
    }
  }
  if (useme != NULL) {
    curl_sharehandles_->insert(std::pair<CURL *,
                               S3FanOutDnsEntry *>(handle, useme));
    useme->counter++;
    InitializeDnsSettingsCurl(handle, useme->sharehandle, useme->clist);
    return 0;
  }

  // We need to resolve the hostname
  // TODO(ssheikki): support ipv6 also...  if (opt_ipv4_only_)
  dns::Host host = resolver_->Resolve(remote_host);
  set<string> ipv4_addresses = host.ipv4_addresses();
  std::set<string>::iterator its = ipv4_addresses.begin();
  S3FanOutDnsEntry *dnse = NULL;
  for ( ; its != ipv4_addresses.end(); ++its) {
    dnse = new S3FanOutDnsEntry();
    dnse->counter = 0;
    dnse->dns_name = remote_host;
    // NOTE(review): without an explicit port the pinned entry uses "80" even
    // when config_.protocol is https (default 443), in which case the
    // CURLOPT_RESOLVE entry would not match the connection. Confirm whether
    // dns::ExtractPort already supplies a scheme default.
    dnse->port = remote_port.size() == 0 ? "80" : remote_port;
    dnse->ip = *its;
    dnse->clist = NULL;
    // CURLOPT_RESOLVE entry format: "host:port:address"
    dnse->clist = curl_slist_append(dnse->clist,
                                    (dnse->dns_name+":"+
                                     dnse->port+":"+
                                     dnse->ip).c_str());
    // Each address gets its own share handle with a shared DNS cache
    dnse->sharehandle = curl_share_init();
    assert(dnse->sharehandle != NULL);
    CURLSHcode share_retval = curl_share_setopt(dnse->sharehandle,
                                                CURLSHOPT_SHARE,
                                                CURL_LOCK_DATA_DNS);
    assert(share_retval == CURLSHE_OK);
    sharehandles_->insert(dnse);
  }
  if (dnse == NULL) {
    // NOTE(review): the opening line of the call below (function name and
    // leading arguments) is missing from this listing; restore it from
    // upstream before building.
             "Error: DNS resolve failed for address '%s'.",
             remote_host.c_str());
    assert(dnse != NULL);
    return -1;
  }
  // Bind the handle to the last entry created above
  curl_sharehandles_->insert(
    std::pair<CURL *, S3FanOutDnsEntry *>(handle, dnse));
  dnse->counter++;
  InitializeDnsSettingsCurl(handle, dnse->sharehandle, dnse->clist);

  return 0;
}
765 
766 
767 bool S3FanoutManager::MkPayloadHash(const JobInfo &info, string *hex_hash)
768  const
769 {
770  if ((info.request == JobInfo::kReqHeadOnly) ||
771  (info.request == JobInfo::kReqHeadPut) ||
772  (info.request == JobInfo::kReqDelete))
773  {
774  switch (config_.authz_method) {
775  case kAuthzAwsV2:
776  hex_hash->clear();
777  break;
778  case kAuthzAwsV4:
779  // Sha256 over empty string
780  *hex_hash =
781  "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
782  break;
783  case kAuthzAzure:
784  // no payload hash required for Azure signature
785  hex_hash->clear();
786  break;
787  default:
788  PANIC(NULL);
789  }
790  return true;
791  }
792 
793  // PUT, there is actually payload
794  shash::Any payload_hash(shash::kMd5);
795 
796  unsigned char *data;
797  unsigned int nbytes =
798  info.origin->Data(reinterpret_cast<void **>(&data),
799  info.origin->GetSize(), 0);
800  assert(nbytes == info.origin->GetSize());
801 
802  switch (config_.authz_method) {
803  case kAuthzAwsV2:
804  shash::HashMem(data, nbytes, &payload_hash);
805  *hex_hash =
806  Base64(string(reinterpret_cast<char *>(payload_hash.digest),
807  payload_hash.GetDigestSize()));
808  return true;
809  case kAuthzAwsV4:
810  *hex_hash =
811  shash::Sha256Mem(data, nbytes);
812  return true;
813  case kAuthzAzure:
814  // no payload hash required for Azure signature
815  hex_hash->clear();
816  return true;
817  default:
818  PANIC(NULL);
819  }
820 }
821 
822 string S3FanoutManager::GetRequestString(const JobInfo &info) const {
823  switch (info.request) {
824  case JobInfo::kReqHeadOnly:
825  case JobInfo::kReqHeadPut:
826  return "HEAD";
827  case JobInfo::kReqPutCas:
828  case JobInfo::kReqPutDotCvmfs:
829  case JobInfo::kReqPutHtml:
830  case JobInfo::kReqPutBucket:
831  return "PUT";
832  case JobInfo::kReqDelete:
833  return "DELETE";
834  default:
835  PANIC(NULL);
836  }
837 }
838 
839 
840 string S3FanoutManager::GetContentType(const JobInfo &info) const {
841  switch (info.request) {
842  case JobInfo::kReqHeadOnly:
843  case JobInfo::kReqHeadPut:
844  case JobInfo::kReqDelete:
845  return "";
846  case JobInfo::kReqPutCas:
847  return "application/octet-stream";
848  case JobInfo::kReqPutDotCvmfs:
849  return "application/x-cvmfs";
850  case JobInfo::kReqPutHtml:
851  return "text/html";
852  case JobInfo::kReqPutBucket:
853  return "text/xml";
854  default:
855  PANIC(NULL);
856  }
857 }
858 
859 
864 Failures S3FanoutManager::InitializeRequest(JobInfo *info, CURL *handle) const {
865  // Initialize internal download state
866  info->curl_handle = handle;
867  info->error_code = kFailOk;
868  info->http_error = 0;
869  info->num_retries = 0;
870  info->backoff_ms = 0;
871  info->throttle_ms = 0;
872  info->throttle_timestamp = 0;
873  info->http_headers = NULL;
874  // info->payload_size is needed in S3Uploader::MainCollectResults,
875  // where info->origin is already destroyed.
876  info->payload_size = info->origin->GetSize();
877 
878  InitializeDnsSettings(handle, complete_hostname_);
879 
880  CURLcode retval;
881  if ((info->request == JobInfo::kReqHeadOnly) ||
882  (info->request == JobInfo::kReqHeadPut) ||
883  (info->request == JobInfo::kReqDelete))
884  {
885  retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 0);
886  assert(retval == CURLE_OK);
887  retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
888  assert(retval == CURLE_OK);
889 
890  if (info->request == JobInfo::kReqDelete)
891  {
892  retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST,
893  GetRequestString(*info).c_str());
894  assert(retval == CURLE_OK);
895  } else {
896  retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
897  assert(retval == CURLE_OK);
898  }
899  } else {
900  retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
901  assert(retval == CURLE_OK);
902  retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 1);
903  assert(retval == CURLE_OK);
904  retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 0);
905  assert(retval == CURLE_OK);
906  retval = curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE,
907  static_cast<curl_off_t>(info->origin->GetSize()));
908  assert(retval == CURLE_OK);
909 
910  if (info->request == JobInfo::kReqPutDotCvmfs) {
911  info->http_headers =
912  curl_slist_append(info->http_headers, kCacheControlDotCvmfs);
913  } else if (info->request == JobInfo::kReqPutCas) {
914  info->http_headers =
915  curl_slist_append(info->http_headers, kCacheControlCas);
916  }
917  }
918 
919  bool retval_b;
920 
921  // Authorization
922  vector<string> authz_headers;
923  switch (config_.authz_method) {
924  case kAuthzAwsV2:
925  retval_b = MkV2Authz(*info, &authz_headers);
926  break;
927  case kAuthzAwsV4:
928  retval_b = MkV4Authz(*info, &authz_headers);
929  break;
930  case kAuthzAzure:
931  retval_b = MkAzureAuthz(*info, &authz_headers);
932  break;
933  default:
934  PANIC(NULL);
935  }
936  if (!retval_b)
937  return kFailLocalIO;
938  for (unsigned i = 0; i < authz_headers.size(); ++i) {
939  info->http_headers =
940  curl_slist_append(info->http_headers, authz_headers[i].c_str());
941  }
942 
943  // Common headers
944  info->http_headers =
945  curl_slist_append(info->http_headers, "Connection: Keep-Alive");
946  info->http_headers = curl_slist_append(info->http_headers, "Pragma:");
947  // No 100-continue
948  info->http_headers = curl_slist_append(info->http_headers, "Expect:");
949  // Strip unnecessary header
950  info->http_headers = curl_slist_append(info->http_headers, "Accept:");
951  info->http_headers = curl_slist_append(info->http_headers,
952  user_agent_->c_str());
953 
954  // Set curl parameters
955  retval = curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
956  assert(retval == CURLE_OK);
957  retval = curl_easy_setopt(handle, CURLOPT_HEADERDATA,
958  static_cast<void *>(info));
959  assert(retval == CURLE_OK);
960  retval = curl_easy_setopt(handle, CURLOPT_READDATA,
961  static_cast<void *>(info));
962  assert(retval == CURLE_OK);
963  retval = curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->http_headers);
964  assert(retval == CURLE_OK);
965  if (opt_ipv4_only_) {
966  retval = curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
967  assert(retval == CURLE_OK);
968  }
969  // Follow HTTP redirects
970  retval = curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
971  assert(retval == CURLE_OK);
972 
973  retval = curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, info->errorbuffer);
974  assert(retval == CURLE_OK);
975 
976  if (config_.protocol == "https") {
977  retval = curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 1L);
978  assert(retval == CURLE_OK);
979  retval = curl_easy_setopt(handle, CURLOPT_PROXY_SSL_VERIFYPEER, 1L);
980  assert(retval == CURLE_OK);
981  bool add_cert = ssl_certificate_store_.ApplySslCertificatePath(handle);
982  assert(add_cert);
983  }
984 
985  return kFailOk;
986 }
987 
988 
992 void S3FanoutManager::SetUrlOptions(JobInfo *info) const {
993  CURL *curl_handle = info->curl_handle;
994  CURLcode retval;
995 
996  retval = curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT,
997  config_.opt_timeout_sec);
998  assert(retval == CURLE_OK);
999  retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT,
1000  kLowSpeedLimit);
1001  assert(retval == CURLE_OK);
1002  retval = curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME,
1003  config_.opt_timeout_sec);
1004  assert(retval == CURLE_OK);
1005 
1006  if (is_curl_debug_) {
1007  retval = curl_easy_setopt(curl_handle, CURLOPT_VERBOSE, 1);
1008  assert(retval == CURLE_OK);
1009  }
1010 
1011  string url = MkUrl(info->object_key);
1012  retval = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
1013  assert(retval == CURLE_OK);
1014 
1015  retval = curl_easy_setopt(curl_handle, CURLOPT_PROXY, config_.proxy.c_str());
1016  assert(retval == CURLE_OK);
1017 }
1018 
1019 
1023 void S3FanoutManager::UpdateStatistics(CURL *handle) {
1024  double val;
1025 
1026  if (curl_easy_getinfo(handle, CURLINFO_SIZE_UPLOAD, &val) == CURLE_OK)
1027  statistics_->transferred_bytes += val;
1028 }
1029 
1030 
1034 bool S3FanoutManager::CanRetry(const JobInfo *info) {
1035  return
1036  (info->error_code == kFailHostConnection ||
1037  info->error_code == kFailHostResolve ||
1039  info->error_code == kFailRetry) &&
1040  (info->num_retries < config_.opt_max_retries);
1041 }
1042 
1043 
1049 void S3FanoutManager::Backoff(JobInfo *info) {
1050  if (info->error_code != kFailRetry)
1051  info->num_retries++;
1052  statistics_->num_retries++;
1053 
1054  if (info->throttle_ms > 0) {
1055  LogCvmfs(kLogS3Fanout, kLogDebug, "throttling for %d ms",
1056  info->throttle_ms);
1057  uint64_t now = platform_monotonic_time();
1058  if ((info->throttle_timestamp + (info->throttle_ms / 1000)) >= now) {
1059  if ((now - timestamp_last_throttle_report_) > kThrottleReportIntervalSec)
1060  {
1062  "Warning: S3 backend throttling %ums "
1063  "(total backoff time so far %lums)",
1064  info->throttle_ms,
1065  statistics_->ms_throttled);
1066  timestamp_last_throttle_report_ = now;
1067  }
1068  statistics_->ms_throttled += info->throttle_ms;
1069  SafeSleepMs(info->throttle_ms);
1070  }
1071  } else {
1072  if (info->backoff_ms == 0) {
1073  // Must be != 0
1074  info->backoff_ms = prng_.Next(config_.opt_backoff_init_ms + 1);
1075  } else {
1076  info->backoff_ms *= 2;
1077  }
1078  if (info->backoff_ms > config_.opt_backoff_max_ms)
1079  info->backoff_ms = config_.opt_backoff_max_ms;
1080 
1081  LogCvmfs(kLogS3Fanout, kLogDebug, "backing off for %d ms",
1082  info->backoff_ms);
1083  SafeSleepMs(info->backoff_ms);
1084  }
1085 }
1086 
1087 
1094 bool S3FanoutManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1095  LogCvmfs(kLogS3Fanout, kLogDebug, "Verify uploaded/tested object %s "
1096  "(curl error %d, info error %d, info request %d)",
1097  info->object_key.c_str(),
1098  curl_error, info->error_code, info->request);
1099  UpdateStatistics(info->curl_handle);
1100 
1101  // Verification and error classification
1102  switch (curl_error) {
1103  case CURLE_OK:
1104  if ((info->error_code != kFailRetry) &&
1105  (info->error_code != kFailNotFound))
1106  {
1107  info->error_code = kFailOk;
1108  }
1109  break;
1110  case CURLE_UNSUPPORTED_PROTOCOL:
1111  case CURLE_URL_MALFORMAT:
1112  info->error_code = kFailBadRequest;
1113  break;
1114  case CURLE_COULDNT_RESOLVE_HOST:
1115  info->error_code = kFailHostResolve;
1116  break;
1117  case CURLE_COULDNT_CONNECT:
1118  case CURLE_OPERATION_TIMEDOUT:
1119  case CURLE_SEND_ERROR:
1120  case CURLE_RECV_ERROR:
1122  break;
1123  case CURLE_ABORTED_BY_CALLBACK:
1124  case CURLE_WRITE_ERROR:
1125  // Error set by callback
1126  break;
1127  default:
1129  "unexpected curl error (%d) while trying to upload %s: %s",
1130  curl_error, info->object_key.c_str(), info->errorbuffer);
1131  info->error_code = kFailOther;
1132  break;
1133  }
1134 
1135  // Transform HEAD to PUT request
1136  if ((info->error_code == kFailNotFound) &&
1137  (info->request == JobInfo::kReqHeadPut))
1138  {
1139  LogCvmfs(kLogS3Fanout, kLogDebug, "not found: %s, uploading",
1140  info->object_key.c_str());
1141  info->request = JobInfo::kReqPutCas;
1142  curl_slist_free_all(info->http_headers);
1143  info->http_headers = NULL;
1144  s3fanout::Failures init_failure = InitializeRequest(info,
1145  info->curl_handle);
1146 
1147  if (init_failure != s3fanout::kFailOk) {
1148  PANIC(kLogStderr,
1149  "Failed to initialize CURL handle "
1150  "(error: %d - %s | errno: %d)",
1151  init_failure, Code2Ascii(init_failure), errno);
1152  }
1153  SetUrlOptions(info);
1154  // Reset origin
1155  info->origin->Rewind();
1156  return true; // Again, Put
1157  }
1158 
1159  // Determination if failed request should be repeated
1160  bool try_again = false;
1161  if (info->error_code != kFailOk) {
1162  try_again = CanRetry(info);
1163  }
1164  if (try_again) {
1165  if (info->request == JobInfo::kReqPutCas ||
1166  info->request == JobInfo::kReqPutDotCvmfs ||
1167  info->request == JobInfo::kReqPutHtml) {
1168  LogCvmfs(kLogS3Fanout, kLogDebug, "Trying again to upload %s",
1169  info->object_key.c_str());
1170  // Reset origin
1171  info->origin->Rewind();
1172  }
1173  Backoff(info);
1174  info->error_code = kFailOk;
1175  info->http_error = 0;
1176  info->throttle_ms = 0;
1177  info->backoff_ms = 0;
1178  info->throttle_timestamp = 0;
1179  return true; // try again
1180  }
1181 
1182  // Cleanup opened resources
1183  info->origin.Destroy();
1184 
1185  if ((info->error_code != kFailOk) &&
1186  (info->http_error != 0) && (info->http_error != 404))
1187  {
1188  LogCvmfs(kLogS3Fanout, kLogStderr, "S3: HTTP failure %d", info->http_error);
1189  }
1190  return false; // stop transfer
1191 }
1192 
1193 S3FanoutManager::S3FanoutManager(const S3Config &config) : config_(config) {
1194  atomic_init32(&multi_threaded_);
1198 
1199  int retval;
1200  jobs_todo_lock_ =
1201  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1202  retval = pthread_mutex_init(jobs_todo_lock_, NULL);
1203  assert(retval == 0);
1205  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
1206  retval = pthread_mutex_init(curl_handle_lock_, NULL);
1207  assert(retval == 0);
1208 
1209  active_requests_ = new set<JobInfo *>;
1210  pool_handles_idle_ = new set<CURL *>;
1211  pool_handles_inuse_ = new set<CURL *>;
1212  curl_sharehandles_ = new map<CURL *, S3FanOutDnsEntry *>;
1213  sharehandles_ = new set<S3FanOutDnsEntry *>;
1217  assert(NULL != available_jobs_);
1218 
1219  statistics_ = new Statistics();
1220  user_agent_ = new string();
1221  *user_agent_ = "User-Agent: cvmfs " + string(CVMFS_VERSION);
1223 
1224  CURLcode cretval = curl_global_init(CURL_GLOBAL_ALL);
1225  assert(cretval == CURLE_OK);
1226  curl_multi_ = curl_multi_init();
1227  assert(curl_multi_ != NULL);
1228  CURLMcode mretval;
1229  mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION,
1231  assert(mretval == CURLM_OK);
1232  mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1233  static_cast<void *>(this));
1234  assert(mretval == CURLM_OK);
1235  mretval = curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1237  assert(mretval == CURLM_OK);
1238 
1239  prng_.InitLocaltime();
1240 
1241  thread_upload_ = 0;
1243  is_curl_debug_ = (getenv("_CVMFS_CURL_DEBUG") != NULL);
1244 
1245  // Parsing environment variables
1246  if ((getenv("CVMFS_IPV4_ONLY") != NULL) &&
1247  (strlen(getenv("CVMFS_IPV4_ONLY")) > 0)) {
1248  opt_ipv4_only_ = true;
1249  } else {
1250  opt_ipv4_only_ = false;
1251  }
1252 
1254 
1255  watch_fds_ = static_cast<struct pollfd *>(smalloc(4 * sizeof(struct pollfd)));
1256  watch_fds_size_ = 4;
1257  watch_fds_inuse_ = 0;
1258 
1260 }
1261 
1263  pthread_mutex_destroy(jobs_todo_lock_);
1264  free(jobs_todo_lock_);
1265  pthread_mutex_destroy(curl_handle_lock_);
1266  free(curl_handle_lock_);
1267 
1268  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1269  // Shutdown I/O thread
1270  char buf = 'T';
1271  WritePipe(pipe_terminate_[1], &buf, 1);
1272  pthread_join(thread_upload_, NULL);
1273  }
1277 
1278  set<CURL *>::iterator i = pool_handles_idle_->begin();
1279  const set<CURL *>::const_iterator iEnd = pool_handles_idle_->end();
1280  for (; i != iEnd; ++i) {
1281  curl_easy_cleanup(*i);
1282  }
1283 
1284  set<S3FanOutDnsEntry *>::iterator is = sharehandles_->begin();
1285  const set<S3FanOutDnsEntry *>::const_iterator isEnd = sharehandles_->end();
1286  for (; is != isEnd; ++is) {
1287  curl_share_cleanup((*is)->sharehandle);
1288  curl_slist_free_all((*is)->clist);
1289  delete *is;
1290  }
1291  pool_handles_idle_->clear();
1292  curl_sharehandles_->clear();
1293  sharehandles_->clear();
1294  delete active_requests_;
1295  delete pool_handles_idle_;
1296  delete pool_handles_inuse_;
1297  delete curl_sharehandles_;
1298  delete sharehandles_;
1299  delete user_agent_;
1300  curl_multi_cleanup(curl_multi_);
1301 
1302  delete statistics_;
1303 
1304  delete available_jobs_;
1305 
1306  curl_global_cleanup();
1307 }
1308 
1313  LogCvmfs(kLogS3Fanout, kLogDebug, "S3FanoutManager spawned");
1314 
1315  int retval = pthread_create(&thread_upload_, NULL, MainUpload,
1316  static_cast<void *>(this));
1317  assert(retval == 0);
1318 
1319  atomic_inc32(&multi_threaded_);
1320 }
1321 
1323  return *statistics_;
1324 }
1325 
1331  WritePipe(pipe_jobs_[1], &info, sizeof(info));
1332 }
1333 
1338  WritePipe(pipe_completed_[1], &info, sizeof(info));
1339 }
1340 
1345  JobInfo *info;
1346  ReadPipe(pipe_completed_[0], &info, sizeof(info));
1347  return info;
1348 }
1349 
1350 //------------------------------------------------------------------------------
1351 
1352 
1353 string Statistics::Print() const {
1354  return
1355  "Transferred Bytes: " +
1356  StringifyInt(uint64_t(transferred_bytes)) + "\n" +
1357  "Transfer duration: " +
1358  StringifyInt(uint64_t(transfer_time)) + " s\n" +
1359  "Number of requests: " +
1360  StringifyInt(num_requests) + "\n" +
1361  "Number of retries: " +
1362  StringifyInt(num_retries) + "\n";
1363 }
1364 
1365 } // namespace s3fanout
unsigned throttle_ms
Definition: s3fanout.h:152
const S3Config config_
Definition: s3fanout.h:281
const char * Code2Ascii(const ObjectFetcherFailures::Failures error)
pthread_mutex_t * jobs_todo_lock_
Definition: s3fanout.h:238
atomic_int32 multi_threaded_
Definition: s3fanout.h:305
std::string ip
Definition: s3fanout.h:163
pthread_mutex_t * curl_handle_lock_
Definition: s3fanout.h:239
string Sha256String(const string &content)
Definition: hash.cc:452
std::set< CURL * > * pool_handles_idle_
Definition: s3fanout.h:290
struct curl_slist * http_headers
Definition: s3fanout.h:143
static size_t CallbackCurlBody(char *, size_t size, size_t nmemb, void *)
Definition: s3fanout.cc:153
#define PANIC(...)
Definition: exception.h:29
std::string IsoTimestamp()
Definition: string.cc:167
dns::CaresResolver * resolver_
Definition: s3fanout.h:294
string Trim(const string &raw, bool trim_newline)
Definition: string.cc:446
CURLSH * sharehandle
Definition: s3fanout.h:166
double transferred_bytes
Definition: s3fanout.h:76
void InitLocaltime()
Definition: prng.h:39
bool IsHttpUrl(const std::string &path)
Definition: posix.cc:168
void Hmac(const string &key, const unsigned char *buffer, const unsigned buffer_size, Any *any_digest)
Definition: hash.cc:274
perf::Statistics * statistics_
Definition: repository.h:139
static void * MainUpload(void *data)
Definition: s3fanout.cc:233
std::string dns_name
Definition: s3fanout.h:162
const std::string object_key
Definition: s3fanout.h:108
assert((mem||(size==0))&&"Out Of Memory")
const std::set< std::string > & ipv4_addresses() const
Definition: dns.h:111
std::string MkCompleteHostname()
Definition: s3fanout.h:273
void PushNewJob(JobInfo *info)
Definition: s3fanout.cc:1329
Algorithms algorithm
Definition: hash.h:125
SynchronizingCounter< uint32_t > Semaphore
Definition: s3fanout.h:172
unsigned int counter
Definition: s3fanout.h:161
std::string Print() const
Definition: s3fanout.cc:1353
bool Debase64(const string &data, string *decoded)
Definition: string.cc:582
void MakePipe(int pipe_fd[2])
Definition: posix.cc:492
unsigned char digest[digest_size_]
Definition: hash.h:124
struct curl_slist * clist
Definition: s3fanout.h:165
std::string * user_agent_
Definition: s3fanout.h:296
Failures error_code
Definition: s3fanout.h:146
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:138
unsigned backoff_ms
Definition: s3fanout.h:150
unsigned int max_available_jobs_
Definition: s3fanout.h:326
std::string RfcTimestamp()
Definition: string.cc:145
int64_t String2Int64(const string &value)
Definition: string.cc:240
unsigned GetDigestSize() const
Definition: hash.h:168
std::set< JobInfo * > * active_requests_
Definition: s3fanout.h:288
void PushCompletedJob(JobInfo *info)
Definition: s3fanout.cc:1337
string Sha256Mem(const unsigned char *buffer, const unsigned buffer_size)
Definition: hash.cc:442
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:308
uint64_t payload_size
Definition: s3fanout.h:144
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb, void *info_link)
Definition: download.cc:257
bool HasSuffix(const std::string &str, const std::string &suffix, const bool ignore_case)
Definition: string.cc:299
void UseSystemCertificatePath()
Definition: ssl.cc:68
void SetUrlOptions(JobInfo *info) const
Definition: s3fanout.cc:992
Definition: dns.h:90
CURL * AcquireCurlHandle() const
Definition: s3fanout.cc:385
uint64_t throttle_timestamp
Definition: s3fanout.h:154
Semaphore * available_jobs_
Definition: s3fanout.h:327
static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp)
Definition: s3fanout.cc:163
UniquePtr< FileBackedBuffer > origin
Definition: s3fanout.h:110
void ReleaseCurlHandle(JobInfo *info, CURL *handle) const
Definition: s3fanout.cc:418
char * errorbuffer
Definition: s3fanout.h:155
string StringifyInt(const int64_t value)
Definition: string.cc:78
struct pollfd * watch_fds_
Definition: s3fanout.h:307
uint64_t platform_monotonic_time()
std::string ExtractPort(const std::string &url)
Definition: dns.cc:125
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:285
JobInfo * PopCompletedJob()
Definition: s3fanout.cc:1344
std::string ExtractHost(const std::string &url)
Definition: dns.cc:110
unsigned char num_retries
Definition: s3fanout.h:148
std::string port
Definition: s3fanout.h:164
void HashMem(const unsigned char *buffer, const unsigned buffer_size, Any *any_digest)
Definition: hash.cc:255
std::string complete_hostname_
Definition: s3fanout.h:282
uint64_t num_requests
Definition: s3fanout.h:78
string Base64(const string &data)
Definition: string.cc:522
RequestType request
Definition: s3fanout.h:145
uint64_t String2Uint64(const string &value)
Definition: string.cc:246
double transfer_time
Definition: s3fanout.h:77
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
Definition: s3fanout.cc:1094
uint64_t num_retries
Definition: s3fanout.h:79
Definition: mutex.h:42
static CaresResolver * Create(const bool ipv4_only, const unsigned retries, const unsigned timeout_ms)
Definition: dns.cc:743
std::set< S3FanOutDnsEntry * > * sharehandles_
Definition: s3fanout.h:292
SslCertificateStore ssl_certificate_store_
Definition: s3fanout.h:341
const Statistics & GetStatistics()
Definition: s3fanout.cc:1322
std::string Hmac256(const std::string &key, const std::string &content, bool raw_output)
Definition: hash.cc:458
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:2049
Statistics * statistics_
Definition: s3fanout.h:331
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:501
uint64_t timestamp_last_throttle_report_
Definition: s3fanout.h:334
static void size_t size
Definition: smalloc.h:54
std::set< CURL * > * pool_handles_inuse_
Definition: s3fanout.h:291
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:513
Definition: s3fanout.h:158
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:562
std::map< CURL *, S3FanOutDnsEntry * > * curl_sharehandles_
Definition: s3fanout.h:293
CURL * curl_handle
Definition: s3fanout.h:142
void Destroy()
Definition: pointer.h:45
Failures InitializeRequest(JobInfo *info, CURL *handle) const
Definition: s3fanout.cc:864
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528