Directory: | cvmfs/ |
---|---|
File: | cvmfs/upload_facility.h |
Date: | 2025-06-22 02:36:02 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 64 | 66 | 97.0% |
Branches: | 30 | 62 | 48.4% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #ifndef CVMFS_UPLOAD_FACILITY_H_ | ||
6 | #define CVMFS_UPLOAD_FACILITY_H_ | ||
7 | |||
8 | #include <fcntl.h> | ||
9 | #include <stdint.h> | ||
10 | |||
11 | #include <string> | ||
12 | |||
13 | #include "ingestion/ingestion_source.h" | ||
14 | #include "ingestion/task.h" | ||
15 | #include "repository_tag.h" | ||
16 | #include "statistics.h" | ||
17 | #include "upload_spooler_definition.h" | ||
18 | #include "util/atomic.h" | ||
19 | #include "util/concurrency.h" | ||
20 | #include "util/posix.h" | ||
21 | #include "util/tube.h" | ||
22 | |||
23 | namespace upload { | ||
24 | |||
25 | struct UploadCounters { | ||
26 | perf::Counter *n_chunks_added; | ||
27 | perf::Counter *n_chunks_duplicated; | ||
28 | perf::Counter *n_catalogs_added; | ||
29 | perf::Counter *sz_uploaded_bytes; | ||
30 | perf::Counter *sz_uploaded_catalog_bytes; | ||
31 | |||
32 | 98 | explicit UploadCounters(perf::StatisticsTemplate statistics) { | |
33 |
3/6✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 98 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 98 times.
✗ Branch 10 not taken.
|
98 | n_chunks_added = statistics.RegisterOrLookupTemplated( |
34 | "n_chunks_added", "Number of new chunks added"); | ||
35 |
3/6✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 98 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 98 times.
✗ Branch 10 not taken.
|
98 | n_chunks_duplicated = statistics.RegisterOrLookupTemplated( |
36 | "n_chunks_duplicated", "Number of duplicated chunks added"); | ||
37 |
3/6✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 98 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 98 times.
✗ Branch 10 not taken.
|
98 | n_catalogs_added = statistics.RegisterOrLookupTemplated( |
38 | "n_catalogs_added", "Number of new catalogs added"); | ||
39 |
3/6✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 98 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 98 times.
✗ Branch 10 not taken.
|
98 | sz_uploaded_bytes = statistics.RegisterOrLookupTemplated( |
40 | "sz_uploaded_bytes", "Number of uploaded bytes"); | ||
41 |
3/6✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 98 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 98 times.
✗ Branch 10 not taken.
|
98 | sz_uploaded_catalog_bytes = statistics.RegisterOrLookupTemplated( |
42 | "sz_uploaded_catalog_bytes", "Number of uploaded bytes for catalogs"); | ||
43 | 98 | } | |
44 | }; // UploadCounters | ||
45 | |||
46 | struct UploaderResults { | ||
47 | enum Type { | ||
48 | kFileUpload, | ||
49 | kBufferUpload, | ||
50 | kChunkCommit, | ||
51 | kRemove, | ||
52 | kLookup | ||
53 | }; | ||
54 | |||
55 | 63829 | UploaderResults(const int return_code, const std::string &local_path) | |
56 | 63829 | : type(kFileUpload), return_code(return_code), local_path(local_path) { } | |
57 | |||
58 | 1212430 | explicit UploaderResults(Type t, const int return_code) | |
59 |
1/2✓ Branch 2 taken 1212320 times.
✗ Branch 3 not taken.
|
1212430 | : type(t), return_code(return_code), local_path("") { } |
60 | |||
61 | 7845 | UploaderResults() : type(kRemove), return_code(0) { } | |
62 | |||
63 | const Type type; | ||
64 | const int return_code; | ||
65 | const std::string local_path; | ||
66 | }; | ||
67 | |||
68 | struct UploadStreamHandle; | ||
69 | |||
70 | /** | ||
71 | * Abstract base class for all backend upload facilities | ||
72 | * This class defines an interface and constructs the concrete Uploaders, | ||
73 | * furthermore it handles callbacks to the outside world to notify users of done | ||
74 | * upload jobs. | ||
75 | * | ||
76 | * Note: Users could be both the Spooler (when calling Spooler::Upload()) and | ||
77 | * the IngestionPipeline (when calling Spooler::Process()). We therefore | ||
78 | * cannot use the Observable template here, since this would forward | ||
79 | * finished upload jobs to ALL listeners instead of only the owner of the | ||
80 | * specific job. | ||
81 | */ | ||
82 | class AbstractUploader | ||
83 | : public PolymorphicConstruction<AbstractUploader, SpoolerDefinition>, | ||
84 | public Callbackable<UploaderResults>, | ||
85 | public SingleCopy { | ||
86 | friend class TaskUpload; | ||
87 | |||
88 | public: | ||
89 | /** | ||
90 | * A read-only memory block that is supposed to be written out. | ||
91 | */ | ||
92 | struct UploadBuffer { | ||
93 | 262519 | UploadBuffer() : size(0), data(NULL) { } | |
94 | 896250 | UploadBuffer(uint64_t s, const void *d) : size(s), data(d) { } | |
95 | uint64_t size; | ||
96 | const void *data; | ||
97 | }; | ||
98 | |||
99 | struct JobStatus { | ||
100 | enum State { | ||
101 | kOk, | ||
102 | kTerminate, | ||
103 | kNoJobs | ||
104 | }; | ||
105 | }; | ||
106 | |||
107 | struct UploadJob { | ||
108 | enum Type { | ||
109 | Upload, | ||
110 | Commit, | ||
111 | Terminate | ||
112 | }; | ||
113 | |||
114 | UploadJob(UploadStreamHandle *handle, UploadBuffer buffer, | ||
115 | const CallbackTN *callback = NULL); | ||
116 | UploadJob(UploadStreamHandle *handle, const shash::Any &content_hash); | ||
117 | |||
118 | 3458 | UploadJob() | |
119 | 3458 | : type(Terminate) | |
120 | 3458 | , stream_handle(NULL) | |
121 | 3458 | , tag_(0) | |
122 | 3458 | , buffer() | |
123 | 3458 | , callback(NULL) { } | |
124 | |||
125 |
1/2✓ Branch 2 taken 3458 times.
✗ Branch 3 not taken.
|
3458 | static UploadJob *CreateQuitBeacon() { return new UploadJob(); } |
126 | 1158948 | bool IsQuitBeacon() { return type == Terminate; } | |
127 | |||
128 | Type type; | ||
129 | UploadStreamHandle *stream_handle; | ||
130 | /** | ||
131 | * Ensure that upload jobs belonging to the same file end up in the same | ||
132 | * upload task queue. | ||
133 | */ | ||
134 | int64_t tag_; | ||
135 | 55986 | int64_t tag() { return tag_; } | |
136 | |||
137 | // type==Upload specific fields | ||
138 | UploadBuffer buffer; | ||
139 | const CallbackTN *callback; | ||
140 | |||
141 | // type==Commit specific fields | ||
142 | shash::Any content_hash; | ||
143 | }; | ||
144 | |||
145 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 2714 times.
|
5428 | virtual ~AbstractUploader() { assert(!tasks_upload_.is_active()); } |
146 | |||
147 | /** | ||
148 | * A string identifying the uploader type | ||
149 | */ | ||
150 | virtual std::string name() const = 0; | ||
151 | |||
152 | /** | ||
153 | * Initializes a new repository storage area, e.g. create directory layout | ||
154 | * for local backend or create bucket for S3 backend. | ||
155 | */ | ||
156 | virtual bool Create() = 0; | ||
157 | |||
158 | /** | ||
159 | * Concrete uploaders might want to use a customized setting for multi-stream | ||
160 | * writing, for instance one per disk. Note that the S3 backend uses one task | ||
161 | * but this one task uses internally multiple HTTP streams through curl async | ||
162 | * I/O. | ||
163 | */ | ||
164 | 6174 | virtual unsigned GetNumTasks() const { return num_upload_tasks_; } | |
165 | |||
166 | /** | ||
167 | * This is called right after the constructor of AbstractUploader and/or its | ||
168 | * derived class has been executed. You can override that to do additional | ||
169 | * initialization that cannot be done in the constructor itself. | ||
170 | * | ||
171 | * @return true on successful initialization | ||
172 | */ | ||
173 | virtual bool Initialize(); | ||
174 | |||
175 | /** | ||
176 | * Called during Spooler::WaitForUpload(), to ensure that the upload has | ||
177 | * finished. If commit == true, then a Commit request is also sent, to apply | ||
178 | all the changes accumulated during the session. "catalog_path" | ||
179 | * represents the path of the root catalog with the changes. | ||
180 | * By default it is a noop and returns true. | ||
181 | */ | ||
182 | virtual bool FinalizeSession(bool commit, const std::string &old_root_hash, | ||
183 | const std::string &new_root_hash, | ||
184 | const RepositoryTag &tag); | ||
185 | |||
186 | /** | ||
187 | * This must be called right before the destruction of the AbstractUploader! | ||
188 | * You are _not_ supposed to overwrite this method in your concrete Uploader. | ||
189 | */ | ||
190 | void TearDown(); | ||
191 | |||
192 | /** | ||
193 | * Uploads the file at the path local_path into the backend storage under the | ||
194 | * path remote_path. When the upload has finished it calls callback. | ||
195 | * Note: This method might be implemented in a synchronous way. | ||
196 | * | ||
197 | * @param local_path path to the file to be uploaded | ||
198 | * @param remote_path desired path for the file in the backend storage | ||
199 | * @param callback (optional) gets notified when the upload was finished | ||
200 | */ | ||
201 | 32333 | void UploadFile(const std::string &local_path, | |
202 | const std::string &remote_path, | ||
203 | const CallbackTN *callback = NULL) { | ||
204 |
1/2✓ Branch 1 taken 32333 times.
✗ Branch 2 not taken.
|
32333 | ++jobs_in_flight_; |
205 |
1/2✓ Branch 1 taken 32333 times.
✗ Branch 2 not taken.
|
32333 | FileIngestionSource source(local_path); |
206 |
1/2✓ Branch 1 taken 32333 times.
✗ Branch 2 not taken.
|
32333 | DoUpload(remote_path, &source, callback); |
207 | 32333 | } | |
208 | |||
209 | 62 | void UploadIngestionSource(const std::string &remote_path, | |
210 | IngestionSource *source, | ||
211 | const CallbackTN *callback = NULL) { | ||
212 | 62 | ++jobs_in_flight_; | |
213 | 62 | DoUpload(remote_path, source, callback); | |
214 | 62 | } | |
215 | |||
216 | /** | ||
217 | * This method is called before the first data block of a streamed upload is | ||
218 | * scheduled (see above implementation of UploadStreamHandle for details). | ||
219 | * | ||
220 | * @param callback (optional) this callback will be invoked once this parti- | ||
221 | * cular streamed upload is committed. | ||
222 | * @return a pointer to the initialized UploadStreamHandle | ||
223 | */ | ||
224 | virtual UploadStreamHandle *InitStreamedUpload( | ||
225 | const CallbackTN *callback) = 0; | ||
226 | |||
227 | /** | ||
228 | * This method schedules a buffer to be uploaded in the context of the | ||
229 | * given UploadStreamHandle. The actual upload will happen asynchronously by | ||
230 | * a concrete implementation of AbstractUploader | ||
231 | * (see AbstractUploader::StreamedUpload()). | ||
232 | * As soon as the scheduled upload job is complete (either successful or not) | ||
233 | * the optionally passed callback is supposed to be invoked using | ||
234 | * AbstractUploader::Respond(). | ||
235 | * | ||
236 | * @param handle Pointer to a previously acquired UploadStreamHandle | ||
237 | * @param buffer contains the data block to be uploaded | ||
238 | * @param callback (optional) callback object to be invoked once the given | ||
239 | * upload is finished (see AbstractUploader::Respond()) | ||
240 | */ | ||
241 | 896261 | void ScheduleUpload(UploadStreamHandle *handle, | |
242 | UploadBuffer buffer, | ||
243 | const CallbackTN *callback = NULL) { | ||
244 | 896261 | ++jobs_in_flight_; | |
245 |
3/6✓ Branch 1 taken 896476 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 896439 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 896487 times.
✗ Branch 8 not taken.
|
896474 | tubes_upload_.Dispatch(new UploadJob(handle, buffer, callback)); |
246 | 896487 | } | |
247 | |||
248 | /** | ||
249 | * This method schedules a commit job as soon as all data blocks of a streamed | ||
250 | * upload are (successfully) uploaded. Derived classes must override | ||
251 | * AbstractUploader::FinalizeStreamedUpload() for this to happen. | ||
252 | * | ||
253 | * @param handle Pointer to a previously acquired UploadStreamHandle | ||
254 | * @param content_hash the content hash of the full uploaded data Chunk | ||
255 | */ | ||
256 | 258981 | void ScheduleCommit(UploadStreamHandle *handle, | |
257 | const shash::Any &content_hash) { | ||
258 | 258981 | ++jobs_in_flight_; | |
259 |
1/2✓ Branch 2 taken 259009 times.
✗ Branch 3 not taken.
|
259016 | tubes_upload_.Dispatch(new UploadJob(handle, content_hash)); |
260 | 259017 | } | |
261 | |||
262 | /** | ||
263 | * Removes a file from the backend storage. | ||
264 | * | ||
265 | * Note: If the file doesn't exist before calling this won't be an error. | ||
266 | * | ||
267 | * @param file_to_delete path to the file to be removed | ||
268 | */ | ||
269 | 7845 | void RemoveAsync(const std::string &file_to_delete) { | |
270 | 7845 | ++jobs_in_flight_; | |
271 | 7845 | DoRemoveAsync(file_to_delete); | |
272 | 7845 | } | |
273 | |||
274 | /** | ||
275 | * Overloaded method used to remove a object based on its content hash. | ||
276 | * | ||
277 | * @param hash_to_delete the content hash of a file to be deleted | ||
278 | */ | ||
279 | 7783 | void RemoveAsync(const shash::Any &hash_to_delete) { | |
280 |
2/4✓ Branch 2 taken 7783 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 7783 times.
✗ Branch 6 not taken.
|
7783 | RemoveAsync("data/" + hash_to_delete.MakePath()); |
281 | 7783 | } | |
282 | |||
283 | /** | ||
284 | * Get object size based on its content hash | ||
285 | * | ||
286 | * @param hash the content hash of a file | ||
287 | */ | ||
288 | ✗ | int64_t GetObjectSize(const shash::Any &hash) { | |
289 | ✗ | return DoGetObjectSize("data/" + hash.MakePath()); | |
290 | } | ||
291 | |||
292 | /** | ||
293 | * Checks if a file is already present in the backend storage. This might be a | ||
294 | * synchronous operation. | ||
295 | * | ||
296 | * @param path the path of the file to be checked | ||
297 | * @return true if the file was found in the backend storage | ||
298 | */ | ||
299 | virtual bool Peek(const std::string &path) = 0; | ||
300 | |||
301 | /** | ||
302 | * Make directory in upstream storage. Noop if directory already present. | ||
303 | * | ||
304 | * @param path relative directory path in the upstream storage | ||
305 | * @return true if the directory was successfully created or already present | ||
306 | */ | ||
307 | virtual bool Mkdir(const std::string &path) = 0; | ||
308 | |||
309 | /** | ||
310 | * Creates a top-level shortcut to the given data object. This is particularly | ||
311 | * useful for bootstrapping repositories whose data-directory is secured by | ||
312 | * a VOMS certificate. | ||
313 | * | ||
314 | * @param object content hash of the object to be exposed on the top-level | ||
315 | * @return true on success | ||
316 | */ | ||
317 | virtual bool PlaceBootstrappingShortcut(const shash::Any &object) = 0; | ||
318 | |||
319 | /** | ||
320 | * Waits until the current upload queue is empty. | ||
321 | * | ||
322 | * Note: This does NOT necessarily mean that all files are actually uploaded. | ||
323 | * If new jobs are concurrently scheduled the behavior of this method is | ||
324 | * not defined (it returns also on intermediately empty queues) | ||
325 | */ | ||
326 | virtual void WaitForUpload() const; | ||
327 | |||
328 | virtual unsigned int GetNumberOfErrors() const = 0; | ||
329 | static void RegisterPlugins(); | ||
330 | void InitCounters(perf::StatisticsTemplate *statistics); | ||
331 | |||
332 | protected: | ||
333 | typedef Callbackable<UploaderResults>::CallbackTN *CallbackPtr; | ||
334 | |||
335 | explicit AbstractUploader(const SpoolerDefinition &spooler_definition); | ||
336 | |||
337 | /** | ||
338 | * Implementation of plain file upload | ||
339 | * Public interface: AbstractUploader::UploadFile() / UploadIngestionSource() | ||
340 | * | ||
341 | * @param local_path file to be uploaded | ||
342 | * @param remote_path destination to be written in the backend | ||
343 | * @param callback callback to be called on completion | ||
344 | */ | ||
345 | virtual void DoUpload(const std::string &remote_path, | ||
346 | IngestionSource *source, | ||
347 | const CallbackTN *callback) = 0; | ||
348 | |||
349 | /** | ||
350 | * Implementation of a streamed upload step. See public interface for details. | ||
351 | * Public interface: AbstractUploader::ScheduleUpload() | ||
352 | * | ||
353 | * @param handle descendant of UploadStreamHandle specifying the stream | ||
354 | * @param buffer the UploadBuffer to be uploaded to the stream | ||
355 | * @param callback callback to be called on completion | ||
356 | */ | ||
357 | virtual void StreamedUpload(UploadStreamHandle *handle, | ||
358 | UploadBuffer buffer, | ||
359 | const CallbackTN *callback) = 0; | ||
360 | |||
361 | /** | ||
362 | * Implementation of streamed upload commit | ||
363 | * Public interface: AbstractUploader::ScheduleCommit() | ||
364 | * | ||
365 | * @param handle descendant of UploadStreamHandle specifying the stream | ||
366 | * @param content_hash the computed content hash of the streamed object | ||
367 | */ | ||
368 | virtual void FinalizeStreamedUpload(UploadStreamHandle *handle, | ||
369 | const shash::Any &content_hash) = 0; | ||
370 | |||
371 | |||
372 | virtual void DoRemoveAsync(const std::string &file_to_delete) = 0; | ||
373 | |||
374 | virtual int64_t DoGetObjectSize(const std::string &file_name) = 0; | ||
375 | |||
376 | /** | ||
377 | * This notifies the callback that is associated to a finishing job. Please | ||
378 | * do not call the handed callback yourself in concrete Uploaders! | ||
379 | * | ||
380 | * Note: Since the job is finished after we respond to it, the callback object | ||
381 | * gets automatically destroyed by this call! | ||
382 | * Therefore you must not call Respond() twice or use the callback later | ||
383 | * by any means! | ||
384 | */ | ||
385 | 1195848 | void Respond(const CallbackTN *callback, | |
386 | const UploaderResults &result) const { | ||
387 |
2/2✓ Branch 0 taken 1188003 times.
✓ Branch 1 taken 7845 times.
|
1195848 | if (callback != NULL) { |
388 | 1188003 | (*callback)(result); | |
389 |
1/2✓ Branch 0 taken 1188003 times.
✗ Branch 1 not taken.
|
1188003 | delete callback; |
390 | } | ||
391 | |||
392 | 1195848 | --jobs_in_flight_; | |
393 | 1195848 | } | |
394 | |||
395 | /** | ||
396 | * Creates a temporary file in the backend storage's temporary location | ||
397 | * For the LocalUploader this usually is the 'txn' directory of the backend | ||
398 | * storage. Otherwise it is some scratch area. | ||
399 | * | ||
400 | * @param path pointer to a string that will contain the created file path | ||
401 | * @return a file descriptor to the opened file | ||
402 | */ | ||
403 | int CreateAndOpenTemporaryChunkFile(std::string *path) const; | ||
404 | |||
405 | 8512 | const SpoolerDefinition &spooler_definition() const { | |
406 | 8512 | return spooler_definition_; | |
407 | } | ||
408 | |||
409 | void CountUploadedChunks() const; | ||
410 | void DecUploadedChunks() const; | ||
411 | void CountUploadedBytes(int64_t bytes_written) const; | ||
412 | void CountDuplicates() const; | ||
413 | void CountUploadedCatalogs() const; | ||
414 | void CountUploadedCatalogBytes(int64_t bytes_written) const; | ||
415 | |||
416 | protected: | ||
417 | /** | ||
418 | * Used by concrete implementations when they use callbacks where it's not | ||
419 | * already foreseen, e.g. S3Uploader::Peek(). | ||
420 | */ | ||
421 | 70 | void IncJobsInFlight() { ++jobs_in_flight_; } | |
422 | |||
423 | private: | ||
424 | const SpoolerDefinition spooler_definition_; | ||
425 | |||
426 | /** | ||
427 | * Number of threads used for I/O write calls. Effectively this parameter | ||
428 | * sets the I/O depth. Defaults to 1. | ||
429 | */ | ||
430 | unsigned num_upload_tasks_; | ||
431 | mutable SynchronizingCounter<int32_t> jobs_in_flight_; | ||
432 | TubeGroup<UploadJob> tubes_upload_; | ||
433 | TubeConsumerGroup<UploadJob> tasks_upload_; | ||
434 | mutable UniquePtr<UploadCounters> counters_; | ||
435 | }; // class AbstractUploader | ||
436 | |||
437 | |||
438 | /** | ||
439 | * The actual writing is multi-threaded. | ||
440 | */ | ||
441 | class TaskUpload : public TubeConsumer<AbstractUploader::UploadJob> { | ||
442 | public: | ||
443 | 3459 | explicit TaskUpload(AbstractUploader *uploader, | |
444 | Tube<AbstractUploader::UploadJob> *tube) | ||
445 | 3459 | : TubeConsumer<AbstractUploader::UploadJob>(tube), uploader_(uploader) { } | |
446 | |||
447 | protected: | ||
448 | virtual void Process(AbstractUploader::UploadJob *upload_job); | ||
449 | |||
450 | private: | ||
451 | AbstractUploader *uploader_; | ||
452 | }; | ||
453 | |||
454 | |||
455 | /** | ||
456 | * Each implementation of AbstractUploader must provide its own derivate of the | ||
457 | * UploadStreamHandle that is supposed to contain state information for the | ||
458 | * streamed upload of one specific chunk. | ||
459 | * Each UploadStreamHandle contains a callback object that is invoked as soon as | ||
460 | * the streamed upload is committed. | ||
461 | */ | ||
462 | struct UploadStreamHandle { | ||
463 | typedef AbstractUploader::CallbackTN CallbackTN; | ||
464 | static atomic_int64 g_upload_stream_tag; | ||
465 | |||
466 | 258996 | explicit UploadStreamHandle(const CallbackTN *commit_callback) | |
467 | 258996 | : commit_callback(commit_callback) | |
468 | 258996 | , tag(atomic_xadd64(&g_upload_stream_tag, 1)) { } | |
469 | 518046 | virtual ~UploadStreamHandle() { } | |
470 | |||
471 | const CallbackTN *commit_callback; | ||
472 | |||
473 | int64_t tag; | ||
474 | |||
475 | std::string remote_path; // override remote location of the object | ||
476 | }; | ||
477 | |||
478 | } // namespace upload | ||
479 | |||
480 | #endif // CVMFS_UPLOAD_FACILITY_H_ | ||
481 |