CernVM-FS  2.10.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
swissknife_migrate.cc
Go to the documentation of this file.
1 
8 #include "swissknife_migrate.h"
9 
10 #include <sys/resource.h>
11 #include <unistd.h>
12 
13 #include "catalog_rw.h"
14 #include "catalog_sql.h"
15 #include "catalog_virtual.h"
16 #include "compression.h"
17 #include "hash.h"
18 #include "logging.h"
19 #include "swissknife_history.h"
20 #include "util_concurrency.h"
21 
22 using namespace std; // NOLINT
23 
24 namespace swissknife {
25 
26 catalog::DirectoryEntry CommandMigrate::nested_catalog_marker_;
27 
// Default-constructs the migration command: requests an fd soft limit of
// 8192 (enforced later by RaiseFileDescriptorLimit()), zeroes the catalog
// counter and persona ids, and marks that no new revision has been
// committed yet.  uid_/gid_ stay 0 until ReadPersona() overwrites them.
28 CommandMigrate::CommandMigrate() :
29  file_descriptor_limit_(8192),
30  catalog_count_(0),
31  has_committed_new_revision_(false),
32  uid_(0),
33  gid_(0),
34  root_catalog_(NULL)
35 {
  // Atomic counter read by PrintStatusMessage() for progress percentages.
36  atomic_init32(&catalogs_processed_);
37 }
38 
39 
// Declares the command-line interface of `cvmfs_swissknife migrate`.
// NOTE(review): the listing omits line 40, the enclosing signature
// (presumably `ParameterList CommandMigrate::GetParams() const {`) — confirm
// against the upstream source before editing.
41  ParameterList r;
42  r.push_back(Parameter::Mandatory('v',
43  "migration base version ( 2.0.x | 2.1.7 | chown | hardlink | bulkhash | "
44  "stats)"));
45  r.push_back(Parameter::Mandatory('r',
46  "repository URL (absolute local path or remote URL)"));
47  r.push_back(Parameter::Mandatory('u', "upstream definition string"));
48  r.push_back(Parameter::Mandatory('o', "manifest output file"));
49  r.push_back(Parameter::Mandatory('t',
50  "temporary directory for catalog decompress"));
  // Optional persona / naming / key parameters; Main() substitutes "" when
  // they are absent.
51  r.push_back(Parameter::Optional('p',
52  "user id to be used for this repository"));
53  r.push_back(Parameter::Optional('g',
54  "group id to be used for this repository"));
55  r.push_back(Parameter::Optional('n', "fully qualified repository name"));
56  r.push_back(Parameter::Optional('h', "root hash (other than trunk)"));
57  r.push_back(Parameter::Optional('k', "repository master key(s)"));
58  r.push_back(Parameter::Optional('i', "UID map for chown"));
59  r.push_back(Parameter::Optional('j', "GID map for chown"));
60  r.push_back(Parameter::Optional('@', "proxy url"));
  // Switches: note that 'l' *disables* linkcount analysis, i.e. Main() treats
  // its absence as "analysis enabled".
61  r.push_back(Parameter::Switch('f', "fix nested catalog transition points"));
62  r.push_back(Parameter::Switch('l', "disable linkcount analysis of files"));
63  r.push_back(Parameter::Switch('s',
64  "enable collection of catalog statistics"));
65  return r;
66 }
67 
68 
69 static void Error(const std::string &message) {
70  LogCvmfs(kLogCatalog, kLogStderr, message.c_str());
71 }
72 
73 
74 static void Error(const std::string &message,
75  const CommandMigrate::PendingCatalog *catalog) {
76  const std::string err_msg = message + "\n"
77  "Catalog: " + catalog->root_path();
78  Error(err_msg);
79 }
80 
81 
82 static void Error(const std::string &message,
83  const catalog::SqlCatalog &statement,
84  const CommandMigrate::PendingCatalog *catalog) {
85  const std::string err_msg =
86  message + "\n"
87  "SQLite: " + StringifyInt(statement.GetLastError()) +
88  " - " + statement.GetLastErrorMsg();
89  Error(err_msg, catalog);
90 }
91 
92 
// Entry point of the migration command.  Parses the argument list, prepares
// the runtime (fd limit, SQLite threading mode, upstream spooler), loads the
// full catalog hierarchy and dispatches to the worker matching the requested
// migration base ("2.0.x" | "2.1.7" | "chown" | "hardlink" | "bulkhash" |
// "stats").  Returns 0 on success, a non-zero code on failure.
// NOTE(review): the listing omits several numbered lines (93 = the function
// signature, 151, 154, 167-168, 194, 201, 221, 259, ...), including parts of
// the worker_context constructions — consult the upstream file before editing.
94  shash::Any manual_root_hash;
95  const std::string &migration_base = *args.find('v')->second;
96  const std::string &repo_url = *args.find('r')->second;
97  const std::string &spooler = *args.find('u')->second;
98  const std::string &manifest_path = *args.find('o')->second;
99  const std::string &tmp_dir = *args.find('t')->second;
  // Optional arguments default to "".  Binding a const reference to the
  // ternary's temporary is safe: C++ extends its lifetime to the reference's.
100  const std::string &uid = (args.count('p') > 0) ?
101  *args.find('p')->second :
102  "";
103  const std::string &gid = (args.count('g') > 0) ?
104  *args.find('g')->second :
105  "";
106  const std::string &repo_name = (args.count('n') > 0) ?
107  *args.find('n')->second :
108  "";
109  const std::string &repo_keys = (args.count('k') > 0) ?
110  *args.find('k')->second :
111  "";
112  const std::string &uid_map_path = (args.count('i') > 0) ?
113  *args.find('i')->second :
114  "";
115  const std::string &gid_map_path = (args.count('j') > 0) ?
116  *args.find('j')->second :
117  "";
118  const bool fix_transition_points = (args.count('f') > 0);
  // 'l' is a *disable* switch: analysis runs when it is absent.
119  const bool analyze_file_linkcounts = (args.count('l') == 0);
120  const bool collect_catalog_statistics = (args.count('s') > 0);
121  if (args.count('h') > 0) {
122  manual_root_hash = shash::MkFromHexPtr(shash::HexPtr(
123  *args.find('h')->second), shash::kSuffixCatalog);
124  }
125 
126  // We might need a lot of file descriptors
127  if (!RaiseFileDescriptorLimit()) {
128  Error("Failed to raise file descriptor limits");
129  return 2;
130  }
131 
132  // Put SQLite into multithreaded mode
133  if (!ConfigureSQLite()) {
134  Error("Failed to preconfigure SQLite library");
135  return 3;
136  }
137 
138  // Create an upstream spooler
139  temporary_directory_ = tmp_dir;
140  const upload::SpoolerDefinition spooler_definition(spooler, shash::kSha1);
141  spooler_ = upload::Spooler::Construct(spooler_definition);
142  if (!spooler_.IsValid()) {
  // NOTE(review): exit code 5 is reused below for "Migration failed!" —
  // the two failure modes are indistinguishable to callers.
143  Error("Failed to create upstream Spooler.");
144  return 5;
145  }
146  spooler_->RegisterListener(&CommandMigrate::UploadCallback, this);
147 
148  // Load the full catalog hierarchy
149  LogCvmfs(kLogCatalog, kLogStdout, "Loading current catalog tree...");
150 
152  bool loading_successful = false;
153  if (IsHttpUrl(repo_url)) {
155 
156  const bool follow_redirects = false;
157  const string proxy = (args.count('@') > 0) ? *args.find('@')->second : "";
158  if (!this->InitDownloadManager(follow_redirects, proxy) ||
159  !this->InitVerifyingSignatureManager(repo_keys)) {
160  LogCvmfs(kLogCatalog, kLogStderr, "Failed to init repo connection");
161  return 1;
162  }
163 
164  ObjectFetcher fetcher(repo_name,
165  repo_url,
166  tmp_dir,
169 
170  loading_successful = LoadCatalogs(manual_root_hash, &fetcher);
171  } else {
  // Local repository: fetch catalogs straight from the file system.
173  ObjectFetcher fetcher(repo_url, tmp_dir);
174  loading_successful = LoadCatalogs(manual_root_hash, &fetcher);
175  }
177 
178  if (!loading_successful) {
179  Error("Failed to load catalog tree");
180  return 4;
181  }
182 
183  LogCvmfs(kLogCatalog, kLogStdout, "Loaded %d catalogs", catalog_count_);
184  assert(root_catalog_ != NULL);
185 
186  // Do the actual migration step
187  bool migration_succeeded = false;
188  if (migration_base == "2.0.x") {
189  if (!ReadPersona(uid, gid)) {
190  return 1;
191  }
192 
193  // Generate and upload a nested catalog marker
  // NOTE(review): the listing omits line 194 (presumably the condition
  // guarding this error branch) and line 201 (the worker_context
  // construction) — this block is incomplete as shown.
195  Error("Failed to create a nested catalog marker.");
196  return 6;
197  }
198  spooler_->WaitForUpload();
199 
200  // Configure the concurrent catalog migration facility
202  collect_catalog_statistics,
203  fix_transition_points,
204  analyze_file_linkcounts,
205  uid_,
206  gid_);
207  migration_succeeded =
208  DoMigrationAndCommit<MigrationWorker_20x>(manifest_path, &context);
209  } else if (migration_base == "2.1.7") {
210  MigrationWorker_217::worker_context context(temporary_directory_,
211  collect_catalog_statistics);
212  migration_succeeded =
213  DoMigrationAndCommit<MigrationWorker_217>(manifest_path, &context);
214  } else if (migration_base == "chown") {
215  UidMap uid_map;
216  GidMap gid_map;
217  if (!ReadPersonaMaps(uid_map_path, gid_map_path, &uid_map, &gid_map)) {
218  Error("Failed to read UID and/or GID map");
219  return 1;
220  }
222  collect_catalog_statistics,
223  uid_map,
224  gid_map);
225  migration_succeeded =
226  DoMigrationAndCommit<ChownMigrationWorker>(manifest_path, &context);
227  } else if (migration_base == "hardlink") {
228  HardlinkRemovalMigrationWorker::worker_context
229  context(temporary_directory_, collect_catalog_statistics);
230  migration_succeeded =
231  DoMigrationAndCommit<HardlinkRemovalMigrationWorker>(manifest_path,
232  &context);
233  } else if (migration_base == "bulkhash") {
234  BulkhashRemovalMigrationWorker::worker_context
235  context(temporary_directory_, collect_catalog_statistics);
236  migration_succeeded =
237  DoMigrationAndCommit<BulkhashRemovalMigrationWorker>(manifest_path,
238  &context);
239  } else if (migration_base == "stats") {
240  StatsMigrationWorker::worker_context context(
241  temporary_directory_, collect_catalog_statistics);
242  migration_succeeded =
243  DoMigrationAndCommit<StatsMigrationWorker>(manifest_path, &context);
244  } else {
245  const std::string err_msg = "Unknown migration base: " + migration_base;
246  Error(err_msg);
247  return 1;
248  }
249 
250  // Check if everything went well
251  if (!migration_succeeded) {
252  Error("Migration failed!");
253  return 5;
254  }
255 
256  // Analyze collected statistics
257  if (collect_catalog_statistics && has_committed_new_revision_) {
258  LogCvmfs(kLogCatalog, kLogStdout, "\nCollected statistics results:");
260  }
261 
262  LogCvmfs(kLogCatalog, kLogStdout, "\nCatalog Migration succeeded");
263  return 0;
264 }
265 
266 
267 bool CommandMigrate::ReadPersona(const std::string &uid,
268  const std::string &gid) {
269  if (uid.empty()) {
270  Error("Please provide a user ID");
271  return false;
272  }
273  if (gid.empty()) {
274  Error("Please provide a group ID");
275  return false;
276  }
277 
278  uid_ = String2Int64(uid);
279  gid_ = String2Int64(gid);
280  return true;
281 }
282 
283 
284 
285 bool CommandMigrate::ReadPersonaMaps(const std::string &uid_map_path,
286  const std::string &gid_map_path,
287  UidMap *uid_map,
288  GidMap *gid_map) const {
289  if (!uid_map->Read(uid_map_path) || !uid_map->IsValid()) {
290  Error("Failed to read UID map");
291  return false;
292  }
293 
294  if (!gid_map->Read(gid_map_path) || !gid_map->IsValid()) {
295  Error("Failed to read GID map");
296  return false;
297  }
298 
299  if (uid_map->RuleCount() == 0 && !uid_map->HasDefault()) {
300  Error("UID map appears to be empty");
301  return false;
302  }
303 
304  if (gid_map->RuleCount() == 0 && !gid_map->HasDefault()) {
305  Error("GID map appears to be empty");
306  return false;
307  }
308 
309  return true;
310 }
311 
312 
// Spooler callback for the uploaded tag (history) database: publishes the
// resulting content hash through the given Future, or a null hash on upload
// failure so that the waiting UpdateUndoTags() can detect the error.
// NOTE(review): line 313 (the function signature) is omitted by the listing.
314  const upload::SpoolerResult &result,
315  Future<shash::Any> *hash)
316 {
  // The history database is uploaded as one blob; chunking is not expected.
317  assert(!result.IsChunked());
318  if (result.return_code != 0) {
319  LogCvmfs(kLogCvmfs, kLogStderr, "failed to upload history database (%d)",
320  result.return_code);
  // Null hash == failure marker for the consumer of the Future.
321  hash->Set(shash::Any());
322  } else {
323  hash->Set(result.content_hash);
324  }
325 }
326 
327 
// Rewrites the repository tag database after a successful migration: the
// current "trunk" tag becomes "trunk-previous" and a fresh "trunk" tag
// pointing at the migrated root catalog is inserted; the updated database is
// then uploaded and its hash returned through *history_hash.
// NOTE(review): the listing omits the signature (line 328) and lines 337-339
// (presumably the opening of the writable history database into `history`)
// as well as line 350 — confirm against the upstream file.
329  PendingCatalog *root_catalog,
330  unsigned revision,
331  time_t timestamp,
332  shash::Any *history_hash)
333 {
  // Work on a copy so the original tag database stays intact on failure.
334  string filename_old = history_upstream_->filename();
335  string filename_new = filename_old + ".new";
336  bool retval = CopyPath2Path(filename_old, filename_new);
337  if (!retval) return false;
340  history->TakeDatabaseFileOwnership();
341 
342  history::History::Tag tag_trunk;
343  bool exists = history->GetByName(CommandTag::kHeadTag, &tag_trunk);
344  if (exists) {
345  retval = history->Remove(CommandTag::kHeadTag);
346  if (!retval) return false;
347 
  // The old trunk tag is preserved under the "previous head" name.
348  history::History::Tag tag_trunk_previous = tag_trunk;
349  tag_trunk_previous.name = CommandTag::kPreviousHeadTag;
  // Best effort: an absent previous-head tag is not an error.
351  history->Remove(CommandTag::kPreviousHeadTag);
352 
353  tag_trunk.root_hash = root_catalog->new_catalog_hash;
354  tag_trunk.size = root_catalog->new_catalog_size;
  // NOTE(review): dead store — `revision` is assigned the catalog *size*
  // here and immediately overwritten with the real revision on the next
  // line.  Looks like a leftover; confirm the intended value is `revision`
  // and remove the first assignment upstream.
355  tag_trunk.revision = root_catalog->new_catalog_size;
356  tag_trunk.revision = revision;
357  tag_trunk.timestamp = timestamp;
358 
359  retval = history->Insert(tag_trunk_previous);
360  if (!retval) return false;
361  retval = history->Insert(tag_trunk);
362  if (!retval) return false;
363  }
364 
365  history->SetPreviousRevision(manifest_upstream_->history());
366  history->DropDatabaseFileOwnership();
367  history.Destroy();
368 
  // Upload the rewritten database; UploadHistoryClosure() fulfills the
  // future with the content hash (null on failure).
369  Future<shash::Any> history_hash_new;
370  upload::Spooler::CallbackPtr callback = spooler_->RegisterListener(
371  &CommandMigrate::UploadHistoryClosure, this, &history_hash_new);
372  spooler_->ProcessHistory(filename_new);
373  spooler_->WaitForUpload();
374  spooler_->UnregisterListener(callback);
375  unlink(filename_new.c_str());
376  *history_hash = history_hash_new.Get();
377  if (history_hash->IsNull()) {
378  Error("failed to upload tag database");
379  return false;
380  }
381 
382  return true;
383 }
384 
385 
// Drives the whole migration with a pool of MigratorT workers (one per CPU
// core, queue depth 10x), then — if anything changed — commits the new root
// catalog into the manifest and updates the tag database.
// NOTE(review): the listing omits the signature (line 387) and several other
// lines (399, 405, 410, 415, 418, 424, 426 = the manifest construction,
// 439, 457, 459) — consult the upstream file before editing.
386 template <class MigratorT>
388  const std::string &manifest_path,
389  typename MigratorT::worker_context *context
390 ) {
391  // Create a concurrent migration context for catalog migration
392  const unsigned int cpus = GetNumberOfCpuCores();
393  ConcurrentWorkers<MigratorT> concurrent_migration(cpus, cpus * 10, context);
394 
395  if (!concurrent_migration.Initialize()) {
396  Error("Failed to initialize worker migration system.");
397  return false;
398  }
400  this);
401 
402  // Migrate catalogs recursively (starting with the deepest nested catalogs)
403  LogCvmfs(kLogCatalog, kLogStdout, "\nMigrating catalogs...");
404  PendingCatalog *root_catalog = new PendingCatalog(root_catalog_);
406  ConvertCatalogsRecursively(root_catalog, &concurrent_migration);
407  concurrent_migration.WaitForEmptyQueue();
408  spooler_->WaitForUpload();
409  spooler_->UnregisterListeners();
411 
412  // check for possible errors during the migration process
413  const unsigned int errors = concurrent_migration.GetNumberOfFailedJobs() +
414  spooler_->GetNumberOfErrors();
416  "Catalog Migration finished with %d errors.", errors);
417  if (errors > 0) {
419  "\nCatalog Migration produced errors\nAborting...");
420  return false;
421  }
422 
  // was_updated is a future fulfilled by MigrationCallback()/UploadCallback();
  // false means the whole tree was preserved and no commit is necessary.
423  if (root_catalog->was_updated.Get()) {
425  "\nCommitting migrated repository revision...");
427  manifest.set_catalog_hash(root_catalog->new_catalog_hash);
428  manifest.set_catalog_size(root_catalog->new_catalog_size);
429  manifest.set_root_path(root_catalog->root_path());
430  const catalog::Catalog* new_catalog = (root_catalog->HasNew())
431  ? root_catalog->new_catalog
432  : root_catalog->old_catalog;
433  manifest.set_ttl(new_catalog->GetTTL());
434  manifest.set_revision(new_catalog->GetRevision());
435 
436  // Commit the new (migrated) repository revision...
437  if (history_upstream_.IsValid()) {
438  shash::Any history_hash(manifest_upstream_->history());
440  "Updating repository tag database... ");
441  if (!UpdateUndoTags(root_catalog,
442  new_catalog->GetRevision(),
443  new_catalog->GetLastModified(),
444  &history_hash))
445  {
  // NOTE(review): typo in the user-visible message ("Updateing") —
  // fix upstream; left untouched here since it is runtime output.
446  Error("Updateing tag database failed.\nAborting...");
447  return false;
448  }
449  manifest.set_history(history_hash);
450  LogCvmfs(kLogCvmfs, kLogStdout, "%s", history_hash.ToString().c_str());
451  }
452 
453  if (!manifest.Export(manifest_path)) {
454  Error("Manifest export failed.\nAborting...");
455  return false;
456  }
458  } else {
460  "\nNo catalogs migrated, skipping the commit...");
461  }
462 
463  // Get rid of the open root catalog
464  delete root_catalog;
465 
466  return true;
467 }
468 
469 
// Called once per catalog while the hierarchy is being loaded: prints a
// tree-style progress line (UTF-8 box-drawing indent, content hash, mount
// path), remembers the root catalog (empty mountpoint) and counts catalogs.
// NOTE(review): lines 470-471 (the function signature, presumably taking a
// CatalogTraversalData-like `data` parameter) are omitted by the listing.
472  std::string tree_indent;
473  std::string hash_string;
474  std::string path;
475 
  // "│ " per ancestor level beyond the first ...
476  for (unsigned int i = 1; i < data.tree_level; ++i) {
477  tree_indent += "\u2502 ";
478  }
479 
  // ... and "├─ " as the branch marker for any non-root catalog.
480  if (data.tree_level > 0) {
481  tree_indent += "\u251C\u2500 ";
482  }
483 
484  hash_string = data.catalog_hash.ToString();
485 
486  path = data.catalog->mountpoint().ToString();
487  if (path.empty()) {
  // The root catalog mounts at ""; remember it for the migration step.
488  path = "/";
489  root_catalog_ = data.catalog;
490  }
491 
492  LogCvmfs(kLogCatalog, kLogStdout, "%s%s %s",
493  tree_indent.c_str(),
494  hash_string.c_str(),
495  path.c_str());
496 
497  ++catalog_count_;
498 }
499 
500 
// Worker-completion callback: registers a changed catalog in
// pending_catalogs_ and hands its database file to the spooler for
// compression and upload; unchanged catalogs are reported as "preserved".
// NOTE(review): the listing omits line 501 (the signature) and lines 520/524
// (presumably the lock guard protecting pending_catalogs_) — the map access
// below looks synchronized upstream; confirm before editing.
502  // Check if the migration of the catalog was successful
503  if (!data->success) {
504  Error("Catalog migration failed! Aborting...");
  // Hard abort: a broken catalog must not end up in the repository.
505  exit(1);
506  return;
507  }
508 
509  if (!data->HasChanges()) {
510  PrintStatusMessage(data, data->GetOldContentHash(), "preserved");
511  data->was_updated.Set(false);
512  return;
513  }
514 
515  const string &path = (data->HasNew()) ? data->new_catalog->database_path()
516  : data->old_catalog->database_path();
517 
518  // Save the processed catalog in the pending map
519  {
521  assert(pending_catalogs_.find(path) == pending_catalogs_.end());
522  pending_catalogs_[path] = data;
523  }
525 
526  // check the size of the uncompressed catalog file
  // NOTE(review): size_t is unsigned, so `<= 0` only catches 0; if
  // GetFileSize() signals errors with a negative value it would wrap here —
  // verify GetFileSize()'s contract upstream.
527  size_t new_catalog_size = GetFileSize(path);
528  if (new_catalog_size <= 0) {
529  Error("Failed to get uncompressed file size of catalog!", data);
530  exit(2);
531  return;
532  }
533  data->new_catalog_size = new_catalog_size;
534 
535  // Schedule the compression and upload of the catalog
536  spooler_->ProcessCatalog(path);
537 }
538 
539 
// Spooler-completion callback: removes the uploaded temporary file, then
// either caches the nested catalog marker or completes the matching
// PendingCatalog by filling in its content hash and fulfilling the
// 'was_updated' future that parent catalogs wait on.
// NOTE(review): the listing omits line 540 (the signature), line 556 (the
// marker-handling statement) and line 562 (presumably the lock guard for
// pending_catalogs_) — confirm against the upstream file.
541  const string &path = result.local_path;
542 
543  // Check if the upload was successful
544  if (result.return_code != 0) {
545  Error("Failed to upload file " + path + "\nAborting...");
546  exit(2);
547  return;
548  }
  // Catalogs are uploaded whole; chunked results are unexpected here.
549  assert(result.file_chunks.size() == 0);
550 
551  // Remove the just uploaded file
552  unlink(path.c_str());
553 
554  // Uploaded nested catalog marker... generate and cache DirectoryEntry for it
555  if (path == nested_catalog_marker_tmp_path_) {
557  return;
558  } else {
559  // Find the catalog path in the pending catalogs and remove it from the list
560  PendingCatalog *catalog;
561  {
563  PendingCatalogMap::iterator i = pending_catalogs_.find(path);
564  assert(i != pending_catalogs_.end());
565  catalog = const_cast<PendingCatalog*>(i->second);
566  pending_catalogs_.erase(i);
567  }
568 
569  PrintStatusMessage(catalog, result.content_hash, "migrated and uploaded");
570 
571  // The catalog is completely processed... fill the content_hash to allow the
572  // processing of parent catalogs (Notified by 'was_updated'-future)
573  // NOTE: From now on, this PendingCatalog structure could be deleted and
574  // should not be used anymore!
575  catalog->new_catalog_hash = result.content_hash;
576  catalog->was_updated.Set(true);
577  }
578 }
579 
580 
// Prints one progress line per finished catalog: percentage, status word
// ("preserved"/"migrated and uploaded"), content hash and root path.
// NOTE(review): the listing omits line 581 (the signature) and line 586 —
// presumably the divisor of the percentage computation (the total catalog
// count) — confirm against the upstream file.
582  const shash::Any &content_hash,
583  const std::string &message) {
  // Thread-safe progress bump; callbacks may run concurrently.
584  atomic_inc32(&catalogs_processed_);
585  const unsigned int processed = (atomic_read32(&catalogs_processed_) * 100) /
587  LogCvmfs(kLogCatalog, kLogStdout, "[%d%%] %s %sC %s",
588  processed,
589  message.c_str(),
590  content_hash.ToString().c_str(),
591  catalog->root_path().c_str());
592 }
593 
594 
// Depth-first post-order traversal: schedules every nested catalog before
// its parent, so a parent's worker can rely on its children's results
// (delivered via each child's 'was_updated' future).
// NOTE(review): line 596 (the first signature line naming `catalog`) is
// omitted by the listing.
595 template <class MigratorT>
597  MigratorT *migrator) {
598  // First migrate all nested catalogs (depth first traversal)
599  const catalog::CatalogList nested_catalogs =
600  catalog->old_catalog->GetChildren();
601  catalog::CatalogList::const_iterator i = nested_catalogs.begin();
602  catalog::CatalogList::const_iterator iend = nested_catalogs.end();
  // Reserve up front so the PendingCatalog* vector never reallocates while
  // children are being appended.
603  catalog->nested_catalogs.reserve(nested_catalogs.size());
604  for (; i != iend; ++i) {
  // Ownership of new_nested passes to catalog->nested_catalogs; it is
  // released later by CleanupNestedCatalogs().
605  PendingCatalog *new_nested = new PendingCatalog(*i);
606  catalog->nested_catalogs.push_back(new_nested);
607  ConvertCatalogsRecursively(new_nested, migrator);
608  }
609 
610  // Migrate this catalog referencing all its (already migrated) children
611  migrator->Schedule(catalog);
612 }
613 
614 
// Raises RLIMIT_NOFILE to file_descriptor_limit_ (8192) if the current soft
// limit is lower; bumps the hard limit too when necessary (which requires
// privileges).  Returns false only if setrlimit() rejects the request.
// NOTE(review): line 615 (the signature) is omitted by the listing.
616  struct rlimit rpl;
617  memset(&rpl, 0, sizeof(rpl));
618  getrlimit(RLIMIT_NOFILE, &rpl);
619  if (rpl.rlim_cur < file_descriptor_limit_) {
620  if (rpl.rlim_max < file_descriptor_limit_)
621  rpl.rlim_max = file_descriptor_limit_;
622  rpl.rlim_cur = file_descriptor_limit_;
  // NOTE(review): setrlimit() returns int (0 on success, -1 on error) but
  // is stored in a bool; `retval != 0` still yields the right branch
  // (false/0 = success), yet an int would be the honest type — fix upstream.
623  const bool retval = setrlimit(RLIMIT_NOFILE, &rpl);
624  if (retval != 0) {
625  return false;
626  }
627  }
628  return true;
629 }
630 
631 
// Switches the SQLite library into multi-threaded mode so the concurrent
// migration workers can each use their own database connections.  Must be
// called before any sqlite3 connection is opened (sqlite3_config()
// requirement); returns true on SQLITE_OK.
// NOTE(review): line 632 (the signature) is omitted by the listing.
633  int retval = sqlite3_config(SQLITE_CONFIG_MULTITHREAD);
634  return (retval == SQLITE_OK);
635 }
636 
637 
// Aggregates the per-catalog statistics gathered during migration and prints
// inode-waste, hardlink and timing summaries.
// NOTE(review): the listing omits the signature (line 638) and lines 685/687
// (two of the four timing arguments — presumably loading and overall time).
639  const unsigned int number_of_catalogs = catalog_statistics_list_.size();
640  unsigned int aggregated_entry_count = 0;
641  unsigned int aggregated_max_row_id = 0;
642  unsigned int aggregated_hardlink_count = 0;
643  unsigned int aggregated_linkcounts = 0;
644  double aggregated_migration_time = 0.0;
645 
646  CatalogStatisticsList::const_iterator i = catalog_statistics_list_.begin();
647  CatalogStatisticsList::const_iterator iend = catalog_statistics_list_.end();
648  for (; i != iend; ++i) {
649  aggregated_entry_count += i->entry_count;
650  aggregated_max_row_id += i->max_row_id;
651  aggregated_hardlink_count += i->hardlink_group_count;
652  aggregated_linkcounts += i->aggregated_linkcounts;
653  aggregated_migration_time += i->migration_time;
654  }
655 
656  // Inode quantization
  // NOTE(review): this assert fires if no statistics were collected
  // (empty list) — callers guard on collect_catalog_statistics upstream.
657  assert(aggregated_max_row_id > 0);
658  const unsigned int unused_inodes =
659  aggregated_max_row_id - aggregated_entry_count;
660  const float ratio =
661  (static_cast<float>(unused_inodes) /
662  static_cast<float>(aggregated_max_row_id)) * 100.0f;
663  LogCvmfs(kLogCatalog, kLogStdout, "Actual Entries: %d\n"
664  "Allocated Inodes: %d\n"
665  " Unused Inodes: %d\n"
666  " Percentage of wasted Inodes: %.1f%%\n",
667  aggregated_entry_count, aggregated_max_row_id, unused_inodes, ratio);
668 
669  // Hardlink statistics
  // NOTE(review): unsigned / unsigned is *integer* division — the fractional
  // part of the average linkcount is truncated before the float conversion;
  // a static_cast<float> on the dividend would preserve it.  Fix upstream.
670  const float average_linkcount = (aggregated_hardlink_count > 0)
671  ? aggregated_linkcounts /
672  aggregated_hardlink_count
673  : 0.0f;
674  LogCvmfs(kLogCatalog, kLogStdout, "Generated Hardlink Groups: %d\n"
675  "Average Linkcount per Group: %.1f\n",
676  aggregated_hardlink_count, average_linkcount);
677 
678  // Performance measures
679  const double average_migration_time =
680  aggregated_migration_time / static_cast<double>(number_of_catalogs);
681  LogCvmfs(kLogCatalog, kLogStdout, "Catalog Loading Time: %.2fs\n"
682  "Average Migration Time: %.2fs\n"
683  "Overall Migration Time: %.2fs\n"
684  "Aggregated Migration Time: %.2fs\n",
686  average_migration_time,
688  aggregated_migration_time);
689 }
690 
691 
// Destructor of a PendingCatalog: releases both the original and (if one was
// created by the migration) the rewritten catalog object.
// NOTE(review): line 692 (the destructor signature) is omitted by the
// listing.  The NULL check around new_catalog is redundant (delete on NULL
// is a no-op) but harmless.
693  delete old_catalog;
694  old_catalog = NULL;
695 
696  if (new_catalog != NULL) {
697  delete new_catalog;
698  new_catalog = NULL;
699  }
700 }
701 
702 
// Base worker constructor: copies the shared settings (scratch directory,
// statistics flag) out of the worker_context provided by Main().
// NOTE(review): line 704 (the constructor signature) is omitted by the
// listing.
703 template<class DerivedT>
705  const worker_context *context)
706  : temporary_directory_(context->temporary_directory)
707  , collect_catalog_statistics_(context->collect_catalog_statistics)
708 { }
709 
710 
711 template<class DerivedT>
713 
714 
// Worker job body: runs the derived class's RunMigration() followed by the
// shared post-processing steps; && short-circuits, so the first failing step
// aborts the pipeline.  The outcome is stored in data->success and reported
// back to the ConcurrentWorkers framework.
// NOTE(review): the listing omits the signature (line 716) and lines
// 718/725/727-728/732/734 — presumably timing code and the success/failure
// notification calls — confirm against the upstream file.
715 template<class DerivedT>
717  const expected_data &data) {
  // Static dispatch into the concrete worker (CRTP pattern: DerivedT
  // derives from this template).
719  const bool success = static_cast<DerivedT*>(this)->RunMigration(data) &&
720  UpdateNestedCatalogReferences(data) &&
721  UpdateCatalogMetadata(data) &&
722  CollectAndAggregateStatistics(data) &&
723  CleanupNestedCatalogs(data);
724  data->success = success;
726 
729 
730  // Note: MigrationCallback() will take care of the result...
731  if (success) {
733  } else {
735  }
736 }
737 
738 
// Refreshes the nested_catalogs table of the (possibly rewritten) catalog so
// each child entry points at the child's new hash and size.  Children whose
// 'was_updated' future resolves to false kept their old reference and are
// skipped.
// NOTE(review): lines 740-741 (the function signature) are omitted by the
// listing.
739 template<class DerivedT>
742 {
743  const catalog::Catalog *new_catalog =
744  (data->HasNew()) ? data->new_catalog : data->old_catalog;
745  const catalog::CatalogDatabase &writable = new_catalog->database();
746 
  // Prepared once, re-bound per child via Reset() below.
747  catalog::SqlCatalog add_nested_catalog(writable,
748  "INSERT OR REPLACE INTO nested_catalogs (path, sha1, size) "
749  " VALUES (:path, :sha1, :size);");
750 
751  // go through all nested catalogs and update their references (we are currently
752  // in their parent catalog)
753  // Note: we might need to wait for the nested catalog to be fully processed.
754  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
755  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
756  for (; i != iend; ++i) {
757  PendingCatalog *nested_catalog = *i;
758 
  // Blocks on the child's future until its upload finished; false means
  // the child was preserved unchanged.
759  if (!nested_catalog->was_updated.Get()) {
760  continue;
761  }
762 
763  const std::string &root_path = nested_catalog->root_path();
764  const shash::Any catalog_hash = nested_catalog->new_catalog_hash;
765  const size_t catalog_size = nested_catalog->new_catalog_size;
766 
767  // insert the updated nested catalog reference into the new catalog
768  const bool retval =
769  add_nested_catalog.BindText(1, root_path) &&
770  add_nested_catalog.BindText(2, catalog_hash.ToString()) &&
771  add_nested_catalog.BindInt64(3, catalog_size) &&
772  add_nested_catalog.Execute();
773  if (!retval) {
774  Error("Failed to add nested catalog link", add_nested_catalog, data);
775  return false;
776  }
777  add_nested_catalog.Reset();
778  }
779 
780  return true;
781 }
782 
783 
// Stamps the migrated catalog as a proper successor of the old one:
// previous-revision pointer, incremented revision number and fresh
// modification time.  A catalog without changes is left untouched.
// NOTE(review): lines 785-786 (the function signature) are omitted by the
// listing.
784 template<class DerivedT>
787 {
788  if (!data->HasChanges()) {
789  return true;
790  }
791 
  // Workers that rewrite in place have no new_catalog; cast the old one
  // to its writable interface instead.
792  catalog::WritableCatalog *catalog =
793  (data->HasNew()) ? data->new_catalog : GetWritable(data->old_catalog);
794 
795  // Set the previous revision hash in the new catalog to the old catalog
796  // we are doing the whole migration as a new snapshot that does not change
797  // any files, but just applies the necessary data schema migrations
798  catalog->SetPreviousRevision(data->old_catalog->hash());
799  catalog->IncrementRevision();
800  catalog->UpdateLastModified();
801 
802  return true;
803 }
804 
805 
// Optionally (behind the 's' switch) measures how sparsely the catalog's
// rowid space is used: COUNT(*) vs MAX(rowid) quantifies inodes wasted by
// deleted rows.  Results land in data->statistics for the final summary in
// AnalyzeCatalogStatistics().
// NOTE(review): lines 807-808 (the function signature) are omitted by the
// listing.
806 template<class DerivedT>
809 {
810  if (!collect_catalog_statistics_) {
811  return true;
812  }
813 
814  const catalog::Catalog *new_catalog =
815  (data->HasNew()) ? data->new_catalog : data->old_catalog;
816  const catalog::CatalogDatabase &writable = new_catalog->database();
817  bool retval;
818 
819  // Find out the discrepancy between MAX(rowid) and COUNT(*)
820  catalog::SqlCatalog wasted_inodes(writable,
821  "SELECT COUNT(*), MAX(rowid) FROM catalog;");
822  retval = wasted_inodes.FetchRow();
823  if (!retval) {
824  Error("Failed to count entries in catalog", wasted_inodes, data);
825  return false;
826  }
  // Narrowing int64 -> unsigned int; catalog sizes are expected to fit.
827  const unsigned int entry_count = wasted_inodes.RetrieveInt64(0);
828  const unsigned int max_row_id = wasted_inodes.RetrieveInt64(1);
829 
830  // Save collected information into the central statistics aggregator
831  data->statistics.root_path = data->root_path();
832  data->statistics.max_row_id = max_row_id;
833  data->statistics.entry_count = entry_count;
834 
835  return true;
836 }
837 
838 
// Final pipeline step: frees the PendingCatalog objects of all children,
// whose hashes/sizes have already been consumed by
// UpdateNestedCatalogReferences().  Always succeeds.
// NOTE(review): line 840 (the function signature line) is omitted by the
// listing.
839 template<class DerivedT>
841  PendingCatalog *data) const
842 {
843  // All nested catalogs of PendingCatalog 'data' are fully processed and
844  // accounted. It is safe to get rid of their data structures here!
845  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
846  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
847  for (; i != iend; ++i) {
848  delete *i;
849  }
850 
  // Drop the now-dangling pointers so nobody touches freed children.
851  data->nested_catalogs.clear();
852  return true;
853 }
854 
855 
864 
865 
// Strips constness and downcasts a Catalog to its writable interface.
// NOTE(review): lines 867-868 (the function signature) are omitted by the
// listing.  dynamic_cast returns NULL if the object is not actually a
// WritableCatalog — callers appear to rely on it always being one; confirm
// upstream.
866 template<class DerivedT>
869  const catalog::Catalog *catalog) const {
870  return dynamic_cast<catalog::WritableCatalog*>(const_cast<catalog::Catalog*>(
871  catalog));
872 }
873 
874 
875 //------------------------------------------------------------------------------
876 
877 
// 2.0.x worker constructor: copies the 2.0.x-specific knobs (transition-point
// fixing, linkcount analysis, target uid/gid) from the worker_context.
// NOTE(review): the listing omits line 878 (the constructor signature) and
// line 880 (presumably the delegation to the AbstractMigrationWorker base
// constructor) — confirm against the upstream file.
879  const worker_context *context)
881  , fix_nested_catalog_transitions_(context->fix_nested_catalog_transitions)
882  , analyze_file_linkcounts_(context->analyze_file_linkcounts)
883  , uid_(context->uid)
884  , gid_(context->gid) { }
885 
886 
// The 2.0.x -> 2.1.x migration pipeline: each step runs on the catalog in
// `data` and the && chain short-circuits on the first failure, leaving the
// error already reported by the failing step.
// NOTE(review): the listing omits line 887 (the signature) and lines 892-893
// (presumably compile-time schema compatibility checks) — confirm upstream.
888  const
889 {
890  // double-check that we are generating compatible catalogs to the actual
891  // catalog management classes
894 
895  return CreateNewEmptyCatalog(data) &&
896  CheckDatabaseSchemaCompatibility(data) &&
897  AttachOldCatalogDatabase(data) &&
898  StartDatabaseTransaction(data) &&
899  MigrateFileMetadata(data) &&
900  MigrateNestedCatalogMountPoints(data) &&
901  FixNestedCatalogTransitionPoints(data) &&
902  RemoveDanglingNestedMountpoints(data) &&
903  GenerateCatalogStatistics(data) &&
904  FindRootEntryInformation(data) &&
905  CommitDatabaseTransaction(data) &&
906  DetachOldCatalogDatabase(data);
907 }
908 
// Creates a fresh, empty new-schema catalog database in the scratch
// directory and attaches it as data->new_catalog.  On any failure the
// temporary file is unlinked before returning false.
// NOTE(review): the listing omits line 909 (the signature), line 926
// (presumably the smart-pointer declaration completed by line 927) and line
// 939 (the remaining AttachFreely() arguments) — confirm upstream.
910  PendingCatalog *data) const
911 {
912  const string root_path = data->root_path();
913 
914  // create a new catalog database schema
915  const string clg_db_path =
916  CreateTempPath(temporary_directory_ + "/catalog", 0666);
917  if (clg_db_path.empty()) {
918  Error("Failed to create temporary file for the new catalog database.");
919  return false;
920  }
921  const bool volatile_content = false;
922 
  // Scope block: the database handle must be closed again before
  // WritableCatalog::AttachFreely() reopens the same file below.
923  {
924  // TODO(rmeusel): Attach catalog should work with an open catalog database
925  // as well, to remove this inefficiency
927  new_clg_db(catalog::CatalogDatabase::Create(clg_db_path));
928  if (!new_clg_db.IsValid() ||
929  !new_clg_db->InsertInitialValues(root_path, volatile_content, "")) {
930  Error("Failed to create database for new catalog");
931  unlink(clg_db_path.c_str());
932  return false;
933  }
934  }
935 
936  // Attach the just created nested catalog database
937  catalog::WritableCatalog *writable_catalog =
938  catalog::WritableCatalog::AttachFreely(root_path, clg_db_path,
940  if (writable_catalog == NULL) {
941  Error("Failed to open database for new catalog");
942  unlink(clg_db_path.c_str());
943  return false;
944  }
945 
  // Ownership passes to the PendingCatalog (released in its destructor).
946  data->new_catalog = writable_catalog;
947  return true;
948 }
949 
950 
// Verifies that the freshly created catalog's schema version lies in the
// range this migration code was written for, and that the old catalog is
// not newer than 2.1.x.
// NOTE(review): the listing omits the signature (line 951) and lines
// 958-959, 962-964 and 966 — the actual version-constant expressions of
// these comparisons are not visible; consult the upstream file before
// touching the condition.
952  PendingCatalog *data) const
953 {
954  const catalog::CatalogDatabase &old_catalog = data->old_catalog->database();
955  const catalog::CatalogDatabase &new_catalog = data->new_catalog->database();
956 
957  if ((new_catalog.schema_version() <
960  ||
961  new_catalog.schema_version() >
964  ||
965  (old_catalog.schema_version() > 2.1 +
967  {
968  Error("Failed to meet database requirements for migration.", data);
969  return false;
970  }
971  return true;
972 }
973 
974 
// Attaches the old catalog's database file into the new catalog's SQLite
// connection under the schema name "old", so migration queries can read
// old.catalog directly.  The old temporary file is unlinked right away: the
// open connection keeps the data alive until it is closed.
// NOTE(review): line 975 (the function signature) is omitted by the listing.
976  PendingCatalog *data) const
977 {
978  const catalog::CatalogDatabase &old_catalog = data->old_catalog->database();
979  const catalog::CatalogDatabase &new_catalog = data->new_catalog->database();
980 
  // NOTE(review): the ATTACH statement is built by string concatenation; a
  // single quote in the temporary path would break it.  Paths come from
  // CreateTempPath() under our own scratch dir, so this is benign here, but
  // a bound parameter would be the robust form.
981  catalog::SqlCatalog sql_attach_new(new_catalog,
982  "ATTACH '" + old_catalog.filename() + "' AS old;");
983  bool retval = sql_attach_new.Execute();
984 
985  // remove the hardlink to the old database file (temporary file), it will not
986  // be needed anymore... data will get deleted when the database is closed
987  unlink(data->old_catalog->database().filename().c_str());
988 
989  if (!retval) {
990  Error("Failed to attach database of old catalog", sql_attach_new, data);
991  return false;
992  }
993  return true;
994 }
995 
996 
// Opens one SQLite transaction spanning the whole metadata migration of this
// catalog; committed later by CommitDatabaseTransaction().  Requires the new
// catalog to exist already (CreateNewEmptyCatalog ran first).
// NOTE(review): line 997 (the function signature) is omitted by the listing.
998  PendingCatalog *data) const
999 {
1000  assert(data->HasNew());
1001  data->new_catalog->Transaction();
1002  return true;
1003 }
1004 
1005 
1007  PendingCatalog *data) const
1008 {
1009  assert(!data->new_catalog->IsDirty());
1010  assert(data->HasNew());
1011  bool retval;
1012  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1013 
1014  // Hardlinks scratch space.
1015  // This temporary table is used for the hardlink analysis results.
1016  // The old catalog format did not have a direct notion of hardlinks and their
1017  // linkcounts, but this information can be partly retrieved from the under-
1018  // lying file system semantics.
1019  //
1020  // Hardlinks:
1021  // groupid : this group id can be used for the new catalog schema
1022  // inode : the inodes that were part of a hardlink group before
1023  // linkcount : the linkcount for hardlink group id members
1024  catalog::SqlCatalog sql_create_hardlinks_table(writable,
1025  "CREATE TEMPORARY TABLE hardlinks "
1026  " ( hardlink_group_id INTEGER PRIMARY KEY AUTOINCREMENT, "
1027  " inode INTEGER, "
1028  " linkcount INTEGER, "
1029  " CONSTRAINT unique_inode UNIQUE (inode) );");
1030  retval = sql_create_hardlinks_table.Execute();
1031  if (!retval) {
1032  Error("Failed to create temporary hardlink analysis table",
1033  sql_create_hardlinks_table, data);
1034  return false;
1035  }
1036 
1037  // Directory Linkcount scratch space.
1038  // Directory linkcounts can be obtained from the directory hierarchy reflected
1039  // in the old style catalogs. The new catalog schema asks for this specific
1040  // linkcount. Directory linkcount analysis results will be put into this
1041  // temporary table
1042  catalog::SqlCatalog sql_create_linkcounts_table(writable,
1043  "CREATE TEMPORARY TABLE dir_linkcounts "
1044  " ( inode INTEGER PRIMARY KEY, "
1045  " linkcount INTEGER );");
1046  retval = sql_create_linkcounts_table.Execute();
1047  if (!retval) {
1048  Error("Failed to create tmeporary directory linkcount analysis table",
1049  sql_create_linkcounts_table, data);
1050  }
1051 
1052  // It is possible to skip this step.
1053  // In that case all hardlink inodes with a (potential) linkcount > 1 will get
1054  // degraded to files containing the same content
1055  if (analyze_file_linkcounts_) {
1056  retval = AnalyzeFileLinkcounts(data);
1057  if (!retval) {
1058  return false;
1059  }
1060  }
1061 
1062  // Analyze the linkcounts of directories
1063  // - each directory has a linkcount of at least 2 (empty directory)
1064  // (link in parent directory and self reference (cd .) )
1065  // - for each child directory, the parent's link count is incremented by 1
1066  // (parent reference in child (cd ..) )
1067  //
1068  // Note: nested catalog mountpoints will be miscalculated here, since we can't
1069  // check the number of containing directories. They are defined in a the
1070  // linked nested catalog and need to be added later on.
1071  // (see: MigrateNestedCatalogMountPoints() for details)
1072  catalog::SqlCatalog sql_dir_linkcounts(writable,
1073  "INSERT INTO dir_linkcounts "
1074  " SELECT c1.inode as inode, "
1075  " SUM(IFNULL(MIN(c2.inode,1),0)) + 2 as linkcount "
1076  " FROM old.catalog as c1 "
1077  " LEFT JOIN old.catalog as c2 "
1078  " ON c2.parent_1 = c1.md5path_1 AND "
1079  " c2.parent_2 = c1.md5path_2 AND "
1080  " c2.flags & :flag_dir_1 "
1081  " WHERE c1.flags & :flag_dir_2 "
1082  " GROUP BY c1.inode;");
1083  retval =
1084  sql_dir_linkcounts.BindInt64(1, catalog::SqlDirent::kFlagDir) &&
1085  sql_dir_linkcounts.BindInt64(2, catalog::SqlDirent::kFlagDir) &&
1086  sql_dir_linkcounts.Execute();
1087  if (!retval) {
1088  Error("Failed to analyze directory specific linkcounts",
1089  sql_dir_linkcounts, data);
1090  if (sql_dir_linkcounts.GetLastError() == SQLITE_CONSTRAINT) {
1091  Error("Obviously your catalogs are corrupted, since we found a directory"
1092  "inode that is a file inode at the same time!");
1093  }
1094  return false;
1095  }
1096 
1097  // Copy the old file meta information into the new catalog schema
1098  // here we also add the previously analyzed hardlink/linkcount information
1099  // from both temporary tables "hardlinks" and "dir_linkcounts".
1100  //
1101  // Note: nested catalog mountpoints still need to be treated separately
1102  // (see MigrateNestedCatalogMountPoints() for details)
1103  catalog::SqlCatalog migrate_file_meta_data(writable,
1104  "INSERT INTO catalog "
1105  " SELECT md5path_1, md5path_2, "
1106  " parent_1, parent_2, "
1107  " IFNULL(hardlink_group_id, 0) << 32 | "
1108  " COALESCE(hardlinks.linkcount, dir_linkcounts.linkcount, 1) "
1109  " AS hardlinks, "
1110  " hash, size, mode, mtime, "
1111  " flags, name, symlink, "
1112  " :uid, "
1113  " :gid, "
1114  " NULL " // set empty xattr BLOB (default)
1115  " FROM old.catalog "
1116  " LEFT JOIN hardlinks "
1117  " ON catalog.inode = hardlinks.inode "
1118  " LEFT JOIN dir_linkcounts "
1119  " ON catalog.inode = dir_linkcounts.inode;");
1120  retval = migrate_file_meta_data.BindInt64(1, uid_) &&
1121  migrate_file_meta_data.BindInt64(2, gid_) &&
1122  migrate_file_meta_data.Execute();
1123  if (!retval) {
1124  Error("Failed to migrate the file system meta data",
1125  migrate_file_meta_data, data);
1126  return false;
1127  }
1128 
1129  // If we deal with a nested catalog, we need to add a .cvmfscatalog entry
1130  // since it was not present in the old repository specification but is needed
1131  // now!
1132  if (!data->IsRoot()) {
1133  const catalog::DirectoryEntry &nested_marker =
1135  catalog::SqlDirentInsert insert_nested_marker(writable);
1136  const std::string root_path = data->root_path();
1137  const std::string file_path = root_path +
1138  "/" + nested_marker.name().ToString();
1139  const shash::Md5 &path_hash = shash::Md5(file_path.data(),
1140  file_path.size());
1141  const shash::Md5 &parent_hash = shash::Md5(root_path.data(),
1142  root_path.size());
1143  retval = insert_nested_marker.BindPathHash(path_hash) &&
1144  insert_nested_marker.BindParentPathHash(parent_hash) &&
1145  insert_nested_marker.BindDirent(nested_marker) &&
1146  insert_nested_marker.BindXattrEmpty() &&
1147  insert_nested_marker.Execute();
1148  if (!retval) {
1149  Error("Failed to insert nested catalog marker into new nested catalog.",
1150  insert_nested_marker, data);
1151  return false;
1152  }
1153  }
1154 
1155  // Copy (and update) the properties fields
1156  //
1157  // Note: The 'schema' is explicitly not copied to the new catalog.
1158  // Each catalog contains a revision, which is also copied here and that
1159  // is later updated by calling catalog->IncrementRevision()
1160  catalog::SqlCatalog copy_properties(writable,
1161  "INSERT OR REPLACE INTO properties "
1162  " SELECT key, value "
1163  " FROM old.properties "
1164  " WHERE key != 'schema';");
1165  retval = copy_properties.Execute();
1166  if (!retval) {
1167  Error("Failed to migrate the properties table.", copy_properties, data);
1168  return false;
1169  }
1170 
1171  return true;
1172 }
1173 
1174 
// NOTE(review): the line with this function's name (Doxygen line 1175) was
// stripped by the HTML extraction; from the error strings below this is
// presumably CommandMigrate::MigrationWorker_20x::AnalyzeFileLinkcounts --
// confirm against the original source file.
// Detects CVMFS-supported hardlink groups in the old catalog (same inode,
// all members in one directory) and fills the temporary 'hardlinks' table
// with one (inode, linkcount) row per supported group.
1176  PendingCatalog *data) const
1177 {
1178  assert(data->HasNew());
1179  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1180  bool retval;
1181 
1182  // Analyze the hardlink relationships in the old catalog
1183  // inodes used to be assigned at publishing time, implicitly constituting
1184  // those relationships. We now need them explicitly in the file catalogs
1185  // This looks for directory entries with matching inodes but differing path-
1186  // hashes and saves the results in a temporary table called 'hl_scratch'
1187  //
1188  // Note: We only support hardlink groups that reside in the same directory!
1189  // Therefore we first need to figure out hardlink candidates (which
1190  // might still contain hardlink groups spanning more than one directory)
1191  // In a second step these candidates will be analyzed to kick out un-
1192  // supported hardlink groups.
1193  // Unsupported hardlink groups will be treated as normal files with
1194  // the same content
1195  catalog::SqlCatalog sql_create_hardlinks_scratch_table(writable,
1196  "CREATE TEMPORARY TABLE hl_scratch AS "
1197  " SELECT c1.inode AS inode, c1.md5path_1, c1.md5path_2, "
1198  " c1.parent_1 as c1p1, c1.parent_2 as c1p2, "
1199  " c2.parent_1 as c2p1, c2.parent_2 as c2p2 "
1200  " FROM old.catalog AS c1 "
1201  " INNER JOIN old.catalog AS c2 "
1202  " ON c1.inode == c2.inode AND "
1203  " (c1.md5path_1 != c2.md5path_1 OR "
1204  " c1.md5path_2 != c2.md5path_2);");
1205  retval = sql_create_hardlinks_scratch_table.Execute();
1206  if (!retval) {
1207  Error("Failed to create temporary scratch table for hardlink analysis",
1208  sql_create_hardlinks_scratch_table, data);
1209  return false;
1210  }
1211 
1212  // Figures out which hardlink candidates are supported by CVMFS and can be
1213  // transferred into the new catalog as so called hardlink groups. Unsupported
1214  // hardlinks need to be discarded and treated as normal files containing the
1215  // exact same data
1216  catalog::SqlCatalog fill_linkcount_table_for_files(writable,
1217  "INSERT INTO hardlinks (inode, linkcount)"
1218  " SELECT inode, count(*) as linkcount "
1219  " FROM ( "
1220  // recombine supported hardlink inodes with their actual manifested
1221  // hard-links in the catalog.
1222  // Note: for each directory entry pointing to the same supported
1223  // hardlink inode we have a distinct MD5 path hash
1224  " SELECT DISTINCT hl.inode, hl.md5path_1, hl.md5path_2 "
1225  " FROM ( "
1226  // sort out supported hardlink inodes from unsupported ones by
1227  // locality
1228  // Note: see the next comment for the nested SELECT
1229  " SELECT inode "
1230  " FROM ( "
1231  " SELECT inode, count(*) AS cnt "
1232  " FROM ( "
1233  // go through the potential hardlinks and collect location infor-
1234  // mation about them.
1235  // Note: we only support hardlinks that all reside in the same
1236  // directory, thus having the same parent (c1p* == c2p*)
1237  // --> For supported hardlink candidates the SELECT DISTINCT
1238  // will produce only a single row, whereas others produce more
// NOTE(review): 'c1p1' appears twice and 'c1p2' is missing from the column
// list below -- likely a typo for "inode,c1p1,c1p2,c2p1,c2p2". It appears
// benign because the self-join is symmetric, so a parent_2 mismatch still
// shows up via differing c2p2 values -- confirm against the original source.
1239  " SELECT DISTINCT inode,c1p1,c1p1,c2p1,c2p2 "
1240  " FROM hl_scratch AS hl "
1241  " ) "
1242  " GROUP BY inode "
1243  " ) "
1244  " WHERE cnt = 1 "
1245  " ) AS supported_hardlinks "
1246  " LEFT JOIN hl_scratch AS hl "
1247  " ON supported_hardlinks.inode = hl.inode "
1248  " ) "
1249  " GROUP BY inode;");
1250  retval = fill_linkcount_table_for_files.Execute();
1251  if (!retval) {
1252  Error("Failed to analyze hardlink relationships for files.",
1253  fill_linkcount_table_for_files, data);
1254  return false;
1255  }
1256 
1257  // The file linkcount and hardlink analysis is finished and the scratch table
1258  // can be deleted...
1259  catalog::SqlCatalog drop_hardlink_scratch_space(writable,
1260  "DROP TABLE hl_scratch;");
1261  retval = drop_hardlink_scratch_space.Execute();
1262  if (!retval) {
1263  Error("Failed to remove file linkcount analysis scratch table",
1264  drop_hardlink_scratch_space, data);
1265  return false;
1266  }
1267 
1268  // Do some statistics if asked for...
1269  if (collect_catalog_statistics_) {
1270  catalog::SqlCatalog count_hardlinks(writable,
1271  "SELECT count(*), sum(linkcount) FROM hardlinks;");
1272  retval = count_hardlinks.FetchRow();
1273  if (!retval) {
1274  Error("Failed to count the generated file hardlinks for statistics",
1275  count_hardlinks, data);
1276  return false;
1277  }
1278 
1279  data->statistics.hardlink_group_count += count_hardlinks.RetrieveInt64(0);
1280  data->statistics.aggregated_linkcounts += count_hardlinks.RetrieveInt64(1);
1281  }
1282 
1283  return true;
1284 }
1285 
1286 
// NOTE(review): the function-name line (Doxygen line 1287) was stripped by the
// HTML extraction; presumably
// CommandMigrate::MigrationWorker_20x::MigrateNestedCatalogMountPoints --
// confirm against the original source.
// Writes the linkcount determined inside each (already migrated) nested
// catalog back into the corresponding mountpoint entry of this catalog.
1288  PendingCatalog *data) const
1289 {
1290  assert(data->HasNew());
1291  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1292  bool retval;
1293 
1294  // preparing the SQL statement for nested catalog mountpoint update
// Note: only the lower 32 bit (linkcount) of 'hardlinks' are set here;
// mountpoints are directories, which carry no hardlink group id.
1295  catalog::SqlCatalog update_mntpnt_linkcount(writable,
1296  "UPDATE catalog "
1297  "SET hardlinks = :linkcount "
1298  "WHERE md5path_1 = :md5_1 AND md5path_2 = :md5_2;");
1299 
1300  // update all nested catalog mountpoints
1301  // (Note: we might need to wait for the nested catalog to be processed)
1302  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
1303  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1304  for (; i != iend; ++i) {
1305  // collect information about the nested catalog
1306  PendingCatalog *nested_catalog = *i;
1307  const catalog::DirectoryEntry root_entry = nested_catalog->root_entry.Get();
1308  const string &root_path = nested_catalog->root_path();
1309 
1310  // update the nested catalog mountpoint directory entry with the correct
1311  // linkcount that was determined while processing the nested catalog
1312  const shash::Md5 mountpoint_hash = shash::Md5(root_path.data(),
1313  root_path.size());
1314  retval =
1315  update_mntpnt_linkcount.BindInt64(1, root_entry.linkcount()) &&
1316  update_mntpnt_linkcount.BindMd5(2, 3, mountpoint_hash) &&
1317  update_mntpnt_linkcount.Execute();
1318  if (!retval) {
1319  Error("Failed to update linkcount of nested catalog mountpoint",
1320  update_mntpnt_linkcount, data);
1321  return false;
1322  }
1323  update_mntpnt_linkcount.Reset();
1324  }
1325 
1326  return true;
1327 }
1328 
1329 
// NOTE(review): the function-name line (Doxygen line 1330) was stripped by the
// HTML extraction; presumably
// CommandMigrate::MigrationWorker_20x::FixNestedCatalogTransitionPoints --
// confirm against the original source.
// For each nested catalog, compares the mountpoint entry in this catalog with
// the root entry of the nested catalog and resynchronizes repairable
// differences (everything except the transition flags themselves).
1331  PendingCatalog *data) const
1332 {
1333  assert(data->HasNew());
1334  if (!fix_nested_catalog_transitions_) {
1335  // Fixing transition point mismatches is not enabled...
1336  return true;
1337  }
1338 
1339  typedef catalog::DirectoryEntry::Difference Difference;
1340 
1341  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1342  bool retval;
1343 
1344  catalog::SqlLookupPathHash lookup_mountpoint(writable);
1345  catalog::SqlDirentUpdate update_directory_entry(writable);
1346 
1347  // Unbox the nested catalogs (possibly waiting for migration of them first)
1348  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
1349  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1350  for (; i != iend; ++i) {
1351  // Collect information about the nested catalog
1352  PendingCatalog *nested_catalog = *i;
1353  const catalog::DirectoryEntry nested_root_entry =
1354  nested_catalog->root_entry.Get();
1355  const string &nested_root_path = nested_catalog->root_path();
1356  const shash::Md5 mountpoint_path_hash =
1357  shash::Md5(nested_root_path.data(), nested_root_path.size());
1358 
1359  // Retrieve the nested catalog mountpoint from the current catalog
1360  retval = lookup_mountpoint.BindPathHash(mountpoint_path_hash) &&
1361  lookup_mountpoint.FetchRow();
1362  if (!retval) {
// NOTE(review): the two concatenated string literals below join as
// "...compatibletransition points" -- a space is missing at the seam.
1363  Error("Failed to fetch nested catalog mountpoint to check for compatible"
1364  "transition points", lookup_mountpoint, data);
1365  return false;
1366  }
1367 
1368  catalog::DirectoryEntry mountpoint_entry =
1369  lookup_mountpoint.GetDirent(data->new_catalog);
1370  lookup_mountpoint.Reset();
1371 
1372  // Compare nested catalog mountpoint and nested catalog root entries
// NOTE(review): the declaration line for the comparison result (Doxygen line
// 1373, presumably "catalog::DirectoryEntry::Differences const diffs =") was
// stripped by the HTML extraction; 'diffs' is used below.
1374  mountpoint_entry.CompareTo(nested_root_entry);
1375 
1376  // We MUST deal with two directory entries that are a pair of nested cata-
1377  // log mountpoint and root entry! Thus we expect their transition flags to
1378  // differ and their name to be the same.
1379  assert(diffs & Difference::kNestedCatalogTransitionFlags);
1380  assert((diffs & Difference::kName) == 0);
1381 
1382  // Check if there are other differences except the nested catalog transition
1383  // flags and fix them...
1384  if ((diffs ^ Difference::kNestedCatalogTransitionFlags) != 0) {
1385  // If we found differences, we still assume a couple of directory entry
1386  // fields to be the same, otherwise some severe stuff would be wrong...
1387  if ((diffs & Difference::kChecksum) ||
1388  (diffs & Difference::kLinkcount) ||
1389  (diffs & Difference::kSymlink) ||
1390  (diffs & Difference::kChunkedFileFlag) )
1391  {
// NOTE(review): despite the "Aborting" message, this branch neither returns
// nor aborts here (consecutive Doxygen line numbers rule out a stripped
// line) -- execution falls through to the resync below; confirm whether a
// 'return false;' is intended in the original source.
1392  Error("Found an irreparable mismatch in a nested catalog transition "
1393  "point at '" + nested_root_path + "'\nAborting...\n");
1394  }
1395 
1396  // Copy the properties from the nested catalog root entry into the mount-
1397  // point entry to bring them in sync again
// NOTE(review): the call-opening line (Doxygen line 1398, presumably
// "CommandMigrate::FixNestedCatalogTransitionPoint(") was stripped by the
// HTML extraction.
1399  nested_root_entry, &mountpoint_entry);
1400 
1401  // save the nested catalog mountpoint entry into the catalog
1402  retval = update_directory_entry.BindPathHash(mountpoint_path_hash) &&
1403  update_directory_entry.BindDirent(mountpoint_entry) &&
1404  update_directory_entry.Execute();
1405  if (!retval) {
1406  Error("Failed to save resynchronized nested catalog mountpoint into "
1407  "catalog database", update_directory_entry, data);
1408  return false;
1409  }
1410  update_directory_entry.Reset();
1411 
1412  // Fixing of this mountpoint went well... inform the user that this minor
1413  // issue occurred
// NOTE(review): the LogCvmfs call-opening line (Doxygen line 1414) was
// stripped by the HTML extraction.
1415  "NOTE: fixed incompatible nested catalog transition point at: "
1416  "'%s' ", nested_root_path.c_str());
1417  }
1418  }
1419 
1420  return true;
1421 }
1422 
1423 
// NOTE(review): the signature's first line (Doxygen line 1424, presumably
// "void CommandMigrate::FixNestedCatalogTransitionPoint(") was stripped by
// the HTML extraction; confirm against the original source.
// Copies mode, owner, size and mtime from the nested catalog's root entry
// into the mountpoint entry so the two sides of the transition point agree.
1425  const catalog::DirectoryEntry &nested_root,
1426  catalog::DirectoryEntry *mountpoint
1427 ) {
1428  // Replace some file system parameters in the mountpoint to resync it with
1429  // the nested root of the corresponding nested catalog
1430  //
1431  // Note: this method relies on CommandMigrate being a friend of DirectoryEntry
1432  mountpoint->mode_ = nested_root.mode_;
1433  mountpoint->uid_ = nested_root.uid_;
1434  mountpoint->gid_ = nested_root.gid_;
1435  mountpoint->size_ = nested_root.size_;
1436  mountpoint->mtime_ = nested_root.mtime_;
1437 }
1438 
1439 
// NOTE(review): the function-name line (Doxygen line 1440) was stripped by
// the HTML extraction; presumably a RemoveDanglingNestedMountpoints-style
// method of CommandMigrate -- confirm against the original source.
// Finds entries flagged as nested catalog mountpoints that have child rows
// directly in this catalog but no registered nested catalog, and clears
// their mountpoint flag.
1441  PendingCatalog *data) const
1442 {
1443  assert(data->HasNew());
1444  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1445  bool retval = false;
1446 
1447  // build a set of registered nested catalog path hashes
1448  typedef catalog::Catalog::NestedCatalogList NestedCatalogList;
1449  typedef std::map<shash::Md5, catalog::Catalog::NestedCatalog>
1450  NestedCatalogMap;
// NOTE(review): the right-hand side of this initialization (Doxygen line
// 1452, presumably a ListNestedCatalogs() call) was stripped by the HTML
// extraction.
1451  const NestedCatalogList& nested_clgs =
1453  NestedCatalogList::const_iterator i = nested_clgs.begin();
1454  const NestedCatalogList::const_iterator iend = nested_clgs.end();
1455  NestedCatalogMap nested_catalog_path_hashes;
1456  for (; i != iend; ++i) {
1457  const PathString &path = i->mountpoint;
1458  const shash::Md5 hash(path.GetChars(), path.GetLength());
1459  nested_catalog_path_hashes[hash] = *i;
1460  }
1461 
1462  // Retrieve nested catalog mountpoints that have child entries directly inside
1463  // the current catalog (which is a malformed state)
1464  catalog::SqlLookupDanglingMountpoints sql_dangling_mountpoints(writable);
1465  catalog::SqlDirentUpdate save_updated_mountpoint(writable);
1466 
// Updates are collected first and applied after the read loop, so the
// UPDATE statement does not run while the SELECT statement is still open.
1467  std::vector<catalog::DirectoryEntry> todo_dirent;
1468  std::vector<shash::Md5> todo_hash;
1469 
1470  // go through the list of dangling nested catalog mountpoints and fix them
1471  // where needed (check if there is no nested catalog registered for them)
1472  while (sql_dangling_mountpoints.FetchRow()) {
1473  catalog::DirectoryEntry dangling_mountpoint =
1474  sql_dangling_mountpoints.GetDirent(data->new_catalog);
1475  const shash::Md5 path_hash = sql_dangling_mountpoints.GetPathHash();
1476  assert(dangling_mountpoint.IsNestedCatalogMountpoint());
1477 
1478  // check if the nested catalog mountpoint is registered in the nested cata-
1479  // log list of the currently migrated catalog
1480  const NestedCatalogMap::const_iterator nested_catalog =
1481  nested_catalog_path_hashes.find(path_hash);
1482  if (nested_catalog != nested_catalog_path_hashes.end()) {
// NOTE(review): the LogCvmfs call-opening line (Doxygen line 1483) was
// stripped by the HTML extraction.
1484  "WARNING: found a non-empty nested catalog mountpoint under "
1485  "'%s'", nested_catalog->second.mountpoint.c_str());
1486  continue;
1487  }
1488 
1489  // the mountpoint was confirmed to be dangling and needs to be removed
1490  dangling_mountpoint.set_is_nested_catalog_mountpoint(false);
1491  todo_dirent.push_back(dangling_mountpoint);
1492  todo_hash.push_back(path_hash);
1493  }
1494 
1495  for (unsigned i = 0; i < todo_dirent.size(); ++i) {
1496  retval = save_updated_mountpoint.BindPathHash(todo_hash[i]) &&
1497  save_updated_mountpoint.BindDirent(todo_dirent[i]) &&
1498  save_updated_mountpoint.Execute() &&
1499  save_updated_mountpoint.Reset();
1500  if (!retval) {
1501  Error("Failed to remove dangling nested catalog mountpoint entry in "
1502  "catalog", save_updated_mountpoint, data);
1503  return false;
1504  }
1505 
1506  // tell the user that this intervention has taken place
1507  LogCvmfs(kLogCatalog, kLogStdout, "NOTE: fixed dangling nested catalog "
1508  "mountpoint entry called: '%s' ",
1509  todo_dirent[i].name().c_str());
1510  }
1511 
1512  return true;
1513 }
1514 
1515 
// NOTE(review): the signature line (Doxygen line 1516, presumably
// "const catalog::DirectoryEntry& CommandMigrate::GetNestedCatalogMarkerDirent() {")
// was stripped by the HTML extraction; confirm against the original source.
// Returns the process-wide ".cvmfscatalog" marker entry; asserts that
// CreateNestedCatalogMarkerDirent() ran before.
1517  // This is pre-initialized singleton... it MUST be already there...
1518  assert(nested_catalog_marker_.name_.ToString() == ".cvmfscatalog");
1519  return nested_catalog_marker_;
1520 }
1521 
// NOTE(review): the signature line (Doxygen line 1522) and two statement
// openers (lines 1524 and 1533 -- the assignment target of CreateTempPath()
// and the right-hand side of the IngestionSource initialization) were
// stripped by the HTML extraction; confirm against the original source.
// Creates an empty ".cvmfscatalog" temp file and hands it to the spooler for
// upload to the backend storage.
1523  // Create an empty nested catalog marker file
1525  CreateTempPath(temporary_directory_ + "/.cvmfscatalog", 0644);
1526  if (nested_catalog_marker_tmp_path_.empty()) {
1527  Error("Failed to create temp file for nested catalog marker dummy.");
1528  return false;
1529  }
1530 
1531  // Process and upload it to the backend storage
// Note: Process() is asynchronous; success is reported via the spooler's
// upload callback, hence the unconditional 'true' here.
1532  IngestionSource *source =
1534  spooler_->Process(source);
1535  return true;
1536 }
1537 
// NOTE(review): the signature's first line (Doxygen line 1538) and several
// field assignments (lines 1547-1549 and 1551, presumably uid_/gid_/size_
// and linkcount-related fields) were stripped by the HTML extraction;
// confirm against the original source.
// Initializes the singleton ".cvmfscatalog" marker DirectoryEntry with the
// given content hash of the (empty) marker file.
1539  const shash::Any &content_hash)
1540 {
1541  // Generate it only once
1542  assert(nested_catalog_marker_.name_.ToString() != ".cvmfscatalog");
1543 
1544  // Fill the DirectoryEntry structure with all needed information
1545  nested_catalog_marker_.name_.Assign(".cvmfscatalog", strlen(".cvmfscatalog"));
1546  nested_catalog_marker_.mode_ = 33188;  // 33188 == 0100644: S_IFREG | 0644
1550  nested_catalog_marker_.mtime_ = time(NULL);
1552  nested_catalog_marker_.checksum_ = content_hash;
1553 }
1554 
1555 
// NOTE(review): the function-name line (Doxygen line 1556) was stripped by
// the HTML extraction; presumably
// CommandMigrate::MigrationWorker_20x::GenerateCatalogStatistics -- confirm
// against the original source.
// Aggregates nested-catalog counters, counts files/symlinks/directories and
// total file size in this catalog, persists the counters and publishes them
// for the parent catalog.
1557  PendingCatalog *data) const
1558 {
1559  assert(data->HasNew());
1560  bool retval = false;
1561  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1562 
1563  // Aggregate the statistics counters of all nested catalogs
1564  // Note: we might need to wait until nested catalogs are successfully
1565  // processed
1566  catalog::DeltaCounters stats_counters;
1566  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
1567  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1568  for (; i != iend; ++i) {
1569  const PendingCatalog *nested_catalog = *i;
1570  const catalog::DeltaCounters &s = nested_catalog->nested_statistics.Get();
1571  s.PopulateToParent(&stats_counters);
1572  }
1573 
1574  // Count various directory entry types in the catalog to fill up the catalog
1575  // statistics counters introduced in the current catalog schema
// Note: symlinks also carry kFlagFile, hence the "AND NOT :flag_link"
// filters below to count regular files only.
1576  catalog::SqlCatalog count_regular_files(writable,
1577  "SELECT count(*) FROM catalog "
1578  " WHERE flags & :flag_file "
1579  " AND NOT flags & :flag_link;");
1580  catalog::SqlCatalog count_symlinks(writable,
1581  "SELECT count(*) FROM catalog WHERE flags & :flag_link;");
1582  catalog::SqlCatalog count_directories(writable,
1583  "SELECT count(*) FROM catalog WHERE flags & :flag_dir;");
1584  catalog::SqlCatalog aggregate_file_size(writable,
1585  "SELECT sum(size) FROM catalog WHERE flags & :flag_file "
1586  " AND NOT flags & :flag_link");
1587 
1588  // Run the actual counting queries
1589  retval =
1590  count_regular_files.BindInt64(1, catalog::SqlDirent::kFlagFile) &&
1591  count_regular_files.BindInt64(2, catalog::SqlDirent::kFlagLink) &&
1592  count_regular_files.FetchRow();
1593  if (!retval) {
1594  Error("Failed to count regular files.", count_regular_files, data);
1595  return false;
1596  }
1597  retval =
1598  count_symlinks.BindInt64(1, catalog::SqlDirent::kFlagLink) &&
1599  count_symlinks.FetchRow();
1600  if (!retval) {
1601  Error("Failed to count symlinks.", count_symlinks, data);
1602  return false;
1603  }
1604  retval =
1605  count_directories.BindInt64(1, catalog::SqlDirent::kFlagDir) &&
1606  count_directories.FetchRow();
1607  if (!retval) {
1608  Error("Failed to count directories.", count_directories, data);
1609  return false;
1610  }
1611  retval =
1612  aggregate_file_size.BindInt64(1, catalog::SqlDirent::kFlagFile) &&
1613  aggregate_file_size.BindInt64(2, catalog::SqlDirent::kFlagLink) &&
1614  aggregate_file_size.FetchRow();
1615  if (!retval) {
1616  Error("Failed to aggregate the file sizes.", aggregate_file_size, data);
1617  return false;
1618  }
1619 
1620  // Insert the counted statistics into the DeltaCounters data structure
1621  stats_counters.self.regular_files = count_regular_files.RetrieveInt64(0);
1622  stats_counters.self.symlinks = count_symlinks.RetrieveInt64(0);
1623  stats_counters.self.directories = count_directories.RetrieveInt64(0);
1624  stats_counters.self.nested_catalogs = data->nested_catalogs.size();
1625  stats_counters.self.file_size = aggregate_file_size.RetrieveInt64(0);
1626 
1627  // Write back the generated statistics counters into the catalog database
1628  stats_counters.WriteToDatabase(writable);
1629 
1630  // Push the generated statistics counters up to the parent catalog
1631  data->nested_statistics.Set(stats_counters);
1632 
1633  return true;
1634 }
1635 
1636 
// NOTE(review): the function-name line (Doxygen line 1637) was stripped by
// the HTML extraction; presumably
// CommandMigrate::MigrationWorker_20x::FindRootEntryInformation -- confirm
// against the original source.
// Looks up this catalog's root directory entry in the migrated database,
// sanity-checks it and publishes it for use by the parent catalog.
1638  PendingCatalog *data) const
1639 {
1640  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1641  bool retval;
1642 
1643  std::string root_path = data->root_path();
1644  shash::Md5 root_path_hash = shash::Md5(root_path.data(), root_path.size());
1645 
1646  catalog::SqlLookupPathHash lookup_root_entry(writable);
1647  retval = lookup_root_entry.BindPathHash(root_path_hash) &&
1648  lookup_root_entry.FetchRow();
1649  if (!retval) {
1650  Error("Failed to retrieve root directory entry of migrated catalog",
1651  lookup_root_entry, data);
1652  return false;
1653  }
1654 
// Sanity: a directory always has linkcount >= 2 ('.' and the parent link)
// and never belongs to a hardlink group.
1655  catalog::DirectoryEntry entry =
1656  lookup_root_entry.GetDirent(data->new_catalog);
1657  if (entry.linkcount() < 2 || entry.hardlink_group() > 0) {
1658  Error("Retrieved linkcount of catalog root entry is not sane.", data);
1659  return false;
1660  }
1661 
1662  data->root_entry.Set(entry);
1663  return true;
1664 }
1665 
1666 
// NOTE(review): the function-name line (Doxygen line 1667) was stripped by
// the HTML extraction; presumably a CommitDatabaseTransaction method of the
// 2.0.x migration worker -- confirm against the original source.
// Commits the open transaction on the newly created catalog.
1668  PendingCatalog *data) const
1669 {
1670  assert(data->HasNew());
1671  data->new_catalog->Commit();
1672  return true;
1673 }
1674 
1675 
// NOTE(review): the function-name line (Doxygen line 1676) was stripped by
// the HTML extraction; presumably DetachOldCatalogDatabase -- confirm
// against the original source.
// Detaches the old catalog database ("old") that was ATTACHed to the new
// catalog's SQLite connection during migration.
1677  PendingCatalog *data) const
1678 {
1679  assert(data->HasNew());
1680  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1681  catalog::SqlCatalog detach_old_catalog(writable, "DETACH old;");
1682  const bool retval = detach_old_catalog.Execute();
1683  if (!retval) {
1684  Error("Failed to detach old catalog database.", detach_old_catalog, data);
1685  return false;
1686  }
1687  return true;
1688 }
1689 
1690 
1691 //------------------------------------------------------------------------------
1692 
1693 
// NOTE(review): the constructor's name line (Doxygen line 1694) and its
// initializer list (line 1696, presumably forwarding 'context' to the
// AbstractMigrationWorker base) were stripped by the HTML extraction;
// confirm against the original source. Presumably the constructor of the
// 2.1.7 migration worker.
1695  const worker_context *context)
1697 { }
1698 
1699 
// NOTE(review): the signature line (Doxygen line 1700) was stripped by the
// HTML extraction; presumably RunMigration(PendingCatalog *data) of the
// 2.1.7 migration worker -- confirm against the original source.
// Pipeline: schema check -> begin txn -> recompute statistics -> bump
// schema version -> commit. Short-circuits on the first failing step.
1701  const
1702 {
1703  return CheckDatabaseSchemaCompatibility(data) &&
1704  StartDatabaseTransaction(data) &&
1705  GenerateNewStatisticsCounters(data) &&
1706  UpdateCatalogSchema(data) &&
1707  CommitDatabaseTransaction(data);
1708 }
1709 
1710 
// NOTE(review): the function-name line (Doxygen line 1711) and the epsilon
// operands of both comparisons (lines 1718 and 1721, presumably
// catalog::CatalogDatabase::kSchemaEpsilon) were stripped by the HTML
// extraction; confirm against the original source.
// Accepts only catalogs whose schema version is 2.4 (within a small float
// tolerance).
1712  PendingCatalog *data) const
1713 {
1714  assert(!data->HasNew());
1715  const catalog::CatalogDatabase &old_catalog = data->old_catalog->database();
1716 
1717  if ((old_catalog.schema_version() < 2.4 -
1719  ||
1720  (old_catalog.schema_version() > 2.4 +
1722  {
1723  Error("Given Catalog is not Schema 2.4.", data);
1724  return false;
1725  }
1726 
1727  return true;
1728 }
1729 
1730 
// NOTE(review): the function-name line (Doxygen line 1731) was stripped by
// the HTML extraction; presumably StartDatabaseTransaction of the 2.1.7
// worker -- confirm against the original source.
// The 2.1.7 migration modifies the old catalog in place, hence the
// transaction is opened on the (writable) old catalog.
1732  PendingCatalog *data) const
1733 {
1734  assert(!data->HasNew());
1735  GetWritable(data->old_catalog)->Transaction();
1736  return true;
1737 }
1738 
1739 
// NOTE(review): the function-name line (Doxygen line 1740) was stripped by
// the HTML extraction; presumably GenerateNewStatisticsCounters of the
// 2.1.7 migration worker -- confirm against the original source.
// Adds the counters introduced after schema 2.4 (chunked files, file
// chunks, file sizes) to the in-place-migrated catalog and publishes the
// delta for the parent catalog.
1741  (PendingCatalog *data) const {
1742  assert(!data->HasNew());
1743  bool retval = false;
1744  const catalog::CatalogDatabase &writable =
1745  GetWritable(data->old_catalog)->database();
1746 
1747  // Aggregate the statistics counters of all nested catalogs
1748  // Note: we might need to wait until nested catalogs are successfully
1748  // processed
1749  catalog::DeltaCounters stats_counters;
1750  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
1751  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1752  for (; i != iend; ++i) {
1753  const PendingCatalog *nested_catalog = *i;
1754  const catalog::DeltaCounters &s = nested_catalog->nested_statistics.Get();
1755  s.PopulateToParent(&stats_counters);
1756  }
1757 
1758  // Count various directory entry types in the catalog to fill up the catalog
1759  // statistics counters introduced in the current catalog schema
1760  catalog::SqlCatalog count_chunked_files(writable,
1761  "SELECT count(*), sum(size) FROM catalog "
1762  " WHERE flags & :flag_chunked_file;");
1763  catalog::SqlCatalog count_file_chunks(writable,
1764  "SELECT count(*) FROM chunks;");
1765  catalog::SqlCatalog aggregate_file_size(writable,
1766  "SELECT sum(size) FROM catalog WHERE flags & :flag_file "
1767  " AND NOT flags & :flag_link;");
1768 
1769  // Run the actual counting queries
1770  retval =
1771  count_chunked_files.BindInt64(1, catalog::SqlDirent::kFlagFileChunk) &&
1772  count_chunked_files.FetchRow();
1773  if (!retval) {
1774  Error("Failed to count chunked files.", count_chunked_files, data);
1775  return false;
1776  }
1777  retval = count_file_chunks.FetchRow();
1778  if (!retval) {
1779  Error("Failed to count file chunks", count_file_chunks, data);
1780  return false;
1781  }
1782  retval =
1783  aggregate_file_size.BindInt64(1, catalog::SqlDirent::kFlagFile) &&
1784  aggregate_file_size.BindInt64(2, catalog::SqlDirent::kFlagLink) &&
1785  aggregate_file_size.FetchRow();
1786  if (!retval) {
1787  Error("Failed to aggregate the file sizes.", aggregate_file_size, data);
1788  return false;
1789  }
1790 
1791  // Insert the counted statistics into the DeltaCounters data structure
1792  stats_counters.self.chunked_files = count_chunked_files.RetrieveInt64(0);
1793  stats_counters.self.chunked_file_size = count_chunked_files.RetrieveInt64(1);
1794  stats_counters.self.file_chunks = count_file_chunks.RetrieveInt64(0);
1795  stats_counters.self.file_size = aggregate_file_size.RetrieveInt64(0);
1796 
1797  // Write back the generated statistics counters into the catalog database
// Unlike the 2.0.x path, the existing (legacy) counters are read first and
// the delta is applied on top, since the old catalog is updated in place.
1798  catalog::Counters counters;
1799  retval = counters.ReadFromDatabase(writable, catalog::LegacyMode::kLegacy);
1800  if (!retval) {
1801  Error("Failed to read old catalog statistics counters", data);
1802  return false;
1803  }
1804  counters.ApplyDelta(stats_counters);
1805  retval = counters.InsertIntoDatabase(writable);
1806  if (!retval) {
1807  Error("Failed to write new statistics counters to database", data);
1808  return false;
1809  }
1810 
1811  // Push the generated statistics counters up to the parent catalog
1812  data->nested_statistics.Set(stats_counters);
1813 
1814  return true;
1815 }
1816 
1817 
// NOTE(review): the function-name line (Doxygen line 1818) was stripped by
// the HTML extraction; presumably UpdateCatalogSchema of the 2.1.7 worker --
// confirm against the original source.
// Bumps the 'schema' property of the in-place-migrated catalog to 2.5.
1819  (PendingCatalog *data) const {
1820  assert(!data->HasNew());
1821  const catalog::CatalogDatabase &writable =
1822  GetWritable(data->old_catalog)->database();
1823  catalog::SqlCatalog update_schema_version(writable,
1824  "UPDATE properties SET value = :schema_version WHERE key = 'schema';");
1825 
1826  const bool retval =
1827  update_schema_version.BindDouble(1, 2.5) &&
1828  update_schema_version.Execute();
1829  if (!retval) {
1830  Error("Failed to update catalog schema version",
1831  update_schema_version,
1832  data);
1833  return false;
1834  }
1835 
1836  return true;
1837 }
1838 
1839 
// NOTE(review): the function-name line (Doxygen line 1840) was stripped by
// the HTML extraction; presumably CommitDatabaseTransaction of the 2.1.7
// worker -- confirm against the original source.
// Commits the in-place transaction opened on the old catalog.
1841  (PendingCatalog *data) const {
1842  assert(!data->HasNew());
1843  GetWritable(data->old_catalog)->Commit();
1844  return true;
1845 }
1846 
1847 
1848 //------------------------------------------------------------------------------
1849 
1850 
// NOTE(review): the constructor's name line (Doxygen line 1851) and the
// first initializer (line 1853, presumably the AbstractMigrationWorker base
// initialization) were stripped by the HTML extraction; confirm against the
// original source. Pre-builds the UID and GID mapping UPDATE statements
// once, so each catalog migration only executes them.
1852  const worker_context *context)
1854  , uid_map_statement_(GenerateMappingStatement(context->uid_map, "uid"))
1855  , gid_map_statement_(GenerateMappingStatement(context->gid_map, "gid"))
1856 {}
1857 
// NOTE(review): the function-name line (Doxygen line 1858) was stripped by
// the HTML extraction; presumably RunMigration of the chown worker --
// confirm against the original source.
// The whole chown migration consists of applying the UID/GID mappings.
1859  PendingCatalog *data) const {
1860  return ApplyPersonaMappings(data);
1861 }
1862 
1863 
// NOTE(review): the function-name line (Doxygen line 1864) and the
// right-hand side of the mountpoint comparison (line 1870, presumably the
// virtual-catalog path constant from catalog_virtual.h) were stripped by the
// HTML extraction; confirm against the original source.
// Rewrites all uid/gid columns of the catalog in one transaction using the
// pre-generated UPDATE ... CASE statements; the virtual catalog is skipped.
1865  PendingCatalog *data) const {
1866  assert(data->old_catalog != NULL);
1867  assert(data->new_catalog == NULL);
1868 
1869  if (data->old_catalog->mountpoint() ==
1871  {
1872  // skipping virtual catalog
1873  return true;
1874  }
1875 
1876  const catalog::CatalogDatabase &db =
1877  GetWritable(data->old_catalog)->database();
1878 
1879  if (!db.BeginTransaction()) {
1880  return false;
1881  }
1882 
1883  catalog::SqlCatalog uid_sql(db, uid_map_statement_);
1884  if (!uid_sql.Execute()) {
1885  Error("Failed to update UIDs", uid_sql, data);
1886  return false;
1887  }
1888 
1889  catalog::SqlCatalog gid_sql(db, gid_map_statement_);
1890  if (!gid_sql.Execute()) {
1891  Error("Failed to update GIDs", gid_sql, data);
1892  return false;
1893  }
1894 
1895  return db.CommitTransaction();
1896 }
1897 
1898 
// NOTE(review): the function-name line (Doxygen line 1900, presumably
// "std::string CommandMigrate::ChownMigrationWorker::GenerateMappingStatement(")
// was stripped by the HTML extraction; confirm against the original source.
// Builds "UPDATE OR ABORT catalog SET <column> = ..." -- either a constant
// (default-only map) or a CASE expression with one WHEN per mapping rule;
// unmatched IDs fall back to the map default or stay unchanged.
1899 template <class MapT>
1901  const MapT &map,
1902  const std::string &column) const {
1903  assert(map.RuleCount() > 0 || map.HasDefault());
1904 
1905  std::string stmt = "UPDATE OR ABORT catalog SET " + column + " = ";
1906 
1907  if (map.RuleCount() == 0) {
1908  // map everything to the same value (just a simple UPDATE clause)
1909  stmt += StringifyInt(map.GetDefault());
1910  } else {
1911  // apply multiple ID mappings (UPDATE clause with CASE statement)
1912  stmt += "CASE " + column + " ";
1913  typedef typename MapT::map_type::const_iterator map_iterator;
1914  map_iterator i = map.GetRuleMap().begin();
1915  const map_iterator iend = map.GetRuleMap().end();
1916  for (; i != iend; ++i) {
1917  stmt += "WHEN " + StringifyInt(i->first) +
1918  " THEN " + StringifyInt(i->second) + " ";
1919  }
1920 
1921  // add a default (if provided) or leave unchanged if no mapping fits
1922  stmt += (map.HasDefault())
1923  ? "ELSE " + StringifyInt(map.GetDefault()) + " "
1924  : "ELSE " + column + " ";
1925  stmt += "END";
1926  }
1927 
1928  stmt += ";";
1929  return stmt;
1930 }
1931 
1932 
1933 //------------------------------------------------------------------------------
1934 
1935 
// Migration entry point for the hardlink-removal worker: verifies the catalog
// schema is compatible, then clears hardlink group information.
// NOTE(review): extraction artifact — the function-name line (orig. 1936) is
// missing from this view; from the calls below this is the hardlink-removal
// worker's RunMigration — confirm against the upstream file.
1937  PendingCatalog *data) const {
1938  return CheckDatabaseSchemaCompatibility(data) &&
1939  BreakUpHardlinks(data);
1940 }
1941 
1942 
1943 bool
// Checks that the old catalog's database schema is recent enough for the
// hardlink-removal migration.
// NOTE(review): extraction artifact — the qualified function-name line
// (orig. 1944) and the line with the actual return expression (orig. 1950,
// presumably a schema_version() comparison on 'clg') are missing from this
// view; as shown, 'clg' appears unused — the missing line is the check.
1945  (PendingCatalog *data) const {
1946  assert(data->old_catalog != NULL);
1947  assert(data->new_catalog == NULL);
1948 
1949  const catalog::CatalogDatabase &clg = data->old_catalog->database();
 // (missing return expression, orig. line 1950 — see NOTE above)
1951 }
1952 
1953 
// Clears hardlink group information from every file entry of the catalog by
// resetting the packed 'hardlinks' field (group ID << 32 | linkcount) to 1.
// NOTE(review): extraction artifact — the function-name line (orig. 1954) is
// missing from this view; per the member listing this is BreakUpHardlinks.
1955  PendingCatalog *data) const {
1956  assert(data->old_catalog != NULL);
1957  assert(data->new_catalog == NULL);
1958 
1959  const catalog::CatalogDatabase &db =
1960  GetWritable(data->old_catalog)->database();
1961 
1962  if (!db.BeginTransaction()) {
1963  return false;
1964  }
1965 
1966  // CernVM-FS catalogs do not contain inodes directly but they are assigned by
1967  // the CVMFS catalog at runtime. Hardlinks are treated with so-called hardlink
1968  // group IDs to indicate hardlink relationships that need to be respected at
1969  // runtime by assigning identical inodes accordingly.
1970  //
1971  // This updates all directory entries of a given catalog that have a linkcount
1972  // greater than 1 and are flagged as a 'file'. Note: Symlinks are flagged both
1973  // as 'file' and as 'symlink', hence they are updated implicitly as well.
1974  //
1975  // The 'hardlinks' field in the catalog contains two 32 bit integers:
1976  // * the linkcount in the lower 32 bits
1977  // * the (so called) hardlink group ID in the higher 32 bits
1978  //
1979  // Files that have a linkcount of exactly 1 do not have any hardlinks and have
1980  // the (implicit) hardlink group ID '0'. Hence, 'hardlinks == 1' means that a
1981  // file doesn't have any hardlinks (linkcount = 1) and doesn't need treatment
1982  // here.
1983  //
1984  // Files that have hardlinks (linkcount > 1) will have a very large integer in
1985  // their 'hardlinks' field (hardlink group ID > 0 in higher 32 bits). Those
1986  // files will be treated by setting their 'hardlinks' field to 1, effectively
1987  // clearing all hardlink information from the directory entry.
1988  const std::string stmt = "UPDATE OR ABORT catalog "
1989  "SET hardlinks = 1 "
1990  "WHERE flags & :file_flag "
1991  " AND hardlinks > 1;";
1992  catalog::SqlCatalog hardlink_removal_sql(db, stmt);
1993  hardlink_removal_sql.BindInt64(1, catalog::SqlDirent::kFlagFile);
 // NOTE(review): the Execute() return value is ignored here; a failed UPDATE
 // is only surfaced indirectly through CommitTransaction() below — consider
 // checking it explicitly.
1994  hardlink_removal_sql.Execute();
1995 
1996  return db.CommitTransaction();
1997 }
1998 
1999 //------------------------------------------------------------------------------
2000 
2001 
// Migration entry point for the bulk-hash-removal worker: verifies schema
// compatibility, then drops redundant bulk hashes from chunked files.
// NOTE(review): extraction artifact — the function-name line (orig. 2002) is
// missing from this view; from the calls below this is the bulkhash-removal
// worker's RunMigration — confirm against the upstream file.
2003  PendingCatalog *data) const {
2004  return CheckDatabaseSchemaCompatibility(data) &&
2005  RemoveRedundantBulkHashes(data);
2006 }
2007 
2008 
2009 bool
// Checks that the old catalog's database schema is recent enough for the
// bulk-hash-removal migration.
// NOTE(review): extraction artifact — the qualified function-name line
// (orig. 2010) and the line with the actual return expression (orig. 2016,
// presumably a schema_version() comparison on 'clg') are missing from this
// view; as shown, 'clg' appears unused — the missing line is the check.
2011  (PendingCatalog *data) const {
2012  assert(data->old_catalog != NULL);
2013  assert(data->new_catalog == NULL);
2014 
2015  const catalog::CatalogDatabase &clg = data->old_catalog->database();
 // (missing return expression, orig. line 2016 — see NOTE above)
2017 }
2018 
2019 
// Drops the bulk 'hash' column value for all chunked files in the catalog;
// per the comment below, clients >= 2.1.7 reconstruct content from the chunk
// hashes and no longer need the redundant bulk hash.
// NOTE(review): extraction artifact — the function-name line (orig. 2020) is
// missing from this view; per the member listing this is
// RemoveRedundantBulkHashes.
2021  PendingCatalog *data) const {
2022  assert(data->old_catalog != NULL);
2023  assert(data->new_catalog == NULL);
2024 
2025  const catalog::CatalogDatabase &db =
2026  GetWritable(data->old_catalog)->database();
2027 
2028  if (!db.BeginTransaction()) {
2029  return false;
2030  }
2031 
2032  // Regular files with both bulk hashes and chunked hashes can drop the bulk
2033  // hash since modern clients >= 2.1.7 won't require them
2034  const std::string stmt = "UPDATE OR ABORT catalog "
2035  "SET hash = NULL "
2036  "WHERE flags & :file_chunked_flag;";
2037  catalog::SqlCatalog bulkhash_removal_sql(db, stmt);
2038  bulkhash_removal_sql.BindInt64(1, catalog::SqlDirent::kFlagFileChunk);
 // NOTE(review): the Execute() return value is ignored here; failure is only
 // surfaced indirectly through CommitTransaction() below — consider checking
 // it explicitly.
2039  bulkhash_removal_sql.Execute();
2040 
2041  return db.CommitTransaction();
2042 }
2043 
2044 
2045 //------------------------------------------------------------------------------
2046 
2047 
// Constructor of the statistics-migration worker.
// NOTE(review): extraction artifact — the constructor-name line (orig. 2048)
// and the member-initializer-list line (orig. 2050, presumably forwarding
// 'context' to the base worker) are missing from this view; confirm against
// the upstream file.
2049  const worker_context *context)
2051 { }
2052 
2053 
// Migration entry point for the statistics worker: schema check, then repair
// of the statistics counters inside a single database transaction.
// NOTE(review): extraction artifact — the signature line with the function
// name and the 'data' parameter (orig. 2054) is missing from this view; from
// the calls below this is the stats worker's RunMigration(PendingCatalog *).
2055  const
2056 {
2057  return CheckDatabaseSchemaCompatibility(data) &&
2058  StartDatabaseTransaction(data) &&
2059  RepairStatisticsCounters(data) &&
2060  CommitDatabaseTransaction(data);
2061 }
2062 
2063 
// The statistics repair requires catalog schema >= 2.5 with schema revision
// >= 5 (the revision that introduced the counters handled below).
// NOTE(review): extraction artifact — the function-name line (orig. 2064) and
// the 'if' condition guarding the first Error() call (orig. 2071, presumably
// a clg.schema_version() < 2.5 comparison, judging by the message) are
// missing from this view; confirm against the upstream file.
2065  PendingCatalog *data) const
2066 {
2067  assert(data->old_catalog != NULL);
2068  assert(data->new_catalog == NULL);
2069 
2070  const catalog::CatalogDatabase &clg = data->old_catalog->database();
 // (missing 'if' condition, orig. line 2071 — see NOTE above)
2072  Error("Given catalog schema is < 2.5.", data);
2073  return false;
2074  }
2075 
2076  if (clg.schema_revision() < 5) {
2077  Error("Given catalog revision is < 5", data);
2078  return false;
2079  }
2080 
2081  return true;
2082 }
2083 
2084 
// Opens a transaction on the (writable) old catalog; the migration for this
// worker mutates the old catalog in place, hence the !HasNew() precondition.
// NOTE(review): extraction artifact — the function-name line (orig. 2085) is
// missing from this view; per the member listing this is
// StartDatabaseTransaction.
2086  PendingCatalog *data) const
2087 {
2088  assert(!data->HasNew());
2089  GetWritable(data->old_catalog)->Transaction();
2090  return true;
2091 }
2092 
2093 
// Recomputes the catalog statistics counters from scratch: aggregates the
// (already computed) counters of all nested catalogs, counts the various
// directory-entry types directly via SQL, writes the result back into the
// catalog database, and publishes it for the parent catalog to pick up.
// NOTE(review): extraction artifact — the function-name line (orig. 2094) and
// two WHERE-clause fragments (orig. 2123 for 'count_external', orig. 2130 for
// 'count_special' — presumably flag filters on kFlagFileExternal and
// kFlagFileSpecial respectively, judging by the flags referenced elsewhere in
// this function) are missing from this view; confirm against upstream.
2095  PendingCatalog *data) const
2096 {
2097  assert(!data->HasNew());
2098  bool retval = false;
2099  const catalog::CatalogDatabase &writable =
2100  GetWritable(data->old_catalog)->database();
2101 
2102  // Aggregated the statistics counters of all nested catalogs
2103  // Note: we might need to wait until nested catalogs are sucessfully processed
2104  catalog::DeltaCounters stats_counters;
2105  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
2106  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
2107  for (; i != iend; ++i) {
2108  const PendingCatalog *nested_catalog = *i;
 // nested_statistics is a Future: Get() blocks until the nested catalog's
 // own statistics repair has completed.
2109  const catalog::DeltaCounters &s = nested_catalog->nested_statistics.Get();
2110  s.PopulateToParent(&stats_counters);
2111  }
2112 
2113  // Count various directory entry types in the catalog to fill up the catalog
2114  // statistics counters introduced in the current catalog schema
 // Regular files: flagged 'file' but neither symlink nor special file.
2115  catalog::SqlCatalog count_regular(writable,
2116  std::string("SELECT count(*), sum(size) FROM catalog ") +
2117  "WHERE flags & " + StringifyInt(catalog::SqlDirent::kFlagFile) +
2118  " AND NOT flags & " + StringifyInt(catalog::SqlDirent::kFlagLink) +
2119  " AND NOT flags & " + StringifyInt(catalog::SqlDirent::kFlagFileSpecial) +
2120  ";");
2121  catalog::SqlCatalog count_external(writable,
2122  std::string("SELECT count(*), sum(size) FROM catalog ") +
 // (missing WHERE clause, orig. line 2123 — see NOTE above)
2124  ";");
2125  catalog::SqlCatalog count_symlink(writable,
2126  std::string("SELECT count(*) FROM catalog ") +
2127  "WHERE flags & " + StringifyInt(catalog::SqlDirent::kFlagLink) + ";");
2128  catalog::SqlCatalog count_special(writable,
2129  std::string("SELECT count(*) FROM catalog ") +
 // (missing WHERE clause, orig. line 2130 — see NOTE above)
2131  ";");
2132  catalog::SqlCatalog count_xattr(writable,
2133  std::string("SELECT count(*) FROM catalog ") +
2134  "WHERE xattr IS NOT NULL;");
2135  catalog::SqlCatalog count_chunk(writable,
2136  std::string("SELECT count(*), sum(size) FROM catalog ") +
2137  "WHERE flags & " + StringifyInt(catalog::SqlDirent::kFlagFileChunk) + ";");
2138  catalog::SqlCatalog count_dir(writable,
2139  std::string("SELECT count(*) FROM catalog ") +
2140  "WHERE flags & " + StringifyInt(catalog::SqlDirent::kFlagDir) + ";");
2141  catalog::SqlCatalog count_chunk_blobs(writable,
2142  "SELECT count(*) FROM chunks;");
2143 
 // Each query returns exactly one row; a failed FetchRow() on any of them
 // aborts the whole repair.
2144  retval = count_regular.FetchRow() &&
2145  count_external.FetchRow() &&
2146  count_symlink.FetchRow() &&
2147  count_special.FetchRow() &&
2148  count_xattr.FetchRow() &&
2149  count_chunk.FetchRow() &&
2150  count_dir.FetchRow() &&
2151  count_chunk_blobs.FetchRow();
2152  if (!retval) {
2153  Error("Failed to collect catalog statistics", data);
2154  return false;
2155  }
2156 
 // Column 0 of each query is count(*); column 1 (where selected) is sum(size).
2157  stats_counters.self.regular_files = count_regular.RetrieveInt64(0);
2158  stats_counters.self.symlinks = count_symlink.RetrieveInt64(0);
2159  stats_counters.self.specials = count_special.RetrieveInt64(0);
2160  stats_counters.self.directories = count_dir.RetrieveInt64(0);
2161  stats_counters.self.nested_catalogs = data->nested_catalogs.size();
2162  stats_counters.self.chunked_files = count_chunk.RetrieveInt64(0);
2163  stats_counters.self.file_chunks = count_chunk_blobs.RetrieveInt64(0);
2164  stats_counters.self.file_size = count_regular.RetrieveInt64(1);
2165  stats_counters.self.chunked_file_size = count_chunk.RetrieveInt64(1);
2166  stats_counters.self.xattrs = count_xattr.RetrieveInt64(0);
2167  stats_counters.self.externals = count_external.RetrieveInt64(0);
2168  stats_counters.self.external_file_size = count_external.RetrieveInt64(1);
2169 
2170  // Write back the generated statistics counters into the catalog database
2171  catalog::Counters counters;
2172  counters.ApplyDelta(stats_counters);
2173  retval = counters.InsertIntoDatabase(writable);
2174  if (!retval) {
2175  Error("Failed to write new statistics counters to database", data);
2176  return false;
2177  }
2178 
2179  // Push the generated statistics counters up to the parent catalog
 // nested_statistics is the Future consumed by the parent's aggregation loop
 // at the top of this function.
2180  data->nested_statistics.Set(stats_counters);
2181 
2182  return true;
2183 }
2184 
2185 
// Commits the transaction opened by StartDatabaseTransaction on the writable
// old catalog.
// NOTE(review): extraction artifact — the function-name line (orig. 2186) is
// missing from this view; per the member listing this is
// CommitDatabaseTransaction.
2187  PendingCatalog *data) const
2188 {
2189  assert(!data->HasNew());
2190  GetWritable(data->old_catalog)->Commit();
2191  return true;
2192 }
2193 
2194 } // namespace swissknife
bool InsertIntoDatabase(const CatalogDatabase &database) const
uint32_t linkcount() const
catalog::Catalog const * root_catalog_
int return_code
the return value of the spooler operation
void ConvertCatalogsRecursively(PendingCatalog *catalog, MigratorT *migrator)
static Parameter Optional(const char key, const std::string &desc)
Definition: swissknife.h:41
#define LogCvmfs(source, mask,...)
Definition: logging.h:20
bool UpdateCatalogMetadata(PendingCatalog *data) const
std::string GetLastErrorMsg() const
Definition: sql.cc:164
bool AnalyzeFileLinkcounts(PendingCatalog *data) const
UniquePtr< upload::Spooler > spooler_
CallbackPtr RegisterListener(typename BoundClosure< WorkerT::returned_data, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, ClosureDataT data)
PendingCatalogMap pending_catalogs_
Differences CompareTo(const DirectoryEntry &other) const
bool ReadPersonaMaps(const std::string &uid_map_path, const std::string &gid_map_path, UidMap *uid_map, GidMap *gid_map) const
const manifest::Manifest * manifest() const
Definition: repository.h:125
bool Export(const std::string &path) const
Definition: manifest.cc:209
shash::Md5 GetPathHash() const
Definition: catalog_sql.cc:683
bool Execute()
Definition: sql.cc:42
bool Reset()
Definition: sql.cc:126
static Parameter Switch(const char key, const std::string &desc)
Definition: swissknife.h:44
bool MigrateFileMetadata(PendingCatalog *data) const
bool UpdateCatalogSchema(PendingCatalog *data) const
FileChunkList file_chunks
the file chunks generated during processing
bool BindText(const int index, const std::string &value)
Definition: sql.h:399
void Insert(const CatalogStatistics &statistics)
std::string database_path() const
Definition: catalog.h:184
void set_catalog_hash(const shash::Any &catalog_hash)
Definition: manifest.h:102
bool FetchRow()
Definition: sql.cc:62
bool FindRootEntryInformation(PendingCatalog *data) const
unsigned int GetNumberOfCpuCores()
const std::string & filename() const
Definition: sql.h:148
ConcurrentWorkers< DerivedWorkerT > * master() const
void WaitForEmptyQueue() const
bool StartDatabaseTransaction(PendingCatalog *data) const
bool LoadCatalogs(const shash::Any &manual_root_hash, ObjectFetcherT *object_fetcher)
bool BindDirent(const DirectoryEntry &entry)
bool FixNestedCatalogTransitionPoints(PendingCatalog *data) const
static const std::string kPreviousHeadTag
Definition: repository.cc:41
std::string name
Definition: history.h:89
std::vector< Parameter > ParameterList
Definition: swissknife.h:71
void CreateNestedCatalogMarkerDirent(const shash::Any &content_hash)
double GetTime() const
Definition: algorithm.cc:76
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:248
void Assign(const char *chars, const unsigned length)
Definition: shortstring.h:53
void UploadCallback(const upload::SpoolerResult &result)
bool IsDirty() const
Definition: catalog_rw.h:57
const history::History * history() const
gid_t gid_
Definition: loader.cc:133
gid_t gid_
void Reset()
Definition: algorithm.cc:69
unsigned revision
Definition: history.h:92
void ApplyDelta(const DeltaCounters &delta)
bool IsHttpUrl(const std::string &path)
Definition: posix.cc:203
DirectoryEntry GetDirent(const Catalog *catalog, const bool expand_symlink=true) const
Definition: catalog_sql.cc:696
bool BeginTransaction() const
Definition: sql_impl.h:268
std::string CreateTempPath(const std::string &path_prefix, const int mode)
Definition: posix.cc:1058
void set_revision(const uint64_t revision)
Definition: manifest.h:86
static void FixNestedCatalogTransitionPoint(const catalog::DirectoryEntry &nested_root, catalog::DirectoryEntry *mountpoint)
virtual ParameterList GetParams() const
uint64_t GetTTL() const
Definition: catalog.cc:495
assert((mem||(size==0))&&"Out Of Memory")
static const int kFlagFileChunk
Definition: catalog_sql.h:186
float schema_version() const
Definition: sql.h:149
CatalogList GetChildren() const
Definition: catalog.cc:752
void PrintStatusMessage(const PendingCatalog *catalog, const shash::Any &content_hash, const std::string &message)
void SetPreviousRevision(const shash::Any &hash)
Definition: catalog_rw.cc:352
bool RemoveDanglingNestedMountpoints(PendingCatalog *data) const
static const int kFlagFile
Definition: catalog_sql.h:183
static catalog::DirectoryEntry nested_catalog_marker_
bool AttachOldCatalogDatabase(PendingCatalog *data) const
bool CopyPath2Path(const string &src, const string &dest)
Definition: compression.cc:63
bool ReadPersona(const std::string &uid, const std::string &gid)
const NestedCatalogList & ListNestedCatalogs() const
Definition: catalog.cc:630
bool InitVerifyingSignatureManager(const std::string &pubkey_path, const std::string &trusted_certs="")
Definition: server_tool.cc:49
void Set(const T &object)
uint64_t size_
void JobSuccessful(const returned_data_t &data)
bool MigrateNestedCatalogMountPoints(PendingCatalog *data) const
unsigned schema_revision() const
Definition: sql.h:150
void CatalogCallback(const CatalogTraversalData< catalog::WritableCatalog > &data)
signature::SignatureManager * signature_manager() const
Definition: server_tool.cc:118
UniquePtr< history::SqliteHistory > history_upstream_
static const int kFlagLink
Definition: catalog_sql.h:184
static void Error(const std::string &message)
bool WriteToDatabase(const CatalogDatabase &database) const
bool IsNestedCatalogMountpoint() const
bool BindPathHash(const shash::Md5 &hash)
bool CommitDatabaseTransaction(PendingCatalog *data) const
static SqliteHistory * OpenWritable(const std::string &file_name)
static WritableCatalog * AttachFreely(const std::string &root_path, const std::string &file, const shash::Any &catalog_hash, Catalog *parent=NULL, const bool is_not_root=false)
Definition: catalog_rw.cc:49
std::string description
Definition: history.h:94
NameString name_
uint64_t GetRevision() const
Definition: catalog.cc:528
int GetLastError() const
Definition: sql.h:343
bool GenerateCatalogStatistics(PendingCatalog *data) const
NameString name() const
uint64_t GetLastModified() const
Definition: catalog.cc:533
int64_t String2Int64(const string &value)
Definition: string.cc:222
bool BindDouble(const int index, const double value)
Definition: sql.h:371
void UploadHistoryClosure(const upload::SpoolerResult &result, Future< shash::Any > *hash)
std::string local_path
the local_path previously given as input
uid_t uid_
Definition: loader.cc:132
download::DownloadManager * download_manager() const
Definition: server_tool.cc:113
Future< catalog::DirectoryEntry > root_entry
std::string GenerateMappingStatement(const MapT &map, const std::string &column) const
static const std::string kPreviousHeadTagDescription
static const float kSchemaEpsilon
Definition: sql.h:105
bool RepairStatisticsCounters(PendingCatalog *data) const
bool CheckDatabaseSchemaCompatibility(PendingCatalog *data) const
static Parameter Mandatory(const char key, const std::string &desc)
Definition: swissknife.h:38
bool StartDatabaseTransaction(PendingCatalog *data) const
bool DetachOldCatalogDatabase(PendingCatalog *data) const
const char kSuffixCatalog
Definition: hash.h:53
uint32_t linkcount_
bool ApplyPersonaMappings(PendingCatalog *data) const
static const int kFlagFileSpecial
Definition: catalog_sql.h:185
static const int kFlagFileExternal
Definition: catalog_sql.h:192
bool CommitDatabaseTransaction(PendingCatalog *data) const
static const std::string kHeadTag
Definition: repository.cc:40
static const catalog::DirectoryEntry & GetNestedCatalogMarkerDirent()
void PopulateToParent(DeltaCounters *parent) const
Any MkFromHexPtr(const HexPtr hex, const char suffix)
Definition: hash.cc:83
bool InsertInitialValues(const std::string &root_path, const bool volatile_content, const std::string &voms_authz, const DirectoryEntry &root_entry=DirectoryEntry(kDirentNegative))
Definition: catalog_sql.cc:263
sqlite3_int64 RetrieveInt64(const int idx_column) const
Definition: sql.h:445
bool RunMigration(PendingCatalog *data) const
PathString mountpoint() const
Definition: catalog.h:179
time_t mtime_
bool BindPathHash(const struct shash::Md5 &hash)
Definition: catalog_sql.cc:780
string StringifyInt(const int64_t value)
Definition: string.cc:78
bool IsValid() const
Definition: pointer.h:43
bool CommitTransaction() const
Definition: sql_impl.h:275
bool ReadFromDatabase(const CatalogDatabase &database, const LegacyMode::Type legacy=LegacyMode::kNoLegacy)
bool BindInt64(const int index, const sqlite3_int64 value)
Definition: sql.h:381
void Stop()
Definition: algorithm.cc:61
void set_history(const shash::Any &history_db)
Definition: manifest.h:90
Future< catalog::DeltaCounters > nested_statistics
bool CollectAndAggregateStatistics(PendingCatalog *data) const
std::vector< Catalog * > CatalogList
Definition: catalog.h:38
bool GenerateNewStatisticsCounters(PendingCatalog *data) const
void set_ttl(const uint32_t ttl)
Definition: manifest.h:85
bool CommitDatabaseTransaction(PendingCatalog *data) const
bool RunMigration(PendingCatalog *data) const
std::string ToString() const
Definition: shortstring.h:114
std::vector< NestedCatalog > NestedCatalogList
Definition: catalog.h:208
int Main(const ArgumentList &args)
std::map< char, SharedPtr< std::string > > ArgumentList
Definition: swissknife.h:72
unsigned int mode_
ShortString< kDefaultMaxPath, 0 > PathString
Definition: shortstring.h:190
MigrationWorker_20x(const worker_context *context)
UniquePtr< manifest::Manifest > manifest_upstream_
bool UpdateUndoTags(PendingCatalog *root_catalog, unsigned revision, time_t timestamp, shash::Any *history_hash)
shash::Any root_hash
Definition: history.h:90
bool CleanupNestedCatalogs(PendingCatalog *data) const
bool UpdateNestedCatalogReferences(PendingCatalog *data) const
bool CreateNewEmptyCatalog(PendingCatalog *data) const
void set_catalog_size(const uint64_t catalog_size)
Definition: manifest.h:99
CatalogStatisticsList catalog_statistics_list_
const CatalogDatabase & database() const
Definition: catalog.h:249
unsigned int GetNumberOfFailedJobs() const
int64_t GetFileSize(const std::string &path)
Definition: posix.cc:825
HttpObjectFetcher ObjectFetcher
bool CheckDatabaseSchemaCompatibility(PendingCatalog *data) const
static const float kLatestSupportedSchema
Definition: catalog_sql.h:44
bool StartDatabaseTransaction(PendingCatalog *data) const
void JobFailed(const returned_data_t &data)
unsigned GetLength() const
Definition: shortstring.h:104
shash::Any checksum_
shash::Any hash() const
Definition: catalog.h:186
static const char * kVirtualPath
bool InitDownloadManager(const bool follow_redirects, const std::string &proxy, const unsigned max_pool_handles=1)
Definition: server_tool.cc:21
bool CheckDatabaseSchemaCompatibility(PendingCatalog *data) const
bool BindMd5(const int idx_high, const int idx_low, const shash::Md5 &hash)
Definition: catalog_sql.h:138
const char * GetChars() const
Definition: shortstring.h:96
uint32_t hardlink_group() const
bool BindParentPathHash(const shash::Md5 &hash)
static const unsigned kLatestSchemaRevision
Definition: catalog_sql.h:46
static const int kFlagDir
Definition: catalog_sql.h:178
void MigrationCallback(PendingCatalog *const &data)
void Start()
Definition: algorithm.cc:53
void set_is_nested_catalog_mountpoint(const bool val)
unsigned int Differences
bool BindPathHash(const shash::Md5 &hash)
catalog::WritableCatalog * GetWritable(const catalog::Catalog *catalog) const
void Destroy()
Definition: pointer.h:45
static DerivedT * Create(const std::string &filename)
Definition: sql_impl.h:30
bool DoMigrationAndCommit(const std::string &manifest_path, typename MigratorT::worker_context *context)
size_t size() const
Definition: bigvector.h:100
bool BindDirent(const DirectoryEntry &entry)
uid_t uid_
void set_root_path(const std::string &root_path)
Definition: manifest.h:114