CernVM-FS  2.10.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
upload_gateway.cc
Go to the documentation of this file.
1 
5 #include "upload_gateway.h"
6 
7 #include <limits>
8 #include <vector>
9 
10 #include "gateway_util.h"
11 #include "logging.h"
12 #include "util/exception.h"
13 #include "util/string.h"
14 
15 namespace upload {
16 
19  : UploadStreamHandle(commit_callback), bucket(bkt) {}
20 
21 bool GatewayUploader::WillHandle(const SpoolerDefinition& spooler_definition) {
22  return spooler_definition.driver_type == SpoolerDefinition::Gateway;
23 }
24 
26  const SpoolerDefinition& spooler_definition,
27  GatewayUploader::Config* config) {
28  const std::string& config_string = spooler_definition.spooler_configuration;
29  if (!config) {
30  LogCvmfs(kLogUploadGateway, kLogStderr, "\"config\" argument is NULL");
31  return false;
32  }
33 
34  if (spooler_definition.session_token_file.empty()) {
36  "Failed to configure gateway uploader. "
37  "Missing session token file.\n");
38  return false;
39  }
40  config->session_token_file = spooler_definition.session_token_file;
41 
42  if (spooler_definition.key_file.empty()) {
44  "Failed to configure gateway uploader. "
45  "Missing API key file.\n");
46  return false;
47  }
48  config->key_file = spooler_definition.key_file;
49 
50  // Repo address, e.g. http://my.repo.address
51  config->api_url = config_string;
52 
53  return true;
54 }
55 
57  : AbstractUploader(spooler_definition),
58  config_(),
59  session_context_(new SessionContext()) {
60  assert(spooler_definition.IsValid() &&
61  spooler_definition.driver_type == SpoolerDefinition::Gateway);
62 
63  if (!ParseSpoolerDefinition(spooler_definition, &config_)) {
64  PANIC(kLogStderr, "Error in parsing the spooler definition");
65  }
66 
67  atomic_init32(&num_errors_);
68 }
69 
71  if (session_context_) {
72  delete session_context_;
73  }
74 }
75 
78  "cannot create repository storage area when using the gateway");
79  return false;
80 }
81 
84  return false;
85  }
86  std::string session_token;
87  if (!ReadSessionTokenFile(config_.session_token_file, &session_token)) {
88  return false;
89  }
90 
91  std::string key_id;
92  std::string secret;
93  if (!ReadKey(config_.key_file, &key_id, &secret)) {
94  return false;
95  }
96 
97  return session_context_->Initialize(config_.api_url, session_token, key_id,
98  secret);
99 }
100 
102  const std::string& old_root_hash,
103  const std::string& new_root_hash,
104  const RepositoryTag& tag) {
105  return session_context_->Finalize(commit, old_root_hash, new_root_hash, tag);
106 }
107 
110 }
111 
112 std::string GatewayUploader::name() const { return "HTTP"; }
113 
114 void GatewayUploader::DoRemoveAsync(const std::string& /*file_to_delete*/) {
115  atomic_inc32(&num_errors_);
116  Respond(NULL, UploaderResults());
117 }
118 
119 bool GatewayUploader::Peek(const std::string& /*path*/) { return false; }
120 
121 // TODO(jpriessn): implement Mkdir on gateway server-side
122 bool GatewayUploader::Mkdir(const std::string &path) {
123  return true;
124 }
125 
127  return false;
128 }
129 
131  return atomic_read32(&num_errors_);
132 }
133 
134 void GatewayUploader::DoUpload(const std::string& remote_path,
135  IngestionSource *source,
136  const CallbackTN* callback) {
139 
140  bool rvb = source->Open();
141  if (!rvb) {
143  "File upload - could not open local file.");
144  BumpErrors();
145  Respond(callback, UploaderResults(1, source->GetPath()));
146  return;
147  }
148 
149  unsigned char hash_ctx[shash::kMaxContextSize];
150  shash::ContextPtr hash_ctx_ptr(spooler_definition().hash_algorithm, hash_ctx);
151  shash::Init(hash_ctx_ptr);
152  std::vector<char> buf(1024);
153  ssize_t read_bytes = 0;
154  do {
155  read_bytes = source->Read(&buf[0], buf.size());
156  assert(read_bytes >= 0);
157  ObjectPack::AddToBucket(&buf[0], read_bytes, handle->bucket);
158  shash::Update(reinterpret_cast<unsigned char *>(&buf[0]), read_bytes,
159  hash_ctx_ptr);
160  } while (static_cast<size_t>(read_bytes) == buf.size());
161  source->Close();
162  shash::Any content_hash(spooler_definition().hash_algorithm);
163  shash::Final(hash_ctx_ptr, &content_hash);
164 
166  handle->bucket, remote_path)) {
168  "File upload - could not commit bucket");
169  BumpErrors();
170  Respond(handle->commit_callback, UploaderResults(2, source->GetPath()));
171  return;
172  }
173 
174  Respond(callback, UploaderResults(0, source->GetPath()));
175 }
176 
178  const CallbackTN* callback) {
179  return new GatewayStreamHandle(callback, session_context_->NewBucket());
180 }
181 
183  UploadBuffer buffer,
184  const CallbackTN* callback) {
185  GatewayStreamHandle* hd = dynamic_cast<GatewayStreamHandle*>(handle);
186  if (!hd) {
188  "Streamed upload - incompatible upload handle");
189  BumpErrors();
191  return;
192  }
193 
194  ObjectPack::AddToBucket(buffer.data, buffer.size, hd->bucket);
195 
197 }
198 
200  const shash::Any& content_hash) {
201  GatewayStreamHandle* hd = dynamic_cast<GatewayStreamHandle*>(handle);
202  if (!hd) {
204  "Finalize streamed upload - incompatible upload handle");
205  BumpErrors();
206  Respond(handle->commit_callback,
208  return;
209  }
210 
211  // hd->remote_path is ignored when empty
212  if (!session_context_->CommitBucket(ObjectPack::kCas, content_hash,
213  hd->bucket, hd->remote_path)) {
215  "Finalize streamed upload - could not commit bucket");
216  BumpErrors();
217  Respond(handle->commit_callback,
219  return;
220  }
221  if (!content_hash.HasSuffix()
222  || content_hash.suffix == shash::kSuffixPartial) {
225  } else if (content_hash.suffix == shash::kSuffixCatalog) {
228  }
229  Respond(handle->commit_callback,
231 }
232 
233 bool GatewayUploader::ReadSessionTokenFile(const std::string& token_file_name,
234  std::string* token) {
235  if (!token) {
236  return false;
237  }
238 
239  FILE* token_file = std::fopen(token_file_name.c_str(), "r");
240  if (!token_file) {
242  "HTTP Uploader - Could not open session token "
243  "file. Aborting.");
244  return false;
245  }
246 
247  bool ret = GetLineFile(token_file, token);
248  fclose(token_file);
249 
250  return ret;
251 }
252 
253 bool GatewayUploader::ReadKey(const std::string& key_file, std::string* key_id,
254  std::string* secret) {
255  return gateway::ReadKeys(key_file, key_id, secret);
256 }
257 
258 int64_t GatewayUploader::DoGetObjectSize(const std::string &file_name) {
259  return -EOPNOTSUPP;
260 }
261 
262 void GatewayUploader::BumpErrors() const { atomic_inc32(&num_errors_); }
263 
264 } // namespace upload
static bool WillHandle(const SpoolerDefinition &spooler_definition)
#define LogCvmfs(source, mask,...)
Definition: logging.h:20
const unsigned kMaxContextSize
Definition: hash.h:76
virtual int64_t DoGetObjectSize(const std::string &file_name)
DriverType driver_type
the type of the spooler driver
#define PANIC(...)
Definition: exception.h:26
void Respond(const CallbackTN *callback, const UploaderResults &result) const
void Init(ContextPtr context)
Definition: hash.cc:164
virtual std::string name() const
virtual bool Peek(const std::string &path)
bool HasSuffix() const
Definition: hash.h:238
virtual bool PlaceBootstrappingShortcut(const shash::Any &object)
virtual bool Close()=0
assert((mem||(size==0))&&"Out Of Memory")
const SpoolerDefinition & spooler_definition() const
void CountUploadedChunks() const
ObjectPack::BucketHandle NewBucket()
const CallbackTN * commit_callback
bool GetLineFile(FILE *f, std::string *line)
Definition: string.cc:379
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
Definition: hash.cc:190
virtual bool Open()=0
void Final(ContextPtr context, Any *any_digest)
Definition: hash.cc:221
const char kSuffixPartial
Definition: hash.h:56
virtual void StreamedUpload(UploadStreamHandle *handle, UploadBuffer buffer, const CallbackTN *callback)
const char kSuffixCatalog
Definition: hash.h:53
bool CommitBucket(const ObjectPack::BucketContentType type, const shash::Any &id, const ObjectPack::BucketHandle handle, const std::string &name="", const bool force_dispatch=false)
virtual bool FinalizeSession(bool commit, const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)
SessionContext * session_context_
virtual unsigned int GetNumberOfErrors() const
bool ReadKeys(const std::string &key_file_name, std::string *key_id, std::string *secret)
Definition: gateway_util.cc:37
void CountUploadedBytes(int64_t bytes_written) const
virtual void FinalizeStreamedUpload(UploadStreamHandle *handle, const shash::Any &content_hash)
GatewayStreamHandle(const CallbackTN *commit_callback, ObjectPack::BucketHandle bkt)
bool Finalize(bool commit, const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)
static bool ParseSpoolerDefinition(const SpoolerDefinition &spooler_definition, Config *config)
virtual bool ReadSessionTokenFile(const std::string &token_file_name, std::string *token)
uint64_t size
Definition: pack.h:101
virtual void WaitForUpload() const
virtual std::string GetPath() const =0
AbstractUploader::CallbackTN CallbackTN
void CountUploadedCatalogBytes(int64_t bytes_written) const
virtual void DoRemoveAsync(const std::string &file_to_delete)
virtual void DoUpload(const std::string &remote_path, IngestionSource *source, const CallbackTN *callback)
virtual bool Mkdir(const std::string &path)
ObjectPack::BucketHandle bucket
void CountUploadedCatalogs() const
virtual ssize_t Read(void *buffer, size_t nbyte)=0
virtual bool ReadKey(const std::string &key_file, std::string *key_id, std::string *secret)
static void AddToBucket(const void *buf, const uint64_t size, const BucketHandle handle)
Definition: pack.cc:97
Suffix suffix
Definition: hash.h:125
bool Initialize(const std::string &api_url, const std::string &session_token, const std::string &key_id, const std::string &secret, uint64_t max_pack_size=ObjectPack::kDefaultLimit, uint64_t max_queue_size=10)
GatewayUploader(const SpoolerDefinition &spooler_definition)
virtual UploadStreamHandle * InitStreamedUpload(const CallbackTN *callback)
virtual bool Initialize()