| Directory: | cvmfs/ |
|---|---|
| File: | cvmfs/session_context.cc |
| Date: | 2025-11-02 02:35:35 |
| Exec | Total | Coverage | |
|---|---|---|---|
| Lines: | 139 | 224 | 62.1% |
| Branches: | 80 | 300 | 26.7% |
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /** | ||
| 2 | * This file is part of the CernVM File System. | ||
| 3 | */ | ||
| 4 | |||
| 5 | #include "session_context.h" | ||
| 6 | |||
| 7 | #include <algorithm> | ||
| 8 | #include <limits> | ||
| 9 | |||
| 10 | #include "curl/curl.h" | ||
| 11 | #include "gateway_util.h" | ||
| 12 | #include "json_document.h" | ||
| 13 | #include "json_document_write.h" | ||
| 14 | #include "swissknife_lease_curl.h" | ||
| 15 | #include "util/exception.h" | ||
| 16 | #include "util/pointer.h" | ||
| 17 | #include "util/string.h" | ||
| 18 | |||
| 19 | namespace { | ||
| 20 | // Maximum number of jobs during a session. No limit, for practical | ||
| 21 | // purposes. Note that we use uint32_t so that the Tube code works | ||
| 22 | // correctly with this limit on 32bit systems. | ||
| 23 | const uint32_t kMaxNumJobs = std::numeric_limits<uint32_t>::max(); | ||
| 24 | } // namespace | ||
| 25 | |||
| 26 | namespace upload { | ||
| 27 | |||
| 28 | 240 | size_t SendCB(void *ptr, size_t size, size_t nmemb, void *userp) { | |
| 29 | 240 | CurlSendPayload *payload = static_cast<CurlSendPayload *>(userp); | |
| 30 | |||
| 31 | 240 | const size_t max_chunk_size = size * nmemb; | |
| 32 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 240 times.
|
240 | if (max_chunk_size < 1) { |
| 33 | ✗ | return 0; | |
| 34 | } | ||
| 35 | |||
| 36 | 240 | size_t current_chunk_size = 0; | |
| 37 |
2/2✓ Branch 0 taken 320 times.
✓ Branch 1 taken 160 times.
|
480 | while (current_chunk_size < max_chunk_size) { |
| 38 |
2/2✓ Branch 1 taken 40 times.
✓ Branch 2 taken 280 times.
|
320 | if (payload->index < payload->json_message->size()) { |
| 39 | // Can add a chunk from the JSON message | ||
| 40 | 40 | const size_t read_size = std::min( | |
| 41 | 80 | max_chunk_size - current_chunk_size, | |
| 42 | 40 | payload->json_message->size() - payload->index); | |
| 43 | 40 | current_chunk_size += read_size; | |
| 44 | 40 | std::memcpy(ptr, payload->json_message->data() + payload->index, | |
| 45 | read_size); | ||
| 46 | 40 | payload->index += read_size; | |
| 47 | } else { | ||
| 48 | // Can add a chunk from the payload | ||
| 49 | 280 | const size_t max_read_size = max_chunk_size - current_chunk_size; | |
| 50 | 280 | const unsigned nbytes = payload->pack_serializer->ProduceNext( | |
| 51 | max_read_size, | ||
| 52 | static_cast<unsigned char *>(ptr) + current_chunk_size); | ||
| 53 | 280 | current_chunk_size += nbytes; | |
| 54 | |||
| 55 |
2/2✓ Branch 0 taken 80 times.
✓ Branch 1 taken 200 times.
|
280 | if (!nbytes) { |
| 56 | 80 | break; | |
| 57 | } | ||
| 58 | } | ||
| 59 | } | ||
| 60 | |||
| 61 | 240 | return current_chunk_size; | |
| 62 | } | ||
| 63 | |||
| 64 | ✗ | size_t RecvCB(void *buffer, size_t size, size_t nmemb, void *userp) { | |
| 65 | ✗ | std::string *my_buffer = static_cast<std::string *>(userp); | |
| 66 | |||
| 67 | ✗ | if (size * nmemb < 1) { | |
| 68 | ✗ | return 0; | |
| 69 | } | ||
| 70 | |||
| 71 | ✗ | *my_buffer = static_cast<char *>(buffer); | |
| 72 | |||
| 73 | ✗ | return my_buffer->size(); | |
| 74 | } | ||
| 75 | |||
| 76 | 309 | SessionContextBase::SessionContextBase() | |
| 77 | 309 | : upload_results_(kMaxNumJobs) | |
| 78 | 309 | , api_url_() | |
| 79 | 309 | , session_token_() | |
| 80 | 309 | , key_id_() | |
| 81 | 309 | , secret_() | |
| 82 | 309 | , max_pack_size_(ObjectPack::kDefaultLimit) | |
| 83 | 309 | , active_handles_() | |
| 84 | 309 | , current_pack_(NULL) | |
| 85 | 309 | , current_pack_mtx_() | |
| 86 | 309 | , bytes_committed_(0) | |
| 87 | 309 | , bytes_dispatched_(0) | |
| 88 | 309 | , initialized_(false) { } | |
| 89 | |||
| 90 | 618 | SessionContextBase::~SessionContextBase() { } | |
| 91 | |||
| 92 | 309 | bool SessionContextBase::Initialize(const std::string &api_url, | |
| 93 | const std::string &session_token, | ||
| 94 | const std::string &key_id, | ||
| 95 | const std::string &secret, | ||
| 96 | uint64_t max_pack_size, | ||
| 97 | uint64_t max_queue_size) { | ||
| 98 | 309 | bool ret = true; | |
| 99 | |||
| 100 | // Initialize session context lock | ||
| 101 | pthread_mutexattr_t attr; | ||
| 102 | 309 | if (pthread_mutexattr_init(&attr) | |
| 103 |
1/2✓ Branch 1 taken 309 times.
✗ Branch 2 not taken.
|
309 | || pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) |
| 104 |
1/2✓ Branch 1 taken 309 times.
✗ Branch 2 not taken.
|
309 | || pthread_mutex_init(&current_pack_mtx_, &attr) |
| 105 |
3/6✓ Branch 0 taken 309 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 309 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 309 times.
|
618 | || pthread_mutexattr_destroy(&attr)) { |
| 106 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
| 107 | "Could not initialize SessionContext lock."); | ||
| 108 | ✗ | return false; | |
| 109 | } | ||
| 110 | |||
| 111 | // Set upstream URL and session token | ||
| 112 |
1/2✓ Branch 1 taken 309 times.
✗ Branch 2 not taken.
|
309 | api_url_ = api_url; |
| 113 |
1/2✓ Branch 1 taken 309 times.
✗ Branch 2 not taken.
|
309 | session_token_ = session_token; |
| 114 |
1/2✓ Branch 1 taken 309 times.
✗ Branch 2 not taken.
|
309 | key_id_ = key_id; |
| 115 |
1/2✓ Branch 1 taken 309 times.
✗ Branch 2 not taken.
|
309 | secret_ = secret; |
| 116 | 309 | max_pack_size_ = max_pack_size; | |
| 117 | |||
| 118 | 309 | bytes_committed_ = 0u; | |
| 119 | 309 | bytes_dispatched_ = 0u; | |
| 120 | |||
| 121 |
2/4✓ Branch 1 taken 309 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 309 times.
|
309 | assert(upload_results_.IsEmpty()); |
| 122 | |||
| 123 | // Ensure that there are not open object packs | ||
| 124 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 309 times.
|
309 | if (current_pack_) { |
| 125 | ✗ | LogCvmfs( | |
| 126 | kLogUploadGateway, kLogStderr, | ||
| 127 | "Could not initialize SessionContext - Existing open object packs."); | ||
| 128 | ✗ | ret = false; | |
| 129 | } | ||
| 130 | |||
| 131 |
3/6✓ Branch 1 taken 309 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 309 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 309 times.
✗ Branch 6 not taken.
|
309 | ret = InitializeDerived(max_queue_size) && ret; |
| 132 | |||
| 133 | 309 | initialized_ = true; | |
| 134 | |||
| 135 | 309 | return ret; | |
| 136 | } | ||
| 137 | |||
| 138 | 309 | bool SessionContextBase::Finalize(bool commit, const std::string &old_root_hash, | |
| 139 | const std::string &new_root_hash, | ||
| 140 | const RepositoryTag &tag) { | ||
| 141 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 309 times.
|
309 | assert(active_handles_.empty()); |
| 142 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 309 times.
|
309 | if (!initialized_) { |
| 143 | ✗ | assert(!commit); | |
| 144 | ✗ | return true; | |
| 145 | } | ||
| 146 | |||
| 147 | { | ||
| 148 | 309 | const MutexLockGuard lock(current_pack_mtx_); | |
| 149 | |||
| 150 |
5/6✓ Branch 0 taken 160 times.
✓ Branch 1 taken 149 times.
✓ Branch 3 taken 160 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 160 times.
✓ Branch 6 taken 149 times.
|
309 | if (current_pack_ && current_pack_->GetNoObjects() > 0) { |
| 151 |
1/2✓ Branch 1 taken 160 times.
✗ Branch 2 not taken.
|
160 | Dispatch(); |
| 152 | 160 | current_pack_ = NULL; | |
| 153 | } | ||
| 154 | 309 | } | |
| 155 | |||
| 156 | 309 | bool results = true; | |
| 157 |
2/2✓ Branch 1 taken 1240 times.
✓ Branch 2 taken 309 times.
|
1549 | while (!upload_results_.IsEmpty()) { |
| 158 | 1240 | Future<bool> *future = upload_results_.PopBack(); | |
| 159 |
2/4✓ Branch 1 taken 1240 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1240 times.
✗ Branch 4 not taken.
|
1240 | results = future->Get() && results; |
| 160 |
1/2✓ Branch 0 taken 1240 times.
✗ Branch 1 not taken.
|
1240 | delete future; |
| 161 | } | ||
| 162 | |||
| 163 |
2/2✓ Branch 0 taken 280 times.
✓ Branch 1 taken 29 times.
|
309 | if (commit) { |
| 164 |
3/6✓ Branch 1 taken 280 times.
✗ Branch 2 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 280 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 280 times.
|
280 | if (old_root_hash.empty() || new_root_hash.empty()) { |
| 165 | ✗ | return false; | |
| 166 | } | ||
| 167 | 280 | const bool commit_result = Commit(old_root_hash, new_root_hash, tag); | |
| 168 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 280 times.
|
280 | if (!commit_result) { |
| 169 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
| 170 | "SessionContext: could not commit session. Aborting."); | ||
| 171 | ✗ | FinalizeDerived(); | |
| 172 | ✗ | pthread_mutex_destroy(&current_pack_mtx_); | |
| 173 | ✗ | initialized_ = false; | |
| 174 | ✗ | return false; | |
| 175 | } | ||
| 176 | } | ||
| 177 | |||
| 178 |
2/4✓ Branch 1 taken 309 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 309 times.
✗ Branch 4 not taken.
|
309 | results &= FinalizeDerived() && (bytes_committed_ == bytes_dispatched_); |
| 179 | |||
| 180 | 309 | pthread_mutex_destroy(&current_pack_mtx_); | |
| 181 | |||
| 182 | 309 | initialized_ = false; | |
| 183 | |||
| 184 | 309 | return results; | |
| 185 | } | ||
| 186 | |||
| 187 | 2080 | ObjectPack::BucketHandle SessionContextBase::NewBucket() { | |
| 188 | 2080 | const MutexLockGuard lock(current_pack_mtx_); | |
| 189 |
2/2✓ Branch 0 taken 640 times.
✓ Branch 1 taken 1440 times.
|
2080 | if (!current_pack_) { |
| 190 |
2/4✓ Branch 1 taken 640 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 640 times.
✗ Branch 5 not taken.
|
640 | current_pack_ = new ObjectPack(max_pack_size_); |
| 191 | } | ||
| 192 |
1/2✓ Branch 1 taken 2080 times.
✗ Branch 2 not taken.
|
2080 | ObjectPack::BucketHandle hd = current_pack_->NewBucket(); |
| 193 |
1/2✓ Branch 1 taken 2080 times.
✗ Branch 2 not taken.
|
2080 | active_handles_.push_back(hd); |
| 194 | 2080 | return hd; | |
| 195 | 2080 | } | |
| 196 | |||
| 197 | 2720 | bool SessionContextBase::CommitBucket(const ObjectPack::BucketContentType type, | |
| 198 | const shash::Any &id, | ||
| 199 | const ObjectPack::BucketHandle handle, | ||
| 200 | const std::string &name, | ||
| 201 | const bool force_dispatch) { | ||
| 202 | 2720 | const MutexLockGuard lock(current_pack_mtx_); | |
| 203 | |||
| 204 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2720 times.
|
2720 | if (!current_pack_) { |
| 205 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
| 206 | "Error: Called SessionBaseContext::CommitBucket without an open " | ||
| 207 | "ObjectPack."); | ||
| 208 | ✗ | return false; | |
| 209 | } | ||
| 210 | |||
| 211 | 2720 | const uint64_t size0 = current_pack_->size(); | |
| 212 |
1/2✓ Branch 1 taken 2720 times.
✗ Branch 2 not taken.
|
2720 | const bool committed = current_pack_->CommitBucket(type, id, handle, name); |
| 213 | |||
| 214 |
2/2✓ Branch 0 taken 2080 times.
✓ Branch 1 taken 640 times.
|
2720 | if (committed) { // Current pack is still not full |
| 215 |
1/2✓ Branch 3 taken 2080 times.
✗ Branch 4 not taken.
|
4160 | active_handles_.erase( |
| 216 |
1/2✓ Branch 3 taken 2080 times.
✗ Branch 4 not taken.
|
2080 | std::remove(active_handles_.begin(), active_handles_.end(), handle), |
| 217 | 2080 | active_handles_.end()); | |
| 218 | 2080 | const uint64_t size1 = current_pack_->size(); | |
| 219 | 2080 | bytes_committed_ += size1 - size0; | |
| 220 |
2/2✓ Branch 0 taken 480 times.
✓ Branch 1 taken 1600 times.
|
2080 | if (force_dispatch) { |
| 221 |
1/2✓ Branch 1 taken 480 times.
✗ Branch 2 not taken.
|
480 | Dispatch(); |
| 222 | 480 | current_pack_ = NULL; | |
| 223 | } | ||
| 224 | } else { // Current pack is full and can be dispatched | ||
| 225 | 640 | uint64_t new_size = 0; | |
| 226 |
2/2✓ Branch 0 taken 40 times.
✓ Branch 1 taken 600 times.
|
640 | if (handle->capacity > max_pack_size_) { |
| 227 | 40 | new_size = handle->capacity + 1; | |
| 228 | } else { | ||
| 229 | 600 | new_size = max_pack_size_; | |
| 230 | } | ||
| 231 |
2/4✓ Branch 1 taken 640 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 640 times.
✗ Branch 5 not taken.
|
640 | ObjectPack *new_pack = new ObjectPack(new_size); |
| 232 |
2/2✓ Branch 1 taken 880 times.
✓ Branch 2 taken 640 times.
|
1520 | for (size_t i = 0u; i < active_handles_.size(); ++i) { |
| 233 |
1/2✓ Branch 2 taken 880 times.
✗ Branch 3 not taken.
|
880 | current_pack_->TransferBucket(active_handles_[i], new_pack); |
| 234 | } | ||
| 235 | |||
| 236 |
2/2✓ Branch 1 taken 600 times.
✓ Branch 2 taken 40 times.
|
640 | if (current_pack_->GetNoObjects() > 0) { |
| 237 |
1/2✓ Branch 1 taken 600 times.
✗ Branch 2 not taken.
|
600 | Dispatch(); |
| 238 | } | ||
| 239 | 640 | current_pack_ = new_pack; | |
| 240 | |||
| 241 |
1/2✓ Branch 1 taken 640 times.
✗ Branch 2 not taken.
|
640 | CommitBucket(type, id, handle, name, false); |
| 242 | } | ||
| 243 | |||
| 244 | 2720 | return true; | |
| 245 | 2720 | } | |
| 246 | |||
| 247 | 1240 | void SessionContextBase::Dispatch() { | |
| 248 | 1240 | const MutexLockGuard lock(current_pack_mtx_); | |
| 249 | |||
| 250 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1240 times.
|
1240 | if (!current_pack_) { |
| 251 | ✗ | return; | |
| 252 | } | ||
| 253 | |||
| 254 | 1240 | bytes_dispatched_ += current_pack_->size(); | |
| 255 |
2/4✓ Branch 1 taken 1240 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1240 times.
✗ Branch 5 not taken.
|
1240 | upload_results_.EnqueueFront(DispatchObjectPack(current_pack_)); |
| 256 |
1/2✓ Branch 1 taken 1240 times.
✗ Branch 2 not taken.
|
1240 | } |
| 257 | |||
| 258 | 309 | SessionContext::SessionContext() | |
| 259 |
1/2✓ Branch 2 taken 309 times.
✗ Branch 3 not taken.
|
309 | : SessionContextBase(), upload_jobs_(), worker_() { } |
| 260 | |||
| 261 | 309 | bool SessionContext::InitializeDerived(uint64_t max_queue_size) { | |
| 262 | // Start worker thread | ||
| 263 |
1/2✓ Branch 2 taken 309 times.
✗ Branch 3 not taken.
|
309 | upload_jobs_ = new Tube<UploadJob>(max_queue_size); |
| 264 | |||
| 265 | 309 | const int retval = pthread_create(&worker_, NULL, UploadLoop, | |
| 266 | reinterpret_cast<void *>(this)); | ||
| 267 | |||
| 268 | 309 | return !retval; | |
| 269 | } | ||
| 270 | |||
| 271 | 309 | bool SessionContext::FinalizeDerived() { | |
| 272 | // Note: in FinalizedDerived, we know that the worker is running. The | ||
| 273 | // SessionContext is called only from GatewayUploader::FinalizeSession(), | ||
| 274 | // which in turn is from Spooler::FinalizeSession(). The Spooler ensures | ||
| 275 | // that GatewayUploader::Initialize() is called on construction. | ||
| 276 | // | ||
| 277 | // TODO(jblomer): Refactor SessionContext (and Uploader*) classes to | ||
| 278 | // use a factory method for construction. | ||
| 279 | // | ||
| 280 | 309 | upload_jobs_->EnqueueFront(&terminator_); | |
| 281 | 309 | pthread_join(worker_, NULL); | |
| 282 | |||
| 283 | 309 | return true; | |
| 284 | } | ||
| 285 | |||
| 286 | ✗ | bool SessionContext::Commit(const std::string &old_root_hash, | |
| 287 | const std::string &new_root_hash, | ||
| 288 | const RepositoryTag &tag) { | ||
| 289 | ✗ | JsonStringGenerator request_input; | |
| 290 | ✗ | request_input.Add("old_root_hash", old_root_hash); | |
| 291 | ✗ | request_input.Add("new_root_hash", new_root_hash); | |
| 292 | ✗ | request_input.Add("tag_name", tag.name()); | |
| 293 | // Channels are no longer supported: send 0 (i.e. kChannelTrunk) for | ||
| 294 | // backwards compatibility with existing gateways | ||
| 295 | // | ||
| 296 | ✗ | request_input.Add("tag_channel", 0); | |
| 297 | ✗ | request_input.Add("tag_description", tag.description()); | |
| 298 | ✗ | const std::string request = request_input.GenerateString(); | |
| 299 | ✗ | CurlBuffer buffer; | |
| 300 | ✗ | return MakeEndRequest("POST", key_id_, secret_, session_token_, api_url_, | |
| 301 | ✗ | request, &buffer); | |
| 302 | } | ||
| 303 | |||
| 304 | 1240 | Future<bool> *SessionContext::DispatchObjectPack(ObjectPack *pack) { | |
| 305 | 1240 | UploadJob *job = new UploadJob; | |
| 306 | 1240 | Future<bool> *result = new Future<bool>(); | |
| 307 | 1240 | job->pack = pack; | |
| 308 | 1240 | job->result = result; | |
| 309 | 1240 | upload_jobs_->EnqueueFront(job); | |
| 310 | 1240 | return result; | |
| 311 | } | ||
| 312 | |||
| 313 | ✗ | bool SessionContext::DoUpload(const SessionContext::UploadJob *job) { | |
| 314 | // Set up the object pack serializer | ||
| 315 | ✗ | ObjectPackProducer serializer(job->pack); | |
| 316 | |||
| 317 | ✗ | shash::Any payload_digest(shash::kSha1); | |
| 318 | ✗ | serializer.GetDigest(&payload_digest); | |
| 319 | ✗ | const std::string json_msg = "{\"session_token\" : \"" + session_token_ | |
| 320 | ✗ | + "\", \"payload_digest\" : \"" | |
| 321 | ✗ | + payload_digest.ToString(false) | |
| 322 | ✗ | + "\", \"header_size\" : \"" | |
| 323 | ✗ | + StringifyInt(serializer.GetHeaderSize()) | |
| 324 | ✗ | + "\", \"api_version\" : \"" | |
| 325 | ✗ | + StringifyInt(gateway::APIVersion()) + "\"}"; | |
| 326 | |||
| 327 | // Compute HMAC | ||
| 328 | ✗ | shash::Any hmac(shash::kSha1); | |
| 329 | ✗ | shash::HmacString(secret_, json_msg, &hmac); | |
| 330 | |||
| 331 | CurlSendPayload payload; | ||
| 332 | ✗ | payload.json_message = &json_msg; | |
| 333 | ✗ | payload.pack_serializer = &serializer; | |
| 334 | ✗ | payload.index = 0; | |
| 335 | |||
| 336 | ✗ | const size_t payload_size = json_msg.size() + serializer.GetHeaderSize() | |
| 337 | ✗ | + job->pack->size(); | |
| 338 | |||
| 339 | // Prepare the Curl POST request | ||
| 340 | ✗ | CURL *h_curl = curl_easy_init(); | |
| 341 | |||
| 342 | ✗ | if (!h_curl) { | |
| 343 | ✗ | return false; | |
| 344 | } | ||
| 345 | |||
| 346 | // Set HTTP headers (Authorization and Message-Size) | ||
| 347 | ✗ | std::string header_str = std::string("Authorization: ") + key_id_ + " " | |
| 348 | ✗ | + Base64(hmac.ToString(false)); | |
| 349 | ✗ | struct curl_slist *auth_header = NULL; | |
| 350 | ✗ | auth_header = curl_slist_append(auth_header, header_str.c_str()); | |
| 351 | ✗ | header_str = std::string("Message-Size: ") + StringifyInt(json_msg.size()); | |
| 352 | ✗ | auth_header = curl_slist_append(auth_header, header_str.c_str()); | |
| 353 | ✗ | curl_easy_setopt(h_curl, CURLOPT_HTTPHEADER, auth_header); | |
| 354 | |||
| 355 | ✗ | std::string reply; | |
| 356 | ✗ | curl_easy_setopt(h_curl, CURLOPT_NOPROGRESS, 1L); | |
| 357 | ✗ | curl_easy_setopt(h_curl, CURLOPT_USERAGENT, "cvmfs/" CVMFS_VERSION); | |
| 358 | ✗ | curl_easy_setopt(h_curl, CURLOPT_MAXREDIRS, 50L); | |
| 359 | ✗ | curl_easy_setopt(h_curl, CURLOPT_CUSTOMREQUEST, "POST"); | |
| 360 | ✗ | curl_easy_setopt(h_curl, CURLOPT_URL, (api_url_ + "/payloads").c_str()); | |
| 361 | ✗ | curl_easy_setopt(h_curl, CURLOPT_POSTFIELDS, NULL); | |
| 362 | ✗ | curl_easy_setopt(h_curl, CURLOPT_POSTFIELDSIZE_LARGE, | |
| 363 | static_cast<curl_off_t>(payload_size)); | ||
| 364 | ✗ | curl_easy_setopt(h_curl, CURLOPT_READDATA, &payload); | |
| 365 | ✗ | curl_easy_setopt(h_curl, CURLOPT_READFUNCTION, SendCB); | |
| 366 | ✗ | curl_easy_setopt(h_curl, CURLOPT_WRITEFUNCTION, RecvCB); | |
| 367 | ✗ | curl_easy_setopt(h_curl, CURLOPT_WRITEDATA, &reply); | |
| 368 | |||
| 369 | // Perform the Curl POST request | ||
| 370 | ✗ | const CURLcode ret = curl_easy_perform(h_curl); | |
| 371 | ✗ | if (ret) { | |
| 372 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
| 373 | "SessionContext::DoUpload - curl_easy_perform failed: %d", ret); | ||
| 374 | } | ||
| 375 | |||
| 376 | ✗ | const UniquePtr<JsonDocument> reply_json(JsonDocument::Create(reply)); | |
| 377 | ✗ | const JSON *reply_status = JsonDocument::SearchInObject( | |
| 378 | reply_json->root(), "status", JSON_STRING); | ||
| 379 | const bool ok = (reply_status != NULL | ||
| 380 | ✗ | && std::string(reply_status->string_value) == "ok"); | |
| 381 | ✗ | if (!ok) { | |
| 382 | ✗ | LogCvmfs(kLogUploadGateway, kLogStderr, | |
| 383 | "SessionContext::DoUpload - error reply: %s", reply.c_str()); | ||
| 384 | } | ||
| 385 | |||
| 386 | ✗ | curl_easy_cleanup(h_curl); | |
| 387 | ✗ | h_curl = NULL; | |
| 388 | |||
| 389 | ✗ | return ok && !ret; | |
| 390 | } | ||
| 391 | |||
| 392 | 309 | void *SessionContext::UploadLoop(void *data) { | |
| 393 | 309 | SessionContext *ctx = reinterpret_cast<SessionContext *>(data); | |
| 394 | UploadJob *job; | ||
| 395 | |||
| 396 | while (true) { | ||
| 397 | 1549 | job = ctx->upload_jobs_->PopBack(); | |
| 398 |
2/2✓ Branch 0 taken 309 times.
✓ Branch 1 taken 1240 times.
|
1549 | if (job == &terminator_) |
| 399 | 309 | return NULL; | |
| 400 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 1240 times.
|
1240 | if (!ctx->DoUpload(job)) { |
| 401 | ✗ | PANIC(kLogStderr, "SessionContext: could not submit payload. Aborting."); | |
| 402 | } | ||
| 403 | 1240 | job->result->Set(true); | |
| 404 |
1/2✓ Branch 0 taken 1240 times.
✗ Branch 1 not taken.
|
1240 | delete job->pack; |
| 405 |
1/2✓ Branch 0 taken 1240 times.
✗ Branch 1 not taken.
|
1240 | delete job; |
| 406 | } | ||
| 407 | } | ||
| 408 | |||
| 409 | SessionContext::UploadJob SessionContext::terminator_; | ||
| 410 | |||
| 411 | } // namespace upload | ||
| 412 |