GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/receiver/commit_processor.cc
Date: 2024-04-28 02:33:07
Exec Total Coverage
Lines: 0 134 0.0%
Branches: 0 374 0.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "commit_processor.h"
6
7 #include <time.h>
8
9 #include <vector>
10
11 #include "catalog_diff_tool.h"
12 #include "catalog_merge_tool.h"
13 #include "catalog_mgr_ro.h"
14 #include "catalog_mgr_rw.h"
15 #include "compression.h"
16 #include "manifest.h"
17 #include "manifest_fetch.h"
18 #include "network/download.h"
19 #include "params.h"
20 #include "signing_tool.h"
21 #include "statistics.h"
22 #include "statistics_database.h"
23 #include "swissknife.h"
24 #include "swissknife_history.h"
25 #include "util/algorithm.h"
26 #include "util/logging.h"
27 #include "util/pointer.h"
28 #include "util/posix.h"
29 #include "util/raii_temp_dir.h"
30 #include "util/string.h"
31
32 namespace {
33
34 PathString RemoveRepoName(const PathString& lease_path) {
35 std::string abs_path = lease_path.ToString();
36 std::string::const_iterator it =
37 std::find(abs_path.begin(), abs_path.end(), '/');
38 if (it != abs_path.end()) {
39 size_t idx = it - abs_path.begin() + 1;
40 return lease_path.Suffix(idx);
41 } else {
42 return lease_path;
43 }
44 }
45
46 bool CreateNewTag(const RepositoryTag& repo_tag, const std::string& repo_name,
47 const receiver::Params& params, const std::string& temp_dir,
48 const std::string& manifest_path,
49 const std::string& public_key_path,
50 const std::string& proxy) {
51 swissknife::ArgumentList args;
52 args['r'].Reset(new std::string(params.spooler_configuration));
53 args['w'].Reset(new std::string(params.stratum0));
54 args['t'].Reset(new std::string(temp_dir));
55 args['m'].Reset(new std::string(manifest_path));
56 args['p'].Reset(new std::string(public_key_path));
57 args['f'].Reset(new std::string(repo_name));
58 args['e'].Reset(new std::string(params.hash_alg_str));
59 args['a'].Reset(new std::string(repo_tag.name()));
60 args['D'].Reset(new std::string(repo_tag.description()));
61 args['x'].Reset(new std::string());
62 args['@'].Reset(new std::string(proxy));
63
64 UniquePtr<swissknife::CommandEditTag> edit_cmd(
65 new swissknife::CommandEditTag());
66 const int ret = edit_cmd->Main(args);
67
68 if (ret) {
69 LogCvmfs(kLogReceiver, kLogSyslogErr, "Error %d creating tag: %s", ret,
70 repo_tag.name().c_str());
71 return false;
72 }
73
74 return true;
75 }
76
77 } // namespace
78
79 namespace receiver {
80
81 CommitProcessor::CommitProcessor() : num_errors_(0), statistics_(NULL) {}
82
83 CommitProcessor::~CommitProcessor() {}
84
85 /**
86 * Applies the changes from the new catalog onto the repository.
87 *
88 * Let:
89 * + C_O = the root catalog of the repository (given by old_root_hash) at
90 * the beginning of the lease, on the release manager machine
91 * + C_N = the root catalog of the repository (given by new_root_hash), on
92 * the release manager machine, with the changes introduced during the
93 * lease
94 * + C_G = the current root catalog of the repository on the gateway machine.
95 *
96 * This method applies all the changes from C_N, with respect to C_O, onto C_G.
97 * The resulting catalog on the gateway machine (C_GN) is then set as root
98 * catalog in the repository manifest. The method also signs the updated
99 * repository manifest.
100 */
101 CommitProcessor::Result CommitProcessor::Process(
102 const std::string& lease_path, const shash::Any& old_root_hash,
103 const shash::Any& new_root_hash, const RepositoryTag& tag,
104 uint64_t *final_revision) {
105 RepositoryTag final_tag = tag;
106 // If tag_name is a generic tag, update the time stamp
107 if (final_tag.HasGenericName()) {
108 final_tag.SetGenericName();
109 }
110
111 LogCvmfs(kLogReceiver, kLogSyslog,
112 "CommitProcessor - lease_path: %s, old hash: %s, new hash: %s, "
113 "tag_name: %s, tag_description: %s",
114 lease_path.c_str(), old_root_hash.ToString(true).c_str(),
115 new_root_hash.ToString(true).c_str(), final_tag.name().c_str(),
116 final_tag.description().c_str());
117
118 const std::vector<std::string> lease_path_tokens =
119 SplitString(lease_path, '/');
120
121 const std::string repo_name = lease_path_tokens.front();
122
123 Params params;
124 if (!GetParamsFromFile(repo_name, &params)) {
125 LogCvmfs(
126 kLogReceiver, kLogSyslogErr,
127 "CommitProcessor - error: Could not get configuration parameters.");
128 return kError;
129 }
130
131 UniquePtr<ServerTool> server_tool(new ServerTool());
132
133 if (!server_tool->InitDownloadManager(true, params.proxy)) {
134 LogCvmfs(
135 kLogReceiver, kLogSyslogErr,
136 "CommitProcessor - error: Could not initialize the download manager");
137 return kError;
138 }
139
140 const std::string public_key = "/etc/cvmfs/keys/" + repo_name + ".pub";
141 if (!server_tool->InitVerifyingSignatureManager(public_key)) {
142 LogCvmfs(
143 kLogReceiver, kLogSyslogErr,
144 "CommitProcessor - error: Could not initialize the signature manager");
145 return kError;
146 }
147
148 shash::Any manifest_base_hash;
149 UniquePtr<manifest::Manifest> manifest(server_tool->FetchRemoteManifest(
150 params.stratum0, repo_name, manifest_base_hash));
151
152 // Current catalog from the gateway machine
153 if (!manifest.IsValid()) {
154 LogCvmfs(kLogReceiver, kLogSyslogErr,
155 "CommitProcessor - error: Could not open repository manifest");
156 return kError;
157 }
158
159 LogCvmfs(kLogReceiver, kLogSyslog,
160 "CommitProcessor - lease_path: %s, target root hash: %s",
161 lease_path.c_str(),
162 manifest->catalog_hash().ToString(false).c_str());
163
164 const std::string spooler_temp_dir =
165 GetSpoolerTempDir(params.spooler_configuration);
166 assert(!spooler_temp_dir.empty());
167 assert(MkdirDeep(spooler_temp_dir + "/receiver", 0666, true));
168 const std::string temp_dir_root =
169 spooler_temp_dir + "/receiver/commit_processor";
170
171 const PathString relative_lease_path = RemoveRepoName(PathString(lease_path));
172
173 LogCvmfs(kLogReceiver, kLogSyslog,
174 "CommitProcessor - lease_path: %s, merging catalogs",
175 lease_path.c_str());
176
177 CatalogMergeTool<catalog::WritableCatalogManager,
178 catalog::SimpleCatalogManager>
179 merge_tool(params.stratum0, old_root_hash, new_root_hash,
180 relative_lease_path, temp_dir_root,
181 server_tool->download_manager(), manifest.weak_ref(),
182 statistics_);
183 if (!merge_tool.Init()) {
184 LogCvmfs(kLogReceiver, kLogSyslogErr,
185 "Error: Could not initialize the catalog merge tool");
186 return kError;
187 }
188
189 std::string new_manifest_path;
190 if (!merge_tool.Run(params, &new_manifest_path, final_revision)) {
191 LogCvmfs(kLogReceiver, kLogSyslogErr,
192 "CommitProcessor - error: Catalog merge failed");
193 return kMergeFailure;
194 }
195
196 UniquePtr<RaiiTempDir> raii_temp_dir(RaiiTempDir::Create(temp_dir_root));
197 const std::string temp_dir = raii_temp_dir->dir();
198 const std::string certificate = "/etc/cvmfs/keys/" + repo_name + ".crt";
199 const std::string private_key = "/etc/cvmfs/keys/" + repo_name + ".key";
200
201 if (!CreateNewTag(final_tag, repo_name, params, temp_dir, new_manifest_path,
202 public_key, params.proxy)) {
203 LogCvmfs(kLogReceiver, kLogSyslogErr, "Error creating tag: %s",
204 final_tag.name().c_str());
205 return kError;
206 }
207
208 // We need to re-initialize the ServerTool component for signing
209 server_tool.Destroy();
210 server_tool = new ServerTool();
211
212 LogCvmfs(kLogReceiver, kLogSyslog,
213 "CommitProcessor - lease_path: %s, signing manifest",
214 lease_path.c_str());
215
216 // Add C_N root catalog hash to reflog through SigningTool,
217 // so garbage collector can later delete it.
218 std::vector<shash::Any> reflog_catalogs;
219 reflog_catalogs.push_back(new_root_hash);
220
221 SigningTool signing_tool(server_tool.weak_ref());
222 SigningTool::Result res = signing_tool.Run(
223 new_manifest_path, params.stratum0, params.spooler_configuration,
224 temp_dir, certificate, private_key, repo_name, "", "",
225 "/var/spool/cvmfs/" + repo_name + "/reflog.chksum", params.proxy,
226 params.garbage_collection, false, false, reflog_catalogs);
227 switch (res) {
228 case SigningTool::kReflogChecksumMissing:
229 LogCvmfs(kLogReceiver, kLogSyslogErr,
230 "CommitProcessor - error: missing reflog.chksum");
231 return kMissingReflog;
232 case SigningTool::kReflogMissing:
233 LogCvmfs(kLogReceiver, kLogSyslogErr,
234 "CommitProcessor - error: missing reflog");
235 return kMissingReflog;
236 case SigningTool::kError:
237 case SigningTool::kInitError:
238 LogCvmfs(kLogReceiver, kLogSyslogErr,
239 "CommitProcessor - error: signing manifest");
240 return kError;
241 case SigningTool::kSuccess:
242 LogCvmfs(kLogReceiver, kLogSyslog,
243 "CommitProcessor - lease_path: %s, success.",
244 lease_path.c_str());
245 }
246
247 {
248 UniquePtr<ServerTool> server_tool(new ServerTool());
249
250 if (!server_tool->InitDownloadManager(true, params.proxy)) {
251 LogCvmfs(
252 kLogReceiver, kLogSyslogErr,
253 "CommitProcessor - error: Could not initialize the download manager");
254 return kError;
255 }
256
257 const std::string public_key = "/etc/cvmfs/keys/" + repo_name + ".pub";
258 if (!server_tool->InitVerifyingSignatureManager(public_key)) {
259 LogCvmfs(kLogReceiver, kLogSyslogErr,
260 "CommitProcessor - error: Could not initialize the signature "
261 "manager");
262 return kError;
263 }
264
265 shash::Any manifest_base_hash;
266 UniquePtr<manifest::Manifest> manifest(server_tool->FetchRemoteManifest(
267 params.stratum0, repo_name, manifest_base_hash));
268
269 LogCvmfs(kLogReceiver, kLogSyslog,
270 "CommitProcessor - lease_path: %s, new root hash: %s",
271 lease_path.c_str(),
272 manifest->catalog_hash().ToString(false).c_str());
273 }
274
275 // Ensure CVMFS_ROOT_HASH is not set in
276 // /var/spool/cvmfs/<REPO_NAME>/client.local
277 const std::string fname = "/var/spool/cvmfs/" + repo_name + "/client.local";
278 if (truncate(fname.c_str(), 0) < 0) {
279 LogCvmfs(kLogReceiver, kLogSyslogErr, "Could not truncate %s\n",
280 fname.c_str());
281 return kError;
282 }
283
284 StatisticsDatabase *stats_db = StatisticsDatabase::OpenStandardDB(repo_name);
285 if (stats_db != NULL) {
286 if (!stats_db->StorePublishStatistics(statistics_, start_time_, true)) {
287 LogCvmfs(kLogReceiver, kLogSyslogErr,
288 "Could not store publish statistics");
289 }
290 if (params.upload_stats_db) {
291 upload::SpoolerDefinition sd(params.spooler_configuration, shash::kAny);
292 upload::Spooler *spooler = upload::Spooler::Construct(sd);
293 if (!stats_db->UploadStatistics(spooler)) {
294 LogCvmfs(kLogReceiver, kLogSyslogErr,
295 "Could not upload statistics DB to upstream storage");
296 }
297 delete spooler;
298 }
299 delete stats_db;
300
301 } else {
302 LogCvmfs(kLogReceiver, kLogSyslogErr, "Could not open statistics DB");
303 }
304
305 return kSuccess;
306 }
307
308 void CommitProcessor::SetStatistics(perf::Statistics *st,
309 const std::string &start_time)
310 {
311 statistics_ = st;
312 statistics_->Register("publish.revision", "");
313 start_time_ = start_time;
314 }
315
316 } // namespace receiver
317