CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
upload_facility.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_UPLOAD_FACILITY_H_
6 #define CVMFS_UPLOAD_FACILITY_H_
7 
8 #include <fcntl.h>
9 #include <stdint.h>
10 
11 #include <string>
12 
14 #include "ingestion/task.h"
15 #include "repository_tag.h"
16 #include "statistics.h"
18 #include "util/atomic.h"
19 #include "util/concurrency.h"
20 #include "util/posix.h"
21 #include "util/tube.h"
22 
23 namespace upload {
24 
31 
34  "n_chunks_added", "Number of new chunks added");
36  "n_chunks_duplicated", "Number of duplicated chunks added");
38  "n_catalogs_added", "Number of new catalogs added");
40  "sz_uploaded_bytes", "Number of uploaded bytes");
42  "sz_uploaded_catalog_bytes", "Number of uploaded bytes for catalogs");
43  }
44 }; // UploadCounters
45 
47  enum Type {
53  };
54 
55  UploaderResults(const int return_code, const std::string &local_path)
56  : type(kFileUpload), return_code(return_code), local_path(local_path) { }
57 
58  explicit UploaderResults(Type t, const int return_code)
59  : type(t), return_code(return_code), local_path("") { }
60 
62 
63  const Type type;
64  const int return_code;
65  const std::string local_path;
66 };
67 
68 struct UploadStreamHandle;
69 
83  : public PolymorphicConstruction<AbstractUploader, SpoolerDefinition>,
84  public Callbackable<UploaderResults>,
85  public SingleCopy {
86  friend class TaskUpload;
87 
88  public:
/**
 * Plain pairing of a size with a read-only pointer. The struct neither
 * allocates nor releases the memory that `data` points to.
 */
struct UploadBuffer {
  uint64_t size;     // presumably the number of bytes behind `data` - confirm
  const void *data;  // start of the block; NULL for a default-built buffer

  /** Empty buffer: zero size and no data pointer. */
  UploadBuffer() : size(0), data(NULL) { }

  /** Buffer describing `s` units of memory starting at `d`. */
  UploadBuffer(uint64_t s, const void *d) : size(s), data(d) { }
};
98 
99  struct JobStatus {
100  enum State {
104  };
105  };
106 
107  struct UploadJob {
108  enum Type {
112  };
113 
115  const CallbackTN *callback = NULL);
117 
119  : type(Terminate)
120  , stream_handle(NULL)
121  , tag_(0)
122  , buffer()
123  , callback(NULL) { }
124 
125  static UploadJob *CreateQuitBeacon() { return new UploadJob(); }
126  bool IsQuitBeacon() { return type == Terminate; }
127 
134  int64_t tag_;
135  int64_t tag() { return tag_; }
136 
137  // type==Upload specific fields
140 
141  // type==Commit specific fields
143  };
144 
145  virtual ~AbstractUploader() { assert(!tasks_upload_.is_active()); }
146 
150  virtual std::string name() const = 0;
151 
156  virtual bool Create() = 0;
157 
164  virtual unsigned GetNumTasks() const { return num_upload_tasks_; }
165 
173  virtual bool Initialize();
174 
182  virtual bool FinalizeSession(bool commit, const std::string &old_root_hash,
183  const std::string &new_root_hash,
184  const RepositoryTag &tag);
185 
190  void TearDown();
191 
201  void UploadFile(const std::string &local_path,
202  const std::string &remote_path,
203  const CallbackTN *callback = NULL) {
204  ++jobs_in_flight_;
205  FileIngestionSource source(local_path);
206  DoUpload(remote_path, &source, callback);
207  }
208 
209  void UploadIngestionSource(const std::string &remote_path,
211  const CallbackTN *callback = NULL) {
212  ++jobs_in_flight_;
213  DoUpload(remote_path, source, callback);
214  }
215 
225  const CallbackTN *callback) = 0;
226 
242  UploadBuffer buffer,
243  const CallbackTN *callback = NULL) {
244  ++jobs_in_flight_;
245  tubes_upload_.Dispatch(new UploadJob(handle, buffer, callback));
246  }
247 
257  const shash::Any &content_hash) {
258  ++jobs_in_flight_;
259  tubes_upload_.Dispatch(new UploadJob(handle, content_hash));
260  }
261 
269  void RemoveAsync(const std::string &file_to_delete) {
270  ++jobs_in_flight_;
271  DoRemoveAsync(file_to_delete);
272  }
273 
279  void RemoveAsync(const shash::Any &hash_to_delete) {
280  RemoveAsync("data/" + hash_to_delete.MakePath());
281  }
282 
288  int64_t GetObjectSize(const shash::Any &hash) {
289  return DoGetObjectSize("data/" + hash.MakePath());
290  }
291 
299  virtual bool Peek(const std::string &path) = 0;
300 
307  virtual bool Mkdir(const std::string &path) = 0;
308 
317  virtual bool PlaceBootstrappingShortcut(const shash::Any &object) = 0;
318 
326  virtual void WaitForUpload() const;
327 
328  virtual unsigned int GetNumberOfErrors() const = 0;
329  static void RegisterPlugins();
330  void InitCounters(perf::StatisticsTemplate *statistics);
331 
332  protected:
334 
336 
345  virtual void DoUpload(const std::string &remote_path,
347  const CallbackTN *callback) = 0;
348 
357  virtual void StreamedUpload(UploadStreamHandle *handle,
358  UploadBuffer buffer,
359  const CallbackTN *callback) = 0;
360 
368  virtual void FinalizeStreamedUpload(UploadStreamHandle *handle,
369  const shash::Any &content_hash) = 0;
370 
371 
// Backend hook behind RemoveAsync(const std::string &): it is called with the
// same path right after RemoveAsync() has incremented jobs_in_flight_.
// Pure virtual, so every concrete uploader has to implement it.
372  virtual void DoRemoveAsync(const std::string &file_to_delete) = 0;
373 
// Backend hook behind GetObjectSize(): it is called with the path
// "data/" + hash.MakePath() and its return value is passed back unchanged.
// Pure virtual, so every concrete uploader has to implement it.
374  virtual int64_t DoGetObjectSize(const std::string &file_name) = 0;
375 
385  void Respond(const CallbackTN *callback,
386  const UploaderResults &result) const {
387  if (callback != NULL) {
388  (*callback)(result);
389  delete callback;
390  }
391 
392  --jobs_in_flight_;
393  }
394 
403  int CreateAndOpenTemporaryChunkFile(std::string *path) const;
404 
406  return spooler_definition_;
407  }
408 
409  void CountUploadedChunks() const;
410  void DecUploadedChunks() const;
411  void CountUploadedBytes(int64_t bytes_written) const;
412  void CountDuplicates() const;
413  void CountUploadedCatalogs() const;
414  void CountUploadedCatalogBytes(int64_t bytes_written) const;
415 
416  protected:
422 
423  private:
425 
435 }; // class AbstractUploader
436 
437 
441 class TaskUpload : public TubeConsumer<AbstractUploader::UploadJob> {
442  public:
443  explicit TaskUpload(AbstractUploader *uploader,
445  : TubeConsumer<AbstractUploader::UploadJob>(tube), uploader_(uploader) { }
446 
447  protected:
448  virtual void Process(AbstractUploader::UploadJob *upload_job);
449 
450  private:
452 };
453 
454 
465 
467  : commit_callback(commit_callback)
468  , tag(atomic_xadd64(&g_upload_stream_tag, 1)) { }
469  virtual ~UploadStreamHandle() { }
470 
472 
473  int64_t tag;
474 
475  std::string remote_path; // override remote location of the object
476 };
477 
478 } // namespace upload
479 
480 #endif // CVMFS_UPLOAD_FACILITY_H_
virtual void WaitForUpload() const
void InitCounters(perf::StatisticsTemplate *statistics)
void ScheduleUpload(UploadStreamHandle *handle, UploadBuffer buffer, const CallbackTN *callback=NULL)
perf::Counter * n_chunks_duplicated
virtual void DoRemoveAsync(const std::string &file_to_delete)=0
UploadBuffer(uint64_t s, const void *d)
int64_t atomic_int64
Definition: atomic.h:18
UniquePtr< UploadCounters > counters_
SynchronizingCounter< int32_t > jobs_in_flight_
int CreateAndOpenTemporaryChunkFile(std::string *path) const
virtual bool Mkdir(const std::string &path)=0
virtual UploadStreamHandle * InitStreamedUpload(const CallbackTN *callback)=0
CVMFS_EXPORT const LogSource source
Definition: exception.h:33
void Respond(const CallbackTN *callback, const UploaderResults &result) const
virtual int64_t DoGetObjectSize(const std::string &file_name)=0
Counter * RegisterOrLookupTemplated(const std::string &name_minor, const std::string &desc)
Definition: statistics.h:114
void ScheduleCommit(UploadStreamHandle *handle, const shash::Any &content_hash)
assert((mem||(size==0))&&"Out Of Memory")
const SpoolerDefinition & spooler_definition() const
TaskUpload(AbstractUploader *uploader, Tube< AbstractUploader::UploadJob > *tube)
void CountUploadedChunks() const
virtual bool PlaceBootstrappingShortcut(const shash::Any &object)=0
virtual unsigned GetNumTasks() const
const CallbackTN * commit_callback
virtual bool FinalizeSession(bool commit, const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)
int64_t GetObjectSize(const shash::Any &hash)
void RemoveAsync(const shash::Any &hash_to_delete)
virtual void FinalizeStreamedUpload(UploadStreamHandle *handle, const shash::Any &content_hash)=0
virtual bool Peek(const std::string &path)=0
Callbackable< UploaderResults >::CallbackTN * CallbackPtr
perf::Counter * sz_uploaded_catalog_bytes
static atomic_int64 g_upload_stream_tag
void CountUploadedBytes(int64_t bytes_written) const
virtual void StreamedUpload(UploadStreamHandle *handle, UploadBuffer buffer, const CallbackTN *callback)=0
UploaderResults(const int return_code, const std::string &local_path)
AbstractUploader(const SpoolerDefinition &spooler_definition)
virtual unsigned int GetNumberOfErrors() const =0
virtual std::string name() const =0
UploaderResults(Type t, const int return_code)
void UploadFile(const std::string &local_path, const std::string &remote_path, const CallbackTN *callback=NULL)
AbstractUploader::CallbackTN CallbackTN
void CountUploadedCatalogBytes(int64_t bytes_written) const
perf::Counter * n_chunks_added
void RemoveAsync(const std::string &file_to_delete)
virtual void DoUpload(const std::string &remote_path, IngestionSource *source, const CallbackTN *callback)=0
void CountUploadedCatalogs() const
TubeGroup< UploadJob > tubes_upload_
const std::string local_path
perf::Counter * n_catalogs_added
const SpoolerDefinition spooler_definition_
TubeConsumerGroup< UploadJob > tasks_upload_
std::string MakePath() const
Definition: hash.h:306
void UploadIngestionSource(const std::string &remote_path, IngestionSource *source, const CallbackTN *callback=NULL)
virtual bool Create()=0
CallbackBase< UploaderResults > CallbackTN
Definition: async.h:185
UploadCounters(perf::StatisticsTemplate statistics)
virtual void Process(AbstractUploader::UploadJob *upload_job)
perf::Counter * sz_uploaded_bytes
AbstractUploader * uploader_
UploadStreamHandle(const CallbackTN *commit_callback)