GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/upload.h
Date: 2024-04-28 02:33:07
            Exec   Total   Coverage
Lines:         2       2     100.0%
Branches:      0       0         -%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 /**
6 * Backend Storage Spooler
7 * ~~~~~~~~~~~~~~~~~~~~~~~
8 *
9 * This is the entry point to the general file processing facility of the CVMFS
10 * backend. It works with a two-stage approach:
11 *
12 * 1. Process file content
13 * -> create smaller file chunks for big input files
14 * -> compress the file content (optionally chunked)
15 * -> generate a content hash of the compression result
16 *
17 * 2. Upload files
18 * -> pluggable to support different upload paths (local, S3, ...)
19 *
20 * There are a number of different entities involved in this process. Namely:
21 * -> Spooler - general steering tasks ( + common interface )
22 * -> IngestionPipeline - chunking, compression and hashing of files
23 * -> AbstractUploader - abstract base class for uploading facilities
24 * -> concrete Uploaders - upload functionality for various backend storages
25 *
26 * Stage 1, the processing of files, is handled by the IngestionPipeline.
27 * Since it is independent of the actual uploading, this functionality is
28 * outsourced into its own component. The IngestionPipeline takes care of the
29 * above-mentioned steps concurrently. This process is invoked by calling
30 * Spooler::Process(). While processing, the IngestionPipeline immediately
31 * schedules upload jobs in order to push data to the backend storage as early
32 * as possible.
33 *
34 * Stage 2, the upload, is handled by one of the concrete Uploader classes.
35 * Uploaders have a thin interface described in the AbstractUploader class.
36 * The IngestionPipeline uses a concrete Uploader to push files into the backend
37 * storage as part of its processing.
38 * Furthermore, the user can directly upload files using the Spooler::Upload()
39 * method as described in the next paragraph.
40 *
41 * For some specific files we need to be able to circumvent the
42 * IngestionPipeline and push them directly into the backend storage (e.g.
43 * .cvmfspublished). For this, Spooler::Upload() is used. It directly schedules
44 * an upload job in the concrete Uploader facility in use. These files are
45 * neither compressed nor checksummed.
46 *
47 * In any case, calling Spooler::Process() or Spooler::Upload() will invoke a
48 * callback once the whole job has finished. Callbacks are provided by the
49 * Observable template. Please see the implementation of this template for
50 * more details on usage and implementation.
51 * The data structure provided by this callback is called SpoolerResult and
52 * contains information about the processed file (status, content hash, chunks).
53 * Note: Even if a concrete Uploader internally spawns more than one upload job
54 * to send out chunked files, the user will only see a single invocation
55 * containing information about the uploaded file including its generated
56 * chunks.
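*
* A minimal usage sketch. Hedged: the function-style listener registration
* mirrors the Observable callback idiom shown in the TODO below, and the
* callback name, the SpoolerResult fields, and FileIngestionSource are
* assumptions drawn from the included headers, not guaranteed by this file:
*
*   void OnFileProcessed(const upload::SpoolerResult &result) {
*     if (result.return_code != 0)
*       return;  // processing or upload failed
*     // result.content_hash and result.file_chunks describe the outcome
*   }
*
*   spooler->RegisterListener(&OnFileProcessed);
*   spooler->Process(new FileIngestionSource("/tmp/bigfile"));
*   spooler->WaitForUpload();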
57 *
58 * Workflow:
59 *
60 * User
61 * \O/ Callback (SpoolerResult)
62 * | <----------------------+
63 * / \ |
64 * | |
65 * | Process() | File
66 * +-----------> ################### -----------------> #####################
67 * | Upload() # Spooler # # IngestionPipeline #
68 * +-----------> ################### <----------------- #####################
69 * | ^ SpoolerResult | ^
70 * | | | |
71 * direct Upload | Callback | | |
72 * `|´ | Schedule Upload | |
73 * ##################### <-------------------------+ |
74 * # Upload facility # |
75 * ##################### -------------------------------+
76 * | Callback (UploaderResults)
77 * Upload |
78 * `|´
79 * *********************
80 * * Backend Storage *
81 * *********************
82 *
83 *
84 * TODO(rmeusel): special purpose ::Process...() methods should (optionally?)
85 * return Future<> instead of relying on the callbacks. Those are
86 * somewhat one-shot calls and require a rather fishy idiom when
87 * using callbacks, like:
88 *
89 * cb = spooler->RegisterListener(...certificate_callback...);
90 * spooler->ProcessCertificate(...);
91 * spooler->WaitForUpload();
92 * spooler->UnregisterListener(cb);
93 * cert_hash = global_cert_hash;
94 *
95 * void certificate_callback(shash::Any &hash) {
96 * global_cert_hash = hash;
97 * }
98 *
99 * If ProcessCertificate(), ProcessHistory(), ProcessMetainfo(),
100 * UploadManifest() and UploadReflog() returned Future<>,
101 * the code would be much more comprehensible and free of
102 * user-managed global state:
103 *
104 * Future<shash::Any> fc = spooler->ProcessCertificate(...);
105 * cert_hash = fc.get();
106 *
107 */
108
109 #ifndef CVMFS_UPLOAD_H_
110 #define CVMFS_UPLOAD_H_
111
112 #include <cstdio>
113 #include <string>
114 #include <vector>
115
116 #include "crypto/hash.h"
117 #include "file_chunk.h"
118 #include "ingestion/ingestion_source.h"
119 #include "ingestion/pipeline.h"
120 #include "repository_tag.h"
121 #include "upload_facility.h"
122 #include "upload_spooler_definition.h"
123 #include "upload_spooler_result.h"
124 #include "util/concurrency.h"
125 #include "util/pointer.h"
126 #include "util/shared_ptr.h"
127
128 namespace upload {
129
130 /**
131 * The Spooler takes care of the upload of files into a backend storage. It
132 * can be extended to support multiple backend storage types, e.g. the local
133 * file system or a key-value storage.
134 *
135 * This Spooler class defines little more than the common spooler interface.
136 * There are derived classes that actually implement the different types of
137 * spoolers.
138 *
139 * Note: A spooler is derived from the Observable template, meaning that
140 * Listeners can be registered onto it.
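*
* A construction sketch. Hedged: the exact SpoolerDefinition constructor
* arguments and the "local,<temp dir>,<repo path>" definition string are
* assumptions based on upload_spooler_definition.h (the format of
* CVMFS_UPSTREAM_STORAGE); the paths are illustrative:
*
*   upload::SpoolerDefinition definition(
*       "local,/srv/cvmfs/demo/data/txn,/srv/cvmfs/demo", shash::kSha1);
*   UniquePtr<upload::Spooler> spooler(
*       upload::Spooler::Construct(definition));
*   if (!spooler.IsValid())
*     abort();  // backend storage could not be initialized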
141 */
142 class Spooler : public Observable<SpoolerResult> {
143 public:
144 static Spooler *Construct(const SpoolerDefinition &spooler_definition,
145 perf::StatisticsTemplate *statistics = NULL);
146 virtual ~Spooler();
147
148 /**
149 * Returns the name of the targeted backend storage.
150 * Intended for debugging purposes only!
151 */
152 std::string backend_name() const;
153
154 /**
155 * Calls the concrete uploader to create a new repository area
156 */
157 bool Create();
158
159 /**
160 * Schedules a copy job that transfers a file found at local_path to the
161 * location pointed to by remote_path. Copy Jobs do not hash or compress the
162 * given file. They simply upload it.
163 * When the copying has finished, a callback will be invoked asynchronously.
164 *
165 * @param local_path path to the file which needs to be copied into the
166 * backend storage
167 * @param remote_path the destination of the file to be copied in the
168 * backend storage
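*
* A usage sketch, e.g. for pushing a signed manifest past the
* IngestionPipeline (hedged: the local path is illustrative):
*
*   spooler->Upload("/tmp/manifest.signed", ".cvmfspublished");
*   spooler->WaitForUpload();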
169 */
170 void Upload(const std::string &local_path, const std::string &remote_path);
171
172 /**
173 * Ownership of source is transferred to the spooler
174 */
175 void Upload(const std::string &remote_path, IngestionSource *source);
176
177 /**
178 * Convenience wrapper to upload the Manifest file into the backend storage
179 *
180 * @param local_path the location of the (signed) manifest to be uploaded
181 */
182 void UploadManifest(const std::string &local_path);
183
184 /**
185 * Convenience wrapper to upload a Reflog database into the backend storage
186 *
187 * @param local_path the SQLite file location of the Reflog to be uploaded
188 */
189 void UploadReflog(const std::string &local_path);
190
191 /**
192 * Schedules a process job that compresses and hashes the data provided by
193 * the given ingestion source and uploads it into the CAS backend. The remote
194 * path of the file is determined by the content hash of the compressed file,
195 * appended by file_suffix.
196 * When the processing has finished, a callback will be invoked asynchronously.
197 *
198 * Note: This method might decide to chunk the file into a number of smaller
199 * parts and upload them separately. Still, you will receive a single
200 * callback for the whole job, that contains information about the
201 * generated chunks.
202 *
203 * @param source the ingestion source of the file to be processed
204 * and uploaded into the backend storage
205 * @param allow_chunking (optional) controls if this file should be cut in
206 * chunks or uploaded at once
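*
* A usage sketch. Hedged: FileIngestionSource and its constructor are
* assumptions based on ingestion/ingestion_source.h; the path is an example:
*
*   // Ownership of the source is transferred to the spooler.
*   spooler->Process(new FileIngestionSource("/tmp/data.bin"),
*                    true);  // allow_chunking
*   spooler->WaitForUpload();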
207 */
208 void Process(IngestionSource *source, const bool allow_chunking = true);
209
210 /**
211 * Convenience wrapper to process a catalog file. Please always use this
212 * for catalog processing; it will add special flags and hash suffixes.
213 *
214 * @param local_path the location of the catalog file to be processed
215 */
216 void ProcessCatalog(const std::string &local_path);
217
218 /**
219 * Convenience wrapper to process a history database file. This sets the
220 * processing parameters (like chunking and hash suffixes) accordingly.
221 *
222 * @param local_path the location of the history database file
223 */
224 void ProcessHistory(const std::string &local_path);
225
226 /**
227 * Convenience wrapper to process a certificate file. This sets the
228 * processing parameters (like chunking and hash suffixes) accordingly.
229 *
230 * @param local_path the location of the source of the certificate file
231 */
232 void ProcessCertificate(const std::string &local_path);
233 /**
234 * Ownership of source is transferred to the ingestion pipeline
235 */
236 void ProcessCertificate(IngestionSource *source);
237
238 /**
239 * Convenience wrapper to process a meta info file.
240 *
241 * @param local_path the location of the meta info file
242 */
243 void ProcessMetainfo(const std::string &local_path);
244 /**
245 * Ownership of source is transferred to the ingestion pipeline
246 */
247 void ProcessMetainfo(IngestionSource *source);
248
249 /**
250 * Deletes the given file from the repository backend storage. The removal
251 * is asynchronous; use WaitForUpload() to make sure the delete operation
252 * has reached the upload backend.
253 *
254 * @param file_to_delete path to the file to be deleted
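*
* A usage sketch. Hedged: checking GetNumberOfErrors() afterwards is an
* assumption about how failures surface; the path is illustrative:
*
*   spooler->RemoveAsync("obsolete/object.data");
*   spooler->WaitForUpload();
*   if (spooler->GetNumberOfErrors() > 0)
*     abort();  // removal did not reach the backend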
256 */
257 void RemoveAsync(const std::string &file_to_delete);
258
259 /**
260 * Checks if a file is already present in the backend storage
261 *
262 * @param path the path of the file to be peeked
263 * @return true if the file was found in the backend storage
264 */
265 bool Peek(const std::string &path) const;
266
267 /**
268 * Makes a directory in the upstream storage. No-op if it already exists.
269 * NOTE: currently only used to create the 'stats/' subdirectory
270 *
271 * @param path relative directory path in the upstream storage
272 * @return true if the directory was successfully created or already present
273 */
274 bool Mkdir(const std::string &path);
275
276 /**
277 * Creates a top-level shortcut to the given data object. This is particularly
278 * useful for bootstrapping repositories whose data directory is secured by
279 * a VOMS certificate.
280 *
281 * @param object content hash of the object to be exposed on the top-level
282 * @return true on success
283 */
284 bool PlaceBootstrappingShortcut(const shash::Any &object) const;
285
286 /**
287 * Blocks until all jobs currently being processed are finished. After it
288 * returns, more jobs can be scheduled if needed.
289 * Note: We assume that no one schedules new jobs while this method is
290 * waiting. Otherwise it might never return, since the job queue never
291 * becomes empty.
292 */
293 void WaitForUpload() const;
294
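/**
 * Commits (commit == true) or aborts the active upload session, recording
 * the transition from old_root_hash to new_root_hash under the given tag.
 * Only meaningful for session-based backends such as the gateway.
 */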
295 bool FinalizeSession(bool commit, const std::string &old_root_hash = "",
296 const std::string &new_root_hash = "",
297 const RepositoryTag &tag = RepositoryTag()) const;
298
299 /**
300 * Checks how many of the already processed jobs have failed.
301 *
302 * @return the number of failed jobs at the time this method is invoked
303 */
304 unsigned int GetNumberOfErrors() const;
305
306 51 shash::Algorithms GetHashAlgorithm() const {
307 51 return spooler_definition_.hash_algorithm;
308 }
309
310 SpoolerDefinition::DriverType GetDriverType() const {
311 return spooler_definition_.driver_type;
312 }
313
314 protected:
315 /**
316 * This method is called once before any other operations are performed on
317 * a Spooler. Implements global initialization work.
318 */
319 bool Initialize(perf::StatisticsTemplate *statistics);
320
321 /**
322 * @param spooler_definition the SpoolerDefinition structure that defines
323 * some intrinsics of the concrete Spoolers.
324 */
325 explicit Spooler(const SpoolerDefinition &spooler_definition);
326
327 /**
328 * Used internally: called when the ingestion pipeline finishes a job.
329 * Automatically takes care of processed files and prepares them for upload.
330 */
331 void ProcessingCallback(const SpoolerResult &data);
332
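/**
 * Used internally: called when the upload facility finishes an upload job.
 */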
333 void UploadingCallback(const UploaderResults &data);
334
335 /**
336 * @return the spooler definition that was initially given to any Spooler
337 * constructor.
338 */
339 inline const SpoolerDefinition &spooler_definition() const {
340 return spooler_definition_;
341 }
342
343 private:
344 // Status Information
345 const SpoolerDefinition spooler_definition_;
346
347 UniquePtr<IngestionPipeline> ingestion_pipeline_;
348 UniquePtr<AbstractUploader> uploader_;
349 };
350
351 } // namespace upload
352
353 #endif // CVMFS_UPLOAD_H_
354