| Directory: | cvmfs/ |
|---|---|
| File: | cvmfs/upload_facility.h |
| Date: | 2025-11-02 02:35:35 |
| Exec | Total | Coverage | |
|---|---|---|---|
| Lines: | 64 | 66 | 97.0% |
| Branches: | 30 | 62 | 48.4% |
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /** | ||
| 2 | * This file is part of the CernVM File System. | ||
| 3 | */ | ||
| 4 | |||
| 5 | #ifndef CVMFS_UPLOAD_FACILITY_H_ | ||
| 6 | #define CVMFS_UPLOAD_FACILITY_H_ | ||
| 7 | |||
| 8 | #include <fcntl.h> | ||
| 9 | #include <stdint.h> | ||
| 10 | |||
| 11 | #include <string> | ||
| 12 | |||
| 13 | #include "ingestion/ingestion_source.h" | ||
| 14 | #include "ingestion/task.h" | ||
| 15 | #include "repository_tag.h" | ||
| 16 | #include "statistics.h" | ||
| 17 | #include "upload_spooler_definition.h" | ||
| 18 | #include "util/atomic.h" | ||
| 19 | #include "util/concurrency.h" | ||
| 20 | #include "util/pointer.h" | ||
| 21 | #include "util/posix.h" | ||
| 22 | #include "util/tube.h" | ||
| 23 | |||
| 24 | namespace upload { | ||
| 25 | |||
| 26 | struct UploadCounters { | ||
| 27 | perf::Counter *n_chunks_added; | ||
| 28 | perf::Counter *n_chunks_duplicated; | ||
| 29 | perf::Counter *n_catalogs_added; | ||
| 30 | perf::Counter *sz_uploaded_bytes; | ||
| 31 | perf::Counter *sz_uploaded_catalog_bytes; | ||
| 32 | |||
| 33 | 56 | explicit UploadCounters(perf::StatisticsTemplate statistics) { | |
| 34 |
3/6✓ Branch 2 taken 56 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 56 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 56 times.
✗ Branch 10 not taken.
|
56 | n_chunks_added = statistics.RegisterOrLookupTemplated( |
| 35 | "n_chunks_added", "Number of new chunks added"); | ||
| 36 |
3/6✓ Branch 2 taken 56 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 56 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 56 times.
✗ Branch 10 not taken.
|
56 | n_chunks_duplicated = statistics.RegisterOrLookupTemplated( |
| 37 | "n_chunks_duplicated", "Number of duplicated chunks added"); | ||
| 38 |
3/6✓ Branch 2 taken 56 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 56 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 56 times.
✗ Branch 10 not taken.
|
56 | n_catalogs_added = statistics.RegisterOrLookupTemplated( |
| 39 | "n_catalogs_added", "Number of new catalogs added"); | ||
| 40 |
3/6✓ Branch 2 taken 56 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 56 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 56 times.
✗ Branch 10 not taken.
|
56 | sz_uploaded_bytes = statistics.RegisterOrLookupTemplated( |
| 41 | "sz_uploaded_bytes", "Number of uploaded bytes"); | ||
| 42 |
3/6✓ Branch 2 taken 56 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 56 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 56 times.
✗ Branch 10 not taken.
|
56 | sz_uploaded_catalog_bytes = statistics.RegisterOrLookupTemplated( |
| 43 | "sz_uploaded_catalog_bytes", "Number of uploaded bytes for catalogs"); | ||
| 44 | 56 | } | |
| 45 | }; // UploadCounters | ||
| 46 | |||
| 47 | struct UploaderResults { | ||
| 48 | enum Type { | ||
| 49 | kFileUpload, | ||
| 50 | kBufferUpload, | ||
| 51 | kChunkCommit, | ||
| 52 | kRemove, | ||
| 53 | kLookup | ||
| 54 | }; | ||
| 55 | |||
| 56 | 41334 | UploaderResults(const int return_code, const std::string &local_path) | |
| 57 | 41334 | : type(kFileUpload), return_code(return_code), local_path(local_path) { } | |
| 58 | |||
| 59 | 18873956 | explicit UploaderResults(Type t, const int return_code) | |
| 60 |
1/2✓ Branch 2 taken 18873946 times.
✗ Branch 3 not taken.
|
18873956 | : type(t), return_code(return_code), local_path("") { } |
| 61 | |||
| 62 | 11624 | UploaderResults() : type(kRemove), return_code(0) { } | |
| 63 | |||
| 64 | const Type type; | ||
| 65 | const int return_code; | ||
| 66 | const std::string local_path; | ||
| 67 | }; | ||
| 68 | |||
| 69 | struct UploadStreamHandle; | ||
| 70 | |||
| 71 | /** | ||
| 72 | * Abstract base class for all backend upload facilities | ||
| 73 | * This class defines an interface and constructs the concrete Uploaders, | ||
| 74 | * furthermore it handles callbacks to the outside world to notify users of done | ||
| 75 | * upload jobs. | ||
| 76 | * | ||
| 77 | * Note: Users could be both the Spooler (when calling Spooler::Upload()) and | ||
| 78 | * the IngestionPipeline (when calling Spooler::Process()). We therefore | ||
| 79 | * cannot use the Observable template here, since this would forward | ||
| 80 | * finished upload jobs to ALL listeners instead of only the owner of the | ||
| 81 | * specific job. | ||
| 82 | */ | ||
| 83 | class AbstractUploader | ||
| 84 | : public PolymorphicConstruction<AbstractUploader, SpoolerDefinition>, | ||
| 85 | public Callbackable<UploaderResults>, | ||
| 86 | public SingleCopy { | ||
| 87 | friend class TaskUpload; | ||
| 88 | |||
| 89 | public: | ||
| 90 | /** | ||
| 91 | * A read-only memory block that is supposed to be written out. | ||
| 92 | */ | ||
| 93 | struct UploadBuffer { | ||
| 94 | 7271622 | UploadBuffer() : size(0), data(NULL) { } | |
| 95 | 11547314 | UploadBuffer(uint64_t s, const void *d) : size(s), data(d) { } | |
| 96 | uint64_t size; | ||
| 97 | const void *data; | ||
| 98 | }; | ||
| 99 | |||
| 100 | struct JobStatus { | ||
| 101 | enum State { | ||
| 102 | kOk, | ||
| 103 | kTerminate, | ||
| 104 | kNoJobs | ||
| 105 | }; | ||
| 106 | }; | ||
| 107 | |||
| 108 | struct UploadJob { | ||
| 109 | enum Type { | ||
| 110 | Upload, | ||
| 111 | Commit, | ||
| 112 | Terminate | ||
| 113 | }; | ||
| 114 | |||
| 115 | UploadJob(UploadStreamHandle *handle, UploadBuffer buffer, | ||
| 116 | const CallbackTN *callback = NULL); | ||
| 117 | UploadJob(UploadStreamHandle *handle, const shash::Any &content_hash); | ||
| 118 | |||
| 119 | 3507 | UploadJob() | |
| 120 | 3507 | : type(Terminate) | |
| 121 | 3507 | , stream_handle(NULL) | |
| 122 | 3507 | , tag_(0) | |
| 123 | 3507 | , buffer() | |
| 124 | 3507 | , callback(NULL) { } | |
| 125 | |||
| 126 |
1/2✓ Branch 2 taken 3507 times.
✗ Branch 3 not taken.
|
3507 | static UploadJob *CreateQuitBeacon() { return new UploadJob(); } |
| 127 | 18830023 | bool IsQuitBeacon() { return type == Terminate; } | |
| 128 | |||
| 129 | Type type; | ||
| 130 | UploadStreamHandle *stream_handle; | ||
| 131 | /** | ||
| 132 | * Ensure that upload jobs belonging to the same file end up in the same | ||
| 133 | * upload task queue. | ||
| 134 | */ | ||
| 135 | int64_t tag_; | ||
| 136 | 36120 | int64_t tag() { return tag_; } | |
| 137 | |||
| 138 | // type==Upload specific fields | ||
| 139 | UploadBuffer buffer; | ||
| 140 | const CallbackTN *callback; | ||
| 141 | |||
| 142 | // type==Commit specific fields | ||
| 143 | shash::Any content_hash; | ||
| 144 | }; | ||
| 145 | |||
| 146 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 3027 times.
|
6054 | virtual ~AbstractUploader() { assert(!tasks_upload_.is_active()); } |
| 147 | |||
| 148 | /** | ||
| 149 | * A string identifying the uploader type | ||
| 150 | */ | ||
| 151 | virtual std::string name() const = 0; | ||
| 152 | |||
| 153 | /** | ||
| 154 | * Initializes a new repository storage area, e.g. create directory layout | ||
| 155 | * for local backend or create bucket for S3 backend. | ||
| 156 | */ | ||
| 157 | virtual bool Create() = 0; | ||
| 158 | |||
| 159 | /** | ||
| 160 | * Concrete uploaders might want to use a customized setting for multi-stream | ||
| 161 | * writing, for instance one per disk. Note that the S3 backend uses one task | ||
| 162 | * but this one task uses internally multiple HTTP streams through curl async | ||
| 163 | * I/O. | ||
| 164 | */ | ||
| 165 | 6536 | virtual unsigned GetNumTasks() const { return num_upload_tasks_; } | |
| 166 | |||
| 167 | /** | ||
| 168 | * This is called right after the constructor of AbstractUploader or/and its | ||
| 169 | * derived class has been executed. You can override that to do additional | ||
| 170 | * initialization that cannot be done in the constructor itself. | ||
| 171 | * | ||
| 172 | * @return true on successful initialization | ||
| 173 | */ | ||
| 174 | virtual bool Initialize(); | ||
| 175 | |||
| 176 | /** | ||
| 177 | * Called during Spooler::WaitForUpload(), to ensure that the upload has | ||
| 178 | * finished. If commit == true, then a Commit request is also sent, to apply | ||
| 179 | * all the changes accumulated during the session. "catalog_path" | ||
| 180 | * represents the path of the root catalog with the changes. | ||
| 181 | * By default it is a noop and returns true. | ||
| 182 | */ | ||
| 183 | virtual bool FinalizeSession(bool commit, const std::string &old_root_hash, | ||
| 184 | const std::string &new_root_hash, | ||
| 185 | const RepositoryTag &tag); | ||
| 186 | |||
| 187 | /** | ||
| 188 | * This must be called right before the destruction of the AbstractUploader! | ||
| 189 | * You are _not_ supposed to override this method in your concrete Uploader. | ||
| 190 | */ | ||
| 191 | void TearDown(); | ||
| 192 | |||
| 193 | /** | ||
| 194 | * Uploads the file at the path local_path into the backend storage under the | ||
| 195 | * path remote_path. When the upload has finished it calls callback. | ||
| 196 | * Note: This method might be implemented in a synchronous way. | ||
| 197 | * | ||
| 198 | * @param local_path path to the file to be uploaded | ||
| 199 | * @param remote_path desired path for the file in the backend storage | ||
| 200 | * @param callback (optional) gets notified when the upload was finished | ||
| 201 | */ | ||
| 202 | 21014 | void UploadFile(const std::string &local_path, | |
| 203 | const std::string &remote_path, | ||
| 204 | const CallbackTN *callback = NULL) { | ||
| 205 |
1/2✓ Branch 1 taken 21014 times.
✗ Branch 2 not taken.
|
21014 | ++jobs_in_flight_; |
| 206 |
1/2✓ Branch 1 taken 21014 times.
✗ Branch 2 not taken.
|
21014 | FileIngestionSource source(local_path); |
| 207 |
1/2✓ Branch 1 taken 21014 times.
✗ Branch 2 not taken.
|
21014 | DoUpload(remote_path, &source, callback); |
| 208 | 21014 | } | |
| 209 | |||
| 210 | 40 | void UploadIngestionSource(const std::string &remote_path, | |
| 211 | IngestionSource *source, | ||
| 212 | const CallbackTN *callback = NULL) { | ||
| 213 | 40 | ++jobs_in_flight_; | |
| 214 | 40 | DoUpload(remote_path, source, callback); | |
| 215 | 40 | } | |
| 216 | |||
| 217 | /** | ||
| 218 | * This method is called before the first data block of a streamed upload is | ||
| 219 | * scheduled (see the definition of UploadStreamHandle below for details). | ||
| 220 | * | ||
| 221 | * @param callback (optional) this callback will be invoked once this parti- | ||
| 222 | * cular streamed upload is committed. | ||
| 223 | * @return a pointer to the initialized UploadStreamHandle | ||
| 224 | */ | ||
| 225 | virtual UploadStreamHandle *InitStreamedUpload( | ||
| 226 | const CallbackTN *callback) = 0; | ||
| 227 | |||
| 228 | /** | ||
| 229 | * This method schedules a buffer to be uploaded in the context of the | ||
| 230 | * given UploadStreamHandle. The actual upload will happen asynchronously by | ||
| 231 | * a concrete implementation of AbstractUploader | ||
| 232 | * (see AbstractUploader::StreamedUpload()). | ||
| 233 | * As soon as the scheduled upload job is complete (either successful or not) | ||
| 234 | * the optionally passed callback is supposed to be invoked using | ||
| 235 | * AbstractUploader::Respond(). | ||
| 236 | * | ||
| 237 | * @param handle Pointer to a previously acquired UploadStreamHandle | ||
| 238 | * @param buffer contains the data block to be uploaded | ||
| 239 | * @param callback (optional) callback object to be invoked once the given | ||
| 240 | * upload is finished (see AbstractUploader::Respond()) | ||
| 241 | */ | ||
| 242 | 11547140 | void ScheduleUpload(UploadStreamHandle *handle, | |
| 243 | UploadBuffer buffer, | ||
| 244 | const CallbackTN *callback = NULL) { | ||
| 245 | 11547140 | ++jobs_in_flight_; | |
| 246 |
3/6✓ Branch 1 taken 11555550 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 11553694 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 11556478 times.
✗ Branch 8 not taken.
|
11556536 | tubes_upload_.Dispatch(new UploadJob(handle, buffer, callback)); |
| 247 | 11556478 | } | |
| 248 | |||
| 249 | /** | ||
| 250 | * This method schedules a commit job as soon as all data blocks of a streamed | ||
| 251 | * upload are (successfully) uploaded. Derived classes must override | ||
| 252 | * AbstractUploader::FinalizeStreamedUpload() for this to happen. | ||
| 253 | * | ||
| 254 | * @param handle Pointer to a previously acquired UploadStreamHandle | ||
| 255 | * @param content_hash the content hash of the full uploaded data Chunk | ||
| 256 | */ | ||
| 257 | 7266413 | void ScheduleCommit(UploadStreamHandle *handle, | |
| 258 | const shash::Any &content_hash) { | ||
| 259 | 7266413 | ++jobs_in_flight_; | |
| 260 |
1/2✓ Branch 2 taken 7268095 times.
✗ Branch 3 not taken.
|
7268269 | tubes_upload_.Dispatch(new UploadJob(handle, content_hash)); |
| 261 | 7268472 | } | |
| 262 | |||
| 263 | /** | ||
| 264 | * Removes a file from the backend storage. | ||
| 265 | * | ||
| 266 | * Note: If the file doesn't exist before calling, this won't be an error. | ||
| 267 | * | ||
| 268 | * @param file_to_delete path to the file to be removed | ||
| 269 | */ | ||
| 270 | 11624 | void RemoveAsync(const std::string &file_to_delete) { | |
| 271 | 11624 | ++jobs_in_flight_; | |
| 272 | 11624 | DoRemoveAsync(file_to_delete); | |
| 273 | 11624 | } | |
| 274 | |||
| 275 | /** | ||
| 276 | * Overloaded method used to remove an object based on its content hash. | ||
| 277 | * | ||
| 278 | * @param hash_to_delete the content hash of a file to be deleted | ||
| 279 | */ | ||
| 280 | 11584 | void RemoveAsync(const shash::Any &hash_to_delete) { | |
| 281 |
2/4✓ Branch 2 taken 11584 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 11584 times.
✗ Branch 6 not taken.
|
11584 | RemoveAsync("data/" + hash_to_delete.MakePath()); |
| 282 | 11584 | } | |
| 283 | |||
| 284 | /** | ||
| 285 | * Get object size based on its content hash | ||
| 286 | * | ||
| 287 | * @param hash the content hash of a file | ||
| 288 | */ | ||
| 289 | ✗ | int64_t GetObjectSize(const shash::Any &hash) { | |
| 290 | ✗ | return DoGetObjectSize("data/" + hash.MakePath()); | |
| 291 | } | ||
| 292 | |||
| 293 | /** | ||
| 294 | * Checks if a file is already present in the backend storage. This might be a | ||
| 295 | * synchronous operation. | ||
| 296 | * | ||
| 297 | * @param path the path of the file to be checked | ||
| 298 | * @return true if the file was found in the backend storage | ||
| 299 | */ | ||
| 300 | virtual bool Peek(const std::string &path) = 0; | ||
| 301 | |||
| 302 | /** | ||
| 303 | * Make directory in upstream storage. Noop if directory already present. | ||
| 304 | * | ||
| 305 | * @param path relative directory path in the upstream storage | ||
| 306 | * @return true if the directory was successfully created or already present | ||
| 307 | */ | ||
| 308 | virtual bool Mkdir(const std::string &path) = 0; | ||
| 309 | |||
| 310 | /** | ||
| 311 | * Creates a top-level shortcut to the given data object. This is particularly | ||
| 312 | * useful for bootstrapping repositories whose data-directory is secured by | ||
| 313 | * a VOMS certificate. | ||
| 314 | * | ||
| 315 | * @param object content hash of the object to be exposed on the top-level | ||
| 316 | * @return true on success | ||
| 317 | */ | ||
| 318 | virtual bool PlaceBootstrappingShortcut(const shash::Any &object) = 0; | ||
| 319 | |||
| 320 | /** | ||
| 321 | * Waits until the current upload queue is empty. | ||
| 322 | * | ||
| 323 | * Note: This does NOT necessarily mean, that all files are actually uploaded. | ||
| 324 | * If new jobs are concurrently scheduled the behavior of this method is | ||
| 325 | * not defined (it returns also on intermediately empty queues) | ||
| 326 | */ | ||
| 327 | virtual void WaitForUpload() const; | ||
| 328 | |||
| 329 | virtual unsigned int GetNumberOfErrors() const = 0; | ||
| 330 | static void RegisterPlugins(); | ||
| 331 | void InitCounters(perf::StatisticsTemplate *statistics); | ||
| 332 | |||
| 333 | protected: | ||
| 334 | typedef Callbackable<UploaderResults>::CallbackTN *CallbackPtr; | ||
| 335 | |||
| 336 | explicit AbstractUploader(const SpoolerDefinition &spooler_definition); | ||
| 337 | |||
| 338 | /** | ||
| 339 | * Implementation of plain file upload | ||
| 340 | * Public interface: AbstractUploader::UploadFile() and AbstractUploader::UploadIngestionSource() | ||
| 341 | * | ||
| 342 | * @param source ingestion source to be uploaded | ||
| 343 | * @param remote_path destination to be written in the backend | ||
| 344 | * @param callback callback to be called on completion | ||
| 345 | */ | ||
| 346 | virtual void DoUpload(const std::string &remote_path, | ||
| 347 | IngestionSource *source, | ||
| 348 | const CallbackTN *callback) = 0; | ||
| 349 | |||
| 350 | /** | ||
| 351 | * Implementation of a streamed upload step. See public interface for details. | ||
| 352 | * Public interface: AbstractUploader::ScheduleUpload() | ||
| 353 | * | ||
| 354 | * @param handle descendant of UploadStreamHandle specifying the stream | ||
| 355 | * @param buffer the UploadBuffer to be uploaded to the stream | ||
| 356 | * @param callback callback to be called on completion | ||
| 357 | */ | ||
| 358 | virtual void StreamedUpload(UploadStreamHandle *handle, | ||
| 359 | UploadBuffer buffer, | ||
| 360 | const CallbackTN *callback) = 0; | ||
| 361 | |||
| 362 | /** | ||
| 363 | * Implementation of streamed upload commit | ||
| 364 | * Public interface: AbstractUploader::ScheduleCommit() | ||
| 365 | * | ||
| 366 | * @param handle descendant of UploadStreamHandle specifying the stream | ||
| 367 | * @param content_hash the computed content hash of the streamed object | ||
| 368 | */ | ||
| 369 | virtual void FinalizeStreamedUpload(UploadStreamHandle *handle, | ||
| 370 | const shash::Any &content_hash) = 0; | ||
| 371 | |||
| 372 | |||
| 373 | virtual void DoRemoveAsync(const std::string &file_to_delete) = 0; | ||
| 374 | |||
| 375 | virtual int64_t DoGetObjectSize(const std::string &file_name) = 0; | ||
| 376 | |||
| 377 | /** | ||
| 378 | * This notifies the callback that is associated to a finishing job. Please | ||
| 379 | * do not call the handed callback yourself in concrete Uploaders! | ||
| 380 | * | ||
| 381 | * Note: Since the job is finished after we respond to it, the callback object | ||
| 382 | * gets automatically destroyed by this call! | ||
| 383 | * Therefore you must not call Respond() twice or use the callback later | ||
| 384 | * by any means! | ||
| 385 | */ | ||
| 386 | 18859334 | void Respond(const CallbackTN *callback, | |
| 387 | const UploaderResults &result) const { | ||
| 388 |
2/2✓ Branch 0 taken 18847710 times.
✓ Branch 1 taken 11624 times.
|
18859334 | if (callback != NULL) { |
| 389 | 18847710 | (*callback)(result); | |
| 390 |
1/2✓ Branch 0 taken 18847710 times.
✗ Branch 1 not taken.
|
18847710 | delete callback; |
| 391 | } | ||
| 392 | |||
| 393 | 18859314 | --jobs_in_flight_; | |
| 394 | 18859344 | } | |
| 395 | |||
| 396 | /** | ||
| 397 | * Creates a temporary file in the backend storage's temporary location | ||
| 398 | * For the LocalUploader this usually is the 'txn' directory of the backend | ||
| 399 | * storage. Otherwise it is some scratch area. | ||
| 400 | * | ||
| 401 | * @param path pointer to a string that will contain the created file path | ||
| 402 | * @return a file descriptor to the opened file | ||
| 403 | */ | ||
| 404 | int CreateAndOpenTemporaryChunkFile(std::string *path) const; | ||
| 405 | |||
| 406 | 18240 | const SpoolerDefinition &spooler_definition() const { | |
| 407 | 18240 | return spooler_definition_; | |
| 408 | } | ||
| 409 | |||
| 410 | void CountUploadedChunks() const; | ||
| 411 | void DecUploadedChunks() const; | ||
| 412 | void CountUploadedBytes(int64_t bytes_written) const; | ||
| 413 | void CountDuplicates() const; | ||
| 414 | void CountUploadedCatalogs() const; | ||
| 415 | void CountUploadedCatalogBytes(int64_t bytes_written) const; | ||
| 416 | |||
| 417 | protected: | ||
| 418 | /** | ||
| 419 | * Used by concrete implementations when they use callbacks where it's not | ||
| 420 | * already foreseen, e.g. S3Uploader::Peek(). | ||
| 421 | */ | ||
| 422 | 150 | void IncJobsInFlight() { ++jobs_in_flight_; } | |
| 423 | |||
| 424 | private: | ||
| 425 | const SpoolerDefinition spooler_definition_; | ||
| 426 | |||
| 427 | /** | ||
| 428 | * Number of threads used for I/O write calls. Effectively this parameter | ||
| 429 | * sets the I/O depth. Defaults to 1. | ||
| 430 | */ | ||
| 431 | unsigned num_upload_tasks_; | ||
| 432 | mutable SynchronizingCounter<int32_t> jobs_in_flight_; | ||
| 433 | TubeGroup<UploadJob> tubes_upload_; | ||
| 434 | TubeConsumerGroup<UploadJob> tasks_upload_; | ||
| 435 | mutable UniquePtr<UploadCounters> counters_; | ||
| 436 | }; // class AbstractUploader | ||
| 437 | |||
| 438 | |||
| 439 | /** | ||
| 440 | * The actual writing is multi-threaded. | ||
| 441 | */ | ||
| 442 | class TaskUpload : public TubeConsumer<AbstractUploader::UploadJob> { | ||
| 443 | public: | ||
| 444 | 3508 | explicit TaskUpload(AbstractUploader *uploader, | |
| 445 | Tube<AbstractUploader::UploadJob> *tube) | ||
| 446 | 3508 | : TubeConsumer<AbstractUploader::UploadJob>(tube), uploader_(uploader) { } | |
| 447 | |||
| 448 | protected: | ||
| 449 | virtual void Process(AbstractUploader::UploadJob *upload_job); | ||
| 450 | |||
| 451 | private: | ||
| 452 | AbstractUploader *uploader_; | ||
| 453 | }; | ||
| 454 | |||
| 455 | |||
| 456 | /** | ||
| 457 | * Each implementation of AbstractUploader must provide its own derived class of the | ||
| 458 | * UploadStreamHandle that is supposed to contain state information for the | ||
| 459 | * streamed upload of one specific chunk. | ||
| 460 | * Each UploadStreamHandle contains a callback object that is invoked as soon as | ||
| 461 | * the streamed upload is committed. | ||
| 462 | */ | ||
| 463 | struct UploadStreamHandle { | ||
| 464 | typedef AbstractUploader::CallbackTN CallbackTN; | ||
| 465 | static atomic_int64 g_upload_stream_tag; | ||
| 466 | |||
| 467 | 7266674 | explicit UploadStreamHandle(const CallbackTN *commit_callback) | |
| 468 | 7266674 | : commit_callback(commit_callback) | |
| 469 | 7266674 | , tag(atomic_xadd64(&g_upload_stream_tag, 1)) { } | |
| 470 | 14537930 | virtual ~UploadStreamHandle() { } | |
| 471 | |||
| 472 | const CallbackTN *commit_callback; | ||
| 473 | |||
| 474 | int64_t tag; | ||
| 475 | |||
| 476 | std::string remote_path; // override remote location of the object | ||
| 477 | }; | ||
| 478 | |||
| 479 | } // namespace upload | ||
| 480 | |||
| 481 | #endif // CVMFS_UPLOAD_FACILITY_H_ | ||
| 482 |