GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/upload_facility.h
Date: 2025-06-22 02:36:02
Exec Total Coverage
Lines: 64 66 97.0%
Branches: 30 62 48.4%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_UPLOAD_FACILITY_H_
6 #define CVMFS_UPLOAD_FACILITY_H_
7
8 #include <fcntl.h>
9 #include <stdint.h>
10
11 #include <string>
12
13 #include "ingestion/ingestion_source.h"
14 #include "ingestion/task.h"
15 #include "repository_tag.h"
16 #include "statistics.h"
17 #include "upload_spooler_definition.h"
18 #include "util/atomic.h"
19 #include "util/concurrency.h"
20 #include "util/posix.h"
21 #include "util/tube.h"
22
23 namespace upload {
24
25 struct UploadCounters {
26 perf::Counter *n_chunks_added;
27 perf::Counter *n_chunks_duplicated;
28 perf::Counter *n_catalogs_added;
29 perf::Counter *sz_uploaded_bytes;
30 perf::Counter *sz_uploaded_catalog_bytes;
31
32 98 explicit UploadCounters(perf::StatisticsTemplate statistics) {
33
3/6
✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 98 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 98 times.
✗ Branch 10 not taken.
98 n_chunks_added = statistics.RegisterOrLookupTemplated(
34 "n_chunks_added", "Number of new chunks added");
35
3/6
✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 98 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 98 times.
✗ Branch 10 not taken.
98 n_chunks_duplicated = statistics.RegisterOrLookupTemplated(
36 "n_chunks_duplicated", "Number of duplicated chunks added");
37
3/6
✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 98 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 98 times.
✗ Branch 10 not taken.
98 n_catalogs_added = statistics.RegisterOrLookupTemplated(
38 "n_catalogs_added", "Number of new catalogs added");
39
3/6
✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 98 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 98 times.
✗ Branch 10 not taken.
98 sz_uploaded_bytes = statistics.RegisterOrLookupTemplated(
40 "sz_uploaded_bytes", "Number of uploaded bytes");
41
3/6
✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 98 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 98 times.
✗ Branch 10 not taken.
98 sz_uploaded_catalog_bytes = statistics.RegisterOrLookupTemplated(
42 "sz_uploaded_catalog_bytes", "Number of uploaded bytes for catalogs");
43 98 }
44 }; // UploadCounters
45
46 struct UploaderResults {
47 enum Type {
48 kFileUpload,
49 kBufferUpload,
50 kChunkCommit,
51 kRemove,
52 kLookup
53 };
54
55 63829 UploaderResults(const int return_code, const std::string &local_path)
56 63829 : type(kFileUpload), return_code(return_code), local_path(local_path) { }
57
58 1212430 explicit UploaderResults(Type t, const int return_code)
59
1/2
✓ Branch 2 taken 1212320 times.
✗ Branch 3 not taken.
1212430 : type(t), return_code(return_code), local_path("") { }
60
61 7845 UploaderResults() : type(kRemove), return_code(0) { }
62
63 const Type type;
64 const int return_code;
65 const std::string local_path;
66 };
67
68 struct UploadStreamHandle;
69
70 /**
71 * Abstract base class for all backend upload facilities
72 * This class defines an interface and constructs the concrete Uploaders,
73 * furthermore it handles callbacks to the outside world to notify users of done
74 * upload jobs.
75 *
76 * Note: Users could be both the Spooler (when calling Spooler::Upload()) and
77 * the IngestionPipeline (when calling Spooler::Process()). We therefore
78 * cannot use the Observable template here, since this would forward
79 * finished upload jobs to ALL listeners instead of only the owner of the
80 * specific job.
81 */
82 class AbstractUploader
83 : public PolymorphicConstruction<AbstractUploader, SpoolerDefinition>,
84 public Callbackable<UploaderResults>,
85 public SingleCopy {
86 friend class TaskUpload;
87
88 public:
89 /**
90 * A read-only memory block that is supposed to be written out.
91 */
92 struct UploadBuffer {
93 262519 UploadBuffer() : size(0), data(NULL) { }
94 896250 UploadBuffer(uint64_t s, const void *d) : size(s), data(d) { }
95 uint64_t size;
96 const void *data;
97 };
98
99 struct JobStatus {
100 enum State {
101 kOk,
102 kTerminate,
103 kNoJobs
104 };
105 };
106
107 struct UploadJob {
108 enum Type {
109 Upload,
110 Commit,
111 Terminate
112 };
113
114 UploadJob(UploadStreamHandle *handle, UploadBuffer buffer,
115 const CallbackTN *callback = NULL);
116 UploadJob(UploadStreamHandle *handle, const shash::Any &content_hash);
117
118 3458 UploadJob()
119 3458 : type(Terminate)
120 3458 , stream_handle(NULL)
121 3458 , tag_(0)
122 3458 , buffer()
123 3458 , callback(NULL) { }
124
125
1/2
✓ Branch 2 taken 3458 times.
✗ Branch 3 not taken.
3458 static UploadJob *CreateQuitBeacon() { return new UploadJob(); }
126 1158948 bool IsQuitBeacon() { return type == Terminate; }
127
128 Type type;
129 UploadStreamHandle *stream_handle;
130 /**
131 * Ensure that upload jobs belonging to the same file end up in the same
132 * upload task queue.
133 */
134 int64_t tag_;
135 55986 int64_t tag() { return tag_; }
136
137 // type==Upload specific fields
138 UploadBuffer buffer;
139 const CallbackTN *callback;
140
141 // type==Commit specific fields
142 shash::Any content_hash;
143 };
144
145
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2714 times.
5428 virtual ~AbstractUploader() { assert(!tasks_upload_.is_active()); }
146
147 /**
148 * A string identifying the uploader type
149 */
150 virtual std::string name() const = 0;
151
152 /**
153 * Initializes a new repository storage area, e.g. create directory layout
154 * for local backend or create bucket for S3 backend.
155 */
156 virtual bool Create() = 0;
157
158 /**
159 * Concrete uploaders might want to use a customized setting for multi-stream
160 * writing, for instance one per disk. Note that the S3 backend uses one task
161 * but this one task uses internally multiple HTTP streams through curl async
162 * I/O.
163 */
164 6174 virtual unsigned GetNumTasks() const { return num_upload_tasks_; }
165
166 /**
167 * This is called right after the constructor of AbstractUploader and/or its
168 * derived class has been executed. You can override that to do additional
169 * initialization that cannot be done in the constructor itself.
170 *
171 * @return true on successful initialization
172 */
173 virtual bool Initialize();
174
175 /**
176 * Called during Spooler::WaitForUpload(), to ensure that the upload has
177 * finished. If commit == true, then a Commit request is also sent, to apply
178 * all the changes accumulated during the session. "catalog_path"
179 * represents the path of the root catalog with the changes.
180 * By default it is a noop and returns true.
181 */
182 virtual bool FinalizeSession(bool commit, const std::string &old_root_hash,
183 const std::string &new_root_hash,
184 const RepositoryTag &tag);
185
186 /**
187 * This must be called right before the destruction of the AbstractUploader!
188 * You are _not_ supposed to override this method in your concrete Uploader.
189 */
190 void TearDown();
191
192 /**
193 * Uploads the file at the path local_path into the backend storage under the
194 * path remote_path. When the upload has finished it calls callback.
195 * Note: This method might be implemented in a synchronous way.
196 *
197 * @param local_path path to the file to be uploaded
198 * @param remote_path desired path for the file in the backend storage
199 * @param callback (optional) gets notified when the upload was finished
200 */
201 32333 void UploadFile(const std::string &local_path,
202 const std::string &remote_path,
203 const CallbackTN *callback = NULL) {
204
1/2
✓ Branch 1 taken 32333 times.
✗ Branch 2 not taken.
32333 ++jobs_in_flight_;
205
1/2
✓ Branch 1 taken 32333 times.
✗ Branch 2 not taken.
32333 FileIngestionSource source(local_path);
206
1/2
✓ Branch 1 taken 32333 times.
✗ Branch 2 not taken.
32333 DoUpload(remote_path, &source, callback);
207 32333 }
208
209 62 void UploadIngestionSource(const std::string &remote_path,
210 IngestionSource *source,
211 const CallbackTN *callback = NULL) {
212 62 ++jobs_in_flight_;
213 62 DoUpload(remote_path, source, callback);
214 62 }
215
216 /**
217 * This method is called before the first data block of a streamed upload is
218 * scheduled (see the definition of UploadStreamHandle below for details).
219 *
220 * @param callback (optional) this callback will be invoked once this
221 * particular streamed upload is committed.
222 * @return a pointer to the initialized UploadStreamHandle
223 */
224 virtual UploadStreamHandle *InitStreamedUpload(
225 const CallbackTN *callback) = 0;
226
227 /**
228 * This method schedules a buffer to be uploaded in the context of the
229 * given UploadStreamHandle. The actual upload will happen asynchronously by
230 * a concrete implementation of AbstractUploader
231 * (see AbstractUploader::StreamedUpload()).
232 * As soon as the scheduled upload job is complete (either successful or not)
233 * the optionally passed callback is supposed to be invoked using
234 * AbstractUploader::Respond().
235 *
236 * @param handle Pointer to a previously acquired UploadStreamHandle
237 * @param buffer contains the data block to be uploaded
238 * @param callback (optional) callback object to be invoked once the given
239 * upload is finished (see AbstractUploader::Respond())
240 */
241 896261 void ScheduleUpload(UploadStreamHandle *handle,
242 UploadBuffer buffer,
243 const CallbackTN *callback = NULL) {
244 896261 ++jobs_in_flight_;
245
3/6
✓ Branch 1 taken 896476 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 896439 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 896487 times.
✗ Branch 8 not taken.
896474 tubes_upload_.Dispatch(new UploadJob(handle, buffer, callback));
246 896487 }
247
248 /**
249 * This method schedules a commit job as soon as all data blocks of a streamed
250 * upload are (successfully) uploaded. Derived classes must override
251 * AbstractUploader::FinalizeStreamedUpload() for this to happen.
252 *
253 * @param handle Pointer to a previously acquired UploadStreamHandle
254 * @param content_hash the content hash of the full uploaded data Chunk
255 */
256 258981 void ScheduleCommit(UploadStreamHandle *handle,
257 const shash::Any &content_hash) {
258 258981 ++jobs_in_flight_;
259
1/2
✓ Branch 2 taken 259009 times.
✗ Branch 3 not taken.
259016 tubes_upload_.Dispatch(new UploadJob(handle, content_hash));
260 259017 }
261
262 /**
263 * Removes a file from the backend storage.
264 *
265 * Note: If the file doesn't exist before calling, this won't be an error.
266 *
267 * @param file_to_delete path to the file to be removed
268 */
269 7845 void RemoveAsync(const std::string &file_to_delete) {
270 7845 ++jobs_in_flight_;
271 7845 DoRemoveAsync(file_to_delete);
272 7845 }
273
274 /**
275 * Overloaded method used to remove an object based on its content hash.
276 *
277 * @param hash_to_delete the content hash of a file to be deleted
278 */
279 7783 void RemoveAsync(const shash::Any &hash_to_delete) {
280
2/4
✓ Branch 2 taken 7783 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 7783 times.
✗ Branch 6 not taken.
7783 RemoveAsync("data/" + hash_to_delete.MakePath());
281 7783 }
282
283 /**
284 * Get object size based on its content hash
285 *
286 * @param hash the content hash of a file
287 */
288 int64_t GetObjectSize(const shash::Any &hash) {
289 return DoGetObjectSize("data/" + hash.MakePath());
290 }
291
292 /**
293 * Checks if a file is already present in the backend storage. This might be a
294 * synchronous operation.
295 *
296 * @param path the path of the file to be checked
297 * @return true if the file was found in the backend storage
298 */
299 virtual bool Peek(const std::string &path) = 0;
300
301 /**
302 * Make directory in upstream storage. Noop if directory already present.
303 *
304 * @param path relative directory path in the upstream storage
305 * @return true if the directory was successfully created or already present
306 */
307 virtual bool Mkdir(const std::string &path) = 0;
308
309 /**
310 * Creates a top-level shortcut to the given data object. This is particularly
311 * useful for bootstrapping repositories whose data-directory is secured by
312 * a VOMS certificate.
313 *
314 * @param object content hash of the object to be exposed on the top-level
315 * @return true on success
316 */
317 virtual bool PlaceBootstrappingShortcut(const shash::Any &object) = 0;
318
319 /**
320 * Waits until the current upload queue is empty.
321 *
322 * Note: This does NOT necessarily mean that all files are actually uploaded.
323 * If new jobs are concurrently scheduled the behavior of this method is
324 * not defined (it returns also on intermediately empty queues)
325 */
326 virtual void WaitForUpload() const;
327
328 virtual unsigned int GetNumberOfErrors() const = 0;
329 static void RegisterPlugins();
330 void InitCounters(perf::StatisticsTemplate *statistics);
331
332 protected:
333 typedef Callbackable<UploaderResults>::CallbackTN *CallbackPtr;
334
335 explicit AbstractUploader(const SpoolerDefinition &spooler_definition);
336
337 /**
338 * Implementation of plain file upload
339 * Public interface: AbstractUploader::UploadFile(), UploadIngestionSource()
340 *
341 * @param remote_path destination to be written in the backend
342 * @param source ingestion source that provides the data to upload
343 * @param callback callback to be called on completion
344 */
345 virtual void DoUpload(const std::string &remote_path,
346 IngestionSource *source,
347 const CallbackTN *callback) = 0;
348
349 /**
350 * Implementation of a streamed upload step. See public interface for details.
351 * Public interface: AbstractUploader::ScheduleUpload()
352 *
353 * @param handle descendant of UploadStreamHandle specifying the stream
354 * @param buffer the UploadBuffer to be uploaded to the stream
355 * @param callback callback to be called on completion
356 */
357 virtual void StreamedUpload(UploadStreamHandle *handle,
358 UploadBuffer buffer,
359 const CallbackTN *callback) = 0;
360
361 /**
362 * Implementation of streamed upload commit
363 * Public interface: AbstractUploader::ScheduleCommit()
364 *
365 * @param handle descendant of UploadStreamHandle specifying the stream
366 * @param content_hash the computed content hash of the streamed object
367 */
368 virtual void FinalizeStreamedUpload(UploadStreamHandle *handle,
369 const shash::Any &content_hash) = 0;
370
371
372 virtual void DoRemoveAsync(const std::string &file_to_delete) = 0;
373
374 virtual int64_t DoGetObjectSize(const std::string &file_name) = 0;
375
376 /**
377 * This notifies the callback that is associated to a finishing job. Please
378 * do not call the handed callback yourself in concrete Uploaders!
379 *
380 * Note: Since the job is finished after we respond to it, the callback object
381 * gets automatically destroyed by this call!
382 * Therefore you must not call Respond() twice or use the callback later
383 * by any means!
384 */
385 1195848 void Respond(const CallbackTN *callback,
386 const UploaderResults &result) const {
387
2/2
✓ Branch 0 taken 1188003 times.
✓ Branch 1 taken 7845 times.
1195848 if (callback != NULL) {
388 1188003 (*callback)(result);
389
1/2
✓ Branch 0 taken 1188003 times.
✗ Branch 1 not taken.
1188003 delete callback;
390 }
391
392 1195848 --jobs_in_flight_;
393 1195848 }
394
395 /**
396 * Creates a temporary file in the backend storage's temporary location
397 * For the LocalUploader this usually is the 'txn' directory of the backend
398 * storage. Otherwise it is some scratch area.
399 *
400 * @param path pointer to a string that will contain the created file path
401 * @return a file descriptor to the opened file
402 */
403 int CreateAndOpenTemporaryChunkFile(std::string *path) const;
404
405 8512 const SpoolerDefinition &spooler_definition() const {
406 8512 return spooler_definition_;
407 }
408
409 void CountUploadedChunks() const;
410 void DecUploadedChunks() const;
411 void CountUploadedBytes(int64_t bytes_written) const;
412 void CountDuplicates() const;
413 void CountUploadedCatalogs() const;
414 void CountUploadedCatalogBytes(int64_t bytes_written) const;
415
416 protected:
417 /**
418 * Used by concrete implementations when they use callbacks where it's not
419 * already foreseen, e.g. S3Uploader::Peek().
420 */
421 70 void IncJobsInFlight() { ++jobs_in_flight_; }
422
423 private:
424 const SpoolerDefinition spooler_definition_;
425
426 /**
427 * Number of threads used for I/O write calls. Effectively this parameter
428 * sets the I/O depth. Defaults to 1.
429 */
430 unsigned num_upload_tasks_;
431 mutable SynchronizingCounter<int32_t> jobs_in_flight_;
432 TubeGroup<UploadJob> tubes_upload_;
433 TubeConsumerGroup<UploadJob> tasks_upload_;
434 mutable UniquePtr<UploadCounters> counters_;
435 }; // class AbstractUploader
436
437
438 /**
439 * The actual writing is multi-threaded.
440 */
441 class TaskUpload : public TubeConsumer<AbstractUploader::UploadJob> {
442 public:
443 3459 explicit TaskUpload(AbstractUploader *uploader,
444 Tube<AbstractUploader::UploadJob> *tube)
445 3459 : TubeConsumer<AbstractUploader::UploadJob>(tube), uploader_(uploader) { }
446
447 protected:
448 virtual void Process(AbstractUploader::UploadJob *upload_job);
449
450 private:
451 AbstractUploader *uploader_;
452 };
453
454
455 /**
456 * Each implementation of AbstractUploader must provide its own subclass of the
457 * UploadStreamHandle that is supposed to contain state information for the
458 * streamed upload of one specific chunk.
459 * Each UploadStreamHandle contains a callback object that is invoked as soon as
460 * the streamed upload is committed.
461 */
462 struct UploadStreamHandle {
463 typedef AbstractUploader::CallbackTN CallbackTN;
464 static atomic_int64 g_upload_stream_tag;
465
466 258996 explicit UploadStreamHandle(const CallbackTN *commit_callback)
467 258996 : commit_callback(commit_callback)
468 258996 , tag(atomic_xadd64(&g_upload_stream_tag, 1)) { }
469 518046 virtual ~UploadStreamHandle() { }
470
471 const CallbackTN *commit_callback;
472
473 int64_t tag;
474
475 std::string remote_path; // override remote location of the object
476 };
477
478 } // namespace upload
479
480 #endif // CVMFS_UPLOAD_FACILITY_H_
481