CernVM-FS 2.13.0
swissknife_sync.cc
/**
 * This file is part of the CernVM File System.
 */
// NOLINTNEXTLINE
#define _FILE_OFFSET_BITS 64
// NOLINTNEXTLINE
#define __STDC_FORMAT_MACROS

#include "swissknife_sync.h"

#include <errno.h>
#include <fcntl.h>
#include <glob.h>
#include <inttypes.h>
#include <limits.h>
#include <sys/capability.h>

#include <cstdio>
#include <cstdlib>
#include <string>
#include <vector>

#include "catalog_mgr_ro.h"
#include "catalog_mgr_rw.h"
#include "catalog_virtual.h"
#include "manifest.h"
#include "monitor.h"
#include "network/download.h"
#include "path_filters/dirtab.h"
#include "reflog.h"
#include "sanitizer.h"
#include "statistics.h"
#include "statistics_database.h"
#include "swissknife_capabilities.h"
#include "sync_mediator.h"
#include "sync_union.h"
#include "sync_union_aufs.h"
#include "sync_union_overlayfs.h"
#include "util/logging.h"
#include "util/platform.h"
#include "util/string.h"

using namespace std;  // NOLINT

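// Sanity-checks the publish parameters before a sync run: the scratch, union,
// read-only and temporary directories must exist, the stratum 0 URL and
// manifest output path must be set, the file chunk sizes must be ordered
// sensibly, and a gateway ("gw") upstream requires a session token file.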
bool swissknife::CommandSync::CheckParams(const SyncParameters &p) {
  if (!DirectoryExists(p.dir_scratch)) {
    PrintError("overlay (copy on write) directory does not exist");
    return false;
  }
  if (!DirectoryExists(p.dir_union)) {
    PrintError("union volume does not exist");
    return false;
  }
  if (!DirectoryExists(p.dir_rdonly)) {
    PrintError("cvmfs read/only repository does not exist");
    return false;
  }
  if (p.stratum0 == "") {
    PrintError("Stratum0 url missing");
    return false;
  }

  if (p.manifest_path == "") {
    PrintError("manifest output required");
    return false;
  }
  if (!DirectoryExists(p.dir_temp)) {
    PrintError("data store directory does not exist");
    return false;
  }

  if ((p.min_file_chunk_size >= p.avg_file_chunk_size)
      || (p.avg_file_chunk_size >= p.max_file_chunk_size)) {
    PrintError("file chunk size values are not sane");
    return false;
  }

  if (HasPrefix(p.spooler_definition, "gw", false)) {
    if (p.session_token_file.empty()) {
      PrintError("Session token file has to be provided "
                 "when upstream type is gw.");
      return false;
    }
  }

  return true;
}

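// 'create' command: sets up a fresh repository by constructing a spooler from
// the upstream definition, creating an empty root catalog and manifest, and
// uploading an initial reflog whose checksum is stored locally.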
int swissknife::CommandCreate::Main(const ArgumentList &args) {
  const string manifest_path = *args.find('o')->second;
  const string dir_temp = *args.find('t')->second;
  const string spooler_definition = *args.find('r')->second;
  const string repo_name = *args.find('n')->second;
  const string reflog_chksum_path = *args.find('R')->second;
  if (args.find('l') != args.end()) {
    const unsigned log_level = kLogLevel0
                               << String2Uint64(*args.find('l')->second);
    if (log_level > kLogNone) {
      LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
      return 1;
    }
    SetLogVerbosity(static_cast<LogLevels>(log_level));
  }
  shash::Algorithms hash_algorithm = shash::kSha1;
  if (args.find('a') != args.end()) {
    hash_algorithm = shash::ParseHashAlgorithm(*args.find('a')->second);
    if (hash_algorithm == shash::kAny) {
      PrintError("unknown hash algorithm");
      return 1;
    }
  }

  const bool volatile_content = (args.count('v') > 0);
  const bool garbage_collectable = (args.count('z') > 0);
  std::string voms_authz;
  if (args.find('V') != args.end()) {
    voms_authz = *args.find('V')->second;
  }

  const upload::SpoolerDefinition sd(spooler_definition, hash_algorithm,
                                     zlib::kZlibDefault);
  const UniquePtr<upload::Spooler> spooler(upload::Spooler::Construct(sd));
  assert(spooler.IsValid());

  UniquePtr<manifest::Manifest> manifest(CreateRepository(
      dir_temp, volatile_content, voms_authz, spooler.weak_ref()));
  if (!manifest.IsValid()) {
    PrintError("Swissknife Sync: Failed to create new repository");
    return 1;
  }

  UniquePtr<manifest::Reflog> reflog(CreateEmptyReflog(dir_temp, repo_name));
  if (!reflog.IsValid()) {
    PrintError("Swissknife Sync: Failed to create fresh Reflog");
    return 1;
  }

  reflog->DropDatabaseFileOwnership();
  const string reflog_path = reflog->database_file();
  reflog.Destroy();
  shash::Any reflog_hash(hash_algorithm);
  manifest::Reflog::HashDatabase(reflog_path, &reflog_hash);
  spooler->UploadReflog(reflog_path);
  spooler->WaitForUpload();
  unlink(reflog_path.c_str());
  if (spooler->GetNumberOfErrors()) {
    LogCvmfs(kLogCvmfs, kLogStderr, "Swissknife Sync: Failed to upload reflog");
    return 4;
  }
  assert(!reflog_chksum_path.empty());
  manifest::Reflog::WriteChecksum(reflog_chksum_path, reflog_hash);

  // set optional manifest fields
  const bool needs_bootstrap_shortcuts = !voms_authz.empty();
  manifest->set_garbage_collectability(garbage_collectable);
  manifest->set_has_alt_catalog_path(needs_bootstrap_shortcuts);

  if (!manifest->Export(manifest_path)) {
    PrintError("Swissknife Sync: Failed to create new repository");
    return 5;
  }

  return 0;
}

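// 'upload' command: pushes a single local file to the given destination path
// in the upstream storage described by the spooler definition.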
int swissknife::CommandUpload::Main(const ArgumentList &args) {
  const string source = *args.find('i')->second;
  const string dest = *args.find('o')->second;
  const string spooler_definition = *args.find('r')->second;
  shash::Algorithms hash_algorithm = shash::kSha1;
  if (args.find('a') != args.end()) {
    hash_algorithm = shash::ParseHashAlgorithm(*args.find('a')->second);
    if (hash_algorithm == shash::kAny) {
      PrintError("Swissknife Sync: Unknown hash algorithm");
      return 1;
    }
  }

  const upload::SpoolerDefinition sd(spooler_definition, hash_algorithm);
  upload::Spooler *spooler = upload::Spooler::Construct(sd);
  assert(spooler);
  spooler->Upload(source, dest);
  spooler->WaitForUpload();

  if (spooler->GetNumberOfErrors() > 0) {
    LogCvmfs(kLogCatalog, kLogStderr, "Swissknife Sync: failed to upload %s",
             source.c_str());
    return 1;
  }

  delete spooler;

  return 0;
}

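// 'peek' command: checks whether a given object already exists in the upstream
// storage; returns 0 if available, 1 if not found, 2 on spooler errors.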
int swissknife::CommandPeek::Main(const ArgumentList &args) {
  const string file_to_peek = *args.find('d')->second;
  const string spooler_definition = *args.find('r')->second;

  // Hash doesn't matter
  const upload::SpoolerDefinition sd(spooler_definition, shash::kAny);
  upload::Spooler *spooler = upload::Spooler::Construct(sd);
  assert(spooler);
  const bool success = spooler->Peek(file_to_peek);

  if (spooler->GetNumberOfErrors() > 0) {
    LogCvmfs(kLogCatalog, kLogStderr, "Swissknife Sync: failed to peek for %s",
             file_to_peek.c_str());
    return 2;
  }
  if (!success) {
    LogCvmfs(kLogCatalog, kLogStdout, "Swissknife Sync: %s not found",
             file_to_peek.c_str());
    return 1;
  }
  LogCvmfs(kLogCatalog, kLogStdout, "Swissknife Sync: %s available",
           file_to_peek.c_str());

  delete spooler;

  return 0;
}

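// 'remove' command: asynchronously deletes a single object from the upstream
// storage and waits for the operation to complete.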
int swissknife::CommandRemove::Main(const ArgumentList &args) {
  const string file_to_delete = *args.find('o')->second;
  const string spooler_definition = *args.find('r')->second;

  // Hash doesn't matter
  const upload::SpoolerDefinition sd(spooler_definition, shash::kAny);
  upload::Spooler *spooler = upload::Spooler::Construct(sd);
  assert(spooler);
  spooler->RemoveAsync(file_to_delete);
  spooler->WaitForUpload();

  if (spooler->GetNumberOfErrors() > 0) {
    LogCvmfs(kLogCatalog, kLogStderr, "Swissknife Sync: failed to delete %s",
             file_to_delete.c_str());
    return 1;
  }

  delete spooler;

  return 0;
}

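// 'dirtab' command: reads the repository's .cvmfsdirtab, globs the union file
// system for matching directories, and places .cvmfscatalog markers where new
// nested catalogs should be created.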
int swissknife::CommandApplyDirtab::Main(const ArgumentList &args) {
  const string dirtab_file = *args.find('d')->second;
  union_dir_ = MakeCanonicalPath(*args.find('u')->second);
  scratch_dir_ = MakeCanonicalPath(*args.find('s')->second);
  const shash::Any base_hash = shash::MkFromHexPtr(
      shash::HexPtr(*args.find('b')->second), shash::kSuffixCatalog);
  const string stratum0 = *args.find('w')->second;
  const string dir_temp = *args.find('t')->second;
  verbose_ = (args.find('x') != args.end());

  // check if there is a dirtab file
  if (!FileExists(dirtab_file)) {
    LogCvmfs(kLogCatalog, kLogVerboseMsg,
             "Swissknife Sync: Didn't find a dirtab at '%s'. Skipping...",
             dirtab_file.c_str());
    return 0;
  }

  // parse dirtab file
  catalog::Dirtab *dirtab = catalog::Dirtab::Create(dirtab_file);
  if (!dirtab->IsValid()) {
    LogCvmfs(kLogCatalog, kLogStderr,
             "Swissknife Sync: Invalid or not readable dirtab '%s'",
             dirtab_file.c_str());
    return 1;
  }
  LogCvmfs(kLogCatalog, kLogVerboseMsg,
           "Swissknife Sync: Found %lu rules in dirtab '%s'",
           dirtab->RuleCount(), dirtab_file.c_str());

  // initialize catalog infrastructure
  const bool auto_manage_catalog_files = true;
  const bool follow_redirects = (args.count('L') > 0);
  const string proxy = (args.count('@') > 0) ? *args.find('@')->second : "";
  if (!InitDownloadManager(follow_redirects, proxy)) {
    return 1;
  }
  catalog::SimpleCatalogManager catalog_manager(
      base_hash, stratum0, dir_temp, download_manager(), statistics(),
      auto_manage_catalog_files);
  catalog_manager.Init();

  vector<string> new_nested_catalogs;
  DetermineNestedCatalogCandidates(*dirtab, &catalog_manager,
                                   &new_nested_catalogs);
  const bool success = CreateCatalogMarkers(new_nested_catalogs);
  delete dirtab;

  return (success) ? 0 : 1;
}


namespace {

// Overwrite directory traversal in the globbing in order to avoid breaking out
// of the repository tree

std::string *g_glob_uniondir = NULL;

bool GlobCheckPath(const char *name) {
  char resolved_cstr[PATH_MAX];
  char *retval = realpath(name, resolved_cstr);
  if (retval == NULL)
    return false;

  const std::string resolved(resolved_cstr);
  if (resolved == *g_glob_uniondir)
    return true;
  if (!HasPrefix(resolved, (*g_glob_uniondir) + "/", false /*ignore_case*/)) {
    errno = EACCES;
    return false;
  }
  return true;
}

void *GlobOpendir(const char *name) {
  if (!GlobCheckPath(name))
    return NULL;
  return opendir(name);
}

void GlobClosedir(void *dirp) { closedir(static_cast<DIR *>(dirp)); }

struct dirent *GlobReaddir(void *dirp) {
  return readdir(static_cast<DIR *>(dirp));
}

int GlobLstat(const char *name, struct stat *st) {
  if (!GlobCheckPath(name))
    return -1;
  return lstat(name, st);
}

int GlobStat(const char *name, struct stat *st) {
  if (!GlobCheckPath(name))
    return -1;
  return stat(name, st);
}


}  // anonymous namespace

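// Runs each positive dirtab rule as a glob over the union volume (using the
// restricted traversal callbacks above) and collects directories that may need
// to become nested catalogs.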
void swissknife::CommandApplyDirtab::DetermineNestedCatalogCandidates(
    const catalog::Dirtab &dirtab,
    catalog::SimpleCatalogManager *catalog_manager,
    vector<string> *nested_catalog_candidates) {
  // find possible new nested catalog locations
  const catalog::Dirtab::Rules &lookup_rules = dirtab.positive_rules();
  catalog::Dirtab::Rules::const_iterator i = lookup_rules.begin();
  const catalog::Dirtab::Rules::const_iterator iend = lookup_rules.end();
  for (; i != iend; ++i) {
    assert(!i->is_negation);

    // run a glob using the current dirtab rule on the current repository
    // state
    const std::string &glob_string = i->pathspec.GetGlobString();
    const std::string &glob_string_abs = union_dir_ + glob_string;
    const int glob_flags = GLOB_ONLYDIR | GLOB_NOSORT | GLOB_PERIOD
                           | GLOB_ALTDIRFUNC;
    glob_t glob_res;
    g_glob_uniondir = new std::string(union_dir_);
    glob_res.gl_opendir = GlobOpendir;
    glob_res.gl_readdir = GlobReaddir;
    glob_res.gl_closedir = GlobClosedir;
    glob_res.gl_lstat = GlobLstat;
    glob_res.gl_stat = GlobStat;
    const int glob_retval = glob(glob_string_abs.c_str(), glob_flags, NULL,
                                 &glob_res);
    delete g_glob_uniondir;
    g_glob_uniondir = NULL;

    if (glob_retval == 0) {
      // found some candidates... filtering by cvmfs catalog structure
      LogCvmfs(kLogCatalog, kLogVerboseMsg,
               "Swissknife Sync: Found %lu entries for pathspec (%s)",
               glob_res.gl_pathc, glob_string.c_str());
      FilterCandidatesFromGlobResult(dirtab, glob_res.gl_pathv,
                                     glob_res.gl_pathc, catalog_manager,
                                     nested_catalog_candidates);
    } else if (glob_retval == GLOB_NOMATCH) {
      LogCvmfs(kLogCatalog, kLogStderr,
               "Swissknife Sync: WARNING: cannot apply pathspec %s",
               glob_string.c_str());
    } else {
      LogCvmfs(kLogCatalog, kLogStderr,
               "Swissknife Sync: Failed to run glob matching (%s)",
               glob_string.c_str());
    }

    globfree(&glob_res);
  }
}

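// Filters the raw glob hits: keeps only real directories that are not "." or
// ".." entries, are not excluded by the dirtab, and are not already nested
// catalog transition points in the existing catalog hierarchy.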
void swissknife::CommandApplyDirtab::FilterCandidatesFromGlobResult(
    const catalog::Dirtab &dirtab, char **paths, const size_t npaths,
    catalog::SimpleCatalogManager *catalog_manager,
    std::vector<std::string> *nested_catalog_candidates) {
  // go through the paths produced by glob() and filter them
  for (size_t i = 0; i < npaths; ++i) {
    // process candidate paths
    const std::string candidate(paths[i]);
    const std::string candidate_rel = candidate.substr(union_dir_.size());

    // check if path points to a directory
    platform_stat64 candidate_info;
    const int lstat_retval = platform_lstat(candidate.c_str(), &candidate_info);
    if (lstat_retval != 0) {
      LogCvmfs(kLogCatalog, kLogStderr,
               "Swissknife Sync: "
               "Error in processing .cvmfsdirtab: cannot access %s (%d)",
               candidate.c_str(), errno);
      abort();
    }
    assert(lstat_retval == 0);
    if (!S_ISDIR(candidate_info.st_mode)) {
      // The GLOB_ONLYDIR flag is only a hint, non-directories can still be
      // returned
      LogCvmfs(kLogCatalog, kLogVerboseMsg,
               "Swissknife Sync: "
               "The '%s' dirtab entry does not point to a directory "
               "but to a file or a symbolic link",
               candidate_rel.c_str());
      continue;
    }

    // check if the path is a meta-directory (. or ..)
    assert(candidate_rel.size() >= 2);
    if (candidate_rel.substr(candidate_rel.size() - 2) == "/."
        || candidate_rel.substr(candidate_rel.size() - 3) == "/..") {
      continue;
    }

    // check that the path isn't excluded in the dirtab
    if (dirtab.IsOpposing(candidate_rel)) {
      LogCvmfs(kLogCatalog, kLogVerboseMsg,
               "Swissknife Sync: Candidate '%s' is excluded by dirtab",
               candidate_rel.c_str());
      continue;
    }

    // lookup the path in the catalog structure to find out if it already
    // points to a nested catalog transition point. Furthermore it could be
    // a new directory and thus not in any catalog yet.
    catalog::DirectoryEntry dirent;
    const bool lookup_success = catalog_manager->LookupPath(
        candidate_rel, catalog::kLookupDefault, &dirent);
    if (!lookup_success) {
      LogCvmfs(kLogCatalog, kLogVerboseMsg,
               "Swissknife Sync: Didn't find '%s' in catalogs, could "
               "be a new directory and nested catalog.",
               candidate_rel.c_str());
      nested_catalog_candidates->push_back(candidate);
    } else if (!dirent.IsNestedCatalogMountpoint()
               && !dirent.IsNestedCatalogRoot()) {
      LogCvmfs(kLogCatalog, kLogVerboseMsg,
               "Swissknife Sync: Found '%s' in catalogs but is not a "
               "nested catalog yet.",
               candidate_rel.c_str());
      nested_catalog_candidates->push_back(candidate);
    } else {
      // check if the nested catalog marker is still there, we might need to
      // recreate the catalog after manual marker removal
      // Note: First we check if the parent directory shows up in the scratch
      //       space to verify that it was touched (copy-on-write).
      //       Otherwise we would force the cvmfs client behind the union
      //       file system to (potentially) unnecessarily fetch catalogs
      if (DirectoryExists(scratch_dir_ + candidate_rel)
          && !FileExists(union_dir_ + candidate_rel + "/.cvmfscatalog")) {
        LogCvmfs(kLogCatalog, kLogStdout,
                 "Swissknife Sync: WARNING: '%s' should be a nested "
                 "catalog according to the dirtab. "
                 "Recreating...",
                 candidate_rel.c_str());
        nested_catalog_candidates->push_back(candidate);
      } else {
        LogCvmfs(kLogCatalog, kLogVerboseMsg,
                 "Swissknife Sync: "
                 "Found '%s' in catalogs and it already is a nested catalog.",
                 candidate_rel.c_str());
      }
    }
  }
}

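// Creates an empty .cvmfscatalog marker file in every candidate directory that
// does not already carry one, so that the next publish run generates the
// corresponding nested catalogs.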
bool swissknife::CommandApplyDirtab::CreateCatalogMarkers(
    const std::vector<std::string> &new_nested_catalogs) {
  // go through the new nested catalog paths and create .cvmfscatalog markers
  // where necessary
  bool success = true;
  std::vector<std::string>::const_iterator k = new_nested_catalogs.begin();
  const std::vector<std::string>::const_iterator kend = new_nested_catalogs
                                                            .end();
  for (; k != kend; ++k) {
    assert(!k->empty() && k->size() > union_dir_.size());

    // was the marker already created by hand?
    const std::string marker_path = *k + "/.cvmfscatalog";
    if (FileExists(marker_path)) {
      continue;
    }

    // create a nested catalog marker
    const mode_t mode = kDefaultFileMode;
    const int fd = open(marker_path.c_str(), O_CREAT, mode);
    if (fd < 0) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Swissknife Sync: Failed to create nested catalog marker "
               "at '%s' (errno: %d)",
               marker_path.c_str(), errno);
      success = false;
      continue;
    }
    close(fd);

    // inform the user if requested
    if (verbose_) {
      LogCvmfs(kLogCvmfs, kLogStdout,
               "Swissknife Sync: Auto-creating nested catalog in %s",
               k->c_str());
    }
  }

  return success;
}

struct chunk_arg {
  chunk_arg(char param, size_t *save_to) : param(param), save_to(save_to) { }
  char param;
  size_t *save_to;
};

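// Reads the optional average ('a'), minimum ('l') and maximum ('h') file chunk
// size arguments into the sync parameters; returns false if a given value is
// not a positive integer.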
bool swissknife::CommandSync::ReadFileChunkingArgs(
    const swissknife::ArgumentList &args, SyncParameters *params) {
  typedef std::vector<chunk_arg> ChunkArgs;

  // define where to store the value of which file chunk argument
  ChunkArgs chunk_args;
  chunk_args.push_back(chunk_arg('a', &params->avg_file_chunk_size));
  chunk_args.push_back(chunk_arg('l', &params->min_file_chunk_size));
  chunk_args.push_back(chunk_arg('h', &params->max_file_chunk_size));

  // read the arguments
  ChunkArgs::const_iterator i = chunk_args.begin();
  const ChunkArgs::const_iterator iend = chunk_args.end();
  for (; i != iend; ++i) {
    const swissknife::ArgumentList::const_iterator arg = args.find(i->param);

    if (arg != args.end()) {
      const size_t arg_value = static_cast<size_t>(String2Uint64(*arg->second));
      if (arg_value > 0) {
        *i->save_to = arg_value;
      } else {
        return false;
      }
    }
  }

  // check if argument values are sane
  return true;
}

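// 'sync' command: the main publish step. It spawns a crash-handler watchdog,
// parses all publish options, sets up spoolers for data and catalogs, loads
// the base manifest, traverses the union file system (or generates the virtual
// catalog), and commits the new root catalog and manifest to the upstream
// storage.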
int swissknife::CommandSync::Main(const ArgumentList &args) {
  const string start_time = GetGMTimestamp();

  // Spawn monitoring process (watchdog)
  const std::string watchdog_dir = "/tmp";
  char watchdog_path[PATH_MAX];
  const std::string timestamp = GetGMTimestamp("%Y.%m.%d-%H.%M.%S");
  const int path_size =
      snprintf(watchdog_path, sizeof(watchdog_path),
               "%s/cvmfs-swissknife-sync-stacktrace.%s.%d",
               watchdog_dir.c_str(), timestamp.c_str(), getpid());
  assert(path_size > 0);
  assert(path_size < PATH_MAX);
  const UniquePtr<Watchdog> watchdog(Watchdog::Create(NULL));
  watchdog->Spawn(std::string(watchdog_path));

  SyncParameters params;

  // Initialization
  params.dir_union = MakeCanonicalPath(*args.find('u')->second);
  params.dir_scratch = MakeCanonicalPath(*args.find('s')->second);
  params.dir_rdonly = MakeCanonicalPath(*args.find('c')->second);
  params.dir_temp = MakeCanonicalPath(*args.find('t')->second);
  params.base_hash = shash::MkFromHexPtr(shash::HexPtr(*args.find('b')->second),
                                         shash::kSuffixCatalog);
  params.stratum0 = *args.find('w')->second;
  params.manifest_path = *args.find('o')->second;
  params.spooler_definition = *args.find('r')->second;

  params.public_keys = *args.find('K')->second;
  params.repo_name = *args.find('N')->second;

  params.ttl_seconds = catalog::Catalog::kDefaultTTL;

  if (args.find('f') != args.end())
    params.union_fs_type = *args.find('f')->second;
  if (args.find('A') != args.end())
    params.is_balanced = true;
  if (args.find('x') != args.end())
    params.print_changeset = true;
  if (args.find('y') != args.end())
    params.dry_run = true;
  if (args.find('m') != args.end())
    params.mucatalogs = true;
  if (args.find('i') != args.end())
    params.ignore_xdir_hardlinks = true;
  if (args.find('d') != args.end())
    params.stop_for_catalog_tweaks = true;
  if (args.find('V') != args.end())
    params.voms_authz = true;
  if (args.find('F') != args.end())
    params.authz_file = *args.find('F')->second;
  if (args.find('k') != args.end())
    params.include_xattrs = true;
  if (args.find('j') != args.end())
    params.enable_mtime_ns = true;
  if (args.find('Y') != args.end())
    params.external_data = true;
  if (args.find('W') != args.end())
    params.direct_io = true;
  if (args.find('S') != args.end()) {
    const bool retval = catalog::VirtualCatalog::ParseActions(
        *args.find('S')->second, &params.virtual_dir_actions);
    if (!retval) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Swissknife Sync: Invalid virtual catalog options: %s",
               args.find('S')->second->c_str());
      return 1;
    }
  }
  if (args.find('z') != args.end()) {
    const unsigned log_level =
        1 << (kLogLevel0 + String2Uint64(*args.find('z')->second));
    if (log_level > kLogNone) {
      LogCvmfs(kLogCvmfs, kLogStderr, "Swissknife Sync: invalid log level");
      return 1;
    }
    SetLogVerbosity(static_cast<LogLevels>(log_level));
  }

  if (args.find('X') != args.end())
    params.max_weight = String2Uint64(*args.find('X')->second);
  if (args.find('M') != args.end())
    params.min_weight = String2Uint64(*args.find('M')->second);

  if (args.find('p') != args.end()) {
    params.use_file_chunking = true;
    if (!ReadFileChunkingArgs(args, &params)) {
      PrintError("Swissknife Sync: Failed to read file chunk size values");
      return 2;
    }
  }
  if (args.find('O') != args.end()) {
    params.generate_legacy_bulk_chunks = true;
  }
  shash::Algorithms hash_algorithm = shash::kSha1;
  if (args.find('e') != args.end()) {
    hash_algorithm = shash::ParseHashAlgorithm(*args.find('e')->second);
    if (hash_algorithm == shash::kAny) {
      PrintError("Swissknife Sync: Unknown hash algorithm");
      return 1;
    }
  }
  if (args.find('Z') != args.end()) {
    params.compression_alg = zlib::ParseCompressionAlgorithm(
        *args.find('Z')->second);
  }

  if (args.find('E') != args.end())
    params.enforce_limits = true;
  if (args.find('Q') != args.end()) {
    params.nested_kcatalog_limit = String2Uint64(*args.find('Q')->second);
  } else {
    params.nested_kcatalog_limit = SyncParameters::kDefaultNestedKcatalogLimit;
  }
  if (args.find('R') != args.end()) {
    params.root_kcatalog_limit = String2Uint64(*args.find('R')->second);
  } else {
    params.root_kcatalog_limit = SyncParameters::kDefaultRootKcatalogLimit;
  }
  if (args.find('U') != args.end()) {
    params.file_mbyte_limit = String2Uint64(*args.find('U')->second);
  } else {
    params.file_mbyte_limit = SyncParameters::kDefaultFileMbyteLimit;
  }

  if (args.find('v') != args.end()) {
    const sanitizer::IntegerSanitizer sanitizer;
    if (!sanitizer.IsValid(*args.find('v')->second)) {
      PrintError("Swissknife Sync: Invalid revision number");
      return 1;
    }
    params.manual_revision = String2Uint64(*args.find('v')->second);
  }

  params.branched_catalog = args.find('B') != args.end();

  if (args.find('q') != args.end()) {
    params.max_concurrent_write_jobs = String2Uint64(*args.find('q')->second);
  }

  if (args.find('0') != args.end()) {
    params.num_upload_tasks = String2Uint64(*args.find('0')->second);
  }

  if (args.find('T') != args.end()) {
    params.ttl_seconds = String2Uint64(*args.find('T')->second);
  }

  if (args.find('g') != args.end()) {
    params.ignore_special_files = true;
  }

  if (args.find('P') != args.end()) {
    params.session_token_file = *args.find('P')->second;
  }

  if (args.find('H') != args.end()) {
    params.key_file = *args.find('H')->second;
  }

  if (args.find('D') != args.end()) {
    params.repo_tag.SetName(*args.find('D')->second);
  }

  if (args.find('J') != args.end()) {
    params.repo_tag.SetDescription(*args.find('J')->second);
  }

  if (args.find('G') != args.end()) {
    params.cache_dir = "/var/spool/cvmfs/" + params.repo_name + "/cache.server";
  }

  const bool upload_statsdb = (args.count('I') > 0);

  if (!CheckParams(params))
    return 2;
  // This may fail, in which case a warning is printed and the process continues
  ObtainDacReadSearchCapability();

  perf::StatisticsTemplate publish_statistics("publish", this->statistics());

  // Start spooler
  upload::SpoolerDefinition spooler_definition(
      params.spooler_definition, hash_algorithm, params.compression_alg,
      params.generate_legacy_bulk_chunks, params.use_file_chunking,
      params.min_file_chunk_size, params.avg_file_chunk_size,
      params.max_file_chunk_size, params.session_token_file, params.key_file);
  if (params.max_concurrent_write_jobs > 0) {
    spooler_definition
        .number_of_concurrent_uploads = params.max_concurrent_write_jobs;
  }
  spooler_definition.num_upload_tasks = params.num_upload_tasks;

  const upload::SpoolerDefinition spooler_definition_catalogs(
      spooler_definition.Dup2DefaultCompression());

  params.spooler = upload::Spooler::Construct(spooler_definition,
                                              &publish_statistics);
  if (NULL == params.spooler)
    return 3;
  const UniquePtr<upload::Spooler> spooler_catalogs(upload::Spooler::Construct(
      spooler_definition_catalogs, &publish_statistics));
  if (!spooler_catalogs.IsValid())
    return 3;

  const bool follow_redirects = (args.count('L') > 0);
  const string proxy = (args.count('@') > 0) ? *args.find('@')->second : "";
  if (!InitDownloadManager(follow_redirects, proxy)) {
    return 3;
  }

  if (!InitSignatureManager(params.public_keys)) {
    return 3;
  }

  /*
   * Note: If the upstream is of type gateway, due to the possibility of
   * concurrent release managers, it's possible to have different local and
   * remote root hashes. We proceed by loading the remote manifest but we give
   * an empty base hash.
   */
  UniquePtr<manifest::Manifest> manifest;
  if (params.branched_catalog) {
    // Throw-away manifest
    manifest = new manifest::Manifest(shash::Any(), 0, "");
  } else if (params.virtual_dir_actions
             != catalog::VirtualCatalog::kActionNone) {
    manifest = this->OpenLocalManifest(params.manifest_path);
    params.base_hash = manifest->catalog_hash();
  } else {
    // TODO(jblomer): revert to params.base_hash if spooler driver type is not
    // upload::SpoolerDefinition::Gateway
    manifest = FetchRemoteManifest(params.stratum0, params.repo_name,
                                   shash::Any());
  }
  if (!manifest.IsValid()) {
    return 3;
  }

  StatisticsDatabase *stats_db = StatisticsDatabase::OpenStandardDB(
      params.repo_name);

  const std::string old_root_hash = manifest->catalog_hash().ToString(true);

  catalog::WritableCatalogManager catalog_manager(
      params.base_hash, params.stratum0, params.dir_temp,
      spooler_catalogs.weak_ref(), download_manager(), params.enforce_limits,
      params.nested_kcatalog_limit, params.root_kcatalog_limit,
      params.file_mbyte_limit, statistics(), params.is_balanced,
      params.max_weight, params.min_weight, params.cache_dir);
  catalog_manager.Init();

  publish::SyncMediator mediator(&catalog_manager, &params, publish_statistics);
  LogCvmfs(kLogPublish, kLogStdout, "Swissknife Sync: Processing changes...");

  // Should be before the synchronization starts to avoid race of GetTTL with
  // other sqlite operations
  if ((params.ttl_seconds > 0)
      && ((params.ttl_seconds != catalog_manager.GetTTL())
          || !catalog_manager.HasExplicitTTL())) {
    LogCvmfs(kLogCvmfs, kLogStdout,
             "Swissknife Sync: Setting repository TTL to %" PRIu64 "s",
             params.ttl_seconds);
    catalog_manager.SetTTL(params.ttl_seconds);
  }

  // Either real catalogs or virtual catalog
  if (params.virtual_dir_actions == catalog::VirtualCatalog::kActionNone) {
    publish::SyncUnion *sync;
    if (params.union_fs_type == "overlayfs") {
      sync = new publish::SyncUnionOverlayfs(
          &mediator, params.dir_rdonly, params.dir_union, params.dir_scratch);
    } else if (params.union_fs_type == "aufs") {
      sync = new publish::SyncUnionAufs(&mediator, params.dir_rdonly,
                                        params.dir_union, params.dir_scratch);
    } else {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Swissknife Sync: unknown union file system: %s",
               params.union_fs_type.c_str());
      return 3;
    }

    if (!sync->Initialize()) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Swissknife Sync: Initialization of the synchronisation "
               "engine failed");
      return 4;
    }

    sync->Traverse();
  } else {
    assert(!manifest->history().IsNull());
    catalog::VirtualCatalog virtual_catalog(
        manifest.weak_ref(), download_manager(), &catalog_manager, &params);
    virtual_catalog.Generate(params.virtual_dir_actions);
  }

  if (!params.authz_file.empty()) {
    LogCvmfs(kLogCvmfs, kLogDebug,
             "Swissknife Sync: Adding contents of authz file %s to"
             " root catalog.",
             params.authz_file.c_str());
    const int fd = open(params.authz_file.c_str(), O_RDONLY);
    if (fd == -1) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Swissknife Sync: Unable to open authz file (%s)"
               "from the publication process: %s",
               params.authz_file.c_str(), strerror(errno));
      return 7;
    }

    std::string new_authz;
    const bool read_successful = SafeReadToString(fd, &new_authz);
    close(fd);

    if (!read_successful) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Swissknife Sync: Failed to read authz file (%s): %s",
               params.authz_file.c_str(), strerror(errno));
      return 8;
    }

    catalog_manager.SetVOMSAuthz(new_authz);
  }

  if (!mediator.Commit(manifest.weak_ref())) {
    PrintError("Swissknife Sync: Something went wrong during sync");
    if (!params.dry_run) {
      stats_db->StorePublishStatistics(this->statistics(), start_time, false);
      if (upload_statsdb) {
        stats_db->UploadStatistics(params.spooler);
      }
    }
    return 5;
  }

  perf::Counter *revision_counter = statistics()->Register(
      "publish.revision", "Published revision number");
  revision_counter->Set(
      static_cast<int64_t>(catalog_manager.GetRootCatalog()->revision()));

  // finalize the spooler
  LogCvmfs(kLogCvmfs, kLogStdout,
           "Swissknife Sync: Wait for all uploads to finish");
  params.spooler->WaitForUpload();
  spooler_catalogs->WaitForUpload();
  params.spooler->FinalizeSession(false);

  LogCvmfs(kLogCvmfs, kLogStdout,
           "Swissknife Sync: Exporting repository manifest");

  // We call FinalizeSession(true) this time, to also trigger the commit
  // operation on the gateway machine (if the upstream is of type "gw").

  // Get the path of the new root catalog
  const std::string new_root_hash = manifest->catalog_hash().ToString(true);

  if (!spooler_catalogs->FinalizeSession(true, old_root_hash, new_root_hash,
                                         params.repo_tag)) {
    PrintError("Swissknife Sync: Failed to commit transaction.");
    if (!params.dry_run) {
      stats_db->StorePublishStatistics(this->statistics(), start_time, false);
      if (upload_statsdb) {
        stats_db->UploadStatistics(params.spooler);
      }
    }
    return 9;
  }

  if (!params.dry_run) {
    stats_db->StorePublishStatistics(this->statistics(), start_time, true);
    if (upload_statsdb) {
      stats_db->UploadStatistics(params.spooler);
    }
  }

  delete params.spooler;

  if (!manifest->Export(params.manifest_path)) {
    PrintError("Swissknife Sync: Failed to create new repository");
    return 6;
  }

  return 0;
}