GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/upload_facility.cc Lines: 47 59 79.7 %
Date: 2019-02-03 02:48:13 Branches: 10 17 58.8 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#include "upload_facility.h"
6
7
#include <cassert>
8
9
#include "upload_gateway.h"
10
#include "upload_local.h"
11
#include "upload_s3.h"
12
13
namespace upload {
14
15
atomic_int64 UploadStreamHandle::g_upload_stream_tag = 0;
16
17
585016
AbstractUploader::UploadJob::UploadJob(
18
  UploadStreamHandle *handle,
19
  UploadBuffer buffer,
20
  const CallbackTN *callback)
21
  : type(Upload)
22
  , stream_handle(handle)
23
  , tag_(handle->tag)
24
  , buffer(buffer)
25
585016
  , callback(callback)
26
585016
{ }
27
28
2824
AbstractUploader::UploadJob::UploadJob(
29
  UploadStreamHandle *handle,
30
  const shash::Any &content_hash)
31
  : type(Commit)
32
  , stream_handle(handle)
33
  , tag_(handle->tag)
34
  , buffer()
35
  , callback(NULL)
36
2824
  , content_hash(content_hash)
37
2824
{ }
38
39
109
void AbstractUploader::RegisterPlugins() {
40
109
  RegisterPlugin<LocalUploader>();
41
109
  RegisterPlugin<S3Uploader>();
42
109
  RegisterPlugin<GatewayUploader>();
43
109
}
44
45
231
AbstractUploader::AbstractUploader(const SpoolerDefinition &spooler_definition)
46
  : spooler_definition_(spooler_definition)
47
  , num_upload_tasks_(spooler_definition.num_upload_tasks)
48
231
  , jobs_in_flight_(spooler_definition.number_of_concurrent_uploads)
49
{ }
50
51
52
231
bool AbstractUploader::Initialize() {
53
572
  for (unsigned i = 0; i < GetNumTasks(); ++i) {
54
341
    Tube<UploadJob> *t = new Tube<UploadJob>();
55
341
    tubes_upload_.TakeTube(t);
56
341
    tasks_upload_.TakeConsumer(new TaskUpload(this, t));
57
  }
58
231
  tubes_upload_.Activate();
59
231
  tasks_upload_.Spawn();
60
231
  return true;
61
}
62
63
bool AbstractUploader::FinalizeSession(bool /*commit*/,
64
                                       const std::string & /*old_root_hash*/,
65
                                       const std::string & /*new_root_hash*/,
66
                                       const RepositoryTag & /*tag*/) {
67
  return true;
68
}
69
70
71
1023
int AbstractUploader::CreateAndOpenTemporaryChunkFile(std::string *path) const {
72
  const std::string tmp_path =
73
1023
      CreateTempPath(spooler_definition_.temporary_path + "/" + "chunk", 0644);
74
1023
  if (tmp_path.empty()) {
75
    LogCvmfs(kLogSpooler, kLogStderr,
76
             "Failed to create temp file for upload of file chunk (errno: %d).",
77
             errno);
78
    return -1;
79
  }
80
81
1023
  const int tmp_fd = open(tmp_path.c_str(), O_WRONLY);
82
1023
  if (tmp_fd < 0) {
83
    LogCvmfs(kLogSpooler, kLogStderr,
84
             "Failed to open temp file '%s' for upload of file chunk "
85
             "(errno: %d)",
86
             tmp_path.c_str(), errno);
87
    unlink(tmp_path.c_str());
88
  } else {
89
1023
    *path = tmp_path;
90
  }
91
92
1023
  return tmp_fd;
93
}
94
95
231
void AbstractUploader::TearDown() {
96
231
  tasks_upload_.Terminate();
97
231
}
98
99
149
void AbstractUploader::WaitForUpload() const { jobs_in_flight_.WaitForZero(); }
100
101
void AbstractUploader::InitCounters(perf::StatisticsTemplate *statistics) {
102
  counters_ = new UploadCounters(*statistics);
103
}
104
105
9878
void AbstractUploader::CountUploadedBytes(int64_t bytes_written) const {
106
9878
  if (counters_.IsValid()) {
107
    perf::Xadd(counters_->sz_uploaded_bytes, bytes_written);
108
  }
109
9878
}
110
111
17
void AbstractUploader::CountDuplicates() const {
112
17
  if (counters_.IsValid()) {
113
    perf::Inc(counters_->n_duplicated_files);
114
  }
115
17
}
116
117
//------------------------------------------------------------------------------
118
119
120
587840
void TaskUpload::Process(AbstractUploader::UploadJob *upload_job) {
121
587840
  switch (upload_job->type) {
122
    case AbstractUploader::UploadJob::Upload:
123
      uploader_->StreamedUpload(
124
585016
        upload_job->stream_handle, upload_job->buffer, upload_job->callback);
125
585016
      break;
126
127
    case AbstractUploader::UploadJob::Commit:
128
      uploader_->FinalizeStreamedUpload(
129
2824
        upload_job->stream_handle, upload_job->content_hash);
130
2824
      break;
131
132
    default:
133
      abort();
134
  }
135
136
587840
  delete upload_job;
137
587840
}
138
139

45
}  // namespace upload