Directory: | cvmfs/ |
---|---|
File: | cvmfs/upload_gateway.cc |
Date: | 2025-06-22 02:36:02 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 37 | 137 | 27.0% |
Branches: | 19 | 132 | 14.4% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #include "upload_gateway.h" | ||
6 | |||
7 | #include <cassert> | ||
8 | #include <limits> | ||
9 | #include <vector> | ||
10 | |||
11 | #include "gateway_util.h" | ||
12 | #include "util/exception.h" | ||
13 | #include "util/logging.h" | ||
14 | #include "util/string.h" | ||
15 | |||
16 | namespace upload { | ||
17 | |||
18 | ✗ | GatewayStreamHandle::GatewayStreamHandle(const CallbackTN *commit_callback, | |
19 | ✗ | ObjectPack::BucketHandle bkt) | |
20 | ✗ | : UploadStreamHandle(commit_callback), bucket(bkt) { } | |
21 | |||
22 | 57 | bool GatewayUploader::WillHandle(const SpoolerDefinition &spooler_definition) { | |
23 | 57 | return spooler_definition.driver_type == SpoolerDefinition::Gateway; | |
24 | } | ||
25 | |||
26 | 47 | bool GatewayUploader::ParseSpoolerDefinition( | |
27 | const SpoolerDefinition &spooler_definition, | ||
28 | GatewayUploader::Config *config) { | ||
29 | 47 | const std::string &config_string = spooler_definition.spooler_configuration; | |
30 |
2/2✓ Branch 0 taken 2 times.
✓ Branch 1 taken 45 times.
|
47 | if (!config) { |
31 | 2 | LogCvmfs(kLogUploadGateway, kLogStderr, "\"config\" argument is NULL"); | |
32 | 2 | return false; | |
33 | } | ||
34 | |||
35 |
2/2✓ Branch 1 taken 2 times.
✓ Branch 2 taken 43 times.
|
45 | if (spooler_definition.session_token_file.empty()) { |
36 | 2 | LogCvmfs(kLogUploadGateway, kLogStderr, | |
37 | "Failed to configure gateway uploader. " | ||
38 | "Missing session token file.\n"); | ||
39 | 2 | return false; | |
40 | } | ||
41 | 43 | config->session_token_file = spooler_definition.session_token_file; | |
42 | |||
43 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 43 times.
|
43 | if (spooler_definition.key_file.empty()) { |
44 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
45 | "Failed to configure gateway uploader. " | ||
46 | "Missing API key file.\n"); | ||
47 | ✗ | return false; | |
48 | } | ||
49 | 43 | config->key_file = spooler_definition.key_file; | |
50 | |||
51 | // Repo address, e.g. http://my.repo.address | ||
52 | 43 | config->api_url = config_string; | |
53 | |||
54 | 43 | return true; | |
55 | } | ||
56 | |||
57 | 41 | GatewayUploader::GatewayUploader(const SpoolerDefinition &spooler_definition) | |
58 | : AbstractUploader(spooler_definition) | ||
59 | 41 | , config_() | |
60 |
2/4✓ Branch 3 taken 41 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 41 times.
✗ Branch 7 not taken.
|
41 | , session_context_(new SessionContext()) { |
61 |
2/4✓ Branch 1 taken 41 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 41 times.
✗ Branch 4 not taken.
|
41 | assert(spooler_definition.IsValid() |
62 | && spooler_definition.driver_type == SpoolerDefinition::Gateway); | ||
63 | |||
64 |
2/4✓ Branch 1 taken 41 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 41 times.
|
41 | if (!ParseSpoolerDefinition(spooler_definition, &config_)) { |
65 | ✗ | PANIC(kLogStderr, "Error in parsing the spooler definition"); | |
66 | } | ||
67 | |||
68 | 41 | atomic_init32(&num_errors_); | |
69 | 41 | } | |
70 | |||
71 | 82 | GatewayUploader::~GatewayUploader() { | |
72 |
1/2✓ Branch 0 taken 41 times.
✗ Branch 1 not taken.
|
82 | if (session_context_) { |
73 |
1/2✓ Branch 0 taken 41 times.
✗ Branch 1 not taken.
|
82 | delete session_context_; |
74 | } | ||
75 | } | ||
76 | |||
77 | ✗ | bool GatewayUploader::Create() { | |
78 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
79 | "cannot create repository storage area when using the gateway"); | ||
80 | ✗ | return false; | |
81 | } | ||
82 | |||
83 | 41 | bool GatewayUploader::Initialize() { | |
84 |
2/4✓ Branch 1 taken 41 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 41 times.
|
41 | if (!AbstractUploader::Initialize()) { |
85 | ✗ | return false; | |
86 | } | ||
87 | 41 | std::string session_token; | |
88 |
1/2✓ Branch 1 taken 41 times.
✗ Branch 2 not taken.
|
41 | ReadSessionTokenFile(config_.session_token_file, &session_token); |
89 | |||
90 | 41 | std::string key_id; | |
91 | 41 | std::string secret; | |
92 |
2/4✓ Branch 1 taken 41 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 41 times.
|
41 | if (!ReadKey(config_.key_file, &key_id, &secret)) { |
93 | ✗ | return false; | |
94 | } | ||
95 | |||
96 |
1/2✓ Branch 1 taken 41 times.
✗ Branch 2 not taken.
|
41 | return session_context_->Initialize(config_.api_url, session_token, key_id, |
97 | 41 | secret); | |
98 | 41 | } | |
99 | |||
100 | 41 | bool GatewayUploader::FinalizeSession(bool commit, | |
101 | const std::string &old_root_hash, | ||
102 | const std::string &new_root_hash, | ||
103 | const RepositoryTag &tag) { | ||
104 | 41 | return session_context_->Finalize(commit, old_root_hash, new_root_hash, tag); | |
105 | } | ||
106 | |||
107 | ✗ | void GatewayUploader::WaitForUpload() const { | |
108 | ✗ | session_context_->WaitForUpload(); | |
109 | } | ||
110 | |||
111 | ✗ | std::string GatewayUploader::name() const { return "HTTP"; } | |
112 | |||
113 | ✗ | void GatewayUploader::DoRemoveAsync(const std::string & /*file_to_delete*/) { | |
114 | ✗ | atomic_inc32(&num_errors_); | |
115 | ✗ | Respond(NULL, UploaderResults()); | |
116 | } | ||
117 | |||
118 | ✗ | bool GatewayUploader::Peek(const std::string & /*path*/) { return false; } | |
119 | |||
120 | // TODO(jpriessn): implement Mkdir on gateway server-side | ||
121 | ✗ | bool GatewayUploader::Mkdir(const std::string &path) { return true; } | |
122 | |||
123 | ✗ | bool GatewayUploader::PlaceBootstrappingShortcut( | |
124 | const shash::Any & /*object*/) { | ||
125 | ✗ | return false; | |
126 | } | ||
127 | |||
128 | ✗ | unsigned int GatewayUploader::GetNumberOfErrors() const { | |
129 | ✗ | return atomic_read32(&num_errors_); | |
130 | } | ||
131 | |||
132 | ✗ | void GatewayUploader::DoUpload(const std::string &remote_path, | |
133 | IngestionSource *source, | ||
134 | const CallbackTN *callback) { | ||
135 | const UniquePtr<GatewayStreamHandle> handle( | ||
136 | ✗ | new GatewayStreamHandle(callback, session_context_->NewBucket())); | |
137 | |||
138 | ✗ | const bool rvb = source->Open(); | |
139 | ✗ | if (!rvb) { | |
140 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
141 | "File upload - could not open local file."); | ||
142 | ✗ | BumpErrors(); | |
143 | ✗ | Respond(callback, UploaderResults(1, source->GetPath())); | |
144 | ✗ | return; | |
145 | } | ||
146 | |||
147 | unsigned char hash_ctx[shash::kMaxContextSize]; | ||
148 | ✗ | const shash::ContextPtr hash_ctx_ptr(spooler_definition().hash_algorithm, | |
149 | ✗ | hash_ctx); | |
150 | ✗ | shash::Init(hash_ctx_ptr); | |
151 | ✗ | std::vector<char> buf(1024); | |
152 | ✗ | ssize_t read_bytes = 0; | |
153 | do { | ||
154 | ✗ | read_bytes = source->Read(&buf[0], buf.size()); | |
155 | ✗ | assert(read_bytes >= 0); | |
156 | ✗ | ObjectPack::AddToBucket(&buf[0], read_bytes, handle->bucket); | |
157 | ✗ | shash::Update(reinterpret_cast<unsigned char *>(&buf[0]), read_bytes, | |
158 | hash_ctx_ptr); | ||
159 | ✗ | } while (static_cast<size_t>(read_bytes) == buf.size()); | |
160 | ✗ | source->Close(); | |
161 | ✗ | shash::Any content_hash(spooler_definition().hash_algorithm); | |
162 | ✗ | shash::Final(hash_ctx_ptr, &content_hash); | |
163 | |||
164 | ✗ | if (!session_context_->CommitBucket(ObjectPack::kNamed, content_hash, | |
165 | ✗ | handle->bucket, remote_path)) { | |
166 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
167 | "File upload - could not commit bucket"); | ||
168 | ✗ | BumpErrors(); | |
169 | ✗ | Respond(handle->commit_callback, UploaderResults(2, source->GetPath())); | |
170 | ✗ | return; | |
171 | } | ||
172 | |||
173 | ✗ | Respond(callback, UploaderResults(0, source->GetPath())); | |
174 | } | ||
175 | |||
176 | ✗ | UploadStreamHandle *GatewayUploader::InitStreamedUpload( | |
177 | const CallbackTN *callback) { | ||
178 | ✗ | return new GatewayStreamHandle(callback, session_context_->NewBucket()); | |
179 | } | ||
180 | |||
181 | ✗ | void GatewayUploader::StreamedUpload(UploadStreamHandle *handle, | |
182 | UploadBuffer buffer, | ||
183 | const CallbackTN *callback) { | ||
184 | ✗ | GatewayStreamHandle *hd = dynamic_cast<GatewayStreamHandle *>(handle); | |
185 | ✗ | if (!hd) { | |
186 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
187 | "Streamed upload - incompatible upload handle"); | ||
188 | ✗ | BumpErrors(); | |
189 | ✗ | Respond(callback, UploaderResults(UploaderResults::kBufferUpload, 2)); | |
190 | ✗ | return; | |
191 | } | ||
192 | |||
193 | ✗ | ObjectPack::AddToBucket(buffer.data, buffer.size, hd->bucket); | |
194 | |||
195 | ✗ | Respond(callback, UploaderResults(UploaderResults::kBufferUpload, 0)); | |
196 | } | ||
197 | |||
198 | ✗ | void GatewayUploader::FinalizeStreamedUpload(UploadStreamHandle *handle, | |
199 | const shash::Any &content_hash) { | ||
200 | ✗ | GatewayStreamHandle *hd = dynamic_cast<GatewayStreamHandle *>(handle); | |
201 | ✗ | if (!hd) { | |
202 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
203 | "Finalize streamed upload - incompatible upload handle"); | ||
204 | ✗ | BumpErrors(); | |
205 | ✗ | Respond(handle->commit_callback, | |
206 | ✗ | UploaderResults(UploaderResults::kChunkCommit, 2)); | |
207 | ✗ | return; | |
208 | } | ||
209 | |||
210 | // hd->remote_path is ignored when empty | ||
211 | ✗ | if (!session_context_->CommitBucket(ObjectPack::kCas, content_hash, | |
212 | ✗ | hd->bucket, hd->remote_path)) { | |
213 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
214 | "Finalize streamed upload - could not commit bucket"); | ||
215 | ✗ | BumpErrors(); | |
216 | ✗ | Respond(handle->commit_callback, | |
217 | ✗ | UploaderResults(UploaderResults::kChunkCommit, 4)); | |
218 | ✗ | return; | |
219 | } | ||
220 | ✗ | if (!content_hash.HasSuffix() | |
221 | ✗ | || content_hash.suffix == shash::kSuffixPartial) { | |
222 | ✗ | CountUploadedChunks(); | |
223 | ✗ | CountUploadedBytes(hd->bucket->size); | |
224 | ✗ | } else if (content_hash.suffix == shash::kSuffixCatalog) { | |
225 | ✗ | CountUploadedCatalogs(); | |
226 | ✗ | CountUploadedCatalogBytes(hd->bucket->size); | |
227 | } | ||
228 | ✗ | Respond(handle->commit_callback, | |
229 | ✗ | UploaderResults(UploaderResults::kChunkCommit, 0)); | |
230 | } | ||
231 | |||
232 | ✗ | void GatewayUploader::ReadSessionTokenFile(const std::string &token_file_name, | |
233 | std::string *token) { | ||
234 | ✗ | assert(token); | |
235 | ✗ | *token = "INVALIDTOKEN"; // overwritten if reading from file works | |
236 | |||
237 | ✗ | FILE *token_file = std::fopen(token_file_name.c_str(), "r"); | |
238 | ✗ | if (!token_file) { | |
239 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
240 | "HTTP Uploader - Could not open session token file."); | ||
241 | ✗ | return; | |
242 | } | ||
243 | |||
244 | ✗ | GetLineFile(token_file, token); | |
245 | ✗ | fclose(token_file); | |
246 | } | ||
247 | |||
248 | ✗ | bool GatewayUploader::ReadKey(const std::string &key_file, std::string *key_id, | |
249 | std::string *secret) { | ||
250 | ✗ | return gateway::ReadKeys(key_file, key_id, secret); | |
251 | } | ||
252 | |||
253 | ✗ | int64_t GatewayUploader::DoGetObjectSize(const std::string &file_name) { | |
254 | ✗ | return -EOPNOTSUPP; | |
255 | } | ||
256 | |||
257 | ✗ | void GatewayUploader::BumpErrors() const { atomic_inc32(&num_errors_); } | |
258 | |||
259 | } // namespace upload | ||
260 |