CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
s3fanout.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_NETWORK_S3FANOUT_H_
6 #define CVMFS_NETWORK_S3FANOUT_H_
7 
8 #include <poll.h>
9 #include <semaphore.h>
10 
11 #include <climits>
12 #include <cstdlib>
13 #include <map>
14 #include <set>
15 #include <string>
16 #include <utility>
17 #include <vector>
18 
19 #include "dns.h"
20 #include "duplex_curl.h"
21 #include "ssl.h"
22 #include "util/concurrency.h"
24 #include "util/mmap_file.h"
25 #include "util/pointer.h"
26 #include "util/prng.h"
27 #include "util/single_copy.h"
28 #include "util/smalloc.h"
29 
30 namespace s3fanout {
31 
36 };
37 
41 enum Failures {
42  kFailOk = 0,
52 
54 }; // Failures
55 
56 
57 inline const char *Code2Ascii(const Failures error) {
58  const char *texts[kFailNumEntries + 1];
59  texts[0] = "S3: OK";
60  texts[1] = "S3: local I/O failure";
61  texts[2] = "S3: malformed URL (bad request)";
62  texts[3] = "S3: forbidden";
63  texts[4] = "S3: failed to resolve host address";
64  texts[5] = "S3: host connection problem";
65  texts[6] = "S3: not found";
66  texts[7] = "S3: service not available";
67  texts[8] = "S3: unknown service error, perhaps wrong authentication protocol";
68  texts[8] = "S3: too many requests, service asks for backoff and retry";
69  texts[9] = "no text";
70  return texts[error];
71 }
72 
73 
74 struct Statistics {
76  double transfer_time;
77  uint64_t num_requests;
78  uint64_t num_retries;
79  uint64_t ms_throttled; // Total waiting time imposed by HTTP 429 replies
80 
82  transferred_bytes = 0.0;
83  transfer_time = 0.0;
84  num_requests = 0;
85  num_retries = 0;
86  ms_throttled = 0;
87  }
88 
89  std::string Print() const;
90 }; // Statistics
91 
92 
96 struct JobInfo : SingleCopy {
97  enum RequestType {
98  kReqHeadOnly = 0, // peek
99  kReqHeadPut, // conditional upload of content-addressed objects
100  kReqPutCas, // immutable data object
101  kReqPutDotCvmfs, // one of the /.cvmfs... top level files
102  kReqPutHtml, // HTML file - display instead of downloading
103  kReqPutBucket, // bucket creation
105  };
106 
107  const std::string object_key;
108  void *callback; // Callback to be called when job is finished
110 
111  // One constructor per destination
112  JobInfo(const std::string &object_key,
113  void *callback,
115  : object_key(object_key), origin(origin) {
116  JobInfoInit();
117  this->callback = callback;
118  }
119  void JobInfoInit() {
120  curl_handle = NULL;
121  http_headers = NULL;
122  callback = NULL;
125  http_error = 0;
126  num_retries = 0;
127  backoff_ms = 0;
128  throttle_ms = 0;
129  throttle_timestamp = 0;
130  errorbuffer = reinterpret_cast<char *>(
131  smalloc(sizeof(char) * CURL_ERROR_SIZE));
132  }
133  ~JobInfo() { free(errorbuffer); }
134 
135  // Internal state, don't touch
136  CURL *curl_handle;
137  struct curl_slist *http_headers;
138  uint64_t payload_size;
142  unsigned char num_retries;
143  // Exponential backoff with cutoff in case of errors
144  unsigned backoff_ms;
145  // Throttle imposed by HTTP 429 reply; mutually exclusive with backoff_ms
146  unsigned throttle_ms;
147  // Remember when the 429 reply came in to only throttle if still necessary
149  char *errorbuffer;
150 }; // JobInfo
151 
154  : counter(0)
155  , dns_name()
156  , ip()
157  , port("80")
158  , clist(NULL)
159  , sharehandle(NULL) { }
160  unsigned int counter;
161  std::string dns_name;
162  std::string ip;
163  std::string port;
164  struct curl_slist *clist;
165  CURLSH *sharehandle;
166 }; // S3FanOutDnsEntry
167 
168 
170  protected:
172 
173  public:
174  // 250ms pause after HTTP 429 "Too Many Requests"
175  static const unsigned kDefault429ThrottleMs;
176  // Don't throttle for more than a few seconds
177  static const unsigned kMax429ThrottleMs;
178  // Report throttle operations only every so often
179  static const unsigned kThrottleReportIntervalSec;
180  static const unsigned kDefaultHTTPPort;
181  static const unsigned kDefaultHTTPSPort;
182 
183  struct S3Config {
186  dns_buckets = true;
187  protocol = "http";
188  pool_max_handles = 0;
189  opt_timeout_sec = 20;
190  opt_max_retries = 3;
191  opt_backoff_init_ms = 100;
192  opt_backoff_max_ms = 2000;
193  x_amz_acl = "public-read";
194  }
195  std::string access_key;
196  std::string secret_key;
197  std::string hostname_port;
199  std::string region;
200  std::string flavor;
201  std::string bucket;
203  std::string protocol;
205  unsigned opt_timeout_sec;
206  unsigned opt_max_retries;
209  std::string proxy;
210  std::string x_amz_acl;
211  };
212 
213  static void DetectThrottleIndicator(const std::string &header, JobInfo *info);
214 
215  explicit S3FanoutManager(const S3Config &config);
216 
218 
219  void Spawn();
220 
221  void PushNewJob(JobInfo *info);
222  void PushCompletedJob(JobInfo *info);
224 
225  const Statistics &GetStatistics();
226 
227  private:
228  // Reflects the default Apache configuration of the local backend
229  static const char *kCacheControlCas; // Cache-Control: max-age=259200
230  static const char *kCacheControlDotCvmfs; // Cache-Control: max-age=61
231  static const unsigned kLowSpeedLimit = 1024; // Require at least 1kB/s
232 
233  static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action,
234  void *userp, void *socketp);
235  static void *MainUpload(void *data);
236  std::vector<s3fanout::JobInfo *> jobs_todo_;
237  pthread_mutex_t *jobs_todo_lock_;
238  pthread_mutex_t *curl_handle_lock_;
239 
240  CURL *AcquireCurlHandle() const;
241  void ReleaseCurlHandle(JobInfo *info, CURL *handle) const;
242  void InitPipeWatchFds();
243  int InitializeDnsSettings(CURL *handle, std::string remote_host) const;
244  void InitializeDnsSettingsCurl(CURL *handle, CURLSH *sharehandle,
245  curl_slist *clist) const;
246  Failures InitializeRequest(JobInfo *info, CURL *handle) const;
247  void SetUrlOptions(JobInfo *info) const;
248  void UpdateStatistics(CURL *handle);
249  bool CanRetry(const JobInfo *info);
250  void Backoff(JobInfo *info);
251  bool VerifyAndFinalize(const int curl_error, JobInfo *info);
252  std::string GetRequestString(const JobInfo &info) const;
253  std::string GetContentType(const JobInfo &info) const;
254  std::string GetUriEncode(const std::string &val, bool encode_slash) const;
255  std::string GetAwsV4SigningKey(const std::string &date) const;
256  bool MkPayloadHash(const JobInfo &info, std::string *hex_hash) const;
257  bool MkV2Authz(const JobInfo &info, std::vector<std::string> *headers) const;
258  bool MkV4Authz(const JobInfo &info, std::vector<std::string> *headers) const;
259  bool MkAzureAuthz(const JobInfo &info,
260  std::vector<std::string> *headers) const;
261  std::string MkUrl(const std::string &objkey) const {
262  if (config_.dns_buckets) {
263  return config_.protocol + "://" + complete_hostname_ + "/" + objkey;
264  } else {
265  return config_.protocol + "://" + complete_hostname_ + "/"
266  + config_.bucket + "/" + objkey;
267  }
268  }
269  std::string MkCompleteHostname() {
270  if (config_.dns_buckets) {
271  return config_.bucket + "." + config_.hostname_port;
272  } else {
273  return config_.hostname_port;
274  }
275  }
276 
278  std::string complete_hostname_;
279 
284  std::set<JobInfo *> *active_requests_;
285 
286  std::set<CURL *> *pool_handles_idle_;
287  std::set<CURL *> *pool_handles_inuse_;
288  std::set<S3FanOutDnsEntry *> *sharehandles_;
289  std::map<CURL *, S3FanOutDnsEntry *> *curl_sharehandles_;
291  CURLM *curl_multi_;
292  std::string *user_agent_;
293 
298  mutable std::pair<std::string, std::string> last_signing_key_;
299 
300  pthread_t thread_upload_;
302 
303  struct pollfd *watch_fds_;
304  uint32_t watch_fds_size_;
306  uint32_t watch_fds_max_;
307 
308  // A pipe used to signal termination from S3FanoutManager to MainUpload
309  // thread. Anything written into it results in MainUpload thread exit.
311  // A pipe used to push jobs from S3FanoutManager to MainUpload thread.
312  // S3FanoutManager writes a JobInfo* pointer. MainUpload then reads the
313  // pointer and processes the job.
314  int pipe_jobs_[2];
315  // A pipe used to collect completed jobs. MainUpload writes the
316  // pointer to the completed job into it. PopCompletedJob() is used to
317  // retrieve the pointer.
319 
321 
322  unsigned int max_available_jobs_;
324 
325  // Writes and reads should be atomic because reading happens in a different
326  // thread than writing.
328 
329  // Do not report every occurrence of throttling, only every so often
331 
333 
338 }; // S3FanoutManager
339 
340 } // namespace s3fanout
341 
342 #endif // CVMFS_NETWORK_S3FANOUT_H_
Definition: prng.h:27
unsigned throttle_ms
Definition: s3fanout.h:146
const S3Config config_
Definition: s3fanout.h:277
pthread_mutex_t * jobs_todo_lock_
Definition: s3fanout.h:237
std::string GetAwsV4SigningKey(const std::string &date) const
Definition: s3fanout.cc:517
atomic_int32 multi_threaded_
Definition: s3fanout.h:301
uint64_t ms_throttled
Definition: s3fanout.h:79
std::string ip
Definition: s3fanout.h:162
void JobInfoInit()
Definition: s3fanout.h:119
std::string GetContentType(const JobInfo &info) const
Definition: s3fanout.cc:803
pthread_mutex_t * curl_handle_lock_
Definition: s3fanout.h:238
std::set< CURL * > * pool_handles_idle_
Definition: s3fanout.h:286
struct curl_slist * http_headers
Definition: s3fanout.h:137
dns::CaresResolver * resolver_
Definition: s3fanout.h:290
static const unsigned kDefault429ThrottleMs
Definition: s3fanout.h:175
CURLSH * sharehandle
Definition: s3fanout.h:165
static const char * kCacheControlDotCvmfs
Definition: s3fanout.h:230
bool MkV2Authz(const JobInfo &info, std::vector< std::string > *headers) const
Definition: s3fanout.cc:452
double transferred_bytes
Definition: s3fanout.h:75
S3FanoutManager(const S3Config &config)
Definition: s3fanout.cc:1150
static void * MainUpload(void *data)
Definition: s3fanout.cc:233
std::string dns_name
Definition: s3fanout.h:161
const std::string object_key
Definition: s3fanout.h:107
std::string MkCompleteHostname()
Definition: s3fanout.h:269
void PushNewJob(JobInfo *info)
Definition: s3fanout.cc:1284
static const char * kCacheControlCas
Definition: s3fanout.h:229
std::string GetRequestString(const JobInfo &info) const
Definition: s3fanout.cc:785
void InitializeDnsSettingsCurl(CURL *handle, CURLSH *sharehandle, curl_slist *clist) const
Definition: s3fanout.cc:649
SynchronizingCounter< uint32_t > Semaphore
Definition: s3fanout.h:171
unsigned int counter
Definition: s3fanout.h:160
std::string Print() const
Definition: s3fanout.cc:1308
int InitializeDnsSettings(CURL *handle, std::string remote_host) const
Definition: s3fanout.cc:659
struct curl_slist * clist
Definition: s3fanout.h:164
std::string * user_agent_
Definition: s3fanout.h:292
Failures error_code
Definition: s3fanout.h:140
AuthzMethods
Definition: s3fanout.h:32
S3FanOutDnsEntry()
Definition: s3fanout.h:153
unsigned backoff_ms
Definition: s3fanout.h:144
unsigned int max_available_jobs_
Definition: s3fanout.h:322
bool MkPayloadHash(const JobInfo &info, std::string *hex_hash) const
Definition: s3fanout.cc:735
int32_t atomic_int32
Definition: atomic.h:17
std::set< JobInfo * > * active_requests_
Definition: s3fanout.h:284
void PushCompletedJob(JobInfo *info)
Definition: s3fanout.cc:1292
uint64_t payload_size
Definition: s3fanout.h:138
void UpdateStatistics(CURL *handle)
Definition: s3fanout.cc:984
std::string MkUrl(const std::string &objkey) const
Definition: s3fanout.h:261
void SetUrlOptions(JobInfo *info) const
Definition: s3fanout.cc:953
CURL * AcquireCurlHandle() const
Definition: s3fanout.cc:377
static const unsigned kDefaultHTTPSPort
Definition: s3fanout.h:181
uint64_t throttle_timestamp
Definition: s3fanout.h:148
Semaphore * available_jobs_
Definition: s3fanout.h:323
static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp)
Definition: s3fanout.cc:161
UniquePtr< FileBackedBuffer > origin
Definition: s3fanout.h:109
JobInfo(const std::string &object_key, void *callback, FileBackedBuffer *origin)
Definition: s3fanout.h:112
void ReleaseCurlHandle(JobInfo *info, CURL *handle) const
Definition: s3fanout.cc:410
char * errorbuffer
Definition: s3fanout.h:149
void Backoff(JobInfo *info)
Definition: s3fanout.cc:1009
struct pollfd * watch_fds_
Definition: s3fanout.h:303
JobInfo * PopCompletedJob()
Definition: s3fanout.cc:1299
static const unsigned kMax429ThrottleMs
Definition: s3fanout.h:177
unsigned char num_retries
Definition: s3fanout.h:142
std::string port
Definition: s3fanout.h:163
std::string complete_hostname_
Definition: s3fanout.h:278
uint64_t num_requests
Definition: s3fanout.h:77
bool MkAzureAuthz(const JobInfo &info, std::vector< std::string > *headers) const
Definition: s3fanout.cc:612
static const unsigned kLowSpeedLimit
Definition: s3fanout.h:231
RequestType request
Definition: s3fanout.h:139
static const unsigned kDefaultHTTPPort
Definition: s3fanout.h:180
double transfer_time
Definition: s3fanout.h:76
bool VerifyAndFinalize(const int curl_error, JobInfo *info)
Definition: s3fanout.cc:1053
uint64_t num_retries
Definition: s3fanout.h:78
static void DetectThrottleIndicator(const std::string &header, JobInfo *info)
Definition: s3fanout.cc:40
std::set< S3FanOutDnsEntry * > * sharehandles_
Definition: s3fanout.h:288
SslCertificateStore ssl_certificate_store_
Definition: s3fanout.h:337
static const unsigned kThrottleReportIntervalSec
Definition: s3fanout.h:179
const Statistics & GetStatistics()
Definition: s3fanout.cc:1279
std::pair< std::string, std::string > last_signing_key_
Definition: s3fanout.h:298
std::string GetUriEncode(const std::string &val, bool encode_slash) const
Definition: s3fanout.cc:490
Statistics * statistics_
Definition: s3fanout.h:327
bool MkV4Authz(const JobInfo &info, std::vector< std::string > *headers) const
Definition: s3fanout.cc:536
std::vector< s3fanout::JobInfo * > jobs_todo_
Definition: s3fanout.h:236
uint64_t timestamp_last_throttle_report_
Definition: s3fanout.h:330
std::set< CURL * > * pool_handles_inuse_
Definition: s3fanout.h:287
Definition: s3fanout.h:152
std::map< CURL *, S3FanOutDnsEntry * > * curl_sharehandles_
Definition: s3fanout.h:289
bool CanRetry(const JobInfo *info)
Definition: s3fanout.cc:995
const char * Code2Ascii(const Failures error)
Definition: s3fanout.h:57
CURL * curl_handle
Definition: s3fanout.h:136
Failures InitializeRequest(JobInfo *info, CURL *handle) const
Definition: s3fanout.cc:827