GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/network/s3fanout.h
Date: 2025-06-22 02:36:02
Exec Total Coverage
Lines: 52 67 77.6%
Branches: 11 30 36.7%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_NETWORK_S3FANOUT_H_
6 #define CVMFS_NETWORK_S3FANOUT_H_
7
8 #include <poll.h>
9 #include <semaphore.h>
10
11 #include <climits>
12 #include <cstdlib>
13 #include <map>
14 #include <set>
15 #include <string>
16 #include <utility>
17 #include <vector>
18
19 #include "dns.h"
20 #include "duplex_curl.h"
21 #include "ssl.h"
22 #include "util/concurrency.h"
23 #include "util/file_backed_buffer.h"
24 #include "util/mmap_file.h"
25 #include "util/pointer.h"
26 #include "util/prng.h"
27 #include "util/single_copy.h"
28 #include "util/smalloc.h"
29
30 namespace s3fanout {
31
32 enum AuthzMethods {  // Request authorization scheme, stored in S3Config::authz_method
33 kAuthzAwsV2 = 0,  // Default selected by the S3Config constructor
34 kAuthzAwsV4,  // NOTE(review): presumably served by MkV4Authz() -- confirm in the implementation file
35 kAuthzAzure  // NOTE(review): presumably served by MkAzureAuthz() -- confirm in the implementation file
36 };
37
/**
 * Possible return values.
 *
 * The numeric values double as indices into the message table built by
 * Code2Ascii().  kFailNumEntries is a sentinel and must stay the last
 * enumerator.
 */
enum Failures {
  kFailOk = 0,
  kFailLocalIO,
  kFailBadRequest,
  kFailForbidden,
  kFailHostResolve,
  kFailHostConnection,
  kFailNotFound,
  kFailServiceUnavailable,
  kFailRetry,
  kFailOther,

  kFailNumEntries
};  // Failures


/**
 * Translates a failure code into a human-readable message.
 *
 * The table is indexed by enumerator rather than by literal number so that
 * inserting a new failure code cannot silently shift or overwrite the text of
 * another one.
 *
 * @param error  failure code to describe
 * @return pointer to a string literal (never NULL, must not be freed);
 *         "no text" for kFailNumEntries and for any value outside of the
 *         enumerator range
 */
inline const char *Code2Ascii(const Failures error) {
  const char *texts[kFailNumEntries + 1];
  texts[kFailOk] = "S3: OK";
  texts[kFailLocalIO] = "S3: local I/O failure";
  texts[kFailBadRequest] = "S3: malformed URL (bad request)";
  texts[kFailForbidden] = "S3: forbidden";
  texts[kFailHostResolve] = "S3: failed to resolve host address";
  texts[kFailHostConnection] = "S3: host connection problem";
  texts[kFailNotFound] = "S3: not found";
  texts[kFailServiceUnavailable] = "S3: service not available";
  texts[kFailRetry] =
      "S3: too many requests, service asks for backoff and retry";
  texts[kFailOther] =
      "S3: unknown service error, perhaps wrong authentication protocol";
  // Sentinel slot: also serves as the answer for out-of-range codes
  texts[kFailNumEntries] = "no text";

  // Never index past the table, even if the caller passes a bogus code
  const int index = static_cast<int>(error);
  if ((index < 0) || (index > kFailNumEntries))
    return texts[kFailNumEntries];
  return texts[index];
}
72
73
74 struct Statistics {  // Transfer counters; the constructor starts every field at zero
75 double transferred_bytes;
76 double transfer_time;  // NOTE(review): unit is not visible in this header (seconds?) -- confirm
77 uint64_t num_requests;
78 uint64_t num_retries;
79 uint64_t ms_throttled; // Total waiting time imposed by HTTP 429 replies
80
81 168 Statistics() {
82 168 transferred_bytes = 0.0;
83 168 transfer_time = 0.0;
84 168 num_requests = 0;
85 168 num_retries = 0;
86 168 ms_throttled = 0;
87 168 }
88
89 std::string Print() const;  // Defined outside this header; NOTE(review): presumably renders the counters as text -- confirm
90 }; // Statistics
91
92
93 /**
94 * Contains all the information to specify an upload job.
95 */
96 struct JobInfo : SingleCopy {
97 enum RequestType {
98 kReqHeadOnly = 0, // peek
99 kReqHeadPut, // conditional upload of content-addressed objects
100 kReqPutCas, // immutable data object
101 kReqPutDotCvmfs, // one of the /.cvmfs... top level files
102 kReqPutHtml, // HTML file - display instead of downloading
103 kReqPutBucket, // bucket creation
104 kReqDelete,
105 };
106
107 const std::string object_key;  // Set once by the constructor, immutable afterwards
108 void *callback; // Callback to be called when job is finished
109 UniquePtr<FileBackedBuffer> origin;  // NOTE(review): UniquePtr presumably owns and deletes the buffer -- confirm
110
111 // One constructor per destination
112 8645 JobInfo(const std::string &object_key,
113 void *callback,
114 FileBackedBuffer *origin)
115

1/2
✓ Branch 3 taken 8645 times.
✗ Branch 4 not taken.
8645 : object_key(object_key), origin(origin) {
116 8645 JobInfoInit();  // Resets callback to NULL, hence the assignment on the next line
117 8645 this->callback = callback;
118 8645 }
119 8645 void JobInfoInit() {  // Sets defaults and allocates a fresh errorbuffer on every call without freeing a previous one
120 8645 curl_handle = NULL;
121 8645 http_headers = NULL;
122 8645 callback = NULL;
123 8645 request = kReqPutCas;  // Default request type
124 8645 error_code = kFailOk;
125 8645 http_error = 0;
126 8645 num_retries = 0;
127 8645 backoff_ms = 0;
128 8645 throttle_ms = 0;
129 8645 throttle_timestamp = 0;
130 8645 errorbuffer = reinterpret_cast<char *>(
131 8645 smalloc(sizeof(char) * CURL_ERROR_SIZE));  // Released with free() in the destructor
132 8645 }
133 8645 ~JobInfo() { free(errorbuffer); }
134
135 // Internal state, don't touch
136 CURL *curl_handle;
137 struct curl_slist *http_headers;
138 uint64_t payload_size;  // NOTE(review): not assigned by JobInfoInit() -- must be set before it is read
139 RequestType request;
140 Failures error_code;
141 int http_error;
142 unsigned char num_retries;
143 // Exponential backoff with cutoff in case of errors
144 unsigned backoff_ms;
145 // Throttle imposed by HTTP 429 reply; mutually exclusive with backoff_ms
146 unsigned throttle_ms;
147 // Remember when the 429 reply came in to only throttle if still necessary
148 uint64_t throttle_timestamp;
149 char *errorbuffer;  // CURL_ERROR_SIZE bytes, allocated in JobInfoInit()
150 }; // JobInfo
151
152 struct S3FanOutDnsEntry {  // Per-host record; S3FanoutManager tracks pointers to these in sharehandles_ and curl_sharehandles_
153 140 S3FanOutDnsEntry()
154 140 : counter(0)
155 140 , dns_name()
156 140 , ip()
157

1/2
✓ Branch 2 taken 140 times.
✗ Branch 3 not taken.
140 , port("80")
158 140 , clist(NULL)
159 140 , sharehandle(NULL) { }
160 unsigned int counter;  // NOTE(review): meaning is not visible in this header (usage count?) -- confirm
161 std::string dns_name;
162 std::string ip;
163 std::string port;  // Kept as a string; the constructor defaults it to "80"
164 struct curl_slist *clist;  // NOTE(review): presumably handed to InitializeDnsSettingsCurl() -- confirm
165 CURLSH *sharehandle;  // NOTE(review): presumably handed to InitializeDnsSettingsCurl() -- confirm
166 }; // S3FanOutDnsEntry
167
168
169 class S3FanoutManager : SingleCopy {  // Takes JobInfo pointers via PushNewJob() and returns finished ones via PopCompletedJob(); see the pipe comments below
170 protected:
171 typedef SynchronizingCounter<uint32_t> Semaphore;  // Type of available_jobs_
172
173 public:
174 // 250ms pause after HTTP 429 "Too Many Requests"
175 static const unsigned kDefault429ThrottleMs;
176 // Don't throttle for more than a few seconds
177 static const unsigned kMax429ThrottleMs;
178 // Report throttle operations only every so often
179 static const unsigned kThrottleReportIntervalSec;
180 static const unsigned kDefaultHTTPPort;  // Value is defined outside this header
181 static const unsigned kDefaultHTTPSPort;  // Value is defined outside this header
182
183 struct S3Config {  // Settings handed to the manager's constructor; string members not assigned in S3Config() start empty
184 168 S3Config() {
185 168 authz_method = kAuthzAwsV2;
186 168 dns_buckets = true;
187

1/2
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
168 protocol = "http";
188 168 pool_max_handles = 0;
189 168 opt_timeout_sec = 20;
190 168 opt_max_retries = 3;
191 168 opt_backoff_init_ms = 100;
192 168 opt_backoff_max_ms = 2000;
193

1/2
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
168 x_amz_acl = "public-read";
194 168 }
195 std::string access_key;
196 std::string secret_key;
197 std::string hostname_port;
198 AuthzMethods authz_method;
199 std::string region;
200 std::string flavor;
201 std::string bucket;
202 bool dns_buckets;  // Selects the URL / host name layout, see MkUrl() and MkCompleteHostname()
203 std::string protocol;  // Used as URL scheme by MkUrl()
204 uint32_t pool_max_handles;
205 unsigned opt_timeout_sec;
206 unsigned opt_max_retries;
207 unsigned opt_backoff_init_ms;
208 unsigned opt_backoff_max_ms;
209 std::string proxy;
210 std::string x_amz_acl;
211 };
212
213 static void DetectThrottleIndicator(const std::string &header, JobInfo *info);  // NOTE(review): presumably fills info's throttle fields from a response header -- confirm
214
215 explicit S3FanoutManager(const S3Config &config);
216
217 ~S3FanoutManager();
218
219 void Spawn();  // NOTE(review): presumably starts the MainUpload thread (thread_upload_) -- confirm
220
221 void PushNewJob(JobInfo *info);
222 void PushCompletedJob(JobInfo *info);
223 JobInfo *PopCompletedJob();
224
225 const Statistics &GetStatistics();
226
227 private:
228 // Reflects the default Apache configuration of the local backend
229 static const char *kCacheControlCas; // Cache-Control: max-age=259200
230 static const char *kCacheControlDotCvmfs; // Cache-Control: max-age=61
231 static const unsigned kLowSpeedLimit = 1024; // Require at least 1kB/s
232
233 static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action,
234 void *userp, void *socketp);
235 static void *MainUpload(void *data);  // Entry point of the upload thread referred to by the pipe comments below
236 std::vector<s3fanout::JobInfo *> jobs_todo_;
237 pthread_mutex_t *jobs_todo_lock_;  // NOTE(review): presumably guards jobs_todo_ -- confirm
238 pthread_mutex_t *curl_handle_lock_;
239
240 CURL *AcquireCurlHandle() const;
241 void ReleaseCurlHandle(JobInfo *info, CURL *handle) const;
242 void InitPipeWatchFds();
243 int InitializeDnsSettings(CURL *handle, std::string remote_host) const;
244 void InitializeDnsSettingsCurl(CURL *handle, CURLSH *sharehandle,
245 curl_slist *clist) const;
246 Failures InitializeRequest(JobInfo *info, CURL *handle) const;
247 void SetUrlOptions(JobInfo *info) const;
248 void UpdateStatistics(CURL *handle);
249 bool CanRetry(const JobInfo *info);
250 void Backoff(JobInfo *info);
251 bool VerifyAndFinalize(const int curl_error, JobInfo *info);
252 std::string GetRequestString(const JobInfo &info) const;
253 std::string GetContentType(const JobInfo &info) const;
254 std::string GetUriEncode(const std::string &val, bool encode_slash) const;
255 std::string GetAwsV4SigningKey(const std::string &date) const;
256 bool MkPayloadHash(const JobInfo &info, std::string *hex_hash) const;
257 bool MkV2Authz(const JobInfo &info, std::vector<std::string> *headers) const;
258 bool MkV4Authz(const JobInfo &info, std::vector<std::string> *headers) const;
259 bool MkAzureAuthz(const JobInfo &info,
260 std::vector<std::string> *headers) const;
261 17108 std::string MkUrl(const std::string &objkey) const {  // With dns_buckets the path is just objkey, otherwise the bucket becomes the first path component
262

1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 17108 times.
17108 if (config_.dns_buckets) {
263 return config_.protocol + "://" + complete_hostname_ + "/" + objkey;
264 } else {
265

2/4
✓ Branch 1 taken 17108 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 17108 times.
✗ Branch 5 not taken.
34216 return config_.protocol + "://" + complete_hostname_ + "/"
266

3/6
✓ Branch 2 taken 17108 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 17108 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 17108 times.
✗ Branch 9 not taken.
51324 + config_.bucket + "/" + objkey;
267 }
268 }
269 168 std::string MkCompleteHostname() {  // Returns bucket + "." + hostname_port with dns_buckets, plain hostname_port otherwise
270

1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 168 times.
168 if (config_.dns_buckets) {
271 return config_.bucket + "." + config_.hostname_port;
272 } else {
273 168 return config_.hostname_port;
274 }
275 }
276
277 const S3Config config_;
278 std::string complete_hostname_;  // Host part used by MkUrl(); NOTE(review): presumably the result of MkCompleteHostname() -- confirm
279
280 Prng prng_;
281 /**
282 * This is not strictly necessary but it helps the debugging
283 */
284 std::set<JobInfo *> *active_requests_;
285
286 std::set<CURL *> *pool_handles_idle_;
287 std::set<CURL *> *pool_handles_inuse_;
288 std::set<S3FanOutDnsEntry *> *sharehandles_;
289 std::map<CURL *, S3FanOutDnsEntry *> *curl_sharehandles_;
290 dns::CaresResolver *resolver_;
291 CURLM *curl_multi_;
292 std::string *user_agent_;
293
294 /**
295 * AWS4 signing keys are derived from the secret key, a region and a date.
296 * The signing key for current day can be cached.
297 */
298 mutable std::pair<std::string, std::string> last_signing_key_;
299
300 pthread_t thread_upload_;
301 atomic_int32 multi_threaded_;
302
303 struct pollfd *watch_fds_;
304 uint32_t watch_fds_size_;
305 uint32_t watch_fds_inuse_;
306 uint32_t watch_fds_max_;
307
308 // A pipe used to signal termination from S3FanoutManager to MainUpload
309 // thread. Anything written into it results in MainUpload thread exit.
310 int pipe_terminate_[2];
311 // A pipe used to push jobs from S3FanoutManager to MainUpload thread.
312 // S3FanoutManager writes a JobInfo* pointer. MainUpload then reads the
313 // pointer and processes the job.
314 int pipe_jobs_[2];
315 // A pipe used to collect completed jobs. MainUpload writes the pointer
316 // to the completed job into it. PopCompletedJob() is used to
317 // retrieve the pointer.
318 int pipe_completed_[2];
319
320 bool opt_ipv4_only_;
321
322 unsigned int max_available_jobs_;
323 Semaphore *available_jobs_;
324
325 // Writes and reads should be atomic because reading happens in a different
326 // thread than writing.
327 Statistics *statistics_;
328
329 // Do not report every occurrence of throttling but only every so often
330 uint64_t timestamp_last_throttle_report_;
331
332 bool is_curl_debug_;
333
334 /**
335 * Carries the path settings for SSL certificates
336 */
337 SslCertificateStore ssl_certificate_store_;
338 }; // S3FanoutManager
339
340 } // namespace s3fanout
341
342 #endif // CVMFS_NETWORK_S3FANOUT_H_
343