GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/session_context.cc
Date: 2025-12-21 02:39:23
Exec Total Coverage
Lines: 139 224 62.1%
Branches: 80 300 26.7%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "session_context.h"
6
7 #include <algorithm>
8 #include <limits>
9
10 #include "curl/curl.h"
11 #include "gateway_util.h"
12 #include "json_document.h"
13 #include "json_document_write.h"
14 #include "swissknife_lease_curl.h"
15 #include "util/exception.h"
16 #include "util/pointer.h"
17 #include "util/string.h"
18
19 namespace {
20 // Maximum number of jobs during a session. No limit, for practical
21 // purposes. Note that we use uint32_t so that the Tube code works
22 // correctly with this limit on 32bit systems.
23 const uint32_t kMaxNumJobs = std::numeric_limits<uint32_t>::max();
24 } // namespace
25
26 namespace upload {
27
28 252 size_t SendCB(void *ptr, size_t size, size_t nmemb, void *userp) {
29 252 CurlSendPayload *payload = static_cast<CurlSendPayload *>(userp);
30
31 252 const size_t max_chunk_size = size * nmemb;
32
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 252 times.
252 if (max_chunk_size < 1) {
33 return 0;
34 }
35
36 252 size_t current_chunk_size = 0;
37
2/2
✓ Branch 0 taken 336 times.
✓ Branch 1 taken 168 times.
504 while (current_chunk_size < max_chunk_size) {
38
2/2
✓ Branch 1 taken 42 times.
✓ Branch 2 taken 294 times.
336 if (payload->index < payload->json_message->size()) {
39 // Can add a chunk from the JSON message
40 42 const size_t read_size = std::min(
41 84 max_chunk_size - current_chunk_size,
42 42 payload->json_message->size() - payload->index);
43 42 current_chunk_size += read_size;
44 42 std::memcpy(ptr, payload->json_message->data() + payload->index,
45 read_size);
46 42 payload->index += read_size;
47 } else {
48 // Can add a chunk from the payload
49 294 const size_t max_read_size = max_chunk_size - current_chunk_size;
50 294 const unsigned nbytes = payload->pack_serializer->ProduceNext(
51 max_read_size,
52 static_cast<unsigned char *>(ptr) + current_chunk_size);
53 294 current_chunk_size += nbytes;
54
55
2/2
✓ Branch 0 taken 84 times.
✓ Branch 1 taken 210 times.
294 if (!nbytes) {
56 84 break;
57 }
58 }
59 }
60
61 252 return current_chunk_size;
62 }
63
64 size_t RecvCB(void *buffer, size_t size, size_t nmemb, void *userp) {
65 std::string *my_buffer = static_cast<std::string *>(userp);
66
67 if (size * nmemb < 1) {
68 return 0;
69 }
70
71 *my_buffer = static_cast<char *>(buffer);
72
73 return my_buffer->size();
74 }
75
76 343 SessionContextBase::SessionContextBase()
77 343 : upload_results_(kMaxNumJobs)
78 343 , api_url_()
79 343 , session_token_()
80 343 , key_id_()
81 343 , secret_()
82 343 , max_pack_size_(ObjectPack::kDefaultLimit)
83 343 , active_handles_()
84 343 , current_pack_(NULL)
85 343 , current_pack_mtx_()
86 343 , bytes_committed_(0)
87 343 , bytes_dispatched_(0)
88 343 , initialized_(false) { }
89
90 686 SessionContextBase::~SessionContextBase() { }
91
92 343 bool SessionContextBase::Initialize(const std::string &api_url,
93 const std::string &session_token,
94 const std::string &key_id,
95 const std::string &secret,
96 uint64_t max_pack_size,
97 uint64_t max_queue_size) {
98 343 bool ret = true;
99
100 // Initialize session context lock
101 pthread_mutexattr_t attr;
102 343 if (pthread_mutexattr_init(&attr)
103
1/2
✓ Branch 1 taken 343 times.
✗ Branch 2 not taken.
343 || pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE)
104
1/2
✓ Branch 1 taken 343 times.
✗ Branch 2 not taken.
343 || pthread_mutex_init(&current_pack_mtx_, &attr)
105
3/6
✓ Branch 0 taken 343 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 343 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 343 times.
686 || pthread_mutexattr_destroy(&attr)) {
106 LogCvmfs(kLogUploadGateway, kLogStderr,
107 "Could not initialize SessionContext lock.");
108 return false;
109 }
110
111 // Set upstream URL and session token
112
1/2
✓ Branch 1 taken 343 times.
✗ Branch 2 not taken.
343 api_url_ = api_url;
113
1/2
✓ Branch 1 taken 343 times.
✗ Branch 2 not taken.
343 session_token_ = session_token;
114
1/2
✓ Branch 1 taken 343 times.
✗ Branch 2 not taken.
343 key_id_ = key_id;
115
1/2
✓ Branch 1 taken 343 times.
✗ Branch 2 not taken.
343 secret_ = secret;
116 343 max_pack_size_ = max_pack_size;
117
118 343 bytes_committed_ = 0u;
119 343 bytes_dispatched_ = 0u;
120
121
2/4
✓ Branch 1 taken 343 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 343 times.
343 assert(upload_results_.IsEmpty());
122
123 // Ensure that there are not open object packs
124
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 343 times.
343 if (current_pack_) {
125 LogCvmfs(
126 kLogUploadGateway, kLogStderr,
127 "Could not initialize SessionContext - Existing open object packs.");
128 ret = false;
129 }
130
131
3/6
✓ Branch 1 taken 343 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 343 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 343 times.
✗ Branch 6 not taken.
343 ret = InitializeDerived(max_queue_size) && ret;
132
133 343 initialized_ = true;
134
135 343 return ret;
136 }
137
138 343 bool SessionContextBase::Finalize(bool commit, const std::string &old_root_hash,
139 const std::string &new_root_hash,
140 const RepositoryTag &tag) {
141
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 343 times.
343 assert(active_handles_.empty());
142
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 343 times.
343 if (!initialized_) {
143 assert(!commit);
144 return true;
145 }
146
147 {
148 343 const MutexLockGuard lock(current_pack_mtx_);
149
150
5/6
✓ Branch 0 taken 168 times.
✓ Branch 1 taken 175 times.
✓ Branch 3 taken 168 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 168 times.
✓ Branch 6 taken 175 times.
343 if (current_pack_ && current_pack_->GetNoObjects() > 0) {
151
1/2
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
168 Dispatch();
152 168 current_pack_ = NULL;
153 }
154 343 }
155
156 343 bool results = true;
157
2/2
✓ Branch 1 taken 1302 times.
✓ Branch 2 taken 343 times.
1645 while (!upload_results_.IsEmpty()) {
158 1302 Future<bool> *future = upload_results_.PopBack();
159
2/4
✓ Branch 1 taken 1302 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1302 times.
✗ Branch 4 not taken.
1302 results = future->Get() && results;
160
1/2
✓ Branch 0 taken 1302 times.
✗ Branch 1 not taken.
1302 delete future;
161 }
162
163
2/2
✓ Branch 0 taken 294 times.
✓ Branch 1 taken 49 times.
343 if (commit) {
164
3/6
✓ Branch 1 taken 294 times.
✗ Branch 2 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 294 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 294 times.
294 if (old_root_hash.empty() || new_root_hash.empty()) {
165 return false;
166 }
167 294 const bool commit_result = Commit(old_root_hash, new_root_hash, tag);
168
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 294 times.
294 if (!commit_result) {
169 LogCvmfs(kLogUploadGateway, kLogStderr,
170 "SessionContext: could not commit session. Aborting.");
171 FinalizeDerived();
172 pthread_mutex_destroy(&current_pack_mtx_);
173 initialized_ = false;
174 return false;
175 }
176 }
177
178
2/4
✓ Branch 1 taken 343 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 343 times.
✗ Branch 4 not taken.
343 results &= FinalizeDerived() && (bytes_committed_ == bytes_dispatched_);
179
180 343 pthread_mutex_destroy(&current_pack_mtx_);
181
182 343 initialized_ = false;
183
184 343 return results;
185 }
186
187 2184 ObjectPack::BucketHandle SessionContextBase::NewBucket() {
188 2184 const MutexLockGuard lock(current_pack_mtx_);
189
2/2
✓ Branch 0 taken 672 times.
✓ Branch 1 taken 1512 times.
2184 if (!current_pack_) {
190
2/4
✓ Branch 1 taken 672 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 672 times.
✗ Branch 5 not taken.
672 current_pack_ = new ObjectPack(max_pack_size_);
191 }
192
1/2
✓ Branch 1 taken 2184 times.
✗ Branch 2 not taken.
2184 ObjectPack::BucketHandle hd = current_pack_->NewBucket();
193
1/2
✓ Branch 1 taken 2184 times.
✗ Branch 2 not taken.
2184 active_handles_.push_back(hd);
194 2184 return hd;
195 2184 }
196
197 2856 bool SessionContextBase::CommitBucket(const ObjectPack::BucketContentType type,
198 const shash::Any &id,
199 const ObjectPack::BucketHandle handle,
200 const std::string &name,
201 const bool force_dispatch) {
202 2856 const MutexLockGuard lock(current_pack_mtx_);
203
204
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2856 times.
2856 if (!current_pack_) {
205 LogCvmfs(kLogUploadGateway, kLogStderr,
206 "Error: Called SessionBaseContext::CommitBucket without an open "
207 "ObjectPack.");
208 return false;
209 }
210
211 2856 const uint64_t size0 = current_pack_->size();
212
1/2
✓ Branch 1 taken 2856 times.
✗ Branch 2 not taken.
2856 const bool committed = current_pack_->CommitBucket(type, id, handle, name);
213
214
2/2
✓ Branch 0 taken 2184 times.
✓ Branch 1 taken 672 times.
2856 if (committed) { // Current pack is still not full
215
1/2
✓ Branch 3 taken 2184 times.
✗ Branch 4 not taken.
4368 active_handles_.erase(
216
1/2
✓ Branch 3 taken 2184 times.
✗ Branch 4 not taken.
2184 std::remove(active_handles_.begin(), active_handles_.end(), handle),
217 2184 active_handles_.end());
218 2184 const uint64_t size1 = current_pack_->size();
219 2184 bytes_committed_ += size1 - size0;
220
2/2
✓ Branch 0 taken 504 times.
✓ Branch 1 taken 1680 times.
2184 if (force_dispatch) {
221
1/2
✓ Branch 1 taken 504 times.
✗ Branch 2 not taken.
504 Dispatch();
222 504 current_pack_ = NULL;
223 }
224 } else { // Current pack is full and can be dispatched
225 672 uint64_t new_size = 0;
226
2/2
✓ Branch 0 taken 42 times.
✓ Branch 1 taken 630 times.
672 if (handle->capacity > max_pack_size_) {
227 42 new_size = handle->capacity + 1;
228 } else {
229 630 new_size = max_pack_size_;
230 }
231
2/4
✓ Branch 1 taken 672 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 672 times.
✗ Branch 5 not taken.
672 ObjectPack *new_pack = new ObjectPack(new_size);
232
2/2
✓ Branch 1 taken 924 times.
✓ Branch 2 taken 672 times.
1596 for (size_t i = 0u; i < active_handles_.size(); ++i) {
233
1/2
✓ Branch 2 taken 924 times.
✗ Branch 3 not taken.
924 current_pack_->TransferBucket(active_handles_[i], new_pack);
234 }
235
236
2/2
✓ Branch 1 taken 630 times.
✓ Branch 2 taken 42 times.
672 if (current_pack_->GetNoObjects() > 0) {
237
1/2
✓ Branch 1 taken 630 times.
✗ Branch 2 not taken.
630 Dispatch();
238 }
239 672 current_pack_ = new_pack;
240
241
1/2
✓ Branch 1 taken 672 times.
✗ Branch 2 not taken.
672 CommitBucket(type, id, handle, name, false);
242 }
243
244 2856 return true;
245 2856 }
246
247 1302 void SessionContextBase::Dispatch() {
248 1302 const MutexLockGuard lock(current_pack_mtx_);
249
250
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1302 times.
1302 if (!current_pack_) {
251 return;
252 }
253
254 1302 bytes_dispatched_ += current_pack_->size();
255
2/4
✓ Branch 1 taken 1302 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1302 times.
✗ Branch 5 not taken.
1302 upload_results_.EnqueueFront(DispatchObjectPack(current_pack_));
256
1/2
✓ Branch 1 taken 1302 times.
✗ Branch 2 not taken.
1302 }
257
258 343 SessionContext::SessionContext()
259
1/2
✓ Branch 2 taken 343 times.
✗ Branch 3 not taken.
343 : SessionContextBase(), upload_jobs_(), worker_() { }
260
261 343 bool SessionContext::InitializeDerived(uint64_t max_queue_size) {
262 // Start worker thread
263
1/2
✓ Branch 2 taken 343 times.
✗ Branch 3 not taken.
343 upload_jobs_ = new Tube<UploadJob>(max_queue_size);
264
265 343 const int retval = pthread_create(&worker_, NULL, UploadLoop,
266 reinterpret_cast<void *>(this));
267
268 343 return !retval;
269 }
270
271 343 bool SessionContext::FinalizeDerived() {
272 // Note: in FinalizedDerived, we know that the worker is running. The
273 // SessionContext is called only from GatewayUploader::FinalizeSession(),
274 // which in turn is from Spooler::FinalizeSession(). The Spooler ensures
275 // that GatewayUploader::Initialize() is called on construction.
276 //
277 // TODO(jblomer): Refactor SessionContext (and Uploader*) classes to
278 // use a factory method for construction.
279 //
280 343 upload_jobs_->EnqueueFront(&terminator_);
281 343 pthread_join(worker_, NULL);
282
283 343 return true;
284 }
285
286 bool SessionContext::Commit(const std::string &old_root_hash,
287 const std::string &new_root_hash,
288 const RepositoryTag &tag) {
289 JsonStringGenerator request_input;
290 request_input.Add("old_root_hash", old_root_hash);
291 request_input.Add("new_root_hash", new_root_hash);
292 request_input.Add("tag_name", tag.name());
293 // Channels are no longer supported: send 0 (i.e. kChannelTrunk) for
294 // backwards compatibility with existing gateways
295 //
296 request_input.Add("tag_channel", 0);
297 request_input.Add("tag_description", tag.description());
298 const std::string request = request_input.GenerateString();
299 CurlBuffer buffer;
300 return MakeEndRequest("POST", key_id_, secret_, session_token_, api_url_,
301 request, &buffer);
302 }
303
304 1302 Future<bool> *SessionContext::DispatchObjectPack(ObjectPack *pack) {
305 1302 UploadJob *job = new UploadJob;
306 1302 Future<bool> *result = new Future<bool>();
307 1302 job->pack = pack;
308 1302 job->result = result;
309 1302 upload_jobs_->EnqueueFront(job);
310 1302 return result;
311 }
312
313 bool SessionContext::DoUpload(const SessionContext::UploadJob *job) {
314 // Set up the object pack serializer
315 ObjectPackProducer serializer(job->pack);
316
317 shash::Any payload_digest(shash::kSha1);
318 serializer.GetDigest(&payload_digest);
319 const std::string json_msg = "{\"session_token\" : \"" + session_token_
320 + "\", \"payload_digest\" : \""
321 + payload_digest.ToString(false)
322 + "\", \"header_size\" : \""
323 + StringifyInt(serializer.GetHeaderSize())
324 + "\", \"api_version\" : \""
325 + StringifyInt(gateway::APIVersion()) + "\"}";
326
327 // Compute HMAC
328 shash::Any hmac(shash::kSha1);
329 shash::HmacString(secret_, json_msg, &hmac);
330
331 CurlSendPayload payload;
332 payload.json_message = &json_msg;
333 payload.pack_serializer = &serializer;
334 payload.index = 0;
335
336 const size_t payload_size = json_msg.size() + serializer.GetHeaderSize()
337 + job->pack->size();
338
339 // Prepare the Curl POST request
340 CURL *h_curl = curl_easy_init();
341
342 if (!h_curl) {
343 return false;
344 }
345
346 // Set HTTP headers (Authorization and Message-Size)
347 std::string header_str = std::string("Authorization: ") + key_id_ + " "
348 + Base64(hmac.ToString(false));
349 struct curl_slist *auth_header = NULL;
350 auth_header = curl_slist_append(auth_header, header_str.c_str());
351 header_str = std::string("Message-Size: ") + StringifyInt(json_msg.size());
352 auth_header = curl_slist_append(auth_header, header_str.c_str());
353 curl_easy_setopt(h_curl, CURLOPT_HTTPHEADER, auth_header);
354
355 std::string reply;
356 curl_easy_setopt(h_curl, CURLOPT_NOPROGRESS, 1L);
357 curl_easy_setopt(h_curl, CURLOPT_USERAGENT, "cvmfs/" CVMFS_VERSION);
358 curl_easy_setopt(h_curl, CURLOPT_MAXREDIRS, 50L);
359 curl_easy_setopt(h_curl, CURLOPT_CUSTOMREQUEST, "POST");
360 curl_easy_setopt(h_curl, CURLOPT_URL, (api_url_ + "/payloads").c_str());
361 curl_easy_setopt(h_curl, CURLOPT_POSTFIELDS, NULL);
362 curl_easy_setopt(h_curl, CURLOPT_POSTFIELDSIZE_LARGE,
363 static_cast<curl_off_t>(payload_size));
364 curl_easy_setopt(h_curl, CURLOPT_READDATA, &payload);
365 curl_easy_setopt(h_curl, CURLOPT_READFUNCTION, SendCB);
366 curl_easy_setopt(h_curl, CURLOPT_WRITEFUNCTION, RecvCB);
367 curl_easy_setopt(h_curl, CURLOPT_WRITEDATA, &reply);
368
369 // Perform the Curl POST request
370 const CURLcode ret = curl_easy_perform(h_curl);
371 if (ret) {
372 LogCvmfs(kLogUploadGateway, kLogStderr,
373 "SessionContext::DoUpload - curl_easy_perform failed: %d", ret);
374 }
375
376 const UniquePtr<JsonDocument> reply_json(JsonDocument::Create(reply));
377 const JSON *reply_status = JsonDocument::SearchInObject(
378 reply_json->root(), "status", JSON_STRING);
379 const bool ok = (reply_status != NULL
380 && std::string(reply_status->string_value) == "ok");
381 if (!ok) {
382 LogCvmfs(kLogUploadGateway, kLogStderr,
383 "SessionContext::DoUpload - error reply: %s", reply.c_str());
384 }
385
386 curl_easy_cleanup(h_curl);
387 h_curl = NULL;
388
389 return ok && !ret;
390 }
391
392 343 void *SessionContext::UploadLoop(void *data) {
393 343 SessionContext *ctx = reinterpret_cast<SessionContext *>(data);
394 UploadJob *job;
395
396 while (true) {
397 1645 job = ctx->upload_jobs_->PopBack();
398
2/2
✓ Branch 0 taken 343 times.
✓ Branch 1 taken 1302 times.
1645 if (job == &terminator_)
399 343 return NULL;
400
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1302 times.
1302 if (!ctx->DoUpload(job)) {
401 PANIC(kLogStderr, "SessionContext: could not submit payload. Aborting.");
402 }
403 1302 job->result->Set(true);
404
1/2
✓ Branch 0 taken 1302 times.
✗ Branch 1 not taken.
1302 delete job->pack;
405
1/2
✓ Branch 0 taken 1302 times.
✗ Branch 1 not taken.
1302 delete job;
406 }
407 }
408
409 SessionContext::UploadJob SessionContext::terminator_;
410
411 } // namespace upload
412