Directory: | cvmfs/ |
---|---|
File: | cvmfs/receiver/payload_processor.cc |
Date: | 2025-04-20 02:34:28 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 32 | 133 | 24.1% |
Branches: | 24 | 164 | 14.6% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #include "payload_processor.h" | ||
6 | |||
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <unistd.h>

#include <vector>

#include "params.h"
#include "util/logging.h"
#include "util/posix.h"
#include "util/string.h"
15 | |||
namespace {

// Number of bytes requested from the input descriptor per read() call
// while consuming an object pack (10 MiB).
const size_t kConsumerBuffer = static_cast<size_t>(10) << 20;

}  // namespace
21 | |||
22 | namespace receiver { | ||
23 | |||
24 | ✗ | FileInfo::FileInfo() | |
25 | ✗ | : handle(NULL), | |
26 | ✗ | total_size(0), | |
27 | ✗ | current_size(0), | |
28 | ✗ | hash_context(), | |
29 | ✗ | hash_buffer() | |
30 | {} | ||
31 | |||
32 | ✗ | FileInfo::FileInfo(const ObjectPackBuild::Event& event) | |
33 | ✗ | : handle(NULL), | |
34 | ✗ | total_size(event.size), | |
35 | ✗ | current_size(0), | |
36 | ✗ | hash_context(shash::ContextPtr(event.id.algorithm)), | |
37 | ✗ | hash_buffer(hash_context.size, 0) | |
38 | { | ||
39 | ✗ | hash_context.buffer = &hash_buffer[0]; | |
40 | ✗ | shash::Init(hash_context); | |
41 | } | ||
42 | |||
43 | ✗ | FileInfo::FileInfo(const FileInfo& other) | |
44 | ✗ | : handle(other.handle), | |
45 | ✗ | total_size(other.total_size), | |
46 | ✗ | current_size(other.current_size), | |
47 | ✗ | hash_context(other.hash_context), | |
48 | ✗ | hash_buffer(other.hash_buffer) | |
49 | { | ||
50 | ✗ | hash_context.buffer = &hash_buffer[0]; | |
51 | } | ||
52 | |||
53 | ✗ | FileInfo& FileInfo::operator=(const FileInfo& other) { | |
54 | ✗ | handle = other.handle; | |
55 | ✗ | total_size = other.total_size; | |
56 | ✗ | current_size = other.current_size; | |
57 | ✗ | hash_context = other.hash_context; | |
58 | ✗ | hash_buffer = other.hash_buffer; | |
59 | ✗ | hash_context.buffer = &hash_buffer[0]; | |
60 | |||
61 | ✗ | return *this; | |
62 | } | ||
63 | |||
64 | 2 | PayloadProcessor::PayloadProcessor() | |
65 | 2 | : pending_files_(), | |
66 | 2 | current_repo_(), | |
67 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | uploader_(), |
68 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | temp_dir_(), |
69 | 2 | num_errors_(0), | |
70 |
1/2✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
|
4 | statistics_(NULL) {} |
71 | |||
72 | 4 | PayloadProcessor::~PayloadProcessor() {} | |
73 | |||
74 | 2 | PayloadProcessor::Result PayloadProcessor::Process( | |
75 | int fdin, const std::string& header_digest, const std::string& path, | ||
76 | uint64_t header_size) { | ||
77 |
1/2✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
|
2 | LogCvmfs(kLogReceiver, kLogSyslog, |
78 | "PayloadProcessor - lease_path: %s, header digest: %s, header " | ||
79 | "size: %ld", | ||
80 | path.c_str(), header_digest.c_str(), header_size); | ||
81 | |||
82 | 2 | const size_t first_slash_idx = path.find('/', 0); | |
83 | |||
84 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | current_repo_ = path.substr(0, first_slash_idx); |
85 | |||
86 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | Result init_result = Initialize(); |
87 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | if (init_result != kSuccess) { |
88 | ✗ | return init_result; | |
89 | } | ||
90 | |||
91 | // Set up object pack deserialization | ||
92 |
1/2✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
|
2 | shash::Any digest = shash::MkFromHexPtr(shash::HexPtr(header_digest)); |
93 | |||
94 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | ObjectPackConsumer deserializer(digest, header_size); |
95 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | deserializer.RegisterListener(&PayloadProcessor::ConsumerEventCallback, this); |
96 | |||
97 | 2 | int nb = 0; | |
98 | 2 | ObjectPackBuild::State consumer_state = ObjectPackBuild::kStateContinue; | |
99 |
1/2✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
|
2 | std::vector<unsigned char> buffer(kConsumerBuffer, 0); |
100 | do { | ||
101 |
1/2✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
|
2 | nb = read(fdin, &buffer[0], buffer.size()); |
102 |
1/2✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
|
2 | consumer_state = deserializer.ConsumeNext(nb, &buffer[0]); |
103 |
2/4✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
|
2 | if (consumer_state != ObjectPackBuild::kStateContinue && |
104 | consumer_state != ObjectPackBuild::kStateDone) { | ||
105 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
106 | "PayloadProcessor - error: %d encountered when consuming object " | ||
107 | "pack.", | ||
108 | consumer_state); | ||
109 | ✗ | break; | |
110 | } | ||
111 |
2/4✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
|
2 | } while (nb > 0 && consumer_state != ObjectPackBuild::kStateDone); |
112 | |||
113 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
|
2 | assert(pending_files_.empty()); |
114 | |||
115 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | Result res = Finalize(); |
116 | |||
117 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | deserializer.UnregisterListeners(); |
118 | |||
119 | 2 | return res; | |
120 | 2 | } | |
121 | |||
122 | ✗ | void PayloadProcessor::ConsumerEventCallback( | |
123 | const ObjectPackBuild::Event& event) { | ||
124 | ✗ | std::string path(""); | |
125 | |||
126 | ✗ | if (event.object_type == ObjectPack::kCas) { | |
127 | ✗ | path = event.id.MakePath(); | |
128 | ✗ | } else if (event.object_type == ObjectPack::kNamed) { | |
129 | ✗ | path = event.object_name; | |
130 | } else { | ||
131 | // kEmpty - this is an error. | ||
132 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
133 | "PayloadProcessor - error: Event received with unknown object."); | ||
134 | ✗ | num_errors_++; | |
135 | ✗ | return; | |
136 | } | ||
137 | |||
138 | ✗ | FileIterator it = pending_files_.find(event.id); | |
139 | ✗ | if (it == pending_files_.end()) { | |
140 | // Schedule file upload if it's not being uploaded yet. | ||
141 | // Uploaders later check if the file is already present | ||
142 | // in the upstream storage and will not upload it twice. | ||
143 | ✗ | FileInfo info(event); | |
144 | // info.handle is later deleted by FinalizeStreamedUpload | ||
145 | ✗ | info.handle = uploader_->InitStreamedUpload(NULL); | |
146 | ✗ | pending_files_[event.id] = info; | |
147 | } | ||
148 | |||
149 | ✗ | FileInfo& info = pending_files_[event.id]; | |
150 | |||
151 | ✗ | void *buf_copied = smalloc(event.buf_size); | |
152 | ✗ | memcpy(buf_copied, event.buf, event.buf_size); | |
153 | ✗ | upload::AbstractUploader::UploadBuffer buf(event.buf_size, buf_copied); | |
154 | ✗ | uploader_->ScheduleUpload(info.handle, buf, | |
155 | ✗ | upload::AbstractUploader::MakeClosure( | |
156 | &PayloadProcessor::OnUploadJobComplete, this, buf_copied)); | ||
157 | |||
158 | ✗ | shash::Update(static_cast<const unsigned char*>(event.buf), | |
159 | ✗ | event.buf_size, | |
160 | info.hash_context); | ||
161 | |||
162 | ✗ | info.current_size += event.buf_size; | |
163 | |||
164 | ✗ | if (info.current_size == info.total_size) { | |
165 | ✗ | shash::Any file_hash(event.id.algorithm); | |
166 | ✗ | shash::Final(info.hash_context, &file_hash); | |
167 | |||
168 | ✗ | if (file_hash != event.id) { | |
169 | ✗ | LogCvmfs( | |
170 | kLogReceiver, kLogSyslogErr, | ||
171 | "PayloadProcessor - error: Hash mismatch for unpacked file: event " | ||
172 | "size: %ld, file size: %ld, event hash: %s, file hash: %s", | ||
173 | ✗ | event.size, info.current_size, | |
174 | ✗ | event.id.ToString(true).c_str(), file_hash.ToString(true).c_str()); | |
175 | ✗ | num_errors_++; | |
176 | ✗ | return; | |
177 | } | ||
178 | // override final remote path if not CAS object | ||
179 | ✗ | if (event.object_type == ObjectPack::kNamed) { | |
180 | ✗ | info.handle->remote_path = path; | |
181 | } | ||
182 | ✗ | uploader_->ScheduleCommit(info.handle, event.id); | |
183 | |||
184 | ✗ | pending_files_.erase(event.id); | |
185 | } | ||
186 | } | ||
187 | |||
188 | ✗ | void PayloadProcessor::OnUploadJobComplete( | |
189 | const upload::UploaderResults &results, | ||
190 | void *buffer) | ||
191 | { | ||
192 | ✗ | free(buffer); | |
193 | } | ||
194 | |||
195 | 1 | void PayloadProcessor::SetStatistics(perf::Statistics *st) { | |
196 |
4/8✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 1 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 1 times.
✗ Branch 12 not taken.
|
1 | statistics_ = new perf::StatisticsTemplate("publish", st); |
197 | 1 | } | |
198 | |||
199 | ✗ | PayloadProcessor::Result PayloadProcessor::Initialize() { | |
200 | ✗ | Params params; | |
201 | ✗ | if (!GetParamsFromFile(current_repo_, ¶ms)) { | |
202 | ✗ | LogCvmfs( | |
203 | kLogReceiver, kLogSyslogErr, | ||
204 | "PayloadProcessor - error: Could not get configuration parameters."); | ||
205 | ✗ | return kOtherError; | |
206 | } | ||
207 | |||
208 | const std::string spooler_temp_dir = | ||
209 | ✗ | GetSpoolerTempDir(params.spooler_configuration); | |
210 | ✗ | assert(!spooler_temp_dir.empty()); | |
211 | ✗ | assert(MkdirDeep(spooler_temp_dir + "/receiver", 0770, true)); | |
212 | temp_dir_ = | ||
213 | ✗ | RaiiTempDir::Create(spooler_temp_dir + "/receiver/payload_processor"); | |
214 | |||
215 | upload::SpoolerDefinition definition( | ||
216 | params.spooler_configuration, params.hash_alg, params.compression_alg, | ||
217 | ✗ | params.generate_legacy_bulk_chunks, params.use_file_chunking, | |
218 | params.min_chunk_size, params.avg_chunk_size, params.max_chunk_size, | ||
219 | ✗ | "dummy_token", "dummy_key"); | |
220 | |||
221 | ✗ | uploader_.Destroy(); | |
222 | |||
223 | // configure the uploader environment | ||
224 | ✗ | uploader_ = upload::AbstractUploader::Construct(definition); | |
225 | ✗ | if (!uploader_.IsValid()) { | |
226 | ✗ | LogCvmfs(kLogSpooler, kLogWarning, | |
227 | "Failed to initialize backend upload " | ||
228 | "facility in PayloadProcessor."); | ||
229 | ✗ | return kUploaderError; | |
230 | } | ||
231 | |||
232 | ✗ | if (statistics_.IsValid()) { | |
233 | ✗ | uploader_->InitCounters(statistics_.weak_ref()); | |
234 | } | ||
235 | |||
236 | ✗ | return kSuccess; | |
237 | } | ||
238 | |||
239 | ✗ | PayloadProcessor::Result PayloadProcessor::Finalize() { | |
240 | ✗ | uploader_->WaitForUpload(); | |
241 | ✗ | temp_dir_.Destroy(); | |
242 | |||
243 | ✗ | const unsigned num_uploader_errors = uploader_->GetNumberOfErrors(); | |
244 | ✗ | uploader_->TearDown(); | |
245 | ✗ | if (num_uploader_errors > 0) { | |
246 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
247 | "PayloadProcessor - error: Uploader - %d upload(s) failed.", | ||
248 | num_uploader_errors); | ||
249 | ✗ | return kUploaderError; | |
250 | } | ||
251 | |||
252 | ✗ | if (GetNumErrors() > 0) { | |
253 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
254 | "PayloadProcessor - error: %d unpacking error(s).", GetNumErrors()); | ||
255 | ✗ | return kOtherError; | |
256 | } | ||
257 | |||
258 | ✗ | return kSuccess; | |
259 | } | ||
260 | |||
261 | } // namespace receiver | ||
262 |