GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/upload_facility.h Lines: 49 56 87.5 %
Date: 2019-02-03 02:48:13 Branches: 7 14 50.0 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#ifndef CVMFS_UPLOAD_FACILITY_H_
6
#define CVMFS_UPLOAD_FACILITY_H_
7
8
#include <fcntl.h>
9
#include <stdint.h>
10
11
#include <string>
12
13
#include "atomic.h"
14
#include "ingestion/task.h"
15
#include "ingestion/tube.h"
16
#include "repository_tag.h"
17
#include "statistics.h"
18
#include "upload_spooler_definition.h"
19
#include "util/posix.h"
20
#include "util_concurrency.h"
21
22
namespace upload {
23
24
struct UploadCounters {
25
  perf::Counter *n_duplicated_files;
26
  perf::Counter *sz_uploaded_bytes;
27
28
  explicit UploadCounters(perf::StatisticsTemplate statistics) {
29
    n_duplicated_files = statistics.RegisterTemplated("n_duplicated_files",
30
        "Number of duplicated files added");
31
    sz_uploaded_bytes = statistics.RegisterTemplated("sz_uploaded_bytes",
32
        "Number of uploaded bytes");
33
  }
34
};  // UploadCounters
35
36
664559
struct UploaderResults {
37
  enum Type { kFileUpload, kBufferUpload, kChunkCommit, kRemove };
38
39
5140
  UploaderResults(const int return_code, const std::string &local_path)
40
    : type(kFileUpload),
41
      return_code(return_code),
42
5140
      local_path(local_path) {}
43
44
595840
  explicit UploaderResults(Type t, const int return_code)
45
    : type(t),
46
      return_code(return_code),
47
595840
      local_path("") {}
48
49
220
  UploaderResults()
50
    : type(kRemove)
51
220
    , return_code(0)
52
220
  { }
53
54
  const Type type;
55
  const int return_code;
56
  const std::string local_path;
57
};
58
59
struct UploadStreamHandle;
60
61
/**
62
 * Abstract base class for all backend upload facilities
63
 * This class defines an interface and constructs the concrete Uploaders,
64
 * furthermore it handles callbacks to the outside world to notify users of done
65
 * upload jobs.
66
 *
67
 * Note: Users could be both the Spooler (when calling Spooler::Upload()) and
68
 *       the IngestionPipeline (when calling Spooler::Process()). We therefore
69
 *       cannot use the Observable template here, since this would forward
70
 *       finished upload jobs to ALL listeners instead of only the owner of the
71
 *       specific job.
72
 */
73
class AbstractUploader
74
  : public PolymorphicConstruction<AbstractUploader, SpoolerDefinition>
75
  , public Callbackable<UploaderResults>
76
{
77
  friend class TaskUpload;
78
79
 public:
80
  /**
81
   * A read-only memory block that is supposed to be written out.
82
   */
83
  struct UploadBuffer {
84
3180
    UploadBuffer() : size(0), data(NULL) { }
85
585016
    UploadBuffer(uint64_t s, void *d) : size(s), data(d) { }
86
    uint64_t size;
87
    void *data;
88
  };
89
90
  struct JobStatus {
91
    enum State { kOk, kTerminate, kNoJobs };
92
  };
93
94
  struct UploadJob {
95
    enum Type { Upload, Commit, Terminate };
96
97
    UploadJob(UploadStreamHandle *handle, UploadBuffer buffer,
98
              const CallbackTN *callback = NULL);
99
    UploadJob(UploadStreamHandle *handle, const shash::Any &content_hash);
100
101
341
    UploadJob()
102
        : type(Terminate)
103
        , stream_handle(NULL)
104
        , tag_(0)
105
        , buffer()
106
341
        , callback(NULL) {}
107
108
341
    static UploadJob *CreateQuitBeacon() { return new UploadJob(); }
109
588181
    bool IsQuitBeacon() { return type == Terminate; }
110
111
    Type type;
112
    UploadStreamHandle *stream_handle;
113
    /**
114
     * Ensure that upload jobs belonging to the same file end up in the same
115
     * upload task queue.
116
     */
117
    int64_t tag_;
118
9008
    int64_t tag() { return tag_; }
119
120
    // type==Upload specific fields
121
    UploadBuffer buffer;
122
    const CallbackTN *callback;
123
124
    // type==Commit specific fields
125
    shash::Any content_hash;
126
  };
127
128

231
  virtual ~AbstractUploader() { assert(!tasks_upload_.is_active()); }
129
130
  /**
131
   * A string identifying the uploader type
132
   */
133
  virtual std::string name() const = 0;
134
135
  /**
136
   * Concrete uploaders might want to use a customized setting for multi-stream
137
   * writing, for instance one per disk.  Note that the S3 backend uses one task
138
   * but this one task uses internally multiple HTTP streams through curl async
139
   * I/O.
140
   */
141
572
  virtual unsigned GetNumTasks() const { return num_upload_tasks_; }
142
143
  /**
144
   * This is called right after the constructor of AbstractUploader or/and its
145
   * derived class has been executed. You can override that to do additional
146
   * initialization that cannot be done in the constructor itself.
147
   *
148
   * @return   true on successful initialization
149
   */
150
  virtual bool Initialize();
151
152
  /**
153
   * Called during Spooler::WaitForUpload(), to ensure that the upload has
154
   * finished. If commit == true, then a Commit request is also sent, to apply
155
   * all the changes accumulated during the session. "catalog_path"
156
   * represents the path of the root catalog with the changes.
157
   * By default it is a noop and returns true;
158
   */
159
  virtual bool FinalizeSession(bool commit, const std::string &old_root_hash,
160
                               const std::string &new_root_hash,
161
                               const RepositoryTag &tag);
162
163
  /**
164
   * This must be called right before the destruction of the AbstractUploader!
165
   * You are _not_ supposed to override this method in your concrete Uploader.
166
   */
167
  void TearDown();
168
169
  /**
170
   * Uploads the file at the path local_path into the backend storage under the
171
   * path remote_path. When the upload has finished it calls callback.
172
   * Note: This method might be implemented in a synchronous way.
173
   *
174
   * @param local_path   path to the file to be uploaded
175
   * @param remote_path  desired path for the file in the backend storage
176
   * @param callback     (optional) gets notified when the upload was finished
177
   */
178
2580
  void Upload(
179
    const std::string &local_path,
180
    const std::string &remote_path,
181
    const CallbackTN *callback = NULL)
182
  {
183
2580
    ++jobs_in_flight_;
184
2580
    FileUpload(local_path, remote_path, callback);
185
2580
  }
186
187
  /**
188
   * This method is called before the first data block of a streamed upload is
189
   * scheduled (see the definition of UploadStreamHandle below for details).
190
   *
191
   * @param callback   (optional) this callback will be invoked once this parti-
192
   *                   cular streamed upload is committed.
193
   * @return           a pointer to the initialized UploadStreamHandle
194
   */
195
  virtual UploadStreamHandle *InitStreamedUpload(
196
      const CallbackTN *callback = NULL) = 0;
197
198
  /**
199
   * This method schedules a buffer to be uploaded in the context of the
200
   * given UploadStreamHandle. The actual upload will happen asynchronously by
201
   * a concrete implementation of AbstractUploader
202
   * (see AbstractUploader::StreamedUpload()).
203
   * As soon as the scheduled upload job is complete (either successful or not)
204
   * the optionally passed callback is supposed to be invoked using
205
   * AbstractUploader::Respond().
206
   *
207
   * @param handle    Pointer to a previously acquired UploadStreamHandle
208
   * @param buffer    contains the data block to be uploaded
209
   * @param callback  (optional) callback object to be invoked once the given
210
   *                  upload is finished (see AbstractUploader::Respond())
211
   */
212
585016
  void ScheduleUpload(
213
    UploadStreamHandle *handle,
214
    UploadBuffer buffer,
215
    const CallbackTN *callback = NULL)
216
  {
217
585016
    ++jobs_in_flight_;
218
585016
    tubes_upload_.Dispatch(new UploadJob(handle, buffer, callback));
219
585016
  }
220
221
  /**
222
   * This method schedules a commit job as soon as all data blocks of a streamed
223
   * upload are (successfully) uploaded. Derived classes must override
224
   * AbstractUploader::FinalizeStreamedUpload() for this to happen.
225
   *
226
   * @param handle        Pointer to a previously acquired UploadStreamHandle
227
   * @param content_hash  the content hash of the full uploaded data Chunk
228
   */
229
2824
  void ScheduleCommit(
230
    UploadStreamHandle *handle,
231
    const shash::Any &content_hash)
232
  {
233
2824
    ++jobs_in_flight_;
234
2824
    tubes_upload_.Dispatch(new UploadJob(handle, content_hash));
235
2824
  }
236
237
  /**
238
   * Removes a file from the backend storage.
239
   *
240
   * Note: If the file doesn't exist before the call, this is not an error.
241
   *
242
   * @param file_to_delete  path to the file to be removed
243
   */
244
220
  void RemoveAsync(const std::string &file_to_delete) {
245
220
    ++jobs_in_flight_;
246
220
    DoRemoveAsync(file_to_delete);
247
220
  }
248
249
  /**
250
   * Overloaded method used to remove an object based on its content hash.
251
   *
252
   * @param hash_to_delete  the content hash of a file to be deleted
253
   */
254
214
  void RemoveAsync(const shash::Any &hash_to_delete) {
255
214
    RemoveAsync("data/" + hash_to_delete.MakePath());
256
214
  }
257
258
  /**
259
   * Get object size based on its content hash
260
   *
261
   * @param hash  the content hash of a file
262
   */
263
  int64_t GetObjectSize(const shash::Any &hash) {
264
    return DoGetObjectSize("data/" + hash.MakePath());
265
  }
266
267
  /**
268
   * Checks if a file is already present in the backend storage. This might be a
269
   * synchronous operation.
270
   *
271
   * @param path  the path of the file to be checked
272
   * @return      true if the file was found in the backend storage
273
   */
274
  virtual bool Peek(const std::string &path) const = 0;
275
276
  /**
277
   * Creates a top-level shortcut to the given data object. This is particularly
278
   * useful for bootstrapping repositories whose data-directory is secured by
279
   * a VOMS certificate.
280
   *
281
   * @param object  content hash of the object to be exposed on the top-level
282
   * @return        true on success
283
   */
284
  virtual bool PlaceBootstrappingShortcut(const shash::Any &object) const = 0;
285
286
  /**
287
   * Waits until the current upload queue is empty.
288
   *
289
   * Note: This does NOT necessarily mean that all files are actually uploaded.
290
   *       If new jobs are concurrently scheduled the behavior of this method is
291
   *       not defined (it returns also on intermediately empty queues)
292
   */
293
  virtual void WaitForUpload() const;
294
295
  virtual unsigned int GetNumberOfErrors() const = 0;
296
  static void RegisterPlugins();
297
  void InitCounters(perf::StatisticsTemplate *statistics);
298
299
 protected:
300
  typedef Callbackable<UploaderResults>::CallbackTN *CallbackPtr;
301
302
  explicit AbstractUploader(const SpoolerDefinition &spooler_definition);
303
304
  /**
305
   * Implementation of plain file upload
306
   * Public interface: AbstractUploader::Upload()
307
   *
308
   * @param local_path   file to be uploaded
309
   * @param remote_path  destination to be written in the backend
310
   * @param callback     callback to be called on completion
311
   */
312
  virtual void FileUpload(const std::string &local_path,
313
                          const std::string &remote_path,
314
                          const CallbackTN *callback = NULL) = 0;
315
316
  /**
317
   * Implementation of a streamed upload step. See public interface for details.
318
   * Public interface: AbstractUploader::ScheduleUpload()
319
   *
320
   * @param handle     descendant of UploadStreamHandle specifying the stream
321
   * @param buffer     the UploadBuffer to be uploaded to the stream
322
   * @param callback   callback to be called on completion
323
   */
324
  virtual void StreamedUpload(UploadStreamHandle *handle,
325
                              UploadBuffer buffer,
326
                              const CallbackTN *callback) = 0;
327
328
  /**
329
   * Implementation of streamed upload commit
330
   * Public interface: AbstractUploader::ScheduleCommit()
331
   *
332
   * @param handle        descendant of UploadStreamHandle specifying the stream
333
   * @param content_hash  the computed content hash of the streamed object
334
   */
335
  virtual void FinalizeStreamedUpload(UploadStreamHandle *handle,
336
                                      const shash::Any &content_hash) = 0;
337
338
339
  virtual void DoRemoveAsync(const std::string &file_to_delete) = 0;
340
341
  virtual int64_t DoGetObjectSize(const std::string &file_name) = 0;
342
343
  /**
344
   * This notifies the callback that is associated to a finishing job. Please
345
   * do not call the handed callback yourself in concrete Uploaders!
346
   *
347
   * Note: Since the job is finished after we respond to it, the callback object
348
   *       gets automatically destroyed by this call!
349
   *       Therefore you must not call Respond() twice or use the callback later
350
   *       by any means!
351
   */
352
590640
  void Respond(const CallbackTN *callback,
353
               const UploaderResults &result) const
354
  {
355
590640
    if (callback != NULL) {
356
590420
      (*callback)(result);
357
590420
      delete callback;
358
    }
359
360
590640
    --jobs_in_flight_;
361
590640
  }
362
363
  /**
364
   * Creates a temporary file in the backend storage's temporary location
365
   * For the LocalUploader this usually is the 'txn' directory of the backend
366
   * storage. Otherwise it is some scratch area.
367
   *
368
   * @param path   pointer to a string that will contain the created file path
369
   * @return       a file descriptor to the opened file
370
   */
371
  int CreateAndOpenTemporaryChunkFile(std::string *path) const;
372
373
  const SpoolerDefinition &spooler_definition() const {
374
    return spooler_definition_;
375
  }
376
377
  void CountUploadedBytes(int64_t bytes_written) const;
378
379
  void CountDuplicates() const;
380
381
 private:
382
  const SpoolerDefinition spooler_definition_;
383
384
  /**
385
   * Number of threads used for I/O write calls. Effectively this parameter
386
   * sets the I/O depth. Defaults to 1.
387
   */
388
  unsigned num_upload_tasks_;
389
  mutable SynchronizingCounter<int32_t> jobs_in_flight_;
390
  TubeGroup<UploadJob> tubes_upload_;
391
  TubeConsumerGroup<UploadJob> tasks_upload_;
392
  mutable UniquePtr<UploadCounters> counters_;
393
};  // class AbstractUploader
394
395
396
/**
397
 * The actual writing is multi-threaded.
398
 */
399
682
class TaskUpload : public TubeConsumer<AbstractUploader::UploadJob> {
400
 public:
401
341
  explicit TaskUpload(
402
    AbstractUploader *uploader,
403
    Tube<AbstractUploader::UploadJob> *tube)
404
    : TubeConsumer<AbstractUploader::UploadJob>(tube)
405
341
    , uploader_(uploader)
406
341
  { }
407
408
 protected:
409
  virtual void Process(AbstractUploader::UploadJob *upload_job);
410
411
 private:
412
  AbstractUploader *uploader_;
413
};
414
415
416
/**
417
 * Each implementation of AbstractUploader must provide its own derivate of the
418
 * UploadStreamHandle that is supposed to contain state information for the
419
 * streamed upload of one specific chunk.
420
 * Each UploadStreamHandle contains a callback object that is invoked as soon as
421
 * the streamed upload is committed.
422
 */
423
struct UploadStreamHandle {
424
  typedef AbstractUploader::CallbackTN CallbackTN;
425
  static atomic_int64 g_upload_stream_tag;
426
427
2824
  explicit UploadStreamHandle(const CallbackTN *commit_callback)
428
      : commit_callback(commit_callback)
429
2824
      , tag(atomic_xadd64(&g_upload_stream_tag, 1)) {}
430
2824
  virtual ~UploadStreamHandle() {}
431
432
  const CallbackTN *commit_callback;
433
434
  int64_t tag;
435
};
436
437
}  // namespace upload
438
439
#endif  // CVMFS_UPLOAD_FACILITY_H_