GCC Code Coverage Report | |||||||||||||||||||||
|
|||||||||||||||||||||
Line | Branch | Exec | Source |
1 |
/** |
||
2 |
* This file is part of the CernVM File System. |
||
3 |
*/ |
||
4 |
|||
5 |
#include "session_context.h" |
||
6 |
|||
7 |
#include <algorithm> |
||
8 |
#include <limits> |
||
9 |
|||
10 |
#include "curl/curl.h" |
||
11 |
#include "cvmfs_config.h" |
||
12 |
#include "gateway_util.h" |
||
13 |
#include "json_document.h" |
||
14 |
#include "swissknife_lease_curl.h" |
||
15 |
#include "util/string.h" |
||
16 |
|||
17 |
namespace {
// Maximum number of jobs during a session. No limit, for practical
// purposes: the results queue is sized with this value.
const uint64_t kMaxNumJobs = std::numeric_limits<uint64_t>::max();
}  // namespace
||
22 |
|||
23 |
namespace upload { |
||
24 |
|||
25 |
6 |
/**
 * Curl read callback (CURLOPT_READFUNCTION): fills the upload buffer with
 * the JSON message first, followed by the serialized object pack payload.
 *
 * Fix: the JSON branch previously wrote to ptr without adding
 * current_chunk_size; that was only correct because the JSON branch can
 * currently run just once per callback, at offset 0. The offset is now
 * explicit, consistent with the serializer branch below.
 *
 * @param ptr    destination buffer provided by curl
 * @param size   size of one element
 * @param nmemb  number of elements (size * nmemb = buffer capacity)
 * @param userp  CurlSendPayload* holding the message, serializer and cursor
 * @return number of bytes written into ptr (0 ends the transfer)
 */
size_t SendCB(void* ptr, size_t size, size_t nmemb, void* userp) {
  CurlSendPayload* payload = static_cast<CurlSendPayload*>(userp);

  size_t max_chunk_size = size * nmemb;
  if (max_chunk_size < 1) {
    return 0;
  }

  size_t current_chunk_size = 0;
  while (current_chunk_size < max_chunk_size) {
    if (payload->index < payload->json_message->size()) {
      // Can add a chunk from the JSON message
      const size_t read_size =
          std::min(max_chunk_size - current_chunk_size,
                   payload->json_message->size() - payload->index);
      std::memcpy(static_cast<char*>(ptr) + current_chunk_size,
                  payload->json_message->data() + payload->index, read_size);
      current_chunk_size += read_size;
      payload->index += read_size;
    } else {
      // Can add a chunk from the payload
      const size_t max_read_size = max_chunk_size - current_chunk_size;
      const unsigned nbytes = payload->pack_serializer->ProduceNext(
          max_read_size, static_cast<unsigned char*>(ptr) + current_chunk_size);
      current_chunk_size += nbytes;

      if (!nbytes) {
        // Serializer exhausted; return the partial chunk.
        break;
      }
    }
  }

  return current_chunk_size;
}
||
59 |
|||
60 |
/**
 * Curl write callback (CURLOPT_WRITEFUNCTION): accumulates the HTTP reply
 * into the std::string passed through userp.
 *
 * Fixes two defects of the previous version: the reply was treated as a
 * NUL-terminated C string (curl buffers are not NUL-terminated, so this
 * read out of bounds or truncated the reply), and each invocation
 * overwrote the data received so far instead of appending, breaking
 * replies delivered in more than one chunk.
 *
 * @return number of bytes consumed; any value != size * nmemb makes curl
 *         abort the transfer
 */
size_t RecvCB(void* buffer, size_t size, size_t nmemb, void* userp) {
  std::string* my_buffer = static_cast<std::string*>(userp);

  const size_t num_bytes = size * nmemb;
  if (num_bytes < 1) {
    return 0;
  }

  my_buffer->append(static_cast<const char*>(buffer), num_bytes);

  return num_bytes;
}
||
71 |
|||
72 |
11 |
// Constructs an idle session context: empty queues, no open object pack,
// zeroed byte counters. Real setup happens in Initialize().
SessionContextBase::SessionContextBase()
    : upload_results_(kMaxNumJobs, kMaxNumJobs),
      api_url_(),
      session_token_(),
      key_id_(),
      secret_(),
      queue_was_flushed_(1, 1),
      max_pack_size_(ObjectPack::kDefaultLimit),
      active_handles_(),
      current_pack_(NULL),
      current_pack_mtx_(),
      objects_dispatched_(0),
      bytes_committed_(0),
      bytes_dispatched_(0) {}
|
86 |
|||
87 |
✗✗✓✗ |
11 |
// Trivial destructor; resources are released in Finalize().
SessionContextBase::~SessionContextBase() {}
88 |
|||
89 |
11 |
/**
 * Prepares the context for a new session: creates the (recursive) pack
 * mutex, stores the gateway URL, session token, credentials and the pack
 * size limit, resets the dispatch counters and drains the queues.
 *
 * @return false if the mutex could not be created or if
 *         InitializeDerived() fails or if an object pack is still open
 *         from a previous session; true otherwise
 */
bool SessionContextBase::Initialize(const std::string& api_url,
                                    const std::string& session_token,
                                    const std::string& key_id,
                                    const std::string& secret,
                                    uint64_t max_pack_size,
                                    uint64_t max_queue_size) {
  bool ret = true;

  // Initialize session context lock. The mutex is recursive: Dispatch()
  // locks current_pack_mtx_ while CommitBucket()/Finalize() may already
  // hold it.
  pthread_mutexattr_t attr;
  if (pthread_mutexattr_init(&attr) ||
      pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) ||
      pthread_mutex_init(&current_pack_mtx_, &attr) ||
      pthread_mutexattr_destroy(&attr)) {
    LogCvmfs(kLogUploadGateway, kLogStderr,
             "Could not initialize SessionContext lock.");
    return false;
  }

  // Set upstream URL and session token
  api_url_ = api_url;
  session_token_ = session_token;
  key_id_ = key_id;
  secret_ = secret;
  max_pack_size_ = max_pack_size;

  atomic_init64(&objects_dispatched_);
  bytes_committed_ = 0u;
  bytes_dispatched_ = 0u;

  // Ensure that the upload job and result queues are empty
  upload_results_.Drop();

  queue_was_flushed_.Drop();
  queue_was_flushed_.Enqueue(true);

  // Ensure that there are not open object packs
  if (current_pack_) {
    LogCvmfs(
        kLogUploadGateway, kLogStderr,
        "Could not initialize SessionContext - Existing open object packs.");
    ret = false;
  }

  // InitializeDerived() is evaluated first so it always runs, even when
  // ret is already false.
  ret = InitializeDerived(max_queue_size) && ret;

  return ret;
}
||
137 |
|||
138 |
11 |
/**
 * Finishes the session: dispatches a remaining open pack (if it holds any
 * objects), waits for every in-flight upload job, optionally commits the
 * transaction on the gateway, then tears down the worker
 * (FinalizeDerived) and destroys the pack mutex.
 *
 * @param commit         whether to send the end-of-session commit request
 * @param old_root_hash  must be non-empty when commit is true
 * @param new_root_hash  must be non-empty when commit is true
 * @param tag            repository tag recorded with the commit
 * @return true only if all uploads succeeded, the (optional) commit
 *         succeeded, FinalizeDerived() succeeded and every dispatched
 *         byte was accounted for
 */
bool SessionContextBase::Finalize(bool commit, const std::string& old_root_hash,
                                  const std::string& new_root_hash,
                                  const RepositoryTag& tag) {
  // Every bucket must have been committed before finalizing the session.
  assert(active_handles_.empty());
  {
    MutexLockGuard lock(current_pack_mtx_);

    // Flush the last, partially filled object pack, if any.
    if (current_pack_ && current_pack_->GetNoObjects() > 0) {
      Dispatch();
      current_pack_ = NULL;
    }
  }

  bool results = true;
  int64_t jobs_finished = 0;
  // Block until every submitted job has delivered its result.
  while (!upload_results_.IsEmpty() || (jobs_finished < NumJobsSubmitted())) {
    Future<bool>* future = upload_results_.Dequeue();
    results = future->Get() && results;
    delete future;
    jobs_finished++;
  }

  if (commit) {
    if (old_root_hash.empty() || new_root_hash.empty()) {
      return false;
    }
    bool commit_result = Commit(old_root_hash, new_root_hash, tag);
    if (!commit_result) {
      LogCvmfs(kLogUploadGateway, kLogStderr,
               "SessionContext: could not commit session. Aborting.");
      return false;
    }
  }

  // A committed/dispatched byte-count mismatch indicates lost data.
  results &= FinalizeDerived() && (bytes_committed_ == bytes_dispatched_);

  pthread_mutex_destroy(&current_pack_mtx_);
  return results;
}
||
177 |
|||
178 |
// Blocks until the upload worker signals (via queue_was_flushed_) that the
// pending results have been drained.
// NOTE(review): the IsEmpty() check and the Dequeue() are not atomic with
// respect to the worker; presumably callers tolerate this window --
// confirm against UploadLoop().
void SessionContextBase::WaitForUpload() {
  if (!upload_results_.IsEmpty()) {
    queue_was_flushed_.Dequeue();
  }
}
||
183 |
|||
184 |
52 |
// Hands out a new bucket from the current object pack, lazily creating the
// pack on first use. The handle is tracked in active_handles_ until
// CommitBucket() retires it.
ObjectPack::BucketHandle SessionContextBase::NewBucket() {
  MutexLockGuard lock(current_pack_mtx_);
  if (!current_pack_) {
    current_pack_ = new ObjectPack(max_pack_size_);
  }
  ObjectPack::BucketHandle hd = current_pack_->NewBucket();
  active_handles_.push_back(hd);
  return hd;
}
||
193 |
|||
194 |
68 |
/**
 * Commits a finished bucket into the current object pack.
 *
 * If the pack still has room, the bucket is committed, its handle retired
 * from active_handles_ and the committed bytes accounted. If the pack is
 * full, a replacement pack is created (sized up for an oversized bucket),
 * all still-active buckets are transferred to it, the full pack is
 * dispatched, and the commit is retried once on the fresh pack.
 *
 * @param force_dispatch  dispatch the pack right after committing
 * @return false only when no object pack is open; true otherwise.
 *         NOTE(review): the result of the recursive retry is discarded --
 *         presumably it always succeeds because the new pack was sized to
 *         fit the bucket; confirm.
 */
bool SessionContextBase::CommitBucket(const ObjectPack::BucketContentType type,
                                      const shash::Any& id,
                                      const ObjectPack::BucketHandle handle,
                                      const std::string& name,
                                      const bool force_dispatch) {
  MutexLockGuard lock(current_pack_mtx_);

  if (!current_pack_) {
    LogCvmfs(kLogUploadGateway, kLogStderr,
             "Error: Called SessionBaseContext::CommitBucket without an open "
             "ObjectPack.");
    return false;
  }

  uint64_t size0 = current_pack_->size();
  bool committed = current_pack_->CommitBucket(type, id, handle, name);

  if (committed) {  // Current pack is still not full
    // Retire the handle (erase-remove idiom) and account committed bytes.
    active_handles_.erase(
        std::remove(active_handles_.begin(), active_handles_.end(), handle),
        active_handles_.end());
    uint64_t size1 = current_pack_->size();
    bytes_committed_ += size1 - size0;
    if (force_dispatch) {
      Dispatch();
      current_pack_ = NULL;
    }
  } else {  // Current pack is full and can be dispatched
    // Size the replacement pack; an oversized bucket gets a pack big
    // enough to hold it.
    uint64_t new_size = 0;
    if (handle->capacity > max_pack_size_) {
      new_size = handle->capacity + 1;
    } else {
      new_size = max_pack_size_;
    }
    ObjectPack* new_pack = new ObjectPack(new_size);
    // Keep all open buckets usable by moving them into the new pack.
    for (size_t i = 0u; i < active_handles_.size(); ++i) {
      current_pack_->TransferBucket(active_handles_[i], new_pack);
    }

    if (current_pack_->GetNoObjects() > 0) {
      Dispatch();
    }
    current_pack_ = new_pack;

    // Retry once on the freshly created pack (recursion depth is one
    // because the new pack is guaranteed to fit the bucket).
    CommitBucket(type, id, handle, name, false);
  }

  return true;
}
||
243 |
|||
244 |
8967 |
// Number of object packs dispatched so far in this session (atomic read;
// callable from the worker thread).
int64_t SessionContextBase::NumJobsSubmitted() const {
  return atomic_read64(&objects_dispatched_);
}
||
247 |
|||
248 |
31 |
// Queues the current object pack for upload and records its size in the
// dispatch counters. Ownership of the pack passes to the upload job; the
// caller is expected to reset current_pack_ afterwards. No-op when no
// pack is open.
void SessionContextBase::Dispatch() {
  MutexLockGuard lock(current_pack_mtx_);

  if (!current_pack_) {
    return;
  }

  atomic_inc64(&objects_dispatched_);
  bytes_dispatched_ += current_pack_->size();
  upload_results_.Enqueue(DispatchObjectPack(current_pack_));
}
||
259 |
|||
260 |
11 |
// Constructs a session context without a worker thread or job queue; both
// are created in InitializeDerived().
SessionContext::SessionContext()
    : SessionContextBase(),
      upload_jobs_(),
      worker_terminate_(),
      worker_() {}
|
265 |
|||
266 |
11 |
// Creates the bounded upload-job queue and starts the worker thread.
// NOTE(review): upload_jobs_ is assigned with raw new and no prior
// delete; a previous queue, if any, would leak -- confirm ownership in
// the destructor.
// @return true if the worker thread was created successfully
bool SessionContext::InitializeDerived(uint64_t max_queue_size) {
  // Start worker thread
  atomic_init32(&worker_terminate_);

  upload_jobs_ = new FifoChannel<UploadJob*>(max_queue_size, max_queue_size);
  upload_jobs_->Drop();

  int retval =
      pthread_create(&worker_, NULL, UploadLoop, reinterpret_cast<void*>(this));

  return !retval;
}
||
278 |
|||
279 |
11 |
// Signals the worker thread to stop (picked up by ShouldTerminate()) and
// waits for it to exit.
bool SessionContext::FinalizeDerived() {
  atomic_write32(&worker_terminate_, 1);

  pthread_join(worker_, NULL);

  return true;
}
||
286 |
|||
287 |
/**
 * Sends the end-of-session (commit) request to the gateway, carrying the
 * old and new root hashes plus the tag information as a JSON body.
 *
 * @return the result of MakeEndRequest (true on success)
 */
bool SessionContext::Commit(const std::string& old_root_hash,
                            const std::string& new_root_hash,
                            const RepositoryTag& tag) {
  std::string request;
  JsonStringInput request_input;
  request_input.push_back(
      std::make_pair("old_root_hash", old_root_hash.c_str()));
  request_input.push_back(
      std::make_pair("new_root_hash", new_root_hash.c_str()));
  request_input.push_back(std::make_pair("tag_name",
                                         tag.name_.c_str()));
  request_input.push_back(std::make_pair("tag_channel",
                                         tag.channel_.c_str()));
  request_input.push_back(std::make_pair("tag_description",
                                         tag.description_.c_str()));
  ToJsonString(request_input, &request);
  CurlBuffer buffer;
  return MakeEndRequest("POST", key_id_, secret_, session_token_, api_url_,
                        request, &buffer);
}
||
307 |
|||
308 |
31 |
// Wraps the pack into an UploadJob, enqueues it for the worker thread and
// returns the future the worker will fulfill. Ownership of the pack moves
// to the job; ownership of the returned future moves to the caller
// (deleted in Finalize()).
Future<bool>* SessionContext::DispatchObjectPack(ObjectPack* pack) {
  UploadJob* job = new UploadJob;
  job->pack = pack;
  job->result = new Future<bool>();
  upload_jobs_->Enqueue(job);
  return job->result;
}
||
315 |
|||
316 |
/**
 * Uploads one object pack to the gateway's /payloads endpoint.
 *
 * Builds the JSON message describing the payload, signs it with
 * HMAC-SHA1 using the session secret, and streams the JSON message
 * followed by the serialized pack through a curl POST (SendCB supplies
 * the request body, RecvCB collects the reply).
 *
 * Fix: the curl header list (auth_header) was never released, leaking
 * memory on every upload; it is now freed with curl_slist_free_all()
 * after the transfer.
 *
 * @param job  the upload job whose pack is sent (not owned here)
 * @return true if curl succeeded and the gateway replied
 *         {"status":"ok"}
 */
bool SessionContext::DoUpload(const SessionContext::UploadJob* job) {
  // Set up the object pack serializer
  ObjectPackProducer serializer(job->pack);

  shash::Any payload_digest(shash::kSha1);
  serializer.GetDigest(&payload_digest);
  const std::string json_msg =
      "{\"session_token\" : \"" + session_token_ +
      "\", \"payload_digest\" : \"" + payload_digest.ToString(false) +
      "\", \"header_size\" : \"" + StringifyInt(serializer.GetHeaderSize()) +
      "\", \"api_version\" : \"" + StringifyInt(gateway::APIVersion()) + "\"}";

  // Compute HMAC
  shash::Any hmac(shash::kSha1);
  shash::HmacString(secret_, json_msg, &hmac);

  CurlSendPayload payload;
  payload.json_message = &json_msg;
  payload.pack_serializer = &serializer;
  payload.index = 0;

  const size_t payload_size =
      json_msg.size() + serializer.GetHeaderSize() + job->pack->size();

  // Prepare the Curl POST request
  CURL* h_curl = curl_easy_init();

  if (!h_curl) {
    return false;
  }

  // Set HTTP headers (Authorization and Message-Size)
  std::string header_str = std::string("Authorization: ") + key_id_ + " " +
                           Base64(hmac.ToString(false));
  struct curl_slist* auth_header = NULL;
  auth_header = curl_slist_append(auth_header, header_str.c_str());
  header_str = std::string("Message-Size: ") + StringifyInt(json_msg.size());
  auth_header = curl_slist_append(auth_header, header_str.c_str());
  curl_easy_setopt(h_curl, CURLOPT_HTTPHEADER, auth_header);

  std::string reply;
  curl_easy_setopt(h_curl, CURLOPT_NOPROGRESS, 1L);
  curl_easy_setopt(h_curl, CURLOPT_USERAGENT, "cvmfs/" VERSION);
  curl_easy_setopt(h_curl, CURLOPT_MAXREDIRS, 50L);
  curl_easy_setopt(h_curl, CURLOPT_CUSTOMREQUEST, "POST");
  curl_easy_setopt(h_curl, CURLOPT_URL, (api_url_ + "/payloads").c_str());
  // No in-memory POST body; the body is streamed through SendCB with an
  // explicit total size.
  curl_easy_setopt(h_curl, CURLOPT_POSTFIELDS, NULL);
  curl_easy_setopt(h_curl, CURLOPT_POSTFIELDSIZE_LARGE,
                   static_cast<curl_off_t>(payload_size));
  curl_easy_setopt(h_curl, CURLOPT_READDATA, &payload);
  curl_easy_setopt(h_curl, CURLOPT_READFUNCTION, SendCB);
  curl_easy_setopt(h_curl, CURLOPT_WRITEFUNCTION, RecvCB);
  curl_easy_setopt(h_curl, CURLOPT_WRITEDATA, &reply);

  // Perform the Curl POST request
  CURLcode ret = curl_easy_perform(h_curl);
  if (ret) {
    LogCvmfs(kLogUploadGateway, kLogStderr,
             "SessionContext::DoUpload - curl_easy_perform failed: %d",
             ret);
  }

  const bool ok = (reply == "{\"status\":\"ok\"}");
  if (!ok) {
    LogCvmfs(kLogUploadGateway, kLogStderr,
             "SessionContext::DoUpload - error reply: %s",
             reply.c_str());
  }

  // Release the header list (previously leaked) and the curl handle.
  curl_slist_free_all(auth_header);
  curl_easy_cleanup(h_curl);
  h_curl = NULL;

  return ok && !ret;
}
||
390 |
|||
391 |
11 |
/**
 * Worker thread entry point: uploads queued object packs until
 * termination is requested via FinalizeDerived().
 *
 * A failed upload aborts the whole process. When the queue is drained,
 * the loop signals queue_was_flushed_ for WaitForUpload() callers.
 * NOTE(review): between jobs this loop busy-waits (no blocking wait
 * between the terminate check and the flush signalling).
 *
 * @param data  the owning SessionContext, passed through pthread_create
 * @return always NULL
 */
void* SessionContext::UploadLoop(void* data) {
  SessionContext* ctx = reinterpret_cast<SessionContext*>(data);

  int64_t jobs_processed = 0;
  while (!ctx->ShouldTerminate()) {
    // Process every job submitted so far before signalling the flush.
    while (jobs_processed < ctx->NumJobsSubmitted()) {
      UploadJob* job = ctx->upload_jobs_->Dequeue();
      if (!ctx->DoUpload(job)) {
        LogCvmfs(kLogUploadGateway, kLogStderr,
                 "SessionContext: could not submit payload. Aborting.");
        abort();
      }
      // Fulfill the future handed out by DispatchObjectPack() and free
      // the pack the job owned.
      job->result->Set(true);
      delete job->pack;
      delete job;
      jobs_processed++;
    }
    // Wake up a potential WaitForUpload() caller.
    if (ctx->queue_was_flushed_.IsEmpty()) {
      ctx->queue_was_flushed_.Enqueue(true);
    }
  }

  return NULL;
}
||
415 |
|||
416 |
8936 |
// Atomic check of the termination flag set by FinalizeDerived(); polled
// by the worker thread in UploadLoop().
bool SessionContext::ShouldTerminate() {
  return atomic_read32(&worker_terminate_);
}
||
419 |
|||
420 |
✓✗✓✗ |
45 |
} // namespace upload |
Generated by: GCOVR (Version 4.1) |