| Directory: | cvmfs/ |
|---|---|
| File: | cvmfs/receiver/commit_processor.cc |
| Date: | 2026-06-28 02:36:10 |
| Exec | Total | Coverage | |
|---|---|---|---|
| Lines: | 52 | 242 | 21.5% |
| Branches: | 58 | 574 | 10.1% |
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /** | ||
| 2 | * This file is part of the CernVM File System. | ||
| 3 | */ | ||
| 4 | |||
| 5 | #include "commit_processor.h" | ||
| 6 | |||
| 7 | #include <time.h> | ||
| 8 | |||
| 9 | #include <cctype> | ||
| 10 | #include <string> | ||
| 11 | #include <vector> | ||
| 12 | |||
| 13 | #include "catalog_diff_tool.h" | ||
| 14 | #include "catalog_merge_tool.h" | ||
| 15 | #include "catalog_mgr_ro.h" | ||
| 16 | #include "catalog_mgr_rw.h" | ||
| 17 | #include "compression/compression.h" | ||
| 18 | #include "manifest.h" | ||
| 19 | #include "manifest_fetch.h" | ||
| 20 | #include "network/download.h" | ||
| 21 | #include "network/sink_path.h" | ||
| 22 | #include "params.h" | ||
| 23 | #include "signing_tool.h" | ||
| 24 | #include "statistics.h" | ||
| 25 | #include "statistics_database.h" | ||
| 26 | #include "swissknife.h" | ||
| 27 | #include "swissknife_history.h" | ||
| 28 | #include "util/algorithm.h" | ||
| 29 | #include "util/logging.h" | ||
| 30 | #include "util/pointer.h" | ||
| 31 | #include "util/posix.h" | ||
| 32 | #include "util/raii_temp_dir.h" | ||
| 33 | #include "util/string.h" | ||
| 34 | |||
| 35 | namespace { | ||
| 36 | |||
| 37 | ✗ | PathString RemoveRepoName(const PathString &lease_path) { | |
| 38 | ✗ | std::string abs_path = lease_path.ToString(); | |
| 39 | ✗ | const std::string::const_iterator it = std::find(abs_path.begin(), | |
| 40 | ✗ | abs_path.end(), '/'); | |
| 41 | ✗ | if (it != abs_path.end()) { | |
| 42 | ✗ | const size_t idx = it - abs_path.begin() + 1; | |
| 43 | ✗ | return lease_path.Suffix(idx); | |
| 44 | } else { | ||
| 45 | ✗ | return lease_path; | |
| 46 | } | ||
| 47 | } | ||
| 48 | |||
| 49 | ✗ | bool EditTags(const RepositoryTag &repo_tag, const std::string &repo_name, | |
| 50 | const receiver::Params ¶ms, const std::string &temp_dir, | ||
| 51 | const std::string &manifest_path, | ||
| 52 | const std::string &public_key_path, | ||
| 53 | const std::string &proxy, | ||
| 54 | const time_t auto_tag_threshold, | ||
| 55 | const bool maintain_undo_tags) { | ||
| 56 | ✗ | swissknife::ArgumentList args; | |
| 57 | ✗ | args['r'].Reset(new std::string(params.spooler_configuration)); | |
| 58 | ✗ | args['w'].Reset(new std::string(params.stratum0)); | |
| 59 | ✗ | args['t'].Reset(new std::string(temp_dir)); | |
| 60 | ✗ | args['m'].Reset(new std::string(manifest_path)); | |
| 61 | ✗ | args['p'].Reset(new std::string(public_key_path)); | |
| 62 | ✗ | args['f'].Reset(new std::string(repo_name)); | |
| 63 | ✗ | args['e'].Reset(new std::string(params.hash_alg_str)); | |
| 64 | ✗ | args['a'].Reset(new std::string(repo_tag.name())); | |
| 65 | ✗ | args['D'].Reset(new std::string(repo_tag.description())); | |
| 66 | ✗ | if (maintain_undo_tags) { | |
| 67 | ✗ | args['x'].Reset(new std::string()); | |
| 68 | } | ||
| 69 | ✗ | args['@'].Reset(new std::string(proxy)); | |
| 70 | // Remove the tags requested by `cvmfs_server tag -r` in the same history | ||
| 71 | // transaction as the (possibly empty) new tag, so a single new history | ||
| 72 | // database is published and registered in the reflog for this commit. | ||
| 73 | ✗ | if (!repo_tag.delete_tags().empty()) { | |
| 74 | ✗ | args['d'].Reset(new std::string(repo_tag.delete_tags())); | |
| 75 | } | ||
| 76 | // Remove outdated auto-generated tags in the same history transaction as the | ||
| 77 | // tag we are about to add, so that only a single new history database is | ||
| 78 | // published (and registered in the reflog) for this commit. | ||
| 79 | ✗ | if (auto_tag_threshold > 0) { | |
| 80 | ✗ | args['c'].Reset(new std::string(StringifyInt(auto_tag_threshold))); | |
| 81 | } | ||
| 82 | |||
| 83 | const UniquePtr<swissknife::CommandEditTag> edit_cmd( | ||
| 84 | ✗ | new swissknife::CommandEditTag()); | |
| 85 | ✗ | const int ret = edit_cmd->Main(args); | |
| 86 | |||
| 87 | ✗ | if (ret) { | |
| 88 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, "Error %d editing tags (add: '%s')", | |
| 89 | ✗ | ret, repo_tag.name().c_str()); | |
| 90 | ✗ | return false; | |
| 91 | } | ||
| 92 | |||
| 93 | ✗ | return true; | |
| 94 | } | ||
| 95 | |||
| 96 | } // namespace | ||
| 97 | |||
| 98 | namespace receiver { | ||
| 99 | |||
| 100 | // See commit_processor.h for the contract. `now` is injected so the parser is | ||
| 101 | // deterministic and unit-testable. | ||
| 102 | 792 | time_t ParseRelativeTimespan(const std::string ×pan, time_t now) { | |
| 103 | // Tokenize on whitespace, lower-casing as we go. | ||
| 104 | 792 | std::vector<std::string> tokens; | |
| 105 | 792 | std::string current; | |
| 106 |
2/2✓ Branch 1 taken 8481 times.
✓ Branch 2 taken 792 times.
|
9273 | for (size_t i = 0; i < timespan.size(); ++i) { |
| 107 | 8481 | const unsigned char c = static_cast<unsigned char>(timespan[i]); | |
| 108 |
2/2✓ Branch 0 taken 1518 times.
✓ Branch 1 taken 6963 times.
|
8481 | if (isspace(c)) { |
| 109 |
2/2✓ Branch 1 taken 1287 times.
✓ Branch 2 taken 231 times.
|
1518 | if (!current.empty()) { |
| 110 |
1/2✓ Branch 1 taken 1287 times.
✗ Branch 2 not taken.
|
1287 | tokens.push_back(current); |
| 111 | 1287 | current.clear(); | |
| 112 | } | ||
| 113 | } else { | ||
| 114 |
1/2✓ Branch 1 taken 6963 times.
✗ Branch 2 not taken.
|
6963 | current += static_cast<char>(tolower(c)); |
| 115 | } | ||
| 116 | } | ||
| 117 |
2/2✓ Branch 1 taken 726 times.
✓ Branch 2 taken 66 times.
|
792 | if (!current.empty()) { |
| 118 |
1/2✓ Branch 1 taken 726 times.
✗ Branch 2 not taken.
|
726 | tokens.push_back(current); |
| 119 | } | ||
| 120 | |||
| 121 | // Expect exactly "<number> <unit> ago". | ||
| 122 |
6/6✓ Branch 1 taken 594 times.
✓ Branch 2 taken 198 times.
✓ Branch 5 taken 33 times.
✓ Branch 6 taken 561 times.
✓ Branch 7 taken 231 times.
✓ Branch 8 taken 561 times.
|
792 | if (tokens.size() != 3 || tokens[2] != "ago") { |
| 123 | 231 | return 0; | |
| 124 | } | ||
| 125 | 561 | const std::string &number = tokens[0]; | |
| 126 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 561 times.
|
561 | if (number.empty()) { |
| 127 | ✗ | return 0; | |
| 128 | } | ||
| 129 |
2/2✓ Branch 1 taken 825 times.
✓ Branch 2 taken 528 times.
|
1353 | for (size_t i = 0; i < number.size(); ++i) { |
| 130 |
2/2✓ Branch 1 taken 33 times.
✓ Branch 2 taken 792 times.
|
825 | if (!isdigit(static_cast<unsigned char>(number[i]))) { |
| 131 | 33 | return 0; | |
| 132 | } | ||
| 133 | } | ||
| 134 |
1/2✓ Branch 1 taken 528 times.
✗ Branch 2 not taken.
|
528 | const int64_t count = String2Int64(number); |
| 135 | |||
| 136 | // De-pluralize the unit. | ||
| 137 |
1/2✓ Branch 2 taken 528 times.
✗ Branch 3 not taken.
|
528 | std::string unit = tokens[1]; |
| 138 |
6/8✓ Branch 1 taken 528 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 528 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 396 times.
✓ Branch 8 taken 132 times.
✓ Branch 9 taken 396 times.
✓ Branch 10 taken 132 times.
|
528 | if (!unit.empty() && unit[unit.size() - 1] == 's') { |
| 139 |
1/2✓ Branch 2 taken 396 times.
✗ Branch 3 not taken.
|
396 | unit.resize(unit.size() - 1); |
| 140 | } | ||
| 141 | |||
| 142 | // Fixed-length units can be subtracted directly. | ||
| 143 | 528 | int64_t factor = 0; | |
| 144 |
6/6✓ Branch 1 taken 495 times.
✓ Branch 2 taken 33 times.
✓ Branch 4 taken 66 times.
✓ Branch 5 taken 429 times.
✓ Branch 6 taken 99 times.
✓ Branch 7 taken 429 times.
|
528 | if (unit == "sec" || unit == "second") { |
| 145 | 99 | factor = 1; | |
| 146 |
6/6✓ Branch 1 taken 396 times.
✓ Branch 2 taken 33 times.
✓ Branch 4 taken 33 times.
✓ Branch 5 taken 363 times.
✓ Branch 6 taken 66 times.
✓ Branch 7 taken 363 times.
|
429 | } else if (unit == "min" || unit == "minute") { |
| 147 | 66 | factor = 60; | |
| 148 |
2/2✓ Branch 1 taken 33 times.
✓ Branch 2 taken 330 times.
|
363 | } else if (unit == "hour") { |
| 149 | 33 | factor = 3600; | |
| 150 |
2/2✓ Branch 1 taken 165 times.
✓ Branch 2 taken 165 times.
|
330 | } else if (unit == "day") { |
| 151 | 165 | factor = 86400; | |
| 152 |
2/2✓ Branch 1 taken 33 times.
✓ Branch 2 taken 132 times.
|
165 | } else if (unit == "week") { |
| 153 | 33 | factor = 604800; | |
| 154 | } | ||
| 155 |
2/2✓ Branch 0 taken 396 times.
✓ Branch 1 taken 132 times.
|
528 | if (factor > 0) { |
| 156 | 396 | return now - static_cast<time_t>(count * factor); | |
| 157 | } | ||
| 158 | |||
| 159 | // Calendar units: let mktime() normalize the broken-down time. | ||
| 160 | struct tm broken_time; | ||
| 161 | 132 | localtime_r(&now, &broken_time); | |
| 162 |
2/2✓ Branch 1 taken 66 times.
✓ Branch 2 taken 66 times.
|
132 | if (unit == "month") { |
| 163 | 66 | broken_time.tm_mon -= static_cast<int>(count); | |
| 164 | 66 | return mktime(&broken_time); | |
| 165 | } | ||
| 166 |
2/2✓ Branch 1 taken 33 times.
✓ Branch 2 taken 33 times.
|
66 | if (unit == "year") { |
| 167 | 33 | broken_time.tm_year -= static_cast<int>(count); | |
| 168 | 33 | return mktime(&broken_time); | |
| 169 | } | ||
| 170 | |||
| 171 | 33 | return 0; | |
| 172 | 792 | } | |
| 173 | |||
| 174 | 9 | CommitProcessor::CommitProcessor() : num_errors_(0), statistics_(NULL) { } | |
| 175 | |||
| 176 | 18 | CommitProcessor::~CommitProcessor() { } | |
| 177 | |||
| 178 | /** | ||
| 179 | * Applies the changes from the new catalog onto the repository. | ||
| 180 | * | ||
| 181 | * Let: | ||
| 182 | * + C_O = the root catalog of the repository (given by old_root_hash) at | ||
| 183 | * the beginning of the lease, on the release manager machine | ||
| 184 | * + C_N = the root catalog of the repository (given by new_root_hash), on | ||
| 185 | * the release manager machine, with the changes introduced during the | ||
| 186 | * lease | ||
| 187 | * + C_G = the current root catalog of the repository on the gateway machine. | ||
| 188 | * | ||
| 189 | * This method applies all the changes from C_N, with respect to C_O, onto C_G. | ||
| 190 | * The resulting catalog on the gateway machine (C_GN) is then set as root | ||
| 191 | * catalog in the repository manifest. The method also signs the updated | ||
| 192 | * repository manifest. | ||
| 193 | */ | ||
| 194 | ✗ | CommitProcessor::Result CommitProcessor::Process( | |
| 195 | const std::string &lease_path, const shash::Any &old_root_hash, | ||
| 196 | const shash::Any &new_root_hash, const RepositoryTag &tag, | ||
| 197 | int64_t lease_expiration, uint64_t *final_revision, bool direct_graft) { | ||
| 198 | ✗ | RepositoryTag final_tag = tag; | |
| 199 | // If tag_name is a generic tag, update the time stamp | ||
| 200 | ✗ | if (final_tag.HasGenericName()) { | |
| 201 | ✗ | final_tag.SetGenericName(); | |
| 202 | } | ||
| 203 | |||
| 204 | ✗ | LogCvmfs(kLogReceiver, kLogSyslog, | |
| 205 | "CommitProcessor - lease_path: %s, old hash: %s, new hash: %s, " | ||
| 206 | "tag_name: %s, tag_description: %s", | ||
| 207 | ✗ | lease_path.c_str(), old_root_hash.ToString(true).c_str(), | |
| 208 | ✗ | new_root_hash.ToString(true).c_str(), final_tag.name().c_str(), | |
| 209 | ✗ | final_tag.description().c_str()); | |
| 210 | |||
| 211 | const std::vector<std::string> lease_path_tokens = SplitString(lease_path, | ||
| 212 | ✗ | '/'); | |
| 213 | |||
| 214 | ✗ | const std::string repo_name = lease_path_tokens.front(); | |
| 215 | |||
| 216 | ✗ | Params params; | |
| 217 | ✗ | if (!GetParamsFromFile(repo_name, ¶ms)) { | |
| 218 | ✗ | LogCvmfs( | |
| 219 | kLogReceiver, kLogSyslogErr, | ||
| 220 | "CommitProcessor - error: Could not get configuration parameters."); | ||
| 221 | ✗ | return kError; | |
| 222 | } | ||
| 223 | |||
| 224 | ✗ | const UniquePtr<ServerTool> server_tool(new ServerTool()); | |
| 225 | |||
| 226 | ✗ | if (!server_tool->InitDownloadManager(true, params.proxy)) { | |
| 227 | ✗ | LogCvmfs( | |
| 228 | kLogReceiver, kLogSyslogErr, | ||
| 229 | "CommitProcessor - error: Could not initialize the download manager"); | ||
| 230 | ✗ | return kError; | |
| 231 | } | ||
| 232 | |||
| 233 | ✗ | const std::string public_key = "/etc/cvmfs/keys/" + repo_name + ".pub"; | |
| 234 | ✗ | const std::string certificate = "/etc/cvmfs/keys/" + repo_name + ".crt"; | |
| 235 | ✗ | const std::string private_key = "/etc/cvmfs/keys/" + repo_name + ".key"; | |
| 236 | ✗ | if (!server_tool->InitSignatureManager(public_key, certificate, | |
| 237 | private_key)) { | ||
| 238 | ✗ | LogCvmfs( | |
| 239 | kLogReceiver, kLogSyslogErr, | ||
| 240 | "CommitProcessor - error: Could not initialize the signature manager"); | ||
| 241 | ✗ | return kError; | |
| 242 | } | ||
| 243 | |||
| 244 | ✗ | const shash::Any manifest_base_hash; | |
| 245 | const UniquePtr<manifest::Manifest> manifest_tgt( | ||
| 246 | server_tool->FetchRemoteManifest(params.stratum0, repo_name, | ||
| 247 | ✗ | manifest_base_hash)); | |
| 248 | |||
| 249 | // Current catalog from the gateway machine | ||
| 250 | ✗ | if (!manifest_tgt.IsValid()) { | |
| 251 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 252 | "CommitProcessor - error: Could not open repository manifest"); | ||
| 253 | ✗ | return kError; | |
| 254 | } | ||
| 255 | |||
| 256 | ✗ | LogCvmfs(kLogReceiver, kLogSyslog, | |
| 257 | "CommitProcessor - lease_path: %s, target root hash: %s", | ||
| 258 | lease_path.c_str(), | ||
| 259 | ✗ | manifest_tgt->catalog_hash().ToString(false).c_str()); | |
| 260 | |||
| 261 | |||
| 262 | ✗ | std::string cache_dir_; | |
| 263 | ✗ | if (params.use_local_cache) { | |
| 264 | ✗ | cache_dir_ = "/var/spool/cvmfs/" + repo_name + "/cache.server"; | |
| 265 | } | ||
| 266 | |||
| 267 | const std::string spooler_temp_dir = GetSpoolerTempDir( | ||
| 268 | ✗ | params.spooler_configuration); | |
| 269 | ✗ | assert(!spooler_temp_dir.empty()); | |
| 270 | ✗ | assert(MkdirDeep(spooler_temp_dir + "/receiver", 0755, true)); | |
| 271 | const std::string temp_dir_root = spooler_temp_dir | ||
| 272 | ✗ | + "/receiver/commit_processor"; | |
| 273 | |||
| 274 | ✗ | const PathString relative_lease_path = RemoveRepoName(PathString(lease_path)); | |
| 275 | |||
| 276 | ✗ | std::string new_manifest_path; | |
| 277 | ✗ | shash::Any new_manifest_hash; | |
| 278 | |||
| 279 | ✗ | if (direct_graft) { | |
| 280 | // -- Experimental DirectGraft fast path ---------------------------------- | ||
| 281 | // Grafts new_root_hash directly into the parent catalog at | ||
| 282 | // relative_lease_path via WritableCatalogManager::TryGraftNestedCatalog, | ||
| 283 | // bypassing DiffRec entirely. Only valid when lease_path points to a | ||
| 284 | // brand-new directory subtree. Reached only via the experimental dedicated | ||
| 285 | // kCommitGraft reactor request. | ||
| 286 | ✗ | LogCvmfs(kLogReceiver, kLogSyslog, | |
| 287 | "CommitProcessor - lease_path: %s, direct-graft path " | ||
| 288 | "(skipping DiffRec)", | ||
| 289 | lease_path.c_str()); | ||
| 290 | |||
| 291 | const UniquePtr<RaiiTempDir> graft_temp_dir( | ||
| 292 | ✗ | RaiiTempDir::Create(temp_dir_root)); | |
| 293 | ✗ | const std::string graft_temp = graft_temp_dir->dir(); | |
| 294 | |||
| 295 | ✗ | perf::StatisticsTemplate stats_tmpl("publish", statistics_); | |
| 296 | // Register the FsCounters (n_files_added, n_directories_added, etc.) that | ||
| 297 | // StorePublishStatistics expects. In the DiffRec path these are created by | ||
| 298 | // CatalogMergeTool::Run(); DirectGraft bypasses that, so we register them | ||
| 299 | // here. The values stay 0 -- accurate for a graft that adds a whole | ||
| 300 | // subtree atomically rather than individual file-level diffs. | ||
| 301 | ✗ | const perf::FsCounters fs_counters(stats_tmpl); | |
| 302 | const upload::SpoolerDefinition definition( | ||
| 303 | params.spooler_configuration, params.hash_alg, params.compression_alg, | ||
| 304 | ✗ | params.generate_legacy_bulk_chunks, params.use_file_chunking, | |
| 305 | params.min_chunk_size, params.avg_chunk_size, params.max_chunk_size, | ||
| 306 | ✗ | "dummy_token", "dummy_key"); | |
| 307 | const UniquePtr<upload::Spooler> spooler( | ||
| 308 | ✗ | upload::Spooler::Construct(definition, &stats_tmpl)); | |
| 309 | |||
| 310 | const UniquePtr<catalog::WritableCatalogManager> output_mgr( | ||
| 311 | new catalog::WritableCatalogManager( | ||
| 312 | ✗ | manifest_tgt->catalog_hash(), params.stratum0, graft_temp, | |
| 313 | ✗ | spooler.weak_ref(), server_tool->download_manager(), | |
| 314 | ✗ | params.enforce_limits, params.nested_kcatalog_limit, | |
| 315 | ✗ | params.root_kcatalog_limit, params.file_mbyte_limit, | |
| 316 | ✗ | statistics_, params.use_autocatalogs, params.max_weight, | |
| 317 | ✗ | params.min_weight, cache_dir_)); | |
| 318 | ✗ | if (!output_mgr->Init()) { | |
| 319 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 320 | "CommitProcessor - error: Could not initialize catalog manager " | ||
| 321 | "for direct-graft"); | ||
| 322 | ✗ | return kError; | |
| 323 | } | ||
| 324 | |||
| 325 | ✗ | if (new_root_hash.IsNull() | |
| 326 | ✗ | || new_root_hash.suffix != shash::kSuffixCatalog) { | |
| 327 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 328 | "CommitProcessor - error: DirectGraft requires a catalog hash"); | ||
| 329 | ✗ | return kMergeFailure; | |
| 330 | } | ||
| 331 | |||
| 332 | // Download new_root_hash to a temp file to obtain its compressed size. | ||
| 333 | // TryGraftNestedCatalog will download it again internally via | ||
| 334 | // LoadFreeCatalog; with a local cache that second fetch is a cheap cache | ||
| 335 | // hit. | ||
| 336 | const std::string catalog_url = | ||
| 337 | ✗ | params.stratum0 + "/data/" + new_root_hash.MakePath(); | |
| 338 | ✗ | const std::string catalog_tmp = graft_temp + "/catalog_size"; | |
| 339 | { | ||
| 340 | ✗ | cvmfs::PathSink catalog_sink(catalog_tmp); | |
| 341 | ✗ | const shash::Any expected = new_root_hash; | |
| 342 | // Fetch the compressed object verbatim: the nested catalog reference | ||
| 343 | // stores the compressed CAS object size, while DownloadManager's normal | ||
| 344 | // catalog path (compressed=true) writes the decompressed SQLite file. | ||
| 345 | download::JobInfo dl_job(&catalog_url, false, false, &expected, | ||
| 346 | ✗ | &catalog_sink); | |
| 347 | const download::Failures dl_ret = | ||
| 348 | ✗ | server_tool->download_manager()->Fetch(&dl_job); | |
| 349 | ✗ | if (dl_ret != download::kFailOk) { | |
| 350 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 351 | "CommitProcessor - error: failed to download catalog %s " | ||
| 352 | "for size probe (%d)", | ||
| 353 | catalog_url.c_str(), static_cast<int>(dl_ret)); | ||
| 354 | ✗ | unlink(catalog_tmp.c_str()); | |
| 355 | ✗ | return kError; | |
| 356 | } | ||
| 357 | ✗ | } // PathSink destructor closes the file here | |
| 358 | ✗ | const int64_t catalog_size = GetFileSize(catalog_tmp); | |
| 359 | ✗ | unlink(catalog_tmp.c_str()); | |
| 360 | ✗ | if (catalog_size < 0) { | |
| 361 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 362 | "CommitProcessor - error: cannot stat downloaded catalog %s", | ||
| 363 | catalog_url.c_str()); | ||
| 364 | ✗ | return kError; | |
| 365 | } | ||
| 366 | |||
| 367 | // Graft: inserts the nested catalog reference into the parent catalog | ||
| 368 | // and propagates the directory entry + counters upward. | ||
| 369 | ✗ | if (!output_mgr->TryGraftNestedCatalog( | |
| 370 | ✗ | relative_lease_path.ToString(), new_root_hash, | |
| 371 | static_cast<uint64_t>(catalog_size))) { | ||
| 372 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 373 | "CommitProcessor - error: DirectGraft validation failed for " | ||
| 374 | "lease_path: %s", | ||
| 375 | lease_path.c_str()); | ||
| 376 | ✗ | return kMergeFailure; | |
| 377 | } | ||
| 378 | |||
| 379 | // Commit updates manifest_tgt in-place (new root hash, revision++, etc.) | ||
| 380 | ✗ | if (!output_mgr->Commit(false, 0, manifest_tgt.weak_ref())) { | |
| 381 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 382 | "CommitProcessor - error: Could not commit grafted catalog"); | ||
| 383 | ✗ | return kMergeFailure; | |
| 384 | } | ||
| 385 | |||
| 386 | // Export the updated manifest to a temp file for CreateNewTag/SigningTool. | ||
| 387 | ✗ | new_manifest_path = CreateTempPath(temp_dir_root, 0600); | |
| 388 | ✗ | if (!manifest_tgt->Export(new_manifest_path)) { | |
| 389 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 390 | "CommitProcessor - error: Could not export manifest after graft"); | ||
| 391 | ✗ | return kError; | |
| 392 | } | ||
| 393 | ✗ | new_manifest_hash = manifest_tgt->catalog_hash(); | |
| 394 | ✗ | *final_revision = manifest_tgt->revision(); | |
| 395 | |||
| 396 | ✗ | } else { | |
| 397 | // -- Standard DiffRec path via CatalogMergeTool -------------------------- | ||
| 398 | ✗ | LogCvmfs(kLogReceiver, kLogSyslog, | |
| 399 | "CommitProcessor - lease_path: %s, merging catalogs", | ||
| 400 | lease_path.c_str()); | ||
| 401 | |||
| 402 | CatalogMergeTool<catalog::WritableCatalogManager, | ||
| 403 | catalog::SimpleCatalogManager> | ||
| 404 | merge_tool(params.stratum0, old_root_hash, new_root_hash, | ||
| 405 | relative_lease_path, temp_dir_root, | ||
| 406 | server_tool->download_manager(), manifest_tgt.weak_ref(), | ||
| 407 | ✗ | statistics_, cache_dir_); | |
| 408 | ✗ | if (!merge_tool.Init()) { | |
| 409 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 410 | "Error: Could not initialize the catalog merge tool"); | ||
| 411 | ✗ | return kError; | |
| 412 | } | ||
| 413 | ✗ | if (!merge_tool.Run(params, &new_manifest_path, &new_manifest_hash, | |
| 414 | final_revision)) { | ||
| 415 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 416 | "CommitProcessor - error: Catalog merge failed"); | ||
| 417 | ✗ | return kMergeFailure; | |
| 418 | } | ||
| 419 | } | ||
| 420 | |||
| 421 | const UniquePtr<RaiiTempDir> raii_temp_dir( | ||
| 422 | ✗ | RaiiTempDir::Create(temp_dir_root)); | |
| 423 | ✗ | const std::string temp_dir = raii_temp_dir->dir(); | |
| 424 | |||
| 425 | // Determine the cutoff below which outdated auto-generated tags are removed. | ||
| 426 | // A value sent by the publisher (already an absolute timestamp) takes | ||
| 427 | // precedence over the gateway's local CVMFS_AUTO_TAG_TIMESPAN configuration, | ||
| 428 | // which is a relative "<N> <unit> ago" timespan resolved here. 0 disables | ||
| 429 | // cleanup. | ||
| 430 | ✗ | time_t auto_tag_threshold = final_tag.auto_tag_threshold(); | |
| 431 | ✗ | if (auto_tag_threshold <= 0 && !params.auto_tag_timespan.empty()) { | |
| 432 | ✗ | auto_tag_threshold = ParseRelativeTimespan(params.auto_tag_timespan, | |
| 433 | time(NULL)); | ||
| 434 | ✗ | if (auto_tag_threshold <= 0) { | |
| 435 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 436 | "CommitProcessor - warning: could not parse " | ||
| 437 | "CVMFS_AUTO_TAG_TIMESPAN '%s' (expected \"<N> <unit> ago\")", | ||
| 438 | params.auto_tag_timespan.c_str()); | ||
| 439 | } | ||
| 440 | } | ||
| 441 | ✗ | if (auto_tag_threshold > 0) { | |
| 442 | ✗ | LogCvmfs(kLogReceiver, kLogSyslog, | |
| 443 | "CommitProcessor - lease_path: %s, cleaning up auto tags " | ||
| 444 | "older than %ld", | ||
| 445 | lease_path.c_str(), static_cast<long>(auto_tag_threshold)); | ||
| 446 | } | ||
| 447 | |||
| 448 | // EditTags adds the tag for the new revision, removes any tags requested by | ||
| 449 | // `cvmfs_server tag -r`, and, when a cleanup threshold is set, removes the | ||
| 450 | // outdated auto tags -- all in the same history transaction. A failure here | ||
| 451 | // is fatal: leaving the new revision untagged (or silently keeping stale | ||
| 452 | // tags) would be worse than aborting the commit. | ||
| 453 | // | ||
| 454 | // Only real publish commits should rotate the undo tags (`trunk` and | ||
| 455 | // `trunk-previous`). Pure gateway tag edits reuse the current root hash as | ||
| 456 | // both old and new hash, so updating undo tags there would incorrectly make | ||
| 457 | // `trunk-previous` point at the current HEAD. | ||
| 458 | ✗ | const bool maintain_undo_tags = (old_root_hash != new_root_hash); | |
| 459 | ✗ | if (!EditTags(final_tag, repo_name, params, temp_dir, new_manifest_path, | |
| 460 | public_key, params.proxy, auto_tag_threshold, | ||
| 461 | maintain_undo_tags)) { | ||
| 462 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, "Error editing tags (add: '%s')", | |
| 463 | ✗ | final_tag.name().c_str()); | |
| 464 | ✗ | return kError; | |
| 465 | } | ||
| 466 | |||
| 467 | // Re-check the lease right before the final, repository-modifying step. The | ||
| 468 | // catalog merge and object upload above can be slow, during which the lease | ||
| 469 | // may have expired and an overlapping lease may have been granted to another | ||
| 470 | // publisher. If the deadline has passed we must not publish: the objects | ||
| 471 | // uploaded above stay unreferenced and are reclaimed by garbage collection. | ||
| 472 | // lease_expiration already has the gateway's configured safety margin | ||
| 473 | // subtracted, so this is a plain comparison against the current time. | ||
| 474 | ✗ | if (static_cast<int64_t>(time(NULL)) >= lease_expiration) { | |
| 475 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 476 | "CommitProcessor - lease_path: %s, lease expired during commit; " | ||
| 477 | "skipping publication, uploaded objects will be " | ||
| 478 | "garbage-collected", | ||
| 479 | lease_path.c_str()); | ||
| 480 | ✗ | return kLeaseExpired; | |
| 481 | } | ||
| 482 | |||
| 483 | ✗ | LogCvmfs(kLogReceiver, kLogSyslog, | |
| 484 | "CommitProcessor - lease_path: %s, signing manifest", | ||
| 485 | lease_path.c_str()); | ||
| 486 | |||
| 487 | // Add C_N root catalog hash to reflog through SigningTool, | ||
| 488 | // so garbage collector can later delete it. | ||
| 489 | ✗ | std::vector<shash::Any> reflog_catalogs; | |
| 490 | ✗ | reflog_catalogs.push_back(new_root_hash); | |
| 491 | |||
| 492 | ✗ | SigningTool signing_tool(server_tool.weak_ref()); | |
| 493 | ✗ | const SigningTool::Result res = signing_tool.Run( | |
| 494 | new_manifest_path, params.stratum0, params.spooler_configuration, | ||
| 495 | temp_dir, certificate, private_key, repo_name, "", "", | ||
| 496 | ✗ | "/var/spool/cvmfs/" + repo_name + "/reflog.chksum", params.proxy, | |
| 497 | ✗ | params.garbage_collection, false, false, reflog_catalogs); | |
| 498 | ✗ | switch (res) { | |
| 499 | ✗ | case SigningTool::kReflogChecksumMissing: | |
| 500 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 501 | "CommitProcessor - error: missing reflog.chksum"); | ||
| 502 | ✗ | return kMissingReflog; | |
| 503 | ✗ | case SigningTool::kReflogMissing: | |
| 504 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 505 | "CommitProcessor - error: missing reflog"); | ||
| 506 | ✗ | return kMissingReflog; | |
| 507 | ✗ | case SigningTool::kError: | |
| 508 | case SigningTool::kInitError: | ||
| 509 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 510 | "CommitProcessor - error: signing manifest"); | ||
| 511 | ✗ | return kError; | |
| 512 | ✗ | case SigningTool::kSuccess: | |
| 513 | ✗ | LogCvmfs(kLogReceiver, kLogSyslog, | |
| 514 | "CommitProcessor - lease_path: %s, success.", | ||
| 515 | lease_path.c_str()); | ||
| 516 | } | ||
| 517 | |||
| 518 | ✗ | LogCvmfs(kLogReceiver, kLogSyslog, | |
| 519 | "CommitProcessor - lease_path: %s, new root hash: %s", | ||
| 520 | ✗ | lease_path.c_str(), new_manifest_hash.ToString(false).c_str()); | |
| 521 | |||
| 522 | // Ensure CVMFS_ROOT_HASH is not set in | ||
| 523 | // /var/spool/cvmfs/<REPO_NAME>/client.local | ||
| 524 | ✗ | const std::string fname = "/var/spool/cvmfs/" + repo_name + "/client.local"; | |
| 525 | ✗ | if (truncate(fname.c_str(), 0) < 0) { | |
| 526 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, "Could not truncate %s\n", | |
| 527 | fname.c_str()); | ||
| 528 | ✗ | return kError; | |
| 529 | } | ||
| 530 | |||
| 531 | ✗ | StatisticsDatabase *stats_db = StatisticsDatabase::OpenStandardDB(repo_name); | |
| 532 | ✗ | if (stats_db != NULL) { | |
| 533 | ✗ | if (!stats_db->StorePublishStatistics(statistics_, start_time_, true)) { | |
| 534 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 535 | "Could not store publish statistics"); | ||
| 536 | } | ||
| 537 | ✗ | if (params.upload_stats_db) { | |
| 538 | const upload::SpoolerDefinition sd(params.spooler_configuration, | ||
| 539 | ✗ | shash::kAny); | |
| 540 | ✗ | upload::Spooler *spooler = upload::Spooler::Construct(sd); | |
| 541 | ✗ | if (!stats_db->UploadStatistics(spooler)) { | |
| 542 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 543 | "Could not upload statistics DB to upstream storage"); | ||
| 544 | } | ||
| 545 | ✗ | delete spooler; | |
| 546 | } | ||
| 547 | ✗ | delete stats_db; | |
| 548 | |||
| 549 | } else { | ||
| 550 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, "Could not open statistics DB"); | |
| 551 | } | ||
| 552 | |||
| 553 | ✗ | return kSuccess; | |
| 554 | } | ||
| 555 | |||
| 556 | 9 | void CommitProcessor::SetStatistics(perf::Statistics *st, | |
| 557 | const std::string &start_time) { | ||
| 558 | 9 | statistics_ = st; | |
| 559 |
3/6✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 9 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 9 times.
✗ Branch 10 not taken.
|
9 | statistics_->Register("publish.revision", ""); |
| 560 | 9 | start_time_ = start_time; | |
| 561 | 9 | } | |
| 562 | |||
| 563 | } // namespace receiver | ||
| 564 |