GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/session_context.cc
Date: 2024-04-21 02:33:16
Exec Total Coverage
Lines: 140 222 63.1%
Branches: 80 298 26.8%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "session_context.h"
6
7 #include <algorithm>
8 #include <limits>
9
10 #include "curl/curl.h"
11 #include "cvmfs_config.h"
12 #include "gateway_util.h"
13 #include "json_document.h"
14 #include "json_document_write.h"
15 #include "swissknife_lease_curl.h"
16 #include "util/exception.h"
17 #include "util/pointer.h"
18 #include "util/string.h"
19
20 namespace {
21 // Maximum number of jobs during a session. No limit, for practical
22 // purposes. Note that we use uint32_t so that the Tube code works
23 // correctly with this limit on 32bit systems.
24 const uint32_t kMaxNumJobs = std::numeric_limits<uint32_t>::max();
25 }
26
27 namespace upload {
28
29 6 size_t SendCB(void* ptr, size_t size, size_t nmemb, void* userp) {
30 6 CurlSendPayload* payload = static_cast<CurlSendPayload*>(userp);
31
32 6 size_t max_chunk_size = size * nmemb;
33
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (max_chunk_size < 1) {
34 return 0;
35 }
36
37 6 size_t current_chunk_size = 0;
38
2/2
✓ Branch 0 taken 8 times.
✓ Branch 1 taken 4 times.
12 while (current_chunk_size < max_chunk_size) {
39
2/2
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 7 times.
8 if (payload->index < payload->json_message->size()) {
40 // Can add a chunk from the JSON message
41 const size_t read_size =
42 2 std::min(max_chunk_size - current_chunk_size,
43 1 payload->json_message->size() - payload->index);
44 1 current_chunk_size += read_size;
45 1 std::memcpy(ptr, payload->json_message->data() + payload->index,
46 read_size);
47 1 payload->index += read_size;
48 } else {
49 // Can add a chunk from the payload
50 7 const size_t max_read_size = max_chunk_size - current_chunk_size;
51 7 const unsigned nbytes = payload->pack_serializer->ProduceNext(
52 max_read_size, static_cast<unsigned char*>(ptr) + current_chunk_size);
53 7 current_chunk_size += nbytes;
54
55
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 5 times.
7 if (!nbytes) {
56 2 break;
57 }
58 }
59 }
60
61 6 return current_chunk_size;
62 }
63
64 size_t RecvCB(void* buffer, size_t size, size_t nmemb, void* userp) {
65 std::string* my_buffer = static_cast<std::string*>(userp);
66
67 if (size * nmemb < 1) {
68 return 0;
69 }
70
71 *my_buffer = static_cast<char*>(buffer);
72
73 return my_buffer->size();
74 }
75
76 8 SessionContextBase::SessionContextBase()
77 8 : upload_results_(kMaxNumJobs),
78 8 api_url_(),
79 8 session_token_(),
80 8 key_id_(),
81 8 secret_(),
82 8 max_pack_size_(ObjectPack::kDefaultLimit),
83 8 active_handles_(),
84 8 current_pack_(NULL),
85 8 current_pack_mtx_(),
86 8 bytes_committed_(0),
87 8 bytes_dispatched_(0),
88 8 initialized_(false) {}
89
90 16 SessionContextBase::~SessionContextBase() {}
91
92 8 bool SessionContextBase::Initialize(const std::string& api_url,
93 const std::string& session_token,
94 const std::string& key_id,
95 const std::string& secret,
96 uint64_t max_pack_size,
97 uint64_t max_queue_size) {
98 8 bool ret = true;
99
100 // Initialize session context lock
101 pthread_mutexattr_t attr;
102
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
16 if (pthread_mutexattr_init(&attr) ||
103
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
16 pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) ||
104
3/6
✓ Branch 0 taken 8 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 8 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 8 times.
24 pthread_mutex_init(&current_pack_mtx_, &attr) ||
105 8 pthread_mutexattr_destroy(&attr)) {
106 LogCvmfs(kLogUploadGateway, kLogStderr,
107 "Could not initialize SessionContext lock.");
108 return false;
109 }
110
111 // Set upstream URL and session token
112
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8 api_url_ = api_url;
113
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8 session_token_ = session_token;
114
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8 key_id_ = key_id;
115
1/2
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
8 secret_ = secret;
116 8 max_pack_size_ = max_pack_size;
117
118 8 bytes_committed_ = 0u;
119 8 bytes_dispatched_ = 0u;
120
121
2/4
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 8 times.
8 assert(upload_results_.IsEmpty());
122
123 // Ensure that there are not open object packs
124
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8 times.
8 if (current_pack_) {
125 LogCvmfs(
126 kLogUploadGateway, kLogStderr,
127 "Could not initialize SessionContext - Existing open object packs.");
128 ret = false;
129 }
130
131
3/6
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 8 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 8 times.
✗ Branch 6 not taken.
8 ret = InitializeDerived(max_queue_size) && ret;
132
133 8 initialized_ = true;
134
135 8 return ret;
136 }
137
138 8 bool SessionContextBase::Finalize(bool commit, const std::string& old_root_hash,
139 const std::string& new_root_hash,
140 const RepositoryTag& tag) {
141
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 8 times.
8 assert(active_handles_.empty());
142
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8 times.
8 if (!initialized_) {
143 assert(!commit);
144 return true;
145 }
146
147 {
148 8 MutexLockGuard lock(current_pack_mtx_);
149
150
5/6
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 4 times.
✓ Branch 3 taken 4 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 4 times.
✓ Branch 6 taken 4 times.
8 if (current_pack_ && current_pack_->GetNoObjects() > 0) {
151
1/2
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
4 Dispatch();
152 4 current_pack_ = NULL;
153 }
154 8 }
155
156 8 bool results = true;
157
2/2
✓ Branch 1 taken 31 times.
✓ Branch 2 taken 8 times.
39 while (!upload_results_.IsEmpty()) {
158 31 Future<bool>* future = upload_results_.PopBack();
159
2/4
✓ Branch 1 taken 31 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 31 times.
✗ Branch 4 not taken.
31 results = future->Get() && results;
160
1/2
✓ Branch 0 taken 31 times.
✗ Branch 1 not taken.
31 delete future;
161 }
162
163
2/2
✓ Branch 0 taken 7 times.
✓ Branch 1 taken 1 times.
8 if (commit) {
164
3/6
✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 7 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 7 times.
7 if (old_root_hash.empty() || new_root_hash.empty()) {
165 return false;
166 }
167 7 bool commit_result = Commit(old_root_hash, new_root_hash, tag);
168
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7 times.
7 if (!commit_result) {
169 LogCvmfs(kLogUploadGateway, kLogStderr,
170 "SessionContext: could not commit session. Aborting.");
171 FinalizeDerived();
172 pthread_mutex_destroy(&current_pack_mtx_);
173 initialized_ = false;
174 return false;
175 }
176 }
177
178
2/4
✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 8 times.
✗ Branch 4 not taken.
8 results &= FinalizeDerived() && (bytes_committed_ == bytes_dispatched_);
179
180 8 pthread_mutex_destroy(&current_pack_mtx_);
181
182 8 initialized_ = false;
183
184 8 return results;
185 }
186
187 52 ObjectPack::BucketHandle SessionContextBase::NewBucket() {
188 52 MutexLockGuard lock(current_pack_mtx_);
189
2/2
✓ Branch 0 taken 16 times.
✓ Branch 1 taken 36 times.
52 if (!current_pack_) {
190
2/4
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 16 times.
✗ Branch 5 not taken.
16 current_pack_ = new ObjectPack(max_pack_size_);
191 }
192
1/2
✓ Branch 1 taken 52 times.
✗ Branch 2 not taken.
52 ObjectPack::BucketHandle hd = current_pack_->NewBucket();
193
1/2
✓ Branch 1 taken 52 times.
✗ Branch 2 not taken.
52 active_handles_.push_back(hd);
194 52 return hd;
195 52 }
196
197 68 bool SessionContextBase::CommitBucket(const ObjectPack::BucketContentType type,
198 const shash::Any& id,
199 const ObjectPack::BucketHandle handle,
200 const std::string& name,
201 const bool force_dispatch) {
202 68 MutexLockGuard lock(current_pack_mtx_);
203
204
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 68 times.
68 if (!current_pack_) {
205 LogCvmfs(kLogUploadGateway, kLogStderr,
206 "Error: Called SessionBaseContext::CommitBucket without an open "
207 "ObjectPack.");
208 return false;
209 }
210
211 68 uint64_t size0 = current_pack_->size();
212
1/2
✓ Branch 1 taken 68 times.
✗ Branch 2 not taken.
68 bool committed = current_pack_->CommitBucket(type, id, handle, name);
213
214
2/2
✓ Branch 0 taken 52 times.
✓ Branch 1 taken 16 times.
68 if (committed) { // Current pack is still not full
215
1/2
✓ Branch 3 taken 52 times.
✗ Branch 4 not taken.
104 active_handles_.erase(
216
1/2
✓ Branch 3 taken 52 times.
✗ Branch 4 not taken.
52 std::remove(active_handles_.begin(), active_handles_.end(), handle),
217 52 active_handles_.end());
218 52 uint64_t size1 = current_pack_->size();
219 52 bytes_committed_ += size1 - size0;
220
2/2
✓ Branch 0 taken 12 times.
✓ Branch 1 taken 40 times.
52 if (force_dispatch) {
221
1/2
✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
12 Dispatch();
222 12 current_pack_ = NULL;
223 }
224 } else { // Current pack is full and can be dispatched
225 16 uint64_t new_size = 0;
226
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 15 times.
16 if (handle->capacity > max_pack_size_) {
227 1 new_size = handle->capacity + 1;
228 } else {
229 15 new_size = max_pack_size_;
230 }
231
2/4
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 16 times.
✗ Branch 5 not taken.
16 ObjectPack* new_pack = new ObjectPack(new_size);
232
2/2
✓ Branch 1 taken 22 times.
✓ Branch 2 taken 16 times.
38 for (size_t i = 0u; i < active_handles_.size(); ++i) {
233
1/2
✓ Branch 2 taken 22 times.
✗ Branch 3 not taken.
22 current_pack_->TransferBucket(active_handles_[i], new_pack);
234 }
235
236
2/2
✓ Branch 1 taken 15 times.
✓ Branch 2 taken 1 times.
16 if (current_pack_->GetNoObjects() > 0) {
237
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 Dispatch();
238 }
239 16 current_pack_ = new_pack;
240
241
1/2
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
16 CommitBucket(type, id, handle, name, false);
242 }
243
244 68 return true;
245 68 }
246
247 31 void SessionContextBase::Dispatch() {
248 31 MutexLockGuard lock(current_pack_mtx_);
249
250
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 31 times.
31 if (!current_pack_) {
251 return;
252 }
253
254 31 bytes_dispatched_ += current_pack_->size();
255
2/4
✓ Branch 1 taken 31 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 31 times.
✗ Branch 5 not taken.
31 upload_results_.EnqueueFront(DispatchObjectPack(current_pack_));
256
1/2
✓ Branch 1 taken 31 times.
✗ Branch 2 not taken.
31 }
257
258 8 SessionContext::SessionContext()
259 : SessionContextBase(),
260 8 upload_jobs_(),
261
1/2
✓ Branch 2 taken 8 times.
✗ Branch 3 not taken.
8 worker_()
262 {
263 8 }
264
265 8 bool SessionContext::InitializeDerived(uint64_t max_queue_size) {
266 // Start worker thread
267
1/2
✓ Branch 2 taken 8 times.
✗ Branch 3 not taken.
8 upload_jobs_ = new Tube<UploadJob>(max_queue_size);
268
269 int retval =
270 8 pthread_create(&worker_, NULL, UploadLoop, reinterpret_cast<void*>(this));
271
272 8 return !retval;
273 }
274
275 8 bool SessionContext::FinalizeDerived() {
276 // Note: in FinalizedDerived, we know that the worker is running. The
277 // SessionContext is called only from GatewayUploader::FinalizeSession(),
278 // which in turn is from Spooler::FinalizeSession(). The Spooler ensures
279 // that GatewayUploader::Initialize() is called on construction.
280 //
281 // TODO(jblomer): Refactor SessionContext (and Uploader*) classes to
282 // use a factory method for construction.
283 //
284 8 upload_jobs_->EnqueueFront(&terminator_);
285 8 pthread_join(worker_, NULL);
286
287 8 return true;
288 }
289
290 bool SessionContext::Commit(const std::string& old_root_hash,
291 const std::string& new_root_hash,
292 const RepositoryTag& tag) {
293 JsonStringGenerator request_input;
294 request_input.Add("old_root_hash", old_root_hash);
295 request_input.Add("new_root_hash", new_root_hash);
296 request_input.Add("tag_name", tag.name());
297 // Channels are no longer supported: send 0 (i.e. kChannelTrunk) for
298 // backwards compatibility with existing gateways
299 //
300 request_input.Add("tag_channel", 0);
301 request_input.Add("tag_description", tag.description());
302 std::string request = request_input.GenerateString();
303 CurlBuffer buffer;
304 return MakeEndRequest("POST", key_id_, secret_, session_token_, api_url_,
305 request, &buffer);
306 }
307
308 31 Future<bool> *SessionContext::DispatchObjectPack(ObjectPack* pack) {
309 31 UploadJob *job = new UploadJob;
310 31 Future<bool> *result = new Future<bool>();
311 31 job->pack = pack;
312 31 job->result = result;
313 31 upload_jobs_->EnqueueFront(job);
314 31 return result;
315 }
316
317 bool SessionContext::DoUpload(const SessionContext::UploadJob* job) {
318 // Set up the object pack serializer
319 ObjectPackProducer serializer(job->pack);
320
321 shash::Any payload_digest(shash::kSha1);
322 serializer.GetDigest(&payload_digest);
323 const std::string json_msg =
324 "{\"session_token\" : \"" + session_token_ +
325 "\", \"payload_digest\" : \"" + payload_digest.ToString(false) +
326 "\", \"header_size\" : \"" + StringifyInt(serializer.GetHeaderSize()) +
327 "\", \"api_version\" : \"" + StringifyInt(gateway::APIVersion()) + "\"}";
328
329 // Compute HMAC
330 shash::Any hmac(shash::kSha1);
331 shash::HmacString(secret_, json_msg, &hmac);
332
333 CurlSendPayload payload;
334 payload.json_message = &json_msg;
335 payload.pack_serializer = &serializer;
336 payload.index = 0;
337
338 const size_t payload_size =
339 json_msg.size() + serializer.GetHeaderSize() + job->pack->size();
340
341 // Prepare the Curl POST request
342 CURL* h_curl = curl_easy_init();
343
344 if (!h_curl) {
345 return false;
346 }
347
348 // Set HTTP headers (Authorization and Message-Size)
349 std::string header_str = std::string("Authorization: ") + key_id_ + " " +
350 Base64(hmac.ToString(false));
351 struct curl_slist* auth_header = NULL;
352 auth_header = curl_slist_append(auth_header, header_str.c_str());
353 header_str = std::string("Message-Size: ") + StringifyInt(json_msg.size());
354 auth_header = curl_slist_append(auth_header, header_str.c_str());
355 curl_easy_setopt(h_curl, CURLOPT_HTTPHEADER, auth_header);
356
357 std::string reply;
358 curl_easy_setopt(h_curl, CURLOPT_NOPROGRESS, 1L);
359 curl_easy_setopt(h_curl, CURLOPT_USERAGENT, "cvmfs/" VERSION);
360 curl_easy_setopt(h_curl, CURLOPT_MAXREDIRS, 50L);
361 curl_easy_setopt(h_curl, CURLOPT_CUSTOMREQUEST, "POST");
362 curl_easy_setopt(h_curl, CURLOPT_URL, (api_url_ + "/payloads").c_str());
363 curl_easy_setopt(h_curl, CURLOPT_POSTFIELDS, NULL);
364 curl_easy_setopt(h_curl, CURLOPT_POSTFIELDSIZE_LARGE,
365 static_cast<curl_off_t>(payload_size));
366 curl_easy_setopt(h_curl, CURLOPT_READDATA, &payload);
367 curl_easy_setopt(h_curl, CURLOPT_READFUNCTION, SendCB);
368 curl_easy_setopt(h_curl, CURLOPT_WRITEFUNCTION, RecvCB);
369 curl_easy_setopt(h_curl, CURLOPT_WRITEDATA, &reply);
370
371 // Perform the Curl POST request
372 CURLcode ret = curl_easy_perform(h_curl);
373 if (ret) {
374 LogCvmfs(kLogUploadGateway, kLogStderr,
375 "SessionContext::DoUpload - curl_easy_perform failed: %d",
376 ret);
377 }
378
379 UniquePtr<JsonDocument> reply_json(JsonDocument::Create(reply));
380 const JSON *reply_status =
381 JsonDocument::SearchInObject(reply_json->root(), "status", JSON_STRING);
382 const bool ok = (reply_status != NULL &&
383 std::string(reply_status->string_value) == "ok");
384 if (!ok) {
385 LogCvmfs(kLogUploadGateway, kLogStderr,
386 "SessionContext::DoUpload - error reply: %s",
387 reply.c_str());
388 }
389
390 curl_easy_cleanup(h_curl);
391 h_curl = NULL;
392
393 return ok && !ret;
394 }
395
396 8 void* SessionContext::UploadLoop(void* data) {
397 8 SessionContext* ctx = reinterpret_cast<SessionContext*>(data);
398 UploadJob *job;
399
400 while (true) {
401 39 job = ctx->upload_jobs_->PopBack();
402
2/2
✓ Branch 0 taken 8 times.
✓ Branch 1 taken 31 times.
39 if (job == &terminator_)
403 8 return NULL;
404
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 31 times.
31 if (!ctx->DoUpload(job)) {
405 PANIC(kLogStderr,
406 "SessionContext: could not submit payload. Aborting.");
407 }
408 31 job->result->Set(true);
409
1/2
✓ Branch 0 taken 31 times.
✗ Branch 1 not taken.
31 delete job->pack;
410
1/2
✓ Branch 0 taken 31 times.
✗ Branch 1 not taken.
31 delete job;
411 }
412 }
413
414 SessionContext::UploadJob SessionContext::terminator_;
415
416 } // namespace upload
417