Directory: | cvmfs/ |
---|---|
File: | cvmfs/upload_facility.h |
Date: | 2025-05-11 02:35:43 |
 | Exec | Total | Coverage |
---|---|---|---|
Lines: | 75 | 77 | 97.4% |
Branches: | 30 | 62 | 48.4% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #ifndef CVMFS_UPLOAD_FACILITY_H_ | ||
6 | #define CVMFS_UPLOAD_FACILITY_H_ | ||
7 | |||
8 | #include <fcntl.h> | ||
9 | #include <stdint.h> | ||
10 | |||
11 | #include <string> | ||
12 | |||
13 | #include "ingestion/ingestion_source.h" | ||
14 | #include "ingestion/task.h" | ||
15 | #include "repository_tag.h" | ||
16 | #include "statistics.h" | ||
17 | #include "upload_spooler_definition.h" | ||
18 | #include "util/atomic.h" | ||
19 | #include "util/concurrency.h" | ||
20 | #include "util/posix.h" | ||
21 | #include "util/tube.h" | ||
22 | |||
23 | namespace upload { | ||
24 | |||
25 | struct UploadCounters { | ||
26 | perf::Counter *n_chunks_added; | ||
27 | perf::Counter *n_chunks_duplicated; | ||
28 | perf::Counter *n_catalogs_added; | ||
29 | perf::Counter *sz_uploaded_bytes; | ||
30 | perf::Counter *sz_uploaded_catalog_bytes; | ||
31 | |||
32 | 2 | explicit UploadCounters(perf::StatisticsTemplate statistics) { | |
33 | 3/6 | 2 | n_chunks_added = statistics.RegisterOrLookupTemplated( |
34 | "n_chunks_added", "Number of new chunks added"); | ||
35 | 3/6 | 2 | n_chunks_duplicated = statistics.RegisterOrLookupTemplated( |
36 | "n_chunks_duplicated", "Number of duplicated chunks added"); | ||
37 | 3/6 | 2 | n_catalogs_added = statistics.RegisterOrLookupTemplated( |
38 | "n_catalogs_added", "Number of new catalogs added"); | ||
39 | 3/6 | 2 | sz_uploaded_bytes = statistics.RegisterOrLookupTemplated( |
40 | "sz_uploaded_bytes", "Number of uploaded bytes"); | ||
41 | 3/6 | 2 | sz_uploaded_catalog_bytes = statistics.RegisterOrLookupTemplated( |
42 | "sz_uploaded_catalog_bytes", "Number of uploaded bytes for catalogs"); | ||
43 | 2 | } | |
44 | }; // UploadCounters | ||
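UploadCounters is a thin bundle of perf::Counter handles, each registered (or looked up) under the statistics template passed to the constructor. A minimal sketch of how the bundle might be wired up and bumped follows; the "uploader" template name and the helper function are assumptions for illustration, only RegisterOrLookupTemplated() and the counter members come from this header.

```cpp
#include <stdint.h>

#include "statistics.h"       // perf::Statistics, perf::StatisticsTemplate
#include "upload_facility.h"  // upload::UploadCounters

// Hypothetical helper: register the upload counters under an assumed
// "uploader" template and account for one freshly written chunk.
void AccountNewChunk(perf::Statistics *statistics, int64_t bytes_written) {
  perf::StatisticsTemplate stats_tmpl("uploader", statistics);
  upload::UploadCounters counters(stats_tmpl);
  perf::Inc(counters.n_chunks_added);                     // one more chunk
  perf::Xadd(counters.sz_uploaded_bytes, bytes_written);  // and its payload size
}
```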
45 | |||
46 | struct UploaderResults { | ||
47 | enum Type { kFileUpload, kBufferUpload, kChunkCommit, kRemove, kLookup }; | ||
48 | |||
49 | 2052 | UploaderResults(const int return_code, const std::string &local_path) | |
50 | 2052 | : type(kFileUpload), | |
51 | 2052 | return_code(return_code), | |
52 | 2052 | local_path(local_path) {} | |
53 | |||
54 | 673344 | explicit UploaderResults(Type t, const int return_code) | |
55 | 673344 | : type(t), | |
56 | 673344 | return_code(return_code), | |
57 | 1/2 | 673344 | local_path("") {} |
58 | |||
59 | 364 | UploaderResults() | |
60 | 364 | : type(kRemove) | |
61 | 364 | , return_code(0) | |
62 | 364 | { } | |
63 | |||
64 | const Type type; | ||
65 | const int return_code; | ||
66 | const std::string local_path; | ||
67 | }; | ||
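A callback registered for an upload job receives one of these result objects and typically only needs to branch on type and return_code (and, for plain file uploads, local_path). A hypothetical handler could look like this; the function name and the logging are illustrative, the fields come straight from UploaderResults.

```cpp
#include <cstdio>

#include "upload_facility.h"

// Hypothetical listener: invoked once per finished upload job.
void OnUploadFinished(const upload::UploaderResults &result) {
  if (result.return_code != 0) {
    fprintf(stderr, "upload failed (type %d, retval %d, path '%s')\n",
            (int)result.type, result.return_code, result.local_path.c_str());
    return;
  }
  if (result.type == upload::UploaderResults::kFileUpload) {
    // result.local_path tells which file this result belongs to
  }
}
```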
68 | |||
69 | struct UploadStreamHandle; | ||
70 | |||
71 | /** | ||
72 | * Abstract base class for all backend upload facilities | ||
73 | * This class defines an interface and constructs the concrete Uploaders. | ||
74 | * Furthermore, it handles callbacks to the outside world to notify users of | ||
75 | * finished upload jobs. | ||
76 | * | ||
77 | * Note: Users could be both the Spooler (when calling Spooler::Upload()) and | ||
78 | * the IngestionPipeline (when calling Spooler::Process()). We therefore | ||
79 | * cannot use the Observable template here, since this would forward | ||
80 | * finished upload jobs to ALL listeners instead of only the owner of the | ||
81 | * specific job. | ||
82 | */ | ||
83 | class AbstractUploader | ||
84 | : public PolymorphicConstruction<AbstractUploader, SpoolerDefinition> | ||
85 | , public Callbackable<UploaderResults> | ||
86 | , public SingleCopy { | ||
87 | friend class TaskUpload; | ||
88 | |||
89 | public: | ||
90 | /** | ||
91 | * A read-only memory block that is supposed to be written out. | ||
92 | */ | ||
93 | struct UploadBuffer { | ||
94 | 250834 | UploadBuffer() : size(0), data(NULL) { } | |
95 | 420191 | UploadBuffer(uint64_t s, const void *d) : size(s), data(d) { } | |
96 | uint64_t size; | ||
97 | const void *data; | ||
98 | }; | ||
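Note that UploadBuffer does not own the referenced memory: it stores only a size and a raw pointer, so the data has to stay alive until the corresponding upload job has been responded to. A minimal illustration, with std::vector as a stand-in data source:

```cpp
#include <cstddef>
#include <vector>

#include "upload_facility.h"

// The returned buffer merely references the vector's storage; the vector must
// outlive the scheduled upload job that consumes the buffer.
upload::AbstractUploader::UploadBuffer MakeBuffer(
    const std::vector<unsigned char> &block) {
  const void *data = block.empty() ? NULL : &block[0];
  return upload::AbstractUploader::UploadBuffer(block.size(), data);
}
```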
99 | |||
100 | struct JobStatus { | ||
101 | enum State { kOk, kTerminate, kNoJobs }; | ||
102 | }; | ||
103 | |||
104 | struct UploadJob { | ||
105 | enum Type { Upload, Commit, Terminate }; | ||
106 | |||
107 | UploadJob(UploadStreamHandle *handle, UploadBuffer buffer, | ||
108 | const CallbackTN *callback = NULL); | ||
109 | UploadJob(UploadStreamHandle *handle, const shash::Any &content_hash); | ||
110 | |||
111 | 138 | UploadJob() | |
112 | 138 | : type(Terminate) | |
113 | 138 | , stream_handle(NULL) | |
114 | 138 | , tag_(0) | |
115 | 138 | , buffer() | |
116 | 138 | , callback(NULL) {} | |
117 | |||
118 | 1/2 | 138 | static UploadJob *CreateQuitBeacon() { return new UploadJob(); } |
119 | 671365 | bool IsQuitBeacon() { return type == Terminate; } | |
120 | |||
121 | Type type; | ||
122 | UploadStreamHandle *stream_handle; | ||
123 | /** | ||
124 | * Ensure that upload jobs belonging to the same file end up in the same | ||
125 | * upload task queue. | ||
126 | */ | ||
127 | int64_t tag_; | ||
128 | 1806 | int64_t tag() { return tag_; } | |
129 | |||
130 | // type==Upload specific fields | ||
131 | UploadBuffer buffer; | ||
132 | const CallbackTN *callback; | ||
133 | |||
134 | // type==Commit specific fields | ||
135 | shash::Any content_hash; | ||
136 | }; | ||
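Besides Upload and Commit jobs, the Terminate type serves as a quit beacon that drains the upload task queues during shutdown. The convention is illustrated below; the wrapper function is purely for demonstration, the real dispatch and consumption happen in the TubeGroup and in TaskUpload::Process().

```cpp
#include <cassert>

#include "upload_facility.h"

// Demonstrates the quit-beacon convention: a default-constructed UploadJob
// carries type == Terminate and no payload.
void DemonstrateQuitBeacon() {
  upload::AbstractUploader::UploadJob *job =
      upload::AbstractUploader::UploadJob::CreateQuitBeacon();
  assert(job->IsQuitBeacon());
  delete job;
}
```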
137 | |||
138 | 1/2 | 228 | virtual ~AbstractUploader() { assert(!tasks_upload_.is_active()); } |
139 | |||
140 | /** | ||
141 | * A string identifying the uploader type | ||
142 | */ | ||
143 | virtual std::string name() const = 0; | ||
144 | |||
145 | /** | ||
146 | * Initializes a new repository storage area, e.g. create directory layout | ||
147 | * for local backend or create bucket for S3 backend. | ||
148 | */ | ||
149 | virtual bool Create() = 0; | ||
150 | |||
151 | /** | ||
152 | * Concrete uploaders might want to use a customized setting for multi-stream | ||
153 | * writing, for instance one per disk. Note that the S3 backend uses a single | ||
154 | * task, but this task internally uses multiple HTTP streams through curl async | ||
155 | * I/O. | ||
156 | */ | ||
157 | 252 | virtual unsigned GetNumTasks() const { return num_upload_tasks_; } | |
158 | |||
159 | /** | ||
160 | * This is called right after the constructor of AbstractUploader and/or its | ||
161 | * derived class has been executed. You can override this method to do additional | ||
162 | * initialization that cannot be done in the constructor itself. | ||
163 | * | ||
164 | * @return true on successful initialization | ||
165 | */ | ||
166 | virtual bool Initialize(); | ||
167 | |||
168 | /** | ||
169 | * Called during Spooler::WaitForUpload(), to ensure that the upload has | ||
170 | * finished. If commit == true, then a Commit request is also sent, to apply | ||
171 | * all the changes accumulated during the session. "catalog_path" | ||
172 | * represents the path of the root catalog with the changes. | ||
173 | * By default it is a no-op and returns true. | ||
174 | */ | ||
175 | virtual bool FinalizeSession(bool commit, const std::string &old_root_hash, | ||
176 | const std::string &new_root_hash, | ||
177 | const RepositoryTag &tag); | ||
178 | |||
179 | /** | ||
180 | * This must be called right before the destruction of the AbstractUploader! | ||
181 | * You are _not_ supposed to overwrite this method in your concrete Uploader. | ||
182 | */ | ||
183 | void TearDown(); | ||
184 | |||
185 | /** | ||
186 | * Uploads the file at the path local_path into the backend storage under the | ||
187 | * path remote_path. When the upload has finished it calls callback. | ||
188 | * Note: This method might be implemented in a synchronous way. | ||
189 | * | ||
190 | * @param local_path path to the file to be uploaded | ||
191 | * @param remote_path desired path for the file in the backend storage | ||
192 | * @param callback (optional) gets notified when the upload was finished | ||
193 | */ | ||
194 | 1036 | void UploadFile( | |
195 | const std::string &local_path, | ||
196 | const std::string &remote_path, | ||
197 | const CallbackTN *callback = NULL) | ||
198 | { | ||
199 | 1/2 | 1036 | ++jobs_in_flight_; |
200 | 1/2 | 1036 | FileIngestionSource source(local_path); |
201 | 1/2 | 1036 | DoUpload(remote_path, &source, callback); |
202 | 1036 | } | |
203 | |||
204 | 2 | void UploadIngestionSource( | |
205 | const std::string &remote_path, | ||
206 | IngestionSource *source, | ||
207 | const CallbackTN *callback = NULL) | ||
208 | { | ||
209 | 2 | ++jobs_in_flight_; | |
210 | 2 | DoUpload(remote_path, source, callback); | |
211 | 2 | } | |
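Both entry points merely enqueue work and increment jobs_in_flight_; completion is observed either through the optional callback or via WaitForUpload(). A hedged usage sketch, assuming `uploader` points to an already constructed and initialized concrete uploader (which is what the Spooler arranges internally) and using made-up paths:

```cpp
#include <string>

#include "upload_facility.h"

// Sketch only: uploads one local file without a completion callback and
// blocks until the upload queue has drained.
void UploadOneFile(upload::AbstractUploader *uploader) {
  const std::string local_path = "/tmp/payload.bin";   // placeholder paths
  const std::string remote_path = "data/payload.bin";
  uploader->UploadFile(local_path, remote_path);
  uploader->WaitForUpload();
}
```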
212 | |||
213 | /** | ||
214 | * This method is called before the first data block of a streamed upload is | ||
215 | * scheduled (see the UploadStreamHandle definition below for details). | ||
216 | * | ||
217 | * @param callback (optional) this callback will be invoked once this parti- | ||
218 | * cular streamed upload is committed. | ||
219 | * @return a pointer to the initialized UploadStreamHandle | ||
220 | */ | ||
221 | virtual UploadStreamHandle *InitStreamedUpload( | ||
222 | const CallbackTN *callback) = 0; | ||
223 | |||
224 | /** | ||
225 | * This method schedules a buffer to be uploaded in the context of the | ||
226 | * given UploadStreamHandle. The actual upload will happen asynchronously by | ||
227 | * a concrete implementation of AbstractUploader | ||
228 | * (see AbstractUploader::StreamedUpload()). | ||
229 | * As soon as the scheduled upload job is complete (successfully or not), | ||
230 | * the optionally passed callback is supposed to be invoked using | ||
231 | * AbstractUploader::Respond(). | ||
232 | * | ||
233 | * @param handle Pointer to a previously acquired UploadStreamHandle | ||
234 | * @param buffer contains the data block to be uploaded | ||
235 | * @param callback (optional) callback object to be invoked once the given | ||
236 | * upload is finished (see AbstractUploader::Respond()) | ||
237 | */ | ||
238 | 420196 | void ScheduleUpload( | |
239 | UploadStreamHandle *handle, | ||
240 | UploadBuffer buffer, | ||
241 | const CallbackTN *callback = NULL) | ||
242 | { | ||
243 | 420196 | ++jobs_in_flight_; | |
244 | 3/6 | 420457 | tubes_upload_.Dispatch(new UploadJob(handle, buffer, callback)); |
245 | 420464 | } | |
246 | |||
247 | /** | ||
248 | * This method schedules a commit job as soon as all data blocks of a streamed | ||
249 | * upload are (successfully) uploaded. Derived classes must override | ||
250 | * AbstractUploader::FinalizeStreamedUpload() for this to happen. | ||
251 | * | ||
252 | * @param handle Pointer to a previously acquired UploadStreamHandle | ||
253 | * @param content_hash the content hash of the full uploaded data Chunk | ||
254 | */ | ||
255 | 250636 | void ScheduleCommit( | |
256 | UploadStreamHandle *handle, | ||
257 | const shash::Any &content_hash) | ||
258 | { | ||
259 | 250636 | ++jobs_in_flight_; | |
260 | 1/2 | 250711 | tubes_upload_.Dispatch(new UploadJob(handle, content_hash)); |
261 | 250699 | } | |
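Taken together, a streamed upload of one chunk is driven by InitStreamedUpload(), any number of ScheduleUpload() calls, and a final ScheduleCommit(). The sketch below shows that sequence from a client's point of view; the data block and the pre-computed content hash are placeholders, error handling is omitted, and the concrete uploader is expected to clean up the stream handle once the commit has been processed.

```cpp
#include <cassert>
#include <cstddef>

#include "crypto/hash.h"      // shash::Any (include path assumed)
#include "upload_facility.h"

// Sketch: stream one chunk through a concrete uploader. The content hash is
// assumed to have been computed elsewhere (e.g. by the ingestion pipeline).
void StreamOneChunk(upload::AbstractUploader *uploader,
                    const unsigned char *data, size_t size,
                    const shash::Any &content_hash) {
  upload::UploadStreamHandle *handle = uploader->InitStreamedUpload(NULL);
  assert(handle != NULL);

  // May be called repeatedly for consecutive blocks of the same chunk; the
  // data must stay valid until the corresponding job has been responded to.
  uploader->ScheduleUpload(
      handle, upload::AbstractUploader::UploadBuffer(size, data));

  // Finalizes the chunk under its content hash and eventually triggers the
  // commit callback given to InitStreamedUpload() (NULL here).
  uploader->ScheduleCommit(handle, content_hash);
  uploader->WaitForUpload();
}
```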
262 | |||
263 | /** | ||
264 | * Removes a file from the backend storage. | ||
265 | * | ||
266 | * Note: It is not an error if the file does not exist in the first place. | ||
267 | * | ||
268 | * @param file_to_delete path to the file to be removed | ||
269 | */ | ||
270 | 364 | void RemoveAsync(const std::string &file_to_delete) { | |
271 | 364 | ++jobs_in_flight_; | |
272 | 364 | DoRemoveAsync(file_to_delete); | |
273 | 364 | } | |
274 | |||
275 | /** | ||
276 | * Overloaded method used to remove an object based on its content hash. | ||
277 | * | ||
278 | * @param hash_to_delete the content hash of a file to be deleted | ||
279 | */ | ||
280 | 362 | void RemoveAsync(const shash::Any &hash_to_delete) { | |
281 | 2/4 | 362 | RemoveAsync("data/" + hash_to_delete.MakePath()); |
282 | 362 | } | |
283 | |||
284 | /** | ||
285 | * Get object size based on its content hash | ||
286 | * | ||
287 | * @param hash the content hash of a file | ||
288 | */ | ||
289 | ✗ | int64_t GetObjectSize(const shash::Any &hash) { | |
290 | ✗ | return DoGetObjectSize("data/" + hash.MakePath()); | |
291 | } | ||
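Both hash-addressed helpers simply prefix the content-address path with "data/". A small sketch follows; treating a negative GetObjectSize() result as "absent or error" is an assumption here, since the exact convention is up to the concrete backend.

```cpp
#include <stdint.h>

#include "crypto/hash.h"      // shash::Any (include path assumed)
#include "upload_facility.h"

// Sketch: remove an object addressed by its content hash if it appears to
// exist. Removing a non-existing object would not be an error either.
void DropObject(upload::AbstractUploader *uploader, const shash::Any &hash) {
  const int64_t size = uploader->GetObjectSize(hash);  // "data/<hash path>"
  if (size >= 0) {  // assumption: negative values signal "not found" or error
    uploader->RemoveAsync(hash);
  }
  uploader->WaitForUpload();
}
```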
292 | |||
293 | /** | ||
294 | * Checks if a file is already present in the backend storage. This might be a | ||
295 | * synchronous operation. | ||
296 | * | ||
297 | * @param path the path of the file to be checked | ||
298 | * @return true if the file was found in the backend storage | ||
299 | */ | ||
300 | virtual bool Peek(const std::string &path) = 0; | ||
301 | |||
302 | /** | ||
303 | * Make directory in upstream storage. Noop if directory already present. | ||
304 | * | ||
305 | * @param path relative directory path in the upstream storage | ||
306 | * @return true if the directory was successfully created or already present | ||
307 | */ | ||
308 | virtual bool Mkdir(const std::string &path) = 0; | ||
309 | |||
310 | /** | ||
311 | * Creates a top-level shortcut to the given data object. This is particularly | ||
312 | * useful for bootstrapping repositories whose data-directory is secured by | ||
313 | * a VOMS certificate. | ||
314 | * | ||
315 | * @param object content hash of the object to be exposed on the top-level | ||
316 | * @return true on success | ||
317 | */ | ||
318 | virtual bool PlaceBootstrappingShortcut(const shash::Any &object) = 0; | ||
319 | |||
320 | /** | ||
321 | * Waits until the current upload queue is empty. | ||
322 | * | ||
323 | * Note: This does NOT necessarily mean that all files are actually uploaded. | ||
324 | * If new jobs are scheduled concurrently, the behavior of this method is | ||
325 | * undefined (it may also return on a momentarily empty queue). | ||
326 | */ | ||
327 | virtual void WaitForUpload() const; | ||
328 | |||
329 | virtual unsigned int GetNumberOfErrors() const = 0; | ||
330 | static void RegisterPlugins(); | ||
331 | void InitCounters(perf::StatisticsTemplate *statistics); | ||
332 | |||
333 | protected: | ||
334 | typedef Callbackable<UploaderResults>::CallbackTN *CallbackPtr; | ||
335 | |||
336 | explicit AbstractUploader(const SpoolerDefinition &spooler_definition); | ||
337 | |||
338 | /** | ||
339 | * Implementation of plain file upload | ||
340 | * Public interface: AbstractUploader::UploadFile() | ||
341 | * | ||
342 | * @param remote_path destination to be written in the backend | ||
343 | * @param source ingestion source of the file to be uploaded | ||
344 | * @param callback callback to be called on completion | ||
345 | */ | ||
346 | virtual void DoUpload(const std::string &remote_path, | ||
347 | IngestionSource *source, | ||
348 | const CallbackTN *callback) = 0; | ||
349 | |||
350 | /** | ||
351 | * Implementation of a streamed upload step. See public interface for details. | ||
352 | * Public interface: AbstractUploader::ScheduleUpload() | ||
353 | * | ||
354 | * @param handle descendant of UploadStreamHandle specifying the stream | ||
355 | * @param buffer the UploadBuffer to be uploaded to the stream | ||
356 | * @param callback callback to be called on completion | ||
357 | */ | ||
358 | virtual void StreamedUpload(UploadStreamHandle *handle, | ||
359 | UploadBuffer buffer, | ||
360 | const CallbackTN *callback) = 0; | ||
361 | |||
362 | /** | ||
363 | * Implementation of streamed upload commit | ||
364 | * Public interface: AbstractUploader::ScheduleCommit() | ||
365 | * | ||
366 | * @param handle descendant of UploadStreamHandle specifying the stream | ||
367 | * @param content_hash the computed content hash of the streamed object | ||
368 | */ | ||
369 | virtual void FinalizeStreamedUpload(UploadStreamHandle *handle, | ||
370 | const shash::Any &content_hash) = 0; | ||
371 | |||
372 | |||
373 | virtual void DoRemoveAsync(const std::string &file_to_delete) = 0; | ||
374 | |||
375 | virtual int64_t DoGetObjectSize(const std::string &file_name) = 0; | ||
376 | |||
377 | /** | ||
378 | * This notifies the callback that is associated with a finishing job. Please | ||
379 | * do not call the handed callback yourself in concrete Uploaders! | ||
380 | * | ||
381 | * Note: Since the job is finished after we respond to it, the callback object | ||
382 | * gets automatically destroyed by this call! | ||
383 | * Therefore you must not call Respond() twice or use the callback later | ||
384 | * by any means! | ||
385 | */ | ||
386 | 672635 | void Respond(const CallbackTN *callback, | |
387 | const UploaderResults &result) const | ||
388 | { | ||
389 | 2/2 | 672635 | if (callback != NULL) { |
390 | 672271 | (*callback)(result); | |
391 | 1/2 | 672268 | delete callback; |
392 | } | ||
393 | |||
394 | 672634 | --jobs_in_flight_; | |
395 | 672635 | } | |
396 | |||
397 | /** | ||
398 | * Creates a temporary file in the backend storage's temporary location. | ||
399 | * For the LocalUploader this usually is the 'txn' directory of the backend | ||
400 | * storage. Otherwise it is some scratch area. | ||
401 | * | ||
402 | * @param path pointer to a string that will contain the created file path | ||
403 | * @return a file descriptor to the opened file | ||
404 | */ | ||
405 | int CreateAndOpenTemporaryChunkFile(std::string *path) const; | ||
406 | |||
407 | 608 | const SpoolerDefinition &spooler_definition() const { | |
408 | 608 | return spooler_definition_; | |
409 | } | ||
410 | |||
411 | void CountUploadedChunks() const; | ||
412 | void DecUploadedChunks() const; | ||
413 | void CountUploadedBytes(int64_t bytes_written) const; | ||
414 | void CountDuplicates() const; | ||
415 | void CountUploadedCatalogs() const; | ||
416 | void CountUploadedCatalogBytes(int64_t bytes_written) const; | ||
417 | |||
418 | protected: | ||
419 | /** | ||
420 | * Used by concrete implementations when they issue callbacks in places that | ||
421 | * are not already accounted for by this base class, e.g. S3Uploader::Peek(). | ||
422 | */ | ||
423 | 5 | void IncJobsInFlight() { | |
424 | 5 | ++jobs_in_flight_; | |
425 | 5 | } | |
426 | |||
427 | private: | ||
428 | const SpoolerDefinition spooler_definition_; | ||
429 | |||
430 | /** | ||
431 | * Number of threads used for I/O write calls. Effectively this parameter | ||
432 | * sets the I/O depth. Defaults to 1. | ||
433 | */ | ||
434 | unsigned num_upload_tasks_; | ||
435 | mutable SynchronizingCounter<int32_t> jobs_in_flight_; | ||
436 | TubeGroup<UploadJob> tubes_upload_; | ||
437 | TubeConsumerGroup<UploadJob> tasks_upload_; | ||
438 | mutable UniquePtr<UploadCounters> counters_; | ||
439 | }; // class AbstractUploader | ||
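To make the contract concrete, here is a heavily trimmed, hypothetical backend that accepts and discards everything. It only demonstrates the mechanics this header prescribes: deriving a stream handle, overriding the pure virtual methods, and answering every job exactly once through Respond(). The WillHandle() hook is an assumption based on the PolymorphicConstruction pattern, the class is not registered in RegisterPlugins(), and real uploaders (local, S3, gateway) of course perform actual I/O in these methods.

```cpp
#include <string>

#include "upload_facility.h"
#include "upload_spooler_definition.h"

namespace upload {

// Hypothetical stream handle: a real backend would keep e.g. a temporary
// file descriptor or an open network stream here.
struct NullStreamHandle : public UploadStreamHandle {
  explicit NullStreamHandle(const CallbackTN *commit_callback)
      : UploadStreamHandle(commit_callback) {}
};

// Hypothetical uploader that accepts and discards all data.
class NullUploader : public AbstractUploader {
 public:
  // Assumed registration hook of the PolymorphicConstruction pattern.
  static bool WillHandle(const SpoolerDefinition & /*definition*/) {
    return false;
  }

  explicit NullUploader(const SpoolerDefinition &definition)
      : AbstractUploader(definition) {}

  virtual std::string name() const { return "null"; }
  virtual bool Create() { return true; }
  virtual bool Peek(const std::string & /*path*/) { return false; }
  virtual bool Mkdir(const std::string & /*path*/) { return true; }
  virtual bool PlaceBootstrappingShortcut(const shash::Any & /*object*/) {
    return true;
  }
  virtual unsigned int GetNumberOfErrors() const { return 0; }

  virtual UploadStreamHandle *InitStreamedUpload(const CallbackTN *callback) {
    return new NullStreamHandle(callback);
  }

 protected:
  virtual void DoUpload(const std::string & /*remote_path*/,
                        IngestionSource * /*source*/,
                        const CallbackTN *callback) {
    // Every job must be answered exactly once; Respond() also deletes the
    // callback object and decrements jobs_in_flight_.
    Respond(callback, UploaderResults(0, ""));
  }

  virtual void StreamedUpload(UploadStreamHandle * /*handle*/,
                              UploadBuffer /*buffer*/,
                              const CallbackTN *callback) {
    Respond(callback, UploaderResults(UploaderResults::kBufferUpload, 0));
  }

  virtual void FinalizeStreamedUpload(UploadStreamHandle *handle,
                                      const shash::Any & /*content_hash*/) {
    const CallbackTN *callback = handle->commit_callback;
    delete handle;  // the uploader owns the handle once the chunk is committed
    Respond(callback, UploaderResults(UploaderResults::kChunkCommit, 0));
  }

  virtual void DoRemoveAsync(const std::string & /*file_to_delete*/) {
    Respond(NULL, UploaderResults(UploaderResults::kRemove, 0));
  }

  virtual int64_t DoGetObjectSize(const std::string & /*file_name*/) {
    return -1;
  }
};

}  // namespace upload
```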
440 | |||
441 | |||
442 | /** | ||
443 | * The actual writing is multi-threaded. | ||
444 | */ | ||
445 | class TaskUpload : public TubeConsumer<AbstractUploader::UploadJob> { | ||
446 | public: | ||
447 | 138 | explicit TaskUpload( | |
448 | AbstractUploader *uploader, | ||
449 | Tube<AbstractUploader::UploadJob> *tube) | ||
450 | 138 | : TubeConsumer<AbstractUploader::UploadJob>(tube) | |
451 | 138 | , uploader_(uploader) | |
452 | 138 | { } | |
453 | |||
454 | protected: | ||
455 | virtual void Process(AbstractUploader::UploadJob *upload_job); | ||
456 | |||
457 | private: | ||
458 | AbstractUploader *uploader_; | ||
459 | }; | ||
460 | |||
461 | |||
462 | /** | ||
463 | * Each implementation of AbstractUploader must provide its own derivate of the | ||
464 | * UploadStreamHandle that is supposed to contain state information for the | ||
465 | * streamed upload of one specific chunk. | ||
466 | * Each UploadStreamHandle contains a callback object that is invoked as soon as | ||
467 | * the streamed upload is committed. | ||
468 | */ | ||
469 | struct UploadStreamHandle { | ||
470 | typedef AbstractUploader::CallbackTN CallbackTN; | ||
471 | static atomic_int64 g_upload_stream_tag; | ||
472 | |||
473 | 250637 | explicit UploadStreamHandle(const CallbackTN *commit_callback) | |
474 | 250637 | : commit_callback(commit_callback) | |
475 | 250637 | , tag(atomic_xadd64(&g_upload_stream_tag, 1)) {} | |
476 | 501468 | virtual ~UploadStreamHandle() {} | |
477 | |||
478 | const CallbackTN *commit_callback; | ||
479 | |||
480 | int64_t tag; | ||
481 | |||
482 | std::string remote_path; // override remote location of the object | ||
483 | }; | ||
484 | |||
485 | } // namespace upload | ||
486 | |||
487 | #endif // CVMFS_UPLOAD_FACILITY_H_ | ||
488 |