CernVM-FS  2.10.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
s3fanout.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_S3FANOUT_H_
6 #define CVMFS_S3FANOUT_H_
7 
8 #include <poll.h>
9 #include <semaphore.h>
10 
11 #include <climits>
12 #include <cstdlib>
13 #include <map>
14 #include <set>
15 #include <string>
16 #include <utility>
17 #include <vector>
18 
19 #include "dns.h"
20 #include "duplex_curl.h"
21 #include "prng.h"
22 #include "smalloc.h"
23 #include "ssl.h"
25 #include "util/mmap_file.h"
26 #include "util/pointer.h"
27 #include "util/single_copy.h"
28 #include "util_concurrency.h"
29 
30 namespace s3fanout {
31 
36 };
37 
41 enum Failures {
42  kFailOk = 0,
52 
54 }; // Failures
55 
56 
57 inline const char *Code2Ascii(const Failures error) {
58  const char *texts[kFailNumEntries + 1];
59  texts[0] = "S3: OK";
60  texts[1] = "S3: local I/O failure";
61  texts[2] = "S3: malformed URL (bad request)";
62  texts[3] = "S3: forbidden";
63  texts[4] = "S3: failed to resolve host address";
64  texts[5] = "S3: host connection problem";
65  texts[6] = "S3: not found";
66  texts[7] = "S3: service not available";
67  texts[8] = "S3: unknown service error, perhaps wrong authentication protocol";
68  texts[8] = "S3: too many requests, service asks for backoff and retry";
69  texts[9] = "no text";
70  return texts[error];
71 }
72 
73 
74 
75 struct Statistics {
77  double transfer_time;
78  uint64_t num_requests;
79  uint64_t num_retries;
80  uint64_t ms_throttled; // Total waiting time imposed by HTTP 429 replies
81 
83  transferred_bytes = 0.0;
84  transfer_time = 0.0;
85  num_requests = 0;
86  num_retries = 0;
87  ms_throttled = 0;
88  }
89 
90  std::string Print() const;
91 }; // Statistics
92 
93 
97 struct JobInfo : SingleCopy {
98  enum RequestType {
99  kReqHeadOnly = 0, // peek
100  kReqHeadPut, // conditional upload of content-addressed objects
101  kReqPutCas, // immutable data object
102  kReqPutDotCvmfs, // one of the /.cvmfs... top level files
103  kReqPutHtml, // HTML file - display instead of downloading
104  kReqPutBucket, // bucket creation
106  };
107 
108  const std::string object_key;
109  void *callback; // Callback to be called when job is finished
111 
112  // One constructor per destination
114  const std::string &object_key,
115  void *callback,
117  : object_key(object_key),
118  origin(origin)
119  {
120  JobInfoInit();
121  this->callback = callback;
122  }
123  void JobInfoInit() {
124  curl_handle = NULL;
125  http_headers = NULL;
126  callback = NULL;
129  http_error = 0;
130  num_retries = 0;
131  backoff_ms = 0;
132  throttle_ms = 0;
133  throttle_timestamp = 0;
134  errorbuffer =
135  reinterpret_cast<char *>(smalloc(sizeof(char) * CURL_ERROR_SIZE));
136  }
138  free(errorbuffer);
139  }
140 
141  // Internal state, don't touch
142  CURL *curl_handle;
143  struct curl_slist *http_headers;
144  uint64_t payload_size;
148  unsigned char num_retries;
149  // Exponential backoff with cutoff in case of errors
150  unsigned backoff_ms;
151  // Throttle imposed by HTTP 429 reply; mutually exclusive with backoff_ms
152  unsigned throttle_ms;
153  // Remember when the 429 reply came in to only throttle if still necessary
155  char *errorbuffer;
156 }; // JobInfo
157 
159  S3FanOutDnsEntry() : counter(0), dns_name(), ip(), port("80"),
160  clist(NULL), sharehandle(NULL) {}
161  unsigned int counter;
162  std::string dns_name;
163  std::string ip;
164  std::string port;
165  struct curl_slist *clist;
166  CURLSH *sharehandle;
167 }; // S3FanOutDnsEntry
168 
169 
171  protected:
173 
174  public:
175  // 250ms pause after HTTP 429 "Too Many Requests"
176  static const unsigned kDefault429ThrottleMs;
177  // Don't throttle for more than a few seconds
178  static const unsigned kMax429ThrottleMs;
179  // Report throttle operations only every so often
180  static const unsigned kThrottleReportIntervalSec;
181  static const unsigned kDefaultHTTPPort;
182  static const unsigned kDefaultHTTPSPort;
183 
184  struct S3Config {
187  dns_buckets = true;
188  protocol = "http";
189  pool_max_handles = 0;
190  opt_timeout_sec = 20;
191  opt_max_retries = 3;
192  opt_backoff_init_ms = 100;
193  opt_backoff_max_ms = 2000;
194  }
195  std::string access_key;
196  std::string secret_key;
197  std::string hostname_port;
199  std::string region;
200  std::string flavor;
201  std::string bucket;
203  std::string protocol;
205  unsigned opt_timeout_sec;
206  unsigned opt_max_retries;
209  std::string proxy;
210  };
211 
212  static void DetectThrottleIndicator(const std::string &header, JobInfo *info);
213 
214  explicit S3FanoutManager(const S3Config &config);
215 
217 
218  void Spawn();
219 
220  void PushNewJob(JobInfo *info);
221  void PushCompletedJob(JobInfo *info);
223 
224  const Statistics &GetStatistics();
225 
226  private:
227  // Reflects the default Apache configuration of the local backend
228  static const char *kCacheControlCas; // Cache-Control: max-age=259200
229  static const char *kCacheControlDotCvmfs; // Cache-Control: max-age=61
230  static const unsigned kLowSpeedLimit = 1024; // Require at least 1kB/s
231 
232  static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action,
233  void *userp, void *socketp);
234  static void *MainUpload(void *data);
235  std::vector<s3fanout::JobInfo*> jobs_todo_;
236  pthread_mutex_t *jobs_todo_lock_;
237  pthread_mutex_t *curl_handle_lock_;
238 
239  CURL *AcquireCurlHandle() const;
240  void ReleaseCurlHandle(JobInfo *info, CURL *handle) const;
241  void InitPipeWatchFds();
242  int InitializeDnsSettings(CURL *handle,
243  std::string remote_host) const;
244  void InitializeDnsSettingsCurl(CURL *handle, CURLSH *sharehandle,
245  curl_slist *clist) const;
246  Failures InitializeRequest(JobInfo *info, CURL *handle) const;
247  void SetUrlOptions(JobInfo *info) const;
248  void UpdateStatistics(CURL *handle);
249  bool CanRetry(const JobInfo *info);
250  void Backoff(JobInfo *info);
251  bool VerifyAndFinalize(const int curl_error, JobInfo *info);
252  std::string GetRequestString(const JobInfo &info) const;
253  std::string GetContentType(const JobInfo &info) const;
254  std::string GetUriEncode(const std::string &val, bool encode_slash) const;
255  std::string GetAwsV4SigningKey(const std::string &date) const;
256  bool MkPayloadHash(const JobInfo &info, std::string *hex_hash) const;
257  bool MkV2Authz(const JobInfo &info,
258  std::vector<std::string> *headers) const;
259  bool MkV4Authz(const JobInfo &info,
260  std::vector<std::string> *headers) const;
261  bool MkAzureAuthz(const JobInfo &info,
262  std::vector<std::string> *headers) const;
263  std::string MkUrl(const std::string &objkey) const {
264  if (config_.dns_buckets) {
265  return config_.protocol + "://" + complete_hostname_ + "/" + objkey;
266  } else {
267  return config_.protocol + "://" + complete_hostname_ + "/" +
268  config_.bucket + "/" + objkey;
269  }
270  }
271  std::string MkCompleteHostname() {
272  if (config_.dns_buckets) {
273  return config_.bucket + "." + config_.hostname_port;
274  } else {
275  return config_.hostname_port;
276  }
277  }
278 
280  std::string complete_hostname_;
281 
286  std::set<JobInfo *> *active_requests_;
287 
288  std::set<CURL *> *pool_handles_idle_;
289  std::set<CURL *> *pool_handles_inuse_;
290  std::set<S3FanOutDnsEntry *> *sharehandles_;
291  std::map<CURL *, S3FanOutDnsEntry *> *curl_sharehandles_;
293  CURLM *curl_multi_;
294  std::string *user_agent_;
295 
300  mutable std::pair<std::string, std::string> last_signing_key_;
301 
302  pthread_t thread_upload_;
304 
305  struct pollfd *watch_fds_;
306  uint32_t watch_fds_size_;
308  uint32_t watch_fds_max_;
309 
310  // A pipe used to signal termination from S3FanoutManager to MainUpload
311  // thread. Anything written into it results in MainUpload thread exit.
313  // A pipe used to push jobs from S3FanoutManager to MainUpload thread.
314  // S3FanoutManager writes a JobInfo* pointer. MainUpload then reads the
315  // pointer and processes the job.
316  int pipe_jobs_[2];
317  // A pipe used to collect completed jobs. MainUpload writes the
318  // pointer to the completed job into it. PopCompletedJob() is used to
319  // retrieve the pointer.
321 
323 
324  unsigned int max_available_jobs_;
326 
327  // Writes and reads should be atomic because reading happens in a different
328  // thread than writing.
330 
331  // Do not report every occurrence of throttling but only every so often
333 
335 
340 }; // S3FanoutManager
341 
342 } // namespace s3fanout
343 
344 #endif // CVMFS_S3FANOUT_H_
Definition: prng.h:25
unsigned throttle_ms
Definition: s3fanout.h:152
const S3Config config_
Definition: s3fanout.h:279
pthread_mutex_t * jobs_todo_lock_
Definition: s3fanout.h:236
std::string GetAwsV4SigningKey(const std::string &date) const
Definition: s3fanout.cc:528
atomic_int32 multi_threaded_
Definition: s3fanout.h:303
uint64_t ms_throttled
Definition: s3fanout.h:80
std::string ip
Definition: s3fanout.h:163
void JobInfoInit()
Definition: s3fanout.h:123
std::string GetContentType(const JobInfo &info) const
Definition: s3fanout.cc:831
pthread_mutex_t * curl_handle_lock_
Definition: s3fanout.h:237
std::set< CURL * > * pool_handles_idle_
Definition: s3fanout.h:288
struct curl_slist * http_headers
Definition: s3fanout.h:143
dns::CaresResolver * resolver_
Definition: s3fanout.h:292
static const unsigned kDefault429ThrottleMs
Definition: s3fanout.h:176
CURLSH * sharehandle
Definition: s3fanout.h:166
static const char * kCacheControlDotCvmfs
Definition: s3fanout.h:229
bool MkV2Authz(const JobInfo &info, std::vector< std::string > *headers) const
Definition: s3fanout.cc:459
double transferred_bytes
Definition: s3fanout.h:76
S3FanoutManager(const S3Config &config)
Definition: s3fanout.cc:1184
static void * MainUpload(void *data)
Definition: s3fanout.cc:232
std::string dns_name
Definition: s3fanout.h:162
const std::string object_key
Definition: s3fanout.h:108
std::string MkCompleteHostname()
Definition: s3fanout.h:271
void PushNewJob(JobInfo *info)
Definition: s3fanout.cc:1320
static const char * kCacheControlCas
Definition: s3fanout.h:228
std::string GetRequestString(const JobInfo &info) const
Definition: s3fanout.cc:813
void InitializeDnsSettingsCurl(CURL *handle, CURLSH *sharehandle, curl_slist *clist) const
Definition: s3fanout.cc:666
SynchronizingCounter< uint32_t > Semaphore
Definition: s3fanout.h:172
unsigned int counter
Definition: s3fanout.h:161
std::string Print() const
Definition: s3fanout.cc:1344
int InitializeDnsSettings(CURL *handle, std::string remote_host) const
Definition: s3fanout.cc:678
struct curl_slist * clist
Definition: s3fanout.h:165
std::string * user_agent_
Definition: s3fanout.h:294
Failures error_code
Definition: s3fanout.h:146
AuthzMethods
Definition: s3fanout.h:32
S3FanOutDnsEntry()
Definition: s3fanout.h:159
unsigned backoff_ms
Definition: s3fanout.h:150
unsigned int max_available_jobs_
Definition: s3fanout.h:324
bool MkPayloadHash(const JobInfo &info, std::string *hex_hash) const
Definition: s3fanout.cc:758
int32_t atomic_int32
Definition: atomic.h:17
std::set< JobInfo * > * active_requests_
Definition: s3fanout.h:286
void PushCompletedJob(JobInfo *info)
Definition: s3fanout.cc:1328
uint64_t payload_size
Definition: s3fanout.h:144
void UpdateStatistics(CURL *handle)
Definition: s3fanout.cc:1014
std::string MkUrl(const std::string &objkey) const
Definition: s3fanout.h:263
void SetUrlOptions(JobInfo *info) const
Definition: s3fanout.cc:983
CURL * AcquireCurlHandle() const
Definition: s3fanout.cc:384
static const unsigned kDefaultHTTPSPort
Definition: s3fanout.h:182
uint64_t throttle_timestamp
Definition: s3fanout.h:154
Semaphore * available_jobs_
Definition: s3fanout.h:325
static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp)
Definition: s3fanout.cc:162
UniquePtr< FileBackedBuffer > origin
Definition: s3fanout.h:110
JobInfo(const std::string &object_key, void *callback, FileBackedBuffer *origin)
Definition: s3fanout.h:113
void ReleaseCurlHandle(JobInfo *info, CURL *handle) const
Definition: s3fanout.cc:417
char * errorbuffer
Definition: s3fanout.h:155
void Backoff(JobInfo *info)
Definition: s3fanout.cc:1040
struct pollfd * watch_fds_
Definition: s3fanout.h:305
JobInfo * PopCompletedJob()
Definition: s3fanout.cc:1335
static const unsigned kMax429ThrottleMs
Definition: s3fanout.h:178
unsigned char num_retries
Definition: s3fanout.h:148
std::string port
Definition: s3fanout.h:164
std::string complete_hostname_
Definition: s3fanout.h:280
uint64_t num_requests
Definition: s3fanout.h:78
bool MkAzureAuthz(const JobInfo &info, std::vector< std::string > *headers) const
Definition: s3fanout.cc:621
static const unsigned kLowSpeedLimit
Definition: s3fanout.h:230
RequestType request
Definition: s3fanout.h:145
static const unsigned kDefaultHTTPPort
Definition: s3fanout.h:181
double transfer_time
Definition: s3fanout.h:77
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
Definition: s3fanout.cc:1085
uint64_t num_retries
Definition: s3fanout.h:79
static void DetectThrottleIndicator(const std::string &header, JobInfo *info)
Definition: s3fanout.cc:40
std::set< S3FanOutDnsEntry * > * sharehandles_
Definition: s3fanout.h:290
SslCertificateStore ssl_certificate_store_
Definition: s3fanout.h:339
static const unsigned kThrottleReportIntervalSec
Definition: s3fanout.h:180
const Statistics & GetStatistics()
Definition: s3fanout.cc:1313
std::pair< std::string, std::string > last_signing_key_
Definition: s3fanout.h:300
std::string GetUriEncode(const std::string &val, bool encode_slash) const
Definition: s3fanout.cc:498
Statistics * statistics_
Definition: s3fanout.h:329
bool MkV4Authz(const JobInfo &info, std::vector< std::string > *headers) const
Definition: s3fanout.cc:548
uint64_t timestamp_last_throttle_report_
Definition: s3fanout.h:332
std::set< CURL * > * pool_handles_inuse_
Definition: s3fanout.h:289
Definition: s3fanout.h:158
std::map< CURL *, S3FanOutDnsEntry * > * curl_sharehandles_
Definition: s3fanout.h:291
std::vector< s3fanout::JobInfo * > jobs_todo_
Definition: s3fanout.h:235
bool CanRetry(const JobInfo *info)
Definition: s3fanout.cc:1025
const char * Code2Ascii(const Failures error)
Definition: s3fanout.h:57
CURL * curl_handle
Definition: s3fanout.h:142
Failures InitializeRequest(JobInfo *info, CURL *handle) const
Definition: s3fanout.cc:855