| Directory: | cvmfs/ |
|---|---|
| File: | cvmfs/s3fanout.h |
| Date: | 2023-02-05 02:36:10 |
| Exec | Total | Coverage | |
|---|---|---|---|
| Lines: | 50 | 65 | 76.9% |
| Branches: | 11 | 30 | 36.7% |
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /** | ||
| 2 | * This file is part of the CernVM File System. | ||
| 3 | */ | ||
| 4 | |||
| 5 | #ifndef CVMFS_S3FANOUT_H_ | ||
| 6 | #define CVMFS_S3FANOUT_H_ | ||
| 7 | |||
| 8 | #include <poll.h> | ||
| 9 | #include <semaphore.h> | ||
| 10 | |||
| 11 | #include <climits> | ||
| 12 | #include <cstdlib> | ||
| 13 | #include <map> | ||
| 14 | #include <set> | ||
| 15 | #include <string> | ||
| 16 | #include <utility> | ||
| 17 | #include <vector> | ||
| 18 | |||
| 19 | #include "dns.h" | ||
| 20 | #include "duplex_curl.h" | ||
| 21 | #include "ssl.h" | ||
| 22 | #include "util/concurrency.h" | ||
| 23 | #include "util/file_backed_buffer.h" | ||
| 24 | #include "util/mmap_file.h" | ||
| 25 | #include "util/pointer.h" | ||
| 26 | #include "util/prng.h" | ||
| 27 | #include "util/single_copy.h" | ||
| 28 | #include "util/smalloc.h" | ||
| 29 | |||
| 30 | namespace s3fanout { | ||
| 31 | |||
/**
 * Authorization method, selected through S3Config::authz_method.
 */
enum AuthzMethods {
  kAuthzAwsV2 = 0,
  kAuthzAwsV4 = 1,
  kAuthzAzure = 2
};
| 37 | |||
/**
 * Possible return values.
 */
enum Failures {
  kFailOk = 0,
  kFailLocalIO,
  kFailBadRequest,
  kFailForbidden,
  kFailHostResolve,
  kFailHostConnection,
  kFailNotFound,
  kFailServiceUnavailable,
  kFailRetry,
  kFailOther,

  kFailNumEntries
};  // Failures


/**
 * Maps a Failures code to a human-readable message.
 *
 * The table holds exactly one text per enumerator, in enumerator order, plus
 * a trailing "no text" entry for kFailNumEntries.
 *
 * @param error  the failure code to translate
 * @return pointer to a string literal; "no text" for kFailNumEntries and for
 *         any value outside of the enumeration
 */
inline const char *Code2Ascii(const Failures error) {
  static const char * const kTexts[kFailNumEntries + 1] = {
    "S3: OK",                                       // kFailOk
    "S3: local I/O failure",                        // kFailLocalIO
    "S3: malformed URL (bad request)",              // kFailBadRequest
    "S3: forbidden",                                // kFailForbidden
    "S3: failed to resolve host address",           // kFailHostResolve
    "S3: host connection problem",                  // kFailHostConnection
    "S3: not found",                                // kFailNotFound
    "S3: service not available",                    // kFailServiceUnavailable
    "S3: too many requests, service asks for backoff and retry",  // kFailRetry
    "S3: unknown service error, perhaps wrong authentication protocol",
                                                    // kFailOther
    "no text"                                       // kFailNumEntries
  };

  // A negative value wraps around to a large unsigned number, so a single
  // comparison rejects everything outside of the table
  const unsigned idx = static_cast<unsigned>(error);
  if (idx > static_cast<unsigned>(kFailNumEntries))
    return kTexts[kFailNumEntries];
  return kTexts[idx];
}
| 72 | |||
| 73 | |||
| 74 | |||
/**
 * Transfer counters. A freshly constructed object has every counter at zero.
 */
struct Statistics {
  double transferred_bytes;
  double transfer_time;
  uint64_t num_requests;
  uint64_t num_retries;
  uint64_t ms_throttled;  // Total waiting time imposed by HTTP 429 replies

  Statistics()
    : transferred_bytes(0.0)
    , transfer_time(0.0)
    , num_requests(0)
    , num_retries(0)
    , ms_throttled(0)
  { }

  std::string Print() const;
};  // Statistics
| 92 | |||
| 93 | |||
| 94 | /** | ||
| 95 | * Contains all the information to specify an upload job. | ||
| 96 | */ | ||
| 97 | struct JobInfo : SingleCopy { | ||
| 98 | enum RequestType { | ||
| 99 | kReqHeadOnly = 0, // peek | ||
| 100 | kReqHeadPut, // conditional upload of content-addressed objects | ||
| 101 | kReqPutCas, // immutable data object | ||
| 102 | kReqPutDotCvmfs, // one of the /.cvmfs... top level files | ||
| 103 | kReqPutHtml, // HTML file - display instead of downloading | ||
| 104 | kReqPutBucket, // bucket creation | ||
| 105 | kReqDelete, | ||
| 106 | }; | ||
| 107 | |||
| 108 | const std::string object_key; | ||
| 109 | void *callback; // Callback to be called when job is finished | ||
| 110 | UniquePtr<FileBackedBuffer> origin; | ||
| 111 | |||
| 112 | // One constructor per destination | ||
| 113 | 615 | JobInfo( | |
| 114 | const std::string &object_key, | ||
| 115 | void *callback, | ||
| 116 | FileBackedBuffer *origin) | ||
| 117 | 615 | : object_key(object_key), | |
| 118 |
1/2✓ Branch 2 taken 615 times.
✗ Branch 3 not taken.
|
1230 | origin(origin) |
| 119 | { | ||
| 120 | 615 | JobInfoInit(); | |
| 121 | 615 | this->callback = callback; | |
| 122 | 615 | } | |
| 123 | 615 | void JobInfoInit() { | |
| 124 | 615 | curl_handle = NULL; | |
| 125 | 615 | http_headers = NULL; | |
| 126 | 615 | callback = NULL; | |
| 127 | 615 | request = kReqPutCas; | |
| 128 | 615 | error_code = kFailOk; | |
| 129 | 615 | http_error = 0; | |
| 130 | 615 | num_retries = 0; | |
| 131 | 615 | backoff_ms = 0; | |
| 132 | 615 | throttle_ms = 0; | |
| 133 | 615 | throttle_timestamp = 0; | |
| 134 | 615 | errorbuffer = | |
| 135 | 615 | reinterpret_cast<char *>(smalloc(sizeof(char) * CURL_ERROR_SIZE)); | |
| 136 | 615 | } | |
| 137 | 1230 | ~JobInfo() { | |
| 138 | 615 | free(errorbuffer); | |
| 139 | 615 | } | |
| 140 | |||
| 141 | // Internal state, don't touch | ||
| 142 | CURL *curl_handle; | ||
| 143 | struct curl_slist *http_headers; | ||
| 144 | uint64_t payload_size; | ||
| 145 | RequestType request; | ||
| 146 | Failures error_code; | ||
| 147 | int http_error; | ||
| 148 | unsigned char num_retries; | ||
| 149 | // Exponential backoff with cutoff in case of errors | ||
| 150 | unsigned backoff_ms; | ||
| 151 | // Throttle imposed by HTTP 429 reply; mutually exclusive with backoff_ms | ||
| 152 | unsigned throttle_ms; | ||
| 153 | // Remember when the 429 reply came in to only throttle if still necessary | ||
| 154 | uint64_t throttle_timestamp; | ||
| 155 | char *errorbuffer; | ||
| 156 | }; // JobInfo | ||
| 157 | |||
| 158 | struct S3FanOutDnsEntry { | ||
| 159 |
1/2✓ Branch 4 taken 10 times.
✗ Branch 5 not taken.
|
10 | S3FanOutDnsEntry() : counter(0), dns_name(), ip(), port("80"), |
| 160 | 10 | clist(NULL), sharehandle(NULL) {} | |
| 161 | unsigned int counter; | ||
| 162 | std::string dns_name; | ||
| 163 | std::string ip; | ||
| 164 | std::string port; | ||
| 165 | struct curl_slist *clist; | ||
| 166 | CURLSH *sharehandle; | ||
| 167 | }; // S3FanOutDnsEntry | ||
| 168 | |||
| 169 | |||
| 170 | class S3FanoutManager : SingleCopy { | ||
| 171 | protected: | ||
| 172 | typedef SynchronizingCounter<uint32_t> Semaphore; | ||
| 173 | |||
| 174 | public: | ||
| 175 | // 250ms pause after HTTP 429 "Too Many Retries" | ||
| 176 | static const unsigned kDefault429ThrottleMs; | ||
| 177 | // Don't throttle for more than a few seconds | ||
| 178 | static const unsigned kMax429ThrottleMs; | ||
| 179 | // Report throttle operations only every so often | ||
| 180 | static const unsigned kThrottleReportIntervalSec; | ||
| 181 | static const unsigned kDefaultHTTPPort; | ||
| 182 | static const unsigned kDefaultHTTPSPort; | ||
| 183 | |||
| 184 | struct S3Config { | ||
| 185 | 12 | S3Config() { | |
| 186 | 12 | authz_method = kAuthzAwsV2; | |
| 187 | 12 | dns_buckets = true; | |
| 188 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | protocol = "http"; |
| 189 | 12 | pool_max_handles = 0; | |
| 190 | 12 | opt_timeout_sec = 20; | |
| 191 | 12 | opt_max_retries = 3; | |
| 192 | 12 | opt_backoff_init_ms = 100; | |
| 193 | 12 | opt_backoff_max_ms = 2000; | |
| 194 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | x_amz_acl = "public-read"; |
| 195 | 12 | } | |
| 196 | std::string access_key; | ||
| 197 | std::string secret_key; | ||
| 198 | std::string hostname_port; | ||
| 199 | AuthzMethods authz_method; | ||
| 200 | std::string region; | ||
| 201 | std::string flavor; | ||
| 202 | std::string bucket; | ||
| 203 | bool dns_buckets; | ||
| 204 | std::string protocol; | ||
| 205 | uint32_t pool_max_handles; | ||
| 206 | unsigned opt_timeout_sec; | ||
| 207 | unsigned opt_max_retries; | ||
| 208 | unsigned opt_backoff_init_ms; | ||
| 209 | unsigned opt_backoff_max_ms; | ||
| 210 | std::string proxy; | ||
| 211 | std::string x_amz_acl; | ||
| 212 | }; | ||
| 213 | |||
| 214 | static void DetectThrottleIndicator(const std::string &header, JobInfo *info); | ||
| 215 | |||
| 216 | explicit S3FanoutManager(const S3Config &config); | ||
| 217 | |||
| 218 | ~S3FanoutManager(); | ||
| 219 | |||
| 220 | void Spawn(); | ||
| 221 | |||
| 222 | void PushNewJob(JobInfo *info); | ||
| 223 | void PushCompletedJob(JobInfo *info); | ||
| 224 | JobInfo *PopCompletedJob(); | ||
| 225 | |||
| 226 | const Statistics &GetStatistics(); | ||
| 227 | |||
| 228 | private: | ||
| 229 | // Reflects the default Apache configuration of the local backend | ||
| 230 | static const char *kCacheControlCas; // Cache-Control: max-age=259200 | ||
| 231 | static const char *kCacheControlDotCvmfs; // Cache-Control: max-age=61 | ||
| 232 | static const unsigned kLowSpeedLimit = 1024; // Require at least 1kB/s | ||
| 233 | |||
| 234 | static int CallbackCurlSocket(CURL *easy, curl_socket_t s, int action, | ||
| 235 | void *userp, void *socketp); | ||
| 236 | static void *MainUpload(void *data); | ||
| 237 | std::vector<s3fanout::JobInfo*> jobs_todo_; | ||
| 238 | pthread_mutex_t *jobs_todo_lock_; | ||
| 239 | pthread_mutex_t *curl_handle_lock_; | ||
| 240 | |||
| 241 | CURL *AcquireCurlHandle() const; | ||
| 242 | void ReleaseCurlHandle(JobInfo *info, CURL *handle) const; | ||
| 243 | void InitPipeWatchFds(); | ||
| 244 | int InitializeDnsSettings(CURL *handle, | ||
| 245 | std::string remote_host) const; | ||
| 246 | void InitializeDnsSettingsCurl(CURL *handle, CURLSH *sharehandle, | ||
| 247 | curl_slist *clist) const; | ||
| 248 | Failures InitializeRequest(JobInfo *info, CURL *handle) const; | ||
| 249 | void SetUrlOptions(JobInfo *info) const; | ||
| 250 | void UpdateStatistics(CURL *handle); | ||
| 251 | bool CanRetry(const JobInfo *info); | ||
| 252 | void Backoff(JobInfo *info); | ||
| 253 | bool VerifyAndFinalize(const int curl_error, JobInfo *info); | ||
| 254 | std::string GetRequestString(const JobInfo &info) const; | ||
| 255 | std::string GetContentType(const JobInfo &info) const; | ||
| 256 | std::string GetUriEncode(const std::string &val, bool encode_slash) const; | ||
| 257 | std::string GetAwsV4SigningKey(const std::string &date) const; | ||
| 258 | bool MkPayloadHash(const JobInfo &info, std::string *hex_hash) const; | ||
| 259 | bool MkV2Authz(const JobInfo &info, | ||
| 260 | std::vector<std::string> *headers) const; | ||
| 261 | bool MkV4Authz(const JobInfo &info, | ||
| 262 | std::vector<std::string> *headers) const; | ||
| 263 | bool MkAzureAuthz(const JobInfo &info, | ||
| 264 | std::vector<std::string> *headers) const; | ||
| 265 | 1222 | std::string MkUrl(const std::string &objkey) const { | |
| 266 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1222 times.
|
1222 | if (config_.dns_buckets) { |
| 267 | ✗ | return config_.protocol + "://" + complete_hostname_ + "/" + objkey; | |
| 268 | } else { | ||
| 269 |
3/6✓ Branch 1 taken 1222 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1222 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 1222 times.
✗ Branch 8 not taken.
|
2444 | return config_.protocol + "://" + complete_hostname_ + "/" + |
| 270 |
2/4✓ Branch 2 taken 1222 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1222 times.
✗ Branch 6 not taken.
|
3666 | config_.bucket + "/" + objkey; |
| 271 | } | ||
| 272 | } | ||
| 273 | 12 | std::string MkCompleteHostname() { | |
| 274 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | if (config_.dns_buckets) { |
| 275 | ✗ | return config_.bucket + "." + config_.hostname_port; | |
| 276 | } else { | ||
| 277 | 12 | return config_.hostname_port; | |
| 278 | } | ||
| 279 | } | ||
| 280 | |||
| 281 | const S3Config config_; | ||
| 282 | std::string complete_hostname_; | ||
| 283 | |||
| 284 | Prng prng_; | ||
| 285 | /** | ||
| 286 | * This is not strictly necessary but it helps the debugging | ||
| 287 | */ | ||
| 288 | std::set<JobInfo *> *active_requests_; | ||
| 289 | |||
| 290 | std::set<CURL *> *pool_handles_idle_; | ||
| 291 | std::set<CURL *> *pool_handles_inuse_; | ||
| 292 | std::set<S3FanOutDnsEntry *> *sharehandles_; | ||
| 293 | std::map<CURL *, S3FanOutDnsEntry *> *curl_sharehandles_; | ||
| 294 | dns::CaresResolver *resolver_; | ||
| 295 | CURLM *curl_multi_; | ||
| 296 | std::string *user_agent_; | ||
| 297 | |||
| 298 | /** | ||
| 299 | * AWS4 signing keys are derived from the secret key, a region and a date. | ||
| 300 | * The signing key for current day can be cached. | ||
| 301 | */ | ||
| 302 | mutable std::pair<std::string, std::string> last_signing_key_; | ||
| 303 | |||
| 304 | pthread_t thread_upload_; | ||
| 305 | atomic_int32 multi_threaded_; | ||
| 306 | |||
| 307 | struct pollfd *watch_fds_; | ||
| 308 | uint32_t watch_fds_size_; | ||
| 309 | uint32_t watch_fds_inuse_; | ||
| 310 | uint32_t watch_fds_max_; | ||
| 311 | |||
| 312 | // A pipe used to signal termination from S3FanoutManager to MainUpload | ||
| 313 | // thread. Anything written into it results in MainUpload thread exit. | ||
| 314 | int pipe_terminate_[2]; | ||
| 315 | // A pipe to used to push jobs from S3FanoutManager to MainUpload thread. | ||
| 316 | // S3FanoutManager writes a JobInfo* pointer. MainUpload then reads the | ||
| 317 | // pointer and processes the job. | ||
| 318 | int pipe_jobs_[2]; | ||
| 319 | // A pipe used to collect completed jobs. MainUpload writes in the | ||
| 320 | // pointer to the completed job. PopCompletedJob() used to | ||
| 321 | // retrieve pointer. | ||
| 322 | int pipe_completed_[2]; | ||
| 323 | |||
| 324 | bool opt_ipv4_only_; | ||
| 325 | |||
| 326 | unsigned int max_available_jobs_; | ||
| 327 | Semaphore *available_jobs_; | ||
| 328 | |||
| 329 | // Writes and reads should be atomic because reading happens in a different | ||
| 330 | // thread than writing. | ||
| 331 | Statistics *statistics_; | ||
| 332 | |||
| 333 | // Report not every occurance of throtteling but only every so often | ||
| 334 | uint64_t timestamp_last_throttle_report_; | ||
| 335 | |||
| 336 | bool is_curl_debug_; | ||
| 337 | |||
| 338 | /** | ||
| 339 | * Carries the path settings for SSL certificates | ||
| 340 | */ | ||
| 341 | SslCertificateStore ssl_certificate_store_; | ||
| 342 | }; // S3FanoutManager | ||
| 343 | |||
| 344 | } // namespace s3fanout | ||
| 345 | |||
| 346 | #endif // CVMFS_S3FANOUT_H_ | ||
| 347 |