GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/upload_facility.h
Date: 2024-04-28 02:33:07
            Exec  Total  Coverage
Lines:        75     77     97.4%
Branches:     30     62     48.4%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_UPLOAD_FACILITY_H_
6 #define CVMFS_UPLOAD_FACILITY_H_
7
8 #include <fcntl.h>
9 #include <stdint.h>
10
11 #include <string>
12
13 #include "ingestion/ingestion_source.h"
14 #include "ingestion/task.h"
15 #include "repository_tag.h"
16 #include "statistics.h"
17 #include "upload_spooler_definition.h"
18 #include "util/atomic.h"
19 #include "util/concurrency.h"
20 #include "util/posix.h"
21 #include "util/tube.h"
22
23 namespace upload {
24
25 struct UploadCounters {
26 perf::Counter *n_chunks_added;
27 perf::Counter *n_chunks_duplicated;
28 perf::Counter *n_catalogs_added;
29 perf::Counter *sz_uploaded_bytes;
30 perf::Counter *sz_uploaded_catalog_bytes;
31
32 2 explicit UploadCounters(perf::StatisticsTemplate statistics) {
33
3/6
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 2 times.
✗ Branch 10 not taken.
2 n_chunks_added = statistics.RegisterOrLookupTemplated(
34 "n_chunks_added", "Number of new chunks added");
35
3/6
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 2 times.
✗ Branch 10 not taken.
2 n_chunks_duplicated = statistics.RegisterOrLookupTemplated(
36 "n_chunks_duplicated", "Number of duplicated chunks added");
37
3/6
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 2 times.
✗ Branch 10 not taken.
2 n_catalogs_added = statistics.RegisterOrLookupTemplated(
38 "n_catalogs_added", "Number of new catalogs added");
39
3/6
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 2 times.
✗ Branch 10 not taken.
2 sz_uploaded_bytes = statistics.RegisterOrLookupTemplated(
40 "sz_uploaded_bytes", "Number of uploaded bytes");
41
3/6
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 2 times.
✗ Branch 10 not taken.
2 sz_uploaded_catalog_bytes = statistics.RegisterOrLookupTemplated(
42 "sz_uploaded_catalog_bytes", "Number of uploaded bytes for catalogs");
43 2 }
44 }; // UploadCounters
45
46 struct UploaderResults {
47 enum Type { kFileUpload, kBufferUpload, kChunkCommit, kRemove, kLookup };
48
49 2050 UploaderResults(const int return_code, const std::string &local_path)
50 2050 : type(kFileUpload),
51 2050 return_code(return_code),
52 2050 local_path(local_path) {}
53
54 673327 explicit UploaderResults(Type t, const int return_code)
55 673327 : type(t),
56 673327 return_code(return_code),
57
1/2
✓ Branch 2 taken 673327 times.
✗ Branch 3 not taken.
673327 local_path("") {}
58
59 364 UploaderResults()
60 364 : type(kRemove)
61 364 , return_code(0)
62 364 { }
63
64 const Type type;
65 const int return_code;
66 const std::string local_path;
67 };
68
69 struct UploadStreamHandle;
70
71 /**
72 * Abstract base class for all backend upload facilities.
73 * This class defines an interface and constructs the concrete Uploaders;
74 * furthermore, it handles callbacks to the outside world to notify users of
75 * finished upload jobs.
76 *
77 * Note: Users could be both the Spooler (when calling Spooler::Upload()) and
78 * the IngestionPipeline (when calling Spooler::Process()). We therefore
79 * cannot use the Observable template here, since this would forward
80 * finished upload jobs to ALL listeners instead of only the owner of the
81 * specific job.
82 */
83 class AbstractUploader
84 : public PolymorphicConstruction<AbstractUploader, SpoolerDefinition>
85 , public Callbackable<UploaderResults>
86 , public SingleCopy {
87 friend class TaskUpload;
88
89 public:
90 /**
91 * A read-only memory block that is supposed to be written out.
92 */
93 struct UploadBuffer {
94 250847 UploadBuffer() : size(0), data(NULL) { }
95 420196 UploadBuffer(uint64_t s, const void *d) : size(s), data(d) { }
96 uint64_t size;
97 const void *data;
98 };
99
100 struct JobStatus {
101 enum State { kOk, kTerminate, kNoJobs };
102 };
103
104 struct UploadJob {
105 enum Type { Upload, Commit, Terminate };
106
107 UploadJob(UploadStreamHandle *handle, UploadBuffer buffer,
108 const CallbackTN *callback = NULL);
109 UploadJob(UploadStreamHandle *handle, const shash::Any &content_hash);
110
111 136 UploadJob()
112 136 : type(Terminate)
113 136 , stream_handle(NULL)
114 136 , tag_(0)
115 136 , buffer()
116 136 , callback(NULL) {}
117
118
1/2
✓ Branch 2 taken 136 times.
✗ Branch 3 not taken.
136 static UploadJob *CreateQuitBeacon() { return new UploadJob(); }
119 671348 bool IsQuitBeacon() { return type == Terminate; }
120
121 Type type;
122 UploadStreamHandle *stream_handle;
123 /**
124 * Ensure that upload jobs belonging to the same file end up in the same
125 * upload task queue.
126 */
127 int64_t tag_;
128 1806 int64_t tag() { return tag_; }
129
130 // type==Upload specific fields
131 UploadBuffer buffer;
132 const CallbackTN *callback;
133
134 // type==Commit specific fields
135 shash::Any content_hash;
136 };
137
138
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 112 times.
224 virtual ~AbstractUploader() { assert(!tasks_upload_.is_active()); }
139
140 /**
141 * A string identifying the uploader type
142 */
143 virtual std::string name() const = 0;
144
145 /**
146 * Initializes a new repository storage area, e.g. create directory layout
147 * for local backend or create bucket for S3 backend.
148 */
149 virtual bool Create() = 0;
150
151 /**
152 * Concrete uploaders might want to use a customized setting for multi-stream
153 * writing, for instance one per disk. Note that the S3 backend uses one task
154 * but this one task uses internally multiple HTTP streams through curl async
155 * I/O.
156 */
157 248 virtual unsigned GetNumTasks() const { return num_upload_tasks_; }
158
159 /**
160 * This is called right after the constructor of AbstractUploader and/or its
161 * derived class has been executed. You can override this method to perform
162 * additional initialization that cannot be done in the constructor itself.
163 *
164 * @return true on successful initialization
165 */
166 virtual bool Initialize();
167
168 /**
169 * Called during Spooler::WaitForUpload(), to ensure that the upload has
170 * finished. If commit == true, then a Commit request is also sent, to apply
171 * all the the changes accumulated during the session. "catalog_path"
172 * represents the path of the root catalog with the changes.
173 * By default it is a noop and returns true;
174 */
175 virtual bool FinalizeSession(bool commit, const std::string &old_root_hash,
176 const std::string &new_root_hash,
177 const RepositoryTag &tag);
178
179 /**
180 * This must be called right before the destruction of the AbstractUploader!
181 * You are _not_ supposed to override this method in your concrete Uploader.
182 */
183 void TearDown();
184
185 /**
186 * Uploads the file at the path local_path into the backend storage under the
187 * path remote_path. When the upload has finished, it invokes the callback.
188 * Note: This method might be implemented in a synchronous way.
189 *
190 * @param local_path path to the file to be uploaded
191 * @param remote_path desired path for the file in the backend storage
192 * @param callback (optional) gets notified when the upload was finished
193 */
194 1034 void UploadFile(
195 const std::string &local_path,
196 const std::string &remote_path,
197 const CallbackTN *callback = NULL)
198 {
199
1/2
✓ Branch 1 taken 1034 times.
✗ Branch 2 not taken.
1034 ++jobs_in_flight_;
200
1/2
✓ Branch 1 taken 1034 times.
✗ Branch 2 not taken.
1034 FileIngestionSource source(local_path);
201
1/2
✓ Branch 1 taken 1034 times.
✗ Branch 2 not taken.
1034 DoUpload(remote_path, &source, callback);
202 1034 }
203
204 2 void UploadIngestionSource(
205 const std::string &remote_path,
206 IngestionSource *source,
207 const CallbackTN *callback = NULL)
208 {
209 2 ++jobs_in_flight_;
210 2 DoUpload(remote_path, source, callback);
211 2 }
212
213 /**
214 * This method is called before the first data block of a streamed upload is
215 * scheduled (see the UploadStreamHandle declaration below for details).
216 *
217 * @param callback (optional) this callback will be invoked once this
218 * particular streamed upload is committed.
219 * @return a pointer to the initialized UploadStreamHandle
220 */
221 virtual UploadStreamHandle *InitStreamedUpload(
222 const CallbackTN *callback) = 0;
223
224 /**
225 * This method schedules a buffer to be uploaded in the context of the
226 * given UploadStreamHandle. The actual upload is performed asynchronously by
227 * a concrete implementation of AbstractUploader
228 * (see AbstractUploader::StreamedUpload()).
229 * As soon as the scheduled upload job is complete (successful or not),
230 * the optionally passed callback is supposed to be invoked using
231 * AbstractUploader::Respond().
232 *
233 * @param handle Pointer to a previously acquired UploadStreamHandle
234 * @param buffer contains the data block to be uploaded
235 * @param callback (optional) callback object to be invoked once the given
236 * upload is finished (see AbstractUploader::Respond())
237 */
238 420180 void ScheduleUpload(
239 UploadStreamHandle *handle,
240 UploadBuffer buffer,
241 const CallbackTN *callback = NULL)
242 {
243 420180 ++jobs_in_flight_;
244
3/6
✓ Branch 1 taken 420426 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 420405 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 420443 times.
✗ Branch 8 not taken.
420449 tubes_upload_.Dispatch(new UploadJob(handle, buffer, callback));
245 420443 }
246
247 /**
248 * This method schedules a commit job as soon as all data blocks of a streamed
249 * upload are (successfully) uploaded. Derived classes must override
250 * AbstractUploader::FinalizeStreamedUpload() for this to happen.
251 *
252 * @param handle Pointer to a previously acquired UploadStreamHandle
253 * @param content_hash the content hash of the fully uploaded data chunk
254 */
255 250681 void ScheduleCommit(
256 UploadStreamHandle *handle,
257 const shash::Any &content_hash)
258 {
259 250681 ++jobs_in_flight_;
260
1/2
✓ Branch 2 taken 250707 times.
✗ Branch 3 not taken.
250718 tubes_upload_.Dispatch(new UploadJob(handle, content_hash));
261 250713 }
262
263 /**
264 * Removes a file from the backend storage.
265 *
266 * Note: It is not an error if the file does not exist when this is called.
267 *
268 * @param file_to_delete path to the file to be removed
269 */
270 364 void RemoveAsync(const std::string &file_to_delete) {
271 364 ++jobs_in_flight_;
272 364 DoRemoveAsync(file_to_delete);
273 364 }
274
275 /**
276 * Overloaded method used to remove an object based on its content hash.
277 *
278 * @param hash_to_delete the content hash of a file to be deleted
279 */
280 362 void RemoveAsync(const shash::Any &hash_to_delete) {
281
2/4
✓ Branch 2 taken 362 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 362 times.
✗ Branch 6 not taken.
362 RemoveAsync("data/" + hash_to_delete.MakePath());
282 362 }
283
284 /**
285 * Get object size based on its content hash
286 *
287 * @param hash the content hash of a file
288 */
289 int64_t GetObjectSize(const shash::Any &hash) {
290 return DoGetObjectSize("data/" + hash.MakePath());
291 }
292
293 /**
294 * Checks if a file is already present in the backend storage. This might be a
295 * synchronous operation.
296 *
297 * @param path the path of the file to be checked
298 * @return true if the file was found in the backend storage
299 */
300 virtual bool Peek(const std::string &path) = 0;
301
302 /**
303 * Make directory in upstream storage. Noop if directory already present.
304 *
305 * @param path relative directory path in the upstream storage
306 * @return true if the directory was successfully created or already present
307 */
308 virtual bool Mkdir(const std::string &path) = 0;
309
310 /**
311 * Creates a top-level shortcut to the given data object. This is particularly
312 * useful for bootstrapping repositories whose data-directory is secured by
313 * a VOMS certificate.
314 *
315 * @param object content hash of the object to be exposed on the top-level
316 * @return true on success
317 */
318 virtual bool PlaceBootstrappingShortcut(const shash::Any &object) = 0;
319
320 /**
321 * Waits until the current upload queue is empty.
322 *
323 * Note: This does NOT necessarily mean that all files are actually uploaded.
324 * If new jobs are scheduled concurrently, the behavior of this method is
325 * undefined (it may also return on intermediately empty queues).
326 */
327 virtual void WaitForUpload() const;
328
329 virtual unsigned int GetNumberOfErrors() const = 0;
330 static void RegisterPlugins();
331 void InitCounters(perf::StatisticsTemplate *statistics);
332
333 protected:
334 typedef Callbackable<UploaderResults>::CallbackTN *CallbackPtr;
335
336 explicit AbstractUploader(const SpoolerDefinition &spooler_definition);
337
338 /**
339 * Implementation of plain file upload.
340 * Public interface: AbstractUploader::UploadFile() and UploadIngestionSource()
341 *
342 * @param remote_path destination path to be written in the backend
343 * @param source ingestion source providing the data to be uploaded
344 * @param callback callback to be called on completion
345 */
346 virtual void DoUpload(const std::string &remote_path,
347 IngestionSource *source,
348 const CallbackTN *callback) = 0;
349
350 /**
351 * Implementation of a streamed upload step. See public interface for details.
352 * Public interface: AbstractUploader::ScheduleUpload()
353 *
354 * @param handle descendant of UploadStreamHandle specifying the stream
355 * @param buffer the UploadBuffer to be uploaded to the stream
356 * @param callback callback to be called on completion
357 */
358 virtual void StreamedUpload(UploadStreamHandle *handle,
359 UploadBuffer buffer,
360 const CallbackTN *callback) = 0;
361
362 /**
363 * Implementation of the streamed upload commit.
364 * Public interface: AbstractUploader::ScheduleCommit()
365 *
366 * @param handle descendant of UploadStreamHandle specifying the stream
367 * @param content_hash the computed content hash of the streamed object
368 */
369 virtual void FinalizeStreamedUpload(UploadStreamHandle *handle,
370 const shash::Any &content_hash) = 0;
371
372
373 virtual void DoRemoveAsync(const std::string &file_to_delete) = 0;
374
375 virtual int64_t DoGetObjectSize(const std::string &file_name) = 0;
376
377 /**
378 * This invokes the callback that is associated with a finishing job. Please
379 * do not call the handed-over callback yourself in concrete Uploaders!
380 *
381 * Note: Since the job is finished after we respond to it, the callback object
382 * is automatically destroyed by this call!
383 * Therefore you must not call Respond() twice or use the callback afterwards
384 * in any way!
385 */
386 672617 void Respond(const CallbackTN *callback,
387 const UploaderResults &result) const
388 {
389
2/2
✓ Branch 0 taken 672253 times.
✓ Branch 1 taken 364 times.
672617 if (callback != NULL) {
390 672253 (*callback)(result);
391
1/2
✓ Branch 0 taken 672253 times.
✗ Branch 1 not taken.
672253 delete callback;
392 }
393
394 672617 --jobs_in_flight_;
395 672617 }
396
397 /**
398 * Creates a temporary file in the backend storage's temporary location.
399 * For the LocalUploader this is usually the 'txn' directory of the backend
400 * storage. Otherwise it is some scratch area.
401 *
402 * @param path pointer to a string that will contain the created file path
403 * @return a file descriptor to the opened file
404 */
405 int CreateAndOpenTemporaryChunkFile(std::string *path) const;
406
407 608 const SpoolerDefinition &spooler_definition() const {
408 608 return spooler_definition_;
409 }
410
411 void CountUploadedChunks() const;
412 void DecUploadedChunks() const;
413 void CountUploadedBytes(int64_t bytes_written) const;
414 void CountDuplicates() const;
415 void CountUploadedCatalogs() const;
416 void CountUploadedCatalogBytes(int64_t bytes_written) const;
417
418 protected:
419 /**
420 * Used by concrete implementations when they use callbacks where it's not
421 * already foreseen, e.g. S3Uploader::Peek().
422 */
423 5 void IncJobsInFlight() {
424 5 ++jobs_in_flight_;
425 5 }
426
427 private:
428 const SpoolerDefinition spooler_definition_;
429
430 /**
431 * Number of threads used for I/O write calls. Effectively this parameter
432 * sets the I/O depth. Defaults to 1.
433 */
434 unsigned num_upload_tasks_;
435 mutable SynchronizingCounter<int32_t> jobs_in_flight_;
436 TubeGroup<UploadJob> tubes_upload_;
437 TubeConsumerGroup<UploadJob> tasks_upload_;
438 mutable UniquePtr<UploadCounters> counters_;
439 }; // class AbstractUploader
440
441
442 /**
443 * The actual writing is multi-threaded.
444 */
445 class TaskUpload : public TubeConsumer<AbstractUploader::UploadJob> {
446 public:
447 136 explicit TaskUpload(
448 AbstractUploader *uploader,
449 Tube<AbstractUploader::UploadJob> *tube)
450 136 : TubeConsumer<AbstractUploader::UploadJob>(tube)
451 136 , uploader_(uploader)
452 136 { }
453
454 protected:
455 virtual void Process(AbstractUploader::UploadJob *upload_job);
456
457 private:
458 AbstractUploader *uploader_;
459 };
460
461
462 /**
463 * Each implementation of AbstractUploader must provide its own derivative of
464 * UploadStreamHandle that is supposed to contain state information for the
465 * streamed upload of one specific chunk.
466 * Each UploadStreamHandle contains a callback object that is invoked as soon as
467 * the streamed upload is committed.
468 */
469 struct UploadStreamHandle {
470 typedef AbstractUploader::CallbackTN CallbackTN;
471 static atomic_int64 g_upload_stream_tag;
472
473 250683 explicit UploadStreamHandle(const CallbackTN *commit_callback)
474 250683 : commit_callback(commit_callback)
475 250683 , tag(atomic_xadd64(&g_upload_stream_tag, 1)) {}
476 501452 virtual ~UploadStreamHandle() {}
477
478 const CallbackTN *commit_callback;
479
480 int64_t tag;
481
482 std::string remote_path; // override remote location of the object
483 };
484
485 } // namespace upload
486
487 #endif // CVMFS_UPLOAD_FACILITY_H_
488
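
For orientation, the following caller-side sketch (not part of the measured file)
illustrates how the streamed-upload path covered above is typically driven:
InitStreamedUpload() acquires a handle, ScheduleUpload() enqueues one or more
buffers, and ScheduleCommit() finalizes the chunk. The uploader object and the
content hash are assumed to come from elsewhere (e.g. the spooler setup and the
ingestion pipeline); the helper name StreamBlob and everything else not declared
in the header above are illustrative only.

    #include <cassert>
    #include <stdint.h>

    #include "upload_facility.h"

    // Streams a single in-memory blob through the three-step interface
    // declared in upload_facility.h. Passing NULL as callback means no
    // completion notification is requested.
    void StreamBlob(upload::AbstractUploader *uploader,
                    const void *data, uint64_t size,
                    const shash::Any &content_hash) {
      // Acquire a per-chunk stream handle (concrete type depends on the backend).
      upload::UploadStreamHandle *handle = uploader->InitStreamedUpload(NULL);
      assert(handle != NULL);

      // Schedule the data block; larger objects would schedule several buffers
      // against the same handle, which all end up in the same upload task queue.
      uploader->ScheduleUpload(
          handle, upload::AbstractUploader::UploadBuffer(size, data));

      // Commit the chunk once all buffers are scheduled; the concrete uploader
      // finalizes the object and eventually answers the jobs via Respond().
      uploader->ScheduleCommit(handle, content_hash);

      // Drain the upload queue (see the WaitForUpload() caveats above).
      uploader->WaitForUpload();
    }

Note that a callback passed to ScheduleUpload()/ScheduleCommit() is owned by the
uploader once the job is scheduled: Respond() deletes the callback object after
invoking it, which is why concrete uploaders must call Respond() exactly once
per job and must not reuse the callback afterwards.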