CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
upload.cc
Go to the documentation of this file.
1 
5 #include "upload.h"
6 
7 #include <vector>
8 
9 #include "util/concurrency.h"
10 #include "util/shared_ptr.h"
11 
12 namespace upload {
13 
14 Spooler *Spooler::Construct(const SpoolerDefinition &spooler_definition,
15  perf::StatisticsTemplate *statistics) {
16  Spooler *result = new Spooler(spooler_definition);
17  if (!result->Initialize(statistics)) {
18  delete result;
19  result = NULL;
20  }
21  return result;
22 }
23 
24 Spooler::Spooler(const SpoolerDefinition &spooler_definition)
25  : spooler_definition_(spooler_definition) { }
26 
27 Spooler::~Spooler() {
28  FinalizeSession(false);
29  if (uploader_.IsValid()) {
30  uploader_->TearDown();
31  }
32 }
33 
34 std::string Spooler::backend_name() const { return uploader_->name(); }
35 
36 bool Spooler::Initialize(perf::StatisticsTemplate *statistics) {
37  // configure the uploader environment
38  uploader_ = AbstractUploader::Construct(spooler_definition_);
39  if (!uploader_.IsValid()) {
41  "Failed to initialize backend upload "
42  "facility in Spooler.");
43  return false;
44  }
45 
46  if (statistics != NULL) {
47  uploader_->InitCounters(statistics);
48  }
49 
50  // configure the file processor context
51  ingestion_pipeline_ = new IngestionPipeline(uploader_.weak_ref(),
52  spooler_definition_);
53  ingestion_pipeline_->RegisterListener(&Spooler::ProcessingCallback, this);
54  ingestion_pipeline_->Spawn();
55 
56  // all done...
57  return true;
58 }
59 
60 bool Spooler::Create() { return uploader_->Create(); }
61 
62 void Spooler::Process(IngestionSource *source, const bool allow_chunking) {
63  ingestion_pipeline_->Process(source, allow_chunking);
64 }
65 
66 void Spooler::ProcessCatalog(const std::string &local_path) {
67  ingestion_pipeline_->Process(new FileIngestionSource(local_path), false,
69 }
70 
71 void Spooler::ProcessHistory(const std::string &local_path) {
72  ingestion_pipeline_->Process(new FileIngestionSource(local_path), false,
74 }
75 
76 void Spooler::ProcessCertificate(const std::string &local_path) {
77  ingestion_pipeline_->Process(new FileIngestionSource(local_path), false,
79 }
80 
81 void Spooler::ProcessCertificate(IngestionSource *source) {
82  ingestion_pipeline_->Process(source, false, shash::kSuffixCertificate);
83 }
84 
85 void Spooler::ProcessMetainfo(const std::string &local_path) {
86  ingestion_pipeline_->Process(new FileIngestionSource(local_path), false,
88 }
89 
90 void Spooler::ProcessMetainfo(IngestionSource *source) {
91  ingestion_pipeline_->Process(source, false, shash::kSuffixMetainfo);
92 }
93 
94 void Spooler::Upload(const std::string &local_path,
95  const std::string &remote_path) {
96  uploader_->UploadFile(
97  local_path, remote_path,
98  AbstractUploader::MakeCallback(&Spooler::UploadingCallback, this));
99 }
100 
101 void Spooler::Upload(const std::string &remote_path, IngestionSource *source) {
102  uploader_->UploadIngestionSource(
103  remote_path, source,
104  AbstractUploader::MakeCallback(&Spooler::UploadingCallback, this));
105  delete source;
106 }
107 
108 
109 void Spooler::UploadManifest(const std::string &local_path) {
110  Upload(local_path, ".cvmfspublished");
111 }
112 
113 void Spooler::UploadReflog(const std::string &local_path) {
114  Upload(local_path, ".cvmfsreflog");
115 }
116 
117 void Spooler::RemoveAsync(const std::string &file_to_delete) {
118  uploader_->RemoveAsync(file_to_delete);
119 }
120 
121 bool Spooler::Peek(const std::string &path) const {
122  return uploader_->Peek(path);
123 }
124 
125 bool Spooler::Mkdir(const std::string &path) { return uploader_->Mkdir(path); }
126 
127 bool Spooler::PlaceBootstrappingShortcut(const shash::Any &object) const {
128  assert(!object.IsNull());
129  return uploader_->PlaceBootstrappingShortcut(object);
130 }
131 
132 void Spooler::ProcessingCallback(const SpoolerResult &data) {
133  NotifyListeners(data);
134 }
135 
136 void Spooler::UploadingCallback(const UploaderResults &data) {
137  NotifyListeners(SpoolerResult(data.return_code, data.local_path));
138 }
139 
140 void Spooler::WaitForUpload() const {
141  ingestion_pipeline_->WaitFor();
142  uploader_->WaitForUpload();
143 }
144 
145 bool Spooler::FinalizeSession(bool commit, const std::string &old_root_hash,
146  const std::string &new_root_hash,
147  const RepositoryTag &tag) const {
148  return uploader_->FinalizeSession(commit, old_root_hash, new_root_hash, tag);
149 }
150 
151 unsigned int Spooler::GetNumberOfErrors() const {
152  return uploader_->GetNumberOfErrors();
153 }
154 
155 } // namespace upload
CallbackPtr RegisterListener(typename BoundClosure< ParamT, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, ClosureDataT data)
static Publisher * Create(const SettingsPublisher &settings)
CVMFS_EXPORT const LogSource source
Definition: exception.h:33
const char kSuffixCertificate
Definition: hash.h:59
const int kLogWarning
assert((mem||(size==0))&&"Out Of Memory")
const char kSuffixCatalog
Definition: hash.h:54
const char kSuffixMetainfo
Definition: hash.h:60
const char kSuffixHistory
Definition: hash.h:55
static bool Peek(const string &remote_path)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545