CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
s3fanout.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_NETWORK_S3FANOUT_H_
6 #define CVMFS_NETWORK_S3FANOUT_H_
7 
8 #include <poll.h>
9 #include <semaphore.h>
10 
11 #include <climits>
12 #include <cstdlib>
13 #include <map>
14 #include <set>
15 #include <string>
16 #include <utility>
17 #include <vector>
18 
19 #include "dns.h"
20 #include "duplex_curl.h"
21 #include "ssl.h"
22 #include "util/concurrency.h"
24 #include "util/mmap_file.h"
25 #include "util/pointer.h"
26 #include "util/prng.h"
27 #include "util/single_copy.h"
28 #include "util/smalloc.h"
29 
30 namespace s3fanout {
31 
36 };
37 
41 enum Failures {
42  kFailOk = 0,
52 
54 }; // Failures
55 
56 
57 inline const char *Code2Ascii(const Failures error) {
58  const char *texts[kFailNumEntries + 1];
59  texts[0] = "S3: OK";
60  texts[1] = "S3: local I/O failure";
61  texts[2] = "S3: malformed URL (bad request)";
62  texts[3] = "S3: forbidden";
63  texts[4] = "S3: failed to resolve host address";
64  texts[5] = "S3: host connection problem";
65  texts[6] = "S3: not found";
66  texts[7] = "S3: service not available";
67  texts[8] = "S3: unknown service error, perhaps wrong authentication protocol";
68  texts[8] = "S3: too many requests, service asks for backoff and retry";
69  texts[9] = "no text";
70  return texts[error];
71 }
72 
73 
74 
// Transfer counters of the S3 fan-out manager (exposed read-only through
// S3FanoutManager::GetStatistics()).
// NOTE(review): this listing omits source line 76 (the `double
// transferred_bytes` member) and line 82 (the header of the block that zeroes
// the counters, presumably the default constructor) -- confirm against the
// real header before editing.
75 struct Statistics {
// Accumulated transfer time; unit not visible here, presumably seconds -- TODO confirm.
77  double transfer_time;
// Request and retry counts (meaning presumed from the names; verify in s3fanout.cc).
78  uint64_t num_requests;
79  uint64_t num_retries;
80  uint64_t ms_throttled; // Total waiting time imposed by HTTP 429 replies
81 
// Every counter starts at zero.
83  transferred_bytes = 0.0;
84  transfer_time = 0.0;
85  num_requests = 0;
86  num_retries = 0;
87  ms_throttled = 0;
88  }
89 
// Renders the counters as a string; implemented in s3fanout.cc.
90  std::string Print() const;
91 }; // Statistics
92 
93 
97 struct JobInfo : SingleCopy {
98  enum RequestType {
99  kReqHeadOnly = 0, // peek
100  kReqHeadPut, // conditional upload of content-addressed objects
101  kReqPutCas, // immutable data object
102  kReqPutDotCvmfs, // one of the /.cvmfs... top level files
103  kReqPutHtml, // HTML file - display instead of downloading
104  kReqPutBucket, // bucket creation
106  };
107 
108  const std::string object_key;
109  void *callback; // Callback to be called when job is finished
111 
112  // One constructor per destination
114  const std::string &object_key,
115  void *callback,
117  : object_key(object_key),
118  origin(origin)
119  {
120  JobInfoInit();
121  this->callback = callback;
122  }
123  void JobInfoInit() {
124  curl_handle = NULL;
125  http_headers = NULL;
126  callback = NULL;
129  http_error = 0;
130  num_retries = 0;
131  backoff_ms = 0;
132  throttle_ms = 0;
133  throttle_timestamp = 0;
134  errorbuffer =
135  reinterpret_cast<char *>(smalloc(sizeof(char) * CURL_ERROR_SIZE));
136  }
138  free(errorbuffer);
139  }
140 
141  // Internal state, don't touch
142  CURL *curl_handle;
143  struct curl_slist *http_headers;
144  uint64_t payload_size;
148  unsigned char num_retries;
149  // Exponential backoff with cutoff in case of errors
150  unsigned backoff_ms;
151  // Throttle imposed by HTTP 429 reply; mutually exclusive with backoff_ms
152  unsigned throttle_ms;
153  // Remember when the 429 reply came in to only throttle if still necessary
155  char *errorbuffer;
156 }; // JobInfo
157 
// Default state: counter 0, empty dns_name/ip, port "80", and no cURL list or
// share handle attached yet.
159  S3FanOutDnsEntry() : counter(0), dns_name(), ip(), port("80"),
160  clist(NULL), sharehandle(NULL) {}
// NOTE(review): presumably the number of CURL handles currently using this
// entry -- confirm in s3fanout.cc.
161  unsigned int counter;
162  std::string dns_name;
163  std::string ip;
164  std::string port;
// cURL string list and share handle for this host; presumably the values
// handed to InitializeDnsSettingsCurl(handle, sharehandle, clist) -- TODO confirm.
165  struct curl_slist *clist;
166  CURLSH *sharehandle;
167 }; // S3FanOutDnsEntry
168 
169 
171  protected:
173 
174  public:
175  // 250ms pause after HTTP 429 "Too Many Requests"
176  static const unsigned kDefault429ThrottleMs;
177  // Don't throttle for more than a few seconds
178  static const unsigned kMax429ThrottleMs;
179  // Report throttle operations only every so often
180  static const unsigned kThrottleReportIntervalSec;
181  static const unsigned kDefaultHTTPPort;
182  static const unsigned kDefaultHTTPSPort;
183 
184  struct S3Config {
187  dns_buckets = true;
188  protocol = "http";
189  pool_max_handles = 0;
190  opt_timeout_sec = 20;
191  opt_max_retries = 3;
192  opt_backoff_init_ms = 100;
193  opt_backoff_max_ms = 2000;
194  x_amz_acl = "public-read";
195  }
196  std::string access_key;
197  std::string secret_key;
198  std::string hostname_port;
200  std::string region;
201  std::string flavor;
202  std::string bucket;
204  std::string protocol;
206  unsigned opt_timeout_sec;
207  unsigned opt_max_retries;
210  std::string proxy;
211  std::string x_amz_acl;
212  };
213 
214  static void DetectThrottleIndicator(const std::string &header, JobInfo *info);
215 
216  explicit S3FanoutManager(const S3Config &config);
217 
219 
220  void Spawn();
221 
222  void PushNewJob(JobInfo *info);
223  void PushCompletedJob(JobInfo *info);
225 
226  const Statistics &GetStatistics();
227 
228  private:
229  // Reflects the default Apache configuration of the local backend
230  static const char *kCacheControlCas; // Cache-Control: max-age=259200
231  static const char *kCacheControlDotCvmfs; // Cache-Control: max-age=61
232  static const unsigned kLowSpeedLimit = 1024; // Require at least 1kB/s
233 
234  static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action,
235  void *userp, void *socketp);
236  static void *MainUpload(void *data);
237  std::vector<s3fanout::JobInfo*> jobs_todo_;
238  pthread_mutex_t *jobs_todo_lock_;
239  pthread_mutex_t *curl_handle_lock_;
240 
241  CURL *AcquireCurlHandle() const;
242  void ReleaseCurlHandle(JobInfo *info, CURL *handle) const;
243  void InitPipeWatchFds();
244  int InitializeDnsSettings(CURL *handle,
245  std::string remote_host) const;
246  void InitializeDnsSettingsCurl(CURL *handle, CURLSH *sharehandle,
247  curl_slist *clist) const;
248  Failures InitializeRequest(JobInfo *info, CURL *handle) const;
249  void SetUrlOptions(JobInfo *info) const;
250  void UpdateStatistics(CURL *handle);
251  bool CanRetry(const JobInfo *info);
252  void Backoff(JobInfo *info);
253  bool VerifyAndFinalize(const int curl_error, JobInfo *info);
254  std::string GetRequestString(const JobInfo &info) const;
255  std::string GetContentType(const JobInfo &info) const;
256  std::string GetUriEncode(const std::string &val, bool encode_slash) const;
257  std::string GetAwsV4SigningKey(const std::string &date) const;
258  bool MkPayloadHash(const JobInfo &info, std::string *hex_hash) const;
259  bool MkV2Authz(const JobInfo &info,
260  std::vector<std::string> *headers) const;
261  bool MkV4Authz(const JobInfo &info,
262  std::vector<std::string> *headers) const;
263  bool MkAzureAuthz(const JobInfo &info,
264  std::vector<std::string> *headers) const;
265  std::string MkUrl(const std::string &objkey) const {
266  if (config_.dns_buckets) {
267  return config_.protocol + "://" + complete_hostname_ + "/" + objkey;
268  } else {
269  return config_.protocol + "://" + complete_hostname_ + "/" +
270  config_.bucket + "/" + objkey;
271  }
272  }
273  std::string MkCompleteHostname() {
274  if (config_.dns_buckets) {
275  return config_.bucket + "." + config_.hostname_port;
276  } else {
277  return config_.hostname_port;
278  }
279  }
280 
282  std::string complete_hostname_;
283 
288  std::set<JobInfo *> *active_requests_;
289 
290  std::set<CURL *> *pool_handles_idle_;
291  std::set<CURL *> *pool_handles_inuse_;
292  std::set<S3FanOutDnsEntry *> *sharehandles_;
293  std::map<CURL *, S3FanOutDnsEntry *> *curl_sharehandles_;
295  CURLM *curl_multi_;
296  std::string *user_agent_;
297 
302  mutable std::pair<std::string, std::string> last_signing_key_;
303 
304  pthread_t thread_upload_;
306 
307  struct pollfd *watch_fds_;
308  uint32_t watch_fds_size_;
310  uint32_t watch_fds_max_;
311 
312  // A pipe used to signal termination from S3FanoutManager to MainUpload
313  // thread. Anything written into it results in MainUpload thread exit.
315  // A pipe used to push jobs from S3FanoutManager to the MainUpload thread.
316  // S3FanoutManager writes a JobInfo* pointer. MainUpload then reads the
317  // pointer and processes the job.
318  int pipe_jobs_[2];
319  // A pipe used to collect completed jobs. MainUpload writes the pointer
320  // to each completed job into it; PopCompletedJob() is used to
321  // retrieve that pointer.
323 
325 
326  unsigned int max_available_jobs_;
328 
329  // Writes and reads should be atomic because reading happens in a different
330  // thread than writing.
332 
333  // Report throttling only every so often, not on every occurrence
335 
337 
342 }; // S3FanoutManager
343 
344 } // namespace s3fanout
345 
346 #endif // CVMFS_NETWORK_S3FANOUT_H_
Definition: prng.h:28
unsigned throttle_ms
Definition: s3fanout.h:152
const S3Config config_
Definition: s3fanout.h:281
pthread_mutex_t * jobs_todo_lock_
Definition: s3fanout.h:238
std::string GetAwsV4SigningKey(const std::string &date) const
Definition: s3fanout.cc:531
atomic_int32 multi_threaded_
Definition: s3fanout.h:305
uint64_t ms_throttled
Definition: s3fanout.h:80
std::string ip
Definition: s3fanout.h:163
void JobInfoInit()
Definition: s3fanout.h:123
std::string GetContentType(const JobInfo &info) const
Definition: s3fanout.cc:840
pthread_mutex_t * curl_handle_lock_
Definition: s3fanout.h:239
std::set< CURL * > * pool_handles_idle_
Definition: s3fanout.h:290
struct curl_slist * http_headers
Definition: s3fanout.h:143
dns::CaresResolver * resolver_
Definition: s3fanout.h:294
static const unsigned kDefault429ThrottleMs
Definition: s3fanout.h:176
CURLSH * sharehandle
Definition: s3fanout.h:166
static const char * kCacheControlDotCvmfs
Definition: s3fanout.h:231
bool MkV2Authz(const JobInfo &info, std::vector< std::string > *headers) const
Definition: s3fanout.cc:460
double transferred_bytes
Definition: s3fanout.h:76
S3FanoutManager(const S3Config &config)
Definition: s3fanout.cc:1193
static void * MainUpload(void *data)
Definition: s3fanout.cc:233
std::string dns_name
Definition: s3fanout.h:162
const std::string object_key
Definition: s3fanout.h:108
std::string MkCompleteHostname()
Definition: s3fanout.h:273
void PushNewJob(JobInfo *info)
Definition: s3fanout.cc:1329
static const char * kCacheControlCas
Definition: s3fanout.h:230
std::string GetRequestString(const JobInfo &info) const
Definition: s3fanout.cc:822
void InitializeDnsSettingsCurl(CURL *handle, CURLSH *sharehandle, curl_slist *clist) const
Definition: s3fanout.cc:675
SynchronizingCounter< uint32_t > Semaphore
Definition: s3fanout.h:172
unsigned int counter
Definition: s3fanout.h:161
std::string Print() const
Definition: s3fanout.cc:1353
int InitializeDnsSettings(CURL *handle, std::string remote_host) const
Definition: s3fanout.cc:687
struct curl_slist * clist
Definition: s3fanout.h:165
std::string * user_agent_
Definition: s3fanout.h:296
Failures error_code
Definition: s3fanout.h:146
AuthzMethods
Definition: s3fanout.h:32
S3FanOutDnsEntry()
Definition: s3fanout.h:159
unsigned backoff_ms
Definition: s3fanout.h:150
unsigned int max_available_jobs_
Definition: s3fanout.h:326
bool MkPayloadHash(const JobInfo &info, std::string *hex_hash) const
Definition: s3fanout.cc:767
int32_t atomic_int32
Definition: atomic.h:17
std::set< JobInfo * > * active_requests_
Definition: s3fanout.h:288
void PushCompletedJob(JobInfo *info)
Definition: s3fanout.cc:1337
uint64_t payload_size
Definition: s3fanout.h:144
void UpdateStatistics(CURL *handle)
Definition: s3fanout.cc:1023
std::string MkUrl(const std::string &objkey) const
Definition: s3fanout.h:265
void SetUrlOptions(JobInfo *info) const
Definition: s3fanout.cc:992
CURL * AcquireCurlHandle() const
Definition: s3fanout.cc:385
static const unsigned kDefaultHTTPSPort
Definition: s3fanout.h:182
uint64_t throttle_timestamp
Definition: s3fanout.h:154
Semaphore * available_jobs_
Definition: s3fanout.h:327
static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp)
Definition: s3fanout.cc:163
UniquePtr< FileBackedBuffer > origin
Definition: s3fanout.h:110
JobInfo(const std::string &object_key, void *callback, FileBackedBuffer *origin)
Definition: s3fanout.h:113
void ReleaseCurlHandle(JobInfo *info, CURL *handle) const
Definition: s3fanout.cc:418
char * errorbuffer
Definition: s3fanout.h:155
void Backoff(JobInfo *info)
Definition: s3fanout.cc:1049
struct pollfd * watch_fds_
Definition: s3fanout.h:307
JobInfo * PopCompletedJob()
Definition: s3fanout.cc:1344
static const unsigned kMax429ThrottleMs
Definition: s3fanout.h:178
unsigned char num_retries
Definition: s3fanout.h:148
std::string port
Definition: s3fanout.h:164
std::string complete_hostname_
Definition: s3fanout.h:282
uint64_t num_requests
Definition: s3fanout.h:78
bool MkAzureAuthz(const JobInfo &info, std::vector< std::string > *headers) const
Definition: s3fanout.cc:630
static const unsigned kLowSpeedLimit
Definition: s3fanout.h:232
RequestType request
Definition: s3fanout.h:145
static const unsigned kDefaultHTTPPort
Definition: s3fanout.h:181
double transfer_time
Definition: s3fanout.h:77
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
Definition: s3fanout.cc:1094
uint64_t num_retries
Definition: s3fanout.h:79
static void DetectThrottleIndicator(const std::string &header, JobInfo *info)
Definition: s3fanout.cc:40
std::set< S3FanOutDnsEntry * > * sharehandles_
Definition: s3fanout.h:292
SslCertificateStore ssl_certificate_store_
Definition: s3fanout.h:341
static const unsigned kThrottleReportIntervalSec
Definition: s3fanout.h:180
const Statistics & GetStatistics()
Definition: s3fanout.cc:1322
std::pair< std::string, std::string > last_signing_key_
Definition: s3fanout.h:302
std::string GetUriEncode(const std::string &val, bool encode_slash) const
Definition: s3fanout.cc:501
Statistics * statistics_
Definition: s3fanout.h:331
bool MkV4Authz(const JobInfo &info, std::vector< std::string > *headers) const
Definition: s3fanout.cc:551
uint64_t timestamp_last_throttle_report_
Definition: s3fanout.h:334
std::set< CURL * > * pool_handles_inuse_
Definition: s3fanout.h:291
Definition: s3fanout.h:158
std::map< CURL *, S3FanOutDnsEntry * > * curl_sharehandles_
Definition: s3fanout.h:293
std::vector< s3fanout::JobInfo * > jobs_todo_
Definition: s3fanout.h:237
bool CanRetry(const JobInfo *info)
Definition: s3fanout.cc:1034
const char * Code2Ascii(const Failures error)
Definition: s3fanout.h:57
CURL * curl_handle
Definition: s3fanout.h:142
Failures InitializeRequest(JobInfo *info, CURL *handle) const
Definition: s3fanout.cc:864