GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/s3fanout.h Lines: 48 62 77.4 %
Date: 2019-02-03 02:48:13 Branches: 1 2 50.0 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#ifndef CVMFS_S3FANOUT_H_
6
#define CVMFS_S3FANOUT_H_
7
8
#include <poll.h>
9
#include <semaphore.h>
10
11
#include <climits>
12
#include <map>
13
#include <set>
14
#include <string>
15
#include <utility>
16
#include <vector>
17
18
#include "dns.h"
19
#include "duplex_curl.h"
20
#include "prng.h"
21
#include "util/mmap_file.h"
22
#include "util_concurrency.h"
23
24
namespace s3fanout {
25
26
/**
 * Where an upload job takes its payload from.
 */
enum Origin {
  kOriginMem = 1,   // payload is the in-memory buffer in JobInfo::origin_mem
  kOriginPath = 2,  // payload is the file named by JobInfo::origin_path
};  // Origin
33
34
35
/**
 * Authorization scheme used to sign requests (see MkV2Authz / MkV4Authz).
 */
enum AuthzMethods {
  kAuthzAwsV2 = 0,  // AWS signature version 2
  kAuthzAwsV4 = 1   // AWS signature version 4
};
39
40
/**
 * Possible return values.
 *
 * The numeric values index the message table in Code2Ascii(); keep the two
 * in the same order when adding entries.
 */
enum Failures {
  kFailOk = 0,
  kFailLocalIO,
  kFailBadRequest,
  kFailForbidden,
  kFailHostResolve,
  kFailHostConnection,
  kFailNotFound,
  kFailServiceUnavailable,
  kFailRetry,
  kFailOther,

  kFailNumEntries
};  // Failures


/**
 * Returns a human-readable description of an S3 failure code.
 *
 * @param error  the failure code to describe
 * @return a pointer to a static, NUL-terminated string; never NULL.  Codes
 *         outside [kFailOk, kFailNumEntries] are reported as "no text".
 */
inline const char *Code2Ascii(const Failures error) {
  // Exactly one entry per Failures value (plus the kFailNumEntries sentinel),
  // in enum order.  The previous version assigned index 8 twice, so kFailOther
  // printed "no text" and the sentinel slot was left uninitialized.
  // Static storage: the table is constant-initialized once, not per call.
  static const char *const kTexts[kFailNumEntries + 1] = {
    "S3: OK",                              // kFailOk
    "S3: local I/O failure",               // kFailLocalIO
    "S3: malformed URL (bad request)",     // kFailBadRequest
    "S3: forbidden",                       // kFailForbidden
    "S3: failed to resolve host address",  // kFailHostResolve
    "S3: host connection problem",         // kFailHostConnection
    "S3: not found",                       // kFailNotFound
    "S3: service not available",           // kFailServiceUnavailable
    // kFailRetry
    "S3: too many requests, service asks for backoff and retry",
    // kFailOther
    "S3: unknown service error, perhaps wrong authentication protocol",
    "no text"                              // kFailNumEntries
  };

  // Guard against values forced into the enum via casts.
  const int index = static_cast<int>(error);
  if ((index < 0) || (index > kFailNumEntries))
    return kTexts[kFailNumEntries];
  return kTexts[index];
}
74
75
76
77
/**
 * Transfer counters accumulated by an S3FanoutManager (see GetStatistics()).
 * All counters start at zero.
 */
struct Statistics {
  double transferred_bytes;
  double transfer_time;
  uint64_t num_requests;
  uint64_t num_retries;
  uint64_t ms_throttled;  // Total waiting time imposed by HTTP 429 replies

  Statistics()
    : transferred_bytes(0.0)
    , transfer_time(0.0)
    , num_requests(0)
    , num_retries(0)
    , ms_throttled(0)
  { }

  std::string Print() const;
};  // Statistics
94
95
96
/**
97
 * Contains all the information to specify an upload job.
98
 */
99
struct JobInfo {
100
  enum RequestType {
101
    kReqHead = 0,
102
    kReqPutCas,  // immutable data object
103
    kReqPutDotCvmfs,  // one of the /.cvmfs... top level files
104
    kReqDelete,
105
  };
106
107
  Origin origin;
108
  struct {
109
    size_t size;
110
    size_t pos;
111
    const unsigned char *data;
112
  } origin_mem;
113
114
  const std::string access_key;
115
  const std::string secret_key;
116
  const AuthzMethods authz_method;
117
  const std::string hostname;
118
  const std::string region;
119
  const std::string bucket;
120
  const std::string object_key;
121
  const std::string origin_path;
122
  bool test_and_set;
123
  void *callback;  // Callback to be called when job is finished
124
  MemoryMappedFile *mmf;
125
126
  // One constructor per destination
127
2055
  JobInfo(
128
    const std::string &access_key,
129
    const std::string &secret_key,
130
    const AuthzMethods &authz_method,
131
    const std::string &hostname,
132
    const std::string &region,
133
    const std::string &bucket,
134
    const std::string &object_key,
135
    void *callback,
136
    const std::string &origin_path)
137
    : access_key(access_key)
138
    , secret_key(secret_key)
139
    , authz_method(authz_method)
140
    , hostname(hostname)
141
    , region(region)
142
    , bucket(bucket)
143
    , object_key(object_key)
144
2055
    , origin_path(origin_path)
145
  {
146
2055
    JobInfoInit();
147
2055
    origin = kOriginPath;
148
2055
    this->callback = callback;
149
2055
  }
150
1041
  JobInfo(
151
    const std::string &access_key,
152
    const std::string &secret_key,
153
    const AuthzMethods &authz_method,
154
    const std::string &hostname,
155
    const std::string &region,
156
    const std::string &bucket,
157
    const std::string &object_key,
158
    void *callback,
159
    MemoryMappedFile *mmf,
160
    const unsigned char *buffer, size_t size)
161
    : access_key(access_key)
162
    , secret_key(secret_key)
163
    , authz_method(authz_method)
164
    , hostname(hostname)
165
    , region(region)
166
    , bucket(bucket)
167
1041
    , object_key(object_key)
168
  {
169
1041
    JobInfoInit();
170
1041
    origin = kOriginMem;
171
1041
    origin_mem.size = size;
172
1041
    origin_mem.data = buffer;
173
1041
    this->callback = callback;
174
1041
    this->mmf = mmf;
175
1041
  }
176
3096
  void JobInfoInit() {
177
3096
    curl_handle = NULL;
178
3096
    http_headers = NULL;
179
3096
    test_and_set = false;
180
3096
    origin_mem.pos = 0;
181
3096
    origin_mem.size = 0;
182
3096
    origin_mem.data = NULL;
183
3096
    callback = NULL;
184
3096
    mmf = NULL;
185
3096
    origin_file = NULL;
186
3096
    request = kReqPutCas;
187
3096
    error_code = kFailOk;
188
3096
    http_error = 0;
189
3096
    num_retries = 0;
190
3096
    backoff_ms = 0;
191
3096
    throttle_ms = 0;
192
3096
    throttle_timestamp = 0;
193
3096
    origin = kOriginPath;
194
3096
  }
195
130
  ~JobInfo() {}
196
197
  // Internal state, don't touch
198
  CURL *curl_handle;
199
  struct curl_slist *http_headers;
200
  FILE *origin_file;
201
  RequestType request;
202
  Failures error_code;
203
  int http_error;
204
  unsigned char num_retries;
205
  // Exponential backoff with cutoff in case of errors
206
  unsigned backoff_ms;
207
  // Throttle imposed by HTTP 429 reply; mutually exclusive with backoff_ms
208
  unsigned throttle_ms;
209
  // Remember when the 429 reply came in to only throttle if still necessary
210
  uint64_t throttle_timestamp;
211
};  // JobInfo
212
213
99
struct S3FanOutDnsEntry {
214
99
  S3FanOutDnsEntry() : counter(0), dns_name(), ip(), port("80"),
215
99
     clist(NULL), sharehandle(NULL) {}
216
  unsigned int counter;
217
  std::string dns_name;
218
  std::string ip;
219
  std::string port;
220
  struct curl_slist *clist;
221
  CURLSH *sharehandle;
222
};  // S3FanOutDnsEntry
223
224
225
class S3FanoutManager : SingleCopy {
226
 protected:
227
  typedef SynchronizingCounter<uint32_t> Semaphore;
228
229
 public:
230
  // 250ms pause after HTTP 429 "Too Many Retries"
231
  static const unsigned kDefault429ThrottleMs;
232
  // Don't throttle for more than a few seconds
233
  static const unsigned kMax429ThrottleMs;
234
  // Report throttle operations only every so often
235
  static const unsigned kThrottleReportIntervalSec;
236
237
  static void DetectThrottleIndicator(const std::string &header, JobInfo *info);
238
239
  S3FanoutManager();
240
  ~S3FanoutManager();
241
242
  void Init(const unsigned max_pool_handles, bool dns_buckets);
243
  void Fini();
244
  void Spawn();
245
246
  void PushNewJob(JobInfo *info);
247
  int PopCompletedJobs(std::vector<s3fanout::JobInfo*> *jobs);
248
249
  const Statistics &GetStatistics();
250
  void SetTimeout(const unsigned seconds);
251
  void GetTimeout(unsigned *seconds);
252
  void SetRetryParameters(const unsigned max_retries,
253
                          const unsigned backoff_init_ms,
254
                          const unsigned backoff_max_ms);
255
256
  bool DoSingleJob(JobInfo *info) const;
257
258
 private:
259
  // Reflects the default Apache configuration of the local backend
260
  static const char *kCacheControlCas;  // Cache-Control: max-age=259200
261
  static const char *kCacheControlDotCvmfs;  // Cache-Control: max-age=61
262
263
  static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action,
264
                                void *userp, void *socketp);
265
  static void *MainUpload(void *data);
266
  std::vector<s3fanout::JobInfo*> jobs_todo_;
267
  pthread_mutex_t *jobs_todo_lock_;
268
  std::vector<s3fanout::JobInfo*> jobs_completed_;
269
  pthread_mutex_t *jobs_completed_lock_;
270
  pthread_mutex_t *curl_handle_lock_;
271
272
  CURL *AcquireCurlHandle() const;
273
  void ReleaseCurlHandle(JobInfo *info, CURL *handle) const;
274
  int InitializeDnsSettings(CURL *handle,
275
                            std::string remote_host) const;
276
  void InitializeDnsSettingsCurl(CURL *handle, CURLSH *sharehandle,
277
                                 curl_slist *clist) const;
278
  Failures InitializeRequest(JobInfo *info, CURL *handle) const;
279
  void SetUrlOptions(JobInfo *info) const;
280
  void UpdateStatistics(CURL *handle);
281
  bool CanRetry(const JobInfo *info);
282
  void Backoff(JobInfo *info);
283
  bool VerifyAndFinalize(const int curl_error, JobInfo *info);
284
  std::string GetRequestString(const JobInfo &info) const;
285
  std::string GetContentType(const JobInfo &info) const;
286
  std::string GetUriEncode(const std::string &val, bool encode_slash) const;
287
  std::string GetAwsV4SigningKey(const JobInfo &info,
288
                                 const std::string &date) const;
289
  bool MkPayloadHash(const JobInfo &info, std::string *hex_hash) const;
290
  bool MkPayloadSize(const JobInfo &info, uint64_t *size) const;
291
  bool MkV2Authz(const JobInfo &info,
292
                 std::vector<std::string> *headers) const;
293
  bool MkV4Authz(const JobInfo &info,
294
                 std::vector<std::string> *headers) const;
295
6056
  std::string MkUrl(const std::string &host,
296
                    const std::string &bucket,
297
                    const std::string &objkey2) const {
298
6056
    if (dns_buckets_) {
299
      return "http://" + bucket + "." + host + "/" + objkey2;
300
    } else {
301
6056
      return "http://" + host + "/" + bucket + "/" + objkey2;
302
    }
303
  }
304
305
  Prng prng_;
306
  std::set<CURL *> *pool_handles_idle_;
307
  std::set<CURL *> *pool_handles_inuse_;
308
  std::set<S3FanOutDnsEntry *> *sharehandles_;
309
  std::map<CURL *, S3FanOutDnsEntry *> *curl_sharehandles_;
310
  dns::CaresResolver *resolver_;
311
  uint32_t pool_max_handles_;
312
  CURLM *curl_multi_;
313
  std::string *user_agent_;
314
315
  bool dns_buckets_;
316
317
  /**
318
   * AWS4 signing keys are derived from the secret key, a region and a date.
319
   * They can be cached.
320
   */
321
  mutable std::map<std::string, std::string> signing_keys_;
322
323
  pthread_t thread_upload_;
324
  bool thread_upload_run_;
325
  atomic_int32 multi_threaded_;
326
327
  struct pollfd *watch_fds_;
328
  uint32_t watch_fds_size_;
329
  uint32_t watch_fds_inuse_;
330
  uint32_t watch_fds_max_;
331
332
  pthread_mutex_t *lock_options_;
333
  unsigned opt_timeout_;
334
335
  unsigned opt_max_retries_;
336
  unsigned opt_backoff_init_ms_;
337
  unsigned opt_backoff_max_ms_;
338
  bool opt_ipv4_only_;
339
340
  unsigned int max_available_jobs_;
341
  Semaphore *available_jobs_;
342
343
  // Writes and reads should be atomic because reading happens in a different
344
  // thread than writing.
345
  Statistics *statistics_;
346
347
  // Report not every occurance of throtteling but only every so often
348
  uint64_t timestamp_last_throttle_report_;
349
};  // S3FanoutManager
350
351
}  // namespace s3fanout
352
353
#endif  // CVMFS_S3FANOUT_H_