GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/session_context.cc
Date: 2025-06-22 02:36:02
Exec Total Coverage
Lines: 139 224 62.1%
Branches: 80 300 26.7%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "session_context.h"
6
7 #include <algorithm>
8 #include <limits>
9
10 #include "curl/curl.h"
11 #include "gateway_util.h"
12 #include "json_document.h"
13 #include "json_document_write.h"
14 #include "swissknife_lease_curl.h"
15 #include "util/exception.h"
16 #include "util/pointer.h"
17 #include "util/string.h"
18
19 namespace {
20 // Maximum number of jobs during a session. No limit, for practical
21 // purposes. Note that we use uint32_t so that the Tube code works
22 // correctly with this limit on 32bit systems.
23 const uint32_t kMaxNumJobs = std::numeric_limits<uint32_t>::max();
24 } // namespace
25
26 namespace upload {
27
28 90 size_t SendCB(void *ptr, size_t size, size_t nmemb, void *userp) {
29 90 CurlSendPayload *payload = static_cast<CurlSendPayload *>(userp);
30
31 90 const size_t max_chunk_size = size * nmemb;
32
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 90 times.
90 if (max_chunk_size < 1) {
33 return 0;
34 }
35
36 90 size_t current_chunk_size = 0;
37
2/2
✓ Branch 0 taken 120 times.
✓ Branch 1 taken 60 times.
180 while (current_chunk_size < max_chunk_size) {
38
2/2
✓ Branch 1 taken 15 times.
✓ Branch 2 taken 105 times.
120 if (payload->index < payload->json_message->size()) {
39 // Can add a chunk from the JSON message
40 15 const size_t read_size = std::min(
41 30 max_chunk_size - current_chunk_size,
42 15 payload->json_message->size() - payload->index);
43 15 current_chunk_size += read_size;
44 15 std::memcpy(ptr, payload->json_message->data() + payload->index,
45 read_size);
46 15 payload->index += read_size;
47 } else {
48 // Can add a chunk from the payload
49 105 const size_t max_read_size = max_chunk_size - current_chunk_size;
50 105 const unsigned nbytes = payload->pack_serializer->ProduceNext(
51 max_read_size,
52 static_cast<unsigned char *>(ptr) + current_chunk_size);
53 105 current_chunk_size += nbytes;
54
55
2/2
✓ Branch 0 taken 30 times.
✓ Branch 1 taken 75 times.
105 if (!nbytes) {
56 30 break;
57 }
58 }
59 }
60
61 90 return current_chunk_size;
62 }
63
64 size_t RecvCB(void *buffer, size_t size, size_t nmemb, void *userp) {
65 std::string *my_buffer = static_cast<std::string *>(userp);
66
67 if (size * nmemb < 1) {
68 return 0;
69 }
70
71 *my_buffer = static_cast<char *>(buffer);
72
73 return my_buffer->size();
74 }
75
76 146 SessionContextBase::SessionContextBase()
77 146 : upload_results_(kMaxNumJobs)
78 146 , api_url_()
79 146 , session_token_()
80 146 , key_id_()
81 146 , secret_()
82 146 , max_pack_size_(ObjectPack::kDefaultLimit)
83 146 , active_handles_()
84 146 , current_pack_(NULL)
85 146 , current_pack_mtx_()
86 146 , bytes_committed_(0)
87 146 , bytes_dispatched_(0)
88 146 , initialized_(false) { }
89
90 292 SessionContextBase::~SessionContextBase() { }
91
92 146 bool SessionContextBase::Initialize(const std::string &api_url,
93 const std::string &session_token,
94 const std::string &key_id,
95 const std::string &secret,
96 uint64_t max_pack_size,
97 uint64_t max_queue_size) {
98 146 bool ret = true;
99
100 // Initialize session context lock
101 pthread_mutexattr_t attr;
102 146 if (pthread_mutexattr_init(&attr)
103
1/2
✓ Branch 1 taken 146 times.
✗ Branch 2 not taken.
146 || pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE)
104
1/2
✓ Branch 1 taken 146 times.
✗ Branch 2 not taken.
146 || pthread_mutex_init(&current_pack_mtx_, &attr)
105
3/6
✓ Branch 0 taken 146 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 146 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 146 times.
292 || pthread_mutexattr_destroy(&attr)) {
106 LogCvmfs(kLogUploadGateway, kLogStderr,
107 "Could not initialize SessionContext lock.");
108 return false;
109 }
110
111 // Set upstream URL and session token
112
1/2
✓ Branch 1 taken 146 times.
✗ Branch 2 not taken.
146 api_url_ = api_url;
113
1/2
✓ Branch 1 taken 146 times.
✗ Branch 2 not taken.
146 session_token_ = session_token;
114
1/2
✓ Branch 1 taken 146 times.
✗ Branch 2 not taken.
146 key_id_ = key_id;
115
1/2
✓ Branch 1 taken 146 times.
✗ Branch 2 not taken.
146 secret_ = secret;
116 146 max_pack_size_ = max_pack_size;
117
118 146 bytes_committed_ = 0u;
119 146 bytes_dispatched_ = 0u;
120
121
2/4
✓ Branch 1 taken 146 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 146 times.
146 assert(upload_results_.IsEmpty());
122
123 // Ensure that there are not open object packs
124
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 146 times.
146 if (current_pack_) {
125 LogCvmfs(
126 kLogUploadGateway, kLogStderr,
127 "Could not initialize SessionContext - Existing open object packs.");
128 ret = false;
129 }
130
131
3/6
✓ Branch 1 taken 146 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 146 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 146 times.
✗ Branch 6 not taken.
146 ret = InitializeDerived(max_queue_size) && ret;
132
133 146 initialized_ = true;
134
135 146 return ret;
136 }
137
138 146 bool SessionContextBase::Finalize(bool commit, const std::string &old_root_hash,
139 const std::string &new_root_hash,
140 const RepositoryTag &tag) {
141
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 146 times.
146 assert(active_handles_.empty());
142
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 146 times.
146 if (!initialized_) {
143 assert(!commit);
144 return true;
145 }
146
147 {
148 146 const MutexLockGuard lock(current_pack_mtx_);
149
150
5/6
✓ Branch 0 taken 60 times.
✓ Branch 1 taken 86 times.
✓ Branch 3 taken 60 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 60 times.
✓ Branch 6 taken 86 times.
146 if (current_pack_ && current_pack_->GetNoObjects() > 0) {
151
1/2
✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
60 Dispatch();
152 60 current_pack_ = NULL;
153 }
154 146 }
155
156 146 bool results = true;
157
2/2
✓ Branch 1 taken 465 times.
✓ Branch 2 taken 146 times.
611 while (!upload_results_.IsEmpty()) {
158 465 Future<bool> *future = upload_results_.PopBack();
159
2/4
✓ Branch 1 taken 465 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 465 times.
✗ Branch 4 not taken.
465 results = future->Get() && results;
160
1/2
✓ Branch 0 taken 465 times.
✗ Branch 1 not taken.
465 delete future;
161 }
162
163
2/2
✓ Branch 0 taken 105 times.
✓ Branch 1 taken 41 times.
146 if (commit) {
164
3/6
✓ Branch 1 taken 105 times.
✗ Branch 2 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 105 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 105 times.
105 if (old_root_hash.empty() || new_root_hash.empty()) {
165 return false;
166 }
167 105 const bool commit_result = Commit(old_root_hash, new_root_hash, tag);
168
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 105 times.
105 if (!commit_result) {
169 LogCvmfs(kLogUploadGateway, kLogStderr,
170 "SessionContext: could not commit session. Aborting.");
171 FinalizeDerived();
172 pthread_mutex_destroy(&current_pack_mtx_);
173 initialized_ = false;
174 return false;
175 }
176 }
177
178
2/4
✓ Branch 1 taken 146 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 146 times.
✗ Branch 4 not taken.
146 results &= FinalizeDerived() && (bytes_committed_ == bytes_dispatched_);
179
180 146 pthread_mutex_destroy(&current_pack_mtx_);
181
182 146 initialized_ = false;
183
184 146 return results;
185 }
186
187 780 ObjectPack::BucketHandle SessionContextBase::NewBucket() {
188 780 const MutexLockGuard lock(current_pack_mtx_);
189
2/2
✓ Branch 0 taken 240 times.
✓ Branch 1 taken 540 times.
780 if (!current_pack_) {
190
2/4
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 240 times.
✗ Branch 5 not taken.
240 current_pack_ = new ObjectPack(max_pack_size_);
191 }
192
1/2
✓ Branch 1 taken 780 times.
✗ Branch 2 not taken.
780 ObjectPack::BucketHandle hd = current_pack_->NewBucket();
193
1/2
✓ Branch 1 taken 780 times.
✗ Branch 2 not taken.
780 active_handles_.push_back(hd);
194 780 return hd;
195 780 }
196
197 1020 bool SessionContextBase::CommitBucket(const ObjectPack::BucketContentType type,
198 const shash::Any &id,
199 const ObjectPack::BucketHandle handle,
200 const std::string &name,
201 const bool force_dispatch) {
202 1020 const MutexLockGuard lock(current_pack_mtx_);
203
204
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1020 times.
1020 if (!current_pack_) {
205 LogCvmfs(kLogUploadGateway, kLogStderr,
206 "Error: Called SessionBaseContext::CommitBucket without an open "
207 "ObjectPack.");
208 return false;
209 }
210
211 1020 const uint64_t size0 = current_pack_->size();
212
1/2
✓ Branch 1 taken 1020 times.
✗ Branch 2 not taken.
1020 const bool committed = current_pack_->CommitBucket(type, id, handle, name);
213
214
2/2
✓ Branch 0 taken 780 times.
✓ Branch 1 taken 240 times.
1020 if (committed) { // Current pack is still not full
215
1/2
✓ Branch 3 taken 780 times.
✗ Branch 4 not taken.
1560 active_handles_.erase(
216
1/2
✓ Branch 3 taken 780 times.
✗ Branch 4 not taken.
780 std::remove(active_handles_.begin(), active_handles_.end(), handle),
217 780 active_handles_.end());
218 780 const uint64_t size1 = current_pack_->size();
219 780 bytes_committed_ += size1 - size0;
220
2/2
✓ Branch 0 taken 180 times.
✓ Branch 1 taken 600 times.
780 if (force_dispatch) {
221
1/2
✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
180 Dispatch();
222 180 current_pack_ = NULL;
223 }
224 } else { // Current pack is full and can be dispatched
225 240 uint64_t new_size = 0;
226
2/2
✓ Branch 0 taken 15 times.
✓ Branch 1 taken 225 times.
240 if (handle->capacity > max_pack_size_) {
227 15 new_size = handle->capacity + 1;
228 } else {
229 225 new_size = max_pack_size_;
230 }
231
2/4
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 240 times.
✗ Branch 5 not taken.
240 ObjectPack *new_pack = new ObjectPack(new_size);
232
2/2
✓ Branch 1 taken 330 times.
✓ Branch 2 taken 240 times.
570 for (size_t i = 0u; i < active_handles_.size(); ++i) {
233
1/2
✓ Branch 2 taken 330 times.
✗ Branch 3 not taken.
330 current_pack_->TransferBucket(active_handles_[i], new_pack);
234 }
235
236
2/2
✓ Branch 1 taken 225 times.
✓ Branch 2 taken 15 times.
240 if (current_pack_->GetNoObjects() > 0) {
237
1/2
✓ Branch 1 taken 225 times.
✗ Branch 2 not taken.
225 Dispatch();
238 }
239 240 current_pack_ = new_pack;
240
241
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 CommitBucket(type, id, handle, name, false);
242 }
243
244 1020 return true;
245 1020 }
246
247 465 void SessionContextBase::Dispatch() {
248 465 const MutexLockGuard lock(current_pack_mtx_);
249
250
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 465 times.
465 if (!current_pack_) {
251 return;
252 }
253
254 465 bytes_dispatched_ += current_pack_->size();
255
2/4
✓ Branch 1 taken 465 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 465 times.
✗ Branch 5 not taken.
465 upload_results_.EnqueueFront(DispatchObjectPack(current_pack_));
256
1/2
✓ Branch 1 taken 465 times.
✗ Branch 2 not taken.
465 }
257
258 146 SessionContext::SessionContext()
259
1/2
✓ Branch 2 taken 146 times.
✗ Branch 3 not taken.
146 : SessionContextBase(), upload_jobs_(), worker_() { }
260
261 146 bool SessionContext::InitializeDerived(uint64_t max_queue_size) {
262 // Start worker thread
263
1/2
✓ Branch 2 taken 146 times.
✗ Branch 3 not taken.
146 upload_jobs_ = new Tube<UploadJob>(max_queue_size);
264
265 146 const int retval = pthread_create(&worker_, NULL, UploadLoop,
266 reinterpret_cast<void *>(this));
267
268 146 return !retval;
269 }
270
271 146 bool SessionContext::FinalizeDerived() {
272 // Note: in FinalizedDerived, we know that the worker is running. The
273 // SessionContext is called only from GatewayUploader::FinalizeSession(),
274 // which in turn is from Spooler::FinalizeSession(). The Spooler ensures
275 // that GatewayUploader::Initialize() is called on construction.
276 //
277 // TODO(jblomer): Refactor SessionContext (and Uploader*) classes to
278 // use a factory method for construction.
279 //
280 146 upload_jobs_->EnqueueFront(&terminator_);
281 146 pthread_join(worker_, NULL);
282
283 146 return true;
284 }
285
286 bool SessionContext::Commit(const std::string &old_root_hash,
287 const std::string &new_root_hash,
288 const RepositoryTag &tag) {
289 JsonStringGenerator request_input;
290 request_input.Add("old_root_hash", old_root_hash);
291 request_input.Add("new_root_hash", new_root_hash);
292 request_input.Add("tag_name", tag.name());
293 // Channels are no longer supported: send 0 (i.e. kChannelTrunk) for
294 // backwards compatibility with existing gateways
295 //
296 request_input.Add("tag_channel", 0);
297 request_input.Add("tag_description", tag.description());
298 const std::string request = request_input.GenerateString();
299 CurlBuffer buffer;
300 return MakeEndRequest("POST", key_id_, secret_, session_token_, api_url_,
301 request, &buffer);
302 }
303
304 465 Future<bool> *SessionContext::DispatchObjectPack(ObjectPack *pack) {
305 465 UploadJob *job = new UploadJob;
306 465 Future<bool> *result = new Future<bool>();
307 465 job->pack = pack;
308 465 job->result = result;
309 465 upload_jobs_->EnqueueFront(job);
310 465 return result;
311 }
312
313 bool SessionContext::DoUpload(const SessionContext::UploadJob *job) {
314 // Set up the object pack serializer
315 ObjectPackProducer serializer(job->pack);
316
317 shash::Any payload_digest(shash::kSha1);
318 serializer.GetDigest(&payload_digest);
319 const std::string json_msg = "{\"session_token\" : \"" + session_token_
320 + "\", \"payload_digest\" : \""
321 + payload_digest.ToString(false)
322 + "\", \"header_size\" : \""
323 + StringifyInt(serializer.GetHeaderSize())
324 + "\", \"api_version\" : \""
325 + StringifyInt(gateway::APIVersion()) + "\"}";
326
327 // Compute HMAC
328 shash::Any hmac(shash::kSha1);
329 shash::HmacString(secret_, json_msg, &hmac);
330
331 CurlSendPayload payload;
332 payload.json_message = &json_msg;
333 payload.pack_serializer = &serializer;
334 payload.index = 0;
335
336 const size_t payload_size = json_msg.size() + serializer.GetHeaderSize()
337 + job->pack->size();
338
339 // Prepare the Curl POST request
340 CURL *h_curl = curl_easy_init();
341
342 if (!h_curl) {
343 return false;
344 }
345
346 // Set HTTP headers (Authorization and Message-Size)
347 std::string header_str = std::string("Authorization: ") + key_id_ + " "
348 + Base64(hmac.ToString(false));
349 struct curl_slist *auth_header = NULL;
350 auth_header = curl_slist_append(auth_header, header_str.c_str());
351 header_str = std::string("Message-Size: ") + StringifyInt(json_msg.size());
352 auth_header = curl_slist_append(auth_header, header_str.c_str());
353 curl_easy_setopt(h_curl, CURLOPT_HTTPHEADER, auth_header);
354
355 std::string reply;
356 curl_easy_setopt(h_curl, CURLOPT_NOPROGRESS, 1L);
357 curl_easy_setopt(h_curl, CURLOPT_USERAGENT, "cvmfs/" CVMFS_VERSION);
358 curl_easy_setopt(h_curl, CURLOPT_MAXREDIRS, 50L);
359 curl_easy_setopt(h_curl, CURLOPT_CUSTOMREQUEST, "POST");
360 curl_easy_setopt(h_curl, CURLOPT_URL, (api_url_ + "/payloads").c_str());
361 curl_easy_setopt(h_curl, CURLOPT_POSTFIELDS, NULL);
362 curl_easy_setopt(h_curl, CURLOPT_POSTFIELDSIZE_LARGE,
363 static_cast<curl_off_t>(payload_size));
364 curl_easy_setopt(h_curl, CURLOPT_READDATA, &payload);
365 curl_easy_setopt(h_curl, CURLOPT_READFUNCTION, SendCB);
366 curl_easy_setopt(h_curl, CURLOPT_WRITEFUNCTION, RecvCB);
367 curl_easy_setopt(h_curl, CURLOPT_WRITEDATA, &reply);
368
369 // Perform the Curl POST request
370 const CURLcode ret = curl_easy_perform(h_curl);
371 if (ret) {
372 LogCvmfs(kLogUploadGateway, kLogStderr,
373 "SessionContext::DoUpload - curl_easy_perform failed: %d", ret);
374 }
375
376 const UniquePtr<JsonDocument> reply_json(JsonDocument::Create(reply));
377 const JSON *reply_status = JsonDocument::SearchInObject(
378 reply_json->root(), "status", JSON_STRING);
379 const bool ok = (reply_status != NULL
380 && std::string(reply_status->string_value) == "ok");
381 if (!ok) {
382 LogCvmfs(kLogUploadGateway, kLogStderr,
383 "SessionContext::DoUpload - error reply: %s", reply.c_str());
384 }
385
386 curl_easy_cleanup(h_curl);
387 h_curl = NULL;
388
389 return ok && !ret;
390 }
391
392 146 void *SessionContext::UploadLoop(void *data) {
393 146 SessionContext *ctx = reinterpret_cast<SessionContext *>(data);
394 UploadJob *job;
395
396 while (true) {
397 611 job = ctx->upload_jobs_->PopBack();
398
2/2
✓ Branch 0 taken 146 times.
✓ Branch 1 taken 465 times.
611 if (job == &terminator_)
399 146 return NULL;
400
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 465 times.
465 if (!ctx->DoUpload(job)) {
401 PANIC(kLogStderr, "SessionContext: could not submit payload. Aborting.");
402 }
403 465 job->result->Set(true);
404
1/2
✓ Branch 0 taken 465 times.
✗ Branch 1 not taken.
465 delete job->pack;
405
1/2
✓ Branch 0 taken 465 times.
✗ Branch 1 not taken.
465 delete job;
406 }
407 }
408
409 SessionContext::UploadJob SessionContext::terminator_;
410
411 } // namespace upload
412