CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
upload_facility.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_UPLOAD_FACILITY_H_
6 #define CVMFS_UPLOAD_FACILITY_H_
7 
8 #include <fcntl.h>
9 #include <stdint.h>
10 
11 #include <string>
12 
14 #include "ingestion/task.h"
15 #include "repository_tag.h"
16 #include "statistics.h"
18 #include "util/atomic.h"
19 #include "util/concurrency.h"
20 #include "util/posix.h"
21 #include "util/tube.h"
22 
23 namespace upload {
24 
31 
34  "n_chunks_added", "Number of new chunks added");
36  "n_chunks_duplicated", "Number of duplicated chunks added");
38  "n_catalogs_added", "Number of new catalogs added");
40  "sz_uploaded_bytes", "Number of uploaded bytes");
42  "sz_uploaded_catalog_bytes", "Number of uploaded bytes for catalogs");
43  }
44 }; // UploadCounters
45 
48 
// Builds a result whose type is fixed to kFileUpload and copies the given
// return code and local path into the members of the same name. Those
// members are declared const below, so they cannot change after construction.
49  UploaderResults(const int return_code, const std::string &local_path)
50  : type(kFileUpload),
51  return_code(return_code),
52  local_path(local_path) {}
53 
// Builds a result of the caller-supplied type t with the given return code.
// No path is associated with it: local_path is initialized to the empty
// string.
54  explicit UploaderResults(Type t, const int return_code)
55  : type(t),
56  return_code(return_code),
57  local_path("") {}
58 
60  : type(kRemove)
61  , return_code(0)
62  { }
63 
64  const Type type;
65  const int return_code;
66  const std::string local_path;
67 };
68 
69 struct UploadStreamHandle;
70 
84  : public PolymorphicConstruction<AbstractUploader, SpoolerDefinition>
85  , public Callbackable<UploaderResults>
86  , public SingleCopy {
87  friend class TaskUpload;
88 
89  public:
// A (size, data) pair describing a read-only memory region that is passed by
// value to the streamed-upload entry points of this class. The struct only
// stores the pointer; it declares no destructor and never copies the bytes.
93  struct UploadBuffer {
94  UploadBuffer() : size(0), data(NULL) { }  // empty buffer: size 0, no data
95  UploadBuffer(uint64_t s, const void *d) : size(s), data(d) { }
96  uint64_t size;  // presumably the length of `data` in bytes -- TODO confirm
97  const void *data;  // not freed here; lifetime is managed by the creator
98  };
99 
100  struct JobStatus {
102  };
103 
104  struct UploadJob {
106 
108  const CallbackTN *callback = NULL);
110 
112  : type(Terminate)
113  , stream_handle(NULL)
114  , tag_(0)
115  , buffer()
116  , callback(NULL) {}
117 
// Heap-allocates an UploadJob via its no-argument constructor and returns the
// raw pointer. NOTE(review): who deletes the beacon is not visible here;
// presumably the consumer that dequeues it -- confirm.
118  static UploadJob *CreateQuitBeacon() { return new UploadJob(); }
// Returns true iff this job's type is Terminate.
119  bool IsQuitBeacon() { return type == Terminate; }
120 
127  int64_t tag_;
// Accessor: returns the value of the tag_ member.
128  int64_t tag() { return tag_; }
129 
130  // type==Upload specific fields
133 
134  // type==Commit specific fields
136  };
137 
// Destroying an uploader whose upload tasks are still active is treated as a
// programming error: the assert fires if tasks_upload_.is_active() is true.
// As with any assert, the check disappears when NDEBUG is defined.
// NOTE(review): presumably TearDown() is what deactivates the tasks -- confirm.
138  virtual ~AbstractUploader() { assert(!tasks_upload_.is_active()); }
139 
143  virtual std::string name() const = 0;
144 
149  virtual bool Create() = 0;
150 
// Returns num_upload_tasks_. Virtual, so a concrete uploader may override it.
157  virtual unsigned GetNumTasks() const { return num_upload_tasks_; }
158 
166  virtual bool Initialize();
167 
175  virtual bool FinalizeSession(bool commit, const std::string &old_root_hash,
176  const std::string &new_root_hash,
177  const RepositoryTag &tag);
178 
183  void TearDown();
184 
195  const std::string &local_path,
196  const std::string &remote_path,
197  const CallbackTN *callback = NULL)
198  {
199  ++jobs_in_flight_;
200  FileIngestionSource source(local_path);
201  DoUpload(remote_path, &source, callback);
202  }
203 
205  const std::string &remote_path,
206  IngestionSource *source,
207  const CallbackTN *callback = NULL)
208  {
209  ++jobs_in_flight_;
210  DoUpload(remote_path, source, callback);
211  }
212 
222  const CallbackTN *callback) = 0;
223 
239  UploadStreamHandle *handle,
240  UploadBuffer buffer,
241  const CallbackTN *callback = NULL)
242  {
243  ++jobs_in_flight_;
244  tubes_upload_.Dispatch(new UploadJob(handle, buffer, callback));
245  }
246 
256  UploadStreamHandle *handle,
257  const shash::Any &content_hash)
258  {
259  ++jobs_in_flight_;
260  tubes_upload_.Dispatch(new UploadJob(handle, content_hash));
261  }
262 
// Registers one more in-flight job and hands the path to the backend-specific
// DoRemoveAsync(). The counter is incremented before the backend is called.
// NOTE(review): presumably the backend reports completion through Respond(),
// which decrements jobs_in_flight_ again -- confirm in the implementations.
270  void RemoveAsync(const std::string &file_to_delete) {
271  ++jobs_in_flight_;
272  DoRemoveAsync(file_to_delete);
273  }
274 
// Convenience overload: turns the hash into the path
// "data/" + hash_to_delete.MakePath() and delegates to the string overload.
280  void RemoveAsync(const shash::Any &hash_to_delete) {
281  RemoveAsync("data/" + hash_to_delete.MakePath());
282  }
283 
// Looks up the object stored under "data/" + hash.MakePath() and returns
// whatever the backend-specific DoGetObjectSize() reports for that path.
// Unlike the upload/remove entry points, this does not touch jobs_in_flight_.
// NOTE(review): the meaning of negative return values is not visible here.
289  int64_t GetObjectSize(const shash::Any &hash) {
290  return DoGetObjectSize("data/" + hash.MakePath());
291  }
292 
300  virtual bool Peek(const std::string &path) = 0;
301 
308  virtual bool Mkdir(const std::string &path) = 0;
309 
318  virtual bool PlaceBootstrappingShortcut(const shash::Any &object) = 0;
319 
327  virtual void WaitForUpload() const;
328 
329  virtual unsigned int GetNumberOfErrors() const = 0;
330  static void RegisterPlugins();
331  void InitCounters(perf::StatisticsTemplate *statistics);
332 
333  protected:
335 
337 
// Backend hook, pure virtual. The public upload entry points in this class
// increment jobs_in_flight_ and then call this with the remote path, the
// ingestion source and the (possibly NULL) callback.
// NOTE(review): implementations presumably finish by calling Respond() so the
// in-flight counter is decremented again -- confirm.
346  virtual void DoUpload(const std::string &remote_path,
347  IngestionSource *source,
348  const CallbackTN *callback) = 0;
349 
358  virtual void StreamedUpload(UploadStreamHandle *handle,
359  UploadBuffer buffer,
360  const CallbackTN *callback) = 0;
361 
369  virtual void FinalizeStreamedUpload(UploadStreamHandle *handle,
370  const shash::Any &content_hash) = 0;
371 
372 
// Backend hook, pure virtual. Called by RemoveAsync() after it has already
// incremented jobs_in_flight_.
373  virtual void DoRemoveAsync(const std::string &file_to_delete) = 0;
374 
// Backend hook, pure virtual. Called by GetObjectSize() with a path of the
// form "data/" + <hash path>; its return value is passed through unchanged.
375  virtual int64_t DoGetObjectSize(const std::string &file_name) = 0;
376 
// Reports the outcome of one job and marks it as no longer in flight.
//
// If callback is non-NULL it is invoked with result and then deleted, so the
// callback object is consumed by this call and must not be used afterwards.
// jobs_in_flight_ is decremented in every case, with or without a callback.
// The method is const yet modifies jobs_in_flight_, so that member is
// presumably declared mutable -- confirm at its declaration.
386  void Respond(const CallbackTN *callback,
387  const UploaderResults &result) const
388  {
389  if (callback != NULL) {
390  (*callback)(result);
391  delete callback;  // one-shot: the callback is destroyed after firing
392  }
393 
394  --jobs_in_flight_;
395  }
396 
405  int CreateAndOpenTemporaryChunkFile(std::string *path) const;
406 
408  return spooler_definition_;
409  }
410 
411  void CountUploadedChunks() const;
412  void DecUploadedChunks() const;
413  void CountUploadedBytes(int64_t bytes_written) const;
414  void CountDuplicates() const;
415  void CountUploadedCatalogs() const;
416  void CountUploadedCatalogBytes(int64_t bytes_written) const;
417 
418  protected:
424  ++jobs_in_flight_;
425  }
426 
427  private:
429 
439 }; // class AbstractUploader
440 
441 
445 class TaskUpload : public TubeConsumer<AbstractUploader::UploadJob> {
446  public:
447  explicit TaskUpload(
448  AbstractUploader *uploader,
450  : TubeConsumer<AbstractUploader::UploadJob>(tube)
451  , uploader_(uploader)
452  { }
453 
454  protected:
455  virtual void Process(AbstractUploader::UploadJob *upload_job);
456 
457  private:
459 };
460 
461 
472 
474  : commit_callback(commit_callback)
475  , tag(atomic_xadd64(&g_upload_stream_tag, 1)) {}
476  virtual ~UploadStreamHandle() {}
477 
479 
480  int64_t tag;
481 
482  std::string remote_path; // override remote location of the object
483 };
484 
485 } // namespace upload
486 
487 #endif // CVMFS_UPLOAD_FACILITY_H_
virtual void WaitForUpload() const
void InitCounters(perf::StatisticsTemplate *statistics)
void ScheduleUpload(UploadStreamHandle *handle, UploadBuffer buffer, const CallbackTN *callback=NULL)
perf::Counter * n_chunks_duplicated
virtual void DoRemoveAsync(const std::string &file_to_delete)=0
UploadBuffer(uint64_t s, const void *d)
int64_t atomic_int64
Definition: atomic.h:18
UniquePtr< UploadCounters > counters_
SynchronizingCounter< int32_t > jobs_in_flight_
int CreateAndOpenTemporaryChunkFile(std::string *path) const
virtual bool Mkdir(const std::string &path)=0
virtual UploadStreamHandle * InitStreamedUpload(const CallbackTN *callback)=0
void Respond(const CallbackTN *callback, const UploaderResults &result) const
virtual int64_t DoGetObjectSize(const std::string &file_name)=0
Counter * RegisterOrLookupTemplated(const std::string &name_minor, const std::string &desc)
Definition: statistics.h:117
void ScheduleCommit(UploadStreamHandle *handle, const shash::Any &content_hash)
assert((mem||(size==0))&&"Out Of Memory")
const SpoolerDefinition & spooler_definition() const
TaskUpload(AbstractUploader *uploader, Tube< AbstractUploader::UploadJob > *tube)
void CountUploadedChunks() const
virtual bool PlaceBootstrappingShortcut(const shash::Any &object)=0
virtual unsigned GetNumTasks() const
const CallbackTN * commit_callback
virtual bool FinalizeSession(bool commit, const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)
int64_t GetObjectSize(const shash::Any &hash)
void RemoveAsync(const shash::Any &hash_to_delete)
virtual void FinalizeStreamedUpload(UploadStreamHandle *handle, const shash::Any &content_hash)=0
virtual bool Peek(const std::string &path)=0
Callbackable< UploaderResults >::CallbackTN * CallbackPtr
perf::Counter * sz_uploaded_catalog_bytes
static atomic_int64 g_upload_stream_tag
void CountUploadedBytes(int64_t bytes_written) const
virtual void StreamedUpload(UploadStreamHandle *handle, UploadBuffer buffer, const CallbackTN *callback)=0
UploaderResults(const int return_code, const std::string &local_path)
AbstractUploader(const SpoolerDefinition &spooler_definition)
virtual unsigned int GetNumberOfErrors() const =0
virtual std::string name() const =0
UploaderResults(Type t, const int return_code)
void UploadFile(const std::string &local_path, const std::string &remote_path, const CallbackTN *callback=NULL)
AbstractUploader::CallbackTN CallbackTN
void CountUploadedCatalogBytes(int64_t bytes_written) const
perf::Counter * n_chunks_added
void RemoveAsync(const std::string &file_to_delete)
virtual void DoUpload(const std::string &remote_path, IngestionSource *source, const CallbackTN *callback)=0
void CountUploadedCatalogs() const
TubeGroup< UploadJob > tubes_upload_
const std::string local_path
perf::Counter * n_catalogs_added
const SpoolerDefinition spooler_definition_
TubeConsumerGroup< UploadJob > tasks_upload_
std::string MakePath() const
Definition: hash.h:316
void UploadIngestionSource(const std::string &remote_path, IngestionSource *source, const CallbackTN *callback=NULL)
virtual bool Create()=0
CallbackBase< UploaderResults > CallbackTN
Definition: async.h:192
UploadCounters(perf::StatisticsTemplate statistics)
virtual void Process(AbstractUploader::UploadJob *upload_job)
perf::Counter * sz_uploaded_bytes
AbstractUploader * uploader_
UploadStreamHandle(const CallbackTN *commit_callback)