| Directory: | cvmfs/ |
|---|---|
| File: | cvmfs/network/s3fanout.h |
| Date: | 2026-03-15 02:35:27 |
| Exec | Total | Coverage | |
|---|---|---|---|
| Lines: | 52 | 68 | 76.5% |
| Branches: | 11 | 30 | 36.7% |
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /** | ||
| 2 | * This file is part of the CernVM File System. | ||
| 3 | */ | ||
| 4 | |||
| 5 | #ifndef CVMFS_NETWORK_S3FANOUT_H_ | ||
| 6 | #define CVMFS_NETWORK_S3FANOUT_H_ | ||
| 7 | |||
| 8 | #include <poll.h> | ||
| 9 | #include <semaphore.h> | ||
| 10 | |||
| 11 | #include <climits> | ||
| 12 | #include <cstdlib> | ||
| 13 | #include <map> | ||
| 14 | #include <set> | ||
| 15 | #include <string> | ||
| 16 | #include <utility> | ||
| 17 | #include <vector> | ||
| 18 | |||
| 19 | #include "dns.h" | ||
| 20 | #include "duplex_curl.h" | ||
| 21 | #include "ssl.h" | ||
| 22 | #include "util/concurrency.h" | ||
| 23 | #include "util/file_backed_buffer.h" | ||
| 24 | #include "util/mmap_file.h" | ||
| 25 | #include "util/pointer.h" | ||
| 26 | #include "util/prng.h" | ||
| 27 | #include "util/single_copy.h" | ||
| 28 | #include "util/smalloc.h" | ||
| 29 | |||
| 30 | namespace s3fanout { | ||
| 31 | |||
/**
 * Authorization methods that can be selected through
 * S3FanoutManager::S3Config::authz_method.
 */
enum AuthzMethods {
  kAuthzAwsV2 = 0,
  kAuthzAwsV4 = 1,
  kAuthzAzure = 2
};
| 37 | |||
/**
 * Possible return values.
 *
 * The enumerators are used as indices into the message table of Code2Ascii();
 * when adding a value, add the matching text there as well.
 */
enum Failures {
  kFailOk = 0,
  kFailLocalIO,
  kFailBadRequest,
  kFailForbidden,
  kFailHostResolve,
  kFailHostConnection,
  kFailNotFound,
  kFailServiceUnavailable,
  kFailRetry,
  kFailInsufficientStorage,
  kFailOther,

  kFailNumEntries
};  // Failures


/**
 * Translates a failure code into a human-readable, statically allocated
 * message.
 *
 * @param error  the failure code to translate
 * @return the message belonging to the code; "no text" for kFailNumEntries
 *         and for any value outside of the known range.  The returned
 *         pointer must not be freed.
 */
inline const char *Code2Ascii(const Failures error) {
  // One entry per enumerator, in enumerator order, plus a final sentinel for
  // kFailNumEntries / out-of-range values.
  static const char *const kTexts[kFailNumEntries + 1] = {
    "S3: OK",                                                     // kFailOk
    "S3: local I/O failure",                                      // kFailLocalIO
    "S3: malformed URL (bad request)",                            // kFailBadRequest
    "S3: forbidden",                                              // kFailForbidden
    "S3: failed to resolve host address",                         // kFailHostResolve
    "S3: host connection problem",                                // kFailHostConnection
    "S3: not found",                                              // kFailNotFound
    "S3: service not available",                                  // kFailServiceUnavailable
    "S3: too many requests, service asks for backoff and retry",  // kFailRetry
    "S3: Insufficient Storage",                                   // kFailInsufficientStorage
    "unclassified failure",                                       // kFailOther
    "no text"                                                     // sentinel
  };

  // Never index past the table, whatever value the caller passes in.
  const int idx = static_cast<int>(error);
  if ((idx < 0) || (idx > static_cast<int>(kFailNumEntries))) {
    return kTexts[kFailNumEntries];
  }
  return kTexts[idx];
}
| 74 | |||
| 75 | |||
/**
 * Plain collection of transfer counters.  A freshly constructed object has
 * all counters set to zero.
 */
struct Statistics {
  double transferred_bytes;
  double transfer_time;
  uint64_t num_requests;
  uint64_t num_retries;
  uint64_t ms_throttled;  // Total waiting time imposed by HTTP 429 replies

  Statistics()
    : transferred_bytes(0.0)
    , transfer_time(0.0)
    , num_requests(0)
    , num_retries(0)
    , ms_throttled(0) { }

  std::string Print() const;
};  // Statistics
| 93 | |||
| 94 | |||
| 95 | /** | ||
| 96 | * Contains all the information to specify an upload job. | ||
| 97 | */ | ||
| 98 | struct JobInfo : SingleCopy { | ||
| 99 | enum RequestType { | ||
| 100 | kReqHeadOnly = 0, // peek | ||
| 101 | kReqHeadPut, // conditional upload of content-addressed objects | ||
| 102 | kReqPutCas, // immutable data object | ||
| 103 | kReqPutDotCvmfs, // one of the /.cvmfs... top level files | ||
| 104 | kReqPutHtml, // HTML file - display instead of downloading | ||
| 105 | kReqPutBucket, // bucket creation | ||
| 106 | kReqDelete, | ||
| 107 | kReqDeleteMulti, // S3 multi-object delete (POST /?delete) | ||
| 108 | }; | ||
| 109 | |||
| 110 | const std::string object_key; | ||
| 111 | void *callback; // Callback to be called when job is finished | ||
| 112 | UniquePtr<FileBackedBuffer> origin; | ||
| 113 | |||
| 114 | // One constructor per destination | ||
| 115 | 25209 | JobInfo(const std::string &object_key, | |
| 116 | void *callback, | ||
| 117 | FileBackedBuffer *origin) | ||
| 118 |
1/2✓ Branch 3 taken 25209 times.
✗ Branch 4 not taken.
|
25209 | : object_key(object_key), origin(origin) { |
| 119 | 25209 | JobInfoInit(); | |
| 120 | 25209 | this->callback = callback; | |
| 121 | 25209 | } | |
| 122 | 25209 | void JobInfoInit() { | |
| 123 | 25209 | curl_handle = NULL; | |
| 124 | 25209 | http_headers = NULL; | |
| 125 | 25209 | callback = NULL; | |
| 126 | 25209 | request = kReqPutCas; | |
| 127 | 25209 | error_code = kFailOk; | |
| 128 | 25209 | http_error = 0; | |
| 129 | 25209 | num_retries = 0; | |
| 130 | 25209 | backoff_ms = 0; | |
| 131 | 25209 | throttle_ms = 0; | |
| 132 | 25209 | throttle_timestamp = 0; | |
| 133 | 25209 | errorbuffer = reinterpret_cast<char *>( | |
| 134 | 25209 | smalloc(sizeof(char) * CURL_ERROR_SIZE)); | |
| 135 | 25209 | } | |
| 136 | 25209 | ~JobInfo() { free(errorbuffer); } | |
| 137 | |||
| 138 | // For kReqDeleteMulti: keys included in the batch, and response body | ||
| 139 | std::vector<std::string> multi_delete_keys; | ||
| 140 | std::string response_body; | ||
| 141 | |||
| 142 | // Internal state, don't touch | ||
| 143 | CURL *curl_handle; | ||
| 144 | struct curl_slist *http_headers; | ||
| 145 | uint64_t payload_size; | ||
| 146 | RequestType request; | ||
| 147 | Failures error_code; | ||
| 148 | int http_error; | ||
| 149 | unsigned char num_retries; | ||
| 150 | // Exponential backoff with cutoff in case of errors | ||
| 151 | unsigned backoff_ms; | ||
| 152 | // Throttle imposed by HTTP 429 reply; mutually exclusive with backoff_ms | ||
| 153 | unsigned throttle_ms; | ||
| 154 | // Remember when the 429 reply came in to only throttle if still necessary | ||
| 155 | uint64_t throttle_timestamp; | ||
| 156 | char *errorbuffer; | ||
| 157 | }; // JobInfo | ||
| 158 | |||
| 159 | struct S3FanOutDnsEntry { | ||
| 160 | 410 | S3FanOutDnsEntry() | |
| 161 | 410 | : counter(0) | |
| 162 | 410 | , dns_name() | |
| 163 | 410 | , ip() | |
| 164 |
1/2✓ Branch 2 taken 410 times.
✗ Branch 3 not taken.
|
410 | , port("80") |
| 165 | 410 | , clist(NULL) | |
| 166 | 410 | , sharehandle(NULL) { } | |
| 167 | unsigned int counter; | ||
| 168 | std::string dns_name; | ||
| 169 | std::string ip; | ||
| 170 | std::string port; | ||
| 171 | struct curl_slist *clist; | ||
| 172 | CURLSH *sharehandle; | ||
| 173 | }; // S3FanOutDnsEntry | ||
| 174 | |||
| 175 | |||
| 176 | class S3FanoutManager : SingleCopy { | ||
| 177 | protected: | ||
| 178 | typedef SynchronizingCounter<uint32_t> Semaphore; | ||
| 179 | |||
| 180 | public: | ||
| 181 | // 250ms pause after HTTP 429 "Too Many Retries" | ||
| 182 | static const unsigned kDefault429ThrottleMs; | ||
| 183 | // Don't throttle for more than a few seconds | ||
| 184 | static const unsigned kMax429ThrottleMs; | ||
| 185 | // Report throttle operations only every so often | ||
| 186 | static const unsigned kThrottleReportIntervalSec; | ||
| 187 | static const unsigned kDefaultHTTPPort; | ||
| 188 | static const unsigned kDefaultHTTPSPort; | ||
| 189 | |||
| 190 | struct S3Config { | ||
| 191 | 492 | S3Config() { | |
| 192 | 492 | authz_method = kAuthzAwsV2; | |
| 193 | 492 | dns_buckets = true; | |
| 194 |
1/2✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
|
492 | protocol = "http"; |
| 195 | 492 | pool_max_handles = 0; | |
| 196 | 492 | opt_timeout_sec = 20; | |
| 197 | 492 | opt_max_retries = 3; | |
| 198 | 492 | opt_backoff_init_ms = 100; | |
| 199 | 492 | opt_backoff_max_ms = 2000; | |
| 200 |
1/2✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
|
492 | x_amz_acl = "public-read"; |
| 201 | 492 | } | |
| 202 | std::string access_key; | ||
| 203 | std::string secret_key; | ||
| 204 | std::string hostname_port; | ||
| 205 | AuthzMethods authz_method; | ||
| 206 | std::string region; | ||
| 207 | std::string flavor; | ||
| 208 | std::string bucket; | ||
| 209 | bool dns_buckets; | ||
| 210 | std::string protocol; | ||
| 211 | uint32_t pool_max_handles; | ||
| 212 | unsigned opt_timeout_sec; | ||
| 213 | unsigned opt_max_retries; | ||
| 214 | unsigned opt_backoff_init_ms; | ||
| 215 | unsigned opt_backoff_max_ms; | ||
| 216 | std::string proxy; | ||
| 217 | std::string x_amz_acl; | ||
| 218 | }; | ||
| 219 | |||
| 220 | static void DetectThrottleIndicator(const std::string &header, JobInfo *info); | ||
| 221 | |||
| 222 | explicit S3FanoutManager(const S3Config &config); | ||
| 223 | |||
| 224 | ~S3FanoutManager(); | ||
| 225 | |||
| 226 | void Spawn(); | ||
| 227 | |||
| 228 | void PushNewJob(JobInfo *info); | ||
| 229 | void PushCompletedJob(JobInfo *info); | ||
| 230 | JobInfo *PopCompletedJob(); | ||
| 231 | |||
| 232 | const Statistics &GetStatistics(); | ||
| 233 | |||
| 234 | private: | ||
| 235 | // Reflects the default Apache configuration of the local backend | ||
| 236 | static const char *kCacheControlCas; // Cache-Control: max-age=259200 | ||
| 237 | static const unsigned kLowSpeedLimit = 1024; // Require at least 1kB/s | ||
| 238 | |||
| 239 | std::string MkDotCvmfsCacheControlHeader(unsigned defaultMaxAge=61, int overrideMaxAge=-1); | ||
| 240 | |||
| 241 | static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, | ||
| 242 | void *userp, void *socketp); | ||
| 243 | static void *MainUpload(void *data); | ||
| 244 | std::vector<s3fanout::JobInfo *> jobs_todo_; | ||
| 245 | pthread_mutex_t *jobs_todo_lock_; | ||
| 246 | pthread_mutex_t *curl_handle_lock_; | ||
| 247 | |||
| 248 | CURL *AcquireCurlHandle() const; | ||
| 249 | void ReleaseCurlHandle(JobInfo *info, CURL *handle) const; | ||
| 250 | void InitPipeWatchFds(); | ||
| 251 | int InitializeDnsSettings(CURL *handle, std::string remote_host) const; | ||
| 252 | void InitializeDnsSettingsCurl(CURL *handle, CURLSH *sharehandle, | ||
| 253 | curl_slist *clist) const; | ||
| 254 | Failures InitializeRequest(JobInfo *info, CURL *handle) const; | ||
| 255 | void SetUrlOptions(JobInfo *info) const; | ||
| 256 | void UpdateStatistics(CURL *handle); | ||
| 257 | bool CanRetry(const JobInfo *info); | ||
| 258 | void Backoff(JobInfo *info); | ||
| 259 | bool VerifyAndFinalize(const int curl_error, JobInfo *info); | ||
| 260 | std::string GetRequestString(const JobInfo &info) const; | ||
| 261 | std::string GetContentType(const JobInfo &info) const; | ||
| 262 | std::string GetUriEncode(const std::string &val, bool encode_slash) const; | ||
| 263 | std::string GetAwsV4SigningKey(const std::string &date) const; | ||
| 264 | bool MkPayloadHash(const JobInfo &info, std::string *hex_hash) const; | ||
| 265 | bool MkV2Authz(const JobInfo &info, std::vector<std::string> *headers) const; | ||
| 266 | bool MkV4Authz(const JobInfo &info, std::vector<std::string> *headers) const; | ||
| 267 | bool MkAzureAuthz(const JobInfo &info, | ||
| 268 | std::vector<std::string> *headers) const; | ||
| 269 | 50102 | std::string MkUrl(const std::string &objkey) const { | |
| 270 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 50102 times.
|
50102 | if (config_.dns_buckets) { |
| 271 | ✗ | return config_.protocol + "://" + complete_hostname_ + "/" + objkey; | |
| 272 | } else { | ||
| 273 |
2/4✓ Branch 1 taken 50102 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 50102 times.
✗ Branch 5 not taken.
|
100204 | return config_.protocol + "://" + complete_hostname_ + "/" |
| 274 |
3/6✓ Branch 2 taken 50102 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 50102 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 50102 times.
✗ Branch 9 not taken.
|
150306 | + config_.bucket + "/" + objkey; |
| 275 | } | ||
| 276 | } | ||
| 277 | 492 | std::string MkCompleteHostname() { | |
| 278 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 492 times.
|
492 | if (config_.dns_buckets) { |
| 279 | ✗ | return config_.bucket + "." + config_.hostname_port; | |
| 280 | } else { | ||
| 281 | 492 | return config_.hostname_port; | |
| 282 | } | ||
| 283 | } | ||
| 284 | |||
| 285 | const S3Config config_; | ||
| 286 | std::string complete_hostname_; | ||
| 287 | |||
| 288 | Prng prng_; | ||
| 289 | /** | ||
| 290 | * This is not strictly necessary but it helps the debugging | ||
| 291 | */ | ||
| 292 | std::set<JobInfo *> *active_requests_; | ||
| 293 | |||
| 294 | std::set<CURL *> *pool_handles_idle_; | ||
| 295 | std::set<CURL *> *pool_handles_inuse_; | ||
| 296 | std::set<S3FanOutDnsEntry *> *sharehandles_; | ||
| 297 | std::map<CURL *, S3FanOutDnsEntry *> *curl_sharehandles_; | ||
| 298 | dns::CaresResolver *resolver_; | ||
| 299 | CURLM *curl_multi_; | ||
| 300 | std::string *user_agent_; | ||
| 301 | |||
| 302 | /** | ||
| 303 | * AWS4 signing keys are derived from the secret key, a region and a date. | ||
| 304 | * The signing key for current day can be cached. | ||
| 305 | */ | ||
| 306 | mutable std::pair<std::string, std::string> last_signing_key_; | ||
| 307 | |||
| 308 | pthread_t thread_upload_; | ||
| 309 | atomic_int32 multi_threaded_; | ||
| 310 | |||
| 311 | struct pollfd *watch_fds_; | ||
| 312 | uint32_t watch_fds_size_; | ||
| 313 | uint32_t watch_fds_inuse_; | ||
| 314 | uint32_t watch_fds_max_; | ||
| 315 | |||
| 316 | // A pipe used to signal termination from S3FanoutManager to MainUpload | ||
| 317 | // thread. Anything written into it results in MainUpload thread exit. | ||
| 318 | int pipe_terminate_[2]; | ||
| 319 | // A pipe to used to push jobs from S3FanoutManager to MainUpload thread. | ||
| 320 | // S3FanoutManager writes a JobInfo* pointer. MainUpload then reads the | ||
| 321 | // pointer and processes the job. | ||
| 322 | int pipe_jobs_[2]; | ||
| 323 | // A pipe used to collect completed jobs. MainUpload writes in the | ||
| 324 | // pointer to the completed job. PopCompletedJob() used to | ||
| 325 | // retrieve pointer. | ||
| 326 | int pipe_completed_[2]; | ||
| 327 | |||
| 328 | bool opt_ipv4_only_; | ||
| 329 | |||
| 330 | unsigned int max_available_jobs_; | ||
| 331 | Semaphore *available_jobs_; | ||
| 332 | |||
| 333 | // Writes and reads should be atomic because reading happens in a different | ||
| 334 | // thread than writing. | ||
| 335 | Statistics *statistics_; | ||
| 336 | |||
| 337 | // Report not every occurrence of throtteling but only every so often | ||
| 338 | uint64_t timestamp_last_throttle_report_; | ||
| 339 | |||
| 340 | bool is_curl_debug_; | ||
| 341 | |||
| 342 | /** | ||
| 343 | * Carries the path settings for SSL certificates | ||
| 344 | */ | ||
| 345 | SslCertificateStore ssl_certificate_store_; | ||
| 346 | |||
| 347 | std::string dot_cvmfs_cache_control_header; // Cache-Control: max-age=... | ||
| 348 | }; // S3FanoutManager | ||
| 349 | |||
/**
 * Helpers for JobInfo::kReqDeleteMulti requests (S3 multi-object delete).
 * Both functions are only declared here.
 *
 * NOTE(review): judging by the names, ComposeDeleteMultiXml() builds the XML
 * request body for the given keys and ParseDeleteMultiResponse() extracts the
 * per-key errors from the service reply into the three output vectors; the
 * meaning of the unsigned return value is not visible in this header --
 * confirm against the implementation.
 */
std::string ComposeDeleteMultiXml(const std::vector<std::string> &keys);

unsigned ParseDeleteMultiResponse(
  const std::string &response,
  std::vector<std::string> *error_keys,
  std::vector<std::string> *error_codes,
  std::vector<std::string> *error_messages);
| 355 | |||
| 356 | } // namespace s3fanout | ||
| 357 | |||
| 358 | #endif // CVMFS_NETWORK_S3FANOUT_H_ | ||
| 359 |