CernVM-FS  2.13.0
swissknife_sync.cc
/**
 * This file is part of the CernVM File System.
 */

// NOLINTNEXTLINE
#define _FILE_OFFSET_BITS 64
// NOLINTNEXTLINE
#define __STDC_FORMAT_MACROS

#include "swissknife_sync.h"

#include <errno.h>
#include <fcntl.h>
#include <glob.h>
#include <inttypes.h>
#include <limits.h>
#include <sys/capability.h>

#include <cstdio>
#include <cstdlib>
#include <string>
#include <vector>

#include "catalog_mgr_ro.h"
#include "catalog_mgr_rw.h"
#include "catalog_virtual.h"
#include "manifest.h"
#include "monitor.h"
#include "network/download.h"
#include "path_filters/dirtab.h"
#include "reflog.h"
#include "sanitizer.h"
#include "statistics.h"
#include "statistics_database.h"
#include "swissknife_capabilities.h"
#include "sync_mediator.h"
#include "sync_union.h"
#include "sync_union_aufs.h"
#include "sync_union_overlayfs.h"
#include "util/logging.h"
#include "util/platform.h"
#include "util/string.h"

using namespace std;  // NOLINT

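// Sanity checks for the publish parameters: the scratch (copy-on-write),
// union, read-only and temporary directories must exist, the stratum 0 URL
// and the manifest output path must be set, the file chunk size values must
// be consistent, and a gateway ("gw") upstream requires a session token file.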
bool swissknife::CommandSync::CheckParams(const SyncParameters &p) {
  if (!DirectoryExists(p.dir_scratch)) {
    PrintError("overlay (copy on write) directory does not exist");
    return false;
  }
  if (!DirectoryExists(p.dir_union)) {
    PrintError("union volume does not exist");
    return false;
  }
  if (!DirectoryExists(p.dir_rdonly)) {
    PrintError("cvmfs read/only repository does not exist");
    return false;
  }
  if (p.stratum0 == "") {
    PrintError("Stratum0 url missing");
    return false;
  }

  if (p.manifest_path == "") {
    PrintError("manifest output required");
    return false;
  }
  if (!DirectoryExists(p.dir_temp)) {
    PrintError("data store directory does not exist");
    return false;
  }

  if ((p.min_file_chunk_size >= p.avg_file_chunk_size)
      || (p.avg_file_chunk_size >= p.max_file_chunk_size)) {
    PrintError("file chunk size values are not sane");
    return false;
  }

  if (HasPrefix(p.spooler_definition, "gw", false)) {
    if (p.session_token_file.empty()) {
      PrintError("Session token file has to be provided "
                 "when upstream type is gw.");
      return false;
    }
  }

  return true;
}

int swissknife::CommandCreate::Main(const ArgumentList &args) {
  const string manifest_path = *args.find('o')->second;
  const string dir_temp = *args.find('t')->second;
  const string spooler_definition = *args.find('r')->second;
  const string repo_name = *args.find('n')->second;
  const string reflog_chksum_path = *args.find('R')->second;
  if (args.find('l') != args.end()) {
    unsigned log_level = kLogLevel0 << String2Uint64(*args.find('l')->second);
    if (log_level > kLogNone) {
      LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
      return 1;
    }
    SetLogVerbosity(static_cast<LogLevels>(log_level));
  }
  shash::Algorithms hash_algorithm = shash::kSha1;
  if (args.find('a') != args.end()) {
    hash_algorithm = shash::ParseHashAlgorithm(*args.find('a')->second);
    if (hash_algorithm == shash::kAny) {
      PrintError("unknown hash algorithm");
      return 1;
    }
  }

  const bool volatile_content = (args.count('v') > 0);
  const bool garbage_collectable = (args.count('z') > 0);
  std::string voms_authz;
  if (args.find('V') != args.end()) {
    voms_authz = *args.find('V')->second;
  }

  const upload::SpoolerDefinition sd(spooler_definition, hash_algorithm,
                                     zlib::kZlibDefault);
  UniquePtr<upload::Spooler> spooler(upload::Spooler::Construct(sd));
  assert(spooler.IsValid());

  UniquePtr<manifest::Manifest> manifest(
      catalog::WritableCatalogManager::CreateRepository(
          dir_temp, volatile_content, voms_authz, spooler.weak_ref()));
  if (!manifest.IsValid()) {
    PrintError("Swissknife Sync: Failed to create new repository");
    return 1;
  }

  UniquePtr<manifest::Reflog> reflog(CreateEmptyReflog(dir_temp, repo_name));
  if (!reflog.IsValid()) {
    PrintError("Swissknife Sync: Failed to create fresh Reflog");
    return 1;
  }

  reflog->DropDatabaseFileOwnership();
  string reflog_path = reflog->database_file();
  reflog.Destroy();
  shash::Any reflog_hash(hash_algorithm);
  manifest::Reflog::HashDatabase(reflog_path, &reflog_hash);
  spooler->UploadReflog(reflog_path);
  spooler->WaitForUpload();
  unlink(reflog_path.c_str());
  if (spooler->GetNumberOfErrors()) {
    LogCvmfs(kLogCvmfs, kLogStderr, "Swissknife Sync: Failed to upload reflog");
    return 4;
  }
  assert(!reflog_chksum_path.empty());
  manifest::Reflog::WriteChecksum(reflog_chksum_path, reflog_hash);

  // set optional manifest fields
  const bool needs_bootstrap_shortcuts = !voms_authz.empty();
  manifest->set_garbage_collectability(garbage_collectable);
  manifest->set_has_alt_catalog_path(needs_bootstrap_shortcuts);

  if (!manifest->Export(manifest_path)) {
    PrintError("Swissknife Sync: Failed to create new repository");
    return 5;
  }

  return 0;
}

int swissknife::CommandUpload::Main(const ArgumentList &args) {
  const string source = *args.find('i')->second;
  const string dest = *args.find('o')->second;
  const string spooler_definition = *args.find('r')->second;
  shash::Algorithms hash_algorithm = shash::kSha1;
  if (args.find('a') != args.end()) {
    hash_algorithm = shash::ParseHashAlgorithm(*args.find('a')->second);
    if (hash_algorithm == shash::kAny) {
      PrintError("Swissknife Sync: Unknown hash algorithm");
      return 1;
    }
  }

  const upload::SpoolerDefinition sd(spooler_definition, hash_algorithm);
  upload::Spooler *spooler = upload::Spooler::Construct(sd);
  assert(spooler);
  spooler->Upload(source, dest);
  spooler->WaitForUpload();

  if (spooler->GetNumberOfErrors() > 0) {
    LogCvmfs(kLogCatalog, kLogStderr, "Swissknife Sync: failed to upload %s",
             source.c_str());
    return 1;
  }

  delete spooler;

  return 0;
}

int swissknife::CommandPeek::Main(const ArgumentList &args) {
  const string file_to_peek = *args.find('d')->second;
  const string spooler_definition = *args.find('r')->second;

  // The hash algorithm doesn't matter here; Peek() only asks the backend
  // whether the object exists.
  const upload::SpoolerDefinition sd(spooler_definition, shash::kAny);
  upload::Spooler *spooler = upload::Spooler::Construct(sd);
  assert(spooler);
  const bool success = spooler->Peek(file_to_peek);

  if (spooler->GetNumberOfErrors() > 0) {
    LogCvmfs(kLogCatalog, kLogStderr, "Swissknife Sync: failed to peek for %s",
             file_to_peek.c_str());
    return 2;
  }
  if (!success) {
    LogCvmfs(kLogCatalog, kLogStdout, "Swissknife Sync: %s not found",
             file_to_peek.c_str());
    return 1;
  }
  LogCvmfs(kLogCatalog, kLogStdout, "Swissknife Sync: %s available",
           file_to_peek.c_str());

  delete spooler;

  return 0;
}

int swissknife::CommandRemove::Main(const ArgumentList &args) {
  const string file_to_delete = *args.find('o')->second;
  const string spooler_definition = *args.find('r')->second;

  // The hash algorithm doesn't matter for removal either
  const upload::SpoolerDefinition sd(spooler_definition, shash::kAny);
  upload::Spooler *spooler = upload::Spooler::Construct(sd);
  assert(spooler);
  spooler->RemoveAsync(file_to_delete);
  spooler->WaitForUpload();

  if (spooler->GetNumberOfErrors() > 0) {
    LogCvmfs(kLogCatalog, kLogStderr, "Swissknife Sync: failed to delete %s",
             file_to_delete.c_str());
    return 1;
  }

  delete spooler;

  return 0;
}

int swissknife::CommandApplyDirtab::Main(const ArgumentList &args) {
  const string dirtab_file = *args.find('d')->second;
  union_dir_ = MakeCanonicalPath(*args.find('u')->second);
  scratch_dir_ = MakeCanonicalPath(*args.find('s')->second);
  const shash::Any base_hash = shash::MkFromHexPtr(
      shash::HexPtr(*args.find('b')->second), shash::kSuffixCatalog);
  const string stratum0 = *args.find('w')->second;
  const string dir_temp = *args.find('t')->second;
  verbose_ = (args.find('x') != args.end());

  // check if there is a dirtab file
  if (!FileExists(dirtab_file)) {
    LogCvmfs(kLogCvmfs, kLogVerboseMsg,
             "Swissknife Sync: Didn't find a dirtab at '%s'. Skipping...",
             dirtab_file.c_str());
    return 0;
  }

  // parse dirtab file
  catalog::Dirtab *dirtab = catalog::Dirtab::Create(dirtab_file);
  if (!dirtab->IsValid()) {
    LogCvmfs(kLogCvmfs, kLogStderr,
             "Swissknife Sync: Invalid or not readable dirtab '%s'",
             dirtab_file.c_str());
    return 1;
  }
  LogCvmfs(kLogCvmfs, kLogVerboseMsg,
           "Swissknife Sync: Found %lu rules in dirtab '%s'",
           dirtab->RuleCount(), dirtab_file.c_str());

  // initialize catalog infrastructure
  const bool auto_manage_catalog_files = true;
  const bool follow_redirects = (args.count('L') > 0);
  const string proxy = (args.count('@') > 0) ? *args.find('@')->second : "";
  if (!InitDownloadManager(follow_redirects, proxy)) {
    return 1;
  }
  catalog::SimpleCatalogManager catalog_manager(
      base_hash, stratum0, dir_temp, download_manager(), statistics(),
      auto_manage_catalog_files);
  catalog_manager.Init();

  vector<string> new_nested_catalogs;
  DetermineNestedCatalogCandidates(*dirtab, &catalog_manager,
                                   &new_nested_catalogs);
  const bool success = CreateCatalogMarkers(new_nested_catalogs);
  delete dirtab;

  return (success) ? 0 : 1;
}


namespace {

// Override directory traversal during globbing in order to avoid breaking
// out of the repository tree.
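// glob() is invoked below with GLOB_ALTDIRFUNC, so directory traversal goes
// exclusively through the gl_opendir/gl_readdir/gl_closedir/gl_lstat/gl_stat
// callbacks defined here.  Each callback resolves the path with realpath()
// and refuses it (EACCES) unless it equals or lies below g_glob_uniondir.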

std::string *g_glob_uniondir = NULL;

bool GlobCheckPath(const char *name) {
  char resolved_cstr[PATH_MAX];
  char *retval = realpath(name, resolved_cstr);
  if (retval == NULL)
    return false;

  std::string resolved(resolved_cstr);
  if (resolved == *g_glob_uniondir)
    return true;
  if (!HasPrefix(resolved, (*g_glob_uniondir) + "/", false /*ignore_case*/)) {
    errno = EACCES;
    return false;
  }
  return true;
}

void *GlobOpendir(const char *name) {
  if (!GlobCheckPath(name))
    return NULL;
  return opendir(name);
}

void GlobClosedir(void *dirp) { closedir(static_cast<DIR *>(dirp)); }

struct dirent *GlobReaddir(void *dirp) {
  return readdir(static_cast<DIR *>(dirp));
}

int GlobLstat(const char *name, struct stat *st) {
  if (!GlobCheckPath(name))
    return -1;
  return lstat(name, st);
}

int GlobStat(const char *name, struct stat *st) {
  if (!GlobCheckPath(name))
    return -1;
  return stat(name, st);
}


}  // anonymous namespace

void swissknife::CommandApplyDirtab::DetermineNestedCatalogCandidates(
    const catalog::Dirtab &dirtab,
    catalog::SimpleCatalogManager *catalog_manager,
    vector<string> *nested_catalog_candidates) {
  // find possible new nested catalog locations
  const catalog::Dirtab::Rules &lookup_rules = dirtab.positive_rules();
  catalog::Dirtab::Rules::const_iterator i = lookup_rules.begin();
  const catalog::Dirtab::Rules::const_iterator iend = lookup_rules.end();
  for (; i != iend; ++i) {
    assert(!i->is_negation);

    // run a glob using the current dirtab rule on the current repository
    // state
    const std::string &glob_string = i->pathspec.GetGlobString();
    const std::string &glob_string_abs = union_dir_ + glob_string;
    const int glob_flags = GLOB_ONLYDIR | GLOB_NOSORT | GLOB_PERIOD
                           | GLOB_ALTDIRFUNC;
    glob_t glob_res;
    g_glob_uniondir = new std::string(union_dir_);
    glob_res.gl_opendir = GlobOpendir;
    glob_res.gl_readdir = GlobReaddir;
    glob_res.gl_closedir = GlobClosedir;
    glob_res.gl_lstat = GlobLstat;
    glob_res.gl_stat = GlobStat;
    const int glob_retval = glob(glob_string_abs.c_str(), glob_flags, NULL,
                                 &glob_res);
    delete g_glob_uniondir;
    g_glob_uniondir = NULL;

    if (glob_retval == 0) {
      // found some candidates... filtering by cvmfs catalog structure
      LogCvmfs(kLogCvmfs, kLogVerboseMsg,
               "Swissknife Sync: Found %lu entries for pathspec (%s)",
               glob_res.gl_pathc, glob_string.c_str());
      FilterCandidatesFromGlobResult(dirtab, glob_res.gl_pathv,
                                     glob_res.gl_pathc, catalog_manager,
                                     nested_catalog_candidates);
    } else if (glob_retval == GLOB_NOMATCH) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Swissknife Sync: WARNING: cannot apply pathspec %s",
               glob_string.c_str());
    } else {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Swissknife Sync: Failed to run glob matching (%s)",
               glob_string.c_str());
    }

    globfree(&glob_res);
  }
}

void swissknife::CommandApplyDirtab::FilterCandidatesFromGlobResult(
    const catalog::Dirtab &dirtab, char **paths, const size_t npaths,
    catalog::SimpleCatalogManager *catalog_manager,
    std::vector<std::string> *nested_catalog_candidates) {
  // go through the paths produced by glob() and filter them
  for (size_t i = 0; i < npaths; ++i) {
    // process candidate paths
    const std::string candidate(paths[i]);
    const std::string candidate_rel = candidate.substr(union_dir_.size());

    // check if path points to a directory
    platform_stat64 candidate_info;
    const int lstat_retval = platform_lstat(candidate.c_str(), &candidate_info);
    if (lstat_retval != 0) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Swissknife Sync: "
               "Error in processing .cvmfsdirtab: cannot access %s (%d)",
               candidate.c_str(), errno);
      abort();
    }
    assert(lstat_retval == 0);
    if (!S_ISDIR(candidate_info.st_mode)) {
      // The GLOB_ONLYDIR flag is only a hint; non-directories can still be
      // returned
      LogCvmfs(kLogCvmfs, kLogVerboseMsg,
               "Swissknife Sync: "
               "The '%s' dirtab entry does not point to a directory "
               "but to a file or a symbolic link",
               candidate_rel.c_str());
      continue;
    }

    // check if the path is a meta-directory (. or ..)
    assert(candidate_rel.size() >= 2);
    if (candidate_rel.substr(candidate_rel.size() - 2) == "/."
        || candidate_rel.substr(candidate_rel.size() - 3) == "/..") {
      continue;
    }

    // check that the path isn't excluded in the dirtab
    if (dirtab.IsOpposing(candidate_rel)) {
      LogCvmfs(kLogCvmfs, kLogVerboseMsg,
               "Swissknife Sync: Candidate '%s' is excluded by dirtab",
               candidate_rel.c_str());
      continue;
    }

    // look up the path in the catalog structure to find out if it already
    // points to a nested catalog transition point.  Furthermore it could be
    // a new directory and thus not be in any catalog yet.
    catalog::DirectoryEntry dirent;
    const bool lookup_success = catalog_manager->LookupPath(
        candidate_rel, catalog::kLookupDefault, &dirent);
    if (!lookup_success) {
      LogCvmfs(kLogCvmfs, kLogVerboseMsg,
               "Swissknife Sync: Didn't find '%s' in catalogs, could "
               "be a new directory and nested catalog.",
               candidate_rel.c_str());
      nested_catalog_candidates->push_back(candidate);
    } else if (!dirent.IsNestedCatalogMountpoint()
               && !dirent.IsNestedCatalogRoot()) {
      LogCvmfs(kLogCvmfs, kLogVerboseMsg,
               "Swissknife Sync: Found '%s' in catalogs but is not a "
               "nested catalog yet.",
               candidate_rel.c_str());
      nested_catalog_candidates->push_back(candidate);
    } else {
      // Check if the nested catalog marker is still there; we might need to
      // recreate the catalog after manual marker removal.
      // Note: First we check if the parent directory shows up in the scratch
      //       space to verify that it was touched (copy-on-write).
      //       Otherwise we would force the cvmfs client behind the union
      //       file system to (potentially) unnecessarily fetch catalogs.
      if (DirectoryExists(scratch_dir_ + candidate_rel)
          && !FileExists(union_dir_ + candidate_rel + "/.cvmfscatalog")) {
        LogCvmfs(kLogCvmfs, kLogStdout,
                 "Swissknife Sync: WARNING: '%s' should be a nested "
                 "catalog according to the dirtab. "
                 "Recreating...",
                 candidate_rel.c_str());
        nested_catalog_candidates->push_back(candidate);
      } else {
        LogCvmfs(kLogCvmfs, kLogVerboseMsg,
                 "Swissknife Sync: "
                 "Found '%s' in catalogs and it already is a nested catalog.",
                 candidate_rel.c_str());
      }
    }
  }
}

bool swissknife::CommandApplyDirtab::CreateCatalogMarkers(
    const std::vector<std::string> &new_nested_catalogs) {
  // go through the new nested catalog paths and create .cvmfscatalog markers
  // where necessary
  bool success = true;
  std::vector<std::string>::const_iterator k = new_nested_catalogs.begin();
  const std::vector<std::string>::const_iterator kend = new_nested_catalogs
                                                            .end();
  for (; k != kend; ++k) {
    assert(!k->empty() && k->size() > union_dir_.size());

    // was the marker already created by hand?
    const std::string marker_path = *k + "/.cvmfscatalog";
    if (FileExists(marker_path)) {
      continue;
    }

    // create a nested catalog marker
    const mode_t mode = kDefaultFileMode;
    const int fd = open(marker_path.c_str(), O_CREAT, mode);
    if (fd < 0) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Swissknife Sync: Failed to create nested catalog marker "
               "at '%s' (errno: %d)",
               marker_path.c_str(), errno);
      success = false;
      continue;
    }
    close(fd);

    // inform the user if requested
    if (verbose_) {
      LogCvmfs(kLogCvmfs, kLogStdout,
               "Swissknife Sync: Auto-creating nested catalog in %s",
               k->c_str());
    }
  }

  return success;
}

struct chunk_arg {
  chunk_arg(char param, size_t *save_to) : param(param), save_to(save_to) { }
  char param;
  size_t *save_to;
};

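// Maps the chunk size command line switches ('a', 'l', 'h') onto the
// average/minimum/maximum file chunk size fields of SyncParameters.  A
// missing switch keeps the default value; a value that parses to zero makes
// the function fail.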
bool swissknife::CommandSync::ReadFileChunkingArgs(
    const swissknife::ArgumentList &args, SyncParameters *params) {
  typedef std::vector<chunk_arg> ChunkArgs;

  // define where to store the value of which file chunk argument
  ChunkArgs chunk_args;
  chunk_args.push_back(chunk_arg('a', &params->avg_file_chunk_size));
  chunk_args.push_back(chunk_arg('l', &params->min_file_chunk_size));
  chunk_args.push_back(chunk_arg('h', &params->max_file_chunk_size));

  // read the arguments
  ChunkArgs::const_iterator i = chunk_args.begin();
  ChunkArgs::const_iterator iend = chunk_args.end();
  for (; i != iend; ++i) {
    swissknife::ArgumentList::const_iterator arg = args.find(i->param);

    if (arg != args.end()) {
      size_t arg_value = static_cast<size_t>(String2Uint64(*arg->second));
      if (arg_value > 0) {
        *i->save_to = arg_value;
      } else {
        return false;
      }
    }
  }

  // check if argument values are sane
  return true;
}

int swissknife::CommandSync::Main(const ArgumentList &args) {
  string start_time = GetGMTimestamp();

  // Spawn monitoring process (watchdog)
  std::string watchdog_dir = "/tmp";
  char watchdog_path[PATH_MAX];
  std::string timestamp = GetGMTimestamp("%Y.%m.%d-%H.%M.%S");
  int path_size = snprintf(watchdog_path, sizeof(watchdog_path),
                           "%s/cvmfs-swissknife-sync-stacktrace.%s.%d",
                           watchdog_dir.c_str(), timestamp.c_str(), getpid());
  assert(path_size > 0);
  assert(path_size < PATH_MAX);
  UniquePtr<Watchdog> watchdog(Watchdog::Create(NULL));
  watchdog->Spawn(std::string(watchdog_path));

  SyncParameters params;

  // Initialization
  params.dir_union = MakeCanonicalPath(*args.find('u')->second);
  params.dir_scratch = MakeCanonicalPath(*args.find('s')->second);
  params.dir_rdonly = MakeCanonicalPath(*args.find('c')->second);
  params.dir_temp = MakeCanonicalPath(*args.find('t')->second);
  params.base_hash = shash::MkFromHexPtr(shash::HexPtr(*args.find('b')->second),
                                         shash::kSuffixCatalog);
  params.stratum0 = *args.find('w')->second;
  params.manifest_path = *args.find('o')->second;
  params.spooler_definition = *args.find('r')->second;

  params.public_keys = *args.find('K')->second;
  params.repo_name = *args.find('N')->second;

  if (args.find('f') != args.end())
    params.union_fs_type = *args.find('f')->second;
  if (args.find('A') != args.end())
    params.is_balanced = true;
  if (args.find('x') != args.end())
    params.print_changeset = true;
  if (args.find('y') != args.end())
    params.dry_run = true;
  if (args.find('m') != args.end())
    params.mucatalogs = true;
  if (args.find('i') != args.end())
    params.ignore_xdir_hardlinks = true;
  if (args.find('d') != args.end())
    params.stop_for_catalog_tweaks = true;
  if (args.find('V') != args.end())
    params.voms_authz = true;
  if (args.find('F') != args.end())
    params.authz_file = *args.find('F')->second;
  if (args.find('k') != args.end())
    params.include_xattrs = true;
  if (args.find('j') != args.end())
    params.enable_mtime_ns = true;
  if (args.find('Y') != args.end())
    params.external_data = true;
  if (args.find('W') != args.end())
    params.direct_io = true;
  if (args.find('S') != args.end()) {
    bool retval = catalog::VirtualCatalog::ParseActions(
        *args.find('S')->second, &params.virtual_dir_actions);
    if (!retval) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Swissknife Sync: Invalid virtual catalog options: %s",
               args.find('S')->second->c_str());
      return 1;
    }
  }
  if (args.find('z') != args.end()) {
    unsigned log_level = 1 << (kLogLevel0
                               + String2Uint64(*args.find('z')->second));
    if (log_level > kLogNone) {
      LogCvmfs(kLogCvmfs, kLogStderr, "Swissknife Sync: invalid log level");
      return 1;
    }
    SetLogVerbosity(static_cast<LogLevels>(log_level));
  }

  if (args.find('X') != args.end())
    params.max_weight = String2Uint64(*args.find('X')->second);
  if (args.find('M') != args.end())
    params.min_weight = String2Uint64(*args.find('M')->second);

  if (args.find('p') != args.end()) {
    params.use_file_chunking = true;
    if (!ReadFileChunkingArgs(args, &params)) {
      PrintError("Swissknife Sync: Failed to read file chunk size values");
      return 2;
    }
  }
  if (args.find('O') != args.end()) {
    params.generate_legacy_bulk_chunks = true;
  }
  shash::Algorithms hash_algorithm = shash::kSha1;
  if (args.find('e') != args.end()) {
    hash_algorithm = shash::ParseHashAlgorithm(*args.find('e')->second);
    if (hash_algorithm == shash::kAny) {
      PrintError("Swissknife Sync: Unknown hash algorithm");
      return 1;
    }
  }
  if (args.find('Z') != args.end()) {
    params.compression_alg = zlib::ParseCompressionAlgorithm(
        *args.find('Z')->second);
  }

  if (args.find('E') != args.end())
    params.enforce_limits = true;
  if (args.find('Q') != args.end()) {
    params.nested_kcatalog_limit = String2Uint64(*args.find('Q')->second);
  } else {
    params.nested_kcatalog_limit = SyncParameters::kDefaultNestedKcatalogLimit;
  }
  if (args.find('R') != args.end()) {
    params.root_kcatalog_limit = String2Uint64(*args.find('R')->second);
  } else {
    params.root_kcatalog_limit = SyncParameters::kDefaultRootKcatalogLimit;
  }
  if (args.find('U') != args.end()) {
    params.file_mbyte_limit = String2Uint64(*args.find('U')->second);
  } else {
    params.file_mbyte_limit = SyncParameters::kDefaultFileMbyteLimit;
  }

  if (args.find('v') != args.end()) {
    sanitizer::IntegerSanitizer sanitizer;
    if (!sanitizer.IsValid(*args.find('v')->second)) {
      PrintError("Swissknife Sync: Invalid revision number");
      return 1;
    }
    params.manual_revision = String2Uint64(*args.find('v')->second);
  }

  params.branched_catalog = args.find('B') != args.end();

  if (args.find('q') != args.end()) {
    params.max_concurrent_write_jobs = String2Uint64(*args.find('q')->second);
  }

  if (args.find('0') != args.end()) {
    params.num_upload_tasks = String2Uint64(*args.find('0')->second);
  }

  if (args.find('T') != args.end()) {
    params.ttl_seconds = String2Uint64(*args.find('T')->second);
  }

  if (args.find('g') != args.end()) {
    params.ignore_special_files = true;
  }

  if (args.find('P') != args.end()) {
    params.session_token_file = *args.find('P')->second;
  }

  if (args.find('H') != args.end()) {
    params.key_file = *args.find('H')->second;
  }

  if (args.find('D') != args.end()) {
    params.repo_tag.SetName(*args.find('D')->second);
  }

  if (args.find('J') != args.end()) {
    params.repo_tag.SetDescription(*args.find('J')->second);
  }

  if (args.find('G') != args.end()) {
    params.cache_dir = "/var/spool/cvmfs/" + params.repo_name + "/cache.server";
  }

  const bool upload_statsdb = (args.count('I') > 0);

  if (!CheckParams(params))
    return 2;
  // This may fail, in which case a warning is printed and the process
  // continues
  ObtainDacReadSearchCapability();

  perf::StatisticsTemplate publish_statistics("publish", this->statistics());

  // Start spooler
  upload::SpoolerDefinition spooler_definition(
      params.spooler_definition, hash_algorithm, params.compression_alg,
      params.generate_legacy_bulk_chunks, params.use_file_chunking,
      params.min_file_chunk_size, params.avg_file_chunk_size,
      params.max_file_chunk_size, params.session_token_file, params.key_file);
  if (params.max_concurrent_write_jobs > 0) {
    spooler_definition
        .number_of_concurrent_uploads = params.max_concurrent_write_jobs;
  }
  spooler_definition.num_upload_tasks = params.num_upload_tasks;

  upload::SpoolerDefinition spooler_definition_catalogs(
      spooler_definition.Dup2DefaultCompression());

  params.spooler = upload::Spooler::Construct(spooler_definition,
                                              &publish_statistics);
  if (NULL == params.spooler)
    return 3;
  UniquePtr<upload::Spooler> spooler_catalogs(upload::Spooler::Construct(
      spooler_definition_catalogs, &publish_statistics));
  if (!spooler_catalogs.IsValid())
    return 3;

  const bool follow_redirects = (args.count('L') > 0);
  const string proxy = (args.count('@') > 0) ? *args.find('@')->second : "";
  if (!InitDownloadManager(follow_redirects, proxy)) {
    return 3;
  }

  if (!InitSignatureManager(params.public_keys)) {
    return 3;
  }

  /*
   * Note: If the upstream is of type gateway, due to the possibility of
   * concurrent release managers, it is possible to have different local and
   * remote root hashes.  We proceed by loading the remote manifest but we
   * give an empty base hash.
   */
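  // The manifest used as the sync base depends on the publish mode: a
  // throw-away manifest for branched catalogs, the locally exported manifest
  // when virtual catalog actions are requested, and otherwise the manifest
  // fetched from stratum 0 (with an empty base hash, see the note above).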
  UniquePtr<manifest::Manifest> manifest;
  if (params.branched_catalog) {
    // Throw-away manifest
    manifest = new manifest::Manifest(shash::Any(), 0, "");
  } else if (params.virtual_dir_actions
             != catalog::VirtualCatalog::kActionNone) {
    manifest = this->OpenLocalManifest(params.manifest_path);
    params.base_hash = manifest->catalog_hash();
  } else {
    // TODO(jblomer): revert to params.base_hash if spooler driver type is not
    // upload::SpoolerDefinition::Gateway
    manifest = FetchRemoteManifest(params.stratum0, params.repo_name,
                                   shash::Any());
  }
  if (!manifest.IsValid()) {
    return 3;
  }

  StatisticsDatabase *stats_db = StatisticsDatabase::OpenStandardDB(
      params.repo_name);

  const std::string old_root_hash = manifest->catalog_hash().ToString(true);

  catalog::WritableCatalogManager catalog_manager(
      params.base_hash, params.stratum0, params.dir_temp,
      spooler_catalogs.weak_ref(), download_manager(), params.enforce_limits,
      params.nested_kcatalog_limit, params.root_kcatalog_limit,
      params.file_mbyte_limit, statistics(), params.is_balanced,
      params.max_weight, params.min_weight, params.cache_dir);
  catalog_manager.Init();

  publish::SyncMediator mediator(&catalog_manager, &params, publish_statistics);
  LogCvmfs(kLogPublish, kLogStdout, "Swissknife Sync: Processing changes...");

  // Should be done before the synchronization starts to avoid a race of
  // GetTTL() with other sqlite operations
  if ((params.ttl_seconds > 0)
      && ((params.ttl_seconds != catalog_manager.GetTTL())
          || !catalog_manager.HasExplicitTTL())) {
    LogCvmfs(kLogCvmfs, kLogStdout,
             "Swissknife Sync: Setting repository TTL to %" PRIu64 "s",
             params.ttl_seconds);
    catalog_manager.SetTTL(params.ttl_seconds);
  }

  // Either real catalogs or virtual catalog
  if (params.virtual_dir_actions == catalog::VirtualCatalog::kActionNone) {
    publish::SyncUnion *sync;
    if (params.union_fs_type == "overlayfs") {
      sync = new publish::SyncUnionOverlayfs(
          &mediator, params.dir_rdonly, params.dir_union, params.dir_scratch);
    } else if (params.union_fs_type == "aufs") {
      sync = new publish::SyncUnionAufs(&mediator, params.dir_rdonly,
                                        params.dir_union, params.dir_scratch);
    } else {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Swissknife Sync: unknown union file system: %s",
               params.union_fs_type.c_str());
      return 3;
    }

    if (!sync->Initialize()) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Swissknife Sync: Initialization of the synchronisation "
               "engine failed");
      return 4;
    }

    sync->Traverse();
  } else {
    assert(!manifest->history().IsNull());
    catalog::VirtualCatalog virtual_catalog(
        manifest.weak_ref(), download_manager(), &catalog_manager, &params);
    virtual_catalog.Generate(params.virtual_dir_actions);
  }

  if (!params.authz_file.empty()) {
    LogCvmfs(kLogCvmfs, kLogDebug,
             "Swissknife Sync: Adding contents of authz file %s to"
             " root catalog.",
             params.authz_file.c_str());
    int fd = open(params.authz_file.c_str(), O_RDONLY);
    if (fd == -1) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Swissknife Sync: Unable to open authz file (%s)"
               "from the publication process: %s",
               params.authz_file.c_str(), strerror(errno));
      return 7;
    }

    std::string new_authz;
    const bool read_successful = SafeReadToString(fd, &new_authz);
    close(fd);

    if (!read_successful) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Swissknife Sync: Failed to read authz file (%s): %s",
               params.authz_file.c_str(), strerror(errno));
      return 8;
    }

    catalog_manager.SetVOMSAuthz(new_authz);
  }

  if (!mediator.Commit(manifest.weak_ref())) {
    PrintError("Swissknife Sync: Something went wrong during sync");
    if (!params.dry_run) {
      stats_db->StorePublishStatistics(this->statistics(), start_time, false);
      if (upload_statsdb) {
        stats_db->UploadStatistics(params.spooler);
      }
    }
    return 5;
  }

  perf::Counter *revision_counter = statistics()->Register(
      "publish.revision", "Published revision number");
  revision_counter->Set(
      static_cast<int64_t>(catalog_manager.GetRootCatalog()->revision()));

  // finalize the spooler
  LogCvmfs(kLogCvmfs, kLogStdout,
           "Swissknife Sync: Wait for all uploads to finish");
  params.spooler->WaitForUpload();
  spooler_catalogs->WaitForUpload();
  params.spooler->FinalizeSession(false);

  LogCvmfs(kLogCvmfs, kLogStdout,
           "Swissknife Sync: Exporting repository manifest");

  // We call FinalizeSession(true) this time, to also trigger the commit
  // operation on the gateway machine (if the upstream is of type "gw").

  // Get the path of the new root catalog
  const std::string new_root_hash = manifest->catalog_hash().ToString(true);

  if (!spooler_catalogs->FinalizeSession(true, old_root_hash, new_root_hash,
                                         params.repo_tag)) {
    PrintError("Swissknife Sync: Failed to commit transaction.");
    if (!params.dry_run) {
      stats_db->StorePublishStatistics(this->statistics(), start_time, false);
      if (upload_statsdb) {
        stats_db->UploadStatistics(params.spooler);
      }
    }
    return 9;
  }

  if (!params.dry_run) {
    stats_db->StorePublishStatistics(this->statistics(), start_time, true);
    if (upload_statsdb) {
      stats_db->UploadStatistics(params.spooler);
    }
  }

  delete params.spooler;

  if (!manifest->Export(params.manifest_path)) {
    PrintError("Swissknife Sync: Failed to create new repository");
    return 6;
  }

  return 0;
}