Directory: | cvmfs/ |
---|---|
File: | cvmfs/upload_gateway.cc |
Date: | 2025-03-09 02:34:28 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 37 | 137 | 27.0% |
Branches: | 19 | 132 | 14.4% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #include "upload_gateway.h" | ||
6 | |||
7 | #include <cassert> | ||
8 | #include <limits> | ||
9 | #include <vector> | ||
10 | |||
11 | #include "gateway_util.h" | ||
12 | #include "util/exception.h" | ||
13 | #include "util/logging.h" | ||
14 | #include "util/string.h" | ||
15 | |||
16 | namespace upload { | ||
17 | |||
18 | ✗ | GatewayStreamHandle::GatewayStreamHandle(const CallbackTN* commit_callback, | |
19 | ✗ | ObjectPack::BucketHandle bkt) | |
20 | ✗ | : UploadStreamHandle(commit_callback), bucket(bkt) {} | |
21 | |||
22 | 3 | bool GatewayUploader::WillHandle(const SpoolerDefinition& spooler_definition) { | |
23 | 3 | return spooler_definition.driver_type == SpoolerDefinition::Gateway; | |
24 | } | ||
25 | |||
26 | 4 | bool GatewayUploader::ParseSpoolerDefinition( | |
27 | const SpoolerDefinition& spooler_definition, | ||
28 | GatewayUploader::Config* config) { | ||
29 | 4 | const std::string& config_string = spooler_definition.spooler_configuration; | |
30 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 3 times.
|
4 | if (!config) { |
31 | 1 | LogCvmfs(kLogUploadGateway, kLogStderr, "\"config\" argument is NULL"); | |
32 | 1 | return false; | |
33 | } | ||
34 | |||
35 |
2/2✓ Branch 1 taken 1 times.
✓ Branch 2 taken 2 times.
|
3 | if (spooler_definition.session_token_file.empty()) { |
36 | 1 | LogCvmfs(kLogUploadGateway, kLogStderr, | |
37 | "Failed to configure gateway uploader. " | ||
38 | "Missing session token file.\n"); | ||
39 | 1 | return false; | |
40 | } | ||
41 | 2 | config->session_token_file = spooler_definition.session_token_file; | |
42 | |||
43 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
|
2 | if (spooler_definition.key_file.empty()) { |
44 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
45 | "Failed to configure gateway uploader. " | ||
46 | "Missing API key file.\n"); | ||
47 | ✗ | return false; | |
48 | } | ||
49 | 2 | config->key_file = spooler_definition.key_file; | |
50 | |||
51 | // Repo address, e.g. http://my.repo.address | ||
52 | 2 | config->api_url = config_string; | |
53 | |||
54 | 2 | return true; | |
55 | } | ||
56 | |||
57 | 1 | GatewayUploader::GatewayUploader(const SpoolerDefinition& spooler_definition) | |
58 | : AbstractUploader(spooler_definition), | ||
59 | 1 | config_(), | |
60 |
2/4✓ Branch 3 taken 1 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 1 times.
✗ Branch 7 not taken.
|
1 | session_context_(new SessionContext()) { |
61 |
2/4✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 times.
✗ Branch 4 not taken.
|
1 | assert(spooler_definition.IsValid() && |
62 | spooler_definition.driver_type == SpoolerDefinition::Gateway); | ||
63 | |||
64 |
2/4✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 times.
|
1 | if (!ParseSpoolerDefinition(spooler_definition, &config_)) { |
65 | ✗ | PANIC(kLogStderr, "Error in parsing the spooler definition"); | |
66 | } | ||
67 | |||
68 | 1 | atomic_init32(&num_errors_); | |
69 | 1 | } | |
70 | |||
71 | 2 | GatewayUploader::~GatewayUploader() { | |
72 |
1/2✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
|
2 | if (session_context_) { |
73 |
1/2✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
|
2 | delete session_context_; |
74 | } | ||
75 | } | ||
76 | |||
77 | ✗ | bool GatewayUploader::Create() { | |
78 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
79 | "cannot create repository storage area when using the gateway"); | ||
80 | ✗ | return false; | |
81 | } | ||
82 | |||
83 | 1 | bool GatewayUploader::Initialize() { | |
84 |
2/4✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 times.
|
1 | if (!AbstractUploader::Initialize()) { |
85 | ✗ | return false; | |
86 | } | ||
87 | 1 | std::string session_token; | |
88 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | ReadSessionTokenFile(config_.session_token_file, &session_token); |
89 | |||
90 | 1 | std::string key_id; | |
91 | 1 | std::string secret; | |
92 |
2/4✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 times.
|
1 | if (!ReadKey(config_.key_file, &key_id, &secret)) { |
93 | ✗ | return false; | |
94 | } | ||
95 | |||
96 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | return session_context_->Initialize(config_.api_url, session_token, key_id, |
97 | 1 | secret); | |
98 | 1 | } | |
99 | |||
100 | 1 | bool GatewayUploader::FinalizeSession(bool commit, | |
101 | const std::string& old_root_hash, | ||
102 | const std::string& new_root_hash, | ||
103 | const RepositoryTag& tag) { | ||
104 | 1 | return session_context_->Finalize(commit, old_root_hash, new_root_hash, tag); | |
105 | } | ||
106 | |||
107 | ✗ | void GatewayUploader::WaitForUpload() const { | |
108 | ✗ | session_context_->WaitForUpload(); | |
109 | } | ||
110 | |||
111 | ✗ | std::string GatewayUploader::name() const { return "HTTP"; } | |
112 | |||
113 | ✗ | void GatewayUploader::DoRemoveAsync(const std::string& /*file_to_delete*/) { | |
114 | ✗ | atomic_inc32(&num_errors_); | |
115 | ✗ | Respond(NULL, UploaderResults()); | |
116 | } | ||
117 | |||
118 | ✗ | bool GatewayUploader::Peek(const std::string& /*path*/) { return false; } | |
119 | |||
120 | // TODO(jpriessn): implement Mkdir on gateway server-side | ||
121 | ✗ | bool GatewayUploader::Mkdir(const std::string &path) { | |
122 | ✗ | return true; | |
123 | } | ||
124 | |||
125 | ✗ | bool GatewayUploader::PlaceBootstrappingShortcut(const shash::Any& /*object*/) { | |
126 | ✗ | return false; | |
127 | } | ||
128 | |||
129 | ✗ | unsigned int GatewayUploader::GetNumberOfErrors() const { | |
130 | ✗ | return atomic_read32(&num_errors_); | |
131 | } | ||
132 | |||
133 | ✗ | void GatewayUploader::DoUpload(const std::string& remote_path, | |
134 | IngestionSource *source, | ||
135 | const CallbackTN* callback) { | ||
136 | UniquePtr<GatewayStreamHandle> handle( | ||
137 | ✗ | new GatewayStreamHandle(callback, session_context_->NewBucket())); | |
138 | |||
139 | ✗ | bool rvb = source->Open(); | |
140 | ✗ | if (!rvb) { | |
141 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
142 | "File upload - could not open local file."); | ||
143 | ✗ | BumpErrors(); | |
144 | ✗ | Respond(callback, UploaderResults(1, source->GetPath())); | |
145 | ✗ | return; | |
146 | } | ||
147 | |||
148 | unsigned char hash_ctx[shash::kMaxContextSize]; | ||
149 | ✗ | shash::ContextPtr hash_ctx_ptr(spooler_definition().hash_algorithm, hash_ctx); | |
150 | ✗ | shash::Init(hash_ctx_ptr); | |
151 | ✗ | std::vector<char> buf(1024); | |
152 | ✗ | ssize_t read_bytes = 0; | |
153 | do { | ||
154 | ✗ | read_bytes = source->Read(&buf[0], buf.size()); | |
155 | ✗ | assert(read_bytes >= 0); | |
156 | ✗ | ObjectPack::AddToBucket(&buf[0], read_bytes, handle->bucket); | |
157 | ✗ | shash::Update(reinterpret_cast<unsigned char *>(&buf[0]), read_bytes, | |
158 | hash_ctx_ptr); | ||
159 | ✗ | } while (static_cast<size_t>(read_bytes) == buf.size()); | |
160 | ✗ | source->Close(); | |
161 | ✗ | shash::Any content_hash(spooler_definition().hash_algorithm); | |
162 | ✗ | shash::Final(hash_ctx_ptr, &content_hash); | |
163 | |||
164 | ✗ | if (!session_context_->CommitBucket(ObjectPack::kNamed, content_hash, | |
165 | ✗ | handle->bucket, remote_path)) { | |
166 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
167 | "File upload - could not commit bucket"); | ||
168 | ✗ | BumpErrors(); | |
169 | ✗ | Respond(handle->commit_callback, UploaderResults(2, source->GetPath())); | |
170 | ✗ | return; | |
171 | } | ||
172 | |||
173 | ✗ | Respond(callback, UploaderResults(0, source->GetPath())); | |
174 | } | ||
175 | |||
176 | ✗ | UploadStreamHandle* GatewayUploader::InitStreamedUpload( | |
177 | const CallbackTN* callback) { | ||
178 | ✗ | return new GatewayStreamHandle(callback, session_context_->NewBucket()); | |
179 | } | ||
180 | |||
181 | ✗ | void GatewayUploader::StreamedUpload(UploadStreamHandle* handle, | |
182 | UploadBuffer buffer, | ||
183 | const CallbackTN* callback) { | ||
184 | ✗ | GatewayStreamHandle* hd = dynamic_cast<GatewayStreamHandle*>(handle); | |
185 | ✗ | if (!hd) { | |
186 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
187 | "Streamed upload - incompatible upload handle"); | ||
188 | ✗ | BumpErrors(); | |
189 | ✗ | Respond(callback, UploaderResults(UploaderResults::kBufferUpload, 2)); | |
190 | ✗ | return; | |
191 | } | ||
192 | |||
193 | ✗ | ObjectPack::AddToBucket(buffer.data, buffer.size, hd->bucket); | |
194 | |||
195 | ✗ | Respond(callback, UploaderResults(UploaderResults::kBufferUpload, 0)); | |
196 | } | ||
197 | |||
198 | ✗ | void GatewayUploader::FinalizeStreamedUpload(UploadStreamHandle* handle, | |
199 | const shash::Any& content_hash) { | ||
200 | ✗ | GatewayStreamHandle* hd = dynamic_cast<GatewayStreamHandle*>(handle); | |
201 | ✗ | if (!hd) { | |
202 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
203 | "Finalize streamed upload - incompatible upload handle"); | ||
204 | ✗ | BumpErrors(); | |
205 | ✗ | Respond(handle->commit_callback, | |
206 | ✗ | UploaderResults(UploaderResults::kChunkCommit, 2)); | |
207 | ✗ | return; | |
208 | } | ||
209 | |||
210 | // hd->remote_path is ignored when empty | ||
211 | ✗ | if (!session_context_->CommitBucket(ObjectPack::kCas, content_hash, | |
212 | ✗ | hd->bucket, hd->remote_path)) { | |
213 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
214 | "Finalize streamed upload - could not commit bucket"); | ||
215 | ✗ | BumpErrors(); | |
216 | ✗ | Respond(handle->commit_callback, | |
217 | ✗ | UploaderResults(UploaderResults::kChunkCommit, 4)); | |
218 | ✗ | return; | |
219 | } | ||
220 | ✗ | if (!content_hash.HasSuffix() | |
221 | ✗ | || content_hash.suffix == shash::kSuffixPartial) { | |
222 | ✗ | CountUploadedChunks(); | |
223 | ✗ | CountUploadedBytes(hd->bucket->size); | |
224 | ✗ | } else if (content_hash.suffix == shash::kSuffixCatalog) { | |
225 | ✗ | CountUploadedCatalogs(); | |
226 | ✗ | CountUploadedCatalogBytes(hd->bucket->size); | |
227 | } | ||
228 | ✗ | Respond(handle->commit_callback, | |
229 | ✗ | UploaderResults(UploaderResults::kChunkCommit, 0)); | |
230 | } | ||
231 | |||
232 | ✗ | void GatewayUploader::ReadSessionTokenFile(const std::string& token_file_name, | |
233 | std::string* token) { | ||
234 | ✗ | assert(token); | |
235 | ✗ | *token = "INVALIDTOKEN"; // overwritten if reading from file works | |
236 | |||
237 | ✗ | FILE* token_file = std::fopen(token_file_name.c_str(), "r"); | |
238 | ✗ | if (!token_file) { | |
239 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
240 | "HTTP Uploader - Could not open session token file."); | ||
241 | ✗ | return; | |
242 | } | ||
243 | |||
244 | ✗ | GetLineFile(token_file, token); | |
245 | ✗ | fclose(token_file); | |
246 | } | ||
247 | |||
248 | ✗ | bool GatewayUploader::ReadKey(const std::string& key_file, std::string* key_id, | |
249 | std::string* secret) { | ||
250 | ✗ | return gateway::ReadKeys(key_file, key_id, secret); | |
251 | } | ||
252 | |||
253 | ✗ | int64_t GatewayUploader::DoGetObjectSize(const std::string &file_name) { | |
254 | ✗ | return -EOPNOTSUPP; | |
255 | } | ||
256 | |||
257 | ✗ | void GatewayUploader::BumpErrors() const { atomic_inc32(&num_errors_); } | |
258 | |||
259 | } // namespace upload | ||
260 |