GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/swissknife_sync.cc
Date: 2024-04-28 02:33:07
Exec Total Coverage
Lines: 0 471 0.0%
Branches: 0 318 0.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System
3 *
4 * This tool figures out the changes made to a cvmfs repository by means
5 * of a union file system mounted on top of a cvmfs volume.
6 * We take all three volumes (namely union, overlay and repository) into
7 * account to sync the changes back into the repository.
8 *
9 * On the repository side we have a catalogs directory that mimics the
10 * shadow directory structure and stores compressed and uncompressed
11 * versions of all catalogs. The raw data are stored in the data
12 * subdirectory in zlib-compressed form. They are named with their SHA-1
13 * hash of the compressed file (like in CVMFS client cache, but with a
14 * 2-level cache hierarchy). Symlinks from the catalog directory to the
15 * data directory form the connection. If necessary, add a .htaccess file
16 * to allow Apache to follow the symlinks.
17 */
18
19 // NOLINTNEXTLINE
20 #define _FILE_OFFSET_BITS 64
21 // NOLINTNEXTLINE
22 #define __STDC_FORMAT_MACROS
23
24 #include "swissknife_sync.h"
25 #include "cvmfs_config.h"
26
27 #include <errno.h>
28 #include <fcntl.h>
29 #include <glob.h>
30 #include <inttypes.h>
31 #include <limits.h>
32 #include <sys/capability.h>
33
34 #include <cstdio>
35 #include <cstdlib>
36 #include <string>
37 #include <vector>
38
39 #include "catalog_mgr_ro.h"
40 #include "catalog_mgr_rw.h"
41 #include "catalog_virtual.h"
42 #include "manifest.h"
43 #include "monitor.h"
44 #include "network/download.h"
45 #include "path_filters/dirtab.h"
46 #include "reflog.h"
47 #include "sanitizer.h"
48 #include "statistics.h"
49 #include "statistics_database.h"
50 #include "swissknife_capabilities.h"
51 #include "sync_mediator.h"
52 #include "sync_union.h"
53 #include "sync_union_aufs.h"
54 #include "sync_union_overlayfs.h"
55 #include "util/logging.h"
56 #include "util/platform.h"
57 #include "util/string.h"
58
59 using namespace std; // NOLINT
60
61 bool swissknife::CommandSync::CheckParams(const SyncParameters &p) {
62 if (!DirectoryExists(p.dir_scratch)) {
63 PrintError("overlay (copy on write) directory does not exist");
64 return false;
65 }
66 if (!DirectoryExists(p.dir_union)) {
67 PrintError("union volume does not exist");
68 return false;
69 }
70 if (!DirectoryExists(p.dir_rdonly)) {
71 PrintError("cvmfs read/only repository does not exist");
72 return false;
73 }
74 if (p.stratum0 == "") {
75 PrintError("Stratum0 url missing");
76 return false;
77 }
78
79 if (p.manifest_path == "") {
80 PrintError("manifest output required");
81 return false;
82 }
83 if (!DirectoryExists(p.dir_temp)) {
84 PrintError("data store directory does not exist");
85 return false;
86 }
87
88 if (p.min_file_chunk_size >= p.avg_file_chunk_size ||
89 p.avg_file_chunk_size >= p.max_file_chunk_size) {
90 PrintError("file chunk size values are not sane");
91 return false;
92 }
93
94 if (HasPrefix(p.spooler_definition, "gw", false)) {
95 if (p.session_token_file.empty()) {
96 PrintError(
97 "Session token file has to be provided "
98 "when upstream type is gw.");
99 return false;
100 }
101 }
102
103 return true;
104 }
105
/**
 * Creates a fresh, empty repository: builds the initial root catalog via
 * WritableCatalogManager::CreateRepository(), creates and uploads an empty
 * reflog, writes the reflog checksum file and exports the new manifest.
 * Returns 0 on success; 1/4/5 identify the failing stage.
 */
int swissknife::CommandCreate::Main(const swissknife::ArgumentList &args) {
  // Mandatory arguments
  const string manifest_path = *args.find('o')->second;
  const string dir_temp = *args.find('t')->second;
  const string spooler_definition = *args.find('r')->second;
  const string repo_name = *args.find('n')->second;
  const string reflog_chksum_path = *args.find('R')->second;
  // Optional '-l': raise log verbosity
  if (args.find('l') != args.end()) {
    unsigned log_level =
        kLogLevel0 << String2Uint64(*args.find('l')->second);
    if (log_level > kLogNone) {
      LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
      return 1;
    }
    SetLogVerbosity(static_cast<LogLevels>(log_level));
  }
  // Optional '-a': content hash algorithm (defaults to SHA-1)
  shash::Algorithms hash_algorithm = shash::kSha1;
  if (args.find('a') != args.end()) {
    hash_algorithm = shash::ParseHashAlgorithm(*args.find('a')->second);
    if (hash_algorithm == shash::kAny) {
      PrintError("unknown hash algorithm");
      return 1;
    }
  }

  const bool volatile_content = (args.count('v') > 0);
  const bool garbage_collectable = (args.count('z') > 0);
  std::string voms_authz;
  if (args.find('V') != args.end()) {
    voms_authz = *args.find('V')->second;
  }

  const upload::SpoolerDefinition sd(spooler_definition, hash_algorithm,
                                     zlib::kZlibDefault);
  UniquePtr<upload::Spooler> spooler(upload::Spooler::Construct(sd));
  assert(spooler.IsValid());

  // Build the initial (empty) root catalog; the returned manifest describes it
  UniquePtr<manifest::Manifest> manifest(
      catalog::WritableCatalogManager::CreateRepository(
          dir_temp, volatile_content, voms_authz, spooler.weak_ref()));
  if (!manifest.IsValid()) {
    PrintError("Failed to create new repository");
    return 1;
  }

  UniquePtr<manifest::Reflog> reflog(CreateEmptyReflog(dir_temp, repo_name));
  if (!reflog.IsValid()) {
    PrintError("Failed to create fresh Reflog");
    return 1;
  }

  // Release the sqlite file from the Reflog object so it survives
  // reflog.Destroy(); hash and upload the raw database file, then remove the
  // local copy.
  reflog->DropDatabaseFileOwnership();
  string reflog_path = reflog->database_file();
  reflog.Destroy();
  shash::Any reflog_hash(hash_algorithm);
  manifest::Reflog::HashDatabase(reflog_path, &reflog_hash);
  spooler->UploadReflog(reflog_path);
  spooler->WaitForUpload();
  unlink(reflog_path.c_str());
  if (spooler->GetNumberOfErrors()) {
    LogCvmfs(kLogCvmfs, kLogStderr, "Failed to upload reflog");
    return 4;
  }
  assert(!reflog_chksum_path.empty());
  manifest::Reflog::WriteChecksum(reflog_chksum_path, reflog_hash);

  // set optional manifest fields
  const bool needs_bootstrap_shortcuts = !voms_authz.empty();
  manifest->set_garbage_collectability(garbage_collectable);
  manifest->set_has_alt_catalog_path(needs_bootstrap_shortcuts);

  if (!manifest->Export(manifest_path)) {
    PrintError("Failed to create new repository");
    return 5;
  }

  return 0;
}
183
184 int swissknife::CommandUpload::Main(const swissknife::ArgumentList &args) {
185 const string source = *args.find('i')->second;
186 const string dest = *args.find('o')->second;
187 const string spooler_definition = *args.find('r')->second;
188 shash::Algorithms hash_algorithm = shash::kSha1;
189 if (args.find('a') != args.end()) {
190 hash_algorithm = shash::ParseHashAlgorithm(*args.find('a')->second);
191 if (hash_algorithm == shash::kAny) {
192 PrintError("unknown hash algorithm");
193 return 1;
194 }
195 }
196
197 const upload::SpoolerDefinition sd(spooler_definition, hash_algorithm);
198 upload::Spooler *spooler = upload::Spooler::Construct(sd);
199 assert(spooler);
200 spooler->Upload(source, dest);
201 spooler->WaitForUpload();
202
203 if (spooler->GetNumberOfErrors() > 0) {
204 LogCvmfs(kLogCatalog, kLogStderr, "failed to upload %s", source.c_str());
205 return 1;
206 }
207
208 delete spooler;
209
210 return 0;
211 }
212
213 int swissknife::CommandPeek::Main(const swissknife::ArgumentList &args) {
214 const string file_to_peek = *args.find('d')->second;
215 const string spooler_definition = *args.find('r')->second;
216
217 // Hash doesn't matter
218 const upload::SpoolerDefinition sd(spooler_definition, shash::kAny);
219 upload::Spooler *spooler = upload::Spooler::Construct(sd);
220 assert(spooler);
221 const bool success = spooler->Peek(file_to_peek);
222
223 if (spooler->GetNumberOfErrors() > 0) {
224 LogCvmfs(kLogCatalog, kLogStderr, "failed to peek for %s",
225 file_to_peek.c_str());
226 return 2;
227 }
228 if (!success) {
229 LogCvmfs(kLogCatalog, kLogStdout, "%s not found", file_to_peek.c_str());
230 return 1;
231 }
232 LogCvmfs(kLogCatalog, kLogStdout, "%s available", file_to_peek.c_str());
233
234 delete spooler;
235
236 return 0;
237 }
238
239 int swissknife::CommandRemove::Main(const ArgumentList &args) {
240 const string file_to_delete = *args.find('o')->second;
241 const string spooler_definition = *args.find('r')->second;
242
243 // Hash doesn't matter
244 const upload::SpoolerDefinition sd(spooler_definition, shash::kAny);
245 upload::Spooler *spooler = upload::Spooler::Construct(sd);
246 assert(spooler);
247 spooler->RemoveAsync(file_to_delete);
248 spooler->WaitForUpload();
249
250 if (spooler->GetNumberOfErrors() > 0) {
251 LogCvmfs(kLogCatalog, kLogStderr, "failed to delete %s",
252 file_to_delete.c_str());
253 return 1;
254 }
255
256 delete spooler;
257
258 return 0;
259 }
260
261 int swissknife::CommandApplyDirtab::Main(const ArgumentList &args) {
262 const string dirtab_file = *args.find('d')->second;
263 union_dir_ = MakeCanonicalPath(*args.find('u')->second);
264 scratch_dir_ = MakeCanonicalPath(*args.find('s')->second);
265 const shash::Any base_hash = shash::MkFromHexPtr(
266 shash::HexPtr(*args.find('b')->second), shash::kSuffixCatalog);
267 const string stratum0 = *args.find('w')->second;
268 const string dir_temp = *args.find('t')->second;
269 verbose_ = (args.find('x') != args.end());
270
271 // check if there is a dirtab file
272 if (!FileExists(dirtab_file)) {
273 LogCvmfs(kLogCatalog, kLogVerboseMsg,
274 "Didn't find a dirtab at '%s'. Skipping...", dirtab_file.c_str());
275 return 0;
276 }
277
278 // parse dirtab file
279 catalog::Dirtab *dirtab = catalog::Dirtab::Create(dirtab_file);
280 if (!dirtab->IsValid()) {
281 LogCvmfs(kLogCatalog, kLogStderr, "Invalid or not readable dirtab '%s'",
282 dirtab_file.c_str());
283 return 1;
284 }
285 LogCvmfs(kLogCatalog, kLogVerboseMsg, "Found %lu rules in dirtab '%s'",
286 dirtab->RuleCount(), dirtab_file.c_str());
287
288 // initialize catalog infrastructure
289 const bool auto_manage_catalog_files = true;
290 const bool follow_redirects = (args.count('L') > 0);
291 const string proxy = (args.count('@') > 0) ? *args.find('@')->second : "";
292 if (!InitDownloadManager(follow_redirects, proxy)) {
293 return 1;
294 }
295 catalog::SimpleCatalogManager catalog_manager(
296 base_hash, stratum0, dir_temp, download_manager(), statistics(),
297 auto_manage_catalog_files);
298 catalog_manager.Init();
299
300 vector<string> new_nested_catalogs;
301 DetermineNestedCatalogCandidates(*dirtab, &catalog_manager,
302 &new_nested_catalogs);
303 const bool success = CreateCatalogMarkers(new_nested_catalogs);
304 delete dirtab;
305
306 return (success) ? 0 : 1;
307 }
308
309
310 namespace {
311
312 // Overwrite directory traversal in the globbing in order to avoid breaking out
313 // the repository tree
314
315 std::string *g_glob_uniondir = NULL;
316
317 bool GlobCheckPath(const char *name) {
318 char resolved_cstr[PATH_MAX];
319 char *retval = realpath(name, resolved_cstr);
320 if (retval == NULL) return false;
321
322 std::string resolved(resolved_cstr);
323 if (resolved == *g_glob_uniondir) return true;
324 if (!HasPrefix(resolved, (*g_glob_uniondir) + "/", false /*ignore_case*/)) {
325 errno = EACCES;
326 return false;
327 }
328 return true;
329 }
330
331 void *GlobOpendir(const char *name) {
332 if (!GlobCheckPath(name)) return NULL;
333 return opendir(name);
334 }
335
336 void GlobClosedir(void *dirp) {
337 closedir(static_cast<DIR *>(dirp));
338 }
339
340 struct dirent *GlobReaddir(void *dirp) {
341 return readdir(static_cast<DIR *>(dirp));
342 }
343
344 int GlobLstat(const char *name, struct stat *st) {
345 if (!GlobCheckPath(name)) return -1;
346 return lstat(name, st);
347 }
348
349 int GlobStat(const char *name, struct stat *st) {
350 if (!GlobCheckPath(name)) return -1;
351 return stat(name, st);
352 }
353
354
355 } // anonymous namespace
356
357 void swissknife::CommandApplyDirtab::DetermineNestedCatalogCandidates(
358 const catalog::Dirtab &dirtab,
359 catalog::SimpleCatalogManager *catalog_manager,
360 vector<string> *nested_catalog_candidates) {
361 // find possible new nested catalog locations
362 const catalog::Dirtab::Rules &lookup_rules = dirtab.positive_rules();
363 catalog::Dirtab::Rules::const_iterator i = lookup_rules.begin();
364 const catalog::Dirtab::Rules::const_iterator iend = lookup_rules.end();
365 for (; i != iend; ++i) {
366 assert(!i->is_negation);
367
368 // run a glob using the current dirtab rule on the current repository
369 // state
370 const std::string &glob_string = i->pathspec.GetGlobString();
371 const std::string &glob_string_abs = union_dir_ + glob_string;
372 const int glob_flags = GLOB_ONLYDIR | GLOB_NOSORT | GLOB_PERIOD |
373 GLOB_ALTDIRFUNC;
374 glob_t glob_res;
375 g_glob_uniondir = new std::string(union_dir_);
376 glob_res.gl_opendir = GlobOpendir;
377 glob_res.gl_readdir = GlobReaddir;
378 glob_res.gl_closedir = GlobClosedir;
379 glob_res.gl_lstat = GlobLstat;
380 glob_res.gl_stat = GlobStat;
381 const int glob_retval =
382 glob(glob_string_abs.c_str(), glob_flags, NULL, &glob_res);
383 delete g_glob_uniondir;
384 g_glob_uniondir = NULL;
385
386 if (glob_retval == 0) {
387 // found some candidates... filtering by cvmfs catalog structure
388 LogCvmfs(kLogCatalog, kLogDebug, "Found %lu entries for pathspec (%s)",
389 glob_res.gl_pathc, glob_string.c_str());
390 FilterCandidatesFromGlobResult(dirtab, glob_res.gl_pathv,
391 glob_res.gl_pathc, catalog_manager,
392 nested_catalog_candidates);
393 } else if (glob_retval == GLOB_NOMATCH) {
394 LogCvmfs(kLogCvmfs, kLogStderr, "WARNING: cannot apply pathspec %s",
395 glob_string.c_str());
396 } else {
397 LogCvmfs(kLogCvmfs, kLogStderr, "Failed to run glob matching (%s)",
398 glob_string.c_str());
399 }
400
401 globfree(&glob_res);
402 }
403 }
404
405 void swissknife::CommandApplyDirtab::FilterCandidatesFromGlobResult(
406 const catalog::Dirtab &dirtab, char **paths, const size_t npaths,
407 catalog::SimpleCatalogManager *catalog_manager,
408 std::vector<std::string> *nested_catalog_candidates) {
409 // go through the paths produced by glob() and filter them
410 for (size_t i = 0; i < npaths; ++i) {
411 // process candidate paths
412 const std::string candidate(paths[i]);
413 const std::string candidate_rel = candidate.substr(union_dir_.size());
414
415 // check if path points to a directory
416 platform_stat64 candidate_info;
417 const int lstat_retval = platform_lstat(candidate.c_str(), &candidate_info);
418 if (lstat_retval != 0) {
419 LogCvmfs(kLogCatalog, kLogDebug | kLogStderr | kLogSyslogErr,
420 "Error in processing .cvmfsdirtab: cannot access %s (%d)",
421 candidate.c_str(), errno);
422 abort();
423 }
424 assert(lstat_retval == 0);
425 if (!S_ISDIR(candidate_info.st_mode)) {
426 // The GLOB_ONLYDIR flag is only a hint, non-directories can still be
427 // returned
428 LogCvmfs(kLogCatalog, kLogDebug,
429 "The '%s' dirtab entry does not point to a directory "
430 "but to a file or a symbolic link",
431 candidate_rel.c_str());
432 continue;
433 }
434
435 // check if the path is a meta-directory (. or ..)
436 assert(candidate_rel.size() >= 2);
437 if (candidate_rel.substr(candidate_rel.size() - 2) == "/." ||
438 candidate_rel.substr(candidate_rel.size() - 3) == "/..") {
439 continue;
440 }
441
442 // check that the path isn't excluded in the dirtab
443 if (dirtab.IsOpposing(candidate_rel)) {
444 LogCvmfs(kLogCatalog, kLogDebug, "Candidate '%s' is excluded by dirtab",
445 candidate_rel.c_str());
446 continue;
447 }
448
449 // lookup the path in the catalog structure to find out if it already
450 // points to a nested catalog transition point. Furthermore it could be
451 // a new directory and thus not in any catalog yet.
452 catalog::DirectoryEntry dirent;
453 const bool lookup_success = catalog_manager->LookupPath(
454 candidate_rel, catalog::kLookupDefault, &dirent);
455 if (!lookup_success) {
456 LogCvmfs(kLogCatalog, kLogDebug,
457 "Didn't find '%s' in catalogs, could "
458 "be a new directory and nested catalog.",
459 candidate_rel.c_str());
460 nested_catalog_candidates->push_back(candidate);
461 } else if (!dirent.IsNestedCatalogMountpoint() &&
462 !dirent.IsNestedCatalogRoot()) {
463 LogCvmfs(kLogCatalog, kLogDebug,
464 "Found '%s' in catalogs but is not a "
465 "nested catalog yet.",
466 candidate_rel.c_str());
467 nested_catalog_candidates->push_back(candidate);
468 } else {
469 // check if the nested catalog marker is still there, we might need to
470 // recreate the catalog after manual marker removal
471 // Note: First we check if the parent directory shows up in the scratch
472 // space to verify that it was touched (copy-on-write)
473 // Otherwise we would force the cvmfs client behind the union
474 // file-
475 // system to (potentially) unnecessarily fetch catalogs
476 if (DirectoryExists(scratch_dir_ + candidate_rel) &&
477 !FileExists(union_dir_ + candidate_rel + "/.cvmfscatalog")) {
478 LogCvmfs(kLogCatalog, kLogStdout,
479 "WARNING: '%s' should be a nested "
480 "catalog according to the dirtab. "
481 "Recreating...",
482 candidate_rel.c_str());
483 nested_catalog_candidates->push_back(candidate);
484 } else {
485 LogCvmfs(kLogCatalog, kLogDebug,
486 "Found '%s' in catalogs and it already is a nested catalog.",
487 candidate_rel.c_str());
488 }
489 }
490 }
491 }
492
493 bool swissknife::CommandApplyDirtab::CreateCatalogMarkers(
494 const std::vector<std::string> &new_nested_catalogs) {
495 // go through the new nested catalog paths and create .cvmfscatalog markers
496 // where necessary
497 bool success = true;
498 std::vector<std::string>::const_iterator k = new_nested_catalogs.begin();
499 const std::vector<std::string>::const_iterator kend =
500 new_nested_catalogs.end();
501 for (; k != kend; ++k) {
502 assert(!k->empty() && k->size() > union_dir_.size());
503
504 // was the marker already created by hand?
505 const std::string marker_path = *k + "/.cvmfscatalog";
506 if (FileExists(marker_path)) {
507 continue;
508 }
509
510 // create a nested catalog marker
511 const mode_t mode = kDefaultFileMode;
512 const int fd = open(marker_path.c_str(), O_CREAT, mode);
513 if (fd < 0) {
514 LogCvmfs(kLogCvmfs, kLogStderr,
515 "Failed to create nested catalog marker "
516 "at '%s' (errno: %d)",
517 marker_path.c_str(), errno);
518 success = false;
519 continue;
520 }
521 close(fd);
522
523 // inform the user if requested
524 if (verbose_) {
525 LogCvmfs(kLogCvmfs, kLogStdout, "Auto-creating nested catalog in %s",
526 k->c_str());
527 }
528 }
529
530 return success;
531 }
532
// Binds a single-character command line switch to the size_t field that
// should receive its parsed value.
struct chunk_arg {
  chunk_arg(char flag, size_t *target) : param(flag), save_to(target) {}
  char param;       // command line option character
  size_t *save_to;  // destination for the parsed chunk size
};
538
539 bool swissknife::CommandSync::ReadFileChunkingArgs(
540 const swissknife::ArgumentList &args, SyncParameters *params) {
541 typedef std::vector<chunk_arg> ChunkArgs;
542
543 // define where to store the value of which file chunk argument
544 ChunkArgs chunk_args;
545 chunk_args.push_back(chunk_arg('a', &params->avg_file_chunk_size));
546 chunk_args.push_back(chunk_arg('l', &params->min_file_chunk_size));
547 chunk_args.push_back(chunk_arg('h', &params->max_file_chunk_size));
548
549 // read the arguments
550 ChunkArgs::const_iterator i = chunk_args.begin();
551 ChunkArgs::const_iterator iend = chunk_args.end();
552 for (; i != iend; ++i) {
553 swissknife::ArgumentList::const_iterator arg = args.find(i->param);
554
555 if (arg != args.end()) {
556 size_t arg_value = static_cast<size_t>(String2Uint64(*arg->second));
557 if (arg_value > 0) {
558 *i->save_to = arg_value;
559 } else {
560 return false;
561 }
562 }
563 }
564
565 // check if argument values are sane
566 return true;
567 }
568
/**
 * Publishes the changes from the union volume back into the repository:
 * parses the command line into SyncParameters, sets up the spoolers and the
 * download/signature managers, loads a manifest, traverses the union file
 * system (or generates the virtual catalog), commits the change set and
 * exports the new manifest.  Distinct non-zero return codes (1-9) identify
 * the failing stage.
 */
int swissknife::CommandSync::Main(const swissknife::ArgumentList &args) {
  string start_time = GetGMTimestamp();

  // Spawn monitoring process (watchdog)
  std::string watchdog_dir = "/tmp";
  char watchdog_path[PATH_MAX];
  std::string timestamp = GetGMTimestamp("%Y.%m.%d-%H.%M.%S");
  int path_size = snprintf(watchdog_path, sizeof(watchdog_path),
                           "%s/cvmfs-swissknife-sync-stacktrace.%s.%d",
                           watchdog_dir.c_str(), timestamp.c_str(), getpid());
  assert(path_size > 0);
  assert(path_size < PATH_MAX);
  UniquePtr<Watchdog> watchdog(Watchdog::Create(NULL));
  watchdog->Spawn(std::string(watchdog_path));

  SyncParameters params;

  // Initialization: mandatory arguments (directories, hashes, URLs)
  params.dir_union = MakeCanonicalPath(*args.find('u')->second);
  params.dir_scratch = MakeCanonicalPath(*args.find('s')->second);
  params.dir_rdonly = MakeCanonicalPath(*args.find('c')->second);
  params.dir_temp = MakeCanonicalPath(*args.find('t')->second);
  params.base_hash = shash::MkFromHexPtr(shash::HexPtr(*args.find('b')->second),
                                         shash::kSuffixCatalog);
  params.stratum0 = *args.find('w')->second;
  params.manifest_path = *args.find('o')->second;
  params.spooler_definition = *args.find('r')->second;

  params.public_keys = *args.find('K')->second;
  params.repo_name = *args.find('N')->second;

  params.ttl_seconds = catalog::Catalog::kDefaultTTL;

  // Optional boolean/string switches
  if (args.find('f') != args.end())
    params.union_fs_type = *args.find('f')->second;
  if (args.find('A') != args.end()) params.is_balanced = true;
  if (args.find('x') != args.end()) params.print_changeset = true;
  if (args.find('y') != args.end()) params.dry_run = true;
  if (args.find('m') != args.end()) params.mucatalogs = true;
  if (args.find('i') != args.end()) params.ignore_xdir_hardlinks = true;
  if (args.find('d') != args.end()) params.stop_for_catalog_tweaks = true;
  if (args.find('V') != args.end()) params.voms_authz = true;
  if (args.find('F') != args.end()) params.authz_file = *args.find('F')->second;
  if (args.find('k') != args.end()) params.include_xattrs = true;
  if (args.find('Y') != args.end()) params.external_data = true;
  if (args.find('W') != args.end()) params.direct_io = true;
  // '-S': actions for the virtual (".cvmfs") catalog
  if (args.find('S') != args.end()) {
    bool retval = catalog::VirtualCatalog::ParseActions(
        *args.find('S')->second, &params.virtual_dir_actions);
    if (!retval) {
      LogCvmfs(kLogCvmfs, kLogStderr, "invalid virtual catalog options: %s",
               args.find('S')->second->c_str());
      return 1;
    }
  }
  // '-z': log verbosity
  // NOTE(review): this computes 1 << (kLogLevel0 + n) while
  // CommandCreate::Main computes kLogLevel0 << n for its '-l' switch —
  // confirm the two are meant to agree.
  if (args.find('z') != args.end()) {
    unsigned log_level =
        1 << (kLogLevel0 + String2Uint64(*args.find('z')->second));
    if (log_level > kLogNone) {
      LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
      return 1;
    }
    SetLogVerbosity(static_cast<LogLevels>(log_level));
  }

  if (args.find('X') != args.end())
    params.max_weight = String2Uint64(*args.find('X')->second);
  if (args.find('M') != args.end())
    params.min_weight = String2Uint64(*args.find('M')->second);

  // '-p': enable file chunking, with optional min/avg/max size switches
  if (args.find('p') != args.end()) {
    params.use_file_chunking = true;
    if (!ReadFileChunkingArgs(args, &params)) {
      PrintError("Failed to read file chunk size values");
      return 2;
    }
  }
  if (args.find('O') != args.end()) {
    params.generate_legacy_bulk_chunks = true;
  }
  // '-e': content hash algorithm (defaults to SHA-1)
  shash::Algorithms hash_algorithm = shash::kSha1;
  if (args.find('e') != args.end()) {
    hash_algorithm = shash::ParseHashAlgorithm(*args.find('e')->second);
    if (hash_algorithm == shash::kAny) {
      PrintError("unknown hash algorithm");
      return 1;
    }
  }
  if (args.find('Z') != args.end()) {
    params.compression_alg =
        zlib::ParseCompressionAlgorithm(*args.find('Z')->second);
  }

  // catalog/file size limits (defaults from SyncParameters)
  if (args.find('E') != args.end()) params.enforce_limits = true;
  if (args.find('Q') != args.end()) {
    params.nested_kcatalog_limit = String2Uint64(*args.find('Q')->second);
  } else {
    params.nested_kcatalog_limit = SyncParameters::kDefaultNestedKcatalogLimit;
  }
  if (args.find('R') != args.end()) {
    params.root_kcatalog_limit = String2Uint64(*args.find('R')->second);
  } else {
    params.root_kcatalog_limit = SyncParameters::kDefaultRootKcatalogLimit;
  }
  if (args.find('U') != args.end()) {
    params.file_mbyte_limit = String2Uint64(*args.find('U')->second);
  } else {
    params.file_mbyte_limit = SyncParameters::kDefaultFileMbyteLimit;
  }

  // '-v': pin the revision number of the new catalog
  if (args.find('v') != args.end()) {
    sanitizer::IntegerSanitizer sanitizer;
    if (!sanitizer.IsValid(*args.find('v')->second)) {
      PrintError("invalid revision number");
      return 1;
    }
    params.manual_revision = String2Uint64(*args.find('v')->second);
  }

  params.branched_catalog = args.find('B') != args.end();

  if (args.find('q') != args.end()) {
    params.max_concurrent_write_jobs = String2Uint64(*args.find('q')->second);
  }

  if (args.find('0') != args.end()) {
    params.num_upload_tasks = String2Uint64(*args.find('0')->second);
  }

  if (args.find('T') != args.end()) {
    params.ttl_seconds = String2Uint64(*args.find('T')->second);
  }

  if (args.find('g') != args.end()) {
    params.ignore_special_files = true;
  }

  // gateway upstream credentials
  if (args.find('P') != args.end()) {
    params.session_token_file = *args.find('P')->second;
  }

  if (args.find('H') != args.end()) {
    params.key_file = *args.find('H')->second;
  }

  // repository tag name/description for this publish
  if (args.find('D') != args.end()) {
    params.repo_tag.SetName(*args.find('D')->second);
  }

  if (args.find('J') != args.end()) {
    params.repo_tag.SetDescription(*args.find('J')->second);
  }

  const bool upload_statsdb = (args.count('I') > 0);

  if (!CheckParams(params)) return 2;
  // This may fail, in which case a warning is printed and the process continues
  ObtainDacReadSearchCapability();

  perf::StatisticsTemplate publish_statistics("publish", this->statistics());

  // Start spooler
  upload::SpoolerDefinition spooler_definition(
      params.spooler_definition, hash_algorithm, params.compression_alg,
      params.generate_legacy_bulk_chunks, params.use_file_chunking,
      params.min_file_chunk_size, params.avg_file_chunk_size,
      params.max_file_chunk_size, params.session_token_file, params.key_file);
  if (params.max_concurrent_write_jobs > 0) {
    spooler_definition.number_of_concurrent_uploads =
        params.max_concurrent_write_jobs;
  }
  spooler_definition.num_upload_tasks = params.num_upload_tasks;

  // Catalogs are always uploaded with the default compression, independent of
  // the data compression algorithm
  upload::SpoolerDefinition spooler_definition_catalogs(
      spooler_definition.Dup2DefaultCompression());

  // NOTE(review): params.spooler (and stats_db below) are not released on the
  // early error returns; presumably acceptable because the process exits
  // right after — confirm intended.
  params.spooler = upload::Spooler::Construct(spooler_definition,
                                              &publish_statistics);
  if (NULL == params.spooler) return 3;
  UniquePtr<upload::Spooler> spooler_catalogs(
      upload::Spooler::Construct(spooler_definition_catalogs,
                                 &publish_statistics));
  if (!spooler_catalogs.IsValid()) return 3;

  const bool follow_redirects = (args.count('L') > 0);
  const string proxy = (args.count('@') > 0) ? *args.find('@')->second : "";
  if (!InitDownloadManager(follow_redirects, proxy)) {
    return 3;
  }

  if (!InitVerifyingSignatureManager(params.public_keys)) {
    return 3;
  }

  /*
   * Note: If the upstream is of type gateway, due to the possibility of
   * concurrent release managers, it's possible to have a different local and
   * remote root hashes. We proceed by loading the remote manifest but we give
   * an empty base hash.
   */
  UniquePtr<manifest::Manifest> manifest;
  if (params.branched_catalog) {
    // Throw-away manifest
    manifest = new manifest::Manifest(shash::Any(), 0, "");
  } else if (params.virtual_dir_actions !=
             catalog::VirtualCatalog::kActionNone) {
    manifest = this->OpenLocalManifest(params.manifest_path);
    params.base_hash = manifest->catalog_hash();
  } else {
    // TODO(jblomer): revert to params.base_hash if spooler driver type is not
    // upload::SpoolerDefinition::Gateway
    manifest =
        FetchRemoteManifest(params.stratum0, params.repo_name, shash::Any());
  }
  if (!manifest.IsValid()) {
    return 3;
  }

  StatisticsDatabase *stats_db =
      StatisticsDatabase::OpenStandardDB(params.repo_name);

  // Remember the pre-publish root hash for the gateway commit below
  const std::string old_root_hash = manifest->catalog_hash().ToString(true);

  catalog::WritableCatalogManager catalog_manager(
      params.base_hash, params.stratum0, params.dir_temp,
      spooler_catalogs.weak_ref(),
      download_manager(), params.enforce_limits, params.nested_kcatalog_limit,
      params.root_kcatalog_limit, params.file_mbyte_limit, statistics(),
      params.is_balanced, params.max_weight, params.min_weight);
  catalog_manager.Init();

  publish::SyncMediator mediator(&catalog_manager, &params, publish_statistics);
  LogCvmfs(kLogPublish, kLogStdout, "Processing changes...");

  // Should be before the synchronization starts to avoid race of GetTTL with
  // other sqlite operations
  if ((params.ttl_seconds > 0) &&
      ((params.ttl_seconds != catalog_manager.GetTTL()) ||
       !catalog_manager.HasExplicitTTL())) {
    LogCvmfs(kLogCvmfs, kLogStdout, "Setting repository TTL to %" PRIu64 "s",
             params.ttl_seconds);
    catalog_manager.SetTTL(params.ttl_seconds);
  }

  // Either real catalogs or virtual catalog
  if (params.virtual_dir_actions == catalog::VirtualCatalog::kActionNone) {
    // NOTE(review): 'sync' is never deleted; it lives until process exit —
    // confirm intended.
    publish::SyncUnion *sync;
    if (params.union_fs_type == "overlayfs") {
      sync = new publish::SyncUnionOverlayfs(
          &mediator, params.dir_rdonly, params.dir_union, params.dir_scratch);
    } else if (params.union_fs_type == "aufs") {
      sync = new publish::SyncUnionAufs(&mediator, params.dir_rdonly,
                                        params.dir_union, params.dir_scratch);
    } else {
      LogCvmfs(kLogCvmfs, kLogStderr, "unknown union file system: %s",
               params.union_fs_type.c_str());
      return 3;
    }

    if (!sync->Initialize()) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Initialization of the synchronisation "
               "engine failed");
      return 4;
    }

    // Walk the union file system and feed the change set into the mediator
    sync->Traverse();
  } else {
    assert(!manifest->history().IsNull());
    catalog::VirtualCatalog virtual_catalog(
        manifest.weak_ref(), download_manager(), &catalog_manager, &params);
    virtual_catalog.Generate(params.virtual_dir_actions);
  }

  // '-F': embed the authz (VOMS) membership requirement into the root catalog
  if (!params.authz_file.empty()) {
    LogCvmfs(kLogCvmfs, kLogDebug,
             "Adding contents of authz file %s to"
             " root catalog.",
             params.authz_file.c_str());
    int fd = open(params.authz_file.c_str(), O_RDONLY);
    if (fd == -1) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Unable to open authz file (%s)"
               "from the publication process: %s",
               params.authz_file.c_str(), strerror(errno));
      return 7;
    }

    std::string new_authz;
    const bool read_successful = SafeReadToString(fd, &new_authz);
    close(fd);

    if (!read_successful) {
      LogCvmfs(kLogCvmfs, kLogStderr, "Failed to read authz file (%s): %s",
               params.authz_file.c_str(), strerror(errno));
      return 8;
    }

    catalog_manager.SetVOMSAuthz(new_authz);
  }

  // Write the new catalogs; on failure record the (failed) publish statistics
  if (!mediator.Commit(manifest.weak_ref())) {
    PrintError("something went wrong during sync");
    if (!params.dry_run) {
      stats_db->StorePublishStatistics(this->statistics(), start_time, false);
      if (upload_statsdb) {
        stats_db->UploadStatistics(params.spooler);
      }
    }
    return 5;
  }

  perf::Counter *revision_counter = statistics()->Register("publish.revision",
                                    "Published revision number");
  revision_counter->Set(static_cast<int64_t>(
      catalog_manager.GetRootCatalog()->revision()));

  // finalize the spooler
  LogCvmfs(kLogCvmfs, kLogStdout, "Wait for all uploads to finish");
  params.spooler->WaitForUpload();
  spooler_catalogs->WaitForUpload();
  params.spooler->FinalizeSession(false);

  LogCvmfs(kLogCvmfs, kLogStdout, "Exporting repository manifest");

  // We call FinalizeSession(true) this time, to also trigger the commit
  // operation on the gateway machine (if the upstream is of type "gw").

  // Get the path of the new root catalog
  const std::string new_root_hash = manifest->catalog_hash().ToString(true);

  if (!spooler_catalogs->FinalizeSession(true, old_root_hash, new_root_hash,
                                         params.repo_tag)) {
    PrintError("Failed to commit transaction.");
    if (!params.dry_run) {
      stats_db->StorePublishStatistics(this->statistics(), start_time, false);
      if (upload_statsdb) {
        stats_db->UploadStatistics(params.spooler);
      }
    }
    return 9;
  }

  // Record the successful publish in the statistics database
  if (!params.dry_run) {
    stats_db->StorePublishStatistics(this->statistics(), start_time, true);
    if (upload_statsdb) {
      stats_db->UploadStatistics(params.spooler);
    }
  }

  delete params.spooler;

  if (!manifest->Export(params.manifest_path)) {
    PrintError("Failed to create new repository");
    return 6;
  }

  return 0;
}
928