CernVM-FS  2.9.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
swissknife_sync.cc
Go to the documentation of this file.
1 
19 #define _FILE_OFFSET_BITS 64
20 
21 #define __STDC_FORMAT_MACROS
22 
23 #include "swissknife_sync.h"
24 #include "cvmfs_config.h"
25 
26 #include <errno.h>
27 #include <fcntl.h>
28 #include <glob.h>
29 #include <inttypes.h>
30 #include <limits.h>
31 #include <sys/capability.h>
32 
33 #include <cstdio>
34 #include <cstdlib>
35 #include <string>
36 #include <vector>
37 
38 #include "catalog_mgr_ro.h"
39 #include "catalog_mgr_rw.h"
40 #include "catalog_virtual.h"
41 #include "download.h"
42 #include "logging.h"
43 #include "manifest.h"
44 #include "monitor.h"
45 #include "path_filters/dirtab.h"
46 #include "platform.h"
47 #include "reflog.h"
48 #include "sanitizer.h"
49 #include "statistics.h"
50 #include "statistics_database.h"
52 #include "sync_mediator.h"
53 #include "sync_union.h"
54 #include "sync_union_aufs.h"
55 #include "sync_union_overlayfs.h"
56 #include "util/string.h"
57 
58 using namespace std; // NOLINT
59 
61  if (!DirectoryExists(p.dir_scratch)) {
62  PrintError("overlay (copy on write) directory does not exist");
63  return false;
64  }
65  if (!DirectoryExists(p.dir_union)) {
66  PrintError("union volume does not exist");
67  return false;
68  }
69  if (!DirectoryExists(p.dir_rdonly)) {
70  PrintError("cvmfs read/only repository does not exist");
71  return false;
72  }
73  if (p.stratum0 == "") {
74  PrintError("Stratum0 url missing");
75  return false;
76  }
77 
78  if (p.manifest_path == "") {
79  PrintError("manifest output required");
80  return false;
81  }
82  if (!DirectoryExists(p.dir_temp)) {
83  PrintError("data store directory does not exist");
84  return false;
85  }
86 
89  PrintError("file chunk size values are not sane");
90  return false;
91  }
92 
93  if (HasPrefix(p.spooler_definition, "gw", false)) {
94  if (p.session_token_file.empty()) {
95  PrintError(
96  "Session token file has to be provided "
97  "when upstream type is gw.");
98  return false;
99  }
100  }
101 
102  return true;
103 }
104 
106  const string manifest_path = *args.find('o')->second;
107  const string dir_temp = *args.find('t')->second;
108  const string spooler_definition = *args.find('r')->second;
109  const string repo_name = *args.find('n')->second;
110  const string reflog_chksum_path = *args.find('R')->second;
111  if (args.find('l') != args.end()) {
112  unsigned log_level =
113  1 << (kLogLevel0 + String2Uint64(*args.find('l')->second));
114  if (log_level > kLogNone) {
115  LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
116  return 1;
117  }
118  SetLogVerbosity(static_cast<LogLevels>(log_level));
119  }
120  shash::Algorithms hash_algorithm = shash::kSha1;
121  if (args.find('a') != args.end()) {
122  hash_algorithm = shash::ParseHashAlgorithm(*args.find('a')->second);
123  if (hash_algorithm == shash::kAny) {
124  PrintError("unknown hash algorithm");
125  return 1;
126  }
127  }
128 
129  const bool volatile_content = (args.count('v') > 0);
130  const bool garbage_collectable = (args.count('z') > 0);
131  std::string voms_authz;
132  if (args.find('V') != args.end()) {
133  voms_authz = *args.find('V')->second;
134  }
135 
136  const upload::SpoolerDefinition sd(spooler_definition, hash_algorithm,
138  UniquePtr<upload::Spooler> spooler(upload::Spooler::Construct(sd));
139  assert(spooler.IsValid());
140 
143  dir_temp, volatile_content, voms_authz, spooler));
144  if (!manifest.IsValid()) {
145  PrintError("Failed to create new repository");
146  return 1;
147  }
148 
149  UniquePtr<manifest::Reflog> reflog(CreateEmptyReflog(dir_temp, repo_name));
150  if (!reflog.IsValid()) {
151  PrintError("Failed to create fresh Reflog");
152  return 1;
153  }
154 
155  reflog->DropDatabaseFileOwnership();
156  string reflog_path = reflog->database_file();
157  reflog.Destroy();
158  shash::Any reflog_hash(hash_algorithm);
159  manifest::Reflog::HashDatabase(reflog_path, &reflog_hash);
160  spooler->UploadReflog(reflog_path);
161  spooler->WaitForUpload();
162  unlink(reflog_path.c_str());
163  if (spooler->GetNumberOfErrors()) {
164  LogCvmfs(kLogCvmfs, kLogStderr, "Failed to upload reflog");
165  return 4;
166  }
167  assert(!reflog_chksum_path.empty());
168  manifest::Reflog::WriteChecksum(reflog_chksum_path, reflog_hash);
169 
170  // set optional manifest fields
171  const bool needs_bootstrap_shortcuts = !voms_authz.empty();
172  manifest->set_garbage_collectability(garbage_collectable);
173  manifest->set_has_alt_catalog_path(needs_bootstrap_shortcuts);
174 
175  if (!manifest->Export(manifest_path)) {
176  PrintError("Failed to create new repository");
177  return 5;
178  }
179 
180  return 0;
181 }
182 
184  const string source = *args.find('i')->second;
185  const string dest = *args.find('o')->second;
186  const string spooler_definition = *args.find('r')->second;
187  shash::Algorithms hash_algorithm = shash::kSha1;
188  if (args.find('a') != args.end()) {
189  hash_algorithm = shash::ParseHashAlgorithm(*args.find('a')->second);
190  if (hash_algorithm == shash::kAny) {
191  PrintError("unknown hash algorithm");
192  return 1;
193  }
194  }
195 
196  const upload::SpoolerDefinition sd(spooler_definition, hash_algorithm);
197  upload::Spooler *spooler = upload::Spooler::Construct(sd);
198  assert(spooler);
199  spooler->Upload(source, dest);
200  spooler->WaitForUpload();
201 
202  if (spooler->GetNumberOfErrors() > 0) {
203  LogCvmfs(kLogCatalog, kLogStderr, "failed to upload %s", source.c_str());
204  return 1;
205  }
206 
207  delete spooler;
208 
209  return 0;
210 }
211 
213  const string file_to_peek = *args.find('d')->second;
214  const string spooler_definition = *args.find('r')->second;
215 
216  // Hash doesn't matter
217  const upload::SpoolerDefinition sd(spooler_definition, shash::kAny);
218  upload::Spooler *spooler = upload::Spooler::Construct(sd);
219  assert(spooler);
220  const bool success = spooler->Peek(file_to_peek);
221 
222  if (spooler->GetNumberOfErrors() > 0) {
223  LogCvmfs(kLogCatalog, kLogStderr, "failed to peek for %s",
224  file_to_peek.c_str());
225  return 2;
226  }
227  if (!success) {
228  LogCvmfs(kLogCatalog, kLogStdout, "%s not found", file_to_peek.c_str());
229  return 1;
230  }
231  LogCvmfs(kLogCatalog, kLogStdout, "%s available", file_to_peek.c_str());
232 
233  delete spooler;
234 
235  return 0;
236 }
237 
239  const string file_to_delete = *args.find('o')->second;
240  const string spooler_definition = *args.find('r')->second;
241 
242  // Hash doesn't matter
243  const upload::SpoolerDefinition sd(spooler_definition, shash::kAny);
244  upload::Spooler *spooler = upload::Spooler::Construct(sd);
245  assert(spooler);
246  spooler->RemoveAsync(file_to_delete);
247  spooler->WaitForUpload();
248 
249  if (spooler->GetNumberOfErrors() > 0) {
250  LogCvmfs(kLogCatalog, kLogStderr, "failed to delete %s",
251  file_to_delete.c_str());
252  return 1;
253  }
254 
255  delete spooler;
256 
257  return 0;
258 }
259 
261  const string dirtab_file = *args.find('d')->second;
262  union_dir_ = MakeCanonicalPath(*args.find('u')->second);
263  scratch_dir_ = MakeCanonicalPath(*args.find('s')->second);
264  const shash::Any base_hash = shash::MkFromHexPtr(
265  shash::HexPtr(*args.find('b')->second), shash::kSuffixCatalog);
266  const string stratum0 = *args.find('w')->second;
267  const string dir_temp = *args.find('t')->second;
268  verbose_ = (args.find('x') != args.end());
269 
270  // check if there is a dirtab file
271  if (!FileExists(dirtab_file)) {
273  "Didn't find a dirtab at '%s'. Skipping...", dirtab_file.c_str());
274  return 0;
275  }
276 
277  // parse dirtab file
278  catalog::Dirtab *dirtab = catalog::Dirtab::Create(dirtab_file);
279  if (!dirtab->IsValid()) {
280  LogCvmfs(kLogCatalog, kLogStderr, "Invalid or not readable dirtab '%s'",
281  dirtab_file.c_str());
282  return 1;
283  }
284  LogCvmfs(kLogCatalog, kLogVerboseMsg, "Found %d rules in dirtab '%s'",
285  dirtab->RuleCount(), dirtab_file.c_str());
286 
287  // initialize catalog infrastructure
288  const bool auto_manage_catalog_files = true;
289  const bool follow_redirects = (args.count('L') > 0);
290  if (!InitDownloadManager(follow_redirects)) {
291  return 1;
292  }
293  catalog::SimpleCatalogManager catalog_manager(
294  base_hash, stratum0, dir_temp, download_manager(), statistics(),
295  auto_manage_catalog_files);
296  catalog_manager.Init();
297 
298  vector<string> new_nested_catalogs;
299  DetermineNestedCatalogCandidates(*dirtab, &catalog_manager,
300  &new_nested_catalogs);
301  const bool success = CreateCatalogMarkers(new_nested_catalogs);
302  delete dirtab;
303 
304  return (success) ? 0 : 1;
305 }
306 
307 
namespace {

// Replacement directory-access callbacks for glob(3), installed through
// GLOB_ALTDIRFUNC.  Every path glob wants to open or stat is first resolved
// with realpath(); anything that does not resolve to the union directory or
// to a path below it is refused.  This keeps a dirtab pattern from making the
// glob walk out of the repository tree (e.g. through ".." components or
// symbolic links that point elsewhere).

// Union directory the callbacks compare against.  It is allocated right
// before the glob() call in DetermineNestedCatalogCandidates and freed/reset
// to NULL right after it; outside of that window it must not be dereferenced.
std::string *g_glob_uniondir = NULL;

// Returns true if `name` resolves to the union directory itself or to a path
// strictly below it.  Returns false when realpath() fails (errno is left as
// realpath set it) or when the resolved path lies outside the union
// directory (errno is set to EACCES).
bool GlobCheckPath(const char *name) {
  char canonical[PATH_MAX];
  if (realpath(name, canonical) == NULL)
    return false;

  const std::string resolved(canonical);
  const std::string &root = *g_glob_uniondir;
  if (resolved == root)
    return true;

  // Require a path separator after the root so that e.g. "/repo-other" is
  // not accepted as being inside "/repo".
  const std::string root_with_slash = root + "/";
  const bool below_root =
      (resolved.size() >= root_with_slash.size()) &&
      (resolved.compare(0, root_with_slash.size(), root_with_slash) == 0);
  if (below_root)
    return true;

  errno = EACCES;
  return false;
}

// gl_opendir replacement: opens `name` only if it passes GlobCheckPath().
void *GlobOpendir(const char *name) {
  if (GlobCheckPath(name))
    return opendir(name);
  return NULL;
}

// gl_closedir replacement: closes a handle obtained from GlobOpendir().
void GlobClosedir(void *dirp) {
  DIR *directory = static_cast<DIR *>(dirp);
  closedir(directory);
}

// gl_readdir replacement: reads the next entry of a GlobOpendir() handle.
struct dirent *GlobReaddir(void *dirp) {
  DIR *directory = static_cast<DIR *>(dirp);
  return readdir(directory);
}

// gl_lstat replacement.  Because the check resolves symlinks, a link whose
// target lies outside the union directory is refused as well.
int GlobLstat(const char *name, struct stat *st) {
  if (GlobCheckPath(name))
    return lstat(name, st);
  return -1;
}

// gl_stat replacement: stats `name` only if it passes GlobCheckPath().
int GlobStat(const char *name, struct stat *st) {
  if (GlobCheckPath(name))
    return stat(name, st);
  return -1;
}

}  // anonymous namespace
354 
356  const catalog::Dirtab &dirtab,
357  catalog::SimpleCatalogManager *catalog_manager,
358  vector<string> *nested_catalog_candidates) {
359  // find possible new nested catalog locations
360  const catalog::Dirtab::Rules &lookup_rules = dirtab.positive_rules();
361  catalog::Dirtab::Rules::const_iterator i = lookup_rules.begin();
362  const catalog::Dirtab::Rules::const_iterator iend = lookup_rules.end();
363  for (; i != iend; ++i) {
364  assert(!i->is_negation);
365 
366  // run a glob using the current dirtab rule on the current repository
367  // state
368  const std::string &glob_string = i->pathspec.GetGlobString();
369  const std::string &glob_string_abs = union_dir_ + glob_string;
370  const int glob_flags = GLOB_ONLYDIR | GLOB_NOSORT | GLOB_PERIOD |
371  GLOB_ALTDIRFUNC;
372  glob_t glob_res;
373  g_glob_uniondir = new std::string(union_dir_);
374  glob_res.gl_opendir = GlobOpendir;
375  glob_res.gl_readdir = GlobReaddir;
376  glob_res.gl_closedir = GlobClosedir;
377  glob_res.gl_lstat = GlobLstat;
378  glob_res.gl_stat = GlobStat;
379  const int glob_retval =
380  glob(glob_string_abs.c_str(), glob_flags, NULL, &glob_res);
381  delete g_glob_uniondir;
382  g_glob_uniondir = NULL;
383 
384  if (glob_retval == 0) {
385  // found some candidates... filtering by cvmfs catalog structure
386  LogCvmfs(kLogCatalog, kLogDebug, "Found %d entries for pathspec (%s)",
387  glob_res.gl_pathc, glob_string.c_str());
388  FilterCandidatesFromGlobResult(dirtab, glob_res.gl_pathv,
389  glob_res.gl_pathc, catalog_manager,
390  nested_catalog_candidates);
391  } else if (glob_retval == GLOB_NOMATCH) {
392  LogCvmfs(kLogCvmfs, kLogStderr, "WARNING: cannot apply pathspec %s",
393  glob_string.c_str());
394  } else {
395  LogCvmfs(kLogCvmfs, kLogStderr, "Failed to run glob matching (%s)",
396  glob_string.c_str());
397  }
398 
399  globfree(&glob_res);
400  }
401 }
402 
404  const catalog::Dirtab &dirtab, char **paths, const size_t npaths,
405  catalog::SimpleCatalogManager *catalog_manager,
406  std::vector<std::string> *nested_catalog_candidates) {
407  // go through the paths produced by glob() and filter them
408  for (size_t i = 0; i < npaths; ++i) {
409  // process candidate paths
410  const std::string candidate(paths[i]);
411  const std::string candidate_rel = candidate.substr(union_dir_.size());
412 
413  // check if path points to a directory
414  platform_stat64 candidate_info;
415  const int lstat_retval = platform_lstat(candidate.c_str(), &candidate_info);
416  if (lstat_retval != 0) {
418  "Error in processing .cvmfsdirtab: cannot access %s (%d)",
419  candidate.c_str(), errno);
420  abort();
421  }
422  assert(lstat_retval == 0);
423  if (!S_ISDIR(candidate_info.st_mode)) {
424  // The GLOB_ONLYDIR flag is only a hint, non-directories can still be
425  // returned
427  "The '%s' dirtab entry does not point to a directory "
428  "but to a file or a symbolic link",
429  candidate_rel.c_str());
430  continue;
431  }
432 
433  // check if the path is a meta-directory (. or ..)
434  assert(candidate_rel.size() >= 2);
435  if (candidate_rel.substr(candidate_rel.size() - 2) == "/." ||
436  candidate_rel.substr(candidate_rel.size() - 3) == "/..") {
437  continue;
438  }
439 
440  // check that the path isn't excluded in the dirtab
441  if (dirtab.IsOpposing(candidate_rel)) {
442  LogCvmfs(kLogCatalog, kLogDebug, "Candidate '%s' is excluded by dirtab",
443  candidate_rel.c_str());
444  continue;
445  }
446 
447  // lookup the path in the catalog structure to find out if it already
448  // points to a nested catalog transition point. Furthermore it could be
449  // a new directory and thus not in any catalog yet.
451  const bool lookup_success = catalog_manager->LookupPath(
452  candidate_rel, catalog::kLookupSole, &dirent);
453  if (!lookup_success) {
455  "Didn't find '%s' in catalogs, could "
456  "be a new directory and nested catalog.",
457  candidate_rel.c_str());
458  nested_catalog_candidates->push_back(candidate);
459  } else if (!dirent.IsNestedCatalogMountpoint() &&
460  !dirent.IsNestedCatalogRoot()) {
462  "Found '%s' in catalogs but is not a "
463  "nested catalog yet.",
464  candidate_rel.c_str());
465  nested_catalog_candidates->push_back(candidate);
466  } else {
467  // check if the nested catalog marker is still there, we might need to
468  // recreate the catalog after manual marker removal
469  // Note: First we check if the parent directory shows up in the scratch
470  // space to verify that it was touched (copy-on-write)
471  // Otherwise we would force the cvmfs client behind the union
472  // file-
473  // system to (potentially) unnecessarily fetch catalogs
474  if (DirectoryExists(scratch_dir_ + candidate_rel) &&
475  !FileExists(union_dir_ + candidate_rel + "/.cvmfscatalog")) {
477  "WARNING: '%s' should be a nested "
478  "catalog according to the dirtab. "
479  "Recreating...",
480  candidate_rel.c_str());
481  nested_catalog_candidates->push_back(candidate);
482  } else {
484  "Found '%s' in catalogs and it already is a nested catalog.",
485  candidate_rel.c_str());
486  }
487  }
488  }
489 }
490 
492  const std::vector<std::string> &new_nested_catalogs) {
493  // go through the new nested catalog paths and create .cvmfscatalog markers
494  // where necessary
495  bool success = true;
496  std::vector<std::string>::const_iterator k = new_nested_catalogs.begin();
497  const std::vector<std::string>::const_iterator kend =
498  new_nested_catalogs.end();
499  for (; k != kend; ++k) {
500  assert(!k->empty() && k->size() > union_dir_.size());
501 
502  // was the marker already created by hand?
503  const std::string marker_path = *k + "/.cvmfscatalog";
504  if (FileExists(marker_path)) {
505  continue;
506  }
507 
508  // create a nested catalog marker
509  const mode_t mode = kDefaultFileMode;
510  const int fd = open(marker_path.c_str(), O_CREAT, mode);
511  if (fd < 0) {
513  "Failed to create nested catalog marker "
514  "at '%s' (errno: %d)",
515  marker_path.c_str(), errno);
516  success = false;
517  continue;
518  }
519  close(fd);
520 
521  // inform the user if requested
522  if (verbose_) {
523  LogCvmfs(kLogCvmfs, kLogStdout, "Auto-creating nested catalog in %s",
524  k->c_str());
525  }
526  }
527 
528  return success;
529 }
530 
// Pairs a single-character command-line option with the size_t field that
// receives its parsed value.  ReadFileChunkingArgs builds a list of these to
// map the file-chunk-size options onto the SyncParameters fields.
struct chunk_arg {
  chunk_arg(char option, size_t *target)
    : param(option)
    , save_to(target)
  { }

  char param;       // option letter looked up in the ArgumentList
  size_t *save_to;  // non-owning pointer to the field written on success
};
536 
538  const swissknife::ArgumentList &args, SyncParameters *params) {
539  typedef std::vector<chunk_arg> ChunkArgs;
540 
541  // define where to store the value of which file chunk argument
542  ChunkArgs chunk_args;
543  chunk_args.push_back(chunk_arg('a', &params->avg_file_chunk_size));
544  chunk_args.push_back(chunk_arg('l', &params->min_file_chunk_size));
545  chunk_args.push_back(chunk_arg('h', &params->max_file_chunk_size));
546 
547  // read the arguments
548  ChunkArgs::const_iterator i = chunk_args.begin();
549  ChunkArgs::const_iterator iend = chunk_args.end();
550  for (; i != iend; ++i) {
551  swissknife::ArgumentList::const_iterator arg = args.find(i->param);
552 
553  if (arg != args.end()) {
554  size_t arg_value = static_cast<size_t>(String2Uint64(*arg->second));
555  if (arg_value > 0)
556  *i->save_to = arg_value;
557  else
558  return false;
559  }
560  }
561 
562  // check if argument values are sane
563  return true;
564 }
565 
567  string start_time = GetGMTimestamp();
568 
569  // Spawn monitoring process (watchdog)
570  std::string watchdog_dir = "/tmp";
571  char watchdog_path[PATH_MAX];
572  std::string timestamp = GetGMTimestamp("%Y.%m.%d-%H.%M.%S");
573  int path_size = snprintf(watchdog_path, sizeof(watchdog_path),
574  "%s/cvmfs-swissknife-sync-stacktrace.%s.%d",
575  watchdog_dir.c_str(), timestamp.c_str(), getpid());
576  assert(path_size > 0);
577  assert(path_size < PATH_MAX);
578  UniquePtr<Watchdog> watchdog(Watchdog::Create(std::string(watchdog_path)));
579  watchdog->Spawn();
580 
581  SyncParameters params;
582 
583  // Initialization
584  params.dir_union = MakeCanonicalPath(*args.find('u')->second);
585  params.dir_scratch = MakeCanonicalPath(*args.find('s')->second);
586  params.dir_rdonly = MakeCanonicalPath(*args.find('c')->second);
587  params.dir_temp = MakeCanonicalPath(*args.find('t')->second);
588  params.base_hash = shash::MkFromHexPtr(shash::HexPtr(*args.find('b')->second),
590  params.stratum0 = *args.find('w')->second;
591  params.manifest_path = *args.find('o')->second;
592  params.spooler_definition = *args.find('r')->second;
593 
594  params.public_keys = *args.find('K')->second;
595  params.repo_name = *args.find('N')->second;
596 
598 
599  if (args.find('f') != args.end())
600  params.union_fs_type = *args.find('f')->second;
601  if (args.find('A') != args.end()) params.is_balanced = true;
602  if (args.find('x') != args.end()) params.print_changeset = true;
603  if (args.find('y') != args.end()) params.dry_run = true;
604  if (args.find('m') != args.end()) params.mucatalogs = true;
605  if (args.find('i') != args.end()) params.ignore_xdir_hardlinks = true;
606  if (args.find('d') != args.end()) params.stop_for_catalog_tweaks = true;
607  if (args.find('V') != args.end()) params.voms_authz = true;
608  if (args.find('F') != args.end()) params.authz_file = *args.find('F')->second;
609  if (args.find('k') != args.end()) params.include_xattrs = true;
610  if (args.find('Y') != args.end()) params.external_data = true;
611  if (args.find('W') != args.end()) params.direct_io = true;
612  if (args.find('S') != args.end()) {
614  *args.find('S')->second, &params.virtual_dir_actions);
615  if (!retval) {
616  LogCvmfs(kLogCvmfs, kLogStderr, "invalid virtual catalog options: %s",
617  args.find('S')->second->c_str());
618  return 1;
619  }
620  }
621  if (args.find('z') != args.end()) {
622  unsigned log_level =
623  1 << (kLogLevel0 + String2Uint64(*args.find('z')->second));
624  if (log_level > kLogNone) {
625  LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
626  return 1;
627  }
628  SetLogVerbosity(static_cast<LogLevels>(log_level));
629  }
630 
631  if (args.find('X') != args.end())
632  params.max_weight = String2Uint64(*args.find('X')->second);
633  if (args.find('M') != args.end())
634  params.min_weight = String2Uint64(*args.find('M')->second);
635 
636  if (args.find('p') != args.end()) {
637  params.use_file_chunking = true;
638  if (!ReadFileChunkingArgs(args, &params)) {
639  PrintError("Failed to read file chunk size values");
640  return 2;
641  }
642  }
643  if (args.find('O') != args.end()) {
644  params.generate_legacy_bulk_chunks = true;
645  }
646  shash::Algorithms hash_algorithm = shash::kSha1;
647  if (args.find('e') != args.end()) {
648  hash_algorithm = shash::ParseHashAlgorithm(*args.find('e')->second);
649  if (hash_algorithm == shash::kAny) {
650  PrintError("unknown hash algorithm");
651  return 1;
652  }
653  }
654  if (args.find('Z') != args.end()) {
655  params.compression_alg =
656  zlib::ParseCompressionAlgorithm(*args.find('Z')->second);
657  }
658 
659  if (args.find('C') != args.end()) {
660  params.trusted_certs = *args.find('C')->second;
661  }
662 
663  if (args.find('E') != args.end()) params.enforce_limits = true;
664  if (args.find('Q') != args.end()) {
665  params.nested_kcatalog_limit = String2Uint64(*args.find('Q')->second);
666  } else {
668  }
669  if (args.find('R') != args.end()) {
670  params.root_kcatalog_limit = String2Uint64(*args.find('R')->second);
671  } else {
673  }
674  if (args.find('U') != args.end()) {
675  params.file_mbyte_limit = String2Uint64(*args.find('U')->second);
676  } else {
678  }
679 
680  if (args.find('v') != args.end()) {
681  sanitizer::IntegerSanitizer sanitizer;
682  if (!sanitizer.IsValid(*args.find('v')->second)) {
683  PrintError("invalid revision number");
684  return 1;
685  }
686  params.manual_revision = String2Uint64(*args.find('v')->second);
687  }
688 
689  params.branched_catalog = args.find('B') != args.end();
690 
691  if (args.find('q') != args.end()) {
692  params.max_concurrent_write_jobs = String2Uint64(*args.find('q')->second);
693  }
694 
695  if (args.find('0') != args.end()) {
696  params.num_upload_tasks = String2Uint64(*args.find('0')->second);
697  }
698 
699  if (args.find('T') != args.end()) {
700  params.ttl_seconds = String2Uint64(*args.find('T')->second);
701  }
702 
703  if (args.find('g') != args.end()) {
704  params.ignore_special_files = true;
705  }
706 
707  if (args.find('P') != args.end()) {
708  params.session_token_file = *args.find('P')->second;
709  }
710 
711  if (args.find('H') != args.end()) {
712  params.key_file = *args.find('H')->second;
713  }
714 
715  if (args.find('D') != args.end()) {
716  params.repo_tag.name_ = *args.find('D')->second;
717  }
718 
719  if (args.find('G') != args.end()) {
720  params.repo_tag.channel_ = *args.find('G')->second;
721  }
722 
723  if (args.find('J') != args.end()) {
724  params.repo_tag.description_ = *args.find('J')->second;
725  }
726 
727  const bool upload_statsdb = (args.count('I') > 0);
728 
729  if (!CheckParams(params)) return 2;
730  // This may fail, in which case a warning is printed and the process continues
732 
733  perf::StatisticsTemplate publish_statistics("publish", this->statistics());
734 
735  // Start spooler
736  upload::SpoolerDefinition spooler_definition(
737  params.spooler_definition, hash_algorithm, params.compression_alg,
740  params.max_file_chunk_size, params.session_token_file, params.key_file);
741  if (params.max_concurrent_write_jobs > 0) {
742  spooler_definition.number_of_concurrent_uploads =
744  }
745  spooler_definition.num_upload_tasks = params.num_upload_tasks;
746 
747  upload::SpoolerDefinition spooler_definition_catalogs(
748  spooler_definition.Dup2DefaultCompression());
749 
750  params.spooler = upload::Spooler::Construct(spooler_definition,
751  &publish_statistics);
752  if (NULL == params.spooler) return 3;
754  upload::Spooler::Construct(spooler_definition_catalogs,
755  &publish_statistics));
756  if (!spooler_catalogs.IsValid()) return 3;
757 
758  const bool follow_redirects = (args.count('L') > 0);
759  if (!InitDownloadManager(follow_redirects)) {
760  return 3;
761  }
762 
763  if (!InitVerifyingSignatureManager(params.public_keys,
764  params.trusted_certs)) {
765  return 3;
766  }
767 
768  bool with_gateway =
769  spooler_definition.driver_type == upload::SpoolerDefinition::Gateway;
770  /*
771  * Note: If the upstream is of type gateway, due to the possibility of
772  * concurrent
773  * release managers, it's possible to have a different local and remote
774  * root
775  * hashes. We proceed by loading the remote manifest but we give an
776  * empty base
777  * hash.
778  */
780  if (params.branched_catalog) {
781  // Throw-away manifest
782  manifest = new manifest::Manifest(shash::Any(), 0, "");
783  } else if (params.virtual_dir_actions !=
785  manifest = this->OpenLocalManifest(params.manifest_path);
786  params.base_hash = manifest->catalog_hash();
787  } else {
788  if (with_gateway) {
789  manifest =
790  FetchRemoteManifest(params.stratum0, params.repo_name, shash::Any());
791  } else {
792  manifest = FetchRemoteManifest(params.stratum0, params.repo_name,
793  shash::Any());
794  // TODO(jblomer): revert to params.base_hash);
795  }
796  }
797  if (!manifest) {
798  return 3;
799  }
800 
801  StatisticsDatabase *stats_db =
803 
804  const std::string old_root_hash = manifest->catalog_hash().ToString(true);
805 
806  catalog::WritableCatalogManager catalog_manager(
807  params.base_hash, params.stratum0, params.dir_temp, spooler_catalogs,
808  download_manager(), params.enforce_limits, params.nested_kcatalog_limit,
809  params.root_kcatalog_limit, params.file_mbyte_limit, statistics(),
810  params.is_balanced, params.max_weight, params.min_weight);
811  catalog_manager.Init();
812 
813  publish::SyncMediator mediator(&catalog_manager, &params, publish_statistics);
814  LogCvmfs(kLogPublish, kLogStdout, "Processing changes...");
815 
816  // Should be before the synchronization starts to avoid race of GetTTL with
817  // other sqlite operations
818  if ((params.ttl_seconds > 0) &&
819  ((params.ttl_seconds != catalog_manager.GetTTL()) ||
820  !catalog_manager.HasExplicitTTL())) {
821  LogCvmfs(kLogCvmfs, kLogStdout, "Setting repository TTL to %" PRIu64 "s",
822  params.ttl_seconds);
823  catalog_manager.SetTTL(params.ttl_seconds);
824  }
825 
826  // Either real catalogs or virtual catalog
828  publish::SyncUnion *sync;
829  if (params.union_fs_type == "overlayfs") {
830  sync = new publish::SyncUnionOverlayfs(
831  &mediator, params.dir_rdonly, params.dir_union, params.dir_scratch);
832  } else if (params.union_fs_type == "aufs") {
833  sync = new publish::SyncUnionAufs(&mediator, params.dir_rdonly,
834  params.dir_union, params.dir_scratch);
835  } else {
836  LogCvmfs(kLogCvmfs, kLogStderr, "unknown union file system: %s",
837  params.union_fs_type.c_str());
838  return 3;
839  }
840 
841  if (!sync->Initialize()) {
843  "Initialization of the synchronisation "
844  "engine failed");
845  return 4;
846  }
847 
848  sync->Traverse();
849  } else {
850  assert(!manifest->history().IsNull());
851  catalog::VirtualCatalog virtual_catalog(
852  manifest.weak_ref(), download_manager(), &catalog_manager, &params);
853  virtual_catalog.Generate(params.virtual_dir_actions);
854  }
855 
856  if (!params.authz_file.empty()) {
858  "Adding contents of authz file %s to"
859  " root catalog.",
860  params.authz_file.c_str());
861  int fd = open(params.authz_file.c_str(), O_RDONLY);
862  if (fd == -1) {
864  "Unable to open authz file (%s)"
865  "from the publication process: %s",
866  params.authz_file.c_str(), strerror(errno));
867  return 7;
868  }
869 
870  std::string new_authz;
871  const bool read_successful = SafeReadToString(fd, &new_authz);
872  close(fd);
873 
874  if (!read_successful) {
875  LogCvmfs(kLogCvmfs, kLogStderr, "Failed to read authz file (%s): %s",
876  params.authz_file.c_str(), strerror(errno));
877  return 8;
878  }
879 
880  catalog_manager.SetVOMSAuthz(new_authz);
881  }
882 
883  if (!mediator.Commit(manifest.weak_ref())) {
884  PrintError("something went wrong during sync");
885  if (!params.dry_run) {
886  stats_db->StorePublishStatistics(this->statistics(), start_time, false);
887  if (upload_statsdb) {
888  stats_db->UploadStatistics(params.spooler);
889  }
890  }
891  return 5;
892  }
893 
894  perf::Counter *revision_counter = statistics()->Register("publish.revision",
895  "Published revision number");
896  revision_counter->Set(catalog_manager.GetRootCatalog()->revision());
897 
898  // finalize the spooler
899  LogCvmfs(kLogCvmfs, kLogStdout, "Wait for all uploads to finish");
900  params.spooler->WaitForUpload();
901  spooler_catalogs->WaitForUpload();
902  params.spooler->FinalizeSession(false);
903 
904  LogCvmfs(kLogCvmfs, kLogStdout, "Exporting repository manifest");
905 
906  // We call FinalizeSession(true) this time, to also trigger the commit
907  // operation on the gateway machine (if the upstream is of type "gw").
908 
909  // Get the path of the new root catalog
910  const std::string new_root_hash = manifest->catalog_hash().ToString(true);
911 
912  if (!spooler_catalogs->FinalizeSession(true, old_root_hash, new_root_hash,
913  params.repo_tag)) {
914  PrintError("Failed to commit transaction.");
915  if (!params.dry_run) {
916  stats_db->StorePublishStatistics(this->statistics(), start_time, false);
917  if (upload_statsdb) {
918  stats_db->UploadStatistics(params.spooler);
919  }
920  }
921  return 9;
922  }
923 
924  if (!params.dry_run) {
925  stats_db->StorePublishStatistics(this->statistics(), start_time, true);
926  if (upload_statsdb) {
927  stats_db->UploadStatistics(params.spooler);
928  }
929  }
930 
931  delete params.spooler;
932 
933  if (!manifest->Export(params.manifest_path)) {
934  PrintError("Failed to create new repository");
935  return 6;
936  }
937 
938  return 0;
939 }
bool Commit(manifest::Manifest *manifest)
#define LogCvmfs(source, mask,...)
Definition: logging.h:20
std::string database_file() const
Definition: reflog.cc:337
Algorithms ParseCompressionAlgorithm(const std::string &algorithm_option)
Definition: compression.cc:148
size_t avg_file_chunk_size
const manifest::Manifest * manifest() const
Definition: repository.h:123
struct stat64 platform_stat64
std::string stratum0
static const unsigned kActionNone
int Main(const ArgumentList &args)
size_t * save_to
DriverType driver_type
the type of the spooler driver
bool IsValid() const
Definition: dirtab.h:127
uint64_t revision() const
Definition: catalog.h:188
bool UploadStatistics(upload::Spooler *spooler, std::string local_path="")
T * weak_ref() const
Definition: pointer.h:43
SpoolerDefinition Dup2DefaultCompression() const
std::vector< Rule > Rules
Definition: dirtab.h:68
bool generate_legacy_bulk_chunks
static Watchdog * Create(const std::string &crash_dump_path)
Definition: monitor.cc:59
unsigned file_mbyte_limit
uint64_t max_concurrent_write_jobs
const int kDefaultFileMode
Definition: posix.h:31
static const unsigned kDefaultFileMbyteLimit
virtual bool Initialize()
Definition: sync_union.cc:24
std::string name_
static const unsigned kDefaultNestedKcatalogLimit
static const unsigned kDefaultRootKcatalogLimit
int Main(const ArgumentList &args)
size_t min_file_chunk_size
unsigned num_upload_tasks
std::string spooler_definition
std::string manifest_path
bool ReadFileChunkingArgs(const swissknife::ArgumentList &args, SyncParameters *params)
void SetTTL(const uint64_t new_ttl)
const Rules & positive_rules() const
Definition: dirtab.h:121
bool CheckParams(const SyncParameters &p)
zlib::Algorithms compression_alg
unsigned nested_kcatalog_limit
int GlobLstat(const char *name, struct stat *st)
std::string repo_name
void Spawn()
Definition: monitor.cc:376
assert((mem||(size==0))&&"Out Of Memory")
bool LookupPath(const PathString &path, const LookupOptions options, DirectoryEntry *entry)
std::string union_fs_type
std::string dir_scratch
int Main(const ArgumentList &args)
bool IsNestedCatalogMountpoint() const
virtual bool IsOpposing(const std::string &path) const
Definition: dirtab.cc:161
chunk_arg(char param, size_t *save_to)
bool IsValid(const std::string &input) const
Definition: sanitizer.cc:114
Algorithms
Definition: hash.h:39
std::string key_file
bool IsNestedCatalogRoot() const
bool FileExists(const std::string &path)
Definition: posix.cc:811
std::string dir_temp
int Main(const ArgumentList &args)
static manifest::Manifest * CreateRepository(const std::string &dir_temp, const bool volatile_content, const std::string &voms_authz, upload::Spooler *spooler)
std::string channel_
void Set(const int64_t val)
Definition: statistics.h:33
void FilterCandidatesFromGlobResult(const catalog::Dirtab &dirtab, char **paths, const size_t npaths, catalog::SimpleCatalogManager *catalog_manager, std::vector< std::string > *nested_catalog_candidates)
bool CheckParams(const swissknife::CommandLease::Parameters &p)
std::string dir_rdonly
const char kSuffixCatalog
Definition: hash.h:52
static bool ParseActions(const std::string &action_desc, unsigned *actions)
shash::Any base_hash
upload::Spooler * spooler
int platform_lstat(const char *path, platform_stat64 *buf)
Any MkFromHexPtr(const HexPtr hex, const char suffix)
Definition: hash.cc:83
static Dirtab * Create(const std::string &dirtab_path)
Definition: dirtab.h:84
void SetLogVerbosity(const LogLevels min_level)
Definition: logging.cc:207
int Main(const ArgumentList &args)
std::string dir_union
static void HashDatabase(const std::string &database_path, shash::Any *hash_reflog)
Definition: reflog.cc:322
int Main(const ArgumentList &args)
bool SetVOMSAuthz(const std::string &voms_authz)
bool IsValid() const
Definition: pointer.h:44
struct dirent * GlobReaddir(void *dirp)
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:264
void DropDatabaseFileOwnership()
Definition: reflog.cc:313
static StatisticsDatabase * OpenStandardDB(const std::string repo_name)
bool DirectoryExists(const std::string &path)
Definition: posix.cc:833
CatalogT * GetRootCatalog() const
Definition: catalog_mgr.h:192
bool stop_for_catalog_tweaks
bool SafeReadToString(int fd, std::string *final_result)
Definition: posix.cc:2000
unsigned root_kcatalog_limit
size_t max_file_chunk_size
bool StorePublishStatistics(const perf::Statistics *statistics, const std::string &start_time, const bool success)
uint64_t manual_revision
int GlobStat(const char *name, struct stat *st)
std::string session_token_file
std::string trusted_certs
bool ObtainDacReadSearchCapability()
virtual void Traverse()=0
uint64_t String2Uint64(const string &value)
Definition: string.cc:227
std::map< char, SharedPtr< std::string > > ArgumentList
Definition: swissknife.h:72
size_t RuleCount() const
Definition: dirtab.h:124
Algorithms ParseHashAlgorithm(const string &algorithm_option)
Definition: hash.cc:72
uint64_t ttl_seconds
std::string description_
std::string authz_file
void DetermineNestedCatalogCandidates(const catalog::Dirtab &dirtab, catalog::SimpleCatalogManager *catalog_manager, std::vector< std::string > *nested_catalog_candidates)
RepositoryTag repo_tag
const int kLogVerboseMsg
unsigned virtual_dir_actions
void Generate(int actions)
static bool WriteChecksum(const std::string &path, const shash::Any &value)
Definition: reflog.cc:64
bool ignore_xdir_hardlinks
std::string MakeCanonicalPath(const std::string &path)
Definition: posix.cc:95
void PrintError(const string &message)
Definition: logging.cc:465
const upload::Spooler * spooler_catalogs() const
Definition: repository.h:311
static const uint64_t kDefaultTTL
Definition: catalog.h:104
std::string GetGMTimestamp(std::string format)
Definition: string.cc:583
bool CreateCatalogMarkers(const std::vector< std::string > &new_nested_catalogs)
std::string public_keys