swissknife_migrate.cc
1 
8 #include "swissknife_migrate.h"
9 
10 #include <sys/resource.h>
11 #include <unistd.h>
12 
13 #include "catalog_rw.h"
14 #include "catalog_sql.h"
15 #include "catalog_virtual.h"
16 #include "compression.h"
17 #include "hash.h"
18 #include "logging.h"
19 #include "swissknife_history.h"
20 #include "util_concurrency.h"
21 
22 using namespace std; // NOLINT
23 
24 namespace swissknife {
25 
26 catalog::DirectoryEntry CommandMigrate::nested_catalog_marker_;
27 
28 CommandMigrate::CommandMigrate() :
29  file_descriptor_limit_(8192),
30  catalog_count_(0),
31  has_committed_new_revision_(false),
32  uid_(0),
33  gid_(0),
34  root_catalog_(NULL)
35 {
36  atomic_init32(&catalogs_processed_);
37 }
38 
39 
40 ParameterList CommandMigrate::GetParams() const {
41  ParameterList r;
42  r.push_back(Parameter::Mandatory('v',
43  "migration base version ( 2.0.x | 2.1.7 | chown | hardlink | bulkhash | "
44  "stats)"));
45  r.push_back(Parameter::Mandatory('r',
46  "repository URL (absolute local path or remote URL)"));
47  r.push_back(Parameter::Mandatory('u', "upstream definition string"));
48  r.push_back(Parameter::Mandatory('o', "manifest output file"));
49  r.push_back(Parameter::Mandatory('t',
50  "temporary directory for catalog decompress"));
51  r.push_back(Parameter::Optional('p',
52  "user id to be used for this repository"));
53  r.push_back(Parameter::Optional('g',
54  "group id to be used for this repository"));
55  r.push_back(Parameter::Optional('n', "fully qualified repository name"));
56  r.push_back(Parameter::Optional('h', "root hash (other than trunk)"));
57  r.push_back(Parameter::Optional('k', "repository master key(s)"));
58  r.push_back(Parameter::Optional('i', "UID map for chown"));
59  r.push_back(Parameter::Optional('j', "GID map for chown"));
60  r.push_back(Parameter::Switch('f', "fix nested catalog transition points"));
61  r.push_back(Parameter::Switch('l', "disable linkcount analysis of files"));
62  r.push_back(Parameter::Switch('s',
63  "enable collection of catalog statistics"));
64  return r;
65 }
66 
67 
68 static void Error(const std::string &message) {
69  LogCvmfs(kLogCatalog, kLogStderr, message.c_str());
70 }
71 
72 
73 static void Error(const std::string &message,
74  const CommandMigrate::PendingCatalog *catalog) {
75  const std::string err_msg = message + "\n"
76  "Catalog: " + catalog->root_path();
77  Error(err_msg);
78 }
79 
80 
81 static void Error(const std::string &message,
82  const catalog::SqlCatalog &statement,
83  const CommandMigrate::PendingCatalog *catalog) {
84  const std::string err_msg =
85  message + "\n"
86  "SQLite: " + StringifyInt(statement.GetLastError()) +
87  " - " + statement.GetLastErrorMsg();
88  Error(err_msg, catalog);
89 }
90 
91 
92 int CommandMigrate::Main(const ArgumentList &args) {
93  shash::Any manual_root_hash;
94  const std::string &migration_base = *args.find('v')->second;
95  const std::string &repo_url = *args.find('r')->second;
96  const std::string &spooler = *args.find('u')->second;
97  const std::string &manifest_path = *args.find('o')->second;
98  const std::string &tmp_dir = *args.find('t')->second;
99  const std::string &uid = (args.count('p') > 0) ?
100  *args.find('p')->second :
101  "";
102  const std::string &gid = (args.count('g') > 0) ?
103  *args.find('g')->second :
104  "";
105  const std::string &repo_name = (args.count('n') > 0) ?
106  *args.find('n')->second :
107  "";
108  const std::string &repo_keys = (args.count('k') > 0) ?
109  *args.find('k')->second :
110  "";
111  const std::string &uid_map_path = (args.count('i') > 0) ?
112  *args.find('i')->second :
113  "";
114  const std::string &gid_map_path = (args.count('j') > 0) ?
115  *args.find('j')->second :
116  "";
117  const bool fix_transition_points = (args.count('f') > 0);
118  const bool analyze_file_linkcounts = (args.count('l') == 0);
119  const bool collect_catalog_statistics = (args.count('s') > 0);
120  if (args.count('h') > 0) {
121  manual_root_hash = shash::MkFromHexPtr(shash::HexPtr(
122  *args.find('h')->second), shash::kSuffixCatalog);
123  }
124 
125  // We might need a lot of file descriptors
126  if (!RaiseFileDescriptorLimit()) {
127  Error("Failed to raise file descriptor limits");
128  return 2;
129  }
130 
131  // Put SQLite into multithreaded mode
132  if (!ConfigureSQLite()) {
133  Error("Failed to preconfigure SQLite library");
134  return 3;
135  }
136 
137  // Create an upstream spooler
138  temporary_directory_ = tmp_dir;
139  const upload::SpoolerDefinition spooler_definition(spooler, shash::kSha1);
140  spooler_ = upload::Spooler::Construct(spooler_definition);
141  if (!spooler_.IsValid()) {
142  Error("Failed to create upstream Spooler.");
143  return 5;
144  }
145  spooler_->RegisterListener(&CommandMigrate::UploadCallback, this);
146 
147  // Load the full catalog hierarchy
148  LogCvmfs(kLogCatalog, kLogStdout, "Loading current catalog tree...");
149 
151  bool loading_successful = false;
152  if (IsHttpUrl(repo_url)) {
154 
155  const bool follow_redirects = false;
156  if (!this->InitDownloadManager(follow_redirects) ||
157  !this->InitVerifyingSignatureManager(repo_keys)) {
158  LogCvmfs(kLogCatalog, kLogStderr, "Failed to init repo connection");
159  return 1;
160  }
161 
162  ObjectFetcher fetcher(repo_name,
163  repo_url,
164  tmp_dir,
165  download_manager(),
166  signature_manager());
167 
168  loading_successful = LoadCatalogs(manual_root_hash, &fetcher);
169  } else {
171  ObjectFetcher fetcher(repo_url, tmp_dir);
172  loading_successful = LoadCatalogs(manual_root_hash, &fetcher);
173  }
175 
176  if (!loading_successful) {
177  Error("Failed to load catalog tree");
178  return 4;
179  }
180 
181  LogCvmfs(kLogCatalog, kLogStdout, "Loaded %d catalogs", catalog_count_);
182  assert(root_catalog_ != NULL);
183 
184  // Do the actual migration step
185  bool migration_succeeded = false;
186  if (migration_base == "2.0.x") {
187  if (!ReadPersona(uid, gid)) {
188  return 1;
189  }
190 
191  // Generate and upload a nested catalog marker
192  if (!GenerateNestedCatalogMarkerChunk()) {
193  Error("Failed to create a nested catalog marker.");
194  return 6;
195  }
196  spooler_->WaitForUpload();
197 
198  // Configure the concurrent catalog migration facility
199  MigrationWorker_20x::worker_context context(temporary_directory_,
200  collect_catalog_statistics,
201  fix_transition_points,
202  analyze_file_linkcounts,
203  uid_,
204  gid_);
205  migration_succeeded =
206  DoMigrationAndCommit<MigrationWorker_20x>(manifest_path, &context);
207  } else if (migration_base == "2.1.7") {
208  MigrationWorker_217::worker_context context(temporary_directory_,
209  collect_catalog_statistics);
210  migration_succeeded =
211  DoMigrationAndCommit<MigrationWorker_217>(manifest_path, &context);
212  } else if (migration_base == "chown") {
213  UidMap uid_map;
214  GidMap gid_map;
215  if (!ReadPersonaMaps(uid_map_path, gid_map_path, &uid_map, &gid_map)) {
216  Error("Failed to read UID and/or GID map");
217  return 1;
218  }
219  ChownMigrationWorker::worker_context context(temporary_directory_,
220  collect_catalog_statistics,
221  uid_map,
222  gid_map);
223  migration_succeeded =
224  DoMigrationAndCommit<ChownMigrationWorker>(manifest_path, &context);
225  } else if (migration_base == "hardlink") {
226  HardlinkRemovalMigrationWorker::worker_context
227  context(temporary_directory_, collect_catalog_statistics);
228  migration_succeeded =
229  DoMigrationAndCommit<HardlinkRemovalMigrationWorker>(manifest_path,
230  &context);
231  } else if (migration_base == "bulkhash") {
232  BulkhashRemovalMigrationWorker::worker_context
233  context(temporary_directory_, collect_catalog_statistics);
234  migration_succeeded =
235  DoMigrationAndCommit<BulkhashRemovalMigrationWorker>(manifest_path,
236  &context);
237  } else if (migration_base == "stats") {
238  StatsMigrationWorker::worker_context context(
239  temporary_directory_, collect_catalog_statistics);
240  migration_succeeded =
241  DoMigrationAndCommit<StatsMigrationWorker>(manifest_path, &context);
242  } else {
243  const std::string err_msg = "Unknown migration base: " + migration_base;
244  Error(err_msg);
245  return 1;
246  }
247 
248  // Check if everything went well
249  if (!migration_succeeded) {
250  Error("Migration failed!");
251  return 5;
252  }
253 
254  // Analyze collected statistics
255  if (collect_catalog_statistics && has_committed_new_revision_) {
256  LogCvmfs(kLogCatalog, kLogStdout, "\nCollected statistics results:");
258  }
259 
260  LogCvmfs(kLogCatalog, kLogStdout, "\nCatalog Migration succeeded");
261  return 0;
262 }
263 
264 
265 bool CommandMigrate::ReadPersona(const std::string &uid,
266  const std::string &gid) {
267  if (uid.empty()) {
268  Error("Please provide a user ID");
269  return false;
270  }
271  if (gid.empty()) {
272  Error("Please provide a group ID");
273  return false;
274  }
275 
276  uid_ = String2Int64(uid);
277  gid_ = String2Int64(gid);
278  return true;
279 }
280 
281 
282 
283 bool CommandMigrate::ReadPersonaMaps(const std::string &uid_map_path,
284  const std::string &gid_map_path,
285  UidMap *uid_map,
286  GidMap *gid_map) const {
287  if (!uid_map->Read(uid_map_path) || !uid_map->IsValid()) {
288  Error("Failed to read UID map");
289  return false;
290  }
291 
292  if (!gid_map->Read(gid_map_path) || !gid_map->IsValid()) {
293  Error("Failed to read GID map");
294  return false;
295  }
296 
297  if (uid_map->RuleCount() == 0 && !uid_map->HasDefault()) {
298  Error("UID map appears to be empty");
299  return false;
300  }
301 
302  if (gid_map->RuleCount() == 0 && !gid_map->HasDefault()) {
303  Error("GID map appears to be empty");
304  return false;
305  }
306 
307  return true;
308 }
309 
310 
311 void CommandMigrate::UploadHistoryClosure(
312  const upload::SpoolerResult &result,
313  Future<shash::Any> *hash)
314 {
315  assert(!result.IsChunked());
316  if (result.return_code != 0) {
317  LogCvmfs(kLogCvmfs, kLogStderr, "failed to upload history database (%d)",
318  result.return_code);
319  hash->Set(shash::Any());
320  } else {
321  hash->Set(result.content_hash);
322  }
323 }
324 
325 
326 bool CommandMigrate::UpdateUndoTags(
327  PendingCatalog *root_catalog,
328  unsigned revision,
329  time_t timestamp,
330  shash::Any *history_hash)
331 {
332  string filename_old = history_upstream_->filename();
333  string filename_new = filename_old + ".new";
334  bool retval = CopyPath2Path(filename_old, filename_new);
335  if (!retval) return false;
338  history->TakeDatabaseFileOwnership();
339 
340  history::History::Tag tag_trunk;
341  bool exists = history->GetByName(CommandTag::kHeadTag, &tag_trunk);
342  if (exists) {
343  retval = history->Remove(CommandTag::kHeadTag);
344  if (!retval) return false;
345 
346  history::History::Tag tag_trunk_previous = tag_trunk;
347  tag_trunk_previous.name = CommandTag::kPreviousHeadTag;
349  history->Remove(CommandTag::kPreviousHeadTag);
350 
351  tag_trunk.root_hash = root_catalog->new_catalog_hash;
352  tag_trunk.size = root_catalog->new_catalog_size;
354  tag_trunk.revision = revision;
355  tag_trunk.timestamp = timestamp;
356 
357  retval = history->Insert(tag_trunk_previous);
358  if (!retval) return false;
359  retval = history->Insert(tag_trunk);
360  if (!retval) return false;
361  }
362 
363  history->SetPreviousRevision(manifest_upstream_->history());
364  history->DropDatabaseFileOwnership();
365  history.Destroy();
366 
367  Future<shash::Any> history_hash_new;
368  upload::Spooler::CallbackPtr callback = spooler_->RegisterListener(
369  &CommandMigrate::UploadHistoryClosure, this, &history_hash_new);
370  spooler_->ProcessHistory(filename_new);
371  spooler_->WaitForUpload();
372  spooler_->UnregisterListener(callback);
373  unlink(filename_new.c_str());
374  *history_hash = history_hash_new.Get();
375  if (history_hash->IsNull()) {
376  Error("failed to upload tag database");
377  return false;
378  }
379 
380  return true;
381 }
382 
383 
384 template <class MigratorT>
385 bool CommandMigrate::DoMigrationAndCommit(
386  const std::string &manifest_path,
387  typename MigratorT::worker_context *context
388 ) {
389  // Create a concurrent migration context for catalog migration
390  const unsigned int cpus = GetNumberOfCpuCores();
391  ConcurrentWorkers<MigratorT> concurrent_migration(cpus, cpus * 10, context);
392 
393  if (!concurrent_migration.Initialize()) {
394  Error("Failed to initialize worker migration system.");
395  return false;
396  }
397  concurrent_migration.RegisterListener(&CommandMigrate::MigrationCallback,
398  this);
399 
400  // Migrate catalogs recursively (starting with the deepest nested catalogs)
401  LogCvmfs(kLogCatalog, kLogStdout, "\nMigrating catalogs...");
402  PendingCatalog *root_catalog = new PendingCatalog(root_catalog_);
404  ConvertCatalogsRecursively(root_catalog, &concurrent_migration);
405  concurrent_migration.WaitForEmptyQueue();
406  spooler_->WaitForUpload();
407  spooler_->UnregisterListeners();
409 
410  // check for possible errors during the migration process
411  const unsigned int errors = concurrent_migration.GetNumberOfFailedJobs() +
412  spooler_->GetNumberOfErrors();
413  LogCvmfs(kLogCatalog, kLogStdout,
414  "Catalog Migration finished with %d errors.", errors);
415  if (errors > 0) {
416  LogCvmfs(kLogCatalog, kLogStderr,
417  "\nCatalog Migration produced errors\nAborting...");
418  return false;
419  }
420 
421  if (root_catalog->was_updated.Get()) {
422  LogCvmfs(kLogCatalog, kLogStdout,
423  "\nCommitting migrated repository revision...");
425  manifest.set_catalog_hash(root_catalog->new_catalog_hash);
426  manifest.set_catalog_size(root_catalog->new_catalog_size);
427  manifest.set_root_path(root_catalog->root_path());
428  const catalog::Catalog* new_catalog = (root_catalog->HasNew())
429  ? root_catalog->new_catalog
430  : root_catalog->old_catalog;
431  manifest.set_ttl(new_catalog->GetTTL());
432  manifest.set_revision(new_catalog->GetRevision());
433 
434  // Commit the new (migrated) repository revision...
435  if (history_upstream_.IsValid()) {
436  shash::Any history_hash(manifest_upstream_->history());
438  "Updating repository tag database... ");
439  if (!UpdateUndoTags(root_catalog,
440  new_catalog->GetRevision(),
441  new_catalog->GetLastModified(),
442  &history_hash))
443  {
444  Error("Updateing tag database failed.\nAborting...");
445  return false;
446  }
447  manifest.set_history(history_hash);
448  LogCvmfs(kLogCvmfs, kLogStdout, "%s", history_hash.ToString().c_str());
449  }
450 
451  if (!manifest.Export(manifest_path)) {
452  Error("Manifest export failed.\nAborting...");
453  return false;
454  }
455  has_committed_new_revision_ = true;
456  } else {
457  LogCvmfs(kLogCatalog, kLogStdout,
458  "\nNo catalogs migrated, skipping the commit...");
459  }
460 
461  // Get rid of the open root catalog
462  delete root_catalog;
463 
464  return true;
465 }
466 
467 
470  std::string tree_indent;
471  std::string hash_string;
472  std::string path;
473 
474  for (unsigned int i = 1; i < data.tree_level; ++i) {
475  tree_indent += "\u2502 ";
476  }
477 
478  if (data.tree_level > 0) {
479  tree_indent += "\u251C\u2500 ";
480  }
481 
482  hash_string = data.catalog_hash.ToString();
483 
484  path = data.catalog->mountpoint().ToString();
485  if (path.empty()) {
486  path = "/";
487  root_catalog_ = data.catalog;
488  }
489 
490  LogCvmfs(kLogCatalog, kLogStdout, "%s%s %s",
491  tree_indent.c_str(),
492  hash_string.c_str(),
493  path.c_str());
494 
495  ++catalog_count_;
496 }
497 
498 
500  // Check if the migration of the catalog was successful
501  if (!data->success) {
502  Error("Catalog migration failed! Aborting...");
503  exit(1);
504  return;
505  }
506 
507  if (!data->HasChanges()) {
508  PrintStatusMessage(data, data->GetOldContentHash(), "preserved");
509  data->was_updated.Set(false);
510  return;
511  }
512 
513  const string &path = (data->HasNew()) ? data->new_catalog->database_path()
514  : data->old_catalog->database_path();
515 
516  // Save the processed catalog in the pending map
517  {
519  assert(pending_catalogs_.find(path) == pending_catalogs_.end());
520  pending_catalogs_[path] = data;
521  }
523 
524  // check the size of the uncompressed catalog file
525  size_t new_catalog_size = GetFileSize(path);
526  if (new_catalog_size <= 0) {
527  Error("Failed to get uncompressed file size of catalog!", data);
528  exit(2);
529  return;
530  }
531  data->new_catalog_size = new_catalog_size;
532 
533  // Schedule the compression and upload of the catalog
534  spooler_->ProcessCatalog(path);
535 }
536 
537 
538 void CommandMigrate::UploadCallback(const upload::SpoolerResult &result) {
539  const string &path = result.local_path;
540 
541  // Check if the upload was successful
542  if (result.return_code != 0) {
543  Error("Failed to upload file " + path + "\nAborting...");
544  exit(2);
545  return;
546  }
547  assert(result.file_chunks.size() == 0);
548 
549  // Remove the just uploaded file
550  unlink(path.c_str());
551 
552  // Uploaded nested catalog marker... generate and cache DirectoryEntry for it
553  if (path == nested_catalog_marker_tmp_path_) {
554  CreateNestedCatalogMarkerDirent(result.content_hash);
555  return;
556  } else {
557  // Find the catalog path in the pending catalogs and remove it from the list
558  PendingCatalog *catalog;
559  {
561  PendingCatalogMap::iterator i = pending_catalogs_.find(path);
562  assert(i != pending_catalogs_.end());
563  catalog = const_cast<PendingCatalog*>(i->second);
564  pending_catalogs_.erase(i);
565  }
566 
567  PrintStatusMessage(catalog, result.content_hash, "migrated and uploaded");
568 
569  // The catalog is completely processed... fill the content_hash to allow the
570  // processing of parent catalogs (Notified by 'was_updated'-future)
571  // NOTE: From now on, this PendingCatalog structure could be deleted and
572  // should not be used anymore!
573  catalog->new_catalog_hash = result.content_hash;
574  catalog->was_updated.Set(true);
575  }
576 }
577 
578 
579 void CommandMigrate::PrintStatusMessage(const PendingCatalog *catalog,
580  const shash::Any &content_hash,
581  const std::string &message) {
582  atomic_inc32(&catalogs_processed_);
583  const unsigned int processed = (atomic_read32(&catalogs_processed_) * 100) /
584  catalog_count_;
585  LogCvmfs(kLogCatalog, kLogStdout, "[%d%%] %s %sC %s",
586  processed,
587  message.c_str(),
588  content_hash.ToString().c_str(),
589  catalog->root_path().c_str());
590 }
591 
592 
593 template <class MigratorT>
594 void CommandMigrate::ConvertCatalogsRecursively(PendingCatalog *catalog,
595  MigratorT *migrator) {
596  // First migrate all nested catalogs (depth first traversal)
597  const catalog::CatalogList nested_catalogs =
598  catalog->old_catalog->GetChildren();
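  // For illustration: with catalogs / -> /nested -> /nested/deeper, the
  // recursion schedules /nested/deeper first, then /nested, then /, so a
  // parent is only migrated after its children have been queued.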
599  catalog::CatalogList::const_iterator i = nested_catalogs.begin();
600  catalog::CatalogList::const_iterator iend = nested_catalogs.end();
601  catalog->nested_catalogs.reserve(nested_catalogs.size());
602  for (; i != iend; ++i) {
603  PendingCatalog *new_nested = new PendingCatalog(*i);
604  catalog->nested_catalogs.push_back(new_nested);
605  ConvertCatalogsRecursively(new_nested, migrator);
606  }
607 
608  // Migrate this catalog referencing all its (already migrated) children
609  migrator->Schedule(catalog);
610 }
611 
612 
613 bool CommandMigrate::RaiseFileDescriptorLimit() const {
614  struct rlimit rpl;
615  memset(&rpl, 0, sizeof(rpl));
616  getrlimit(RLIMIT_NOFILE, &rpl);
617  if (rpl.rlim_cur < file_descriptor_limit_) {
618  if (rpl.rlim_max < file_descriptor_limit_)
619  rpl.rlim_max = file_descriptor_limit_;
620  rpl.rlim_cur = file_descriptor_limit_;
621  const bool retval = setrlimit(RLIMIT_NOFILE, &rpl);
622  if (retval != 0) {
623  return false;
624  }
625  }
626  return true;
627 }
628 
629 
630 bool CommandMigrate::ConfigureSQLite() const {
631  int retval = sqlite3_config(SQLITE_CONFIG_MULTITHREAD);
632  return (retval == SQLITE_OK);
633 }
634 
635 
636 void CommandMigrate::AnalyzeCatalogStatistics() const {
637  const unsigned int number_of_catalogs = catalog_statistics_list_.size();
638  unsigned int aggregated_entry_count = 0;
639  unsigned int aggregated_max_row_id = 0;
640  unsigned int aggregated_hardlink_count = 0;
641  unsigned int aggregated_linkcounts = 0;
642  double aggregated_migration_time = 0.0;
643 
644  CatalogStatisticsList::const_iterator i = catalog_statistics_list_.begin();
645  CatalogStatisticsList::const_iterator iend = catalog_statistics_list_.end();
646  for (; i != iend; ++i) {
647  aggregated_entry_count += i->entry_count;
648  aggregated_max_row_id += i->max_row_id;
649  aggregated_hardlink_count += i->hardlink_group_count;
650  aggregated_linkcounts += i->aggregated_linkcounts;
651  aggregated_migration_time += i->migration_time;
652  }
653 
654  // Inode quantization
655  assert(aggregated_max_row_id > 0);
656  const unsigned int unused_inodes =
657  aggregated_max_row_id - aggregated_entry_count;
658  const float ratio =
659  (static_cast<float>(unused_inodes) /
660  static_cast<float>(aggregated_max_row_id)) * 100.0f;
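  // Example: 1,000,000 allocated row IDs for 900,000 actual entries leaves
  // 100,000 unused inodes, i.e. a waste ratio of 10%.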
661  LogCvmfs(kLogCatalog, kLogStdout, "Actual Entries: %d\n"
662  "Allocated Inodes: %d\n"
663  " Unused Inodes: %d\n"
664  " Percentage of wasted Inodes: %.1f%%\n",
665  aggregated_entry_count, aggregated_max_row_id, unused_inodes, ratio);
666 
667  // Hardlink statistics
668  const float average_linkcount = (aggregated_hardlink_count > 0)
669  ? aggregated_linkcounts /
670  aggregated_hardlink_count
671  : 0.0f;
672  LogCvmfs(kLogCatalog, kLogStdout, "Generated Hardlink Groups: %d\n"
673  "Average Linkcount per Group: %.1f\n",
674  aggregated_hardlink_count, average_linkcount);
675 
676  // Performance measures
677  const double average_migration_time =
678  aggregated_migration_time / static_cast<double>(number_of_catalogs);
679  LogCvmfs(kLogCatalog, kLogStdout, "Catalog Loading Time: %.2fs\n"
680  "Average Migration Time: %.2fs\n"
681  "Overall Migration Time: %.2fs\n"
682  "Aggregated Migration Time: %.2fs\n",
684  average_migration_time,
686  aggregated_migration_time);
687 }
688 
689 
690 CommandMigrate::PendingCatalog::~PendingCatalog() {
691  delete old_catalog;
692  old_catalog = NULL;
693 
694  if (new_catalog != NULL) {
695  delete new_catalog;
696  new_catalog = NULL;
697  }
698 }
699 
700 
701 template<class DerivedT>
703  const worker_context *context)
704  : temporary_directory_(context->temporary_directory)
705  , collect_catalog_statistics_(context->collect_catalog_statistics)
706 { }
707 
708 
709 template<class DerivedT>
711 
712 
713 template<class DerivedT>
715  const expected_data &data) {
717  const bool success = static_cast<DerivedT*>(this)->RunMigration(data) &&
718  UpdateNestedCatalogReferences(data) &&
719  UpdateCatalogMetadata(data) &&
720  CollectAndAggregateStatistics(data) &&
721  CleanupNestedCatalogs(data);
722  data->success = success;
724 
727 
728  // Note: MigrationCallback() will take care of the result...
729  if (success) {
731  } else {
733  }
734 }
735 
736 
737 template<class DerivedT>
740 {
741  const catalog::Catalog *new_catalog =
742  (data->HasNew()) ? data->new_catalog : data->old_catalog;
743  const catalog::CatalogDatabase &writable = new_catalog->database();
744 
745  catalog::SqlCatalog add_nested_catalog(writable,
746  "INSERT OR REPLACE INTO nested_catalogs (path, sha1, size) "
747  " VALUES (:path, :sha1, :size);");
748 
749  // go through all nested catalogs and update their references (we are currently
750  // in their parent catalog)
751  // Note: we might need to wait for the nested catalog to be fully processed.
752  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
753  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
754  for (; i != iend; ++i) {
755  PendingCatalog *nested_catalog = *i;
756 
757  if (!nested_catalog->was_updated.Get()) {
758  continue;
759  }
760 
761  const std::string &root_path = nested_catalog->root_path();
762  const shash::Any catalog_hash = nested_catalog->new_catalog_hash;
763  const size_t catalog_size = nested_catalog->new_catalog_size;
764 
765  // insert the updated nested catalog reference into the new catalog
766  const bool retval =
767  add_nested_catalog.BindText(1, root_path) &&
768  add_nested_catalog.BindText(2, catalog_hash.ToString()) &&
769  add_nested_catalog.BindInt64(3, catalog_size) &&
770  add_nested_catalog.Execute();
771  if (!retval) {
772  Error("Failed to add nested catalog link", add_nested_catalog, data);
773  return false;
774  }
775  add_nested_catalog.Reset();
776  }
777 
778  return true;
779 }
780 
781 
782 template<class DerivedT>
785 {
786  if (!data->HasChanges()) {
787  return true;
788  }
789 
790  catalog::WritableCatalog *catalog =
791  (data->HasNew()) ? data->new_catalog : GetWritable(data->old_catalog);
792 
793  // Set the previous revision hash in the new catalog to the old catalog
794  // we are doing the whole migration as a new snapshot that does not change
795  // any files, but just applies the necessary data schema migrations
796  catalog->SetPreviousRevision(data->old_catalog->hash());
797  catalog->IncrementRevision();
798  catalog->UpdateLastModified();
799 
800  return true;
801 }
802 
803 
804 template<class DerivedT>
807 {
808  if (!collect_catalog_statistics_) {
809  return true;
810  }
811 
812  const catalog::Catalog *new_catalog =
813  (data->HasNew()) ? data->new_catalog : data->old_catalog;
814  const catalog::CatalogDatabase &writable = new_catalog->database();
815  bool retval;
816 
817  // Find out the discrepancy between MAX(rowid) and COUNT(*)
818  catalog::SqlCatalog wasted_inodes(writable,
819  "SELECT COUNT(*), MAX(rowid) FROM catalog;");
820  retval = wasted_inodes.FetchRow();
821  if (!retval) {
822  Error("Failed to count entries in catalog", wasted_inodes, data);
823  return false;
824  }
825  const unsigned int entry_count = wasted_inodes.RetrieveInt64(0);
826  const unsigned int max_row_id = wasted_inodes.RetrieveInt64(1);
827 
828  // Save collected information into the central statistics aggregator
829  data->statistics.root_path = data->root_path();
830  data->statistics.max_row_id = max_row_id;
831  data->statistics.entry_count = entry_count;
832 
833  return true;
834 }
835 
836 
837 template<class DerivedT>
839  PendingCatalog *data) const
840 {
841  // All nested catalogs of PendingCatalog 'data' are fully processed and
842  // accounted. It is safe to get rid of their data structures here!
843  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
844  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
845  for (; i != iend; ++i) {
846  delete *i;
847  }
848 
849  data->nested_catalogs.clear();
850  return true;
851 }
852 
853 
862 
863 
864 template<class DerivedT>
867  const catalog::Catalog *catalog) const {
868  return dynamic_cast<catalog::WritableCatalog*>(const_cast<catalog::Catalog*>(
869  catalog));
870 }
871 
872 
873 //------------------------------------------------------------------------------
874 
875 
877  const worker_context *context)
879  , fix_nested_catalog_transitions_(context->fix_nested_catalog_transitions)
880  , analyze_file_linkcounts_(context->analyze_file_linkcounts)
881  , uid_(context->uid)
882  , gid_(context->gid) { }
883 
884 
886  const
887 {
888  // double-check that we are generating compatible catalogs to the actual
889  // catalog management classes
892 
893  return CreateNewEmptyCatalog(data) &&
894  CheckDatabaseSchemaCompatibility(data) &&
895  AttachOldCatalogDatabase(data) &&
896  StartDatabaseTransaction(data) &&
897  MigrateFileMetadata(data) &&
898  MigrateNestedCatalogMountPoints(data) &&
899  FixNestedCatalogTransitionPoints(data) &&
900  RemoveDanglingNestedMountpoints(data) &&
901  GenerateCatalogStatistics(data) &&
902  FindRootEntryInformation(data) &&
903  CommitDatabaseTransaction(data) &&
904  DetachOldCatalogDatabase(data);
905 }
906 
908  PendingCatalog *data) const
909 {
910  const string root_path = data->root_path();
911 
912  // create a new catalog database schema
913  const string clg_db_path =
914  CreateTempPath(temporary_directory_ + "/catalog", 0666);
915  if (clg_db_path.empty()) {
916  Error("Failed to create temporary file for the new catalog database.");
917  return false;
918  }
919  const bool volatile_content = false;
920 
921  {
922  // TODO(rmeusel): Attach catalog should work with an open catalog database
923  // as well, to remove this inefficiency
925  new_clg_db(catalog::CatalogDatabase::Create(clg_db_path));
926  if (!new_clg_db.IsValid() ||
927  !new_clg_db->InsertInitialValues(root_path, volatile_content, "")) {
928  Error("Failed to create database for new catalog");
929  unlink(clg_db_path.c_str());
930  return false;
931  }
932  }
933 
934  // Attach the just created nested catalog database
935  catalog::WritableCatalog *writable_catalog =
936  catalog::WritableCatalog::AttachFreely(root_path, clg_db_path,
938  if (writable_catalog == NULL) {
939  Error("Failed to open database for new catalog");
940  unlink(clg_db_path.c_str());
941  return false;
942  }
943 
944  data->new_catalog = writable_catalog;
945  return true;
946 }
947 
948 
950  PendingCatalog *data) const
951 {
952  const catalog::CatalogDatabase &old_catalog = data->old_catalog->database();
953  const catalog::CatalogDatabase &new_catalog = data->new_catalog->database();
954 
955  if ((new_catalog.schema_version() <
958  ||
959  new_catalog.schema_version() >
962  ||
963  (old_catalog.schema_version() > 2.1 +
965  {
966  Error("Failed to meet database requirements for migration.", data);
967  return false;
968  }
969  return true;
970 }
971 
972 
974  PendingCatalog *data) const
975 {
976  const catalog::CatalogDatabase &old_catalog = data->old_catalog->database();
977  const catalog::CatalogDatabase &new_catalog = data->new_catalog->database();
978 
979  catalog::SqlCatalog sql_attach_new(new_catalog,
980  "ATTACH '" + old_catalog.filename() + "' AS old;");
981  bool retval = sql_attach_new.Execute();
982 
983  // remove the hardlink to the old database file (temporary file), it will not
984  // be needed anymore... data will get deleted when the database is closed
985  unlink(data->old_catalog->database().filename().c_str());
986 
987  if (!retval) {
988  Error("Failed to attach database of old catalog", sql_attach_new, data);
989  return false;
990  }
991  return true;
992 }
993 
994 
996  PendingCatalog *data) const
997 {
998  assert(data->HasNew());
999  data->new_catalog->Transaction();
1000  return true;
1001 }
1002 
1003 
1005  PendingCatalog *data) const
1006 {
1007  assert(!data->new_catalog->IsDirty());
1008  assert(data->HasNew());
1009  bool retval;
1010  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1011 
1012  // Hardlinks scratch space.
1013  // This temporary table is used for the hardlink analysis results.
1014  // The old catalog format did not have a direct notion of hardlinks and their
1015  // linkcounts, but this information can be partly retrieved from the under-
1016  // lying file system semantics.
1017  //
1018  // Hardlinks:
1019  // groupid : this group id can be used for the new catalog schema
1020  // inode : the inodes that were part of a hardlink group before
1021  // linkcount : the linkcount for hardlink group id members
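  // For illustration: an old-catalog inode 42 referenced by three directory
  // entries ends up as a single row (hardlink_group_id=N, inode=42, linkcount=3).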
1022  catalog::SqlCatalog sql_create_hardlinks_table(writable,
1023  "CREATE TEMPORARY TABLE hardlinks "
1024  " ( hardlink_group_id INTEGER PRIMARY KEY AUTOINCREMENT, "
1025  " inode INTEGER, "
1026  " linkcount INTEGER, "
1027  " CONSTRAINT unique_inode UNIQUE (inode) );");
1028  retval = sql_create_hardlinks_table.Execute();
1029  if (!retval) {
1030  Error("Failed to create temporary hardlink analysis table",
1031  sql_create_hardlinks_table, data);
1032  return false;
1033  }
1034 
1035  // Directory Linkcount scratch space.
1036  // Directory linkcounts can be obtained from the directory hierarchy reflected
1037  // in the old style catalogs. The new catalog schema asks for this specific
1038  // linkcount. Directory linkcount analysis results will be put into this
1039  // temporary table
1040  catalog::SqlCatalog sql_create_linkcounts_table(writable,
1041  "CREATE TEMPORARY TABLE dir_linkcounts "
1042  " ( inode INTEGER PRIMARY KEY, "
1043  " linkcount INTEGER );");
1044  retval = sql_create_linkcounts_table.Execute();
1045  if (!retval) {
1046  Error("Failed to create tmeporary directory linkcount analysis table",
1047  sql_create_linkcounts_table, data);
    return false;
1048  }
1049 
1050  // It is possible to skip this step.
1051  // In that case all hardlink inodes with a (potential) linkcount > 1 will get
1052  // degraded to files containing the same content
1053  if (analyze_file_linkcounts_) {
1054  retval = AnalyzeFileLinkcounts(data);
1055  if (!retval) {
1056  return false;
1057  }
1058  }
1059 
1060  // Analyze the linkcounts of directories
1061  // - each directory has a linkcount of at least 2 (empty directory)
1062  // (link in parent directory and self reference (cd .) )
1063  // - for each child directory, the parent's link count is incremented by 1
1064  // (parent reference in child (cd ..) )
1065  //
1066  // Note: nested catalog mountpoints will be miscalculated here, since we can't
1067  // check the number of directories they contain. Those are defined in the
1068  // linked nested catalog and need to be added later on.
1069  // (see: MigrateNestedCatalogMountPoints() for details)
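  // Worked example: a directory with two subdirectories gets
  // linkcount = 2 (self + entry in parent) + 2 (one ".." per subdirectory) = 4.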
1070  catalog::SqlCatalog sql_dir_linkcounts(writable,
1071  "INSERT INTO dir_linkcounts "
1072  " SELECT c1.inode as inode, "
1073  " SUM(IFNULL(MIN(c2.inode,1),0)) + 2 as linkcount "
1074  " FROM old.catalog as c1 "
1075  " LEFT JOIN old.catalog as c2 "
1076  " ON c2.parent_1 = c1.md5path_1 AND "
1077  " c2.parent_2 = c1.md5path_2 AND "
1078  " c2.flags & :flag_dir_1 "
1079  " WHERE c1.flags & :flag_dir_2 "
1080  " GROUP BY c1.inode;");
1081  retval =
1082  sql_dir_linkcounts.BindInt64(1, catalog::SqlDirent::kFlagDir) &&
1083  sql_dir_linkcounts.BindInt64(2, catalog::SqlDirent::kFlagDir) &&
1084  sql_dir_linkcounts.Execute();
1085  if (!retval) {
1086  Error("Failed to analyze directory specific linkcounts",
1087  sql_dir_linkcounts, data);
1088  if (sql_dir_linkcounts.GetLastError() == SQLITE_CONSTRAINT) {
1089  Error("Obviously your catalogs are corrupted, since we found a directory"
1090  "inode that is a file inode at the same time!");
1091  }
1092  return false;
1093  }
1094 
1095  // Copy the old file meta information into the new catalog schema
1096  // here we also add the previously analyzed hardlink/linkcount information
1097  // from both temporary tables "hardlinks" and "dir_linkcounts".
1098  //
1099  // Note: nested catalog mountpoints still need to be treated separately
1100  // (see MigrateNestedCatalogMountPoints() for details)
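  // The resulting 'hardlinks' column packs the group id into the upper and the
  // linkcount into the lower 32 bits: e.g. group 3 with linkcount 2 is stored
  // as (3 << 32) | 2; a plain file without a group becomes (0 << 32) | 1.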
1101  catalog::SqlCatalog migrate_file_meta_data(writable,
1102  "INSERT INTO catalog "
1103  " SELECT md5path_1, md5path_2, "
1104  " parent_1, parent_2, "
1105  " IFNULL(hardlink_group_id, 0) << 32 | "
1106  " COALESCE(hardlinks.linkcount, dir_linkcounts.linkcount, 1) "
1107  " AS hardlinks, "
1108  " hash, size, mode, mtime, "
1109  " flags, name, symlink, "
1110  " :uid, "
1111  " :gid, "
1112  " NULL " // set empty xattr BLOB (default)
1113  " FROM old.catalog "
1114  " LEFT JOIN hardlinks "
1115  " ON catalog.inode = hardlinks.inode "
1116  " LEFT JOIN dir_linkcounts "
1117  " ON catalog.inode = dir_linkcounts.inode;");
1118  retval = migrate_file_meta_data.BindInt64(1, uid_) &&
1119  migrate_file_meta_data.BindInt64(2, gid_) &&
1120  migrate_file_meta_data.Execute();
1121  if (!retval) {
1122  Error("Failed to migrate the file system meta data",
1123  migrate_file_meta_data, data);
1124  return false;
1125  }
1126 
1127  // If we deal with a nested catalog, we need to add a .cvmfscatalog entry
1128  // since it was not present in the old repository specification but is needed
1129  // now!
1130  if (!data->IsRoot()) {
1131  const catalog::DirectoryEntry &nested_marker =
1133  catalog::SqlDirentInsert insert_nested_marker(writable);
1134  const std::string root_path = data->root_path();
1135  const std::string file_path = root_path +
1136  "/" + nested_marker.name().ToString();
1137  const shash::Md5 &path_hash = shash::Md5(file_path.data(),
1138  file_path.size());
1139  const shash::Md5 &parent_hash = shash::Md5(root_path.data(),
1140  root_path.size());
1141  retval = insert_nested_marker.BindPathHash(path_hash) &&
1142  insert_nested_marker.BindParentPathHash(parent_hash) &&
1143  insert_nested_marker.BindDirent(nested_marker) &&
1144  insert_nested_marker.BindXattrEmpty() &&
1145  insert_nested_marker.Execute();
1146  if (!retval) {
1147  Error("Failed to insert nested catalog marker into new nested catalog.",
1148  insert_nested_marker, data);
1149  return false;
1150  }
1151  }
1152 
1153  // Copy (and update) the properties fields
1154  //
1155  // Note: The 'schema' is explicitly not copied to the new catalog.
1156  // Each catalog contains a revision, which is also copied here and that
1157  // is later updated by calling catalog->IncrementRevision()
1158  catalog::SqlCatalog copy_properties(writable,
1159  "INSERT OR REPLACE INTO properties "
1160  " SELECT key, value "
1161  " FROM old.properties "
1162  " WHERE key != 'schema';");
1163  retval = copy_properties.Execute();
1164  if (!retval) {
1165  Error("Failed to migrate the properties table.", copy_properties, data);
1166  return false;
1167  }
1168 
1169  return true;
1170 }
1171 
1172 
1174  PendingCatalog *data) const
1175 {
1176  assert(data->HasNew());
1177  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1178  bool retval;
1179 
1180  // Analyze the hardlink relationships in the old catalog
1181  // inodes used to be assigned at publishing time, implicitly constituting
1182  // those relationships. We now need them explicitly in the file catalogs
1183  // This looks for directory entries with matching inodes but differing path-
1184  // hashes and saves the results in a temporary table called 'hl_scratch'
1185  //
1186  // Note: We only support hardlink groups that reside in the same directory!
1187  // Therefore we first need to figure out hardlink candidates (which
1188  // might still contain hardlink groups spanning more than one directory)
1189  // In a second step these candidates will be analyzed to kick out un-
1190  // supported hardlink groups.
1191  // Unsupported hardlink groups will be treated as normal files with
1192  // the same content
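  // For illustration: /dir/a and /dir/b sharing inode 7 form a supported group
  // (identical parent hashes), whereas /dir/a and /other/c sharing an inode
  // would be rejected here and later stored as independent regular files.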
1193  catalog::SqlCatalog sql_create_hardlinks_scratch_table(writable,
1194  "CREATE TEMPORARY TABLE hl_scratch AS "
1195  " SELECT c1.inode AS inode, c1.md5path_1, c1.md5path_2, "
1196  " c1.parent_1 as c1p1, c1.parent_2 as c1p2, "
1197  " c2.parent_1 as c2p1, c2.parent_2 as c2p2 "
1198  " FROM old.catalog AS c1 "
1199  " INNER JOIN old.catalog AS c2 "
1200  " ON c1.inode == c2.inode AND "
1201  " (c1.md5path_1 != c2.md5path_1 OR "
1202  " c1.md5path_2 != c2.md5path_2);");
1203  retval = sql_create_hardlinks_scratch_table.Execute();
1204  if (!retval) {
1205  Error("Failed to create temporary scratch table for hardlink analysis",
1206  sql_create_hardlinks_scratch_table, data);
1207  return false;
1208  }
1209 
1210  // Figures out which hardlink candidates are supported by CVMFS and can be
1211  // transferred into the new catalog as so called hardlink groups. Unsupported
1212  // hardlinks need to be discarded and treated as normal files containing the
1213  // exact same data
1214  catalog::SqlCatalog fill_linkcount_table_for_files(writable,
1215  "INSERT INTO hardlinks (inode, linkcount)"
1216  " SELECT inode, count(*) as linkcount "
1217  " FROM ( "
1218  // recombine supported hardlink inodes with their actual manifested
1219  // hard-links in the catalog.
1220  // Note: for each directory entry pointing to the same supported
1221  // hardlink inode we have a distinct MD5 path hash
1222  " SELECT DISTINCT hl.inode, hl.md5path_1, hl.md5path_2 "
1223  " FROM ( "
1224  // sort out supported hardlink inodes from unsupported ones by
1225  // locality
1226  // Note: see the next comment for the nested SELECT
1227  " SELECT inode "
1228  " FROM ( "
1229  " SELECT inode, count(*) AS cnt "
1230  " FROM ( "
1231  // go through the potential hardlinks and collect location infor-
1232  // mation about them.
1233  // Note: we only support hardlinks that all reside in the same
1234  // directory, thus having the same parent (c1p* == c2p*)
1235  // --> For supported hardlink candidates the SELECT DISTINCT
1236  // will produce only a single row, whereas others produce more
1237  " SELECT DISTINCT inode,c1p1,c1p1,c2p1,c2p2 "
1238  " FROM hl_scratch AS hl "
1239  " ) "
1240  " GROUP BY inode "
1241  " ) "
1242  " WHERE cnt = 1 "
1243  " ) AS supported_hardlinks "
1244  " LEFT JOIN hl_scratch AS hl "
1245  " ON supported_hardlinks.inode = hl.inode "
1246  " ) "
1247  " GROUP BY inode;");
1248  retval = fill_linkcount_table_for_files.Execute();
1249  if (!retval) {
1250  Error("Failed to analyze hardlink relationships for files.",
1251  fill_linkcount_table_for_files, data);
1252  return false;
1253  }
1254 
1255  // The file linkcount and hardlink analysis is finished and the scratch table
1256  // can be deleted...
1257  catalog::SqlCatalog drop_hardlink_scratch_space(writable,
1258  "DROP TABLE hl_scratch;");
1259  retval = drop_hardlink_scratch_space.Execute();
1260  if (!retval) {
1261  Error("Failed to remove file linkcount analysis scratch table",
1262  drop_hardlink_scratch_space, data);
1263  return false;
1264  }
1265 
1266  // Do some statistics if asked for...
1267  if (collect_catalog_statistics_) {
1268  catalog::SqlCatalog count_hardlinks(writable,
1269  "SELECT count(*), sum(linkcount) FROM hardlinks;");
1270  retval = count_hardlinks.FetchRow();
1271  if (!retval) {
1272  Error("Failed to count the generated file hardlinks for statistics",
1273  count_hardlinks, data);
1274  return false;
1275  }
1276 
1277  data->statistics.hardlink_group_count += count_hardlinks.RetrieveInt64(0);
1278  data->statistics.aggregated_linkcounts += count_hardlinks.RetrieveInt64(1);
1279  }
1280 
1281  return true;
1282 }
1283 
1284 
1286  PendingCatalog *data) const
1287 {
1288  assert(data->HasNew());
1289  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1290  bool retval;
1291 
1292  // preparing the SQL statement for nested catalog mountpoint update
1293  catalog::SqlCatalog update_mntpnt_linkcount(writable,
1294  "UPDATE catalog "
1295  "SET hardlinks = :linkcount "
1296  "WHERE md5path_1 = :md5_1 AND md5path_2 = :md5_2;");
1297 
1298  // update all nested catalog mountpoints
1299  // (Note: we might need to wait for the nested catalog to be processed)
1300  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
1301  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1302  for (; i != iend; ++i) {
1303  // collect information about the nested catalog
1304  PendingCatalog *nested_catalog = *i;
1305  const catalog::DirectoryEntry root_entry = nested_catalog->root_entry.Get();
1306  const string &root_path = nested_catalog->root_path();
1307 
1308  // update the nested catalog mountpoint directory entry with the correct
1309  // linkcount that was determined while processing the nested catalog
1310  const shash::Md5 mountpoint_hash = shash::Md5(root_path.data(),
1311  root_path.size());
1312  retval =
1313  update_mntpnt_linkcount.BindInt64(1, root_entry.linkcount()) &&
1314  update_mntpnt_linkcount.BindMd5(2, 3, mountpoint_hash) &&
1315  update_mntpnt_linkcount.Execute();
1316  if (!retval) {
1317  Error("Failed to update linkcount of nested catalog mountpoint",
1318  update_mntpnt_linkcount, data);
1319  return false;
1320  }
1321  update_mntpnt_linkcount.Reset();
1322  }
1323 
1324  return true;
1325 }
1326 
1327 
1329  PendingCatalog *data) const
1330 {
1331  assert(data->HasNew());
1332  if (!fix_nested_catalog_transitions_) {
1333  // Fixing transition point mismatches is not enabled...
1334  return true;
1335  }
1336 
1337  typedef catalog::DirectoryEntry::Difference Difference;
1338 
1339  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1340  bool retval;
1341 
1342  catalog::SqlLookupPathHash lookup_mountpoint(writable);
1343  catalog::SqlDirentUpdate update_directory_entry(writable);
1344 
1345  // Unbox the nested catalogs (possibly waiting for migration of them first)
1346  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
1347  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1348  for (; i != iend; ++i) {
1349  // Collect information about the nested catalog
1350  PendingCatalog *nested_catalog = *i;
1351  const catalog::DirectoryEntry nested_root_entry =
1352  nested_catalog->root_entry.Get();
1353  const string &nested_root_path = nested_catalog->root_path();
1354  const shash::Md5 mountpoint_path_hash =
1355  shash::Md5(nested_root_path.data(), nested_root_path.size());
1356 
1357  // Retrieve the nested catalog mountpoint from the current catalog
1358  retval = lookup_mountpoint.BindPathHash(mountpoint_path_hash) &&
1359  lookup_mountpoint.FetchRow();
1360  if (!retval) {
1361  Error("Failed to fetch nested catalog mountpoint to check for compatible"
1362  "transition points", lookup_mountpoint, data);
1363  return false;
1364  }
1365 
1366  catalog::DirectoryEntry mountpoint_entry =
1367  lookup_mountpoint.GetDirent(data->new_catalog);
1368  lookup_mountpoint.Reset();
1369 
1370  // Compare nested catalog mountpoint and nested catalog root entries
1372  mountpoint_entry.CompareTo(nested_root_entry);
1373 
1374  // We MUST deal with two directory entries that are a pair of nested cata-
1375  // log mountpoint and root entry! Thus we expect their transition flags to
1376  // differ and their name to be the same.
1377  assert(diffs & Difference::kNestedCatalogTransitionFlags);
1378  assert((diffs & Difference::kName) == 0);
1379 
1380  // Check if there are other differences except the nested catalog transition
1381  // flags and fix them...
1382  if ((diffs ^ Difference::kNestedCatalogTransitionFlags) != 0) {
1383  // If we found differences, we still assume a couple of directory entry
1384  // fields to be the same, otherwise some severe stuff would be wrong...
1385  if ((diffs & Difference::kChecksum) ||
1386  (diffs & Difference::kLinkcount) ||
1387  (diffs & Difference::kSymlink) ||
1388  (diffs & Difference::kChunkedFileFlag) )
1389  {
1390  Error("Found an irreparable mismatch in a nested catalog transition "
1391  "point at '" + nested_root_path + "'\nAborting...\n");
1392  }
1393 
1394  // Copy the properties from the nested catalog root entry into the mount-
1395  // point entry to bring them in sync again
1397  nested_root_entry, &mountpoint_entry);
1398 
1399  // save the nested catalog mountpoint entry into the catalog
1400  retval = update_directory_entry.BindPathHash(mountpoint_path_hash) &&
1401  update_directory_entry.BindDirent(mountpoint_entry) &&
1402  update_directory_entry.Execute();
1403  if (!retval) {
1404  Error("Failed to save resynchronized nested catalog mountpoint into "
1405  "catalog database", update_directory_entry, data);
1406  return false;
1407  }
1408  update_directory_entry.Reset();
1409 
1410  // Fixing of this mountpoint went well... inform the user that this minor
1411  // issue occurred
1413  "NOTE: fixed incompatible nested catalog transition point at: "
1414  "'%s' ", nested_root_path.c_str());
1415  }
1416  }
1417 
1418  return true;
1419 }
1420 
1421 
1423  const catalog::DirectoryEntry &nested_root,
1424  catalog::DirectoryEntry *mountpoint
1425 ) {
1426  // Replace some file system parameters in the mountpoint to resync it with
1427  // the nested root of the corresponding nested catalog
1428  //
1429  // Note: this method relies on CommandMigrate being a friend of DirectoryEntry
1430  mountpoint->mode_ = nested_root.mode_;
1431  mountpoint->uid_ = nested_root.uid_;
1432  mountpoint->gid_ = nested_root.gid_;
1433  mountpoint->size_ = nested_root.size_;
1434  mountpoint->mtime_ = nested_root.mtime_;
1435 }
1436 
1437 
1439  PendingCatalog *data) const
1440 {
1441  assert(data->HasNew());
1442  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1443  bool retval = false;
1444 
1445  // build a set of registered nested catalog path hashes
1446  typedef catalog::Catalog::NestedCatalogList NestedCatalogList;
1447  typedef std::map<shash::Md5, catalog::Catalog::NestedCatalog>
1448  NestedCatalogMap;
1449  const NestedCatalogList& nested_clgs =
1451  NestedCatalogList::const_iterator i = nested_clgs.begin();
1452  const NestedCatalogList::const_iterator iend = nested_clgs.end();
1453  NestedCatalogMap nested_catalog_path_hashes;
1454  for (; i != iend; ++i) {
1455  const PathString &path = i->mountpoint;
1456  const shash::Md5 hash(path.GetChars(), path.GetLength());
1457  nested_catalog_path_hashes[hash] = *i;
1458  }
1459 
1460  // Retrieve nested catalog mountpoints that have child entries directly inside
1461  // the current catalog (which is a malformed state)
1462  catalog::SqlLookupDanglingMountpoints sql_dangling_mountpoints(writable);
1463  catalog::SqlDirentUpdate save_updated_mountpoint(writable);
1464 
1465  std::vector<catalog::DirectoryEntry> todo_dirent;
1466  std::vector<shash::Md5> todo_hash;
1467 
1468  // go through the list of dangling nested catalog mountpoints and fix them
1469  // where needed (check if there is no nested catalog registered for them)
1470  while (sql_dangling_mountpoints.FetchRow()) {
1471  catalog::DirectoryEntry dangling_mountpoint =
1472  sql_dangling_mountpoints.GetDirent(data->new_catalog);
1473  const shash::Md5 path_hash = sql_dangling_mountpoints.GetPathHash();
1474  assert(dangling_mountpoint.IsNestedCatalogMountpoint());
1475 
1476  // check if the nested catalog mountpoint is registered in the nested cata-
1477  // log list of the currently migrated catalog
1478  const NestedCatalogMap::const_iterator nested_catalog =
1479  nested_catalog_path_hashes.find(path_hash);
1480  if (nested_catalog != nested_catalog_path_hashes.end()) {
1482  "WARNING: found a non-empty nested catalog mountpoint under "
1483  "'%s'", nested_catalog->second.mountpoint.c_str());
1484  continue;
1485  }
1486 
1487  // the mountpoint was confirmed to be dangling and needs to be removed
1488  dangling_mountpoint.set_is_nested_catalog_mountpoint(false);
1489  todo_dirent.push_back(dangling_mountpoint);
1490  todo_hash.push_back(path_hash);
1491  }
1492 
1493  for (unsigned i = 0; i < todo_dirent.size(); ++i) {
1494  retval = save_updated_mountpoint.BindPathHash(todo_hash[i]) &&
1495  save_updated_mountpoint.BindDirent(todo_dirent[i]) &&
1496  save_updated_mountpoint.Execute() &&
1497  save_updated_mountpoint.Reset();
1498  if (!retval) {
1499  Error("Failed to remove dangling nested catalog mountpoint entry in "
1500  "catalog", save_updated_mountpoint, data);
1501  return false;
1502  }
1503 
1504  // tell the user that this intervention has taken place
1505  LogCvmfs(kLogCatalog, kLogStdout, "NOTE: fixed dangling nested catalog "
1506  "mountpoint entry called: '%s' ",
1507  todo_dirent[i].name().c_str());
1508  }
1509 
1510  return true;
1511 }
1512 
1513 
1515  // This is a pre-initialized singleton... it MUST already be there...
1516  assert(nested_catalog_marker_.name_.ToString() == ".cvmfscatalog");
1517  return nested_catalog_marker_;
1518 }
1519 
1520 bool CommandMigrate::GenerateNestedCatalogMarkerChunk() {
1521  // Create an empty nested catalog marker file
1522  nested_catalog_marker_tmp_path_ =
1523  CreateTempPath(temporary_directory_ + "/.cvmfscatalog", 0644);
1524  if (nested_catalog_marker_tmp_path_.empty()) {
1525  Error("Failed to create temp file for nested catalog marker dummy.");
1526  return false;
1527  }
1528 
1529  // Process and upload it to the backend storage
1530  IngestionSource *source =
1532  spooler_->Process(source);
1533  return true;
1534 }
1535 
1536 void CommandMigrate::CreateNestedCatalogMarkerDirent(
1537  const shash::Any &content_hash)
1538 {
1539  // Generate it only once
1540  assert(nested_catalog_marker_.name_.ToString() != ".cvmfscatalog");
1541 
1542  // Fill the DirectoryEntry structure with all needed information
1543  nested_catalog_marker_.name_.Assign(".cvmfscatalog", strlen(".cvmfscatalog"));
1544  nested_catalog_marker_.mode_ = 33188;
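  // 33188 == 0100644 (octal), i.e. a regular file with permissions rw-r--r--.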
1548  nested_catalog_marker_.mtime_ = time(NULL);
1550  nested_catalog_marker_.checksum_ = content_hash;
1551 }
1552 
1553 
1555  PendingCatalog *data) const
1556 {
1557  assert(data->HasNew());
1558  bool retval = false;
1559  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1560 
1561  // Aggregate the statistics counters of all nested catalogs
1562  // Note: we might need to wait until nested catalogs are successfully processed
1563  catalog::DeltaCounters stats_counters;
1564  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
1565  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1566  for (; i != iend; ++i) {
1567  const PendingCatalog *nested_catalog = *i;
1568  const catalog::DeltaCounters &s = nested_catalog->nested_statistics.Get();
1569  s.PopulateToParent(&stats_counters);
1570  }
1571 
1572  // Count various directory entry types in the catalog to fill up the catalog
1573  // statistics counters introduced in the current catalog schema
1574  catalog::SqlCatalog count_regular_files(writable,
1575  "SELECT count(*) FROM catalog "
1576  " WHERE flags & :flag_file "
1577  " AND NOT flags & :flag_link;");
1578  catalog::SqlCatalog count_symlinks(writable,
1579  "SELECT count(*) FROM catalog WHERE flags & :flag_link;");
1580  catalog::SqlCatalog count_directories(writable,
1581  "SELECT count(*) FROM catalog WHERE flags & :flag_dir;");
1582  catalog::SqlCatalog aggregate_file_size(writable,
1583  "SELECT sum(size) FROM catalog WHERE flags & :flag_file "
1584  " AND NOT flags & :flag_link");
1585 
1586  // Run the actual counting queries
1587  retval =
1588  count_regular_files.BindInt64(1, catalog::SqlDirent::kFlagFile) &&
1589  count_regular_files.BindInt64(2, catalog::SqlDirent::kFlagLink) &&
1590  count_regular_files.FetchRow();
1591  if (!retval) {
1592  Error("Failed to count regular files.", count_regular_files, data);
1593  return false;
1594  }
1595  retval =
1596  count_symlinks.BindInt64(1, catalog::SqlDirent::kFlagLink) &&
1597  count_symlinks.FetchRow();
1598  if (!retval) {
1599  Error("Failed to count symlinks.", count_symlinks, data);
1600  return false;
1601  }
1602  retval =
1603  count_directories.BindInt64(1, catalog::SqlDirent::kFlagDir) &&
1604  count_directories.FetchRow();
1605  if (!retval) {
1606  Error("Failed to count directories.", count_directories, data);
1607  return false;
1608  }
1609  retval =
1610  aggregate_file_size.BindInt64(1, catalog::SqlDirent::kFlagFile) &&
1611  aggregate_file_size.BindInt64(2, catalog::SqlDirent::kFlagLink) &&
1612  aggregate_file_size.FetchRow();
1613  if (!retval) {
1614  Error("Failed to aggregate the file sizes.", aggregate_file_size, data);
1615  return false;
1616  }
1617 
1618  // Insert the counted statistics into the DeltaCounters data structure
1619  stats_counters.self.regular_files = count_regular_files.RetrieveInt64(0);
1620  stats_counters.self.symlinks = count_symlinks.RetrieveInt64(0);
1621  stats_counters.self.directories = count_directories.RetrieveInt64(0);
1622  stats_counters.self.nested_catalogs = data->nested_catalogs.size();
1623  stats_counters.self.file_size = aggregate_file_size.RetrieveInt64(0);
1624 
1625  // Write back the generated statistics counters into the catalog database
1626  stats_counters.WriteToDatabase(writable);
1627 
1628  // Push the generated statistics counters up to the parent catalog
1629  data->nested_statistics.Set(stats_counters);
1630 
1631  return true;
1632 }
1633 
1634 
1636  PendingCatalog *data) const
1637 {
1638  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1639  bool retval;
1640 
1641  std::string root_path = data->root_path();
1642  shash::Md5 root_path_hash = shash::Md5(root_path.data(), root_path.size());
1643 
1644  catalog::SqlLookupPathHash lookup_root_entry(writable);
1645  retval = lookup_root_entry.BindPathHash(root_path_hash) &&
1646  lookup_root_entry.FetchRow();
1647  if (!retval) {
1648  Error("Failed to retrieve root directory entry of migrated catalog",
1649  lookup_root_entry, data);
1650  return false;
1651  }
1652 
1653  catalog::DirectoryEntry entry =
1654  lookup_root_entry.GetDirent(data->new_catalog);
1655  if (entry.linkcount() < 2 || entry.hardlink_group() > 0) {
1656  Error("Retrieved linkcount of catalog root entry is not sane.", data);
1657  return false;
1658  }
1659 
1660  data->root_entry.Set(entry);
1661  return true;
1662 }
1663 
1664 
1666  PendingCatalog *data) const
1667 {
1668  assert(data->HasNew());
1669  data->new_catalog->Commit();
1670  return true;
1671 }
1672 
1673 
1675  PendingCatalog *data) const
1676 {
1677  assert(data->HasNew());
1678  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1679  catalog::SqlCatalog detach_old_catalog(writable, "DETACH old;");
1680  const bool retval = detach_old_catalog.Execute();
1681  if (!retval) {
1682  Error("Failed to detach old catalog database.", detach_old_catalog, data);
1683  return false;
1684  }
1685  return true;
1686 }
1687 
1688 
1689 //------------------------------------------------------------------------------
1690 
1691 
1693  const worker_context *context)
1695 { }
1696 
1697 
1699  const
1700 {
1701  return CheckDatabaseSchemaCompatibility(data) &&
1702  StartDatabaseTransaction(data) &&
1703  GenerateNewStatisticsCounters(data) &&
1704  UpdateCatalogSchema(data) &&
1705  CommitDatabaseTransaction(data);
1706 }
1707 
1708 
1709 bool CommandMigrate::MigrationWorker_217::CheckDatabaseSchemaCompatibility(
1710  PendingCatalog *data) const
1711 {
1712  assert(!data->HasNew());
1713  const catalog::CatalogDatabase &old_catalog = data->old_catalog->database();
1714 
1715  if ((old_catalog.schema_version() < 2.4 -
1716  catalog::CatalogDatabase::kSchemaEpsilon)
1717  ||
1718  (old_catalog.schema_version() > 2.4 +
1719  catalog::CatalogDatabase::kSchemaEpsilon))
1720  {
1721  Error("Given Catalog is not Schema 2.4.", data);
1722  return false;
1723  }
1724 
1725  return true;
1726 }
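// Illustrative sketch (assumed rationale, not from the listed source): the
// schema version is stored as a floating point value, so the check above
// brackets 2.4 with catalog::CatalogDatabase::kSchemaEpsilon instead of
// testing for exact equality. In spirit it is equivalent to:
//
//   const float v = old_catalog.schema_version();
//   const bool is_schema_2_4 =
//       (v >= 2.4 - catalog::CatalogDatabase::kSchemaEpsilon) &&
//       (v <= 2.4 + catalog::CatalogDatabase::kSchemaEpsilon);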
1727 
1728 
1729 bool CommandMigrate::MigrationWorker_217::StartDatabaseTransaction(
1730  PendingCatalog *data) const
1731 {
1732  assert(!data->HasNew());
1733  GetWritable(data->old_catalog)->Transaction();
1734  return true;
1735 }
1736 
1737 
1738 bool CommandMigrate::MigrationWorker_217::GenerateNewStatisticsCounters
1739  (PendingCatalog *data) const {
1740  assert(!data->HasNew());
1741  bool retval = false;
1742  const catalog::CatalogDatabase &writable =
1743  GetWritable(data->old_catalog)->database();
1744 
1745  // Aggregate the statistics counters of all nested catalogs
1746  // Note: we might need to wait until nested catalogs are successfully processed
1747  catalog::DeltaCounters stats_counters;
1748  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
1749  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1750  for (; i != iend; ++i) {
1751  const PendingCatalog *nested_catalog = *i;
1752  const catalog::DeltaCounters &s = nested_catalog->nested_statistics.Get();
1753  s.PopulateToParent(&stats_counters);
1754  }
1755 
1756  // Count various directory entry types in the catalog to fill up the catalog
1757  // statistics counters introduced in the current catalog schema
1758  catalog::SqlCatalog count_chunked_files(writable,
1759  "SELECT count(*), sum(size) FROM catalog "
1760  " WHERE flags & :flag_chunked_file;");
1761  catalog::SqlCatalog count_file_chunks(writable,
1762  "SELECT count(*) FROM chunks;");
1763  catalog::SqlCatalog aggregate_file_size(writable,
1764  "SELECT sum(size) FROM catalog WHERE flags & :flag_file "
1765  " AND NOT flags & :flag_link;");
1766 
1767  // Run the actual counting queries
1768  retval =
1769  count_chunked_files.BindInt64(1, catalog::SqlDirent::kFlagFileChunk) &&
1770  count_chunked_files.FetchRow();
1771  if (!retval) {
1772  Error("Failed to count chunked files.", count_chunked_files, data);
1773  return false;
1774  }
1775  retval = count_file_chunks.FetchRow();
1776  if (!retval) {
1777  Error("Failed to count file chunks", count_file_chunks, data);
1778  return false;
1779  }
1780  retval =
1781  aggregate_file_size.BindInt64(1, catalog::SqlDirent::kFlagFile) &&
1782  aggregate_file_size.BindInt64(2, catalog::SqlDirent::kFlagLink) &&
1783  aggregate_file_size.FetchRow();
1784  if (!retval) {
1785  Error("Failed to aggregate the file sizes.", aggregate_file_size, data);
1786  return false;
1787  }
1788 
1789  // Insert the counted statistics into the DeltaCounters data structure
1790  stats_counters.self.chunked_files = count_chunked_files.RetrieveInt64(0);
1791  stats_counters.self.chunked_file_size = count_chunked_files.RetrieveInt64(1);
1792  stats_counters.self.file_chunks = count_file_chunks.RetrieveInt64(0);
1793  stats_counters.self.file_size = aggregate_file_size.RetrieveInt64(0);
1794 
1795  // Write back the generated statistics counters into the catalog database
1796  catalog::Counters counters;
1797  retval = counters.ReadFromDatabase(writable, catalog::LegacyMode::kLegacy);
1798  if (!retval) {
1799  Error("Failed to read old catalog statistics counters", data);
1800  return false;
1801  }
1802  counters.ApplyDelta(stats_counters);
1803  retval = counters.InsertIntoDatabase(writable);
1804  if (!retval) {
1805  Error("Failed to write new statistics counters to database", data);
1806  return false;
1807  }
1808 
1809  // Push the generated statistics counters up to the parent catalog
1810  data->nested_statistics.Set(stats_counters);
1811 
1812  return true;
1813 }
1814 
1815 
1816 bool CommandMigrate::MigrationWorker_217::UpdateCatalogSchema
1817  (PendingCatalog *data) const {
1818  assert(!data->HasNew());
1819  const catalog::CatalogDatabase &writable =
1820  GetWritable(data->old_catalog)->database();
1821  catalog::SqlCatalog update_schema_version(writable,
1822  "UPDATE properties SET value = :schema_version WHERE key = 'schema';");
1823 
1824  const bool retval =
1825  update_schema_version.BindDouble(1, 2.5) &&
1826  update_schema_version.Execute();
1827  if (!retval) {
1828  Error("Failed to update catalog schema version",
1829  update_schema_version,
1830  data);
1831  return false;
1832  }
1833 
1834  return true;
1835 }
1836 
1837 
1838 bool CommandMigrate::MigrationWorker_217::CommitDatabaseTransaction
1839  (PendingCatalog *data) const {
1840  assert(!data->HasNew());
1841  GetWritable(data->old_catalog)->Commit();
1842  return true;
1843 }
1844 
1845 
1846 //------------------------------------------------------------------------------
1847 
1848 
1849 CommandMigrate::ChownMigrationWorker::ChownMigrationWorker(
1850  const worker_context *context)
1851  : AbstractMigrationWorker<ChownMigrationWorker>(context)
1852  , uid_map_statement_(GenerateMappingStatement(context->uid_map, "uid"))
1853  , gid_map_statement_(GenerateMappingStatement(context->gid_map, "gid"))
1854 {}
1855 
1856 bool CommandMigrate::ChownMigrationWorker::RunMigration(
1857  PendingCatalog *data) const {
1858  return ApplyPersonaMappings(data);
1859 }
1860 
1861 
1862 bool CommandMigrate::ChownMigrationWorker::ApplyPersonaMappings(
1863  PendingCatalog *data) const {
1864  assert(data->old_catalog != NULL);
1865  assert(data->new_catalog == NULL);
1866 
1867  if (data->old_catalog->mountpoint() ==
1868  PathString("/" + string(catalog::VirtualCatalog::kVirtualPath)))
1869  {
1870  // skipping virtual catalog
1871  return true;
1872  }
1873 
1874  const catalog::CatalogDatabase &db =
1875  GetWritable(data->old_catalog)->database();
1876 
1877  if (!db.BeginTransaction()) {
1878  return false;
1879  }
1880 
1881  catalog::SqlCatalog uid_sql(db, uid_map_statement_);
1882  if (!uid_sql.Execute()) {
1883  Error("Failed to update UIDs", uid_sql, data);
1884  return false;
1885  }
1886 
1887  catalog::SqlCatalog gid_sql(db, gid_map_statement_);
1888  if (!gid_sql.Execute()) {
1889  Error("Failed to update GIDs", gid_sql, data);
1890  return false;
1891  }
1892 
1893  return db.CommitTransaction();
1894 }
1895 
1896 
1897 template <class MapT>
1898 std::string CommandMigrate::ChownMigrationWorker::GenerateMappingStatement(
1899  const MapT &map,
1900  const std::string &column) const {
1901  assert(map.RuleCount() > 0 || map.HasDefault());
1902 
1903  std::string stmt = "UPDATE OR ABORT catalog SET " + column + " = ";
1904 
1905  if (map.RuleCount() == 0) {
1906  // map everything to the same value (just a simple UPDATE clause)
1907  stmt += StringifyInt(map.GetDefault());
1908  } else {
1909  // apply multiple ID mappings (UPDATE clause with CASE statement)
1910  stmt += "CASE " + column + " ";
1911  typedef typename MapT::map_type::const_iterator map_iterator;
1912  map_iterator i = map.GetRuleMap().begin();
1913  const map_iterator iend = map.GetRuleMap().end();
1914  for (; i != iend; ++i) {
1915  stmt += "WHEN " + StringifyInt(i->first) +
1916  " THEN " + StringifyInt(i->second) + " ";
1917  }
1918 
1919  // add a default (if provided) or leave unchanged if no mapping fits
1920  stmt += (map.HasDefault())
1921  ? "ELSE " + StringifyInt(map.GetDefault()) + " "
1922  : "ELSE " + column + " ";
1923  stmt += "END";
1924  }
1925 
1926  stmt += ";";
1927  return stmt;
1928 }
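// Illustrative example (hypothetical map values): for a UID map with the
// rules {1000 -> 2000, 1001 -> 2001} and a default of 65534,
// GenerateMappingStatement(map, "uid") would produce roughly:
//
//   UPDATE OR ABORT catalog SET uid = CASE uid
//     WHEN 1000 THEN 2000
//     WHEN 1001 THEN 2001
//     ELSE 65534 END;
//
// Without a default, the ELSE branch keeps the original column value
// ("ELSE uid"); with no rules at all the statement degenerates to a plain
// "UPDATE OR ABORT catalog SET uid = <default>;".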
1929 
1930 
1931 //------------------------------------------------------------------------------
1932 
1933 
1934 bool CommandMigrate::HardlinkRemovalMigrationWorker::RunMigration(
1935  PendingCatalog *data) const {
1936  return CheckDatabaseSchemaCompatibility(data) &&
1937  BreakUpHardlinks(data);
1938 }
1939 
1940 
1941 bool
1942 CommandMigrate::HardlinkRemovalMigrationWorker::CheckDatabaseSchemaCompatibility
1943  (PendingCatalog *data) const {
1944  assert(data->old_catalog != NULL);
1945  assert(data->new_catalog == NULL);
1946 
1947  const catalog::CatalogDatabase &clg = data->old_catalog->database();
1948  return clg.schema_version() >= 2.4 - catalog::CatalogDatabase::kSchemaEpsilon;
1949 }
1950 
1951 
1952 bool CommandMigrate::HardlinkRemovalMigrationWorker::BreakUpHardlinks(
1953  PendingCatalog *data) const {
1954  assert(data->old_catalog != NULL);
1955  assert(data->new_catalog == NULL);
1956 
1957  const catalog::CatalogDatabase &db =
1958  GetWritable(data->old_catalog)->database();
1959 
1960  if (!db.BeginTransaction()) {
1961  return false;
1962  }
1963 
1964  // CernVM-FS catalogs do not contain inodes directly but they are assigned by
1965  // the CVMFS catalog at runtime. Hardlinks are treated with so-called hardlink
1966  // group IDs to indicate hardlink relationships that need to be respected at
1967  // runtime by assigning identical inodes accordingly.
1968  //
1969  // This updates all directory entries of a given catalog that have a linkcount
1970  // greater than 1 and are flagged as a 'file'. Note: Symlinks are flagged both
1971  // as 'file' and as 'symlink', hence they are updated implicitly as well.
1972  //
1973  // The 'hardlinks' field in the catalog contains two 32 bit integers:
1974  // * the linkcount in the lower 32 bits
1975  // * the (so called) hardlink group ID in the higher 32 bits
1976  //
1977  // Files that have a linkcount of exactly 1 do not have any hardlinks and have
1978  // the (implicit) hardlink group ID '0'. Hence, 'hardlinks == 1' means that a
1979  // file doesn't have any hardlinks (linkcount = 1) and doesn't need treatment
1980  // here.
1981  //
1982  // Files that have hardlinks (linkcount > 1) will have a very large integer in
1983  // their 'hardlinks' field (hardlink group ID > 0 in higher 32 bits). Those
1984  // files will be treated by setting their 'hardlinks' field to 1, effectively
1985  // clearing all hardlink information from the directory entry.
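  // Illustrative example (hypothetical values): a file with linkcount 3 in
  // hardlink group 5 stores
  //   hardlinks = (5 << 32) | 3
  // in its directory entry.  The statement below resets such entries to
  //   hardlinks = 1
  // i.e. "no hardlink group, linkcount 1".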
1986  const std::string stmt = "UPDATE OR ABORT catalog "
1987  "SET hardlinks = 1 "
1988  "WHERE flags & :file_flag "
1989  " AND hardlinks > 1;";
1990  catalog::SqlCatalog hardlink_removal_sql(db, stmt);
1991  hardlink_removal_sql.BindInt64(1, catalog::SqlDirent::kFlagFile);
1992  hardlink_removal_sql.Execute();
1993 
1994  return db.CommitTransaction();
1995 }
1996 
1997 //------------------------------------------------------------------------------
1998 
1999 
2000 bool CommandMigrate::BulkhashRemovalMigrationWorker::RunMigration(
2001  PendingCatalog *data) const {
2002  return CheckDatabaseSchemaCompatibility(data) &&
2003  RemoveRedundantBulkHashes(data);
2004 }
2005 
2006 
2007 bool
2008 CommandMigrate::BulkhashRemovalMigrationWorker::CheckDatabaseSchemaCompatibility
2009  (PendingCatalog *data) const {
2010  assert(data->old_catalog != NULL);
2011  assert(data->new_catalog == NULL);
2012 
2013  const catalog::CatalogDatabase &clg = data->old_catalog->database();
2014  return clg.schema_version() >= 2.4 - catalog::CatalogDatabase::kSchemaEpsilon;
2015 }
2016 
2017 
2018 bool CommandMigrate::BulkhashRemovalMigrationWorker::RemoveRedundantBulkHashes(
2019  PendingCatalog *data) const {
2020  assert(data->old_catalog != NULL);
2021  assert(data->new_catalog == NULL);
2022 
2023  const catalog::CatalogDatabase &db =
2024  GetWritable(data->old_catalog)->database();
2025 
2026  if (!db.BeginTransaction()) {
2027  return false;
2028  }
2029 
2030  // Regular files with both bulk hashes and chunked hashes can drop the bulk
2031  // hash since modern clients >= 2.1.7 won't require them
2032  const std::string stmt = "UPDATE OR ABORT catalog "
2033  "SET hash = NULL "
2034  "WHERE flags & :file_chunked_flag;";
2035  catalog::SqlCatalog bulkhash_removal_sql(db, stmt);
2036  bulkhash_removal_sql.BindInt64(1, catalog::SqlDirent::kFlagFileChunk);
2037  bulkhash_removal_sql.Execute();
2038 
2039  return db.CommitTransaction();
2040 }
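// Illustrative note (assumed behaviour, hedged): after this migration the
// affected rows keep their entries in the 'chunks' table and only lose the
// redundant bulk hash in the 'hash' column; clients >= 2.1.7 reconstruct the
// file content from the chunk list alone.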
2041 
2042 
2043 //------------------------------------------------------------------------------
2044 
2045 
2046 CommandMigrate::StatsMigrationWorker::StatsMigrationWorker(
2047  const worker_context *context)
2048  : AbstractMigrationWorker<StatsMigrationWorker>(context)
2049 { }
2050 
2051 
2052 bool CommandMigrate::StatsMigrationWorker::RunMigration(PendingCatalog *data)
2053  const
2054 {
2055  return CheckDatabaseSchemaCompatibility(data) &&
2056  StartDatabaseTransaction(data) &&
2057  RepairStatisticsCounters(data) &&
2058  CommitDatabaseTransaction(data);
2059 }
2060 
2061 
2062 bool CommandMigrate::StatsMigrationWorker::CheckDatabaseSchemaCompatibility(
2063  PendingCatalog *data) const
2064 {
2065  assert(data->old_catalog != NULL);
2066  assert(data->new_catalog == NULL);
2067 
2068  const catalog::CatalogDatabase &clg = data->old_catalog->database();
2069  if (clg.schema_version() < 2.5 - catalog::CatalogDatabase::kSchemaEpsilon) {
2070  Error("Given catalog schema is < 2.5.", data);
2071  return false;
2072  }
2073 
2074  if (clg.schema_revision() < 5) {
2075  Error("Given catalog revision is < 5", data);
2076  return false;
2077  }
2078 
2079  return true;
2080 }
2081 
2082 
2083 bool CommandMigrate::StatsMigrationWorker::StartDatabaseTransaction(
2084  PendingCatalog *data) const
2085 {
2086  assert(!data->HasNew());
2087  GetWritable(data->old_catalog)->Transaction();
2088  return true;
2089 }
2090 
2091 
2092 bool CommandMigrate::StatsMigrationWorker::RepairStatisticsCounters(
2093  PendingCatalog *data) const
2094 {
2095  assert(!data->HasNew());
2096  bool retval = false;
2097  const catalog::CatalogDatabase &writable =
2098  GetWritable(data->old_catalog)->database();
2099 
2100  // Aggregate the statistics counters of all nested catalogs
2101  // Note: we might need to wait until nested catalogs are successfully processed
2102  catalog::DeltaCounters stats_counters;
2103  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
2104  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
2105  for (; i != iend; ++i) {
2106  const PendingCatalog *nested_catalog = *i;
2107  const catalog::DeltaCounters &s = nested_catalog->nested_statistics.Get();
2108  s.PopulateToParent(&stats_counters);
2109  }
2110 
2111  // Count various directory entry types in the catalog to fill up the catalog
2112  // statistics counters introduced in the current catalog schema
2113  catalog::SqlCatalog count_regular(writable,
2114  std::string("SELECT count(*), sum(size) FROM catalog ") +
2115  "WHERE flags & " + StringifyInt(catalog::SqlDirent::kFlagFile) +
2116  " AND NOT flags & " + StringifyInt(catalog::SqlDirent::kFlagLink) +
2117  " AND NOT flags & " + StringifyInt(catalog::SqlDirent::kFlagFileSpecial) +
2118  ";");
2119  catalog::SqlCatalog count_external(writable,
2120  std::string("SELECT count(*), sum(size) FROM catalog ") +
2121  "WHERE flags & " + StringifyInt(catalog::SqlDirent::kFlagFileExternal) +
2122  ";");
2123  catalog::SqlCatalog count_symlink(writable,
2124  std::string("SELECT count(*) FROM catalog ") +
2125  "WHERE flags & " + StringifyInt(catalog::SqlDirent::kFlagLink) + ";");
2126  catalog::SqlCatalog count_special(writable,
2127  std::string("SELECT count(*) FROM catalog ") +
2128  "WHERE flags & " + StringifyInt(catalog::SqlDirent::kFlagFileSpecial) +
2129  ";");
2130  catalog::SqlCatalog count_xattr(writable,
2131  std::string("SELECT count(*) FROM catalog ") +
2132  "WHERE xattr IS NOT NULL;");
2133  catalog::SqlCatalog count_chunk(writable,
2134  std::string("SELECT count(*), sum(size) FROM catalog ") +
2135  "WHERE flags & " + StringifyInt(catalog::SqlDirent::kFlagFileChunk) + ";");
2136  catalog::SqlCatalog count_dir(writable,
2137  std::string("SELECT count(*) FROM catalog ") +
2138  "WHERE flags & " + StringifyInt(catalog::SqlDirent::kFlagDir) + ";");
2139  catalog::SqlCatalog count_chunk_blobs(writable,
2140  "SELECT count(*) FROM chunks;");
2141 
2142  retval = count_regular.FetchRow() &&
2143  count_external.FetchRow() &&
2144  count_symlink.FetchRow() &&
2145  count_special.FetchRow() &&
2146  count_xattr.FetchRow() &&
2147  count_chunk.FetchRow() &&
2148  count_dir.FetchRow() &&
2149  count_chunk_blobs.FetchRow();
2150  if (!retval) {
2151  Error("Failed to collect catalog statistics", data);
2152  return false;
2153  }
2154 
2155  stats_counters.self.regular_files = count_regular.RetrieveInt64(0);
2156  stats_counters.self.symlinks = count_symlink.RetrieveInt64(0);
2157  stats_counters.self.specials = count_special.RetrieveInt64(0);
2158  stats_counters.self.directories = count_dir.RetrieveInt64(0);
2159  stats_counters.self.nested_catalogs = data->nested_catalogs.size();
2160  stats_counters.self.chunked_files = count_chunk.RetrieveInt64(0);
2161  stats_counters.self.file_chunks = count_chunk_blobs.RetrieveInt64(0);
2162  stats_counters.self.file_size = count_regular.RetrieveInt64(1);
2163  stats_counters.self.chunked_file_size = count_chunk.RetrieveInt64(1);
2164  stats_counters.self.xattrs = count_xattr.RetrieveInt64(0);
2165  stats_counters.self.externals = count_external.RetrieveInt64(0);
2166  stats_counters.self.external_file_size = count_external.RetrieveInt64(1);
2167 
2168  // Write back the generated statistics counters into the catalog database
2169  catalog::Counters counters;
2170  counters.ApplyDelta(stats_counters);
2171  retval = counters.InsertIntoDatabase(writable);
2172  if (!retval) {
2173  Error("Failed to write new statistics counters to database", data);
2174  return false;
2175  }
2176 
2177  // Push the generated statistics counters up to the parent catalog
2178  data->nested_statistics.Set(stats_counters);
2179 
2180  return true;
2181 }
2182 
2183 
2184 bool CommandMigrate::StatsMigrationWorker::CommitDatabaseTransaction(
2185  PendingCatalog *data) const
2186 {
2187  assert(!data->HasNew());
2188  GetWritable(data->old_catalog)->Commit();
2189  return true;
2190 }
2191 
2192 } // namespace swissknife