CernVM-FS  2.12.0
swissknife_migrate.cc
1 
8 #include "swissknife_migrate.h"
9 
10 #include <sys/resource.h>
11 #include <unistd.h>
12 
13 #include "catalog_rw.h"
14 #include "catalog_sql.h"
15 #include "catalog_virtual.h"
17 #include "crypto/hash.h"
18 #include "swissknife_history.h"
19 #include "util/concurrency.h"
20 #include "util/logging.h"
21 
22 using namespace std; // NOLINT
23 
24 namespace swissknife {
25 
26 catalog::DirectoryEntry CommandMigrate::nested_catalog_marker_;
27 
28 CommandMigrate::CommandMigrate() :
29  file_descriptor_limit_(8192),
30  catalog_count_(0),
31  has_committed_new_revision_(false),
32  uid_(0),
33  gid_(0),
34  root_catalog_(NULL)
35 {
36  atomic_init32(&catalogs_processed_);
37 }
38 
39 
40 ParameterList CommandMigrate::GetParams() const {
41  ParameterList r;
42  r.push_back(Parameter::Mandatory('v',
43  "migration base version ( 2.0.x | 2.1.7 | chown | hardlink | bulkhash | "
44  "stats)"));
45  r.push_back(Parameter::Mandatory('r',
46  "repository URL (absolute local path or remote URL)"));
47  r.push_back(Parameter::Mandatory('u', "upstream definition string"));
48  r.push_back(Parameter::Mandatory('o', "manifest output file"));
49  r.push_back(Parameter::Mandatory('t',
50  "temporary directory for catalog decompress"));
51  r.push_back(Parameter::Optional('p',
52  "user id to be used for this repository"));
53  r.push_back(Parameter::Optional('g',
54  "group id to be used for this repository"));
55  r.push_back(Parameter::Optional('n', "fully qualified repository name"));
56  r.push_back(Parameter::Optional('h', "root hash (other than trunk)"));
57  r.push_back(Parameter::Optional('k', "repository master key(s)"));
58  r.push_back(Parameter::Optional('i', "UID map for chown"));
59  r.push_back(Parameter::Optional('j', "GID map for chown"));
60  r.push_back(Parameter::Optional('@', "proxy url"));
61  r.push_back(Parameter::Switch('f', "fix nested catalog transition points"));
62  r.push_back(Parameter::Switch('l', "disable linkcount analysis of files"));
63  r.push_back(Parameter::Switch('s',
64  "enable collection of catalog statistics"));
65  return r;
66 }
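  // Added note (not part of the original file): an illustrative invocation of
  // this command, assuming the swissknife command name "migrate" and a local
  // upstream; exact option values depend on the repository setup:
  //   cvmfs_swissknife migrate -v 2.1.7 \
  //     -r /srv/cvmfs/example.cern.ch \
  //     -u local,/srv/spool/tmp,/srv/cvmfs/example.cern.ch \
  //     -o /tmp/manifest -t /tmp/migrate-scratch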
67 
68 
69 static void Error(const std::string &message) {
70  LogCvmfs(kLogCatalog, kLogStderr, "%s", message.c_str());
71 }
72 
73 
74 static void Error(const std::string &message,
75  const CommandMigrate::PendingCatalog *catalog) {
76  const std::string err_msg = message + "\n"
77  "Catalog: " + catalog->root_path();
78  Error(err_msg);
79 }
80 
81 
82 static void Error(const std::string &message,
83  const catalog::SqlCatalog &statement,
84  const CommandMigrate::PendingCatalog *catalog) {
85  const std::string err_msg =
86  message + "\n"
87  "SQLite: " + StringifyInt(statement.GetLastError()) +
88  " - " + statement.GetLastErrorMsg();
89  Error(err_msg, catalog);
90 }
91 
92 
93 int CommandMigrate::Main(const ArgumentList &args) {
94  shash::Any manual_root_hash;
95  const std::string &migration_base = *args.find('v')->second;
96  const std::string &repo_url = *args.find('r')->second;
97  const std::string &spooler = *args.find('u')->second;
98  const std::string &manifest_path = *args.find('o')->second;
99  const std::string &tmp_dir = *args.find('t')->second;
100  const std::string &uid = (args.count('p') > 0) ?
101  *args.find('p')->second :
102  "";
103  const std::string &gid = (args.count('g') > 0) ?
104  *args.find('g')->second :
105  "";
106  const std::string &repo_name = (args.count('n') > 0) ?
107  *args.find('n')->second :
108  "";
109  const std::string &repo_keys = (args.count('k') > 0) ?
110  *args.find('k')->second :
111  "";
112  const std::string &uid_map_path = (args.count('i') > 0) ?
113  *args.find('i')->second :
114  "";
115  const std::string &gid_map_path = (args.count('j') > 0) ?
116  *args.find('j')->second :
117  "";
118  const bool fix_transition_points = (args.count('f') > 0);
119  const bool analyze_file_linkcounts = (args.count('l') == 0);
120  const bool collect_catalog_statistics = (args.count('s') > 0);
121  if (args.count('h') > 0) {
122  manual_root_hash = shash::MkFromHexPtr(shash::HexPtr(
123  *args.find('h')->second), shash::kSuffixCatalog);
124  }
125 
126  // We might need a lot of file descriptors
127  if (!RaiseFileDescriptorLimit()) {
128  Error("Failed to raise file descriptor limits");
129  return 2;
130  }
131 
132  // Put SQLite into multithreaded mode
133  if (!ConfigureSQLite()) {
134  Error("Failed to preconfigure SQLite library");
135  return 3;
136  }
137 
138  // Create an upstream spooler
139  temporary_directory_ = tmp_dir;
140  const upload::SpoolerDefinition spooler_definition(spooler, shash::kSha1);
141  spooler_ = upload::Spooler::Construct(spooler_definition);
142  if (!spooler_.IsValid()) {
143  Error("Failed to create upstream Spooler.");
144  return 5;
145  }
146  spooler_->RegisterListener(&CommandMigrate::UploadCallback, this);
147 
148  // Load the full catalog hierarchy
149  LogCvmfs(kLogCatalog, kLogStdout, "Loading current catalog tree...");
150 
152  bool loading_successful = false;
153  if (IsHttpUrl(repo_url)) {
155 
156  const bool follow_redirects = false;
157  const string proxy = (args.count('@') > 0) ? *args.find('@')->second : "";
158  if (!this->InitDownloadManager(follow_redirects, proxy) ||
159  !this->InitVerifyingSignatureManager(repo_keys)) {
160  LogCvmfs(kLogCatalog, kLogStderr, "Failed to init repo connection");
161  return 1;
162  }
163 
164  ObjectFetcher fetcher(repo_name,
165  repo_url,
166  tmp_dir,
169 
170  loading_successful = LoadCatalogs(manual_root_hash, &fetcher);
171  } else {
173  ObjectFetcher fetcher(repo_url, tmp_dir);
174  loading_successful = LoadCatalogs(manual_root_hash, &fetcher);
175  }
177 
178  if (!loading_successful) {
179  Error("Failed to load catalog tree");
180  return 4;
181  }
182 
183  LogCvmfs(kLogCatalog, kLogStdout, "Loaded %d catalogs", catalog_count_);
184  assert(root_catalog_ != NULL);
185 
186  // Do the actual migration step
187  bool migration_succeeded = false;
188  if (migration_base == "2.0.x") {
189  if (!ReadPersona(uid, gid)) {
190  return 1;
191  }
192 
193  // Generate and upload a nested catalog marker
195  Error("Failed to create a nested catalog marker.");
196  return 6;
197  }
198  spooler_->WaitForUpload();
199 
200  // Configure the concurrent catalog migration facility
201  MigrationWorker_20x::worker_context context(temporary_directory_,
202  collect_catalog_statistics,
203  fix_transition_points,
204  analyze_file_linkcounts,
205  uid_,
206  gid_);
207  migration_succeeded =
208  DoMigrationAndCommit<MigrationWorker_20x>(manifest_path, &context);
209  } else if (migration_base == "2.1.7") {
210  MigrationWorker_217::worker_context context(temporary_directory_,
211  collect_catalog_statistics);
212  migration_succeeded =
213  DoMigrationAndCommit<MigrationWorker_217>(manifest_path, &context);
214  } else if (migration_base == "chown") {
215  UidMap uid_map;
216  GidMap gid_map;
217  if (!ReadPersonaMaps(uid_map_path, gid_map_path, &uid_map, &gid_map)) {
218  Error("Failed to read UID and/or GID map");
219  return 1;
220  }
221  ChownMigrationWorker::worker_context context(temporary_directory_,
222  collect_catalog_statistics,
223  uid_map,
224  gid_map);
225  migration_succeeded =
226  DoMigrationAndCommit<ChownMigrationWorker>(manifest_path, &context);
227  } else if (migration_base == "hardlink") {
228  HardlinkRemovalMigrationWorker::worker_context
229  context(temporary_directory_, collect_catalog_statistics);
230  migration_succeeded =
231  DoMigrationAndCommit<HardlinkRemovalMigrationWorker>(manifest_path,
232  &context);
233  } else if (migration_base == "bulkhash") {
234  BulkhashRemovalMigrationWorker::worker_context
235  context(temporary_directory_, collect_catalog_statistics);
236  migration_succeeded =
237  DoMigrationAndCommit<BulkhashRemovalMigrationWorker>(manifest_path,
238  &context);
239  } else if (migration_base == "stats") {
240  StatsMigrationWorker::worker_context context(
241  temporary_directory_, collect_catalog_statistics);
242  migration_succeeded =
243  DoMigrationAndCommit<StatsMigrationWorker>(manifest_path, &context);
244  } else {
245  const std::string err_msg = "Unknown migration base: " + migration_base;
246  Error(err_msg);
247  return 1;
248  }
249 
250  // Check if everything went well
251  if (!migration_succeeded) {
252  Error("Migration failed!");
253  return 5;
254  }
255 
256  // Analyze collected statistics
257  if (collect_catalog_statistics && has_committed_new_revision_) {
258  LogCvmfs(kLogCatalog, kLogStdout, "\nCollected statistics results:");
260  }
261 
262  LogCvmfs(kLogCatalog, kLogStdout, "\nCatalog Migration succeeded");
263  return 0;
264 }
265 
266 
267 bool CommandMigrate::ReadPersona(const std::string &uid,
268  const std::string &gid) {
269  if (uid.empty()) {
270  Error("Please provide a user ID");
271  return false;
272  }
273  if (gid.empty()) {
274  Error("Please provide a group ID");
275  return false;
276  }
277 
278  uid_ = String2Int64(uid);
279  gid_ = String2Int64(gid);
280  return true;
281 }
282 
283 
284 
285 bool CommandMigrate::ReadPersonaMaps(const std::string &uid_map_path,
286  const std::string &gid_map_path,
287  UidMap *uid_map,
288  GidMap *gid_map) const {
289  if (!uid_map->Read(uid_map_path) || !uid_map->IsValid()) {
290  Error("Failed to read UID map");
291  return false;
292  }
293 
294  if (!gid_map->Read(gid_map_path) || !gid_map->IsValid()) {
295  Error("Failed to read GID map");
296  return false;
297  }
298 
299  if (uid_map->RuleCount() == 0 && !uid_map->HasDefault()) {
300  Error("UID map appears to be empty");
301  return false;
302  }
303 
304  if (gid_map->RuleCount() == 0 && !gid_map->HasDefault()) {
305  Error("GID map appears to be empty");
306  return false;
307  }
308 
309  return true;
310 }
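  // Added note (assumption, not part of the original file): the UID/GID map
  // files read above are expected to contain one rule per line, mapping an old
  // id to a new one, with '*' as an optional catch-all default, e.g.:
  //   101   1001
  //   102   1002
  //   *     65534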
311 
312 
313 void CommandMigrate::UploadHistoryClosure(
314  const upload::SpoolerResult &result,
315  Future<shash::Any> *hash)
316 {
317  assert(!result.IsChunked());
318  if (result.return_code != 0) {
319  LogCvmfs(kLogCvmfs, kLogStderr, "failed to upload history database (%d)",
320  result.return_code);
321  hash->Set(shash::Any());
322  } else {
323  hash->Set(result.content_hash);
324  }
325 }
326 
327 
328 bool CommandMigrate::UpdateUndoTags(
329  PendingCatalog *root_catalog,
330  uint64_t revision,
331  time_t timestamp,
332  shash::Any *history_hash)
333 {
334  string filename_old = history_upstream_->filename();
335  string filename_new = filename_old + ".new";
336  bool retval = CopyPath2Path(filename_old, filename_new);
337  if (!retval) return false;
340  history->TakeDatabaseFileOwnership();
341 
342  history::History::Tag tag_trunk;
343  bool exists = history->GetByName(CommandTag::kHeadTag, &tag_trunk);
344  if (exists) {
345  retval = history->Remove(CommandTag::kHeadTag);
346  if (!retval) return false;
347 
348  history::History::Tag tag_trunk_previous = tag_trunk;
349  tag_trunk_previous.name = CommandTag::kPreviousHeadTag;
351  history->Remove(CommandTag::kPreviousHeadTag);
352 
353  tag_trunk.root_hash = root_catalog->new_catalog_hash;
354  tag_trunk.size = root_catalog->new_catalog_size;
355  tag_trunk.revision = revision;
356  tag_trunk.timestamp = timestamp;
357 
358  retval = history->Insert(tag_trunk_previous);
359  if (!retval) return false;
360  retval = history->Insert(tag_trunk);
361  if (!retval) return false;
362  }
363 
364  history->SetPreviousRevision(manifest_upstream_->history());
365  history->DropDatabaseFileOwnership();
366  history.Destroy();
367 
368  Future<shash::Any> history_hash_new;
369  upload::Spooler::CallbackPtr callback = spooler_->RegisterListener(
370  &CommandMigrate::UploadHistoryClosure, this, &history_hash_new);
371  spooler_->ProcessHistory(filename_new);
372  spooler_->WaitForUpload();
373  spooler_->UnregisterListener(callback);
374  unlink(filename_new.c_str());
375  *history_hash = history_hash_new.Get();
376  if (history_hash->IsNull()) {
377  Error("failed to upload tag database");
378  return false;
379  }
380 
381  return true;
382 }
383 
384 
385 template <class MigratorT>
386 bool CommandMigrate::DoMigrationAndCommit(
387  const std::string &manifest_path,
388  typename MigratorT::worker_context *context
389 ) {
390  // Create a concurrent migration context for catalog migration
391  const unsigned int cpus = GetNumberOfCpuCores();
392  ConcurrentWorkers<MigratorT> concurrent_migration(cpus, cpus * 10, context);
393 
394  if (!concurrent_migration.Initialize()) {
395  Error("Failed to initialize worker migration system.");
396  return false;
397  }
399  this);
400 
401  // Migrate catalogs recursively (starting with the deepest nested catalogs)
402  LogCvmfs(kLogCatalog, kLogStdout, "\nMigrating catalogs...");
403  PendingCatalog *root_catalog = new PendingCatalog(root_catalog_);
405  ConvertCatalogsRecursively(root_catalog, &concurrent_migration);
406  concurrent_migration.WaitForEmptyQueue();
407  spooler_->WaitForUpload();
408  spooler_->UnregisterListeners();
410 
411  // check for possible errors during the migration process
412  const unsigned int errors = concurrent_migration.GetNumberOfFailedJobs() +
413  spooler_->GetNumberOfErrors();
415  "Catalog Migration finished with %d errors.", errors);
416  if (errors > 0) {
418  "\nCatalog Migration produced errors\nAborting...");
419  return false;
420  }
421 
422  if (root_catalog->was_updated.Get()) {
424  "\nCommitting migrated repository revision...");
426  manifest.set_catalog_hash(root_catalog->new_catalog_hash);
427  manifest.set_catalog_size(root_catalog->new_catalog_size);
428  manifest.set_root_path(root_catalog->root_path());
429  const catalog::Catalog* new_catalog = (root_catalog->HasNew())
430  ? root_catalog->new_catalog
431  : root_catalog->old_catalog;
432  manifest.set_ttl(new_catalog->GetTTL());
433  manifest.set_revision(new_catalog->GetRevision());
434 
435  // Commit the new (migrated) repository revision...
436  if (history_upstream_.IsValid()) {
437  shash::Any history_hash(manifest_upstream_->history());
439  "Updating repository tag database... ");
440  if (!UpdateUndoTags(root_catalog,
441  new_catalog->GetRevision(),
442  new_catalog->GetLastModified(),
443  &history_hash))
444  {
445  Error("Updating tag database failed.\nAborting...");
446  return false;
447  }
448  manifest.set_history(history_hash);
449  LogCvmfs(kLogCvmfs, kLogStdout, "%s", history_hash.ToString().c_str());
450  }
451 
452  if (!manifest.Export(manifest_path)) {
453  Error("Manifest export failed.\nAborting...");
454  return false;
455  }
457  } else {
459  "\nNo catalogs migrated, skipping the commit...");
460  }
461 
462  // Get rid of the open root catalog
463  delete root_catalog;
464 
465  return true;
466 }
467 
468 
471  std::string tree_indent;
472  std::string hash_string;
473  std::string path;
474 
475  for (unsigned int i = 1; i < data.tree_level; ++i) {
476  tree_indent += "\u2502 ";
477  }
478 
479  if (data.tree_level > 0) {
480  tree_indent += "\u251C\u2500 ";
481  }
482 
483  hash_string = data.catalog_hash.ToString();
484 
485  path = data.catalog->mountpoint().ToString();
486  if (path.empty()) {
487  path = "/";
488  root_catalog_ = data.catalog;
489  }
490 
491  LogCvmfs(kLogCatalog, kLogStdout, "%s%s %s",
492  tree_indent.c_str(),
493  hash_string.c_str(),
494  path.c_str());
495 
496  ++catalog_count_;
497 }
498 
499 
500 void CommandMigrate::MigrationCallback(PendingCatalog *data) {
501  // Check if the migration of the catalog was successful
502  if (!data->success) {
503  Error("Catalog migration failed! Aborting...");
504  exit(1);
505  return;
506  }
507 
508  if (!data->HasChanges()) {
509  PrintStatusMessage(data, data->GetOldContentHash(), "preserved");
510  data->was_updated.Set(false);
511  return;
512  }
513 
514  const string &path = (data->HasNew()) ? data->new_catalog->database_path()
515  : data->old_catalog->database_path();
516 
517  // Save the processed catalog in the pending map
518  {
520  assert(pending_catalogs_.find(path) == pending_catalogs_.end());
521  pending_catalogs_[path] = data;
522  }
524 
525  // check the size of the uncompressed catalog file
526  size_t new_catalog_size = GetFileSize(path);
527  if (new_catalog_size <= 0) {
528  Error("Failed to get uncompressed file size of catalog!", data);
529  exit(2);
530  return;
531  }
532  data->new_catalog_size = new_catalog_size;
533 
534  // Schedule the compression and upload of the catalog
535  spooler_->ProcessCatalog(path);
536 }
537 
538 
539 void CommandMigrate::UploadCallback(const upload::SpoolerResult &result) {
540  const string &path = result.local_path;
541 
542  // Check if the upload was successful
543  if (result.return_code != 0) {
544  Error("Failed to upload file " + path + "\nAborting...");
545  exit(2);
546  return;
547  }
548  assert(result.file_chunks.size() == 0);
549 
550  // Remove the just uploaded file
551  unlink(path.c_str());
552 
553  // Uploaded nested catalog marker... generate and cache DirectoryEntry for it
554  if (path == nested_catalog_marker_tmp_path_) {
556  return;
557  } else {
558  // Find the catalog path in the pending catalogs and remove it from the list
559  PendingCatalog *catalog;
560  {
562  PendingCatalogMap::iterator i = pending_catalogs_.find(path);
563  assert(i != pending_catalogs_.end());
564  catalog = const_cast<PendingCatalog*>(i->second);
565  pending_catalogs_.erase(i);
566  }
567 
568  PrintStatusMessage(catalog, result.content_hash, "migrated and uploaded");
569 
570  // The catalog is completely processed... fill the content_hash to allow the
571  // processing of parent catalogs (Notified by 'was_updated'-future)
572  // NOTE: From now on, this PendingCatalog structure could be deleted and
573  // should not be used anymore!
574  catalog->new_catalog_hash = result.content_hash;
575  catalog->was_updated.Set(true);
576  }
577 }
578 
579 
580 void CommandMigrate::PrintStatusMessage(const PendingCatalog *catalog,
581  const shash::Any &content_hash,
582  const std::string &message) {
583  atomic_inc32(&catalogs_processed_);
584  const unsigned int processed = (atomic_read32(&catalogs_processed_) * 100) /
585  catalog_count_;
586  LogCvmfs(kLogCatalog, kLogStdout, "[%d%%] %s %sC %s",
587  processed,
588  message.c_str(),
589  content_hash.ToString().c_str(),
590  catalog->root_path().c_str());
591 }
592 
593 
594 template <class MigratorT>
595 void CommandMigrate::ConvertCatalogsRecursively(PendingCatalog *catalog,
596  MigratorT *migrator) {
597  // First migrate all nested catalogs (depth first traversal)
598  const catalog::CatalogList nested_catalogs =
599  catalog->old_catalog->GetChildren();
600  catalog::CatalogList::const_iterator i = nested_catalogs.begin();
601  catalog::CatalogList::const_iterator iend = nested_catalogs.end();
602  catalog->nested_catalogs.reserve(nested_catalogs.size());
603  for (; i != iend; ++i) {
604  PendingCatalog *new_nested = new PendingCatalog(*i);
605  catalog->nested_catalogs.push_back(new_nested);
606  ConvertCatalogsRecursively(new_nested, migrator);
607  }
608 
609  // Migrate this catalog referencing all its (already migrated) children
610  migrator->Schedule(catalog);
611 }
612 
613 
614 bool CommandMigrate::RaiseFileDescriptorLimit() const {
615  struct rlimit rpl;
616  memset(&rpl, 0, sizeof(rpl));
617  getrlimit(RLIMIT_NOFILE, &rpl);
618  if (rpl.rlim_cur < file_descriptor_limit_) {
619  if (rpl.rlim_max < file_descriptor_limit_)
620  rpl.rlim_max = file_descriptor_limit_;
621  rpl.rlim_cur = file_descriptor_limit_;
622  const bool retval = setrlimit(RLIMIT_NOFILE, &rpl);
623  if (retval != 0) {
624  return false;
625  }
626  }
627  return true;
628 }
629 
630 
631 bool CommandMigrate::ConfigureSQLite() const {
632  int retval = sqlite3_config(SQLITE_CONFIG_MULTITHREAD);
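  // Added note: SQLITE_CONFIG_MULTITHREAD turns off SQLite's serialized mode,
  // so connections may be used from different threads as long as no single
  // connection is shared between threads concurrently.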
633  return (retval == SQLITE_OK);
634 }
635 
636 
638  const unsigned int number_of_catalogs = catalog_statistics_list_.size();
639  unsigned int aggregated_entry_count = 0;
640  unsigned int aggregated_max_row_id = 0;
641  unsigned int aggregated_hardlink_count = 0;
642  unsigned int aggregated_linkcounts = 0;
643  double aggregated_migration_time = 0.0;
644 
645  CatalogStatisticsList::const_iterator i = catalog_statistics_list_.begin();
646  CatalogStatisticsList::const_iterator iend = catalog_statistics_list_.end();
647  for (; i != iend; ++i) {
648  aggregated_entry_count += i->entry_count;
649  aggregated_max_row_id += i->max_row_id;
650  aggregated_hardlink_count += i->hardlink_group_count;
651  aggregated_linkcounts += i->aggregated_linkcounts;
652  aggregated_migration_time += i->migration_time;
653  }
654 
655  // Inode quantization
656  assert(aggregated_max_row_id > 0);
657  const unsigned int unused_inodes =
658  aggregated_max_row_id - aggregated_entry_count;
659  const float ratio =
660  (static_cast<float>(unused_inodes) /
661  static_cast<float>(aggregated_max_row_id)) * 100.0f;
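  // Illustrative example (added): 800 actual entries with MAX(rowid) = 1000
  // leave 200 unused inodes, i.e. a wasted-inode ratio of 20.0%.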
662  LogCvmfs(kLogCatalog, kLogStdout, "Actual Entries: %d\n"
663  "Allocated Inodes: %d\n"
664  " Unused Inodes: %d\n"
665  " Percentage of wasted Inodes: %.1f%%\n",
666  aggregated_entry_count, aggregated_max_row_id, unused_inodes, ratio);
667 
668  // Hardlink statistics
669  const float average_linkcount = (aggregated_hardlink_count > 0)
670  ? static_cast<float>(aggregated_linkcounts) /
671  static_cast<float>(aggregated_hardlink_count)
672  : 0.0f;
673  LogCvmfs(kLogCatalog, kLogStdout, "Generated Hardlink Groups: %d\n"
674  "Average Linkcount per Group: %.1f\n",
675  aggregated_hardlink_count, average_linkcount);
676 
677  // Performance measures
678  const double average_migration_time =
679  aggregated_migration_time / static_cast<double>(number_of_catalogs);
680  LogCvmfs(kLogCatalog, kLogStdout, "Catalog Loading Time: %.2fs\n"
681  "Average Migration Time: %.2fs\n"
682  "Overall Migration Time: %.2fs\n"
683  "Aggregated Migration Time: %.2fs\n",
685  average_migration_time,
687  aggregated_migration_time);
688 }
689 
690 
692  delete old_catalog;
693  old_catalog = NULL;
694 
695  if (new_catalog != NULL) {
696  delete new_catalog;
697  new_catalog = NULL;
698  }
699 }
700 
701 
702 template<class DerivedT>
704  const worker_context *context)
705  : temporary_directory_(context->temporary_directory)
706  , collect_catalog_statistics_(context->collect_catalog_statistics)
707 { }
708 
709 
710 template<class DerivedT>
712 
713 
714 template<class DerivedT>
716  const expected_data &data) {
718  const bool success = static_cast<DerivedT*>(this)->RunMigration(data) &&
719  UpdateNestedCatalogReferences(data) &&
720  UpdateCatalogMetadata(data) &&
721  CollectAndAggregateStatistics(data) &&
722  CleanupNestedCatalogs(data);
723  data->success = success;
725 
728 
729  // Note: MigrationCallback() will take care of the result...
730  if (success) {
732  } else {
734  }
735 }
736 
737 
738 template<class DerivedT>
741 {
742  const catalog::Catalog *new_catalog =
743  (data->HasNew()) ? data->new_catalog : data->old_catalog;
744  const catalog::CatalogDatabase &writable = new_catalog->database();
745 
746  catalog::SqlCatalog add_nested_catalog(writable,
747  "INSERT OR REPLACE INTO nested_catalogs (path, sha1, size) "
748  " VALUES (:path, :sha1, :size);");
749 
750  // go through all nested catalogs and update their references (we are
751  // currently in their parent catalog)
752  // Note: we might need to wait for the nested catalog to be fully processed.
753  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
754  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
755  for (; i != iend; ++i) {
756  PendingCatalog *nested_catalog = *i;
757 
758  if (!nested_catalog->was_updated.Get()) {
759  continue;
760  }
761 
762  const std::string &root_path = nested_catalog->root_path();
763  const shash::Any catalog_hash = nested_catalog->new_catalog_hash;
764  const size_t catalog_size = nested_catalog->new_catalog_size;
765 
766  // insert the updated nested catalog reference into the new catalog
767  const bool retval =
768  add_nested_catalog.BindText(1, root_path) &&
769  add_nested_catalog.BindText(2, catalog_hash.ToString()) &&
770  add_nested_catalog.BindInt64(3, catalog_size) &&
771  add_nested_catalog.Execute();
772  if (!retval) {
773  Error("Failed to add nested catalog link", add_nested_catalog, data);
774  return false;
775  }
776  add_nested_catalog.Reset();
777  }
778 
779  return true;
780 }
781 
782 
783 template<class DerivedT>
786 {
787  if (!data->HasChanges()) {
788  return true;
789  }
790 
791  catalog::WritableCatalog *catalog =
792  (data->HasNew()) ? data->new_catalog : GetWritable(data->old_catalog);
793 
794  // Set the previous revision hash in the new catalog to the old catalog;
795  // we are doing the whole migration as a new snapshot that does not change
796  // any files, but just applies the necessary data schema migrations
797  catalog->SetPreviousRevision(data->old_catalog->hash());
798  catalog->IncrementRevision();
799  catalog->UpdateLastModified();
800 
801  return true;
802 }
803 
804 
805 template<class DerivedT>
808 {
809  if (!collect_catalog_statistics_) {
810  return true;
811  }
812 
813  const catalog::Catalog *new_catalog =
814  (data->HasNew()) ? data->new_catalog : data->old_catalog;
815  const catalog::CatalogDatabase &writable = new_catalog->database();
816  bool retval;
817 
818  // Find out the discrepancy between MAX(rowid) and COUNT(*)
819  catalog::SqlCatalog wasted_inodes(writable,
820  "SELECT COUNT(*), MAX(rowid) FROM catalog;");
821  retval = wasted_inodes.FetchRow();
822  if (!retval) {
823  Error("Failed to count entries in catalog", wasted_inodes, data);
824  return false;
825  }
826  const unsigned int entry_count = wasted_inodes.RetrieveInt64(0);
827  const unsigned int max_row_id = wasted_inodes.RetrieveInt64(1);
828 
829  // Save collected information into the central statistics aggregator
830  data->statistics.root_path = data->root_path();
831  data->statistics.max_row_id = max_row_id;
832  data->statistics.entry_count = entry_count;
833 
834  return true;
835 }
836 
837 
838 template<class DerivedT>
840  PendingCatalog *data) const
841 {
842  // All nested catalogs of PendingCatalog 'data' are fully processed and
843  // accounted. It is safe to get rid of their data structures here!
844  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
845  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
846  for (; i != iend; ++i) {
847  delete *i;
848  }
849 
850  data->nested_catalogs.clear();
851  return true;
852 }
853 
854 
863 
864 
865 template<class DerivedT>
868  const catalog::Catalog *catalog) const {
869  return dynamic_cast<catalog::WritableCatalog*>(const_cast<catalog::Catalog*>(
870  catalog));
871 }
872 
873 
874 //------------------------------------------------------------------------------
875 
876 
878  const worker_context *context)
880  , fix_nested_catalog_transitions_(context->fix_nested_catalog_transitions)
881  , analyze_file_linkcounts_(context->analyze_file_linkcounts)
882  , uid_(context->uid)
883  , gid_(context->gid) { }
884 
885 
887  const
888 {
889  // Double-check that we are generating catalogs compatible with the actual
890  // catalog management classes
893 
894  return CreateNewEmptyCatalog(data) &&
895  CheckDatabaseSchemaCompatibility(data) &&
896  AttachOldCatalogDatabase(data) &&
897  StartDatabaseTransaction(data) &&
898  MigrateFileMetadata(data) &&
899  MigrateNestedCatalogMountPoints(data) &&
900  FixNestedCatalogTransitionPoints(data) &&
901  RemoveDanglingNestedMountpoints(data) &&
902  GenerateCatalogStatistics(data) &&
903  FindRootEntryInformation(data) &&
904  CommitDatabaseTransaction(data) &&
905  DetachOldCatalogDatabase(data);
906 }
907 
909  PendingCatalog *data) const
910 {
911  const string root_path = data->root_path();
912 
913  // create a new catalog database schema
914  const string clg_db_path =
915  CreateTempPath(temporary_directory_ + "/catalog", 0666);
916  if (clg_db_path.empty()) {
917  Error("Failed to create temporary file for the new catalog database.");
918  return false;
919  }
920  const bool volatile_content = false;
921 
922  {
923  // TODO(rmeusel): Attach catalog should work with an open catalog database
924  // as well, to remove this inefficiency
926  new_clg_db(catalog::CatalogDatabase::Create(clg_db_path));
927  if (!new_clg_db.IsValid() ||
928  !new_clg_db->InsertInitialValues(root_path, volatile_content, "")) {
929  Error("Failed to create database for new catalog");
930  unlink(clg_db_path.c_str());
931  return false;
932  }
933  }
934 
935  // Attach the just created nested catalog database
936  catalog::WritableCatalog *writable_catalog =
937  catalog::WritableCatalog::AttachFreely(root_path, clg_db_path,
939  if (writable_catalog == NULL) {
940  Error("Failed to open database for new catalog");
941  unlink(clg_db_path.c_str());
942  return false;
943  }
944 
945  data->new_catalog = writable_catalog;
946  return true;
947 }
948 
949 
951  PendingCatalog *data) const
952 {
953  const catalog::CatalogDatabase &old_catalog = data->old_catalog->database();
954  const catalog::CatalogDatabase &new_catalog = data->new_catalog->database();
955 
956  if ((new_catalog.schema_version() <
959  ||
960  new_catalog.schema_version() >
963  ||
964  (old_catalog.schema_version() > 2.1 +
966  {
967  Error("Failed to meet database requirements for migration.", data);
968  return false;
969  }
970  return true;
971 }
972 
973 
975  PendingCatalog *data) const
976 {
977  const catalog::CatalogDatabase &old_catalog = data->old_catalog->database();
978  const catalog::CatalogDatabase &new_catalog = data->new_catalog->database();
979 
980  catalog::SqlCatalog sql_attach_new(new_catalog,
981  "ATTACH '" + old_catalog.filename() + "' AS old;");
982  bool retval = sql_attach_new.Execute();
983 
984  // remove the hardlink to the old database file (temporary file), it will not
985  // be needed anymore... data will get deleted when the database is closed
986  unlink(data->old_catalog->database().filename().c_str());
987 
988  if (!retval) {
989  Error("Failed to attach database of old catalog", sql_attach_new, data);
990  return false;
991  }
992  return true;
993 }
994 
995 
997  PendingCatalog *data) const
998 {
999  assert(data->HasNew());
1000  data->new_catalog->Transaction();
1001  return true;
1002 }
1003 
1004 
1006  PendingCatalog *data) const
1007 {
1008  assert(!data->new_catalog->IsDirty());
1009  assert(data->HasNew());
1010  bool retval;
1011  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1012 
1013  // Hardlinks scratch space.
1014  // This temporary table is used for the hardlink analysis results.
1015  // The old catalog format did not have a direct notion of hardlinks and their
1016  // linkcounts, but this information can be partly retrieved from the under-
1017  // lying file system semantics.
1018  //
1019  // Hardlinks:
1020  // groupid : this group id can be used for the new catalog schema
1021  // inode : the inodes that were part of a hardlink group before
1022  // linkcount : the linkcount for hardlink group id members
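  // Illustrative example (added): two entries /dir/a and /dir/b sharing
  // inode 42 would end up as a single row (hardlink_group_id=1, inode=42,
  // linkcount=2) in this table.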
1023  catalog::SqlCatalog sql_create_hardlinks_table(writable,
1024  "CREATE TEMPORARY TABLE hardlinks "
1025  " ( hardlink_group_id INTEGER PRIMARY KEY AUTOINCREMENT, "
1026  " inode INTEGER, "
1027  " linkcount INTEGER, "
1028  " CONSTRAINT unique_inode UNIQUE (inode) );");
1029  retval = sql_create_hardlinks_table.Execute();
1030  if (!retval) {
1031  Error("Failed to create temporary hardlink analysis table",
1032  sql_create_hardlinks_table, data);
1033  return false;
1034  }
1035 
1036  // Directory Linkcount scratch space.
1037  // Directory linkcounts can be obtained from the directory hierarchy reflected
1038  // in the old style catalogs. The new catalog schema asks for this specific
1039  // linkcount. Directory linkcount analysis results will be put into this
1040  // temporary table
1041  catalog::SqlCatalog sql_create_linkcounts_table(writable,
1042  "CREATE TEMPORARY TABLE dir_linkcounts "
1043  " ( inode INTEGER PRIMARY KEY, "
1044  " linkcount INTEGER );");
1045  retval = sql_create_linkcounts_table.Execute();
1046  if (!retval) {
1047  Error("Failed to create tmeporary directory linkcount analysis table",
1048  sql_create_linkcounts_table, data);
1049  }
1050 
1051  // It is possible to skip this step.
1052  // In that case all hardlink inodes with a (potential) linkcount > 1 will get
1053  // degraded to files containing the same content
1054  if (analyze_file_linkcounts_) {
1055  retval = AnalyzeFileLinkcounts(data);
1056  if (!retval) {
1057  return false;
1058  }
1059  }
1060 
1061  // Analyze the linkcounts of directories
1062  // - each directory has a linkcount of at least 2 (empty directory)
1063  // (link in parent directory and self reference (cd .) )
1064  // - for each child directory, the parent's link count is incremented by 1
1065  // (parent reference in child (cd ..) )
1066  //
1067  // Note: nested catalog mountpoints will be miscalculated here, since we can't
1068  // check the number of contained directories. They are defined in the
1069  // linked nested catalog and need to be added later on.
1070  // (see: MigrateNestedCatalogMountPoints() for details)
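  // Illustrative example (added): a directory containing two subdirectories
  // (and any number of regular files) gets linkcount 2 + 2 = 4; plain files
  // do not contribute because only child directories add a ".." back reference.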
1071  catalog::SqlCatalog sql_dir_linkcounts(writable,
1072  "INSERT INTO dir_linkcounts "
1073  " SELECT c1.inode as inode, "
1074  " SUM(IFNULL(MIN(c2.inode,1),0)) + 2 as linkcount "
1075  " FROM old.catalog as c1 "
1076  " LEFT JOIN old.catalog as c2 "
1077  " ON c2.parent_1 = c1.md5path_1 AND "
1078  " c2.parent_2 = c1.md5path_2 AND "
1079  " c2.flags & :flag_dir_1 "
1080  " WHERE c1.flags & :flag_dir_2 "
1081  " GROUP BY c1.inode;");
1082  retval =
1083  sql_dir_linkcounts.BindInt64(1, catalog::SqlDirent::kFlagDir) &&
1084  sql_dir_linkcounts.BindInt64(2, catalog::SqlDirent::kFlagDir) &&
1085  sql_dir_linkcounts.Execute();
1086  if (!retval) {
1087  Error("Failed to analyze directory specific linkcounts",
1088  sql_dir_linkcounts, data);
1089  if (sql_dir_linkcounts.GetLastError() == SQLITE_CONSTRAINT) {
1090  Error("Obviously your catalogs are corrupted, since we found a directory"
1091  "inode that is a file inode at the same time!");
1092  }
1093  return false;
1094  }
1095 
1096  // Copy the old file meta information into the new catalog schema
1097  // here we also add the previously analyzed hardlink/linkcount information
1098  // from both temporary tables "hardlinks" and "dir_linkcounts".
1099  //
1100  // Note: nested catalog mountpoints still need to be treated separately
1101  // (see MigrateNestedCatalogMountPoints() for details)
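  // Added note: the 64 bit "hardlinks" column written below packs the hardlink
  // group id into the upper 32 bits and the linkcount into the lower 32 bits,
  // e.g. group 3 with linkcount 2 is stored as (3 << 32) | 2.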
1102  catalog::SqlCatalog migrate_file_meta_data(writable,
1103  "INSERT INTO catalog "
1104  " SELECT md5path_1, md5path_2, "
1105  " parent_1, parent_2, "
1106  " IFNULL(hardlink_group_id, 0) << 32 | "
1107  " COALESCE(hardlinks.linkcount, dir_linkcounts.linkcount, 1) "
1108  " AS hardlinks, "
1109  " hash, size, mode, mtime, "
1110  " flags, name, symlink, "
1111  " :uid, "
1112  " :gid, "
1113  " NULL " // set empty xattr BLOB (default)
1114  " FROM old.catalog "
1115  " LEFT JOIN hardlinks "
1116  " ON catalog.inode = hardlinks.inode "
1117  " LEFT JOIN dir_linkcounts "
1118  " ON catalog.inode = dir_linkcounts.inode;");
1119  retval = migrate_file_meta_data.BindInt64(1, uid_) &&
1120  migrate_file_meta_data.BindInt64(2, gid_) &&
1121  migrate_file_meta_data.Execute();
1122  if (!retval) {
1123  Error("Failed to migrate the file system meta data",
1124  migrate_file_meta_data, data);
1125  return false;
1126  }
1127 
1128  // If we deal with a nested catalog, we need to add a .cvmfscatalog entry
1129  // since it was not present in the old repository specification but is needed
1130  // now!
1131  if (!data->IsRoot()) {
1132  const catalog::DirectoryEntry &nested_marker =
1134  catalog::SqlDirentInsert insert_nested_marker(writable);
1135  const std::string root_path = data->root_path();
1136  const std::string file_path = root_path +
1137  "/" + nested_marker.name().ToString();
1138  const shash::Md5 &path_hash = shash::Md5(file_path.data(),
1139  file_path.size());
1140  const shash::Md5 &parent_hash = shash::Md5(root_path.data(),
1141  root_path.size());
1142  retval = insert_nested_marker.BindPathHash(path_hash) &&
1143  insert_nested_marker.BindParentPathHash(parent_hash) &&
1144  insert_nested_marker.BindDirent(nested_marker) &&
1145  insert_nested_marker.BindXattrEmpty() &&
1146  insert_nested_marker.Execute();
1147  if (!retval) {
1148  Error("Failed to insert nested catalog marker into new nested catalog.",
1149  insert_nested_marker, data);
1150  return false;
1151  }
1152  }
1153 
1154  // Copy (and update) the properties fields
1155  //
1156  // Note: The 'schema' is explicitly not copied to the new catalog.
1157  // Each catalog contains a revision, which is also copied here and that
1158  // is later updated by calling catalog->IncrementRevision()
1159  catalog::SqlCatalog copy_properties(writable,
1160  "INSERT OR REPLACE INTO properties "
1161  " SELECT key, value "
1162  " FROM old.properties "
1163  " WHERE key != 'schema';");
1164  retval = copy_properties.Execute();
1165  if (!retval) {
1166  Error("Failed to migrate the properties table.", copy_properties, data);
1167  return false;
1168  }
1169 
1170  return true;
1171 }
1172 
1173 
1175  PendingCatalog *data) const
1176 {
1177  assert(data->HasNew());
1178  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1179  bool retval;
1180 
1181  // Analyze the hardlink relationships in the old catalog
1182  // inodes used to be assigned at publishing time, implicitly constituting
1183  // those relationships. We now need them explicitly in the file catalogs
1184  // This looks for directory entries with matching inodes but differing path-
1185  // hashes and saves the results in a temporary table called 'hl_scratch'
1186  //
1187  // Note: We only support hardlink groups that reside in the same directory!
1188  // Therefore we first need to figure out hardlink candidates (which
1189  // might still contain hardlink groups spanning more than one directory)
1190  // In a second step these candidates will be analyzed to kick out un-
1191  // supported hardlink groups.
1192  // Unsupported hardlink groups will be treated as normal files with
1193  // the same content
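  // Illustrative example (added): /dir/a and /dir/b sharing inode 7 form a
  // supported hardlink group (same parent directory), whereas /dir1/x and
  // /dir2/y sharing inode 8 span two directories and are kept as independent
  // regular files with identical content.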
1194  catalog::SqlCatalog sql_create_hardlinks_scratch_table(writable,
1195  "CREATE TEMPORARY TABLE hl_scratch AS "
1196  " SELECT c1.inode AS inode, c1.md5path_1, c1.md5path_2, "
1197  " c1.parent_1 as c1p1, c1.parent_2 as c1p2, "
1198  " c2.parent_1 as c2p1, c2.parent_2 as c2p2 "
1199  " FROM old.catalog AS c1 "
1200  " INNER JOIN old.catalog AS c2 "
1201  " ON c1.inode == c2.inode AND "
1202  " (c1.md5path_1 != c2.md5path_1 OR "
1203  " c1.md5path_2 != c2.md5path_2);");
1204  retval = sql_create_hardlinks_scratch_table.Execute();
1205  if (!retval) {
1206  Error("Failed to create temporary scratch table for hardlink analysis",
1207  sql_create_hardlinks_scratch_table, data);
1208  return false;
1209  }
1210 
1211  // Figures out which hardlink candidates are supported by CVMFS and can be
1212  // transferred into the new catalog as so called hardlink groups. Unsupported
1213  // hardlinks need to be discarded and treated as normal files containing the
1214  // exact same data
1215  catalog::SqlCatalog fill_linkcount_table_for_files(writable,
1216  "INSERT INTO hardlinks (inode, linkcount)"
1217  " SELECT inode, count(*) as linkcount "
1218  " FROM ( "
1219  // recombine supported hardlink inodes with their actual manifested
1220  // hard-links in the catalog.
1221  // Note: for each directory entry pointing to the same supported
1222  // hardlink inode we have a distinct MD5 path hash
1223  " SELECT DISTINCT hl.inode, hl.md5path_1, hl.md5path_2 "
1224  " FROM ( "
1225  // sort out supported hardlink inodes from unsupported ones by
1226  // locality
1227  // Note: see the next comment for the nested SELECT
1228  " SELECT inode "
1229  " FROM ( "
1230  " SELECT inode, count(*) AS cnt "
1231  " FROM ( "
1232  // go through the potential hardlinks and collect location infor-
1233  // mation about them.
1234  // Note: we only support hardlinks that all reside in the same
1235  // directory, thus having the same parent (c1p* == c2p*)
1236  // --> For supported hardlink candidates the SELECT DISTINCT
1237  // will produce only a single row, whereas others produce more
1238  " SELECT DISTINCT inode,c1p1,c1p1,c2p1,c2p2 "
1239  " FROM hl_scratch AS hl "
1240  " ) "
1241  " GROUP BY inode "
1242  " ) "
1243  " WHERE cnt = 1 "
1244  " ) AS supported_hardlinks "
1245  " LEFT JOIN hl_scratch AS hl "
1246  " ON supported_hardlinks.inode = hl.inode "
1247  " ) "
1248  " GROUP BY inode;");
1249  retval = fill_linkcount_table_for_files.Execute();
1250  if (!retval) {
1251  Error("Failed to analyze hardlink relationships for files.",
1252  fill_linkcount_table_for_files, data);
1253  return false;
1254  }
1255 
1256  // The file linkcount and hardlink analysis is finished and the scratch table
1257  // can be deleted...
1258  catalog::SqlCatalog drop_hardlink_scratch_space(writable,
1259  "DROP TABLE hl_scratch;");
1260  retval = drop_hardlink_scratch_space.Execute();
1261  if (!retval) {
1262  Error("Failed to remove file linkcount analysis scratch table",
1263  drop_hardlink_scratch_space, data);
1264  return false;
1265  }
1266 
1267  // Do some statistics if asked for...
1268  if (collect_catalog_statistics_) {
1269  catalog::SqlCatalog count_hardlinks(writable,
1270  "SELECT count(*), sum(linkcount) FROM hardlinks;");
1271  retval = count_hardlinks.FetchRow();
1272  if (!retval) {
1273  Error("Failed to count the generated file hardlinks for statistics",
1274  count_hardlinks, data);
1275  return false;
1276  }
1277 
1278  data->statistics.hardlink_group_count += count_hardlinks.RetrieveInt64(0);
1279  data->statistics.aggregated_linkcounts += count_hardlinks.RetrieveInt64(1);
1280  }
1281 
1282  return true;
1283 }
1284 
1285 
1287  PendingCatalog *data) const
1288 {
1289  assert(data->HasNew());
1290  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1291  bool retval;
1292 
1293  // preparing the SQL statement for nested catalog mountpoint update
1294  catalog::SqlCatalog update_mntpnt_linkcount(writable,
1295  "UPDATE catalog "
1296  "SET hardlinks = :linkcount "
1297  "WHERE md5path_1 = :md5_1 AND md5path_2 = :md5_2;");
1298 
1299  // update all nested catalog mountpoints
1300  // (Note: we might need to wait for the nested catalog to be processed)
1301  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
1302  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1303  for (; i != iend; ++i) {
1304  // collect information about the nested catalog
1305  PendingCatalog *nested_catalog = *i;
1306  const catalog::DirectoryEntry root_entry = nested_catalog->root_entry.Get();
1307  const string &root_path = nested_catalog->root_path();
1308 
1309  // update the nested catalog mountpoint directory entry with the correct
1310  // linkcount that was determined while processing the nested catalog
1311  const shash::Md5 mountpoint_hash = shash::Md5(root_path.data(),
1312  root_path.size());
1313  retval =
1314  update_mntpnt_linkcount.BindInt64(1, root_entry.linkcount()) &&
1315  update_mntpnt_linkcount.BindMd5(2, 3, mountpoint_hash) &&
1316  update_mntpnt_linkcount.Execute();
1317  if (!retval) {
1318  Error("Failed to update linkcount of nested catalog mountpoint",
1319  update_mntpnt_linkcount, data);
1320  return false;
1321  }
1322  update_mntpnt_linkcount.Reset();
1323  }
1324 
1325  return true;
1326 }
1327 
1328 
1330  PendingCatalog *data) const
1331 {
1332  assert(data->HasNew());
1333  if (!fix_nested_catalog_transitions_) {
1334  // Fixing transition point mismatches is not enabled...
1335  return true;
1336  }
1337 
1338  typedef catalog::DirectoryEntry::Difference Difference;
1339 
1340  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1341  bool retval;
1342 
1343  catalog::SqlLookupPathHash lookup_mountpoint(writable);
1344  catalog::SqlDirentUpdate update_directory_entry(writable);
1345 
1346  // Unbox the nested catalogs (possibly waiting for migration of them first)
1347  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
1348  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1349  for (; i != iend; ++i) {
1350  // Collect information about the nested catalog
1351  PendingCatalog *nested_catalog = *i;
1352  const catalog::DirectoryEntry nested_root_entry =
1353  nested_catalog->root_entry.Get();
1354  const string &nested_root_path = nested_catalog->root_path();
1355  const shash::Md5 mountpoint_path_hash =
1356  shash::Md5(nested_root_path.data(), nested_root_path.size());
1357 
1358  // Retrieve the nested catalog mountpoint from the current catalog
1359  retval = lookup_mountpoint.BindPathHash(mountpoint_path_hash) &&
1360  lookup_mountpoint.FetchRow();
1361  if (!retval) {
1362  Error("Failed to fetch nested catalog mountpoint to check for compatible"
1363  "transition points", lookup_mountpoint, data);
1364  return false;
1365  }
1366 
1367  catalog::DirectoryEntry mountpoint_entry =
1368  lookup_mountpoint.GetDirent(data->new_catalog);
1369  lookup_mountpoint.Reset();
1370 
1371  // Compare nested catalog mountpoint and nested catalog root entries
1373  mountpoint_entry.CompareTo(nested_root_entry);
1374 
1375  // We MUST deal with two directory entries that are a pair of nested cata-
1376  // log mountpoint and root entry! Thus we expect their transition flags to
1377  // differ and their name to be the same.
1378  assert(diffs & Difference::kNestedCatalogTransitionFlags);
1379  assert((diffs & Difference::kName) == 0);
1380 
1381  // Check if there are other differences except the nested catalog transition
1382  // flags and fix them...
1383  if ((diffs ^ Difference::kNestedCatalogTransitionFlags) != 0) {
1384  // If we found differences, we still assume a couple of directory entry
1385  // fields to be the same, otherwise some severe stuff would be wrong...
1386  if ((diffs & Difference::kChecksum) ||
1387  (diffs & Difference::kLinkcount) ||
1388  (diffs & Difference::kSymlink) ||
1389  (diffs & Difference::kChunkedFileFlag) )
1390  {
1391  Error("Found an irreparable mismatch in a nested catalog transition "
1392  "point at '" + nested_root_path + "'\nAborting...\n");
1393  }
1394 
1395  // Copy the properties from the nested catalog root entry into the mount-
1396  // point entry to bring them in sync again
1398  nested_root_entry, &mountpoint_entry);
1399 
1400  // save the nested catalog mountpoint entry into the catalog
1401  retval = update_directory_entry.BindPathHash(mountpoint_path_hash) &&
1402  update_directory_entry.BindDirent(mountpoint_entry) &&
1403  update_directory_entry.Execute();
1404  if (!retval) {
1405  Error("Failed to save resynchronized nested catalog mountpoint into "
1406  "catalog database", update_directory_entry, data);
1407  return false;
1408  }
1409  update_directory_entry.Reset();
1410 
1411  // Fixing of this mountpoint went well... inform the user that this minor
1412  // issue occurred
1414  "NOTE: fixed incompatible nested catalog transition point at: "
1415  "'%s' ", nested_root_path.c_str());
1416  }
1417  }
1418 
1419  return true;
1420 }
1421 
1422 
1424  const catalog::DirectoryEntry &nested_root,
1425  catalog::DirectoryEntry *mountpoint
1426 ) {
1427  // Replace some file system parameters in the mountpoint to resync it with
1428  // the nested root of the corresponding nested catalog
1429  //
1430  // Note: this method relies on CommandMigrate being a friend of DirectoryEntry
1431  mountpoint->mode_ = nested_root.mode_;
1432  mountpoint->uid_ = nested_root.uid_;
1433  mountpoint->gid_ = nested_root.gid_;
1434  mountpoint->size_ = nested_root.size_;
1435  mountpoint->mtime_ = nested_root.mtime_;
1436 }
1437 
1438 
1440  PendingCatalog *data) const
1441 {
1442  assert(data->HasNew());
1443  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1444  bool retval = false;
1445 
1446  // build a set of registered nested catalog path hashes
1447  typedef catalog::Catalog::NestedCatalogList NestedCatalogList;
1448  typedef std::map<shash::Md5, catalog::Catalog::NestedCatalog>
1449  NestedCatalogMap;
1450  const NestedCatalogList& nested_clgs =
1452  NestedCatalogList::const_iterator i = nested_clgs.begin();
1453  const NestedCatalogList::const_iterator iend = nested_clgs.end();
1454  NestedCatalogMap nested_catalog_path_hashes;
1455  for (; i != iend; ++i) {
1456  const PathString &path = i->mountpoint;
1457  const shash::Md5 hash(path.GetChars(), path.GetLength());
1458  nested_catalog_path_hashes[hash] = *i;
1459  }
1460 
1461  // Retrieve nested catalog mountpoints that have child entries directly inside
1462  // the current catalog (which is a malformed state)
1463  catalog::SqlLookupDanglingMountpoints sql_dangling_mountpoints(writable);
1464  catalog::SqlDirentUpdate save_updated_mountpoint(writable);
1465 
1466  std::vector<catalog::DirectoryEntry> todo_dirent;
1467  std::vector<shash::Md5> todo_hash;
1468 
1469  // go through the list of dangling nested catalog mountpoints and fix them
1470  // where needed (check if there is no nested catalog registered for them)
1471  while (sql_dangling_mountpoints.FetchRow()) {
1472  catalog::DirectoryEntry dangling_mountpoint =
1473  sql_dangling_mountpoints.GetDirent(data->new_catalog);
1474  const shash::Md5 path_hash = sql_dangling_mountpoints.GetPathHash();
1475  assert(dangling_mountpoint.IsNestedCatalogMountpoint());
1476 
1477  // check if the nested catalog mountpoint is registered in the nested cata-
1478  // log list of the currently migrated catalog
1479  const NestedCatalogMap::const_iterator nested_catalog =
1480  nested_catalog_path_hashes.find(path_hash);
1481  if (nested_catalog != nested_catalog_path_hashes.end()) {
1483  "WARNING: found a non-empty nested catalog mountpoint under "
1484  "'%s'", nested_catalog->second.mountpoint.c_str());
1485  continue;
1486  }
1487 
1488  // the mountpoint was confirmed to be dangling and needs to be removed
1489  dangling_mountpoint.set_is_nested_catalog_mountpoint(false);
1490  todo_dirent.push_back(dangling_mountpoint);
1491  todo_hash.push_back(path_hash);
1492  }
1493 
1494  for (unsigned i = 0; i < todo_dirent.size(); ++i) {
1495  retval = save_updated_mountpoint.BindPathHash(todo_hash[i]) &&
1496  save_updated_mountpoint.BindDirent(todo_dirent[i]) &&
1497  save_updated_mountpoint.Execute() &&
1498  save_updated_mountpoint.Reset();
1499  if (!retval) {
1500  Error("Failed to remove dangling nested catalog mountpoint entry in "
1501  "catalog", save_updated_mountpoint, data);
1502  return false;
1503  }
1504 
1505  // tell the user that this intervention has taken place
1506  LogCvmfs(kLogCatalog, kLogStdout, "NOTE: fixed dangling nested catalog "
1507  "mountpoint entry called: '%s' ",
1508  todo_dirent[i].name().c_str());
1509  }
1510 
1511  return true;
1512 }
1513 
1514 
1516  // This is a pre-initialized singleton... it MUST already be there...
1517  assert(nested_catalog_marker_.name_.ToString() == ".cvmfscatalog");
1518  return nested_catalog_marker_;
1519 }
1520 
1522  // Create an empty nested catalog marker file
1524  CreateTempPath(temporary_directory_ + "/.cvmfscatalog", 0644);
1525  if (nested_catalog_marker_tmp_path_.empty()) {
1526  Error("Failed to create temp file for nested catalog marker dummy.");
1527  return false;
1528  }
1529 
1530  // Process and upload it to the backend storage
1531  IngestionSource *source =
1533  spooler_->Process(source);
1534  return true;
1535 }
1536 
1538  const shash::Any &content_hash)
1539 {
1540  // Generate it only once
1541  assert(nested_catalog_marker_.name_.ToString() != ".cvmfscatalog");
1542 
1543  // Fill the DirectoryEntry structure with all needed information
1544  nested_catalog_marker_.name_.Assign(".cvmfscatalog", strlen(".cvmfscatalog"));
1545  nested_catalog_marker_.mode_ = 33188;
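  // Added note: 33188 decimal is mode 0100644, i.e. a regular file with
  // permissions rw-r--r--.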
1549  nested_catalog_marker_.mtime_ = time(NULL);
1551  nested_catalog_marker_.checksum_ = content_hash;
1552 }
1553 
1554 
1556  PendingCatalog *data) const
1557 {
1558  assert(data->HasNew());
1559  bool retval = false;
1560  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1561 
1562  // Aggregate the statistics counters of all nested catalogs
1563  // Note: we might need to wait until nested catalogs are successfully
1564  // processed
1565  catalog::DeltaCounters stats_counters;
1566  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
1567  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1568  for (; i != iend; ++i) {
1569  const PendingCatalog *nested_catalog = *i;
1570  const catalog::DeltaCounters &s = nested_catalog->nested_statistics.Get();
1571  s.PopulateToParent(&stats_counters);
1572  }
1573 
1574  // Count various directory entry types in the catalog to fill up the catalog
1575  // statistics counters introduced in the current catalog schema
1576  catalog::SqlCatalog count_regular_files(writable,
1577  "SELECT count(*) FROM catalog "
1578  " WHERE flags & :flag_file "
1579  " AND NOT flags & :flag_link;");
1580  catalog::SqlCatalog count_symlinks(writable,
1581  "SELECT count(*) FROM catalog WHERE flags & :flag_link;");
1582  catalog::SqlCatalog count_directories(writable,
1583  "SELECT count(*) FROM catalog WHERE flags & :flag_dir;");
1584  catalog::SqlCatalog aggregate_file_size(writable,
1585  "SELECT sum(size) FROM catalog WHERE flags & :flag_file "
1586  " AND NOT flags & :flag_link");
1587 
1588  // Run the actual counting queries
1589  retval =
1590  count_regular_files.BindInt64(1, catalog::SqlDirent::kFlagFile) &&
1591  count_regular_files.BindInt64(2, catalog::SqlDirent::kFlagLink) &&
1592  count_regular_files.FetchRow();
1593  if (!retval) {
1594  Error("Failed to count regular files.", count_regular_files, data);
1595  return false;
1596  }
1597  retval =
1598  count_symlinks.BindInt64(1, catalog::SqlDirent::kFlagLink) &&
1599  count_symlinks.FetchRow();
1600  if (!retval) {
1601  Error("Failed to count symlinks.", count_symlinks, data);
1602  return false;
1603  }
1604  retval =
1605  count_directories.BindInt64(1, catalog::SqlDirent::kFlagDir) &&
1606  count_directories.FetchRow();
1607  if (!retval) {
1608  Error("Failed to count directories.", count_directories, data);
1609  return false;
1610  }
1611  retval =
1612  aggregate_file_size.BindInt64(1, catalog::SqlDirent::kFlagFile) &&
1613  aggregate_file_size.BindInt64(2, catalog::SqlDirent::kFlagLink) &&
1614  aggregate_file_size.FetchRow();
1615  if (!retval) {
1616  Error("Failed to aggregate the file sizes.", aggregate_file_size, data);
1617  return false;
1618  }
1619 
1620  // Insert the counted statistics into the DeltaCounters data structure
1621  stats_counters.self.regular_files = count_regular_files.RetrieveInt64(0);
1622  stats_counters.self.symlinks = count_symlinks.RetrieveInt64(0);
1623  stats_counters.self.directories = count_directories.RetrieveInt64(0);
1624  stats_counters.self.nested_catalogs = data->nested_catalogs.size();
1625  stats_counters.self.file_size = aggregate_file_size.RetrieveInt64(0);
1626 
1627  // Write back the generated statistics counters into the catalog database
1628  stats_counters.WriteToDatabase(writable);
1629 
1630  // Push the generated statistics counters up to the parent catalog
1631  data->nested_statistics.Set(stats_counters);
1632 
1633  return true;
1634 }
1635 
1636 
1638  PendingCatalog *data) const
1639 {
1640  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1641  bool retval;
1642 
1643  std::string root_path = data->root_path();
1644  shash::Md5 root_path_hash = shash::Md5(root_path.data(), root_path.size());
1645 
1646  catalog::SqlLookupPathHash lookup_root_entry(writable);
1647  retval = lookup_root_entry.BindPathHash(root_path_hash) &&
1648  lookup_root_entry.FetchRow();
1649  if (!retval) {
1650  Error("Failed to retrieve root directory entry of migrated catalog",
1651  lookup_root_entry, data);
1652  return false;
1653  }
1654 
1655  catalog::DirectoryEntry entry =
1656  lookup_root_entry.GetDirent(data->new_catalog);
1657  if (entry.linkcount() < 2 || entry.hardlink_group() > 0) {
1658  Error("Retrieved linkcount of catalog root entry is not sane.", data);
1659  return false;
1660  }
1661 
1662  data->root_entry.Set(entry);
1663  return true;
1664 }
1665 
1666 
1668  PendingCatalog *data) const
1669 {
1670  assert(data->HasNew());
1671  data->new_catalog->Commit();
1672  return true;
1673 }
1674 
1675 
1677  PendingCatalog *data) const
1678 {
1679  assert(data->HasNew());
1680  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1681  catalog::SqlCatalog detach_old_catalog(writable, "DETACH old;");
1682  const bool retval = detach_old_catalog.Execute();
1683  if (!retval) {
1684  Error("Failed to detach old catalog database.", detach_old_catalog, data);
1685  return false;
1686  }
1687  return true;
1688 }
1689 
1690 
1691 //------------------------------------------------------------------------------
1692 
1693 
1695  const worker_context *context)
1697 { }
1698 
1699 
1701  const
1702 {
1703  return CheckDatabaseSchemaCompatibility(data) &&
1704  StartDatabaseTransaction(data) &&
1705  GenerateNewStatisticsCounters(data) &&
1706  UpdateCatalogSchema(data) &&
1707  CommitDatabaseTransaction(data);
1708 }
1709 
1710 
1711 bool CommandMigrate::MigrationWorker_217::CheckDatabaseSchemaCompatibility(
1712  PendingCatalog *data) const
1713 {
1714  assert(!data->HasNew());
1715  const catalog::CatalogDatabase &old_catalog = data->old_catalog->database();
1716 
1717  if ((old_catalog.schema_version() < 2.4 -
1718  catalog::CatalogDatabase::kSchemaEpsilon)
1719  ||
1720  (old_catalog.schema_version() > 2.4 +
1721  catalog::CatalogDatabase::kSchemaEpsilon))
1722  {
1723  Error("Given Catalog is not Schema 2.4.", data);
1724  return false;
1725  }
1726 
1727  return true;
1728 }
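// Editor's note (not part of the original source): schema_version() returns a
// float, so the check above accepts any value within kSchemaEpsilon of 2.4
// instead of testing for exact equality; e.g. a stored version of 2.4000001
// still counts as schema 2.4.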
1729 
1730 
1731 bool CommandMigrate::MigrationWorker_217::StartDatabaseTransaction(
1732  PendingCatalog *data) const
1733 {
1734  assert(!data->HasNew());
1735  GetWritable(data->old_catalog)->Transaction();
1736  return true;
1737 }
1738 
1739 
1740 bool CommandMigrate::MigrationWorker_217::GenerateNewStatisticsCounters
1741  (PendingCatalog *data) const {
1742  assert(!data->HasNew());
1743  bool retval = false;
1744  const catalog::CatalogDatabase &writable =
1745  GetWritable(data->old_catalog)->database();
1746 
1747  // Aggregate the statistics counters of all nested catalogs
1748  // Note: we might need to wait until nested catalogs are successfully
1749  // processed
1750  catalog::DeltaCounters stats_counters;
1751  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
1752  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1753  for (; i != iend; ++i) {
1754  const PendingCatalog *nested_catalog = *i;
1755  const catalog::DeltaCounters &s = nested_catalog->nested_statistics.Get();
1756  s.PopulateToParent(&stats_counters);
1757  }
1758 
1759  // Count various directory entry types in the catalog to fill up the catalog
1760  // statistics counters introduced in the current catalog schema
1761  catalog::SqlCatalog count_chunked_files(writable,
1762  "SELECT count(*), sum(size) FROM catalog "
1763  " WHERE flags & :flag_chunked_file;");
1764  catalog::SqlCatalog count_file_chunks(writable,
1765  "SELECT count(*) FROM chunks;");
1766  catalog::SqlCatalog aggregate_file_size(writable,
1767  "SELECT sum(size) FROM catalog WHERE flags & :flag_file "
1768  " AND NOT flags & :flag_link;");
1769 
1770  // Run the actual counting queries
1771  retval =
1772  count_chunked_files.BindInt64(1, catalog::SqlDirent::kFlagFileChunk) &&
1773  count_chunked_files.FetchRow();
1774  if (!retval) {
1775  Error("Failed to count chunked files.", count_chunked_files, data);
1776  return false;
1777  }
1778  retval = count_file_chunks.FetchRow();
1779  if (!retval) {
1780  Error("Failed to count file chunks", count_file_chunks, data);
1781  return false;
1782  }
1783  retval =
1784  aggregate_file_size.BindInt64(1, catalog::SqlDirent::kFlagFile) &&
1785  aggregate_file_size.BindInt64(2, catalog::SqlDirent::kFlagLink) &&
1786  aggregate_file_size.FetchRow();
1787  if (!retval) {
1788  Error("Failed to aggregate the file sizes.", aggregate_file_size, data);
1789  return false;
1790  }
1791 
1792  // Insert the counted statistics into the DeltaCounters data structure
1793  stats_counters.self.chunked_files = count_chunked_files.RetrieveInt64(0);
1794  stats_counters.self.chunked_file_size = count_chunked_files.RetrieveInt64(1);
1795  stats_counters.self.file_chunks = count_file_chunks.RetrieveInt64(0);
1796  stats_counters.self.file_size = aggregate_file_size.RetrieveInt64(0);
1797 
1798  // Write back the generated statistics counters into the catalog database
1799  catalog::Counters counters;
1800  retval = counters.ReadFromDatabase(writable, catalog::LegacyMode::kLegacy);
1801  if (!retval) {
1802  Error("Failed to read old catalog statistics counters", data);
1803  return false;
1804  }
1805  counters.ApplyDelta(stats_counters);
1806  retval = counters.InsertIntoDatabase(writable);
1807  if (!retval) {
1808  Error("Failed to write new statistics counters to database", data);
1809  return false;
1810  }
1811 
1812  // Push the generated statistics counters up to the parent catalog
1813  data->nested_statistics.Set(stats_counters);
1814 
1815  return true;
1816 }
1817 
1818 
1819 bool CommandMigrate::MigrationWorker_217::UpdateCatalogSchema
1820  (PendingCatalog *data) const {
1821  assert(!data->HasNew());
1822  const catalog::CatalogDatabase &writable =
1823  GetWritable(data->old_catalog)->database();
1824  catalog::SqlCatalog update_schema_version(writable,
1825  "UPDATE properties SET value = :schema_version WHERE key = 'schema';");
1826 
1827  const bool retval =
1828  update_schema_version.BindDouble(1, 2.5) &&
1829  update_schema_version.Execute();
1830  if (!retval) {
1831  Error("Failed to update catalog schema version",
1832  update_schema_version,
1833  data);
1834  return false;
1835  }
1836 
1837  return true;
1838 }
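// Editor's note (illustrative, not part of the original source): after this
// statement the 'properties' table row with key 'schema' holds 2.5 instead of
// 2.4, which marks the catalog as migrated to the newer schema.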
1839 
1840 
1841 bool CommandMigrate::MigrationWorker_217::CommitDatabaseTransaction
1842  (PendingCatalog *data) const {
1843  assert(!data->HasNew());
1844  GetWritable(data->old_catalog)->Commit();
1845  return true;
1846 }
1847 
1848 
1849 //------------------------------------------------------------------------------
1850 
1851 
1852 CommandMigrate::ChownMigrationWorker::ChownMigrationWorker(
1853  const worker_context *context)
1854  : AbstractMigrationWorker<ChownMigrationWorker>(context)
1855  , uid_map_statement_(GenerateMappingStatement(context->uid_map, "uid"))
1856  , gid_map_statement_(GenerateMappingStatement(context->gid_map, "gid"))
1857 {}
1858 
1859 bool CommandMigrate::ChownMigrationWorker::RunMigration(
1860  PendingCatalog *data) const {
1861  return ApplyPersonaMappings(data);
1862 }
1863 
1864 
1865 bool CommandMigrate::ChownMigrationWorker::ApplyPersonaMappings(
1866  PendingCatalog *data) const {
1867  assert(data->old_catalog != NULL);
1868  assert(data->new_catalog == NULL);
1869 
1870  if (data->old_catalog->mountpoint() ==
1871  PathString("/" + string(catalog::VirtualCatalog::kVirtualPath)))
1872  {
1873  // skipping virtual catalog
1874  return true;
1875  }
1876 
1877  const catalog::CatalogDatabase &db =
1878  GetWritable(data->old_catalog)->database();
1879 
1880  if (!db.BeginTransaction()) {
1881  return false;
1882  }
1883 
1884  catalog::SqlCatalog uid_sql(db, uid_map_statement_);
1885  if (!uid_sql.Execute()) {
1886  Error("Failed to update UIDs", uid_sql, data);
1887  return false;
1888  }
1889 
1890  catalog::SqlCatalog gid_sql(db, gid_map_statement_);
1891  if (!gid_sql.Execute()) {
1892  Error("Failed to update GIDs", gid_sql, data);
1893  return false;
1894  }
1895 
1896  return db.CommitTransaction();
1897 }
1898 
1899 
1900 template <class MapT>
1901 std::string CommandMigrate::ChownMigrationWorker::GenerateMappingStatement(
1902  const MapT &map,
1903  const std::string &column) const {
1904  assert(map.RuleCount() > 0 || map.HasDefault());
1905 
1906  std::string stmt = "UPDATE OR ABORT catalog SET " + column + " = ";
1907 
1908  if (map.RuleCount() == 0) {
1909  // map everything to the same value (just a simple UPDATE clause)
1910  stmt += StringifyInt(map.GetDefault());
1911  } else {
1912  // apply multiple ID mappings (UPDATE clause with CASE statement)
1913  stmt += "CASE " + column + " ";
1914  typedef typename MapT::map_type::const_iterator map_iterator;
1915  map_iterator i = map.GetRuleMap().begin();
1916  const map_iterator iend = map.GetRuleMap().end();
1917  for (; i != iend; ++i) {
1918  stmt += "WHEN " + StringifyInt(i->first) +
1919  " THEN " + StringifyInt(i->second) + " ";
1920  }
1921 
1922  // add a default (if provided) or leave unchanged if no mapping fits
1923  stmt += (map.HasDefault())
1924  ? "ELSE " + StringifyInt(map.GetDefault()) + " "
1925  : "ELSE " + column + " ";
1926  stmt += "END";
1927  }
1928 
1929  stmt += ";";
1930  return stmt;
1931 }
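// Editor's note (illustrative, not part of the original source): for a
// hypothetical UID map {1000 -> 2000, 1001 -> 2001} with default 99, the
// generated statement reads:
//   UPDATE OR ABORT catalog SET uid = CASE uid
//     WHEN 1000 THEN 2000 WHEN 1001 THEN 2001 ELSE 99 END;
// Without any rules, the CASE collapses to a plain "SET uid = 99;".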
1932 
1933 
1934 //------------------------------------------------------------------------------
1935 
1936 
1937 bool CommandMigrate::HardlinkRemovalMigrationWorker::RunMigration(
1938  PendingCatalog *data) const {
1939  return CheckDatabaseSchemaCompatibility(data) &&
1940  BreakUpHardlinks(data);
1941 }
1942 
1943 
1944 bool
1945 CommandMigrate::HardlinkRemovalMigrationWorker::CheckDatabaseSchemaCompatibility
1946  (PendingCatalog *data) const {
1947  assert(data->old_catalog != NULL);
1948  assert(data->new_catalog == NULL);
1949 
1950  const catalog::CatalogDatabase &clg = data->old_catalog->database();
1951  return clg.schema_version() >= 2.4 - catalog::CatalogDatabase::kSchemaEpsilon;
1952 }
1953 
1954 
1956  PendingCatalog *data) const {
1957  assert(data->old_catalog != NULL);
1958  assert(data->new_catalog == NULL);
1959 
1960  const catalog::CatalogDatabase &db =
1961  GetWritable(data->old_catalog)->database();
1962 
1963  if (!db.BeginTransaction()) {
1964  return false;
1965  }
1966 
1967  // CernVM-FS catalogs do not contain inodes directly but they are assigned by
1968  // the CVMFS client at runtime. Hardlinks are treated with so-called hardlink
1969  // group IDs to indicate hardlink relationships that need to be respected at
1970  // runtime by assigning identical inodes accordingly.
1971  //
1972  // This updates all directory entries of a given catalog that have a linkcount
1973  // greater than 1 and are flagged as a 'file'. Note: Symlinks are flagged both
1974  // as 'file' and as 'symlink', hence they are updated implicitly as well.
1975  //
1976  // The 'hardlinks' field in the catalog contains two 32 bit integers:
1977  // * the linkcount in the lower 32 bits
1978  // * the (so called) hardlink group ID in the higher 32 bits
1979  //
1980  // Files that have a linkcount of exactly 1 do not have any hardlinks and have
1981  // the (implicit) hardlink group ID '0'. Hence, 'hardlinks == 1' means that a
1982  // file doesn't have any hardlinks (linkcount = 1) and doesn't need treatment
1983  // here.
1984  //
1985  // Files that have hardlinks (linkcount > 1) will have a very large integer in
1986  // their 'hardlinks' field (hardlink group ID > 0 in higher 32 bits). Those
1987  // files will be treated by setting their 'hardlinks' field to 1, effectively
1988  // clearing all hardlink information from the directory entry.
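  // Editor's note (illustrative, not part of the original source): a file with
  // linkcount 3 in hardlink group 7 stores hardlinks = (7 << 32) | 3; the
  // UPDATE below resets such rows to hardlinks = 1, i.e. linkcount 1 and
  // hardlink group ID 0.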
1989  const std::string stmt = "UPDATE OR ABORT catalog "
1990  "SET hardlinks = 1 "
1991  "WHERE flags & :file_flag "
1992  " AND hardlinks > 1;";
1993  catalog::SqlCatalog hardlink_removal_sql(db, stmt);
1994  hardlink_removal_sql.BindInt64(1, catalog::SqlDirent::kFlagFile);
1995  hardlink_removal_sql.Execute();
1996 
1997  return db.CommitTransaction();
1998 }
1999 
2000 //------------------------------------------------------------------------------
2001 
2002 
2003 bool CommandMigrate::BulkhashRemovalMigrationWorker::RunMigration(
2004  PendingCatalog *data) const {
2005  return CheckDatabaseSchemaCompatibility(data) &&
2006  RemoveRedundantBulkHashes(data);
2007 }
2008 
2009 
2010 bool
2011 CommandMigrate::BulkhashRemovalMigrationWorker::CheckDatabaseSchemaCompatibility
2012  (PendingCatalog *data) const {
2013  assert(data->old_catalog != NULL);
2014  assert(data->new_catalog == NULL);
2015 
2016  const catalog::CatalogDatabase &clg = data->old_catalog->database();
2017  return clg.schema_version() >= 2.4 - catalog::CatalogDatabase::kSchemaEpsilon;
2018 }
2019 
2020 
2021 bool CommandMigrate::BulkhashRemovalMigrationWorker::RemoveRedundantBulkHashes(
2022  PendingCatalog *data) const {
2023  assert(data->old_catalog != NULL);
2024  assert(data->new_catalog == NULL);
2025 
2026  const catalog::CatalogDatabase &db =
2027  GetWritable(data->old_catalog)->database();
2028 
2029  if (!db.BeginTransaction()) {
2030  return false;
2031  }
2032 
2033  // Regular files with both bulk hashes and chunked hashes can drop the bulk
2034  // hash since modern clients >= 2.1.7 won't require them
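  // Editor's note (illustrative, not part of the original source): only the
  // bulk 'hash' column of chunked files is cleared here; the per-chunk hashes
  // in the 'chunks' table stay untouched, so such files remain fully
  // reconstructable from their chunks.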
2035  const std::string stmt = "UPDATE OR ABORT catalog "
2036  "SET hash = NULL "
2037  "WHERE flags & :file_chunked_flag;";
2038  catalog::SqlCatalog bulkhash_removal_sql(db, stmt);
2039  bulkhash_removal_sql.BindInt64(1, catalog::SqlDirent::kFlagFileChunk);
2040  bulkhash_removal_sql.Execute();
2041 
2042  return db.CommitTransaction();
2043 }
2044 
2045 
2046 //------------------------------------------------------------------------------
2047 
2048 
2049 CommandMigrate::StatsMigrationWorker::StatsMigrationWorker(
2050  const worker_context *context)
2051  : AbstractMigrationWorker<StatsMigrationWorker>(context)
2052 { }
2053 
2054 
2055 bool CommandMigrate::StatsMigrationWorker::RunMigration(PendingCatalog *data)
2056  const
2057 {
2058  return CheckDatabaseSchemaCompatibility(data) &&
2059  StartDatabaseTransaction(data) &&
2060  RepairStatisticsCounters(data) &&
2061  CommitDatabaseTransaction(data);
2062 }
2063 
2064 
2065 bool CommandMigrate::StatsMigrationWorker::CheckDatabaseSchemaCompatibility(
2066  PendingCatalog *data) const
2067 {
2068  assert(data->old_catalog != NULL);
2069  assert(data->new_catalog == NULL);
2070 
2071  const catalog::CatalogDatabase &clg = data->old_catalog->database();
2072  if (clg.schema_version() < 2.5 - catalog::CatalogDatabase::kSchemaEpsilon) {
2073  Error("Given catalog schema is < 2.5.", data);
2074  return false;
2075  }
2076 
2077  if (clg.schema_revision() < 5) {
2078  Error("Given catalog revision is < 5", data);
2079  return false;
2080  }
2081 
2082  return true;
2083 }
2084 
2085 
2086 bool CommandMigrate::StatsMigrationWorker::StartDatabaseTransaction(
2087  PendingCatalog *data) const
2088 {
2089  assert(!data->HasNew());
2090  GetWritable(data->old_catalog)->Transaction();
2091  return true;
2092 }
2093 
2094 
2095 bool CommandMigrate::StatsMigrationWorker::RepairStatisticsCounters(
2096  PendingCatalog *data) const
2097 {
2098  assert(!data->HasNew());
2099  bool retval = false;
2100  const catalog::CatalogDatabase &writable =
2101  GetWritable(data->old_catalog)->database();
2102 
2103  // Aggregate the statistics counters of all nested catalogs
2104  // Note: we might need to wait until nested catalogs are successfully
2105  // processed
2106  catalog::DeltaCounters stats_counters;
2107  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
2108  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
2109  for (; i != iend; ++i) {
2110  const PendingCatalog *nested_catalog = *i;
2111  const catalog::DeltaCounters &s = nested_catalog->nested_statistics.Get();
2112  s.PopulateToParent(&stats_counters);
2113  }
2114 
2115  // Count various directory entry types in the catalog to fill up the catalog
2116  // statistics counters introduced in the current catalog schema
2117  catalog::SqlCatalog count_regular(writable,
2118  std::string("SELECT count(*), sum(size) FROM catalog ") +
2119  "WHERE flags & " + StringifyInt(catalog::SqlDirent::kFlagFile) +
2120  " AND NOT flags & " + StringifyInt(catalog::SqlDirent::kFlagLink) +
2121  " AND NOT flags & " + StringifyInt(catalog::SqlDirent::kFlagFileSpecial) +
2122  ";");
2123  catalog::SqlCatalog count_external(writable,
2124  std::string("SELECT count(*), sum(size) FROM catalog ") +
2125  "WHERE flags & " + StringifyInt(catalog::SqlDirent::kFlagFileExternal) +
2126  ";");
2127  catalog::SqlCatalog count_symlink(writable,
2128  std::string("SELECT count(*) FROM catalog ") +
2129  "WHERE flags & " + StringifyInt(catalog::SqlDirent::kFlagLink) + ";");
2130  catalog::SqlCatalog count_special(writable,
2131  std::string("SELECT count(*) FROM catalog ") +
2132  "WHERE flags & " + StringifyInt(catalog::SqlDirent::kFlagFileSpecial) +
2133  ";");
2134  catalog::SqlCatalog count_xattr(writable,
2135  std::string("SELECT count(*) FROM catalog ") +
2136  "WHERE xattr IS NOT NULL;");
2137  catalog::SqlCatalog count_chunk(writable,
2138  std::string("SELECT count(*), sum(size) FROM catalog ") +
2139  "WHERE flags & " + StringifyInt(catalog::SqlDirent::kFlagFileChunk) + ";");
2140  catalog::SqlCatalog count_dir(writable,
2141  std::string("SELECT count(*) FROM catalog ") +
2142  "WHERE flags & " + StringifyInt(catalog::SqlDirent::kFlagDir) + ";");
2143  catalog::SqlCatalog count_chunk_blobs(writable,
2144  "SELECT count(*) FROM chunks;");
2145 
2146  retval = count_regular.FetchRow() &&
2147  count_external.FetchRow() &&
2148  count_symlink.FetchRow() &&
2149  count_special.FetchRow() &&
2150  count_xattr.FetchRow() &&
2151  count_chunk.FetchRow() &&
2152  count_dir.FetchRow() &&
2153  count_chunk_blobs.FetchRow();
2154  if (!retval) {
2155  Error("Failed to collect catalog statistics", data);
2156  return false;
2157  }
2158 
2159  stats_counters.self.regular_files = count_regular.RetrieveInt64(0);
2160  stats_counters.self.symlinks = count_symlink.RetrieveInt64(0);
2161  stats_counters.self.specials = count_special.RetrieveInt64(0);
2162  stats_counters.self.directories = count_dir.RetrieveInt64(0);
2163  stats_counters.self.nested_catalogs = data->nested_catalogs.size();
2164  stats_counters.self.chunked_files = count_chunk.RetrieveInt64(0);
2165  stats_counters.self.file_chunks = count_chunk_blobs.RetrieveInt64(0);
2166  stats_counters.self.file_size = count_regular.RetrieveInt64(1);
2167  stats_counters.self.chunked_file_size = count_chunk.RetrieveInt64(1);
2168  stats_counters.self.xattrs = count_xattr.RetrieveInt64(0);
2169  stats_counters.self.externals = count_external.RetrieveInt64(0);
2170  stats_counters.self.external_file_size = count_external.RetrieveInt64(1);
2171 
2172  // Write back the generated statistics counters into the catalog database
2173  catalog::Counters counters;
2174  counters.ApplyDelta(stats_counters);
2175  retval = counters.InsertIntoDatabase(writable);
2176  if (!retval) {
2177  Error("Failed to write new statistics counters to database", data);
2178  return false;
2179  }
2180 
2181  // Push the generated statistics counters up to the parent catalog
2182  data->nested_statistics.Set(stats_counters);
2183 
2184  return true;
2185 }
2186 
2187 
2188 bool CommandMigrate::StatsMigrationWorker::CommitDatabaseTransaction(
2189  PendingCatalog *data) const
2190 {
2191  assert(!data->HasNew());
2192  GetWritable(data->old_catalog)->Commit();
2193  return true;
2194 }
2195 
2196 } // namespace swissknife