GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/receiver/commit_processor.cc
Date: 2025-07-06 02:35:01
Exec Total Coverage
Lines: 0 127 0.0%
Branches: 0 344 0.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "commit_processor.h"
6
7 #include <time.h>
8
9 #include <vector>
10
11 #include "catalog_diff_tool.h"
12 #include "catalog_merge_tool.h"
13 #include "catalog_mgr_ro.h"
14 #include "catalog_mgr_rw.h"
15 #include "compression/compression.h"
16 #include "manifest.h"
17 #include "manifest_fetch.h"
18 #include "network/download.h"
19 #include "params.h"
20 #include "signing_tool.h"
21 #include "statistics.h"
22 #include "statistics_database.h"
23 #include "swissknife.h"
24 #include "swissknife_history.h"
25 #include "util/algorithm.h"
26 #include "util/logging.h"
27 #include "util/pointer.h"
28 #include "util/posix.h"
29 #include "util/raii_temp_dir.h"
30 #include "util/string.h"
31
32 namespace {
33
34 PathString RemoveRepoName(const PathString &lease_path) {
35 std::string abs_path = lease_path.ToString();
36 const std::string::const_iterator it = std::find(abs_path.begin(),
37 abs_path.end(), '/');
38 if (it != abs_path.end()) {
39 const size_t idx = it - abs_path.begin() + 1;
40 return lease_path.Suffix(idx);
41 } else {
42 return lease_path;
43 }
44 }
45
46 bool CreateNewTag(const RepositoryTag &repo_tag, const std::string &repo_name,
47 const receiver::Params &params, const std::string &temp_dir,
48 const std::string &manifest_path,
49 const std::string &public_key_path,
50 const std::string &proxy) {
51 swissknife::ArgumentList args;
52 args['r'].Reset(new std::string(params.spooler_configuration));
53 args['w'].Reset(new std::string(params.stratum0));
54 args['t'].Reset(new std::string(temp_dir));
55 args['m'].Reset(new std::string(manifest_path));
56 args['p'].Reset(new std::string(public_key_path));
57 args['f'].Reset(new std::string(repo_name));
58 args['e'].Reset(new std::string(params.hash_alg_str));
59 args['a'].Reset(new std::string(repo_tag.name()));
60 args['D'].Reset(new std::string(repo_tag.description()));
61 args['x'].Reset(new std::string());
62 args['@'].Reset(new std::string(proxy));
63
64 const UniquePtr<swissknife::CommandEditTag> edit_cmd(
65 new swissknife::CommandEditTag());
66 const int ret = edit_cmd->Main(args);
67
68 if (ret) {
69 LogCvmfs(kLogReceiver, kLogSyslogErr, "Error %d creating tag: %s", ret,
70 repo_tag.name().c_str());
71 return false;
72 }
73
74 return true;
75 }
76
77 } // namespace
78
79 namespace receiver {
80
81 CommitProcessor::CommitProcessor() : num_errors_(0), statistics_(NULL) { }
82
83 CommitProcessor::~CommitProcessor() { }
84
85 /**
86 * Applies the changes from the new catalog onto the repository.
87 *
88 * Let:
89 * + C_O = the root catalog of the repository (given by old_root_hash) at
90 * the beginning of the lease, on the release manager machine
91 * + C_N = the root catalog of the repository (given by new_root_hash), on
92 * the release manager machine, with the changes introduced during the
93 * lease
94 * + C_G = the current root catalog of the repository on the gateway machine.
95 *
96 * This method applies all the changes from C_N, with respect to C_O, onto C_G.
97 * The resulting catalog on the gateway machine (C_GN) is then set as root
98 * catalog in the repository manifest. The method also signs the updated
99 * repository manifest.
100 */
101 CommitProcessor::Result CommitProcessor::Process(
102 const std::string &lease_path, const shash::Any &old_root_hash,
103 const shash::Any &new_root_hash, const RepositoryTag &tag,
104 uint64_t *final_revision) {
105 RepositoryTag final_tag = tag;
106 // If tag_name is a generic tag, update the time stamp
107 if (final_tag.HasGenericName()) {
108 final_tag.SetGenericName();
109 }
110
111 LogCvmfs(kLogReceiver, kLogSyslog,
112 "CommitProcessor - lease_path: %s, old hash: %s, new hash: %s, "
113 "tag_name: %s, tag_description: %s",
114 lease_path.c_str(), old_root_hash.ToString(true).c_str(),
115 new_root_hash.ToString(true).c_str(), final_tag.name().c_str(),
116 final_tag.description().c_str());
117
118 const std::vector<std::string> lease_path_tokens = SplitString(lease_path,
119 '/');
120
121 const std::string repo_name = lease_path_tokens.front();
122
123 Params params;
124 if (!GetParamsFromFile(repo_name, &params)) {
125 LogCvmfs(
126 kLogReceiver, kLogSyslogErr,
127 "CommitProcessor - error: Could not get configuration parameters.");
128 return kError;
129 }
130
131 const UniquePtr<ServerTool> server_tool(new ServerTool());
132
133 if (!server_tool->InitDownloadManager(true, params.proxy)) {
134 LogCvmfs(
135 kLogReceiver, kLogSyslogErr,
136 "CommitProcessor - error: Could not initialize the download manager");
137 return kError;
138 }
139
140 const std::string public_key = "/etc/cvmfs/keys/" + repo_name + ".pub";
141 const std::string certificate = "/etc/cvmfs/keys/" + repo_name + ".crt";
142 const std::string private_key = "/etc/cvmfs/keys/" + repo_name + ".key";
143 if (!server_tool->InitSignatureManager(public_key, certificate,
144 private_key)) {
145 LogCvmfs(
146 kLogReceiver, kLogSyslogErr,
147 "CommitProcessor - error: Could not initialize the signature manager");
148 return kError;
149 }
150
151 const shash::Any manifest_base_hash;
152 const UniquePtr<manifest::Manifest> manifest_tgt(
153 server_tool->FetchRemoteManifest(params.stratum0, repo_name,
154 manifest_base_hash));
155
156 // Current catalog from the gateway machine
157 if (!manifest_tgt.IsValid()) {
158 LogCvmfs(kLogReceiver, kLogSyslogErr,
159 "CommitProcessor - error: Could not open repository manifest");
160 return kError;
161 }
162
163 LogCvmfs(kLogReceiver, kLogSyslog,
164 "CommitProcessor - lease_path: %s, target root hash: %s",
165 lease_path.c_str(),
166 manifest_tgt->catalog_hash().ToString(false).c_str());
167
168
169 std::string cache_dir_;
170 if (params.use_local_cache) {
171 cache_dir_ = "/var/spool/cvmfs/" + repo_name + "/cache.server";
172 }
173
174 const std::string spooler_temp_dir = GetSpoolerTempDir(
175 params.spooler_configuration);
176 assert(!spooler_temp_dir.empty());
177 assert(MkdirDeep(spooler_temp_dir + "/receiver", 0755, true));
178 const std::string temp_dir_root = spooler_temp_dir
179 + "/receiver/commit_processor";
180
181 const PathString relative_lease_path = RemoveRepoName(PathString(lease_path));
182
183 LogCvmfs(kLogReceiver, kLogSyslog,
184 "CommitProcessor - lease_path: %s, merging catalogs",
185 lease_path.c_str());
186
187 CatalogMergeTool<catalog::WritableCatalogManager,
188 catalog::SimpleCatalogManager>
189 merge_tool(params.stratum0, old_root_hash, new_root_hash,
190 relative_lease_path, temp_dir_root,
191 server_tool->download_manager(), manifest_tgt.weak_ref(),
192 statistics_, cache_dir_);
193 if (!merge_tool.Init()) {
194 LogCvmfs(kLogReceiver, kLogSyslogErr,
195 "Error: Could not initialize the catalog merge tool");
196 return kError;
197 }
198
199 std::string new_manifest_path;
200 shash::Any new_manifest_hash;
201 if (!merge_tool.Run(params, &new_manifest_path, &new_manifest_hash,
202 final_revision)) {
203 LogCvmfs(kLogReceiver, kLogSyslogErr,
204 "CommitProcessor - error: Catalog merge failed");
205 return kMergeFailure;
206 }
207
208 const UniquePtr<RaiiTempDir> raii_temp_dir(
209 RaiiTempDir::Create(temp_dir_root));
210 const std::string temp_dir = raii_temp_dir->dir();
211
212 if (!CreateNewTag(final_tag, repo_name, params, temp_dir, new_manifest_path,
213 public_key, params.proxy)) {
214 LogCvmfs(kLogReceiver, kLogSyslogErr, "Error creating tag: %s",
215 final_tag.name().c_str());
216 return kError;
217 }
218
219 LogCvmfs(kLogReceiver, kLogSyslog,
220 "CommitProcessor - lease_path: %s, signing manifest",
221 lease_path.c_str());
222
223 // Add C_N root catalog hash to reflog through SigningTool,
224 // so garbage collector can later delete it.
225 std::vector<shash::Any> reflog_catalogs;
226 reflog_catalogs.push_back(new_root_hash);
227
228 SigningTool signing_tool(server_tool.weak_ref());
229 const SigningTool::Result res = signing_tool.Run(
230 new_manifest_path, params.stratum0, params.spooler_configuration,
231 temp_dir, certificate, private_key, repo_name, "", "",
232 "/var/spool/cvmfs/" + repo_name + "/reflog.chksum", params.proxy,
233 params.garbage_collection, false, false, reflog_catalogs);
234 switch (res) {
235 case SigningTool::kReflogChecksumMissing:
236 LogCvmfs(kLogReceiver, kLogSyslogErr,
237 "CommitProcessor - error: missing reflog.chksum");
238 return kMissingReflog;
239 case SigningTool::kReflogMissing:
240 LogCvmfs(kLogReceiver, kLogSyslogErr,
241 "CommitProcessor - error: missing reflog");
242 return kMissingReflog;
243 case SigningTool::kError:
244 case SigningTool::kInitError:
245 LogCvmfs(kLogReceiver, kLogSyslogErr,
246 "CommitProcessor - error: signing manifest");
247 return kError;
248 case SigningTool::kSuccess:
249 LogCvmfs(kLogReceiver, kLogSyslog,
250 "CommitProcessor - lease_path: %s, success.",
251 lease_path.c_str());
252 }
253
254 LogCvmfs(kLogReceiver, kLogSyslog,
255 "CommitProcessor - lease_path: %s, new root hash: %s",
256 lease_path.c_str(), new_manifest_hash.ToString(false).c_str());
257
258 // Ensure CVMFS_ROOT_HASH is not set in
259 // /var/spool/cvmfs/<REPO_NAME>/client.local
260 const std::string fname = "/var/spool/cvmfs/" + repo_name + "/client.local";
261 if (truncate(fname.c_str(), 0) < 0) {
262 LogCvmfs(kLogReceiver, kLogSyslogErr, "Could not truncate %s\n",
263 fname.c_str());
264 return kError;
265 }
266
267 StatisticsDatabase *stats_db = StatisticsDatabase::OpenStandardDB(repo_name);
268 if (stats_db != NULL) {
269 if (!stats_db->StorePublishStatistics(statistics_, start_time_, true)) {
270 LogCvmfs(kLogReceiver, kLogSyslogErr,
271 "Could not store publish statistics");
272 }
273 if (params.upload_stats_db) {
274 const upload::SpoolerDefinition sd(params.spooler_configuration,
275 shash::kAny);
276 upload::Spooler *spooler = upload::Spooler::Construct(sd);
277 if (!stats_db->UploadStatistics(spooler)) {
278 LogCvmfs(kLogReceiver, kLogSyslogErr,
279 "Could not upload statistics DB to upstream storage");
280 }
281 delete spooler;
282 }
283 delete stats_db;
284
285 } else {
286 LogCvmfs(kLogReceiver, kLogSyslogErr, "Could not open statistics DB");
287 }
288
289 return kSuccess;
290 }
291
292 void CommitProcessor::SetStatistics(perf::Statistics *st,
293 const std::string &start_time) {
294 statistics_ = st;
295 statistics_->Register("publish.revision", "");
296 start_time_ = start_time;
297 }
298
299 } // namespace receiver
300