Directory: | cvmfs/ |
---|---|
File: | cvmfs/receiver/payload_processor.cc |
Date: | 2025-06-22 02:36:02 |
 | Exec | Total | Coverage |
---|---|---|---|
Lines: | 33 | 134 | 24.6% |
Branches: | 24 | 164 | 14.6% |
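For reference, the percentages above are simply executed counts over totals: 33 / 134 ≈ 24.6 % of lines and 24 / 164 ≈ 14.6 % of branches in this file were exercised by the run.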
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #include "payload_processor.h" | ||
6 | |||
7 | #include <fcntl.h> | ||
8 | #include <unistd.h> | ||
9 | |||
10 | #include <vector> | ||
11 | |||
12 | #include "params.h" | ||
13 | #include "util/logging.h" | ||
14 | #include "util/posix.h" | ||
15 | #include "util/string.h" | ||
16 | |||
17 | namespace { | ||
18 | |||
19 | const size_t kConsumerBuffer = 10 * 1024 * 1024; // 10 MB | ||
20 | |||
21 | } | ||
22 | |||
23 | namespace receiver { | ||
24 | |||
25 | ✗ | FileInfo::FileInfo() | |
26 | ✗ | : handle(NULL) | |
27 | ✗ | , total_size(0) | |
28 | ✗ | , current_size(0) | |
29 | ✗ | , hash_context() | |
30 | ✗ | , hash_buffer() { } | |
31 | |||
32 | ✗ | FileInfo::FileInfo(const ObjectPackBuild::Event &event) | |
33 | ✗ | : handle(NULL) | |
34 | ✗ | , total_size(event.size) | |
35 | ✗ | , current_size(0) | |
36 | ✗ | , hash_context(shash::ContextPtr(event.id.algorithm)) | |
37 | ✗ | , hash_buffer(hash_context.size, 0) { | |
38 | ✗ | hash_context.buffer = &hash_buffer[0]; | |
39 | ✗ | shash::Init(hash_context); | |
40 | } | ||
41 | |||
42 | ✗ | FileInfo::FileInfo(const FileInfo &other) | |
43 | ✗ | : handle(other.handle) | |
44 | ✗ | , total_size(other.total_size) | |
45 | ✗ | , current_size(other.current_size) | |
46 | ✗ | , hash_context(other.hash_context) | |
47 | ✗ | , hash_buffer(other.hash_buffer) { | |
48 | ✗ | hash_context.buffer = &hash_buffer[0]; | |
49 | } | ||
50 | |||
51 | ✗ | FileInfo &FileInfo::operator=(const FileInfo &other) { | |
52 | ✗ | handle = other.handle; | |
53 | ✗ | total_size = other.total_size; | |
54 | ✗ | current_size = other.current_size; | |
55 | ✗ | hash_context = other.hash_context; | |
56 | ✗ | hash_buffer = other.hash_buffer; | |
57 | ✗ | hash_context.buffer = &hash_buffer[0]; | |
58 | |||
59 | ✗ | return *this; | |
60 | } | ||
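The copy constructor and assignment operator above re-point `hash_context.buffer` into the copy's own `hash_buffer` after copying the members. The stand-alone sketch below illustrates why that rebinding is needed when an object holds a context with an internal pointer into its own storage; `Context` and `Accumulator` are hypothetical stand-ins for `shash::ContextPtr` and `FileInfo`, not cvmfs types.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical context whose scratch buffer lives inside the enclosing object.
struct Context {
  unsigned char *buffer;
  unsigned size;
};

struct Accumulator {
  Accumulator() : context(), storage() { }
  Accumulator(const Accumulator &other)
      : context(other.context), storage(other.storage) {
    // Without this line the copy's context would still point into
    // other.storage, which may be reallocated or destroyed later.
    context.buffer = storage.empty() ? NULL : &storage[0];
  }
  Accumulator &operator=(const Accumulator &other) {
    context = other.context;
    storage = other.storage;
    context.buffer = storage.empty() ? NULL : &storage[0];
    return *this;
  }
  Context context;
  std::vector<unsigned char> storage;
};
```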
61 | |||
62 | 17 | PayloadProcessor::PayloadProcessor() | |
63 | 17 | : pending_files_() | |
64 | 17 | , current_repo_() | |
65 | 1/2 | 17 | , uploader_()
66 | 1/2 | 17 | , temp_dir_()
67 | 17 | , num_errors_(0) | |
68 | 1/2 | 34 | , statistics_(NULL) { }
69 | |||
70 | 34 | PayloadProcessor::~PayloadProcessor() { } | |
71 | |||
72 | 17 | PayloadProcessor::Result PayloadProcessor::Process( | |
73 | int fdin, const std::string &header_digest, const std::string &path, | ||
74 | uint64_t header_size) { | ||
75 | 1/2 | 17 | LogCvmfs(kLogReceiver, kLogSyslog,
76 | "PayloadProcessor - lease_path: %s, header digest: %s, header " | ||
77 | "size: %ld", | ||
78 | path.c_str(), header_digest.c_str(), header_size); | ||
79 | |||
80 | 17 | const size_t first_slash_idx = path.find('/', 0); | |
81 | |||
82 | 1/2 | 17 | current_repo_ = path.substr(0, first_slash_idx);
83 | |||
84 | 1/2 | 17 | const Result init_result = Initialize();
85 | 1/2 | 17 | if (init_result != kSuccess) {
86 | ✗ | return init_result; | |
87 | } | ||
88 | |||
89 | // Set up object pack deserialization | ||
90 | 1/2 | 17 | const shash::Any digest = shash::MkFromHexPtr(shash::HexPtr(header_digest));
91 | |||
92 | 1/2 | 17 | ObjectPackConsumer deserializer(digest, header_size);
93 | 1/2 | 17 | deserializer.RegisterListener(&PayloadProcessor::ConsumerEventCallback, this);
94 | |||
95 | 17 | int nb = 0; | |
96 | 17 | ObjectPackBuild::State consumer_state = ObjectPackBuild::kStateContinue; | |
97 | 1/2 | 17 | std::vector<unsigned char> buffer(kConsumerBuffer, 0);
98 | do { | ||
99 | 1/2 | 17 | nb = read(fdin, &buffer[0], buffer.size());
100 | 1/2 | 17 | consumer_state = deserializer.ConsumeNext(nb, &buffer[0]);
101 | 1/2 | 17 | if (consumer_state != ObjectPackBuild::kStateContinue
102 | 1/2 | 17 | && consumer_state != ObjectPackBuild::kStateDone) {
103 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
104 | "PayloadProcessor - error: %d encountered when consuming object " | ||
105 | "pack.", | ||
106 | consumer_state); | ||
107 | ✗ | break; | |
108 | } | ||
109 | 2/4 | 17 | } while (nb > 0 && consumer_state != ObjectPackBuild::kStateDone);
110 | |||
111 | 1/2 | 17 | assert(pending_files_.empty());
112 | |||
113 | 1/2 | 17 | const Result res = Finalize();
114 | |||
115 | 1/2 | 17 | deserializer.UnregisterListeners();
116 | |||
117 | 17 | return res; | |
118 | 17 | } | |
119 | |||
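Process() above streams the uploaded object pack from fdin in 10 MB chunks and feeds each chunk to the ObjectPackConsumer until it reports kStateDone or the input is exhausted. Below is a minimal stand-alone sketch of that read-and-consume loop; `Consumer`, `Drain`, and the state enum are hypothetical stand-ins for the cvmfs classes, not the real API.

```cpp
#include <unistd.h>

#include <cstddef>
#include <vector>

enum State { kStateContinue, kStateDone, kStateError };

// Hypothetical stand-in for ObjectPackConsumer: it only tracks how many
// bytes it has seen against the expected payload size.
struct Consumer {
  explicit Consumer(size_t expected) : expected_(expected), consumed_(0) { }
  State ConsumeNext(size_t nbytes, const unsigned char * /*data*/) {
    consumed_ += nbytes;
    return (consumed_ >= expected_) ? kStateDone : kStateContinue;
  }
  size_t expected_;
  size_t consumed_;
};

// Mirrors the do/while loop in Process(): read fixed-size chunks from fdin
// and hand them to the consumer until it is done or the input runs out.
bool Drain(int fdin, Consumer *consumer, size_t chunk_bytes) {
  std::vector<unsigned char> buffer(chunk_bytes, 0);
  ssize_t nb = 0;
  State state = kStateContinue;
  do {
    nb = read(fdin, &buffer[0], buffer.size());
    if (nb < 0)
      return false;  // read error
    state = consumer->ConsumeNext(static_cast<size_t>(nb), &buffer[0]);
    if (state != kStateContinue && state != kStateDone)
      return false;  // consumer reported an error
  } while (nb > 0 && state != kStateDone);
  return state == kStateDone;
}
```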
120 | ✗ | void PayloadProcessor::ConsumerEventCallback( | |
121 | const ObjectPackBuild::Event &event) { | ||
122 | ✗ | std::string path(""); | |
123 | |||
124 | ✗ | if (event.object_type == ObjectPack::kCas) { | |
125 | ✗ | path = event.id.MakePath(); | |
126 | ✗ | } else if (event.object_type == ObjectPack::kNamed) { | |
127 | ✗ | path = event.object_name; | |
128 | } else { | ||
129 | // kEmpty - this is an error. | ||
130 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
131 | "PayloadProcessor - error: Event received with unknown object."); | ||
132 | ✗ | num_errors_++; | |
133 | ✗ | return; | |
134 | } | ||
135 | |||
136 | ✗ | const FileIterator it = pending_files_.find(event.id); | |
137 | ✗ | if (it == pending_files_.end()) { | |
138 | // Schedule file upload if it's not being uploaded yet. | ||
139 | // Uploaders later check if the file is already present | ||
140 | // in the upstream storage and will not upload it twice. | ||
141 | ✗ | FileInfo info(event); | |
142 | // info.handle is later deleted by FinalizeStreamedUpload | ||
143 | ✗ | info.handle = uploader_->InitStreamedUpload(NULL); | |
144 | ✗ | pending_files_[event.id] = info; | |
145 | } | ||
146 | |||
147 | ✗ | FileInfo &info = pending_files_[event.id]; | |
148 | |||
149 | ✗ | void *buf_copied = smalloc(event.buf_size); | |
150 | ✗ | memcpy(buf_copied, event.buf, event.buf_size); | |
151 | ✗ | const upload::AbstractUploader::UploadBuffer buf(event.buf_size, buf_copied); | |
152 | ✗ | uploader_->ScheduleUpload( | |
153 | info.handle, buf, | ||
154 | ✗ | upload::AbstractUploader::MakeClosure( | |
155 | &PayloadProcessor::OnUploadJobComplete, this, buf_copied)); | ||
156 | |||
157 | ✗ | shash::Update(static_cast<const unsigned char *>(event.buf), | |
158 | ✗ | event.buf_size, | |
159 | info.hash_context); | ||
160 | |||
161 | ✗ | info.current_size += event.buf_size; | |
162 | |||
163 | ✗ | if (info.current_size == info.total_size) { | |
164 | ✗ | shash::Any file_hash(event.id.algorithm); | |
165 | ✗ | shash::Final(info.hash_context, &file_hash); | |
166 | |||
167 | ✗ | if (file_hash != event.id) { | |
168 | ✗ | LogCvmfs( | |
169 | kLogReceiver, kLogSyslogErr, | ||
170 | "PayloadProcessor - error: Hash mismatch for unpacked file: event " | ||
171 | "size: %ld, file size: %ld, event hash: %s, file hash: %s", | ||
172 | ✗ | event.size, info.current_size, event.id.ToString(true).c_str(), | |
173 | ✗ | file_hash.ToString(true).c_str()); | |
174 | ✗ | num_errors_++; | |
175 | ✗ | return; | |
176 | } | ||
177 | // override final remote path if not CAS object | ||
178 | ✗ | if (event.object_type == ObjectPack::kNamed) { | |
179 | ✗ | info.handle->remote_path = path; | |
180 | } | ||
181 | ✗ | uploader_->ScheduleCommit(info.handle, event.id); | |
182 | |||
183 | ✗ | pending_files_.erase(event.id); | |
184 | } | ||
185 | } | ||
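ConsumerEventCallback() above keeps one FileInfo per object id in pending_files_, appends every incoming buffer to the streamed upload and to the running hash, and only verifies the digest and commits once current_size reaches total_size. The sketch below reproduces that bookkeeping pattern in isolation; `PendingObject`, `Accumulate`, and the toy additive checksum are illustrative stand-ins for FileInfo and the shash context, not cvmfs APIs.

```cpp
#include <stdint.h>

#include <cstddef>
#include <map>
#include <string>

// Hypothetical per-object state, standing in for receiver::FileInfo.
struct PendingObject {
  uint64_t total_size;
  uint64_t current_size;
  uint64_t checksum;  // toy stand-in for the incremental hash context
};

// Feeds one buffer into the object identified by 'id'. Returns true exactly
// once, when the object is complete and its (toy) digest matches; this
// mirrors the "current_size == total_size" branch in ConsumerEventCallback().
bool Accumulate(std::map<std::string, PendingObject> *pending,
                const std::string &id, uint64_t expected_checksum,
                uint64_t object_size, const unsigned char *buf,
                size_t buf_size) {
  if (pending->find(id) == pending->end()) {
    // First chunk of this object: create the pending entry
    // (the real code also opens a streamed upload handle here).
    PendingObject fresh = {object_size, 0, 0};
    (*pending)[id] = fresh;
  }
  PendingObject &obj = (*pending)[id];
  for (size_t i = 0; i < buf_size; ++i)
    obj.checksum += buf[i];          // shash::Update() in the real code
  obj.current_size += buf_size;
  if (obj.current_size < obj.total_size)
    return false;                    // more chunks expected
  const bool hash_ok = (obj.checksum == expected_checksum);
  pending->erase(id);                // mirrors pending_files_.erase(event.id)
  return hash_ok;                    // false corresponds to num_errors_++
}
```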
186 | |||
187 | ✗ | void PayloadProcessor::OnUploadJobComplete( | |
188 | const upload::UploaderResults &results, void *buffer) { | ||
189 | ✗ | free(buffer); | |
190 | } | ||
191 | |||
192 | 2 | void PayloadProcessor::SetStatistics(perf::Statistics *st) { | |
193 | 4/8 | 2 | statistics_ = new perf::StatisticsTemplate("publish", st);
194 | 2 | } | |
195 | |||
196 | ✗ | PayloadProcessor::Result PayloadProcessor::Initialize() { | |
197 | ✗ | Params params; | |
198 | ✗ | if (!GetParamsFromFile(current_repo_, ¶ms)) { | |
199 | ✗ | LogCvmfs( | |
200 | kLogReceiver, kLogSyslogErr, | ||
201 | "PayloadProcessor - error: Could not get configuration parameters."); | ||
202 | ✗ | return kOtherError; | |
203 | } | ||
204 | |||
205 | const std::string spooler_temp_dir = GetSpoolerTempDir( | ||
206 | ✗ | params.spooler_configuration); | |
207 | ✗ | assert(!spooler_temp_dir.empty()); | |
208 | ✗ | assert(MkdirDeep(spooler_temp_dir + "/receiver", 0770, true)); | |
209 | temp_dir_ = RaiiTempDir::Create(spooler_temp_dir | ||
210 | ✗ | + "/receiver/payload_processor"); | |
211 | |||
212 | const upload::SpoolerDefinition definition( | ||
213 | params.spooler_configuration, params.hash_alg, params.compression_alg, | ||
214 | ✗ | params.generate_legacy_bulk_chunks, params.use_file_chunking, | |
215 | params.min_chunk_size, params.avg_chunk_size, params.max_chunk_size, | ||
216 | ✗ | "dummy_token", "dummy_key"); | |
217 | |||
218 | ✗ | uploader_.Destroy(); | |
219 | |||
220 | // configure the uploader environment | ||
221 | ✗ | uploader_ = upload::AbstractUploader::Construct(definition); | |
222 | ✗ | if (!uploader_.IsValid()) { | |
223 | ✗ | LogCvmfs(kLogSpooler, kLogWarning, | |
224 | "Failed to initialize backend upload " | ||
225 | "facility in PayloadProcessor."); | ||
226 | ✗ | return kUploaderError; | |
227 | } | ||
228 | |||
229 | ✗ | if (statistics_.IsValid()) { | |
230 | ✗ | uploader_->InitCounters(statistics_.weak_ref()); | |
231 | } | ||
232 | |||
233 | ✗ | return kSuccess; | |
234 | } | ||
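Initialize() above derives the spooler temp directory from the repository parameters, creates a scratch area under .../receiver/payload_processor via RaiiTempDir, and (re)constructs the uploader from a SpoolerDefinition. As a rough illustration of the RAII temp-directory idea only, a minimal helper built on POSIX mkdtemp()/rmdir() might look like the sketch below; `ScopedTempDir` is a hypothetical simplification, not the real RaiiTempDir class.

```cpp
#include <stdlib.h>  // mkdtemp
#include <unistd.h>  // rmdir

#include <string>
#include <vector>

// Hypothetical, minimal RAII temporary directory: created on construction,
// removed on destruction (only if still empty), in the spirit of
// RaiiTempDir::Create() used by Initialize().
class ScopedTempDir {
 public:
  explicit ScopedTempDir(const std::string &prefix) {
    std::string templ = prefix + ".XXXXXX";
    std::vector<char> buf(templ.begin(), templ.end());
    buf.push_back('\0');
    if (mkdtemp(&buf[0]) != NULL)
      path_ = std::string(&buf[0]);
  }
  ~ScopedTempDir() {
    if (!path_.empty())
      rmdir(path_.c_str());  // the real class also cleans up its contents
  }
  bool IsValid() const { return !path_.empty(); }
  const std::string &path() const { return path_; }

 private:
  std::string path_;
};

// Usage, assuming the spooler scratch area already exists:
//   ScopedTempDir dir("/var/spool/cvmfs/receiver/payload_processor");
//   if (!dir.IsValid()) { /* handle error */ }
```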
235 | |||
236 | ✗ | PayloadProcessor::Result PayloadProcessor::Finalize() { | |
237 | ✗ | uploader_->WaitForUpload(); | |
238 | ✗ | temp_dir_.Destroy(); | |
239 | |||
240 | ✗ | const unsigned num_uploader_errors = uploader_->GetNumberOfErrors(); | |
241 | ✗ | uploader_->TearDown(); | |
242 | ✗ | if (num_uploader_errors > 0) { | |
243 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
244 | "PayloadProcessor - error: Uploader - %d upload(s) failed.", | ||
245 | num_uploader_errors); | ||
246 | ✗ | return kUploaderError; | |
247 | } | ||
248 | |||
249 | ✗ | if (GetNumErrors() > 0) { | |
250 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
251 | "PayloadProcessor - error: %d unpacking error(s).", | ||
252 | GetNumErrors()); | ||
253 | ✗ | return kOtherError; | |
254 | } | ||
255 | |||
256 | ✗ | return kSuccess; | |
257 | } | ||
258 | |||
259 | } // namespace receiver | ||
260 |