CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
upload_facility.cc
Go to the documentation of this file.
1 
5 #include "upload_facility.h"
6 
7 #include <cassert>
8 
9 #include "upload_gateway.h"
10 #include "upload_local.h"
11 #include "upload_s3.h"
12 #include "util/exception.h"
13 
14 namespace upload {
15 
17 
19  UploadBuffer buffer,
20  const CallbackTN *callback)
21  : type(Upload)
22  , stream_handle(handle)
23  , tag_(handle->tag)
24  , buffer(buffer)
25  , callback(callback) { }
26 
28  const shash::Any &content_hash)
29  : type(Commit)
30  , stream_handle(handle)
31  , tag_(handle->tag)
32  , buffer()
33  , callback(NULL)
34  , content_hash(content_hash) { }
35 
37  RegisterPlugin<LocalUploader>();
38  RegisterPlugin<S3Uploader>();
39  RegisterPlugin<GatewayUploader>();
40 }
41 
43  : spooler_definition_(spooler_definition)
44  , num_upload_tasks_(spooler_definition.num_upload_tasks)
45  , jobs_in_flight_(spooler_definition.number_of_concurrent_uploads) { }
46 
47 
49  for (unsigned i = 0; i < GetNumTasks(); ++i) {
51  tubes_upload_.TakeTube(t);
52  tasks_upload_.TakeConsumer(new TaskUpload(this, t));
53  }
54  tubes_upload_.Activate();
55  tasks_upload_.Spawn();
56  return true;
57 }
58 
59 bool AbstractUploader::FinalizeSession(bool /*commit*/,
60  const std::string & /*old_root_hash*/,
61  const std::string & /*new_root_hash*/,
62  const RepositoryTag & /*tag*/) {
63  return true;
64 }
65 
66 
68  const std::string tmp_path = CreateTempPath(
69  spooler_definition_.temporary_path + "/" + "chunk", 0644);
70  if (tmp_path.empty()) {
72  "Failed to create temp file in %s for upload of file chunk"
73  " (errno: %d).",
74  spooler_definition_.temporary_path.c_str(), errno);
75  return -1;
76  }
77 
78  const int tmp_fd = open(tmp_path.c_str(), O_WRONLY);
79  if (tmp_fd < 0) {
81  "Failed to open temp file '%s' for upload of file chunk "
82  "(errno: %d)",
83  tmp_path.c_str(), errno);
84  unlink(tmp_path.c_str());
85  } else {
86  *path = tmp_path;
87  }
88 
89  return tmp_fd;
90 }
91 
93 
95 
97  counters_ = new UploadCounters(*statistics);
98 }
99 
101  if (counters_.IsValid()) {
102  perf::Inc(counters_->n_chunks_added);
103  }
104 }
105 
107  if (counters_.IsValid()) {
108  perf::Dec(counters_->n_chunks_added);
109  }
110 }
111 
112 void AbstractUploader::CountUploadedBytes(int64_t bytes_written) const {
113  if (counters_.IsValid()) {
114  perf::Xadd(counters_->sz_uploaded_bytes, bytes_written);
115  }
116 }
117 
119  if (counters_.IsValid()) {
120  perf::Inc(counters_->n_chunks_duplicated);
121  }
122 }
123 
125  if (counters_.IsValid()) {
126  perf::Inc(counters_->n_catalogs_added);
127  }
128 }
129 
130 void AbstractUploader::CountUploadedCatalogBytes(int64_t bytes_written) const {
131  if (counters_.IsValid()) {
132  perf::Xadd(counters_->sz_uploaded_catalog_bytes, bytes_written);
133  }
134 }
135 
136 //------------------------------------------------------------------------------
137 
138 
140  switch (upload_job->type) {
142  uploader_->StreamedUpload(upload_job->stream_handle, upload_job->buffer,
143  upload_job->callback);
144  break;
145 
148  upload_job->content_hash);
149  break;
150 
151  default:
152  PANIC(NULL);
153  }
154 
155  delete upload_job;
156 }
157 
158 } // namespace upload
virtual void WaitForUpload() const
void Dec(class Counter *counter)
Definition: statistics.h:49
void InitCounters(perf::StatisticsTemplate *statistics)
int64_t Xadd(class Counter *counter, const int64_t delta)
Definition: statistics.h:51
int64_t atomic_int64
Definition: atomic.h:18
UniquePtr< UploadCounters > counters_
SynchronizingCounter< int32_t > jobs_in_flight_
int CreateAndOpenTemporaryChunkFile(std::string *path) const
#define PANIC(...)
Definition: exception.h:29
std::string CreateTempPath(const std::string &path_prefix, const int mode)
Definition: posix.cc:1042
const SpoolerDefinition & spooler_definition() const
void CountUploadedChunks() const
virtual unsigned GetNumTasks() const
virtual bool FinalizeSession(bool commit, const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)
std::string temporary_path
scratch space for the IngestionPipeline
virtual void FinalizeStreamedUpload(UploadStreamHandle *handle, const shash::Any &content_hash)=0
static atomic_int64 g_upload_stream_tag
void CountUploadedBytes(int64_t bytes_written) const
virtual void StreamedUpload(UploadStreamHandle *handle, UploadBuffer buffer, const CallbackTN *callback)=0
AbstractUploader(const SpoolerDefinition &spooler_definition)
void Inc(class Counter *counter)
Definition: statistics.h:50
void CountUploadedCatalogBytes(int64_t bytes_written) const
void WaitForZero() const
Definition: concurrency.h:175
void CountUploadedCatalogs() const
TubeGroup< UploadJob > tubes_upload_
Definition: tube.h:39
const SpoolerDefinition spooler_definition_
TubeConsumerGroup< UploadJob > tasks_upload_
virtual void Process(AbstractUploader::UploadJob *upload_job)
AbstractUploader * uploader_
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545