| Directory: | cvmfs/ |
|---|---|
| File: | cvmfs/receiver/payload_processor.cc |
| Date: | 2025-11-02 02:35:35 |
| Exec | Total | Coverage | |
|---|---|---|---|
| Lines: | 33 | 134 | 24.6% |
| Branches: | 24 | 164 | 14.6% |
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /** | ||
| 2 | * This file is part of the CernVM File System. | ||
| 3 | */ | ||
| 4 | |||
| 5 | #include "payload_processor.h" | ||
| 6 | |||
| 7 | #include <fcntl.h> | ||
| 8 | #include <unistd.h> | ||
| 9 | |||
| 10 | #include <vector> | ||
| 11 | |||
| 12 | #include "params.h" | ||
| 13 | #include "util/logging.h" | ||
| 14 | #include "util/posix.h" | ||
| 15 | #include "util/string.h" | ||
| 16 | |||
| 17 | namespace { | ||
| 18 | |||
| 19 | const size_t kConsumerBuffer = 10 * 1024 * 1024; // 10 MB | ||
| 20 | |||
| 21 | } | ||
| 22 | |||
| 23 | namespace receiver { | ||
| 24 | |||
| 25 | ✗ | FileInfo::FileInfo() | |
| 26 | ✗ | : handle(NULL) | |
| 27 | ✗ | , total_size(0) | |
| 28 | ✗ | , current_size(0) | |
| 29 | ✗ | , hash_context() | |
| 30 | ✗ | , hash_buffer() { } | |
| 31 | |||
| 32 | ✗ | FileInfo::FileInfo(const ObjectPackBuild::Event &event) | |
| 33 | ✗ | : handle(NULL) | |
| 34 | ✗ | , total_size(event.size) | |
| 35 | ✗ | , current_size(0) | |
| 36 | ✗ | , hash_context(shash::ContextPtr(event.id.algorithm)) | |
| 37 | ✗ | , hash_buffer(hash_context.size, 0) { | |
| 38 | ✗ | hash_context.buffer = &hash_buffer[0]; | |
| 39 | ✗ | shash::Init(hash_context); | |
| 40 | } | ||
| 41 | |||
| 42 | ✗ | FileInfo::FileInfo(const FileInfo &other) | |
| 43 | ✗ | : handle(other.handle) | |
| 44 | ✗ | , total_size(other.total_size) | |
| 45 | ✗ | , current_size(other.current_size) | |
| 46 | ✗ | , hash_context(other.hash_context) | |
| 47 | ✗ | , hash_buffer(other.hash_buffer) { | |
| 48 | ✗ | hash_context.buffer = &hash_buffer[0]; | |
| 49 | } | ||
| 50 | |||
| 51 | ✗ | FileInfo &FileInfo::operator=(const FileInfo &other) { | |
| 52 | ✗ | handle = other.handle; | |
| 53 | ✗ | total_size = other.total_size; | |
| 54 | ✗ | current_size = other.current_size; | |
| 55 | ✗ | hash_context = other.hash_context; | |
| 56 | ✗ | hash_buffer = other.hash_buffer; | |
| 57 | ✗ | hash_context.buffer = &hash_buffer[0]; | |
| 58 | |||
| 59 | ✗ | return *this; | |
| 60 | } | ||
| 61 | |||
| 62 | 64 | PayloadProcessor::PayloadProcessor() | |
| 63 | 64 | : pending_files_() | |
| 64 | 64 | , current_repo_() | |
| 65 | 1/2 ✓ Branch 1 taken 64 times. ✗ Branch 2 not taken. | 64 | , uploader_() |
| 66 | 1/2 ✓ Branch 1 taken 64 times. ✗ Branch 2 not taken. | 64 | , temp_dir_() |
| 67 | 64 | , num_errors_(0) | |
| 68 | 1/2 ✓ Branch 2 taken 64 times. ✗ Branch 3 not taken. | 128 | , statistics_(NULL) { } |
| 69 | |||
| 70 | 128 | PayloadProcessor::~PayloadProcessor() { } | |
| 71 | |||
| 72 | 64 | PayloadProcessor::Result PayloadProcessor::Process( | |
| 73 | int fdin, const std::string &header_digest, const std::string &path, | ||
| 74 | uint64_t header_size) { | ||
| 75 | 1/2 ✓ Branch 3 taken 64 times. ✗ Branch 4 not taken. | 64 | LogCvmfs(kLogReceiver, kLogSyslog, |
| 76 | "PayloadProcessor - lease_path: %s, header digest: %s, header " | ||
| 77 | "size: %ld", | ||
| 78 | path.c_str(), header_digest.c_str(), header_size); | ||
| 79 | |||
| 80 | 64 | const size_t first_slash_idx = path.find('/', 0); | |
| 81 | |||
| 82 | 1/2 ✓ Branch 1 taken 64 times. ✗ Branch 2 not taken. | 64 | current_repo_ = path.substr(0, first_slash_idx); |
| 83 | |||
| 84 | 1/2 ✓ Branch 1 taken 64 times. ✗ Branch 2 not taken. | 64 | const Result init_result = Initialize(); |
| 85 | 1/2 ✗ Branch 0 not taken. ✓ Branch 1 taken 64 times. | 64 | if (init_result != kSuccess) { |
| 86 | ✗ | return init_result; | |
| 87 | } | ||
| 88 | |||
| 89 | // Set up object pack deserialization | ||
| 90 | 1/2 ✓ Branch 2 taken 64 times. ✗ Branch 3 not taken. | 64 | const shash::Any digest = shash::MkFromHexPtr(shash::HexPtr(header_digest)); |
| 91 | |||
| 92 | 1/2 ✓ Branch 1 taken 64 times. ✗ Branch 2 not taken. | 64 | ObjectPackConsumer deserializer(digest, header_size); |
| 93 | 1/2 ✓ Branch 1 taken 64 times. ✗ Branch 2 not taken. | 64 | deserializer.RegisterListener(&PayloadProcessor::ConsumerEventCallback, this); |
| 94 | |||
| 95 | 64 | int nb = 0; | |
| 96 | 64 | ObjectPackBuild::State consumer_state = ObjectPackBuild::kStateContinue; | |
| 97 | 1/2 ✓ Branch 2 taken 64 times. ✗ Branch 3 not taken. | 64 | std::vector<unsigned char> buffer(kConsumerBuffer, 0); |
| 98 | do { | ||
| 99 | 1/2 ✓ Branch 3 taken 64 times. ✗ Branch 4 not taken. | 64 | nb = read(fdin, &buffer[0], buffer.size()); |
| 100 | 1/2 ✓ Branch 2 taken 64 times. ✗ Branch 3 not taken. | 64 | consumer_state = deserializer.ConsumeNext(nb, &buffer[0]); |
| 101 | 1/2 ✓ Branch 0 taken 64 times. ✗ Branch 1 not taken. | 64 | if (consumer_state != ObjectPackBuild::kStateContinue |
| 102 | 1/2 ✗ Branch 0 not taken. ✓ Branch 1 taken 64 times. | 64 | && consumer_state != ObjectPackBuild::kStateDone) { |
| 103 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 104 | "PayloadProcessor - error: %d encountered when consuming object " | ||
| 105 | "pack.", | ||
| 106 | consumer_state); | ||
| 107 | ✗ | break; | |
| 108 | } | ||
| 109 | 2/4 ✓ Branch 0 taken 64 times. ✗ Branch 1 not taken. ✗ Branch 2 not taken. ✓ Branch 3 taken 64 times. | 64 | } while (nb > 0 && consumer_state != ObjectPackBuild::kStateDone); |
| 110 | |||
| 111 | 1/2 ✗ Branch 1 not taken. ✓ Branch 2 taken 64 times. | 64 | assert(pending_files_.empty()); |
| 112 | |||
| 113 | 1/2 ✓ Branch 1 taken 64 times. ✗ Branch 2 not taken. | 64 | const Result res = Finalize(); |
| 114 | |||
| 115 | 1/2 ✓ Branch 1 taken 64 times. ✗ Branch 2 not taken. | 64 | deserializer.UnregisterListeners(); |
| 116 | |||
| 117 | 64 | return res; | |
| 118 | 64 | } | |
| 119 | |||
| 120 | ✗ | void PayloadProcessor::ConsumerEventCallback( | |
| 121 | const ObjectPackBuild::Event &event) { | ||
| 122 | ✗ | std::string path(""); | |
| 123 | |||
| 124 | ✗ | if (event.object_type == ObjectPack::kCas) { | |
| 125 | ✗ | path = event.id.MakePath(); | |
| 126 | ✗ | } else if (event.object_type == ObjectPack::kNamed) { | |
| 127 | ✗ | path = event.object_name; | |
| 128 | } else { | ||
| 129 | // kEmpty - this is an error. | ||
| 130 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 131 | "PayloadProcessor - error: Event received with unknown object."); | ||
| 132 | ✗ | num_errors_++; | |
| 133 | ✗ | return; | |
| 134 | } | ||
| 135 | |||
| 136 | ✗ | const FileIterator it = pending_files_.find(event.id); | |
| 137 | ✗ | if (it == pending_files_.end()) { | |
| 138 | // Schedule file upload if it's not being uploaded yet. | ||
| 139 | // Uploaders later check if the file is already present | ||
| 140 | // in the upstream storage and will not upload it twice. | ||
| 141 | ✗ | FileInfo info(event); | |
| 142 | // info.handle is later deleted by FinalizeStreamedUpload | ||
| 143 | ✗ | info.handle = uploader_->InitStreamedUpload(NULL); | |
| 144 | ✗ | pending_files_[event.id] = info; | |
| 145 | } | ||
| 146 | |||
| 147 | ✗ | FileInfo &info = pending_files_[event.id]; | |
| 148 | |||
| 149 | ✗ | void *buf_copied = smalloc(event.buf_size); | |
| 150 | ✗ | memcpy(buf_copied, event.buf, event.buf_size); | |
| 151 | ✗ | const upload::AbstractUploader::UploadBuffer buf(event.buf_size, buf_copied); | |
| 152 | ✗ | uploader_->ScheduleUpload( | |
| 153 | info.handle, buf, | ||
| 154 | ✗ | upload::AbstractUploader::MakeClosure( | |
| 155 | &PayloadProcessor::OnUploadJobComplete, this, buf_copied)); | ||
| 156 | |||
| 157 | ✗ | shash::Update(static_cast<const unsigned char *>(event.buf), | |
| 158 | ✗ | event.buf_size, | |
| 159 | info.hash_context); | ||
| 160 | |||
| 161 | ✗ | info.current_size += event.buf_size; | |
| 162 | |||
| 163 | ✗ | if (info.current_size == info.total_size) { | |
| 164 | ✗ | shash::Any file_hash(event.id.algorithm); | |
| 165 | ✗ | shash::Final(info.hash_context, &file_hash); | |
| 166 | |||
| 167 | ✗ | if (file_hash != event.id) { | |
| 168 | ✗ | LogCvmfs( | |
| 169 | kLogReceiver, kLogSyslogErr, | ||
| 170 | "PayloadProcessor - error: Hash mismatch for unpacked file: event " | ||
| 171 | "size: %ld, file size: %ld, event hash: %s, file hash: %s", | ||
| 172 | ✗ | event.size, info.current_size, event.id.ToString(true).c_str(), | |
| 173 | ✗ | file_hash.ToString(true).c_str()); | |
| 174 | ✗ | num_errors_++; | |
| 175 | ✗ | return; | |
| 176 | } | ||
| 177 | // override final remote path if not CAS object | ||
| 178 | ✗ | if (event.object_type == ObjectPack::kNamed) { | |
| 179 | ✗ | info.handle->remote_path = path; | |
| 180 | } | ||
| 181 | ✗ | uploader_->ScheduleCommit(info.handle, event.id); | |
| 182 | |||
| 183 | ✗ | pending_files_.erase(event.id); | |
| 184 | } | ||
| 185 | } | ||
| 186 | |||
| 187 | ✗ | void PayloadProcessor::OnUploadJobComplete( | |
| 188 | const upload::UploaderResults &results, void *buffer) { | ||
| 189 | ✗ | free(buffer); | |
| 190 | } | ||
| 191 | |||
| 192 | 28 | void PayloadProcessor::SetStatistics(perf::Statistics *st) { | |
| 193 | 4/8 ✓ Branch 2 taken 28 times. ✗ Branch 3 not taken. ✓ Branch 5 taken 28 times. ✗ Branch 6 not taken. ✓ Branch 8 taken 28 times. ✗ Branch 9 not taken. ✓ Branch 11 taken 28 times. ✗ Branch 12 not taken. | 28 | statistics_ = new perf::StatisticsTemplate("publish", st); |
| 194 | 28 | } | |
| 195 | |||
| 196 | ✗ | PayloadProcessor::Result PayloadProcessor::Initialize() { | |
| 197 | ✗ | Params params; | |
| 198 | ✗ | if (!GetParamsFromFile(current_repo_, &params)) { | |
| 199 | ✗ | LogCvmfs( | |
| 200 | kLogReceiver, kLogSyslogErr, | ||
| 201 | "PayloadProcessor - error: Could not get configuration parameters."); | ||
| 202 | ✗ | return kOtherError; | |
| 203 | } | ||
| 204 | |||
| 205 | const std::string spooler_temp_dir = GetSpoolerTempDir( | ||
| 206 | ✗ | params.spooler_configuration); | |
| 207 | ✗ | assert(!spooler_temp_dir.empty()); | |
| 208 | ✗ | assert(MkdirDeep(spooler_temp_dir + "/receiver", 0770, true)); | |
| 209 | temp_dir_ = RaiiTempDir::Create(spooler_temp_dir | ||
| 210 | ✗ | + "/receiver/payload_processor"); | |
| 211 | |||
| 212 | const upload::SpoolerDefinition definition( | ||
| 213 | params.spooler_configuration, params.hash_alg, params.compression_alg, | ||
| 214 | ✗ | params.generate_legacy_bulk_chunks, params.use_file_chunking, | |
| 215 | params.min_chunk_size, params.avg_chunk_size, params.max_chunk_size, | ||
| 216 | ✗ | "dummy_token", "dummy_key"); | |
| 217 | |||
| 218 | ✗ | uploader_.Destroy(); | |
| 219 | |||
| 220 | // configure the uploader environment | ||
| 221 | ✗ | uploader_ = upload::AbstractUploader::Construct(definition); | |
| 222 | ✗ | if (!uploader_.IsValid()) { | |
| 223 | ✗ | LogCvmfs(kLogSpooler, kLogWarning, | |
| 224 | "Failed to initialize backend upload " | ||
| 225 | "facility in PayloadProcessor."); | ||
| 226 | ✗ | return kUploaderError; | |
| 227 | } | ||
| 228 | |||
| 229 | ✗ | if (statistics_.IsValid()) { | |
| 230 | ✗ | uploader_->InitCounters(statistics_.weak_ref()); | |
| 231 | } | ||
| 232 | |||
| 233 | ✗ | return kSuccess; | |
| 234 | } | ||
| 235 | |||
| 236 | ✗ | PayloadProcessor::Result PayloadProcessor::Finalize() { | |
| 237 | ✗ | uploader_->WaitForUpload(); | |
| 238 | ✗ | temp_dir_.Destroy(); | |
| 239 | |||
| 240 | ✗ | const unsigned num_uploader_errors = uploader_->GetNumberOfErrors(); | |
| 241 | ✗ | uploader_->TearDown(); | |
| 242 | ✗ | if (num_uploader_errors > 0) { | |
| 243 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 244 | "PayloadProcessor - error: Uploader - %d upload(s) failed.", | ||
| 245 | num_uploader_errors); | ||
| 246 | ✗ | return kUploaderError; | |
| 247 | } | ||
| 248 | |||
| 249 | ✗ | if (GetNumErrors() > 0) { | |
| 250 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 251 | "PayloadProcessor - error: %d unpacking error(s).", | ||
| 252 | GetNumErrors()); | ||
| 253 | ✗ | return kOtherError; | |
| 254 | } | ||
| 255 | |||
| 256 | ✗ | return kSuccess; | |
| 257 | } | ||
| 258 | |||
| 259 | } // namespace receiver | ||
| 260 |