GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/network/s3fanout.h
Date: 2026-03-15 02:35:27
Exec Total Coverage
Lines: 52 68 76.5%
Branches: 11 30 36.7%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_NETWORK_S3FANOUT_H_
6 #define CVMFS_NETWORK_S3FANOUT_H_
7
8 #include <poll.h>
9 #include <semaphore.h>
10
11 #include <climits>
12 #include <cstdlib>
13 #include <map>
14 #include <set>
15 #include <string>
16 #include <utility>
17 #include <vector>
18
19 #include "dns.h"
20 #include "duplex_curl.h"
21 #include "ssl.h"
22 #include "util/concurrency.h"
23 #include "util/file_backed_buffer.h"
24 #include "util/mmap_file.h"
25 #include "util/pointer.h"
26 #include "util/prng.h"
27 #include "util/single_copy.h"
28 #include "util/smalloc.h"
29
30 namespace s3fanout {
31
// Authorization methods.  S3Config::authz_method holds one of these values and
// S3FanoutManager declares one matching helper per value (MkV2Authz,
// MkV4Authz, MkAzureAuthz).
32 enum AuthzMethods {
33 kAuthzAwsV2 = 0,
34 kAuthzAwsV4,
35 kAuthzAzure
36 };
37
/**
 * Possible return values.
 *
 * The numeric values index the message table in Code2Ascii(); when adding an
 * entry, insert it before kFailNumEntries and add its message at the same
 * position in that table.
 */
enum Failures {
  kFailOk = 0,
  kFailLocalIO,
  kFailBadRequest,
  kFailForbidden,
  kFailHostResolve,
  kFailHostConnection,
  kFailNotFound,
  kFailServiceUnavailable,
  kFailRetry,
  kFailInsufficientStorage,
  kFailOther,

  kFailNumEntries
};  // Failures


/**
 * Translates a failure code into a human-readable, statically allocated
 * message.
 *
 * The table is built once instead of on every call.  Every slot, including the
 * sentinel slot kFailNumEntries, is initialized, and codes outside of
 * [0, kFailNumEntries] map to the sentinel text instead of reading past the
 * table.
 *
 * @param error  failure code to translate
 * @return pointer to a string literal; never NULL, must not be freed
 */
inline const char *Code2Ascii(const Failures error) {
  static const char *const kTexts[kFailNumEntries + 1] = {
    "S3: OK",                                                     // kFailOk
    "S3: local I/O failure",                                      // kFailLocalIO
    "S3: malformed URL (bad request)",                            // kFailBadRequest
    "S3: forbidden",                                              // kFailForbidden
    "S3: failed to resolve host address",                         // kFailHostResolve
    "S3: host connection problem",                                // kFailHostConnection
    "S3: not found",                                              // kFailNotFound
    "S3: service not available",                                  // kFailServiceUnavailable
    "S3: too many requests, service asks for backoff and retry",  // kFailRetry
    "S3: Insufficient Storage",                                   // kFailInsufficientStorage
    "unclassified failure",                                       // kFailOther
    "no text"                                                     // kFailNumEntries
  };

  const int idx = static_cast<int>(error);
  const int sentinel = static_cast<int>(kFailNumEntries);
  if ((idx < 0) || (idx > sentinel)) {
    // Not a known failure code: fall back to the sentinel text
    return kTexts[sentinel];
  }
  return kTexts[idx];
}
74
75
// Plain counters describing transfer activity; S3FanoutManager hands out a
// reference to such an object through GetStatistics().
76 struct Statistics {
77 double transferred_bytes;
78 double transfer_time;  // NOTE(review): unit not visible here, presumably seconds -- confirm
79 uint64_t num_requests;
80 uint64_t num_retries;
81 uint64_t ms_throttled; // Total waiting time imposed by HTTP 429 replies
82
// All counters start at zero
83 492 Statistics() {
84 492 transferred_bytes = 0.0;
85 492 transfer_time = 0.0;
86 492 num_requests = 0;
87 492 num_retries = 0;
88 492 ms_throttled = 0;
89 492 }
90
// Returns a textual rendering of the counters (defined outside of this header)
91 std::string Print() const;
92 }; // Statistics
93
94
95 /**
96 * Contains all the information to specify an upload job.
 *
 * The constructor stores the object key, wraps the origin pointer in the
 * UniquePtr member and resets the internal state through JobInfoInit().
 * JobInfoInit() allocates errorbuffer (CURL_ERROR_SIZE bytes) with smalloc();
 * the destructor releases it with free().
97 */
98 struct JobInfo : SingleCopy {
99 enum RequestType {
100 kReqHeadOnly = 0, // peek
101 kReqHeadPut, // conditional upload of content-addressed objects
102 kReqPutCas, // immutable data object
103 kReqPutDotCvmfs, // one of the /.cvmfs... top level files
104 kReqPutHtml, // HTML file - display instead of downloading
105 kReqPutBucket, // bucket creation
106 kReqDelete,
107 kReqDeleteMulti, // S3 multi-object delete (POST /?delete)
108 };
109
110 const std::string object_key;
111 void *callback; // Callback to be called when job is finished
112 UniquePtr<FileBackedBuffer> origin;
113
114 // One constructor per destination
115 25209 JobInfo(const std::string &object_key,
116 void *callback,
117 FileBackedBuffer *origin)
118
1/2
✓ Branch 3 taken 25209 times.
✗ Branch 4 not taken.
25209 : object_key(object_key), origin(origin) {
119 25209 JobInfoInit();
// JobInfoInit() resets callback to NULL, hence the assignment comes afterwards
120 25209 this->callback = callback;
121 25209 }
// Resets the internal state to its defaults (request type kReqPutCas, no
// error, no retries, no backoff/throttle) and allocates errorbuffer.
// NOTE(review): payload_size is not assigned here and stays indeterminate
// until it is set elsewhere -- confirm that every reader sets it first.
122 25209 void JobInfoInit() {
123 25209 curl_handle = NULL;
124 25209 http_headers = NULL;
125 25209 callback = NULL;
126 25209 request = kReqPutCas;
127 25209 error_code = kFailOk;
128 25209 http_error = 0;
129 25209 num_retries = 0;
130 25209 backoff_ms = 0;
131 25209 throttle_ms = 0;
132 25209 throttle_timestamp = 0;
133 25209 errorbuffer = reinterpret_cast<char *>(
134 25209 smalloc(sizeof(char) * CURL_ERROR_SIZE));
135 25209 }
// Releases the buffer allocated in JobInfoInit()
136 25209 ~JobInfo() { free(errorbuffer); }
137
138 // For kReqDeleteMulti: keys included in the batch, and response body
139 std::vector<std::string> multi_delete_keys;
140 std::string response_body;
141
142 // Internal state, don't touch
143 CURL *curl_handle;
144 struct curl_slist *http_headers;
145 uint64_t payload_size;
146 RequestType request;
147 Failures error_code;
148 int http_error;
149 unsigned char num_retries;
150 // Exponential backoff with cutoff in case of errors
151 unsigned backoff_ms;
152 // Throttle imposed by HTTP 429 reply; mutually exclusive with backoff_ms
153 unsigned throttle_ms;
154 // Remember when the 429 reply came in to only throttle if still necessary
155 uint64_t throttle_timestamp;
156 char *errorbuffer;
157 }; // JobInfo
158
// Groups a DNS name with an IP address, a port, a curl_slist and a curl share
// handle.  S3FanoutManager keeps pointers to such entries in sharehandles_ and
// curl_sharehandles_.
159 struct S3FanOutDnsEntry {
// Default state: counter zero, empty name and IP, port "80", NULL pointers
160 410 S3FanOutDnsEntry()
161 410 : counter(0)
162 410 , dns_name()
163 410 , ip()
164
1/2
✓ Branch 2 taken 410 times.
✗ Branch 3 not taken.
410 , port("80")
165 410 , clist(NULL)
166 410 , sharehandle(NULL) { }
167 unsigned int counter;  // NOTE(review): presumably a usage counter -- confirm in the .cc
168 std::string dns_name;
169 std::string ip;
170 std::string port;
171 struct curl_slist *clist;
172 CURLSH *sharehandle;
173 }; // S3FanOutDnsEntry
174
175
// Manages S3 transfers.  Jobs are handed in with PushNewJob(); as described
// at the pipe members below, JobInfo pointers travel through pipes to the
// MainUpload thread and completed jobs are picked up with PopCompletedJob().
176 class S3FanoutManager : SingleCopy {
177 protected:
178 typedef SynchronizingCounter<uint32_t> Semaphore;
179
180 public:
181 // 250ms pause after HTTP 429 "Too Many Requests"
182 static const unsigned kDefault429ThrottleMs;
183 // Don't throttle for more than a few seconds
184 static const unsigned kMax429ThrottleMs;
185 // Report throttle operations only every so often
186 static const unsigned kThrottleReportIntervalSec;
187 static const unsigned kDefaultHTTPPort;
188 static const unsigned kDefaultHTTPSPort;
189
// Connection and request settings.  The constructor assigns defaults to the
// members listed in its body; the remaining std::string members start empty.
190 struct S3Config {
191 492 S3Config() {
192 492 authz_method = kAuthzAwsV2;
193 492 dns_buckets = true;
194
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 protocol = "http";
195 492 pool_max_handles = 0;
196 492 opt_timeout_sec = 20;
197 492 opt_max_retries = 3;
198 492 opt_backoff_init_ms = 100;
199 492 opt_backoff_max_ms = 2000;
200
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 x_amz_acl = "public-read";
201 492 }
202 std::string access_key;
203 std::string secret_key;
204 std::string hostname_port;
205 AuthzMethods authz_method;
206 std::string region;
207 std::string flavor;
208 std::string bucket;
// If set, the bucket becomes part of the host name (see MkCompleteHostname());
// otherwise it becomes the first path component of the URL (see MkUrl())
209 bool dns_buckets;
210 std::string protocol;
211 uint32_t pool_max_handles;
212 unsigned opt_timeout_sec;
213 unsigned opt_max_retries;
214 unsigned opt_backoff_init_ms;
215 unsigned opt_backoff_max_ms;
216 std::string proxy;
217 std::string x_amz_acl;
218 };
219
220 static void DetectThrottleIndicator(const std::string &header, JobInfo *info);
221
222 explicit S3FanoutManager(const S3Config &config);
223
224 ~S3FanoutManager();
225
// NOTE(review): presumably starts the MainUpload thread -- confirm in the .cc
226 void Spawn();
227
228 void PushNewJob(JobInfo *info);
229 void PushCompletedJob(JobInfo *info);
230 JobInfo *PopCompletedJob();
231
232 const Statistics &GetStatistics();
233
234 private:
235 // Reflects the default Apache configuration of the local backend
236 static const char *kCacheControlCas; // Cache-Control: max-age=259200
237 static const unsigned kLowSpeedLimit = 1024; // Require at least 1kB/s
238
239 std::string MkDotCvmfsCacheControlHeader(unsigned defaultMaxAge=61, int overrideMaxAge=-1);
240
241 static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action,
242 void *userp, void *socketp);
243 static void *MainUpload(void *data);
244 std::vector<s3fanout::JobInfo *> jobs_todo_;
245 pthread_mutex_t *jobs_todo_lock_;
246 pthread_mutex_t *curl_handle_lock_;
247
248 CURL *AcquireCurlHandle() const;
249 void ReleaseCurlHandle(JobInfo *info, CURL *handle) const;
250 void InitPipeWatchFds();
251 int InitializeDnsSettings(CURL *handle, std::string remote_host) const;
252 void InitializeDnsSettingsCurl(CURL *handle, CURLSH *sharehandle,
253 curl_slist *clist) const;
254 Failures InitializeRequest(JobInfo *info, CURL *handle) const;
255 void SetUrlOptions(JobInfo *info) const;
256 void UpdateStatistics(CURL *handle);
257 bool CanRetry(const JobInfo *info);
258 void Backoff(JobInfo *info);
259 bool VerifyAndFinalize(const int curl_error, JobInfo *info);
260 std::string GetRequestString(const JobInfo &info) const;
261 std::string GetContentType(const JobInfo &info) const;
262 std::string GetUriEncode(const std::string &val, bool encode_slash) const;
263 std::string GetAwsV4SigningKey(const std::string &date) const;
264 bool MkPayloadHash(const JobInfo &info, std::string *hex_hash) const;
265 bool MkV2Authz(const JobInfo &info, std::vector<std::string> *headers) const;
266 bool MkV4Authz(const JobInfo &info, std::vector<std::string> *headers) const;
267 bool MkAzureAuthz(const JobInfo &info,
268 std::vector<std::string> *headers) const;
// Builds the URL of an object: <protocol>://<complete_hostname_>/<objkey> with
// DNS-style buckets, <protocol>://<complete_hostname_>/<bucket>/<objkey>
// otherwise.
269 50102 std::string MkUrl(const std::string &objkey) const {
270
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50102 times.
50102 if (config_.dns_buckets) {
271 return config_.protocol + "://" + complete_hostname_ + "/" + objkey;
272 } else {
273
2/4
✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 50102 times.
✗ Branch 5 not taken.
100204 return config_.protocol + "://" + complete_hostname_ + "/"
274
3/6
✓ Branch 2 taken 50102 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 50102 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 50102 times.
✗ Branch 9 not taken.
150306 + config_.bucket + "/" + objkey;
275 }
276 }
// Returns <bucket>.<hostname_port> with DNS-style buckets and the plain
// hostname_port otherwise.
277 492 std::string MkCompleteHostname() {
278
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 492 times.
492 if (config_.dns_buckets) {
279 return config_.bucket + "." + config_.hostname_port;
280 } else {
281 492 return config_.hostname_port;
282 }
283 }
284
285 const S3Config config_;
286 std::string complete_hostname_;
287
288 Prng prng_;
289 /**
290 * This is not strictly necessary but it helps the debugging
291 */
292 std::set<JobInfo *> *active_requests_;
293
294 std::set<CURL *> *pool_handles_idle_;
295 std::set<CURL *> *pool_handles_inuse_;
296 std::set<S3FanOutDnsEntry *> *sharehandles_;
297 std::map<CURL *, S3FanOutDnsEntry *> *curl_sharehandles_;
298 dns::CaresResolver *resolver_;
299 CURLM *curl_multi_;
300 std::string *user_agent_;
301
302 /**
303 * AWS4 signing keys are derived from the secret key, a region and a date.
304 * The signing key for current day can be cached.
305 */
306 mutable std::pair<std::string, std::string> last_signing_key_;
307
308 pthread_t thread_upload_;
309 atomic_int32 multi_threaded_;
310
311 struct pollfd *watch_fds_;
312 uint32_t watch_fds_size_;
313 uint32_t watch_fds_inuse_;
314 uint32_t watch_fds_max_;
315
316 // A pipe used to signal termination from S3FanoutManager to MainUpload
317 // thread. Anything written into it results in MainUpload thread exit.
318 int pipe_terminate_[2];
319 // A pipe used to push jobs from S3FanoutManager to MainUpload thread.
320 // S3FanoutManager writes a JobInfo* pointer. MainUpload then reads the
321 // pointer and processes the job.
322 int pipe_jobs_[2];
323 // A pipe used to collect completed jobs. MainUpload writes in the
324 // pointer to the completed job. PopCompletedJob() is used to
325 // retrieve the pointer.
326 int pipe_completed_[2];
327
328 bool opt_ipv4_only_;
329
330 unsigned int max_available_jobs_;
331 Semaphore *available_jobs_;
332
333 // Writes and reads should be atomic because reading happens in a different
334 // thread than writing.
335 Statistics *statistics_;
336
337 // Report not every occurrence of throttling but only every so often
338 uint64_t timestamp_last_throttle_report_;
339
340 bool is_curl_debug_;
341
342 /**
343 * Carries the path settings for SSL certificates
344 */
345 SslCertificateStore ssl_certificate_store_;
346
// NOTE(review): unlike the other data members, this one has no trailing
// underscore
347 std::string dot_cvmfs_cache_control_header; // Cache-Control: max-age=...
348 }; // S3FanoutManager
349
// Helpers for JobInfo::kReqDeleteMulti requests; both are defined outside of
// this header.
// NOTE(review): ComposeDeleteMultiXml presumably renders the request body for
// the given object keys -- confirm against the implementation.
350 std::string ComposeDeleteMultiXml(const std::vector<std::string> &keys);
// NOTE(review): presumably fills the three output vectors with one entry per
// failed key and returns the number of errors -- confirm against the
// implementation.
351 unsigned ParseDeleteMultiResponse(const std::string &response,
352 std::vector<std::string> *error_keys,
353 std::vector<std::string> *error_codes,
354 std::vector<std::string> *error_messages);
355
356 } // namespace s3fanout
357
358 #endif // CVMFS_NETWORK_S3FANOUT_H_
359