CernVM-FS 2.12.0
swissknife_sync.cc
/**
 * This file is part of the CernVM File System.
 */

// NOLINTNEXTLINE
#define _FILE_OFFSET_BITS 64
// NOLINTNEXTLINE
#define __STDC_FORMAT_MACROS

#include "swissknife_sync.h"

#include <errno.h>
#include <fcntl.h>
#include <glob.h>
#include <inttypes.h>
#include <limits.h>
#include <sys/capability.h>

#include <cstdio>
#include <cstdlib>
#include <string>
#include <vector>

#include "catalog_mgr_ro.h"
#include "catalog_mgr_rw.h"
#include "catalog_virtual.h"
#include "manifest.h"
#include "monitor.h"
#include "network/download.h"
#include "path_filters/dirtab.h"
#include "reflog.h"
#include "sanitizer.h"
#include "statistics.h"
#include "statistics_database.h"
#include "swissknife_capabilities.h"
#include "sync_mediator.h"
#include "sync_union.h"
#include "sync_union_aufs.h"
#include "sync_union_overlayfs.h"
#include "util/logging.h"
#include "util/platform.h"
#include "util/string.h"

using namespace std;  // NOLINT

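// Sanity checks on the publish parameters: all working directories must
// exist, stratum 0 and the manifest output path must be set, the file chunk
// size bounds must be ordered, and a gateway upstream requires a session
// token file.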
bool swissknife::CommandSync::CheckParams(const SyncParameters &p) {
  if (!DirectoryExists(p.dir_scratch)) {
    PrintError("overlay (copy on write) directory does not exist");
    return false;
  }
  if (!DirectoryExists(p.dir_union)) {
    PrintError("union volume does not exist");
    return false;
  }
  if (!DirectoryExists(p.dir_rdonly)) {
    PrintError("cvmfs read/only repository does not exist");
    return false;
  }
  if (p.stratum0 == "") {
    PrintError("Stratum0 url missing");
    return false;
  }

  if (p.manifest_path == "") {
    PrintError("manifest output required");
    return false;
  }
  if (!DirectoryExists(p.dir_temp)) {
    PrintError("data store directory does not exist");
    return false;
  }

  if ((p.min_file_chunk_size >= p.avg_file_chunk_size) ||
      (p.avg_file_chunk_size >= p.max_file_chunk_size)) {
    PrintError("file chunk size values are not sane");
    return false;
  }

  if (HasPrefix(p.spooler_definition, "gw", false)) {
    if (p.session_token_file.empty()) {
      PrintError(
          "Session token file has to be provided "
          "when upstream type is gw.");
      return false;
    }
  }

  return true;
}

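// "create" command: initializes a fresh repository by writing the initial
// root catalog, an empty reflog, and the first manifest to the backend
// storage defined by the spooler.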
int swissknife::CommandCreate::Main(const ArgumentList &args) {
  const string manifest_path = *args.find('o')->second;
  const string dir_temp = *args.find('t')->second;
  const string spooler_definition = *args.find('r')->second;
  const string repo_name = *args.find('n')->second;
  const string reflog_chksum_path = *args.find('R')->second;
  if (args.find('l') != args.end()) {
    unsigned log_level =
        kLogLevel0 << String2Uint64(*args.find('l')->second);
    if (log_level > kLogNone) {
      LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
      return 1;
    }
    SetLogVerbosity(static_cast<LogLevels>(log_level));
  }
  shash::Algorithms hash_algorithm = shash::kSha1;
  if (args.find('a') != args.end()) {
    hash_algorithm = shash::ParseHashAlgorithm(*args.find('a')->second);
    if (hash_algorithm == shash::kAny) {
      PrintError("unknown hash algorithm");
      return 1;
    }
  }

  const bool volatile_content = (args.count('v') > 0);
  const bool garbage_collectable = (args.count('z') > 0);
  std::string voms_authz;
  if (args.find('V') != args.end()) {
    voms_authz = *args.find('V')->second;
  }

  const upload::SpoolerDefinition sd(spooler_definition, hash_algorithm,
      /* remaining constructor arguments not shown in this listing */);
  UniquePtr<upload::Spooler> spooler(upload::Spooler::Construct(sd));
  assert(spooler.IsValid());

  UniquePtr<manifest::Manifest> manifest(
      catalog::WritableCatalogManager::CreateRepository(
          dir_temp, volatile_content, voms_authz, spooler.weak_ref()));
  if (!manifest.IsValid()) {
    PrintError("Swissknife Sync: Failed to create new repository");
    return 1;
  }

  UniquePtr<manifest::Reflog> reflog(CreateEmptyReflog(dir_temp, repo_name));
  if (!reflog.IsValid()) {
    PrintError("Swissknife Sync: Failed to create fresh Reflog");
    return 1;
  }

  reflog->DropDatabaseFileOwnership();
  string reflog_path = reflog->database_file();
  reflog.Destroy();
  shash::Any reflog_hash(hash_algorithm);
  manifest::Reflog::HashDatabase(reflog_path, &reflog_hash);
  spooler->UploadReflog(reflog_path);
  spooler->WaitForUpload();
  unlink(reflog_path.c_str());
  if (spooler->GetNumberOfErrors()) {
    LogCvmfs(kLogCvmfs, kLogStderr, "Swissknife Sync: Failed to upload reflog");
    return 4;
  }
  assert(!reflog_chksum_path.empty());
  manifest::Reflog::WriteChecksum(reflog_chksum_path, reflog_hash);

  // set optional manifest fields
  const bool needs_bootstrap_shortcuts = !voms_authz.empty();
  manifest->set_garbage_collectability(garbage_collectable);
  manifest->set_has_alt_catalog_path(needs_bootstrap_shortcuts);

  if (!manifest->Export(manifest_path)) {
    PrintError("Swissknife Sync: Failed to create new repository");
    return 5;
  }

  return 0;
}

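// "upload" command: pushes a single local file to the backend storage under
// the given destination path.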
int swissknife::CommandUpload::Main(const ArgumentList &args) {
  const string source = *args.find('i')->second;
  const string dest = *args.find('o')->second;
  const string spooler_definition = *args.find('r')->second;
  shash::Algorithms hash_algorithm = shash::kSha1;
  if (args.find('a') != args.end()) {
    hash_algorithm = shash::ParseHashAlgorithm(*args.find('a')->second);
    if (hash_algorithm == shash::kAny) {
      PrintError("Swissknife Sync: Unknown hash algorithm");
      return 1;
    }
  }

  const upload::SpoolerDefinition sd(spooler_definition, hash_algorithm);
  upload::Spooler *spooler = upload::Spooler::Construct(sd);
  assert(spooler);
  spooler->Upload(source, dest);
  spooler->WaitForUpload();

  if (spooler->GetNumberOfErrors() > 0) {
    LogCvmfs(kLogCatalog, kLogStderr, "Swissknife Sync: failed to upload %s",
             source.c_str());
    return 1;
  }

  delete spooler;

  return 0;
}

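// "peek" command: checks whether a given object is present in the backend
// storage.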
int swissknife::CommandPeek::Main(const ArgumentList &args) {
  const string file_to_peek = *args.find('d')->second;
  const string spooler_definition = *args.find('r')->second;

  // Hash doesn't matter
  const upload::SpoolerDefinition sd(spooler_definition, shash::kAny);
  upload::Spooler *spooler = upload::Spooler::Construct(sd);
  assert(spooler);
  const bool success = spooler->Peek(file_to_peek);

  if (spooler->GetNumberOfErrors() > 0) {
    LogCvmfs(kLogCatalog, kLogStderr, "Swissknife Sync: failed to peek for %s",
             file_to_peek.c_str());
    return 2;
  }
  if (!success) {
    LogCvmfs(kLogCatalog, kLogStdout, "Swissknife Sync: %s not found",
             file_to_peek.c_str());
    return 1;
  }
  LogCvmfs(kLogCatalog, kLogStdout, "Swissknife Sync: %s available",
           file_to_peek.c_str());

  delete spooler;

  return 0;
}

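// "remove" command: deletes a single object from the backend storage.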
int swissknife::CommandRemove::Main(const ArgumentList &args) {
  const string file_to_delete = *args.find('o')->second;
  const string spooler_definition = *args.find('r')->second;

  // Hash doesn't matter
  const upload::SpoolerDefinition sd(spooler_definition, shash::kAny);
  upload::Spooler *spooler = upload::Spooler::Construct(sd);
  assert(spooler);
  spooler->RemoveAsync(file_to_delete);
  spooler->WaitForUpload();

  if (spooler->GetNumberOfErrors() > 0) {
    LogCvmfs(kLogCatalog, kLogStderr, "Swissknife Sync: failed to delete %s",
             file_to_delete.c_str());
    return 1;
  }

  delete spooler;

  return 0;
}

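// "dirtab" command: reads the repository's .cvmfsdirtab, globs the union
// volume for matching directories, and creates .cvmfscatalog markers where
// new nested catalogs are required.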
int swissknife::CommandApplyDirtab::Main(const ArgumentList &args) {
  const string dirtab_file = *args.find('d')->second;
  union_dir_ = MakeCanonicalPath(*args.find('u')->second);
  scratch_dir_ = MakeCanonicalPath(*args.find('s')->second);
  const shash::Any base_hash = shash::MkFromHexPtr(
      shash::HexPtr(*args.find('b')->second), shash::kSuffixCatalog);
  const string stratum0 = *args.find('w')->second;
  const string dir_temp = *args.find('t')->second;
  verbose_ = (args.find('x') != args.end());

  // check if there is a dirtab file
  if (!FileExists(dirtab_file)) {
    LogCvmfs(/* source and mask not shown in this listing */,
             "Swissknife Sync: Didn't find a dirtab at '%s'. Skipping...",
             dirtab_file.c_str());
    return 0;
  }

  // parse dirtab file
  catalog::Dirtab *dirtab = catalog::Dirtab::Create(dirtab_file);
  if (!dirtab->IsValid()) {
    LogCvmfs(/* source and mask not shown in this listing */,
             "Swissknife Sync: Invalid or not readable dirtab '%s'",
             dirtab_file.c_str());
    return 1;
  }
  LogCvmfs(/* source and mask not shown in this listing */,
           "Swissknife Sync: Found %lu rules in dirtab '%s'",
           dirtab->RuleCount(), dirtab_file.c_str());

  // initialize catalog infrastructure
  const bool auto_manage_catalog_files = true;
  const bool follow_redirects = (args.count('L') > 0);
  const string proxy = (args.count('@') > 0) ? *args.find('@')->second : "";
  if (!InitDownloadManager(follow_redirects, proxy)) {
    return 1;
  }
  catalog::SimpleCatalogManager catalog_manager(
      base_hash, stratum0, dir_temp, download_manager(), statistics(),
      auto_manage_catalog_files);
  catalog_manager.Init();

  vector<string> new_nested_catalogs;
  DetermineNestedCatalogCandidates(*dirtab, &catalog_manager,
                                   &new_nested_catalogs);
  const bool success = CreateCatalogMarkers(new_nested_catalogs);
  delete dirtab;

  return (success) ? 0 : 1;
}

namespace {

// Override the directory traversal used by glob() in order to avoid breaking
// out of the repository tree

std::string *g_glob_uniondir = NULL;

bool GlobCheckPath(const char *name) {
  char resolved_cstr[PATH_MAX];
  char *retval = realpath(name, resolved_cstr);
  if (retval == NULL) return false;

  std::string resolved(resolved_cstr);
  if (resolved == *g_glob_uniondir) return true;
  if (!HasPrefix(resolved, (*g_glob_uniondir) + "/", false /*ignore_case*/)) {
    errno = EACCES;
    return false;
  }
  return true;
}

void *GlobOpendir(const char *name) {
  if (!GlobCheckPath(name)) return NULL;
  return opendir(name);
}

void GlobClosedir(void *dirp) {
  closedir(static_cast<DIR *>(dirp));
}

struct dirent *GlobReaddir(void *dirp) {
  return readdir(static_cast<DIR *>(dirp));
}

int GlobLstat(const char *name, struct stat *st) {
  if (!GlobCheckPath(name)) return -1;
  return lstat(name, st);
}

int GlobStat(const char *name, struct stat *st) {
  if (!GlobCheckPath(name)) return -1;
  return stat(name, st);
}

}  // anonymous namespace

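// Runs every positive dirtab rule as a glob over the union volume (using the
// restricted traversal callbacks above) and collects directories that may
// need to become nested catalogs.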
void swissknife::CommandApplyDirtab::DetermineNestedCatalogCandidates(
    const catalog::Dirtab &dirtab,
    catalog::SimpleCatalogManager *catalog_manager,
    vector<string> *nested_catalog_candidates) {
  // find possible new nested catalog locations
  const catalog::Dirtab::Rules &lookup_rules = dirtab.positive_rules();
  catalog::Dirtab::Rules::const_iterator i = lookup_rules.begin();
  const catalog::Dirtab::Rules::const_iterator iend = lookup_rules.end();
  for (; i != iend; ++i) {
    assert(!i->is_negation);

    // run a glob using the current dirtab rule on the current repository
    // state
    const std::string &glob_string = i->pathspec.GetGlobString();
    const std::string &glob_string_abs = union_dir_ + glob_string;
    const int glob_flags = GLOB_ONLYDIR | GLOB_NOSORT | GLOB_PERIOD |
                           GLOB_ALTDIRFUNC;
    glob_t glob_res;
    g_glob_uniondir = new std::string(union_dir_);
    glob_res.gl_opendir = GlobOpendir;
    glob_res.gl_readdir = GlobReaddir;
    glob_res.gl_closedir = GlobClosedir;
    glob_res.gl_lstat = GlobLstat;
    glob_res.gl_stat = GlobStat;
    const int glob_retval =
        glob(glob_string_abs.c_str(), glob_flags, NULL, &glob_res);
    delete g_glob_uniondir;
    g_glob_uniondir = NULL;

    if (glob_retval == 0) {
      // found some candidates... filtering by cvmfs catalog structure
      LogCvmfs(/* source and mask not shown in this listing */,
               "Swissknife Sync: Found %lu entries for pathspec (%s)",
               glob_res.gl_pathc, glob_string.c_str());
      FilterCandidatesFromGlobResult(dirtab, glob_res.gl_pathv,
                                     glob_res.gl_pathc, catalog_manager,
                                     nested_catalog_candidates);
    } else if (glob_retval == GLOB_NOMATCH) {
      LogCvmfs(/* source and mask not shown in this listing */,
               "Swissknife Sync: WARNING: cannot apply pathspec %s",
               glob_string.c_str());
    } else {
      LogCvmfs(/* source and mask not shown in this listing */,
               "Swissknife Sync: Failed to run glob matching (%s)",
               glob_string.c_str());
    }

    globfree(&glob_res);
  }
}

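// Filters the raw glob() result: keeps only real directories inside the union
// volume that are not excluded by the dirtab and that are not already nested
// catalog transition points.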
void swissknife::CommandApplyDirtab::FilterCandidatesFromGlobResult(
    const catalog::Dirtab &dirtab, char **paths, const size_t npaths,
    catalog::SimpleCatalogManager *catalog_manager,
    std::vector<std::string> *nested_catalog_candidates) {
  // go through the paths produced by glob() and filter them
  for (size_t i = 0; i < npaths; ++i) {
    // process candidate paths
    const std::string candidate(paths[i]);
    const std::string candidate_rel = candidate.substr(union_dir_.size());

    // check if path points to a directory
    platform_stat64 candidate_info;
    const int lstat_retval = platform_lstat(candidate.c_str(), &candidate_info);
    if (lstat_retval != 0) {
      LogCvmfs(/* source and mask not shown in this listing */,
               "Swissknife Sync: "
               "Error in processing .cvmfsdirtab: cannot access %s (%d)",
               candidate.c_str(), errno);
      abort();
    }
    assert(lstat_retval == 0);
    if (!S_ISDIR(candidate_info.st_mode)) {
      // The GLOB_ONLYDIR flag is only a hint, non-directories can still be
      // returned
      LogCvmfs(kLogCatalog, kLogDebug, "Swissknife Sync: "
               "The '%s' dirtab entry does not point to a directory "
               "but to a file or a symbolic link",
               candidate_rel.c_str());
      continue;
    }

    // check if the path is a meta-directory (. or ..)
    assert(candidate_rel.size() >= 2);
    if (candidate_rel.substr(candidate_rel.size() - 2) == "/." ||
        candidate_rel.substr(candidate_rel.size() - 3) == "/..") {
      continue;
    }

    // check that the path isn't excluded in the dirtab
    if (dirtab.IsOpposing(candidate_rel)) {
      LogCvmfs(/* source and mask not shown in this listing */,
               "Swissknife Sync: Candidate '%s' is excluded by dirtab",
               candidate_rel.c_str());
      continue;
    }

    // lookup the path in the catalog structure to find out if it already
    // points to a nested catalog transition point. Furthermore it could be
    // a new directory and thus not in any catalog yet.
    catalog::DirectoryEntry dirent;
    const bool lookup_success = catalog_manager->LookupPath(
        candidate_rel, catalog::kLookupDefault, &dirent);
    if (!lookup_success) {
      LogCvmfs(/* source and mask not shown in this listing */,
               "Swissknife Sync: Didn't find '%s' in catalogs, could "
               "be a new directory and nested catalog.",
               candidate_rel.c_str());
      nested_catalog_candidates->push_back(candidate);
    } else if (!dirent.IsNestedCatalogMountpoint() &&
               !dirent.IsNestedCatalogRoot()) {
      LogCvmfs(/* source and mask not shown in this listing */,
               "Swissknife Sync: Found '%s' in catalogs but is not a "
               "nested catalog yet.",
               candidate_rel.c_str());
      nested_catalog_candidates->push_back(candidate);
    } else {
      // check if the nested catalog marker is still there; we might need to
      // recreate the catalog after manual marker removal
      // Note: First we check if the parent directory shows up in the scratch
      //       space to verify that it was touched (copy-on-write); otherwise
      //       we would force the cvmfs client behind the union file system
      //       to (potentially) unnecessarily fetch catalogs
      if (DirectoryExists(scratch_dir_ + candidate_rel) &&
          !FileExists(union_dir_ + candidate_rel + "/.cvmfscatalog")) {
        LogCvmfs(/* source and mask not shown in this listing */,
                 "Swissknife Sync: WARNING: '%s' should be a nested "
                 "catalog according to the dirtab. "
                 "Recreating...",
                 candidate_rel.c_str());
        nested_catalog_candidates->push_back(candidate);
      } else {
        LogCvmfs(kLogCatalog, kLogDebug, "Swissknife Sync: "
                 "Found '%s' in catalogs and it already is a nested catalog.",
                 candidate_rel.c_str());
      }
    }
  }
}

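// Creates a .cvmfscatalog marker file in every new nested catalog location
// that does not have one yet.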
bool swissknife::CommandApplyDirtab::CreateCatalogMarkers(
    const std::vector<std::string> &new_nested_catalogs) {
  // go through the new nested catalog paths and create .cvmfscatalog markers
  // where necessary
  bool success = true;
  std::vector<std::string>::const_iterator k = new_nested_catalogs.begin();
  const std::vector<std::string>::const_iterator kend =
      new_nested_catalogs.end();
  for (; k != kend; ++k) {
    assert(!k->empty() && k->size() > union_dir_.size());

    // was the marker already created by hand?
    const std::string marker_path = *k + "/.cvmfscatalog";
    if (FileExists(marker_path)) {
      continue;
    }

    // create a nested catalog marker
    const mode_t mode = kDefaultFileMode;
    const int fd = open(marker_path.c_str(), O_CREAT, mode);
    if (fd < 0) {
      LogCvmfs(/* source and mask not shown in this listing */,
               "Swissknife Sync: Failed to create nested catalog marker "
               "at '%s' (errno: %d)",
               marker_path.c_str(), errno);
      success = false;
      continue;
    }
    close(fd);

    // inform the user if requested
    if (verbose_) {
      LogCvmfs(/* source and mask not shown in this listing */,
               "Swissknife Sync: Auto-creating nested catalog in %s",
               k->c_str());
    }
  }

  return success;
}

struct chunk_arg {
  chunk_arg(char param, size_t *save_to) : param(param), save_to(save_to) {}
  char param;
  size_t *save_to;
};

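// Maps the -a/-l/-h command line switches to the average/minimum/maximum file
// chunk size fields of SyncParameters; zero values are rejected.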
bool swissknife::CommandSync::ReadFileChunkingArgs(
    const swissknife::ArgumentList &args, SyncParameters *params) {
  typedef std::vector<chunk_arg> ChunkArgs;

  // define where to store the value of which file chunk argument
  ChunkArgs chunk_args;
  chunk_args.push_back(chunk_arg('a', &params->avg_file_chunk_size));
  chunk_args.push_back(chunk_arg('l', &params->min_file_chunk_size));
  chunk_args.push_back(chunk_arg('h', &params->max_file_chunk_size));

  // read the arguments
  ChunkArgs::const_iterator i = chunk_args.begin();
  ChunkArgs::const_iterator iend = chunk_args.end();
  for (; i != iend; ++i) {
    swissknife::ArgumentList::const_iterator arg = args.find(i->param);

    if (arg != args.end()) {
      size_t arg_value = static_cast<size_t>(String2Uint64(*arg->second));
      if (arg_value > 0) {
        *i->save_to = arg_value;
      } else {
        return false;
      }
    }
  }

  // the sanity of the chunk size values is checked in CheckParams()
  return true;
}

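// "sync" command: the main publish operation. It spawns a watchdog, parses
// all publish options, sets up the spoolers and the writable catalog manager,
// traverses the union file system overlay (or generates the virtual catalog),
// and commits the resulting catalogs and the new manifest.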
int swissknife::CommandSync::Main(const ArgumentList &args) {
  string start_time = GetGMTimestamp();

  // Spawn monitoring process (watchdog)
  std::string watchdog_dir = "/tmp";
  char watchdog_path[PATH_MAX];
  std::string timestamp = GetGMTimestamp("%Y.%m.%d-%H.%M.%S");
  int path_size = snprintf(watchdog_path, sizeof(watchdog_path),
                           "%s/cvmfs-swissknife-sync-stacktrace.%s.%d",
                           watchdog_dir.c_str(), timestamp.c_str(), getpid());
  assert(path_size > 0);
  assert(path_size < PATH_MAX);
  UniquePtr<Watchdog> watchdog(Watchdog::Create(NULL));
  watchdog->Spawn(std::string(watchdog_path));

  SyncParameters params;

  // Initialization
  params.dir_union = MakeCanonicalPath(*args.find('u')->second);
  params.dir_scratch = MakeCanonicalPath(*args.find('s')->second);
  params.dir_rdonly = MakeCanonicalPath(*args.find('c')->second);
  params.dir_temp = MakeCanonicalPath(*args.find('t')->second);
  params.base_hash = shash::MkFromHexPtr(shash::HexPtr(*args.find('b')->second),
                                         shash::kSuffixCatalog);
  params.stratum0 = *args.find('w')->second;
  params.manifest_path = *args.find('o')->second;
  params.spooler_definition = *args.find('r')->second;

  params.public_keys = *args.find('K')->second;
  params.repo_name = *args.find('N')->second;

  if (args.find('f') != args.end())
    params.union_fs_type = *args.find('f')->second;
  if (args.find('A') != args.end()) params.is_balanced = true;
  if (args.find('x') != args.end()) params.print_changeset = true;
  if (args.find('y') != args.end()) params.dry_run = true;
  if (args.find('m') != args.end()) params.mucatalogs = true;
  if (args.find('i') != args.end()) params.ignore_xdir_hardlinks = true;
  if (args.find('d') != args.end()) params.stop_for_catalog_tweaks = true;
  if (args.find('V') != args.end()) params.voms_authz = true;
  if (args.find('F') != args.end()) params.authz_file = *args.find('F')->second;
  if (args.find('k') != args.end()) params.include_xattrs = true;
  if (args.find('j') != args.end()) params.enable_mtime_ns = true;
  if (args.find('Y') != args.end()) params.external_data = true;
  if (args.find('W') != args.end()) params.direct_io = true;
  if (args.find('S') != args.end()) {
    bool retval = catalog::VirtualCatalog::ParseActions(
        *args.find('S')->second, &params.virtual_dir_actions);
    if (!retval) {
      LogCvmfs(/* source and mask not shown in this listing */,
               "Swissknife Sync: Invalid virtual catalog options: %s",
               args.find('S')->second->c_str());
      return 1;
    }
  }
  if (args.find('z') != args.end()) {
    unsigned log_level =
        1 << (kLogLevel0 + String2Uint64(*args.find('z')->second));
    if (log_level > kLogNone) {
      LogCvmfs(kLogCvmfs, kLogStderr, "Swissknife Sync: invalid log level");
      return 1;
    }
    SetLogVerbosity(static_cast<LogLevels>(log_level));
  }

  if (args.find('X') != args.end())
    params.max_weight = String2Uint64(*args.find('X')->second);
  if (args.find('M') != args.end())
    params.min_weight = String2Uint64(*args.find('M')->second);

  if (args.find('p') != args.end()) {
    params.use_file_chunking = true;
    if (!ReadFileChunkingArgs(args, &params)) {
      PrintError("Swissknife Sync: Failed to read file chunk size values");
      return 2;
    }
  }
  if (args.find('O') != args.end()) {
    params.generate_legacy_bulk_chunks = true;
  }
  shash::Algorithms hash_algorithm = shash::kSha1;
  if (args.find('e') != args.end()) {
    hash_algorithm = shash::ParseHashAlgorithm(*args.find('e')->second);
    if (hash_algorithm == shash::kAny) {
      PrintError("Swissknife Sync: Unknown hash algorithm");
      return 1;
    }
  }
  if (args.find('Z') != args.end()) {
    params.compression_alg =
        zlib::ParseCompressionAlgorithm(*args.find('Z')->second);
  }

  if (args.find('E') != args.end()) params.enforce_limits = true;
  if (args.find('Q') != args.end()) {
    params.nested_kcatalog_limit = String2Uint64(*args.find('Q')->second);
  } else {
    params.nested_kcatalog_limit = kDefaultNestedKcatalogLimit;
  }
  if (args.find('R') != args.end()) {
    params.root_kcatalog_limit = String2Uint64(*args.find('R')->second);
  } else {
    params.root_kcatalog_limit = kDefaultRootKcatalogLimit;
  }
  if (args.find('U') != args.end()) {
    params.file_mbyte_limit = String2Uint64(*args.find('U')->second);
  } else {
    params.file_mbyte_limit = kDefaultFileMbyteLimit;
  }

  if (args.find('v') != args.end()) {
    sanitizer::IntegerSanitizer sanitizer;
    if (!sanitizer.IsValid(*args.find('v')->second)) {
      PrintError("Swissknife Sync: Invalid revision number");
      return 1;
    }
    params.manual_revision = String2Uint64(*args.find('v')->second);
  }

  params.branched_catalog = args.find('B') != args.end();

  if (args.find('q') != args.end()) {
    params.max_concurrent_write_jobs = String2Uint64(*args.find('q')->second);
  }

  if (args.find('0') != args.end()) {
    params.num_upload_tasks = String2Uint64(*args.find('0')->second);
  }

  if (args.find('T') != args.end()) {
    params.ttl_seconds = String2Uint64(*args.find('T')->second);
  }

  if (args.find('g') != args.end()) {
    params.ignore_special_files = true;
  }

  if (args.find('P') != args.end()) {
    params.session_token_file = *args.find('P')->second;
  }

  if (args.find('H') != args.end()) {
    params.key_file = *args.find('H')->second;
  }

  if (args.find('D') != args.end()) {
    params.repo_tag.SetName(*args.find('D')->second);
  }

  if (args.find('J') != args.end()) {
    params.repo_tag.SetDescription(*args.find('J')->second);
  }

  if (args.find('G') != args.end()) {
    params.cache_dir = "/var/spool/cvmfs/" + params.repo_name + "/cache.server";
  }

  const bool upload_statsdb = (args.count('I') > 0);

  if (!CheckParams(params)) return 2;
  // This may fail, in which case a warning is printed and the process continues
  ObtainDacReadSearchCapability();

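  // Two spoolers are set up below: one with the configured compression
  // algorithm for file content, and a second one with default compression
  // (Dup2DefaultCompression) that is used for uploading catalogs.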
  perf::StatisticsTemplate publish_statistics("publish", this->statistics());

  // Start spooler
  upload::SpoolerDefinition spooler_definition(
      params.spooler_definition, hash_algorithm, params.compression_alg,
      params.generate_legacy_bulk_chunks, params.use_file_chunking,
      params.min_file_chunk_size, params.avg_file_chunk_size,
      params.max_file_chunk_size, params.session_token_file, params.key_file);
  if (params.max_concurrent_write_jobs > 0) {
    spooler_definition.number_of_concurrent_uploads =
        params.max_concurrent_write_jobs;
  }
  spooler_definition.num_upload_tasks = params.num_upload_tasks;

  upload::SpoolerDefinition spooler_definition_catalogs(
      spooler_definition.Dup2DefaultCompression());

  params.spooler = upload::Spooler::Construct(spooler_definition,
                                              &publish_statistics);
  if (NULL == params.spooler) return 3;
  UniquePtr<upload::Spooler> spooler_catalogs(
      upload::Spooler::Construct(spooler_definition_catalogs,
                                 &publish_statistics));
  if (!spooler_catalogs.IsValid()) return 3;

  const bool follow_redirects = (args.count('L') > 0);
  const string proxy = (args.count('@') > 0) ? *args.find('@')->second : "";
  if (!InitDownloadManager(follow_redirects, proxy)) {
    return 3;
  }

  if (!InitSignatureManager(params.public_keys)) {
    return 3;
  }

  /*
   * Note: If the upstream is of type gateway, due to the possibility of
   * concurrent release managers, it's possible to have different local and
   * remote root hashes. We proceed by loading the remote manifest but we give
   * an empty base hash.
   */
  UniquePtr<manifest::Manifest> manifest;
  if (params.branched_catalog) {
    // Throw-away manifest
    manifest = new manifest::Manifest(shash::Any(), 0, "");
  } else if (params.virtual_dir_actions !=
             catalog::VirtualCatalog::kActionNone) {
    manifest = this->OpenLocalManifest(params.manifest_path);
    params.base_hash = manifest->catalog_hash();
  } else {
    // TODO(jblomer): revert to params.base_hash if spooler driver type is not
    // upload::SpoolerDefinition::Gateway
    manifest =
        FetchRemoteManifest(params.stratum0, params.repo_name, shash::Any());
  }
  if (!manifest.IsValid()) {
    return 3;
  }

  StatisticsDatabase *stats_db =
      StatisticsDatabase::OpenStandardDB(params.repo_name);

  const std::string old_root_hash = manifest->catalog_hash().ToString(true);

  catalog::WritableCatalogManager catalog_manager(
      params.base_hash, params.stratum0, params.dir_temp,
      spooler_catalogs.weak_ref(), download_manager(), params.enforce_limits,
      params.nested_kcatalog_limit, params.root_kcatalog_limit,
      params.file_mbyte_limit, statistics(), params.is_balanced,
      params.max_weight, params.min_weight, params.cache_dir);
  catalog_manager.Init();

  publish::SyncMediator mediator(&catalog_manager, &params, publish_statistics);
  LogCvmfs(kLogPublish, kLogStdout, "Swissknife Sync: Processing changes...");

  // Should be before the synchronization starts to avoid a race of GetTTL
  // with other sqlite operations
  if ((params.ttl_seconds > 0) &&
      ((params.ttl_seconds != catalog_manager.GetTTL()) ||
       !catalog_manager.HasExplicitTTL())) {
    LogCvmfs(/* source and mask not shown in this listing */,
             "Swissknife Sync: Setting repository TTL to %" PRIu64 "s",
             params.ttl_seconds);
    catalog_manager.SetTTL(params.ttl_seconds);
  }

  // Either real catalogs or virtual catalog
  if (params.virtual_dir_actions == catalog::VirtualCatalog::kActionNone) {
    publish::SyncUnion *sync;
    if (params.union_fs_type == "overlayfs") {
      sync = new publish::SyncUnionOverlayfs(
          &mediator, params.dir_rdonly, params.dir_union, params.dir_scratch);
    } else if (params.union_fs_type == "aufs") {
      sync = new publish::SyncUnionAufs(&mediator, params.dir_rdonly,
                                        params.dir_union, params.dir_scratch);
    } else {
      LogCvmfs(/* source and mask not shown in this listing */,
               "Swissknife Sync: unknown union file system: %s",
               params.union_fs_type.c_str());
      return 3;
    }

    if (!sync->Initialize()) {
      LogCvmfs(/* source and mask not shown in this listing */,
               "Swissknife Sync: Initialization of the synchronisation "
               "engine failed");
      return 4;
    }

    sync->Traverse();
  } else {
    assert(!manifest->history().IsNull());
    catalog::VirtualCatalog virtual_catalog(
        manifest.weak_ref(), download_manager(), &catalog_manager, &params);
    virtual_catalog.Generate(params.virtual_dir_actions);
  }

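  // If an authz file was given, its contents are read and attached to the
  // root catalog via SetVOMSAuthz().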
  if (!params.authz_file.empty()) {
    LogCvmfs(/* source and mask not shown in this listing */,
             "Swissknife Sync: Adding contents of authz file %s to"
             " root catalog.",
             params.authz_file.c_str());
    int fd = open(params.authz_file.c_str(), O_RDONLY);
    if (fd == -1) {
      LogCvmfs(/* source and mask not shown in this listing */,
               "Swissknife Sync: Unable to open authz file (%s)"
               "from the publication process: %s",
               params.authz_file.c_str(), strerror(errno));
      return 7;
    }

    std::string new_authz;
    const bool read_successful = SafeReadToString(fd, &new_authz);
    close(fd);

    if (!read_successful) {
      LogCvmfs(/* source and mask not shown in this listing */,
               "Swissknife Sync: Failed to read authz file (%s): %s",
               params.authz_file.c_str(), strerror(errno));
      return 8;
    }

    catalog_manager.SetVOMSAuthz(new_authz);
  }

  if (!mediator.Commit(manifest.weak_ref())) {
    PrintError("Swissknife Sync: Something went wrong during sync");
    if (!params.dry_run) {
      stats_db->StorePublishStatistics(this->statistics(), start_time, false);
      if (upload_statsdb) {
        stats_db->UploadStatistics(params.spooler);
      }
    }
    return 5;
  }

  perf::Counter *revision_counter = statistics()->Register("publish.revision",
      "Published revision number");
  revision_counter->Set(static_cast<int64_t>(
      catalog_manager.GetRootCatalog()->revision()));

  // finalize the spooler
  LogCvmfs(/* source and mask not shown in this listing */,
           "Swissknife Sync: Wait for all uploads to finish");
  params.spooler->WaitForUpload();
  spooler_catalogs->WaitForUpload();
  params.spooler->FinalizeSession(false);

  LogCvmfs(/* source and mask not shown in this listing */,
           "Swissknife Sync: Exporting repository manifest");

  // We call FinalizeSession(true) this time, to also trigger the commit
  // operation on the gateway machine (if the upstream is of type "gw").

  // Get the hash of the new root catalog
  const std::string new_root_hash = manifest->catalog_hash().ToString(true);

  if (!spooler_catalogs->FinalizeSession(true, old_root_hash, new_root_hash,
                                         params.repo_tag)) {
    PrintError("Swissknife Sync: Failed to commit transaction.");
    if (!params.dry_run) {
      stats_db->StorePublishStatistics(this->statistics(), start_time, false);
      if (upload_statsdb) {
        stats_db->UploadStatistics(params.spooler);
      }
    }
    return 9;
  }

  if (!params.dry_run) {
    stats_db->StorePublishStatistics(this->statistics(), start_time, true);
    if (upload_statsdb) {
      stats_db->UploadStatistics(params.spooler);
    }
  }

  delete params.spooler;

  if (!manifest->Export(params.manifest_path)) {
    PrintError("Swissknife Sync: Failed to create new repository");
    return 6;
  }

  return 0;
}