GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/network/s3fanout.h
Date: 2025-11-09 02:35:23
Exec Total Coverage
Lines: 52 68 76.5%
Branches: 11 30 36.7%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_NETWORK_S3FANOUT_H_
6 #define CVMFS_NETWORK_S3FANOUT_H_
7
8 #include <poll.h>
9 #include <semaphore.h>
10
11 #include <climits>
12 #include <cstdlib>
13 #include <map>
14 #include <set>
15 #include <string>
16 #include <utility>
17 #include <vector>
18
19 #include "dns.h"
20 #include "duplex_curl.h"
21 #include "ssl.h"
22 #include "util/concurrency.h"
23 #include "util/file_backed_buffer.h"
24 #include "util/mmap_file.h"
25 #include "util/pointer.h"
26 #include "util/prng.h"
27 #include "util/single_copy.h"
28 #include "util/smalloc.h"
29
30 namespace s3fanout {
31
32 enum AuthzMethods {
33 kAuthzAwsV2 = 0,
34 kAuthzAwsV4,
35 kAuthzAzure
36 };
37
38 /**
39 * Possible return values.
40 */
41 enum Failures {
42 kFailOk = 0,
43 kFailLocalIO,
44 kFailBadRequest,
45 kFailForbidden,
46 kFailHostResolve,
47 kFailHostConnection,
48 kFailNotFound,
49 kFailServiceUnavailable,
50 kFailRetry,
51 kFailInsufficientStorage,
52 kFailOther,
53
54 kFailNumEntries
55 }; // Failures
56
57
58 inline const char *Code2Ascii(const Failures error) {
59 const char *texts[kFailNumEntries + 1];
60 texts[0] = "S3: OK";
61 texts[1] = "S3: local I/O failure";
62 texts[2] = "S3: malformed URL (bad request)";
63 texts[3] = "S3: forbidden";
64 texts[4] = "S3: failed to resolve host address";
65 texts[5] = "S3: host connection problem";
66 texts[6] = "S3: not found";
67 texts[7] = "S3: service not available";
68 texts[8] = "S3: unknown service error, perhaps wrong authentication protocol";
69 texts[8] = "S3: too many requests, service asks for backoff and retry";
70 texts[9] = "S3: Insufficient Storage";
71 texts[10] = "unclassified failure";
72 return texts[error];
73 }
74
75
76 struct Statistics {
77 double transferred_bytes;
78 double transfer_time;
79 uint64_t num_requests;
80 uint64_t num_retries;
81 uint64_t ms_throttled; // Total waiting time imposed by HTTP 429 replies
82
83 264 Statistics() {
84 264 transferred_bytes = 0.0;
85 264 transfer_time = 0.0;
86 264 num_requests = 0;
87 264 num_retries = 0;
88 264 ms_throttled = 0;
89 264 }
90
91 std::string Print() const;
92 }; // Statistics
93
94
95 /**
96 * Contains all the information to specify an upload job.
97 */
98 struct JobInfo : SingleCopy {
99 enum RequestType {
100 kReqHeadOnly = 0, // peek
101 kReqHeadPut, // conditional upload of content-addressed objects
102 kReqPutCas, // immutable data object
103 kReqPutDotCvmfs, // one of the /.cvmfs... top level files
104 kReqPutHtml, // HTML file - display instead of downloading
105 kReqPutBucket, // bucket creation
106 kReqDelete,
107 };
108
109 const std::string object_key;
110 void *callback; // Callback to be called when job is finished
111 UniquePtr<FileBackedBuffer> origin;
112
113 // One constructor per destination
114 13537 JobInfo(const std::string &object_key,
115 void *callback,
116 FileBackedBuffer *origin)
117
1/2
✓ Branch 3 taken 13537 times.
✗ Branch 4 not taken.
13537 : object_key(object_key), origin(origin) {
118 13537 JobInfoInit();
119 13537 this->callback = callback;
120 13537 }
121 13537 void JobInfoInit() {
122 13537 curl_handle = NULL;
123 13537 http_headers = NULL;
124 13537 callback = NULL;
125 13537 request = kReqPutCas;
126 13537 error_code = kFailOk;
127 13537 http_error = 0;
128 13537 num_retries = 0;
129 13537 backoff_ms = 0;
130 13537 throttle_ms = 0;
131 13537 throttle_timestamp = 0;
132 13537 errorbuffer = reinterpret_cast<char *>(
133 13537 smalloc(sizeof(char) * CURL_ERROR_SIZE));
134 13537 }
135 13537 ~JobInfo() { free(errorbuffer); }
136
137 // Internal state, don't touch
138 CURL *curl_handle;
139 struct curl_slist *http_headers;
140 uint64_t payload_size;
141 RequestType request;
142 Failures error_code;
143 int http_error;
144 unsigned char num_retries;
145 // Exponential backoff with cutoff in case of errors
146 unsigned backoff_ms;
147 // Throttle imposed by HTTP 429 reply; mutually exclusive with backoff_ms
148 unsigned throttle_ms;
149 // Remember when the 429 reply came in to only throttle if still necessary
150 uint64_t throttle_timestamp;
151 char *errorbuffer;
152 }; // JobInfo
153
154 struct S3FanOutDnsEntry {
155 220 S3FanOutDnsEntry()
156 220 : counter(0)
157 220 , dns_name()
158 220 , ip()
159
1/2
✓ Branch 2 taken 220 times.
✗ Branch 3 not taken.
220 , port("80")
160 220 , clist(NULL)
161 220 , sharehandle(NULL) { }
162 unsigned int counter;
163 std::string dns_name;
164 std::string ip;
165 std::string port;
166 struct curl_slist *clist;
167 CURLSH *sharehandle;
168 }; // S3FanOutDnsEntry
169
170
171 class S3FanoutManager : SingleCopy {
172 protected:
173 typedef SynchronizingCounter<uint32_t> Semaphore;
174
175 public:
176 // 250ms pause after HTTP 429 "Too Many Requests"
177 static const unsigned kDefault429ThrottleMs;
178 // Don't throttle for more than a few seconds
179 static const unsigned kMax429ThrottleMs;
180 // Report throttle operations only every so often
181 static const unsigned kThrottleReportIntervalSec;
182 static const unsigned kDefaultHTTPPort;
183 static const unsigned kDefaultHTTPSPort;
184
185 struct S3Config {
186 264 S3Config() {
187 264 authz_method = kAuthzAwsV2;
188 264 dns_buckets = true;
189
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 protocol = "http";
190 264 pool_max_handles = 0;
191 264 opt_timeout_sec = 20;
192 264 opt_max_retries = 3;
193 264 opt_backoff_init_ms = 100;
194 264 opt_backoff_max_ms = 2000;
195
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 x_amz_acl = "public-read";
196 264 }
197 std::string access_key;
198 std::string secret_key;
199 std::string hostname_port;
200 AuthzMethods authz_method;
201 std::string region;
202 std::string flavor;
203 std::string bucket;
204 bool dns_buckets;
205 std::string protocol;
206 uint32_t pool_max_handles;
207 unsigned opt_timeout_sec;
208 unsigned opt_max_retries;
209 unsigned opt_backoff_init_ms;
210 unsigned opt_backoff_max_ms;
211 std::string proxy;
212 std::string x_amz_acl;
213 };
214
215 static void DetectThrottleIndicator(const std::string &header, JobInfo *info);
216
217 explicit S3FanoutManager(const S3Config &config);
218
219 ~S3FanoutManager();
220
221 void Spawn();
222
223 void PushNewJob(JobInfo *info);
224 void PushCompletedJob(JobInfo *info);
225 JobInfo *PopCompletedJob();
226
227 const Statistics &GetStatistics();
228
229 private:
230 // Reflects the default Apache configuration of the local backend
231 static const char *kCacheControlCas; // Cache-Control: max-age=259200
232 static const unsigned kLowSpeedLimit = 1024; // Require at least 1kB/s
233
234 std::string MkDotCvmfsCacheControlHeader(unsigned defaultMaxAge=61, int overrideMaxAge=-1);
235
236 static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action,
237 void *userp, void *socketp);
238 static void *MainUpload(void *data);
239 std::vector<s3fanout::JobInfo *> jobs_todo_;
240 pthread_mutex_t *jobs_todo_lock_;
241 pthread_mutex_t *curl_handle_lock_;
242
243 CURL *AcquireCurlHandle() const;
244 void ReleaseCurlHandle(JobInfo *info, CURL *handle) const;
245 void InitPipeWatchFds();
246 int InitializeDnsSettings(CURL *handle, std::string remote_host) const;
247 void InitializeDnsSettingsCurl(CURL *handle, CURLSH *sharehandle,
248 curl_slist *clist) const;
249 Failures InitializeRequest(JobInfo *info, CURL *handle) const;
250 void SetUrlOptions(JobInfo *info) const;
251 void UpdateStatistics(CURL *handle);
252 bool CanRetry(const JobInfo *info);
253 void Backoff(JobInfo *info);
254 bool VerifyAndFinalize(const int curl_error, JobInfo *info);
255 std::string GetRequestString(const JobInfo &info) const;
256 std::string GetContentType(const JobInfo &info) const;
257 std::string GetUriEncode(const std::string &val, bool encode_slash) const;
258 std::string GetAwsV4SigningKey(const std::string &date) const;
259 bool MkPayloadHash(const JobInfo &info, std::string *hex_hash) const;
260 bool MkV2Authz(const JobInfo &info, std::vector<std::string> *headers) const;
261 bool MkV4Authz(const JobInfo &info, std::vector<std::string> *headers) const;
262 bool MkAzureAuthz(const JobInfo &info,
263 std::vector<std::string> *headers) const;
264 26884 std::string MkUrl(const std::string &objkey) const {
265
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 26884 times.
26884 if (config_.dns_buckets) {
266 return config_.protocol + "://" + complete_hostname_ + "/" + objkey;
267 } else {
268
2/4
✓ Branch 1 taken 26884 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 26884 times.
✗ Branch 5 not taken.
53768 return config_.protocol + "://" + complete_hostname_ + "/"
269
3/6
✓ Branch 2 taken 26884 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 26884 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 26884 times.
✗ Branch 9 not taken.
80652 + config_.bucket + "/" + objkey;
270 }
271 }
272 264 std::string MkCompleteHostname() {
273
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 264 times.
264 if (config_.dns_buckets) {
274 return config_.bucket + "." + config_.hostname_port;
275 } else {
276 264 return config_.hostname_port;
277 }
278 }
279
280 const S3Config config_;
281 std::string complete_hostname_;
282
283 Prng prng_;
284 /**
285 * This is not strictly necessary but it helps with debugging
286 */
287 std::set<JobInfo *> *active_requests_;
288
289 std::set<CURL *> *pool_handles_idle_;
290 std::set<CURL *> *pool_handles_inuse_;
291 std::set<S3FanOutDnsEntry *> *sharehandles_;
292 std::map<CURL *, S3FanOutDnsEntry *> *curl_sharehandles_;
293 dns::CaresResolver *resolver_;
294 CURLM *curl_multi_;
295 std::string *user_agent_;
296
297 /**
298 * AWS4 signing keys are derived from the secret key, a region and a date.
299 * The signing key for current day can be cached.
300 */
301 mutable std::pair<std::string, std::string> last_signing_key_;
302
303 pthread_t thread_upload_;
304 atomic_int32 multi_threaded_;
305
306 struct pollfd *watch_fds_;
307 uint32_t watch_fds_size_;
308 uint32_t watch_fds_inuse_;
309 uint32_t watch_fds_max_;
310
311 // A pipe used to signal termination from S3FanoutManager to MainUpload
312 // thread. Anything written into it results in MainUpload thread exit.
313 int pipe_terminate_[2];
314 // A pipe used to push jobs from S3FanoutManager to MainUpload thread.
315 // S3FanoutManager writes a JobInfo* pointer. MainUpload then reads the
316 // pointer and processes the job.
317 int pipe_jobs_[2];
318 // A pipe used to collect completed jobs. MainUpload writes the
319 // pointer to the completed job into it. PopCompletedJob() is used to
320 // retrieve the pointer.
321 int pipe_completed_[2];
322
323 bool opt_ipv4_only_;
324
325 unsigned int max_available_jobs_;
326 Semaphore *available_jobs_;
327
328 // Writes and reads should be atomic because reading happens in a different
329 // thread than writing.
330 Statistics *statistics_;
331
332 // Report not every occurrence of throttling but only every so often
333 uint64_t timestamp_last_throttle_report_;
334
335 bool is_curl_debug_;
336
337 /**
338 * Carries the path settings for SSL certificates
339 */
340 SslCertificateStore ssl_certificate_store_;
341
342 std::string dot_cvmfs_cache_control_header; // Cache-Control: max-age=...
343 }; // S3FanoutManager
344
345 } // namespace s3fanout
346
347 #endif // CVMFS_NETWORK_S3FANOUT_H_
348