GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/receiver/payload_processor.cc
Date: 2025-06-22 02:36:02
Exec Total Coverage
Lines: 33 134 24.6%
Branches: 24 164 14.6%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "payload_processor.h"
6
7 #include <fcntl.h>
8 #include <unistd.h>
9
10 #include <vector>
11
12 #include "params.h"
13 #include "util/logging.h"
14 #include "util/posix.h"
15 #include "util/string.h"
16
// File-local constants.
17 namespace {
18
// Size in bytes of the read buffer that PayloadProcessor::Process() allocates
// and fills from the input file descriptor on every loop iteration.
19 const size_t kConsumerBuffer = 10 * 1024 * 1024; // 10 MB
20
21 } // anonymous namespace
22
23 namespace receiver {
24
25 FileInfo::FileInfo()
26 : handle(NULL)
27 , total_size(0)
28 , current_size(0)
29 , hash_context()
30 , hash_buffer() { }
31
32 FileInfo::FileInfo(const ObjectPackBuild::Event &event)
33 : handle(NULL)
34 , total_size(event.size)
35 , current_size(0)
36 , hash_context(shash::ContextPtr(event.id.algorithm))
37 , hash_buffer(hash_context.size, 0) {
38 hash_context.buffer = &hash_buffer[0];
39 shash::Init(hash_context);
40 }
41
42 FileInfo::FileInfo(const FileInfo &other)
43 : handle(other.handle)
44 , total_size(other.total_size)
45 , current_size(other.current_size)
46 , hash_context(other.hash_context)
47 , hash_buffer(other.hash_buffer) {
48 hash_context.buffer = &hash_buffer[0];
49 }
50
51 FileInfo &FileInfo::operator=(const FileInfo &other) {
52 handle = other.handle;
53 total_size = other.total_size;
54 current_size = other.current_size;
55 hash_context = other.hash_context;
56 hash_buffer = other.hash_buffer;
57 hash_context.buffer = &hash_buffer[0];
58
59 return *this;
60 }
61
/**
 * Creates an idle payload processor: all members are default-initialized,
 * the error counter starts at 0 and statistics_ is initialized from NULL
 * (no statistics attached until SetStatistics() is called).
 */
62 17 PayloadProcessor::PayloadProcessor()
63 17 : pending_files_()
64 17 , current_repo_()
65
1/2
✓ Branch 1 taken 17 times.
✗ Branch 2 not taken.
17 , uploader_()
66
1/2
✓ Branch 1 taken 17 times.
✗ Branch 2 not taken.
17 , temp_dir_()
67 17 , num_errors_(0)
68
1/2
✓ Branch 2 taken 17 times.
✗ Branch 3 not taken.
34 , statistics_(NULL) { }
69
// Destructor. The body is empty; members are cleaned up by their own
// destructors.
70 34 PayloadProcessor::~PayloadProcessor() { }
71
/**
 * Reads a serialized object pack from fdin, feeds it chunk by chunk to an
 * ObjectPackConsumer and returns the outcome of Finalize().
 *
 * The repository name is the part of `path` in front of the first '/' (the
 * whole string if there is no '/'). Initialize() is run for it first; a
 * result other than kSuccess is returned immediately.
 *
 * ConsumerEventCallback() is registered as listener on the consumer for the
 * duration of the call.
 *
 * @param fdin file descriptor the payload is read from
 * @param header_digest hex digest handed to the ObjectPackConsumer
 * @param path lease path; everything before the first '/' names the repo
 * @param header_size header size handed to the ObjectPackConsumer
 * @return the failing Initialize() result, otherwise the Finalize() result
 */
72 17 PayloadProcessor::Result PayloadProcessor::Process(
73 int fdin, const std::string &header_digest, const std::string &path,
74 uint64_t header_size) {
// NOTE(review): header_size is a uint64_t but is formatted with %ld below;
// that matches only on platforms where long is 64 bits wide.
75
1/2
✓ Branch 3 taken 17 times.
✗ Branch 4 not taken.
17 LogCvmfs(kLogReceiver, kLogSyslog,
76 "PayloadProcessor - lease_path: %s, header digest: %s, header "
77 "size: %ld",
78 path.c_str(), header_digest.c_str(), header_size);
79
// If path has no '/', find() yields npos and substr() returns the full path.
80 17 const size_t first_slash_idx = path.find('/', 0);
81
82
1/2
✓ Branch 1 taken 17 times.
✗ Branch 2 not taken.
17 current_repo_ = path.substr(0, first_slash_idx);
83
84
1/2
✓ Branch 1 taken 17 times.
✗ Branch 2 not taken.
17 const Result init_result = Initialize();
85
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 17 times.
17 if (init_result != kSuccess) {
86 return init_result;
87 }
88
89 // Set up object pack deserialization
90
1/2
✓ Branch 2 taken 17 times.
✗ Branch 3 not taken.
17 const shash::Any digest = shash::MkFromHexPtr(shash::HexPtr(header_digest));
91
92
1/2
✓ Branch 1 taken 17 times.
✗ Branch 2 not taken.
17 ObjectPackConsumer deserializer(digest, header_size);
93
1/2
✓ Branch 1 taken 17 times.
✗ Branch 2 not taken.
17 deserializer.RegisterListener(&PayloadProcessor::ConsumerEventCallback, this);
94
95 17 int nb = 0;
96 17 ObjectPackBuild::State consumer_state = ObjectPackBuild::kStateContinue;
97
1/2
✓ Branch 2 taken 17 times.
✗ Branch 3 not taken.
17 std::vector<unsigned char> buffer(kConsumerBuffer, 0);
// Pump the input through the consumer in chunks of at most kConsumerBuffer
// bytes. The loop ends when read() returns no more data, when the consumer
// reports kStateDone, or (via break) when it reports any other state than
// kStateContinue.
98 do {
// NOTE(review): the return value of read() is passed to ConsumeNext() before
// it is checked, so a read error (-1) is forwarded as the byte count and the
// loop only stops afterwards through `nb > 0` - confirm ConsumeNext()
// tolerates that, and whether EINTR should be retried.
99
1/2
✓ Branch 3 taken 17 times.
✗ Branch 4 not taken.
17 nb = read(fdin, &buffer[0], buffer.size());
100
1/2
✓ Branch 2 taken 17 times.
✗ Branch 3 not taken.
17 consumer_state = deserializer.ConsumeNext(nb, &buffer[0]);
101
1/2
✓ Branch 0 taken 17 times.
✗ Branch 1 not taken.
17 if (consumer_state != ObjectPackBuild::kStateContinue
102
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 17 times.
17 && consumer_state != ObjectPackBuild::kStateDone) {
103 LogCvmfs(kLogReceiver, kLogSyslogErr,
104 "PayloadProcessor - error: %d encountered when consuming object "
105 "pack.",
106 consumer_state);
107 break;
108 }
109
2/4
✓ Branch 0 taken 17 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 17 times.
17 } while (nb > 0 && consumer_state != ObjectPackBuild::kStateDone);
110
// Every file opened by ConsumerEventCallback() is expected to have been
// completed by now. The check is compiled out when NDEBUG is defined.
// NOTE(review): this assertion is also reached after the error `break` above,
// and the consumer error itself is not turned into a failing Result here -
// confirm both are intended.
111
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 17 times.
17 assert(pending_files_.empty());
112
113
1/2
✓ Branch 1 taken 17 times.
✗ Branch 2 not taken.
17 const Result res = Finalize();
114
115
1/2
✓ Branch 1 taken 17 times.
✗ Branch 2 not taken.
17 deserializer.UnregisterListeners();
116
117 17 return res;
118 17 }
119
120 void PayloadProcessor::ConsumerEventCallback(
121 const ObjectPackBuild::Event &event) {
122 std::string path("");
123
124 if (event.object_type == ObjectPack::kCas) {
125 path = event.id.MakePath();
126 } else if (event.object_type == ObjectPack::kNamed) {
127 path = event.object_name;
128 } else {
129 // kEmpty - this is an error.
130 LogCvmfs(kLogReceiver, kLogSyslogErr,
131 "PayloadProcessor - error: Event received with unknown object.");
132 num_errors_++;
133 return;
134 }
135
136 const FileIterator it = pending_files_.find(event.id);
137 if (it == pending_files_.end()) {
138 // Schedule file upload if it's not being uploaded yet.
139 // Uploaders later check if the file is already present
140 // in the upstream storage and will not upload it twice.
141 FileInfo info(event);
142 // info.handle is later deleted by FinalizeStreamedUpload
143 info.handle = uploader_->InitStreamedUpload(NULL);
144 pending_files_[event.id] = info;
145 }
146
147 FileInfo &info = pending_files_[event.id];
148
149 void *buf_copied = smalloc(event.buf_size);
150 memcpy(buf_copied, event.buf, event.buf_size);
151 const upload::AbstractUploader::UploadBuffer buf(event.buf_size, buf_copied);
152 uploader_->ScheduleUpload(
153 info.handle, buf,
154 upload::AbstractUploader::MakeClosure(
155 &PayloadProcessor::OnUploadJobComplete, this, buf_copied));
156
157 shash::Update(static_cast<const unsigned char *>(event.buf),
158 event.buf_size,
159 info.hash_context);
160
161 info.current_size += event.buf_size;
162
163 if (info.current_size == info.total_size) {
164 shash::Any file_hash(event.id.algorithm);
165 shash::Final(info.hash_context, &file_hash);
166
167 if (file_hash != event.id) {
168 LogCvmfs(
169 kLogReceiver, kLogSyslogErr,
170 "PayloadProcessor - error: Hash mismatch for unpacked file: event "
171 "size: %ld, file size: %ld, event hash: %s, file hash: %s",
172 event.size, info.current_size, event.id.ToString(true).c_str(),
173 file_hash.ToString(true).c_str());
174 num_errors_++;
175 return;
176 }
177 // override final remote path if not CAS object
178 if (event.object_type == ObjectPack::kNamed) {
179 info.handle->remote_path = path;
180 }
181 uploader_->ScheduleCommit(info.handle, event.id);
182
183 pending_files_.erase(event.id);
184 }
185 }
186
187 void PayloadProcessor::OnUploadJobComplete(
188 const upload::UploaderResults &results, void *buffer) {
189 free(buffer);
190 }
191
/**
 * Attaches statistics to the processor by storing a newly allocated
 * perf::StatisticsTemplate named "publish" that is built on `st`.
 * Initialize() passes it on to the uploader's InitCounters() when
 * statistics_ is valid.
 *
 * NOTE(review): the object is allocated with `new`; presumably statistics_ is
 * an owning smart-pointer type that releases it (and any previously assigned
 * object) - confirm against the member declaration.
 *
 * @param st statistics object the template is created for
 */
192 2 void PayloadProcessor::SetStatistics(perf::Statistics *st) {
193
4/8
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 2 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 2 times.
✗ Branch 12 not taken.
2 statistics_ = new perf::StatisticsTemplate("publish", st);
194 2 }
195
196 PayloadProcessor::Result PayloadProcessor::Initialize() {
197 Params params;
198 if (!GetParamsFromFile(current_repo_, &params)) {
199 LogCvmfs(
200 kLogReceiver, kLogSyslogErr,
201 "PayloadProcessor - error: Could not get configuration parameters.");
202 return kOtherError;
203 }
204
205 const std::string spooler_temp_dir = GetSpoolerTempDir(
206 params.spooler_configuration);
207 assert(!spooler_temp_dir.empty());
208 assert(MkdirDeep(spooler_temp_dir + "/receiver", 0770, true));
209 temp_dir_ = RaiiTempDir::Create(spooler_temp_dir
210 + "/receiver/payload_processor");
211
212 const upload::SpoolerDefinition definition(
213 params.spooler_configuration, params.hash_alg, params.compression_alg,
214 params.generate_legacy_bulk_chunks, params.use_file_chunking,
215 params.min_chunk_size, params.avg_chunk_size, params.max_chunk_size,
216 "dummy_token", "dummy_key");
217
218 uploader_.Destroy();
219
220 // configure the uploader environment
221 uploader_ = upload::AbstractUploader::Construct(definition);
222 if (!uploader_.IsValid()) {
223 LogCvmfs(kLogSpooler, kLogWarning,
224 "Failed to initialize backend upload "
225 "facility in PayloadProcessor.");
226 return kUploaderError;
227 }
228
229 if (statistics_.IsValid()) {
230 uploader_->InitCounters(statistics_.weak_ref());
231 }
232
233 return kSuccess;
234 }
235
236 PayloadProcessor::Result PayloadProcessor::Finalize() {
237 uploader_->WaitForUpload();
238 temp_dir_.Destroy();
239
240 const unsigned num_uploader_errors = uploader_->GetNumberOfErrors();
241 uploader_->TearDown();
242 if (num_uploader_errors > 0) {
243 LogCvmfs(kLogReceiver, kLogSyslogErr,
244 "PayloadProcessor - error: Uploader - %d upload(s) failed.",
245 num_uploader_errors);
246 return kUploaderError;
247 }
248
249 if (GetNumErrors() > 0) {
250 LogCvmfs(kLogReceiver, kLogSyslogErr,
251 "PayloadProcessor - error: %d unpacking error(s).",
252 GetNumErrors());
253 return kOtherError;
254 }
255
256 return kSuccess;
257 }
258
259 } // namespace receiver
260