Directory: | cvmfs/ |
---|---|
File: | cvmfs/s3fanout.h |
Date: | 2023-02-05 02:36:10 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 50 | 65 | 76.9% |
Branches: | 11 | 30 | 36.7% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #ifndef CVMFS_S3FANOUT_H_ | ||
6 | #define CVMFS_S3FANOUT_H_ | ||
7 | |||
8 | #include <poll.h> | ||
9 | #include <semaphore.h> | ||
10 | |||
11 | #include <climits> | ||
12 | #include <cstdlib> | ||
13 | #include <map> | ||
14 | #include <set> | ||
15 | #include <string> | ||
16 | #include <utility> | ||
17 | #include <vector> | ||
18 | |||
19 | #include "dns.h" | ||
20 | #include "duplex_curl.h" | ||
21 | #include "ssl.h" | ||
22 | #include "util/concurrency.h" | ||
23 | #include "util/file_backed_buffer.h" | ||
24 | #include "util/mmap_file.h" | ||
25 | #include "util/pointer.h" | ||
26 | #include "util/prng.h" | ||
27 | #include "util/single_copy.h" | ||
28 | #include "util/smalloc.h" | ||
29 | |||
30 | namespace s3fanout { | ||
31 | |||
/**
 * Authorization scheme used to sign requests.
 * Selected through S3FanoutManager::S3Config::authz_method.
 */
enum AuthzMethods {
  kAuthzAwsV2 = 0,  // handled by MkV2Authz()
  kAuthzAwsV4 = 1,  // handled by MkV4Authz()
  kAuthzAzure = 2   // handled by MkAzureAuthz()
};
37 | |||
/**
 * Possible return values.
 */
enum Failures {
  kFailOk = 0,
  kFailLocalIO,
  kFailBadRequest,
  kFailForbidden,
  kFailHostResolve,
  kFailHostConnection,
  kFailNotFound,
  kFailServiceUnavailable,
  kFailRetry,
  kFailOther,

  kFailNumEntries  // number of real failure codes; not a failure itself
};  // Failures


/**
 * Returns a human-readable description of a Failures code.
 *
 * The lookup table holds one message per enumerator, in enum order, followed
 * by a final "no text" entry for kFailNumEntries. Any value outside
 * [kFailOk, kFailNumEntries] also yields "no text" instead of indexing past
 * the table.
 *
 * @param error  failure code to describe
 * @return pointer to a static, NUL-terminated string (never NULL for the
 *         enumerators listed above)
 */
inline const char *Code2Ascii(const Failures error) {
  // Indexed by Failures; must stay in the same order as the enum above.
  // Every slot is initialized, so no entry is an indeterminate pointer.
  static const char *const kTexts[kFailNumEntries + 1] = {
    "S3: OK",                              // kFailOk
    "S3: local I/O failure",               // kFailLocalIO
    "S3: malformed URL (bad request)",     // kFailBadRequest
    "S3: forbidden",                       // kFailForbidden
    "S3: failed to resolve host address",  // kFailHostResolve
    "S3: host connection problem",         // kFailHostConnection
    "S3: not found",                       // kFailNotFound
    "S3: service not available",           // kFailServiceUnavailable
    // kFailRetry
    "S3: too many requests, service asks for backoff and retry",
    // kFailOther
    "S3: unknown service error, perhaps wrong authentication protocol",
    "no text"                              // kFailNumEntries
  };

  // Guard against values that were cast into the enum but are not listed.
  const int idx = static_cast<int>(error);
  if ((idx < 0) || (idx > static_cast<int>(kFailNumEntries)))
    return "no text";
  return kTexts[idx];
}
72 | |||
73 | |||
74 | |||
/**
 * Transfer counters reported by S3FanoutManager::GetStatistics().
 * A freshly constructed instance has every counter set to zero.
 */
struct Statistics {
  double transferred_bytes;
  double transfer_time;
  uint64_t num_requests;
  uint64_t num_retries;
  uint64_t ms_throttled;  // Total waiting time imposed by HTTP 429 replies

  Statistics()
    : transferred_bytes(0.0)
    , transfer_time(0.0)
    , num_requests(0)
    , num_retries(0)
    , ms_throttled(0)
  { }

  std::string Print() const;
};  // Statistics
92 | |||
93 | |||
94 | /** | ||
95 | * Contains all the information to specify an upload job. | ||
96 | */ | ||
97 | struct JobInfo : SingleCopy { | ||
98 | enum RequestType { | ||
99 | kReqHeadOnly = 0, // peek | ||
100 | kReqHeadPut, // conditional upload of content-addressed objects | ||
101 | kReqPutCas, // immutable data object | ||
102 | kReqPutDotCvmfs, // one of the /.cvmfs... top level files | ||
103 | kReqPutHtml, // HTML file - display instead of downloading | ||
104 | kReqPutBucket, // bucket creation | ||
105 | kReqDelete, | ||
106 | }; | ||
107 | |||
108 | const std::string object_key; | ||
109 | void *callback; // Callback to be called when job is finished | ||
110 | UniquePtr<FileBackedBuffer> origin; | ||
111 | |||
112 | // One constructor per destination | ||
113 | 615 | JobInfo( | |
114 | const std::string &object_key, | ||
115 | void *callback, | ||
116 | FileBackedBuffer *origin) | ||
117 | 615 | : object_key(object_key), | |
118 |
1/2✓ Branch 2 taken 615 times.
✗ Branch 3 not taken.
|
1230 | origin(origin) |
119 | { | ||
120 | 615 | JobInfoInit(); | |
121 | 615 | this->callback = callback; | |
122 | 615 | } | |
123 | 615 | void JobInfoInit() { | |
124 | 615 | curl_handle = NULL; | |
125 | 615 | http_headers = NULL; | |
126 | 615 | callback = NULL; | |
127 | 615 | request = kReqPutCas; | |
128 | 615 | error_code = kFailOk; | |
129 | 615 | http_error = 0; | |
130 | 615 | num_retries = 0; | |
131 | 615 | backoff_ms = 0; | |
132 | 615 | throttle_ms = 0; | |
133 | 615 | throttle_timestamp = 0; | |
134 | 615 | errorbuffer = | |
135 | 615 | reinterpret_cast<char *>(smalloc(sizeof(char) * CURL_ERROR_SIZE)); | |
136 | 615 | } | |
137 | 1230 | ~JobInfo() { | |
138 | 615 | free(errorbuffer); | |
139 | 615 | } | |
140 | |||
141 | // Internal state, don't touch | ||
142 | CURL *curl_handle; | ||
143 | struct curl_slist *http_headers; | ||
144 | uint64_t payload_size; | ||
145 | RequestType request; | ||
146 | Failures error_code; | ||
147 | int http_error; | ||
148 | unsigned char num_retries; | ||
149 | // Exponential backoff with cutoff in case of errors | ||
150 | unsigned backoff_ms; | ||
151 | // Throttle imposed by HTTP 429 reply; mutually exclusive with backoff_ms | ||
152 | unsigned throttle_ms; | ||
153 | // Remember when the 429 reply came in to only throttle if still necessary | ||
154 | uint64_t throttle_timestamp; | ||
155 | char *errorbuffer; | ||
156 | }; // JobInfo | ||
157 | |||
158 | struct S3FanOutDnsEntry { | ||
159 |
1/2✓ Branch 4 taken 10 times.
✗ Branch 5 not taken.
|
10 | S3FanOutDnsEntry() : counter(0), dns_name(), ip(), port("80"), |
160 | 10 | clist(NULL), sharehandle(NULL) {} | |
161 | unsigned int counter; | ||
162 | std::string dns_name; | ||
163 | std::string ip; | ||
164 | std::string port; | ||
165 | struct curl_slist *clist; | ||
166 | CURLSH *sharehandle; | ||
167 | }; // S3FanOutDnsEntry | ||
168 | |||
169 | |||
170 | class S3FanoutManager : SingleCopy { | ||
171 | protected: | ||
172 | typedef SynchronizingCounter<uint32_t> Semaphore; | ||
173 | |||
174 | public: | ||
175 | // 250ms pause after HTTP 429 "Too Many Retries" | ||
176 | static const unsigned kDefault429ThrottleMs; | ||
177 | // Don't throttle for more than a few seconds | ||
178 | static const unsigned kMax429ThrottleMs; | ||
179 | // Report throttle operations only every so often | ||
180 | static const unsigned kThrottleReportIntervalSec; | ||
181 | static const unsigned kDefaultHTTPPort; | ||
182 | static const unsigned kDefaultHTTPSPort; | ||
183 | |||
184 | struct S3Config { | ||
185 | 12 | S3Config() { | |
186 | 12 | authz_method = kAuthzAwsV2; | |
187 | 12 | dns_buckets = true; | |
188 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | protocol = "http"; |
189 | 12 | pool_max_handles = 0; | |
190 | 12 | opt_timeout_sec = 20; | |
191 | 12 | opt_max_retries = 3; | |
192 | 12 | opt_backoff_init_ms = 100; | |
193 | 12 | opt_backoff_max_ms = 2000; | |
194 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | x_amz_acl = "public-read"; |
195 | 12 | } | |
196 | std::string access_key; | ||
197 | std::string secret_key; | ||
198 | std::string hostname_port; | ||
199 | AuthzMethods authz_method; | ||
200 | std::string region; | ||
201 | std::string flavor; | ||
202 | std::string bucket; | ||
203 | bool dns_buckets; | ||
204 | std::string protocol; | ||
205 | uint32_t pool_max_handles; | ||
206 | unsigned opt_timeout_sec; | ||
207 | unsigned opt_max_retries; | ||
208 | unsigned opt_backoff_init_ms; | ||
209 | unsigned opt_backoff_max_ms; | ||
210 | std::string proxy; | ||
211 | std::string x_amz_acl; | ||
212 | }; | ||
213 | |||
214 | static void DetectThrottleIndicator(const std::string &header, JobInfo *info); | ||
215 | |||
216 | explicit S3FanoutManager(const S3Config &config); | ||
217 | |||
218 | ~S3FanoutManager(); | ||
219 | |||
220 | void Spawn(); | ||
221 | |||
222 | void PushNewJob(JobInfo *info); | ||
223 | void PushCompletedJob(JobInfo *info); | ||
224 | JobInfo *PopCompletedJob(); | ||
225 | |||
226 | const Statistics &GetStatistics(); | ||
227 | |||
228 | private: | ||
229 | // Reflects the default Apache configuration of the local backend | ||
230 | static const char *kCacheControlCas; // Cache-Control: max-age=259200 | ||
231 | static const char *kCacheControlDotCvmfs; // Cache-Control: max-age=61 | ||
232 | static const unsigned kLowSpeedLimit = 1024; // Require at least 1kB/s | ||
233 | |||
234 | static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, | ||
235 | void *userp, void *socketp); | ||
236 | static void *MainUpload(void *data); | ||
237 | std::vector<s3fanout::JobInfo*> jobs_todo_; | ||
238 | pthread_mutex_t *jobs_todo_lock_; | ||
239 | pthread_mutex_t *curl_handle_lock_; | ||
240 | |||
241 | CURL *AcquireCurlHandle() const; | ||
242 | void ReleaseCurlHandle(JobInfo *info, CURL *handle) const; | ||
243 | void InitPipeWatchFds(); | ||
244 | int InitializeDnsSettings(CURL *handle, | ||
245 | std::string remote_host) const; | ||
246 | void InitializeDnsSettingsCurl(CURL *handle, CURLSH *sharehandle, | ||
247 | curl_slist *clist) const; | ||
248 | Failures InitializeRequest(JobInfo *info, CURL *handle) const; | ||
249 | void SetUrlOptions(JobInfo *info) const; | ||
250 | void UpdateStatistics(CURL *handle); | ||
251 | bool CanRetry(const JobInfo *info); | ||
252 | void Backoff(JobInfo *info); | ||
253 | bool VerifyAndFinalize(const int curl_error, JobInfo *info); | ||
254 | std::string GetRequestString(const JobInfo &info) const; | ||
255 | std::string GetContentType(const JobInfo &info) const; | ||
256 | std::string GetUriEncode(const std::string &val, bool encode_slash) const; | ||
257 | std::string GetAwsV4SigningKey(const std::string &date) const; | ||
258 | bool MkPayloadHash(const JobInfo &info, std::string *hex_hash) const; | ||
259 | bool MkV2Authz(const JobInfo &info, | ||
260 | std::vector<std::string> *headers) const; | ||
261 | bool MkV4Authz(const JobInfo &info, | ||
262 | std::vector<std::string> *headers) const; | ||
263 | bool MkAzureAuthz(const JobInfo &info, | ||
264 | std::vector<std::string> *headers) const; | ||
265 | 1222 | std::string MkUrl(const std::string &objkey) const { | |
266 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | if (config_.dns_buckets) { |
267 | ✗ | return config_.protocol + "://" + complete_hostname_ + "/" + objkey; | |
268 | } else { | ||
269 |
3/6✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 1222 times.
✗ Branch 8 not taken.
|
2444 | return config_.protocol + "://" + complete_hostname_ + "/" + |
270 |
2/4✓ Branch 2 taken 1222 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1222 times.
✗ Branch 6 not taken.
|
3666 | config_.bucket + "/" + objkey; |
271 | } | ||
272 | } | ||
273 | 12 | std::string MkCompleteHostname() { | |
274 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | if (config_.dns_buckets) { |
275 | ✗ | return config_.bucket + "." + config_.hostname_port; | |
276 | } else { | ||
277 | 12 | return config_.hostname_port; | |
278 | } | ||
279 | } | ||
280 | |||
281 | const S3Config config_; | ||
282 | std::string complete_hostname_; | ||
283 | |||
284 | Prng prng_; | ||
285 | /** | ||
286 | * This is not strictly necessary but it helps the debugging | ||
287 | */ | ||
288 | std::set<JobInfo *> *active_requests_; | ||
289 | |||
290 | std::set<CURL *> *pool_handles_idle_; | ||
291 | std::set<CURL *> *pool_handles_inuse_; | ||
292 | std::set<S3FanOutDnsEntry *> *sharehandles_; | ||
293 | std::map<CURL *, S3FanOutDnsEntry *> *curl_sharehandles_; | ||
294 | dns::CaresResolver *resolver_; | ||
295 | CURLM *curl_multi_; | ||
296 | std::string *user_agent_; | ||
297 | |||
298 | /** | ||
299 | * AWS4 signing keys are derived from the secret key, a region and a date. | ||
300 | * The signing key for current day can be cached. | ||
301 | */ | ||
302 | mutable std::pair<std::string, std::string> last_signing_key_; | ||
303 | |||
304 | pthread_t thread_upload_; | ||
305 | atomic_int32 multi_threaded_; | ||
306 | |||
307 | struct pollfd *watch_fds_; | ||
308 | uint32_t watch_fds_size_; | ||
309 | uint32_t watch_fds_inuse_; | ||
310 | uint32_t watch_fds_max_; | ||
311 | |||
312 | // A pipe used to signal termination from S3FanoutManager to MainUpload | ||
313 | // thread. Anything written into it results in MainUpload thread exit. | ||
314 | int pipe_terminate_[2]; | ||
315 | // A pipe to used to push jobs from S3FanoutManager to MainUpload thread. | ||
316 | // S3FanoutManager writes a JobInfo* pointer. MainUpload then reads the | ||
317 | // pointer and processes the job. | ||
318 | int pipe_jobs_[2]; | ||
319 | // A pipe used to collect completed jobs. MainUpload writes in the | ||
320 | // pointer to the completed job. PopCompletedJob() used to | ||
321 | // retrieve pointer. | ||
322 | int pipe_completed_[2]; | ||
323 | |||
324 | bool opt_ipv4_only_; | ||
325 | |||
326 | unsigned int max_available_jobs_; | ||
327 | Semaphore *available_jobs_; | ||
328 | |||
329 | // Writes and reads should be atomic because reading happens in a different | ||
330 | // thread than writing. | ||
331 | Statistics *statistics_; | ||
332 | |||
333 | // Report not every occurance of throtteling but only every so often | ||
334 | uint64_t timestamp_last_throttle_report_; | ||
335 | |||
336 | bool is_curl_debug_; | ||
337 | |||
338 | /** | ||
339 | * Carries the path settings for SSL certificates | ||
340 | */ | ||
341 | SslCertificateStore ssl_certificate_store_; | ||
342 | }; // S3FanoutManager | ||
343 | |||
344 | } // namespace s3fanout | ||
345 | |||
346 | #endif // CVMFS_S3FANOUT_H_ | ||
347 |