GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/s3fanout.cc Lines: 591 743 79.5 %
Date: 2019-02-03 02:48:13 Branches: 243 440 55.2 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 *
4
 * Runs a thread using libcurl's asynchronous I/O mode to push data to S3
5
 */
6
7
#include <pthread.h>
8
9
#include <algorithm>
10
#include <cerrno>
11
#include <utility>
12
13
#include "cvmfs_config.h"
14
#include "platform.h"
15
#include "s3fanout.h"
16
#include "upload_facility.h"
17
#include "util/posix.h"
18
#include "util/string.h"
19
#include "util_concurrency.h"
20
21
using namespace std;  // NOLINT
22
23
namespace s3fanout {
24
25
// Complete "Cache-Control" HTTP header line with a max-age of 259200 (three
// days if interpreted as seconds).
// NOTE(review): presumably attached to uploads of content-addressed objects;
// the use site is not visible in this part of the file -- confirm.
const char *S3FanoutManager::kCacheControlCas = "Cache-Control: max-age=259200";
26
// Complete "Cache-Control" HTTP header line with a max-age of 61.
// NOTE(review): presumably attached to uploads of .cvmfs* files; the use site
// is not visible in this part of the file -- confirm.
const char *S3FanoutManager::kCacheControlDotCvmfs =
27
  "Cache-Control: max-age=61";
28
// Throttle delay (ms) stored in the job by CallbackCurlHeader() as soon as an
// HTTP 429 status line is seen, before any retry header is parsed.
const unsigned S3FanoutManager::kDefault429ThrottleMs = 250;
29
// Upper bound (ms) applied by DetectThrottleIndicator() to server-requested
// throttle delays.
const unsigned S3FanoutManager::kMax429ThrottleMs = 10000;
30
// NOTE(review): not referenced in the visible part of the file; by its name
// the minimum interval (seconds) between throttle reports -- confirm.
const unsigned S3FanoutManager::kThrottleReportIntervalSec = 60;
31
32
33
/**
34
 * Parses Retry-After and X-Retry-In headers attached to HTTP 429 responses
35
 */
36
105
void S3FanoutManager::DetectThrottleIndicator(
37
  const std::string &header,
38
  JobInfo *info)
39
{
40
105
  std::string value_str;
41

105
  if (HasPrefix(header, "retry-after:", true))
42
52
    value_str = header.substr(12);
43

105
  if (HasPrefix(header, "x-retry-in:", true))
44
3
    value_str = header.substr(11);
45
46
105
  value_str = Trim(value_str);
47
105
  if (!value_str.empty()) {
48
53
    unsigned value_ms = String2Uint64(value_str) * 1000;
49
53
    if (value_ms > 0)
50
52
      info->throttle_ms = std::min(value_ms, kMax429ThrottleMs);
51
  }
52
105
}
53
54
55
/**
56
 * Called by curl for every HTTP header. Not called for file:// transfers.
57
 */
58
12063
static size_t CallbackCurlHeader(void *ptr, size_t size, size_t nmemb,
59
                                 void *info_link) {
60
12063
  const size_t num_bytes = size*nmemb;
61
12063
  const string header_line(static_cast<const char *>(ptr), num_bytes);
62
12063
  JobInfo *info = static_cast<JobInfo *>(info_link);
63
64
  // Check for http status code errors
65

12063
  if (HasPrefix(header_line, "HTTP/1.", false)) {
66
6005
    if (header_line.length() < 10)
67
      return 0;
68
69
    unsigned i;
70

6005
    for (i = 8; (i < header_line.length()) && (header_line[i] == ' '); ++i) {}
71
72
6005
    if (header_line[i] == '2') {
73
2981
      return num_bytes;
74
    } else {
75
      LogCvmfs(kLogS3Fanout, kLogDebug, "http status error code: %s",
76
3024
               header_line.c_str());
77
3024
      if (header_line.length() < i+3) {
78
        LogCvmfs(kLogS3Fanout, kLogStderr, "S3: invalid HTTP response '%s'",
79
                 header_line.c_str());
80
        info->error_code = kFailOther;
81
        return 0;
82
      }
83
3024
      info->http_error = String2Int64(string(&header_line[i], 3));
84
85

3024
      switch (info->http_error) {
86
        case 429:
87
48
          info->error_code = kFailRetry;
88
48
          info->throttle_ms = S3FanoutManager::kDefault429ThrottleMs;
89
48
          info->throttle_timestamp = platform_monotonic_time();
90
48
          return num_bytes;
91
        case 503:
92
        case 502:  // Can happen if the S3 gateway-backend connection breaks
93
          info->error_code = kFailServiceUnavailable;
94
          break;
95
        case 501:
96
        case 400:
97
          info->error_code = kFailBadRequest;
98
          break;
99
        case 403:
100
          info->error_code = kFailForbidden;
101
          break;
102
        case 404:
103
2976
          info->error_code = kFailNotFound;
104
2976
          break;
105
        default:
106
          info->error_code = kFailOther;
107
      }
108
2976
      return 0;
109
    }
110
  }
111
112
6058
  if (info->error_code == kFailRetry) {
113
96
    S3FanoutManager::DetectThrottleIndicator(header_line, info);
114
  }
115
116
6058
  return num_bytes;
117
}
118
119
120
/**
121
 * Called by curl for every new chunk to upload.
122
 */
123
93233
static size_t CallbackCurlData(void *ptr, size_t size, size_t nmemb,
124
                               void *info_link) {
125
93233
  const size_t num_bytes = size*nmemb;
126
93233
  JobInfo *info = static_cast<JobInfo *>(info_link);
127
128
  // In case of 429, we potentially need to read all the headers so we can
129
  // only abort the transfer in the (probably empty) data section
130
93233
  if (info->error_code == kFailRetry)
131
    return 0;
132
133
93233
  LogCvmfs(kLogS3Fanout, kLogDebug, "Data callback with %d bytes", num_bytes);
134
135
93233
  if (num_bytes == 0)
136
    return 0;
137
138
93233
  if (info->origin == kOriginMem) {
139
28858
    const size_t avail_bytes = info->origin_mem.size - info->origin_mem.pos;
140
28858
    const size_t send_size = avail_bytes < num_bytes ? avail_bytes : num_bytes;
141
28858
    memcpy(ptr, info->origin_mem.data + info->origin_mem.pos, send_size);
142
28858
    info->origin_mem.pos += send_size;
143
28858
    LogCvmfs(kLogS3Fanout, kLogDebug, "mem pushed out %d bytes", send_size);
144
28858
    return send_size;
145
64375
  } else if (info->origin == kOriginPath) {
146
64375
    size_t read_bytes = fread(ptr, 1, num_bytes, info->origin_file);
147
64375
    if (read_bytes != num_bytes) {
148
695
      if (ferror(info->origin_file) != 0) {
149
        LogCvmfs(kLogS3Fanout, kLogStderr, "local I/O error reading %s",
150
                 info->origin_path.c_str());
151
        return CURL_READFUNC_ABORT;
152
      }
153
    }
154
64375
    LogCvmfs(kLogS3Fanout, kLogDebug, "file pushed out %d bytes", read_bytes);
155
64375
    return read_bytes;
156
  }
157
158
  return CURL_READFUNC_ABORT;
159
}
160
161
162
/**
163
 * Called when new curl sockets arrive or existing curl sockets depart.
164
 */
165
13408
int S3FanoutManager::CallbackCurlSocket(CURL *easy, curl_socket_t s, int action,
166
                                        void *userp, void *socketp) {
167
13408
  S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(userp);
168
13408
  const int ajobs = *s3fanout_mgr->available_jobs_;
169
  LogCvmfs(kLogS3Fanout, kLogDebug, "CallbackCurlSocket called with easy "
170
           "handle %p, socket %d, action %d, up %d, "
171
           "sp %d, fds_inuse %d, jobs %d",
172
           easy, s, action, userp,
173
13408
           socketp, s3fanout_mgr->watch_fds_inuse_, ajobs);
174
13408
  if (action == CURL_POLL_NONE)
175
    return 0;
176
177
  // Find s in watch_fds_
178
  unsigned index;
179
90745
  for (index = 0; index < s3fanout_mgr->watch_fds_inuse_; ++index) {
180
84824
    if (s3fanout_mgr->watch_fds_[index].fd == s)
181
7487
      break;
182
  }
183
  // Or create newly
184
13408
  if (index == s3fanout_mgr->watch_fds_inuse_) {
185
    // Extend array if necessary
186
5921
    if (s3fanout_mgr->watch_fds_inuse_ == s3fanout_mgr->watch_fds_size_) {
187
39
      s3fanout_mgr->watch_fds_size_ *= 2;
188
      s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
189
          srealloc(s3fanout_mgr->watch_fds_,
190
39
                   s3fanout_mgr->watch_fds_size_*sizeof(struct pollfd)));
191
    }
192
5921
    s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].fd = s;
193
5921
    s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].events = 0;
194
5921
    s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_].revents = 0;
195
5921
    s3fanout_mgr->watch_fds_inuse_++;
196
  }
197
198

13408
  switch (action) {
199
    case CURL_POLL_IN:
200
5921
      s3fanout_mgr->watch_fds_[index].events = POLLIN | POLLPRI;
201
5921
      break;
202
    case CURL_POLL_OUT:
203
21
      s3fanout_mgr->watch_fds_[index].events = POLLOUT | POLLWRBAND;
204
21
      break;
205
    case CURL_POLL_INOUT:
206
1545
      s3fanout_mgr->watch_fds_[index].events =
207
1545
          POLLIN | POLLPRI | POLLOUT | POLLWRBAND;
208
1545
      break;
209
    case CURL_POLL_REMOVE:
210
5921
      if (index < s3fanout_mgr->watch_fds_inuse_-1)
211
        s3fanout_mgr->watch_fds_[index] =
212
5390
            s3fanout_mgr->watch_fds_[s3fanout_mgr->watch_fds_inuse_-1];
213
5921
      s3fanout_mgr->watch_fds_inuse_--;
214
      // Shrink array if necessary
215

5921
      if ((s3fanout_mgr->watch_fds_inuse_ > s3fanout_mgr->watch_fds_max_) &&
216
          (s3fanout_mgr->watch_fds_inuse_ < s3fanout_mgr->watch_fds_size_/2)) {
217
        s3fanout_mgr->watch_fds_size_ /= 2;
218
        s3fanout_mgr->watch_fds_ = static_cast<struct pollfd *>(
219
            srealloc(s3fanout_mgr->watch_fds_,
220
                     s3fanout_mgr->watch_fds_size_*sizeof(struct pollfd)));
221
      }
222
      break;
223
    default:
224
      break;
225
  }
226
227
13408
  return 0;
228
}
229
230
231
/**
232
 * Worker thread event loop.
233
 */
234
99
void *S3FanoutManager::MainUpload(void *data) {
235
99
  LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread started");
236
99
  S3FanoutManager *s3fanout_mgr = static_cast<S3FanoutManager *>(data);
237
238
  // Don't schedule more jobs into the multi handle than the maximum number of
239
  // parallel connections.  This should prevent starvation and thus a timeout
240
  // of the authorization header (CVM-1339).
241
99
  unsigned jobs_in_flight = 0;
242
243
196024
  while (s3fanout_mgr->thread_upload_run_) {
244
195826
    JobInfo *info = NULL;
245
195826
    pthread_mutex_lock(s3fanout_mgr->jobs_todo_lock_);
246

195826
    if (!s3fanout_mgr->jobs_todo_.empty() &&
247
        (jobs_in_flight < s3fanout_mgr->pool_max_handles_))
248
    {
249
2966
      info = s3fanout_mgr->jobs_todo_.back();
250
2966
      s3fanout_mgr->jobs_todo_.pop_back();
251
    }
252
195826
    pthread_mutex_unlock(s3fanout_mgr->jobs_todo_lock_);
253
254
195826
    if (info != NULL) {
255
2966
      CURL *handle = s3fanout_mgr->AcquireCurlHandle();
256
2966
      if (handle == NULL) {
257
        LogCvmfs(kLogS3Fanout, kLogStderr, "Failed to acquire CURL handle.");
258
        assert(handle != NULL);
259
      }
260
261
      s3fanout::Failures init_failure =
262
2966
        s3fanout_mgr->InitializeRequest(info, handle);
263
2966
      if (init_failure != s3fanout::kFailOk) {
264
        LogCvmfs(kLogS3Fanout, kLogStderr,
265
                "Failed to initialize CURL handle (error: %d - %s | errno: %d)",
266
                 init_failure, Code2Ascii(init_failure), errno);
267
        abort();
268
      }
269
2966
      s3fanout_mgr->SetUrlOptions(info);
270
271
2966
      curl_multi_add_handle(s3fanout_mgr->curl_multi_, handle);
272
2966
      jobs_in_flight++;
273
2966
      int still_running = 0, retval = 0;
274
      retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
275
                                        CURL_SOCKET_TIMEOUT,
276
                                        0,
277
2966
                                        &still_running);
278
279
      LogCvmfs(kLogS3Fanout, kLogDebug,
280
               "curl_multi_socket_action: %d - %d",
281
2966
               retval, still_running);
282
    }
283
284
    // Check events with 1ms timeout
285
195826
    int timeout = 1;
286
    int retval = poll(s3fanout_mgr->watch_fds_, s3fanout_mgr->watch_fds_inuse_,
287
195826
                      timeout);
288
195826
    if (retval == 0) {
289
      // Handle timeout
290
123279
      int still_running = 0;
291
      retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
292
                                        CURL_SOCKET_TIMEOUT,
293
                                        0,
294
123279
                                        &still_running);
295
123279
      if (retval != CURLM_OK) {
296
        LogCvmfs(kLogS3Fanout, kLogStderr, "Error, timeout due to: %d", retval);
297
        assert(retval == CURLM_OK);
298
      }
299
72547
    } else if (retval < 0) {
300
      assert(errno == EINTR);
301
      continue;
302
    }
303
304
    // Activity on curl sockets
305
1274900
    for (unsigned i = 0; i < s3fanout_mgr->watch_fds_inuse_; ++i) {
306
1079074
      if (s3fanout_mgr->watch_fds_[i].revents) {
307
98183
        int ev_bitmask = 0;
308
98183
        if (s3fanout_mgr->watch_fds_[i].revents & (POLLIN | POLLPRI))
309
8750
          ev_bitmask |= CURL_CSELECT_IN;
310
98183
        if (s3fanout_mgr->watch_fds_[i].revents & (POLLOUT | POLLWRBAND))
311
89433
          ev_bitmask |= CURL_CSELECT_OUT;
312
98183
        if (s3fanout_mgr->watch_fds_[i].revents &
313
            (POLLERR | POLLHUP | POLLNVAL))
314
4
          ev_bitmask |= CURL_CSELECT_ERR;
315
98183
        s3fanout_mgr->watch_fds_[i].revents = 0;
316
317
98183
        int still_running = 0;
318
        retval = curl_multi_socket_action(s3fanout_mgr->curl_multi_,
319
98183
                                          s3fanout_mgr->watch_fds_[i].fd,
320
                                          ev_bitmask,
321
98183
                                          &still_running);
322
      }
323
    }
324
325
    // Check if transfers are completed
326
    CURLMsg *curl_msg;
327
    int msgs_in_queue;
328
397631
    while ((curl_msg = curl_multi_info_read(s3fanout_mgr->curl_multi_,
329
                                            &msgs_in_queue))) {
330
5979
      if (curl_msg->msg == CURLMSG_DONE) {
331
5979
        s3fanout_mgr->statistics_->num_requests++;
332
        JobInfo *info;
333
5979
        CURL *easy_handle = curl_msg->easy_handle;
334
5979
        int curl_error = curl_msg->data.result;
335
5979
        curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &info);
336
337
5979
        curl_multi_remove_handle(s3fanout_mgr->curl_multi_, easy_handle);
338
5979
        if (s3fanout_mgr->VerifyAndFinalize(curl_error, info)) {
339
3013
          curl_multi_add_handle(s3fanout_mgr->curl_multi_, easy_handle);
340
3013
          int still_running = 0;
341
          curl_multi_socket_action(s3fanout_mgr->curl_multi_,
342
                                   CURL_SOCKET_TIMEOUT,
343
                                   0,
344
3013
                                   &still_running);
345
        } else {
346
          // Return easy handle into pool and write result back
347
2966
          jobs_in_flight--;
348
2966
          s3fanout_mgr->ReleaseCurlHandle(info, easy_handle);
349
2966
          s3fanout_mgr->available_jobs_->Decrement();
350
351
2966
          pthread_mutex_lock(s3fanout_mgr->jobs_completed_lock_);
352
2966
          s3fanout_mgr->jobs_completed_.push_back(info);
353
2966
          pthread_mutex_unlock(s3fanout_mgr->jobs_completed_lock_);
354
        }
355
      }
356
    }
357
  }
358
359
99
  set<CURL *>::iterator i = s3fanout_mgr->pool_handles_inuse_->begin();
360
  const set<CURL *>::const_iterator i_end =
361
99
    s3fanout_mgr->pool_handles_inuse_->end();
362
99
  for (; i != i_end; ++i) {
363
    curl_multi_remove_handle(s3fanout_mgr->curl_multi_, *i);
364
    curl_easy_cleanup(*i);
365
  }
366
99
  s3fanout_mgr->pool_handles_inuse_->clear();
367
99
  free(s3fanout_mgr->watch_fds_);
368
369
99
  LogCvmfs(kLogS3Fanout, kLogDebug, "Upload I/O thread terminated");
370
99
  return NULL;
371
}
372
373
374
/**
375
 * Gets an idle CURL handle from the pool. Creates a new one and adds it to
376
 * the pool if necessary.
377
 */
378
3095
CURL *S3FanoutManager::AcquireCurlHandle() const {
379
  CURL *handle;
380
381
3095
  MutexLockGuard guard(curl_handle_lock_);
382
383
3095
  if (pool_handles_idle_->empty()) {
384
    CURLcode retval;
385
386
    // Create a new handle
387
216
    handle = curl_easy_init();
388
216
    assert(handle != NULL);
389
390
    // Other settings
391
216
    retval = curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
392
216
    assert(retval == CURLE_OK);
393
    retval = curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION,
394
216
                              CallbackCurlHeader);
395
216
    assert(retval == CURLE_OK);
396
216
    retval = curl_easy_setopt(handle, CURLOPT_READFUNCTION, CallbackCurlData);
397
216
    assert(retval == CURLE_OK);
398
  } else {
399
2879
    handle = *(pool_handles_idle_->begin());
400
2879
    pool_handles_idle_->erase(pool_handles_idle_->begin());
401
  }
402
403
3095
  pool_handles_inuse_->insert(handle);
404
405
3095
  return handle;
406
}
407
408
409
3095
/**
 * Takes a CURL handle out of pool_handles_inuse_ once its job is finished.
 *
 * The job's HTTP header list is freed first.  The handle is then put back
 * into the idle pool unless the idle pool already holds more than
 * pool_max_handles_ handles; in that case the handle is detached from its
 * share handle, cleaned up and removed from curl_sharehandles_.  The pool is
 * accessed under curl_handle_lock_.
 *
 * @param info    the finished job; its http_headers are reset to NULL
 * @param handle  a handle that must currently be registered as in use
 */
void S3FanoutManager::ReleaseCurlHandle(JobInfo *info, CURL *handle) const {
  if (info->http_headers != NULL) {
    curl_slist_free_all(info->http_headers);
    info->http_headers = NULL;
  }

  MutexLockGuard guard(curl_handle_lock_);

  set<CURL *>::iterator elem = pool_handles_inuse_->find(handle);
  assert(elem != pool_handles_inuse_->end());

  const bool keep_handle = !(pool_handles_idle_->size() > pool_max_handles_);
  if (keep_handle) {
    pool_handles_idle_->insert(handle);
  } else {
    // Surplus handle: dispose of it together with its DNS bookkeeping
    CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, NULL);
    assert(retval == CURLE_OK);
    curl_easy_cleanup(handle);
    std::map<CURL *, S3FanOutDnsEntry *>::size_type retitems =
        curl_sharehandles_->erase(handle);
    assert(retitems == 1);
  }

  pool_handles_inuse_->erase(elem);
}
433
434
435
/**
436
 * The Amazon AWS 2 authorization header according to
437
 * http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html#ConstructingTheAuthenticationHeader
438
 */
439
6056
bool S3FanoutManager::MkV2Authz(const JobInfo &info, vector<string> *headers)
440
  const
441
{
442
6056
  string payload_hash;
443
6056
  bool retval = MkPayloadHash(info, &payload_hash);
444
6056
  if (!retval)
445
    return false;
446
6056
  string content_type = GetContentType(info);
447
6056
  string request = GetRequestString(info);
448
449
6056
  string timestamp = RfcTimestamp();
450
  string to_sign = request + "\n" +
451
                   payload_hash + "\n" +
452
                   content_type + "\n" +
453
                   timestamp + "\n" +
454
                   "x-amz-acl:public-read" + "\n" +  // default ACL
455
6056
                   "/" + info.bucket + "/" + info.object_key;
456
  LogCvmfs(kLogS3Fanout, kLogDebug, "%s string to sign for: %s",
457
6056
           request.c_str(), info.object_key.c_str());
458
459
6056
  shash::Any hmac;
460
6056
  hmac.algorithm = shash::kSha1;
461
  shash::Hmac(info.secret_key,
462
              reinterpret_cast<const unsigned char *>(to_sign.data()),
463
6056
              to_sign.length(), &hmac);
464
465
  headers->push_back("Authorization: AWS " + info.access_key + ":" +
466
                     Base64(string(reinterpret_cast<char *>(hmac.digest),
467
6056
                                   hmac.GetDigestSize())));
468
6056
  headers->push_back("Date: " + timestamp);
469
6056
  headers->push_back("x-amz-acl: public-read");
470
6056
  if (!payload_hash.empty())
471
2961
    headers->push_back("Content-MD5: " + payload_hash);
472
6056
  if (!content_type.empty())
473
2961
    headers->push_back("Content-Type: " + content_type);
474
6056
  return true;
475
}
476
477
478
/**
 * Percent-encodes val for use in a canonical request.
 *
 * Letters, digits and the characters '_', '-', '~', '.' are copied verbatim.
 * A slash is copied verbatim unless encode_slash is set, in which case it
 * becomes "%2F".  Every other byte is written as '%' followed by two
 * uppercase hexadecimal digits.
 *
 * @param val           the string to encode
 * @param encode_slash  whether '/' is percent-encoded as well
 * @return the encoded string
 */
string S3FanoutManager::GetUriEncode(const string &val, bool encode_slash)
  const
{
  static const char kHexDigits[] = "0123456789ABCDEF";

  string result;
  const unsigned len = val.length();
  result.reserve(len);
  for (unsigned i = 0; i < len; ++i) {
    // Work on the unsigned byte value: with a signed char, bytes >= 0x80 are
    // negative and division/modulo would not yield valid hex digits
    const unsigned char c = static_cast<unsigned char>(val[i]);
    const bool is_unreserved =
      (c >= 'A' && c <= 'Z') ||
      (c >= 'a' && c <= 'z') ||
      (c >= '0' && c <= '9') ||
      c == '_' || c == '-' || c == '~' || c == '.';

    if (is_unreserved || ((c == '/') && !encode_slash)) {
      result.push_back(static_cast<char>(c));
    } else {
      result.push_back('%');
      result.push_back(kHexDigits[c >> 4]);
      result.push_back(kHexDigits[c & 0x0F]);
    }
  }
  return result;
}
506
507
508
/**
 * Returns the AWS4 signing key for the job's secret key, its region and the
 * given date.
 *
 * The key is the last link of an HMAC-SHA256 chain over the date, the
 * region, the literal "s3" and the literal "aws4_request", starting from
 * "AWS4" + secret key.  Results are memoized in signing_keys_ under the
 * concatenation of secret key, region and date.
 *
 * @param info  provides secret_key and region
 * @param date  the date component of the credential scope
 * @return the signing key
 */
string S3FanoutManager::GetAwsV4SigningKey(
  const JobInfo &info,
  const string &date) const
{
  const string id = info.secret_key + info.region + date;
  const map<string, string>::const_iterator cached = signing_keys_.find(id);
  if (cached != signing_keys_.end())
    return cached->second;

  // Each step keys the next HMAC with the previous result
  const string date_key =
    shash::Hmac256("AWS4" + info.secret_key, date, true);
  const string region_key = shash::Hmac256(date_key, info.region, true);
  const string service_key = shash::Hmac256(region_key, "s3", true);
  const string signing_key = shash::Hmac256(service_key, "aws4_request", true);

  signing_keys_[id] = signing_key;
  return signing_key;
}
525
526
527
/**
528
 * The Amazon AWS4 authorization header according to
529
 * http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-auth-using-authorization-header.html
530
 */
531
bool S3FanoutManager::MkV4Authz(const JobInfo &info, vector<string> *headers)
532
  const
533
{
534
  string payload_hash;
535
  bool retval = MkPayloadHash(info, &payload_hash);
536
  if (!retval)
537
    return false;
538
  string content_type = GetContentType(info);
539
  string timestamp = IsoTimestamp();
540
  string date = timestamp.substr(0, 8);
541
  vector<string> tokens = SplitString(info.hostname, ':');
542
  string host_only = tokens[0];
543
544
  string signed_headers;
545
  string canonical_headers;
546
  if (!content_type.empty()) {
547
    signed_headers += "content-type;";
548
    headers->push_back("Content-Type: " + content_type);
549
    canonical_headers += "content-type:" + content_type + "\n";
550
  }
551
  signed_headers += "host;x-amz-acl;x-amz-content-sha256;x-amz-date";
552
  canonical_headers +=
553
    "host:" + host_only + "\n" +
554
    "x-amz-acl:public-read\n"
555
    "x-amz-content-sha256:" + payload_hash + "\n" +
556
    "x-amz-date:" + timestamp + "\n";
557
558
  string scope = date + "/" + info.region + "/s3/aws4_request";
559
560
  string canonical_request =
561
    GetRequestString(info) + "\n" +
562
    GetUriEncode("/" + info.bucket + "/" + info.object_key, false) + "\n" +
563
    "\n" +
564
    canonical_headers + "\n" +
565
    signed_headers + "\n" +
566
    payload_hash;
567
568
  string hash_request = shash::Sha256String(canonical_request.c_str());
569
570
  string string_to_sign =
571
    "AWS4-HMAC-SHA256\n" +
572
    timestamp + "\n" +
573
    scope + "\n" +
574
    hash_request;
575
576
  string signing_key = GetAwsV4SigningKey(info, date);
577
  string signature = shash::Hmac256(signing_key, string_to_sign);
578
579
  headers->push_back("x-amz-acl: public-read");
580
  headers->push_back("x-amz-content-sha256: " + payload_hash);
581
  headers->push_back("x-amz-date: " + timestamp);
582
  headers->push_back(
583
    "Authorization: AWS4-HMAC-SHA256 "
584
    "Credential=" + info.access_key + "/" + scope + ","
585
    "SignedHeaders=" + signed_headers + ","
586
    "Signature=" + signature);
587
  return true;
588
}
589
590
591
6056
void S3FanoutManager::InitializeDnsSettingsCurl(
592
  CURL *handle,
593
  CURLSH *sharehandle,
594
  curl_slist *clist) const
595
{
596
6056
  CURLcode retval = curl_easy_setopt(handle, CURLOPT_SHARE, sharehandle);
597
6056
  assert(retval == CURLE_OK);
598
6056
  retval = curl_easy_setopt(handle, CURLOPT_RESOLVE, clist);
599
6056
  assert(retval == CURLE_OK);
600
6056
}
601
602
603
6056
int S3FanoutManager::InitializeDnsSettings(
604
  CURL *handle,
605
  std::string host_with_port) const
606
{
607
  // Use existing handle
608
  std::map<CURL *, S3FanOutDnsEntry *>::const_iterator it =
609
6056
      curl_sharehandles_->find(handle);
610
6056
  if (it != curl_sharehandles_->end()) {
611
    InitializeDnsSettingsCurl(handle, it->second->sharehandle,
612
5840
                              it->second->clist);
613
5840
    return 0;
614
  }
615
616
  // Remove port number if such exists
617

216
  if (!HasPrefix(host_with_port, "http://", false /*ignore_case*/))
618
216
    host_with_port = "http://" + host_with_port;
619
216
  std::string remote_host = dns::ExtractHost(host_with_port);
620
216
  std::string remote_port = dns::ExtractPort(host_with_port);
621
622
  // If we have the name already resolved, use the least used IP
623
216
  S3FanOutDnsEntry *useme = NULL;
624
216
  unsigned int usemin = UINT_MAX;
625
216
  std::set<S3FanOutDnsEntry *>::iterator its3 = sharehandles_->begin();
626
333
  for (; its3 != sharehandles_->end(); ++its3) {
627
117
    if ((*its3)->dns_name == remote_host) {
628
117
      if (usemin >= (*its3)->counter) {
629
117
        usemin = (*its3)->counter;
630
117
        useme = (*its3);
631
      }
632
    }
633
  }
634
216
  if (useme != NULL) {
635
    curl_sharehandles_->insert(std::pair<CURL *,
636
117
                               S3FanOutDnsEntry *>(handle, useme));
637
117
    useme->counter++;
638
117
    InitializeDnsSettingsCurl(handle, useme->sharehandle, useme->clist);
639
117
    return 0;
640
  }
641
642
  // We need to resolve the hostname
643
  // TODO(ssheikki): support ipv6 also...  if (opt_ipv4_only_)
644
99
  dns::Host host = resolver_->Resolve(remote_host);
645
99
  set<string> ipv4_addresses = host.ipv4_addresses();
646
99
  std::set<string>::iterator its = ipv4_addresses.begin();
647
99
  S3FanOutDnsEntry *dnse = NULL;
648
198
  for ( ; its != ipv4_addresses.end(); ++its) {
649
99
    dnse = new S3FanOutDnsEntry();
650
99
    dnse->counter = 0;
651
99
    dnse->dns_name = remote_host;
652

99
    dnse->port = remote_port.size() == 0 ? "80" : remote_port;
653
99
    dnse->ip = *its;
654
99
    dnse->clist = NULL;
655
    dnse->clist = curl_slist_append(dnse->clist,
656
                                    (dnse->dns_name+":"+
657
                                     dnse->port+":"+
658
99
                                     dnse->ip).c_str());
659
99
    dnse->sharehandle = curl_share_init();
660
99
    assert(dnse->sharehandle != NULL);
661
99
    CURLSHcode share_retval = curl_share_setopt(dnse->sharehandle,
662
                                                CURLSHOPT_SHARE,
663
                                                CURL_LOCK_DATA_DNS);
664
99
    assert(share_retval == CURLSHE_OK);
665
99
    sharehandles_->insert(dnse);
666
  }
667
99
  if (dnse == NULL) {
668
    LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr,
669
             "Error: DNS resolve failed for address '%s'.",
670
             remote_host.c_str());
671
    assert(dnse != NULL);
672
    return -1;
673
  }
674
  curl_sharehandles_->insert(
675
99
    std::pair<CURL *, S3FanOutDnsEntry *>(handle, dnse));
676
99
  dnse->counter++;
677
99
  InitializeDnsSettingsCurl(handle, dnse->sharehandle, dnse->clist);
678
679
99
  return 0;
680
}
681
682
683
6056
bool S3FanoutManager::MkPayloadHash(const JobInfo &info, string *hex_hash)
684
  const
685
{
686

6056
  if ((info.request == JobInfo::kReqHead) ||
687
      (info.request == JobInfo::kReqDelete))
688
  {
689
3095
    switch (info.authz_method) {
690
      case kAuthzAwsV2:
691
3095
        hex_hash->clear();
692
3095
        break;
693
      case kAuthzAwsV4:
694
        // Sha256 over empty string
695
        *hex_hash =
696
          "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
697
        break;
698
      default:
699
        abort();
700
    }
701
3095
    return true;
702
  }
703
704
  // PUT, there is actually payload
705
2961
  shash::Any payload_hash(shash::kMd5);
706
  bool retval;
707
708
2961
  switch (info.origin) {
709
    case kOriginMem:
710
907
      switch (info.authz_method) {
711
        case kAuthzAwsV2:
712
          shash::HashMem(info.origin_mem.data, info.origin_mem.size,
713
907
                         &payload_hash);
714
          *hex_hash =
715
            Base64(string(reinterpret_cast<char *>(payload_hash.digest),
716
907
                          payload_hash.GetDigestSize()));
717
907
          return true;
718
        case kAuthzAwsV4:
719
          *hex_hash =
720
            shash::Sha256Mem(info.origin_mem.data, info.origin_mem.size);
721
          return true;
722
        default:
723
          abort();
724
      }
725
    case kOriginPath:
726
2054
      switch (info.authz_method) {
727
        case kAuthzAwsV2:
728
2054
          retval = shash::HashFile(info.origin_path, &payload_hash);
729
2054
          if (!retval) {
730
            LogCvmfs(kLogS3Fanout, kLogStderr,
731
                     "failed to hash file %s (errno: %d)",
732
                     info.origin_path.c_str(), errno);
733
            return false;
734
          }
735
          *hex_hash =
736
            Base64(string(reinterpret_cast<char *>(payload_hash.digest),
737
2054
                          payload_hash.GetDigestSize()));
738
2054
          return true;
739
        case kAuthzAwsV4:
740
          *hex_hash = shash::Sha256File(info.origin_path);
741
          if (hex_hash->empty()) {
742
            LogCvmfs(kLogS3Fanout, kLogStderr,
743
                     "failed to hash file %s (errno: %d)",
744
                     info.origin_path.c_str(), errno);
745
            return false;
746
          }
747
          return true;
748
        default:
749
          abort();
750
      }
751
    default:
752
      abort();
753
  }
754
}
755
756
757
6056
bool S3FanoutManager::MkPayloadSize(const JobInfo &info, uint64_t *size) const {
758

6056
  if ((info.request == JobInfo::kReqHead) ||
759
      (info.request == JobInfo::kReqDelete))
760
  {
761
3095
    *size = 0;
762
3095
    return true;
763
  }
764
765
  int64_t file_size;
766
2961
  switch (info.origin) {
767
    case kOriginMem:
768
907
      *size = info.origin_mem.size;
769
907
      return true;
770
    case kOriginPath:
771
2054
      file_size = GetFileSize(info.origin_path);
772
2054
      if (file_size < 0) {
773
        LogCvmfs(kLogS3Fanout, kLogStderr, "failed to stat file %s (errno: %d)",
774
                 info.origin_path.c_str(), errno);
775
        return false;
776
      }
777
2054
      *size = file_size;
778
2054
      return true;
779
    default:
780
      abort();
781
  }
782
}
783
784
785
6061
/**
 * Maps the request type of a job to the corresponding HTTP verb.  Both PUT
 * flavors (CAS objects and .cvmfs* files) map to "PUT".  An unknown request
 * type aborts the process.
 */
string S3FanoutManager::GetRequestString(const JobInfo &info) const {
  if (info.request == JobInfo::kReqHead)
    return "HEAD";

  if ((info.request == JobInfo::kReqPutCas) ||
      (info.request == JobInfo::kReqPutDotCvmfs))
  {
    return "PUT";
  }

  if (info.request == JobInfo::kReqDelete)
    return "DELETE";

  abort();
}
799
800
801
6056
/**
 * Returns the content type string that belongs to the request type of a job.
 * HEAD and DELETE have no body and yield the empty string.  An unknown request
 * type aborts the process.
 */
string S3FanoutManager::GetContentType(const JobInfo &info) const {
  const bool without_body = (info.request == JobInfo::kReqHead) ||
                            (info.request == JobInfo::kReqDelete);
  if (without_body)
    return "";

  if (info.request == JobInfo::kReqPutCas)
    return "application/octet-stream";

  if (info.request == JobInfo::kReqPutDotCvmfs)
    return "application/x-cvmfs";

  abort();
}
815
816
817
/**
818
 * Request parameters set the URL and other options such as timeout and
819
 * proxy.
820
 */
821
6056
Failures S3FanoutManager::InitializeRequest(JobInfo *info, CURL *handle) const {
822
  // Initialize internal download state
823
6056
  info->curl_handle = handle;
824
6056
  info->error_code = kFailOk;
825
6056
  info->http_error = 0;
826
6056
  info->num_retries = 0;
827
6056
  info->backoff_ms = 0;
828
6056
  info->throttle_ms = 0;
829
6056
  info->throttle_timestamp = 0;
830
6056
  info->http_headers = NULL;
831
832
6056
  InitializeDnsSettings(handle, info->hostname);
833
834
  bool retval_b;
835
  uint64_t payload_size;
836
6056
  retval_b = MkPayloadSize(*info, &payload_size);
837
6056
  if (!retval_b)
838
    return kFailLocalIO;
839
840
  CURLcode retval;
841

9151
  if (info->request == JobInfo::kReqHead ||
842
      info->request == JobInfo::kReqDelete)
843
  {
844
3095
    retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 0);
845
3095
    assert(retval == CURLE_OK);
846
3095
    retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 1);
847
3095
    assert(retval == CURLE_OK);
848
    info->http_headers =
849
3095
      curl_slist_append(info->http_headers, "Content-Length: 0");
850
851
3095
    if (info->request == JobInfo::kReqDelete) {
852
      retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST,
853
5
                                GetRequestString(*info).c_str());
854
5
      assert(retval == CURLE_OK);
855
    } else {
856
3090
      retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
857
3090
      assert(retval == CURLE_OK);
858
    }
859
  } else {
860
2961
    retval = curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, NULL);
861
2961
    assert(retval == CURLE_OK);
862
2961
    retval = curl_easy_setopt(handle, CURLOPT_UPLOAD, 1);
863
2961
    assert(retval == CURLE_OK);
864
2961
    retval = curl_easy_setopt(handle, CURLOPT_NOBODY, 0);
865
2961
    assert(retval == CURLE_OK);
866
    retval = curl_easy_setopt(handle, CURLOPT_INFILESIZE_LARGE,
867
2961
                              static_cast<curl_off_t>(payload_size));
868
2961
    assert(retval == CURLE_OK);
869
2961
    if (info->origin == kOriginPath) {
870
2054
      assert(info->origin_file == NULL);
871
2054
      info->origin_file = fopen(info->origin_path.c_str(), "r");
872
2054
      if (info->origin_file == NULL) {
873
        LogCvmfs(kLogS3Fanout, kLogStderr, "failed to open file %s (errno: %d)",
874
                 info->origin_path.c_str(), errno);
875
        return kFailLocalIO;
876
      }
877
    }
878
879
2961
    if (info->request == JobInfo::kReqPutDotCvmfs) {
880
      info->http_headers =
881
          curl_slist_append(info->http_headers, kCacheControlDotCvmfs);
882
    } else {
883
      info->http_headers =
884
2961
          curl_slist_append(info->http_headers, kCacheControlCas);
885
    }
886
  }
887
888
  // Authorization
889
6056
  vector<string> authz_headers;
890
6056
  switch (info->authz_method) {
891
    case kAuthzAwsV2:
892
6056
      retval_b = MkV2Authz(*info, &authz_headers);
893
6056
      break;
894
    case kAuthzAwsV4:
895
      retval_b = MkV4Authz(*info, &authz_headers);
896
      break;
897
    default:
898
      abort();
899
  }
900
6056
  if (!retval_b)
901
    return kFailLocalIO;
902
30146
  for (unsigned i = 0; i < authz_headers.size(); ++i) {
903
    info->http_headers =
904
24090
      curl_slist_append(info->http_headers, authz_headers[i].c_str());
905
  }
906
907
  // Common headers
908
  info->http_headers =
909
6056
      curl_slist_append(info->http_headers, "Connection: Keep-Alive");
910
6056
  info->http_headers = curl_slist_append(info->http_headers, "Pragma:");
911
  // No 100-continue
912
6056
  info->http_headers = curl_slist_append(info->http_headers, "Expect:");
913
  // Strip unnecessary header
914
6056
  info->http_headers = curl_slist_append(info->http_headers, "Accept:");
915
  info->http_headers = curl_slist_append(info->http_headers,
916
6056
                                         user_agent_->c_str());
917
918
  // Set curl parameters
919
6056
  retval = curl_easy_setopt(handle, CURLOPT_PRIVATE, static_cast<void *>(info));
920
6056
  assert(retval == CURLE_OK);
921
  retval = curl_easy_setopt(handle, CURLOPT_WRITEHEADER,
922
6056
                            static_cast<void *>(info));
923
6056
  assert(retval == CURLE_OK);
924
  retval = curl_easy_setopt(handle, CURLOPT_READDATA,
925
6056
                            static_cast<void *>(info));
926
6056
  assert(retval == CURLE_OK);
927
6056
  retval = curl_easy_setopt(handle, CURLOPT_HTTPHEADER, info->http_headers);
928
6056
  assert(retval == CURLE_OK);
929
6056
  if (opt_ipv4_only_) {
930
    retval = curl_easy_setopt(handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
931
    assert(retval == CURLE_OK);
932
  }
933
  // Follow HTTP redirects
934
6056
  retval = curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
935
6056
  assert(retval == CURLE_OK);
936
937
6056
  return kFailOk;
938
}
939
940
941
/**
942
 * Sets the URL specific options such as host to use and timeout.
943
 */
944
6056
void S3FanoutManager::SetUrlOptions(JobInfo *info) const {
945
6056
  CURL *curl_handle = info->curl_handle;
946
  CURLcode retval;
947
948
6056
  pthread_mutex_lock(lock_options_);
949
6056
  retval = curl_easy_setopt(curl_handle, CURLOPT_CONNECTTIMEOUT, opt_timeout_);
950
6056
  assert(retval == CURLE_OK);
951
6056
  pthread_mutex_unlock(lock_options_);
952
953
6056
  string url = MkUrl(info->hostname, info->bucket, (info->object_key));
954
6056
  retval = curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
955
6056
  assert(retval == CURLE_OK);
956
6056
}
957
958
959
/**
960
 * Adds transfer time and uploaded bytes to the global counters.
961
 */
962
5979
void S3FanoutManager::UpdateStatistics(CURL *handle) {
963
  double val;
964
965
5979
  if (curl_easy_getinfo(handle, CURLINFO_SIZE_UPLOAD, &val) == CURLE_OK)
966
5979
    statistics_->transferred_bytes += val;
967
5979
}
968
969
970
/**
971
 * Retry if possible and if not already done too often.
972
 */
973
52
bool S3FanoutManager::CanRetry(const JobInfo *info) {
974
52
  pthread_mutex_lock(lock_options_);
975
52
  unsigned max_retries = opt_max_retries_;
976
52
  pthread_mutex_unlock(lock_options_);
977
978
  return
979
      (info->error_code == kFailHostConnection ||
980
       info->error_code == kFailHostResolve ||
981
       info->error_code == kFailServiceUnavailable ||
982
       info->error_code == kFailRetry) &&
983


52
      (info->num_retries < max_retries);
984
}
985
986
987
/**
988
 * Backoff for retry to introduce a jitter into a upload sequence.
989
 *
990
 * \return true if backoff has been performed, false otherwise
991
 */
992
52
void S3FanoutManager::Backoff(JobInfo *info) {
993
52
  pthread_mutex_lock(lock_options_);
994
52
  unsigned backoff_init_ms = opt_backoff_init_ms_;
995
52
  unsigned backoff_max_ms = opt_backoff_max_ms_;
996
52
  pthread_mutex_unlock(lock_options_);
997
998
52
  if (info->error_code != kFailRetry)
999
4
    info->num_retries++;
1000
52
  statistics_->num_retries++;
1001
1002
52
  if (info->throttle_ms > 0) {
1003
    LogCvmfs(kLogS3Fanout, kLogDebug, "throttling for %d ms",
1004
48
             info->throttle_ms);
1005
48
    uint64_t now = platform_monotonic_time();
1006
48
    if ((info->throttle_timestamp + (info->throttle_ms / 1000)) > now) {
1007
48
      if ((now - timestamp_last_throttle_report_) > kThrottleReportIntervalSec)
1008
      {
1009
        LogCvmfs(kLogS3Fanout, kLogStdout,
1010
48
                 "Warning: S3 backend throttling (%ums)", info->throttle_ms);
1011
      }
1012
48
      statistics_->ms_throttled += info->throttle_ms;
1013
48
      SafeSleepMs(info->throttle_ms);
1014
    }
1015
  } else {
1016
4
    if (info->backoff_ms == 0) {
1017
4
      info->backoff_ms = prng_.Next(backoff_init_ms + 1);  // Must be != 0
1018
    } else {
1019
      info->backoff_ms *= 2;
1020
    }
1021
4
    if (info->backoff_ms > backoff_max_ms)
1022
      info->backoff_ms = backoff_max_ms;
1023
1024
    LogCvmfs(kLogS3Fanout, kLogDebug, "backing off for %d ms",
1025
4
             info->backoff_ms);
1026
4
    SafeSleepMs(info->backoff_ms);
1027
  }
1028
52
}
1029
1030
1031
/**
1032
 * Checks the result of a curl request and implements the failure logic
1033
 * and takes care of cleanup.
1034
 *
1035
 * @return true if request should be repeated, false otherwise
1036
 */
1037
5979
bool S3FanoutManager::VerifyAndFinalize(const int curl_error, JobInfo *info) {
1038
  LogCvmfs(kLogS3Fanout, kLogDebug, "Verify uploaded/tested object %s "
1039
           "(curl error %d, info error %d, info request %d)",
1040
           info->object_key.c_str(),
1041
5979
           curl_error, info->error_code, info->request);
1042
5979
  UpdateStatistics(info->curl_handle);
1043
1044
  // Verification and error classification
1045

5979
  switch (curl_error) {
1046
    case CURLE_OK:
1047
3014
      if (info->error_code != kFailRetry)
1048
2966
        info->error_code = kFailOk;
1049
3014
      break;
1050
    case CURLE_UNSUPPORTED_PROTOCOL:
1051
    case CURLE_URL_MALFORMAT:
1052
      info->error_code = kFailBadRequest;
1053
      break;
1054
    case CURLE_COULDNT_RESOLVE_HOST:
1055
      info->error_code = kFailHostResolve;
1056
      break;
1057
    case CURLE_COULDNT_CONNECT:
1058
    case CURLE_OPERATION_TIMEDOUT:
1059
    case CURLE_SEND_ERROR:
1060
    case CURLE_RECV_ERROR:
1061
4
      info->error_code = kFailHostConnection;
1062
4
      break;
1063
    case CURLE_ABORTED_BY_CALLBACK:
1064
    case CURLE_WRITE_ERROR:
1065
      // Error set by callback
1066
2961
      break;
1067
    default:
1068
      LogCvmfs(kLogS3Fanout, kLogStderr | kLogSyslogErr,
1069
               "unexpected curl error (%d) while trying to upload %s",
1070
               curl_error, info->object_key.c_str());
1071
      info->error_code = kFailOther;
1072
      break;
1073
  }
1074
1075
  // Transform HEAD to PUT request
1076

5979
  if ((info->error_code == kFailNotFound) &&
1077
      (info->request == JobInfo::kReqHead)) {
1078
    LogCvmfs(kLogS3Fanout, kLogDebug, "not found: %s, uploading",
1079
2961
             info->object_key.c_str());
1080
2961
    info->request = JobInfo::kReqPutCas;
1081
2961
    curl_slist_free_all(info->http_headers);
1082
2961
    info->http_headers = NULL;
1083
    s3fanout::Failures init_failure = InitializeRequest(info,
1084
2961
                                                        info->curl_handle);
1085
1086
2961
    if (init_failure != s3fanout::kFailOk) {
1087
      LogCvmfs(kLogS3Fanout, kLogStderr, "Failed to initialize CURL handle "
1088
                                         "(error: %d - %s | errno: %d)",
1089
               init_failure, Code2Ascii(init_failure), errno);
1090
      abort();
1091
    }
1092
2961
    SetUrlOptions(info);
1093
    // Reset origin
1094
2961
    if (info->origin == kOriginMem)
1095
907
      info->origin_mem.pos = 0;
1096
2961
    if (info->origin == kOriginPath)
1097
2054
      rewind(info->origin_file);
1098
2961
    return true;  // Again, Put
1099
  }
1100
1101
  // Determination if failed request should be repeated
1102
3018
  bool try_again = false;
1103
3018
  if (info->error_code != kFailOk) {
1104
52
    try_again = CanRetry(info);
1105
  }
1106
3018
  if (try_again) {
1107

52
    if (info->request == JobInfo::kReqPutCas ||
1108
        info->request == JobInfo::kReqPutDotCvmfs) {
1109
      LogCvmfs(kLogS3Fanout, kLogDebug, "Trying again to upload %s",
1110
               info->object_key.c_str());
1111
      // Reset origin
1112
      if (info->origin == kOriginMem)
1113
        info->origin_mem.pos = 0;
1114
      if (info->origin == kOriginPath) {
1115
        assert(info->origin_file != NULL);
1116
        rewind(info->origin_file);
1117
      }
1118
    }
1119
52
    Backoff(info);
1120
52
    return true;  // try again
1121
  }
1122
1123
  // Cleanup opened resources
1124
2966
  if (info->origin == kOriginPath) {
1125
2054
    assert(info->mmf == NULL);
1126
2054
    if (info->origin_file != NULL) {
1127
2054
      if (fclose(info->origin_file) != 0)
1128
        info->error_code = kFailLocalIO;
1129
2054
      info->origin_file = NULL;
1130
    }
1131
912
  } else if (info->origin == kOriginMem) {
1132
912
    assert(info->origin_file == NULL);
1133
912
    if (info->mmf != NULL) {
1134
907
      info->mmf->Unmap();
1135
907
      delete info->mmf;
1136
907
      info->mmf = NULL;
1137
    }
1138
  }
1139
1140

2966
  if ((info->error_code != kFailOk) &&
1141
      (info->http_error != 0) && (info->http_error != 404))
1142
  {
1143
    LogCvmfs(kLogS3Fanout, kLogStderr, "S3: HTTP failure %d", info->http_error);
1144
  }
1145
2966
  return false;  // stop transfer
1146
}
1147
1148
99
/**
 * Puts all members into a defined, empty state and creates the mutexes.  The
 * manager only becomes usable after Init().
 */
S3FanoutManager::S3FanoutManager() {
  // Curl handles and handle pools
  curl_multi_ = NULL;
  pool_handles_idle_ = NULL;
  pool_handles_inuse_ = NULL;
  pool_max_handles_ = 0;
  curl_sharehandles_ = NULL;
  sharehandles_ = NULL;
  user_agent_ = NULL;

  dns_buckets_ = true;

  // File descriptors watched for the transfers
  atomic_init32(&multi_threaded_);
  watch_fds_ = NULL;
  watch_fds_size_ = 0;
  watch_fds_inuse_ = 0;
  watch_fds_max_ = 0;

  // Locks
  int rc;
  lock_options_ =
      reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
  rc = pthread_mutex_init(lock_options_, NULL);
  assert(rc == 0);
  jobs_completed_lock_ =
      reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
  rc = pthread_mutex_init(jobs_completed_lock_, NULL);
  assert(rc == 0);
  jobs_todo_lock_ =
      reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
  rc = pthread_mutex_init(jobs_todo_lock_, NULL);
  assert(rc == 0);
  curl_handle_lock_ =
      reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
  rc = pthread_mutex_init(curl_handle_lock_, NULL);
  assert(rc == 0);

  // Options
  opt_timeout_ = 0;
  opt_max_retries_ = 0;
  opt_backoff_init_ms_ = 0;
  opt_backoff_max_ms_ = 0;
  opt_ipv4_only_ = false;

  // Job bookkeeping and upload thread
  max_available_jobs_ = 0;
  available_jobs_ = NULL;
  thread_upload_ = 0;
  thread_upload_run_ = false;
  resolver_ = NULL;
  statistics_ = NULL;
  timestamp_last_throttle_report_ = 0;
}
1196
1197
1198
99
/**
 * Destroys the mutexes created by the constructor and frees their memory.
 */
S3FanoutManager::~S3FanoutManager() {
  pthread_mutex_t *locks[] = {
    lock_options_,
    jobs_completed_lock_,
    jobs_todo_lock_,
    curl_handle_lock_
  };
  const unsigned num_locks = sizeof(locks) / sizeof(locks[0]);
  for (unsigned i = 0; i < num_locks; ++i) {
    pthread_mutex_destroy(locks[i]);
    free(locks[i]);
  }
}
1208
1209
99
void S3FanoutManager::Init(const unsigned int max_pool_handles,
1210
                           bool dns_buckets) {
1211
99
  atomic_init32(&multi_threaded_);
1212
99
  CURLcode retval = curl_global_init(CURL_GLOBAL_ALL);
1213
99
  assert(retval == CURLE_OK);
1214
99
  pool_handles_idle_ = new set<CURL *>;
1215
99
  pool_handles_inuse_ = new set<CURL *>;
1216
99
  curl_sharehandles_ = new map<CURL *, S3FanOutDnsEntry *>;
1217
99
  sharehandles_ = new set<S3FanOutDnsEntry *>;
1218
99
  pool_max_handles_ = max_pool_handles;
1219
99
  watch_fds_max_ = 4 * pool_max_handles_;
1220
1221
99
  max_available_jobs_ = 4 * pool_max_handles_;
1222
99
  available_jobs_ = new Semaphore(max_available_jobs_);
1223
99
  assert(NULL != available_jobs_);
1224
1225
99
  opt_timeout_ = 20;
1226
99
  statistics_ = new Statistics();
1227
99
  user_agent_ = new string();
1228
99
  *user_agent_ = "User-Agent: cvmfs " + string(VERSION);
1229
1230
99
  dns_buckets_ = dns_buckets;
1231
1232
99
  curl_multi_ = curl_multi_init();
1233
99
  assert(curl_multi_ != NULL);
1234
  CURLMcode mretval;
1235
  mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETFUNCTION,
1236
99
                              CallbackCurlSocket);
1237
99
  assert(mretval == CURLM_OK);
1238
  mretval = curl_multi_setopt(curl_multi_, CURLMOPT_SOCKETDATA,
1239
99
                              static_cast<void *>(this));
1240
99
  assert(mretval == CURLM_OK);
1241
  mretval = curl_multi_setopt(curl_multi_, CURLMOPT_MAX_TOTAL_CONNECTIONS,
1242
99
                              pool_max_handles_);
1243
99
  assert(mretval == CURLM_OK);
1244
1245
99
  prng_.InitLocaltime();
1246
1247
  // Parsing environment variables
1248

99
  if ((getenv("CVMFS_IPV4_ONLY") != NULL) &&
1249
      (strlen(getenv("CVMFS_IPV4_ONLY")) > 0)) {
1250
    opt_ipv4_only_ = true;
1251
  }
1252
1253
99
  watch_fds_ = static_cast<struct pollfd *>(smalloc(2 * sizeof(struct pollfd)));
1254
99
  watch_fds_size_ = 2;
1255
99
  watch_fds_inuse_ = 0;
1256
1257
99
  SetRetryParameters(3, 100, 2000);
1258
1259
99
  resolver_ = dns::CaresResolver::Create(opt_ipv4_only_, 2, 2000);
1260
99
}
1261
1262
99
void S3FanoutManager::Fini() {
1263
99
  if (atomic_xadd32(&multi_threaded_, 0) == 1) {
1264
    // Shutdown I/O thread
1265
99
    thread_upload_run_ = false;
1266
99
    pthread_join(thread_upload_, NULL);
1267
  }
1268
1269
99
  set<CURL *>::iterator             i    = pool_handles_idle_->begin();
1270
99
  const set<CURL *>::const_iterator iEnd = pool_handles_idle_->end();
1271
315
  for (; i != iEnd; ++i) {
1272
216
    curl_easy_cleanup(*i);
1273
  }
1274
1275
99
  set<S3FanOutDnsEntry *>::iterator             is    = sharehandles_->begin();
1276
99
  const set<S3FanOutDnsEntry *>::const_iterator isEnd = sharehandles_->end();
1277
198
  for (; is != isEnd; ++is) {
1278
99
    curl_share_cleanup((*is)->sharehandle);
1279
99
    curl_slist_free_all((*is)->clist);
1280
99
    delete *is;
1281
  }
1282
99
  pool_handles_idle_->clear();
1283
99
  curl_sharehandles_->clear();
1284
99
  sharehandles_->clear();
1285
99
  delete pool_handles_idle_;
1286
99
  delete pool_handles_inuse_;
1287
99
  delete curl_sharehandles_;
1288
99
  delete sharehandles_;
1289
99
  delete user_agent_;
1290
99
  curl_multi_cleanup(curl_multi_);
1291
99
  pool_handles_idle_ = NULL;
1292
99
  pool_handles_inuse_ = NULL;
1293
99
  curl_sharehandles_ = NULL;
1294
99
  sharehandles_ = NULL;
1295
99
  user_agent_ = NULL;
1296
99
  curl_multi_ = NULL;
1297
1298
99
  delete statistics_;
1299
99
  statistics_ = NULL;
1300
1301
99
  delete available_jobs_;
1302
1303
99
  curl_global_cleanup();
1304
99
}
1305
1306
1307
/**
1308
 * Spawns the I/O worker thread.  No way back except Fini(); Init();
1309
 */
1310
99
void S3FanoutManager::Spawn() {
1311
99
  LogCvmfs(kLogS3Fanout, kLogDebug, "S3FanoutManager spawned");
1312
1313
99
  thread_upload_run_ = true;
1314
  int retval = pthread_create(&thread_upload_, NULL, MainUpload,
1315
99
                              static_cast<void *>(this));
1316
99
  assert(retval == 0);
1317
1318
99
  atomic_inc32(&multi_threaded_);
1319
99
}
1320
1321
1322
/**
1323
 * The timeout counts for all sorts of connection phases,
1324
 * DNS, HTTP connect, etc.
1325
 */
1326
99
void S3FanoutManager::SetTimeout(const unsigned seconds) {
1327
99
  pthread_mutex_lock(lock_options_);
1328
99
  opt_timeout_ = seconds;
1329
99
  pthread_mutex_unlock(lock_options_);
1330
99
}
1331
1332
1333
/**
1334
 * Receives the currently active timeout value.
1335
 */
1336
void S3FanoutManager::GetTimeout(unsigned *seconds) {
1337
  pthread_mutex_lock(lock_options_);
1338
  *seconds = opt_timeout_;
1339
  pthread_mutex_unlock(lock_options_);
1340
}
1341
1342
1343
36
/**
 * Provides read access to the transfer statistics object of this manager.
 */
const Statistics &S3FanoutManager::GetStatistics() {
  const Statistics &current = *statistics_;
  return current;
}
1346
1347
1348
198
void S3FanoutManager::SetRetryParameters(const unsigned max_retries,
1349
                                         const unsigned backoff_init_ms,
1350
                                         const unsigned backoff_max_ms) {
1351
198
  pthread_mutex_lock(lock_options_);
1352
198
  opt_max_retries_ = max_retries;
1353
198
  opt_backoff_init_ms_ = backoff_init_ms;
1354
198
  opt_backoff_max_ms_ = backoff_max_ms;
1355
198
  pthread_mutex_unlock(lock_options_);
1356
198
}
1357
1358
/**
1359
 * Get completed jobs, so they can be cleaned and deleted properly.
1360
 */
1361
2330112653
int S3FanoutManager::PopCompletedJobs(std::vector<s3fanout::JobInfo*> *jobs) {
1362
2330112653
  pthread_mutex_lock(jobs_completed_lock_);
1363
2330112653
  std::vector<JobInfo*>::iterator             it    = jobs_completed_.begin();
1364
2330112653
  const std::vector<JobInfo*>::const_iterator itend = jobs_completed_.end();
1365
2330115619
  for (; it != itend; ++it) {
1366
2966
    jobs->push_back(*it);
1367
  }
1368
2330112653
  jobs_completed_.clear();
1369
2330112653
  pthread_mutex_unlock(jobs_completed_lock_);
1370
1371
2330112653
  return 0;
1372
}
1373
1374
/**
1375
 * Push new job to be uploaded to the S3 cloud storage.
1376
 */
1377
2966
void S3FanoutManager::PushNewJob(JobInfo *info) {
1378
2966
  available_jobs_->Increment();
1379
1380
2966
  pthread_mutex_lock(jobs_todo_lock_);
1381
2966
  jobs_todo_.push_back(info);
1382
2966
  pthread_mutex_unlock(jobs_todo_lock_);
1383
2966
}
1384
1385
1386
/**
1387
 * Performs given job synchronously.
1388
 *
1389
 * @return true if exists, otherwise false
1390
 */
1391
129
bool S3FanoutManager::DoSingleJob(JobInfo *info) const {
1392
129
  bool retme = false;
1393
1394
129
  CURL *handle = AcquireCurlHandle();
1395
129
  if (handle == NULL) {
1396
    LogCvmfs(kLogS3Fanout, kLogStderr, "Failed to acquire CURL handle.");
1397
    assert(handle != NULL);
1398
  }
1399
1400
129
  InitializeRequest(info, handle);
1401
129
  SetUrlOptions(info);
1402
1403
129
  CURLcode resl = curl_easy_perform(handle);
1404

129
  if (resl == CURLE_OK && info->error_code == kFailOk) {
1405
15
    retme = true;
1406
  }
1407
1408
129
  ReleaseCurlHandle(info, handle);
1409
1410
129
  return retme;
1411
}
1412
1413
//------------------------------------------------------------------------------
1414
1415
1416
/**
 * Renders the counters as human readable text, one counter per line:
 * transferred bytes, transfer duration, number of requests, and number of
 * retries.
 */
string Statistics::Print() const {
  string report;

  report += "Transferred Bytes:  ";
  report += StringifyInt(uint64_t(transferred_bytes));
  report += "\n";

  report += "Transfer duration:  ";
  report += StringifyInt(uint64_t(transfer_time));
  report += " s\n";

  report += "Number of requests: ";
  report += StringifyInt(num_requests);
  report += "\n";

  report += "Number of retries:  ";
  report += StringifyInt(num_retries);
  report += "\n";

  return report;
}
1427
1428
}  // namespace s3fanout