Directory: | cvmfs/ |
---|---|
File: | cvmfs/upload_facility.h |
Date: | 2025-06-29 02:35:41 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 64 | 66 | 97.0% |
Branches: | 30 | 62 | 48.4% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #ifndef CVMFS_UPLOAD_FACILITY_H_ | ||
6 | #define CVMFS_UPLOAD_FACILITY_H_ | ||
7 | |||
8 | #include <fcntl.h> | ||
9 | #include <stdint.h> | ||
10 | |||
11 | #include <string> | ||
12 | |||
13 | #include "ingestion/ingestion_source.h" | ||
14 | #include "ingestion/task.h" | ||
15 | #include "repository_tag.h" | ||
16 | #include "statistics.h" | ||
17 | #include "upload_spooler_definition.h" | ||
18 | #include "util/atomic.h" | ||
19 | #include "util/concurrency.h" | ||
20 | #include "util/pointer.h" | ||
21 | #include "util/posix.h" | ||
22 | #include "util/tube.h" | ||
23 | |||
24 | namespace upload { | ||
25 | |||
26 | struct UploadCounters { | ||
27 | perf::Counter *n_chunks_added; | ||
28 | perf::Counter *n_chunks_duplicated; | ||
29 | perf::Counter *n_catalogs_added; | ||
30 | perf::Counter *sz_uploaded_bytes; | ||
31 | perf::Counter *sz_uploaded_catalog_bytes; | ||
32 | |||
33 | 4 | explicit UploadCounters(perf::StatisticsTemplate statistics) { | |
34 |
3/6✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 4 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 4 times.
✗ Branch 10 not taken.
|
4 | n_chunks_added = statistics.RegisterOrLookupTemplated( |
35 | "n_chunks_added", "Number of new chunks added"); | ||
36 |
3/6✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 4 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 4 times.
✗ Branch 10 not taken.
|
4 | n_chunks_duplicated = statistics.RegisterOrLookupTemplated( |
37 | "n_chunks_duplicated", "Number of duplicated chunks added"); | ||
38 |
3/6✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 4 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 4 times.
✗ Branch 10 not taken.
|
4 | n_catalogs_added = statistics.RegisterOrLookupTemplated( |
39 | "n_catalogs_added", "Number of new catalogs added"); | ||
40 |
3/6✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 4 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 4 times.
✗ Branch 10 not taken.
|
4 | sz_uploaded_bytes = statistics.RegisterOrLookupTemplated( |
41 | "sz_uploaded_bytes", "Number of uploaded bytes"); | ||
42 |
3/6✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 4 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 4 times.
✗ Branch 10 not taken.
|
4 | sz_uploaded_catalog_bytes = statistics.RegisterOrLookupTemplated( |
43 | "sz_uploaded_catalog_bytes", "Number of uploaded bytes for catalogs"); | ||
44 | 4 | } | |
45 | }; // UploadCounters | ||
46 | |||
47 | struct UploaderResults { | ||
48 | enum Type { | ||
49 | kFileUpload, | ||
50 | kBufferUpload, | ||
51 | kChunkCommit, | ||
52 | kRemove, | ||
53 | kLookup | ||
54 | }; | ||
55 | |||
56 | 63338 | UploaderResults(const int return_code, const std::string &local_path) | |
57 | 63338 | : type(kFileUpload), return_code(return_code), local_path(local_path) { } | |
58 | |||
59 | 26448064 | explicit UploaderResults(Type t, const int return_code) | |
60 |
1/2✓ Branch 2 taken 26448002 times.
✗ Branch 3 not taken.
|
26448064 | : type(t), return_code(return_code), local_path("") { } |
61 | |||
62 | 10198 | UploaderResults() : type(kRemove), return_code(0) { } | |
63 | |||
64 | const Type type; | ||
65 | const int return_code; | ||
66 | const std::string local_path; | ||
67 | }; | ||
68 | |||
69 | struct UploadStreamHandle; | ||
70 | |||
71 | /** | ||
72 | * Abstract base class for all backend upload facilities | ||
73 | * This class defines an interface and constructs the concrete Uploaders, | ||
74 | * furthermore it handles callbacks to the outside world to notify users of done | ||
75 | * upload jobs. | ||
76 | * | ||
77 | * Note: Users could be both the Spooler (when calling Spooler::Upload()) and | ||
78 | * the IngestionPipeline (when calling Spooler::Process()). We therefore | ||
79 | * cannot use the Observable template here, since this would forward | ||
80 | * finished upload jobs to ALL listeners instead of only the owner of the | ||
81 | * specific job. | ||
82 | */ | ||
83 | class AbstractUploader | ||
84 | : public PolymorphicConstruction<AbstractUploader, SpoolerDefinition>, | ||
85 | public Callbackable<UploaderResults>, | ||
86 | public SingleCopy { | ||
87 | friend class TaskUpload; | ||
88 | |||
89 | public: | ||
90 | /** | ||
91 | * A read-only memory block that is supposed to be written out. | ||
92 | */ | ||
93 | struct UploadBuffer { | ||
94 | 9779504 | UploadBuffer() : size(0), data(NULL) { } | |
95 | 16600434 | UploadBuffer(uint64_t s, const void *d) : size(s), data(d) { } | |
96 | uint64_t size; | ||
97 | const void *data; | ||
98 | }; | ||
99 | |||
100 | struct JobStatus { | ||
101 | enum State { | ||
102 | kOk, | ||
103 | kTerminate, | ||
104 | kNoJobs | ||
105 | }; | ||
106 | }; | ||
107 | |||
108 | struct UploadJob { | ||
109 | enum Type { | ||
110 | Upload, | ||
111 | Commit, | ||
112 | Terminate | ||
113 | }; | ||
114 | |||
115 | UploadJob(UploadStreamHandle *handle, UploadBuffer buffer, | ||
116 | const CallbackTN *callback = NULL); | ||
117 | UploadJob(UploadStreamHandle *handle, const shash::Any &content_hash); | ||
118 | |||
119 | 4132 | UploadJob() | |
120 | 4132 | : type(Terminate) | |
121 | 4132 | , stream_handle(NULL) | |
122 | 4132 | , tag_(0) | |
123 | 4132 | , buffer() | |
124 | 4132 | , callback(NULL) { } | |
125 | |||
126 |
1/2✓ Branch 2 taken 4132 times.
✗ Branch 3 not taken.
|
4132 | static UploadJob *CreateQuitBeacon() { return new UploadJob(); } |
127 | 26394280 | bool IsQuitBeacon() { return type == Terminate; } | |
128 | |||
129 | Type type; | ||
130 | UploadStreamHandle *stream_handle; | ||
131 | /** | ||
132 | * Ensure that upload jobs belonging to the same file end up in the same | ||
133 | * upload task queue. | ||
134 | */ | ||
135 | int64_t tag_; | ||
136 | 55986 | int64_t tag() { return tag_; } | |
137 | |||
138 | // type==Upload specific fields | ||
139 | UploadBuffer buffer; | ||
140 | const CallbackTN *callback; | ||
141 | |||
142 | // type==Commit specific fields | ||
143 | shash::Any content_hash; | ||
144 | }; | ||
145 | |||
146 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 3388 times.
|
6776 | virtual ~AbstractUploader() { assert(!tasks_upload_.is_active()); } |
147 | |||
148 | /** | ||
149 | * A string identifying the uploader type | ||
150 | */ | ||
151 | virtual std::string name() const = 0; | ||
152 | |||
153 | /** | ||
154 | * Initializes a new repository storage area, e.g. create directory layout | ||
155 | * for local backend or create bucket for S3 backend. | ||
156 | */ | ||
157 | virtual bool Create() = 0; | ||
158 | |||
159 | /** | ||
160 | * Concrete uploaders might want to use a customized setting for multi-stream | ||
161 | * writing, for instance one per disk. Note that the S3 backend uses one task | ||
162 | * but this one task uses internally multiple HTTP streams through curl async | ||
163 | * I/O. | ||
164 | */ | ||
165 | 7522 | virtual unsigned GetNumTasks() const { return num_upload_tasks_; } | |
166 | |||
167 | /** | ||
168 | * This is called right after the constructor of AbstractUploader or/and its | ||
169 | * derived class has been executed. You can override that to do additional | ||
170 | * initialization that cannot be done in the constructor itself. | ||
171 | * | ||
172 | * @return true on successful initialization | ||
173 | */ | ||
174 | virtual bool Initialize(); | ||
175 | |||
176 | /** | ||
177 | * Called during Spooler::WaitForUpload(), to ensure that the upload has | ||
178 | * finished. If commit == true, then a Commit request is also sent, to apply | ||
179 | * all the changes accumulated during the session. "catalog_path" | ||
180 | * represents the path of the root catalog with the changes. | ||
181 | * By default it is a noop and returns true; | ||
182 | */ | ||
183 | virtual bool FinalizeSession(bool commit, const std::string &old_root_hash, | ||
184 | const std::string &new_root_hash, | ||
185 | const RepositoryTag &tag); | ||
186 | |||
187 | /** | ||
188 | * This must be called right before the destruction of the AbstractUploader! | ||
189 | * You are _not_ supposed to overwrite this method in your concrete Uploader. | ||
190 | */ | ||
191 | void TearDown(); | ||
192 | |||
193 | /** | ||
194 | * Uploads the file at the path local_path into the backend storage under the | ||
195 | * path remote_path. When the upload has finished it calls callback. | ||
196 | * Note: This method might be implemented in a synchronous way. | ||
197 | * | ||
198 | * @param local_path path to the file to be uploaded | ||
199 | * @param remote_path desired path for the file in the backend storage | ||
200 | * @param callback (optional) gets notified when the upload was finished | ||
201 | */ | ||
202 | 31842 | void UploadFile(const std::string &local_path, | |
203 | const std::string &remote_path, | ||
204 | const CallbackTN *callback = NULL) { | ||
205 |
1/2✓ Branch 1 taken 31842 times.
✗ Branch 2 not taken.
|
31842 | ++jobs_in_flight_; |
206 |
1/2✓ Branch 1 taken 31842 times.
✗ Branch 2 not taken.
|
31842 | FileIngestionSource source(local_path); |
207 |
1/2✓ Branch 1 taken 31842 times.
✗ Branch 2 not taken.
|
31842 | DoUpload(remote_path, &source, callback); |
208 | 31842 | } | |
209 | |||
210 | 62 | void UploadIngestionSource(const std::string &remote_path, | |
211 | IngestionSource *source, | ||
212 | const CallbackTN *callback = NULL) { | ||
213 | 62 | ++jobs_in_flight_; | |
214 | 62 | DoUpload(remote_path, source, callback); | |
215 | 62 | } | |
216 | |||
217 | /** | ||
218 | * This method is called before the first data block of a streamed upload is | ||
219 | * scheduled (see the definition of UploadStreamHandle below for details). | ||
220 | * | ||
221 | * @param callback (optional) this callback will be invoked once this parti- | ||
222 | * cular streamed upload is committed. | ||
223 | * @return a pointer to the initialized UploadStreamHandle | ||
224 | */ | ||
225 | virtual UploadStreamHandle *InitStreamedUpload( | ||
226 | const CallbackTN *callback) = 0; | ||
227 | |||
228 | /** | ||
229 | * This method schedules a buffer to be uploaded in the context of the | ||
230 | * given UploadStreamHandle. The actual upload will happen asynchronously by | ||
231 | * a concrete implementation of AbstractUploader | ||
232 | * (see AbstractUploader::StreamedUpload()). | ||
233 | * As soon as the scheduled upload job is complete (either successful or not) | ||
234 | * the optionally passed callback is supposed to be invoked using | ||
235 | * AbstractUploader::Respond(). | ||
236 | * | ||
237 | * @param handle Pointer to a previously acquired UploadStreamHandle | ||
238 | * @param buffer contains the data block to be uploaded | ||
239 | * @param callback (optional) callback object to be invoked once the given | ||
240 | * upload is finished (see AbstractUploader::Respond()) | ||
241 | */ | ||
242 | 16600278 | void ScheduleUpload(UploadStreamHandle *handle, | |
243 | UploadBuffer buffer, | ||
244 | const CallbackTN *callback = NULL) { | ||
245 | 16600278 | ++jobs_in_flight_; | |
246 |
3/6✓ Branch 1 taken 16612329 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 16610262 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 16612758 times.
✗ Branch 8 not taken.
|
16613187 | tubes_upload_.Dispatch(new UploadJob(handle, buffer, callback)); |
247 | 16612758 | } | |
248 | |||
249 | /** | ||
250 | * This method schedules a commit job as soon as all data blocks of a streamed | ||
251 | * upload are (successfully) uploaded. Derived classes must override | ||
252 | * AbstractUploader::FinalizeStreamedUpload() for this to happen. | ||
253 | * | ||
254 | * @param handle Pointer to a previously acquired UploadStreamHandle | ||
255 | * @param content_hash the content hash of the full uploaded data Chunk | ||
256 | */ | ||
257 | 9772632 | void ScheduleCommit(UploadStreamHandle *handle, | |
258 | const shash::Any &content_hash) { | ||
259 | 9772632 | ++jobs_in_flight_; | |
260 |
1/2✓ Branch 2 taken 9775323 times.
✗ Branch 3 not taken.
|
9775245 | tubes_upload_.Dispatch(new UploadJob(handle, content_hash)); |
261 | 9774738 | } | |
262 | |||
263 | /** | ||
264 | * Removes a file from the backend storage. | ||
265 | * | ||
266 | * Note: If the file doesn't exist before the call, this is not an error. | ||
267 | * | ||
268 | * @param file_to_delete path to the file to be removed | ||
269 | */ | ||
270 | 10198 | void RemoveAsync(const std::string &file_to_delete) { | |
271 | 10198 | ++jobs_in_flight_; | |
272 | 10198 | DoRemoveAsync(file_to_delete); | |
273 | 10198 | } | |
274 | |||
275 | /** | ||
276 | * Overloaded method used to remove an object based on its content hash. | ||
277 | * | ||
278 | * @param hash_to_delete the content hash of a file to be deleted | ||
279 | */ | ||
280 | 10136 | void RemoveAsync(const shash::Any &hash_to_delete) { | |
281 |
2/4✓ Branch 2 taken 10136 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 10136 times.
✗ Branch 6 not taken.
|
10136 | RemoveAsync("data/" + hash_to_delete.MakePath()); |
282 | 10136 | } | |
283 | |||
284 | /** | ||
285 | * Get object size based on its content hash | ||
286 | * | ||
287 | * @param hash the content hash of a file | ||
288 | */ | ||
289 | ✗ | int64_t GetObjectSize(const shash::Any &hash) { | |
290 | ✗ | return DoGetObjectSize("data/" + hash.MakePath()); | |
291 | } | ||
292 | |||
293 | /** | ||
294 | * Checks if a file is already present in the backend storage. This might be a | ||
295 | * synchronous operation. | ||
296 | * | ||
297 | * @param path the path of the file to be checked | ||
298 | * @return true if the file was found in the backend storage | ||
299 | */ | ||
300 | virtual bool Peek(const std::string &path) = 0; | ||
301 | |||
302 | /** | ||
303 | * Make directory in upstream storage. Noop if directory already present. | ||
304 | * | ||
305 | * @param path relative directory path in the upstream storage | ||
306 | * @return true if the directory was successfully created or already present | ||
307 | */ | ||
308 | virtual bool Mkdir(const std::string &path) = 0; | ||
309 | |||
310 | /** | ||
311 | * Creates a top-level shortcut to the given data object. This is particularly | ||
312 | * useful for bootstrapping repositories whose data-directory is secured by | ||
313 | * a VOMS certificate. | ||
314 | * | ||
315 | * @param object content hash of the object to be exposed on the top-level | ||
316 | * @return true on success | ||
317 | */ | ||
318 | virtual bool PlaceBootstrappingShortcut(const shash::Any &object) = 0; | ||
319 | |||
320 | /** | ||
321 | * Waits until the current upload queue is empty. | ||
322 | * | ||
323 | * Note: This does NOT necessarily mean, that all files are actually uploaded. | ||
324 | * If new jobs are concurrently scheduled the behavior of this method is | ||
325 | * not defined (it returns also on intermediately empty queues) | ||
326 | */ | ||
327 | virtual void WaitForUpload() const; | ||
328 | |||
329 | virtual unsigned int GetNumberOfErrors() const = 0; | ||
330 | static void RegisterPlugins(); | ||
331 | void InitCounters(perf::StatisticsTemplate *statistics); | ||
332 | |||
333 | protected: | ||
334 | typedef Callbackable<UploaderResults>::CallbackTN *CallbackPtr; | ||
335 | |||
336 | explicit AbstractUploader(const SpoolerDefinition &spooler_definition); | ||
337 | |||
338 | /** | ||
339 | * Implementation of plain file upload | ||
340 | * Public interface: AbstractUploader::UploadFile() | ||
341 | * | ||
342 | * @param source ingestion source providing the data to be uploaded | ||
343 | * @param remote_path destination to be written in the backend | ||
344 | * @param callback callback to be called on completion | ||
345 | */ | ||
346 | virtual void DoUpload(const std::string &remote_path, | ||
347 | IngestionSource *source, | ||
348 | const CallbackTN *callback) = 0; | ||
349 | |||
350 | /** | ||
351 | * Implementation of a streamed upload step. See public interface for details. | ||
352 | * Public interface: AbstractUploader::ScheduleUpload() | ||
353 | * | ||
354 | * @param handle descendant of UploadStreamHandle specifying the stream | ||
355 | * @param buffer the CharBuffer to be uploaded to the stream | ||
356 | * @param callback callback to be called on completion | ||
357 | */ | ||
358 | virtual void StreamedUpload(UploadStreamHandle *handle, | ||
359 | UploadBuffer buffer, | ||
360 | const CallbackTN *callback) = 0; | ||
361 | |||
362 | /** | ||
363 | * Implementation of streamed upload commit | ||
364 | * Public interface: AbstractUploader::ScheduleCommit() | ||
365 | * | ||
366 | * @param handle descendant of UploadStreamHandle specifying the stream | ||
367 | * @param content_hash the computed content hash of the streamed object | ||
368 | */ | ||
369 | virtual void FinalizeStreamedUpload(UploadStreamHandle *handle, | ||
370 | const shash::Any &content_hash) = 0; | ||
371 | |||
372 | |||
373 | virtual void DoRemoveAsync(const std::string &file_to_delete) = 0; | ||
374 | |||
375 | virtual int64_t DoGetObjectSize(const std::string &file_name) = 0; | ||
376 | |||
377 | /** | ||
378 | * This notifies the callback that is associated to a finishing job. Please | ||
379 | * do not call the handed callback yourself in concrete Uploaders! | ||
380 | * | ||
381 | * Note: Since the job is finished after we respond to it, the callback object | ||
382 | * gets automatically destroyed by this call! | ||
383 | * Therefore you must not call Respond() twice or use the callback later | ||
384 | * by any means! | ||
385 | */ | ||
386 | 26432330 | void Respond(const CallbackTN *callback, | |
387 | const UploaderResults &result) const { | ||
388 |
2/2✓ Branch 0 taken 26422132 times.
✓ Branch 1 taken 10198 times.
|
26432330 | if (callback != NULL) { |
389 | 26422132 | (*callback)(result); | |
390 |
1/2✓ Branch 0 taken 26422132 times.
✗ Branch 1 not taken.
|
26422132 | delete callback; |
391 | } | ||
392 | |||
393 | 26432330 | --jobs_in_flight_; | |
394 | 26432330 | } | |
395 | |||
396 | /** | ||
397 | * Creates a temporary file in the backend storage's temporary location | ||
398 | * For the LocalUploader this usually is the 'txn' directory of the backend | ||
399 | * storage. Otherwise it is some scratch area. | ||
400 | * | ||
401 | * @param path pointer to a string that will contain the created file path | ||
402 | * @return a file descriptor to the opened file | ||
403 | */ | ||
404 | int CreateAndOpenTemporaryChunkFile(std::string *path) const; | ||
405 | |||
406 | 9728 | const SpoolerDefinition &spooler_definition() const { | |
407 | 9728 | return spooler_definition_; | |
408 | } | ||
409 | |||
410 | void CountUploadedChunks() const; | ||
411 | void DecUploadedChunks() const; | ||
412 | void CountUploadedBytes(int64_t bytes_written) const; | ||
413 | void CountDuplicates() const; | ||
414 | void CountUploadedCatalogs() const; | ||
415 | void CountUploadedCatalogBytes(int64_t bytes_written) const; | ||
416 | |||
417 | protected: | ||
418 | /** | ||
419 | * Used by concrete implementations when they use callbacks where it's not | ||
420 | * already foreseen, e.g. S3Uploader::Peek(). | ||
421 | */ | ||
422 | 80 | void IncJobsInFlight() { ++jobs_in_flight_; } | |
423 | |||
424 | private: | ||
425 | const SpoolerDefinition spooler_definition_; | ||
426 | |||
427 | /** | ||
428 | * Number of threads used for I/O write calls. Effectively this parameter | ||
429 | * sets the I/O depth. Defaults to 1. | ||
430 | */ | ||
431 | unsigned num_upload_tasks_; | ||
432 | mutable SynchronizingCounter<int32_t> jobs_in_flight_; | ||
433 | TubeGroup<UploadJob> tubes_upload_; | ||
434 | TubeConsumerGroup<UploadJob> tasks_upload_; | ||
435 | mutable UniquePtr<UploadCounters> counters_; | ||
436 | }; // class AbstractUploader | ||
437 | |||
438 | |||
439 | /** | ||
440 | * The actual writing is multi-threaded. | ||
441 | */ | ||
442 | class TaskUpload : public TubeConsumer<AbstractUploader::UploadJob> { | ||
443 | public: | ||
444 | 4133 | explicit TaskUpload(AbstractUploader *uploader, | |
445 | Tube<AbstractUploader::UploadJob> *tube) | ||
446 | 4133 | : TubeConsumer<AbstractUploader::UploadJob>(tube), uploader_(uploader) { } | |
447 | |||
448 | protected: | ||
449 | virtual void Process(AbstractUploader::UploadJob *upload_job); | ||
450 | |||
451 | private: | ||
452 | AbstractUploader *uploader_; | ||
453 | }; | ||
454 | |||
455 | |||
456 | /** | ||
457 | * Each implementation of AbstractUploader must provide its own subclass of the | ||
458 | * UploadStreamHandle that is supposed to contain state information for the | ||
459 | * streamed upload of one specific chunk. | ||
460 | * Each UploadStreamHandle contains a callback object that is invoked as soon as | ||
461 | * the streamed upload is committed. | ||
462 | */ | ||
463 | struct UploadStreamHandle { | ||
464 | typedef AbstractUploader::CallbackTN CallbackTN; | ||
465 | static atomic_int64 g_upload_stream_tag; | ||
466 | |||
467 | 9772476 | explicit UploadStreamHandle(const CallbackTN *commit_callback) | |
468 | 9772476 | : commit_callback(commit_callback) | |
469 | 9772476 | , tag(atomic_xadd64(&g_upload_stream_tag, 1)) { } | |
470 | 19551816 | virtual ~UploadStreamHandle() { } | |
471 | |||
472 | const CallbackTN *commit_callback; | ||
473 | |||
474 | int64_t tag; | ||
475 | |||
476 | std::string remote_path; // override remote location of the object | ||
477 | }; | ||
478 | |||
479 | } // namespace upload | ||
480 | |||
481 | #endif // CVMFS_UPLOAD_FACILITY_H_ | ||
482 |