GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/network/s3fanout.h
Date: 2026-04-26 02:35:59
Exec Total Coverage
Lines: 55 71 77.5%
Branches: 16 38 42.1%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_NETWORK_S3FANOUT_H_
6 #define CVMFS_NETWORK_S3FANOUT_H_
7
8 #include <poll.h>
9 #include <semaphore.h>
10
11 #include <climits>
12 #include <cstdlib>
13 #include <map>
14 #include <set>
15 #include <string>
16 #include <utility>
17 #include <vector>
18
19 #include "dns.h"
20 #include "duplex_curl.h" // IWYU pragma: keep
21 #include "ssl.h"
22 #include "util/concurrency.h"
23 #include "util/file_backed_buffer.h"
24 #include "util/pointer.h"
25 #include "util/prng.h"
26 #include "util/single_copy.h"
27 #include "util/smalloc.h"
28
29 namespace s3fanout {
30
/**
 * Request authorization schemes selectable through S3Config::authz_method.
 * The numeric values are spelled out so that they stay stable.
 */
enum AuthzMethods {
  kAuthzAwsV2 = 0,  // handled by MkV2Authz()
  kAuthzAwsV4 = 1,  // handled by MkV4Authz()
  kAuthzAzure = 2   // handled by MkAzureAuthz()
};
36
/**
 * Possible return values.
 *
 * The enumerators double as indices into the message table in Code2Ascii();
 * add new codes right before kFailNumEntries and extend the table to match.
 */
enum Failures {
  kFailOk = 0,
  kFailLocalIO,              // 1
  kFailBadRequest,           // 2
  kFailForbidden,            // 3
  kFailHostResolve,          // 4
  kFailHostConnection,       // 5
  kFailNotFound,             // 6
  kFailServiceUnavailable,   // 7
  kFailRetry,                // 8
  kFailInsufficientStorage,  // 9
  kFailOther,                // 10

  kFailNumEntries
};  // Failures


/**
 * Translates a failure code into a human readable, statically allocated
 * message.
 *
 * @param error  the failure code to describe
 * @return  the message belonging to error; values outside of
 *          [kFailOk, kFailOther] (including kFailNumEntries itself) yield
 *          the kFailOther message instead of reading an unset table slot
 */
inline const char *Code2Ascii(const Failures error) {
  // One entry per enumerator, in enumerator order.  The table is constant
  // data, so it is set up once rather than being refilled on every call.
  static const char *const kTexts[kFailNumEntries] = {
    "S3: OK",
    "S3: local I/O failure",
    "S3: malformed URL (bad request)",
    "S3: forbidden",
    "S3: failed to resolve host address",
    "S3: host connection problem",
    "S3: not found",
    "S3: service not available",
    "S3: too many requests, service asks for backoff and retry",
    "S3: Insufficient Storage",
    "unclassified failure"
  };

  const int index = static_cast<int>(error);
  if ((index < 0) || (index >= static_cast<int>(kFailNumEntries))) {
    // Not a real failure code; never index outside of the table
    return kTexts[kFailOther];
  }
  return kTexts[index];
}
73
74
/**
 * Transfer counters.  A freshly constructed object has every counter at zero.
 */
struct Statistics {
  double transferred_bytes;
  double transfer_time;
  uint64_t num_requests;
  uint64_t num_retries;
  uint64_t ms_throttled;  // Total waiting time imposed by HTTP 429 replies

  Statistics()
    : transferred_bytes(0.0)
    , transfer_time(0.0)
    , num_requests(0)
    , num_retries(0)
    , ms_throttled(0) { }

  // Renders the counters as text; defined outside of this header
  std::string Print() const;
};  // Statistics
92
93
94 /**
95 * Contains all the information to specify an upload job.
96 */
97 struct JobInfo : SingleCopy {
98 enum RequestType {
99 kReqHeadOnly = 0, // peek
100 kReqHeadPut, // conditional upload of content-addressed objects
101 kReqPutCas, // immutable data object
102 kReqPutDotCvmfs, // one of the /.cvmfs... top level files
103 kReqPutHtml, // HTML file - display instead of downloading
104 kReqPutBucket, // bucket creation
105 kReqDelete,
106 kReqDeleteMulti, // S3 multi-object delete (POST /?delete)
107 };
108
109 const std::string object_key;
110 void *callback; // Callback to be called when job is finished
111 UniquePtr<FileBackedBuffer> origin;
112
113 // One constructor per destination
114 20958 JobInfo(const std::string &object_key,
115 void *callback,
116 FileBackedBuffer *origin)
117
1/2
✓ Branch 3 taken 20958 times.
✗ Branch 4 not taken.
20958 : object_key(object_key), origin(origin) {
118 20958 JobInfoInit();
119 20958 this->callback = callback;
120 20958 }
121 20958 void JobInfoInit() {
122 20958 curl_handle = NULL;
123 20958 http_headers = NULL;
124 20958 callback = NULL;
125 20958 request = kReqPutCas;
126 20958 error_code = kFailOk;
127 20958 http_error = 0;
128 20958 num_retries = 0;
129 20958 backoff_ms = 0;
130 20958 throttle_ms = 0;
131 20958 throttle_timestamp = 0;
132 20958 errorbuffer = reinterpret_cast<char *>(
133 20958 smalloc(sizeof(char) * CURL_ERROR_SIZE));
134 20958 }
135 20958 ~JobInfo() { free(errorbuffer); }
136
137 // For kReqDeleteMulti: keys included in the batch, and response body
138 std::vector<std::string> multi_delete_keys;
139 std::string response_body;
140
141 // Internal state, don't touch
142 CURL *curl_handle;
143 struct curl_slist *http_headers;
144 uint64_t payload_size;
145 RequestType request;
146 Failures error_code;
147 int http_error;
148 unsigned char num_retries;
149 // Exponential backoff with cutoff in case of errors
150 unsigned backoff_ms;
151 // Throttle imposed by HTTP 429 reply; mutually exclusive with backoff_ms
152 unsigned throttle_ms;
153 // Remember when the 429 reply came in to only throttle if still necessary
154 uint64_t throttle_timestamp;
155 char *errorbuffer;
156 }; // JobInfo
157
158 struct S3FanOutDnsEntry {
159 374 S3FanOutDnsEntry()
160 374 : counter(0)
161 374 , dns_name()
162 374 , ip()
163
1/2
✓ Branch 2 taken 374 times.
✗ Branch 3 not taken.
374 , port("80")
164 374 , clist(NULL)
165 374 , sharehandle(NULL) { }
166 unsigned int counter;
167 std::string dns_name;
168 std::string ip;
169 std::string port;
170 struct curl_slist *clist;
171 CURLSH *sharehandle;
172 }; // S3FanOutDnsEntry
173
174
175 class S3FanoutManager : SingleCopy {
176 protected:
177 typedef SynchronizingCounter<uint32_t> Semaphore;
178
179 public:
180 // 250ms pause after HTTP 429 "Too Many Retries"
181 static const unsigned kDefault429ThrottleMs;
182 // Don't throttle for more than a few seconds
183 static const unsigned kMax429ThrottleMs;
184 // Report throttle operations only every so often
185 static const unsigned kThrottleReportIntervalSec;
186 static const unsigned kDefaultHTTPPort;
187 static const unsigned kDefaultHTTPSPort;
188
189 struct S3Config {
190 442 S3Config() {
191 442 authz_method = kAuthzAwsV2;
192 442 dns_buckets = true;
193
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 protocol = "http";
194 442 pool_max_handles = 0;
195 442 opt_timeout_sec = 20;
196 442 opt_max_retries = 3;
197 442 opt_backoff_init_ms = 100;
198 442 opt_backoff_max_ms = 2000;
199
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 x_amz_acl = "public-read";
200 442 }
201 std::string access_key;
202 std::string secret_key;
203 std::string hostname_port;
204 AuthzMethods authz_method;
205 std::string region;
206 std::string flavor;
207 std::string bucket;
208 bool dns_buckets;
209 std::string protocol;
210 uint32_t pool_max_handles;
211 unsigned opt_timeout_sec;
212 unsigned opt_max_retries;
213 unsigned opt_backoff_init_ms;
214 unsigned opt_backoff_max_ms;
215 std::string proxy;
216 std::string x_amz_acl;
217 };
218
219 static void DetectThrottleIndicator(const std::string &header, JobInfo *info);
220
221 explicit S3FanoutManager(const S3Config &config);
222
223 ~S3FanoutManager();
224
225 void Spawn();
226
227 void PushNewJob(JobInfo *info);
228 void PushCompletedJob(JobInfo *info);
229 JobInfo *PopCompletedJob();
230
231 const Statistics &GetStatistics();
232
233 private:
234 // Reflects the default Apache configuration of the local backend
235 static const char *kCacheControlCas; // Cache-Control: max-age=259200
236 static const unsigned kLowSpeedLimit = 1024; // Require at least 1kB/s
237
238 std::string MkDotCvmfsCacheControlHeader(unsigned defaultMaxAge=61, int overrideMaxAge=-1);
239
240 static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action,
241 void *userp, void *socketp);
242 static void *MainUpload(void *data);
243 std::vector<s3fanout::JobInfo *> jobs_todo_;
244 pthread_mutex_t *jobs_todo_lock_;
245 pthread_mutex_t *curl_handle_lock_;
246
247 CURL *AcquireCurlHandle() const;
248 void ReleaseCurlHandle(JobInfo *info, CURL *handle) const;
249 void InitPipeWatchFds();
250 int InitializeDnsSettings(CURL *handle, std::string remote_host) const;
251 void InitializeDnsSettingsCurl(CURL *handle, CURLSH *sharehandle,
252 curl_slist *clist) const;
253 Failures InitializeRequest(JobInfo *info, CURL *handle) const;
254 void SetUrlOptions(JobInfo *info) const;
255 void UpdateStatistics(CURL *handle);
256 bool CanRetry(const JobInfo *info);
257 void Backoff(JobInfo *info);
258 bool VerifyAndFinalize(const int curl_error, JobInfo *info);
259 std::string GetRequestString(const JobInfo &info) const;
260 std::string GetContentType(const JobInfo &info) const;
261 std::string GetUriEncode(const std::string &val, bool encode_slash) const;
262 std::string GetAwsV4SigningKey(const std::string &date) const;
263 bool MkPayloadHash(const JobInfo &info, std::string *hex_hash) const;
264 bool MkV2Authz(const JobInfo &info, std::vector<std::string> *headers) const;
265 bool MkV4Authz(const JobInfo &info, std::vector<std::string> *headers) const;
266 bool MkAzureAuthz(const JobInfo &info,
267 std::vector<std::string> *headers) const;
268 41582 std::string MkUrl(const std::string &objkey) const {
269
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41582 times.
41582 if (config_.dns_buckets) {
270 return config_.protocol + "://" + complete_hostname_ + "/" + objkey;
271 } else {
272
2/2
✓ Branch 1 taken 68 times.
✓ Branch 2 taken 41514 times.
41582 if (objkey.empty()) {
273
2/4
✓ Branch 1 taken 68 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 68 times.
✗ Branch 5 not taken.
136 return config_.protocol + "://" + complete_hostname_ + "/"
274
1/2
✓ Branch 2 taken 68 times.
✗ Branch 3 not taken.
136 + config_.bucket;
275 }
276
2/4
✓ Branch 1 taken 41514 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 41514 times.
✗ Branch 5 not taken.
83028 return config_.protocol + "://" + complete_hostname_ + "/"
277
3/6
✓ Branch 2 taken 41514 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 41514 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 41514 times.
✗ Branch 9 not taken.
124542 + config_.bucket + "/" + objkey;
278 }
279 }
280 442 std::string MkCompleteHostname() {
281
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 442 times.
442 if (config_.dns_buckets) {
282 return config_.bucket + "." + config_.hostname_port;
283 } else {
284 442 return config_.hostname_port;
285 }
286 }
287
288 const S3Config config_;
289 std::string complete_hostname_;
290
291 Prng prng_;
292 /**
293 * This is not strictly necessary but it helps the debugging
294 */
295 std::set<JobInfo *> *active_requests_;
296
297 std::set<CURL *> *pool_handles_idle_;
298 std::set<CURL *> *pool_handles_inuse_;
299 std::set<S3FanOutDnsEntry *> *sharehandles_;
300 std::map<CURL *, S3FanOutDnsEntry *> *curl_sharehandles_;
301 dns::CaresResolver *resolver_;
302 CURLM *curl_multi_;
303 std::string *user_agent_;
304
305 /**
306 * AWS4 signing keys are derived from the secret key, a region and a date.
307 * The signing key for current day can be cached.
308 */
309 mutable std::pair<std::string, std::string> last_signing_key_;
310
311 pthread_t thread_upload_;
312 atomic_int32 multi_threaded_;
313
314 struct pollfd *watch_fds_;
315 uint32_t watch_fds_size_;
316 uint32_t watch_fds_inuse_;
317 uint32_t watch_fds_max_;
318
319 // A pipe used to signal termination from S3FanoutManager to MainUpload
320 // thread. Anything written into it results in MainUpload thread exit.
321 int pipe_terminate_[2];
322 // A pipe to used to push jobs from S3FanoutManager to MainUpload thread.
323 // S3FanoutManager writes a JobInfo* pointer. MainUpload then reads the
324 // pointer and processes the job.
325 int pipe_jobs_[2];
326 // A pipe used to collect completed jobs. MainUpload writes in the
327 // pointer to the completed job. PopCompletedJob() used to
328 // retrieve pointer.
329 int pipe_completed_[2];
330
331 bool opt_ipv4_only_;
332
333 unsigned int max_available_jobs_;
334 Semaphore *available_jobs_;
335
336 // Writes and reads should be atomic because reading happens in a different
337 // thread than writing.
338 Statistics *statistics_;
339
340 // Report not every occurrence of throtteling but only every so often
341 uint64_t timestamp_last_throttle_report_;
342
343 bool is_curl_debug_;
344
345 /**
346 * Carries the path settings for SSL certificates
347 */
348 SslCertificateStore ssl_certificate_store_;
349
350 std::string dot_cvmfs_cache_control_header; // Cache-Control: max-age=...
351 }; // S3FanoutManager
352
/**
 * NOTE(review): presumably builds the XML request body of a multi-object
 * delete (JobInfo::kReqDeleteMulti) for the given keys; the definition is
 * not part of this header -- confirm there.
 */
std::string ComposeDeleteMultiXml(const std::vector<std::string> &keys);

/**
 * NOTE(review): presumably parses the response body of a multi-object delete
 * and fills the three output vectors with the key, code and message of each
 * reported error; the meaning of the returned count is not visible from this
 * header -- confirm against the definition.
 */
unsigned ParseDeleteMultiResponse(
  const std::string &response,
  std::vector<std::string> *error_keys,
  std::vector<std::string> *error_codes,
  std::vector<std::string> *error_messages);
358
359 } // namespace s3fanout
360
361 #endif // CVMFS_NETWORK_S3FANOUT_H_
362