GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/swissknife_migrate.cc
Date: 2025-06-22 02:36:02
              Exec   Total   Coverage
Lines:           0    1046       0.0%
Branches:        0     658       0.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 *
4 * Careful: any real schema migration as of now requires taking care of
5 * hash algorithm
6 */
7
8 #include "swissknife_migrate.h"
9
10 #include <sys/resource.h>
11 #include <unistd.h>
12
13 #include "catalog_rw.h"
14 #include "catalog_sql.h"
15 #include "catalog_virtual.h"
16 #include "compression/compression.h"
17 #include "crypto/hash.h"
18 #include "swissknife_history.h"
19 #include "util/concurrency.h"
20 #include "util/logging.h"
21
22 using namespace std; // NOLINT
23
24 namespace swissknife {
25
26 catalog::DirectoryEntry CommandMigrate::nested_catalog_marker_;
27
28 CommandMigrate::CommandMigrate()
29 : file_descriptor_limit_(8192)
30 , catalog_count_(0)
31 , has_committed_new_revision_(false)
32 , uid_(0)
33 , gid_(0)
34 , root_catalog_(NULL) {
35 atomic_init32(&catalogs_processed_);
36 }
37
38
39 ParameterList CommandMigrate::GetParams() const {
40 ParameterList r;
41 r.push_back(Parameter::Mandatory(
42 'v',
43 "migration base version ( 2.0.x | 2.1.7 | chown | hardlink | bulkhash | "
44 "stats)"));
45 r.push_back(Parameter::Mandatory(
46 'r', "repository URL (absolute local path or remote URL)"));
47 r.push_back(Parameter::Mandatory('u', "upstream definition string"));
48 r.push_back(Parameter::Mandatory('o', "manifest output file"));
49 r.push_back(
50 Parameter::Mandatory('t', "temporary directory for catalog decompress"));
51 r.push_back(
52 Parameter::Optional('p', "user id to be used for this repository"));
53 r.push_back(
54 Parameter::Optional('g', "group id to be used for this repository"));
55 r.push_back(Parameter::Optional('n', "fully qualified repository name"));
56 r.push_back(Parameter::Optional('h', "root hash (other than trunk)"));
57 r.push_back(Parameter::Optional('k', "repository master key(s)"));
58 r.push_back(Parameter::Optional('i', "UID map for chown"));
59 r.push_back(Parameter::Optional('j', "GID map for chown"));
60 r.push_back(Parameter::Optional('@', "proxy url"));
61 r.push_back(Parameter::Switch('f', "fix nested catalog transition points"));
62 r.push_back(Parameter::Switch('l', "disable linkcount analysis of files"));
63 r.push_back(
64 Parameter::Switch('s', "enable collection of catalog statistics"));
65 return r;
66 }
67
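// For orientation only: a hypothetical command line combining the parameters
// registered above. The 'migrate' sub-command name, the repository name, the
// upstream definition and all paths/URLs are illustrative placeholders, not
// taken from this file:
//
//   cvmfs_swissknife migrate -v 2.1.7 \
//     -r http://stratum0.example.org/cvmfs/example.cern.ch \
//     -u local,/srv/cvmfs/example.cern.ch/data/txn,/srv/cvmfs/example.cern.ch \
//     -o /tmp/example.manifest \
//     -t /tmp/migrate-scratch \
//     -n example.cern.ch -k /etc/cvmfs/keys/example.cern.ch.pub -s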
68
69 static void Error(const std::string &message) {
70 LogCvmfs(kLogCatalog, kLogStderr, "%s", message.c_str());
71 }
72
73
74 static void Error(const std::string &message,
75 const CommandMigrate::PendingCatalog *catalog) {
76 const std::string err_msg = message
77 + "\n"
78 "Catalog: "
79 + catalog->root_path();
80 Error(err_msg);
81 }
82
83
84 static void Error(const std::string &message,
85 const catalog::SqlCatalog &statement,
86 const CommandMigrate::PendingCatalog *catalog) {
87 const std::string err_msg = message
88 + "\n"
89 "SQLite: "
90 + StringifyInt(statement.GetLastError()) + " - "
91 + statement.GetLastErrorMsg();
92 Error(err_msg, catalog);
93 }
94
95
96 int CommandMigrate::Main(const ArgumentList &args) {
97 shash::Any manual_root_hash;
98 const std::string &migration_base = *args.find('v')->second;
99 const std::string &repo_url = *args.find('r')->second;
100 const std::string &spooler = *args.find('u')->second;
101 const std::string &manifest_path = *args.find('o')->second;
102 const std::string &tmp_dir = *args.find('t')->second;
103 const std::string &uid = (args.count('p') > 0) ? *args.find('p')->second : "";
104 const std::string &gid = (args.count('g') > 0) ? *args.find('g')->second : "";
105 const std::string &repo_name = (args.count('n') > 0) ? *args.find('n')->second
106 : "";
107 const std::string &repo_keys = (args.count('k') > 0) ? *args.find('k')->second
108 : "";
109 const std::string &uid_map_path = (args.count('i') > 0)
110 ? *args.find('i')->second
111 : "";
112 const std::string &gid_map_path = (args.count('j') > 0)
113 ? *args.find('j')->second
114 : "";
115 const bool fix_transition_points = (args.count('f') > 0);
116 const bool analyze_file_linkcounts = (args.count('l') == 0);
117 const bool collect_catalog_statistics = (args.count('s') > 0);
118 if (args.count('h') > 0) {
119 manual_root_hash = shash::MkFromHexPtr(
120 shash::HexPtr(*args.find('h')->second), shash::kSuffixCatalog);
121 }
122
123 // We might need a lot of file descriptors
124 if (!RaiseFileDescriptorLimit()) {
125 Error("Failed to raise file descriptor limits");
126 return 2;
127 }
128
129 // Put SQLite into multithreaded mode
130 if (!ConfigureSQLite()) {
131 Error("Failed to preconfigure SQLite library");
132 return 3;
133 }
134
135 // Create an upstream spooler
136 temporary_directory_ = tmp_dir;
137 const upload::SpoolerDefinition spooler_definition(spooler, shash::kSha1);
138 spooler_ = upload::Spooler::Construct(spooler_definition);
139 if (!spooler_.IsValid()) {
140 Error("Failed to create upstream Spooler.");
141 return 5;
142 }
143 spooler_->RegisterListener(&CommandMigrate::UploadCallback, this);
144
145 // Load the full catalog hierarchy
146 LogCvmfs(kLogCatalog, kLogStdout, "Loading current catalog tree...");
147
148 catalog_loading_stopwatch_.Start();
149 bool loading_successful = false;
150 if (IsHttpUrl(repo_url)) {
151 typedef HttpObjectFetcher<catalog::WritableCatalog> ObjectFetcher;
152
153 const bool follow_redirects = false;
154 const string proxy = (args.count('@') > 0) ? *args.find('@')->second : "";
155 if (!this->InitDownloadManager(follow_redirects, proxy)
156 || !this->InitSignatureManager(repo_keys)) {
157 LogCvmfs(kLogCatalog, kLogStderr, "Failed to init repo connection");
158 return 1;
159 }
160
161 ObjectFetcher fetcher(
162 repo_name, repo_url, tmp_dir, download_manager(), signature_manager());
163
164 loading_successful = LoadCatalogs(manual_root_hash, &fetcher);
165 } else {
166 typedef LocalObjectFetcher<catalog::WritableCatalog> ObjectFetcher;
167 ObjectFetcher fetcher(repo_url, tmp_dir);
168 loading_successful = LoadCatalogs(manual_root_hash, &fetcher);
169 }
170 catalog_loading_stopwatch_.Stop();
171
172 if (!loading_successful) {
173 Error("Failed to load catalog tree");
174 return 4;
175 }
176
177 LogCvmfs(kLogCatalog, kLogStdout, "Loaded %d catalogs", catalog_count_);
178 assert(root_catalog_ != NULL);
179
180 // Do the actual migration step
181 bool migration_succeeded = false;
182 if (migration_base == "2.0.x") {
183 if (!ReadPersona(uid, gid)) {
184 return 1;
185 }
186
187 // Generate and upload a nested catalog marker
188 if (!GenerateNestedCatalogMarkerChunk()) {
189 Error("Failed to create a nested catalog marker.");
190 return 6;
191 }
192 spooler_->WaitForUpload();
193
194 // Configure the concurrent catalog migration facility
195 MigrationWorker_20x::worker_context context(temporary_directory_,
196 collect_catalog_statistics,
197 fix_transition_points,
198 analyze_file_linkcounts,
199 uid_,
200 gid_);
201 migration_succeeded = DoMigrationAndCommit<MigrationWorker_20x>(
202 manifest_path, &context);
203 } else if (migration_base == "2.1.7") {
204 MigrationWorker_217::worker_context context(temporary_directory_,
205 collect_catalog_statistics);
206 migration_succeeded = DoMigrationAndCommit<MigrationWorker_217>(
207 manifest_path, &context);
208 } else if (migration_base == "chown") {
209 UidMap uid_map;
210 GidMap gid_map;
211 if (!ReadPersonaMaps(uid_map_path, gid_map_path, &uid_map, &gid_map)) {
212 Error("Failed to read UID and/or GID map");
213 return 1;
214 }
215 ChownMigrationWorker::worker_context context(
216 temporary_directory_, collect_catalog_statistics, uid_map, gid_map);
217 migration_succeeded = DoMigrationAndCommit<ChownMigrationWorker>(
218 manifest_path, &context);
219 } else if (migration_base == "hardlink") {
220 HardlinkRemovalMigrationWorker::worker_context context(
221 temporary_directory_, collect_catalog_statistics);
222 migration_succeeded = DoMigrationAndCommit<HardlinkRemovalMigrationWorker>(
223 manifest_path, &context);
224 } else if (migration_base == "bulkhash") {
225 BulkhashRemovalMigrationWorker::worker_context context(
226 temporary_directory_, collect_catalog_statistics);
227 migration_succeeded = DoMigrationAndCommit<BulkhashRemovalMigrationWorker>(
228 manifest_path, &context);
229 } else if (migration_base == "stats") {
230 StatsMigrationWorker::worker_context context(temporary_directory_,
231 collect_catalog_statistics);
232 migration_succeeded = DoMigrationAndCommit<StatsMigrationWorker>(
233 manifest_path, &context);
234 } else {
235 const std::string err_msg = "Unknown migration base: " + migration_base;
236 Error(err_msg);
237 return 1;
238 }
239
240 // Check if everything went well
241 if (!migration_succeeded) {
242 Error("Migration failed!");
243 return 5;
244 }
245
246 // Analyze collected statistics
247 if (collect_catalog_statistics && has_committed_new_revision_) {
248 LogCvmfs(kLogCatalog, kLogStdout, "\nCollected statistics results:");
249 AnalyzeCatalogStatistics();
250 }
251
252 LogCvmfs(kLogCatalog, kLogStdout, "\nCatalog Migration succeeded");
253 return 0;
254 }
255
256
257 bool CommandMigrate::ReadPersona(const std::string &uid,
258 const std::string &gid) {
259 if (uid.empty()) {
260 Error("Please provide a user ID");
261 return false;
262 }
263 if (gid.empty()) {
264 Error("Please provide a group ID");
265 return false;
266 }
267
268 uid_ = String2Int64(uid);
269 gid_ = String2Int64(gid);
270 return true;
271 }
272
273
274 bool CommandMigrate::ReadPersonaMaps(const std::string &uid_map_path,
275 const std::string &gid_map_path,
276 UidMap *uid_map,
277 GidMap *gid_map) const {
278 if (!uid_map->Read(uid_map_path) || !uid_map->IsValid()) {
279 Error("Failed to read UID map");
280 return false;
281 }
282
283 if (!gid_map->Read(gid_map_path) || !gid_map->IsValid()) {
284 Error("Failed to read GID map");
285 return false;
286 }
287
288 if (uid_map->RuleCount() == 0 && !uid_map->HasDefault()) {
289 Error("UID map appears to be empty");
290 return false;
291 }
292
293 if (gid_map->RuleCount() == 0 && !gid_map->HasDefault()) {
294 Error("GID map appears to be empty");
295 return false;
296 }
297
298 return true;
299 }
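// A minimal sketch of the map files read above, assuming the usual two-column
// rule format handled by UidMap/GidMap ("<old id> <new id>" per line, '*' as
// the default rule); the exact syntax is defined by those classes, not here:
//
//   # uid.map (illustrative)
//   101   1000
//   102   1000
//   *     65534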
300
301
302 void CommandMigrate::UploadHistoryClosure(const upload::SpoolerResult &result,
303 Future<shash::Any> *hash) {
304 assert(!result.IsChunked());
305 if (result.return_code != 0) {
306 LogCvmfs(kLogCvmfs, kLogStderr, "failed to upload history database (%d)",
307 result.return_code);
308 hash->Set(shash::Any());
309 } else {
310 hash->Set(result.content_hash);
311 }
312 }
313
314
315 bool CommandMigrate::UpdateUndoTags(PendingCatalog *root_catalog,
316 uint64_t revision,
317 time_t timestamp,
318 shash::Any *history_hash) {
319 const string filename_old = history_upstream_->filename();
320 const string filename_new = filename_old + ".new";
321 bool retval = CopyPath2Path(filename_old, filename_new);
322 if (!retval)
323 return false;
324 UniquePtr<history::SqliteHistory> history(
325 history::SqliteHistory::OpenWritable(filename_new));
326 history->TakeDatabaseFileOwnership();
327
328 history::History::Tag tag_trunk;
329 const bool exists = history->GetByName(CommandTag::kHeadTag, &tag_trunk);
330 if (exists) {
331 retval = history->Remove(CommandTag::kHeadTag);
332 if (!retval)
333 return false;
334
335 history::History::Tag tag_trunk_previous = tag_trunk;
336 tag_trunk_previous.name = CommandTag::kPreviousHeadTag;
337 tag_trunk_previous.description = CommandTag::kPreviousHeadTagDescription;
338 history->Remove(CommandTag::kPreviousHeadTag);
339
340 tag_trunk.root_hash = root_catalog->new_catalog_hash;
341 tag_trunk.size = root_catalog->new_catalog_size;
342 tag_trunk.revision = revision;
343 tag_trunk.timestamp = timestamp;
344
345 retval = history->Insert(tag_trunk_previous);
346 if (!retval)
347 return false;
348 retval = history->Insert(tag_trunk);
349 if (!retval)
350 return false;
351 }
352
353 history->SetPreviousRevision(manifest_upstream_->history());
354 history->DropDatabaseFileOwnership();
355 history.Destroy();
356
357 Future<shash::Any> history_hash_new;
358 upload::Spooler::CallbackPtr callback = spooler_->RegisterListener(
359 &CommandMigrate::UploadHistoryClosure, this, &history_hash_new);
360 spooler_->ProcessHistory(filename_new);
361 spooler_->WaitForUpload();
362 spooler_->UnregisterListener(callback);
363 unlink(filename_new.c_str());
364 *history_hash = history_hash_new.Get();
365 if (history_hash->IsNull()) {
366 Error("failed to upload tag database");
367 return false;
368 }
369
370 return true;
371 }
372
373
374 template<class MigratorT>
375 bool CommandMigrate::DoMigrationAndCommit(
376 const std::string &manifest_path,
377 typename MigratorT::worker_context *context) {
378 // Create a concurrent migration context for catalog migration
379 const unsigned int cpus = GetNumberOfCpuCores();
380 ConcurrentWorkers<MigratorT> concurrent_migration(cpus, cpus * 10, context);
381
382 if (!concurrent_migration.Initialize()) {
383 Error("Failed to initialize worker migration system.");
384 return false;
385 }
386 concurrent_migration.RegisterListener(&CommandMigrate::MigrationCallback,
387 this);
388
389 // Migrate catalogs recursively (starting with the deepest nested catalogs)
390 LogCvmfs(kLogCatalog, kLogStdout, "\nMigrating catalogs...");
391 PendingCatalog *root_catalog = new PendingCatalog(root_catalog_);
392 migration_stopwatch_.Start();
393 ConvertCatalogsRecursively(root_catalog, &concurrent_migration);
394 concurrent_migration.WaitForEmptyQueue();
395 spooler_->WaitForUpload();
396 spooler_->UnregisterListeners();
397 migration_stopwatch_.Stop();
398
399 // check for possible errors during the migration process
400 const unsigned int errors = concurrent_migration.GetNumberOfFailedJobs()
401 + spooler_->GetNumberOfErrors();
402 LogCvmfs(kLogCatalog, kLogStdout,
403 "Catalog Migration finished with %d errors.", errors);
404 if (errors > 0) {
405 LogCvmfs(kLogCatalog, kLogStdout,
406 "\nCatalog Migration produced errors\nAborting...");
407 return false;
408 }
409
410 if (root_catalog->was_updated.Get()) {
411 LogCvmfs(kLogCatalog, kLogStdout,
412 "\nCommitting migrated repository revision...");
413 manifest::Manifest manifest = *manifest_upstream_;
414 manifest.set_catalog_hash(root_catalog->new_catalog_hash);
415 manifest.set_catalog_size(root_catalog->new_catalog_size);
416 manifest.set_root_path(root_catalog->root_path());
417 const catalog::Catalog *new_catalog = (root_catalog->HasNew())
418 ? root_catalog->new_catalog
419 : root_catalog->old_catalog;
420 manifest.set_ttl(new_catalog->GetTTL());
421 manifest.set_revision(new_catalog->GetRevision());
422
423 // Commit the new (migrated) repository revision...
424 if (history_upstream_.IsValid()) {
425 shash::Any history_hash(manifest_upstream_->history());
426 LogCvmfs(kLogCatalog, kLogStdout | kLogNoLinebreak,
427 "Updating repository tag database... ");
428 if (!UpdateUndoTags(root_catalog,
429 new_catalog->GetRevision(),
430 new_catalog->GetLastModified(),
431 &history_hash)) {
432 Error("Updating tag database failed.\nAborting...");
433 return false;
434 }
435 manifest.set_history(history_hash);
436 LogCvmfs(kLogCvmfs, kLogStdout, "%s", history_hash.ToString().c_str());
437 }
438
439 if (!manifest.Export(manifest_path)) {
440 Error("Manifest export failed.\nAborting...");
441 return false;
442 }
443 has_committed_new_revision_ = true;
444 } else {
445 LogCvmfs(kLogCatalog, kLogStdout,
446 "\nNo catalogs migrated, skipping the commit...");
447 }
448
449 // Get rid of the open root catalog
450 delete root_catalog;
451
452 return true;
453 }
454
455
456 void CommandMigrate::CatalogCallback(
457 const CatalogTraversalData<catalog::WritableCatalog> &data) {
458 std::string tree_indent;
459 std::string hash_string;
460 std::string path;
461
462 for (unsigned int i = 1; i < data.tree_level; ++i) {
463 tree_indent += "\u2502 ";
464 }
465
466 if (data.tree_level > 0) {
467 tree_indent += "\u251C\u2500 ";
468 }
469
470 hash_string = data.catalog_hash.ToString();
471
472 path = data.catalog->mountpoint().ToString();
473 if (path.empty()) {
474 path = "/";
475 root_catalog_ = data.catalog;
476 }
477
478 LogCvmfs(kLogCatalog, kLogStdout, "%s%s %s", tree_indent.c_str(),
479 hash_string.c_str(), path.c_str());
480
481 ++catalog_count_;
482 }
483
484
485 void CommandMigrate::MigrationCallback(PendingCatalog * const &data) {
486 // Check if the migration of the catalog was successful
487 if (!data->success) {
488 Error("Catalog migration failed! Aborting...");
489 exit(1);
490 return;
491 }
492
493 if (!data->HasChanges()) {
494 PrintStatusMessage(data, data->GetOldContentHash(), "preserved");
495 data->was_updated.Set(false);
496 return;
497 }
498
499 const string &path = (data->HasNew()) ? data->new_catalog->database_path()
500 : data->old_catalog->database_path();
501
502 // Save the processed catalog in the pending map
503 {
504 const LockGuard<PendingCatalogMap> guard(&pending_catalogs_);
505 assert(pending_catalogs_.find(path) == pending_catalogs_.end());
506 pending_catalogs_[path] = data;
507 }
508 catalog_statistics_list_.Insert(data->statistics);
509
510 // check the size of the uncompressed catalog file
511 const int64_t new_catalog_size = GetFileSize(path);
512 if (new_catalog_size <= 0) {
513 Error("Failed to get uncompressed file size of catalog!", data);
514 exit(2);
515 return;
516 }
517 data->new_catalog_size = new_catalog_size;
518
519 // Schedule the compression and upload of the catalog
520 spooler_->ProcessCatalog(path);
521 }
522
523
524 void CommandMigrate::UploadCallback(const upload::SpoolerResult &result) {
525 const string &path = result.local_path;
526
527 // Check if the upload was successful
528 if (result.return_code != 0) {
529 Error("Failed to upload file " + path + "\nAborting...");
530 exit(2);
531 return;
532 }
533 assert(result.file_chunks.size() == 0);
534
535 // Remove the just uploaded file
536 unlink(path.c_str());
537
538 // Uploaded nested catalog marker... generate and cache DirectoryEntry for it
539 if (path == nested_catalog_marker_tmp_path_) {
540 CreateNestedCatalogMarkerDirent(result.content_hash);
541 return;
542 } else {
543 // Find the catalog path in the pending catalogs and remove it from the list
544 PendingCatalog *catalog;
545 {
546 const LockGuard<PendingCatalogMap> guard(&pending_catalogs_);
547 const PendingCatalogMap::iterator i = pending_catalogs_.find(path);
548 assert(i != pending_catalogs_.end());
549 catalog = const_cast<PendingCatalog *>(i->second);
550 pending_catalogs_.erase(i);
551 }
552
553 PrintStatusMessage(catalog, result.content_hash, "migrated and uploaded");
554
555 // The catalog is completely processed... fill the content_hash to allow the
556 // processing of parent catalogs (notified via the 'was_updated' future)
557 // NOTE: From now on, this PendingCatalog structure could be deleted and
558 // should not be used anymore!
559 catalog->new_catalog_hash = result.content_hash;
560 catalog->was_updated.Set(true);
561 }
562 }
563
564
565 void CommandMigrate::PrintStatusMessage(const PendingCatalog *catalog,
566 const shash::Any &content_hash,
567 const std::string &message) {
568 atomic_inc32(&catalogs_processed_);
569 const unsigned int processed = (atomic_read32(&catalogs_processed_) * 100)
570 / catalog_count_;
571 LogCvmfs(kLogCatalog, kLogStdout, "[%d%%] %s %sC %s", processed,
572 message.c_str(), content_hash.ToString().c_str(),
573 catalog->root_path().c_str());
574 }
575
576
577 template<class MigratorT>
578 void CommandMigrate::ConvertCatalogsRecursively(PendingCatalog *catalog,
579 MigratorT *migrator) {
580 // First migrate all nested catalogs (depth first traversal)
581 const catalog::CatalogList nested_catalogs = catalog->old_catalog
582 ->GetChildren();
583 catalog::CatalogList::const_iterator i = nested_catalogs.begin();
584 const catalog::CatalogList::const_iterator iend = nested_catalogs.end();
585 catalog->nested_catalogs.reserve(nested_catalogs.size());
586 for (; i != iend; ++i) {
587 PendingCatalog *new_nested = new PendingCatalog(*i);
588 catalog->nested_catalogs.push_back(new_nested);
589 ConvertCatalogsRecursively(new_nested, migrator);
590 }
591
592 // Migrate this catalog referencing all its (already migrated) children
593 migrator->Schedule(catalog);
594 }
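// Illustrative scheduling order (not from this file): for a hierarchy
//   /  ->  /a, /b   and   /a  ->  /a/x
// the post-order recursion above schedules /a/x, then /a, then /b, and
// finally /, i.e. every catalog is scheduled only after all of its nested
// catalogs have been scheduled.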
595
596
597 bool CommandMigrate::RaiseFileDescriptorLimit() const {
598 struct rlimit rpl;
599 memset(&rpl, 0, sizeof(rpl));
600 getrlimit(RLIMIT_NOFILE, &rpl);
601 if (rpl.rlim_cur < file_descriptor_limit_) {
602 if (rpl.rlim_max < file_descriptor_limit_)
603 rpl.rlim_max = file_descriptor_limit_;
604 rpl.rlim_cur = file_descriptor_limit_;
605 const int retval = setrlimit(RLIMIT_NOFILE, &rpl);
606 if (retval != 0) {
607 return false;
608 }
609 }
610 return true;
611 }
612
613
614 bool CommandMigrate::ConfigureSQLite() const {
615 const int retval = sqlite3_config(SQLITE_CONFIG_MULTITHREAD);
616 return (retval == SQLITE_OK);
617 }
618
619
620 void CommandMigrate::AnalyzeCatalogStatistics() const {
621 const unsigned int number_of_catalogs = catalog_statistics_list_.size();
622 unsigned int aggregated_entry_count = 0;
623 unsigned int aggregated_max_row_id = 0;
624 unsigned int aggregated_hardlink_count = 0;
625 unsigned int aggregated_linkcounts = 0;
626 double aggregated_migration_time = 0.0;
627
628 CatalogStatisticsList::const_iterator i = catalog_statistics_list_.begin();
629 const CatalogStatisticsList::const_iterator iend =
630 catalog_statistics_list_.end();
631 for (; i != iend; ++i) {
632 aggregated_entry_count += i->entry_count;
633 aggregated_max_row_id += i->max_row_id;
634 aggregated_hardlink_count += i->hardlink_group_count;
635 aggregated_linkcounts += i->aggregated_linkcounts;
636 aggregated_migration_time += i->migration_time;
637 }
638
639 // Inode quantization
640 assert(aggregated_max_row_id > 0);
641 const unsigned int unused_inodes = aggregated_max_row_id
642 - aggregated_entry_count;
643 const float ratio = (static_cast<float>(unused_inodes)
644 / static_cast<float>(aggregated_max_row_id))
645 * 100.0f;
646 LogCvmfs(kLogCatalog, kLogStdout,
647 "Actual Entries: %d\n"
648 "Allocated Inodes: %d\n"
649 " Unused Inodes: %d\n"
650 " Percentage of wasted Inodes: %.1f%%\n",
651 aggregated_entry_count, aggregated_max_row_id, unused_inodes, ratio);
652
653 // Hardlink statistics
654 const float average_linkcount = (aggregated_hardlink_count > 0)
655 ? static_cast<float>(aggregated_linkcounts)
656 / aggregated_hardlink_count
657 : 0.0f;
658 LogCvmfs(kLogCatalog, kLogStdout,
659 "Generated Hardlink Groups: %d\n"
660 "Average Linkcount per Group: %.1f\n",
661 aggregated_hardlink_count, average_linkcount);
662
663 // Performance measures
664 const double average_migration_time = aggregated_migration_time
665 / static_cast<double>(
666 number_of_catalogs);
667 LogCvmfs(kLogCatalog, kLogStdout,
668 "Catalog Loading Time: %.2fs\n"
669 "Average Migration Time: %.2fs\n"
670 "Overall Migration Time: %.2fs\n"
671 "Aggregated Migration Time: %.2fs\n",
672 catalog_loading_stopwatch_.GetTime(), average_migration_time,
673 migration_stopwatch_.GetTime(), aggregated_migration_time);
674 }
675
676
677 CommandMigrate::PendingCatalog::~PendingCatalog() {
678 delete old_catalog;
679 old_catalog = NULL;
680
681 if (new_catalog != NULL) {
682 delete new_catalog;
683 new_catalog = NULL;
684 }
685 }
686
687
688 template<class DerivedT>
689 CommandMigrate::AbstractMigrationWorker<DerivedT>::AbstractMigrationWorker(
690 const worker_context *context)
691 : temporary_directory_(context->temporary_directory)
692 , collect_catalog_statistics_(context->collect_catalog_statistics) { }
693
694
695 template<class DerivedT>
696 CommandMigrate::AbstractMigrationWorker<DerivedT>::~AbstractMigrationWorker() {
697 }
698
699
700 template<class DerivedT>
701 void CommandMigrate::AbstractMigrationWorker<DerivedT>::operator()(
702 const expected_data &data) {
703 migration_stopwatch_.Start();
704 const bool success = static_cast<DerivedT *>(this)->RunMigration(data)
705 && UpdateNestedCatalogReferences(data)
706 && UpdateCatalogMetadata(data)
707 && CollectAndAggregateStatistics(data)
708 && CleanupNestedCatalogs(data);
709 data->success = success;
710 migration_stopwatch_.Stop();
711
712 data->statistics.migration_time = migration_stopwatch_.GetTime();
713 migration_stopwatch_.Reset();
714
715 // Note: MigrationCallback() will take care of the result...
716 if (success) {
717 ConcurrentWorker<DerivedT>::master()->JobSuccessful(data);
718 } else {
719 ConcurrentWorker<DerivedT>::master()->JobFailed(data);
720 }
721 }
722
723
724 template<class DerivedT>
725 bool CommandMigrate::AbstractMigrationWorker<
726 DerivedT>::UpdateNestedCatalogReferences(PendingCatalog *data) const {
727 const catalog::Catalog *new_catalog = (data->HasNew()) ? data->new_catalog
728 : data->old_catalog;
729 const catalog::CatalogDatabase &writable = new_catalog->database();
730
731 catalog::SqlCatalog add_nested_catalog(
732 writable,
733 "INSERT OR REPLACE INTO nested_catalogs (path, sha1, size) "
734 " VALUES (:path, :sha1, :size);");
735
736 // go through all nested catalogs and update their references (we are
737 // currently in their parent catalog)
738 // Note: we might need to wait for the nested catalog to be fully processed.
739 PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
740 const PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
741 for (; i != iend; ++i) {
742 PendingCatalog *nested_catalog = *i;
743
744 if (!nested_catalog->was_updated.Get()) {
745 continue;
746 }
747
748 const std::string &root_path = nested_catalog->root_path();
749 const shash::Any catalog_hash = nested_catalog->new_catalog_hash;
750 const size_t catalog_size = nested_catalog->new_catalog_size;
751
752 // insert the updated nested catalog reference into the new catalog
753 const bool retval = add_nested_catalog.BindText(1, root_path)
754 && add_nested_catalog.BindText(2,
755 catalog_hash.ToString())
756 && add_nested_catalog.BindInt64(3, catalog_size)
757 && add_nested_catalog.Execute();
758 if (!retval) {
759 Error("Failed to add nested catalog link", add_nested_catalog, data);
760 return false;
761 }
762 add_nested_catalog.Reset();
763 }
764
765 return true;
766 }
767
768
769 template<class DerivedT>
770 bool CommandMigrate::AbstractMigrationWorker<DerivedT>::UpdateCatalogMetadata(
771 PendingCatalog *data) const {
772 if (!data->HasChanges()) {
773 return true;
774 }
775
776 catalog::WritableCatalog *catalog = (data->HasNew())
777 ? data->new_catalog
778 : GetWritable(data->old_catalog);
779
780 // Set the previous revision hash in the new catalog to the old catalog
781 // we are doing the whole migration as a new snapshot that does not change
782 // any files, but just applies the necessary data schema migrations
783 catalog->SetPreviousRevision(data->old_catalog->hash());
784 catalog->IncrementRevision();
785 catalog->UpdateLastModified();
786
787 return true;
788 }
789
790
791 template<class DerivedT>
792 bool CommandMigrate::AbstractMigrationWorker<
793 DerivedT>::CollectAndAggregateStatistics(PendingCatalog *data) const {
794 if (!collect_catalog_statistics_) {
795 return true;
796 }
797
798 const catalog::Catalog *new_catalog = (data->HasNew()) ? data->new_catalog
799 : data->old_catalog;
800 const catalog::CatalogDatabase &writable = new_catalog->database();
801 bool retval;
802
803 // Find out the discrepancy between MAX(rowid) and COUNT(*)
804 catalog::SqlCatalog wasted_inodes(
805 writable, "SELECT COUNT(*), MAX(rowid) FROM catalog;");
806 retval = wasted_inodes.FetchRow();
807 if (!retval) {
808 Error("Failed to count entries in catalog", wasted_inodes, data);
809 return false;
810 }
811 const unsigned int entry_count = wasted_inodes.RetrieveInt64(0);
812 const unsigned int max_row_id = wasted_inodes.RetrieveInt64(1);
813
814 // Save collected information into the central statistics aggregator
815 data->statistics.root_path = data->root_path();
816 data->statistics.max_row_id = max_row_id;
817 data->statistics.entry_count = entry_count;
818
819 return true;
820 }
821
822
823 template<class DerivedT>
824 bool CommandMigrate::AbstractMigrationWorker<DerivedT>::CleanupNestedCatalogs(
825 PendingCatalog *data) const {
826 // All nested catalogs of PendingCatalog 'data' are fully processed and
827 // accounted. It is safe to get rid of their data structures here!
828 PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
829 const PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
830 for (; i != iend; ++i) {
831 delete *i;
832 }
833
834 data->nested_catalogs.clear();
835 return true;
836 }
837
838
839 /**
840 * These values _must_ reflect the schema version in catalog_sql.h so that a
841 * legacy catalog migration always generates the latest catalog revision.
842 * This is a deliberately duplicated piece of information to ensure that the
843 * catalog management and migration classes always get updated together.
844 */
845 const float CommandMigrate::MigrationWorker_20x::kSchema = 2.5;
846 const unsigned CommandMigrate::MigrationWorker_20x::kSchemaRevision = 7;
847
848
849 template<class DerivedT>
850 catalog::WritableCatalog *
851 CommandMigrate::AbstractMigrationWorker<DerivedT>::GetWritable(
852 const catalog::Catalog *catalog) const {
853 return dynamic_cast<catalog::WritableCatalog *>(
854 const_cast<catalog::Catalog *>(catalog));
855 }
856
857
858 //------------------------------------------------------------------------------
859
860
861 CommandMigrate::MigrationWorker_20x::MigrationWorker_20x(
862 const worker_context *context)
863 : AbstractMigrationWorker<MigrationWorker_20x>(context)
864 , fix_nested_catalog_transitions_(context->fix_nested_catalog_transitions)
865 , analyze_file_linkcounts_(context->analyze_file_linkcounts)
866 , uid_(context->uid)
867 , gid_(context->gid) { }
868
869
870 bool CommandMigrate::MigrationWorker_20x::RunMigration(
871 PendingCatalog *data) const {
872 // double-check that we are generating catalogs compatible with the actual
873 // catalog management classes
874 assert(kSchema == catalog::CatalogDatabase::kLatestSupportedSchema);
875 assert(kSchemaRevision == catalog::CatalogDatabase::kLatestSchemaRevision);
876
877 return CreateNewEmptyCatalog(data) && CheckDatabaseSchemaCompatibility(data)
878 && AttachOldCatalogDatabase(data) && StartDatabaseTransaction(data)
879 && MigrateFileMetadata(data) && MigrateNestedCatalogMountPoints(data)
880 && FixNestedCatalogTransitionPoints(data)
881 && RemoveDanglingNestedMountpoints(data)
882 && GenerateCatalogStatistics(data) && FindRootEntryInformation(data)
883 && CommitDatabaseTransaction(data) && DetachOldCatalogDatabase(data);
884 }
885
886 bool CommandMigrate::MigrationWorker_20x::CreateNewEmptyCatalog(
887 PendingCatalog *data) const {
888 const string root_path = data->root_path();
889
890 // create a new catalog database schema
891 const string clg_db_path = CreateTempPath(temporary_directory_ + "/catalog",
892 0666);
893 if (clg_db_path.empty()) {
894 Error("Failed to create temporary file for the new catalog database.");
895 return false;
896 }
897 const bool volatile_content = false;
898
899 {
900 // TODO(rmeusel): Attach catalog should work with an open catalog database
901 // as well, to remove this inefficiency
902 const UniquePtr<catalog::CatalogDatabase> new_clg_db(
903 catalog::CatalogDatabase::Create(clg_db_path));
904 if (!new_clg_db.IsValid()
905 || !new_clg_db->InsertInitialValues(root_path, volatile_content, "")) {
906 Error("Failed to create database for new catalog");
907 unlink(clg_db_path.c_str());
908 return false;
909 }
910 }
911
912 // Attach the just created nested catalog database
913 catalog::WritableCatalog
914 *writable_catalog = catalog::WritableCatalog::AttachFreely(
915 root_path, clg_db_path, shash::Any(shash::kSha1));
916 if (writable_catalog == NULL) {
917 Error("Failed to open database for new catalog");
918 unlink(clg_db_path.c_str());
919 return false;
920 }
921
922 data->new_catalog = writable_catalog;
923 return true;
924 }
925
926
927 bool CommandMigrate::MigrationWorker_20x::CheckDatabaseSchemaCompatibility(
928 PendingCatalog *data) const {
929 const catalog::CatalogDatabase &old_catalog = data->old_catalog->database();
930 const catalog::CatalogDatabase &new_catalog = data->new_catalog->database();
931
932 if ((new_catalog.schema_version()
933 < catalog::CatalogDatabase::kLatestSupportedSchema
934 - catalog::CatalogDatabase::kSchemaEpsilon
935 || new_catalog.schema_version()
936 > catalog::CatalogDatabase::kLatestSupportedSchema
937 + catalog::CatalogDatabase::kSchemaEpsilon)
938 || (old_catalog.schema_version()
939 > 2.1 + catalog::CatalogDatabase::kSchemaEpsilon)) {
940 Error("Failed to meet database requirements for migration.", data);
941 return false;
942 }
943 return true;
944 }
945
946
947 bool CommandMigrate::MigrationWorker_20x::AttachOldCatalogDatabase(
948 PendingCatalog *data) const {
949 const catalog::CatalogDatabase &old_catalog = data->old_catalog->database();
950 const catalog::CatalogDatabase &new_catalog = data->new_catalog->database();
951
952 catalog::SqlCatalog sql_attach_new(
953 new_catalog, "ATTACH '" + old_catalog.filename() + "' AS old;");
954 const bool retval = sql_attach_new.Execute();
955
956 // remove the hardlink to the old database file (temporary file); it will not
957 // be needed anymore... the data will be deleted when the database is closed
958 unlink(data->old_catalog->database().filename().c_str());
959
960 if (!retval) {
961 Error("Failed to attach database of old catalog", sql_attach_new, data);
962 return false;
963 }
964 return true;
965 }
966
967
968 bool CommandMigrate::MigrationWorker_20x::StartDatabaseTransaction(
969 PendingCatalog *data) const {
970 assert(data->HasNew());
971 data->new_catalog->Transaction();
972 return true;
973 }
974
975
976 bool CommandMigrate::MigrationWorker_20x::MigrateFileMetadata(
977 PendingCatalog *data) const {
978 assert(!data->new_catalog->IsDirty());
979 assert(data->HasNew());
980 bool retval;
981 const catalog::CatalogDatabase &writable = data->new_catalog->database();
982
983 // Hardlinks scratch space.
984 // This temporary table is used for the hardlink analysis results.
985 // The old catalog format did not have a direct notion of hardlinks and their
986 // linkcounts, but this information can be partly retrieved from the under-
987 // lying file system semantics.
988 //
989 // Hardlinks:
990 // groupid : this group id can be used for the new catalog schema
991 // inode : the inodes that were part of a hardlink group before
992 // linkcount : the linkcount for hardlink group id members
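// Illustrative example: three directory entries sharing inode 4711 end up as
// a single row (hardlink_group_id = 1, inode = 4711, linkcount = 3).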
993 catalog::SqlCatalog sql_create_hardlinks_table(
994 writable,
995 "CREATE TEMPORARY TABLE hardlinks "
996 " ( hardlink_group_id INTEGER PRIMARY KEY AUTOINCREMENT, "
997 " inode INTEGER, "
998 " linkcount INTEGER, "
999 " CONSTRAINT unique_inode UNIQUE (inode) );");
1000 retval = sql_create_hardlinks_table.Execute();
1001 if (!retval) {
1002 Error("Failed to create temporary hardlink analysis table",
1003 sql_create_hardlinks_table, data);
1004 return false;
1005 }
1006
1007 // Directory Linkcount scratch space.
1008 // Directory linkcounts can be obtained from the directory hierarchy reflected
1009 // in the old style catalogs. The new catalog schema asks for this specific
1010 // linkcount. Directory linkcount analysis results will be put into this
1011 // temporary table
1012 catalog::SqlCatalog sql_create_linkcounts_table(
1013 writable,
1014 "CREATE TEMPORARY TABLE dir_linkcounts "
1015 " ( inode INTEGER PRIMARY KEY, "
1016 " linkcount INTEGER );");
1017 retval = sql_create_linkcounts_table.Execute();
1018 if (!retval) {
1019 Error("Failed to create tmeporary directory linkcount analysis table",
1020 sql_create_linkcounts_table, data);
1021 }
1022
1023 // It is possible to skip this step.
1024 // In that case all hardlink inodes with a (potential) linkcount > 1 will get
1025 // degraded to files containing the same content
1026 if (analyze_file_linkcounts_) {
1027 retval = AnalyzeFileLinkcounts(data);
1028 if (!retval) {
1029 return false;
1030 }
1031 }
1032
1033 // Analyze the linkcounts of directories
1034 // - each directory has a linkcount of at least 2 (empty directory)
1035 // (link in parent directory and self reference (cd .) )
1036 // - for each child directory, the parent's link count is incremented by 1
1037 // (parent reference in child (cd ..) )
1038 //
1039 // Note: nested catalog mountpoints will be miscalculated here, since we can't
1040 // check the number of directories they contain. Those are defined in the
1041 // linked nested catalog and need to be added later on.
1042 // (see: MigrateNestedCatalogMountPoints() for details)
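// Worked example (illustrative): a directory with two sub-directories gets
// SUM(...) = 2 plus the constant 2, i.e. a linkcount of 4, regardless of how
// many regular files it contains; an empty directory gets 0 + 2 = 2.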
1043 catalog::SqlCatalog sql_dir_linkcounts(
1044 writable,
1045 "INSERT INTO dir_linkcounts "
1046 " SELECT c1.inode as inode, "
1047 " SUM(IFNULL(MIN(c2.inode,1),0)) + 2 as linkcount "
1048 " FROM old.catalog as c1 "
1049 " LEFT JOIN old.catalog as c2 "
1050 " ON c2.parent_1 = c1.md5path_1 AND "
1051 " c2.parent_2 = c1.md5path_2 AND "
1052 " c2.flags & :flag_dir_1 "
1053 " WHERE c1.flags & :flag_dir_2 "
1054 " GROUP BY c1.inode;");
1055 retval = sql_dir_linkcounts.BindInt64(1, catalog::SqlDirent::kFlagDir)
1056 && sql_dir_linkcounts.BindInt64(2, catalog::SqlDirent::kFlagDir)
1057 && sql_dir_linkcounts.Execute();
1058 if (!retval) {
1059 Error("Failed to analyze directory specific linkcounts", sql_dir_linkcounts,
1060 data);
1061 if (sql_dir_linkcounts.GetLastError() == SQLITE_CONSTRAINT) {
1062 Error("Obviously your catalogs are corrupted, since we found a directory"
1063 "inode that is a file inode at the same time!");
1064 }
1065 return false;
1066 }
1067
1068 // Copy the old file meta information into the new catalog schema
1069 // here we also add the previously analyzed hardlink/linkcount information
1070 // from both temporary tables "hardlinks" and "dir_linkcounts".
1071 //
1072 // Note: nested catalog mountpoints still need to be treated separately
1073 // (see MigrateNestedCatalogMountPoints() for details)
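// As composed below, the 'hardlinks' column packs the hardlink group id into
// the upper 32 bit and the linkcount into the lower 32 bit; e.g. group id 1
// with linkcount 3 becomes (1 << 32) | 3.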
1074 catalog::SqlCatalog migrate_file_meta_data(
1075 writable,
1076 "INSERT INTO catalog "
1077 " SELECT md5path_1, md5path_2, "
1078 " parent_1, parent_2, "
1079 " IFNULL(hardlink_group_id, 0) << 32 | "
1080 " COALESCE(hardlinks.linkcount, dir_linkcounts.linkcount, 1) "
1081 " AS hardlinks, "
1082 " hash, size, mode, mtime, NULL, " // set empty mtimens
1083 " flags, name, symlink, "
1084 " :uid, "
1085 " :gid, "
1086 " NULL " // set empty xattr BLOB (default)
1087 " FROM old.catalog "
1088 " LEFT JOIN hardlinks "
1089 " ON catalog.inode = hardlinks.inode "
1090 " LEFT JOIN dir_linkcounts "
1091 " ON catalog.inode = dir_linkcounts.inode;");
1092 retval = migrate_file_meta_data.BindInt64(1, uid_)
1093 && migrate_file_meta_data.BindInt64(2, gid_)
1094 && migrate_file_meta_data.Execute();
1095 if (!retval) {
1096 Error("Failed to migrate the file system meta data", migrate_file_meta_data,
1097 data);
1098 return false;
1099 }
1100
1101 // If we deal with a nested catalog, we need to add a .cvmfscatalog entry
1102 // since it was not present in the old repository specification but is needed
1103 // now!
1104 if (!data->IsRoot()) {
1105 const catalog::DirectoryEntry
1106 &nested_marker = CommandMigrate::GetNestedCatalogMarkerDirent();
1107 catalog::SqlDirentInsert insert_nested_marker(writable);
1108 const std::string root_path = data->root_path();
1109 const std::string file_path = root_path + "/"
1110 + nested_marker.name().ToString();
1111 const shash::Md5 &path_hash = shash::Md5(file_path.data(),
1112 file_path.size());
1113 const shash::Md5 &parent_hash = shash::Md5(root_path.data(),
1114 root_path.size());
1115 retval = insert_nested_marker.BindPathHash(path_hash)
1116 && insert_nested_marker.BindParentPathHash(parent_hash)
1117 && insert_nested_marker.BindDirent(nested_marker)
1118 && insert_nested_marker.BindXattrEmpty()
1119 && insert_nested_marker.Execute();
1120 if (!retval) {
1121 Error("Failed to insert nested catalog marker into new nested catalog.",
1122 insert_nested_marker, data);
1123 return false;
1124 }
1125 }
1126
1127 // Copy (and update) the properties fields
1128 //
1129 // Note: The 'schema' is explicitly not copied to the new catalog.
1130 // Each catalog contains a revision, which is also copied here and that
1131 // is later updated by calling catalog->IncrementRevision()
1132 catalog::SqlCatalog copy_properties(writable,
1133 "INSERT OR REPLACE INTO properties "
1134 " SELECT key, value "
1135 " FROM old.properties "
1136 " WHERE key != 'schema';");
1137 retval = copy_properties.Execute();
1138 if (!retval) {
1139 Error("Failed to migrate the properties table.", copy_properties, data);
1140 return false;
1141 }
1142
1143 return true;
1144 }
1145
1146
1147 bool CommandMigrate::MigrationWorker_20x::AnalyzeFileLinkcounts(
1148 PendingCatalog *data) const {
1149 assert(data->HasNew());
1150 const catalog::CatalogDatabase &writable = data->new_catalog->database();
1151 bool retval;
1152
1153 // Analyze the hardlink relationships in the old catalog
1154 // inodes used to be assigned at publishing time, implicitly constituting
1155 // those relationships. We now need them explicitly in the file catalogs
1156 // This looks for directory entries with matching inodes but differing path-
1157 // hashes and saves the results in a temporary table called 'hl_scratch'
1158 //
1159 // Note: We only support hardlink groups that reside in the same directory!
1160 // Therefore we first need to figure out hardlink candidates (which
1161 // might still contain hardlink groups spanning more than one directory)
1162 // In a second step these candidates will be analyzed to kick out un-
1163 // supported hardlink groups.
1164 // Unsupported hardlink groups will be treated as normal files with
1165 // the same content
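// Illustrative example: /dir/a and /dir/b sharing inode 42 form a supported
// hardlink group (same parent directory, linkcount 2); if /dir/a and /other/c
// shared inode 42, the group would span two directories and both entries
// would be kept as independent files with identical content.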
1166 catalog::SqlCatalog sql_create_hardlinks_scratch_table(
1167 writable,
1168 "CREATE TEMPORARY TABLE hl_scratch AS "
1169 " SELECT c1.inode AS inode, c1.md5path_1, c1.md5path_2, "
1170 " c1.parent_1 as c1p1, c1.parent_2 as c1p2, "
1171 " c2.parent_1 as c2p1, c2.parent_2 as c2p2 "
1172 " FROM old.catalog AS c1 "
1173 " INNER JOIN old.catalog AS c2 "
1174 " ON c1.inode == c2.inode AND "
1175 " (c1.md5path_1 != c2.md5path_1 OR "
1176 " c1.md5path_2 != c2.md5path_2);");
1177 retval = sql_create_hardlinks_scratch_table.Execute();
1178 if (!retval) {
1179 Error("Failed to create temporary scratch table for hardlink analysis",
1180 sql_create_hardlinks_scratch_table, data);
1181 return false;
1182 }
1183
1184 // Figures out which hardlink candidates are supported by CVMFS and can be
1185 // transferred into the new catalog as so called hardlink groups. Unsupported
1186 // hardlinks need to be discarded and treated as normal files containing the
1187 // exact same data
1188 catalog::SqlCatalog fill_linkcount_table_for_files(
1189 writable,
1190 "INSERT INTO hardlinks (inode, linkcount)"
1191 " SELECT inode, count(*) as linkcount "
1192 " FROM ( "
1193 // recombine supported hardlink inodes with their actual manifested
1194 // hard-links in the catalog.
1195 // Note: for each directory entry pointing to the same supported
1196 // hardlink inode we have a distinct MD5 path hash
1197 " SELECT DISTINCT hl.inode, hl.md5path_1, hl.md5path_2 "
1198 " FROM ( "
1199 // sort out supported hardlink inodes from unsupported ones by
1200 // locality
1201 // Note: see the next comment for the nested SELECT
1202 " SELECT inode "
1203 " FROM ( "
1204 " SELECT inode, count(*) AS cnt "
1205 " FROM ( "
1206 // go through the potential hardlinks and collect location infor-
1207 // mation about them.
1208 // Note: we only support hardlinks that all reside in the same
1209 // directory, thus having the same parent (c1p* == c2p*)
1210 // --> For supported hardlink candidates the SELECT DISTINCT
1211 // will produce only a single row, whereas others produce more
1212 " SELECT DISTINCT inode,c1p1,c1p1,c2p1,c2p2 "
1213 " FROM hl_scratch AS hl "
1214 " ) "
1215 " GROUP BY inode "
1216 " ) "
1217 " WHERE cnt = 1 "
1218 " ) AS supported_hardlinks "
1219 " LEFT JOIN hl_scratch AS hl "
1220 " ON supported_hardlinks.inode = hl.inode "
1221 " ) "
1222 " GROUP BY inode;");
1223 retval = fill_linkcount_table_for_files.Execute();
1224 if (!retval) {
1225 Error("Failed to analyze hardlink relationships for files.",
1226 fill_linkcount_table_for_files, data);
1227 return false;
1228 }
1229
1230 // The file linkcount and hardlink analysis is finished and the scratch table
1231 // can be deleted...
1232 catalog::SqlCatalog drop_hardlink_scratch_space(writable,
1233 "DROP TABLE hl_scratch;");
1234 retval = drop_hardlink_scratch_space.Execute();
1235 if (!retval) {
1236 Error("Failed to remove file linkcount analysis scratch table",
1237 drop_hardlink_scratch_space, data);
1238 return false;
1239 }
1240
1241 // Do some statistics if asked for...
1242 if (collect_catalog_statistics_) {
1243 catalog::SqlCatalog count_hardlinks(
1244 writable, "SELECT count(*), sum(linkcount) FROM hardlinks;");
1245 retval = count_hardlinks.FetchRow();
1246 if (!retval) {
1247 Error("Failed to count the generated file hardlinks for statistics",
1248 count_hardlinks, data);
1249 return false;
1250 }
1251
1252 data->statistics.hardlink_group_count += count_hardlinks.RetrieveInt64(0);
1253 data->statistics.aggregated_linkcounts += count_hardlinks.RetrieveInt64(1);
1254 }
1255
1256 return true;
1257 }
1258
1259
1260 bool CommandMigrate::MigrationWorker_20x::MigrateNestedCatalogMountPoints(
1261 PendingCatalog *data) const {
1262 assert(data->HasNew());
1263 const catalog::CatalogDatabase &writable = data->new_catalog->database();
1264 bool retval;
1265
1266 // preparing the SQL statement for nested catalog mountpoint update
1267 catalog::SqlCatalog update_mntpnt_linkcount(
1268 writable,
1269 "UPDATE catalog "
1270 "SET hardlinks = :linkcount "
1271 "WHERE md5path_1 = :md5_1 AND md5path_2 = :md5_2;");
1272
1273 // update all nested catalog mountpoints
1274 // (Note: we might need to wait for the nested catalog to be processed)
1275 PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
1276 const PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1277 for (; i != iend; ++i) {
1278 // collect information about the nested catalog
1279 PendingCatalog *nested_catalog = *i;
1280 const catalog::DirectoryEntry root_entry = nested_catalog->root_entry.Get();
1281 const string &root_path = nested_catalog->root_path();
1282
1283 // update the nested catalog mountpoint directory entry with the correct
1284 // linkcount that was determined while processing the nested catalog
1285 const shash::Md5 mountpoint_hash = shash::Md5(root_path.data(),
1286 root_path.size());
1287 retval = update_mntpnt_linkcount.BindInt64(1, root_entry.linkcount())
1288 && update_mntpnt_linkcount.BindMd5(2, 3, mountpoint_hash)
1289 && update_mntpnt_linkcount.Execute();
1290 if (!retval) {
1291 Error("Failed to update linkcount of nested catalog mountpoint",
1292 update_mntpnt_linkcount, data);
1293 return false;
1294 }
1295 update_mntpnt_linkcount.Reset();
1296 }
1297
1298 return true;
1299 }
1300
1301
1302 bool CommandMigrate::MigrationWorker_20x::FixNestedCatalogTransitionPoints(
1303 PendingCatalog *data) const {
1304 assert(data->HasNew());
1305 if (!fix_nested_catalog_transitions_) {
1306 // Fixing transition point mismatches is not enabled...
1307 return true;
1308 }
1309
1310 typedef catalog::DirectoryEntry::Difference Difference;
1311
1312 const catalog::CatalogDatabase &writable = data->new_catalog->database();
1313 bool retval;
1314
1315 catalog::SqlLookupPathHash lookup_mountpoint(writable);
1316 catalog::SqlDirentUpdate update_directory_entry(writable);
1317
1318 // Unbox the nested catalogs (possibly waiting for migration of them first)
1319 PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
1320 const PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1321 for (; i != iend; ++i) {
1322 // Collect information about the nested catalog
1323 PendingCatalog *nested_catalog = *i;
1324 const catalog::DirectoryEntry nested_root_entry = nested_catalog->root_entry
1325 .Get();
1326 const string &nested_root_path = nested_catalog->root_path();
1327 const shash::Md5 mountpoint_path_hash = shash::Md5(nested_root_path.data(),
1328 nested_root_path.size());
1329
1330 // Retrieve the nested catalog mountpoint from the current catalog
1331 retval = lookup_mountpoint.BindPathHash(mountpoint_path_hash)
1332 && lookup_mountpoint.FetchRow();
1333 if (!retval) {
1334 Error("Failed to fetch nested catalog mountpoint to check for compatible"
1335 "transition points",
1336 lookup_mountpoint, data);
1337 return false;
1338 }
1339
1340 catalog::DirectoryEntry mountpoint_entry = lookup_mountpoint.GetDirent(
1341 data->new_catalog);
1342 lookup_mountpoint.Reset();
1343
1344 // Compare nested catalog mountpoint and nested catalog root entries
1345 const catalog::DirectoryEntry::Differences diffs =
1346 mountpoint_entry.CompareTo(nested_root_entry);
1347
1348 // We MUST deal with two directory entries that are a pair of nested cata-
1349 // log mountpoint and root entry! Thus we expect their transition flags to
1350 // differ and their name to be the same.
1351 assert(diffs & Difference::kNestedCatalogTransitionFlags);
1352 assert((diffs & Difference::kName) == 0);
1353
1354 // Check if there are differences other than the nested catalog transition
1355 // flags and fix them...
1356 if ((diffs ^ Difference::kNestedCatalogTransitionFlags) != 0) {
1357 // If we found differences, we still assume a couple of directory entry
1358 // fields to be the same, otherwise some severe stuff would be wrong...
1359 if ((diffs & Difference::kChecksum) || (diffs & Difference::kLinkcount)
1360 || (diffs & Difference::kSymlink)
1361 || (diffs & Difference::kChunkedFileFlag)) {
1362 Error("Found an irreparable mismatch in a nested catalog transition "
1363 "point at '"
1364 + nested_root_path + "'\nAborting...\n");
1365 }
1366
1367 // Copy the properties from the nested catalog root entry into the mount-
1368 // point entry to bring them in sync again
1369 CommandMigrate::FixNestedCatalogTransitionPoint(nested_root_entry,
1370 &mountpoint_entry);
1371
1372 // save the nested catalog mountpoint entry into the catalog
1373 retval = update_directory_entry.BindPathHash(mountpoint_path_hash)
1374 && update_directory_entry.BindDirent(mountpoint_entry)
1375 && update_directory_entry.Execute();
1376 if (!retval) {
1377 Error("Failed to save resynchronized nested catalog mountpoint into "
1378 "catalog database",
1379 update_directory_entry, data);
1380 return false;
1381 }
1382 update_directory_entry.Reset();
1383
1384 // Fixing of this mountpoint went well... inform the user that this minor
1385 // issue occurred
1386 LogCvmfs(kLogCatalog, kLogStdout,
1387 "NOTE: fixed incompatible nested catalog transition point at: "
1388 "'%s' ",
1389 nested_root_path.c_str());
1390 }
1391 }
1392
1393 return true;
1394 }
1395
1396
1397 void CommandMigrate::FixNestedCatalogTransitionPoint(
1398 const catalog::DirectoryEntry &nested_root,
1399 catalog::DirectoryEntry *mountpoint) {
1400 // Replace some file system parameters in the mountpoint to resync it with
1401 // the nested root of the corresponding nested catalog
1402 //
1403 // Note: this method relies on CommandMigrate being a friend of DirectoryEntry
1404 mountpoint->mode_ = nested_root.mode_;
1405 mountpoint->uid_ = nested_root.uid_;
1406 mountpoint->gid_ = nested_root.gid_;
1407 mountpoint->size_ = nested_root.size_;
1408 mountpoint->mtime_ = nested_root.mtime_;
1409 }
1410
1411
1412 bool CommandMigrate::MigrationWorker_20x::RemoveDanglingNestedMountpoints(
1413 PendingCatalog *data) const {
1414 assert(data->HasNew());
1415 const catalog::CatalogDatabase &writable = data->new_catalog->database();
1416 bool retval = false;
1417
1418 // build a set of registered nested catalog path hashes
1419 typedef catalog::Catalog::NestedCatalogList NestedCatalogList;
1420 typedef std::map<shash::Md5, catalog::Catalog::NestedCatalog>
1421 NestedCatalogMap;
1422 const NestedCatalogList &nested_clgs = data->old_catalog
1423 ->ListNestedCatalogs();
1424 NestedCatalogList::const_iterator i = nested_clgs.begin();
1425 const NestedCatalogList::const_iterator iend = nested_clgs.end();
1426 NestedCatalogMap nested_catalog_path_hashes;
1427 for (; i != iend; ++i) {
1428 const PathString &path = i->mountpoint;
1429 const shash::Md5 hash(path.GetChars(), path.GetLength());
1430 nested_catalog_path_hashes[hash] = *i;
1431 }
1432
1433 // Retrieve nested catalog mountpoints that have child entries directly inside
1434 // the current catalog (which is a malformed state)
1435 catalog::SqlLookupDanglingMountpoints sql_dangling_mountpoints(writable);
1436 catalog::SqlDirentUpdate save_updated_mountpoint(writable);
1437
1438 std::vector<catalog::DirectoryEntry> todo_dirent;
1439 std::vector<shash::Md5> todo_hash;
1440
1441 // go through the list of dangling nested catalog mountpoints and fix them
1442 // where needed (check if there is no nested catalog registered for them)
1443 while (sql_dangling_mountpoints.FetchRow()) {
1444 catalog::DirectoryEntry dangling_mountpoint = sql_dangling_mountpoints
1445 .GetDirent(
1446 data->new_catalog);
1447 const shash::Md5 path_hash = sql_dangling_mountpoints.GetPathHash();
1448 assert(dangling_mountpoint.IsNestedCatalogMountpoint());
1449
1450 // check if the nested catalog mountpoint is registered in the nested cata-
1451 // log list of the currently migrated catalog
1452 const NestedCatalogMap::const_iterator
1453 nested_catalog = nested_catalog_path_hashes.find(path_hash);
1454 if (nested_catalog != nested_catalog_path_hashes.end()) {
1455 LogCvmfs(kLogCatalog, kLogStderr,
1456 "WARNING: found a non-empty nested catalog mountpoint under "
1457 "'%s'",
1458 nested_catalog->second.mountpoint.c_str());
1459 continue;
1460 }
1461
1462 // the mountpoint was confirmed to be dangling and needs to be removed
1463 dangling_mountpoint.set_is_nested_catalog_mountpoint(false);
1464 todo_dirent.push_back(dangling_mountpoint);
1465 todo_hash.push_back(path_hash);
1466 }
1467
1468 for (unsigned i = 0; i < todo_dirent.size(); ++i) {
1469 retval = save_updated_mountpoint.BindPathHash(todo_hash[i])
1470 && save_updated_mountpoint.BindDirent(todo_dirent[i])
1471 && save_updated_mountpoint.Execute()
1472 && save_updated_mountpoint.Reset();
1473 if (!retval) {
1474 Error("Failed to remove dangling nested catalog mountpoint entry in "
1475 "catalog",
1476 save_updated_mountpoint, data);
1477 return false;
1478 }
1479
1480 // tell the user that this intervention has taken place
1481 LogCvmfs(kLogCatalog, kLogStdout,
1482 "NOTE: fixed dangling nested catalog "
1483 "mountpoint entry called: '%s' ",
1484 todo_dirent[i].name().c_str());
1485 }
1486
1487 return true;
1488 }
1489
1490
1491 const catalog::DirectoryEntry &CommandMigrate::GetNestedCatalogMarkerDirent() {
1492 // This is a pre-initialized singleton... it MUST already be there...
1493 assert(nested_catalog_marker_.name_.ToString() == ".cvmfscatalog");
1494 return nested_catalog_marker_;
1495 }
1496
1497 bool CommandMigrate::GenerateNestedCatalogMarkerChunk() {
1498 // Create an empty nested catalog marker file
1499 nested_catalog_marker_tmp_path_ = CreateTempPath(
1500 temporary_directory_ + "/.cvmfscatalog", 0644);
1501 if (nested_catalog_marker_tmp_path_.empty()) {
1502 Error("Failed to create temp file for nested catalog marker dummy.");
1503 return false;
1504 }
1505
1506 // Process and upload it to the backend storage
1507 IngestionSource *source = new FileIngestionSource(
1508 nested_catalog_marker_tmp_path_);
1509 spooler_->Process(source);
1510 return true;
1511 }
1512
1513 void CommandMigrate::CreateNestedCatalogMarkerDirent(
1514 const shash::Any &content_hash) {
1515 // Generate it only once
1516 assert(nested_catalog_marker_.name_.ToString() != ".cvmfscatalog");
1517
1518 // Fill the DirectoryEntry structure with all needed information
1519 nested_catalog_marker_.name_.Assign(".cvmfscatalog", strlen(".cvmfscatalog"));
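// 33188 == S_IFREG | 0644, i.e. a regular file with permissions rw-r--r--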
1520 nested_catalog_marker_.mode_ = 33188;
1521 nested_catalog_marker_.uid_ = uid_;
1522 nested_catalog_marker_.gid_ = gid_;
1523 nested_catalog_marker_.size_ = 0;
1524 nested_catalog_marker_.mtime_ = time(NULL);
1525 nested_catalog_marker_.linkcount_ = 1;
1526 nested_catalog_marker_.checksum_ = content_hash;
1527 }
1528
1529
1530 bool CommandMigrate::MigrationWorker_20x::GenerateCatalogStatistics(
1531 PendingCatalog *data) const {
1532 assert(data->HasNew());
1533 bool retval = false;
1534 const catalog::CatalogDatabase &writable = data->new_catalog->database();
1535
1536 // Aggregate the statistics counters of all nested catalogs
1537 // Note: we might need to wait until nested catalogs are successfully
1538 // processed
1539 catalog::DeltaCounters stats_counters;
1540 PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
1541 const PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1542 for (; i != iend; ++i) {
1543 const PendingCatalog *nested_catalog = *i;
1544 const catalog::DeltaCounters &s = nested_catalog->nested_statistics.Get();
1545 s.PopulateToParent(&stats_counters);
1546 }
1547
1548 // Count various directory entry types in the catalog to fill up the catalog
1549 // statistics counters introduced in the current catalog schema
1550 catalog::SqlCatalog count_regular_files(
1551 writable,
1552 "SELECT count(*) FROM catalog "
1553 " WHERE flags & :flag_file "
1554 " AND NOT flags & :flag_link;");
1555 catalog::SqlCatalog count_symlinks(
1556 writable, "SELECT count(*) FROM catalog WHERE flags & :flag_link;");
1557 catalog::SqlCatalog count_directories(
1558 writable, "SELECT count(*) FROM catalog WHERE flags & :flag_dir;");
1559 catalog::SqlCatalog aggregate_file_size(
1560 writable,
1561 "SELECT sum(size) FROM catalog WHERE flags & :flag_file "
1562 " AND NOT flags & :flag_link");
1563
1564 // Run the actual counting queries
1565 retval = count_regular_files.BindInt64(1, catalog::SqlDirent::kFlagFile)
1566 && count_regular_files.BindInt64(2, catalog::SqlDirent::kFlagLink)
1567 && count_regular_files.FetchRow();
1568 if (!retval) {
1569 Error("Failed to count regular files.", count_regular_files, data);
1570 return false;
1571 }
1572 retval = count_symlinks.BindInt64(1, catalog::SqlDirent::kFlagLink)
1573 && count_symlinks.FetchRow();
1574 if (!retval) {
1575 Error("Failed to count symlinks.", count_symlinks, data);
1576 return false;
1577 }
1578 retval = count_directories.BindInt64(1, catalog::SqlDirent::kFlagDir)
1579 && count_directories.FetchRow();
1580 if (!retval) {
1581 Error("Failed to count directories.", count_directories, data);
1582 return false;
1583 }
1584 retval = aggregate_file_size.BindInt64(1, catalog::SqlDirent::kFlagFile)
1585 && aggregate_file_size.BindInt64(2, catalog::SqlDirent::kFlagLink)
1586 && aggregate_file_size.FetchRow();
1587 if (!retval) {
1588 Error("Failed to aggregate the file sizes.", aggregate_file_size, data);
1589 return false;
1590 }
1591
1592 // Insert the counted statistics into the DeltaCounters data structure
1593 stats_counters.self.regular_files = count_regular_files.RetrieveInt64(0);
1594 stats_counters.self.symlinks = count_symlinks.RetrieveInt64(0);
1595 stats_counters.self.directories = count_directories.RetrieveInt64(0);
1596 stats_counters.self.nested_catalogs = data->nested_catalogs.size();
1597 stats_counters.self.file_size = aggregate_file_size.RetrieveInt64(0);
1598
1599 // Write back the generated statistics counters into the catalog database
1600 stats_counters.WriteToDatabase(writable);
1601
1602 // Push the generated statistics counters up to the parent catalog
1603 data->nested_statistics.Set(stats_counters);
1604
1605 return true;
1606 }
1607
1608
1609 bool CommandMigrate::MigrationWorker_20x::FindRootEntryInformation(
1610 PendingCatalog *data) const {
1611 const catalog::CatalogDatabase &writable = data->new_catalog->database();
1612 bool retval;
1613
1614 std::string root_path = data->root_path();
1615 const shash::Md5 root_path_hash =
1616 shash::Md5(root_path.data(), root_path.size());
1617
1618 catalog::SqlLookupPathHash lookup_root_entry(writable);
1619 retval = lookup_root_entry.BindPathHash(root_path_hash)
1620 && lookup_root_entry.FetchRow();
1621 if (!retval) {
1622 Error("Failed to retrieve root directory entry of migrated catalog",
1623 lookup_root_entry, data);
1624 return false;
1625 }
1626
1627 const catalog::DirectoryEntry entry =
1628 lookup_root_entry.GetDirent(data->new_catalog);
1629 if (entry.linkcount() < 2 || entry.hardlink_group() > 0) {
1630 Error("Retrieved linkcount of catalog root entry is not sane.", data);
1631 return false;
1632 }
1633
1634 data->root_entry.Set(entry);
1635 return true;
1636 }
1637
1638
1639 bool CommandMigrate::MigrationWorker_20x::CommitDatabaseTransaction(
1640 PendingCatalog *data) const {
1641 assert(data->HasNew());
1642 data->new_catalog->Commit();
1643 return true;
1644 }
1645
1646
1647 bool CommandMigrate::MigrationWorker_20x::DetachOldCatalogDatabase(
1648 PendingCatalog *data) const {
1649 assert(data->HasNew());
1650 const catalog::CatalogDatabase &writable = data->new_catalog->database();
1651 catalog::SqlCatalog detach_old_catalog(writable, "DETACH old;");
1652 const bool retval = detach_old_catalog.Execute();
1653 if (!retval) {
1654 Error("Failed to detach old catalog database.", detach_old_catalog, data);
1655 return false;
1656 }
1657 return true;
1658 }
1659
1660
1661 //------------------------------------------------------------------------------
1662
1663
1664 CommandMigrate::MigrationWorker_217::MigrationWorker_217(
1665 const worker_context *context)
1666 : AbstractMigrationWorker<MigrationWorker_217>(context) { }
1667
1668
1669 bool CommandMigrate::MigrationWorker_217::RunMigration(
1670 PendingCatalog *data) const {
1671 return CheckDatabaseSchemaCompatibility(data)
1672 && StartDatabaseTransaction(data)
1673 && GenerateNewStatisticsCounters(data) && UpdateCatalogSchema(data)
1674 && CommitDatabaseTransaction(data);
1675 }
1676
1677
1678 bool CommandMigrate::MigrationWorker_217::CheckDatabaseSchemaCompatibility(
1679 PendingCatalog *data) const {
1680 assert(!data->HasNew());
1681 const catalog::CatalogDatabase &old_catalog = data->old_catalog->database();
1682
1683 if ((old_catalog.schema_version()
1684 < 2.4 - catalog::CatalogDatabase::kSchemaEpsilon)
1685 || (old_catalog.schema_version()
1686 > 2.4 + catalog::CatalogDatabase::kSchemaEpsilon)) {
1687 Error("Given Catalog is not Schema 2.4.", data);
1688 return false;
1689 }
1690
1691 return true;
1692 }
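
// Editorial note (illustrative sketch, not part of the original source):
// the two comparisons above amount to a floating point equality check of the
// schema version against 2.4 within kSchemaEpsilon; an equivalent
// formulation would be:
//
//   #include <cmath>
//
//   bool IsSchema24(const catalog::CatalogDatabase &db) {
//     return std::fabs(db.schema_version() - 2.4)
//            <= catalog::CatalogDatabase::kSchemaEpsilon;
//   }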
1693
1694
1695 bool CommandMigrate::MigrationWorker_217::StartDatabaseTransaction(
1696 PendingCatalog *data) const {
1697 assert(!data->HasNew());
1698 GetWritable(data->old_catalog)->Transaction();
1699 return true;
1700 }
1701
1702
1703 bool CommandMigrate::MigrationWorker_217::GenerateNewStatisticsCounters(
1704 PendingCatalog *data) const {
1705 assert(!data->HasNew());
1706 bool retval = false;
1707 const catalog::CatalogDatabase &writable = GetWritable(data->old_catalog)
1708 ->database();
1709
1710 // Aggregate the statistics counters of all nested catalogs
1711 // Note: we might need to wait until nested catalogs are successfully
1712 // processed
1713 catalog::DeltaCounters stats_counters;
1714 PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
1715 const PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1716 for (; i != iend; ++i) {
1717 const PendingCatalog *nested_catalog = *i;
1718 const catalog::DeltaCounters &s = nested_catalog->nested_statistics.Get();
1719 s.PopulateToParent(&stats_counters);
1720 }
1721
1722 // Count various directory entry types in the catalog to fill up the catalog
1723 // statistics counters introduced in the current catalog schema
1724 catalog::SqlCatalog count_chunked_files(
1725 writable,
1726 "SELECT count(*), sum(size) FROM catalog "
1727 " WHERE flags & :flag_chunked_file;");
1728 catalog::SqlCatalog count_file_chunks(writable,
1729 "SELECT count(*) FROM chunks;");
1730 catalog::SqlCatalog aggregate_file_size(
1731 writable,
1732 "SELECT sum(size) FROM catalog WHERE flags & :flag_file "
1733 " AND NOT flags & :flag_link;");
1734
1735 // Run the actual counting queries
1736 retval = count_chunked_files.BindInt64(1, catalog::SqlDirent::kFlagFileChunk)
1737 && count_chunked_files.FetchRow();
1738 if (!retval) {
1739 Error("Failed to count chunked files.", count_chunked_files, data);
1740 return false;
1741 }
1742 retval = count_file_chunks.FetchRow();
1743 if (!retval) {
1744 Error("Failed to count file chunks", count_file_chunks, data);
1745 return false;
1746 }
1747 retval = aggregate_file_size.BindInt64(1, catalog::SqlDirent::kFlagFile)
1748 && aggregate_file_size.BindInt64(2, catalog::SqlDirent::kFlagLink)
1749 && aggregate_file_size.FetchRow();
1750 if (!retval) {
1751 Error("Failed to aggregate the file sizes.", aggregate_file_size, data);
1752 return false;
1753 }
1754
1755 // Insert the counted statistics into the DeltaCounters data structure
1756 stats_counters.self.chunked_files = count_chunked_files.RetrieveInt64(0);
1757 stats_counters.self.chunked_file_size = count_chunked_files.RetrieveInt64(1);
1758 stats_counters.self.file_chunks = count_file_chunks.RetrieveInt64(0);
1759 stats_counters.self.file_size = aggregate_file_size.RetrieveInt64(0);
1760
1761 // Write back the generated statistics counters into the catalog database
1762 catalog::Counters counters;
1763 retval = counters.ReadFromDatabase(writable, catalog::LegacyMode::kLegacy);
1764 if (!retval) {
1765 Error("Failed to read old catalog statistics counters", data);
1766 return false;
1767 }
1768 counters.ApplyDelta(stats_counters);
1769 retval = counters.InsertIntoDatabase(writable);
1770 if (!retval) {
1771 Error("Failed to write new statistics counters to database", data);
1772 return false;
1773 }
1774
1775 // Push the generated statistics counters up to the parent catalog
1776 data->nested_statistics.Set(stats_counters);
1777
1778 return true;
1779 }
1780
1781
1782 bool CommandMigrate::MigrationWorker_217::UpdateCatalogSchema(
1783 PendingCatalog *data) const {
1784 assert(!data->HasNew());
1785 const catalog::CatalogDatabase &writable = GetWritable(data->old_catalog)
1786 ->database();
1787 catalog::SqlCatalog update_schema_version(
1788 writable,
1789 "UPDATE properties SET value = :schema_version WHERE key = 'schema';");
1790
1791 const bool retval = update_schema_version.BindDouble(1, 2.5)
1792 && update_schema_version.Execute();
1793 if (!retval) {
1794 Error(
1795 "Failed to update catalog schema version", update_schema_version, data);
1796 return false;
1797 }
1798
1799 return true;
1800 }
1801
1802
1803 bool CommandMigrate::MigrationWorker_217::CommitDatabaseTransaction(
1804 PendingCatalog *data) const {
1805 assert(!data->HasNew());
1806 GetWritable(data->old_catalog)->Commit();
1807 return true;
1808 }
1809
1810
1811 //------------------------------------------------------------------------------
1812
1813
1814 CommandMigrate::ChownMigrationWorker::ChownMigrationWorker(
1815 const worker_context *context)
1816 : AbstractMigrationWorker<ChownMigrationWorker>(context)
1817 , uid_map_statement_(GenerateMappingStatement(context->uid_map, "uid"))
1818 , gid_map_statement_(GenerateMappingStatement(context->gid_map, "gid")) { }
1819
1820 bool CommandMigrate::ChownMigrationWorker::RunMigration(
1821 PendingCatalog *data) const {
1822 return ApplyPersonaMappings(data);
1823 }
1824
1825
1826 bool CommandMigrate::ChownMigrationWorker::ApplyPersonaMappings(
1827 PendingCatalog *data) const {
1828 assert(data->old_catalog != NULL);
1829 assert(data->new_catalog == NULL);
1830
1831 if (data->old_catalog->mountpoint()
1832 == PathString("/" + string(catalog::VirtualCatalog::kVirtualPath))) {
1833 // skipping virtual catalog
1834 return true;
1835 }
1836
1837 const catalog::CatalogDatabase &db = GetWritable(data->old_catalog)
1838 ->database();
1839
1840 if (!db.BeginTransaction()) {
1841 return false;
1842 }
1843
1844 catalog::SqlCatalog uid_sql(db, uid_map_statement_);
1845 if (!uid_sql.Execute()) {
1846 Error("Failed to update UIDs", uid_sql, data);
1847 return false;
1848 }
1849
1850 catalog::SqlCatalog gid_sql(db, gid_map_statement_);
1851 if (!gid_sql.Execute()) {
1852 Error("Failed to update GIDs", gid_sql, data);
1853 return false;
1854 }
1855
1856 return db.CommitTransaction();
1857 }
1858
1859
1860 template<class MapT>
1861 std::string CommandMigrate::ChownMigrationWorker::GenerateMappingStatement(
1862 const MapT &map, const std::string &column) const {
1863 assert(map.RuleCount() > 0 || map.HasDefault());
1864
1865 std::string stmt = "UPDATE OR ABORT catalog SET " + column + " = ";
1866
1867 if (map.RuleCount() == 0) {
1868 // map everything to the same value (just a simple UPDATE clause)
1869 stmt += StringifyInt(map.GetDefault());
1870 } else {
1871 // apply multiple ID mappings (UPDATE clause with CASE statement)
1872 stmt += "CASE " + column + " ";
1873 typedef typename MapT::map_type::const_iterator map_iterator;
1874 map_iterator i = map.GetRuleMap().begin();
1875 const map_iterator iend = map.GetRuleMap().end();
1876 for (; i != iend; ++i) {
1877 stmt += "WHEN " + StringifyInt(i->first) + " THEN "
1878 + StringifyInt(i->second) + " ";
1879 }
1880
1881 // add a default (if provided) or leave unchanged if no mapping fits
1882 stmt += (map.HasDefault()) ? "ELSE " + StringifyInt(map.GetDefault()) + " "
1883 : "ELSE " + column + " ";
1884 stmt += "END";
1885 }
1886
1887 stmt += ";";
1888 return stmt;
1889 }
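
// Editorial sketch (not part of the original source): for a hypothetical UID
// map {1000 -> 2000, 1001 -> 2001} with a default of 65534, the statement
// built above would read (modulo whitespace):
//
//   UPDATE OR ABORT catalog SET uid = CASE uid
//     WHEN 1000 THEN 2000
//     WHEN 1001 THEN 2001
//     ELSE 65534 END;
//
// With no rules and only a default value it collapses to a plain
//
//   UPDATE OR ABORT catalog SET uid = 65534;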
1890
1891
1892 //------------------------------------------------------------------------------
1893
1894
1895 bool CommandMigrate::HardlinkRemovalMigrationWorker::RunMigration(
1896 PendingCatalog *data) const {
1897 return CheckDatabaseSchemaCompatibility(data) && BreakUpHardlinks(data);
1898 }
1899
1900
1901 bool CommandMigrate::HardlinkRemovalMigrationWorker::
1902 CheckDatabaseSchemaCompatibility(PendingCatalog *data) const {
1903 assert(data->old_catalog != NULL);
1904 assert(data->new_catalog == NULL);
1905
1906 const catalog::CatalogDatabase &clg = data->old_catalog->database();
1907 return clg.schema_version() >= 2.4 - catalog::CatalogDatabase::kSchemaEpsilon;
1908 }
1909
1910
1911 bool CommandMigrate::HardlinkRemovalMigrationWorker::BreakUpHardlinks(
1912 PendingCatalog *data) const {
1913 assert(data->old_catalog != NULL);
1914 assert(data->new_catalog == NULL);
1915
1916 const catalog::CatalogDatabase &db = GetWritable(data->old_catalog)
1917 ->database();
1918
1919 if (!db.BeginTransaction()) {
1920 return false;
1921 }
1922
1923 // CernVM-FS catalogs do not store inodes directly; inodes are assigned by
1924 // the CVMFS client at runtime. Hardlinks are expressed with so-called
1925 // hardlink group IDs that indicate hardlink relationships, which need to be
1926 // respected at runtime by assigning identical inodes accordingly.
1927 //
1928 // This updates all directory entries of a given catalog that have a linkcount
1929 // greater than 1 and are flagged as a 'file'. Note: Symlinks are flagged both
1930 // as 'file' and as 'symlink', hence they are updated implicitly as well.
1931 //
1932 // The 'hardlinks' field in the catalog contains two 32 bit integers:
1933 // * the linkcount in the lower 32 bits
1934 // * the (so called) hardlink group ID in the higher 32 bits
1935 //
1936 // Files that have a linkcount of exactly 1 do not have any hardlinks and have
1937 // the (implicit) hardlink group ID '0'. Hence, 'hardlinks == 1' means that a
1938 // file doesn't have any hardlinks (linkcount = 1) and doesn't need treatment
1939 // here.
1940 //
1941 // Files that have hardlinks (linkcount > 1) will have a very large integer in
1942 // their 'hardlinks' field (hardlink group ID > 0 in higher 32 bits). Those
1943 // files will be treated by setting their 'hardlinks' field to 1, effectively
1944 // clearing all hardlink information from the directory entry.
1945 const std::string stmt = "UPDATE OR ABORT catalog "
1946 "SET hardlinks = 1 "
1947 "WHERE flags & :file_flag "
1948 " AND hardlinks > 1;";
1949 catalog::SqlCatalog hardlink_removal_sql(db, stmt);
1950 hardlink_removal_sql.BindInt64(1, catalog::SqlDirent::kFlagFile);
1951 hardlink_removal_sql.Execute();
1952
1953 return db.CommitTransaction();
1954 }
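
// Editorial sketch (not part of the original source): as the comment above
// describes, the 'hardlinks' column packs two 32 bit values into a single
// 64 bit integer. The decomposition is roughly:
//
//   #include <stdint.h>
//
//   uint32_t Linkcount(const uint64_t hardlinks) {
//     return static_cast<uint32_t>(hardlinks & 0xFFFFFFFF);  // lower 32 bits
//   }
//   uint32_t HardlinkGroup(const uint64_t hardlinks) {
//     return static_cast<uint32_t>(hardlinks >> 32);  // upper 32 bits
//   }
//
// Setting 'hardlinks = 1' therefore yields hardlink group 0 and linkcount 1,
// which is exactly what the UPDATE statement above relies on.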
1955
1956 //------------------------------------------------------------------------------
1957
1958
1959 bool CommandMigrate::BulkhashRemovalMigrationWorker::RunMigration(
1960 PendingCatalog *data) const {
1961 return CheckDatabaseSchemaCompatibility(data)
1962 && RemoveRedundantBulkHashes(data);
1963 }
1964
1965
1966 bool CommandMigrate::BulkhashRemovalMigrationWorker::
1967 CheckDatabaseSchemaCompatibility(PendingCatalog *data) const {
1968 assert(data->old_catalog != NULL);
1969 assert(data->new_catalog == NULL);
1970
1971 const catalog::CatalogDatabase &clg = data->old_catalog->database();
1972 return clg.schema_version() >= 2.4 - catalog::CatalogDatabase::kSchemaEpsilon;
1973 }
1974
1975
1976 bool CommandMigrate::BulkhashRemovalMigrationWorker::RemoveRedundantBulkHashes(
1977 PendingCatalog *data) const {
1978 assert(data->old_catalog != NULL);
1979 assert(data->new_catalog == NULL);
1980
1981 const catalog::CatalogDatabase &db = GetWritable(data->old_catalog)
1982 ->database();
1983
1984 if (!db.BeginTransaction()) {
1985 return false;
1986 }
1987
1988 // Regular files that carry both a bulk hash and chunk hashes can drop the
1989 // bulk hash, since modern clients (>= 2.1.7) do not require it
1990 const std::string stmt = "UPDATE OR ABORT catalog "
1991 "SET hash = NULL "
1992 "WHERE flags & :file_chunked_flag;";
1993 catalog::SqlCatalog bulkhash_removal_sql(db, stmt);
1994 bulkhash_removal_sql.BindInt64(1, catalog::SqlDirent::kFlagFileChunk);
1995 bulkhash_removal_sql.Execute();
1996
1997 return db.CommitTransaction();
1998 }
1999
2000
2001 //------------------------------------------------------------------------------
2002
2003
2004 CommandMigrate::StatsMigrationWorker::StatsMigrationWorker(
2005 const worker_context *context)
2006 : AbstractMigrationWorker<StatsMigrationWorker>(context) { }
2007
2008
2009 bool CommandMigrate::StatsMigrationWorker::RunMigration(
2010 PendingCatalog *data) const {
2011 return CheckDatabaseSchemaCompatibility(data)
2012 && StartDatabaseTransaction(data) && RepairStatisticsCounters(data)
2013 && CommitDatabaseTransaction(data);
2014 }
2015
2016
2017 bool CommandMigrate::StatsMigrationWorker::CheckDatabaseSchemaCompatibility(
2018 PendingCatalog *data) const {
2019 assert(data->old_catalog != NULL);
2020 assert(data->new_catalog == NULL);
2021
2022 const catalog::CatalogDatabase &clg = data->old_catalog->database();
2023 if (clg.schema_version() < 2.5 - catalog::CatalogDatabase::kSchemaEpsilon) {
2024 Error("Given catalog schema is < 2.5.", data);
2025 return false;
2026 }
2027
2028 if (clg.schema_revision() < 5) {
2029 Error("Given catalog revision is < 5", data);
2030 return false;
2031 }
2032
2033 return true;
2034 }
2035
2036
2037 bool CommandMigrate::StatsMigrationWorker::StartDatabaseTransaction(
2038 PendingCatalog *data) const {
2039 assert(!data->HasNew());
2040 GetWritable(data->old_catalog)->Transaction();
2041 return true;
2042 }
2043
2044
2045 bool CommandMigrate::StatsMigrationWorker::RepairStatisticsCounters(
2046 PendingCatalog *data) const {
2047 assert(!data->HasNew());
2048 bool retval = false;
2049 const catalog::CatalogDatabase &writable = GetWritable(data->old_catalog)
2050 ->database();
2051
2052 // Aggregate the statistics counters of all nested catalogs
2053 // Note: we might need to wait until nested catalogs are successfully
2054 // processed
2055 catalog::DeltaCounters stats_counters;
2056 PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
2057 const PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
2058 for (; i != iend; ++i) {
2059 const PendingCatalog *nested_catalog = *i;
2060 const catalog::DeltaCounters &s = nested_catalog->nested_statistics.Get();
2061 s.PopulateToParent(&stats_counters);
2062 }
2063
2064 // Count various directory entry types in the catalog to fill up the catalog
2065 // statistics counters introduced in the current catalog schema
2066 catalog::SqlCatalog count_regular(
2067 writable,
2068 std::string("SELECT count(*), sum(size) FROM catalog ") + "WHERE flags & "
2069 + StringifyInt(catalog::SqlDirent::kFlagFile) + " AND NOT flags & "
2070 + StringifyInt(catalog::SqlDirent::kFlagLink) + " AND NOT flags & "
2071 + StringifyInt(catalog::SqlDirent::kFlagFileSpecial) + ";");
2072 catalog::SqlCatalog count_external(
2073 writable,
2074 std::string("SELECT count(*), sum(size) FROM catalog ") + "WHERE flags & "
2075 + StringifyInt(catalog::SqlDirent::kFlagFileExternal) + ";");
2076 catalog::SqlCatalog count_symlink(
2077 writable,
2078 std::string("SELECT count(*) FROM catalog ") + "WHERE flags & "
2079 + StringifyInt(catalog::SqlDirent::kFlagLink) + ";");
2080 catalog::SqlCatalog count_special(
2081 writable,
2082 std::string("SELECT count(*) FROM catalog ") + "WHERE flags & "
2083 + StringifyInt(catalog::SqlDirent::kFlagFileSpecial) + ";");
2084 catalog::SqlCatalog count_xattr(writable,
2085 std::string("SELECT count(*) FROM catalog ")
2086 + "WHERE xattr IS NOT NULL;");
2087 catalog::SqlCatalog count_chunk(
2088 writable,
2089 std::string("SELECT count(*), sum(size) FROM catalog ") + "WHERE flags & "
2090 + StringifyInt(catalog::SqlDirent::kFlagFileChunk) + ";");
2091 catalog::SqlCatalog count_dir(
2092 writable,
2093 std::string("SELECT count(*) FROM catalog ") + "WHERE flags & "
2094 + StringifyInt(catalog::SqlDirent::kFlagDir) + ";");
2095 catalog::SqlCatalog count_chunk_blobs(writable,
2096 "SELECT count(*) FROM chunks;");
2097
2098 retval = count_regular.FetchRow() && count_external.FetchRow()
2099 && count_symlink.FetchRow() && count_special.FetchRow()
2100 && count_xattr.FetchRow() && count_chunk.FetchRow()
2101 && count_dir.FetchRow() && count_chunk_blobs.FetchRow();
2102 if (!retval) {
2103 Error("Failed to collect catalog statistics", data);
2104 return false;
2105 }
2106
2107 stats_counters.self.regular_files = count_regular.RetrieveInt64(0);
2108 stats_counters.self.symlinks = count_symlink.RetrieveInt64(0);
2109 stats_counters.self.specials = count_special.RetrieveInt64(0);
2110 stats_counters.self.directories = count_dir.RetrieveInt64(0);
2111 stats_counters.self.nested_catalogs = data->nested_catalogs.size();
2112 stats_counters.self.chunked_files = count_chunk.RetrieveInt64(0);
2113 stats_counters.self.file_chunks = count_chunk_blobs.RetrieveInt64(0);
2114 stats_counters.self.file_size = count_regular.RetrieveInt64(1);
2115 stats_counters.self.chunked_file_size = count_chunk.RetrieveInt64(1);
2116 stats_counters.self.xattrs = count_xattr.RetrieveInt64(0);
2117 stats_counters.self.externals = count_external.RetrieveInt64(0);
2118 stats_counters.self.external_file_size = count_external.RetrieveInt64(1);
2119
2120 // Write back the generated statistics counters into the catalog database
2121 catalog::Counters counters;
2122 counters.ApplyDelta(stats_counters);
2123 retval = counters.InsertIntoDatabase(writable);
2124 if (!retval) {
2125 Error("Failed to write new statistics counters to database", data);
2126 return false;
2127 }
2128
2129 // Push the generated statistics counters up to the parent catalog
2130 data->nested_statistics.Set(stats_counters);
2131
2132 return true;
2133 }
2134
2135
2136 bool CommandMigrate::StatsMigrationWorker::CommitDatabaseTransaction(
2137 PendingCatalog *data) const {
2138 assert(!data->HasNew());
2139 GetWritable(data->old_catalog)->Commit();
2140 return true;
2141 }
2142
2143 } // namespace swissknife
2144