CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
swissknife_sync.cc
Go to the documentation of this file.
1 
19 // NOLINTNEXTLINE
20 #define _FILE_OFFSET_BITS 64
21 // NOLINTNEXTLINE
22 #define __STDC_FORMAT_MACROS
23 
24 #include "swissknife_sync.h"
25 #include "cvmfs_config.h"
26 
27 #include <errno.h>
28 #include <fcntl.h>
29 #include <glob.h>
30 #include <inttypes.h>
31 #include <limits.h>
32 #include <sys/capability.h>
33 
34 #include <cstdio>
35 #include <cstdlib>
36 #include <string>
37 #include <vector>
38 
39 #include "catalog_mgr_ro.h"
40 #include "catalog_mgr_rw.h"
41 #include "catalog_virtual.h"
42 #include "manifest.h"
43 #include "monitor.h"
44 #include "network/download.h"
45 #include "path_filters/dirtab.h"
46 #include "reflog.h"
47 #include "sanitizer.h"
48 #include "statistics.h"
49 #include "statistics_database.h"
51 #include "sync_mediator.h"
52 #include "sync_union.h"
53 #include "sync_union_aufs.h"
54 #include "sync_union_overlayfs.h"
55 #include "util/logging.h"
56 #include "util/platform.h"
57 #include "util/string.h"
58 
59 using namespace std; // NOLINT
60 
62  if (!DirectoryExists(p.dir_scratch)) {
63  PrintError("overlay (copy on write) directory does not exist");
64  return false;
65  }
66  if (!DirectoryExists(p.dir_union)) {
67  PrintError("union volume does not exist");
68  return false;
69  }
70  if (!DirectoryExists(p.dir_rdonly)) {
71  PrintError("cvmfs read/only repository does not exist");
72  return false;
73  }
74  if (p.stratum0 == "") {
75  PrintError("Stratum0 url missing");
76  return false;
77  }
78 
79  if (p.manifest_path == "") {
80  PrintError("manifest output required");
81  return false;
82  }
83  if (!DirectoryExists(p.dir_temp)) {
84  PrintError("data store directory does not exist");
85  return false;
86  }
87 
90  PrintError("file chunk size values are not sane");
91  return false;
92  }
93 
94  if (HasPrefix(p.spooler_definition, "gw", false)) {
95  if (p.session_token_file.empty()) {
96  PrintError(
97  "Session token file has to be provided "
98  "when upstream type is gw.");
99  return false;
100  }
101  }
102 
103  return true;
104 }
105 
107  const string manifest_path = *args.find('o')->second;
108  const string dir_temp = *args.find('t')->second;
109  const string spooler_definition = *args.find('r')->second;
110  const string repo_name = *args.find('n')->second;
111  const string reflog_chksum_path = *args.find('R')->second;
112  if (args.find('l') != args.end()) {
113  unsigned log_level =
114  kLogLevel0 << String2Uint64(*args.find('l')->second);
115  if (log_level > kLogNone) {
116  LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
117  return 1;
118  }
119  SetLogVerbosity(static_cast<LogLevels>(log_level));
120  }
121  shash::Algorithms hash_algorithm = shash::kSha1;
122  if (args.find('a') != args.end()) {
123  hash_algorithm = shash::ParseHashAlgorithm(*args.find('a')->second);
124  if (hash_algorithm == shash::kAny) {
125  PrintError("unknown hash algorithm");
126  return 1;
127  }
128  }
129 
130  const bool volatile_content = (args.count('v') > 0);
131  const bool garbage_collectable = (args.count('z') > 0);
132  std::string voms_authz;
133  if (args.find('V') != args.end()) {
134  voms_authz = *args.find('V')->second;
135  }
136 
137  const upload::SpoolerDefinition sd(spooler_definition, hash_algorithm,
139  UniquePtr<upload::Spooler> spooler(upload::Spooler::Construct(sd));
140  assert(spooler.IsValid());
141 
144  dir_temp, volatile_content, voms_authz, spooler.weak_ref()));
145  if (!manifest.IsValid()) {
146  PrintError("Failed to create new repository");
147  return 1;
148  }
149 
150  UniquePtr<manifest::Reflog> reflog(CreateEmptyReflog(dir_temp, repo_name));
151  if (!reflog.IsValid()) {
152  PrintError("Failed to create fresh Reflog");
153  return 1;
154  }
155 
156  reflog->DropDatabaseFileOwnership();
157  string reflog_path = reflog->database_file();
158  reflog.Destroy();
159  shash::Any reflog_hash(hash_algorithm);
160  manifest::Reflog::HashDatabase(reflog_path, &reflog_hash);
161  spooler->UploadReflog(reflog_path);
162  spooler->WaitForUpload();
163  unlink(reflog_path.c_str());
164  if (spooler->GetNumberOfErrors()) {
165  LogCvmfs(kLogCvmfs, kLogStderr, "Failed to upload reflog");
166  return 4;
167  }
168  assert(!reflog_chksum_path.empty());
169  manifest::Reflog::WriteChecksum(reflog_chksum_path, reflog_hash);
170 
171  // set optional manifest fields
172  const bool needs_bootstrap_shortcuts = !voms_authz.empty();
173  manifest->set_garbage_collectability(garbage_collectable);
174  manifest->set_has_alt_catalog_path(needs_bootstrap_shortcuts);
175 
176  if (!manifest->Export(manifest_path)) {
177  PrintError("Failed to create new repository");
178  return 5;
179  }
180 
181  return 0;
182 }
183 
185  const string source = *args.find('i')->second;
186  const string dest = *args.find('o')->second;
187  const string spooler_definition = *args.find('r')->second;
188  shash::Algorithms hash_algorithm = shash::kSha1;
189  if (args.find('a') != args.end()) {
190  hash_algorithm = shash::ParseHashAlgorithm(*args.find('a')->second);
191  if (hash_algorithm == shash::kAny) {
192  PrintError("unknown hash algorithm");
193  return 1;
194  }
195  }
196 
197  const upload::SpoolerDefinition sd(spooler_definition, hash_algorithm);
198  upload::Spooler *spooler = upload::Spooler::Construct(sd);
199  assert(spooler);
200  spooler->Upload(source, dest);
201  spooler->WaitForUpload();
202 
203  if (spooler->GetNumberOfErrors() > 0) {
204  LogCvmfs(kLogCatalog, kLogStderr, "failed to upload %s", source.c_str());
205  return 1;
206  }
207 
208  delete spooler;
209 
210  return 0;
211 }
212 
214  const string file_to_peek = *args.find('d')->second;
215  const string spooler_definition = *args.find('r')->second;
216 
217  // Hash doesn't matter
218  const upload::SpoolerDefinition sd(spooler_definition, shash::kAny);
219  upload::Spooler *spooler = upload::Spooler::Construct(sd);
220  assert(spooler);
221  const bool success = spooler->Peek(file_to_peek);
222 
223  if (spooler->GetNumberOfErrors() > 0) {
224  LogCvmfs(kLogCatalog, kLogStderr, "failed to peek for %s",
225  file_to_peek.c_str());
226  return 2;
227  }
228  if (!success) {
229  LogCvmfs(kLogCatalog, kLogStdout, "%s not found", file_to_peek.c_str());
230  return 1;
231  }
232  LogCvmfs(kLogCatalog, kLogStdout, "%s available", file_to_peek.c_str());
233 
234  delete spooler;
235 
236  return 0;
237 }
238 
240  const string file_to_delete = *args.find('o')->second;
241  const string spooler_definition = *args.find('r')->second;
242 
243  // Hash doesn't matter
244  const upload::SpoolerDefinition sd(spooler_definition, shash::kAny);
245  upload::Spooler *spooler = upload::Spooler::Construct(sd);
246  assert(spooler);
247  spooler->RemoveAsync(file_to_delete);
248  spooler->WaitForUpload();
249 
250  if (spooler->GetNumberOfErrors() > 0) {
251  LogCvmfs(kLogCatalog, kLogStderr, "failed to delete %s",
252  file_to_delete.c_str());
253  return 1;
254  }
255 
256  delete spooler;
257 
258  return 0;
259 }
260 
262  const string dirtab_file = *args.find('d')->second;
263  union_dir_ = MakeCanonicalPath(*args.find('u')->second);
264  scratch_dir_ = MakeCanonicalPath(*args.find('s')->second);
265  const shash::Any base_hash = shash::MkFromHexPtr(
266  shash::HexPtr(*args.find('b')->second), shash::kSuffixCatalog);
267  const string stratum0 = *args.find('w')->second;
268  const string dir_temp = *args.find('t')->second;
269  verbose_ = (args.find('x') != args.end());
270 
271  // check if there is a dirtab file
272  if (!FileExists(dirtab_file)) {
274  "Didn't find a dirtab at '%s'. Skipping...", dirtab_file.c_str());
275  return 0;
276  }
277 
278  // parse dirtab file
279  catalog::Dirtab *dirtab = catalog::Dirtab::Create(dirtab_file);
280  if (!dirtab->IsValid()) {
281  LogCvmfs(kLogCatalog, kLogStderr, "Invalid or not readable dirtab '%s'",
282  dirtab_file.c_str());
283  return 1;
284  }
285  LogCvmfs(kLogCatalog, kLogVerboseMsg, "Found %lu rules in dirtab '%s'",
286  dirtab->RuleCount(), dirtab_file.c_str());
287 
288  // initialize catalog infrastructure
289  const bool auto_manage_catalog_files = true;
290  const bool follow_redirects = (args.count('L') > 0);
291  const string proxy = (args.count('@') > 0) ? *args.find('@')->second : "";
292  if (!InitDownloadManager(follow_redirects, proxy)) {
293  return 1;
294  }
295  catalog::SimpleCatalogManager catalog_manager(
296  base_hash, stratum0, dir_temp, download_manager(), statistics(),
297  auto_manage_catalog_files);
298  catalog_manager.Init();
299 
300  vector<string> new_nested_catalogs;
301  DetermineNestedCatalogCandidates(*dirtab, &catalog_manager,
302  &new_nested_catalogs);
303  const bool success = CreateCatalogMarkers(new_nested_catalogs);
304  delete dirtab;
305 
306  return (success) ? 0 : 1;
307 }
308 
309 
310 namespace {
311 
312 // Overwrite directory traversal in the globbing in order to avoid breaking out
313 // the repository tree
314 
315 std::string *g_glob_uniondir = NULL;
316 
317 bool GlobCheckPath(const char *name) {
318  char resolved_cstr[PATH_MAX];
319  char *retval = realpath(name, resolved_cstr);
320  if (retval == NULL) return false;
321 
322  std::string resolved(resolved_cstr);
323  if (resolved == *g_glob_uniondir) return true;
324  if (!HasPrefix(resolved, (*g_glob_uniondir) + "/", false /*ignore_case*/)) {
325  errno = EACCES;
326  return false;
327  }
328  return true;
329 }
330 
331 void *GlobOpendir(const char *name) {
332  if (!GlobCheckPath(name)) return NULL;
333  return opendir(name);
334 }
335 
336 void GlobClosedir(void *dirp) {
337  closedir(static_cast<DIR *>(dirp));
338 }
339 
340 struct dirent *GlobReaddir(void *dirp) {
341  return readdir(static_cast<DIR *>(dirp));
342 }
343 
344 int GlobLstat(const char *name, struct stat *st) {
345  if (!GlobCheckPath(name)) return -1;
346  return lstat(name, st);
347 }
348 
349 int GlobStat(const char *name, struct stat *st) {
350  if (!GlobCheckPath(name)) return -1;
351  return stat(name, st);
352 }
353 
354 
355 } // anonymous namespace
356 
358  const catalog::Dirtab &dirtab,
359  catalog::SimpleCatalogManager *catalog_manager,
360  vector<string> *nested_catalog_candidates) {
361  // find possible new nested catalog locations
362  const catalog::Dirtab::Rules &lookup_rules = dirtab.positive_rules();
363  catalog::Dirtab::Rules::const_iterator i = lookup_rules.begin();
364  const catalog::Dirtab::Rules::const_iterator iend = lookup_rules.end();
365  for (; i != iend; ++i) {
366  assert(!i->is_negation);
367 
368  // run a glob using the current dirtab rule on the current repository
369  // state
370  const std::string &glob_string = i->pathspec.GetGlobString();
371  const std::string &glob_string_abs = union_dir_ + glob_string;
372  const int glob_flags = GLOB_ONLYDIR | GLOB_NOSORT | GLOB_PERIOD |
373  GLOB_ALTDIRFUNC;
374  glob_t glob_res;
375  g_glob_uniondir = new std::string(union_dir_);
376  glob_res.gl_opendir = GlobOpendir;
377  glob_res.gl_readdir = GlobReaddir;
378  glob_res.gl_closedir = GlobClosedir;
379  glob_res.gl_lstat = GlobLstat;
380  glob_res.gl_stat = GlobStat;
381  const int glob_retval =
382  glob(glob_string_abs.c_str(), glob_flags, NULL, &glob_res);
383  delete g_glob_uniondir;
384  g_glob_uniondir = NULL;
385 
386  if (glob_retval == 0) {
387  // found some candidates... filtering by cvmfs catalog structure
388  LogCvmfs(kLogCatalog, kLogDebug, "Found %lu entries for pathspec (%s)",
389  glob_res.gl_pathc, glob_string.c_str());
390  FilterCandidatesFromGlobResult(dirtab, glob_res.gl_pathv,
391  glob_res.gl_pathc, catalog_manager,
392  nested_catalog_candidates);
393  } else if (glob_retval == GLOB_NOMATCH) {
394  LogCvmfs(kLogCvmfs, kLogStderr, "WARNING: cannot apply pathspec %s",
395  glob_string.c_str());
396  } else {
397  LogCvmfs(kLogCvmfs, kLogStderr, "Failed to run glob matching (%s)",
398  glob_string.c_str());
399  }
400 
401  globfree(&glob_res);
402  }
403 }
404 
406  const catalog::Dirtab &dirtab, char **paths, const size_t npaths,
407  catalog::SimpleCatalogManager *catalog_manager,
408  std::vector<std::string> *nested_catalog_candidates) {
409  // go through the paths produced by glob() and filter them
410  for (size_t i = 0; i < npaths; ++i) {
411  // process candidate paths
412  const std::string candidate(paths[i]);
413  const std::string candidate_rel = candidate.substr(union_dir_.size());
414 
415  // check if path points to a directory
416  platform_stat64 candidate_info;
417  const int lstat_retval = platform_lstat(candidate.c_str(), &candidate_info);
418  if (lstat_retval != 0) {
420  "Error in processing .cvmfsdirtab: cannot access %s (%d)",
421  candidate.c_str(), errno);
422  abort();
423  }
424  assert(lstat_retval == 0);
425  if (!S_ISDIR(candidate_info.st_mode)) {
426  // The GLOB_ONLYDIR flag is only a hint, non-directories can still be
427  // returned
429  "The '%s' dirtab entry does not point to a directory "
430  "but to a file or a symbolic link",
431  candidate_rel.c_str());
432  continue;
433  }
434 
435  // check if the path is a meta-directory (. or ..)
436  assert(candidate_rel.size() >= 2);
437  if (candidate_rel.substr(candidate_rel.size() - 2) == "/." ||
438  candidate_rel.substr(candidate_rel.size() - 3) == "/..") {
439  continue;
440  }
441 
442  // check that the path isn't excluded in the dirtab
443  if (dirtab.IsOpposing(candidate_rel)) {
444  LogCvmfs(kLogCatalog, kLogDebug, "Candidate '%s' is excluded by dirtab",
445  candidate_rel.c_str());
446  continue;
447  }
448 
449  // lookup the path in the catalog structure to find out if it already
450  // points to a nested catalog transition point. Furthermore it could be
451  // a new directory and thus not in any catalog yet.
453  const bool lookup_success = catalog_manager->LookupPath(
454  candidate_rel, catalog::kLookupDefault, &dirent);
455  if (!lookup_success) {
457  "Didn't find '%s' in catalogs, could "
458  "be a new directory and nested catalog.",
459  candidate_rel.c_str());
460  nested_catalog_candidates->push_back(candidate);
461  } else if (!dirent.IsNestedCatalogMountpoint() &&
462  !dirent.IsNestedCatalogRoot()) {
464  "Found '%s' in catalogs but is not a "
465  "nested catalog yet.",
466  candidate_rel.c_str());
467  nested_catalog_candidates->push_back(candidate);
468  } else {
469  // check if the nested catalog marker is still there, we might need to
470  // recreate the catalog after manual marker removal
471  // Note: First we check if the parent directory shows up in the scratch
472  // space to verify that it was touched (copy-on-write)
473  // Otherwise we would force the cvmfs client behind the union
474  // file-
475  // system to (potentially) unnecessarily fetch catalogs
476  if (DirectoryExists(scratch_dir_ + candidate_rel) &&
477  !FileExists(union_dir_ + candidate_rel + "/.cvmfscatalog")) {
479  "WARNING: '%s' should be a nested "
480  "catalog according to the dirtab. "
481  "Recreating...",
482  candidate_rel.c_str());
483  nested_catalog_candidates->push_back(candidate);
484  } else {
486  "Found '%s' in catalogs and it already is a nested catalog.",
487  candidate_rel.c_str());
488  }
489  }
490  }
491 }
492 
494  const std::vector<std::string> &new_nested_catalogs) {
495  // go through the new nested catalog paths and create .cvmfscatalog markers
496  // where necessary
497  bool success = true;
498  std::vector<std::string>::const_iterator k = new_nested_catalogs.begin();
499  const std::vector<std::string>::const_iterator kend =
500  new_nested_catalogs.end();
501  for (; k != kend; ++k) {
502  assert(!k->empty() && k->size() > union_dir_.size());
503 
504  // was the marker already created by hand?
505  const std::string marker_path = *k + "/.cvmfscatalog";
506  if (FileExists(marker_path)) {
507  continue;
508  }
509 
510  // create a nested catalog marker
511  const mode_t mode = kDefaultFileMode;
512  const int fd = open(marker_path.c_str(), O_CREAT, mode);
513  if (fd < 0) {
515  "Failed to create nested catalog marker "
516  "at '%s' (errno: %d)",
517  marker_path.c_str(), errno);
518  success = false;
519  continue;
520  }
521  close(fd);
522 
523  // inform the user if requested
524  if (verbose_) {
525  LogCvmfs(kLogCvmfs, kLogStdout, "Auto-creating nested catalog in %s",
526  k->c_str());
527  }
528  }
529 
530  return success;
531 }
532 
533 struct chunk_arg {
534  chunk_arg(char param, size_t *save_to) : param(param), save_to(save_to) {}
535  char param;
536  size_t *save_to;
537 };
538 
540  const swissknife::ArgumentList &args, SyncParameters *params) {
541  typedef std::vector<chunk_arg> ChunkArgs;
542 
543  // define where to store the value of which file chunk argument
544  ChunkArgs chunk_args;
545  chunk_args.push_back(chunk_arg('a', &params->avg_file_chunk_size));
546  chunk_args.push_back(chunk_arg('l', &params->min_file_chunk_size));
547  chunk_args.push_back(chunk_arg('h', &params->max_file_chunk_size));
548 
549  // read the arguments
550  ChunkArgs::const_iterator i = chunk_args.begin();
551  ChunkArgs::const_iterator iend = chunk_args.end();
552  for (; i != iend; ++i) {
553  swissknife::ArgumentList::const_iterator arg = args.find(i->param);
554 
555  if (arg != args.end()) {
556  size_t arg_value = static_cast<size_t>(String2Uint64(*arg->second));
557  if (arg_value > 0) {
558  *i->save_to = arg_value;
559  } else {
560  return false;
561  }
562  }
563  }
564 
565  // check if argument values are sane
566  return true;
567 }
568 
570  string start_time = GetGMTimestamp();
571 
572  // Spawn monitoring process (watchdog)
573  std::string watchdog_dir = "/tmp";
574  char watchdog_path[PATH_MAX];
575  std::string timestamp = GetGMTimestamp("%Y.%m.%d-%H.%M.%S");
576  int path_size = snprintf(watchdog_path, sizeof(watchdog_path),
577  "%s/cvmfs-swissknife-sync-stacktrace.%s.%d",
578  watchdog_dir.c_str(), timestamp.c_str(), getpid());
579  assert(path_size > 0);
580  assert(path_size < PATH_MAX);
581  UniquePtr<Watchdog> watchdog(Watchdog::Create(NULL));
582  watchdog->Spawn(std::string(watchdog_path));
583 
584  SyncParameters params;
585 
586  // Initialization
587  params.dir_union = MakeCanonicalPath(*args.find('u')->second);
588  params.dir_scratch = MakeCanonicalPath(*args.find('s')->second);
589  params.dir_rdonly = MakeCanonicalPath(*args.find('c')->second);
590  params.dir_temp = MakeCanonicalPath(*args.find('t')->second);
591  params.base_hash = shash::MkFromHexPtr(shash::HexPtr(*args.find('b')->second),
593  params.stratum0 = *args.find('w')->second;
594  params.manifest_path = *args.find('o')->second;
595  params.spooler_definition = *args.find('r')->second;
596 
597  params.public_keys = *args.find('K')->second;
598  params.repo_name = *args.find('N')->second;
599 
601 
602  if (args.find('f') != args.end())
603  params.union_fs_type = *args.find('f')->second;
604  if (args.find('A') != args.end()) params.is_balanced = true;
605  if (args.find('x') != args.end()) params.print_changeset = true;
606  if (args.find('y') != args.end()) params.dry_run = true;
607  if (args.find('m') != args.end()) params.mucatalogs = true;
608  if (args.find('i') != args.end()) params.ignore_xdir_hardlinks = true;
609  if (args.find('d') != args.end()) params.stop_for_catalog_tweaks = true;
610  if (args.find('V') != args.end()) params.voms_authz = true;
611  if (args.find('F') != args.end()) params.authz_file = *args.find('F')->second;
612  if (args.find('k') != args.end()) params.include_xattrs = true;
613  if (args.find('Y') != args.end()) params.external_data = true;
614  if (args.find('W') != args.end()) params.direct_io = true;
615  if (args.find('S') != args.end()) {
617  *args.find('S')->second, &params.virtual_dir_actions);
618  if (!retval) {
619  LogCvmfs(kLogCvmfs, kLogStderr, "invalid virtual catalog options: %s",
620  args.find('S')->second->c_str());
621  return 1;
622  }
623  }
624  if (args.find('z') != args.end()) {
625  unsigned log_level =
626  1 << (kLogLevel0 + String2Uint64(*args.find('z')->second));
627  if (log_level > kLogNone) {
628  LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
629  return 1;
630  }
631  SetLogVerbosity(static_cast<LogLevels>(log_level));
632  }
633 
634  if (args.find('X') != args.end())
635  params.max_weight = String2Uint64(*args.find('X')->second);
636  if (args.find('M') != args.end())
637  params.min_weight = String2Uint64(*args.find('M')->second);
638 
639  if (args.find('p') != args.end()) {
640  params.use_file_chunking = true;
641  if (!ReadFileChunkingArgs(args, &params)) {
642  PrintError("Failed to read file chunk size values");
643  return 2;
644  }
645  }
646  if (args.find('O') != args.end()) {
647  params.generate_legacy_bulk_chunks = true;
648  }
649  shash::Algorithms hash_algorithm = shash::kSha1;
650  if (args.find('e') != args.end()) {
651  hash_algorithm = shash::ParseHashAlgorithm(*args.find('e')->second);
652  if (hash_algorithm == shash::kAny) {
653  PrintError("unknown hash algorithm");
654  return 1;
655  }
656  }
657  if (args.find('Z') != args.end()) {
658  params.compression_alg =
659  zlib::ParseCompressionAlgorithm(*args.find('Z')->second);
660  }
661 
662  if (args.find('E') != args.end()) params.enforce_limits = true;
663  if (args.find('Q') != args.end()) {
664  params.nested_kcatalog_limit = String2Uint64(*args.find('Q')->second);
665  } else {
667  }
668  if (args.find('R') != args.end()) {
669  params.root_kcatalog_limit = String2Uint64(*args.find('R')->second);
670  } else {
672  }
673  if (args.find('U') != args.end()) {
674  params.file_mbyte_limit = String2Uint64(*args.find('U')->second);
675  } else {
677  }
678 
679  if (args.find('v') != args.end()) {
680  sanitizer::IntegerSanitizer sanitizer;
681  if (!sanitizer.IsValid(*args.find('v')->second)) {
682  PrintError("invalid revision number");
683  return 1;
684  }
685  params.manual_revision = String2Uint64(*args.find('v')->second);
686  }
687 
688  params.branched_catalog = args.find('B') != args.end();
689 
690  if (args.find('q') != args.end()) {
691  params.max_concurrent_write_jobs = String2Uint64(*args.find('q')->second);
692  }
693 
694  if (args.find('0') != args.end()) {
695  params.num_upload_tasks = String2Uint64(*args.find('0')->second);
696  }
697 
698  if (args.find('T') != args.end()) {
699  params.ttl_seconds = String2Uint64(*args.find('T')->second);
700  }
701 
702  if (args.find('g') != args.end()) {
703  params.ignore_special_files = true;
704  }
705 
706  if (args.find('P') != args.end()) {
707  params.session_token_file = *args.find('P')->second;
708  }
709 
710  if (args.find('H') != args.end()) {
711  params.key_file = *args.find('H')->second;
712  }
713 
714  if (args.find('D') != args.end()) {
715  params.repo_tag.SetName(*args.find('D')->second);
716  }
717 
718  if (args.find('J') != args.end()) {
719  params.repo_tag.SetDescription(*args.find('J')->second);
720  }
721 
722  const bool upload_statsdb = (args.count('I') > 0);
723 
724  if (!CheckParams(params)) return 2;
725  // This may fail, in which case a warning is printed and the process continues
727 
728  perf::StatisticsTemplate publish_statistics("publish", this->statistics());
729 
730  // Start spooler
731  upload::SpoolerDefinition spooler_definition(
732  params.spooler_definition, hash_algorithm, params.compression_alg,
735  params.max_file_chunk_size, params.session_token_file, params.key_file);
736  if (params.max_concurrent_write_jobs > 0) {
737  spooler_definition.number_of_concurrent_uploads =
739  }
740  spooler_definition.num_upload_tasks = params.num_upload_tasks;
741 
742  upload::SpoolerDefinition spooler_definition_catalogs(
743  spooler_definition.Dup2DefaultCompression());
744 
745  params.spooler = upload::Spooler::Construct(spooler_definition,
746  &publish_statistics);
747  if (NULL == params.spooler) return 3;
749  upload::Spooler::Construct(spooler_definition_catalogs,
750  &publish_statistics));
751  if (!spooler_catalogs.IsValid()) return 3;
752 
753  const bool follow_redirects = (args.count('L') > 0);
754  const string proxy = (args.count('@') > 0) ? *args.find('@')->second : "";
755  if (!InitDownloadManager(follow_redirects, proxy)) {
756  return 3;
757  }
758 
759  if (!InitVerifyingSignatureManager(params.public_keys)) {
760  return 3;
761  }
762 
763  /*
764  * Note: If the upstream is of type gateway, due to the possibility of
765  * concurrent release managers, it's possible to have a different local and
766  * remote root hashes. We proceed by loading the remote manifest but we give
767  * an empty base hash.
768  */
770  if (params.branched_catalog) {
771  // Throw-away manifest
772  manifest = new manifest::Manifest(shash::Any(), 0, "");
773  } else if (params.virtual_dir_actions !=
775  manifest = this->OpenLocalManifest(params.manifest_path);
776  params.base_hash = manifest->catalog_hash();
777  } else {
778  // TODO(jblomer): revert to params.base_hash if spooler driver type is not
779  // upload::SpoolerDefinition::Gateway
780  manifest =
781  FetchRemoteManifest(params.stratum0, params.repo_name, shash::Any());
782  }
783  if (!manifest.IsValid()) {
784  return 3;
785  }
786 
787  StatisticsDatabase *stats_db =
789 
790  const std::string old_root_hash = manifest->catalog_hash().ToString(true);
791 
792  catalog::WritableCatalogManager catalog_manager(
793  params.base_hash, params.stratum0, params.dir_temp,
794  spooler_catalogs.weak_ref(),
795  download_manager(), params.enforce_limits, params.nested_kcatalog_limit,
796  params.root_kcatalog_limit, params.file_mbyte_limit, statistics(),
797  params.is_balanced, params.max_weight, params.min_weight);
798  catalog_manager.Init();
799 
800  publish::SyncMediator mediator(&catalog_manager, &params, publish_statistics);
801  LogCvmfs(kLogPublish, kLogStdout, "Processing changes...");
802 
803  // Should be before the synchronization starts to avoid race of GetTTL with
804  // other sqlite operations
805  if ((params.ttl_seconds > 0) &&
806  ((params.ttl_seconds != catalog_manager.GetTTL()) ||
807  !catalog_manager.HasExplicitTTL())) {
808  LogCvmfs(kLogCvmfs, kLogStdout, "Setting repository TTL to %" PRIu64 "s",
809  params.ttl_seconds);
810  catalog_manager.SetTTL(params.ttl_seconds);
811  }
812 
813  // Either real catalogs or virtual catalog
815  publish::SyncUnion *sync;
816  if (params.union_fs_type == "overlayfs") {
817  sync = new publish::SyncUnionOverlayfs(
818  &mediator, params.dir_rdonly, params.dir_union, params.dir_scratch);
819  } else if (params.union_fs_type == "aufs") {
820  sync = new publish::SyncUnionAufs(&mediator, params.dir_rdonly,
821  params.dir_union, params.dir_scratch);
822  } else {
823  LogCvmfs(kLogCvmfs, kLogStderr, "unknown union file system: %s",
824  params.union_fs_type.c_str());
825  return 3;
826  }
827 
828  if (!sync->Initialize()) {
830  "Initialization of the synchronisation "
831  "engine failed");
832  return 4;
833  }
834 
835  sync->Traverse();
836  } else {
837  assert(!manifest->history().IsNull());
838  catalog::VirtualCatalog virtual_catalog(
839  manifest.weak_ref(), download_manager(), &catalog_manager, &params);
840  virtual_catalog.Generate(params.virtual_dir_actions);
841  }
842 
843  if (!params.authz_file.empty()) {
845  "Adding contents of authz file %s to"
846  " root catalog.",
847  params.authz_file.c_str());
848  int fd = open(params.authz_file.c_str(), O_RDONLY);
849  if (fd == -1) {
851  "Unable to open authz file (%s)"
852  "from the publication process: %s",
853  params.authz_file.c_str(), strerror(errno));
854  return 7;
855  }
856 
857  std::string new_authz;
858  const bool read_successful = SafeReadToString(fd, &new_authz);
859  close(fd);
860 
861  if (!read_successful) {
862  LogCvmfs(kLogCvmfs, kLogStderr, "Failed to read authz file (%s): %s",
863  params.authz_file.c_str(), strerror(errno));
864  return 8;
865  }
866 
867  catalog_manager.SetVOMSAuthz(new_authz);
868  }
869 
870  if (!mediator.Commit(manifest.weak_ref())) {
871  PrintError("something went wrong during sync");
872  if (!params.dry_run) {
873  stats_db->StorePublishStatistics(this->statistics(), start_time, false);
874  if (upload_statsdb) {
875  stats_db->UploadStatistics(params.spooler);
876  }
877  }
878  return 5;
879  }
880 
881  perf::Counter *revision_counter = statistics()->Register("publish.revision",
882  "Published revision number");
883  revision_counter->Set(static_cast<int64_t>(
884  catalog_manager.GetRootCatalog()->revision()));
885 
886  // finalize the spooler
887  LogCvmfs(kLogCvmfs, kLogStdout, "Wait for all uploads to finish");
888  params.spooler->WaitForUpload();
889  spooler_catalogs->WaitForUpload();
890  params.spooler->FinalizeSession(false);
891 
892  LogCvmfs(kLogCvmfs, kLogStdout, "Exporting repository manifest");
893 
894  // We call FinalizeSession(true) this time, to also trigger the commit
895  // operation on the gateway machine (if the upstream is of type "gw").
896 
897  // Get the path of the new root catalog
898  const std::string new_root_hash = manifest->catalog_hash().ToString(true);
899 
900  if (!spooler_catalogs->FinalizeSession(true, old_root_hash, new_root_hash,
901  params.repo_tag)) {
902  PrintError("Failed to commit transaction.");
903  if (!params.dry_run) {
904  stats_db->StorePublishStatistics(this->statistics(), start_time, false);
905  if (upload_statsdb) {
906  stats_db->UploadStatistics(params.spooler);
907  }
908  }
909  return 9;
910  }
911 
912  if (!params.dry_run) {
913  stats_db->StorePublishStatistics(this->statistics(), start_time, true);
914  if (upload_statsdb) {
915  stats_db->UploadStatistics(params.spooler);
916  }
917  }
918 
919  delete params.spooler;
920 
921  if (!manifest->Export(params.manifest_path)) {
922  PrintError("Failed to create new repository");
923  return 6;
924  }
925 
926  return 0;
927 }
bool Commit(manifest::Manifest *manifest)
void SetLogVerbosity(const LogLevels max_level)
Definition: logging.cc:261
std::string database_file() const
Definition: reflog.cc:337
Algorithms ParseCompressionAlgorithm(const std::string &algorithm_option)
Definition: compression.cc:148
size_t avg_file_chunk_size
const manifest::Manifest * manifest() const
Definition: repository.h:125
struct stat64 platform_stat64
std::string stratum0
int Main(const ArgumentList &args)
size_t * save_to
std::string GetGMTimestamp(const std::string &format)
Definition: string.cc:615
bool IsValid() const
Definition: dirtab.h:127
bool UploadStatistics(upload::Spooler *spooler, std::string local_path="")
T * weak_ref() const
Definition: pointer.h:42
SpoolerDefinition Dup2DefaultCompression() const
std::vector< Rule > Rules
Definition: dirtab.h:68
bool generate_legacy_bulk_chunks
unsigned file_mbyte_limit
uint64_t max_concurrent_write_jobs
const int kDefaultFileMode
Definition: posix.h:32
static const unsigned kDefaultFileMbyteLimit
virtual bool Initialize()
Definition: sync_union.cc:24
static const unsigned kDefaultNestedKcatalogLimit
static const unsigned kDefaultRootKcatalogLimit
int Main(const ArgumentList &args)
static bool ParseActions(const std::string &action_desc, int *actions)
size_t min_file_chunk_size
unsigned num_upload_tasks
std::string spooler_definition
std::string manifest_path
bool ReadFileChunkingArgs(const swissknife::ArgumentList &args, SyncParameters *params)
void SetName(const std::string &name)
const Rules & positive_rules() const
Definition: dirtab.h:121
bool CheckParams(const SyncParameters &p)
zlib::Algorithms compression_alg
unsigned nested_kcatalog_limit
int GlobLstat(const char *name, struct stat *st)
std::string repo_name
assert((mem||(size==0))&&"Out Of Memory")
bool LookupPath(const PathString &path, const LookupOptions options, DirectoryEntry *entry)
void SetDescription(const std::string &description)
std::string union_fs_type
const unsigned kLookupDefault
Definition: catalog_mgr.h:43
std::string dir_scratch
int Main(const ArgumentList &args)
bool IsNestedCatalogMountpoint() const
virtual bool IsOpposing(const std::string &path) const
Definition: dirtab.cc:161
chunk_arg(char param, size_t *save_to)
bool IsValid(const std::string &input) const
Definition: sanitizer.cc:114
Algorithms
Definition: hash.h:41
std::string key_file
bool IsNestedCatalogRoot() const
bool FileExists(const std::string &path)
Definition: posix.cc:791
std::string dir_temp
int Main(const ArgumentList &args)
static Watchdog * Create(FnOnCrash on_crash)
Definition: monitor.cc:70
static manifest::Manifest * CreateRepository(const std::string &dir_temp, const bool volatile_content, const std::string &voms_authz, upload::Spooler *spooler)
void Set(const int64_t val)
Definition: statistics.h:33
void FilterCandidatesFromGlobResult(const catalog::Dirtab &dirtab, char **paths, const size_t npaths, catalog::SimpleCatalogManager *catalog_manager, std::vector< std::string > *nested_catalog_candidates)
bool CheckParams(const swissknife::CommandLease::Parameters &p)
std::string dir_rdonly
const char kSuffixCatalog
Definition: hash.h:54
shash::Any base_hash
upload::Spooler * spooler
int platform_lstat(const char *path, platform_stat64 *buf)
static Dirtab * Create(const std::string &dirtab_path)
Definition: dirtab.h:84
static const int kActionNone
int Main(const ArgumentList &args)
std::string dir_union
static void HashDatabase(const std::string &database_path, shash::Any *hash_reflog)
Definition: reflog.cc:322
int Main(const ArgumentList &args)
bool IsValid() const
Definition: pointer.h:43
struct dirent * GlobReaddir(void *dirp)
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:267
void DropDatabaseFileOwnership()
Definition: reflog.cc:313
static StatisticsDatabase * OpenStandardDB(const std::string repo_name)
bool DirectoryExists(const std::string &path)
Definition: posix.cc:813
bool stop_for_catalog_tweaks
bool SafeReadToString(int fd, std::string *final_result)
Definition: posix.cc:2068
unsigned root_kcatalog_limit
size_t max_file_chunk_size
bool StorePublishStatistics(const perf::Statistics *statistics, const std::string &start_time, const bool success)
uint64_t manual_revision
int GlobStat(const char *name, struct stat *st)
std::string session_token_file
bool ObtainDacReadSearchCapability()
virtual void Traverse()=0
uint64_t String2Uint64(const string &value)
Definition: string.cc:228
std::map< char, SharedPtr< std::string > > ArgumentList
Definition: swissknife.h:72
size_t RuleCount() const
Definition: dirtab.h:124
Algorithms ParseHashAlgorithm(const string &algorithm_option)
Definition: hash.cc:72
uint64_t ttl_seconds
std::string authz_file
Any MkFromHexPtr(const HexPtr hex, const char suffix)
Definition: hash.cc:83
void DetermineNestedCatalogCandidates(const catalog::Dirtab &dirtab, catalog::SimpleCatalogManager *catalog_manager, std::vector< std::string > *nested_catalog_candidates)
RepositoryTag repo_tag
const int kLogVerboseMsg
void Generate(int actions)
void Spawn(const std::string &crash_dump_path)
Definition: monitor.cc:510
static bool WriteChecksum(const std::string &path, const shash::Any &value)
Definition: reflog.cc:64
bool ignore_xdir_hardlinks
std::string MakeCanonicalPath(const std::string &path)
Definition: posix.cc:98
void PrintError(const string &message)
Definition: logging.cc:543
const upload::Spooler * spooler_catalogs() const
Definition: repository.h:322
static const uint64_t kDefaultTTL
Definition: catalog.h:104
bool CreateCatalogMarkers(const std::vector< std::string > &new_nested_catalogs)
std::string public_keys
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528