CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
upload_gateway.cc
Go to the documentation of this file.
1 
5 #include "upload_gateway.h"
6 
7 #include <cassert>
8 #include <limits>
9 #include <vector>
10 
11 #include "gateway_util.h"
12 #include "util/exception.h"
13 #include "util/logging.h"
14 #include "util/string.h"
15 
16 namespace upload {
17 
20  : UploadStreamHandle(commit_callback), bucket(bkt) {}
21 
22 bool GatewayUploader::WillHandle(const SpoolerDefinition& spooler_definition) {
23  return spooler_definition.driver_type == SpoolerDefinition::Gateway;
24 }
25 
27  const SpoolerDefinition& spooler_definition,
28  GatewayUploader::Config* config) {
29  const std::string& config_string = spooler_definition.spooler_configuration;
30  if (!config) {
31  LogCvmfs(kLogUploadGateway, kLogStderr, "\"config\" argument is NULL");
32  return false;
33  }
34 
35  if (spooler_definition.session_token_file.empty()) {
37  "Failed to configure gateway uploader. "
38  "Missing session token file.\n");
39  return false;
40  }
41  config->session_token_file = spooler_definition.session_token_file;
42 
43  if (spooler_definition.key_file.empty()) {
45  "Failed to configure gateway uploader. "
46  "Missing API key file.\n");
47  return false;
48  }
49  config->key_file = spooler_definition.key_file;
50 
51  // Repo address, e.g. http://my.repo.address
52  config->api_url = config_string;
53 
54  return true;
55 }
56 
58  : AbstractUploader(spooler_definition),
59  config_(),
60  session_context_(new SessionContext()) {
61  assert(spooler_definition.IsValid() &&
62  spooler_definition.driver_type == SpoolerDefinition::Gateway);
63 
64  if (!ParseSpoolerDefinition(spooler_definition, &config_)) {
65  PANIC(kLogStderr, "Error in parsing the spooler definition");
66  }
67 
68  atomic_init32(&num_errors_);
69 }
70 
72  if (session_context_) {
73  delete session_context_;
74  }
75 }
76 
79  "cannot create repository storage area when using the gateway");
80  return false;
81 }
82 
85  return false;
86  }
87  std::string session_token;
89 
90  std::string key_id;
91  std::string secret;
92  if (!ReadKey(config_.key_file, &key_id, &secret)) {
93  return false;
94  }
95 
96  return session_context_->Initialize(config_.api_url, session_token, key_id,
97  secret);
98 }
99 
101  const std::string& old_root_hash,
102  const std::string& new_root_hash,
103  const RepositoryTag& tag) {
104  return session_context_->Finalize(commit, old_root_hash, new_root_hash, tag);
105 }
106 
109 }
110 
111 std::string GatewayUploader::name() const { return "HTTP"; }
112 
113 void GatewayUploader::DoRemoveAsync(const std::string& /*file_to_delete*/) {
114  atomic_inc32(&num_errors_);
115  Respond(NULL, UploaderResults());
116 }
117 
118 bool GatewayUploader::Peek(const std::string& /*path*/) { return false; }
119 
120 // TODO(jpriessn): implement Mkdir on gateway server-side
121 bool GatewayUploader::Mkdir(const std::string &path) {
122  return true;
123 }
124 
126  return false;
127 }
128 
130  return atomic_read32(&num_errors_);
131 }
132 
// Uploads the content of `source` for `remote_path` and reports the outcome
// through Respond().
//
// Visible flow: open the source, read it in chunks of buf.size() (1024) bytes,
// add every chunk to handle->bucket and feed it into a running content hash,
// close the source, finalize the hash into `content_hash`, then run the step
// whose failure is logged as "could not commit bucket".
//
// First argument passed to UploaderResults on each path:
//   1 - the source could not be opened
//   2 - the commit step failed
//   0 - success
// Both failure paths call BumpErrors() before responding.
//
// NOTE(review): this listing skips several original lines (the embedded
// numbering jumps 135->138, 140->142, 163->165 and 165->167). The declaration
// of `handle` and the opening parts of the log and commit calls are therefore
// not visible here - confirm against the complete file.
133 void GatewayUploader::DoUpload(const std::string& remote_path,
134  IngestionSource *source,
135  const CallbackTN* callback) {
138 
139  bool rvb = source->Open();
140  if (!rvb) {
142  "File upload - could not open local file.");
143  BumpErrors();
144  Respond(callback, UploaderResults(1, source->GetPath()));
145  return;
146  }
147 
// Hash context lives in a local buffer of shash::kMaxContextSize bytes and is
// set up for the hash algorithm taken from the spooler definition.
148  unsigned char hash_ctx[shash::kMaxContextSize];
149  shash::ContextPtr hash_ctx_ptr(spooler_definition().hash_algorithm, hash_ctx);
150  shash::Init(hash_ctx_ptr);
151  std::vector<char> buf(1024);
152  ssize_t read_bytes = 0;
// Read loop: continues only while a read fills the whole buffer; any shorter
// read (including 0 bytes) is processed and then ends the loop.
153  do {
154  read_bytes = source->Read(&buf[0], buf.size());
// This assert is the only check on a negative Read() result.
// NOTE(review): assert is compiled out when NDEBUG is defined; a negative
// value would then be forwarded as a size below - confirm this is acceptable.
155  assert(read_bytes >= 0);
156  ObjectPack::AddToBucket(&buf[0], read_bytes, handle->bucket);
157  shash::Update(reinterpret_cast<unsigned char *>(&buf[0]), read_bytes,
158  hash_ctx_ptr);
159  } while (static_cast<size_t>(read_bytes) == buf.size());
160  source->Close();
161  shash::Any content_hash(spooler_definition().hash_algorithm);
162  shash::Final(hash_ctx_ptr, &content_hash);
163 
165  handle->bucket, remote_path)) {
167  "File upload - could not commit bucket");
168  BumpErrors();
// NOTE(review): this path responds via handle->commit_callback while the
// other two paths use `callback` - presumably the same pointer; verify.
169  Respond(handle->commit_callback, UploaderResults(2, source->GetPath()));
170  return;
171  }
172 
173  Respond(callback, UploaderResults(0, source->GetPath()));
174 }
175 
177  const CallbackTN* callback) {
178  return new GatewayStreamHandle(callback, session_context_->NewBucket());
179 }
180 
182  UploadBuffer buffer,
183  const CallbackTN* callback) {
184  GatewayStreamHandle* hd = dynamic_cast<GatewayStreamHandle*>(handle);
185  if (!hd) {
187  "Streamed upload - incompatible upload handle");
188  BumpErrors();
190  return;
191  }
192 
193  ObjectPack::AddToBucket(buffer.data, buffer.size, hd->bucket);
194 
196 }
197 
199  const shash::Any& content_hash) {
200  GatewayStreamHandle* hd = dynamic_cast<GatewayStreamHandle*>(handle);
201  if (!hd) {
203  "Finalize streamed upload - incompatible upload handle");
204  BumpErrors();
205  Respond(handle->commit_callback,
207  return;
208  }
209 
210  // hd->remote_path is ignored when empty
211  if (!session_context_->CommitBucket(ObjectPack::kCas, content_hash,
212  hd->bucket, hd->remote_path)) {
214  "Finalize streamed upload - could not commit bucket");
215  BumpErrors();
216  Respond(handle->commit_callback,
218  return;
219  }
220  if (!content_hash.HasSuffix()
221  || content_hash.suffix == shash::kSuffixPartial) {
224  } else if (content_hash.suffix == shash::kSuffixCatalog) {
227  }
228  Respond(handle->commit_callback,
230 }
231 
// Reads the session token from the file `token_file_name` into `*token`.
//
// `token` must not be NULL (asserted). `*token` is first set to the
// placeholder "INVALIDTOKEN". If the file cannot be opened, the function
// returns with the placeholder still in place; otherwise GetLineFile() is
// called with the opened file and `token`, and the file is closed afterwards.
//
// NOTE(review): the return value of GetLineFile() is not checked here.
// NOTE(review): the embedded numbering jumps 238->240, so the opening part of
// the log statement in the error branch is not visible in this listing.
232 void GatewayUploader::ReadSessionTokenFile(const std::string& token_file_name,
233  std::string* token) {
234  assert(token);
235  *token = "INVALIDTOKEN"; // overwritten if reading from file works
236 
237  FILE* token_file = std::fopen(token_file_name.c_str(), "r");
238  if (!token_file) {
240  "HTTP Uploader - Could not open session token file.");
241  return;
242  }
243 
244  GetLineFile(token_file, token);
245  fclose(token_file);
246 }
247 
248 bool GatewayUploader::ReadKey(const std::string& key_file, std::string* key_id,
249  std::string* secret) {
250  return gateway::ReadKeys(key_file, key_id, secret);
251 }
252 
253 int64_t GatewayUploader::DoGetObjectSize(const std::string &file_name) {
254  return -EOPNOTSUPP;
255 }
256 
257 void GatewayUploader::BumpErrors() const { atomic_inc32(&num_errors_); }
258 
259 } // namespace upload
static bool WillHandle(const SpoolerDefinition &spooler_definition)
const unsigned kMaxContextSize
Definition: hash.h:77
virtual int64_t DoGetObjectSize(const std::string &file_name)
DriverType driver_type
the type of the spooler driver
#define PANIC(...)
Definition: exception.h:29
void Respond(const CallbackTN *callback, const UploaderResults &result) const
virtual std::string name() const
virtual bool Peek(const std::string &path)
bool HasSuffix() const
Definition: hash.h:239
virtual bool PlaceBootstrappingShortcut(const shash::Any &object)
virtual bool Close()=0
assert((mem||(size==0))&&"Out Of Memory")
const SpoolerDefinition & spooler_definition() const
void CountUploadedChunks() const
ObjectPack::BucketHandle NewBucket()
void Init(ContextPtr context)
Definition: hash.cc:164
const CallbackTN * commit_callback
bool GetLineFile(FILE *f, std::string *line)
Definition: string.cc:404
virtual bool Open()=0
const char kSuffixPartial
Definition: hash.h:57
virtual void StreamedUpload(UploadStreamHandle *handle, UploadBuffer buffer, const CallbackTN *callback)
const char kSuffixCatalog
Definition: hash.h:54
bool CommitBucket(const ObjectPack::BucketContentType type, const shash::Any &id, const ObjectPack::BucketHandle handle, const std::string &name="", const bool force_dispatch=false)
virtual bool FinalizeSession(bool commit, const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)
SessionContext * session_context_
virtual unsigned int GetNumberOfErrors() const
bool ReadKeys(const std::string &key_file_name, std::string *key_id, std::string *secret)
Definition: gateway_util.cc:37
void CountUploadedBytes(int64_t bytes_written) const
void Final(ContextPtr context, Any *any_digest)
Definition: hash.cc:221
virtual void FinalizeStreamedUpload(UploadStreamHandle *handle, const shash::Any &content_hash)
GatewayStreamHandle(const CallbackTN *commit_callback, ObjectPack::BucketHandle bkt)
bool Finalize(bool commit, const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)
static bool ParseSpoolerDefinition(const SpoolerDefinition &spooler_definition, Config *config)
uint64_t size
Definition: pack.h:101
virtual void WaitForUpload() const
virtual std::string GetPath() const =0
void Update(const unsigned char *buffer, const unsigned buffer_length, ContextPtr context)
Definition: hash.cc:190
AbstractUploader::CallbackTN CallbackTN
void CountUploadedCatalogBytes(int64_t bytes_written) const
virtual void DoRemoveAsync(const std::string &file_to_delete)
virtual void DoUpload(const std::string &remote_path, IngestionSource *source, const CallbackTN *callback)
virtual bool Mkdir(const std::string &path)
ObjectPack::BucketHandle bucket
void CountUploadedCatalogs() const
virtual ssize_t Read(void *buffer, size_t nbyte)=0
virtual bool ReadKey(const std::string &key_file, std::string *key_id, std::string *secret)
virtual void ReadSessionTokenFile(const std::string &token_file_name, std::string *token)
static void AddToBucket(const void *buf, const uint64_t size, const BucketHandle handle)
Definition: pack.cc:97
Suffix suffix
Definition: hash.h:126
bool Initialize(const std::string &api_url, const std::string &session_token, const std::string &key_id, const std::string &secret, uint64_t max_pack_size=ObjectPack::kDefaultLimit, uint64_t max_queue_size=10)
GatewayUploader(const SpoolerDefinition &spooler_definition)
virtual UploadStreamHandle * InitStreamedUpload(const CallbackTN *callback)
virtual bool Initialize()
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528