GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/upload_gateway.cc Lines: 34 114 29.8 %
Date: 2019-02-03 02:48:13 Branches: 14 52 26.9 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#include "upload_gateway.h"
6
7
#include <limits>
8
#include <vector>
9
10
#include "gateway_util.h"
11
#include "util/string.h"
12
13
namespace upload {
14
15
/**
 * Builds a stream handle that pairs an upload's commit callback with the
 * object pack bucket that receives the upload's data.
 *
 * @param commit_callback  forwarded to the UploadStreamHandle base class
 * @param bkt              bucket handle stored in the `bucket` member
 */
GatewayStreamHandle::GatewayStreamHandle(
    const CallbackTN* commit_callback, ObjectPack::BucketHandle bkt)
    : UploadStreamHandle(commit_callback),
      bucket(bkt) {
}
18
19
6
bool GatewayUploader::WillHandle(const SpoolerDefinition& spooler_definition) {
20
6
  return spooler_definition.driver_type == SpoolerDefinition::Gateway;
21
}
22
23
7
bool GatewayUploader::ParseSpoolerDefinition(
24
    const SpoolerDefinition& spooler_definition,
25
    GatewayUploader::Config* config) {
26
7
  const std::string& config_string = spooler_definition.spooler_configuration;
27
7
  if (!config) {
28
1
    LogCvmfs(kLogUploadGateway, kLogStderr, "\"config\" argument is NULL");
29
1
    return false;
30
  }
31
32
6
  if (spooler_definition.session_token_file.empty()) {
33
    LogCvmfs(kLogUploadGateway, kLogStderr,
34
             "Failed to configure HTTP uploader. "
35
1
             "Missing session token file.\n");
36
1
    return false;
37
  }
38
5
  config->session_token_file = spooler_definition.session_token_file;
39
40
5
  if (spooler_definition.key_file.empty()) {
41
    LogCvmfs(kLogUploadGateway, kLogStderr,
42
             "Failed to configure HTTP uploader. "
43
             "Missing HTTP API key file.\n");
44
    return false;
45
  }
46
5
  config->key_file = spooler_definition.key_file;
47
48
  // Repo address, e.g. http://my.repo.address
49
5
  config->api_url = config_string;
50
51
5
  return true;
52
}
53
54
4
/**
 * Constructs a gateway uploader from a spooler definition.
 *
 * Allocates a fresh SessionContext, parses the definition into `config_`
 * and initializes the error counter. The process is aborted if the
 * definition cannot be parsed.
 *
 * @param spooler_definition  asserted to be valid and of driver type
 *                            SpoolerDefinition::Gateway
 */
GatewayUploader::GatewayUploader(const SpoolerDefinition& spooler_definition)
    : AbstractUploader(spooler_definition),
      config_(),
      session_context_(new SessionContext()) {
  assert(spooler_definition.IsValid() &&
         spooler_definition.driver_type == SpoolerDefinition::Gateway);

  const bool config_parsed =
      ParseSpoolerDefinition(spooler_definition, &config_);
  if (!config_parsed) {
    abort();
  }

  atomic_init32(&num_errors_);
}
67
68
4
/**
 * Releases the session context allocated in the constructor.
 */
GatewayUploader::~GatewayUploader() {
  // Deleting a null pointer is a no-op, so no explicit guard is required.
  delete session_context_;
}
73
74
4
bool GatewayUploader::Initialize() {
75
4
  if (!AbstractUploader::Initialize()) {
76
    return false;
77
  }
78
4
  std::string session_token;
79
4
  if (!ReadSessionTokenFile(config_.session_token_file, &session_token)) {
80
    return false;
81
  }
82
83
4
  std::string key_id;
84
4
  std::string secret;
85
4
  if (!ReadKey(config_.key_file, &key_id, &secret)) {
86
    return false;
87
  }
88
89
  return session_context_->Initialize(config_.api_url, session_token, key_id,
90
4
                                      secret);
91
}
92
93
4
bool GatewayUploader::FinalizeSession(bool commit,
94
                                      const std::string& old_root_hash,
95
                                      const std::string& new_root_hash,
96
                                      const RepositoryTag& tag) {
97
4
  return session_context_->Finalize(commit, old_root_hash, new_root_hash, tag);
98
}
99
100
void GatewayUploader::WaitForUpload() const {
101
  session_context_->WaitForUpload();
102
}
103
104
/**
 * @return the fixed identifier of this uploader, "HTTP"
 */
std::string GatewayUploader::name() const {
  return std::string("HTTP");
}
105
106
void GatewayUploader::DoRemoveAsync(const std::string& /*file_to_delete*/) {
107
  atomic_inc32(&num_errors_);
108
  Respond(NULL, UploaderResults());
109
}
110
111
/**
 * Always reports false; the path argument is ignored.
 */
bool GatewayUploader::Peek(const std::string& /*path*/) const {
  return false;
}
112
113
bool GatewayUploader::PlaceBootstrappingShortcut(
114
    const shash::Any& /*object*/) const {
115
  return false;
116
}
117
118
unsigned int GatewayUploader::GetNumberOfErrors() const {
119
  return atomic_read32(&num_errors_);
120
}
121
122
void GatewayUploader::FileUpload(const std::string& local_path,
123
                                 const std::string& remote_path,
124
                                 const CallbackTN* callback) {
125
  UniquePtr<GatewayStreamHandle> handle(
126
      new GatewayStreamHandle(callback, session_context_->NewBucket()));
127
128
  FILE* local_file = fopen(local_path.c_str(), "rb");
129
  if (!local_file) {
130
    LogCvmfs(kLogUploadGateway, kLogStderr,
131
             "File upload - could not open local file.");
132
    BumpErrors();
133
    Respond(callback, UploaderResults(1, local_path));
134
    return;
135
  }
136
137
  std::vector<char> buf(1024);
138
  size_t read_bytes = 0;
139
  do {
140
    read_bytes = fread(&buf[0], buf.size(), 1, local_file);
141
    ObjectPack::AddToBucket(&buf[0], buf.size(), handle->bucket);
142
  } while (read_bytes == buf.size());
143
  fclose(local_file);
144
145
  shash::Any content_hash(spooler_definition().hash_algorithm);
146
  shash::HashFile(local_path, &content_hash);
147
  if (!session_context_->CommitBucket(ObjectPack::kNamed, content_hash,
148
                                      handle->bucket, remote_path)) {
149
    LogCvmfs(kLogUploadGateway, kLogStderr,
150
             "File upload - could not commit bucket");
151
    BumpErrors();
152
    Respond(handle->commit_callback, UploaderResults(2, local_path));
153
    return;
154
  }
155
156
  CountUploadedBytes(handle->bucket->size);
157
  Respond(callback, UploaderResults(0, local_path));
158
}
159
160
/**
 * Starts a streamed upload by allocating a new bucket from the session
 * context and wrapping it, together with the callback, in a heap-allocated
 * GatewayStreamHandle.
 *
 * @param callback  stored in the returned handle as its commit callback
 * @return the newly allocated handle
 */
UploadStreamHandle* GatewayUploader::InitStreamedUpload(
    const CallbackTN* callback) {
  GatewayStreamHandle* stream_handle =
      new GatewayStreamHandle(callback, session_context_->NewBucket());
  return stream_handle;
}
164
165
void GatewayUploader::StreamedUpload(UploadStreamHandle* handle,
166
                                     UploadBuffer buffer,
167
                                     const CallbackTN* callback) {
168
  GatewayStreamHandle* hd = dynamic_cast<GatewayStreamHandle*>(handle);
169
  if (!hd) {
170
    LogCvmfs(kLogUploadGateway, kLogStderr,
171
             "Streamed upload - incompatible upload handle");
172
    BumpErrors();
173
    Respond(callback, UploaderResults(UploaderResults::kBufferUpload, 2));
174
    return;
175
  }
176
177
  ObjectPack::AddToBucket(buffer.data, buffer.size, hd->bucket);
178
179
  Respond(callback, UploaderResults(UploaderResults::kBufferUpload, 0));
180
}
181
182
void GatewayUploader::FinalizeStreamedUpload(UploadStreamHandle* handle,
183
                                             const shash::Any& content_hash) {
184
  GatewayStreamHandle* hd = dynamic_cast<GatewayStreamHandle*>(handle);
185
  if (!hd) {
186
    LogCvmfs(kLogUploadGateway, kLogStderr,
187
             "Finalize streamed upload - incompatible upload handle");
188
    BumpErrors();
189
    Respond(handle->commit_callback,
190
            UploaderResults(UploaderResults::kChunkCommit, 2));
191
    return;
192
  }
193
194
  if (!session_context_->CommitBucket(ObjectPack::kCas, content_hash,
195
                                      hd->bucket, "")) {
196
    LogCvmfs(kLogUploadGateway, kLogStderr,
197
             "Finalize streamed upload - could not commit bucket");
198
    BumpErrors();
199
    Respond(handle->commit_callback,
200
            UploaderResults(UploaderResults::kChunkCommit, 4));
201
    return;
202
  }
203
204
  CountUploadedBytes(hd->bucket->size);
205
  Respond(handle->commit_callback,
206
          UploaderResults(UploaderResults::kChunkCommit, 0));
207
}
208
209
bool GatewayUploader::ReadSessionTokenFile(const std::string& token_file_name,
210
                                           std::string* token) {
211
  if (!token) {
212
    return false;
213
  }
214
215
  FILE* token_file = std::fopen(token_file_name.c_str(), "r");
216
  if (!token_file) {
217
    LogCvmfs(kLogUploadGateway, kLogStderr,
218
             "HTTP Uploader - Could not open session token "
219
             "file. Aborting.");
220
    return false;
221
  }
222
223
  bool ret = GetLineFile(token_file, token);
224
  fclose(token_file);
225
226
  return ret;
227
}
228
229
bool GatewayUploader::ReadKey(const std::string& key_file, std::string* key_id,
230
                              std::string* secret) {
231
  return gateway::ReadKeys(key_file, key_id, secret);
232
}
233
234
/**
 * Always returns -EOPNOTSUPP; the file name is not inspected.
 *
 * @param file_name  ignored
 * @return -EOPNOTSUPP
 */
int64_t GatewayUploader::DoGetObjectSize(const std::string &file_name) {
  const int64_t not_supported = -EOPNOTSUPP;
  return not_supported;
}
237
238
/**
 * Atomically increments the error counter `num_errors_`.
 */
void GatewayUploader::BumpErrors() const {
  atomic_inc32(&num_errors_);
}
239
240
}  // namespace upload