CernVM-FS  2.9.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
s3fanout.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_S3FANOUT_H_
6 #define CVMFS_S3FANOUT_H_
7 
8 #include <poll.h>
9 #include <semaphore.h>
10 
11 #include <climits>
12 #include <map>
13 #include <set>
14 #include <string>
15 #include <utility>
16 #include <vector>
17 
18 #include "dns.h"
19 #include "duplex_curl.h"
20 #include "prng.h"
22 #include "util/mmap_file.h"
23 #include "util/pointer.h"
24 #include "util_concurrency.h"
25 
26 namespace s3fanout {
27 
32 };
33 
37 enum Failures {
38  kFailOk = 0,
48 
50 }; // Failures
51 
52 
53 inline const char *Code2Ascii(const Failures error) {
54  const char *texts[kFailNumEntries + 1];
55  texts[0] = "S3: OK";
56  texts[1] = "S3: local I/O failure";
57  texts[2] = "S3: malformed URL (bad request)";
58  texts[3] = "S3: forbidden";
59  texts[4] = "S3: failed to resolve host address";
60  texts[5] = "S3: host connection problem";
61  texts[6] = "S3: not found";
62  texts[7] = "S3: service not available";
63  texts[8] = "S3: unknown service error, perhaps wrong authentication protocol";
64  texts[8] = "S3: too many requests, service asks for backoff and retry";
65  texts[9] = "no text";
66  return texts[error];
67 }
68 
69 
70 
71 struct Statistics {
73  double transfer_time;
74  uint64_t num_requests;
75  uint64_t num_retries;
76  uint64_t ms_throttled; // Total waiting time imposed by HTTP 429 replies
77 
79  transferred_bytes = 0.0;
80  transfer_time = 0.0;
81  num_requests = 0;
82  num_retries = 0;
83  ms_throttled = 0;
84  }
85 
86  std::string Print() const;
87 }; // Statistics
88 
89 
93 struct JobInfo {
94  enum RequestType {
95  kReqHeadOnly = 0, // peek
96  kReqHeadPut, // conditional upload of content-addressed objects
97  kReqPutCas, // immutable data object
98  kReqPutDotCvmfs, // one of the /.cvmfs... top level files
99  kReqPutHtml, // HTML file - display instead of downloading
100  kReqPutBucket, // bucket creation
102  };
103 
104  const std::string object_key;
105  void *callback; // Callback to be called when job is finished
107 
108  // One constructor per destination
110  const std::string &object_key,
111  void *callback,
113  : object_key(object_key),
114  origin(origin)
115  {
116  JobInfoInit();
117  this->callback = callback;
118  }
119  void JobInfoInit() {
120  curl_handle = NULL;
121  http_headers = NULL;
122  callback = NULL;
125  http_error = 0;
126  num_retries = 0;
127  backoff_ms = 0;
128  throttle_ms = 0;
129  throttle_timestamp = 0;
130  }
131  ~JobInfo() {}
132 
133  // Internal state, don't touch
134  CURL *curl_handle;
135  struct curl_slist *http_headers;
136  uint64_t payload_size;
140  unsigned char num_retries;
141  // Exponential backoff with cutoff in case of errors
142  unsigned backoff_ms;
143  // Throttle imposed by HTTP 429 reply; mutually exclusive with backoff_ms
144  unsigned throttle_ms;
145  // Remember when the 429 reply came in to only throttle if still necessary
147 }; // JobInfo
148 
150  S3FanOutDnsEntry() : counter(0), dns_name(), ip(), port("80"),
151  clist(NULL), sharehandle(NULL) {}
152  unsigned int counter;
153  std::string dns_name;
154  std::string ip;
155  std::string port;
156  struct curl_slist *clist;
157  CURLSH *sharehandle;
158 }; // S3FanOutDnsEntry
159 
160 
162  protected:
164 
165  public:
166  // 250ms pause after HTTP 429 "Too Many Requests"
167  static const unsigned kDefault429ThrottleMs;
168  // Don't throttle for more than a few seconds
169  static const unsigned kMax429ThrottleMs;
170  // Report throttle operations only every so often
171  static const unsigned kThrottleReportIntervalSec;
172  static const unsigned kDefaultHTTPPort;
173 
174  struct S3Config {
177  dns_buckets = true;
178  pool_max_handles = 0;
179  opt_timeout_sec = 20;
180  opt_max_retries = 3;
181  opt_backoff_init_ms = 100;
182  opt_backoff_max_ms = 2000;
183  }
184  std::string access_key;
185  std::string secret_key;
186  std::string hostname_port;
188  std::string region;
189  std::string flavor;
190  std::string bucket;
193  unsigned opt_timeout_sec;
194  unsigned opt_max_retries;
197  };
198 
199  static void DetectThrottleIndicator(const std::string &header, JobInfo *info);
200 
201  explicit S3FanoutManager(const S3Config &config);
202 
204 
205  void Spawn();
206 
207  void PushNewJob(JobInfo *info);
208  void PushCompletedJob(JobInfo *info);
210 
211  const Statistics &GetStatistics();
212 
213  private:
214  // Reflects the default Apache configuration of the local backend
215  static const char *kCacheControlCas; // Cache-Control: max-age=259200
216  static const char *kCacheControlDotCvmfs; // Cache-Control: max-age=61
217  static const unsigned kLowSpeedLimit = 1024; // Require at least 1kB/s
218 
219  static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action,
220  void *userp, void *socketp);
221  static void *MainUpload(void *data);
222  std::vector<s3fanout::JobInfo*> jobs_todo_;
223  pthread_mutex_t *jobs_todo_lock_;
224  pthread_mutex_t *curl_handle_lock_;
225 
226  CURL *AcquireCurlHandle() const;
227  void ReleaseCurlHandle(JobInfo *info, CURL *handle) const;
228  void InitPipeWatchFds();
229  int InitializeDnsSettings(CURL *handle,
230  std::string remote_host) const;
231  void InitializeDnsSettingsCurl(CURL *handle, CURLSH *sharehandle,
232  curl_slist *clist) const;
233  Failures InitializeRequest(JobInfo *info, CURL *handle) const;
234  void SetUrlOptions(JobInfo *info) const;
235  void UpdateStatistics(CURL *handle);
236  bool CanRetry(const JobInfo *info);
237  void Backoff(JobInfo *info);
238  bool VerifyAndFinalize(const int curl_error, JobInfo *info);
239  std::string GetRequestString(const JobInfo &info) const;
240  std::string GetContentType(const JobInfo &info) const;
241  std::string GetUriEncode(const std::string &val, bool encode_slash) const;
242  std::string GetAwsV4SigningKey(const std::string &date) const;
243  bool MkPayloadHash(const JobInfo &info, std::string *hex_hash) const;
244  bool MkV2Authz(const JobInfo &info,
245  std::vector<std::string> *headers) const;
246  bool MkV4Authz(const JobInfo &info,
247  std::vector<std::string> *headers) const;
248  bool MkAzureAuthz(const JobInfo &info,
249  std::vector<std::string> *headers) const;
250  std::string MkUrl(const std::string &objkey) const {
251  if (config_.dns_buckets) {
252  return "http://" + complete_hostname_ + "/" + objkey;
253  } else {
254  return "http://" + complete_hostname_ + "/" + config_.bucket +
255  "/" + objkey;
256  }
257  }
258  std::string MkCompleteHostname() {
259  if (config_.dns_buckets) {
260  return config_.bucket + "." + config_.hostname_port;
261  } else {
262  return config_.hostname_port;
263  }
264  }
265 
267  std::string complete_hostname_;
268 
273  std::set<JobInfo *> *active_requests_;
274 
275  std::set<CURL *> *pool_handles_idle_;
276  std::set<CURL *> *pool_handles_inuse_;
277  std::set<S3FanOutDnsEntry *> *sharehandles_;
278  std::map<CURL *, S3FanOutDnsEntry *> *curl_sharehandles_;
280  CURLM *curl_multi_;
281  std::string *user_agent_;
282 
287  mutable std::pair<std::string, std::string> last_signing_key_;
288 
289  pthread_t thread_upload_;
291 
292  struct pollfd *watch_fds_;
293  uint32_t watch_fds_size_;
295  uint32_t watch_fds_max_;
296 
297  // A pipe used to signal termination from S3FanoutManager to MainUpload
298  // thread. Anything written into it results in MainUpload thread exit.
300  // A pipe used to push jobs from S3FanoutManager to MainUpload thread.
301  // S3FanoutManager writes a JobInfo* pointer. MainUpload then reads the
302  // pointer and processes the job.
303  int pipe_jobs_[2];
304  // A pipe used to collect completed jobs. MainUpload writes in the
305  // pointer to the completed job. PopCompletedJob() is used to
306  // retrieve the pointer.
308 
310 
311  unsigned int max_available_jobs_;
313 
314  // Writes and reads should be atomic because reading happens in a different
315  // thread than writing.
317 
318  // Do not report every occurrence of throttling but only every so often
320 
322 }; // S3FanoutManager
323 
324 } // namespace s3fanout
325 
326 #endif // CVMFS_S3FANOUT_H_
Definition: prng.h:25
unsigned throttle_ms
Definition: s3fanout.h:144
const S3Config config_
Definition: s3fanout.h:266
pthread_mutex_t * jobs_todo_lock_
Definition: s3fanout.h:223
std::string GetAwsV4SigningKey(const std::string &date) const
Definition: s3fanout.cc:527
atomic_int32 multi_threaded_
Definition: s3fanout.h:290
uint64_t ms_throttled
Definition: s3fanout.h:76
std::string ip
Definition: s3fanout.h:154
void JobInfoInit()
Definition: s3fanout.h:119
std::string GetContentType(const JobInfo &info) const
Definition: s3fanout.cc:826
pthread_mutex_t * curl_handle_lock_
Definition: s3fanout.h:224
std::set< CURL * > * pool_handles_idle_
Definition: s3fanout.h:275
struct curl_slist * http_headers
Definition: s3fanout.h:135
dns::CaresResolver * resolver_
Definition: s3fanout.h:279
static const unsigned kDefault429ThrottleMs
Definition: s3fanout.h:167
CURLSH * sharehandle
Definition: s3fanout.h:157
static const char * kCacheControlDotCvmfs
Definition: s3fanout.h:216
bool MkV2Authz(const JobInfo &info, std::vector< std::string > *headers) const
Definition: s3fanout.cc:458
double transferred_bytes
Definition: s3fanout.h:72
S3FanoutManager(const S3Config &config)
Definition: s3fanout.cc:1164
static void * MainUpload(void *data)
Definition: s3fanout.cc:231
std::string dns_name
Definition: s3fanout.h:153
const std::string object_key
Definition: s3fanout.h:104
std::string MkCompleteHostname()
Definition: s3fanout.h:258
void PushNewJob(JobInfo *info)
Definition: s3fanout.cc:1298
static const char * kCacheControlCas
Definition: s3fanout.h:215
std::string GetRequestString(const JobInfo &info) const
Definition: s3fanout.cc:808
void InitializeDnsSettingsCurl(CURL *handle, CURLSH *sharehandle, curl_slist *clist) const
Definition: s3fanout.cc:661
SynchronizingCounter< uint32_t > Semaphore
Definition: s3fanout.h:163
unsigned int counter
Definition: s3fanout.h:152
std::string Print() const
Definition: s3fanout.cc:1322
int InitializeDnsSettings(CURL *handle, std::string remote_host) const
Definition: s3fanout.cc:673
struct curl_slist * clist
Definition: s3fanout.h:156
std::string * user_agent_
Definition: s3fanout.h:281
Failures error_code
Definition: s3fanout.h:138
AuthzMethods
Definition: s3fanout.h:28
S3FanOutDnsEntry()
Definition: s3fanout.h:150
unsigned backoff_ms
Definition: s3fanout.h:142
unsigned int max_available_jobs_
Definition: s3fanout.h:311
bool MkPayloadHash(const JobInfo &info, std::string *hex_hash) const
Definition: s3fanout.cc:753
int32_t atomic_int32
Definition: atomic.h:17
std::set< JobInfo * > * active_requests_
Definition: s3fanout.h:273
void PushCompletedJob(JobInfo *info)
Definition: s3fanout.cc:1306
uint64_t payload_size
Definition: s3fanout.h:136
void UpdateStatistics(CURL *handle)
Definition: s3fanout.cc:994
std::string MkUrl(const std::string &objkey) const
Definition: s3fanout.h:250
void SetUrlOptions(JobInfo *info) const
Definition: s3fanout.cc:966
CURL * AcquireCurlHandle() const
Definition: s3fanout.cc:383
uint64_t throttle_timestamp
Definition: s3fanout.h:146
Semaphore * available_jobs_
Definition: s3fanout.h:312
static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp)
Definition: s3fanout.cc:161
UniquePtr< FileBackedBuffer > origin
Definition: s3fanout.h:106
JobInfo(const std::string &object_key, void *callback, FileBackedBuffer *origin)
Definition: s3fanout.h:109
void ReleaseCurlHandle(JobInfo *info, CURL *handle) const
Definition: s3fanout.cc:416
void Backoff(JobInfo *info)
Definition: s3fanout.cc:1020
struct pollfd * watch_fds_
Definition: s3fanout.h:292
JobInfo * PopCompletedJob()
Definition: s3fanout.cc:1313
static const unsigned kMax429ThrottleMs
Definition: s3fanout.h:169
unsigned char num_retries
Definition: s3fanout.h:140
std::string port
Definition: s3fanout.h:155
std::string complete_hostname_
Definition: s3fanout.h:267
uint64_t num_requests
Definition: s3fanout.h:74
bool MkAzureAuthz(const JobInfo &info, std::vector< std::string > *headers) const
Definition: s3fanout.cc:616
static const unsigned kLowSpeedLimit
Definition: s3fanout.h:217
RequestType request
Definition: s3fanout.h:137
static const unsigned kDefaultHTTPPort
Definition: s3fanout.h:172
double transfer_time
Definition: s3fanout.h:73
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
Definition: s3fanout.cc:1065
uint64_t num_retries
Definition: s3fanout.h:75
static void DetectThrottleIndicator(const std::string &header, JobInfo *info)
Definition: s3fanout.cc:39
std::set< S3FanOutDnsEntry * > * sharehandles_
Definition: s3fanout.h:277
static const unsigned kThrottleReportIntervalSec
Definition: s3fanout.h:171
const Statistics & GetStatistics()
Definition: s3fanout.cc:1291
std::pair< std::string, std::string > last_signing_key_
Definition: s3fanout.h:287
std::string GetUriEncode(const std::string &val, bool encode_slash) const
Definition: s3fanout.cc:497
Statistics * statistics_
Definition: s3fanout.h:316
bool MkV4Authz(const JobInfo &info, std::vector< std::string > *headers) const
Definition: s3fanout.cc:547
uint64_t timestamp_last_throttle_report_
Definition: s3fanout.h:319
std::set< CURL * > * pool_handles_inuse_
Definition: s3fanout.h:276
Definition: s3fanout.h:149
std::map< CURL *, S3FanOutDnsEntry * > * curl_sharehandles_
Definition: s3fanout.h:278
std::vector< s3fanout::JobInfo * > jobs_todo_
Definition: s3fanout.h:222
bool CanRetry(const JobInfo *info)
Definition: s3fanout.cc:1005
const char * Code2Ascii(const Failures error)
Definition: s3fanout.h:53
CURL * curl_handle
Definition: s3fanout.h:134
Failures InitializeRequest(JobInfo *info, CURL *handle) const
Definition: s3fanout.cc:850