GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/session_context.cc
Date: 2026-06-28 02:36:10
Exec Total Coverage
Lines: 139 229 60.7%
Branches: 80 313 25.6%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "session_context.h"
6
7 #include <algorithm>
8 #include <limits>
9
10 #include "curl/curl.h"
11 #include "gateway_util.h"
12 #include "json_document.h"
13 #include "json_document_write.h"
14 #include "swissknife_lease_curl.h"
15 #include "util/exception.h"
16 #include "util/pointer.h"
17 #include "util/string.h"
18
19 namespace {
20 // Maximum number of jobs during a session. No limit, for practical
21 // purposes. Note that we use uint32_t so that the Tube code works
22 // correctly with this limit on 32bit systems.
23 const uint32_t kMaxNumJobs = std::numeric_limits<uint32_t>::max();
24 } // namespace
25
26 namespace upload {
27
28 246 size_t SendCB(void *ptr, size_t size, size_t nmemb, void *userp) {
29 246 CurlSendPayload *payload = static_cast<CurlSendPayload *>(userp);
30
31 246 const size_t max_chunk_size = size * nmemb;
32
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 246 times.
246 if (max_chunk_size < 1) {
33 return 0;
34 }
35
36 246 size_t current_chunk_size = 0;
37
2/2
✓ Branch 0 taken 328 times.
✓ Branch 1 taken 164 times.
492 while (current_chunk_size < max_chunk_size) {
38
2/2
✓ Branch 1 taken 41 times.
✓ Branch 2 taken 287 times.
328 if (payload->index < payload->json_message->size()) {
39 // Can add a chunk from the JSON message
40 41 const size_t read_size = std::min(
41 82 max_chunk_size - current_chunk_size,
42 41 payload->json_message->size() - payload->index);
43 41 current_chunk_size += read_size;
44 41 std::memcpy(ptr, payload->json_message->data() + payload->index,
45 read_size);
46 41 payload->index += read_size;
47 } else {
48 // Can add a chunk from the payload
49 287 const size_t max_read_size = max_chunk_size - current_chunk_size;
50 287 const unsigned nbytes = payload->pack_serializer->ProduceNext(
51 max_read_size,
52 static_cast<unsigned char *>(ptr) + current_chunk_size);
53 287 current_chunk_size += nbytes;
54
55
2/2
✓ Branch 0 taken 82 times.
✓ Branch 1 taken 205 times.
287 if (!nbytes) {
56 82 break;
57 }
58 }
59 }
60
61 246 return current_chunk_size;
62 }
63
64 size_t RecvCB(void *buffer, size_t size, size_t nmemb, void *userp) {
65 std::string *my_buffer = static_cast<std::string *>(userp);
66
67 if (size * nmemb < 1) {
68 return 0;
69 }
70
71 *my_buffer = static_cast<char *>(buffer);
72
73 return my_buffer->size();
74 }
75
76 298 SessionContextBase::SessionContextBase()
77 298 : upload_results_(kMaxNumJobs)
78 298 , api_url_()
79 298 , session_token_()
80 298 , key_id_()
81 298 , secret_()
82 298 , max_pack_size_(ObjectPack::kDefaultLimit)
83 298 , active_handles_()
84 298 , current_pack_(NULL)
85 298 , current_pack_mtx_()
86 298 , bytes_committed_(0)
87 298 , bytes_dispatched_(0)
88 298 , initialized_(false) { }
89
90 596 SessionContextBase::~SessionContextBase() { }
91
92 298 bool SessionContextBase::Initialize(const std::string &api_url,
93 const std::string &session_token,
94 const std::string &key_id,
95 const std::string &secret,
96 uint64_t max_pack_size,
97 uint64_t max_queue_size) {
98 298 bool ret = true;
99
100 // Initialize session context lock
101 pthread_mutexattr_t attr;
102 298 if (pthread_mutexattr_init(&attr)
103
1/2
✓ Branch 1 taken 298 times.
✗ Branch 2 not taken.
298 || pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE)
104
1/2
✓ Branch 1 taken 298 times.
✗ Branch 2 not taken.
298 || pthread_mutex_init(&current_pack_mtx_, &attr)
105
3/6
✓ Branch 0 taken 298 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 298 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 298 times.
596 || pthread_mutexattr_destroy(&attr)) {
106 LogCvmfs(kLogUploadGateway, kLogStderr,
107 "Could not initialize SessionContext lock.");
108 return false;
109 }
110
111 // Set upstream URL and session token
112
1/2
✓ Branch 1 taken 298 times.
✗ Branch 2 not taken.
298 api_url_ = api_url;
113
1/2
✓ Branch 1 taken 298 times.
✗ Branch 2 not taken.
298 session_token_ = session_token;
114
1/2
✓ Branch 1 taken 298 times.
✗ Branch 2 not taken.
298 key_id_ = key_id;
115
1/2
✓ Branch 1 taken 298 times.
✗ Branch 2 not taken.
298 secret_ = secret;
116 298 max_pack_size_ = max_pack_size;
117
118 298 bytes_committed_ = 0u;
119 298 bytes_dispatched_ = 0u;
120
121
2/4
✓ Branch 1 taken 298 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 298 times.
298 assert(upload_results_.IsEmpty());
122
123 // Ensure that there are not open object packs
124
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 298 times.
298 if (current_pack_) {
125 LogCvmfs(
126 kLogUploadGateway, kLogStderr,
127 "Could not initialize SessionContext - Existing open object packs.");
128 ret = false;
129 }
130
131
3/6
✓ Branch 1 taken 298 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 298 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 298 times.
✗ Branch 6 not taken.
298 ret = InitializeDerived(max_queue_size) && ret;
132
133 298 initialized_ = true;
134
135 298 return ret;
136 }
137
138 298 bool SessionContextBase::Finalize(bool commit, const std::string &old_root_hash,
139 const std::string &new_root_hash,
140 const RepositoryTag &tag) {
141
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 298 times.
298 assert(active_handles_.empty());
142
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 298 times.
298 if (!initialized_) {
143 assert(!commit);
144 return true;
145 }
146
147 {
148 298 const MutexLockGuard lock(current_pack_mtx_);
149
150
5/6
✓ Branch 0 taken 164 times.
✓ Branch 1 taken 134 times.
✓ Branch 3 taken 164 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 164 times.
✓ Branch 6 taken 134 times.
298 if (current_pack_ && current_pack_->GetNoObjects() > 0) {
151
1/2
✓ Branch 1 taken 164 times.
✗ Branch 2 not taken.
164 Dispatch();
152 164 current_pack_ = NULL;
153 }
154 298 }
155
156 298 bool results = true;
157
2/2
✓ Branch 1 taken 1271 times.
✓ Branch 2 taken 298 times.
1569 while (!upload_results_.IsEmpty()) {
158 1271 Future<bool> *future = upload_results_.PopBack();
159
2/4
✓ Branch 1 taken 1271 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1271 times.
✗ Branch 4 not taken.
1271 results = future->Get() && results;
160
1/2
✓ Branch 0 taken 1271 times.
✗ Branch 1 not taken.
1271 delete future;
161 }
162
163
2/2
✓ Branch 0 taken 287 times.
✓ Branch 1 taken 11 times.
298 if (commit) {
164
3/6
✓ Branch 1 taken 287 times.
✗ Branch 2 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 287 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 287 times.
287 if (old_root_hash.empty() || new_root_hash.empty()) {
165 return false;
166 }
167 287 const bool commit_result = Commit(old_root_hash, new_root_hash, tag);
168
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 287 times.
287 if (!commit_result) {
169 LogCvmfs(kLogUploadGateway, kLogStderr,
170 "SessionContext: could not commit session. Aborting.");
171 FinalizeDerived();
172 pthread_mutex_destroy(&current_pack_mtx_);
173 initialized_ = false;
174 return false;
175 }
176 }
177
178
2/4
✓ Branch 1 taken 298 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 298 times.
✗ Branch 4 not taken.
298 results &= FinalizeDerived() && (bytes_committed_ == bytes_dispatched_);
179
180 298 pthread_mutex_destroy(&current_pack_mtx_);
181
182 298 initialized_ = false;
183
184 298 return results;
185 }
186
187 2132 ObjectPack::BucketHandle SessionContextBase::NewBucket() {
188 2132 const MutexLockGuard lock(current_pack_mtx_);
189
2/2
✓ Branch 0 taken 656 times.
✓ Branch 1 taken 1476 times.
2132 if (!current_pack_) {
190
2/4
✓ Branch 1 taken 656 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 656 times.
✗ Branch 5 not taken.
656 current_pack_ = new ObjectPack(max_pack_size_);
191 }
192
1/2
✓ Branch 1 taken 2132 times.
✗ Branch 2 not taken.
2132 ObjectPack::BucketHandle hd = current_pack_->NewBucket();
193
1/2
✓ Branch 1 taken 2132 times.
✗ Branch 2 not taken.
2132 active_handles_.push_back(hd);
194 2132 return hd;
195 2132 }
196
197 2788 bool SessionContextBase::CommitBucket(const ObjectPack::BucketContentType type,
198 const shash::Any &id,
199 const ObjectPack::BucketHandle handle,
200 const std::string &name,
201 const bool force_dispatch) {
202 2788 const MutexLockGuard lock(current_pack_mtx_);
203
204
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2788 times.
2788 if (!current_pack_) {
205 LogCvmfs(kLogUploadGateway, kLogStderr,
206 "Error: Called SessionBaseContext::CommitBucket without an open "
207 "ObjectPack.");
208 return false;
209 }
210
211 2788 const uint64_t size0 = current_pack_->size();
212
1/2
✓ Branch 1 taken 2788 times.
✗ Branch 2 not taken.
2788 const bool committed = current_pack_->CommitBucket(type, id, handle, name);
213
214
2/2
✓ Branch 0 taken 2132 times.
✓ Branch 1 taken 656 times.
2788 if (committed) { // Current pack is still not full
215
1/2
✓ Branch 3 taken 2132 times.
✗ Branch 4 not taken.
4264 active_handles_.erase(
216
1/2
✓ Branch 3 taken 2132 times.
✗ Branch 4 not taken.
2132 std::remove(active_handles_.begin(), active_handles_.end(), handle),
217 2132 active_handles_.end());
218 2132 const uint64_t size1 = current_pack_->size();
219 2132 bytes_committed_ += size1 - size0;
220
2/2
✓ Branch 0 taken 492 times.
✓ Branch 1 taken 1640 times.
2132 if (force_dispatch) {
221
1/2
✓ Branch 1 taken 492 times.
✗ Branch 2 not taken.
492 Dispatch();
222 492 current_pack_ = NULL;
223 }
224 } else { // Current pack is full and can be dispatched
225 656 uint64_t new_size = 0;
226
2/2
✓ Branch 0 taken 41 times.
✓ Branch 1 taken 615 times.
656 if (handle->capacity > max_pack_size_) {
227 41 new_size = handle->capacity + 1;
228 } else {
229 615 new_size = max_pack_size_;
230 }
231
2/4
✓ Branch 1 taken 656 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 656 times.
✗ Branch 5 not taken.
656 ObjectPack *new_pack = new ObjectPack(new_size);
232
2/2
✓ Branch 1 taken 902 times.
✓ Branch 2 taken 656 times.
1558 for (size_t i = 0u; i < active_handles_.size(); ++i) {
233
1/2
✓ Branch 2 taken 902 times.
✗ Branch 3 not taken.
902 current_pack_->TransferBucket(active_handles_[i], new_pack);
234 }
235
236
2/2
✓ Branch 1 taken 615 times.
✓ Branch 2 taken 41 times.
656 if (current_pack_->GetNoObjects() > 0) {
237
1/2
✓ Branch 1 taken 615 times.
✗ Branch 2 not taken.
615 Dispatch();
238 }
239 656 current_pack_ = new_pack;
240
241
1/2
✓ Branch 1 taken 656 times.
✗ Branch 2 not taken.
656 CommitBucket(type, id, handle, name, false);
242 }
243
244 2788 return true;
245 2788 }
246
247 1271 void SessionContextBase::Dispatch() {
248 1271 const MutexLockGuard lock(current_pack_mtx_);
249
250
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1271 times.
1271 if (!current_pack_) {
251 return;
252 }
253
254 1271 bytes_dispatched_ += current_pack_->size();
255
2/4
✓ Branch 1 taken 1271 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1271 times.
✗ Branch 5 not taken.
1271 upload_results_.EnqueueFront(DispatchObjectPack(current_pack_));
256
1/2
✓ Branch 1 taken 1271 times.
✗ Branch 2 not taken.
1271 }
257
258 298 SessionContext::SessionContext()
259
1/2
✓ Branch 2 taken 298 times.
✗ Branch 3 not taken.
298 : SessionContextBase(), upload_jobs_(), worker_() { }
260
261 298 bool SessionContext::InitializeDerived(uint64_t max_queue_size) {
262 // Start worker thread
263
1/2
✓ Branch 2 taken 298 times.
✗ Branch 3 not taken.
298 upload_jobs_ = new Tube<UploadJob>(max_queue_size);
264
265 298 const int retval = pthread_create(&worker_, NULL, UploadLoop,
266 reinterpret_cast<void *>(this));
267
268 298 return !retval;
269 }
270
271 298 bool SessionContext::FinalizeDerived() {
272 // Note: in FinalizedDerived, we know that the worker is running. The
273 // SessionContext is called only from GatewayUploader::FinalizeSession(),
274 // which in turn is from Spooler::FinalizeSession(). The Spooler ensures
275 // that GatewayUploader::Initialize() is called on construction.
276 //
277 // TODO(jblomer): Refactor SessionContext (and Uploader*) classes to
278 // use a factory method for construction.
279 //
280 298 upload_jobs_->EnqueueFront(&terminator_);
281 298 pthread_join(worker_, NULL);
282
283 298 return true;
284 }
285
286 bool SessionContext::Commit(const std::string &old_root_hash,
287 const std::string &new_root_hash,
288 const RepositoryTag &tag) {
289 JsonStringGenerator request_input;
290 request_input.Add("old_root_hash", old_root_hash);
291 request_input.Add("new_root_hash", new_root_hash);
292 request_input.Add("tag_name", tag.name());
293 // Channels are no longer supported: send 0 (i.e. kChannelTrunk) for
294 // backwards compatibility with existing gateways
295 //
296 request_input.Add("tag_channel", 0);
297 request_input.Add("tag_description", tag.description());
298 // Space-separated list of tags the receiver should remove in the same history
299 // transaction as this commit. Only sent for tag-removal commits; omitted
300 // otherwise for backwards compatibility with older gateways.
301 if (!tag.delete_tags().empty()) {
302 request_input.Add("delete_tags", tag.delete_tags());
303 }
304 if (tag.auto_tag_threshold() > 0) {
305 request_input.Add("auto_tag_threshold",
306 static_cast<int64_t>(tag.auto_tag_threshold()));
307 }
308 const std::string request = request_input.GenerateString();
309 CurlBuffer buffer;
310 return MakeEndRequest("POST", key_id_, secret_, session_token_, api_url_,
311 request, &buffer);
312 }
313
314 1271 Future<bool> *SessionContext::DispatchObjectPack(ObjectPack *pack) {
315 1271 UploadJob *job = new UploadJob;
316 1271 Future<bool> *result = new Future<bool>();
317 1271 job->pack = pack;
318 1271 job->result = result;
319 1271 upload_jobs_->EnqueueFront(job);
320 1271 return result;
321 }
322
323 bool SessionContext::DoUpload(const SessionContext::UploadJob *job) {
324 // Set up the object pack serializer
325 ObjectPackProducer serializer(job->pack);
326
327 shash::Any payload_digest(shash::kSha1);
328 serializer.GetDigest(&payload_digest);
329 const std::string json_msg = "{\"session_token\" : \"" + session_token_
330 + "\", \"payload_digest\" : \""
331 + payload_digest.ToString(false)
332 + "\", \"header_size\" : \""
333 + StringifyInt(serializer.GetHeaderSize())
334 + "\", \"api_version\" : \""
335 + StringifyInt(gateway::APIVersion()) + "\"}";
336
337 // Compute HMAC
338 shash::Any hmac(shash::kSha1);
339 shash::HmacString(secret_, json_msg, &hmac);
340
341 CurlSendPayload payload;
342 payload.json_message = &json_msg;
343 payload.pack_serializer = &serializer;
344 payload.index = 0;
345
346 const size_t payload_size = json_msg.size() + serializer.GetHeaderSize()
347 + job->pack->size();
348
349 // Prepare the Curl POST request
350 CURL *h_curl = curl_easy_init();
351
352 if (!h_curl) {
353 return false;
354 }
355
356 // Set HTTP headers (Authorization and Message-Size)
357 std::string header_str = std::string("Authorization: ") + key_id_ + " "
358 + Base64(hmac.ToString(false));
359 struct curl_slist *auth_header = NULL;
360 auth_header = curl_slist_append(auth_header, header_str.c_str());
361 header_str = std::string("Message-Size: ") + StringifyInt(json_msg.size());
362 auth_header = curl_slist_append(auth_header, header_str.c_str());
363 curl_easy_setopt(h_curl, CURLOPT_HTTPHEADER, auth_header);
364
365 std::string reply;
366 curl_easy_setopt(h_curl, CURLOPT_NOPROGRESS, 1L);
367 curl_easy_setopt(h_curl, CURLOPT_USERAGENT, "cvmfs/" CVMFS_VERSION);
368 curl_easy_setopt(h_curl, CURLOPT_MAXREDIRS, 50L);
369 curl_easy_setopt(h_curl, CURLOPT_CUSTOMREQUEST, "POST");
370 curl_easy_setopt(h_curl, CURLOPT_URL, (api_url_ + "/payloads").c_str());
371 curl_easy_setopt(h_curl, CURLOPT_POSTFIELDS, NULL);
372 curl_easy_setopt(h_curl, CURLOPT_POSTFIELDSIZE_LARGE,
373 static_cast<curl_off_t>(payload_size));
374 curl_easy_setopt(h_curl, CURLOPT_READDATA, &payload);
375 curl_easy_setopt(h_curl, CURLOPT_READFUNCTION, SendCB);
376 curl_easy_setopt(h_curl, CURLOPT_WRITEFUNCTION, RecvCB);
377 curl_easy_setopt(h_curl, CURLOPT_WRITEDATA, &reply);
378
379 // Perform the Curl POST request
380 const CURLcode ret = curl_easy_perform(h_curl);
381 if (ret) {
382 LogCvmfs(kLogUploadGateway, kLogStderr,
383 "SessionContext::DoUpload - curl_easy_perform failed: %d", ret);
384 }
385
386 const UniquePtr<JsonDocument> reply_json(JsonDocument::Create(reply));
387 const JSON *reply_status = JsonDocument::SearchInObject(
388 reply_json->root(), "status", JSON_STRING);
389 const bool ok = (reply_status != NULL
390 && std::string(reply_status->get<std::string>()) == "ok");
391 if (!ok) {
392 LogCvmfs(kLogUploadGateway, kLogStderr,
393 "SessionContext::DoUpload - error reply: %s", reply.c_str());
394 }
395
396 curl_easy_cleanup(h_curl);
397 h_curl = NULL;
398
399 return ok && !ret;
400 }
401
402 298 void *SessionContext::UploadLoop(void *data) {
403 298 SessionContext *ctx = reinterpret_cast<SessionContext *>(data);
404 UploadJob *job;
405
406 while (true) {
407 1569 job = ctx->upload_jobs_->PopBack();
408
2/2
✓ Branch 0 taken 298 times.
✓ Branch 1 taken 1271 times.
1569 if (job == &terminator_)
409 298 return NULL;
410
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1271 times.
1271 if (!ctx->DoUpload(job)) {
411 PANIC(kLogStderr, "SessionContext: could not submit payload. Aborting.");
412 }
413 1271 job->result->Set(true);
414
1/2
✓ Branch 0 taken 1271 times.
✗ Branch 1 not taken.
1271 delete job->pack;
415
1/2
✓ Branch 0 taken 1271 times.
✗ Branch 1 not taken.
1271 delete job;
416 }
417 }
418
419 SessionContext::UploadJob SessionContext::terminator_;
420
421 } // namespace upload
422