GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/upload_s3.cc Lines: 128 173 74.0 %
Date: 2019-02-03 02:48:13 Branches: 46 108 42.6 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#include "upload_s3.h"
6
7
#include <errno.h>
8
#include <fcntl.h>
9
#include <inttypes.h>
10
#ifdef _POSIX_PRIORITY_SCHEDULING
11
#include <sched.h>
12
#endif
13
#include <unistd.h>
14
15
#include <string>
16
#include <vector>
17
18
#include "compression.h"
19
#include "logging.h"
20
#include "options.h"
21
#include "s3fanout.h"
22
#include "util/posix.h"
23
#include "util/string.h"
24
25
namespace upload {
26
27
99
S3Uploader::S3Uploader(const SpoolerDefinition &spooler_definition)
28
  : AbstractUploader(spooler_definition)
29
  , dns_buckets_(true)
30
  , num_parallel_uploads_(kDefaultNumParallelUploads)
31
  , num_retries_(kDefaultNumRetries)
32
  , timeout_sec_(kDefaultTimeoutSec)
33
  , authz_method_(s3fanout::kAuthzAwsV2)
34
  , peek_before_put_(true)
35
99
  , temporary_path_(spooler_definition.temporary_path)
36
{
37
  assert(spooler_definition.IsValid() &&
38

99
         spooler_definition.driver_type == SpoolerDefinition::S3);
39
40
99
  atomic_init32(&io_errors_);
41
99
  atomic_init32(&terminate_);
42
43
99
  if (!ParseSpoolerDefinition(spooler_definition)) {
44
    abort();
45
  }
46
47
99
  s3fanout_mgr_.Init(num_parallel_uploads_, dns_buckets_);
48
99
  s3fanout_mgr_.SetTimeout(timeout_sec_);
49
  s3fanout_mgr_.SetRetryParameters(
50
99
    num_retries_, kDefaultBackoffInitMs, kDefaultBackoffMaxMs);
51
99
  s3fanout_mgr_.Spawn();
52
53
  int retval = pthread_create(
54
99
    &thread_collect_results_, NULL, MainCollectResults, this);
55
99
  assert(retval == 0);
56
}
57
58
59
198
S3Uploader::~S3Uploader() {
60
99
  s3fanout_mgr_.Fini();
61
99
  atomic_inc32(&terminate_);
62
99
  pthread_join(thread_collect_results_, NULL);
63

99
}
64
65
66
99
bool S3Uploader::ParseSpoolerDefinition(
67
  const SpoolerDefinition &spooler_definition)
68
{
69
  const std::vector<std::string> config =
70
99
      SplitString(spooler_definition.spooler_configuration, '@');
71
99
  if (config.size() != 2) {
72
    LogCvmfs(kLogUploadS3, kLogStderr,
73
             "Failed to parse spooler configuration string '%s'.\n"
74
             "Provide: <repo_alias>@/path/to/s3.conf",
75
             spooler_definition.spooler_configuration.c_str());
76
    return false;
77
  }
78
99
  repository_alias_              = config[0];
79
99
  const std::string &config_path = config[1];
80
81
99
  if (!FileExists(config_path)) {
82
    LogCvmfs(kLogUploadS3, kLogStderr,
83
             "Cannot find S3 config file at '%s'",
84
             config_path.c_str());
85
    return false;
86
  }
87
88
  // Parse S3 configuration
89
  BashOptionsManager options_manager = BashOptionsManager(
90
99
    new DefaultOptionsTemplateManager(repository_alias_));
91
99
  options_manager.ParsePath(config_path, false);
92
99
  std::string parameter;
93
94

99
  if (!options_manager.GetValue("CVMFS_S3_HOST", &host_name_)) {
95
    LogCvmfs(kLogUploadS3, kLogStderr,
96
             "Failed to parse CVMFS_S3_HOST from '%s'",
97
             config_path.c_str());
98
    return false;
99
  }
100

99
  if (options_manager.GetValue("CVMFS_S3_PORT", &parameter)) {
101
99
    host_name_port_ = host_name_ + ":" + parameter;
102
  } else {
103
    host_name_port_ = host_name_ + ":" + StringifyInt(kDefaultPort);
104
  }
105
106

99
  if (!options_manager.GetValue("CVMFS_S3_ACCESS_KEY", &access_key_)) {
107
    LogCvmfs(kLogUploadS3, kLogStderr,
108
             "Failed to parse CVMFS_S3_ACCESS_KEY from '%s'.",
109
             config_path.c_str());
110
    return false;
111
  }
112

99
  if (!options_manager.GetValue("CVMFS_S3_SECRET_KEY", &secret_key_)) {
113
    LogCvmfs(kLogUploadS3, kLogStderr,
114
             "Failed to parse CVMFS_S3_SECRET_KEY from '%s'.",
115
             config_path.c_str());
116
    return false;
117
  }
118

99
  if (!options_manager.GetValue("CVMFS_S3_BUCKET", &bucket_)) {
119
    LogCvmfs(kLogUploadS3, kLogStderr,
120
             "Failed to parse CVMFS_S3_BUCKET from '%s'.",
121
             config_path.c_str());
122
    return false;
123
  }
124

99
  if (options_manager.GetValue("CVMFS_S3_DNS_BUCKETS", &parameter)) {
125
99
    if (parameter == "false") {
126
99
      dns_buckets_ = false;
127
    }
128
  }
129

99
  if (options_manager.GetValue("CVMFS_S3_MAX_NUMBER_OF_PARALLEL_CONNECTIONS",
130
                               &parameter))
131
  {
132
99
    num_parallel_uploads_ = String2Uint64(parameter);
133
  }
134

99
  if (options_manager.GetValue("CVMFS_S3_MAX_RETRIES", &parameter)) {
135
    num_retries_ = String2Uint64(parameter);
136
  }
137

99
  if (options_manager.GetValue("CVMFS_S3_TIMEOUT", &parameter)) {
138
    timeout_sec_ = String2Uint64(parameter);
139
  }
140

99
  if (options_manager.GetValue("CVMFS_S3_REGION", &region_)) {
141
    authz_method_ = s3fanout::kAuthzAwsV4;
142
  }
143

99
  if (options_manager.GetValue("CVMFS_S3_PEEK_BEFORE_PUT", &parameter)) {
144
    peek_before_put_ = options_manager.IsOn(parameter);
145
  }
146
147
99
  return true;
148
}
149
150
151
105
bool S3Uploader::WillHandle(const SpoolerDefinition &spooler_definition) {
152
105
  return spooler_definition.driver_type == SpoolerDefinition::S3;
153
}
154
155
156
5
unsigned int S3Uploader::GetNumberOfErrors() const {
157
5
  return atomic_read32(&io_errors_);
158
}
159
160
161
/**
162
 * Worker thread takes care of requesting new jobs and cleaning old ones.
163
 */
164
99
void *S3Uploader::MainCollectResults(void *data) {
165
99
  LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread started.");
166
99
  S3Uploader *uploader = reinterpret_cast<S3Uploader *>(data);
167
168
99
  std::vector<s3fanout::JobInfo *> jobs;
169
2330112851
  while (atomic_read32(&uploader->terminate_) == 0) {
170
2330112653
    jobs.clear();
171
2330112653
    uploader->s3fanout_mgr_.PopCompletedJobs(&jobs);
172
2330115619
    for (unsigned i = 0; i < jobs.size(); ++i) {
173
      // Report completed job
174
2966
      s3fanout::JobInfo *info = jobs[i];
175
2966
      int reply_code = 0;
176
2966
      if (info->error_code != s3fanout::kFailOk) {
177
        LogCvmfs(kLogUploadS3, kLogStderr,
178
                 "Upload job for '%s' failed. (error code: %d - %s)",
179
                 info->object_key.c_str(),
180
                 info->error_code,
181
                 s3fanout::Code2Ascii(info->error_code));
182
        reply_code = 99;
183
        atomic_inc32(&uploader->io_errors_);
184
      }
185
2966
      if (info->request == s3fanout::JobInfo::kReqDelete) {
186
5
        uploader->Respond(NULL, UploaderResults());
187
      } else {
188
2961
        if (info->request == s3fanout::JobInfo::kReqHead) {
189
          // The HEAD request was not transformed into a PUT request, thus this
190
          // was a duplicate
191
          uploader->CountDuplicates();
192
        }
193
2961
        if (info->origin == s3fanout::kOriginMem) {
194
          uploader->Respond(static_cast<CallbackTN*>(info->callback),
195
                            UploaderResults(UploaderResults::kChunkCommit,
196
907
                                            reply_code));
197
        } else {
198
          uploader->Respond(static_cast<CallbackTN*>(info->callback),
199
2054
                            UploaderResults(reply_code, info->origin_path));
200
        }
201
202
2961
        assert(info->mmf == NULL);
203
2961
        assert(info->origin_file == NULL);
204
      }
205
    }
206
#ifdef _POSIX_PRIORITY_SCHEDULING
207
2330112653
    sched_yield();
208
#endif
209
  }
210
211
99
  LogCvmfs(kLogUploadS3, kLogDebug, "Upload_S3 WorkerThread finished.");
212
99
  return NULL;
213
}
214
215
216
2054
void S3Uploader::FileUpload(
217
  const std::string &local_path,
218
  const std::string &remote_path,
219
  const CallbackTN  *callback
220
) {
221
  s3fanout::JobInfo *info =
222
    new s3fanout::JobInfo(access_key_,
223
                          secret_key_,
224
                          authz_method_,
225
                          host_name_port_,
226
                          region_,
227
                          bucket_,
228
                          repository_alias_ + "/" + remote_path,
229
                          const_cast<void*>(
230
                              static_cast<void const*>(callback)),
231
2054
                          local_path);
232
233

2054
  if (HasPrefix(remote_path, ".cvmfs", false /*ignore_case*/)) {
234
    info->request = s3fanout::JobInfo::kReqPutDotCvmfs;
235
  } else {
236
2054
    if (peek_before_put_)
237
2054
      info->request = s3fanout::JobInfo::kReqHead;
238
  }
239
240
2054
  UploadJobInfo(info);
241
  LogCvmfs(kLogUploadS3, kLogDebug, "Uploading from file finished: %s",
242
2054
           local_path.c_str());
243
2054
  CountUploadedBytes(GetFileSize(local_path));
244
2054
}
245
246
247
2961
void S3Uploader::UploadJobInfo(s3fanout::JobInfo *info) {
248
  LogCvmfs(kLogUploadS3, kLogDebug,
249
           "Uploading from %s:\n"
250
           "--> Object: '%s'\n"
251
           "--> Bucket: '%s'\n"
252
           "--> Host:   '%s'\n",
253
           info->origin_mem.data != NULL ? "buffer" : "file",
254
           info->object_key.c_str(),
255
           info->bucket.c_str(),
256
2961
           info->hostname.c_str());
257
258
2961
  s3fanout_mgr_.PushNewJob(info);
259
2961
}
260
261
262
907
UploadStreamHandle *S3Uploader::InitStreamedUpload(const CallbackTN *callback) {
263
907
  std::string tmp_path;
264
907
  const int tmp_fd = CreateAndOpenTemporaryChunkFile(&tmp_path);
265
266
907
  LogCvmfs(kLogUploadS3, kLogDebug, "InitStreamedUpload: %s", tmp_path.c_str());
267
268
907
  if (tmp_fd < 0) {
269
    LogCvmfs(kLogUploadS3, kLogStderr, "Failed to open file (%d), %s",
270
             errno, strerror(errno));
271
    atomic_inc32(&io_errors_);
272
273
    return NULL;
274
  }
275
276
907
  return new S3StreamHandle(callback, tmp_fd, tmp_path);
277
}
278
279
280
7198
void S3Uploader::StreamedUpload(
281
  UploadStreamHandle  *handle,
282
  UploadBuffer        buffer,
283
  const CallbackTN    *callback)
284
{
285
7198
  S3StreamHandle *local_handle = static_cast<S3StreamHandle*>(handle);
286
287
  LogCvmfs(kLogUploadS3, kLogDebug, "Upload target = %s",
288
7198
           local_handle->temporary_path.c_str());
289
290
7198
  if (!SafeWrite(local_handle->file_descriptor, buffer.data, buffer.size)) {
291
    const int cpy_errno = errno;
292
    LogCvmfs(kLogUploadS3, kLogStderr,
293
             "failed to write %d bytes to '%s' (errno: %d)",
294
             buffer.size,
295
             local_handle->temporary_path.c_str(),
296
             cpy_errno);
297
    atomic_inc32(&io_errors_);
298
    Respond(callback,
299
            UploaderResults(UploaderResults::kBufferUpload, cpy_errno));
300
    return;
301
  }
302
7198
  CountUploadedBytes(buffer.size);
303
7198
  Respond(callback, UploaderResults(UploaderResults::kBufferUpload, 0));
304
}
305
306
307
907
void S3Uploader::FinalizeStreamedUpload(
308
  UploadStreamHandle  *handle,
309
  const shash::Any    &content_hash)
310
{
311
907
  int retval = 0;
312
907
  S3StreamHandle *local_handle = static_cast<S3StreamHandle*>(handle);
313
314
907
  retval = close(local_handle->file_descriptor);
315
907
  if (retval != 0) {
316
    const int cpy_errno = errno;
317
    LogCvmfs(kLogUploadS3, kLogStderr,
318
             "failed to close temp file '%s' (errno: %d)",
319
             local_handle->temporary_path.c_str(), cpy_errno);
320
    atomic_inc32(&io_errors_);
321
    Respond(handle->commit_callback,
322
            UploaderResults(UploaderResults::kChunkCommit, cpy_errno));
323
    return;
324
  }
325
326
  // Open the file for reading
327
907
  MemoryMappedFile *mmf = new MemoryMappedFile(local_handle->temporary_path);
328
907
  if (!mmf->Map()) {
329
    LogCvmfs(kLogUploadS3, kLogStderr, "Failed to upload %s",
330
             local_handle->temporary_path.c_str());
331
    delete mmf;
332
    atomic_inc32(&io_errors_);
333
    Respond(handle->commit_callback,
334
            UploaderResults(100, local_handle->temporary_path));
335
    return;
336
  }
337
338
  // New file name based on content hash
339
  std::string final_path(
340
907
    repository_alias_ + "/data/" + content_hash.MakePath());
341
342
  s3fanout::JobInfo *info =
343
      new s3fanout::JobInfo(access_key_,
344
                            secret_key_,
345
                            authz_method_,
346
                            host_name_port_,
347
                            region_,
348
                            bucket_,
349
                            final_path,
350
                            const_cast<void*>(
351
                                static_cast<void const*>(
352
                                    handle->commit_callback)),
353
                            mmf,
354
                            reinterpret_cast<unsigned char *>(mmf->buffer()),
355
907
                            static_cast<size_t>(mmf->size()));
356
907
  assert(info != NULL);
357
358
907
  if (peek_before_put_)
359
907
      info->request = s3fanout::JobInfo::kReqHead;
360
907
  UploadJobInfo(info);
361
362
  LogCvmfs(kLogUploadS3, kLogDebug, "Uploading from stream finished: %s",
363
907
           local_handle->temporary_path.c_str());
364
365
  // Remove the temporary file
366
907
  retval = unlink(local_handle->temporary_path.c_str());
367
907
  assert(retval == 0);
368
907
  delete local_handle;
369
}
370
371
372
134
s3fanout::JobInfo *S3Uploader::CreateJobInfo(const std::string& path) const {
373
  return new s3fanout::JobInfo(access_key_,
374
                               secret_key_,
375
                               authz_method_,
376
                               host_name_port_,
377
                               region_,
378
                               bucket_,
379
                               path,
380
                               NULL,
381
                               NULL,
382
                               NULL,
383
134
                               0);
384
}
385
386
387
5
void S3Uploader::DoRemoveAsync(const std::string& file_to_delete) {
388
5
  const std::string mangled_path = repository_alias_ + "/" + file_to_delete;
389
5
  s3fanout::JobInfo *info = CreateJobInfo(mangled_path);
390
391
5
  info->request = s3fanout::JobInfo::kReqDelete;
392
393
  LogCvmfs(kLogUploadS3, kLogDebug, "Asynchronously removing %s/%s",
394
5
           info->bucket.c_str(), info->object_key.c_str());
395
5
  s3fanout_mgr_.PushNewJob(info);
396
5
}
397
398
399
// TODO(jblomer): this routine does not do what one might think it does --
400
// peek + upload is currently handled in the S3 fanout manager by secretly
401
// changing a HEAD request into a PUT request.  See CVM-1669
402
129
bool S3Uploader::Peek(const std::string& path) const {
403
129
  const std::string mangled_path = repository_alias_ + "/" + path;
404
129
  s3fanout::JobInfo *info = CreateJobInfo(mangled_path);
405
406
129
  info->request = s3fanout::JobInfo::kReqHead;
407
129
  bool retme = s3fanout_mgr_.DoSingleJob(info);
408
129
  if (retme) {
409
15
    CountDuplicates();
410
  }
411
412
129
  delete info;
413
129
  return retme;
414
}
415
416
417
bool S3Uploader::PlaceBootstrappingShortcut(const shash::Any &object) const {
418
  return false;  // TODO(rmeusel): implement
419
}
420
421
422
int64_t S3Uploader::DoGetObjectSize(const std::string &file_name) {
423
  // TODO(dosarudaniel): use a head request for byte count
424
  // Re-enable 661 integration test when done
425
  return -EOPNOTSUPP;
426
}
427
428
}  // namespace upload