GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/session_context.cc
Date: 2025-09-14 02:35:40
Exec Total Coverage
Lines: 139 224 62.1%
Branches: 80 300 26.7%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "session_context.h"
6
7 #include <algorithm>
8 #include <limits>
9
10 #include "curl/curl.h"
11 #include "gateway_util.h"
12 #include "json_document.h"
13 #include "json_document_write.h"
14 #include "swissknife_lease_curl.h"
15 #include "util/exception.h"
16 #include "util/pointer.h"
17 #include "util/string.h"
18
19 namespace {
20 // Maximum number of jobs during a session. No limit, for practical
21 // purposes. Note that we use uint32_t so that the Tube code works
22 // correctly with this limit on 32bit systems.
23 const uint32_t kMaxNumJobs = std::numeric_limits<uint32_t>::max();
24 } // namespace
25
26 namespace upload {
27
28 222 size_t SendCB(void *ptr, size_t size, size_t nmemb, void *userp) {
29 222 CurlSendPayload *payload = static_cast<CurlSendPayload *>(userp);
30
31 222 const size_t max_chunk_size = size * nmemb;
32
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 222 times.
222 if (max_chunk_size < 1) {
33 return 0;
34 }
35
36 222 size_t current_chunk_size = 0;
37
2/2
✓ Branch 0 taken 296 times.
✓ Branch 1 taken 148 times.
444 while (current_chunk_size < max_chunk_size) {
38
2/2
✓ Branch 1 taken 37 times.
✓ Branch 2 taken 259 times.
296 if (payload->index < payload->json_message->size()) {
39 // Can add a chunk from the JSON message
40 37 const size_t read_size = std::min(
41 74 max_chunk_size - current_chunk_size,
42 37 payload->json_message->size() - payload->index);
43 37 current_chunk_size += read_size;
44 37 std::memcpy(ptr, payload->json_message->data() + payload->index,
45 read_size);
46 37 payload->index += read_size;
47 } else {
48 // Can add a chunk from the payload
49 259 const size_t max_read_size = max_chunk_size - current_chunk_size;
50 259 const unsigned nbytes = payload->pack_serializer->ProduceNext(
51 max_read_size,
52 static_cast<unsigned char *>(ptr) + current_chunk_size);
53 259 current_chunk_size += nbytes;
54
55
2/2
✓ Branch 0 taken 74 times.
✓ Branch 1 taken 185 times.
259 if (!nbytes) {
56 74 break;
57 }
58 }
59 }
60
61 222 return current_chunk_size;
62 }
63
64 size_t RecvCB(void *buffer, size_t size, size_t nmemb, void *userp) {
65 std::string *my_buffer = static_cast<std::string *>(userp);
66
67 if (size * nmemb < 1) {
68 return 0;
69 }
70
71 *my_buffer = static_cast<char *>(buffer);
72
73 return my_buffer->size();
74 }
75
76 294 SessionContextBase::SessionContextBase()
77 294 : upload_results_(kMaxNumJobs)
78 294 , api_url_()
79 294 , session_token_()
80 294 , key_id_()
81 294 , secret_()
82 294 , max_pack_size_(ObjectPack::kDefaultLimit)
83 294 , active_handles_()
84 294 , current_pack_(NULL)
85 294 , current_pack_mtx_()
86 294 , bytes_committed_(0)
87 294 , bytes_dispatched_(0)
88 294 , initialized_(false) { }
89
90 588 SessionContextBase::~SessionContextBase() { }
91
92 294 bool SessionContextBase::Initialize(const std::string &api_url,
93 const std::string &session_token,
94 const std::string &key_id,
95 const std::string &secret,
96 uint64_t max_pack_size,
97 uint64_t max_queue_size) {
98 294 bool ret = true;
99
100 // Initialize session context lock
101 pthread_mutexattr_t attr;
102 294 if (pthread_mutexattr_init(&attr)
103
1/2
✓ Branch 1 taken 294 times.
✗ Branch 2 not taken.
294 || pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE)
104
1/2
✓ Branch 1 taken 294 times.
✗ Branch 2 not taken.
294 || pthread_mutex_init(&current_pack_mtx_, &attr)
105
3/6
✓ Branch 0 taken 294 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 294 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 294 times.
588 || pthread_mutexattr_destroy(&attr)) {
106 LogCvmfs(kLogUploadGateway, kLogStderr,
107 "Could not initialize SessionContext lock.");
108 return false;
109 }
110
111 // Set upstream URL and session token
112
1/2
✓ Branch 1 taken 294 times.
✗ Branch 2 not taken.
294 api_url_ = api_url;
113
1/2
✓ Branch 1 taken 294 times.
✗ Branch 2 not taken.
294 session_token_ = session_token;
114
1/2
✓ Branch 1 taken 294 times.
✗ Branch 2 not taken.
294 key_id_ = key_id;
115
1/2
✓ Branch 1 taken 294 times.
✗ Branch 2 not taken.
294 secret_ = secret;
116 294 max_pack_size_ = max_pack_size;
117
118 294 bytes_committed_ = 0u;
119 294 bytes_dispatched_ = 0u;
120
121
2/4
✓ Branch 1 taken 294 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 294 times.
294 assert(upload_results_.IsEmpty());
122
123 // Ensure that there are not open object packs
124
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 294 times.
294 if (current_pack_) {
125 LogCvmfs(
126 kLogUploadGateway, kLogStderr,
127 "Could not initialize SessionContext - Existing open object packs.");
128 ret = false;
129 }
130
131
3/6
✓ Branch 1 taken 294 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 294 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 294 times.
✗ Branch 6 not taken.
294 ret = InitializeDerived(max_queue_size) && ret;
132
133 294 initialized_ = true;
134
135 294 return ret;
136 }
137
138 294 bool SessionContextBase::Finalize(bool commit, const std::string &old_root_hash,
139 const std::string &new_root_hash,
140 const RepositoryTag &tag) {
141
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 294 times.
294 assert(active_handles_.empty());
142
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 294 times.
294 if (!initialized_) {
143 assert(!commit);
144 return true;
145 }
146
147 {
148 294 const MutexLockGuard lock(current_pack_mtx_);
149
150
5/6
✓ Branch 0 taken 148 times.
✓ Branch 1 taken 146 times.
✓ Branch 3 taken 148 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 148 times.
✓ Branch 6 taken 146 times.
294 if (current_pack_ && current_pack_->GetNoObjects() > 0) {
151
1/2
✓ Branch 1 taken 148 times.
✗ Branch 2 not taken.
148 Dispatch();
152 148 current_pack_ = NULL;
153 }
154 294 }
155
156 294 bool results = true;
157
2/2
✓ Branch 1 taken 1147 times.
✓ Branch 2 taken 294 times.
1441 while (!upload_results_.IsEmpty()) {
158 1147 Future<bool> *future = upload_results_.PopBack();
159
2/4
✓ Branch 1 taken 1147 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1147 times.
✗ Branch 4 not taken.
1147 results = future->Get() && results;
160
1/2
✓ Branch 0 taken 1147 times.
✗ Branch 1 not taken.
1147 delete future;
161 }
162
163
2/2
✓ Branch 0 taken 259 times.
✓ Branch 1 taken 35 times.
294 if (commit) {
164
3/6
✓ Branch 1 taken 259 times.
✗ Branch 2 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 259 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 259 times.
259 if (old_root_hash.empty() || new_root_hash.empty()) {
165 return false;
166 }
167 259 const bool commit_result = Commit(old_root_hash, new_root_hash, tag);
168
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 259 times.
259 if (!commit_result) {
169 LogCvmfs(kLogUploadGateway, kLogStderr,
170 "SessionContext: could not commit session. Aborting.");
171 FinalizeDerived();
172 pthread_mutex_destroy(&current_pack_mtx_);
173 initialized_ = false;
174 return false;
175 }
176 }
177
178
2/4
✓ Branch 1 taken 294 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 294 times.
✗ Branch 4 not taken.
294 results &= FinalizeDerived() && (bytes_committed_ == bytes_dispatched_);
179
180 294 pthread_mutex_destroy(&current_pack_mtx_);
181
182 294 initialized_ = false;
183
184 294 return results;
185 }
186
187 1924 ObjectPack::BucketHandle SessionContextBase::NewBucket() {
188 1924 const MutexLockGuard lock(current_pack_mtx_);
189
2/2
✓ Branch 0 taken 592 times.
✓ Branch 1 taken 1332 times.
1924 if (!current_pack_) {
190
2/4
✓ Branch 1 taken 592 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 592 times.
✗ Branch 5 not taken.
592 current_pack_ = new ObjectPack(max_pack_size_);
191 }
192
1/2
✓ Branch 1 taken 1924 times.
✗ Branch 2 not taken.
1924 ObjectPack::BucketHandle hd = current_pack_->NewBucket();
193
1/2
✓ Branch 1 taken 1924 times.
✗ Branch 2 not taken.
1924 active_handles_.push_back(hd);
194 1924 return hd;
195 1924 }
196
197 2516 bool SessionContextBase::CommitBucket(const ObjectPack::BucketContentType type,
198 const shash::Any &id,
199 const ObjectPack::BucketHandle handle,
200 const std::string &name,
201 const bool force_dispatch) {
202 2516 const MutexLockGuard lock(current_pack_mtx_);
203
204
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2516 times.
2516 if (!current_pack_) {
205 LogCvmfs(kLogUploadGateway, kLogStderr,
206 "Error: Called SessionBaseContext::CommitBucket without an open "
207 "ObjectPack.");
208 return false;
209 }
210
211 2516 const uint64_t size0 = current_pack_->size();
212
1/2
✓ Branch 1 taken 2516 times.
✗ Branch 2 not taken.
2516 const bool committed = current_pack_->CommitBucket(type, id, handle, name);
213
214
2/2
✓ Branch 0 taken 1924 times.
✓ Branch 1 taken 592 times.
2516 if (committed) { // Current pack is still not full
215
1/2
✓ Branch 3 taken 1924 times.
✗ Branch 4 not taken.
3848 active_handles_.erase(
216
1/2
✓ Branch 3 taken 1924 times.
✗ Branch 4 not taken.
1924 std::remove(active_handles_.begin(), active_handles_.end(), handle),
217 1924 active_handles_.end());
218 1924 const uint64_t size1 = current_pack_->size();
219 1924 bytes_committed_ += size1 - size0;
220
2/2
✓ Branch 0 taken 444 times.
✓ Branch 1 taken 1480 times.
1924 if (force_dispatch) {
221
1/2
✓ Branch 1 taken 444 times.
✗ Branch 2 not taken.
444 Dispatch();
222 444 current_pack_ = NULL;
223 }
224 } else { // Current pack is full and can be dispatched
225 592 uint64_t new_size = 0;
226
2/2
✓ Branch 0 taken 37 times.
✓ Branch 1 taken 555 times.
592 if (handle->capacity > max_pack_size_) {
227 37 new_size = handle->capacity + 1;
228 } else {
229 555 new_size = max_pack_size_;
230 }
231
2/4
✓ Branch 1 taken 592 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 592 times.
✗ Branch 5 not taken.
592 ObjectPack *new_pack = new ObjectPack(new_size);
232
2/2
✓ Branch 1 taken 814 times.
✓ Branch 2 taken 592 times.
1406 for (size_t i = 0u; i < active_handles_.size(); ++i) {
233
1/2
✓ Branch 2 taken 814 times.
✗ Branch 3 not taken.
814 current_pack_->TransferBucket(active_handles_[i], new_pack);
234 }
235
236
2/2
✓ Branch 1 taken 555 times.
✓ Branch 2 taken 37 times.
592 if (current_pack_->GetNoObjects() > 0) {
237
1/2
✓ Branch 1 taken 555 times.
✗ Branch 2 not taken.
555 Dispatch();
238 }
239 592 current_pack_ = new_pack;
240
241
1/2
✓ Branch 1 taken 592 times.
✗ Branch 2 not taken.
592 CommitBucket(type, id, handle, name, false);
242 }
243
244 2516 return true;
245 2516 }
246
247 1147 void SessionContextBase::Dispatch() {
248 1147 const MutexLockGuard lock(current_pack_mtx_);
249
250
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1147 times.
1147 if (!current_pack_) {
251 return;
252 }
253
254 1147 bytes_dispatched_ += current_pack_->size();
255
2/4
✓ Branch 1 taken 1147 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1147 times.
✗ Branch 5 not taken.
1147 upload_results_.EnqueueFront(DispatchObjectPack(current_pack_));
256
1/2
✓ Branch 1 taken 1147 times.
✗ Branch 2 not taken.
1147 }
257
258 294 SessionContext::SessionContext()
259
1/2
✓ Branch 2 taken 294 times.
✗ Branch 3 not taken.
294 : SessionContextBase(), upload_jobs_(), worker_() { }
260
261 294 bool SessionContext::InitializeDerived(uint64_t max_queue_size) {
262 // Start worker thread
263
1/2
✓ Branch 2 taken 294 times.
✗ Branch 3 not taken.
294 upload_jobs_ = new Tube<UploadJob>(max_queue_size);
264
265 294 const int retval = pthread_create(&worker_, NULL, UploadLoop,
266 reinterpret_cast<void *>(this));
267
268 294 return !retval;
269 }
270
271 294 bool SessionContext::FinalizeDerived() {
272 // Note: in FinalizedDerived, we know that the worker is running. The
273 // SessionContext is called only from GatewayUploader::FinalizeSession(),
274 // which in turn is from Spooler::FinalizeSession(). The Spooler ensures
275 // that GatewayUploader::Initialize() is called on construction.
276 //
277 // TODO(jblomer): Refactor SessionContext (and Uploader*) classes to
278 // use a factory method for construction.
279 //
280 294 upload_jobs_->EnqueueFront(&terminator_);
281 294 pthread_join(worker_, NULL);
282
283 294 return true;
284 }
285
286 bool SessionContext::Commit(const std::string &old_root_hash,
287 const std::string &new_root_hash,
288 const RepositoryTag &tag) {
289 JsonStringGenerator request_input;
290 request_input.Add("old_root_hash", old_root_hash);
291 request_input.Add("new_root_hash", new_root_hash);
292 request_input.Add("tag_name", tag.name());
293 // Channels are no longer supported: send 0 (i.e. kChannelTrunk) for
294 // backwards compatibility with existing gateways
295 //
296 request_input.Add("tag_channel", 0);
297 request_input.Add("tag_description", tag.description());
298 const std::string request = request_input.GenerateString();
299 CurlBuffer buffer;
300 return MakeEndRequest("POST", key_id_, secret_, session_token_, api_url_,
301 request, &buffer);
302 }
303
304 1147 Future<bool> *SessionContext::DispatchObjectPack(ObjectPack *pack) {
305 1147 UploadJob *job = new UploadJob;
306 1147 Future<bool> *result = new Future<bool>();
307 1147 job->pack = pack;
308 1147 job->result = result;
309 1147 upload_jobs_->EnqueueFront(job);
310 1147 return result;
311 }
312
313 bool SessionContext::DoUpload(const SessionContext::UploadJob *job) {
314 // Set up the object pack serializer
315 ObjectPackProducer serializer(job->pack);
316
317 shash::Any payload_digest(shash::kSha1);
318 serializer.GetDigest(&payload_digest);
319 const std::string json_msg = "{\"session_token\" : \"" + session_token_
320 + "\", \"payload_digest\" : \""
321 + payload_digest.ToString(false)
322 + "\", \"header_size\" : \""
323 + StringifyInt(serializer.GetHeaderSize())
324 + "\", \"api_version\" : \""
325 + StringifyInt(gateway::APIVersion()) + "\"}";
326
327 // Compute HMAC
328 shash::Any hmac(shash::kSha1);
329 shash::HmacString(secret_, json_msg, &hmac);
330
331 CurlSendPayload payload;
332 payload.json_message = &json_msg;
333 payload.pack_serializer = &serializer;
334 payload.index = 0;
335
336 const size_t payload_size = json_msg.size() + serializer.GetHeaderSize()
337 + job->pack->size();
338
339 // Prepare the Curl POST request
340 CURL *h_curl = curl_easy_init();
341
342 if (!h_curl) {
343 return false;
344 }
345
346 // Set HTTP headers (Authorization and Message-Size)
347 std::string header_str = std::string("Authorization: ") + key_id_ + " "
348 + Base64(hmac.ToString(false));
349 struct curl_slist *auth_header = NULL;
350 auth_header = curl_slist_append(auth_header, header_str.c_str());
351 header_str = std::string("Message-Size: ") + StringifyInt(json_msg.size());
352 auth_header = curl_slist_append(auth_header, header_str.c_str());
353 curl_easy_setopt(h_curl, CURLOPT_HTTPHEADER, auth_header);
354
355 std::string reply;
356 curl_easy_setopt(h_curl, CURLOPT_NOPROGRESS, 1L);
357 curl_easy_setopt(h_curl, CURLOPT_USERAGENT, "cvmfs/" CVMFS_VERSION);
358 curl_easy_setopt(h_curl, CURLOPT_MAXREDIRS, 50L);
359 curl_easy_setopt(h_curl, CURLOPT_CUSTOMREQUEST, "POST");
360 curl_easy_setopt(h_curl, CURLOPT_URL, (api_url_ + "/payloads").c_str());
361 curl_easy_setopt(h_curl, CURLOPT_POSTFIELDS, NULL);
362 curl_easy_setopt(h_curl, CURLOPT_POSTFIELDSIZE_LARGE,
363 static_cast<curl_off_t>(payload_size));
364 curl_easy_setopt(h_curl, CURLOPT_READDATA, &payload);
365 curl_easy_setopt(h_curl, CURLOPT_READFUNCTION, SendCB);
366 curl_easy_setopt(h_curl, CURLOPT_WRITEFUNCTION, RecvCB);
367 curl_easy_setopt(h_curl, CURLOPT_WRITEDATA, &reply);
368
369 // Perform the Curl POST request
370 const CURLcode ret = curl_easy_perform(h_curl);
371 if (ret) {
372 LogCvmfs(kLogUploadGateway, kLogStderr,
373 "SessionContext::DoUpload - curl_easy_perform failed: %d", ret);
374 }
375
376 const UniquePtr<JsonDocument> reply_json(JsonDocument::Create(reply));
377 const JSON *reply_status = JsonDocument::SearchInObject(
378 reply_json->root(), "status", JSON_STRING);
379 const bool ok = (reply_status != NULL
380 && std::string(reply_status->string_value) == "ok");
381 if (!ok) {
382 LogCvmfs(kLogUploadGateway, kLogStderr,
383 "SessionContext::DoUpload - error reply: %s", reply.c_str());
384 }
385
386 curl_easy_cleanup(h_curl);
387 h_curl = NULL;
388
389 return ok && !ret;
390 }
391
392 294 void *SessionContext::UploadLoop(void *data) {
393 294 SessionContext *ctx = reinterpret_cast<SessionContext *>(data);
394 UploadJob *job;
395
396 while (true) {
397 1441 job = ctx->upload_jobs_->PopBack();
398
2/2
✓ Branch 0 taken 294 times.
✓ Branch 1 taken 1147 times.
1441 if (job == &terminator_)
399 294 return NULL;
400
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1147 times.
1147 if (!ctx->DoUpload(job)) {
401 PANIC(kLogStderr, "SessionContext: could not submit payload. Aborting.");
402 }
403 1147 job->result->Set(true);
404
1/2
✓ Branch 0 taken 1147 times.
✗ Branch 1 not taken.
1147 delete job->pack;
405
1/2
✓ Branch 0 taken 1147 times.
✗ Branch 1 not taken.
1147 delete job;
406 }
407 }
408
409 SessionContext::UploadJob SessionContext::terminator_;
410
411 } // namespace upload
412