GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/receiver/payload_processor.cc
Date: 2024-04-28 02:33:07
Exec Total Coverage
Lines: 32 133 24.1%
Branches: 24 164 14.6%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
#include "payload_processor.h"

#include <fcntl.h>
#include <unistd.h>

#include <cerrno>
#include <vector>

#include "params.h"
#include "util/logging.h"
#include "util/posix.h"
#include "util/string.h"
15
namespace {

// Size in bytes (10 * 2^20) of the buffer that PayloadProcessor::Process
// fills with each read() from the input file descriptor.
const size_t kConsumerBuffer = 10u << 20;

}  // anonymous namespace
21
22 namespace receiver {
23
24 FileInfo::FileInfo()
25 : handle(NULL),
26 total_size(0),
27 current_size(0),
28 hash_context(),
29 hash_buffer()
30 {}
31
32 FileInfo::FileInfo(const ObjectPackBuild::Event& event)
33 : handle(NULL),
34 total_size(event.size),
35 current_size(0),
36 hash_context(shash::ContextPtr(event.id.algorithm)),
37 hash_buffer(hash_context.size, 0)
38 {
39 hash_context.buffer = &hash_buffer[0];
40 shash::Init(hash_context);
41 }
42
43 FileInfo::FileInfo(const FileInfo& other)
44 : handle(other.handle),
45 total_size(other.total_size),
46 current_size(other.current_size),
47 hash_context(other.hash_context),
48 hash_buffer(other.hash_buffer)
49 {
50 hash_context.buffer = &hash_buffer[0];
51 }
52
53 FileInfo& FileInfo::operator=(const FileInfo& other) {
54 handle = other.handle;
55 total_size = other.total_size;
56 current_size = other.current_size;
57 hash_context = other.hash_context;
58 hash_buffer = other.hash_buffer;
59 hash_context.buffer = &hash_buffer[0];
60
61 return *this;
62 }
63
64 2 PayloadProcessor::PayloadProcessor()
65 2 : pending_files_(),
66 2 current_repo_(),
67
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 uploader_(),
68
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 temp_dir_(),
69 2 num_errors_(0),
70
1/2
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
4 statistics_(NULL) {}
71
72 4 PayloadProcessor::~PayloadProcessor() {}
73
74 2 PayloadProcessor::Result PayloadProcessor::Process(
75 int fdin, const std::string& header_digest, const std::string& path,
76 uint64_t header_size) {
77
1/2
✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
2 LogCvmfs(kLogReceiver, kLogSyslog,
78 "PayloadProcessor - lease_path: %s, header digest: %s, header "
79 "size: %ld",
80 path.c_str(), header_digest.c_str(), header_size);
81
82 2 const size_t first_slash_idx = path.find('/', 0);
83
84
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 current_repo_ = path.substr(0, first_slash_idx);
85
86
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 Result init_result = Initialize();
87
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (init_result != kSuccess) {
88 return init_result;
89 }
90
91 // Set up object pack deserialization
92
1/2
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
2 shash::Any digest = shash::MkFromHexPtr(shash::HexPtr(header_digest));
93
94
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 ObjectPackConsumer deserializer(digest, header_size);
95
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 deserializer.RegisterListener(&PayloadProcessor::ConsumerEventCallback, this);
96
97 2 int nb = 0;
98 2 ObjectPackBuild::State consumer_state = ObjectPackBuild::kStateContinue;
99
1/2
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
2 std::vector<unsigned char> buffer(kConsumerBuffer, 0);
100 do {
101
1/2
✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
2 nb = read(fdin, &buffer[0], buffer.size());
102
1/2
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
2 consumer_state = deserializer.ConsumeNext(nb, &buffer[0]);
103
2/4
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
2 if (consumer_state != ObjectPackBuild::kStateContinue &&
104 consumer_state != ObjectPackBuild::kStateDone) {
105 LogCvmfs(kLogReceiver, kLogSyslogErr,
106 "PayloadProcessor - error: %d encountered when consuming object "
107 "pack.",
108 consumer_state);
109 break;
110 }
111
2/4
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
2 } while (nb > 0 && consumer_state != ObjectPackBuild::kStateDone);
112
113
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
2 assert(pending_files_.empty());
114
115
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 Result res = Finalize();
116
117
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 deserializer.UnregisterListeners();
118
119 2 return res;
120 2 }
121
122 void PayloadProcessor::ConsumerEventCallback(
123 const ObjectPackBuild::Event& event) {
124 std::string path("");
125
126 if (event.object_type == ObjectPack::kCas) {
127 path = event.id.MakePath();
128 } else if (event.object_type == ObjectPack::kNamed) {
129 path = event.object_name;
130 } else {
131 // kEmpty - this is an error.
132 LogCvmfs(kLogReceiver, kLogSyslogErr,
133 "PayloadProcessor - error: Event received with unknown object.");
134 num_errors_++;
135 return;
136 }
137
138 FileIterator it = pending_files_.find(event.id);
139 if (it == pending_files_.end()) {
140 // Schedule file upload if it's not being uploaded yet.
141 // Uploaders later check if the file is already present
142 // in the upstream storage and will not upload it twice.
143 FileInfo info(event);
144 // info.handle is later deleted by FinalizeStreamedUpload
145 info.handle = uploader_->InitStreamedUpload(NULL);
146 pending_files_[event.id] = info;
147 }
148
149 FileInfo& info = pending_files_[event.id];
150
151 void *buf_copied = smalloc(event.buf_size);
152 memcpy(buf_copied, event.buf, event.buf_size);
153 upload::AbstractUploader::UploadBuffer buf(event.buf_size, buf_copied);
154 uploader_->ScheduleUpload(info.handle, buf,
155 upload::AbstractUploader::MakeClosure(
156 &PayloadProcessor::OnUploadJobComplete, this, buf_copied));
157
158 shash::Update(static_cast<const unsigned char*>(event.buf),
159 event.buf_size,
160 info.hash_context);
161
162 info.current_size += event.buf_size;
163
164 if (info.current_size == info.total_size) {
165 shash::Any file_hash(event.id.algorithm);
166 shash::Final(info.hash_context, &file_hash);
167
168 if (file_hash != event.id) {
169 LogCvmfs(
170 kLogReceiver, kLogSyslogErr,
171 "PayloadProcessor - error: Hash mismatch for unpacked file: event "
172 "size: %ld, file size: %ld, event hash: %s, file hash: %s",
173 event.size, info.current_size,
174 event.id.ToString(true).c_str(), file_hash.ToString(true).c_str());
175 num_errors_++;
176 return;
177 }
178 // override final remote path if not CAS object
179 if (event.object_type == ObjectPack::kNamed) {
180 info.handle->remote_path = path;
181 }
182 uploader_->ScheduleCommit(info.handle, event.id);
183
184 pending_files_.erase(event.id);
185 }
186 }
187
188 void PayloadProcessor::OnUploadJobComplete(
189 const upload::UploaderResults &results,
190 void *buffer)
191 {
192 free(buffer);
193 }
194
195 1 void PayloadProcessor::SetStatistics(perf::Statistics *st) {
196
4/8
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 1 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 1 times.
✗ Branch 12 not taken.
1 statistics_ = new perf::StatisticsTemplate("publish", st);
197 1 }
198
199 PayloadProcessor::Result PayloadProcessor::Initialize() {
200 Params params;
201 if (!GetParamsFromFile(current_repo_, &params)) {
202 LogCvmfs(
203 kLogReceiver, kLogSyslogErr,
204 "PayloadProcessor - error: Could not get configuration parameters.");
205 return kOtherError;
206 }
207
208 const std::string spooler_temp_dir =
209 GetSpoolerTempDir(params.spooler_configuration);
210 assert(!spooler_temp_dir.empty());
211 assert(MkdirDeep(spooler_temp_dir + "/receiver", 0770, true));
212 temp_dir_ =
213 RaiiTempDir::Create(spooler_temp_dir + "/receiver/payload_processor");
214
215 upload::SpoolerDefinition definition(
216 params.spooler_configuration, params.hash_alg, params.compression_alg,
217 params.generate_legacy_bulk_chunks, params.use_file_chunking,
218 params.min_chunk_size, params.avg_chunk_size, params.max_chunk_size,
219 "dummy_token", "dummy_key");
220
221 uploader_.Destroy();
222
223 // configure the uploader environment
224 uploader_ = upload::AbstractUploader::Construct(definition);
225 if (!uploader_.IsValid()) {
226 LogCvmfs(kLogSpooler, kLogWarning,
227 "Failed to initialize backend upload "
228 "facility in PayloadProcessor.");
229 return kUploaderError;
230 }
231
232 if (statistics_.IsValid()) {
233 uploader_->InitCounters(statistics_.weak_ref());
234 }
235
236 return kSuccess;
237 }
238
239 PayloadProcessor::Result PayloadProcessor::Finalize() {
240 uploader_->WaitForUpload();
241 temp_dir_.Destroy();
242
243 const unsigned num_uploader_errors = uploader_->GetNumberOfErrors();
244 uploader_->TearDown();
245 if (num_uploader_errors > 0) {
246 LogCvmfs(kLogReceiver, kLogSyslogErr,
247 "PayloadProcessor - error: Uploader - %d upload(s) failed.",
248 num_uploader_errors);
249 return kUploaderError;
250 }
251
252 if (GetNumErrors() > 0) {
253 LogCvmfs(kLogReceiver, kLogSyslogErr,
254 "PayloadProcessor - error: %d unpacking error(s).", GetNumErrors());
255 return kOtherError;
256 }
257
258 return kSuccess;
259 }
260
261 } // namespace receiver
262