CernVM-FS  2.9.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
upload_facility.cc
Go to the documentation of this file.
1 
5 #include "upload_facility.h"
6 
7 #include <cassert>
8 
9 #include "upload_gateway.h"
10 #include "upload_local.h"
11 #include "upload_s3.h"
12 #include "util/exception.h"
13 
14 namespace upload {
15 
17 
// NOTE(review): the signature line of this constructor is missing from the
// listing; only its parameter list and member initializers are visible.
// The initializers build a job of type Upload that carries a stream handle,
// a data buffer and a callback. content_hash is not listed here and is
// therefore default-initialized.
19  UploadStreamHandle *handle,
20  UploadBuffer buffer,
21  const CallbackTN *callback)
22  : type(Upload)
23  , stream_handle(handle)
// handle is dereferenced on the next line, so callers must pass a non-null
// pointer.
24  , tag_(handle->tag)
25  , buffer(buffer)
26  , callback(callback)
27 { }
28 
// NOTE(review): the signature line of this constructor is missing from the
// listing; only its parameter list and member initializers are visible.
// The initializers build a job of type Commit: it records the stream handle
// and the content hash, leaves buffer value-initialized and sets callback to
// NULL.
30  UploadStreamHandle *handle,
31  const shash::Any &content_hash)
32  : type(Commit)
33  , stream_handle(handle)
// handle is dereferenced on the next line, so callers must pass a non-null
// pointer.
34  , tag_(handle->tag)
35  , buffer()
36  , callback(NULL)
37  , content_hash(content_hash)
38 { }
39 
// NOTE(review): the signature line of this function is missing from the
// listing. The body registers three uploader implementations as plugins:
// LocalUploader, S3Uploader and GatewayUploader.
41  RegisterPlugin<LocalUploader>();
42  RegisterPlugin<S3Uploader>();
43  RegisterPlugin<GatewayUploader>();
44 }
45 
// NOTE(review): the signature line of this constructor is missing from the
// listing; only the member initializer list is visible.
// It stores the given spooler definition, takes the number of upload tasks
// from it, and constructs jobs_in_flight_ from number_of_concurrent_uploads
// (presumably the upper bound of concurrently running uploads -- confirm
// against the type of jobs_in_flight_).
47  : spooler_definition_(spooler_definition)
48  , num_upload_tasks_(spooler_definition.num_upload_tasks)
49  , jobs_in_flight_(spooler_definition.number_of_concurrent_uploads)
50 { }
51 
52 
// NOTE(review): the signature line of this function and the line declaring
// `t` are missing from the listing.
// For each of the GetNumTasks() tasks, `t` is handed to the tube group and a
// new TaskUpload bound to this uploader and to `t` is handed to the consumer
// group.
54  for (unsigned i = 0; i < GetNumTasks(); ++i) {
56  tubes_upload_.TakeTube(t);
57  tasks_upload_.TakeConsumer(new TaskUpload(this, t));
58  }
// Activate the tubes first, then spawn the consumers.
59  tubes_upload_.Activate();
60  tasks_upload_.Spawn();
// No failure path is visible here: the function always reports success.
61  return true;
62 }
63 
64 bool AbstractUploader::FinalizeSession(bool /*commit*/,
65  const std::string & /*old_root_hash*/,
66  const std::string & /*new_root_hash*/,
67  const RepositoryTag & /*tag*/) {
68  return true;
69 }
70 
71 
// NOTE(review): the signature line of this function and the opening lines of
// both logging calls are missing from the listing.
// Creates a temporary file in the spooler's temporary_path using the path
// prefix "<temporary_path>/chunk" and mode 0644, then opens it write-only.
// Returns the open file descriptor and stores the file's path in *path;
// on failure a negative value is returned and *path is left untouched.
73  const std::string tmp_path =
74  CreateTempPath(spooler_definition_.temporary_path + "/" + "chunk", 0644);
// An empty result means the temporary path could not be created.
75  if (tmp_path.empty()) {
77  "Failed to create temp file in %s for upload of file chunk"
78  " (errno: %d).",
79  spooler_definition_.temporary_path.c_str(), errno);
80  return -1;
81  }
82 
83  const int tmp_fd = open(tmp_path.c_str(), O_WRONLY);
84  if (tmp_fd < 0) {
86  "Failed to open temp file '%s' for upload of file chunk "
87  "(errno: %d)",
88  tmp_path.c_str(), errno);
// Remove the file that was created above so that it is not left behind.
89  unlink(tmp_path.c_str());
90  } else {
// Hand the path to the caller only if the file could be opened.
91  *path = tmp_path;
92  }
93 
// Negative if open() failed, otherwise a valid descriptor.
94  return tmp_fd;
95 }
96 
// NOTE(review): the signature line of this function is missing from the
// listing. The body terminates the upload task consumer group.
98  tasks_upload_.Terminate();
99 }
100 
102 
// NOTE(review): the signature line of this function is missing from the
// listing. The body allocates a new UploadCounters object from *statistics
// and assigns it to counters_. statistics is dereferenced, so it must be
// non-null.
104  counters_ = new UploadCounters(*statistics);
105 }
106 
// NOTE(review): the signature line of this function is missing from the
// listing. The body increments the n_chunks_added counter; it does nothing
// if counters_ is not valid.
108  if (counters_.IsValid()) {
109  perf::Inc(counters_->n_chunks_added);
110  }
111 }
112 
// NOTE(review): the signature line of this function is missing from the
// listing. The body decrements the n_chunks_added counter; it does nothing
// if counters_ is not valid.
114  if (counters_.IsValid()) {
115  perf::Dec(counters_->n_chunks_added);
116  }
117 }
118 
119 void AbstractUploader::CountUploadedBytes(int64_t bytes_written) const {
120  if (counters_.IsValid()) {
121  perf::Xadd(counters_->sz_uploaded_bytes, bytes_written);
122  }
123 }
124 
// NOTE(review): the signature line of this function is missing from the
// listing. The body increments the n_chunks_duplicated counter; it does
// nothing if counters_ is not valid.
126  if (counters_.IsValid()) {
127  perf::Inc(counters_->n_chunks_duplicated);
128  }
129 }
130 
// NOTE(review): the signature line of this function is missing from the
// listing. The body increments the n_catalogs_added counter; it does nothing
// if counters_ is not valid.
132  if (counters_.IsValid()) {
133  perf::Inc(counters_->n_catalogs_added);
134  }
135 }
136 
137 void AbstractUploader::CountUploadedCatalogBytes(int64_t bytes_written) const {
138  if (counters_.IsValid()) {
139  perf::Xadd(counters_->sz_uploaded_catalog_bytes, bytes_written);
140  }
141 }
142 
143 //------------------------------------------------------------------------------
144 
145 
// NOTE(review): the signature line of this function, both `case` labels and
// the first line of each dispatched call are missing from the listing.
// Dispatches on the job type. One branch forwards the stream handle, buffer
// and callback; the other forwards the stream handle and content hash
// (presumably to the uploader's StreamedUpload() and
// FinalizeStreamedUpload() -- confirm against the original source).
147  switch (upload_job->type) {
150  upload_job->stream_handle, upload_job->buffer, upload_job->callback);
151  break;
152 
155  upload_job->stream_handle, upload_job->content_hash);
156  break;
157 
// Any other job type is treated as a fatal error.
158  default:
159  PANIC(NULL);
160  }
161 
// The job object is released here after it has been dispatched.
162  delete upload_job;
163 }
164 
165 } // namespace upload
#define LogCvmfs(source, mask,...)
Definition: logging.h:20
virtual void WaitForUpload() const
void Dec(class Counter *counter)
Definition: statistics.h:49
void InitCounters(perf::StatisticsTemplate *statistics)
int64_t Xadd(class Counter *counter, const int64_t delta)
Definition: statistics.h:51
int64_t atomic_int64
Definition: atomic.h:18
UniquePtr< UploadCounters > counters_
SynchronizingCounter< int32_t > jobs_in_flight_
int CreateAndOpenTemporaryChunkFile(std::string *path) const
#define PANIC(...)
Definition: exception.h:26
std::string CreateTempPath(const std::string &path_prefix, const int mode)
Definition: posix.cc:1054
const SpoolerDefinition & spooler_definition() const
void CountUploadedChunks() const
virtual unsigned GetNumTasks() const
virtual bool FinalizeSession(bool commit, const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)
std::string temporary_path
scratch space for the IngestionPipeline
virtual void FinalizeStreamedUpload(UploadStreamHandle *handle, const shash::Any &content_hash)=0
static atomic_int64 g_upload_stream_tag
void CountUploadedBytes(int64_t bytes_written) const
virtual void StreamedUpload(UploadStreamHandle *handle, UploadBuffer buffer, const CallbackTN *callback)=0
AbstractUploader(const SpoolerDefinition &spooler_definition)
void Inc(class Counter *counter)
Definition: statistics.h:50
void CountUploadedCatalogBytes(int64_t bytes_written) const
void WaitForZero() const
void CountUploadedCatalogs() const
TubeGroup< UploadJob > tubes_upload_
Definition: tube.h:39
const SpoolerDefinition spooler_definition_
TubeConsumerGroup< UploadJob > tasks_upload_
virtual void Process(AbstractUploader::UploadJob *upload_job)
AbstractUploader * uploader_