Directory: | cvmfs/ |
---|---|
File: | cvmfs/session_context.cc |
Date: | 2025-06-22 02:36:02 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 139 | 224 | 62.1% |
Branches: | 80 | 300 | 26.7% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #include "session_context.h" | ||
6 | |||
7 | #include <algorithm> | ||
8 | #include <limits> | ||
9 | |||
10 | #include "curl/curl.h" | ||
11 | #include "gateway_util.h" | ||
12 | #include "json_document.h" | ||
13 | #include "json_document_write.h" | ||
14 | #include "swissknife_lease_curl.h" | ||
15 | #include "util/exception.h" | ||
16 | #include "util/pointer.h" | ||
17 | #include "util/string.h" | ||
18 | |||
namespace {
// Capacity handed to the upload-results queue: effectively unbounded for a
// single session. The type is deliberately uint32_t so that the Tube code
// behaves correctly with this limit on 32bit systems.
const uint32_t kMaxNumJobs = std::numeric_limits<uint32_t>::max();
}  // namespace
25 | |||
26 | namespace upload { | ||
27 | |||
28 | 90 | size_t SendCB(void *ptr, size_t size, size_t nmemb, void *userp) { | |
29 | 90 | CurlSendPayload *payload = static_cast<CurlSendPayload *>(userp); | |
30 | |||
31 | 90 | const size_t max_chunk_size = size * nmemb; | |
32 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 90 times.
|
90 | if (max_chunk_size < 1) { |
33 | ✗ | return 0; | |
34 | } | ||
35 | |||
36 | 90 | size_t current_chunk_size = 0; | |
37 |
2/2✓ Branch 0 taken 120 times.
✓ Branch 1 taken 60 times.
|
180 | while (current_chunk_size < max_chunk_size) { |
38 |
2/2✓ Branch 1 taken 15 times.
✓ Branch 2 taken 105 times.
|
120 | if (payload->index < payload->json_message->size()) { |
39 | // Can add a chunk from the JSON message | ||
40 | 15 | const size_t read_size = std::min( | |
41 | 30 | max_chunk_size - current_chunk_size, | |
42 | 15 | payload->json_message->size() - payload->index); | |
43 | 15 | current_chunk_size += read_size; | |
44 | 15 | std::memcpy(ptr, payload->json_message->data() + payload->index, | |
45 | read_size); | ||
46 | 15 | payload->index += read_size; | |
47 | } else { | ||
48 | // Can add a chunk from the payload | ||
49 | 105 | const size_t max_read_size = max_chunk_size - current_chunk_size; | |
50 | 105 | const unsigned nbytes = payload->pack_serializer->ProduceNext( | |
51 | max_read_size, | ||
52 | static_cast<unsigned char *>(ptr) + current_chunk_size); | ||
53 | 105 | current_chunk_size += nbytes; | |
54 | |||
55 |
2/2✓ Branch 0 taken 30 times.
✓ Branch 1 taken 75 times.
|
105 | if (!nbytes) { |
56 | 30 | break; | |
57 | } | ||
58 | } | ||
59 | } | ||
60 | |||
61 | 90 | return current_chunk_size; | |
62 | } | ||
63 | |||
/**
 * Write callback installed through CURLOPT_WRITEFUNCTION in
 * SessionContext::DoUpload.
 *
 * Appends the received bytes to the std::string passed as user pointer.
 * The data delivered by libcurl is not NUL terminated and a single reply
 * can arrive in several calls, so exactly size * nmemb bytes are appended
 * on every invocation.
 *
 * @param buffer  received bytes (not NUL terminated)
 * @param size    element size
 * @param nmemb   number of elements
 * @param userp   the std::string collecting the reply
 * @return number of bytes consumed, i.e. size * nmemb (0 for an empty chunk)
 */
size_t RecvCB(void *buffer, size_t size, size_t nmemb, void *userp) {
  std::string *my_buffer = static_cast<std::string *>(userp);

  const size_t nbytes = size * nmemb;
  if (nbytes < 1) {
    return 0;
  }

  my_buffer->append(static_cast<const char *>(buffer), nbytes);

  // libcurl treats any value other than the number of bytes passed in as a
  // write error
  return nbytes;
}
75 | |||
76 | 146 | SessionContextBase::SessionContextBase() | |
77 | 146 | : upload_results_(kMaxNumJobs) | |
78 | 146 | , api_url_() | |
79 | 146 | , session_token_() | |
80 | 146 | , key_id_() | |
81 | 146 | , secret_() | |
82 | 146 | , max_pack_size_(ObjectPack::kDefaultLimit) | |
83 | 146 | , active_handles_() | |
84 | 146 | , current_pack_(NULL) | |
85 | 146 | , current_pack_mtx_() | |
86 | 146 | , bytes_committed_(0) | |
87 | 146 | , bytes_dispatched_(0) | |
88 | 146 | , initialized_(false) { } | |
89 | |||
90 | 292 | SessionContextBase::~SessionContextBase() { } | |
91 | |||
92 | 146 | bool SessionContextBase::Initialize(const std::string &api_url, | |
93 | const std::string &session_token, | ||
94 | const std::string &key_id, | ||
95 | const std::string &secret, | ||
96 | uint64_t max_pack_size, | ||
97 | uint64_t max_queue_size) { | ||
98 | 146 | bool ret = true; | |
99 | |||
100 | // Initialize session context lock | ||
101 | pthread_mutexattr_t attr; | ||
102 | 146 | if (pthread_mutexattr_init(&attr) | |
103 |
1/2✓ Branch 1 taken 146 times.
✗ Branch 2 not taken.
|
146 | || pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) |
104 |
1/2✓ Branch 1 taken 146 times.
✗ Branch 2 not taken.
|
146 | || pthread_mutex_init(¤t_pack_mtx_, &attr) |
105 |
3/6✓ Branch 0 taken 146 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 146 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 146 times.
|
292 | || pthread_mutexattr_destroy(&attr)) { |
106 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
107 | "Could not initialize SessionContext lock."); | ||
108 | ✗ | return false; | |
109 | } | ||
110 | |||
111 | // Set upstream URL and session token | ||
112 |
1/2✓ Branch 1 taken 146 times.
✗ Branch 2 not taken.
|
146 | api_url_ = api_url; |
113 |
1/2✓ Branch 1 taken 146 times.
✗ Branch 2 not taken.
|
146 | session_token_ = session_token; |
114 |
1/2✓ Branch 1 taken 146 times.
✗ Branch 2 not taken.
|
146 | key_id_ = key_id; |
115 |
1/2✓ Branch 1 taken 146 times.
✗ Branch 2 not taken.
|
146 | secret_ = secret; |
116 | 146 | max_pack_size_ = max_pack_size; | |
117 | |||
118 | 146 | bytes_committed_ = 0u; | |
119 | 146 | bytes_dispatched_ = 0u; | |
120 | |||
121 |
2/4✓ Branch 1 taken 146 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 146 times.
|
146 | assert(upload_results_.IsEmpty()); |
122 | |||
123 | // Ensure that there are not open object packs | ||
124 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 146 times.
|
146 | if (current_pack_) { |
125 | ✗ | LogCvmfs( | |
126 | kLogUploadGateway, kLogStderr, | ||
127 | "Could not initialize SessionContext - Existing open object packs."); | ||
128 | ✗ | ret = false; | |
129 | } | ||
130 | |||
131 |
3/6✓ Branch 1 taken 146 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 146 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 146 times.
✗ Branch 6 not taken.
|
146 | ret = InitializeDerived(max_queue_size) && ret; |
132 | |||
133 | 146 | initialized_ = true; | |
134 | |||
135 | 146 | return ret; | |
136 | } | ||
137 | |||
138 | 146 | bool SessionContextBase::Finalize(bool commit, const std::string &old_root_hash, | |
139 | const std::string &new_root_hash, | ||
140 | const RepositoryTag &tag) { | ||
141 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 146 times.
|
146 | assert(active_handles_.empty()); |
142 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 146 times.
|
146 | if (!initialized_) { |
143 | ✗ | assert(!commit); | |
144 | ✗ | return true; | |
145 | } | ||
146 | |||
147 | { | ||
148 | 146 | const MutexLockGuard lock(current_pack_mtx_); | |
149 | |||
150 |
5/6✓ Branch 0 taken 60 times.
✓ Branch 1 taken 86 times.
✓ Branch 3 taken 60 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 60 times.
✓ Branch 6 taken 86 times.
|
146 | if (current_pack_ && current_pack_->GetNoObjects() > 0) { |
151 |
1/2✓ Branch 1 taken 60 times.
✗ Branch 2 not taken.
|
60 | Dispatch(); |
152 | 60 | current_pack_ = NULL; | |
153 | } | ||
154 | 146 | } | |
155 | |||
156 | 146 | bool results = true; | |
157 |
2/2✓ Branch 1 taken 465 times.
✓ Branch 2 taken 146 times.
|
611 | while (!upload_results_.IsEmpty()) { |
158 | 465 | Future<bool> *future = upload_results_.PopBack(); | |
159 |
2/4✓ Branch 1 taken 465 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 465 times.
✗ Branch 4 not taken.
|
465 | results = future->Get() && results; |
160 |
1/2✓ Branch 0 taken 465 times.
✗ Branch 1 not taken.
|
465 | delete future; |
161 | } | ||
162 | |||
163 |
2/2✓ Branch 0 taken 105 times.
✓ Branch 1 taken 41 times.
|
146 | if (commit) { |
164 |
3/6✓ Branch 1 taken 105 times.
✗ Branch 2 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 105 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 105 times.
|
105 | if (old_root_hash.empty() || new_root_hash.empty()) { |
165 | ✗ | return false; | |
166 | } | ||
167 | 105 | const bool commit_result = Commit(old_root_hash, new_root_hash, tag); | |
168 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 105 times.
|
105 | if (!commit_result) { |
169 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
170 | "SessionContext: could not commit session. Aborting."); | ||
171 | ✗ | FinalizeDerived(); | |
172 | ✗ | pthread_mutex_destroy(¤t_pack_mtx_); | |
173 | ✗ | initialized_ = false; | |
174 | ✗ | return false; | |
175 | } | ||
176 | } | ||
177 | |||
178 |
2/4✓ Branch 1 taken 146 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 146 times.
✗ Branch 4 not taken.
|
146 | results &= FinalizeDerived() && (bytes_committed_ == bytes_dispatched_); |
179 | |||
180 | 146 | pthread_mutex_destroy(¤t_pack_mtx_); | |
181 | |||
182 | 146 | initialized_ = false; | |
183 | |||
184 | 146 | return results; | |
185 | } | ||
186 | |||
187 | 780 | ObjectPack::BucketHandle SessionContextBase::NewBucket() { | |
188 | 780 | const MutexLockGuard lock(current_pack_mtx_); | |
189 |
2/2✓ Branch 0 taken 240 times.
✓ Branch 1 taken 540 times.
|
780 | if (!current_pack_) { |
190 |
2/4✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 240 times.
✗ Branch 5 not taken.
|
240 | current_pack_ = new ObjectPack(max_pack_size_); |
191 | } | ||
192 |
1/2✓ Branch 1 taken 780 times.
✗ Branch 2 not taken.
|
780 | ObjectPack::BucketHandle hd = current_pack_->NewBucket(); |
193 |
1/2✓ Branch 1 taken 780 times.
✗ Branch 2 not taken.
|
780 | active_handles_.push_back(hd); |
194 | 780 | return hd; | |
195 | 780 | } | |
196 | |||
197 | 1020 | bool SessionContextBase::CommitBucket(const ObjectPack::BucketContentType type, | |
198 | const shash::Any &id, | ||
199 | const ObjectPack::BucketHandle handle, | ||
200 | const std::string &name, | ||
201 | const bool force_dispatch) { | ||
202 | 1020 | const MutexLockGuard lock(current_pack_mtx_); | |
203 | |||
204 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1020 times.
|
1020 | if (!current_pack_) { |
205 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
206 | "Error: Called SessionBaseContext::CommitBucket without an open " | ||
207 | "ObjectPack."); | ||
208 | ✗ | return false; | |
209 | } | ||
210 | |||
211 | 1020 | const uint64_t size0 = current_pack_->size(); | |
212 |
1/2✓ Branch 1 taken 1020 times.
✗ Branch 2 not taken.
|
1020 | const bool committed = current_pack_->CommitBucket(type, id, handle, name); |
213 | |||
214 |
2/2✓ Branch 0 taken 780 times.
✓ Branch 1 taken 240 times.
|
1020 | if (committed) { // Current pack is still not full |
215 |
1/2✓ Branch 3 taken 780 times.
✗ Branch 4 not taken.
|
1560 | active_handles_.erase( |
216 |
1/2✓ Branch 3 taken 780 times.
✗ Branch 4 not taken.
|
780 | std::remove(active_handles_.begin(), active_handles_.end(), handle), |
217 | 780 | active_handles_.end()); | |
218 | 780 | const uint64_t size1 = current_pack_->size(); | |
219 | 780 | bytes_committed_ += size1 - size0; | |
220 |
2/2✓ Branch 0 taken 180 times.
✓ Branch 1 taken 600 times.
|
780 | if (force_dispatch) { |
221 |
1/2✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
|
180 | Dispatch(); |
222 | 180 | current_pack_ = NULL; | |
223 | } | ||
224 | } else { // Current pack is full and can be dispatched | ||
225 | 240 | uint64_t new_size = 0; | |
226 |
2/2✓ Branch 0 taken 15 times.
✓ Branch 1 taken 225 times.
|
240 | if (handle->capacity > max_pack_size_) { |
227 | 15 | new_size = handle->capacity + 1; | |
228 | } else { | ||
229 | 225 | new_size = max_pack_size_; | |
230 | } | ||
231 |
2/4✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 240 times.
✗ Branch 5 not taken.
|
240 | ObjectPack *new_pack = new ObjectPack(new_size); |
232 |
2/2✓ Branch 1 taken 330 times.
✓ Branch 2 taken 240 times.
|
570 | for (size_t i = 0u; i < active_handles_.size(); ++i) { |
233 |
1/2✓ Branch 2 taken 330 times.
✗ Branch 3 not taken.
|
330 | current_pack_->TransferBucket(active_handles_[i], new_pack); |
234 | } | ||
235 | |||
236 |
2/2✓ Branch 1 taken 225 times.
✓ Branch 2 taken 15 times.
|
240 | if (current_pack_->GetNoObjects() > 0) { |
237 |
1/2✓ Branch 1 taken 225 times.
✗ Branch 2 not taken.
|
225 | Dispatch(); |
238 | } | ||
239 | 240 | current_pack_ = new_pack; | |
240 | |||
241 |
1/2✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
|
240 | CommitBucket(type, id, handle, name, false); |
242 | } | ||
243 | |||
244 | 1020 | return true; | |
245 | 1020 | } | |
246 | |||
247 | 465 | void SessionContextBase::Dispatch() { | |
248 | 465 | const MutexLockGuard lock(current_pack_mtx_); | |
249 | |||
250 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 465 times.
|
465 | if (!current_pack_) { |
251 | ✗ | return; | |
252 | } | ||
253 | |||
254 | 465 | bytes_dispatched_ += current_pack_->size(); | |
255 |
2/4✓ Branch 1 taken 465 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 465 times.
✗ Branch 5 not taken.
|
465 | upload_results_.EnqueueFront(DispatchObjectPack(current_pack_)); |
256 |
1/2✓ Branch 1 taken 465 times.
✗ Branch 2 not taken.
|
465 | } |
257 | |||
258 | 146 | SessionContext::SessionContext() | |
259 |
1/2✓ Branch 2 taken 146 times.
✗ Branch 3 not taken.
|
146 | : SessionContextBase(), upload_jobs_(), worker_() { } |
260 | |||
261 | 146 | bool SessionContext::InitializeDerived(uint64_t max_queue_size) { | |
262 | // Start worker thread | ||
263 |
1/2✓ Branch 2 taken 146 times.
✗ Branch 3 not taken.
|
146 | upload_jobs_ = new Tube<UploadJob>(max_queue_size); |
264 | |||
265 | 146 | const int retval = pthread_create(&worker_, NULL, UploadLoop, | |
266 | reinterpret_cast<void *>(this)); | ||
267 | |||
268 | 146 | return !retval; | |
269 | } | ||
270 | |||
271 | 146 | bool SessionContext::FinalizeDerived() { | |
272 | // Note: in FinalizedDerived, we know that the worker is running. The | ||
273 | // SessionContext is called only from GatewayUploader::FinalizeSession(), | ||
274 | // which in turn is from Spooler::FinalizeSession(). The Spooler ensures | ||
275 | // that GatewayUploader::Initialize() is called on construction. | ||
276 | // | ||
277 | // TODO(jblomer): Refactor SessionContext (and Uploader*) classes to | ||
278 | // use a factory method for construction. | ||
279 | // | ||
280 | 146 | upload_jobs_->EnqueueFront(&terminator_); | |
281 | 146 | pthread_join(worker_, NULL); | |
282 | |||
283 | 146 | return true; | |
284 | } | ||
285 | |||
286 | ✗ | bool SessionContext::Commit(const std::string &old_root_hash, | |
287 | const std::string &new_root_hash, | ||
288 | const RepositoryTag &tag) { | ||
289 | ✗ | JsonStringGenerator request_input; | |
290 | ✗ | request_input.Add("old_root_hash", old_root_hash); | |
291 | ✗ | request_input.Add("new_root_hash", new_root_hash); | |
292 | ✗ | request_input.Add("tag_name", tag.name()); | |
293 | // Channels are no longer supported: send 0 (i.e. kChannelTrunk) for | ||
294 | // backwards compatibility with existing gateways | ||
295 | // | ||
296 | ✗ | request_input.Add("tag_channel", 0); | |
297 | ✗ | request_input.Add("tag_description", tag.description()); | |
298 | ✗ | const std::string request = request_input.GenerateString(); | |
299 | ✗ | CurlBuffer buffer; | |
300 | ✗ | return MakeEndRequest("POST", key_id_, secret_, session_token_, api_url_, | |
301 | ✗ | request, &buffer); | |
302 | } | ||
303 | |||
304 | 465 | Future<bool> *SessionContext::DispatchObjectPack(ObjectPack *pack) { | |
305 | 465 | UploadJob *job = new UploadJob; | |
306 | 465 | Future<bool> *result = new Future<bool>(); | |
307 | 465 | job->pack = pack; | |
308 | 465 | job->result = result; | |
309 | 465 | upload_jobs_->EnqueueFront(job); | |
310 | 465 | return result; | |
311 | } | ||
312 | |||
313 | ✗ | bool SessionContext::DoUpload(const SessionContext::UploadJob *job) { | |
314 | // Set up the object pack serializer | ||
315 | ✗ | ObjectPackProducer serializer(job->pack); | |
316 | |||
317 | ✗ | shash::Any payload_digest(shash::kSha1); | |
318 | ✗ | serializer.GetDigest(&payload_digest); | |
319 | ✗ | const std::string json_msg = "{\"session_token\" : \"" + session_token_ | |
320 | ✗ | + "\", \"payload_digest\" : \"" | |
321 | ✗ | + payload_digest.ToString(false) | |
322 | ✗ | + "\", \"header_size\" : \"" | |
323 | ✗ | + StringifyInt(serializer.GetHeaderSize()) | |
324 | ✗ | + "\", \"api_version\" : \"" | |
325 | ✗ | + StringifyInt(gateway::APIVersion()) + "\"}"; | |
326 | |||
327 | // Compute HMAC | ||
328 | ✗ | shash::Any hmac(shash::kSha1); | |
329 | ✗ | shash::HmacString(secret_, json_msg, &hmac); | |
330 | |||
331 | CurlSendPayload payload; | ||
332 | ✗ | payload.json_message = &json_msg; | |
333 | ✗ | payload.pack_serializer = &serializer; | |
334 | ✗ | payload.index = 0; | |
335 | |||
336 | ✗ | const size_t payload_size = json_msg.size() + serializer.GetHeaderSize() | |
337 | ✗ | + job->pack->size(); | |
338 | |||
339 | // Prepare the Curl POST request | ||
340 | ✗ | CURL *h_curl = curl_easy_init(); | |
341 | |||
342 | ✗ | if (!h_curl) { | |
343 | ✗ | return false; | |
344 | } | ||
345 | |||
346 | // Set HTTP headers (Authorization and Message-Size) | ||
347 | ✗ | std::string header_str = std::string("Authorization: ") + key_id_ + " " | |
348 | ✗ | + Base64(hmac.ToString(false)); | |
349 | ✗ | struct curl_slist *auth_header = NULL; | |
350 | ✗ | auth_header = curl_slist_append(auth_header, header_str.c_str()); | |
351 | ✗ | header_str = std::string("Message-Size: ") + StringifyInt(json_msg.size()); | |
352 | ✗ | auth_header = curl_slist_append(auth_header, header_str.c_str()); | |
353 | ✗ | curl_easy_setopt(h_curl, CURLOPT_HTTPHEADER, auth_header); | |
354 | |||
355 | ✗ | std::string reply; | |
356 | ✗ | curl_easy_setopt(h_curl, CURLOPT_NOPROGRESS, 1L); | |
357 | ✗ | curl_easy_setopt(h_curl, CURLOPT_USERAGENT, "cvmfs/" CVMFS_VERSION); | |
358 | ✗ | curl_easy_setopt(h_curl, CURLOPT_MAXREDIRS, 50L); | |
359 | ✗ | curl_easy_setopt(h_curl, CURLOPT_CUSTOMREQUEST, "POST"); | |
360 | ✗ | curl_easy_setopt(h_curl, CURLOPT_URL, (api_url_ + "/payloads").c_str()); | |
361 | ✗ | curl_easy_setopt(h_curl, CURLOPT_POSTFIELDS, NULL); | |
362 | ✗ | curl_easy_setopt(h_curl, CURLOPT_POSTFIELDSIZE_LARGE, | |
363 | static_cast<curl_off_t>(payload_size)); | ||
364 | ✗ | curl_easy_setopt(h_curl, CURLOPT_READDATA, &payload); | |
365 | ✗ | curl_easy_setopt(h_curl, CURLOPT_READFUNCTION, SendCB); | |
366 | ✗ | curl_easy_setopt(h_curl, CURLOPT_WRITEFUNCTION, RecvCB); | |
367 | ✗ | curl_easy_setopt(h_curl, CURLOPT_WRITEDATA, &reply); | |
368 | |||
369 | // Perform the Curl POST request | ||
370 | ✗ | const CURLcode ret = curl_easy_perform(h_curl); | |
371 | ✗ | if (ret) { | |
372 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
373 | "SessionContext::DoUpload - curl_easy_perform failed: %d", ret); | ||
374 | } | ||
375 | |||
376 | ✗ | const UniquePtr<JsonDocument> reply_json(JsonDocument::Create(reply)); | |
377 | ✗ | const JSON *reply_status = JsonDocument::SearchInObject( | |
378 | reply_json->root(), "status", JSON_STRING); | ||
379 | const bool ok = (reply_status != NULL | ||
380 | ✗ | && std::string(reply_status->string_value) == "ok"); | |
381 | ✗ | if (!ok) { | |
382 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
383 | "SessionContext::DoUpload - error reply: %s", reply.c_str()); | ||
384 | } | ||
385 | |||
386 | ✗ | curl_easy_cleanup(h_curl); | |
387 | ✗ | h_curl = NULL; | |
388 | |||
389 | ✗ | return ok && !ret; | |
390 | } | ||
391 | |||
392 | 146 | void *SessionContext::UploadLoop(void *data) { | |
393 | 146 | SessionContext *ctx = reinterpret_cast<SessionContext *>(data); | |
394 | UploadJob *job; | ||
395 | |||
396 | while (true) { | ||
397 | 611 | job = ctx->upload_jobs_->PopBack(); | |
398 |
2/2✓ Branch 0 taken 146 times.
✓ Branch 1 taken 465 times.
|
611 | if (job == &terminator_) |
399 | 146 | return NULL; | |
400 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 465 times.
|
465 | if (!ctx->DoUpload(job)) { |
401 | ✗ | PANIC(kLogStderr, "SessionContext: could not submit payload. Aborting."); | |
402 | } | ||
403 | 465 | job->result->Set(true); | |
404 |
1/2✓ Branch 0 taken 465 times.
✗ Branch 1 not taken.
|
465 | delete job->pack; |
405 |
1/2✓ Branch 0 taken 465 times.
✗ Branch 1 not taken.
|
465 | delete job; |
406 | } | ||
407 | } | ||
408 | |||
409 | SessionContext::UploadJob SessionContext::terminator_; | ||
410 | |||
411 | } // namespace upload | ||
412 |