GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/session_context.cc Lines: 131 205 63.9 %
Date: 2019-02-03 02:48:13 Branches: 61 106 57.5 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#include "session_context.h"
6
7
#include <algorithm>
8
#include <limits>
9
10
#include "curl/curl.h"
11
#include "cvmfs_config.h"
12
#include "gateway_util.h"
13
#include "json_document.h"
14
#include "swissknife_lease_curl.h"
15
#include "util/string.h"
16
17
namespace {

/// Capacity used for the upload result queue. It is set to the largest
/// representable uint64_t, i.e. the number of jobs per session is unbounded
/// for all practical purposes.
const uint64_t kMaxNumJobs = std::numeric_limits<uint64_t>::max();

}  // namespace
22
23
namespace upload {
24
25
6
size_t SendCB(void* ptr, size_t size, size_t nmemb, void* userp) {
26
6
  CurlSendPayload* payload = static_cast<CurlSendPayload*>(userp);
27
28
6
  size_t max_chunk_size = size * nmemb;
29
6
  if (max_chunk_size < 1) {
30
    return 0;
31
  }
32
33
6
  size_t current_chunk_size = 0;
34
18
  while (current_chunk_size < max_chunk_size) {
35
8
    if (payload->index < payload->json_message->size()) {
36
      // Can add a chunk from the JSON message
37
      const size_t read_size =
38
          std::min(max_chunk_size - current_chunk_size,
39
1
                   payload->json_message->size() - payload->index);
40
1
      current_chunk_size += read_size;
41
      std::memcpy(ptr, payload->json_message->data() + payload->index,
42
1
                  read_size);
43
1
      payload->index += read_size;
44
    } else {
45
      // Can add a chunk from the payload
46
7
      const size_t max_read_size = max_chunk_size - current_chunk_size;
47
      const unsigned nbytes = payload->pack_serializer->ProduceNext(
48
7
          max_read_size, static_cast<unsigned char*>(ptr) + current_chunk_size);
49
7
      current_chunk_size += nbytes;
50
51
7
      if (!nbytes) {
52
2
        break;
53
      }
54
    }
55
  }
56
57
6
  return current_chunk_size;
58
}
59
60
/**
 * Write callback that collects received data into a std::string.
 *
 * The incoming buffer is not assumed to be NUL-terminated: exactly
 * size * nmemb bytes are consumed. Data is appended, so a reply delivered in
 * several chunks is accumulated in full.
 *
 * @param buffer  received bytes
 * @param size    size of one element in buffer
 * @param nmemb   number of elements in buffer
 * @param userp   pointer to the std::string receiving the data
 * @return number of bytes consumed (size * nmemb)
 */
size_t RecvCB(void* buffer, size_t size, size_t nmemb, void* userp) {
  std::string* my_buffer = static_cast<std::string*>(userp);

  const size_t nbytes = size * nmemb;
  if (nbytes < 1) {
    return 0;
  }

  my_buffer->append(static_cast<const char*>(buffer), nbytes);

  // Report every byte as handled; any other value makes the transfer fail
  return nbytes;
}
71
72
11
/**
 * Builds an idle session context: empty connection settings, no open object
 * pack and zeroed counters. The real configuration happens in Initialize().
 */
SessionContextBase::SessionContextBase()
    // Result queue, sized by kMaxNumJobs
    : upload_results_(kMaxNumJobs, kMaxNumJobs),
      // Connection and credential settings, filled in by Initialize()
      api_url_(),
      session_token_(),
      key_id_(),
      secret_(),
      // Single-slot queue
      queue_was_flushed_(1, 1),
      // Object pack bookkeeping
      max_pack_size_(ObjectPack::kDefaultLimit),
      active_handles_(),
      current_pack_(NULL),
      current_pack_mtx_(),
      // Statistics
      objects_dispatched_(0),
      bytes_committed_(0),
      bytes_dispatched_(0)
{
}
86
87

11
// Empty destructor body: no teardown is performed here.
SessionContextBase::~SessionContextBase() {}
88
89
11
bool SessionContextBase::Initialize(const std::string& api_url,
90
                                    const std::string& session_token,
91
                                    const std::string& key_id,
92
                                    const std::string& secret,
93
                                    uint64_t max_pack_size,
94
                                    uint64_t max_queue_size) {
95
11
  bool ret = true;
96
97
  // Initialize session context lock
98
  pthread_mutexattr_t attr;
99


11
  if (pthread_mutexattr_init(&attr) ||
100
      pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) ||
101
      pthread_mutex_init(&current_pack_mtx_, &attr) ||
102
      pthread_mutexattr_destroy(&attr)) {
103
    LogCvmfs(kLogUploadGateway, kLogStderr,
104
             "Could not initialize SessionContext lock.");
105
    return false;
106
  }
107
108
  // Set upstream URL and session token
109
11
  api_url_ = api_url;
110
11
  session_token_ = session_token;
111
11
  key_id_ = key_id;
112
11
  secret_ = secret;
113
11
  max_pack_size_ = max_pack_size;
114
115
11
  atomic_init64(&objects_dispatched_);
116
11
  bytes_committed_ = 0u;
117
11
  bytes_dispatched_ = 0u;
118
119
  // Ensure that the upload job and result queues are empty
120
11
  upload_results_.Drop();
121
122
11
  queue_was_flushed_.Drop();
123
11
  queue_was_flushed_.Enqueue(true);
124
125
  // Ensure that there are not open object packs
126
11
  if (current_pack_) {
127
    LogCvmfs(
128
        kLogUploadGateway, kLogStderr,
129
        "Could not initialize SessionContext - Existing open object packs.");
130
    ret = false;
131
  }
132
133

11
  ret = InitializeDerived(max_queue_size) && ret;
134
135
11
  return ret;
136
}
137
138
11
bool SessionContextBase::Finalize(bool commit, const std::string& old_root_hash,
139
                                  const std::string& new_root_hash,
140
                                  const RepositoryTag& tag) {
141
11
  assert(active_handles_.empty());
142
  {
143
11
    MutexLockGuard lock(current_pack_mtx_);
144
145

11
    if (current_pack_ && current_pack_->GetNoObjects() > 0) {
146
4
      Dispatch();
147
4
      current_pack_ = NULL;
148
    }
149
  }
150
151
11
  bool results = true;
152
11
  int64_t jobs_finished = 0;
153

53
  while (!upload_results_.IsEmpty() || (jobs_finished < NumJobsSubmitted())) {
154
31
    Future<bool>* future = upload_results_.Dequeue();
155

31
    results = future->Get() && results;
156
31
    delete future;
157
31
    jobs_finished++;
158
  }
159
160
11
  if (commit) {
161

7
    if (old_root_hash.empty() || new_root_hash.empty()) {
162
      return false;
163
    }
164
7
    bool commit_result = Commit(old_root_hash, new_root_hash, tag);
165
7
    if (!commit_result) {
166
      LogCvmfs(kLogUploadGateway, kLogStderr,
167
               "SessionContext: could not commit session. Aborting.");
168
      return false;
169
    }
170
  }
171
172

11
  results &= FinalizeDerived() && (bytes_committed_ == bytes_dispatched_);
173
174
11
  pthread_mutex_destroy(&current_pack_mtx_);
175
11
  return results;
176
}
177
178
void SessionContextBase::WaitForUpload() {
179
  if (!upload_results_.IsEmpty()) {
180
    queue_was_flushed_.Dequeue();
181
  }
182
}
183
184
52
/**
 * Opens a new bucket in the current object pack, creating the pack first if
 * none is open, and records the bucket as active.
 *
 * @return handle of the newly created bucket
 */
ObjectPack::BucketHandle SessionContextBase::NewBucket() {
  MutexLockGuard lock(current_pack_mtx_);

  // Lazily open a pack with the configured size limit
  if (!current_pack_) {
    current_pack_ = new ObjectPack(max_pack_size_);
  }

  ObjectPack::BucketHandle bucket = current_pack_->NewBucket();
  active_handles_.push_back(bucket);
  return bucket;
}
193
194
68
/**
 * Commits a bucket into the current object pack.
 *
 * If the pack accepts the bucket, the handle is removed from the active list
 * and the committed byte counter grows by the pack's size increase. If the
 * pack rejects it, all active buckets are moved into a fresh pack, the old
 * pack is dispatched (when it holds objects) and the commit is retried on the
 * new pack.
 *
 * @param type            bucket content type
 * @param id              content hash of the bucket
 * @param handle          bucket to commit
 * @param name            name associated with the bucket
 * @param force_dispatch  dispatch the pack right after a successful commit
 * @return false if no object pack is open, true otherwise
 */
bool SessionContextBase::CommitBucket(const ObjectPack::BucketContentType type,
                                      const shash::Any& id,
                                      const ObjectPack::BucketHandle handle,
                                      const std::string& name,
                                      const bool force_dispatch) {
  MutexLockGuard lock(current_pack_mtx_);

  if (!current_pack_) {
    LogCvmfs(kLogUploadGateway, kLogStderr,
             "Error: Called SessionBaseContext::CommitBucket without an open "
             "ObjectPack.");
    return false;
  }

  const uint64_t size_before = current_pack_->size();

  if (current_pack_->CommitBucket(type, id, handle, name)) {
    // Current pack is still not full
    active_handles_.erase(
        std::remove(active_handles_.begin(), active_handles_.end(), handle),
        active_handles_.end());
    const uint64_t size_after = current_pack_->size();
    bytes_committed_ += size_after - size_before;
    if (force_dispatch) {
      Dispatch();
      current_pack_ = NULL;
    }
    return true;
  }

  // Current pack is full and can be dispatched. A bucket larger than the
  // regular limit gets a pack that is just big enough to hold it.
  const uint64_t new_size = (handle->capacity > max_pack_size_)
                                ? handle->capacity + 1
                                : max_pack_size_;
  ObjectPack* new_pack = new ObjectPack(new_size);
  for (size_t idx = 0u; idx < active_handles_.size(); ++idx) {
    current_pack_->TransferBucket(active_handles_[idx], new_pack);
  }

  if (current_pack_->GetNoObjects() > 0) {
    Dispatch();
  }
  current_pack_ = new_pack;

  // NOTE(review): the retry always passes force_dispatch = false and its
  // result is not inspected - confirm that this is intended.
  CommitBucket(type, id, handle, name, false);

  return true;
}
243
244
8967
int64_t SessionContextBase::NumJobsSubmitted() const {
245
8967
  return atomic_read64(&objects_dispatched_);
246
}
247
248
31
void SessionContextBase::Dispatch() {
249
31
  MutexLockGuard lock(current_pack_mtx_);
250
251
31
  if (!current_pack_) {
252
    return;
253
  }
254
255
31
  atomic_inc64(&objects_dispatched_);
256
31
  bytes_dispatched_ += current_pack_->size();
257
31
  upload_results_.Enqueue(DispatchObjectPack(current_pack_));
258
}
259
260
11
/**
 * Builds a session context without a job queue or running worker; both are
 * set up in InitializeDerived().
 */
SessionContext::SessionContext()
    : SessionContextBase(),
      upload_jobs_(),
      worker_terminate_(),
      worker_()
{
}
265
266
11
bool SessionContext::InitializeDerived(uint64_t max_queue_size) {
267
  // Start worker thread
268
11
  atomic_init32(&worker_terminate_);
269
270
11
  upload_jobs_ = new FifoChannel<UploadJob*>(max_queue_size, max_queue_size);
271
11
  upload_jobs_->Drop();
272
273
  int retval =
274
11
      pthread_create(&worker_, NULL, UploadLoop, reinterpret_cast<void*>(this));
275
276
11
  return !retval;
277
}
278
279
11
bool SessionContext::FinalizeDerived() {
280
11
  atomic_write32(&worker_terminate_, 1);
281
282
11
  pthread_join(worker_, NULL);
283
284
11
  return true;
285
}
286
287
bool SessionContext::Commit(const std::string& old_root_hash,
288
                            const std::string& new_root_hash,
289
                            const RepositoryTag& tag) {
290
  std::string request;
291
  JsonStringInput request_input;
292
  request_input.push_back(
293
      std::make_pair("old_root_hash", old_root_hash.c_str()));
294
  request_input.push_back(
295
      std::make_pair("new_root_hash", new_root_hash.c_str()));
296
  request_input.push_back(std::make_pair("tag_name",
297
                                         tag.name_.c_str()));
298
  request_input.push_back(std::make_pair("tag_channel",
299
                                         tag.channel_.c_str()));
300
  request_input.push_back(std::make_pair("tag_description",
301
                                         tag.description_.c_str()));
302
  ToJsonString(request_input, &request);
303
  CurlBuffer buffer;
304
  return MakeEndRequest("POST", key_id_, secret_, session_token_, api_url_,
305
                        request, &buffer);
306
}
307
308
31
/**
 * Wraps an object pack into an upload job and places it on the job queue.
 *
 * @param pack  object pack to upload
 * @return the future stored in the job
 */
Future<bool>* SessionContext::DispatchObjectPack(ObjectPack* pack) {
  Future<bool>* const result = new Future<bool>();

  UploadJob* job = new UploadJob;
  job->pack = pack;
  job->result = result;

  upload_jobs_->Enqueue(job);
  // Return the saved pointer so the job is not touched after enqueueing;
  // the worker may already have deleted it.
  return result;
}
315
316
/**
 * Uploads one object pack with a POST request to "<api_url_>/payloads".
 *
 * The request body is a JSON message followed by the serialized object pack;
 * it is streamed through SendCB. The Authorization header carries the key id
 * and the Base64-encoded HMAC of the JSON message, the Message-Size header
 * carries the length of the JSON message.
 *
 * @param job  upload job holding the object pack to send
 * @return true if the transfer succeeded and the reply equals
 *         {"status":"ok"}; false otherwise, including when no curl handle
 *         could be created
 */
bool SessionContext::DoUpload(const SessionContext::UploadJob* job) {
  // Set up the object pack serializer
  ObjectPackProducer serializer(job->pack);

  shash::Any payload_digest(shash::kSha1);
  serializer.GetDigest(&payload_digest);
  const std::string json_msg =
      "{\"session_token\" : \"" + session_token_ +
      "\", \"payload_digest\" : \"" + payload_digest.ToString(false) +
      "\", \"header_size\" : \"" + StringifyInt(serializer.GetHeaderSize()) +
      "\", \"api_version\" : \"" + StringifyInt(gateway::APIVersion()) + "\"}";

  // Compute HMAC
  shash::Any hmac(shash::kSha1);
  shash::HmacString(secret_, json_msg, &hmac);

  CurlSendPayload payload;
  payload.json_message = &json_msg;
  payload.pack_serializer = &serializer;
  payload.index = 0;

  const size_t payload_size =
      json_msg.size() + serializer.GetHeaderSize() + job->pack->size();

  // Prepare the Curl POST request
  CURL* h_curl = curl_easy_init();

  if (!h_curl) {
    return false;
  }

  // Set HTTP headers (Authorization and Message-Size)
  std::string header_str = std::string("Authorization: ") + key_id_ + " " +
                           Base64(hmac.ToString(false));
  struct curl_slist* auth_header = NULL;
  auth_header = curl_slist_append(auth_header, header_str.c_str());
  header_str = std::string("Message-Size: ") + StringifyInt(json_msg.size());
  auth_header = curl_slist_append(auth_header, header_str.c_str());
  curl_easy_setopt(h_curl, CURLOPT_HTTPHEADER, auth_header);

  std::string reply;
  curl_easy_setopt(h_curl, CURLOPT_NOPROGRESS, 1L);
  curl_easy_setopt(h_curl, CURLOPT_USERAGENT, "cvmfs/" VERSION);
  curl_easy_setopt(h_curl, CURLOPT_MAXREDIRS, 50L);
  curl_easy_setopt(h_curl, CURLOPT_CUSTOMREQUEST, "POST");
  curl_easy_setopt(h_curl, CURLOPT_URL, (api_url_ + "/payloads").c_str());
  curl_easy_setopt(h_curl, CURLOPT_POSTFIELDS, NULL);
  curl_easy_setopt(h_curl, CURLOPT_POSTFIELDSIZE_LARGE,
                   static_cast<curl_off_t>(payload_size));
  curl_easy_setopt(h_curl, CURLOPT_READDATA, &payload);
  curl_easy_setopt(h_curl, CURLOPT_READFUNCTION, SendCB);
  curl_easy_setopt(h_curl, CURLOPT_WRITEFUNCTION, RecvCB);
  curl_easy_setopt(h_curl, CURLOPT_WRITEDATA, &reply);

  // Perform the Curl POST request
  CURLcode ret = curl_easy_perform(h_curl);
  if (ret) {
    LogCvmfs(kLogUploadGateway, kLogStderr,
             "SessionContext::DoUpload - curl_easy_perform failed: %d",
             ret);
  }

  const bool ok = (reply == "{\"status\":\"ok\"}");
  if (!ok) {
    LogCvmfs(kLogUploadGateway, kLogStderr,
             "SessionContext::DoUpload - error reply: %s",
             reply.c_str());
  }

  curl_easy_cleanup(h_curl);
  h_curl = NULL;

  // The header list is owned by the caller and is not released by
  // curl_easy_cleanup(); free it once the handle is gone.
  curl_slist_free_all(auth_header);
  auth_header = NULL;

  return ok && !ret;
}
390
391
11
void* SessionContext::UploadLoop(void* data) {
392
11
  SessionContext* ctx = reinterpret_cast<SessionContext*>(data);
393
394
11
  int64_t jobs_processed = 0;
395
8947
  while (!ctx->ShouldTerminate()) {
396
17881
    while (jobs_processed < ctx->NumJobsSubmitted()) {
397
31
      UploadJob* job = ctx->upload_jobs_->Dequeue();
398
31
      if (!ctx->DoUpload(job)) {
399
        LogCvmfs(kLogUploadGateway, kLogStderr,
400
                 "SessionContext: could not submit payload. Aborting.");
401
        abort();
402
      }
403
31
      job->result->Set(true);
404
31
      delete job->pack;
405
31
      delete job;
406
31
      jobs_processed++;
407
    }
408
8925
    if (ctx->queue_was_flushed_.IsEmpty()) {
409
      ctx->queue_was_flushed_.Enqueue(true);
410
    }
411
  }
412
413
11
  return NULL;
414
}
415
416
8936
bool SessionContext::ShouldTerminate() {
417
8936
  return atomic_read32(&worker_terminate_);
418
}
419
420

45
}  // namespace upload