Directory: | cvmfs/ |
---|---|
File: | cvmfs/session_context.cc |
Date: | 2025-02-09 02:34:19 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 140 | 222 | 63.1% |
Branches: | 80 | 298 | 26.8% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #include "session_context.h" | ||
6 | |||
7 | #include <algorithm> | ||
8 | #include <limits> | ||
9 | |||
10 | #include "curl/curl.h" | ||
11 | |||
12 | #include "gateway_util.h" | ||
13 | #include "json_document.h" | ||
14 | #include "json_document_write.h" | ||
15 | #include "swissknife_lease_curl.h" | ||
16 | #include "util/exception.h" | ||
17 | #include "util/pointer.h" | ||
18 | #include "util/string.h" | ||
19 | |||
20 | namespace { | ||
21 | // Maximum number of jobs during a session. No limit, for practical | ||
22 | // purposes. Note that we use uint32_t so that the Tube code works | ||
23 | // correctly with this limit on 32bit systems. | ||
24 | const uint32_t kMaxNumJobs = std::numeric_limits<uint32_t>::max(); | ||
25 | } | ||
26 | |||
27 | namespace upload { | ||
28 | |||
29 | 6 | size_t SendCB(void* ptr, size_t size, size_t nmemb, void* userp) { | |
30 | 6 | CurlSendPayload* payload = static_cast<CurlSendPayload*>(userp); | |
31 | |||
32 | 6 | size_t max_chunk_size = size * nmemb; | |
33 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | if (max_chunk_size < 1) { |
34 | ✗ | return 0; | |
35 | } | ||
36 | |||
37 | 6 | size_t current_chunk_size = 0; | |
38 |
2/2✓ Branch 0 taken 8 times.
✓ Branch 1 taken 4 times.
|
12 | while (current_chunk_size < max_chunk_size) { |
39 |
2/2✓ Branch 1 taken 1 times.
✓ Branch 2 taken 7 times.
|
8 | if (payload->index < payload->json_message->size()) { |
40 | // Can add a chunk from the JSON message | ||
41 | const size_t read_size = | ||
42 | 2 | std::min(max_chunk_size - current_chunk_size, | |
43 | 1 | payload->json_message->size() - payload->index); | |
44 | 1 | current_chunk_size += read_size; | |
45 | 1 | std::memcpy(ptr, payload->json_message->data() + payload->index, | |
46 | read_size); | ||
47 | 1 | payload->index += read_size; | |
48 | } else { | ||
49 | // Can add a chunk from the payload | ||
50 | 7 | const size_t max_read_size = max_chunk_size - current_chunk_size; | |
51 | 7 | const unsigned nbytes = payload->pack_serializer->ProduceNext( | |
52 | max_read_size, static_cast<unsigned char*>(ptr) + current_chunk_size); | ||
53 | 7 | current_chunk_size += nbytes; | |
54 | |||
55 |
2/2✓ Branch 0 taken 2 times.
✓ Branch 1 taken 5 times.
|
7 | if (!nbytes) { |
56 | 2 | break; | |
57 | } | ||
58 | } | ||
59 | } | ||
60 | |||
61 | 6 | return current_chunk_size; | |
62 | } | ||
63 | |||
64 | ✗ | size_t RecvCB(void* buffer, size_t size, size_t nmemb, void* userp) { | |
65 | ✗ | std::string* my_buffer = static_cast<std::string*>(userp); | |
66 | |||
67 | ✗ | if (size * nmemb < 1) { | |
68 | ✗ | return 0; | |
69 | } | ||
70 | |||
71 | ✗ | *my_buffer = static_cast<char*>(buffer); | |
72 | |||
73 | ✗ | return my_buffer->size(); | |
74 | } | ||
75 | |||
76 | 8 | SessionContextBase::SessionContextBase() | |
77 | 8 | : upload_results_(kMaxNumJobs), | |
78 | 8 | api_url_(), | |
79 | 8 | session_token_(), | |
80 | 8 | key_id_(), | |
81 | 8 | secret_(), | |
82 | 8 | max_pack_size_(ObjectPack::kDefaultLimit), | |
83 | 8 | active_handles_(), | |
84 | 8 | current_pack_(NULL), | |
85 | 8 | current_pack_mtx_(), | |
86 | 8 | bytes_committed_(0), | |
87 | 8 | bytes_dispatched_(0), | |
88 | 8 | initialized_(false) {} | |
89 | |||
90 | 16 | SessionContextBase::~SessionContextBase() {} | |
91 | |||
92 | 8 | bool SessionContextBase::Initialize(const std::string& api_url, | |
93 | const std::string& session_token, | ||
94 | const std::string& key_id, | ||
95 | const std::string& secret, | ||
96 | uint64_t max_pack_size, | ||
97 | uint64_t max_queue_size) { | ||
98 | 8 | bool ret = true; | |
99 | |||
100 | // Initialize session context lock | ||
101 | pthread_mutexattr_t attr; | ||
102 |
1/2✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
|
16 | if (pthread_mutexattr_init(&attr) || |
103 |
1/2✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
|
16 | pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) || |
104 |
3/6✓ Branch 0 taken 8 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 8 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 8 times.
|
24 | pthread_mutex_init(&current_pack_mtx_, &attr) || |
105 | 8 | pthread_mutexattr_destroy(&attr)) { | |
106 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
107 | "Could not initialize SessionContext lock."); | ||
108 | ✗ | return false; | |
109 | } | ||
110 | |||
111 | // Set upstream URL and session token | ||
112 |
1/2✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
|
8 | api_url_ = api_url; |
113 |
1/2✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
|
8 | session_token_ = session_token; |
114 |
1/2✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
|
8 | key_id_ = key_id; |
115 |
1/2✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
|
8 | secret_ = secret; |
116 | 8 | max_pack_size_ = max_pack_size; | |
117 | |||
118 | 8 | bytes_committed_ = 0u; | |
119 | 8 | bytes_dispatched_ = 0u; | |
120 | |||
121 |
2/4✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 8 times.
|
8 | assert(upload_results_.IsEmpty()); |
122 | |||
123 | // Ensure that there are no open object packs | ||
124 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 8 times.
|
8 | if (current_pack_) { |
125 | ✗ | LogCvmfs( | |
126 | kLogUploadGateway, kLogStderr, | ||
127 | "Could not initialize SessionContext - Existing open object packs."); | ||
128 | ✗ | ret = false; | |
129 | } | ||
130 | |||
131 |
3/6✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 8 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 8 times.
✗ Branch 6 not taken.
|
8 | ret = InitializeDerived(max_queue_size) && ret; |
132 | |||
133 | 8 | initialized_ = true; | |
134 | |||
135 | 8 | return ret; | |
136 | } | ||
137 | |||
138 | 8 | bool SessionContextBase::Finalize(bool commit, const std::string& old_root_hash, | |
139 | const std::string& new_root_hash, | ||
140 | const RepositoryTag& tag) { | ||
141 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 8 times.
|
8 | assert(active_handles_.empty()); |
142 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 8 times.
|
8 | if (!initialized_) { |
143 | ✗ | assert(!commit); | |
144 | ✗ | return true; | |
145 | } | ||
146 | |||
147 | { | ||
148 | 8 | MutexLockGuard lock(current_pack_mtx_); | |
149 | |||
150 |
5/6✓ Branch 0 taken 4 times.
✓ Branch 1 taken 4 times.
✓ Branch 3 taken 4 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 4 times.
✓ Branch 6 taken 4 times.
|
8 | if (current_pack_ && current_pack_->GetNoObjects() > 0) { |
151 |
1/2✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
|
4 | Dispatch(); |
152 | 4 | current_pack_ = NULL; | |
153 | } | ||
154 | 8 | } | |
155 | |||
156 | 8 | bool results = true; | |
157 |
2/2✓ Branch 1 taken 31 times.
✓ Branch 2 taken 8 times.
|
39 | while (!upload_results_.IsEmpty()) { |
158 | 31 | Future<bool>* future = upload_results_.PopBack(); | |
159 |
2/4✓ Branch 1 taken 31 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 31 times.
✗ Branch 4 not taken.
|
31 | results = future->Get() && results; |
160 |
1/2✓ Branch 0 taken 31 times.
✗ Branch 1 not taken.
|
31 | delete future; |
161 | } | ||
162 | |||
163 |
2/2✓ Branch 0 taken 7 times.
✓ Branch 1 taken 1 times.
|
8 | if (commit) { |
164 |
3/6✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 7 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 7 times.
|
7 | if (old_root_hash.empty() || new_root_hash.empty()) { |
165 | ✗ | return false; | |
166 | } | ||
167 | 7 | bool commit_result = Commit(old_root_hash, new_root_hash, tag); | |
168 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7 times.
|
7 | if (!commit_result) { |
169 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
170 | "SessionContext: could not commit session. Aborting."); | ||
171 | ✗ | FinalizeDerived(); | |
172 | ✗ | pthread_mutex_destroy(&current_pack_mtx_); | |
173 | ✗ | initialized_ = false; | |
174 | ✗ | return false; | |
175 | } | ||
176 | } | ||
177 | |||
178 |
2/4✓ Branch 1 taken 8 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 8 times.
✗ Branch 4 not taken.
|
8 | results &= FinalizeDerived() && (bytes_committed_ == bytes_dispatched_); |
179 | |||
180 | 8 | pthread_mutex_destroy(&current_pack_mtx_); | |
181 | |||
182 | 8 | initialized_ = false; | |
183 | |||
184 | 8 | return results; | |
185 | } | ||
186 | |||
187 | 52 | ObjectPack::BucketHandle SessionContextBase::NewBucket() { | |
188 | 52 | MutexLockGuard lock(current_pack_mtx_); | |
189 |
2/2✓ Branch 0 taken 16 times.
✓ Branch 1 taken 36 times.
|
52 | if (!current_pack_) { |
190 |
2/4✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 16 times.
✗ Branch 5 not taken.
|
16 | current_pack_ = new ObjectPack(max_pack_size_); |
191 | } | ||
192 |
1/2✓ Branch 1 taken 52 times.
✗ Branch 2 not taken.
|
52 | ObjectPack::BucketHandle hd = current_pack_->NewBucket(); |
193 |
1/2✓ Branch 1 taken 52 times.
✗ Branch 2 not taken.
|
52 | active_handles_.push_back(hd); |
194 | 52 | return hd; | |
195 | 52 | } | |
196 | |||
197 | 68 | bool SessionContextBase::CommitBucket(const ObjectPack::BucketContentType type, | |
198 | const shash::Any& id, | ||
199 | const ObjectPack::BucketHandle handle, | ||
200 | const std::string& name, | ||
201 | const bool force_dispatch) { | ||
202 | 68 | MutexLockGuard lock(current_pack_mtx_); | |
203 | |||
204 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 68 times.
|
68 | if (!current_pack_) { |
205 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
206 | "Error: Called SessionBaseContext::CommitBucket without an open " | ||
207 | "ObjectPack."); | ||
208 | ✗ | return false; | |
209 | } | ||
210 | |||
211 | 68 | uint64_t size0 = current_pack_->size(); | |
212 |
1/2✓ Branch 1 taken 68 times.
✗ Branch 2 not taken.
|
68 | bool committed = current_pack_->CommitBucket(type, id, handle, name); |
213 | |||
214 |
2/2✓ Branch 0 taken 52 times.
✓ Branch 1 taken 16 times.
|
68 | if (committed) { // Current pack is still not full |
215 |
1/2✓ Branch 3 taken 52 times.
✗ Branch 4 not taken.
|
104 | active_handles_.erase( |
216 |
1/2✓ Branch 3 taken 52 times.
✗ Branch 4 not taken.
|
52 | std::remove(active_handles_.begin(), active_handles_.end(), handle), |
217 | 52 | active_handles_.end()); | |
218 | 52 | uint64_t size1 = current_pack_->size(); | |
219 | 52 | bytes_committed_ += size1 - size0; | |
220 |
2/2✓ Branch 0 taken 12 times.
✓ Branch 1 taken 40 times.
|
52 | if (force_dispatch) { |
221 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | Dispatch(); |
222 | 12 | current_pack_ = NULL; | |
223 | } | ||
224 | } else { // Current pack is full and can be dispatched | ||
225 | 16 | uint64_t new_size = 0; | |
226 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 15 times.
|
16 | if (handle->capacity > max_pack_size_) { |
227 | 1 | new_size = handle->capacity + 1; | |
228 | } else { | ||
229 | 15 | new_size = max_pack_size_; | |
230 | } | ||
231 |
2/4✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 16 times.
✗ Branch 5 not taken.
|
16 | ObjectPack* new_pack = new ObjectPack(new_size); |
232 |
2/2✓ Branch 1 taken 22 times.
✓ Branch 2 taken 16 times.
|
38 | for (size_t i = 0u; i < active_handles_.size(); ++i) { |
233 |
1/2✓ Branch 2 taken 22 times.
✗ Branch 3 not taken.
|
22 | current_pack_->TransferBucket(active_handles_[i], new_pack); |
234 | } | ||
235 | |||
236 |
2/2✓ Branch 1 taken 15 times.
✓ Branch 2 taken 1 times.
|
16 | if (current_pack_->GetNoObjects() > 0) { |
237 |
1/2✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
|
15 | Dispatch(); |
238 | } | ||
239 | 16 | current_pack_ = new_pack; | |
240 | |||
241 |
1/2✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
|
16 | CommitBucket(type, id, handle, name, false); |
242 | } | ||
243 | |||
244 | 68 | return true; | |
245 | 68 | } | |
246 | |||
247 | 31 | void SessionContextBase::Dispatch() { | |
248 | 31 | MutexLockGuard lock(current_pack_mtx_); | |
249 | |||
250 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 31 times.
|
31 | if (!current_pack_) { |
251 | ✗ | return; | |
252 | } | ||
253 | |||
254 | 31 | bytes_dispatched_ += current_pack_->size(); | |
255 |
2/4✓ Branch 1 taken 31 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 31 times.
✗ Branch 5 not taken.
|
31 | upload_results_.EnqueueFront(DispatchObjectPack(current_pack_)); |
256 |
1/2✓ Branch 1 taken 31 times.
✗ Branch 2 not taken.
|
31 | } |
257 | |||
258 | 8 | SessionContext::SessionContext() | |
259 | : SessionContextBase(), | ||
260 | 8 | upload_jobs_(), | |
261 |
1/2✓ Branch 2 taken 8 times.
✗ Branch 3 not taken.
|
8 | worker_() |
262 | { | ||
263 | 8 | } | |
264 | |||
265 | 8 | bool SessionContext::InitializeDerived(uint64_t max_queue_size) { | |
266 | // Start worker thread | ||
267 |
1/2✓ Branch 2 taken 8 times.
✗ Branch 3 not taken.
|
8 | upload_jobs_ = new Tube<UploadJob>(max_queue_size); |
268 | |||
269 | int retval = | ||
270 | 8 | pthread_create(&worker_, NULL, UploadLoop, reinterpret_cast<void*>(this)); | |
271 | |||
272 | 8 | return !retval; | |
273 | } | ||
274 | |||
275 | 8 | bool SessionContext::FinalizeDerived() { | |
276 | // Note: in FinalizeDerived, we know that the worker is running. The | ||
277 | // SessionContext is called only from GatewayUploader::FinalizeSession(), | ||
278 | // which in turn is from Spooler::FinalizeSession(). The Spooler ensures | ||
279 | // that GatewayUploader::Initialize() is called on construction. | ||
280 | // | ||
281 | // TODO(jblomer): Refactor SessionContext (and Uploader*) classes to | ||
282 | // use a factory method for construction. | ||
283 | // | ||
284 | 8 | upload_jobs_->EnqueueFront(&terminator_); | |
285 | 8 | pthread_join(worker_, NULL); | |
286 | |||
287 | 8 | return true; | |
288 | } | ||
289 | |||
290 | ✗ | bool SessionContext::Commit(const std::string& old_root_hash, | |
291 | const std::string& new_root_hash, | ||
292 | const RepositoryTag& tag) { | ||
293 | ✗ | JsonStringGenerator request_input; | |
294 | ✗ | request_input.Add("old_root_hash", old_root_hash); | |
295 | ✗ | request_input.Add("new_root_hash", new_root_hash); | |
296 | ✗ | request_input.Add("tag_name", tag.name()); | |
297 | // Channels are no longer supported: send 0 (i.e. kChannelTrunk) for | ||
298 | // backwards compatibility with existing gateways | ||
299 | // | ||
300 | ✗ | request_input.Add("tag_channel", 0); | |
301 | ✗ | request_input.Add("tag_description", tag.description()); | |
302 | ✗ | std::string request = request_input.GenerateString(); | |
303 | ✗ | CurlBuffer buffer; | |
304 | ✗ | return MakeEndRequest("POST", key_id_, secret_, session_token_, api_url_, | |
305 | ✗ | request, &buffer); | |
306 | } | ||
307 | |||
308 | 31 | Future<bool> *SessionContext::DispatchObjectPack(ObjectPack* pack) { | |
309 | 31 | UploadJob *job = new UploadJob; | |
310 | 31 | Future<bool> *result = new Future<bool>(); | |
311 | 31 | job->pack = pack; | |
312 | 31 | job->result = result; | |
313 | 31 | upload_jobs_->EnqueueFront(job); | |
314 | 31 | return result; | |
315 | } | ||
316 | |||
317 | ✗ | bool SessionContext::DoUpload(const SessionContext::UploadJob* job) { | |
318 | // Set up the object pack serializer | ||
319 | ✗ | ObjectPackProducer serializer(job->pack); | |
320 | |||
321 | ✗ | shash::Any payload_digest(shash::kSha1); | |
322 | ✗ | serializer.GetDigest(&payload_digest); | |
323 | const std::string json_msg = | ||
324 | ✗ | "{\"session_token\" : \"" + session_token_ + | |
325 | ✗ | "\", \"payload_digest\" : \"" + payload_digest.ToString(false) + | |
326 | ✗ | "\", \"header_size\" : \"" + StringifyInt(serializer.GetHeaderSize()) + | |
327 | ✗ | "\", \"api_version\" : \"" + StringifyInt(gateway::APIVersion()) + "\"}"; | |
328 | |||
329 | // Compute HMAC | ||
330 | ✗ | shash::Any hmac(shash::kSha1); | |
331 | ✗ | shash::HmacString(secret_, json_msg, &hmac); | |
332 | |||
333 | CurlSendPayload payload; | ||
334 | ✗ | payload.json_message = &json_msg; | |
335 | ✗ | payload.pack_serializer = &serializer; | |
336 | ✗ | payload.index = 0; | |
337 | |||
338 | const size_t payload_size = | ||
339 | ✗ | json_msg.size() + serializer.GetHeaderSize() + job->pack->size(); | |
340 | |||
341 | // Prepare the Curl POST request | ||
342 | ✗ | CURL* h_curl = curl_easy_init(); | |
343 | |||
344 | ✗ | if (!h_curl) { | |
345 | ✗ | return false; | |
346 | } | ||
347 | |||
348 | // Set HTTP headers (Authorization and Message-Size) | ||
349 | ✗ | std::string header_str = std::string("Authorization: ") + key_id_ + " " + | |
350 | ✗ | Base64(hmac.ToString(false)); | |
351 | ✗ | struct curl_slist* auth_header = NULL; | |
352 | ✗ | auth_header = curl_slist_append(auth_header, header_str.c_str()); | |
353 | ✗ | header_str = std::string("Message-Size: ") + StringifyInt(json_msg.size()); | |
354 | ✗ | auth_header = curl_slist_append(auth_header, header_str.c_str()); | |
355 | ✗ | curl_easy_setopt(h_curl, CURLOPT_HTTPHEADER, auth_header); | |
356 | |||
357 | ✗ | std::string reply; | |
358 | ✗ | curl_easy_setopt(h_curl, CURLOPT_NOPROGRESS, 1L); | |
359 | ✗ | curl_easy_setopt(h_curl, CURLOPT_USERAGENT, "cvmfs/" CVMFS_VERSION); | |
360 | ✗ | curl_easy_setopt(h_curl, CURLOPT_MAXREDIRS, 50L); | |
361 | ✗ | curl_easy_setopt(h_curl, CURLOPT_CUSTOMREQUEST, "POST"); | |
362 | ✗ | curl_easy_setopt(h_curl, CURLOPT_URL, (api_url_ + "/payloads").c_str()); | |
363 | ✗ | curl_easy_setopt(h_curl, CURLOPT_POSTFIELDS, NULL); | |
364 | ✗ | curl_easy_setopt(h_curl, CURLOPT_POSTFIELDSIZE_LARGE, | |
365 | static_cast<curl_off_t>(payload_size)); | ||
366 | ✗ | curl_easy_setopt(h_curl, CURLOPT_READDATA, &payload); | |
367 | ✗ | curl_easy_setopt(h_curl, CURLOPT_READFUNCTION, SendCB); | |
368 | ✗ | curl_easy_setopt(h_curl, CURLOPT_WRITEFUNCTION, RecvCB); | |
369 | ✗ | curl_easy_setopt(h_curl, CURLOPT_WRITEDATA, &reply); | |
370 | |||
371 | // Perform the Curl POST request | ||
372 | ✗ | CURLcode ret = curl_easy_perform(h_curl); | |
373 | ✗ | if (ret) { | |
374 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
375 | "SessionContext::DoUpload - curl_easy_perform failed: %d", | ||
376 | ret); | ||
377 | } | ||
378 | |||
379 | ✗ | UniquePtr<JsonDocument> reply_json(JsonDocument::Create(reply)); | |
380 | const JSON *reply_status = | ||
381 | ✗ | JsonDocument::SearchInObject(reply_json->root(), "status", JSON_STRING); | |
382 | ✗ | const bool ok = (reply_status != NULL && | |
383 | ✗ | std::string(reply_status->string_value) == "ok"); | |
384 | ✗ | if (!ok) { | |
385 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
386 | "SessionContext::DoUpload - error reply: %s", | ||
387 | reply.c_str()); | ||
388 | } | ||
389 | |||
390 | ✗ | curl_easy_cleanup(h_curl); | |
391 | ✗ | h_curl = NULL; | |
392 | |||
393 | ✗ | return ok && !ret; | |
394 | } | ||
395 | |||
396 | 8 | void* SessionContext::UploadLoop(void* data) { | |
397 | 8 | SessionContext* ctx = reinterpret_cast<SessionContext*>(data); | |
398 | UploadJob *job; | ||
399 | |||
400 | while (true) { | ||
401 | 39 | job = ctx->upload_jobs_->PopBack(); | |
402 |
2/2✓ Branch 0 taken 8 times.
✓ Branch 1 taken 31 times.
|
39 | if (job == &terminator_) |
403 | 8 | return NULL; | |
404 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 31 times.
|
31 | if (!ctx->DoUpload(job)) { |
405 | ✗ | PANIC(kLogStderr, | |
406 | "SessionContext: could not submit payload. Aborting."); | ||
407 | } | ||
408 | 31 | job->result->Set(true); | |
409 |
1/2✓ Branch 0 taken 31 times.
✗ Branch 1 not taken.
|
31 | delete job->pack; |
410 |
1/2✓ Branch 0 taken 31 times.
✗ Branch 1 not taken.
|
31 | delete job; |
411 | } | ||
412 | } | ||
413 | |||
414 | SessionContext::UploadJob SessionContext::terminator_; | ||
415 | |||
416 | } // namespace upload | ||
417 |