CernVM-FS  2.13.0
swissknife_migrate.cc
1 
8 #include "swissknife_migrate.h"
9 
10 #include <sys/resource.h>
11 #include <unistd.h>
12 
13 #include "catalog_rw.h"
14 #include "catalog_sql.h"
15 #include "catalog_virtual.h"
17 #include "crypto/hash.h"
18 #include "swissknife_history.h"
19 #include "util/concurrency.h"
20 #include "util/logging.h"
21 
22 using namespace std; // NOLINT
23 
24 namespace swissknife {
25 
26 catalog::DirectoryEntry CommandMigrate::nested_catalog_marker_;
27 
28 CommandMigrate::CommandMigrate()
29  : file_descriptor_limit_(8192)
30  , catalog_count_(0)
31  , has_committed_new_revision_(false)
32  , uid_(0)
33  , gid_(0)
34  , root_catalog_(NULL) {
35  atomic_init32(&catalogs_processed_);
36 }
37 
38 
39 ParameterList CommandMigrate::GetParams() const {
40  ParameterList r;
41  r.push_back(Parameter::Mandatory(
42  'v',
43  "migration base version ( 2.0.x | 2.1.7 | chown | hardlink | bulkhash | "
44  "stats)"));
45  r.push_back(Parameter::Mandatory(
46  'r', "repository URL (absolute local path or remote URL)"));
47  r.push_back(Parameter::Mandatory('u', "upstream definition string"));
48  r.push_back(Parameter::Mandatory('o', "manifest output file"));
49  r.push_back(
50  Parameter::Mandatory('t', "temporary directory for catalog decompress"));
51  r.push_back(
52  Parameter::Optional('p', "user id to be used for this repository"));
53  r.push_back(
54  Parameter::Optional('g', "group id to be used for this repository"));
55  r.push_back(Parameter::Optional('n', "fully qualified repository name"));
56  r.push_back(Parameter::Optional('h', "root hash (other than trunk)"));
57  r.push_back(Parameter::Optional('k', "repository master key(s)"));
58  r.push_back(Parameter::Optional('i', "UID map for chown"));
59  r.push_back(Parameter::Optional('j', "GID map for chown"));
60  r.push_back(Parameter::Optional('@', "proxy url"));
61  r.push_back(Parameter::Switch('f', "fix nested catalog transition points"));
62  r.push_back(Parameter::Switch('l', "disable linkcount analysis of files"));
63  r.push_back(
64  Parameter::Switch('s', "enable collection of catalog statistics"));
65  return r;
66 }
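// Illustrative invocation only (not part of the original source; the
// swissknife sub-command name, repository name and all paths below are
// placeholders used to show how the mandatory flags above fit together):
//   cvmfs_swissknife migrate -v 2.1.7 \
//     -r /srv/cvmfs/example.cern.ch \
//     -u local,/srv/spool/tmp,/srv/cvmfs/example.cern.ch \
//     -o /tmp/manifest.out -t /tmp/migrate-scratch -n example.cern.ch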
67 
68 
69 static void Error(const std::string &message) {
70  LogCvmfs(kLogCatalog, kLogStderr, "%s", message.c_str());
71 }
72 
73 
74 static void Error(const std::string &message,
75  const CommandMigrate::PendingCatalog *catalog) {
76  const std::string err_msg = message
77  + "\n"
78  "Catalog: "
79  + catalog->root_path();
80  Error(err_msg);
81 }
82 
83 
84 static void Error(const std::string &message,
85  const catalog::SqlCatalog &statement,
86  const CommandMigrate::PendingCatalog *catalog) {
87  const std::string err_msg = message
88  + "\n"
89  "SQLite: "
90  + StringifyInt(statement.GetLastError()) + " - "
91  + statement.GetLastErrorMsg();
92  Error(err_msg, catalog);
93 }
94 
95 
96 int CommandMigrate::Main(const ArgumentList &args) {
97  shash::Any manual_root_hash;
98  const std::string &migration_base = *args.find('v')->second;
99  const std::string &repo_url = *args.find('r')->second;
100  const std::string &spooler = *args.find('u')->second;
101  const std::string &manifest_path = *args.find('o')->second;
102  const std::string &tmp_dir = *args.find('t')->second;
103  const std::string &uid = (args.count('p') > 0) ? *args.find('p')->second : "";
104  const std::string &gid = (args.count('g') > 0) ? *args.find('g')->second : "";
105  const std::string &repo_name = (args.count('n') > 0) ? *args.find('n')->second
106  : "";
107  const std::string &repo_keys = (args.count('k') > 0) ? *args.find('k')->second
108  : "";
109  const std::string &uid_map_path = (args.count('i') > 0)
110  ? *args.find('i')->second
111  : "";
112  const std::string &gid_map_path = (args.count('j') > 0)
113  ? *args.find('j')->second
114  : "";
115  const bool fix_transition_points = (args.count('f') > 0);
116  const bool analyze_file_linkcounts = (args.count('l') == 0);
117  const bool collect_catalog_statistics = (args.count('s') > 0);
118  if (args.count('h') > 0) {
119  manual_root_hash = shash::MkFromHexPtr(
120  shash::HexPtr(*args.find('h')->second), shash::kSuffixCatalog);
121  }
122 
123  // We might need a lot of file descriptors
124  if (!RaiseFileDescriptorLimit()) {
125  Error("Failed to raise file descriptor limits");
126  return 2;
127  }
128 
129  // Put SQLite into multithreaded mode
130  if (!ConfigureSQLite()) {
131  Error("Failed to preconfigure SQLite library");
132  return 3;
133  }
134 
135  // Create an upstream spooler
136  temporary_directory_ = tmp_dir;
137  const upload::SpoolerDefinition spooler_definition(spooler, shash::kSha1);
138  spooler_ = upload::Spooler::Construct(spooler_definition);
139  if (!spooler_.IsValid()) {
140  Error("Failed to create upstream Spooler.");
141  return 5;
142  }
143  spooler_->RegisterListener(&CommandMigrate::UploadCallback, this);
144 
145  // Load the full catalog hierarchy
146  LogCvmfs(kLogCatalog, kLogStdout, "Loading current catalog tree...");
147 
149  bool loading_successful = false;
150  if (IsHttpUrl(repo_url)) {
152 
153  const bool follow_redirects = false;
154  const string proxy = (args.count('@') > 0) ? *args.find('@')->second : "";
155  if (!this->InitDownloadManager(follow_redirects, proxy)
156  || !this->InitSignatureManager(repo_keys)) {
157  LogCvmfs(kLogCatalog, kLogStderr, "Failed to init repo connection");
158  return 1;
159  }
160 
161  ObjectFetcher fetcher(
162  repo_name, repo_url, tmp_dir, download_manager(), signature_manager());
163 
164  loading_successful = LoadCatalogs(manual_root_hash, &fetcher);
165  } else {
167  ObjectFetcher fetcher(repo_url, tmp_dir);
168  loading_successful = LoadCatalogs(manual_root_hash, &fetcher);
169  }
171 
172  if (!loading_successful) {
173  Error("Failed to load catalog tree");
174  return 4;
175  }
176 
177  LogCvmfs(kLogCatalog, kLogStdout, "Loaded %d catalogs", catalog_count_);
178  assert(root_catalog_ != NULL);
179 
180  // Do the actual migration step
181  bool migration_succeeded = false;
182  if (migration_base == "2.0.x") {
183  if (!ReadPersona(uid, gid)) {
184  return 1;
185  }
186 
187  // Generate and upload a nested catalog marker
189  Error("Failed to create a nested catalog marker.");
190  return 6;
191  }
192  spooler_->WaitForUpload();
193 
194  // Configure the concurrent catalog migration facility
195  MigrationWorker_20x::worker_context context(temporary_directory_,
196  collect_catalog_statistics,
197  fix_transition_points,
198  analyze_file_linkcounts,
199  uid_,
200  gid_);
201  migration_succeeded = DoMigrationAndCommit<MigrationWorker_20x>(
202  manifest_path, &context);
203  } else if (migration_base == "2.1.7") {
204  MigrationWorker_217::worker_context context(temporary_directory_,
205  collect_catalog_statistics);
206  migration_succeeded = DoMigrationAndCommit<MigrationWorker_217>(
207  manifest_path, &context);
208  } else if (migration_base == "chown") {
209  UidMap uid_map;
210  GidMap gid_map;
211  if (!ReadPersonaMaps(uid_map_path, gid_map_path, &uid_map, &gid_map)) {
212  Error("Failed to read UID and/or GID map");
213  return 1;
214  }
215  ChownMigrationWorker::worker_context context(
216  temporary_directory_, collect_catalog_statistics, uid_map, gid_map);
217  migration_succeeded = DoMigrationAndCommit<ChownMigrationWorker>(
218  manifest_path, &context);
219  } else if (migration_base == "hardlink") {
220  HardlinkRemovalMigrationWorker::worker_context context(
221  temporary_directory_, collect_catalog_statistics);
222  migration_succeeded = DoMigrationAndCommit<HardlinkRemovalMigrationWorker>(
223  manifest_path, &context);
224  } else if (migration_base == "bulkhash") {
225  BulkhashRemovalMigrationWorker::worker_context context(
226  temporary_directory_, collect_catalog_statistics);
227  migration_succeeded = DoMigrationAndCommit<BulkhashRemovalMigrationWorker>(
228  manifest_path, &context);
229  } else if (migration_base == "stats") {
230  StatsMigrationWorker::worker_context context(temporary_directory_,
231  collect_catalog_statistics);
232  migration_succeeded = DoMigrationAndCommit<StatsMigrationWorker>(
233  manifest_path, &context);
234  } else {
235  const std::string err_msg = "Unknown migration base: " + migration_base;
236  Error(err_msg);
237  return 1;
238  }
239 
240  // Check if everything went well
241  if (!migration_succeeded) {
242  Error("Migration failed!");
243  return 5;
244  }
245 
246  // Analyze collected statistics
247  if (collect_catalog_statistics && has_committed_new_revision_) {
248  LogCvmfs(kLogCatalog, kLogStdout, "\nCollected statistics results:");
249  AnalyzeCatalogStatistics();
250  }
251 
252  LogCvmfs(kLogCatalog, kLogStdout, "\nCatalog Migration succeeded");
253  return 0;
254 }
255 
256 
257 bool CommandMigrate::ReadPersona(const std::string &uid,
258  const std::string &gid) {
259  if (uid.empty()) {
260  Error("Please provide a user ID");
261  return false;
262  }
263  if (gid.empty()) {
264  Error("Please provide a group ID");
265  return false;
266  }
267 
268  uid_ = String2Int64(uid);
269  gid_ = String2Int64(gid);
270  return true;
271 }
272 
273 
274 bool CommandMigrate::ReadPersonaMaps(const std::string &uid_map_path,
275  const std::string &gid_map_path,
276  UidMap *uid_map,
277  GidMap *gid_map) const {
278  if (!uid_map->Read(uid_map_path) || !uid_map->IsValid()) {
279  Error("Failed to read UID map");
280  return false;
281  }
282 
283  if (!gid_map->Read(gid_map_path) || !gid_map->IsValid()) {
284  Error("Failed to read GID map");
285  return false;
286  }
287 
288  if (uid_map->RuleCount() == 0 && !uid_map->HasDefault()) {
289  Error("UID map appears to be empty");
290  return false;
291  }
292 
293  if (gid_map->RuleCount() == 0 && !gid_map->HasDefault()) {
294  Error("GID map appears to be empty");
295  return false;
296  }
297 
298  return true;
299 }
300 
301 
302 void CommandMigrate::UploadHistoryClosure(const upload::SpoolerResult &result,
303  Future<shash::Any> *hash) {
304  assert(!result.IsChunked());
305  if (result.return_code != 0) {
306  LogCvmfs(kLogCvmfs, kLogStderr, "failed to upload history database (%d)",
307  result.return_code);
308  hash->Set(shash::Any());
309  } else {
310  hash->Set(result.content_hash);
311  }
312 }
313 
314 
315 bool CommandMigrate::UpdateUndoTags(PendingCatalog *root_catalog,
316  uint64_t revision,
317  time_t timestamp,
318  shash::Any *history_hash) {
319  const string filename_old = history_upstream_->filename();
320  const string filename_new = filename_old + ".new";
321  bool retval = CopyPath2Path(filename_old, filename_new);
322  if (!retval)
323  return false;
324  UniquePtr<history::SqliteHistory> history(
325  history::SqliteHistory::OpenWritable(filename_new));
326  history->TakeDatabaseFileOwnership();
327 
328  history::History::Tag tag_trunk;
329  const bool exists = history->GetByName(CommandTag::kHeadTag, &tag_trunk);
330  if (exists) {
331  retval = history->Remove(CommandTag::kHeadTag);
332  if (!retval)
333  return false;
334 
335  history::History::Tag tag_trunk_previous = tag_trunk;
336  tag_trunk_previous.name = CommandTag::kPreviousHeadTag;
338  history->Remove(CommandTag::kPreviousHeadTag);
339 
340  tag_trunk.root_hash = root_catalog->new_catalog_hash;
341  tag_trunk.size = root_catalog->new_catalog_size;
342  tag_trunk.revision = revision;
343  tag_trunk.timestamp = timestamp;
344 
345  retval = history->Insert(tag_trunk_previous);
346  if (!retval)
347  return false;
348  retval = history->Insert(tag_trunk);
349  if (!retval)
350  return false;
351  }
352 
353  history->SetPreviousRevision(manifest_upstream_->history());
354  history->DropDatabaseFileOwnership();
355  history.Destroy();
356 
357  Future<shash::Any> history_hash_new;
358  upload::Spooler::CallbackPtr callback = spooler_->RegisterListener(
359  &CommandMigrate::UploadHistoryClosure, this, &history_hash_new);
360  spooler_->ProcessHistory(filename_new);
361  spooler_->WaitForUpload();
362  spooler_->UnregisterListener(callback);
363  unlink(filename_new.c_str());
364  *history_hash = history_hash_new.Get();
365  if (history_hash->IsNull()) {
366  Error("failed to upload tag database");
367  return false;
368  }
369 
370  return true;
371 }
372 
373 
374 template<class MigratorT>
375 bool CommandMigrate::DoMigrationAndCommit(
376  const std::string &manifest_path,
377  typename MigratorT::worker_context *context) {
378  // Create a concurrent migration context for catalog migration
379  const unsigned int cpus = GetNumberOfCpuCores();
380  ConcurrentWorkers<MigratorT> concurrent_migration(cpus, cpus * 10, context);
381 
382  if (!concurrent_migration.Initialize()) {
383  Error("Failed to initialize worker migration system.");
384  return false;
385  }
386  concurrent_migration.RegisterListener(&CommandMigrate::MigrationCallback,
387  this);
388 
389  // Migrate catalogs recursively (starting with the deepest nested catalogs)
390  LogCvmfs(kLogCatalog, kLogStdout, "\nMigrating catalogs...");
391  PendingCatalog *root_catalog = new PendingCatalog(root_catalog_);
392  migration_stopwatch_.Start();
393  ConvertCatalogsRecursively(root_catalog, &concurrent_migration);
394  concurrent_migration.WaitForEmptyQueue();
395  spooler_->WaitForUpload();
396  spooler_->UnregisterListeners();
397  migration_stopwatch_.Stop();
398 
399  // check for possible errors during the migration process
400  const unsigned int errors = concurrent_migration.GetNumberOfFailedJobs()
401  + spooler_->GetNumberOfErrors();
403  "Catalog Migration finished with %d errors.", errors);
404  if (errors > 0) {
406  "\nCatalog Migration produced errors\nAborting...");
407  return false;
408  }
409 
410  if (root_catalog->was_updated.Get()) {
412  "\nCommitting migrated repository revision...");
414  manifest.set_catalog_hash(root_catalog->new_catalog_hash);
415  manifest.set_catalog_size(root_catalog->new_catalog_size);
416  manifest.set_root_path(root_catalog->root_path());
417  const catalog::Catalog *new_catalog = (root_catalog->HasNew())
418  ? root_catalog->new_catalog
419  : root_catalog->old_catalog;
420  manifest.set_ttl(new_catalog->GetTTL());
421  manifest.set_revision(new_catalog->GetRevision());
422 
423  // Commit the new (migrated) repository revision...
424  if (history_upstream_.IsValid()) {
425  shash::Any history_hash(manifest_upstream_->history());
427  "Updating repository tag database... ");
428  if (!UpdateUndoTags(root_catalog,
429  new_catalog->GetRevision(),
430  new_catalog->GetLastModified(),
431  &history_hash)) {
432  Error("Updating tag database failed.\nAborting...");
433  return false;
434  }
435  manifest.set_history(history_hash);
436  LogCvmfs(kLogCvmfs, kLogStdout, "%s", history_hash.ToString().c_str());
437  }
438 
439  if (!manifest.Export(manifest_path)) {
440  Error("Manifest export failed.\nAborting...");
441  return false;
442  }
443  has_committed_new_revision_ = true;
444  } else {
446  "\nNo catalogs migrated, skipping the commit...");
447  }
448 
449  // Get rid of the open root catalog
450  delete root_catalog;
451 
452  return true;
453 }
454 
455 
456 void CommandMigrate::CatalogCallback(
457  const CatalogTraversalData<catalog::WritableCatalog> &data) {
458  std::string tree_indent;
459  std::string hash_string;
460  std::string path;
461 
462  for (unsigned int i = 1; i < data.tree_level; ++i) {
463  tree_indent += "\u2502 ";
464  }
465 
466  if (data.tree_level > 0) {
467  tree_indent += "\u251C\u2500 ";
468  }
469 
470  hash_string = data.catalog_hash.ToString();
471 
472  path = data.catalog->mountpoint().ToString();
473  if (path.empty()) {
474  path = "/";
475  root_catalog_ = data.catalog;
476  }
477 
478  LogCvmfs(kLogCatalog, kLogStdout, "%s%s %s", tree_indent.c_str(),
479  hash_string.c_str(), path.c_str());
480 
481  ++catalog_count_;
482 }
483 
484 
485 void CommandMigrate::MigrationCallback(PendingCatalog *const &data) {
486  // Check if the migration of the catalog was successful
487  if (!data->success) {
488  Error("Catalog migration failed! Aborting...");
489  exit(1);
490  return;
491  }
492 
493  if (!data->HasChanges()) {
494  PrintStatusMessage(data, data->GetOldContentHash(), "preserved");
495  data->was_updated.Set(false);
496  return;
497  }
498 
499  const string &path = (data->HasNew()) ? data->new_catalog->database_path()
500  : data->old_catalog->database_path();
501 
502  // Save the processed catalog in the pending map
503  {
504  MutexLockGuard guard(pending_catalogs_mutex_);
505  assert(pending_catalogs_.find(path) == pending_catalogs_.end());
506  pending_catalogs_[path] = data;
507  }
509 
510  // check the size of the uncompressed catalog file
511  const size_t new_catalog_size = GetFileSize(path);
512  if (new_catalog_size <= 0) {
513  Error("Failed to get uncompressed file size of catalog!", data);
514  exit(2);
515  return;
516  }
517  data->new_catalog_size = new_catalog_size;
518 
519  // Schedule the compression and upload of the catalog
520  spooler_->ProcessCatalog(path);
521 }
522 
523 
524 void CommandMigrate::UploadCallback(const upload::SpoolerResult &result) {
525  const string &path = result.local_path;
526 
527  // Check if the upload was successful
528  if (result.return_code != 0) {
529  Error("Failed to upload file " + path + "\nAborting...");
530  exit(2);
531  return;
532  }
533  assert(result.file_chunks.size() == 0);
534 
535  // Remove the just uploaded file
536  unlink(path.c_str());
537 
538  // Uploaded nested catalog marker... generate and cache DirectoryEntry for it
539  if (path == nested_catalog_marker_tmp_path_) {
540  CreateNestedCatalogMarkerDirent(result.content_hash);
541  return;
542  } else {
543  // Find the catalog path in the pending catalogs and remove it from the list
544  PendingCatalog *catalog;
545  {
546  MutexLockGuard guard(pending_catalogs_mutex_);
547  const PendingCatalogMap::iterator i = pending_catalogs_.find(path);
548  assert(i != pending_catalogs_.end());
549  catalog = const_cast<PendingCatalog *>(i->second);
550  pending_catalogs_.erase(i);
551  }
552 
553  PrintStatusMessage(catalog, result.content_hash, "migrated and uploaded");
554 
555  // The catalog is completely processed... fill the content_hash to allow the
556  // processing of parent catalogs (Notified by 'was_updated'-future)
557  // NOTE: From now on, this PendingCatalog structure could be deleted and
558  // should not be used anymore!
559  catalog->new_catalog_hash = result.content_hash;
560  catalog->was_updated.Set(true);
561  }
562 }
563 
564 
565 void CommandMigrate::PrintStatusMessage(const PendingCatalog *catalog,
566  const shash::Any &content_hash,
567  const std::string &message) {
568  atomic_inc32(&catalogs_processed_);
569  const unsigned int processed = (atomic_read32(&catalogs_processed_) * 100)
570  / catalog_count_;
571  LogCvmfs(kLogCatalog, kLogStdout, "[%d%%] %s %sC %s", processed,
572  message.c_str(), content_hash.ToString().c_str(),
573  catalog->root_path().c_str());
574 }
575 
576 
577 template<class MigratorT>
578 void CommandMigrate::ConvertCatalogsRecursively(PendingCatalog *catalog,
579  MigratorT *migrator) {
580  // First migrate all nested catalogs (depth first traversal)
581  const catalog::CatalogList nested_catalogs = catalog->old_catalog
582  ->GetChildren();
583  catalog::CatalogList::const_iterator i = nested_catalogs.begin();
584  const catalog::CatalogList::const_iterator iend = nested_catalogs.end();
585  catalog->nested_catalogs.reserve(nested_catalogs.size());
586  for (; i != iend; ++i) {
587  PendingCatalog *new_nested = new PendingCatalog(*i);
588  catalog->nested_catalogs.push_back(new_nested);
589  ConvertCatalogsRecursively(new_nested, migrator);
590  }
591 
592  // Migrate this catalog referencing all its (already migrated) children
593  migrator->Schedule(catalog);
594 }
595 
596 
597 bool CommandMigrate::RaiseFileDescriptorLimit() const {
598  struct rlimit rpl;
599  memset(&rpl, 0, sizeof(rpl));
600  getrlimit(RLIMIT_NOFILE, &rpl);
601  if (rpl.rlim_cur < file_descriptor_limit_) {
602  if (rpl.rlim_max < file_descriptor_limit_)
603  rpl.rlim_max = file_descriptor_limit_;
604  rpl.rlim_cur = file_descriptor_limit_;
605  const bool retval = setrlimit(RLIMIT_NOFILE, &rpl);
606  if (retval != 0) {
607  return false;
608  }
609  }
610  return true;
611 }
612 
613 
614 bool CommandMigrate::ConfigureSQLite() const {
615  const int retval = sqlite3_config(SQLITE_CONFIG_MULTITHREAD);
616  return (retval == SQLITE_OK);
617 }
618 
619 
620 void CommandMigrate::AnalyzeCatalogStatistics() const {
621  const unsigned int number_of_catalogs = catalog_statistics_list_.size();
622  unsigned int aggregated_entry_count = 0;
623  unsigned int aggregated_max_row_id = 0;
624  unsigned int aggregated_hardlink_count = 0;
625  unsigned int aggregated_linkcounts = 0;
626  double aggregated_migration_time = 0.0;
627 
628  CatalogStatisticsList::const_iterator i = catalog_statistics_list_.begin();
629  const CatalogStatisticsList::const_iterator iend =
630  catalog_statistics_list_.end();
631  for (; i != iend; ++i) {
632  aggregated_entry_count += i->entry_count;
633  aggregated_max_row_id += i->max_row_id;
634  aggregated_hardlink_count += i->hardlink_group_count;
635  aggregated_linkcounts += i->aggregated_linkcounts;
636  aggregated_migration_time += i->migration_time;
637  }
638 
639  // Inode quantization
640  assert(aggregated_max_row_id > 0);
641  const unsigned int unused_inodes = aggregated_max_row_id
642  - aggregated_entry_count;
643  const float ratio = (static_cast<float>(unused_inodes)
644  / static_cast<float>(aggregated_max_row_id))
645  * 100.0f;
647  "Actual Entries: %d\n"
648  "Allocated Inodes: %d\n"
649  " Unused Inodes: %d\n"
650  " Percentage of wasted Inodes: %.1f%%\n",
651  aggregated_entry_count, aggregated_max_row_id, unused_inodes, ratio);
652 
653  // Hardlink statistics
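 // Note (added for clarity): aggregated_linkcounts / aggregated_hardlink_count
 // below is an unsigned integer division; the fractional part is truncated
 // before the result is converted to float.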
654  const float average_linkcount = (aggregated_hardlink_count > 0)
655  ? aggregated_linkcounts
656  / aggregated_hardlink_count
657  : 0.0f;
659  "Generated Hardlink Groups: %d\n"
660  "Average Linkcount per Group: %.1f\n",
661  aggregated_hardlink_count, average_linkcount);
662 
663  // Performance measures
664  const double average_migration_time = aggregated_migration_time
665  / static_cast<double>(
666  number_of_catalogs);
668  "Catalog Loading Time: %.2fs\n"
669  "Average Migration Time: %.2fs\n"
670  "Overall Migration Time: %.2fs\n"
671  "Aggregated Migration Time: %.2fs\n",
672  catalog_loading_stopwatch_.GetTime(), average_migration_time,
673  migration_stopwatch_.GetTime(), aggregated_migration_time);
674 }
675 
676 
677 CommandMigrate::PendingCatalog::~PendingCatalog() {
678  delete old_catalog;
679  old_catalog = NULL;
680 
681  if (new_catalog != NULL) {
682  delete new_catalog;
683  new_catalog = NULL;
684  }
685 }
686 
687 
688 template<class DerivedT>
689 CommandMigrate::AbstractMigrationWorker<DerivedT>::AbstractMigrationWorker(
690  const worker_context *context)
691  : temporary_directory_(context->temporary_directory)
692  , collect_catalog_statistics_(context->collect_catalog_statistics) { }
693 
694 
695 template<class DerivedT>
696 CommandMigrate::AbstractMigrationWorker<DerivedT>::~AbstractMigrationWorker() {
697 }
698 
699 
700 template<class DerivedT>
701 void CommandMigrate::AbstractMigrationWorker<DerivedT>::operator()(
702  const expected_data &data) {
703  migration_stopwatch_.Start();
704  const bool success = static_cast<DerivedT *>(this)->RunMigration(data)
705  && UpdateNestedCatalogReferences(data)
706  && UpdateCatalogMetadata(data)
707  && CollectAndAggregateStatistics(data)
708  && CleanupNestedCatalogs(data);
709  data->success = success;
710  migration_stopwatch_.Stop();
711 
712  data->statistics.migration_time = migration_stopwatch_.GetTime();
713  migration_stopwatch_.Reset();
714 
715  // Note: MigrationCallback() will take care of the result...
716  if (success) {
717  master()->JobSuccessful(data);
718  } else {
719  master()->JobFailed(data);
720  }
721 }
722 
723 
724 template<class DerivedT>
725 bool CommandMigrate::AbstractMigrationWorker<
726  DerivedT>::UpdateNestedCatalogReferences(PendingCatalog *data) const {
727  const catalog::Catalog *new_catalog = (data->HasNew()) ? data->new_catalog
728  : data->old_catalog;
729  const catalog::CatalogDatabase &writable = new_catalog->database();
730 
731  catalog::SqlCatalog add_nested_catalog(
732  writable,
733  "INSERT OR REPLACE INTO nested_catalogs (path, sha1, size) "
734  " VALUES (:path, :sha1, :size);");
735 
736  // go through all nested catalogs and update their references (we are
737  // currently in their parent catalog)
738  // Note: we might need to wait for the nested catalog to be fully processed.
739  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
740  const PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
741  for (; i != iend; ++i) {
742  PendingCatalog *nested_catalog = *i;
743 
744  if (!nested_catalog->was_updated.Get()) {
745  continue;
746  }
747 
748  const std::string &root_path = nested_catalog->root_path();
749  const shash::Any catalog_hash = nested_catalog->new_catalog_hash;
750  const size_t catalog_size = nested_catalog->new_catalog_size;
751 
752  // insert the updated nested catalog reference into the new catalog
753  const bool retval = add_nested_catalog.BindText(1, root_path)
754  && add_nested_catalog.BindText(2,
755  catalog_hash.ToString())
756  && add_nested_catalog.BindInt64(3, catalog_size)
757  && add_nested_catalog.Execute();
758  if (!retval) {
759  Error("Failed to add nested catalog link", add_nested_catalog, data);
760  return false;
761  }
762  add_nested_catalog.Reset();
763  }
764 
765  return true;
766 }
767 
768 
769 template<class DerivedT>
770 bool CommandMigrate::AbstractMigrationWorker<DerivedT>::UpdateCatalogMetadata(
771  PendingCatalog *data) const {
772  if (!data->HasChanges()) {
773  return true;
774  }
775 
776  catalog::WritableCatalog *catalog = (data->HasNew())
777  ? data->new_catalog
778  : GetWritable(data->old_catalog);
779 
780  // Set the previous revision hash in the new catalog to the old catalog
781  // we are doing the whole migration as a new snapshot that does not change
782  // any files, but just applies the necessary data schema migrations
783  catalog->SetPreviousRevision(data->old_catalog->hash());
784  catalog->IncrementRevision();
785  catalog->UpdateLastModified();
786 
787  return true;
788 }
789 
790 
791 template<class DerivedT>
792 bool CommandMigrate::AbstractMigrationWorker<
793  DerivedT>::CollectAndAggregateStatistics(PendingCatalog *data) const {
794  if (!collect_catalog_statistics_) {
795  return true;
796  }
797 
798  const catalog::Catalog *new_catalog = (data->HasNew()) ? data->new_catalog
799  : data->old_catalog;
800  const catalog::CatalogDatabase &writable = new_catalog->database();
801  bool retval;
802 
803  // Find out the discrepancy between MAX(rowid) and COUNT(*)
804  catalog::SqlCatalog wasted_inodes(
805  writable, "SELECT COUNT(*), MAX(rowid) FROM catalog;");
806  retval = wasted_inodes.FetchRow();
807  if (!retval) {
808  Error("Failed to count entries in catalog", wasted_inodes, data);
809  return false;
810  }
811  const unsigned int entry_count = wasted_inodes.RetrieveInt64(0);
812  const unsigned int max_row_id = wasted_inodes.RetrieveInt64(1);
813 
814  // Save collected information into the central statistics aggregator
815  data->statistics.root_path = data->root_path();
816  data->statistics.max_row_id = max_row_id;
817  data->statistics.entry_count = entry_count;
818 
819  return true;
820 }
821 
822 
823 template<class DerivedT>
824 bool CommandMigrate::AbstractMigrationWorker<DerivedT>::CleanupNestedCatalogs(
825  PendingCatalog *data) const {
826  // All nested catalogs of PendingCatalog 'data' are fully processed and
827  // accounted. It is safe to get rid of their data structures here!
828  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
829  const PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
830  for (; i != iend; ++i) {
831  delete *i;
832  }
833 
834  data->nested_catalogs.clear();
835  return true;
836 }
837 
838 
847 
848 
849 template<class DerivedT>
850 catalog::WritableCatalog *CommandMigrate::AbstractMigrationWorker<
851  DerivedT>::GetWritable(
852  const catalog::Catalog *catalog) const {
853  return dynamic_cast<catalog::WritableCatalog *>(
854  const_cast<catalog::Catalog *>(catalog));
855 }
856 
857 
858 //------------------------------------------------------------------------------
859 
860 
861 CommandMigrate::MigrationWorker_20x::MigrationWorker_20x(
862  const worker_context *context)
863  : AbstractMigrationWorker<MigrationWorker_20x>(context)
864  , fix_nested_catalog_transitions_(context->fix_nested_catalog_transitions)
865  , analyze_file_linkcounts_(context->analyze_file_linkcounts)
866  , uid_(context->uid)
867  , gid_(context->gid) { }
868 
869 
870 bool CommandMigrate::MigrationWorker_20x::RunMigration(
871  PendingCatalog *data) const {
872  // double-check that we are generating compatible catalogs to the actual
873  // catalog management classes
876 
877  return CreateNewEmptyCatalog(data) && CheckDatabaseSchemaCompatibility(data)
878  && AttachOldCatalogDatabase(data) && StartDatabaseTransaction(data)
879  && MigrateFileMetadata(data) && MigrateNestedCatalogMountPoints(data)
880  && FixNestedCatalogTransitionPoints(data)
881  && RemoveDanglingNestedMountpoints(data)
882  && GenerateCatalogStatistics(data) && FindRootEntryInformation(data)
883  && CommitDatabaseTransaction(data) && DetachOldCatalogDatabase(data);
884 }
885 
886 bool CommandMigrate::MigrationWorker_20x::CreateNewEmptyCatalog(
887  PendingCatalog *data) const {
888  const string root_path = data->root_path();
889 
890  // create a new catalog database schema
891  const string clg_db_path = CreateTempPath(temporary_directory_ + "/catalog",
892  0666);
893  if (clg_db_path.empty()) {
894  Error("Failed to create temporary file for the new catalog database.");
895  return false;
896  }
897  const bool volatile_content = false;
898 
899  {
900  // TODO(rmeusel): Attach catalog should work with an open catalog database
901  // as well, to remove this inefficiency
902  const UniquePtr<catalog::CatalogDatabase> new_clg_db(
903  catalog::CatalogDatabase::Create(clg_db_path));
904  if (!new_clg_db.IsValid()
905  || !new_clg_db->InsertInitialValues(root_path, volatile_content, "")) {
906  Error("Failed to create database for new catalog");
907  unlink(clg_db_path.c_str());
908  return false;
909  }
910  }
911 
912  // Attach the just created nested catalog database
913  catalog::WritableCatalog
914  *writable_catalog = catalog::WritableCatalog::AttachFreely(
915  root_path, clg_db_path, shash::Any(shash::kSha1));
916  if (writable_catalog == NULL) {
917  Error("Failed to open database for new catalog");
918  unlink(clg_db_path.c_str());
919  return false;
920  }
921 
922  data->new_catalog = writable_catalog;
923  return true;
924 }
925 
926 
927 bool CommandMigrate::MigrationWorker_20x::CheckDatabaseSchemaCompatibility(
928  PendingCatalog *data) const {
929  const catalog::CatalogDatabase &old_catalog = data->old_catalog->database();
930  const catalog::CatalogDatabase &new_catalog = data->new_catalog->database();
931 
932  if ((new_catalog.schema_version()
933  < catalog::CatalogDatabase::kLatestSupportedSchema
934  - catalog::CatalogDatabase::kSchemaEpsilon
935  || new_catalog.schema_version()
936  > catalog::CatalogDatabase::kLatestSupportedSchema
937  + catalog::CatalogDatabase::kSchemaEpsilon)
938  || (old_catalog.schema_version()
939  > 2.1 + catalog::CatalogDatabase::kSchemaEpsilon)) {
940  Error("Failed to meet database requirements for migration.", data);
941  return false;
942  }
943  return true;
944 }
945 
946 
947 bool CommandMigrate::MigrationWorker_20x::AttachOldCatalogDatabase(
948  PendingCatalog *data) const {
949  const catalog::CatalogDatabase &old_catalog = data->old_catalog->database();
950  const catalog::CatalogDatabase &new_catalog = data->new_catalog->database();
951 
952  catalog::SqlCatalog sql_attach_new(
953  new_catalog, "ATTACH '" + old_catalog.filename() + "' AS old;");
954  const bool retval = sql_attach_new.Execute();
955 
956  // remove the hardlink to the old database file (temporary file), it will not
957  // be needed anymore... data will get deleted when the database is closed
958  unlink(data->old_catalog->database().filename().c_str());
959 
960  if (!retval) {
961  Error("Failed to attach database of old catalog", sql_attach_new, data);
962  return false;
963  }
964  return true;
965 }
966 
967 
968 bool CommandMigrate::MigrationWorker_20x::StartDatabaseTransaction(
969  PendingCatalog *data) const {
970  assert(data->HasNew());
971  data->new_catalog->Transaction();
972  return true;
973 }
974 
975 
976 bool CommandMigrate::MigrationWorker_20x::MigrateFileMetadata(
977  PendingCatalog *data) const {
978  assert(!data->new_catalog->IsDirty());
979  assert(data->HasNew());
980  bool retval;
981  const catalog::CatalogDatabase &writable = data->new_catalog->database();
982 
983  // Hardlinks scratch space.
984  // This temporary table is used for the hardlink analysis results.
985  // The old catalog format did not have a direct notion of hardlinks and their
986  // linkcounts, but this information can be partly retrieved from the under-
987  // lying file system semantics.
988  //
989  // Hardlinks:
990  // groupid : this group id can be used for the new catalog schema
991  // inode : the inodes that were part of a hardlink group before
992  // linkcount : the linkcount for hardlink group id members
993  catalog::SqlCatalog sql_create_hardlinks_table(
994  writable,
995  "CREATE TEMPORARY TABLE hardlinks "
996  " ( hardlink_group_id INTEGER PRIMARY KEY AUTOINCREMENT, "
997  " inode INTEGER, "
998  " linkcount INTEGER, "
999  " CONSTRAINT unique_inode UNIQUE (inode) );");
1000  retval = sql_create_hardlinks_table.Execute();
1001  if (!retval) {
1002  Error("Failed to create temporary hardlink analysis table",
1003  sql_create_hardlinks_table, data);
1004  return false;
1005  }
1006 
1007  // Directory Linkcount scratch space.
1008  // Directory linkcounts can be obtained from the directory hierarchy reflected
1009  // in the old style catalogs. The new catalog schema asks for this specific
1010  // linkcount. Directory linkcount analysis results will be put into this
1011  // temporary table
1012  catalog::SqlCatalog sql_create_linkcounts_table(
1013  writable,
1014  "CREATE TEMPORARY TABLE dir_linkcounts "
1015  " ( inode INTEGER PRIMARY KEY, "
1016  " linkcount INTEGER );");
1017  retval = sql_create_linkcounts_table.Execute();
1018  if (!retval) {
1019  Error("Failed to create tmeporary directory linkcount analysis table",
1020  sql_create_linkcounts_table, data);
1021  }
1022 
1023  // It is possible to skip this step.
1024  // In that case all hardlink inodes with a (potential) linkcount > 1 will get
1025  // degraded to files containing the same content
1026  if (analyze_file_linkcounts_) {
1027  retval = AnalyzeFileLinkcounts(data);
1028  if (!retval) {
1029  return false;
1030  }
1031  }
1032 
1033  // Analyze the linkcounts of directories
1034  // - each directory has a linkcount of at least 2 (empty directory)
1035  // (link in parent directory and self reference (cd .) )
1036  // - for each child directory, the parent's link count is incremented by 1
1037  // (parent reference in child (cd ..) )
1038  //
1039  // Note: nested catalog mountpoints will be miscalculated here, since we can't
1040  // check the number of containing directories. They are defined in the
1041  // linked nested catalog and need to be added later on.
1042  // (see: MigrateNestedCatalogMountPoints() for details)
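 // In the query below, MIN(c2.inode, 1) evaluates to 1 for every child
 // directory joined via the LEFT JOIN (IFNULL maps the no-child case to 0),
 // so the SUM counts child directories and the "+ 2" adds the self link and
 // the link from the parent directory.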
1043  catalog::SqlCatalog sql_dir_linkcounts(
1044  writable,
1045  "INSERT INTO dir_linkcounts "
1046  " SELECT c1.inode as inode, "
1047  " SUM(IFNULL(MIN(c2.inode,1),0)) + 2 as linkcount "
1048  " FROM old.catalog as c1 "
1049  " LEFT JOIN old.catalog as c2 "
1050  " ON c2.parent_1 = c1.md5path_1 AND "
1051  " c2.parent_2 = c1.md5path_2 AND "
1052  " c2.flags & :flag_dir_1 "
1053  " WHERE c1.flags & :flag_dir_2 "
1054  " GROUP BY c1.inode;");
1055  retval = sql_dir_linkcounts.BindInt64(1, catalog::SqlDirent::kFlagDir)
1056  && sql_dir_linkcounts.BindInt64(2, catalog::SqlDirent::kFlagDir)
1057  && sql_dir_linkcounts.Execute();
1058  if (!retval) {
1059  Error("Failed to analyze directory specific linkcounts", sql_dir_linkcounts,
1060  data);
1061  if (sql_dir_linkcounts.GetLastError() == SQLITE_CONSTRAINT) {
1062  Error("Obviously your catalogs are corrupted, since we found a directory"
1063  "inode that is a file inode at the same time!");
1064  }
1065  return false;
1066  }
1067 
1068  // Copy the old file meta information into the new catalog schema
1069  // here we also add the previously analyzed hardlink/linkcount information
1070  // from both temporary tables "hardlinks" and "dir_linkcounts".
1071  //
1072  // Note: nested catalog mountpoints still need to be treated separately
1073  // (see MigrateNestedCatalogMountPoints() for details)
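 // The "hardlinks" column written below packs the hardlink group id into the
 // upper 32 bits and the linkcount into the lower 32 bits
 // (hardlink_group_id << 32 | linkcount).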
1074  catalog::SqlCatalog migrate_file_meta_data(
1075  writable,
1076  "INSERT INTO catalog "
1077  " SELECT md5path_1, md5path_2, "
1078  " parent_1, parent_2, "
1079  " IFNULL(hardlink_group_id, 0) << 32 | "
1080  " COALESCE(hardlinks.linkcount, dir_linkcounts.linkcount, 1) "
1081  " AS hardlinks, "
1082  " hash, size, mode, mtime, NULL, " // set empty mtimens
1083  " flags, name, symlink, "
1084  " :uid, "
1085  " :gid, "
1086  " NULL " // set empty xattr BLOB (default)
1087  " FROM old.catalog "
1088  " LEFT JOIN hardlinks "
1089  " ON catalog.inode = hardlinks.inode "
1090  " LEFT JOIN dir_linkcounts "
1091  " ON catalog.inode = dir_linkcounts.inode;");
1092  retval = migrate_file_meta_data.BindInt64(1, uid_)
1093  && migrate_file_meta_data.BindInt64(2, gid_)
1094  && migrate_file_meta_data.Execute();
1095  if (!retval) {
1096  Error("Failed to migrate the file system meta data", migrate_file_meta_data,
1097  data);
1098  return false;
1099  }
1100 
1101  // If we deal with a nested catalog, we need to add a .cvmfscatalog entry
1102  // since it was not present in the old repository specification but is needed
1103  // now!
1104  if (!data->IsRoot()) {
1105  const catalog::DirectoryEntry &nested_marker =
1106  CommandMigrate::GetNestedCatalogMarkerDirent();
1107  catalog::SqlDirentInsert insert_nested_marker(writable);
1108  const std::string root_path = data->root_path();
1109  const std::string file_path = root_path + "/"
1110  + nested_marker.name().ToString();
1111  const shash::Md5 &path_hash = shash::Md5(file_path.data(),
1112  file_path.size());
1113  const shash::Md5 &parent_hash = shash::Md5(root_path.data(),
1114  root_path.size());
1115  retval = insert_nested_marker.BindPathHash(path_hash)
1116  && insert_nested_marker.BindParentPathHash(parent_hash)
1117  && insert_nested_marker.BindDirent(nested_marker)
1118  && insert_nested_marker.BindXattrEmpty()
1119  && insert_nested_marker.Execute();
1120  if (!retval) {
1121  Error("Failed to insert nested catalog marker into new nested catalog.",
1122  insert_nested_marker, data);
1123  return false;
1124  }
1125  }
1126 
1127  // Copy (and update) the properties fields
1128  //
1129  // Note: The 'schema' is explicitly not copied to the new catalog.
1130  // Each catalog contains a revision, which is also copied here and that
1131  // is later updated by calling catalog->IncrementRevision()
1132  catalog::SqlCatalog copy_properties(writable,
1133  "INSERT OR REPLACE INTO properties "
1134  " SELECT key, value "
1135  " FROM old.properties "
1136  " WHERE key != 'schema';");
1137  retval = copy_properties.Execute();
1138  if (!retval) {
1139  Error("Failed to migrate the properties table.", copy_properties, data);
1140  return false;
1141  }
1142 
1143  return true;
1144 }
1145 
1146 
1147 bool CommandMigrate::MigrationWorker_20x::AnalyzeFileLinkcounts(
1148  PendingCatalog *data) const {
1149  assert(data->HasNew());
1150  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1151  bool retval;
1152 
1153  // Analyze the hardlink relationships in the old catalog
1154  // inodes used to be assigned at publishing time, implicitly constituting
1155  // those relationships. We now need them explicitly in the file catalogs
1156  // This looks for directory entries with matching inodes but differing path-
1157  // hashes and saves the results in a temporary table called 'hl_scratch'
1158  //
1159  // Note: We only support hardlink groups that reside in the same directory!
1160  // Therefore we first need to figure out hardlink candidates (which
1161  // might still contain hardlink groups spanning more than one directory)
1162  // In a second step these candidates will be analyzed to kick out un-
1163  // supported hardlink groups.
1164  // Unsupported hardlink groups will be treated as normal files with
1165  // the same content
1166  catalog::SqlCatalog sql_create_hardlinks_scratch_table(
1167  writable,
1168  "CREATE TEMPORARY TABLE hl_scratch AS "
1169  " SELECT c1.inode AS inode, c1.md5path_1, c1.md5path_2, "
1170  " c1.parent_1 as c1p1, c1.parent_2 as c1p2, "
1171  " c2.parent_1 as c2p1, c2.parent_2 as c2p2 "
1172  " FROM old.catalog AS c1 "
1173  " INNER JOIN old.catalog AS c2 "
1174  " ON c1.inode == c2.inode AND "
1175  " (c1.md5path_1 != c2.md5path_1 OR "
1176  " c1.md5path_2 != c2.md5path_2);");
1177  retval = sql_create_hardlinks_scratch_table.Execute();
1178  if (!retval) {
1179  Error("Failed to create temporary scratch table for hardlink analysis",
1180  sql_create_hardlinks_scratch_table, data);
1181  return false;
1182  }
1183 
1184  // Figures out which hardlink candidates are supported by CVMFS and can be
1185  // transferred into the new catalog as so called hardlink groups. Unsupported
1186  // hardlinks need to be discarded and treated as normal files containing the
1187  // exact same data
1188  catalog::SqlCatalog fill_linkcount_table_for_files(
1189  writable,
1190  "INSERT INTO hardlinks (inode, linkcount)"
1191  " SELECT inode, count(*) as linkcount "
1192  " FROM ( "
1193  // recombine supported hardlink inodes with their actual manifested
1194  // hard-links in the catalog.
1195  // Note: for each directory entry pointing to the same supported
1196  // hardlink inode we have a distinct MD5 path hash
1197  " SELECT DISTINCT hl.inode, hl.md5path_1, hl.md5path_2 "
1198  " FROM ( "
1199  // sort out supported hardlink inodes from unsupported ones by
1200  // locality
1201  // Note: see the next comment for the nested SELECT
1202  " SELECT inode "
1203  " FROM ( "
1204  " SELECT inode, count(*) AS cnt "
1205  " FROM ( "
1206  // go through the potential hardlinks and collect location infor-
1207  // mation about them.
1208  // Note: we only support hardlinks that all reside in the same
1209  // directory, thus having the same parent (c1p* == c2p*)
1210  // --> For supported hardlink candidates the SELECT DISTINCT
1211  // will produce only a single row, whereas others produce more
1212  " SELECT DISTINCT inode,c1p1,c1p1,c2p1,c2p2 "
1213  " FROM hl_scratch AS hl "
1214  " ) "
1215  " GROUP BY inode "
1216  " ) "
1217  " WHERE cnt = 1 "
1218  " ) AS supported_hardlinks "
1219  " LEFT JOIN hl_scratch AS hl "
1220  " ON supported_hardlinks.inode = hl.inode "
1221  " ) "
1222  " GROUP BY inode;");
1223  retval = fill_linkcount_table_for_files.Execute();
1224  if (!retval) {
1225  Error("Failed to analyze hardlink relationships for files.",
1226  fill_linkcount_table_for_files, data);
1227  return false;
1228  }
1229 
1230  // The file linkcount and hardlink analysis is finished and the scratch table
1231  // can be deleted...
1232  catalog::SqlCatalog drop_hardlink_scratch_space(writable,
1233  "DROP TABLE hl_scratch;");
1234  retval = drop_hardlink_scratch_space.Execute();
1235  if (!retval) {
1236  Error("Failed to remove file linkcount analysis scratch table",
1237  drop_hardlink_scratch_space, data);
1238  return false;
1239  }
1240 
1241  // Do some statistics if asked for...
1242  if (collect_catalog_statistics_) {
1243  catalog::SqlCatalog count_hardlinks(
1244  writable, "SELECT count(*), sum(linkcount) FROM hardlinks;");
1245  retval = count_hardlinks.FetchRow();
1246  if (!retval) {
1247  Error("Failed to count the generated file hardlinks for statistics",
1248  count_hardlinks, data);
1249  return false;
1250  }
1251 
1252  data->statistics.hardlink_group_count += count_hardlinks.RetrieveInt64(0);
1253  data->statistics.aggregated_linkcounts += count_hardlinks.RetrieveInt64(1);
1254  }
1255 
1256  return true;
1257 }
1258 
1259 
1260 bool CommandMigrate::MigrationWorker_20x::MigrateNestedCatalogMountPoints(
1261  PendingCatalog *data) const {
1262  assert(data->HasNew());
1263  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1264  bool retval;
1265 
1266  // preparing the SQL statement for nested catalog mountpoint update
1267  catalog::SqlCatalog update_mntpnt_linkcount(
1268  writable,
1269  "UPDATE catalog "
1270  "SET hardlinks = :linkcount "
1271  "WHERE md5path_1 = :md5_1 AND md5path_2 = :md5_2;");
1272 
1273  // update all nested catalog mountpoints
1274  // (Note: we might need to wait for the nested catalog to be processed)
1275  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
1276  const PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1277  for (; i != iend; ++i) {
1278  // collect information about the nested catalog
1279  PendingCatalog *nested_catalog = *i;
1280  const catalog::DirectoryEntry root_entry = nested_catalog->root_entry.Get();
1281  const string &root_path = nested_catalog->root_path();
1282 
1283  // update the nested catalog mountpoint directory entry with the correct
1284  // linkcount that was determined while processing the nested catalog
1285  const shash::Md5 mountpoint_hash = shash::Md5(root_path.data(),
1286  root_path.size());
1287  retval = update_mntpnt_linkcount.BindInt64(1, root_entry.linkcount())
1288  && update_mntpnt_linkcount.BindMd5(2, 3, mountpoint_hash)
1289  && update_mntpnt_linkcount.Execute();
1290  if (!retval) {
1291  Error("Failed to update linkcount of nested catalog mountpoint",
1292  update_mntpnt_linkcount, data);
1293  return false;
1294  }
1295  update_mntpnt_linkcount.Reset();
1296  }
1297 
1298  return true;
1299 }
1300 
1301 
1302 bool CommandMigrate::MigrationWorker_20x::FixNestedCatalogTransitionPoints(
1303  PendingCatalog *data) const {
1304  assert(data->HasNew());
1305  if (!fix_nested_catalog_transitions_) {
1306  // Fixing transition point mismatches is not enabled...
1307  return true;
1308  }
1309 
1310  typedef catalog::DirectoryEntry::Difference Difference;
1311 
1312  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1313  bool retval;
1314 
1315  catalog::SqlLookupPathHash lookup_mountpoint(writable);
1316  catalog::SqlDirentUpdate update_directory_entry(writable);
1317 
1318  // Unbox the nested catalogs (possibly waiting for migration of them first)
1319  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
1320  const PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1321  for (; i != iend; ++i) {
1322  // Collect information about the nested catalog
1323  PendingCatalog *nested_catalog = *i;
1324  const catalog::DirectoryEntry nested_root_entry = nested_catalog->root_entry
1325  .Get();
1326  const string &nested_root_path = nested_catalog->root_path();
1327  const shash::Md5 mountpoint_path_hash = shash::Md5(nested_root_path.data(),
1328  nested_root_path.size());
1329 
1330  // Retrieve the nested catalog mountpoint from the current catalog
1331  retval = lookup_mountpoint.BindPathHash(mountpoint_path_hash)
1332  && lookup_mountpoint.FetchRow();
1333  if (!retval) {
1334  Error("Failed to fetch nested catalog mountpoint to check for compatible"
1335  "transition points",
1336  lookup_mountpoint, data);
1337  return false;
1338  }
1339 
1340  catalog::DirectoryEntry mountpoint_entry = lookup_mountpoint.GetDirent(
1341  data->new_catalog);
1342  lookup_mountpoint.Reset();
1343 
1344  // Compare nested catalog mountpoint and nested catalog root entries
1345  const catalog::DirectoryEntry::Differences diffs =
1346  mountpoint_entry.CompareTo(nested_root_entry);
1347 
1348  // We MUST deal with two directory entries that are a pair of nested cata-
1349  // log mountpoint and root entry! Thus we expect their transition flags to
1350  // differ and their name to be the same.
1351  assert(diffs & Difference::kNestedCatalogTransitionFlags);
1352  assert((diffs & Difference::kName) == 0);
1353 
1354  // Check if there are other differences except the nested catalog transition
1355  // flags and fix them...
1356  if ((diffs ^ Difference::kNestedCatalogTransitionFlags) != 0) {
1357  // If we found differences, we still assume a couple of directory entry
1358  // fields to be the same, otherwise some severe stuff would be wrong...
1359  if ((diffs & Difference::kChecksum) || (diffs & Difference::kLinkcount)
1360  || (diffs & Difference::kSymlink)
1361  || (diffs & Difference::kChunkedFileFlag)) {
1362  Error("Found an irreparable mismatch in a nested catalog transition "
1363  "point at '"
1364  + nested_root_path + "'\nAborting...\n");
1365  }
1366 
1367  // Copy the properties from the nested catalog root entry into the mount-
1368  // point entry to bring them in sync again
1369  FixNestedCatalogTransitionPoint(nested_root_entry,
1370  &mountpoint_entry);
1371 
1372  // save the nested catalog mountpoint entry into the catalog
1373  retval = update_directory_entry.BindPathHash(mountpoint_path_hash)
1374  && update_directory_entry.BindDirent(mountpoint_entry)
1375  && update_directory_entry.Execute();
1376  if (!retval) {
1377  Error("Failed to save resynchronized nested catalog mountpoint into "
1378  "catalog database",
1379  update_directory_entry, data);
1380  return false;
1381  }
1382  update_directory_entry.Reset();
1383 
1384  // Fixing of this mountpoint went well... inform the user that this minor
1385  // issue occurred
1387  "NOTE: fixed incompatible nested catalog transition point at: "
1388  "'%s' ",
1389  nested_root_path.c_str());
1390  }
1391  }
1392 
1393  return true;
1394 }
1395 
1396 
1397 void CommandMigrate::FixNestedCatalogTransitionPoint(
1398  const catalog::DirectoryEntry &nested_root,
1399  catalog::DirectoryEntry *mountpoint) {
1400  // Replace some file system parameters in the mountpoint to resync it with
1401  // the nested root of the corresponding nested catalog
1402  //
1403  // Note: this method relies on CommandMigrate being a friend of DirectoryEntry
1404  mountpoint->mode_ = nested_root.mode_;
1405  mountpoint->uid_ = nested_root.uid_;
1406  mountpoint->gid_ = nested_root.gid_;
1407  mountpoint->size_ = nested_root.size_;
1408  mountpoint->mtime_ = nested_root.mtime_;
1409 }
1410 
1411 
1412 bool CommandMigrate::MigrationWorker_20x::RemoveDanglingNestedMountpoints(
1413  PendingCatalog *data) const {
1414  assert(data->HasNew());
1415  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1416  bool retval = false;
1417 
1418  // build a set of registered nested catalog path hashes
1419  typedef catalog::Catalog::NestedCatalogList NestedCatalogList;
1420  typedef std::map<shash::Md5, catalog::Catalog::NestedCatalog>
1421  NestedCatalogMap;
1422  const NestedCatalogList &nested_clgs = data->old_catalog
1423  ->ListNestedCatalogs();
1424  NestedCatalogList::const_iterator i = nested_clgs.begin();
1425  const NestedCatalogList::const_iterator iend = nested_clgs.end();
1426  NestedCatalogMap nested_catalog_path_hashes;
1427  for (; i != iend; ++i) {
1428  const PathString &path = i->mountpoint;
1429  const shash::Md5 hash(path.GetChars(), path.GetLength());
1430  nested_catalog_path_hashes[hash] = *i;
1431  }
1432 
1433  // Retrieve nested catalog mountpoints that have child entries directly inside
1434  // the current catalog (which is a malformed state)
1435  catalog::SqlLookupDanglingMountpoints sql_dangling_mountpoints(writable);
1436  catalog::SqlDirentUpdate save_updated_mountpoint(writable);
1437 
1438  std::vector<catalog::DirectoryEntry> todo_dirent;
1439  std::vector<shash::Md5> todo_hash;
1440 
1441  // go through the list of dangling nested catalog mountpoints and fix them
1442  // where needed (check if there is no nested catalog registered for them)
1443  while (sql_dangling_mountpoints.FetchRow()) {
1444  catalog::DirectoryEntry dangling_mountpoint = sql_dangling_mountpoints
1445  .GetDirent(
1446  data->new_catalog);
1447  const shash::Md5 path_hash = sql_dangling_mountpoints.GetPathHash();
1448  assert(dangling_mountpoint.IsNestedCatalogMountpoint());
1449 
1450  // check if the nested catalog mountpoint is registered in the nested cata-
1451  // log list of the currently migrated catalog
1452  const NestedCatalogMap::const_iterator
1453  nested_catalog = nested_catalog_path_hashes.find(path_hash);
1454  if (nested_catalog != nested_catalog_path_hashes.end()) {
1456  "WARNING: found a non-empty nested catalog mountpoint under "
1457  "'%s'",
1458  nested_catalog->second.mountpoint.c_str());
1459  continue;
1460  }
1461 
1462  // the mountpoint was confirmed to be dangling and needs to be removed
1463  dangling_mountpoint.set_is_nested_catalog_mountpoint(false);
1464  todo_dirent.push_back(dangling_mountpoint);
1465  todo_hash.push_back(path_hash);
1466  }
1467 
1468  for (unsigned i = 0; i < todo_dirent.size(); ++i) {
1469  retval = save_updated_mountpoint.BindPathHash(todo_hash[i])
1470  && save_updated_mountpoint.BindDirent(todo_dirent[i])
1471  && save_updated_mountpoint.Execute()
1472  && save_updated_mountpoint.Reset();
1473  if (!retval) {
1474  Error("Failed to remove dangling nested catalog mountpoint entry in "
1475  "catalog",
1476  save_updated_mountpoint, data);
1477  return false;
1478  }
1479 
1480  // tell the user that this intervention has taken place
1481  LogCvmfs(kLogCatalog, kLogStdout,
1482  "NOTE: fixed dangling nested catalog "
1483  "mountpoint entry called: '%s' ",
1484  todo_dirent[i].name().c_str());
1485  }
1486 
1487  return true;
1488 }
1489 
1490 
1491 const catalog::DirectoryEntry &CommandMigrate::GetNestedCatalogMarkerDirent() {
1492  // This is pre-initialized singleton... it MUST be already there...
1493  assert(nested_catalog_marker_.name_.ToString() == ".cvmfscatalog");
1494  return nested_catalog_marker_;
1495 }
1496 
1497 bool CommandMigrate::GenerateNestedCatalogMarkerChunk() {
1498  // Create an empty nested catalog marker file
1499  nested_catalog_marker_tmp_path_ = CreateTempPath(
1500  temporary_directory_ + "/.cvmfscatalog", 0644);
1501  if (nested_catalog_marker_tmp_path_.empty()) {
1502  Error("Failed to create temp file for nested catalog marker dummy.");
1503  return false;
1504  }
1505 
1506  // Process and upload it to the backend storage
1509  spooler_->Process(source);
1510  return true;
1511 }
1512 
1513 void CommandMigrate::CreateNestedCatalogMarkerDirent(
1514  const shash::Any &content_hash) {
1515  // Generate it only once
1516  assert(nested_catalog_marker_.name_.ToString() != ".cvmfscatalog");
1517 
1518  // Fill the DirectoryEntry structure with all needed information
1519  nested_catalog_marker_.name_.Assign(".cvmfscatalog", strlen(".cvmfscatalog"));
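 // 33188 == 0100644 (octal): a regular file with permissions rw-r--r--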
1520  nested_catalog_marker_.mode_ = 33188;
1524  nested_catalog_marker_.mtime_ = time(NULL);
1526  nested_catalog_marker_.checksum_ = content_hash;
1527 }
1528 
1529 
1530 bool CommandMigrate::MigrationWorker_20x::GenerateCatalogStatistics(
1531  PendingCatalog *data) const {
1532  assert(data->HasNew());
1533  bool retval = false;
1534  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1535 
1536  // Aggregate the statistics counters of all nested catalogs
1537  // Note: we might need to wait until nested catalogs are successfully
1538  // processed
1539  catalog::DeltaCounters stats_counters;
1540  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
1541  const PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1542  for (; i != iend; ++i) {
1543  const PendingCatalog *nested_catalog = *i;
1544  const catalog::DeltaCounters &s = nested_catalog->nested_statistics.Get();
1545  s.PopulateToParent(&stats_counters);
1546  }
1547 
1548  // Count various directory entry types in the catalog to fill up the catalog
1549  // statistics counters introduced in the current catalog schema
1550  catalog::SqlCatalog count_regular_files(
1551  writable,
1552  "SELECT count(*) FROM catalog "
1553  " WHERE flags & :flag_file "
1554  " AND NOT flags & :flag_link;");
1555  catalog::SqlCatalog count_symlinks(
1556  writable, "SELECT count(*) FROM catalog WHERE flags & :flag_link;");
1557  catalog::SqlCatalog count_directories(
1558  writable, "SELECT count(*) FROM catalog WHERE flags & :flag_dir;");
1559  catalog::SqlCatalog aggregate_file_size(
1560  writable,
1561  "SELECT sum(size) FROM catalog WHERE flags & :flag_file "
1562  " AND NOT flags & :flag_link");
1563 
1564  // Run the actual counting queries
1565  retval = count_regular_files.BindInt64(1, catalog::SqlDirent::kFlagFile)
1566  && count_regular_files.BindInt64(2, catalog::SqlDirent::kFlagLink)
1567  && count_regular_files.FetchRow();
1568  if (!retval) {
1569  Error("Failed to count regular files.", count_regular_files, data);
1570  return false;
1571  }
1572  retval = count_symlinks.BindInt64(1, catalog::SqlDirent::kFlagLink)
1573  && count_symlinks.FetchRow();
1574  if (!retval) {
1575  Error("Failed to count symlinks.", count_symlinks, data);
1576  return false;
1577  }
1578  retval = count_directories.BindInt64(1, catalog::SqlDirent::kFlagDir)
1579  && count_directories.FetchRow();
1580  if (!retval) {
1581  Error("Failed to count directories.", count_directories, data);
1582  return false;
1583  }
1584  retval = aggregate_file_size.BindInt64(1, catalog::SqlDirent::kFlagFile)
1585  && aggregate_file_size.BindInt64(2, catalog::SqlDirent::kFlagLink)
1586  && aggregate_file_size.FetchRow();
1587  if (!retval) {
1588  Error("Failed to aggregate the file sizes.", aggregate_file_size, data);
1589  return false;
1590  }
1591 
1592  // Insert the counted statistics into the DeltaCounters data structure
1593  stats_counters.self.regular_files = count_regular_files.RetrieveInt64(0);
1594  stats_counters.self.symlinks = count_symlinks.RetrieveInt64(0);
1595  stats_counters.self.directories = count_directories.RetrieveInt64(0);
1596  stats_counters.self.nested_catalogs = data->nested_catalogs.size();
1597  stats_counters.self.file_size = aggregate_file_size.RetrieveInt64(0);
1598 
1599  // Write back the generated statistics counters into the catalog database
1600  stats_counters.WriteToDatabase(writable);
1601 
1602  // Push the generated statistics counters up to the parent catalog
1603  data->nested_statistics.Set(stats_counters);
1604 
1605  return true;
1606 }
1607 
1608 
1609 bool CommandMigrate::MigrationWorker_20x::FindRootEntryInformation(
1610  PendingCatalog *data) const {
1611  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1612  bool retval;
1613 
1614  std::string root_path = data->root_path();
1615  const shash::Md5 root_path_hash =
1616  shash::Md5(root_path.data(), root_path.size());
1617 
1618  catalog::SqlLookupPathHash lookup_root_entry(writable);
1619  retval = lookup_root_entry.BindPathHash(root_path_hash)
1620  && lookup_root_entry.FetchRow();
1621  if (!retval) {
1622  Error("Failed to retrieve root directory entry of migrated catalog",
1623  lookup_root_entry, data);
1624  return false;
1625  }
1626 
1627  const catalog::DirectoryEntry entry =
1628  lookup_root_entry.GetDirent(data->new_catalog);
1629  if (entry.linkcount() < 2 || entry.hardlink_group() > 0) {
1630  Error("Retrieved linkcount of catalog root entry is not sane.", data);
1631  return false;
1632  }
1633 
1634  data->root_entry.Set(entry);
1635  return true;
1636 }
1637 
1638 
1639 bool CommandMigrate::MigrationWorker_20x::CommitDatabaseTransaction(
1640  PendingCatalog *data) const {
1641  assert(data->HasNew());
1642  data->new_catalog->Commit();
1643  return true;
1644 }
1645 
1646 
1647 bool CommandMigrate::MigrationWorker_20x::DetachOldCatalogDatabase(
1648  PendingCatalog *data) const {
1649  assert(data->HasNew());
1650  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1651  catalog::SqlCatalog detach_old_catalog(writable, "DETACH old;");
1652  const bool retval = detach_old_catalog.Execute();
1653  if (!retval) {
1654  Error("Failed to detach old catalog database.", detach_old_catalog, data);
1655  return false;
1656  }
1657  return true;
1658 }
1659 
1660 
1661 //------------------------------------------------------------------------------
1662 
1663 
1664 CommandMigrate::MigrationWorker_217::MigrationWorker_217(
1665  const worker_context *context)
1666  : AbstractMigrationWorker<MigrationWorker_217>(context) { }
1667 
1668 
1669 bool CommandMigrate::MigrationWorker_217::RunMigration(
1670  PendingCatalog *data) const {
1671  return CheckDatabaseSchemaCompatibility(data)
1672  && StartDatabaseTransaction(data)
1673  && GenerateNewStatisticsCounters(data) && UpdateCatalogSchema(data)
1674  && CommitDatabaseTransaction(data);
1675 }
1676 
1677 
1678 bool CommandMigrate::MigrationWorker_217::CheckDatabaseSchemaCompatibility(
1679  PendingCatalog *data) const {
1680  assert(!data->HasNew());
1681  const catalog::CatalogDatabase &old_catalog = data->old_catalog->database();
1682 
1683  if ((old_catalog.schema_version()
1684       < 2.4 - catalog::CatalogDatabase::kSchemaEpsilon)
1685      || (old_catalog.schema_version()
1686          >= 2.4 + catalog::CatalogDatabase::kSchemaEpsilon)) {
1687  Error("Given Catalog is not Schema 2.4.", data);
1688  return false;
1689  }
1690 
1691  return true;
1692 }
1693 
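Schema versions are stored as floating point numbers (schema_version() returns a float), which is why the check above brackets 2.4 with kSchemaEpsilon instead of comparing for equality. A minimal illustration of the same idea, using a hypothetical epsilon value rather than the real catalog::CatalogDatabase::kSchemaEpsilon:

#include <cassert>

static bool IsSchema24(float schema_version, float epsilon) {
  // accept anything within [2.4 - epsilon, 2.4 + epsilon)
  return schema_version >= 2.4 - epsilon && schema_version < 2.4 + epsilon;
}

int main() {
  assert(IsSchema24(2.4f, 0.01f));   // a schema 2.4 catalog passes
  assert(!IsSchema24(2.5f, 0.01f));  // an already migrated catalog does not
  return 0;
}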
1694 
1695 bool CommandMigrate::MigrationWorker_217::StartDatabaseTransaction(
1696  PendingCatalog *data) const {
1697  assert(!data->HasNew());
1698  GetWritable(data->old_catalog)->Transaction();
1699  return true;
1700 }
1701 
1702 
1703 bool CommandMigrate::MigrationWorker_217::GenerateNewStatisticsCounters(
1704  PendingCatalog *data) const {
1705  assert(!data->HasNew());
1706  bool retval = false;
1707  const catalog::CatalogDatabase &writable = GetWritable(data->old_catalog)
1708  ->database();
1709 
1710  // Aggregate the statistics counters of all nested catalogs
1711  // Note: we might need to wait until nested catalogs are successfully
1712  // processed
1713  catalog::DeltaCounters stats_counters;
1714  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
1715  const PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1716  for (; i != iend; ++i) {
1717  const PendingCatalog *nested_catalog = *i;
1718  const catalog::DeltaCounters &s = nested_catalog->nested_statistics.Get();
1719  s.PopulateToParent(&stats_counters);
1720  }
1721 
1722  // Count various directory entry types in the catalog to fill up the catalog
1723  // statistics counters introduced in the current catalog schema
1724  catalog::SqlCatalog count_chunked_files(
1725  writable,
1726  "SELECT count(*), sum(size) FROM catalog "
1727  " WHERE flags & :flag_chunked_file;");
1728  catalog::SqlCatalog count_file_chunks(writable,
1729  "SELECT count(*) FROM chunks;");
1730  catalog::SqlCatalog aggregate_file_size(
1731  writable,
1732  "SELECT sum(size) FROM catalog WHERE flags & :flag_file "
1733  " AND NOT flags & :flag_link;");
1734 
1735  // Run the actual counting queries
1736  retval = count_chunked_files.BindInt64(1, catalog::SqlDirent::kFlagFileChunk)
1737  && count_chunked_files.FetchRow();
1738  if (!retval) {
1739  Error("Failed to count chunked files.", count_chunked_files, data);
1740  return false;
1741  }
1742  retval = count_file_chunks.FetchRow();
1743  if (!retval) {
1744  Error("Failed to count file chunks", count_file_chunks, data);
1745  return false;
1746  }
1747  retval = aggregate_file_size.BindInt64(1, catalog::SqlDirent::kFlagFile)
1748  && aggregate_file_size.BindInt64(2, catalog::SqlDirent::kFlagLink)
1749  && aggregate_file_size.FetchRow();
1750  if (!retval) {
1751  Error("Failed to aggregate the file sizes.", aggregate_file_size, data);
1752  return false;
1753  }
1754 
1755  // Insert the counted statistics into the DeltaCounters data structure
1756  stats_counters.self.chunked_files = count_chunked_files.RetrieveInt64(0);
1757  stats_counters.self.chunked_file_size = count_chunked_files.RetrieveInt64(1);
1758  stats_counters.self.file_chunks = count_file_chunks.RetrieveInt64(0);
1759  stats_counters.self.file_size = aggregate_file_size.RetrieveInt64(0);
1760 
1761  // Write back the generated statistics counters into the catalog database
1762  catalog::Counters counters;
1763  retval = counters.ReadFromDatabase(writable, catalog::LegacyMode::kLegacy);
1764  if (!retval) {
1765  Error("Failed to read old catalog statistics counters", data);
1766  return false;
1767  }
1768  counters.ApplyDelta(stats_counters);
1769  retval = counters.InsertIntoDatabase(writable);
1770  if (!retval) {
1771  Error("Failed to write new statistics counters to database", data);
1772  return false;
1773  }
1774 
1775  // Push the generated statistics counters up to the parent catalog
1776  data->nested_statistics.Set(stats_counters);
1777 
1778  return true;
1779 }
1780 
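Unlike the 2.0.x worker, which writes a fresh set of counters with WriteToDatabase(), this migration reads the counters the catalog already stores (in legacy mode), applies the newly counted values as a delta, and inserts the result back. A compact stand-in for that read-modify-write step, using plain structs instead of catalog::Counters:

#include <cassert>
#include <stdint.h>

struct Stored {           // counters as read back from the legacy catalog
  int64_t file_size;
  int64_t chunked_files;
};

struct Delta {            // what the counting queries above produce
  int64_t file_size;
  int64_t chunked_files;
};

// Rough analogue of Counters::ApplyDelta(): add the delta onto the stored values.
static void ApplyDelta(Stored *stored, const Delta &delta) {
  stored->file_size += delta.file_size;
  stored->chunked_files += delta.chunked_files;
}

int main() {
  // Illustrative assumption: counters that the old schema did not yet track
  // read back as zero in legacy mode.
  Stored stored = {0, 0};
  Delta delta = {1536, 3};
  ApplyDelta(&stored, delta);
  assert(stored.file_size == 1536 && stored.chunked_files == 3);
  return 0;
}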
1781 
1782 bool CommandMigrate::MigrationWorker_217::UpdateCatalogSchema(
1783  PendingCatalog *data) const {
1784  assert(!data->HasNew());
1785  const catalog::CatalogDatabase &writable = GetWritable(data->old_catalog)
1786  ->database();
1787  catalog::SqlCatalog update_schema_version(
1788  writable,
1789  "UPDATE properties SET value = :schema_version WHERE key = 'schema';");
1790 
1791  const bool retval = update_schema_version.BindDouble(1, 2.5)
1792  && update_schema_version.Execute();
1793  if (!retval) {
1794  Error(
1795  "Failed to update catalog schema version", update_schema_version, data);
1796  return false;
1797  }
1798 
1799  return true;
1800 }
1801 
1802 
1803 bool CommandMigrate::MigrationWorker_217::CommitDatabaseTransaction(
1804  PendingCatalog *data) const {
1805  assert(!data->HasNew());
1806  GetWritable(data->old_catalog)->Commit();
1807  return true;
1808 }
1809 
1810 
1811 //------------------------------------------------------------------------------
1812 
1813 
1814 CommandMigrate::ChownMigrationWorker::ChownMigrationWorker(
1815  const worker_context *context)
1816  : AbstractMigrationWorker<ChownMigrationWorker>(context)
1817  , uid_map_statement_(GenerateMappingStatement(context->uid_map, "uid"))
1818  , gid_map_statement_(GenerateMappingStatement(context->gid_map, "gid")) { }
1819 
1820 bool CommandMigrate::ChownMigrationWorker::RunMigration(
1821  PendingCatalog *data) const {
1822  return ApplyPersonaMappings(data);
1823 }
1824 
1825 
1826 bool CommandMigrate::ChownMigrationWorker::ApplyPersonaMappings(
1827  PendingCatalog *data) const {
1828  assert(data->old_catalog != NULL);
1829  assert(data->new_catalog == NULL);
1830 
1831  if (data->old_catalog->mountpoint()
1832      == PathString("/" + std::string(catalog::VirtualCatalog::kVirtualPath))) {
1833  // skipping virtual catalog
1834  return true;
1835  }
1836 
1837  const catalog::CatalogDatabase &db = GetWritable(data->old_catalog)
1838  ->database();
1839 
1840  if (!db.BeginTransaction()) {
1841  return false;
1842  }
1843 
1844  catalog::SqlCatalog uid_sql(db, uid_map_statement_);
1845  if (!uid_sql.Execute()) {
1846  Error("Failed to update UIDs", uid_sql, data);
1847  return false;
1848  }
1849 
1850  catalog::SqlCatalog gid_sql(db, gid_map_statement_);
1851  if (!gid_sql.Execute()) {
1852  Error("Failed to update GIDs", gid_sql, data);
1853  return false;
1854  }
1855 
1856  return db.CommitTransaction();
1857 }
1858 
1859 
1860 template<class MapT>
1861 std::string CommandMigrate::ChownMigrationWorker::GenerateMappingStatement(
1862  const MapT &map, const std::string &column) const {
1863  assert(map.RuleCount() > 0 || map.HasDefault());
1864 
1865  std::string stmt = "UPDATE OR ABORT catalog SET " + column + " = ";
1866 
1867  if (map.RuleCount() == 0) {
1868  // map everything to the same value (just a simple UPDATE clause)
1869  stmt += StringifyInt(map.GetDefault());
1870  } else {
1871  // apply multiple ID mappings (UPDATE clause with CASE statement)
1872  stmt += "CASE " + column + " ";
1873  typedef typename MapT::map_type::const_iterator map_iterator;
1874  map_iterator i = map.GetRuleMap().begin();
1875  const map_iterator iend = map.GetRuleMap().end();
1876  for (; i != iend; ++i) {
1877  stmt += "WHEN " + StringifyInt(i->first) + " THEN "
1878  + StringifyInt(i->second) + " ";
1879  }
1880 
1881  // add a default (if provided) or leave unchanged if no mapping fits
1882  stmt += (map.HasDefault()) ? "ELSE " + StringifyInt(map.GetDefault()) + " "
1883  : "ELSE " + column + " ";
1884  stmt += "END";
1885  }
1886 
1887  stmt += ";";
1888  return stmt;
1889 }
1890 
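As an illustration, a UID map with the rules 101 -> 201 and 102 -> 202 and no default would yield a statement of roughly this shape: UPDATE OR ABORT catalog SET uid = CASE uid WHEN 101 THEN 201 WHEN 102 THEN 202 ELSE uid END; The following standalone snippet rebuilds the same string from a plain std::map; it does not use the cvmfs UidMap/GidMap classes and the IDs are made up.

#include <stdint.h>
#include <iostream>
#include <map>
#include <sstream>
#include <string>

static std::string BuildMappingStatement(
    const std::map<int64_t, int64_t> &rules, const std::string &column) {
  std::ostringstream stmt;
  stmt << "UPDATE OR ABORT catalog SET " << column << " = CASE " << column << " ";
  for (std::map<int64_t, int64_t>::const_iterator i = rules.begin();
       i != rules.end(); ++i) {
    stmt << "WHEN " << i->first << " THEN " << i->second << " ";
  }
  stmt << "ELSE " << column << " END;";  // IDs without a rule stay unchanged
  return stmt.str();
}

int main() {
  std::map<int64_t, int64_t> uid_rules;
  uid_rules[101] = 201;
  uid_rules[102] = 202;
  std::cout << BuildMappingStatement(uid_rules, "uid") << std::endl;
  return 0;
}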
1891 
1892 //------------------------------------------------------------------------------
1893 
1894 
1895 bool CommandMigrate::HardlinkRemovalWorker::RunMigration(
1896  PendingCatalog *data) const {
1897  return CheckDatabaseSchemaCompatibility(data) && BreakUpHardlinks(data);
1898 }
1899 
1900 
1901 bool CommandMigrate::HardlinkRemovalWorker::CheckDatabaseSchemaCompatibility(
1902  PendingCatalog *data) const {
1903  assert(data->old_catalog != NULL);
1904  assert(data->new_catalog == NULL);
1905 
1906  const catalog::CatalogDatabase &clg = data->old_catalog->database();
1907  return clg.schema_version() >= 2.4 - catalog::CatalogDatabase::kSchemaEpsilon;
1908 }
1909 
1910 
1911 bool CommandMigrate::HardlinkRemovalWorker::BreakUpHardlinks(
1912  PendingCatalog *data) const {
1913  assert(data->old_catalog != NULL);
1914  assert(data->new_catalog == NULL);
1915 
1916  const catalog::CatalogDatabase &db = GetWritable(data->old_catalog)
1917  ->database();
1918 
1919  if (!db.BeginTransaction()) {
1920  return false;
1921  }
1922 
1923  // CernVM-FS catalogs do not store inodes; they are assigned by the client
1924  // at runtime. Hardlinks are expressed through so-called hardlink
1925  // group IDs to indicate hardlink relationships that need to be respected at
1926  // runtime by assigning identical inodes accordingly.
1927  //
1928  // This updates all directory entries of a given catalog that have a linkcount
1929  // greater than 1 and are flagged as a 'file'. Note: Symlinks are flagged both
1930  // as 'file' and as 'symlink', hence they are updated implicitly as well.
1931  //
1932  // The 'hardlinks' field in the catalog contains two 32 bit integers:
1933  // * the linkcount in the lower 32 bits
1934  // * the (so called) hardlink group ID in the higher 32 bits
1935  //
1936  // Files that have a linkcount of exactly 1 do not have any hardlinks and have
1937  // the (implicit) hardlink group ID '0'. Hence, 'hardlinks == 1' means that a
1938  // file doesn't have any hardlinks (linkcount = 1) and doesn't need treatment
1939  // here.
1940  //
1941  // Files that have hardlinks (linkcount > 1) will have a very large integer in
1942  // their 'hardlinks' field (hardlink group ID > 0 in higher 32 bits). Those
1943  // files will be treated by setting their 'hardlinks' field to 1, effectively
1944  // clearing all hardlink information from the directory entry.
1945  const std::string stmt = "UPDATE OR ABORT catalog "
1946  "SET hardlinks = 1 "
1947  "WHERE flags & :file_flag "
1948  " AND hardlinks > 1;";
1949  catalog::SqlCatalog hardlink_removal_sql(db, stmt);
1950  hardlink_removal_sql.BindInt64(1, catalog::SqlDirent::kFlagFile);
1951  hardlink_removal_sql.Execute();
1952 
1953  return db.CommitTransaction();
1954 }
1955 
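The comment above describes the on-disk encoding: the 64-bit 'hardlinks' column keeps the linkcount in the lower 32 bits and the hardlink group ID in the upper 32 bits, so setting the whole field to 1 means "linkcount 1, no hardlink group". A minimal sketch of that encoding; the helper names are made up and not cvmfs functions.

#include <cassert>
#include <stdint.h>

static uint64_t PackHardlinks(uint32_t group_id, uint32_t linkcount) {
  return (static_cast<uint64_t>(group_id) << 32) | linkcount;
}
static uint32_t Linkcount(uint64_t hardlinks) {
  return static_cast<uint32_t>(hardlinks & 0xFFFFFFFFu);
}
static uint32_t HardlinkGroup(uint64_t hardlinks) {
  return static_cast<uint32_t>(hardlinks >> 32);
}

int main() {
  // A file with three links in hardlink group 7 ...
  uint64_t hardlinks = PackHardlinks(7, 3);
  assert(Linkcount(hardlinks) == 3 && HardlinkGroup(hardlinks) == 7);

  // ... after the migration's "SET hardlinks = 1" it looks like a file that
  // never had hardlinks: linkcount 1, hardlink group 0.
  hardlinks = 1;
  assert(Linkcount(hardlinks) == 1 && HardlinkGroup(hardlinks) == 0);
  return 0;
}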
1956 //------------------------------------------------------------------------------
1957 
1958 
1959 bool CommandMigrate::BulkhashRemovalWorker::RunMigration(
1960  PendingCatalog *data) const {
1961  return CheckDatabaseSchemaCompatibility(data)
1962  && RemoveRedundantBulkHashes(data);
1963 }
1964 
1965 
1966 bool CommandMigrate::BulkhashRemovalWorker::CheckDatabaseSchemaCompatibility(
1967  PendingCatalog *data) const {
1968  assert(data->old_catalog != NULL);
1969  assert(data->new_catalog == NULL);
1970 
1971  const catalog::CatalogDatabase &clg = data->old_catalog->database();
1972  return clg.schema_version() >= 2.4 - catalog::CatalogDatabase::kSchemaEpsilon;
1973 }
1974 
1975 
1976 bool CommandMigrate::BulkhashRemovalWorker::RemoveRedundantBulkHashes(
1977  PendingCatalog *data) const {
1978  assert(data->old_catalog != NULL);
1979  assert(data->new_catalog == NULL);
1980 
1981  const catalog::CatalogDatabase &db = GetWritable(data->old_catalog)
1982  ->database();
1983 
1984  if (!db.BeginTransaction()) {
1985  return false;
1986  }
1987 
1988  // Regular files that carry both a bulk hash and a chunk list can drop the
1989  // bulk hash, since clients >= 2.1.7 no longer require it
1990  const std::string stmt = "UPDATE OR ABORT catalog "
1991  "SET hash = NULL "
1992  "WHERE flags & :file_chunked_flag;";
1993  catalog::SqlCatalog bulkhash_removal_sql(db, stmt);
1994  bulkhash_removal_sql.BindInt64(1, catalog::SqlDirent::kFlagFileChunk);
1995  bulkhash_removal_sql.Execute();
1996 
1997  return db.CommitTransaction();
1998 }
1999 
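The statement above is the entire migration step: every row whose flags contain the chunked-file bit loses its bulk hash. Below is a self-contained SQLite sketch of the same bind-and-execute pattern, with a toy table and an invented flag value; the real kFlagFileChunk constant lives in catalog_sql.h.

#include <cstdio>
#include <sqlite3.h>

int main() {
  const int kToyFlagFileChunk = 1 << 3;  // illustrative value only

  sqlite3 *db = NULL;
  sqlite3_open(":memory:", &db);
  sqlite3_exec(db,
               "CREATE TABLE catalog (name TEXT, hash BLOB, flags INTEGER);"
               "INSERT INTO catalog VALUES ('plain',   x'aa', 1);"
               "INSERT INTO catalog VALUES ('chunked', x'bb', 9);",  // 1 | 8
               NULL, NULL, NULL);

  sqlite3_stmt *stmt = NULL;
  sqlite3_prepare_v2(db,
                     "UPDATE catalog SET hash = NULL "
                     "WHERE flags & :file_chunked_flag;",
                     -1, &stmt, NULL);
  sqlite3_bind_int64(stmt, 1, kToyFlagFileChunk);  // parameters are 1-indexed
  sqlite3_step(stmt);
  sqlite3_finalize(stmt);

  printf("rows changed: %d\n", sqlite3_changes(db));  // prints 1
  sqlite3_close(db);
  return 0;
}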
2000 
2001 //------------------------------------------------------------------------------
2002 
2003 
2004 CommandMigrate::StatsMigrationWorker::StatsMigrationWorker(
2005  const worker_context *context)
2006  : AbstractMigrationWorker<StatsMigrationWorker>(context) { }
2007 
2008 
2009 bool CommandMigrate::StatsMigrationWorker::RunMigration(
2010  PendingCatalog *data) const {
2011  return CheckDatabaseSchemaCompatibility(data)
2012  && StartDatabaseTransaction(data) && RepairStatisticsCounters(data)
2013  && CommitDatabaseTransaction(data);
2014 }
2015 
2016 
2017 bool CommandMigrate::StatsMigrationWorker::CheckDatabaseSchemaCompatibility(
2018  PendingCatalog *data) const {
2019  assert(data->old_catalog != NULL);
2020  assert(data->new_catalog == NULL);
2021 
2022  const catalog::CatalogDatabase &clg = data->old_catalog->database();
2024  Error("Given catalog schema is < 2.5.", data);
2025  return false;
2026  }
2027 
2028  if (clg.schema_revision() < 5) {
2029  Error("Given catalog revision is < 5", data);
2030  return false;
2031  }
2032 
2033  return true;
2034 }
2035 
2036 
2037 bool CommandMigrate::StatsMigrationWorker::StartDatabaseTransaction(
2038  PendingCatalog *data) const {
2039  assert(!data->HasNew());
2040  GetWritable(data->old_catalog)->Transaction();
2041  return true;
2042 }
2043 
2044 
2045 bool CommandMigrate::StatsMigrationWorker::RepairStatisticsCounters(
2046  PendingCatalog *data) const {
2047  assert(!data->HasNew());
2048  bool retval = false;
2049  const catalog::CatalogDatabase &writable = GetWritable(data->old_catalog)
2050  ->database();
2051 
2052  // Aggregate the statistics counters of all nested catalogs
2053  // Note: we might need to wait until nested catalogs are successfully
2054  // processed
2055  catalog::DeltaCounters stats_counters;
2056  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
2057  const PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
2058  for (; i != iend; ++i) {
2059  const PendingCatalog *nested_catalog = *i;
2060  const catalog::DeltaCounters &s = nested_catalog->nested_statistics.Get();
2061  s.PopulateToParent(&stats_counters);
2062  }
2063 
2064  // Count various directory entry types in the catalog to fill up the catalog
2065  // statistics counters introduced in the current catalog schema
2066  catalog::SqlCatalog count_regular(
2067  writable,
2068  std::string("SELECT count(*), sum(size) FROM catalog ") + "WHERE flags & "
2069  + StringifyInt(catalog::SqlDirent::kFlagFile) + " AND NOT flags & "
2070  + StringifyInt(catalog::SqlDirent::kFlagLink) + " AND NOT flags & "
2071  + StringifyInt(catalog::SqlDirent::kFlagFileExternal) + ";");
2072  catalog::SqlCatalog count_external(
2073  writable,
2074  std::string("SELECT count(*), sum(size) FROM catalog ") + "WHERE flags & "
2076  catalog::SqlCatalog count_symlink(
2077  writable,
2078  std::string("SELECT count(*) FROM catalog ") + "WHERE flags & "
2080  catalog::SqlCatalog count_special(
2081  writable,
2082  std::string("SELECT count(*) FROM catalog ") + "WHERE flags & "
2084  catalog::SqlCatalog count_xattr(writable,
2085  std::string("SELECT count(*) FROM catalog ")
2086  + "WHERE xattr IS NOT NULL;");
2087  catalog::SqlCatalog count_chunk(
2088  writable,
2089  std::string("SELECT count(*), sum(size) FROM catalog ") + "WHERE flags & "
2091  catalog::SqlCatalog count_dir(
2092  writable,
2093  std::string("SELECT count(*) FROM catalog ") + "WHERE flags & "
2095  catalog::SqlCatalog count_chunk_blobs(writable,
2096  "SELECT count(*) FROM chunks;");
2097 
2098  retval = count_regular.FetchRow() && count_external.FetchRow()
2099  && count_symlink.FetchRow() && count_special.FetchRow()
2100  && count_xattr.FetchRow() && count_chunk.FetchRow()
2101  && count_dir.FetchRow() && count_chunk_blobs.FetchRow();
2102  if (!retval) {
2103  Error("Failed to collect catalog statistics", data);
2104  return false;
2105  }
2106 
2107  stats_counters.self.regular_files = count_regular.RetrieveInt64(0);
2108  stats_counters.self.symlinks = count_symlink.RetrieveInt64(0);
2109  stats_counters.self.specials = count_special.RetrieveInt64(0);
2110  stats_counters.self.directories = count_dir.RetrieveInt64(0);
2111  stats_counters.self.nested_catalogs = data->nested_catalogs.size();
2112  stats_counters.self.chunked_files = count_chunk.RetrieveInt64(0);
2113  stats_counters.self.file_chunks = count_chunk_blobs.RetrieveInt64(0);
2114  stats_counters.self.file_size = count_regular.RetrieveInt64(1);
2115  stats_counters.self.chunked_file_size = count_chunk.RetrieveInt64(1);
2116  stats_counters.self.xattrs = count_xattr.RetrieveInt64(0);
2117  stats_counters.self.externals = count_external.RetrieveInt64(0);
2118  stats_counters.self.external_file_size = count_external.RetrieveInt64(1);
2119 
2120  // Write back the generated statistics counters into the catalog database
2121  catalog::Counters counters;
2122  counters.ApplyDelta(stats_counters);
2123  retval = counters.InsertIntoDatabase(writable);
2124  if (!retval) {
2125  Error("Failed to write new statistics counters to database", data);
2126  return false;
2127  }
2128 
2129  // Push the generated statistics counters up to the parent catalog
2130  data->nested_statistics.Set(stats_counters);
2131 
2132  return true;
2133 }
2134 
2135 
2136 bool CommandMigrate::StatsMigrationWorker::CommitDatabaseTransaction(
2137  PendingCatalog *data) const {
2138  assert(!data->HasNew());
2139  GetWritable(data->old_catalog)->Commit();
2140  return true;
2141 }
2142 
2143 } // namespace swissknife