GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/upload_facility.h
Date: 2025-06-29 02:35:41
Exec Total Coverage
Lines: 64 66 97.0%
Branches: 30 62 48.4%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_UPLOAD_FACILITY_H_
6 #define CVMFS_UPLOAD_FACILITY_H_
7
8 #include <fcntl.h>
9 #include <stdint.h>
10
11 #include <string>
12
13 #include "ingestion/ingestion_source.h"
14 #include "ingestion/task.h"
15 #include "repository_tag.h"
16 #include "statistics.h"
17 #include "upload_spooler_definition.h"
18 #include "util/atomic.h"
19 #include "util/concurrency.h"
20 #include "util/pointer.h"
21 #include "util/posix.h"
22 #include "util/tube.h"
23
24 namespace upload {
25
26 struct UploadCounters {
27 perf::Counter *n_chunks_added;
28 perf::Counter *n_chunks_duplicated;
29 perf::Counter *n_catalogs_added;
30 perf::Counter *sz_uploaded_bytes;
31 perf::Counter *sz_uploaded_catalog_bytes;
32
33 4 explicit UploadCounters(perf::StatisticsTemplate statistics) {
34
3/6
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 4 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 4 times.
✗ Branch 10 not taken.
4 n_chunks_added = statistics.RegisterOrLookupTemplated(
35 "n_chunks_added", "Number of new chunks added");
36
3/6
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 4 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 4 times.
✗ Branch 10 not taken.
4 n_chunks_duplicated = statistics.RegisterOrLookupTemplated(
37 "n_chunks_duplicated", "Number of duplicated chunks added");
38
3/6
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 4 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 4 times.
✗ Branch 10 not taken.
4 n_catalogs_added = statistics.RegisterOrLookupTemplated(
39 "n_catalogs_added", "Number of new catalogs added");
40
3/6
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 4 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 4 times.
✗ Branch 10 not taken.
4 sz_uploaded_bytes = statistics.RegisterOrLookupTemplated(
41 "sz_uploaded_bytes", "Number of uploaded bytes");
42
3/6
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 4 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 4 times.
✗ Branch 10 not taken.
4 sz_uploaded_catalog_bytes = statistics.RegisterOrLookupTemplated(
43 "sz_uploaded_catalog_bytes", "Number of uploaded bytes for catalogs");
44 4 }
45 }; // UploadCounters
46
47 struct UploaderResults {
48 enum Type {
49 kFileUpload,
50 kBufferUpload,
51 kChunkCommit,
52 kRemove,
53 kLookup
54 };
55
56 63338 UploaderResults(const int return_code, const std::string &local_path)
57 63338 : type(kFileUpload), return_code(return_code), local_path(local_path) { }
58
59 26448064 explicit UploaderResults(Type t, const int return_code)
60
1/2
✓ Branch 2 taken 26448002 times.
✗ Branch 3 not taken.
26448064 : type(t), return_code(return_code), local_path("") { }
61
62 10198 UploaderResults() : type(kRemove), return_code(0) { }
63
64 const Type type;
65 const int return_code;
66 const std::string local_path;
67 };
68
69 struct UploadStreamHandle;
70
71 /**
72 * Abstract base class for all backend upload facilities
73 * This class defines an interface and constructs the concrete Uploaders,
74 * furthermore it handles callbacks to the outside world to notify users of done
75 * upload jobs.
76 *
77 * Note: Users could be both the Spooler (when calling Spooler::Upload()) and
78 * the IngestionPipeline (when calling Spooler::Process()). We therefore
79 * cannot use the Observable template here, since this would forward
80 * finished upload jobs to ALL listeners instead of only the owner of the
81 * specific job.
82 */
83 class AbstractUploader
84 : public PolymorphicConstruction<AbstractUploader, SpoolerDefinition>,
85 public Callbackable<UploaderResults>,
86 public SingleCopy {
87 friend class TaskUpload;
88
89 public:
90 /**
91 * A read-only memory block that is supposed to be written out.
92 */
93 struct UploadBuffer {
94 9779504 UploadBuffer() : size(0), data(NULL) { }
95 16600434 UploadBuffer(uint64_t s, const void *d) : size(s), data(d) { }
96 uint64_t size;
97 const void *data;
98 };
99
100 struct JobStatus {
101 enum State {
102 kOk,
103 kTerminate,
104 kNoJobs
105 };
106 };
107
108 struct UploadJob {
109 enum Type {
110 Upload,
111 Commit,
112 Terminate
113 };
114
115 UploadJob(UploadStreamHandle *handle, UploadBuffer buffer,
116 const CallbackTN *callback = NULL);
117 UploadJob(UploadStreamHandle *handle, const shash::Any &content_hash);
118
119 4132 UploadJob()
120 4132 : type(Terminate)
121 4132 , stream_handle(NULL)
122 4132 , tag_(0)
123 4132 , buffer()
124 4132 , callback(NULL) { }
125
126
1/2
✓ Branch 2 taken 4132 times.
✗ Branch 3 not taken.
4132 static UploadJob *CreateQuitBeacon() { return new UploadJob(); }
127 26394280 bool IsQuitBeacon() { return type == Terminate; }
128
129 Type type;
130 UploadStreamHandle *stream_handle;
131 /**
132 * Ensure that upload jobs belonging to the same file end up in the same
133 * upload task queue.
134 */
135 int64_t tag_;
136 55986 int64_t tag() { return tag_; }
137
138 // type==Upload specific fields
139 UploadBuffer buffer;
140 const CallbackTN *callback;
141
142 // type==Commit specific fields
143 shash::Any content_hash;
144 };
145
146
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3388 times.
6776 virtual ~AbstractUploader() { assert(!tasks_upload_.is_active()); }
147
148 /**
149 * A string identifying the uploader type
150 */
151 virtual std::string name() const = 0;
152
153 /**
154 * Initializes a new repository storage area, e.g. create directory layout
155 * for local backend or create bucket for S3 backend.
156 */
157 virtual bool Create() = 0;
158
159 /**
160 * Concrete uploaders might want to use a customized setting for multi-stream
161 * writing, for instance one per disk. Note that the S3 backend uses one task
162 * but this one task uses internally multiple HTTP streams through curl async
163 * I/O.
164 */
165 7522 virtual unsigned GetNumTasks() const { return num_upload_tasks_; }
166
167 /**
168 * This is called right after the constructor of AbstractUploader or/and its
169 * derived class has been executed. You can override that to do additional
170 * initialization that cannot be done in the constructor itself.
171 *
172 * @return true on successful initialization
173 */
174 virtual bool Initialize();
175
176 /**
177 * Called during Spooler::WaitForUpload(), to ensure that the upload has
178 * finished. If commit == true, then a Commit request is also sent, to apply
179 * all the the changes accumulated during the session. "catalog_path"
180 * represents the path of the root catalog with the changes.
181 * By default it is a noop and returns true;
182 */
183 virtual bool FinalizeSession(bool commit, const std::string &old_root_hash,
184 const std::string &new_root_hash,
185 const RepositoryTag &tag);
186
187 /**
188 * This must be called right before the destruction of the AbstractUploader!
189 * You are _not_ supposed to overwrite this method in your concrete Uploader.
190 */
191 void TearDown();
192
193 /**
194 * Uploads the file at the path local_path into the backend storage under the
195 * path remote_path. When the upload has finished it calls callback.
196 * Note: This method might be implemented in a synchronous way.
197 *
198 * @param local_path path to the file to be uploaded
199 * @param remote_path desired path for the file in the backend storage
200 * @param callback (optional) gets notified when the upload was finished
201 */
202 31842 void UploadFile(const std::string &local_path,
203 const std::string &remote_path,
204 const CallbackTN *callback = NULL) {
205
1/2
✓ Branch 1 taken 31842 times.
✗ Branch 2 not taken.
31842 ++jobs_in_flight_;
206
1/2
✓ Branch 1 taken 31842 times.
✗ Branch 2 not taken.
31842 FileIngestionSource source(local_path);
207
1/2
✓ Branch 1 taken 31842 times.
✗ Branch 2 not taken.
31842 DoUpload(remote_path, &source, callback);
208 31842 }
209
210 62 void UploadIngestionSource(const std::string &remote_path,
211 IngestionSource *source,
212 const CallbackTN *callback = NULL) {
213 62 ++jobs_in_flight_;
214 62 DoUpload(remote_path, source, callback);
215 62 }
216
217 /**
218 * This method is called before the first data block of a streamed upload is
219 * scheduled (see the definition of UploadStreamHandle for details).
220 *
221 * @param callback (optional) this callback will be invoked once this parti-
222 * cular streamed upload is committed.
223 * @return a pointer to the initialized UploadStreamHandle
224 */
225 virtual UploadStreamHandle *InitStreamedUpload(
226 const CallbackTN *callback) = 0;
227
228 /**
229 * This method schedules a buffer to be uploaded in the context of the
230 * given UploadStreamHandle. The actual upload will happen asynchronously by
231 * a concrete implementation of AbstractUploader
232 * (see AbstractUploader::StreamedUpload()).
233 * As soon as the scheduled upload job is complete (either successful or not)
234 * the optionally passed callback is supposed to be invoked using
235 * AbstractUploader::Respond().
236 *
237 * @param handle Pointer to a previously acquired UploadStreamHandle
238 * @param buffer contains the data block to be uploaded
239 * @param callback (optional) callback object to be invoked once the given
240 * upload is finished (see AbstractUploader::Respond())
241 */
242 16600278 void ScheduleUpload(UploadStreamHandle *handle,
243 UploadBuffer buffer,
244 const CallbackTN *callback = NULL) {
245 16600278 ++jobs_in_flight_;
246
3/6
✓ Branch 1 taken 16612329 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 16610262 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 16612758 times.
✗ Branch 8 not taken.
16613187 tubes_upload_.Dispatch(new UploadJob(handle, buffer, callback));
247 16612758 }
248
249 /**
250 * This method schedules a commit job as soon as all data blocks of a streamed
251 * upload are (successfully) uploaded. Derived classes must override
252 * AbstractUploader::FinalizeStreamedUpload() for this to happen.
253 *
254 * @param handle Pointer to a previously acquired UploadStreamHandle
255 * @param content_hash the content hash of the full uploaded data Chunk
256 */
257 9772632 void ScheduleCommit(UploadStreamHandle *handle,
258 const shash::Any &content_hash) {
259 9772632 ++jobs_in_flight_;
260
1/2
✓ Branch 2 taken 9775323 times.
✗ Branch 3 not taken.
9775245 tubes_upload_.Dispatch(new UploadJob(handle, content_hash));
261 9774738 }
262
263 /**
264 * Removes a file from the backend storage.
265 *
266 * Note: If the file doesn't exist before the call, this is not an error.
267 *
268 * @param file_to_delete path to the file to be removed
269 */
270 10198 void RemoveAsync(const std::string &file_to_delete) {
271 10198 ++jobs_in_flight_;
272 10198 DoRemoveAsync(file_to_delete);
273 10198 }
274
275 /**
276 * Overloaded method used to remove an object based on its content hash.
277 *
278 * @param hash_to_delete the content hash of a file to be deleted
279 */
280 10136 void RemoveAsync(const shash::Any &hash_to_delete) {
281
2/4
✓ Branch 2 taken 10136 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 10136 times.
✗ Branch 6 not taken.
10136 RemoveAsync("data/" + hash_to_delete.MakePath());
282 10136 }
283
284 /**
285 * Get object size based on its content hash
286 *
287 * @param hash the content hash of a file
288 */
289 int64_t GetObjectSize(const shash::Any &hash) {
290 return DoGetObjectSize("data/" + hash.MakePath());
291 }
292
293 /**
294 * Checks if a file is already present in the backend storage. This might be a
295 * synchronous operation.
296 *
297 * @param path the path of the file to be checked
298 * @return true if the file was found in the backend storage
299 */
300 virtual bool Peek(const std::string &path) = 0;
301
302 /**
303 * Make directory in upstream storage. Noop if directory already present.
304 *
305 * @param path relative directory path in the upstream storage
306 * @return true if the directory was successfully created or already present
307 */
308 virtual bool Mkdir(const std::string &path) = 0;
309
310 /**
311 * Creates a top-level shortcut to the given data object. This is particularly
312 * useful for bootstrapping repositories whose data-directory is secured by
313 * a VOMS certificate.
314 *
315 * @param object content hash of the object to be exposed on the top-level
316 * @return true on success
317 */
318 virtual bool PlaceBootstrappingShortcut(const shash::Any &object) = 0;
319
320 /**
321 * Waits until the current upload queue is empty.
322 *
323 * Note: This does NOT necessarily mean that all files are actually uploaded.
324 * If new jobs are concurrently scheduled the behavior of this method is
325 * not defined (it returns also on intermediately empty queues)
326 */
327 virtual void WaitForUpload() const;
328
329 virtual unsigned int GetNumberOfErrors() const = 0;
330 static void RegisterPlugins();
331 void InitCounters(perf::StatisticsTemplate *statistics);
332
333 protected:
334 typedef Callbackable<UploaderResults>::CallbackTN *CallbackPtr;
335
336 explicit AbstractUploader(const SpoolerDefinition &spooler_definition);
337
338 /**
339 * Implementation of plain file upload
340 * Public interface: AbstractUploader::UploadFile() and UploadIngestionSource()
341 *
342 * @param remote_path destination to be written in the backend
343 * @param source ingestion source providing the data to be uploaded
344 * @param callback callback to be called on completion
345 */
346 virtual void DoUpload(const std::string &remote_path,
347 IngestionSource *source,
348 const CallbackTN *callback) = 0;
349
350 /**
351 * Implementation of a streamed upload step. See public interface for details.
352 * Public interface: AbstractUploader::ScheduleUpload()
353 *
354 * @param handle descendant of UploadStreamHandle specifying the stream
355 * @param buffer the UploadBuffer to be uploaded to the stream
356 * @param callback callback to be called on completion
357 */
358 virtual void StreamedUpload(UploadStreamHandle *handle,
359 UploadBuffer buffer,
360 const CallbackTN *callback) = 0;
361
362 /**
363 * Implementation of streamed upload commit
364 * Public interface: AbstractUploader::ScheduleCommit()
365 *
366 * @param handle descendant of UploadStreamHandle specifying the stream
367 * @param content_hash the computed content hash of the streamed object
368 */
369 virtual void FinalizeStreamedUpload(UploadStreamHandle *handle,
370 const shash::Any &content_hash) = 0;
371
372
373 virtual void DoRemoveAsync(const std::string &file_to_delete) = 0;
374
375 virtual int64_t DoGetObjectSize(const std::string &file_name) = 0;
376
377 /**
378 * This notifies the callback that is associated to a finishing job. Please
379 * do not call the handed callback yourself in concrete Uploaders!
380 *
381 * Note: Since the job is finished after we respond to it, the callback object
382 * gets automatically destroyed by this call!
383 * Therefore you must not call Respond() twice or use the callback later
384 * by any means!
385 */
386 26432330 void Respond(const CallbackTN *callback,
387 const UploaderResults &result) const {
388
2/2
✓ Branch 0 taken 26422132 times.
✓ Branch 1 taken 10198 times.
26432330 if (callback != NULL) {
389 26422132 (*callback)(result);
390
1/2
✓ Branch 0 taken 26422132 times.
✗ Branch 1 not taken.
26422132 delete callback;
391 }
392
393 26432330 --jobs_in_flight_;
394 26432330 }
395
396 /**
397 * Creates a temporary file in the backend storage's temporary location
398 * For the LocalUploader this usually is the 'txn' directory of the backend
399 * storage. Otherwise it is some scratch area.
400 *
401 * @param path pointer to a string that will contain the created file path
402 * @return a file descriptor to the opened file
403 */
404 int CreateAndOpenTemporaryChunkFile(std::string *path) const;
405
406 9728 const SpoolerDefinition &spooler_definition() const {
407 9728 return spooler_definition_;
408 }
409
410 void CountUploadedChunks() const;
411 void DecUploadedChunks() const;
412 void CountUploadedBytes(int64_t bytes_written) const;
413 void CountDuplicates() const;
414 void CountUploadedCatalogs() const;
415 void CountUploadedCatalogBytes(int64_t bytes_written) const;
416
417 protected:
418 /**
419 * Used by concrete implementations when they use callbacks where it's not
420 * already foreseen, e.g. S3Uploader::Peek().
421 */
422 80 void IncJobsInFlight() { ++jobs_in_flight_; }
423
424 private:
425 const SpoolerDefinition spooler_definition_;
426
427 /**
428 * Number of threads used for I/O write calls. Effectively this parameter
429 * sets the I/O depth. Defaults to 1.
430 */
431 unsigned num_upload_tasks_;
432 mutable SynchronizingCounter<int32_t> jobs_in_flight_;
433 TubeGroup<UploadJob> tubes_upload_;
434 TubeConsumerGroup<UploadJob> tasks_upload_;
435 mutable UniquePtr<UploadCounters> counters_;
436 }; // class AbstractUploader
437
438
439 /**
440 * The actual writing is multi-threaded.
441 */
442 class TaskUpload : public TubeConsumer<AbstractUploader::UploadJob> {
443 public:
444 4133 explicit TaskUpload(AbstractUploader *uploader,
445 Tube<AbstractUploader::UploadJob> *tube)
446 4133 : TubeConsumer<AbstractUploader::UploadJob>(tube), uploader_(uploader) { }
447
448 protected:
449 virtual void Process(AbstractUploader::UploadJob *upload_job);
450
451 private:
452 AbstractUploader *uploader_;
453 };
454
455
456 /**
457 * Each implementation of AbstractUploader must provide its own type derived from
458 * UploadStreamHandle that is supposed to contain state information for the
459 * streamed upload of one specific chunk.
460 * Each UploadStreamHandle contains a callback object that is invoked as soon as
461 * the streamed upload is committed.
462 */
463 struct UploadStreamHandle {
464 typedef AbstractUploader::CallbackTN CallbackTN;
465 static atomic_int64 g_upload_stream_tag;
466
467 9772476 explicit UploadStreamHandle(const CallbackTN *commit_callback)
468 9772476 : commit_callback(commit_callback)
469 9772476 , tag(atomic_xadd64(&g_upload_stream_tag, 1)) { }
470 19551816 virtual ~UploadStreamHandle() { }
471
472 const CallbackTN *commit_callback;
473
474 int64_t tag;
475
476 std::string remote_path; // override remote location of the object
477 };
478
479 } // namespace upload
480
481 #endif // CVMFS_UPLOAD_FACILITY_H_
482