GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/receiver/commit_processor.cc
Date: 2025-04-20 02:34:28
Exec Total Coverage
Lines: 0 126 0.0%
Branches: 0 344 0.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "commit_processor.h"
6
7 #include <time.h>
8
9 #include <vector>
10
11 #include "catalog_diff_tool.h"
12 #include "catalog_merge_tool.h"
13 #include "catalog_mgr_ro.h"
14 #include "catalog_mgr_rw.h"
15 #include "compression/compression.h"
16 #include "manifest.h"
17 #include "manifest_fetch.h"
18 #include "network/download.h"
19 #include "params.h"
20 #include "signing_tool.h"
21 #include "statistics.h"
22 #include "statistics_database.h"
23 #include "swissknife.h"
24 #include "swissknife_history.h"
25 #include "util/algorithm.h"
26 #include "util/logging.h"
27 #include "util/pointer.h"
28 #include "util/posix.h"
29 #include "util/raii_temp_dir.h"
30 #include "util/string.h"
31
32 namespace {
33
34 PathString RemoveRepoName(const PathString& lease_path) {
35 std::string abs_path = lease_path.ToString();
36 std::string::const_iterator it =
37 std::find(abs_path.begin(), abs_path.end(), '/');
38 if (it != abs_path.end()) {
39 size_t idx = it - abs_path.begin() + 1;
40 return lease_path.Suffix(idx);
41 } else {
42 return lease_path;
43 }
44 }
45
46 bool CreateNewTag(const RepositoryTag& repo_tag, const std::string& repo_name,
47 const receiver::Params& params, const std::string& temp_dir,
48 const std::string& manifest_path,
49 const std::string& public_key_path,
50 const std::string& proxy) {
51 swissknife::ArgumentList args;
52 args['r'].Reset(new std::string(params.spooler_configuration));
53 args['w'].Reset(new std::string(params.stratum0));
54 args['t'].Reset(new std::string(temp_dir));
55 args['m'].Reset(new std::string(manifest_path));
56 args['p'].Reset(new std::string(public_key_path));
57 args['f'].Reset(new std::string(repo_name));
58 args['e'].Reset(new std::string(params.hash_alg_str));
59 args['a'].Reset(new std::string(repo_tag.name()));
60 args['D'].Reset(new std::string(repo_tag.description()));
61 args['x'].Reset(new std::string());
62 args['@'].Reset(new std::string(proxy));
63
64 UniquePtr<swissknife::CommandEditTag> edit_cmd(
65 new swissknife::CommandEditTag());
66 const int ret = edit_cmd->Main(args);
67
68 if (ret) {
69 LogCvmfs(kLogReceiver, kLogSyslogErr, "Error %d creating tag: %s", ret,
70 repo_tag.name().c_str());
71 return false;
72 }
73
74 return true;
75 }
76
77 } // namespace
78
79 namespace receiver {
80
81 CommitProcessor::CommitProcessor() : num_errors_(0), statistics_(NULL) {}
82
83 CommitProcessor::~CommitProcessor() {}
84
85 /**
86 * Applies the changes from the new catalog onto the repository.
87 *
88 * Let:
89 * + C_O = the root catalog of the repository (given by old_root_hash) at
90 * the beginning of the lease, on the release manager machine
91 * + C_N = the root catalog of the repository (given by new_root_hash), on
92 * the release manager machine, with the changes introduced during the
93 * lease
94 * + C_G = the current root catalog of the repository on the gateway machine.
95 *
96 * This method applies all the changes from C_N, with respect to C_O, onto C_G.
97 * The resulting catalog on the gateway machine (C_GN) is then set as root
98 * catalog in the repository manifest. The method also signs the updated
99 * repository manifest.
100 */
101 CommitProcessor::Result CommitProcessor::Process(
102 const std::string& lease_path, const shash::Any& old_root_hash,
103 const shash::Any& new_root_hash, const RepositoryTag& tag,
104 uint64_t *final_revision) {
105 RepositoryTag final_tag = tag;
106 // If tag_name is a generic tag, update the time stamp
107 if (final_tag.HasGenericName()) {
108 final_tag.SetGenericName();
109 }
110
111 LogCvmfs(kLogReceiver, kLogSyslog,
112 "CommitProcessor - lease_path: %s, old hash: %s, new hash: %s, "
113 "tag_name: %s, tag_description: %s",
114 lease_path.c_str(), old_root_hash.ToString(true).c_str(),
115 new_root_hash.ToString(true).c_str(), final_tag.name().c_str(),
116 final_tag.description().c_str());
117
118 const std::vector<std::string> lease_path_tokens =
119 SplitString(lease_path, '/');
120
121 const std::string repo_name = lease_path_tokens.front();
122
123 Params params;
124 if (!GetParamsFromFile(repo_name, &params)) {
125 LogCvmfs(
126 kLogReceiver, kLogSyslogErr,
127 "CommitProcessor - error: Could not get configuration parameters.");
128 return kError;
129 }
130
131 UniquePtr<ServerTool> server_tool(new ServerTool());
132
133 if (!server_tool->InitDownloadManager(true, params.proxy)) {
134 LogCvmfs(
135 kLogReceiver, kLogSyslogErr,
136 "CommitProcessor - error: Could not initialize the download manager");
137 return kError;
138 }
139
140 const std::string public_key = "/etc/cvmfs/keys/" + repo_name + ".pub";
141 const std::string certificate = "/etc/cvmfs/keys/" + repo_name + ".crt";
142 const std::string private_key = "/etc/cvmfs/keys/" + repo_name + ".key";
143 if (!server_tool->InitSignatureManager(public_key, certificate, private_key))
144 {
145 LogCvmfs(
146 kLogReceiver, kLogSyslogErr,
147 "CommitProcessor - error: Could not initialize the signature manager");
148 return kError;
149 }
150
151 shash::Any manifest_base_hash;
152 const UniquePtr<manifest::Manifest> manifest_tgt(
153 server_tool->FetchRemoteManifest(
154 params.stratum0, repo_name, manifest_base_hash));
155
156 // Current catalog from the gateway machine
157 if (!manifest_tgt.IsValid()) {
158 LogCvmfs(kLogReceiver, kLogSyslogErr,
159 "CommitProcessor - error: Could not open repository manifest");
160 return kError;
161 }
162
163 LogCvmfs(kLogReceiver, kLogSyslog,
164 "CommitProcessor - lease_path: %s, target root hash: %s",
165 lease_path.c_str(),
166 manifest_tgt->catalog_hash().ToString(false).c_str());
167
168
169 std::string cache_dir_;
170 if (params.use_local_cache) {
171 cache_dir_ = "/var/spool/cvmfs/" + repo_name + "/cache.server";
172 }
173
174 const std::string spooler_temp_dir =
175 GetSpoolerTempDir(params.spooler_configuration);
176 assert(!spooler_temp_dir.empty());
177 assert(MkdirDeep(spooler_temp_dir + "/receiver", 0755, true));
178 const std::string temp_dir_root =
179 spooler_temp_dir + "/receiver/commit_processor";
180
181 const PathString relative_lease_path = RemoveRepoName(PathString(lease_path));
182
183 LogCvmfs(kLogReceiver, kLogSyslog,
184 "CommitProcessor - lease_path: %s, merging catalogs",
185 lease_path.c_str());
186
187 CatalogMergeTool<catalog::WritableCatalogManager,
188 catalog::SimpleCatalogManager>
189 merge_tool(params.stratum0, old_root_hash, new_root_hash,
190 relative_lease_path, temp_dir_root,
191 server_tool->download_manager(), manifest_tgt.weak_ref(),
192 statistics_, cache_dir_);
193 if (!merge_tool.Init()) {
194 LogCvmfs(kLogReceiver, kLogSyslogErr,
195 "Error: Could not initialize the catalog merge tool");
196 return kError;
197 }
198
199 std::string new_manifest_path;
200 shash::Any new_manifest_hash;
201 if (!merge_tool.Run(params, &new_manifest_path, &new_manifest_hash, final_revision)) {
202 LogCvmfs(kLogReceiver, kLogSyslogErr,
203 "CommitProcessor - error: Catalog merge failed");
204 return kMergeFailure;
205 }
206
207 UniquePtr<RaiiTempDir> raii_temp_dir(RaiiTempDir::Create(temp_dir_root));
208 const std::string temp_dir = raii_temp_dir->dir();
209
210 if (!CreateNewTag(final_tag, repo_name, params, temp_dir, new_manifest_path,
211 public_key, params.proxy)) {
212 LogCvmfs(kLogReceiver, kLogSyslogErr, "Error creating tag: %s",
213 final_tag.name().c_str());
214 return kError;
215 }
216
217 LogCvmfs(kLogReceiver, kLogSyslog,
218 "CommitProcessor - lease_path: %s, signing manifest",
219 lease_path.c_str());
220
221 // Add C_N root catalog hash to reflog through SigningTool,
222 // so garbage collector can later delete it.
223 std::vector<shash::Any> reflog_catalogs;
224 reflog_catalogs.push_back(new_root_hash);
225
226 SigningTool signing_tool(server_tool.weak_ref());
227 SigningTool::Result res = signing_tool.Run(
228 new_manifest_path, params.stratum0, params.spooler_configuration,
229 temp_dir, certificate, private_key, repo_name, "", "",
230 "/var/spool/cvmfs/" + repo_name + "/reflog.chksum", params.proxy,
231 params.garbage_collection, false, false, reflog_catalogs);
232 switch (res) {
233 case SigningTool::kReflogChecksumMissing:
234 LogCvmfs(kLogReceiver, kLogSyslogErr,
235 "CommitProcessor - error: missing reflog.chksum");
236 return kMissingReflog;
237 case SigningTool::kReflogMissing:
238 LogCvmfs(kLogReceiver, kLogSyslogErr,
239 "CommitProcessor - error: missing reflog");
240 return kMissingReflog;
241 case SigningTool::kError:
242 case SigningTool::kInitError:
243 LogCvmfs(kLogReceiver, kLogSyslogErr,
244 "CommitProcessor - error: signing manifest");
245 return kError;
246 case SigningTool::kSuccess:
247 LogCvmfs(kLogReceiver, kLogSyslog,
248 "CommitProcessor - lease_path: %s, success.",
249 lease_path.c_str());
250 }
251
252 LogCvmfs(kLogReceiver, kLogSyslog,
253 "CommitProcessor - lease_path: %s, new root hash: %s",
254 lease_path.c_str(),
255 new_manifest_hash.ToString(false).c_str());
256
257 // Ensure CVMFS_ROOT_HASH is not set in
258 // /var/spool/cvmfs/<REPO_NAME>/client.local
259 const std::string fname = "/var/spool/cvmfs/" + repo_name + "/client.local";
260 if (truncate(fname.c_str(), 0) < 0) {
261 LogCvmfs(kLogReceiver, kLogSyslogErr, "Could not truncate %s\n",
262 fname.c_str());
263 return kError;
264 }
265
266 StatisticsDatabase *stats_db = StatisticsDatabase::OpenStandardDB(repo_name);
267 if (stats_db != NULL) {
268 if (!stats_db->StorePublishStatistics(statistics_, start_time_, true)) {
269 LogCvmfs(kLogReceiver, kLogSyslogErr,
270 "Could not store publish statistics");
271 }
272 if (params.upload_stats_db) {
273 upload::SpoolerDefinition sd(params.spooler_configuration, shash::kAny);
274 upload::Spooler *spooler = upload::Spooler::Construct(sd);
275 if (!stats_db->UploadStatistics(spooler)) {
276 LogCvmfs(kLogReceiver, kLogSyslogErr,
277 "Could not upload statistics DB to upstream storage");
278 }
279 delete spooler;
280 }
281 delete stats_db;
282
283 } else {
284 LogCvmfs(kLogReceiver, kLogSyslogErr, "Could not open statistics DB");
285 }
286
287 return kSuccess;
288 }
289
290 void CommitProcessor::SetStatistics(perf::Statistics *st,
291 const std::string &start_time)
292 {
293 statistics_ = st;
294 statistics_->Register("publish.revision", "");
295 start_time_ = start_time;
296 }
297
298 } // namespace receiver
299