CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
upload_gateway.cc
Go to the documentation of this file.
1 
5 #include "upload_gateway.h"
6 
7 #include <cassert>
8 #include <limits>
9 #include <vector>
10 
11 #include "gateway_util.h"
12 #include "util/exception.h"
13 #include "util/logging.h"
14 #include "util/string.h"
15 
16 namespace upload {
17 
20  : UploadStreamHandle(commit_callback), bucket(bkt) { }
21 
22 bool GatewayUploader::WillHandle(const SpoolerDefinition &spooler_definition) {
23  return spooler_definition.driver_type == SpoolerDefinition::Gateway;
24 }
25 
27  const SpoolerDefinition &spooler_definition,
28  GatewayUploader::Config *config) {
29  const std::string &config_string = spooler_definition.spooler_configuration;
30  if (!config) {
31  LogCvmfs(kLogUploadGateway, kLogStderr, "\"config\" argument is NULL");
32  return false;
33  }
34 
35  if (spooler_definition.session_token_file.empty()) {
37  "Failed to configure gateway uploader. "
38  "Missing session token file.\n");
39  return false;
40  }
41  config->session_token_file = spooler_definition.session_token_file;
42 
43  if (spooler_definition.key_file.empty()) {
45  "Failed to configure gateway uploader. "
46  "Missing API key file.\n");
47  return false;
48  }
49  config->key_file = spooler_definition.key_file;
50 
51  // Repo address, e.g. http://my.repo.address
52  config->api_url = config_string;
53 
54  return true;
55 }
56 
58  : AbstractUploader(spooler_definition)
59  , config_()
60  , session_context_(new SessionContext()) {
61  assert(spooler_definition.IsValid()
62  && spooler_definition.driver_type == SpoolerDefinition::Gateway);
63 
64  if (!ParseSpoolerDefinition(spooler_definition, &config_)) {
65  PANIC(kLogStderr, "Error in parsing the spooler definition");
66  }
67 
68  atomic_init32(&num_errors_);
69 }
70 
72  if (session_context_) {
73  delete session_context_;
74  }
75 }
76 
79  "cannot create repository storage area when using the gateway");
80  return false;
81 }
82 
85  return false;
86  }
87  std::string session_token;
89 
90  std::string key_id;
91  std::string secret;
92  if (!ReadKey(config_.key_file, &key_id, &secret)) {
93  return false;
94  }
95 
96  return session_context_->Initialize(config_.api_url, session_token, key_id,
97  secret);
98 }
99 
101  const std::string &old_root_hash,
102  const std::string &new_root_hash,
103  const RepositoryTag &tag) {
104  return session_context_->Finalize(commit, old_root_hash, new_root_hash, tag);
105 }
106 
109 }
110 
111 std::string GatewayUploader::name() const { return "HTTP"; }
112 
113 void GatewayUploader::DoRemoveAsync(const std::string & /*file_to_delete*/) {
114  atomic_inc32(&num_errors_);
115  Respond(NULL, UploaderResults());
116 }
117 
118 bool GatewayUploader::Peek(const std::string & /*path*/) { return false; }
119 
120 // TODO(jpriessn): implement Mkdir on gateway server-side
121 bool GatewayUploader::Mkdir(const std::string &path) { return true; }
122 
124  const shash::Any & /*object*/) {
125  return false;
126 }
127 
129  return atomic_read32(&num_errors_);
130 }
131 
132 void GatewayUploader::DoUpload(const std::string &remote_path,
134  const CallbackTN *callback) {
137 
138  bool rvb = source->Open();
139  if (!rvb) {
141  "File upload - could not open local file.");
142  BumpErrors();
143  Respond(callback, UploaderResults(1, source->GetPath()));
144  return;
145  }
146 
147  unsigned char hash_ctx[shash::kMaxContextSize];
148  shash::ContextPtr hash_ctx_ptr(spooler_definition().hash_algorithm, hash_ctx);
149  shash::Init(hash_ctx_ptr);
150  std::vector<char> buf(1024);
151  ssize_t read_bytes = 0;
152  do {
153  read_bytes = source->Read(&buf[0], buf.size());
154  assert(read_bytes >= 0);
155  ObjectPack::AddToBucket(&buf[0], read_bytes, handle->bucket);
156  shash::Update(reinterpret_cast<unsigned char *>(&buf[0]), read_bytes,
157  hash_ctx_ptr);
158  } while (static_cast<size_t>(read_bytes) == buf.size());
159  source->Close();
160  shash::Any content_hash(spooler_definition().hash_algorithm);
161  shash::Final(hash_ctx_ptr, &content_hash);
162 
164  handle->bucket, remote_path)) {
166  "File upload - could not commit bucket");
167  BumpErrors();
168  Respond(handle->commit_callback, UploaderResults(2, source->GetPath()));
169  return;
170  }
171 
172  Respond(callback, UploaderResults(0, source->GetPath()));
173 }
174 
176  const CallbackTN *callback) {
177  return new GatewayStreamHandle(callback, session_context_->NewBucket());
178 }
179 
181  UploadBuffer buffer,
182  const CallbackTN *callback) {
183  GatewayStreamHandle *hd = dynamic_cast<GatewayStreamHandle *>(handle);
184  if (!hd) {
186  "Streamed upload - incompatible upload handle");
187  BumpErrors();
189  return;
190  }
191 
192  ObjectPack::AddToBucket(buffer.data, buffer.size, hd->bucket);
193 
195 }
196 
198  const shash::Any &content_hash) {
199  GatewayStreamHandle *hd = dynamic_cast<GatewayStreamHandle *>(handle);
200  if (!hd) {
202  "Finalize streamed upload - incompatible upload handle");
203  BumpErrors();
204  Respond(handle->commit_callback,
206  return;
207  }
208 
209  // hd->remote_path is ignored when empty
210  if (!session_context_->CommitBucket(ObjectPack::kCas, content_hash,
211  hd->bucket, hd->remote_path)) {
213  "Finalize streamed upload - could not commit bucket");
214  BumpErrors();
215  Respond(handle->commit_callback,
217  return;
218  }
219  if (!content_hash.HasSuffix()
220  || content_hash.suffix == shash::kSuffixPartial) {
223  } else if (content_hash.suffix == shash::kSuffixCatalog) {
226  }
227  Respond(handle->commit_callback,
229 }
230 
231 void GatewayUploader::ReadSessionTokenFile(const std::string &token_file_name,
232  std::string *token) {
233  assert(token);
234  *token = "INVALIDTOKEN"; // overwritten if reading from file works
235 
236  FILE *token_file = std::fopen(token_file_name.c_str(), "r");
237  if (!token_file) {
239  "HTTP Uploader - Could not open session token file.");
240  return;
241  }
242 
243  GetLineFile(token_file, token);
244  fclose(token_file);
245 }
246 
247 bool GatewayUploader::ReadKey(const std::string &key_file, std::string *key_id,
248  std::string *secret) {
249  return gateway::ReadKeys(key_file, key_id, secret);
250 }
251 
252 int64_t GatewayUploader::DoGetObjectSize(const std::string &file_name) {
253  return -EOPNOTSUPP;
254 }
255 
256 void GatewayUploader::BumpErrors() const { atomic_inc32(&num_errors_); }
257 
258 } // namespace upload
static bool WillHandle(const SpoolerDefinition &spooler_definition)
const unsigned kMaxContextSize
Definition: hash.h:76
virtual int64_t DoGetObjectSize(const std::string &file_name)
DriverType driver_type
the type of the spooler driver
#define PANIC(...)
Definition: exception.h:29
CVMFS_EXPORT const LogSource source
Definition: exception.h:33
void Respond(const CallbackTN *callback, const UploaderResults &result) const
virtual std::string name() const
virtual bool Peek(const std::string &path)
bool HasSuffix() const
Definition: hash.h:231
virtual bool PlaceBootstrappingShortcut(const shash::Any &object)
virtual bool Close()=0
assert((mem||(size==0))&&"Out Of Memory")
const SpoolerDefinition & spooler_definition() const
void CountUploadedChunks() const
ObjectPack::BucketHandle NewBucket()
void Init(ContextPtr context)
Definition: hash.cc:166
const CallbackTN * commit_callback
bool GetLineFile(FILE *f, std::string *line)
Definition: string.cc:422
virtual bool Open()=0
const char kSuffixPartial
Definition: hash.h:57
virtual void StreamedUpload(UploadStreamHandle *handle, UploadBuffer buffer, const CallbackTN *callback)
const char kSuffixCatalog
Definition: hash.h:54
bool CommitBucket(const ObjectPack::BucketContentType type, const shash::Any &id, const ObjectPack::BucketHandle handle, const std::string &name="", const bool force_dispatch=false)
virtual bool FinalizeSession(bool commit, const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)
SessionContext * session_context_
virtual unsigned int GetNumberOfErrors() const
bool ReadKeys(const std::string &key_file_name, std::string *key_id, std::string *secret)
Definition: gateway_util.cc:37
void CountUploadedBytes(int64_t bytes_written) const
void Final(ContextPtr context, Any *any_digest)
Definition: hash.cc:223
virtual void FinalizeStreamedUpload(UploadStreamHandle *handle, const shash::Any &content_hash)
GatewayStreamHandle(const CallbackTN *commit_callback, ObjectPack::BucketHandle bkt)
bool Finalize(bool commit, const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)
static bool ParseSpoolerDefinition(const SpoolerDefinition &spooler_definition, Config *config)
uint64_t size
Definition: pack.h:105
virtual void WaitForUpload() const
virtual std::string GetPath() const =0
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
Definition: hash.cc:192
AbstractUploader::CallbackTN CallbackTN
void CountUploadedCatalogBytes(int64_t bytes_written) const
virtual void DoRemoveAsync(const std::string &file_to_delete)
virtual void DoUpload(const std::string &remote_path, IngestionSource *source, const CallbackTN *callback)
virtual bool Mkdir(const std::string &path)
ObjectPack::BucketHandle bucket
void CountUploadedCatalogs() const
virtual ssize_t Read(void *buffer, size_t nbyte)=0
virtual bool ReadKey(const std::string &key_file, std::string *key_id, std::string *secret)
virtual void ReadSessionTokenFile(const std::string &token_file_name, std::string *token)
static void AddToBucket(const void *buf, const uint64_t size, const BucketHandle handle)
Definition: pack.cc:100
Suffix suffix
Definition: hash.h:123
bool Initialize(const std::string &api_url, const std::string &session_token, const std::string &key_id, const std::string &secret, uint64_t max_pack_size=ObjectPack::kDefaultLimit, uint64_t max_queue_size=10)
GatewayUploader(const SpoolerDefinition &spooler_definition)
virtual UploadStreamHandle * InitStreamedUpload(const CallbackTN *callback)
virtual bool Initialize()
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545