CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
session_context.cc
Go to the documentation of this file.
1 
5 #include "session_context.h"
6 
7 #include <algorithm>
8 #include <limits>
9 
10 #include "curl/curl.h"
11 #include "cvmfs_config.h"
12 #include "gateway_util.h"
13 #include "json_document.h"
14 #include "json_document_write.h"
15 #include "swissknife_lease_curl.h"
16 #include "util/exception.h"
17 #include "util/pointer.h"
18 #include "util/string.h"
19 
namespace {

// Upper bound on the number of jobs handled during one session. The value is
// the largest uint32_t, i.e. effectively unlimited. A 32-bit type is used on
// purpose so that the FifoChannel code keeps working correctly with this
// limit on 32-bit systems.
const uint32_t kMaxNumJobs = std::numeric_limits<uint32_t>::max();

}  // anonymous namespace
26 
27 namespace upload {
28 
29 size_t SendCB(void* ptr, size_t size, size_t nmemb, void* userp) {
30  CurlSendPayload* payload = static_cast<CurlSendPayload*>(userp);
31 
32  size_t max_chunk_size = size * nmemb;
33  if (max_chunk_size < 1) {
34  return 0;
35  }
36 
37  size_t current_chunk_size = 0;
38  while (current_chunk_size < max_chunk_size) {
39  if (payload->index < payload->json_message->size()) {
40  // Can add a chunk from the JSON message
41  const size_t read_size =
42  std::min(max_chunk_size - current_chunk_size,
43  payload->json_message->size() - payload->index);
44  current_chunk_size += read_size;
45  std::memcpy(ptr, payload->json_message->data() + payload->index,
46  read_size);
47  payload->index += read_size;
48  } else {
49  // Can add a chunk from the payload
50  const size_t max_read_size = max_chunk_size - current_chunk_size;
51  const unsigned nbytes = payload->pack_serializer->ProduceNext(
52  max_read_size, static_cast<unsigned char*>(ptr) + current_chunk_size);
53  current_chunk_size += nbytes;
54 
55  if (!nbytes) {
56  break;
57  }
58  }
59  }
60 
61  return current_chunk_size;
62 }
63 
/**
 * Write callback handed to libcurl (see CURLOPT_WRITEFUNCTION) that collects
 * the reply body in a std::string.
 *
 * libcurl may deliver the body in several pieces and the delivered data is
 * not NUL-terminated. Therefore exactly size * nmemb bytes are appended to
 * the target string on every call, and that same byte count is returned;
 * returning anything else makes curl treat the transfer as failed.
 *
 * @param buffer  data received by curl (not NUL-terminated)
 * @param size    size of one element of buffer
 * @param nmemb   number of elements; buffer holds size * nmemb bytes
 * @param userp   pointer to the std::string that accumulates the reply
 * @return number of bytes consumed, i.e. size * nmemb (0 for an empty chunk)
 */
size_t RecvCB(void* buffer, size_t size, size_t nmemb, void* userp) {
  std::string* my_buffer = static_cast<std::string*>(userp);

  const size_t nbytes = size * nmemb;
  if (nbytes < 1) {
    return 0;
  }

  my_buffer->append(static_cast<const char*>(buffer), nbytes);

  return nbytes;
}
75 
77  : upload_results_(kMaxNumJobs, kMaxNumJobs),
78  api_url_(),
79  session_token_(),
80  key_id_(),
81  secret_(),
82  max_pack_size_(ObjectPack::kDefaultLimit),
83  active_handles_(),
84  current_pack_(NULL),
85  current_pack_mtx_(),
86  objects_dispatched_(0),
87  bytes_committed_(0),
88  bytes_dispatched_(0),
89  initialized_(false) {}
90 
92 
// Prepares the session context for use: sets up the lock protecting the
// current object pack, stores the connection parameters, resets the
// counters and finally calls InitializeDerived(max_queue_size).
//
// Returns false right away if the lock cannot be set up. Otherwise returns
// true only if no object pack was open on entry and InitializeDerived()
// succeeded.
//
// NOTE(review): this listing omits some source lines inside the function
// (the first line of the two logging calls and the statement following the
// "queues are empty" comment) - consult the original file for them.
93 bool SessionContextBase::Initialize(const std::string& api_url,
94  const std::string& session_token,
95  const std::string& key_id,
96  const std::string& secret,
97  uint64_t max_pack_size,
98  uint64_t max_queue_size) {
99  bool ret = true;
100 
101  // Initialize session context lock
 // The mutex is created with the PTHREAD_MUTEX_RECURSIVE type; any failing
 // pthread call short-circuits the chain and leads to the early return.
102  pthread_mutexattr_t attr;
103  if (pthread_mutexattr_init(&attr) ||
104  pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) ||
105  pthread_mutex_init(&current_pack_mtx_, &attr) ||
106  pthread_mutexattr_destroy(&attr)) {
108  "Could not initialize SessionContext lock.");
109  return false;
110  }
111 
112  // Set upstream URL and session token
113  api_url_ = api_url;
114  session_token_ = session_token;
115  key_id_ = key_id;
116  secret_ = secret;
117  max_pack_size_ = max_pack_size;
118 
 // Reset the statistics counters
119  atomic_init64(&objects_dispatched_);
120  bytes_committed_ = 0u;
121  bytes_dispatched_ = 0u;
122 
123  // Ensure that the upload job and result queues are empty
125 
126  // Ensure that there are not open object packs
 // An open pack is recorded as a failure, but initialization continues.
127  if (current_pack_) {
128  LogCvmfs(
130  "Could not initialize SessionContext - Existing open object packs.");
131  ret = false;
132  }
133 
 // InitializeDerived() is always invoked, even if ret is already false
134  ret = InitializeDerived(max_queue_size) && ret;
135 
 // The context is flagged as initialized regardless of the value of ret
136  initialized_ = true;
137 
138  return ret;
139 }
140 
// Ends the session: dispatches a non-empty current object pack, collects the
// results of the submitted upload jobs and, if commit is true, calls
// Commit(old_root_hash, new_root_hash, tag).
//
// Returns true immediately if the context was never initialized. Returns
// false if commit is requested with an empty root hash or if Commit() fails.
// Otherwise returns the conjunction of all collected job results.
//
// NOTE(review): this listing omits some source lines inside the function
// (line 151 at the start of the inner scope, the line that obtains `future`
// in the loop, the first line of the logging call and line 183) - consult
// the original file for them.
141 bool SessionContextBase::Finalize(bool commit, const std::string& old_root_hash,
142  const std::string& new_root_hash,
143  const RepositoryTag& tag) {
 // All buckets handed out must have been committed before finalizing
144  assert(active_handles_.empty());
145  if (!initialized_) {
 // Committing requires an initialized context
146  assert(!commit);
147  return true;
148  }
149 
150  {
152 
 // Send off the pack that is still being filled, unless it is empty
153  if (current_pack_ && current_pack_->GetNoObjects() > 0) {
154  Dispatch();
155  current_pack_ = NULL;
156  }
157  }
158 
 // Gather job results until the result queue is empty and as many jobs
 // have finished as NumJobsSubmitted() reports
159  bool results = true;
160  int64_t jobs_finished = 0;
161  while (!upload_results_.IsEmpty() || (jobs_finished < NumJobsSubmitted())) {
163  results = future->Get() && results;
164  delete future;
165  jobs_finished++;
166  }
167 
168  if (commit) {
 // NOTE(review): this early return skips FinalizeDerived(), leaves the
 // mutex intact and keeps initialized_ set - confirm this is intended.
169  if (old_root_hash.empty() || new_root_hash.empty()) {
170  return false;
171  }
172  bool commit_result = Commit(old_root_hash, new_root_hash, tag);
173  if (!commit_result) {
175  "SessionContext: could not commit session. Aborting.");
 // Tear down on the failure path before reporting the error
176  FinalizeDerived();
177  pthread_mutex_destroy(&current_pack_mtx_);
178  initialized_ = false;
179  return false;
180  }
181  }
182 
184 
 // Regular teardown: release the lock and mark the context as unused
185  pthread_mutex_destroy(&current_pack_mtx_);
186 
187  initialized_ = false;
188 
189  return results;
190 }
191 
194  if (!current_pack_) {
196  }
198  active_handles_.push_back(hd);
199  return hd;
200 }
201 
203  const shash::Any& id,
204  const ObjectPack::BucketHandle handle,
205  const std::string& name,
206  const bool force_dispatch) {
208 
209  if (!current_pack_) {
211  "Error: Called SessionBaseContext::CommitBucket without an open "
212  "ObjectPack.");
213  return false;
214  }
215 
216  uint64_t size0 = current_pack_->size();
217  bool committed = current_pack_->CommitBucket(type, id, handle, name);
218 
219  if (committed) { // Current pack is still not full
220  active_handles_.erase(
221  std::remove(active_handles_.begin(), active_handles_.end(), handle),
222  active_handles_.end());
223  uint64_t size1 = current_pack_->size();
224  bytes_committed_ += size1 - size0;
225  if (force_dispatch) {
226  Dispatch();
227  current_pack_ = NULL;
228  }
229  } else { // Current pack is full and can be dispatched
230  uint64_t new_size = 0;
231  if (handle->capacity > max_pack_size_) {
232  new_size = handle->capacity + 1;
233  } else {
234  new_size = max_pack_size_;
235  }
236  ObjectPack* new_pack = new ObjectPack(new_size);
237  for (size_t i = 0u; i < active_handles_.size(); ++i) {
239  }
240 
241  if (current_pack_->GetNoObjects() > 0) {
242  Dispatch();
243  }
244  current_pack_ = new_pack;
245 
246  CommitBucket(type, id, handle, name, false);
247  }
248 
249  return true;
250 }
251 
253  return atomic_read64(&objects_dispatched_);
254 }
255 
258 
259  if (!current_pack_) {
260  return;
261  }
262 
263  atomic_inc64(&objects_dispatched_);
266 }
267 
269  : SessionContextBase(),
270  upload_jobs_(),
271  worker_()
272 {
273 }
274 
275 bool SessionContext::InitializeDerived(uint64_t max_queue_size) {
276  // Start worker thread
277  upload_jobs_ = new FifoChannel<UploadJob*>(max_queue_size, max_queue_size);
278  upload_jobs_->Drop();
279 
280  int retval =
281  pthread_create(&worker_, NULL, UploadLoop, reinterpret_cast<void*>(this));
282 
283  return !retval;
284 }
285 
287  // Note: in FinalizedDerived, we know that the worker is running. The
288  // SessionContext is called only from GatewayUploader::FinalizeSession(),
289  // which in turn is from Spooler::FinalizeSession(). The Spooler ensures
290  // that GatewayUploader::Initialize() is called on construction.
291  //
292  // TODO(jblomer): Refactor SessionContext (and Uploader*) classes to
293  // use a factory method for construction.
294  //
295  upload_jobs_->Enqueue(&terminator_);
296  pthread_join(worker_, NULL);
297 
298  return true;
299 }
300 
// Builds the JSON request describing the commit (old and new root hash, tag
// name, tag channel and tag description) and passes it, together with a
// reply buffer, to the request call at the end of the function.
//
// NOTE(review): the first line of the return statement (line 315) is not
// part of this listing; presumably it is a call to MakeEndRequest() whose
// result is returned - confirm against the original file.
301 bool SessionContext::Commit(const std::string& old_root_hash,
302  const std::string& new_root_hash,
303  const RepositoryTag& tag) {
304  JsonStringGenerator request_input;
305  request_input.Add("old_root_hash", old_root_hash);
306  request_input.Add("new_root_hash", new_root_hash);
307  request_input.Add("tag_name", tag.name());
308  // Channels are no longer supported: send 0 (i.e. kChannelTrunk) for
309  // backwards compatibility with existing gateways
310  //
311  request_input.Add("tag_channel", 0);
312  request_input.Add("tag_description", tag.description());
 // Serialize the collected fields into the request body
313  std::string request = request_input.GenerateString();
 // Receives the reply of the request
314  CurlBuffer buffer;
316  request, &buffer);
317 }
318 
320  UploadJob* job = new UploadJob;
321  job->pack = pack;
322  job->result = new Future<bool>();
323  upload_jobs_->Enqueue(job);
324  return job->result;
325 }
326 
328  // Set up the object pack serializer
329  ObjectPackProducer serializer(job->pack);
330 
331  shash::Any payload_digest(shash::kSha1);
332  serializer.GetDigest(&payload_digest);
333  const std::string json_msg =
334  "{\"session_token\" : \"" + session_token_ +
335  "\", \"payload_digest\" : \"" + payload_digest.ToString(false) +
336  "\", \"header_size\" : \"" + StringifyInt(serializer.GetHeaderSize()) +
337  "\", \"api_version\" : \"" + StringifyInt(gateway::APIVersion()) + "\"}";
338 
339  // Compute HMAC
340  shash::Any hmac(shash::kSha1);
341  shash::HmacString(secret_, json_msg, &hmac);
342 
343  CurlSendPayload payload;
344  payload.json_message = &json_msg;
345  payload.pack_serializer = &serializer;
346  payload.index = 0;
347 
348  const size_t payload_size =
349  json_msg.size() + serializer.GetHeaderSize() + job->pack->size();
350 
351  // Prepare the Curl POST request
352  CURL* h_curl = curl_easy_init();
353 
354  if (!h_curl) {
355  return false;
356  }
357 
358  // Set HTTP headers (Authorization and Message-Size)
359  std::string header_str = std::string("Authorization: ") + key_id_ + " " +
360  Base64(hmac.ToString(false));
361  struct curl_slist* auth_header = NULL;
362  auth_header = curl_slist_append(auth_header, header_str.c_str());
363  header_str = std::string("Message-Size: ") + StringifyInt(json_msg.size());
364  auth_header = curl_slist_append(auth_header, header_str.c_str());
365  curl_easy_setopt(h_curl, CURLOPT_HTTPHEADER, auth_header);
366 
367  std::string reply;
368  curl_easy_setopt(h_curl, CURLOPT_NOPROGRESS, 1L);
369  curl_easy_setopt(h_curl, CURLOPT_USERAGENT, "cvmfs/" VERSION);
370  curl_easy_setopt(h_curl, CURLOPT_MAXREDIRS, 50L);
371  curl_easy_setopt(h_curl, CURLOPT_CUSTOMREQUEST, "POST");
372  curl_easy_setopt(h_curl, CURLOPT_URL, (api_url_ + "/payloads").c_str());
373  curl_easy_setopt(h_curl, CURLOPT_POSTFIELDS, NULL);
374  curl_easy_setopt(h_curl, CURLOPT_POSTFIELDSIZE_LARGE,
375  static_cast<curl_off_t>(payload_size));
376  curl_easy_setopt(h_curl, CURLOPT_READDATA, &payload);
377  curl_easy_setopt(h_curl, CURLOPT_READFUNCTION, SendCB);
378  curl_easy_setopt(h_curl, CURLOPT_WRITEFUNCTION, RecvCB);
379  curl_easy_setopt(h_curl, CURLOPT_WRITEDATA, &reply);
380 
381  // Perform the Curl POST request
382  CURLcode ret = curl_easy_perform(h_curl);
383  if (ret) {
385  "SessionContext::DoUpload - curl_easy_perform failed: %d",
386  ret);
387  }
388 
390  const JSON *reply_status =
391  JsonDocument::SearchInObject(reply_json->root(), "status", JSON_STRING);
392  const bool ok = (reply_status != NULL &&
393  std::string(reply_status->string_value) == "ok");
394  if (!ok) {
396  "SessionContext::DoUpload - error reply: %s",
397  reply.c_str());
398  }
399 
400  curl_easy_cleanup(h_curl);
401  h_curl = NULL;
402 
403  return ok && !ret;
404 }
405 
// Entry point of the worker thread created in InitializeDerived(). `data` is
// the SessionContext whose job queue is served. Jobs are dequeued and
// uploaded one at a time until the terminator_ job is received, at which
// point the function returns NULL.
//
// NOTE(review): the first line of the call made when DoUpload() fails (line
// 415) is not part of this listing, so it cannot be told from here whether
// execution continues past that call - confirm against the original file.
406 void* SessionContext::UploadLoop(void* data) {
407  SessionContext* ctx = reinterpret_cast<SessionContext*>(data);
408  UploadJob *job;
409 
410  while (true) {
411  job = ctx->upload_jobs_->Dequeue();
 // The address of the static terminator_ job is the stop signal
412  if (job == &terminator_)
413  return NULL;
414  if (!ctx->DoUpload(job)) {
416  "SessionContext: could not submit payload. Aborting.");
417  }
 // Publish the job result, then release the pack and the job object.
 // job->result itself is not deleted here.
418  job->result->Set(true);
419  delete job->pack;
420  delete job;
421  }
422 }
423 
425 
426 } // namespace upload
#define LogCvmfs(source, mask,...)
Definition: logging.h:25
std::string name() const
bool IsEmpty() const
void Enqueue(const T &data)
static void * UploadLoop(void *data)
void Add(const std::string &key, const std::string &val)
struct cvmcache_context * ctx
int APIVersion()
Definition: gateway_util.cc:26
std::string description() const
BucketContentType
Definition: pack.h:53
bool MakeEndRequest(const std::string &method, const std::string &key_id, const std::string &secret, const std::string &session_token, const std::string &repo_service_url, const std::string &request_payload, CurlBuffer *reply)
static JSON * SearchInObject(const JSON *json_object, const std::string &name, const json_type type)
virtual Future< bool > * DispatchObjectPack(ObjectPack *pack)=0
#define PANIC(...)
Definition: exception.h:29
const T Dequeue()
void TransferBucket(const BucketHandle handle, ObjectPack *other)
Definition: pack.cc:149
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:249
pthread_mutex_t current_pack_mtx_
virtual bool InitializeDerived(uint64_t max_queue_size)
assert((mem||(size==0))&&"Out Of Memory")
uint64_t size() const
Definition: pack.h:82
void Set(const T &object)
virtual bool Commit(const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)=0
ObjectPack::BucketHandle NewBucket()
virtual Future< bool > * DispatchObjectPack(ObjectPack *pack)
virtual bool Commit(const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)
BucketHandle NewBucket()
Definition: pack.cc:102
static JsonDocument * Create(const std::string &text)
uint64_t capacity
Definition: pack.h:102
bool CommitBucket(const ObjectPack::BucketContentType type, const shash::Any &id, const ObjectPack::BucketHandle handle, const std::string &name="", const bool force_dispatch=false)
virtual bool DoUpload(const UploadJob *job)
UniquePtr< FifoChannel< UploadJob * > > upload_jobs_
const std::string * json_message
ObjectPackProducer * pack_serializer
virtual bool FinalizeDerived()=0
void GetDigest(shash::Any *hash)
Definition: pack.cc:176
string StringifyInt(const int64_t value)
Definition: string.cc:78
FifoChannel< Future< bool > * > upload_results_
std::vector< ObjectPack::BucketHandle > active_handles_
bool Finalize(bool commit, const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)
unsigned int Drop()
string Base64(const string &data)
Definition: string.cc:504
void HmacString(const std::string &key, const std::string &content, Any *any_digest)
Definition: hash.h:527
virtual bool InitializeDerived(uint64_t max_queue_size)=0
static UploadJob terminator_
Definition: mutex.h:42
size_t SendCB(void *ptr, size_t size, size_t nmemb, void *userp)
unsigned GetHeaderSize()
Definition: pack.h:189
bool CommitBucket(const BucketContentType type, const shash::Any &id, const BucketHandle handle, const std::string &name="")
Definition: pack.cc:113
size_t GetNoObjects() const
Definition: pack.h:86
bool Initialize(const std::string &api_url, const std::string &session_token, const std::string &key_id, const std::string &secret, uint64_t max_pack_size=ObjectPack::kDefaultLimit, uint64_t max_queue_size=10)
std::string GenerateString() const
virtual bool FinalizeDerived()
int64_t NumJobsSubmitted() const
struct json_value JSON
Definition: helper_allow.cc:11
static void size_t size
Definition: smalloc.h:54
size_t RecvCB(void *buffer, size_t size, size_t nmemb, void *userp)
unsigned ProduceNext(const unsigned buf_size, unsigned char *buf)
Definition: pack.cc:216
const JSON * root() const
Definition: json_document.h:25