CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
commit_processor.cc
Go to the documentation of this file.
1 
5 #include "commit_processor.h"
6 
7 #include <time.h>
8 
9 #include <vector>
10 
11 #include "catalog_diff_tool.h"
12 #include "catalog_merge_tool.h"
13 #include "catalog_mgr_ro.h"
14 #include "catalog_mgr_rw.h"
16 #include "manifest.h"
17 #include "manifest_fetch.h"
18 #include "network/download.h"
19 #include "params.h"
20 #include "signing_tool.h"
21 #include "statistics.h"
22 #include "statistics_database.h"
23 #include "swissknife.h"
24 #include "swissknife_history.h"
25 #include "util/algorithm.h"
26 #include "util/logging.h"
27 #include "util/pointer.h"
28 #include "util/posix.h"
29 #include "util/raii_temp_dir.h"
30 #include "util/string.h"
31 
32 namespace {
33 
34 PathString RemoveRepoName(const PathString &lease_path) {
35  std::string abs_path = lease_path.ToString();
36  const std::string::const_iterator it =
37  std::find(abs_path.begin(), abs_path.end(), '/');
38  if (it != abs_path.end()) {
39  const size_t idx = it - abs_path.begin() + 1;
40  return lease_path.Suffix(idx);
41  } else {
42  return lease_path;
43  }
44 }
45 
// Creates a repository tag by running an edit command (edit_cmd->Main) with
// an argument list assembled from the given parameters.
//
// Each argument is stored in `args` under a single-character key as a newly
// allocated std::string. Values come from `params` (spooler configuration,
// stratum0, hash algorithm), from `repo_tag` (name, description) and from
// the remaining function parameters; key 'x' is set to an empty string.
//
// Returns true if Main() returns 0. On a non-zero return code the code and
// the tag name are logged and false is returned.
//
// NOTE(review): the declarations of `args` and `edit_cmd` are not visible in
// this listing (gutter numbers 51 and 64-65 are skipped) -- confirm against
// the real source file.
46 bool CreateNewTag(const RepositoryTag &repo_tag, const std::string &repo_name,
47  const receiver::Params &params, const std::string &temp_dir,
48  const std::string &manifest_path,
49  const std::string &public_key_path,
50  const std::string &proxy) {
52  args['r'].Reset(new std::string(params.spooler_configuration));
53  args['w'].Reset(new std::string(params.stratum0));
54  args['t'].Reset(new std::string(temp_dir));
55  args['m'].Reset(new std::string(manifest_path));
56  args['p'].Reset(new std::string(public_key_path));
57  args['f'].Reset(new std::string(repo_name));
58  args['e'].Reset(new std::string(params.hash_alg_str));
59  args['a'].Reset(new std::string(repo_tag.name()));
60  args['D'].Reset(new std::string(repo_tag.description()));
// 'x' is deliberately passed as an empty string
61  args['x'].Reset(new std::string());
62  args['@'].Reset(new std::string(proxy));
63 
66  const int ret = edit_cmd->Main(args);
67 
// Any non-zero return code from Main() is treated as failure
68  if (ret) {
69  LogCvmfs(kLogReceiver, kLogSyslogErr, "Error %d creating tag: %s", ret,
70  repo_tag.name().c_str());
71  return false;
72  }
73 
74  return true;
75 }
76 
77 } // namespace
78 
79 namespace receiver {
80 
81 CommitProcessor::CommitProcessor() : num_errors_(0), statistics_(NULL) { }
82 
84 
// CommitProcessor::Process
//
// NOTE(review): this listing is incomplete. The line carrying the return type
// and the function name is missing (the cross-reference index gives the
// signature as `Result Process(...)`), and so are several `LogCvmfs(` opening
// lines, the declarations of `merge_tool` and `stats_db`, and most `case`
// labels of the final switch. Orphaned string literals below are the
// remaining arguments of those missing calls.
//
// Commits the changes made under a lease:
//   1. reads the repository parameters and sets up the download and
//      signature managers,
//   2. fetches the current repository manifest,
//   3. runs the catalog merge tool with old_root_hash / new_root_hash and the
//      lease path without the repository name,
//   4. creates the tag (CreateNewTag) and signs the new manifest
//      (SigningTool::Run),
//   5. truncates /var/spool/cvmfs/<repo>/client.local,
//   6. stores and optionally uploads the publish statistics.
//
// Parameters:
//   lease_path      - its first '/'-separated token is used as the
//                     repository name
//   old_root_hash,
//   new_root_hash   - handed to the merge tool; new_root_hash is additionally
//                     passed to the signing tool as a reflog catalog
//   tag             - copied; a generic tag name is refreshed on the copy
//   final_revision  - forwarded to merge_tool.Run() (presumably filled in
//                     there -- confirm against the merge tool)
//
// Returns kError, kMergeFailure, kMissingReflog or kSuccess as seen at the
// individual return statements below.
102  const std::string &lease_path, const shash::Any &old_root_hash,
103  const shash::Any &new_root_hash, const RepositoryTag &tag,
104  uint64_t *final_revision) {
// Work on a copy because the tag parameter is const
105  RepositoryTag final_tag = tag;
106  // If tag_name is a generic tag, update the time stamp
107  if (final_tag.HasGenericName()) {
108  final_tag.SetGenericName();
109  }
110 
112  "CommitProcessor - lease_path: %s, old hash: %s, new hash: %s, "
113  "tag_name: %s, tag_description: %s",
114  lease_path.c_str(), old_root_hash.ToString(true).c_str(),
115  new_root_hash.ToString(true).c_str(), final_tag.name().c_str(),
116  final_tag.description().c_str());
117 
// The repository name is the first component of the lease path
118  const std::vector<std::string> lease_path_tokens = SplitString(lease_path,
119  '/');
120 
121  const std::string repo_name = lease_path_tokens.front();
122 
123  Params params;
124  if (!GetParamsFromFile(repo_name, &params)) {
125  LogCvmfs(
127  "CommitProcessor - error: Could not get configuration parameters.");
128  return kError;
129  }
130 
131  const UniquePtr<ServerTool> server_tool(new ServerTool());
132 
133  if (!server_tool->InitDownloadManager(true, params.proxy)) {
134  LogCvmfs(
136  "CommitProcessor - error: Could not initialize the download manager");
137  return kError;
138  }
139 
// Key material is looked up under /etc/cvmfs/keys/ by repository name
140  const std::string public_key = "/etc/cvmfs/keys/" + repo_name + ".pub";
141  const std::string certificate = "/etc/cvmfs/keys/" + repo_name + ".crt";
142  const std::string private_key = "/etc/cvmfs/keys/" + repo_name + ".key";
143  if (!server_tool->InitSignatureManager(public_key, certificate,
144  private_key)) {
145  LogCvmfs(
147  "CommitProcessor - error: Could not initialize the signature manager");
148  return kError;
149  }
150 
// A default-constructed hash is passed as the manifest base hash
151  const shash::Any manifest_base_hash;
152  const UniquePtr<manifest::Manifest> manifest_tgt(
153  server_tool->FetchRemoteManifest(params.stratum0, repo_name,
154  manifest_base_hash));
155 
156  // Current catalog from the gateway machine
157  if (!manifest_tgt.IsValid()) {
159  "CommitProcessor - error: Could not open repository manifest");
160  return kError;
161  }
162 
164  "CommitProcessor - lease_path: %s, target root hash: %s",
165  lease_path.c_str(),
166  manifest_tgt->catalog_hash().ToString(false).c_str());
167 
168 
// Stays empty unless a local cache is configured.
// NOTE(review): despite the trailing underscore this is a local variable,
// not a class member.
169  std::string cache_dir_;
170  if (params.use_local_cache) {
171  cache_dir_ = "/var/spool/cvmfs/" + repo_name + "/cache.server";
172  }
173 
174  const std::string spooler_temp_dir = GetSpoolerTempDir(
175  params.spooler_configuration);
176  assert(!spooler_temp_dir.empty());
// NOTE(review): MkdirDeep() is called inside assert(); if this file were
// compiled with NDEBUG the directory would not be created -- confirm the
// build never defines NDEBUG here.
177  assert(MkdirDeep(spooler_temp_dir + "/receiver", 0755, true));
178  const std::string temp_dir_root = spooler_temp_dir
179  + "/receiver/commit_processor";
180 
// The merge tool receives the lease path without the repository name
181  const PathString relative_lease_path = RemoveRepoName(PathString(lease_path));
182 
184  "CommitProcessor - lease_path: %s, merging catalogs",
185  lease_path.c_str());
186 
189  merge_tool(params.stratum0, old_root_hash, new_root_hash,
190  relative_lease_path, temp_dir_root,
191  server_tool->download_manager(), manifest_tgt.weak_ref(),
192  statistics_, cache_dir_);
193  if (!merge_tool.Init()) {
195  "Error: Could not initialize the catalog merge tool");
196  return kError;
197  }
198 
// Outputs of the merge: path and hash of the new manifest
199  std::string new_manifest_path;
200  shash::Any new_manifest_hash;
201  if (!merge_tool.Run(params, &new_manifest_path, &new_manifest_hash,
202  final_revision)) {
204  "CommitProcessor - error: Catalog merge failed");
205  return kMergeFailure;
206  }
207 
// NOTE(review): the result of RaiiTempDir::Create() is dereferenced without
// a validity check.
208  const UniquePtr<RaiiTempDir> raii_temp_dir(
209  RaiiTempDir::Create(temp_dir_root));
210  const std::string temp_dir = raii_temp_dir->dir();
211 
212  if (!CreateNewTag(final_tag, repo_name, params, temp_dir, new_manifest_path,
213  public_key, params.proxy)) {
214  LogCvmfs(kLogReceiver, kLogSyslogErr, "Error creating tag: %s",
215  final_tag.name().c_str());
216  return kError;
217  }
218 
220  "CommitProcessor - lease_path: %s, signing manifest",
221  lease_path.c_str());
222 
223  // Add C_N root catalog hash to reflog through SigningTool,
224  // so garbage collector can later delete it.
225  std::vector<shash::Any> reflog_catalogs;
226  reflog_catalogs.push_back(new_root_hash);
227 
228  SigningTool signing_tool(server_tool.weak_ref());
229  const SigningTool::Result res = signing_tool.Run(
230  new_manifest_path, params.stratum0, params.spooler_configuration,
231  temp_dir, certificate, private_key, repo_name, "", "",
232  "/var/spool/cvmfs/" + repo_name + "/reflog.chksum", params.proxy,
233  params.garbage_collection, false, false, reflog_catalogs);
// NOTE(review): only the SigningTool::kError label is visible in this
// listing; the other case labels are missing. Both "missing reflog"
// messages lead to kMissingReflog, and the success branch only logs.
234  switch (res) {
237  "CommitProcessor - error: missing reflog.chksum");
238  return kMissingReflog;
241  "CommitProcessor - error: missing reflog");
242  return kMissingReflog;
243  case SigningTool::kError:
246  "CommitProcessor - error: signing manifest");
247  return kError;
250  "CommitProcessor - lease_path: %s, success.",
251  lease_path.c_str());
252  }
253 
255  "CommitProcessor - lease_path: %s, new root hash: %s",
256  lease_path.c_str(), new_manifest_hash.ToString(false).c_str());
257 
258  // Ensure CVMFS_ROOT_HASH is not set in
259  // /var/spool/cvmfs/<REPO_NAME>/client.local
// Done by truncating the file to zero length; a failure aborts the commit
260  const std::string fname = "/var/spool/cvmfs/" + repo_name + "/client.local";
261  if (truncate(fname.c_str(), 0) < 0) {
262  LogCvmfs(kLogReceiver, kLogSyslogErr, "Could not truncate %s\n",
263  fname.c_str());
264  return kError;
265  }
266 
// Statistics handling: none of the branches below returns, so failures to
// open, store or upload the statistics DB do not change the kSuccess result.
// NOTE(review): the declaration of stats_db is missing from this listing.
268  if (stats_db != NULL) {
269  if (!stats_db->StorePublishStatistics(statistics_, start_time_, true)) {
271  "Could not store publish statistics");
272  }
273  if (params.upload_stats_db) {
275  shash::kAny);
// The spooler is created here and deleted manually after the upload attempt
276  upload::Spooler *spooler = upload::Spooler::Construct(sd);
277  if (!stats_db->UploadStatistics(spooler)) {
279  "Could not upload statistics DB to upstream storage");
280  }
281  delete spooler;
282  }
283  delete stats_db;
284 
285  } else {
286  LogCvmfs(kLogReceiver, kLogSyslogErr, "Could not open statistics DB");
287  }
288 
289  return kSuccess;
290 }
291 
// CommitProcessor::SetStatistics
//
// NOTE(review): the line carrying the return type, the function name and the
// first parameter is missing from this listing; the cross-reference index
// gives the signature as
// `void SetStatistics(perf::Statistics *st, const std::string &start_time)`.
//
// Stores the statistics object and the start time in the members that
// Process() later passes to StorePublishStatistics(), and registers the
// "publish.revision" counter with an empty description.
//
// NOTE(review): st is dereferenced unconditionally, so callers must pass a
// non-NULL pointer.
293  const std::string &start_time) {
294  statistics_ = st;
295  statistics_->Register("publish.revision", "");
296  start_time_ = start_time;
297 }
298 
299 } // namespace receiver
std::string name() const
Counter * Register(const std::string &name, const std::string &desc)
Definition: statistics.cc:163
void SetGenericName()
Result Run(const std::string &manifest_path, const std::string &repo_url, const std::string &spooler_definition, const std::string &temp_dir, const std::string &certificate="", const std::string &priv_key="", const std::string &repo_name="", const std::string &pwd="", const std::string &meta_info="", const std::string &reflog_chksum_path="", const std::string &proxy="", const bool garbage_collectable=false, const bool bootstrap_shortcuts=false, const bool return_early=false, const std::vector< shash::Any > reflog_catalogs=std::vector< shash::Any >())
Definition: signing_tool.cc:28
std::string description() const
std::string spooler_configuration
Definition: params.h:20
bool UploadStatistics(upload::Spooler *spooler, std::string local_path="")
ShortString Suffix(const unsigned start_at) const
Definition: shortstring.h:194
perf::Statistics * statistics_
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:241
int Main(const ArgumentList &args)
bool GetParamsFromFile(const std::string &repo_name, Params *params)
Definition: params.cc:20
perf::Statistics * statistics_
Definition: repository.h:138
manifest::Manifest * FetchRemoteManifest(const std::string &repository_url, const std::string &repository_name, const shash::Any &base_hash=shash::Any()) const
Definition: server_tool.cc:121
assert((mem||(size==0))&&"Out Of Memory")
std::string GetSpoolerTempDir(const std::string &spooler_config)
Definition: params.cc:14
std::string hash_alg_str
Definition: params.h:23
bool CreateNewTag(const RepositoryTag &repo_tag, const std::string &repo_name, const receiver::Params &params, const std::string &temp_dir, const std::string &manifest_path, const std::string &public_key_path, const std::string &proxy)
download::DownloadManager * download_manager() const
Definition: server_tool.cc:96
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:306
std::string stratum0
Definition: params.h:18
static RaiiTempDir * Create(const std::string &prefix)
Definition: raii_temp_dir.cc:9
bool MkdirDeep(const std::string &path, const mode_t mode, bool verify_writable)
Definition: posix.cc:855
bool use_local_cache
Definition: params.h:21
PathString RemoveRepoName(const PathString &lease_path)
static StatisticsDatabase * OpenStandardDB(const std::string repo_name)
bool garbage_collection
Definition: params.h:34
void SetStatistics(perf::Statistics *st, const std::string &start_time)
bool StorePublishStatistics(const perf::Statistics *statistics, const std::string &start_time, const bool success)
bool HasGenericName()
bool InitSignatureManager(const std::string &pubkey_path, const std::string &certificate_path="", const std::string &private_key_path="")
Definition: server_tool.cc:44
std::string ToString() const
Definition: shortstring.h:139
std::map< char, SharedPtr< std::string > > ArgumentList
Definition: swissknife.h:72
ShortString< kDefaultMaxPath, 0 > PathString
Definition: shortstring.h:213
Result Process(const std::string &lease_path, const shash::Any &old_root_hash, const shash::Any &new_root_hash, const RepositoryTag &tag, uint64_t *final_revision)
std::string proxy
Definition: params.h:19
bool upload_stats_db
Definition: params.h:38
bool InitDownloadManager(const bool follow_redirects, const std::string &proxy, const unsigned max_pool_handles=1)
Definition: server_tool.cc:17
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545