Directory: | cvmfs/ |
---|---|
File: | cvmfs/network/s3fanout.h |
Date: | 2025-06-22 02:36:02 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 52 | 67 | 77.6% |
Branches: | 11 | 30 | 36.7% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #ifndef CVMFS_NETWORK_S3FANOUT_H_ | ||
6 | #define CVMFS_NETWORK_S3FANOUT_H_ | ||
7 | |||
8 | #include <poll.h> | ||
9 | #include <semaphore.h> | ||
10 | |||
11 | #include <climits> | ||
12 | #include <cstdlib> | ||
13 | #include <map> | ||
14 | #include <set> | ||
15 | #include <string> | ||
16 | #include <utility> | ||
17 | #include <vector> | ||
18 | |||
19 | #include "dns.h" | ||
20 | #include "duplex_curl.h" | ||
21 | #include "ssl.h" | ||
22 | #include "util/concurrency.h" | ||
23 | #include "util/file_backed_buffer.h" | ||
24 | #include "util/mmap_file.h" | ||
25 | #include "util/pointer.h" | ||
26 | #include "util/prng.h" | ||
27 | #include "util/single_copy.h" | ||
28 | #include "util/smalloc.h" | ||
29 | |||
30 | namespace s3fanout { | ||
31 | |||
/**
 * Authorization methods that can be stored in the authz_method field of
 * S3FanoutManager::S3Config.
 */
enum AuthzMethods {
  kAuthzAwsV2 = 0,
  kAuthzAwsV4 = 1,
  kAuthzAzure = 2
};
37 | |||
/**
 * Possible return values.
 */
enum Failures {
  kFailOk = 0,
  kFailLocalIO,
  kFailBadRequest,
  kFailForbidden,
  kFailHostResolve,
  kFailHostConnection,
  kFailNotFound,
  kFailServiceUnavailable,
  kFailRetry,
  kFailOther,

  kFailNumEntries
};  // Failures


/**
 * Translates a failure code into a human-readable description.
 *
 * The mapping is keyed on the enumerators themselves rather than on numeric
 * array indices, so the texts cannot drift out of sync with the Failures
 * enum: kFailRetry yields the "too many requests" text and kFailOther yields
 * the "unknown service error" text.
 *
 * @param error  the failure code to describe
 * @return pointer to a string literal (never NULL, must not be freed);
 *         "no text" for kFailNumEntries and for any value that is not a
 *         named failure code
 */
inline const char *Code2Ascii(const Failures error) {
  switch (error) {
    case kFailOk:
      return "S3: OK";
    case kFailLocalIO:
      return "S3: local I/O failure";
    case kFailBadRequest:
      return "S3: malformed URL (bad request)";
    case kFailForbidden:
      return "S3: forbidden";
    case kFailHostResolve:
      return "S3: failed to resolve host address";
    case kFailHostConnection:
      return "S3: host connection problem";
    case kFailNotFound:
      return "S3: not found";
    case kFailServiceUnavailable:
      return "S3: service not available";
    case kFailRetry:
      return "S3: too many requests, service asks for backoff and retry";
    case kFailOther:
      return "S3: unknown service error, perhaps wrong authentication protocol";
    case kFailNumEntries:
      // Not a real failure code; handled by the fallback below
      break;
  }
  // No default label on purpose: the compiler can then warn when a new
  // enumerator is added without a text. Unknown values end up here.
  return "no text";
}
72 | |||
73 | |||
/**
 * Transfer counters. A freshly constructed object has every counter at zero.
 */
struct Statistics {
  double transferred_bytes;
  double transfer_time;
  uint64_t num_requests;
  uint64_t num_retries;
  uint64_t ms_throttled;  // Total waiting time imposed by HTTP 429 replies

  Statistics()
    : transferred_bytes(0.0)
    , transfer_time(0.0)
    , num_requests(0)
    , num_retries(0)
    , ms_throttled(0) { }

  std::string Print() const;
};  // Statistics
91 | |||
92 | |||
93 | /** | ||
94 | * Contains all the information to specify an upload job. | ||
95 | */ | ||
96 | struct JobInfo : SingleCopy { | ||
97 | enum RequestType { | ||
98 | kReqHeadOnly = 0, // peek | ||
99 | kReqHeadPut, // conditional upload of content-addressed objects | ||
100 | kReqPutCas, // immutable data object | ||
101 | kReqPutDotCvmfs, // one of the /.cvmfs... top level files | ||
102 | kReqPutHtml, // HTML file - display instead of downloading | ||
103 | kReqPutBucket, // bucket creation | ||
104 | kReqDelete, | ||
105 | }; | ||
106 | |||
107 | const std::string object_key; | ||
108 | void *callback; // Callback to be called when job is finished | ||
109 | UniquePtr<FileBackedBuffer> origin; | ||
110 | |||
111 | // One constructor per destination | ||
112 | 8645 | JobInfo(const std::string &object_key, | |
113 | void *callback, | ||
114 | FileBackedBuffer *origin) | ||
115 |
1/2✓ Branch 3 taken 8645 times.
✗ Branch 4 not taken.
|
8645 | : object_key(object_key), origin(origin) { |
116 | 8645 | JobInfoInit(); | |
117 | 8645 | this->callback = callback; | |
118 | 8645 | } | |
119 | 8645 | void JobInfoInit() { | |
120 | 8645 | curl_handle = NULL; | |
121 | 8645 | http_headers = NULL; | |
122 | 8645 | callback = NULL; | |
123 | 8645 | request = kReqPutCas; | |
124 | 8645 | error_code = kFailOk; | |
125 | 8645 | http_error = 0; | |
126 | 8645 | num_retries = 0; | |
127 | 8645 | backoff_ms = 0; | |
128 | 8645 | throttle_ms = 0; | |
129 | 8645 | throttle_timestamp = 0; | |
130 | 8645 | errorbuffer = reinterpret_cast<char *>( | |
131 | 8645 | smalloc(sizeof(char) * CURL_ERROR_SIZE)); | |
132 | 8645 | } | |
133 | 8645 | ~JobInfo() { free(errorbuffer); } | |
134 | |||
135 | // Internal state, don't touch | ||
136 | CURL *curl_handle; | ||
137 | struct curl_slist *http_headers; | ||
138 | uint64_t payload_size; | ||
139 | RequestType request; | ||
140 | Failures error_code; | ||
141 | int http_error; | ||
142 | unsigned char num_retries; | ||
143 | // Exponential backoff with cutoff in case of errors | ||
144 | unsigned backoff_ms; | ||
145 | // Throttle imposed by HTTP 429 reply; mutually exclusive with backoff_ms | ||
146 | unsigned throttle_ms; | ||
147 | // Remember when the 429 reply came in to only throttle if still necessary | ||
148 | uint64_t throttle_timestamp; | ||
149 | char *errorbuffer; | ||
150 | }; // JobInfo | ||
151 | |||
152 | struct S3FanOutDnsEntry { | ||
153 | 140 | S3FanOutDnsEntry() | |
154 | 140 | : counter(0) | |
155 | 140 | , dns_name() | |
156 | 140 | , ip() | |
157 |
1/2✓ Branch 2 taken 140 times.
✗ Branch 3 not taken.
|
140 | , port("80") |
158 | 140 | , clist(NULL) | |
159 | 140 | , sharehandle(NULL) { } | |
160 | unsigned int counter; | ||
161 | std::string dns_name; | ||
162 | std::string ip; | ||
163 | std::string port; | ||
164 | struct curl_slist *clist; | ||
165 | CURLSH *sharehandle; | ||
166 | }; // S3FanOutDnsEntry | ||
167 | |||
168 | |||
169 | class S3FanoutManager : SingleCopy { | ||
170 | protected: | ||
171 | typedef SynchronizingCounter<uint32_t> Semaphore; | ||
172 | |||
173 | public: | ||
174 | // 250ms pause after HTTP 429 "Too Many Retries" | ||
175 | static const unsigned kDefault429ThrottleMs; | ||
176 | // Don't throttle for more than a few seconds | ||
177 | static const unsigned kMax429ThrottleMs; | ||
178 | // Report throttle operations only every so often | ||
179 | static const unsigned kThrottleReportIntervalSec; | ||
180 | static const unsigned kDefaultHTTPPort; | ||
181 | static const unsigned kDefaultHTTPSPort; | ||
182 | |||
183 | struct S3Config { | ||
184 | 168 | S3Config() { | |
185 | 168 | authz_method = kAuthzAwsV2; | |
186 | 168 | dns_buckets = true; | |
187 |
1/2✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
|
168 | protocol = "http"; |
188 | 168 | pool_max_handles = 0; | |
189 | 168 | opt_timeout_sec = 20; | |
190 | 168 | opt_max_retries = 3; | |
191 | 168 | opt_backoff_init_ms = 100; | |
192 | 168 | opt_backoff_max_ms = 2000; | |
193 |
1/2✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
|
168 | x_amz_acl = "public-read"; |
194 | 168 | } | |
195 | std::string access_key; | ||
196 | std::string secret_key; | ||
197 | std::string hostname_port; | ||
198 | AuthzMethods authz_method; | ||
199 | std::string region; | ||
200 | std::string flavor; | ||
201 | std::string bucket; | ||
202 | bool dns_buckets; | ||
203 | std::string protocol; | ||
204 | uint32_t pool_max_handles; | ||
205 | unsigned opt_timeout_sec; | ||
206 | unsigned opt_max_retries; | ||
207 | unsigned opt_backoff_init_ms; | ||
208 | unsigned opt_backoff_max_ms; | ||
209 | std::string proxy; | ||
210 | std::string x_amz_acl; | ||
211 | }; | ||
212 | |||
213 | static void DetectThrottleIndicator(const std::string &header, JobInfo *info); | ||
214 | |||
215 | explicit S3FanoutManager(const S3Config &config); | ||
216 | |||
217 | ~S3FanoutManager(); | ||
218 | |||
219 | void Spawn(); | ||
220 | |||
221 | void PushNewJob(JobInfo *info); | ||
222 | void PushCompletedJob(JobInfo *info); | ||
223 | JobInfo *PopCompletedJob(); | ||
224 | |||
225 | const Statistics &GetStatistics(); | ||
226 | |||
227 | private: | ||
228 | // Reflects the default Apache configuration of the local backend | ||
229 | static const char *kCacheControlCas; // Cache-Control: max-age=259200 | ||
230 | static const char *kCacheControlDotCvmfs; // Cache-Control: max-age=61 | ||
231 | static const unsigned kLowSpeedLimit = 1024; // Require at least 1kB/s | ||
232 | |||
233 | static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, | ||
234 | void *userp, void *socketp); | ||
235 | static void *MainUpload(void *data); | ||
236 | std::vector<s3fanout::JobInfo *> jobs_todo_; | ||
237 | pthread_mutex_t *jobs_todo_lock_; | ||
238 | pthread_mutex_t *curl_handle_lock_; | ||
239 | |||
240 | CURL *AcquireCurlHandle() const; | ||
241 | void ReleaseCurlHandle(JobInfo *info, CURL *handle) const; | ||
242 | void InitPipeWatchFds(); | ||
243 | int InitializeDnsSettings(CURL *handle, std::string remote_host) const; | ||
244 | void InitializeDnsSettingsCurl(CURL *handle, CURLSH *sharehandle, | ||
245 | curl_slist *clist) const; | ||
246 | Failures InitializeRequest(JobInfo *info, CURL *handle) const; | ||
247 | void SetUrlOptions(JobInfo *info) const; | ||
248 | void UpdateStatistics(CURL *handle); | ||
249 | bool CanRetry(const JobInfo *info); | ||
250 | void Backoff(JobInfo *info); | ||
251 | bool VerifyAndFinalize(const int curl_error, JobInfo *info); | ||
252 | std::string GetRequestString(const JobInfo &info) const; | ||
253 | std::string GetContentType(const JobInfo &info) const; | ||
254 | std::string GetUriEncode(const std::string &val, bool encode_slash) const; | ||
255 | std::string GetAwsV4SigningKey(const std::string &date) const; | ||
256 | bool MkPayloadHash(const JobInfo &info, std::string *hex_hash) const; | ||
257 | bool MkV2Authz(const JobInfo &info, std::vector<std::string> *headers) const; | ||
258 | bool MkV4Authz(const JobInfo &info, std::vector<std::string> *headers) const; | ||
259 | bool MkAzureAuthz(const JobInfo &info, | ||
260 | std::vector<std::string> *headers) const; | ||
261 | 17108 | std::string MkUrl(const std::string &objkey) const { | |
262 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 17108 times.
|
17108 | if (config_.dns_buckets) { |
263 | ✗ | return config_.protocol + "://" + complete_hostname_ + "/" + objkey; | |
264 | } else { | ||
265 |
2/4✓ Branch 1 taken 17108 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 17108 times.
✗ Branch 5 not taken.
|
34216 | return config_.protocol + "://" + complete_hostname_ + "/" |
266 |
3/6✓ Branch 2 taken 17108 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 17108 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 17108 times.
✗ Branch 9 not taken.
|
51324 | + config_.bucket + "/" + objkey; |
267 | } | ||
268 | } | ||
269 | 168 | std::string MkCompleteHostname() { | |
270 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 168 times.
|
168 | if (config_.dns_buckets) { |
271 | ✗ | return config_.bucket + "." + config_.hostname_port; | |
272 | } else { | ||
273 | 168 | return config_.hostname_port; | |
274 | } | ||
275 | } | ||
276 | |||
277 | const S3Config config_; | ||
278 | std::string complete_hostname_; | ||
279 | |||
280 | Prng prng_; | ||
281 | /** | ||
282 | * This is not strictly necessary but it helps the debugging | ||
283 | */ | ||
284 | std::set<JobInfo *> *active_requests_; | ||
285 | |||
286 | std::set<CURL *> *pool_handles_idle_; | ||
287 | std::set<CURL *> *pool_handles_inuse_; | ||
288 | std::set<S3FanOutDnsEntry *> *sharehandles_; | ||
289 | std::map<CURL *, S3FanOutDnsEntry *> *curl_sharehandles_; | ||
290 | dns::CaresResolver *resolver_; | ||
291 | CURLM *curl_multi_; | ||
292 | std::string *user_agent_; | ||
293 | |||
294 | /** | ||
295 | * AWS4 signing keys are derived from the secret key, a region and a date. | ||
296 | * The signing key for current day can be cached. | ||
297 | */ | ||
298 | mutable std::pair<std::string, std::string> last_signing_key_; | ||
299 | |||
300 | pthread_t thread_upload_; | ||
301 | atomic_int32 multi_threaded_; | ||
302 | |||
303 | struct pollfd *watch_fds_; | ||
304 | uint32_t watch_fds_size_; | ||
305 | uint32_t watch_fds_inuse_; | ||
306 | uint32_t watch_fds_max_; | ||
307 | |||
308 | // A pipe used to signal termination from S3FanoutManager to MainUpload | ||
309 | // thread. Anything written into it results in MainUpload thread exit. | ||
310 | int pipe_terminate_[2]; | ||
311 | // A pipe to used to push jobs from S3FanoutManager to MainUpload thread. | ||
312 | // S3FanoutManager writes a JobInfo* pointer. MainUpload then reads the | ||
313 | // pointer and processes the job. | ||
314 | int pipe_jobs_[2]; | ||
315 | // A pipe used to collect completed jobs. MainUpload writes in the | ||
316 | // pointer to the completed job. PopCompletedJob() used to | ||
317 | // retrieve pointer. | ||
318 | int pipe_completed_[2]; | ||
319 | |||
320 | bool opt_ipv4_only_; | ||
321 | |||
322 | unsigned int max_available_jobs_; | ||
323 | Semaphore *available_jobs_; | ||
324 | |||
325 | // Writes and reads should be atomic because reading happens in a different | ||
326 | // thread than writing. | ||
327 | Statistics *statistics_; | ||
328 | |||
329 | // Report not every occurrence of throtteling but only every so often | ||
330 | uint64_t timestamp_last_throttle_report_; | ||
331 | |||
332 | bool is_curl_debug_; | ||
333 | |||
334 | /** | ||
335 | * Carries the path settings for SSL certificates | ||
336 | */ | ||
337 | SslCertificateStore ssl_certificate_store_; | ||
338 | }; // S3FanoutManager | ||
339 | |||
340 | } // namespace s3fanout | ||
341 | |||
342 | #endif // CVMFS_NETWORK_S3FANOUT_H_ | ||
343 |