CernVM-FS  2.13.0
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
session_context.cc
Go to the documentation of this file.
1 
5 #include "session_context.h"
6 
7 #include <algorithm>
8 #include <limits>
9 
10 #include "curl/curl.h"
11 #include "gateway_util.h"
12 #include "json_document.h"
13 #include "json_document_write.h"
14 #include "swissknife_lease_curl.h"
15 #include "util/exception.h"
16 #include "util/pointer.h"
17 #include "util/string.h"
18 
19 namespace {
20 // Maximum number of jobs during a session. No limit, for practical
21 // purposes. Note that we use uint32_t so that the Tube code works
22 // correctly with this limit on 32bit systems.
23 const uint32_t kMaxNumJobs = std::numeric_limits<uint32_t>::max();
24 } // namespace
25 
26 namespace upload {
27 
28 size_t SendCB(void *ptr, size_t size, size_t nmemb, void *userp) {
29  CurlSendPayload *payload = static_cast<CurlSendPayload *>(userp);
30 
31  size_t max_chunk_size = size * nmemb;
32  if (max_chunk_size < 1) {
33  return 0;
34  }
35 
36  size_t current_chunk_size = 0;
37  while (current_chunk_size < max_chunk_size) {
38  if (payload->index < payload->json_message->size()) {
39  // Can add a chunk from the JSON message
40  const size_t read_size = std::min(
41  max_chunk_size - current_chunk_size,
42  payload->json_message->size() - payload->index);
43  current_chunk_size += read_size;
44  std::memcpy(ptr, payload->json_message->data() + payload->index,
45  read_size);
46  payload->index += read_size;
47  } else {
48  // Can add a chunk from the payload
49  const size_t max_read_size = max_chunk_size - current_chunk_size;
50  const unsigned nbytes = payload->pack_serializer->ProduceNext(
51  max_read_size,
52  static_cast<unsigned char *>(ptr) + current_chunk_size);
53  current_chunk_size += nbytes;
54 
55  if (!nbytes) {
56  break;
57  }
58  }
59  }
60 
61  return current_chunk_size;
62 }
63 
/**
 * libcurl write callback: accumulates the bytes of the server reply in a
 * std::string.
 *
 * libcurl does not NUL-terminate the data it delivers and may deliver a
 * single reply in several calls.  Therefore exactly size * nmemb bytes are
 * appended on every call, and that same count is returned; returning any
 * other value makes libcurl abort the transfer with a write error.
 *
 * @param buffer  data received by libcurl (not NUL-terminated)
 * @param size    size of one element
 * @param nmemb   number of elements; buffer holds size * nmemb bytes
 * @param userp   pointer to the std::string collecting the reply
 * @return number of bytes consumed, i.e. size * nmemb
 */
size_t RecvCB(void *buffer, size_t size, size_t nmemb, void *userp) {
  std::string *my_buffer = static_cast<std::string *>(userp);

  const size_t num_bytes = size * nmemb;
  if (num_bytes < 1) {
    return 0;
  }

  // Append with an explicit length: the data may contain NUL bytes and is
  // not terminated, so it must not be treated as a C string.
  my_buffer->append(static_cast<const char *>(buffer), num_bytes);

  return num_bytes;
}
75 
77  : upload_results_(kMaxNumJobs)
78  , api_url_()
79  , session_token_()
80  , key_id_()
81  , secret_()
82  , max_pack_size_(ObjectPack::kDefaultLimit)
83  , active_handles_()
84  , current_pack_(NULL)
85  , current_pack_mtx_()
86  , bytes_committed_(0)
87  , bytes_dispatched_(0)
88  , initialized_(false) { }
89 
91 
92 bool SessionContextBase::Initialize(const std::string &api_url,
93  const std::string &session_token,
94  const std::string &key_id,
95  const std::string &secret,
96  uint64_t max_pack_size,
97  uint64_t max_queue_size) {
98  bool ret = true;
99 
100  // Initialize session context lock
101  pthread_mutexattr_t attr;
102  if (pthread_mutexattr_init(&attr)
103  || pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE)
104  || pthread_mutex_init(&current_pack_mtx_, &attr)
105  || pthread_mutexattr_destroy(&attr)) {
107  "Could not initialize SessionContext lock.");
108  return false;
109  }
110 
111  // Set upstream URL and session token
112  api_url_ = api_url;
113  session_token_ = session_token;
114  key_id_ = key_id;
115  secret_ = secret;
116  max_pack_size_ = max_pack_size;
117 
118  bytes_committed_ = 0u;
119  bytes_dispatched_ = 0u;
120 
122 
123  // Ensure that there are not open object packs
124  if (current_pack_) {
125  LogCvmfs(
127  "Could not initialize SessionContext - Existing open object packs.");
128  ret = false;
129  }
130 
131  ret = InitializeDerived(max_queue_size) && ret;
132 
133  initialized_ = true;
134 
135  return ret;
136 }
137 
138 bool SessionContextBase::Finalize(bool commit, const std::string &old_root_hash,
139  const std::string &new_root_hash,
140  const RepositoryTag &tag) {
141  assert(active_handles_.empty());
142  if (!initialized_) {
143  assert(!commit);
144  return true;
145  }
146 
147  {
149 
150  if (current_pack_ && current_pack_->GetNoObjects() > 0) {
151  Dispatch();
152  current_pack_ = NULL;
153  }
154  }
155 
156  bool results = true;
157  while (!upload_results_.IsEmpty()) {
159  results = future->Get() && results;
160  delete future;
161  }
162 
163  if (commit) {
164  if (old_root_hash.empty() || new_root_hash.empty()) {
165  return false;
166  }
167  bool commit_result = Commit(old_root_hash, new_root_hash, tag);
168  if (!commit_result) {
170  "SessionContext: could not commit session. Aborting.");
171  FinalizeDerived();
172  pthread_mutex_destroy(&current_pack_mtx_);
173  initialized_ = false;
174  return false;
175  }
176  }
177 
179 
180  pthread_mutex_destroy(&current_pack_mtx_);
181 
182  initialized_ = false;
183 
184  return results;
185 }
186 
189  if (!current_pack_) {
191  }
193  active_handles_.push_back(hd);
194  return hd;
195 }
196 
198  const shash::Any &id,
199  const ObjectPack::BucketHandle handle,
200  const std::string &name,
201  const bool force_dispatch) {
203 
204  if (!current_pack_) {
206  "Error: Called SessionBaseContext::CommitBucket without an open "
207  "ObjectPack.");
208  return false;
209  }
210 
211  uint64_t size0 = current_pack_->size();
212  bool committed = current_pack_->CommitBucket(type, id, handle, name);
213 
214  if (committed) { // Current pack is still not full
215  active_handles_.erase(
216  std::remove(active_handles_.begin(), active_handles_.end(), handle),
217  active_handles_.end());
218  uint64_t size1 = current_pack_->size();
219  bytes_committed_ += size1 - size0;
220  if (force_dispatch) {
221  Dispatch();
222  current_pack_ = NULL;
223  }
224  } else { // Current pack is full and can be dispatched
225  uint64_t new_size = 0;
226  if (handle->capacity > max_pack_size_) {
227  new_size = handle->capacity + 1;
228  } else {
229  new_size = max_pack_size_;
230  }
231  ObjectPack *new_pack = new ObjectPack(new_size);
232  for (size_t i = 0u; i < active_handles_.size(); ++i) {
234  }
235 
236  if (current_pack_->GetNoObjects() > 0) {
237  Dispatch();
238  }
239  current_pack_ = new_pack;
240 
241  CommitBucket(type, id, handle, name, false);
242  }
243 
244  return true;
245 }
246 
249 
250  if (!current_pack_) {
251  return;
252  }
253 
256 }
257 
259  : SessionContextBase(), upload_jobs_(), worker_() { }
260 
261 bool SessionContext::InitializeDerived(uint64_t max_queue_size) {
262  // Start worker thread
263  upload_jobs_ = new Tube<UploadJob>(max_queue_size);
264 
265  int retval = pthread_create(&worker_, NULL, UploadLoop,
266  reinterpret_cast<void *>(this));
267 
268  return !retval;
269 }
270 
272  // Note: in FinalizeDerived, we know that the worker is running. The
273  // SessionContext is called only from GatewayUploader::FinalizeSession(),
274  // which in turn is from Spooler::FinalizeSession(). The Spooler ensures
275  // that GatewayUploader::Initialize() is called on construction.
276  //
277  // TODO(jblomer): Refactor SessionContext (and Uploader*) classes to
278  // use a factory method for construction.
279  //
280  upload_jobs_->EnqueueFront(&terminator_);
281  pthread_join(worker_, NULL);
282 
283  return true;
284 }
285 
286 bool SessionContext::Commit(const std::string &old_root_hash,
287  const std::string &new_root_hash,
288  const RepositoryTag &tag) {
289  JsonStringGenerator request_input;
290  request_input.Add("old_root_hash", old_root_hash);
291  request_input.Add("new_root_hash", new_root_hash);
292  request_input.Add("tag_name", tag.name());
293  // Channels are no longer supported: send 0 (i.e. kChannelTrunk) for
294  // backwards compatibility with existing gateways
295  //
296  request_input.Add("tag_channel", 0);
297  request_input.Add("tag_description", tag.description());
298  std::string request = request_input.GenerateString();
299  CurlBuffer buffer;
301  request, &buffer);
302 }
303 
305  UploadJob *job = new UploadJob;
306  Future<bool> *result = new Future<bool>();
307  job->pack = pack;
308  job->result = result;
309  upload_jobs_->EnqueueFront(job);
310  return result;
311 }
312 
314  // Set up the object pack serializer
315  ObjectPackProducer serializer(job->pack);
316 
317  shash::Any payload_digest(shash::kSha1);
318  serializer.GetDigest(&payload_digest);
319  const std::string json_msg = "{\"session_token\" : \"" + session_token_
320  + "\", \"payload_digest\" : \""
321  + payload_digest.ToString(false)
322  + "\", \"header_size\" : \""
323  + StringifyInt(serializer.GetHeaderSize())
324  + "\", \"api_version\" : \""
325  + StringifyInt(gateway::APIVersion()) + "\"}";
326 
327  // Compute HMAC
328  shash::Any hmac(shash::kSha1);
329  shash::HmacString(secret_, json_msg, &hmac);
330 
331  CurlSendPayload payload;
332  payload.json_message = &json_msg;
333  payload.pack_serializer = &serializer;
334  payload.index = 0;
335 
336  const size_t payload_size = json_msg.size() + serializer.GetHeaderSize()
337  + job->pack->size();
338 
339  // Prepare the Curl POST request
340  CURL *h_curl = curl_easy_init();
341 
342  if (!h_curl) {
343  return false;
344  }
345 
346  // Set HTTP headers (Authorization and Message-Size)
347  std::string header_str = std::string("Authorization: ") + key_id_ + " "
348  + Base64(hmac.ToString(false));
349  struct curl_slist *auth_header = NULL;
350  auth_header = curl_slist_append(auth_header, header_str.c_str());
351  header_str = std::string("Message-Size: ") + StringifyInt(json_msg.size());
352  auth_header = curl_slist_append(auth_header, header_str.c_str());
353  curl_easy_setopt(h_curl, CURLOPT_HTTPHEADER, auth_header);
354 
355  std::string reply;
356  curl_easy_setopt(h_curl, CURLOPT_NOPROGRESS, 1L);
357  curl_easy_setopt(h_curl, CURLOPT_USERAGENT, "cvmfs/" CVMFS_VERSION);
358  curl_easy_setopt(h_curl, CURLOPT_MAXREDIRS, 50L);
359  curl_easy_setopt(h_curl, CURLOPT_CUSTOMREQUEST, "POST");
360  curl_easy_setopt(h_curl, CURLOPT_URL, (api_url_ + "/payloads").c_str());
361  curl_easy_setopt(h_curl, CURLOPT_POSTFIELDS, NULL);
362  curl_easy_setopt(h_curl, CURLOPT_POSTFIELDSIZE_LARGE,
363  static_cast<curl_off_t>(payload_size));
364  curl_easy_setopt(h_curl, CURLOPT_READDATA, &payload);
365  curl_easy_setopt(h_curl, CURLOPT_READFUNCTION, SendCB);
366  curl_easy_setopt(h_curl, CURLOPT_WRITEFUNCTION, RecvCB);
367  curl_easy_setopt(h_curl, CURLOPT_WRITEDATA, &reply);
368 
369  // Perform the Curl POST request
370  CURLcode ret = curl_easy_perform(h_curl);
371  if (ret) {
373  "SessionContext::DoUpload - curl_easy_perform failed: %d", ret);
374  }
375 
377  const JSON *reply_status = JsonDocument::SearchInObject(
378  reply_json->root(), "status", JSON_STRING);
379  const bool ok = (reply_status != NULL
380  && std::string(reply_status->string_value) == "ok");
381  if (!ok) {
383  "SessionContext::DoUpload - error reply: %s", reply.c_str());
384  }
385 
386  curl_easy_cleanup(h_curl);
387  h_curl = NULL;
388 
389  return ok && !ret;
390 }
391 
392 void *SessionContext::UploadLoop(void *data) {
393  SessionContext *ctx = reinterpret_cast<SessionContext *>(data);
394  UploadJob *job;
395 
396  while (true) {
397  job = ctx->upload_jobs_->PopBack();
398  if (job == &terminator_)
399  return NULL;
400  if (!ctx->DoUpload(job)) {
401  PANIC(kLogStderr, "SessionContext: could not submit payload. Aborting.");
402  }
403  job->result->Set(true);
404  delete job->pack;
405  delete job;
406  }
407 }
408 
410 
411 } // namespace upload
std::string name() const
static void * UploadLoop(void *data)
UniquePtr< Tube< UploadJob > > upload_jobs_
void Add(const std::string &key, const std::string &val)
struct cvmcache_context * ctx
int APIVersion()
Definition: gateway_util.cc:26
std::string description() const
BucketContentType
Definition: pack.h:53
bool MakeEndRequest(const std::string &method, const std::string &key_id, const std::string &secret, const std::string &session_token, const std::string &repo_service_url, const std::string &request_payload, CurlBuffer *reply)
static JSON * SearchInObject(const JSON *json_object, const std::string &name, const json_type type)
virtual Future< bool > * DispatchObjectPack(ObjectPack *pack)=0
#define PANIC(...)
Definition: exception.h:29
void TransferBucket(const BucketHandle handle, ObjectPack *other)
Definition: pack.cc:154
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:241
pthread_mutex_t current_pack_mtx_
virtual bool InitializeDerived(uint64_t max_queue_size)
assert((mem||(size==0))&&"Out Of Memory")
uint64_t size() const
Definition: pack.h:86
void Set(const T &object)
Definition: future.h:53
bool IsEmpty()
Definition: tube.h:164
virtual bool Commit(const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)=0
ObjectPack::BucketHandle NewBucket()
T & Get()
Definition: future.h:66
virtual Future< bool > * DispatchObjectPack(ObjectPack *pack)
virtual bool Commit(const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)
BucketHandle NewBucket()
Definition: pack.cc:105
static JsonDocument * Create(const std::string &text)
Link * EnqueueFront(ItemT *item)
Definition: tube.h:92
uint64_t capacity
Definition: pack.h:106
bool CommitBucket(const ObjectPack::BucketContentType type, const shash::Any &id, const ObjectPack::BucketHandle handle, const std::string &name="", const bool force_dispatch=false)
virtual bool DoUpload(const UploadJob *job)
const std::string * json_message
ObjectPackProducer * pack_serializer
virtual bool FinalizeDerived()=0
void GetDigest(shash::Any *hash)
Definition: pack.cc:181
string StringifyInt(const int64_t value)
Definition: string.cc:77
std::vector< ObjectPack::BucketHandle > active_handles_
bool Finalize(bool commit, const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)
ItemT * PopBack()
Definition: tube.h:148
string Base64(const string &data)
Definition: string.cc:537
void HmacString(const std::string &key, const std::string &content, Any *any_digest)
Definition: hash.h:515
virtual bool InitializeDerived(uint64_t max_queue_size)=0
static UploadJob terminator_
Definition: mutex.h:42
size_t SendCB(void *ptr, size_t size, size_t nmemb, void *userp)
unsigned GetHeaderSize()
Definition: pack.h:193
bool CommitBucket(const BucketContentType type, const shash::Any &id, const BucketHandle handle, const std::string &name="")
Definition: pack.cc:116
Definition: tube.h:39
Tube< Future< bool > > upload_results_
size_t GetNoObjects() const
Definition: pack.h:90
bool Initialize(const std::string &api_url, const std::string &session_token, const std::string &key_id, const std::string &secret, uint64_t max_pack_size=ObjectPack::kDefaultLimit, uint64_t max_queue_size=10)
std::string GenerateString() const
virtual bool FinalizeDerived()
struct json_value JSON
Definition: helper_allow.cc:11
static void size_t size
Definition: smalloc.h:54
size_t RecvCB(void *buffer, size_t size, size_t nmemb, void *userp)
unsigned ProduceNext(const unsigned buf_size, unsigned char *buf)
Definition: pack.cc:221
const JSON * root() const
Definition: json_document.h:25
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545