CernVM-FS  2.10.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
swissknife_sync.cc
Go to the documentation of this file.
1 
19 // NOLINTNEXTLINE
20 #define _FILE_OFFSET_BITS 64
21 // NOLINTNEXTLINE
22 #define __STDC_FORMAT_MACROS
23 
24 #include "swissknife_sync.h"
25 #include "cvmfs_config.h"
26 
27 #include <errno.h>
28 #include <fcntl.h>
29 #include <glob.h>
30 #include <inttypes.h>
31 #include <limits.h>
32 #include <sys/capability.h>
33 
34 #include <cstdio>
35 #include <cstdlib>
36 #include <string>
37 #include <vector>
38 
39 #include "catalog_mgr_ro.h"
40 #include "catalog_mgr_rw.h"
41 #include "catalog_virtual.h"
42 #include "download.h"
43 #include "logging.h"
44 #include "manifest.h"
45 #include "monitor.h"
46 #include "path_filters/dirtab.h"
47 #include "platform.h"
48 #include "reflog.h"
49 #include "sanitizer.h"
50 #include "statistics.h"
51 #include "statistics_database.h"
53 #include "sync_mediator.h"
54 #include "sync_union.h"
55 #include "sync_union_aufs.h"
56 #include "sync_union_overlayfs.h"
57 #include "util/string.h"
58 
59 using namespace std; // NOLINT
60 
62  if (!DirectoryExists(p.dir_scratch)) {
63  PrintError("overlay (copy on write) directory does not exist");
64  return false;
65  }
66  if (!DirectoryExists(p.dir_union)) {
67  PrintError("union volume does not exist");
68  return false;
69  }
70  if (!DirectoryExists(p.dir_rdonly)) {
71  PrintError("cvmfs read/only repository does not exist");
72  return false;
73  }
74  if (p.stratum0 == "") {
75  PrintError("Stratum0 url missing");
76  return false;
77  }
78 
79  if (p.manifest_path == "") {
80  PrintError("manifest output required");
81  return false;
82  }
83  if (!DirectoryExists(p.dir_temp)) {
84  PrintError("data store directory does not exist");
85  return false;
86  }
87 
90  PrintError("file chunk size values are not sane");
91  return false;
92  }
93 
94  if (HasPrefix(p.spooler_definition, "gw", false)) {
95  if (p.session_token_file.empty()) {
96  PrintError(
97  "Session token file has to be provided "
98  "when upstream type is gw.");
99  return false;
100  }
101  }
102 
103  return true;
104 }
105 
107  const string manifest_path = *args.find('o')->second;
108  const string dir_temp = *args.find('t')->second;
109  const string spooler_definition = *args.find('r')->second;
110  const string repo_name = *args.find('n')->second;
111  const string reflog_chksum_path = *args.find('R')->second;
112  if (args.find('l') != args.end()) {
113  unsigned log_level =
114  kLogLevel0 << String2Uint64(*args.find('l')->second);
115  if (log_level > kLogNone) {
116  LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
117  return 1;
118  }
119  SetLogVerbosity(static_cast<LogLevels>(log_level));
120  }
121  shash::Algorithms hash_algorithm = shash::kSha1;
122  if (args.find('a') != args.end()) {
123  hash_algorithm = shash::ParseHashAlgorithm(*args.find('a')->second);
124  if (hash_algorithm == shash::kAny) {
125  PrintError("unknown hash algorithm");
126  return 1;
127  }
128  }
129 
130  const bool volatile_content = (args.count('v') > 0);
131  const bool garbage_collectable = (args.count('z') > 0);
132  std::string voms_authz;
133  if (args.find('V') != args.end()) {
134  voms_authz = *args.find('V')->second;
135  }
136 
137  const upload::SpoolerDefinition sd(spooler_definition, hash_algorithm,
139  UniquePtr<upload::Spooler> spooler(upload::Spooler::Construct(sd));
140  assert(spooler.IsValid());
141 
144  dir_temp, volatile_content, voms_authz, spooler.weak_ref()));
145  if (!manifest.IsValid()) {
146  PrintError("Failed to create new repository");
147  return 1;
148  }
149 
150  UniquePtr<manifest::Reflog> reflog(CreateEmptyReflog(dir_temp, repo_name));
151  if (!reflog.IsValid()) {
152  PrintError("Failed to create fresh Reflog");
153  return 1;
154  }
155 
156  reflog->DropDatabaseFileOwnership();
157  string reflog_path = reflog->database_file();
158  reflog.Destroy();
159  shash::Any reflog_hash(hash_algorithm);
160  manifest::Reflog::HashDatabase(reflog_path, &reflog_hash);
161  spooler->UploadReflog(reflog_path);
162  spooler->WaitForUpload();
163  unlink(reflog_path.c_str());
164  if (spooler->GetNumberOfErrors()) {
165  LogCvmfs(kLogCvmfs, kLogStderr, "Failed to upload reflog");
166  return 4;
167  }
168  assert(!reflog_chksum_path.empty());
169  manifest::Reflog::WriteChecksum(reflog_chksum_path, reflog_hash);
170 
171  // set optional manifest fields
172  const bool needs_bootstrap_shortcuts = !voms_authz.empty();
173  manifest->set_garbage_collectability(garbage_collectable);
174  manifest->set_has_alt_catalog_path(needs_bootstrap_shortcuts);
175 
176  if (!manifest->Export(manifest_path)) {
177  PrintError("Failed to create new repository");
178  return 5;
179  }
180 
181  return 0;
182 }
183 
185  const string source = *args.find('i')->second;
186  const string dest = *args.find('o')->second;
187  const string spooler_definition = *args.find('r')->second;
188  shash::Algorithms hash_algorithm = shash::kSha1;
189  if (args.find('a') != args.end()) {
190  hash_algorithm = shash::ParseHashAlgorithm(*args.find('a')->second);
191  if (hash_algorithm == shash::kAny) {
192  PrintError("unknown hash algorithm");
193  return 1;
194  }
195  }
196 
197  const upload::SpoolerDefinition sd(spooler_definition, hash_algorithm);
198  upload::Spooler *spooler = upload::Spooler::Construct(sd);
199  assert(spooler);
200  spooler->Upload(source, dest);
201  spooler->WaitForUpload();
202 
203  if (spooler->GetNumberOfErrors() > 0) {
204  LogCvmfs(kLogCatalog, kLogStderr, "failed to upload %s", source.c_str());
205  return 1;
206  }
207 
208  delete spooler;
209 
210  return 0;
211 }
212 
214  const string file_to_peek = *args.find('d')->second;
215  const string spooler_definition = *args.find('r')->second;
216 
217  // Hash doesn't matter
218  const upload::SpoolerDefinition sd(spooler_definition, shash::kAny);
219  upload::Spooler *spooler = upload::Spooler::Construct(sd);
220  assert(spooler);
221  const bool success = spooler->Peek(file_to_peek);
222 
223  if (spooler->GetNumberOfErrors() > 0) {
224  LogCvmfs(kLogCatalog, kLogStderr, "failed to peek for %s",
225  file_to_peek.c_str());
226  return 2;
227  }
228  if (!success) {
229  LogCvmfs(kLogCatalog, kLogStdout, "%s not found", file_to_peek.c_str());
230  return 1;
231  }
232  LogCvmfs(kLogCatalog, kLogStdout, "%s available", file_to_peek.c_str());
233 
234  delete spooler;
235 
236  return 0;
237 }
238 
240  const string file_to_delete = *args.find('o')->second;
241  const string spooler_definition = *args.find('r')->second;
242 
243  // Hash doesn't matter
244  const upload::SpoolerDefinition sd(spooler_definition, shash::kAny);
245  upload::Spooler *spooler = upload::Spooler::Construct(sd);
246  assert(spooler);
247  spooler->RemoveAsync(file_to_delete);
248  spooler->WaitForUpload();
249 
250  if (spooler->GetNumberOfErrors() > 0) {
251  LogCvmfs(kLogCatalog, kLogStderr, "failed to delete %s",
252  file_to_delete.c_str());
253  return 1;
254  }
255 
256  delete spooler;
257 
258  return 0;
259 }
260 
262  const string dirtab_file = *args.find('d')->second;
263  union_dir_ = MakeCanonicalPath(*args.find('u')->second);
264  scratch_dir_ = MakeCanonicalPath(*args.find('s')->second);
265  const shash::Any base_hash = shash::MkFromHexPtr(
266  shash::HexPtr(*args.find('b')->second), shash::kSuffixCatalog);
267  const string stratum0 = *args.find('w')->second;
268  const string dir_temp = *args.find('t')->second;
269  verbose_ = (args.find('x') != args.end());
270 
271  // check if there is a dirtab file
272  if (!FileExists(dirtab_file)) {
274  "Didn't find a dirtab at '%s'. Skipping...", dirtab_file.c_str());
275  return 0;
276  }
277 
278  // parse dirtab file
279  catalog::Dirtab *dirtab = catalog::Dirtab::Create(dirtab_file);
280  if (!dirtab->IsValid()) {
281  LogCvmfs(kLogCatalog, kLogStderr, "Invalid or not readable dirtab '%s'",
282  dirtab_file.c_str());
283  return 1;
284  }
285  LogCvmfs(kLogCatalog, kLogVerboseMsg, "Found %d rules in dirtab '%s'",
286  dirtab->RuleCount(), dirtab_file.c_str());
287 
288  // initialize catalog infrastructure
289  const bool auto_manage_catalog_files = true;
290  const bool follow_redirects = (args.count('L') > 0);
291  if (!InitDownloadManager(follow_redirects)) {
292  return 1;
293  }
294  catalog::SimpleCatalogManager catalog_manager(
295  base_hash, stratum0, dir_temp, download_manager(), statistics(),
296  auto_manage_catalog_files);
297  catalog_manager.Init();
298 
299  vector<string> new_nested_catalogs;
300  DetermineNestedCatalogCandidates(*dirtab, &catalog_manager,
301  &new_nested_catalogs);
302  const bool success = CreateCatalogMarkers(new_nested_catalogs);
303  delete dirtab;
304 
305  return (success) ? 0 : 1;
306 }
307 
308 
309 namespace {
310 
311 // Overwrite directory traversal in the globbing in order to avoid breaking out
312 // the repository tree
313 
314 std::string *g_glob_uniondir = NULL;
315 
316 bool GlobCheckPath(const char *name) {
317  char resolved_cstr[PATH_MAX];
318  char *retval = realpath(name, resolved_cstr);
319  if (retval == NULL) return false;
320 
321  std::string resolved(resolved_cstr);
322  if (resolved == *g_glob_uniondir) return true;
323  if (!HasPrefix(resolved, (*g_glob_uniondir) + "/", false /*ignore_case*/)) {
324  errno = EACCES;
325  return false;
326  }
327  return true;
328 }
329 
330 void *GlobOpendir(const char *name) {
331  if (!GlobCheckPath(name)) return NULL;
332  return opendir(name);
333 }
334 
335 void GlobClosedir(void *dirp) {
336  closedir(static_cast<DIR *>(dirp));
337 }
338 
339 struct dirent *GlobReaddir(void *dirp) {
340  return readdir(static_cast<DIR *>(dirp));
341 }
342 
343 int GlobLstat(const char *name, struct stat *st) {
344  if (!GlobCheckPath(name)) return -1;
345  return lstat(name, st);
346 }
347 
348 int GlobStat(const char *name, struct stat *st) {
349  if (!GlobCheckPath(name)) return -1;
350  return stat(name, st);
351 }
352 
353 
354 } // anonymous namespace
355 
357  const catalog::Dirtab &dirtab,
358  catalog::SimpleCatalogManager *catalog_manager,
359  vector<string> *nested_catalog_candidates) {
360  // find possible new nested catalog locations
361  const catalog::Dirtab::Rules &lookup_rules = dirtab.positive_rules();
362  catalog::Dirtab::Rules::const_iterator i = lookup_rules.begin();
363  const catalog::Dirtab::Rules::const_iterator iend = lookup_rules.end();
364  for (; i != iend; ++i) {
365  assert(!i->is_negation);
366 
367  // run a glob using the current dirtab rule on the current repository
368  // state
369  const std::string &glob_string = i->pathspec.GetGlobString();
370  const std::string &glob_string_abs = union_dir_ + glob_string;
371  const int glob_flags = GLOB_ONLYDIR | GLOB_NOSORT | GLOB_PERIOD |
372  GLOB_ALTDIRFUNC;
373  glob_t glob_res;
374  g_glob_uniondir = new std::string(union_dir_);
375  glob_res.gl_opendir = GlobOpendir;
376  glob_res.gl_readdir = GlobReaddir;
377  glob_res.gl_closedir = GlobClosedir;
378  glob_res.gl_lstat = GlobLstat;
379  glob_res.gl_stat = GlobStat;
380  const int glob_retval =
381  glob(glob_string_abs.c_str(), glob_flags, NULL, &glob_res);
382  delete g_glob_uniondir;
383  g_glob_uniondir = NULL;
384 
385  if (glob_retval == 0) {
386  // found some candidates... filtering by cvmfs catalog structure
387  LogCvmfs(kLogCatalog, kLogDebug, "Found %d entries for pathspec (%s)",
388  glob_res.gl_pathc, glob_string.c_str());
389  FilterCandidatesFromGlobResult(dirtab, glob_res.gl_pathv,
390  glob_res.gl_pathc, catalog_manager,
391  nested_catalog_candidates);
392  } else if (glob_retval == GLOB_NOMATCH) {
393  LogCvmfs(kLogCvmfs, kLogStderr, "WARNING: cannot apply pathspec %s",
394  glob_string.c_str());
395  } else {
396  LogCvmfs(kLogCvmfs, kLogStderr, "Failed to run glob matching (%s)",
397  glob_string.c_str());
398  }
399 
400  globfree(&glob_res);
401  }
402 }
403 
405  const catalog::Dirtab &dirtab, char **paths, const size_t npaths,
406  catalog::SimpleCatalogManager *catalog_manager,
407  std::vector<std::string> *nested_catalog_candidates) {
408  // go through the paths produced by glob() and filter them
409  for (size_t i = 0; i < npaths; ++i) {
410  // process candidate paths
411  const std::string candidate(paths[i]);
412  const std::string candidate_rel = candidate.substr(union_dir_.size());
413 
414  // check if path points to a directory
415  platform_stat64 candidate_info;
416  const int lstat_retval = platform_lstat(candidate.c_str(), &candidate_info);
417  if (lstat_retval != 0) {
419  "Error in processing .cvmfsdirtab: cannot access %s (%d)",
420  candidate.c_str(), errno);
421  abort();
422  }
423  assert(lstat_retval == 0);
424  if (!S_ISDIR(candidate_info.st_mode)) {
425  // The GLOB_ONLYDIR flag is only a hint, non-directories can still be
426  // returned
428  "The '%s' dirtab entry does not point to a directory "
429  "but to a file or a symbolic link",
430  candidate_rel.c_str());
431  continue;
432  }
433 
434  // check if the path is a meta-directory (. or ..)
435  assert(candidate_rel.size() >= 2);
436  if (candidate_rel.substr(candidate_rel.size() - 2) == "/." ||
437  candidate_rel.substr(candidate_rel.size() - 3) == "/..") {
438  continue;
439  }
440 
441  // check that the path isn't excluded in the dirtab
442  if (dirtab.IsOpposing(candidate_rel)) {
443  LogCvmfs(kLogCatalog, kLogDebug, "Candidate '%s' is excluded by dirtab",
444  candidate_rel.c_str());
445  continue;
446  }
447 
448  // lookup the path in the catalog structure to find out if it already
449  // points to a nested catalog transition point. Furthermore it could be
450  // a new directory and thus not in any catalog yet.
452  const bool lookup_success = catalog_manager->LookupPath(
453  candidate_rel, catalog::kLookupSole, &dirent);
454  if (!lookup_success) {
456  "Didn't find '%s' in catalogs, could "
457  "be a new directory and nested catalog.",
458  candidate_rel.c_str());
459  nested_catalog_candidates->push_back(candidate);
460  } else if (!dirent.IsNestedCatalogMountpoint() &&
461  !dirent.IsNestedCatalogRoot()) {
463  "Found '%s' in catalogs but is not a "
464  "nested catalog yet.",
465  candidate_rel.c_str());
466  nested_catalog_candidates->push_back(candidate);
467  } else {
468  // check if the nested catalog marker is still there, we might need to
469  // recreate the catalog after manual marker removal
470  // Note: First we check if the parent directory shows up in the scratch
471  // space to verify that it was touched (copy-on-write)
472  // Otherwise we would force the cvmfs client behind the union
473  // file-
474  // system to (potentially) unnecessarily fetch catalogs
475  if (DirectoryExists(scratch_dir_ + candidate_rel) &&
476  !FileExists(union_dir_ + candidate_rel + "/.cvmfscatalog")) {
478  "WARNING: '%s' should be a nested "
479  "catalog according to the dirtab. "
480  "Recreating...",
481  candidate_rel.c_str());
482  nested_catalog_candidates->push_back(candidate);
483  } else {
485  "Found '%s' in catalogs and it already is a nested catalog.",
486  candidate_rel.c_str());
487  }
488  }
489  }
490 }
491 
493  const std::vector<std::string> &new_nested_catalogs) {
494  // go through the new nested catalog paths and create .cvmfscatalog markers
495  // where necessary
496  bool success = true;
497  std::vector<std::string>::const_iterator k = new_nested_catalogs.begin();
498  const std::vector<std::string>::const_iterator kend =
499  new_nested_catalogs.end();
500  for (; k != kend; ++k) {
501  assert(!k->empty() && k->size() > union_dir_.size());
502 
503  // was the marker already created by hand?
504  const std::string marker_path = *k + "/.cvmfscatalog";
505  if (FileExists(marker_path)) {
506  continue;
507  }
508 
509  // create a nested catalog marker
510  const mode_t mode = kDefaultFileMode;
511  const int fd = open(marker_path.c_str(), O_CREAT, mode);
512  if (fd < 0) {
514  "Failed to create nested catalog marker "
515  "at '%s' (errno: %d)",
516  marker_path.c_str(), errno);
517  success = false;
518  continue;
519  }
520  close(fd);
521 
522  // inform the user if requested
523  if (verbose_) {
524  LogCvmfs(kLogCvmfs, kLogStdout, "Auto-creating nested catalog in %s",
525  k->c_str());
526  }
527  }
528 
529  return success;
530 }
531 
// Pairs a command line option character with the location that receives the
// numeric value parsed for that option.
struct chunk_arg {
  char param;       // option character to look up in the argument list
  size_t *save_to;  // where the parsed value is written; plain pointer

  chunk_arg(char param, size_t *save_to)
    : param(param)
    , save_to(save_to)
  {}
};
537 
539  const swissknife::ArgumentList &args, SyncParameters *params) {
540  typedef std::vector<chunk_arg> ChunkArgs;
541 
542  // define where to store the value of which file chunk argument
543  ChunkArgs chunk_args;
544  chunk_args.push_back(chunk_arg('a', &params->avg_file_chunk_size));
545  chunk_args.push_back(chunk_arg('l', &params->min_file_chunk_size));
546  chunk_args.push_back(chunk_arg('h', &params->max_file_chunk_size));
547 
548  // read the arguments
549  ChunkArgs::const_iterator i = chunk_args.begin();
550  ChunkArgs::const_iterator iend = chunk_args.end();
551  for (; i != iend; ++i) {
552  swissknife::ArgumentList::const_iterator arg = args.find(i->param);
553 
554  if (arg != args.end()) {
555  size_t arg_value = static_cast<size_t>(String2Uint64(*arg->second));
556  if (arg_value > 0) {
557  *i->save_to = arg_value;
558  } else {
559  return false;
560  }
561  }
562  }
563 
564  // check if argument values are sane
565  return true;
566 }
567 
569  string start_time = GetGMTimestamp();
570 
571  // Spawn monitoring process (watchdog)
572  std::string watchdog_dir = "/tmp";
573  char watchdog_path[PATH_MAX];
574  std::string timestamp = GetGMTimestamp("%Y.%m.%d-%H.%M.%S");
575  int path_size = snprintf(watchdog_path, sizeof(watchdog_path),
576  "%s/cvmfs-swissknife-sync-stacktrace.%s.%d",
577  watchdog_dir.c_str(), timestamp.c_str(), getpid());
578  assert(path_size > 0);
579  assert(path_size < PATH_MAX);
580  UniquePtr<Watchdog> watchdog(Watchdog::Create(std::string(watchdog_path)));
581  watchdog->Spawn();
582 
583  SyncParameters params;
584 
585  // Initialization
586  params.dir_union = MakeCanonicalPath(*args.find('u')->second);
587  params.dir_scratch = MakeCanonicalPath(*args.find('s')->second);
588  params.dir_rdonly = MakeCanonicalPath(*args.find('c')->second);
589  params.dir_temp = MakeCanonicalPath(*args.find('t')->second);
590  params.base_hash = shash::MkFromHexPtr(shash::HexPtr(*args.find('b')->second),
592  params.stratum0 = *args.find('w')->second;
593  params.manifest_path = *args.find('o')->second;
594  params.spooler_definition = *args.find('r')->second;
595 
596  params.public_keys = *args.find('K')->second;
597  params.repo_name = *args.find('N')->second;
598 
600 
601  if (args.find('f') != args.end())
602  params.union_fs_type = *args.find('f')->second;
603  if (args.find('A') != args.end()) params.is_balanced = true;
604  if (args.find('x') != args.end()) params.print_changeset = true;
605  if (args.find('y') != args.end()) params.dry_run = true;
606  if (args.find('m') != args.end()) params.mucatalogs = true;
607  if (args.find('i') != args.end()) params.ignore_xdir_hardlinks = true;
608  if (args.find('d') != args.end()) params.stop_for_catalog_tweaks = true;
609  if (args.find('V') != args.end()) params.voms_authz = true;
610  if (args.find('F') != args.end()) params.authz_file = *args.find('F')->second;
611  if (args.find('k') != args.end()) params.include_xattrs = true;
612  if (args.find('Y') != args.end()) params.external_data = true;
613  if (args.find('W') != args.end()) params.direct_io = true;
614  if (args.find('S') != args.end()) {
616  *args.find('S')->second, &params.virtual_dir_actions);
617  if (!retval) {
618  LogCvmfs(kLogCvmfs, kLogStderr, "invalid virtual catalog options: %s",
619  args.find('S')->second->c_str());
620  return 1;
621  }
622  }
623  if (args.find('z') != args.end()) {
624  unsigned log_level =
625  1 << (kLogLevel0 + String2Uint64(*args.find('z')->second));
626  if (log_level > kLogNone) {
627  LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
628  return 1;
629  }
630  SetLogVerbosity(static_cast<LogLevels>(log_level));
631  }
632 
633  if (args.find('X') != args.end())
634  params.max_weight = String2Uint64(*args.find('X')->second);
635  if (args.find('M') != args.end())
636  params.min_weight = String2Uint64(*args.find('M')->second);
637 
638  if (args.find('p') != args.end()) {
639  params.use_file_chunking = true;
640  if (!ReadFileChunkingArgs(args, &params)) {
641  PrintError("Failed to read file chunk size values");
642  return 2;
643  }
644  }
645  if (args.find('O') != args.end()) {
646  params.generate_legacy_bulk_chunks = true;
647  }
648  shash::Algorithms hash_algorithm = shash::kSha1;
649  if (args.find('e') != args.end()) {
650  hash_algorithm = shash::ParseHashAlgorithm(*args.find('e')->second);
651  if (hash_algorithm == shash::kAny) {
652  PrintError("unknown hash algorithm");
653  return 1;
654  }
655  }
656  if (args.find('Z') != args.end()) {
657  params.compression_alg =
658  zlib::ParseCompressionAlgorithm(*args.find('Z')->second);
659  }
660 
661  if (args.find('C') != args.end()) {
662  params.trusted_certs = *args.find('C')->second;
663  }
664 
665  if (args.find('E') != args.end()) params.enforce_limits = true;
666  if (args.find('Q') != args.end()) {
667  params.nested_kcatalog_limit = String2Uint64(*args.find('Q')->second);
668  } else {
670  }
671  if (args.find('R') != args.end()) {
672  params.root_kcatalog_limit = String2Uint64(*args.find('R')->second);
673  } else {
675  }
676  if (args.find('U') != args.end()) {
677  params.file_mbyte_limit = String2Uint64(*args.find('U')->second);
678  } else {
680  }
681 
682  if (args.find('v') != args.end()) {
683  sanitizer::IntegerSanitizer sanitizer;
684  if (!sanitizer.IsValid(*args.find('v')->second)) {
685  PrintError("invalid revision number");
686  return 1;
687  }
688  params.manual_revision = String2Uint64(*args.find('v')->second);
689  }
690 
691  params.branched_catalog = args.find('B') != args.end();
692 
693  if (args.find('q') != args.end()) {
694  params.max_concurrent_write_jobs = String2Uint64(*args.find('q')->second);
695  }
696 
697  if (args.find('0') != args.end()) {
698  params.num_upload_tasks = String2Uint64(*args.find('0')->second);
699  }
700 
701  if (args.find('T') != args.end()) {
702  params.ttl_seconds = String2Uint64(*args.find('T')->second);
703  }
704 
705  if (args.find('g') != args.end()) {
706  params.ignore_special_files = true;
707  }
708 
709  if (args.find('P') != args.end()) {
710  params.session_token_file = *args.find('P')->second;
711  }
712 
713  if (args.find('H') != args.end()) {
714  params.key_file = *args.find('H')->second;
715  }
716 
717  if (args.find('D') != args.end()) {
718  params.repo_tag.name_ = *args.find('D')->second;
719  }
720 
721  if (args.find('G') != args.end()) {
722  params.repo_tag.channel_ = *args.find('G')->second;
723  }
724 
725  if (args.find('J') != args.end()) {
726  params.repo_tag.description_ = *args.find('J')->second;
727  }
728 
729  const bool upload_statsdb = (args.count('I') > 0);
730 
731  if (!CheckParams(params)) return 2;
732  // This may fail, in which case a warning is printed and the process continues
734 
735  perf::StatisticsTemplate publish_statistics("publish", this->statistics());
736 
737  // Start spooler
738  upload::SpoolerDefinition spooler_definition(
739  params.spooler_definition, hash_algorithm, params.compression_alg,
742  params.max_file_chunk_size, params.session_token_file, params.key_file);
743  if (params.max_concurrent_write_jobs > 0) {
744  spooler_definition.number_of_concurrent_uploads =
746  }
747  spooler_definition.num_upload_tasks = params.num_upload_tasks;
748 
749  upload::SpoolerDefinition spooler_definition_catalogs(
750  spooler_definition.Dup2DefaultCompression());
751 
752  params.spooler = upload::Spooler::Construct(spooler_definition,
753  &publish_statistics);
754  if (NULL == params.spooler) return 3;
756  upload::Spooler::Construct(spooler_definition_catalogs,
757  &publish_statistics));
758  if (!spooler_catalogs.IsValid()) return 3;
759 
760  const bool follow_redirects = (args.count('L') > 0);
761  const string proxy = (args.count('@') > 0) ? *args.find('@')->second : "";
762  if (!InitDownloadManager(follow_redirects, proxy)) {
763  return 3;
764  }
765 
766  if (!InitVerifyingSignatureManager(params.public_keys,
767  params.trusted_certs)) {
768  return 3;
769  }
770 
771  /*
772  * Note: If the upstream is of type gateway, due to the possibility of
773  * concurrent release managers, it's possible to have different local and
774  * remote root hashes. We proceed by loading the remote manifest but we give
775  * an empty base hash.
776  */
778  if (params.branched_catalog) {
779  // Throw-away manifest
780  manifest = new manifest::Manifest(shash::Any(), 0, "");
781  } else if (params.virtual_dir_actions !=
783  manifest = this->OpenLocalManifest(params.manifest_path);
784  params.base_hash = manifest->catalog_hash();
785  } else {
786  // TODO(jblomer): revert to params.base_hash if spooler driver type is not
787  // upload::SpoolerDefinition::Gateway
788  manifest =
789  FetchRemoteManifest(params.stratum0, params.repo_name, shash::Any());
790  }
791  if (!manifest.IsValid()) {
792  return 3;
793  }
794 
795  StatisticsDatabase *stats_db =
797 
798  const std::string old_root_hash = manifest->catalog_hash().ToString(true);
799 
800  catalog::WritableCatalogManager catalog_manager(
801  params.base_hash, params.stratum0, params.dir_temp,
802  spooler_catalogs.weak_ref(),
803  download_manager(), params.enforce_limits, params.nested_kcatalog_limit,
804  params.root_kcatalog_limit, params.file_mbyte_limit, statistics(),
805  params.is_balanced, params.max_weight, params.min_weight);
806  catalog_manager.Init();
807 
808  publish::SyncMediator mediator(&catalog_manager, &params, publish_statistics);
809  LogCvmfs(kLogPublish, kLogStdout, "Processing changes...");
810 
811  // Should be before the synchronization starts to avoid race of GetTTL with
812  // other sqlite operations
813  if ((params.ttl_seconds > 0) &&
814  ((params.ttl_seconds != catalog_manager.GetTTL()) ||
815  !catalog_manager.HasExplicitTTL())) {
816  LogCvmfs(kLogCvmfs, kLogStdout, "Setting repository TTL to %" PRIu64 "s",
817  params.ttl_seconds);
818  catalog_manager.SetTTL(params.ttl_seconds);
819  }
820 
821  // Either real catalogs or virtual catalog
823  publish::SyncUnion *sync;
824  if (params.union_fs_type == "overlayfs") {
825  sync = new publish::SyncUnionOverlayfs(
826  &mediator, params.dir_rdonly, params.dir_union, params.dir_scratch);
827  } else if (params.union_fs_type == "aufs") {
828  sync = new publish::SyncUnionAufs(&mediator, params.dir_rdonly,
829  params.dir_union, params.dir_scratch);
830  } else {
831  LogCvmfs(kLogCvmfs, kLogStderr, "unknown union file system: %s",
832  params.union_fs_type.c_str());
833  return 3;
834  }
835 
836  if (!sync->Initialize()) {
838  "Initialization of the synchronisation "
839  "engine failed");
840  return 4;
841  }
842 
843  sync->Traverse();
844  } else {
845  assert(!manifest->history().IsNull());
846  catalog::VirtualCatalog virtual_catalog(
847  manifest.weak_ref(), download_manager(), &catalog_manager, &params);
848  virtual_catalog.Generate(params.virtual_dir_actions);
849  }
850 
851  if (!params.authz_file.empty()) {
853  "Adding contents of authz file %s to"
854  " root catalog.",
855  params.authz_file.c_str());
856  int fd = open(params.authz_file.c_str(), O_RDONLY);
857  if (fd == -1) {
859  "Unable to open authz file (%s)"
860  "from the publication process: %s",
861  params.authz_file.c_str(), strerror(errno));
862  return 7;
863  }
864 
865  std::string new_authz;
866  const bool read_successful = SafeReadToString(fd, &new_authz);
867  close(fd);
868 
869  if (!read_successful) {
870  LogCvmfs(kLogCvmfs, kLogStderr, "Failed to read authz file (%s): %s",
871  params.authz_file.c_str(), strerror(errno));
872  return 8;
873  }
874 
875  catalog_manager.SetVOMSAuthz(new_authz);
876  }
877 
878  if (!mediator.Commit(manifest.weak_ref())) {
879  PrintError("something went wrong during sync");
880  if (!params.dry_run) {
881  stats_db->StorePublishStatistics(this->statistics(), start_time, false);
882  if (upload_statsdb) {
883  stats_db->UploadStatistics(params.spooler);
884  }
885  }
886  return 5;
887  }
888 
889  perf::Counter *revision_counter = statistics()->Register("publish.revision",
890  "Published revision number");
891  revision_counter->Set(static_cast<int64_t>(
892  catalog_manager.GetRootCatalog()->revision()));
893 
894  // finalize the spooler
895  LogCvmfs(kLogCvmfs, kLogStdout, "Wait for all uploads to finish");
896  params.spooler->WaitForUpload();
897  spooler_catalogs->WaitForUpload();
898  params.spooler->FinalizeSession(false);
899 
900  LogCvmfs(kLogCvmfs, kLogStdout, "Exporting repository manifest");
901 
902  // We call FinalizeSession(true) this time, to also trigger the commit
903  // operation on the gateway machine (if the upstream is of type "gw").
904 
905  // Get the path of the new root catalog
906  const std::string new_root_hash = manifest->catalog_hash().ToString(true);
907 
908  if (!spooler_catalogs->FinalizeSession(true, old_root_hash, new_root_hash,
909  params.repo_tag)) {
910  PrintError("Failed to commit transaction.");
911  if (!params.dry_run) {
912  stats_db->StorePublishStatistics(this->statistics(), start_time, false);
913  if (upload_statsdb) {
914  stats_db->UploadStatistics(params.spooler);
915  }
916  }
917  return 9;
918  }
919 
920  if (!params.dry_run) {
921  stats_db->StorePublishStatistics(this->statistics(), start_time, true);
922  if (upload_statsdb) {
923  stats_db->UploadStatistics(params.spooler);
924  }
925  }
926 
927  delete params.spooler;
928 
929  if (!manifest->Export(params.manifest_path)) {
930  PrintError("Failed to create new repository");
931  return 6;
932  }
933 
934  return 0;
935 }
bool Commit(manifest::Manifest *manifest)
#define LogCvmfs(source, mask,...)
Definition: logging.h:20
void SetLogVerbosity(const LogLevels max_level)
Definition: logging.cc:260
std::string database_file() const
Definition: reflog.cc:337
Algorithms ParseCompressionAlgorithm(const std::string &algorithm_option)
Definition: compression.cc:148
size_t avg_file_chunk_size
const manifest::Manifest * manifest() const
Definition: repository.h:123
struct stat64 platform_stat64
std::string stratum0
int Main(const ArgumentList &args)
size_t * save_to
std::string GetGMTimestamp(const std::string &format)
Definition: string.cc:585
bool IsValid() const
Definition: dirtab.h:127
bool UploadStatistics(upload::Spooler *spooler, std::string local_path="")
T * weak_ref() const
Definition: pointer.h:42
SpoolerDefinition Dup2DefaultCompression() const
std::vector< Rule > Rules
Definition: dirtab.h:68
bool generate_legacy_bulk_chunks
static Watchdog * Create(const std::string &crash_dump_path)
Definition: monitor.cc:59
unsigned file_mbyte_limit
uint64_t max_concurrent_write_jobs
const int kDefaultFileMode
Definition: posix.h:31
static const unsigned kDefaultFileMbyteLimit
virtual bool Initialize()
Definition: sync_union.cc:24
std::string name_
static const unsigned kDefaultNestedKcatalogLimit
static const unsigned kDefaultRootKcatalogLimit
int Main(const ArgumentList &args)
static bool ParseActions(const std::string &action_desc, int *actions)
size_t min_file_chunk_size
unsigned num_upload_tasks
std::string spooler_definition
std::string manifest_path
bool ReadFileChunkingArgs(const swissknife::ArgumentList &args, SyncParameters *params)
const Rules & positive_rules() const
Definition: dirtab.h:121
bool CheckParams(const SyncParameters &p)
zlib::Algorithms compression_alg
unsigned nested_kcatalog_limit
int GlobLstat(const char *name, struct stat *st)
std::string repo_name
void Spawn()
Definition: monitor.cc:376
assert((mem||(size==0))&&"Out Of Memory")
bool LookupPath(const PathString &path, const LookupOptions options, DirectoryEntry *entry)
std::string union_fs_type
std::string dir_scratch
int Main(const ArgumentList &args)
bool IsNestedCatalogMountpoint() const
virtual bool IsOpposing(const std::string &path) const
Definition: dirtab.cc:161
chunk_arg(char param, size_t *save_to)
bool IsValid(const std::string &input) const
Definition: sanitizer.cc:114
Algorithms
Definition: hash.h:40
std::string key_file
bool IsNestedCatalogRoot() const
bool FileExists(const std::string &path)
Definition: posix.cc:816
std::string dir_temp
int Main(const ArgumentList &args)
static manifest::Manifest * CreateRepository(const std::string &dir_temp, const bool volatile_content, const std::string &voms_authz, upload::Spooler *spooler)
std::string channel_
void Set(const int64_t val)
Definition: statistics.h:33
void FilterCandidatesFromGlobResult(const catalog::Dirtab &dirtab, char **paths, const size_t npaths, catalog::SimpleCatalogManager *catalog_manager, std::vector< std::string > *nested_catalog_candidates)
bool CheckParams(const swissknife::CommandLease::Parameters &p)
std::string dir_rdonly
const char kSuffixCatalog
Definition: hash.h:53
shash::Any base_hash
upload::Spooler * spooler
int platform_lstat(const char *path, platform_stat64 *buf)
Any MkFromHexPtr(const HexPtr hex, const char suffix)
Definition: hash.cc:83
static Dirtab * Create(const std::string &dirtab_path)
Definition: dirtab.h:84
static const int kActionNone
int Main(const ArgumentList &args)
std::string dir_union
static void HashDatabase(const std::string &database_path, shash::Any *hash_reflog)
Definition: reflog.cc:322
int Main(const ArgumentList &args)
bool IsValid() const
Definition: pointer.h:43
struct dirent * GlobReaddir(void *dirp)
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:265
void DropDatabaseFileOwnership()
Definition: reflog.cc:313
static StatisticsDatabase * OpenStandardDB(const std::string repo_name)
bool DirectoryExists(const std::string &path)
Definition: posix.cc:838
bool stop_for_catalog_tweaks
bool SafeReadToString(int fd, std::string *final_result)
Definition: posix.cc:2011
unsigned root_kcatalog_limit
size_t max_file_chunk_size
bool StorePublishStatistics(const perf::Statistics *statistics, const std::string &start_time, const bool success)
uint64_t manual_revision
int GlobStat(const char *name, struct stat *st)
std::string session_token_file
std::string trusted_certs
bool ObtainDacReadSearchCapability()
virtual void Traverse()=0
uint64_t String2Uint64(const string &value)
Definition: string.cc:228
std::map< char, SharedPtr< std::string > > ArgumentList
Definition: swissknife.h:72
size_t RuleCount() const
Definition: dirtab.h:124
Algorithms ParseHashAlgorithm(const string &algorithm_option)
Definition: hash.cc:72
uint64_t ttl_seconds
std::string description_
std::string authz_file
void DetermineNestedCatalogCandidates(const catalog::Dirtab &dirtab, catalog::SimpleCatalogManager *catalog_manager, std::vector< std::string > *nested_catalog_candidates)
RepositoryTag repo_tag
const int kLogVerboseMsg
void Generate(int actions)
static bool WriteChecksum(const std::string &path, const shash::Any &value)
Definition: reflog.cc:64
bool ignore_xdir_hardlinks
std::string MakeCanonicalPath(const std::string &path)
Definition: posix.cc:96
void PrintError(const string &message)
Definition: logging.cc:532
const upload::Spooler * spooler_catalogs() const
Definition: repository.h:319
static const uint64_t kDefaultTTL
Definition: catalog.h:104
bool CreateCatalogMarkers(const std::vector< std::string > &new_nested_catalogs)
std::string public_keys