GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/s3fanout.h
Date: 2023-02-05 02:36:10
Exec Total Coverage
Lines: 50 65 76.9%
Branches: 11 30 36.7%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_S3FANOUT_H_
6 #define CVMFS_S3FANOUT_H_
7
8 #include <poll.h>
9 #include <semaphore.h>
10
11 #include <climits>
12 #include <cstdlib>
13 #include <map>
14 #include <set>
15 #include <string>
16 #include <utility>
17 #include <vector>
18
19 #include "dns.h"
20 #include "duplex_curl.h"
21 #include "ssl.h"
22 #include "util/concurrency.h"
23 #include "util/file_backed_buffer.h"
24 #include "util/mmap_file.h"
25 #include "util/pointer.h"
26 #include "util/prng.h"
27 #include "util/single_copy.h"
28 #include "util/smalloc.h"
29
30 namespace s3fanout {
31
/**
 * Request authorization schemes selectable through S3Config::authz_method.
 */
enum AuthzMethods {
  kAuthzAwsV2 = 0,  // headers built by MkV2Authz()
  kAuthzAwsV4 = 1,  // headers built by MkV4Authz()
  kAuthzAzure = 2   // headers built by MkAzureAuthz()
};
37
/**
 * Possible return values.
 */
enum Failures {
  kFailOk = 0,
  kFailLocalIO,
  kFailBadRequest,
  kFailForbidden,
  kFailHostResolve,
  kFailHostConnection,
  kFailNotFound,
  kFailServiceUnavailable,
  kFailRetry,
  kFailOther,

  kFailNumEntries
};  // Failures


/**
 * Returns a human-readable description of an S3 failure code.
 *
 * Every enumerator is mapped explicitly in a switch (no default label), so
 * adding a value to Failures without a matching case triggers a -Wswitch
 * warning instead of silently shifting or leaving holes in a lookup table.
 * kFailNumEntries and any value outside the enum yield "no text".
 *
 * @param error  failure code to describe
 * @return pointer to a static, NUL-terminated string; never NULL
 */
inline const char *Code2Ascii(const Failures error) {
  switch (error) {
    case kFailOk:
      return "S3: OK";
    case kFailLocalIO:
      return "S3: local I/O failure";
    case kFailBadRequest:
      return "S3: malformed URL (bad request)";
    case kFailForbidden:
      return "S3: forbidden";
    case kFailHostResolve:
      return "S3: failed to resolve host address";
    case kFailHostConnection:
      return "S3: host connection problem";
    case kFailNotFound:
      return "S3: not found";
    case kFailServiceUnavailable:
      return "S3: service not available";
    case kFailRetry:
      return "S3: too many requests, service asks for backoff and retry";
    case kFailOther:
      return "S3: unknown service error, "
             "perhaps wrong authentication protocol";
    case kFailNumEntries:
      break;
  }
  return "no text";
}
72
73
74
/**
 * Transfer counters accumulated by an S3FanoutManager; every counter starts
 * at zero when the object is constructed.
 */
struct Statistics {
  double transferred_bytes;
  double transfer_time;
  uint64_t num_requests;
  uint64_t num_retries;
  uint64_t ms_throttled;  // Total waiting time imposed by HTTP 429 replies

  Statistics()
    : transferred_bytes(0.0)
    , transfer_time(0.0)
    , num_requests(0)
    , num_retries(0)
    , ms_throttled(0)
  { }

  std::string Print() const;
};  // Statistics
92
93
94 /**
95 * Contains all the information to specify an upload job.
96 */
97 struct JobInfo : SingleCopy {
98 enum RequestType {
99 kReqHeadOnly = 0, // peek
100 kReqHeadPut, // conditional upload of content-addressed objects
101 kReqPutCas, // immutable data object
102 kReqPutDotCvmfs, // one of the /.cvmfs... top level files
103 kReqPutHtml, // HTML file - display instead of downloading
104 kReqPutBucket, // bucket creation
105 kReqDelete,
106 };
107
108 const std::string object_key;
109 void *callback; // Callback to be called when job is finished
110 UniquePtr<FileBackedBuffer> origin;
111
112 // One constructor per destination
113 615 JobInfo(
114 const std::string &object_key,
115 void *callback,
116 FileBackedBuffer *origin)
117 615 : object_key(object_key),
118
1/2
✓ Branch 2 taken 615 times.
✗ Branch 3 not taken.
1230 origin(origin)
119 {
120 615 JobInfoInit();
121 615 this->callback = callback;
122 615 }
123 615 void JobInfoInit() {
124 615 curl_handle = NULL;
125 615 http_headers = NULL;
126 615 callback = NULL;
127 615 request = kReqPutCas;
128 615 error_code = kFailOk;
129 615 http_error = 0;
130 615 num_retries = 0;
131 615 backoff_ms = 0;
132 615 throttle_ms = 0;
133 615 throttle_timestamp = 0;
134 615 errorbuffer =
135 615 reinterpret_cast<char *>(smalloc(sizeof(char) * CURL_ERROR_SIZE));
136 615 }
137 1230 ~JobInfo() {
138 615 free(errorbuffer);
139 615 }
140
141 // Internal state, don't touch
142 CURL *curl_handle;
143 struct curl_slist *http_headers;
144 uint64_t payload_size;
145 RequestType request;
146 Failures error_code;
147 int http_error;
148 unsigned char num_retries;
149 // Exponential backoff with cutoff in case of errors
150 unsigned backoff_ms;
151 // Throttle imposed by HTTP 429 reply; mutually exclusive with backoff_ms
152 unsigned throttle_ms;
153 // Remember when the 429 reply came in to only throttle if still necessary
154 uint64_t throttle_timestamp;
155 char *errorbuffer;
156 }; // JobInfo
157
158 struct S3FanOutDnsEntry {
159
1/2
✓ Branch 4 taken 10 times.
✗ Branch 5 not taken.
10 S3FanOutDnsEntry() : counter(0), dns_name(), ip(), port("80"),
160 10 clist(NULL), sharehandle(NULL) {}
161 unsigned int counter;
162 std::string dns_name;
163 std::string ip;
164 std::string port;
165 struct curl_slist *clist;
166 CURLSH *sharehandle;
167 }; // S3FanOutDnsEntry
168
169
170 class S3FanoutManager : SingleCopy {
171 protected:
172 typedef SynchronizingCounter<uint32_t> Semaphore;
173
174 public:
175 // 250ms pause after HTTP 429 "Too Many Retries"
176 static const unsigned kDefault429ThrottleMs;
177 // Don't throttle for more than a few seconds
178 static const unsigned kMax429ThrottleMs;
179 // Report throttle operations only every so often
180 static const unsigned kThrottleReportIntervalSec;
181 static const unsigned kDefaultHTTPPort;
182 static const unsigned kDefaultHTTPSPort;
183
184 struct S3Config {
185 12 S3Config() {
186 12 authz_method = kAuthzAwsV2;
187 12 dns_buckets = true;
188
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 protocol = "http";
189 12 pool_max_handles = 0;
190 12 opt_timeout_sec = 20;
191 12 opt_max_retries = 3;
192 12 opt_backoff_init_ms = 100;
193 12 opt_backoff_max_ms = 2000;
194
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 x_amz_acl = "public-read";
195 12 }
196 std::string access_key;
197 std::string secret_key;
198 std::string hostname_port;
199 AuthzMethods authz_method;
200 std::string region;
201 std::string flavor;
202 std::string bucket;
203 bool dns_buckets;
204 std::string protocol;
205 uint32_t pool_max_handles;
206 unsigned opt_timeout_sec;
207 unsigned opt_max_retries;
208 unsigned opt_backoff_init_ms;
209 unsigned opt_backoff_max_ms;
210 std::string proxy;
211 std::string x_amz_acl;
212 };
213
214 static void DetectThrottleIndicator(const std::string &header, JobInfo *info);
215
216 explicit S3FanoutManager(const S3Config &config);
217
218 ~S3FanoutManager();
219
220 void Spawn();
221
222 void PushNewJob(JobInfo *info);
223 void PushCompletedJob(JobInfo *info);
224 JobInfo *PopCompletedJob();
225
226 const Statistics &GetStatistics();
227
228 private:
229 // Reflects the default Apache configuration of the local backend
230 static const char *kCacheControlCas; // Cache-Control: max-age=259200
231 static const char *kCacheControlDotCvmfs; // Cache-Control: max-age=61
232 static const unsigned kLowSpeedLimit = 1024; // Require at least 1kB/s
233
234 static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action,
235 void *userp, void *socketp);
236 static void *MainUpload(void *data);
237 std::vector<s3fanout::JobInfo*> jobs_todo_;
238 pthread_mutex_t *jobs_todo_lock_;
239 pthread_mutex_t *curl_handle_lock_;
240
241 CURL *AcquireCurlHandle() const;
242 void ReleaseCurlHandle(JobInfo *info, CURL *handle) const;
243 void InitPipeWatchFds();
244 int InitializeDnsSettings(CURL *handle,
245 std::string remote_host) const;
246 void InitializeDnsSettingsCurl(CURL *handle, CURLSH *sharehandle,
247 curl_slist *clist) const;
248 Failures InitializeRequest(JobInfo *info, CURL *handle) const;
249 void SetUrlOptions(JobInfo *info) const;
250 void UpdateStatistics(CURL *handle);
251 bool CanRetry(const JobInfo *info);
252 void Backoff(JobInfo *info);
253 bool VerifyAndFinalize(const int curl_error, JobInfo *info);
254 std::string GetRequestString(const JobInfo &info) const;
255 std::string GetContentType(const JobInfo &info) const;
256 std::string GetUriEncode(const std::string &val, bool encode_slash) const;
257 std::string GetAwsV4SigningKey(const std::string &date) const;
258 bool MkPayloadHash(const JobInfo &info, std::string *hex_hash) const;
259 bool MkV2Authz(const JobInfo &info,
260 std::vector<std::string> *headers) const;
261 bool MkV4Authz(const JobInfo &info,
262 std::vector<std::string> *headers) const;
263 bool MkAzureAuthz(const JobInfo &info,
264 std::vector<std::string> *headers) const;
265 1222 std::string MkUrl(const std::string &objkey) const {
266
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
1222 if (config_.dns_buckets) {
267 return config_.protocol + "://" + complete_hostname_ + "/" + objkey;
268 } else {
269
3/6
✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 1222 times.
✗ Branch 8 not taken.
2444 return config_.protocol + "://" + complete_hostname_ + "/" +
270
2/4
✓ Branch 2 taken 1222 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1222 times.
✗ Branch 6 not taken.
3666 config_.bucket + "/" + objkey;
271 }
272 }
273 12 std::string MkCompleteHostname() {
274
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12 if (config_.dns_buckets) {
275 return config_.bucket + "." + config_.hostname_port;
276 } else {
277 12 return config_.hostname_port;
278 }
279 }
280
281 const S3Config config_;
282 std::string complete_hostname_;
283
284 Prng prng_;
285 /**
286 * This is not strictly necessary but it helps the debugging
287 */
288 std::set<JobInfo *> *active_requests_;
289
290 std::set<CURL *> *pool_handles_idle_;
291 std::set<CURL *> *pool_handles_inuse_;
292 std::set<S3FanOutDnsEntry *> *sharehandles_;
293 std::map<CURL *, S3FanOutDnsEntry *> *curl_sharehandles_;
294 dns::CaresResolver *resolver_;
295 CURLM *curl_multi_;
296 std::string *user_agent_;
297
298 /**
299 * AWS4 signing keys are derived from the secret key, a region and a date.
300 * The signing key for current day can be cached.
301 */
302 mutable std::pair<std::string, std::string> last_signing_key_;
303
304 pthread_t thread_upload_;
305 atomic_int32 multi_threaded_;
306
307 struct pollfd *watch_fds_;
308 uint32_t watch_fds_size_;
309 uint32_t watch_fds_inuse_;
310 uint32_t watch_fds_max_;
311
312 // A pipe used to signal termination from S3FanoutManager to MainUpload
313 // thread. Anything written into it results in MainUpload thread exit.
314 int pipe_terminate_[2];
315 // A pipe to used to push jobs from S3FanoutManager to MainUpload thread.
316 // S3FanoutManager writes a JobInfo* pointer. MainUpload then reads the
317 // pointer and processes the job.
318 int pipe_jobs_[2];
319 // A pipe used to collect completed jobs. MainUpload writes in the
320 // pointer to the completed job. PopCompletedJob() used to
321 // retrieve pointer.
322 int pipe_completed_[2];
323
324 bool opt_ipv4_only_;
325
326 unsigned int max_available_jobs_;
327 Semaphore *available_jobs_;
328
329 // Writes and reads should be atomic because reading happens in a different
330 // thread than writing.
331 Statistics *statistics_;
332
333 // Report not every occurance of throtteling but only every so often
334 uint64_t timestamp_last_throttle_report_;
335
336 bool is_curl_debug_;
337
338 /**
339 * Carries the path settings for SSL certificates
340 */
341 SslCertificateStore ssl_certificate_store_;
342 }; // S3FanoutManager
343
344 } // namespace s3fanout
345
346 #endif // CVMFS_S3FANOUT_H_
347