CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
session_context.cc
Go to the documentation of this file.
1 
5 #include "session_context.h"
6 
7 #include <algorithm>
8 #include <limits>
9 
10 #include "curl/curl.h"
11 #include "cvmfs_config.h"
12 #include "gateway_util.h"
13 #include "json_document.h"
14 #include "json_document_write.h"
15 #include "swissknife_lease_curl.h"
16 #include "util/exception.h"
17 #include "util/pointer.h"
18 #include "util/string.h"
19 
namespace {
// Upper bound on the number of jobs in a session; effectively unlimited.
// The constant is deliberately a uint32_t so that the Tube code works
// correctly with this limit on 32bit systems.
const uint32_t kMaxNumJobs = std::numeric_limits<uint32_t>::max();
}  // namespace
26 
27 namespace upload {
28 
29 size_t SendCB(void* ptr, size_t size, size_t nmemb, void* userp) {
30  CurlSendPayload* payload = static_cast<CurlSendPayload*>(userp);
31 
32  size_t max_chunk_size = size * nmemb;
33  if (max_chunk_size < 1) {
34  return 0;
35  }
36 
37  size_t current_chunk_size = 0;
38  while (current_chunk_size < max_chunk_size) {
39  if (payload->index < payload->json_message->size()) {
40  // Can add a chunk from the JSON message
41  const size_t read_size =
42  std::min(max_chunk_size - current_chunk_size,
43  payload->json_message->size() - payload->index);
44  current_chunk_size += read_size;
45  std::memcpy(ptr, payload->json_message->data() + payload->index,
46  read_size);
47  payload->index += read_size;
48  } else {
49  // Can add a chunk from the payload
50  const size_t max_read_size = max_chunk_size - current_chunk_size;
51  const unsigned nbytes = payload->pack_serializer->ProduceNext(
52  max_read_size, static_cast<unsigned char*>(ptr) + current_chunk_size);
53  current_chunk_size += nbytes;
54 
55  if (!nbytes) {
56  break;
57  }
58  }
59  }
60 
61  return current_chunk_size;
62 }
63 
// Write callback handed to libcurl (CURLOPT_WRITEFUNCTION). Appends the
// received chunk to the std::string registered via CURLOPT_WRITEDATA.
//
// `buffer` holds size * nmemb bytes and is NOT NUL-terminated, so the length
// must be passed explicitly. libcurl may invoke the callback several times
// for a single reply, hence the data is appended rather than assigned.
//
// Returns the number of bytes consumed. libcurl treats any value different
// from size * nmemb as a write error, so the full chunk size is returned.
size_t RecvCB(void* buffer, size_t size, size_t nmemb, void* userp) {
  std::string* my_buffer = static_cast<std::string*>(userp);

  const size_t nbytes = size * nmemb;
  if (nbytes < 1) {
    return 0;
  }

  my_buffer->append(static_cast<const char*>(buffer), nbytes);

  return nbytes;
}
75 
77  : upload_results_(kMaxNumJobs),
78  api_url_(),
79  session_token_(),
80  key_id_(),
81  secret_(),
82  max_pack_size_(ObjectPack::kDefaultLimit),
83  active_handles_(),
84  current_pack_(NULL),
85  current_pack_mtx_(),
86  bytes_committed_(0),
87  bytes_dispatched_(0),
88  initialized_(false) {}
89 
91 
92 bool SessionContextBase::Initialize(const std::string& api_url,
93  const std::string& session_token,
94  const std::string& key_id,
95  const std::string& secret,
96  uint64_t max_pack_size,
97  uint64_t max_queue_size) {
98  bool ret = true;
99 
100  // Initialize session context lock
101  pthread_mutexattr_t attr;
102  if (pthread_mutexattr_init(&attr) ||
103  pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) ||
104  pthread_mutex_init(&current_pack_mtx_, &attr) ||
105  pthread_mutexattr_destroy(&attr)) {
107  "Could not initialize SessionContext lock.");
108  return false;
109  }
110 
111  // Set upstream URL and session token
112  api_url_ = api_url;
113  session_token_ = session_token;
114  key_id_ = key_id;
115  secret_ = secret;
116  max_pack_size_ = max_pack_size;
117 
118  bytes_committed_ = 0u;
119  bytes_dispatched_ = 0u;
120 
122 
123  // Ensure that there are not open object packs
124  if (current_pack_) {
125  LogCvmfs(
127  "Could not initialize SessionContext - Existing open object packs.");
128  ret = false;
129  }
130 
131  ret = InitializeDerived(max_queue_size) && ret;
132 
133  initialized_ = true;
134 
135  return ret;
136 }
137 
138 bool SessionContextBase::Finalize(bool commit, const std::string& old_root_hash,
139  const std::string& new_root_hash,
140  const RepositoryTag& tag) {
141  assert(active_handles_.empty());
142  if (!initialized_) {
143  assert(!commit);
144  return true;
145  }
146 
147  {
149 
150  if (current_pack_ && current_pack_->GetNoObjects() > 0) {
151  Dispatch();
152  current_pack_ = NULL;
153  }
154  }
155 
156  bool results = true;
157  while (!upload_results_.IsEmpty()) {
159  results = future->Get() && results;
160  delete future;
161  }
162 
163  if (commit) {
164  if (old_root_hash.empty() || new_root_hash.empty()) {
165  return false;
166  }
167  bool commit_result = Commit(old_root_hash, new_root_hash, tag);
168  if (!commit_result) {
170  "SessionContext: could not commit session. Aborting.");
171  FinalizeDerived();
172  pthread_mutex_destroy(&current_pack_mtx_);
173  initialized_ = false;
174  return false;
175  }
176  }
177 
179 
180  pthread_mutex_destroy(&current_pack_mtx_);
181 
182  initialized_ = false;
183 
184  return results;
185 }
186 
189  if (!current_pack_) {
191  }
193  active_handles_.push_back(hd);
194  return hd;
195 }
196 
198  const shash::Any& id,
199  const ObjectPack::BucketHandle handle,
200  const std::string& name,
201  const bool force_dispatch) {
203 
204  if (!current_pack_) {
206  "Error: Called SessionBaseContext::CommitBucket without an open "
207  "ObjectPack.");
208  return false;
209  }
210 
211  uint64_t size0 = current_pack_->size();
212  bool committed = current_pack_->CommitBucket(type, id, handle, name);
213 
214  if (committed) { // Current pack is still not full
215  active_handles_.erase(
216  std::remove(active_handles_.begin(), active_handles_.end(), handle),
217  active_handles_.end());
218  uint64_t size1 = current_pack_->size();
219  bytes_committed_ += size1 - size0;
220  if (force_dispatch) {
221  Dispatch();
222  current_pack_ = NULL;
223  }
224  } else { // Current pack is full and can be dispatched
225  uint64_t new_size = 0;
226  if (handle->capacity > max_pack_size_) {
227  new_size = handle->capacity + 1;
228  } else {
229  new_size = max_pack_size_;
230  }
231  ObjectPack* new_pack = new ObjectPack(new_size);
232  for (size_t i = 0u; i < active_handles_.size(); ++i) {
234  }
235 
236  if (current_pack_->GetNoObjects() > 0) {
237  Dispatch();
238  }
239  current_pack_ = new_pack;
240 
241  CommitBucket(type, id, handle, name, false);
242  }
243 
244  return true;
245 }
246 
249 
250  if (!current_pack_) {
251  return;
252  }
253 
256 }
257 
259  : SessionContextBase(),
260  upload_jobs_(),
261  worker_()
262 {
263 }
264 
265 bool SessionContext::InitializeDerived(uint64_t max_queue_size) {
266  // Start worker thread
267  upload_jobs_ = new Tube<UploadJob>(max_queue_size);
268 
269  int retval =
270  pthread_create(&worker_, NULL, UploadLoop, reinterpret_cast<void*>(this));
271 
272  return !retval;
273 }
274 
276  // Note: in FinalizeDerived, we know that the worker is running. The
277  // SessionContext is called only from GatewayUploader::FinalizeSession(),
278  // which in turn is from Spooler::FinalizeSession(). The Spooler ensures
279  // that GatewayUploader::Initialize() is called on construction.
280  //
281  // TODO(jblomer): Refactor SessionContext (and Uploader*) classes to
282  // use a factory method for construction.
283  //
284  upload_jobs_->EnqueueFront(&terminator_);
285  pthread_join(worker_, NULL);
286 
287  return true;
288 }
289 
290 bool SessionContext::Commit(const std::string& old_root_hash,
291  const std::string& new_root_hash,
292  const RepositoryTag& tag) {
293  JsonStringGenerator request_input;
294  request_input.Add("old_root_hash", old_root_hash);
295  request_input.Add("new_root_hash", new_root_hash);
296  request_input.Add("tag_name", tag.name());
297  // Channels are no longer supported: send 0 (i.e. kChannelTrunk) for
298  // backwards compatibility with existing gateways
299  //
300  request_input.Add("tag_channel", 0);
301  request_input.Add("tag_description", tag.description());
302  std::string request = request_input.GenerateString();
303  CurlBuffer buffer;
305  request, &buffer);
306 }
307 
309  UploadJob *job = new UploadJob;
310  Future<bool> *result = new Future<bool>();
311  job->pack = pack;
312  job->result = result;
313  upload_jobs_->EnqueueFront(job);
314  return result;
315 }
316 
318  // Set up the object pack serializer
319  ObjectPackProducer serializer(job->pack);
320 
321  shash::Any payload_digest(shash::kSha1);
322  serializer.GetDigest(&payload_digest);
323  const std::string json_msg =
324  "{\"session_token\" : \"" + session_token_ +
325  "\", \"payload_digest\" : \"" + payload_digest.ToString(false) +
326  "\", \"header_size\" : \"" + StringifyInt(serializer.GetHeaderSize()) +
327  "\", \"api_version\" : \"" + StringifyInt(gateway::APIVersion()) + "\"}";
328 
329  // Compute HMAC
330  shash::Any hmac(shash::kSha1);
331  shash::HmacString(secret_, json_msg, &hmac);
332 
333  CurlSendPayload payload;
334  payload.json_message = &json_msg;
335  payload.pack_serializer = &serializer;
336  payload.index = 0;
337 
338  const size_t payload_size =
339  json_msg.size() + serializer.GetHeaderSize() + job->pack->size();
340 
341  // Prepare the Curl POST request
342  CURL* h_curl = curl_easy_init();
343 
344  if (!h_curl) {
345  return false;
346  }
347 
348  // Set HTTP headers (Authorization and Message-Size)
349  std::string header_str = std::string("Authorization: ") + key_id_ + " " +
350  Base64(hmac.ToString(false));
351  struct curl_slist* auth_header = NULL;
352  auth_header = curl_slist_append(auth_header, header_str.c_str());
353  header_str = std::string("Message-Size: ") + StringifyInt(json_msg.size());
354  auth_header = curl_slist_append(auth_header, header_str.c_str());
355  curl_easy_setopt(h_curl, CURLOPT_HTTPHEADER, auth_header);
356 
357  std::string reply;
358  curl_easy_setopt(h_curl, CURLOPT_NOPROGRESS, 1L);
359  curl_easy_setopt(h_curl, CURLOPT_USERAGENT, "cvmfs/" VERSION);
360  curl_easy_setopt(h_curl, CURLOPT_MAXREDIRS, 50L);
361  curl_easy_setopt(h_curl, CURLOPT_CUSTOMREQUEST, "POST");
362  curl_easy_setopt(h_curl, CURLOPT_URL, (api_url_ + "/payloads").c_str());
363  curl_easy_setopt(h_curl, CURLOPT_POSTFIELDS, NULL);
364  curl_easy_setopt(h_curl, CURLOPT_POSTFIELDSIZE_LARGE,
365  static_cast<curl_off_t>(payload_size));
366  curl_easy_setopt(h_curl, CURLOPT_READDATA, &payload);
367  curl_easy_setopt(h_curl, CURLOPT_READFUNCTION, SendCB);
368  curl_easy_setopt(h_curl, CURLOPT_WRITEFUNCTION, RecvCB);
369  curl_easy_setopt(h_curl, CURLOPT_WRITEDATA, &reply);
370 
371  // Perform the Curl POST request
372  CURLcode ret = curl_easy_perform(h_curl);
373  if (ret) {
375  "SessionContext::DoUpload - curl_easy_perform failed: %d",
376  ret);
377  }
378 
380  const JSON *reply_status =
381  JsonDocument::SearchInObject(reply_json->root(), "status", JSON_STRING);
382  const bool ok = (reply_status != NULL &&
383  std::string(reply_status->string_value) == "ok");
384  if (!ok) {
386  "SessionContext::DoUpload - error reply: %s",
387  reply.c_str());
388  }
389 
390  curl_easy_cleanup(h_curl);
391  h_curl = NULL;
392 
393  return ok && !ret;
394 }
395 
396 void* SessionContext::UploadLoop(void* data) {
397  SessionContext* ctx = reinterpret_cast<SessionContext*>(data);
398  UploadJob *job;
399 
400  while (true) {
401  job = ctx->upload_jobs_->PopBack();
402  if (job == &terminator_)
403  return NULL;
404  if (!ctx->DoUpload(job)) {
406  "SessionContext: could not submit payload. Aborting.");
407  }
408  job->result->Set(true);
409  delete job->pack;
410  delete job;
411  }
412 }
413 
415 
416 } // namespace upload
std::string name() const
static void * UploadLoop(void *data)
UniquePtr< Tube< UploadJob > > upload_jobs_
void Add(const std::string &key, const std::string &val)
struct cvmcache_context * ctx
int APIVersion()
Definition: gateway_util.cc:26
std::string description() const
BucketContentType
Definition: pack.h:53
bool MakeEndRequest(const std::string &method, const std::string &key_id, const std::string &secret, const std::string &session_token, const std::string &repo_service_url, const std::string &request_payload, CurlBuffer *reply)
static JSON * SearchInObject(const JSON *json_object, const std::string &name, const json_type type)
virtual Future< bool > * DispatchObjectPack(ObjectPack *pack)=0
#define PANIC(...)
Definition: exception.h:29
void TransferBucket(const BucketHandle handle, ObjectPack *other)
Definition: pack.cc:149
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:249
pthread_mutex_t current_pack_mtx_
virtual bool InitializeDerived(uint64_t max_queue_size)
assert((mem||(size==0))&&"Out Of Memory")
uint64_t size() const
Definition: pack.h:82
void Set(const T &object)
Definition: future.h:53
bool IsEmpty()
Definition: tube.h:165
virtual bool Commit(const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)=0
ObjectPack::BucketHandle NewBucket()
T & Get()
Definition: future.h:66
virtual Future< bool > * DispatchObjectPack(ObjectPack *pack)
virtual bool Commit(const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)
BucketHandle NewBucket()
Definition: pack.cc:102
static JsonDocument * Create(const std::string &text)
Link * EnqueueFront(ItemT *item)
Definition: tube.h:93
uint64_t capacity
Definition: pack.h:102
bool CommitBucket(const ObjectPack::BucketContentType type, const shash::Any &id, const ObjectPack::BucketHandle handle, const std::string &name="", const bool force_dispatch=false)
virtual bool DoUpload(const UploadJob *job)
const std::string * json_message
ObjectPackProducer * pack_serializer
virtual bool FinalizeDerived()=0
void GetDigest(shash::Any *hash)
Definition: pack.cc:176
string StringifyInt(const int64_t value)
Definition: string.cc:78
std::vector< ObjectPack::BucketHandle > active_handles_
bool Finalize(bool commit, const std::string &old_root_hash, const std::string &new_root_hash, const RepositoryTag &tag)
ItemT * PopBack()
Definition: tube.h:149
string Base64(const string &data)
Definition: string.cc:504
void HmacString(const std::string &key, const std::string &content, Any *any_digest)
Definition: hash.h:527
virtual bool InitializeDerived(uint64_t max_queue_size)=0
static UploadJob terminator_
Definition: mutex.h:42
size_t SendCB(void *ptr, size_t size, size_t nmemb, void *userp)
unsigned GetHeaderSize()
Definition: pack.h:189
bool CommitBucket(const BucketContentType type, const shash::Any &id, const BucketHandle handle, const std::string &name="")
Definition: pack.cc:113
Definition: tube.h:39
Tube< Future< bool > > upload_results_
size_t GetNoObjects() const
Definition: pack.h:86
bool Initialize(const std::string &api_url, const std::string &session_token, const std::string &key_id, const std::string &secret, uint64_t max_pack_size=ObjectPack::kDefaultLimit, uint64_t max_queue_size=10)
std::string GenerateString() const
virtual bool FinalizeDerived()
struct json_value JSON
Definition: helper_allow.cc:11
static void size_t size
Definition: smalloc.h:54
size_t RecvCB(void *buffer, size_t size, size_t nmemb, void *userp)
unsigned ProduceNext(const unsigned buf_size, unsigned char *buf)
Definition: pack.cc:216
const JSON * root() const
Definition: json_document.h:25
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528