GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/upload_gateway.cc
Date: 2024-04-28 02:33:07
Exec Total Coverage
Lines: 37 137 27.0%
Branches: 19 132 14.4%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "upload_gateway.h"
6
7 #include <cassert>
8 #include <limits>
9 #include <vector>
10
11 #include "gateway_util.h"
12 #include "util/exception.h"
13 #include "util/logging.h"
14 #include "util/string.h"
15
16 namespace upload {
17
18 GatewayStreamHandle::GatewayStreamHandle(const CallbackTN* commit_callback,
19 ObjectPack::BucketHandle bkt)
20 : UploadStreamHandle(commit_callback), bucket(bkt) {}
21
22 5 bool GatewayUploader::WillHandle(const SpoolerDefinition& spooler_definition) {
23 5 return spooler_definition.driver_type == SpoolerDefinition::Gateway;
24 }
25
26 4 bool GatewayUploader::ParseSpoolerDefinition(
27 const SpoolerDefinition& spooler_definition,
28 GatewayUploader::Config* config) {
29 4 const std::string& config_string = spooler_definition.spooler_configuration;
30
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 3 times.
4 if (!config) {
31 1 LogCvmfs(kLogUploadGateway, kLogStderr, "\"config\" argument is NULL");
32 1 return false;
33 }
34
35
2/2
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 2 times.
3 if (spooler_definition.session_token_file.empty()) {
36 1 LogCvmfs(kLogUploadGateway, kLogStderr,
37 "Failed to configure gateway uploader. "
38 "Missing session token file.\n");
39 1 return false;
40 }
41 2 config->session_token_file = spooler_definition.session_token_file;
42
43
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
2 if (spooler_definition.key_file.empty()) {
44 LogCvmfs(kLogUploadGateway, kLogStderr,
45 "Failed to configure gateway uploader. "
46 "Missing API key file.\n");
47 return false;
48 }
49 2 config->key_file = spooler_definition.key_file;
50
51 // Repo address, e.g. http://my.repo.address
52 2 config->api_url = config_string;
53
54 2 return true;
55 }
56
57 1 GatewayUploader::GatewayUploader(const SpoolerDefinition& spooler_definition)
58 : AbstractUploader(spooler_definition),
59 1 config_(),
60
2/4
✓ Branch 3 taken 1 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 1 times.
✗ Branch 7 not taken.
1 session_context_(new SessionContext()) {
61
2/4
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 times.
✗ Branch 4 not taken.
1 assert(spooler_definition.IsValid() &&
62 spooler_definition.driver_type == SpoolerDefinition::Gateway);
63
64
2/4
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 times.
1 if (!ParseSpoolerDefinition(spooler_definition, &config_)) {
65 PANIC(kLogStderr, "Error in parsing the spooler definition");
66 }
67
68 1 atomic_init32(&num_errors_);
69 1 }
70
71 2 GatewayUploader::~GatewayUploader() {
72
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
2 if (session_context_) {
73
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
2 delete session_context_;
74 }
75 }
76
77 bool GatewayUploader::Create() {
78 LogCvmfs(kLogUploadGateway, kLogStderr,
79 "cannot create repository storage area when using the gateway");
80 return false;
81 }
82
83 1 bool GatewayUploader::Initialize() {
84
2/4
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 times.
1 if (!AbstractUploader::Initialize()) {
85 return false;
86 }
87 1 std::string session_token;
88
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 ReadSessionTokenFile(config_.session_token_file, &session_token);
89
90 1 std::string key_id;
91 1 std::string secret;
92
2/4
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 times.
1 if (!ReadKey(config_.key_file, &key_id, &secret)) {
93 return false;
94 }
95
96
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 return session_context_->Initialize(config_.api_url, session_token, key_id,
97 1 secret);
98 1 }
99
100 1 bool GatewayUploader::FinalizeSession(bool commit,
101 const std::string& old_root_hash,
102 const std::string& new_root_hash,
103 const RepositoryTag& tag) {
104 1 return session_context_->Finalize(commit, old_root_hash, new_root_hash, tag);
105 }
106
107 void GatewayUploader::WaitForUpload() const {
108 session_context_->WaitForUpload();
109 }
110
111 std::string GatewayUploader::name() const { return "HTTP"; }
112
113 void GatewayUploader::DoRemoveAsync(const std::string& /*file_to_delete*/) {
114 atomic_inc32(&num_errors_);
115 Respond(NULL, UploaderResults());
116 }
117
118 bool GatewayUploader::Peek(const std::string& /*path*/) { return false; }
119
120 // TODO(jpriessn): implement Mkdir on gateway server-side
121 bool GatewayUploader::Mkdir(const std::string &path) {
122 return true;
123 }
124
125 bool GatewayUploader::PlaceBootstrappingShortcut(const shash::Any& /*object*/) {
126 return false;
127 }
128
129 unsigned int GatewayUploader::GetNumberOfErrors() const {
130 return atomic_read32(&num_errors_);
131 }
132
133 void GatewayUploader::DoUpload(const std::string& remote_path,
134 IngestionSource *source,
135 const CallbackTN* callback) {
136 UniquePtr<GatewayStreamHandle> handle(
137 new GatewayStreamHandle(callback, session_context_->NewBucket()));
138
139 bool rvb = source->Open();
140 if (!rvb) {
141 LogCvmfs(kLogUploadGateway, kLogStderr,
142 "File upload - could not open local file.");
143 BumpErrors();
144 Respond(callback, UploaderResults(1, source->GetPath()));
145 return;
146 }
147
148 unsigned char hash_ctx[shash::kMaxContextSize];
149 shash::ContextPtr hash_ctx_ptr(spooler_definition().hash_algorithm, hash_ctx);
150 shash::Init(hash_ctx_ptr);
151 std::vector<char> buf(1024);
152 ssize_t read_bytes = 0;
153 do {
154 read_bytes = source->Read(&buf[0], buf.size());
155 assert(read_bytes >= 0);
156 ObjectPack::AddToBucket(&buf[0], read_bytes, handle->bucket);
157 shash::Update(reinterpret_cast<unsigned char *>(&buf[0]), read_bytes,
158 hash_ctx_ptr);
159 } while (static_cast<size_t>(read_bytes) == buf.size());
160 source->Close();
161 shash::Any content_hash(spooler_definition().hash_algorithm);
162 shash::Final(hash_ctx_ptr, &content_hash);
163
164 if (!session_context_->CommitBucket(ObjectPack::kNamed, content_hash,
165 handle->bucket, remote_path)) {
166 LogCvmfs(kLogUploadGateway, kLogStderr,
167 "File upload - could not commit bucket");
168 BumpErrors();
169 Respond(handle->commit_callback, UploaderResults(2, source->GetPath()));
170 return;
171 }
172
173 Respond(callback, UploaderResults(0, source->GetPath()));
174 }
175
176 UploadStreamHandle* GatewayUploader::InitStreamedUpload(
177 const CallbackTN* callback) {
178 return new GatewayStreamHandle(callback, session_context_->NewBucket());
179 }
180
181 void GatewayUploader::StreamedUpload(UploadStreamHandle* handle,
182 UploadBuffer buffer,
183 const CallbackTN* callback) {
184 GatewayStreamHandle* hd = dynamic_cast<GatewayStreamHandle*>(handle);
185 if (!hd) {
186 LogCvmfs(kLogUploadGateway, kLogStderr,
187 "Streamed upload - incompatible upload handle");
188 BumpErrors();
189 Respond(callback, UploaderResults(UploaderResults::kBufferUpload, 2));
190 return;
191 }
192
193 ObjectPack::AddToBucket(buffer.data, buffer.size, hd->bucket);
194
195 Respond(callback, UploaderResults(UploaderResults::kBufferUpload, 0));
196 }
197
198 void GatewayUploader::FinalizeStreamedUpload(UploadStreamHandle* handle,
199 const shash::Any& content_hash) {
200 GatewayStreamHandle* hd = dynamic_cast<GatewayStreamHandle*>(handle);
201 if (!hd) {
202 LogCvmfs(kLogUploadGateway, kLogStderr,
203 "Finalize streamed upload - incompatible upload handle");
204 BumpErrors();
205 Respond(handle->commit_callback,
206 UploaderResults(UploaderResults::kChunkCommit, 2));
207 return;
208 }
209
210 // hd->remote_path is ignored when empty
211 if (!session_context_->CommitBucket(ObjectPack::kCas, content_hash,
212 hd->bucket, hd->remote_path)) {
213 LogCvmfs(kLogUploadGateway, kLogStderr,
214 "Finalize streamed upload - could not commit bucket");
215 BumpErrors();
216 Respond(handle->commit_callback,
217 UploaderResults(UploaderResults::kChunkCommit, 4));
218 return;
219 }
220 if (!content_hash.HasSuffix()
221 || content_hash.suffix == shash::kSuffixPartial) {
222 CountUploadedChunks();
223 CountUploadedBytes(hd->bucket->size);
224 } else if (content_hash.suffix == shash::kSuffixCatalog) {
225 CountUploadedCatalogs();
226 CountUploadedCatalogBytes(hd->bucket->size);
227 }
228 Respond(handle->commit_callback,
229 UploaderResults(UploaderResults::kChunkCommit, 0));
230 }
231
232 void GatewayUploader::ReadSessionTokenFile(const std::string& token_file_name,
233 std::string* token) {
234 assert(token);
235 *token = "INVALIDTOKEN"; // overwritten if reading from file works
236
237 FILE* token_file = std::fopen(token_file_name.c_str(), "r");
238 if (!token_file) {
239 LogCvmfs(kLogUploadGateway, kLogStderr,
240 "HTTP Uploader - Could not open session token file.");
241 return;
242 }
243
244 GetLineFile(token_file, token);
245 fclose(token_file);
246 }
247
248 bool GatewayUploader::ReadKey(const std::string& key_file, std::string* key_id,
249 std::string* secret) {
250 return gateway::ReadKeys(key_file, key_id, secret);
251 }
252
253 int64_t GatewayUploader::DoGetObjectSize(const std::string &file_name) {
254 return -EOPNOTSUPP;
255 }
256
257 void GatewayUploader::BumpErrors() const { atomic_inc32(&num_errors_); }
258
259 } // namespace upload
260