GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/swissknife_migrate.cc
Date: 2024-04-21 02:33:16
            Exec   Total   Coverage
Lines:         0    1067       0.0%
Branches:      0     650       0.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 *
4 * Careful: any real schema migration as of now requires taking care of the
5 * hash algorithm.
6 */
7
8 #include "swissknife_migrate.h"
9
10 #include <sys/resource.h>
11 #include <unistd.h>
12
13 #include "catalog_rw.h"
14 #include "catalog_sql.h"
15 #include "catalog_virtual.h"
16 #include "compression.h"
17 #include "crypto/hash.h"
18 #include "swissknife_history.h"
19 #include "util/concurrency.h"
20 #include "util/logging.h"
21
22 using namespace std; // NOLINT
23
24 namespace swissknife {
25
26 catalog::DirectoryEntry CommandMigrate::nested_catalog_marker_;
27
28 CommandMigrate::CommandMigrate() :
29 file_descriptor_limit_(8192),
30 catalog_count_(0),
31 has_committed_new_revision_(false),
32 uid_(0),
33 gid_(0),
34 root_catalog_(NULL)
35 {
36 atomic_init32(&catalogs_processed_);
37 }
38
39
40 ParameterList CommandMigrate::GetParams() const {
41 ParameterList r;
42 r.push_back(Parameter::Mandatory('v',
43 "migration base version ( 2.0.x | 2.1.7 | chown | hardlink | bulkhash | "
44 "stats)"));
45 r.push_back(Parameter::Mandatory('r',
46 "repository URL (absolute local path or remote URL)"));
47 r.push_back(Parameter::Mandatory('u', "upstream definition string"));
48 r.push_back(Parameter::Mandatory('o', "manifest output file"));
49 r.push_back(Parameter::Mandatory('t',
50 "temporary directory for catalog decompress"));
51 r.push_back(Parameter::Optional('p',
52 "user id to be used for this repository"));
53 r.push_back(Parameter::Optional('g',
54 "group id to be used for this repository"));
55 r.push_back(Parameter::Optional('n', "fully qualified repository name"));
56 r.push_back(Parameter::Optional('h', "root hash (other than trunk)"));
57 r.push_back(Parameter::Optional('k', "repository master key(s)"));
58 r.push_back(Parameter::Optional('i', "UID map for chown"));
59 r.push_back(Parameter::Optional('j', "GID map for chown"));
60 r.push_back(Parameter::Optional('@', "proxy url"));
61 r.push_back(Parameter::Switch('f', "fix nested catalog transition points"));
62 r.push_back(Parameter::Switch('l', "disable linkcount analysis of files"));
63 r.push_back(Parameter::Switch('s',
64 "enable collection of catalog statistics"));
65 return r;
66 }
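// A hypothetical invocation might look like the following (command name and
// paths are illustrative placeholders, not taken from this file):
//   cvmfs_swissknife migrate -v 2.1.7 -r /srv/cvmfs/example.repo \
//     -u <upstream definition> -o /tmp/manifest -t /tmp/migration-scratch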
67
68
69 static void Error(const std::string &message) {
70 LogCvmfs(kLogCatalog, kLogStderr, "%s", message.c_str());
71 }
72
73
74 static void Error(const std::string &message,
75 const CommandMigrate::PendingCatalog *catalog) {
76 const std::string err_msg = message + "\n"
77 "Catalog: " + catalog->root_path();
78 Error(err_msg);
79 }
80
81
82 static void Error(const std::string &message,
83 const catalog::SqlCatalog &statement,
84 const CommandMigrate::PendingCatalog *catalog) {
85 const std::string err_msg =
86 message + "\n"
87 "SQLite: " + StringifyInt(statement.GetLastError()) +
88 " - " + statement.GetLastErrorMsg();
89 Error(err_msg, catalog);
90 }
91
92
93 int CommandMigrate::Main(const ArgumentList &args) {
94 shash::Any manual_root_hash;
95 const std::string &migration_base = *args.find('v')->second;
96 const std::string &repo_url = *args.find('r')->second;
97 const std::string &spooler = *args.find('u')->second;
98 const std::string &manifest_path = *args.find('o')->second;
99 const std::string &tmp_dir = *args.find('t')->second;
100 const std::string &uid = (args.count('p') > 0) ?
101 *args.find('p')->second :
102 "";
103 const std::string &gid = (args.count('g') > 0) ?
104 *args.find('g')->second :
105 "";
106 const std::string &repo_name = (args.count('n') > 0) ?
107 *args.find('n')->second :
108 "";
109 const std::string &repo_keys = (args.count('k') > 0) ?
110 *args.find('k')->second :
111 "";
112 const std::string &uid_map_path = (args.count('i') > 0) ?
113 *args.find('i')->second :
114 "";
115 const std::string &gid_map_path = (args.count('j') > 0) ?
116 *args.find('j')->second :
117 "";
118 const bool fix_transition_points = (args.count('f') > 0);
119 const bool analyze_file_linkcounts = (args.count('l') == 0);
120 const bool collect_catalog_statistics = (args.count('s') > 0);
121 if (args.count('h') > 0) {
122 manual_root_hash = shash::MkFromHexPtr(shash::HexPtr(
123 *args.find('h')->second), shash::kSuffixCatalog);
124 }
125
126 // We might need a lot of file descriptors
127 if (!RaiseFileDescriptorLimit()) {
128 Error("Failed to raise file descriptor limits");
129 return 2;
130 }
131
132 // Put SQLite into multithreaded mode
133 if (!ConfigureSQLite()) {
134 Error("Failed to preconfigure SQLite library");
135 return 3;
136 }
137
138 // Create an upstream spooler
139 temporary_directory_ = tmp_dir;
140 const upload::SpoolerDefinition spooler_definition(spooler, shash::kSha1);
141 spooler_ = upload::Spooler::Construct(spooler_definition);
142 if (!spooler_.IsValid()) {
143 Error("Failed to create upstream Spooler.");
144 return 5;
145 }
146 spooler_->RegisterListener(&CommandMigrate::UploadCallback, this);
147
148 // Load the full catalog hierarchy
149 LogCvmfs(kLogCatalog, kLogStdout, "Loading current catalog tree...");
150
151 catalog_loading_stopwatch_.Start();
152 bool loading_successful = false;
153 if (IsHttpUrl(repo_url)) {
154 typedef HttpObjectFetcher<catalog::WritableCatalog> ObjectFetcher;
155
156 const bool follow_redirects = false;
157 const string proxy = (args.count('@') > 0) ? *args.find('@')->second : "";
158 if (!this->InitDownloadManager(follow_redirects, proxy) ||
159 !this->InitVerifyingSignatureManager(repo_keys)) {
160 LogCvmfs(kLogCatalog, kLogStderr, "Failed to init repo connection");
161 return 1;
162 }
163
164 ObjectFetcher fetcher(repo_name,
165 repo_url,
166 tmp_dir,
167 download_manager(),
168 signature_manager());
169
170 loading_successful = LoadCatalogs(manual_root_hash, &fetcher);
171 } else {
172 typedef LocalObjectFetcher<catalog::WritableCatalog> ObjectFetcher;
173 ObjectFetcher fetcher(repo_url, tmp_dir);
174 loading_successful = LoadCatalogs(manual_root_hash, &fetcher);
175 }
176 catalog_loading_stopwatch_.Stop();
177
178 if (!loading_successful) {
179 Error("Failed to load catalog tree");
180 return 4;
181 }
182
183 LogCvmfs(kLogCatalog, kLogStdout, "Loaded %d catalogs", catalog_count_);
184 assert(root_catalog_ != NULL);
185
186 // Do the actual migration step
187 bool migration_succeeded = false;
188 if (migration_base == "2.0.x") {
189 if (!ReadPersona(uid, gid)) {
190 return 1;
191 }
192
193 // Generate and upload a nested catalog marker
194 if (!GenerateNestedCatalogMarkerChunk()) {
195 Error("Failed to create a nested catalog marker.");
196 return 6;
197 }
198 spooler_->WaitForUpload();
199
200 // Configure the concurrent catalog migration facility
201 MigrationWorker_20x::worker_context context(temporary_directory_,
202 collect_catalog_statistics,
203 fix_transition_points,
204 analyze_file_linkcounts,
205 uid_,
206 gid_);
207 migration_succeeded =
208 DoMigrationAndCommit<MigrationWorker_20x>(manifest_path, &context);
209 } else if (migration_base == "2.1.7") {
210 MigrationWorker_217::worker_context context(temporary_directory_,
211 collect_catalog_statistics);
212 migration_succeeded =
213 DoMigrationAndCommit<MigrationWorker_217>(manifest_path, &context);
214 } else if (migration_base == "chown") {
215 UidMap uid_map;
216 GidMap gid_map;
217 if (!ReadPersonaMaps(uid_map_path, gid_map_path, &uid_map, &gid_map)) {
218 Error("Failed to read UID and/or GID map");
219 return 1;
220 }
221 ChownMigrationWorker::worker_context context(temporary_directory_,
222 collect_catalog_statistics,
223 uid_map,
224 gid_map);
225 migration_succeeded =
226 DoMigrationAndCommit<ChownMigrationWorker>(manifest_path, &context);
227 } else if (migration_base == "hardlink") {
228 HardlinkRemovalMigrationWorker::worker_context
229 context(temporary_directory_, collect_catalog_statistics);
230 migration_succeeded =
231 DoMigrationAndCommit<HardlinkRemovalMigrationWorker>(manifest_path,
232 &context);
233 } else if (migration_base == "bulkhash") {
234 BulkhashRemovalMigrationWorker::worker_context
235 context(temporary_directory_, collect_catalog_statistics);
236 migration_succeeded =
237 DoMigrationAndCommit<BulkhashRemovalMigrationWorker>(manifest_path,
238 &context);
239 } else if (migration_base == "stats") {
240 StatsMigrationWorker::worker_context context(
241 temporary_directory_, collect_catalog_statistics);
242 migration_succeeded =
243 DoMigrationAndCommit<StatsMigrationWorker>(manifest_path, &context);
244 } else {
245 const std::string err_msg = "Unknown migration base: " + migration_base;
246 Error(err_msg);
247 return 1;
248 }
249
250 // Check if everything went well
251 if (!migration_succeeded) {
252 Error("Migration failed!");
253 return 5;
254 }
255
256 // Analyze collected statistics
257 if (collect_catalog_statistics && has_committed_new_revision_) {
258 LogCvmfs(kLogCatalog, kLogStdout, "\nCollected statistics results:");
259 AnalyzeCatalogStatistics();
260 }
261
262 LogCvmfs(kLogCatalog, kLogStdout, "\nCatalog Migration succeeded");
263 return 0;
264 }
265
266
267 bool CommandMigrate::ReadPersona(const std::string &uid,
268 const std::string &gid) {
269 if (uid.empty()) {
270 Error("Please provide a user ID");
271 return false;
272 }
273 if (gid.empty()) {
274 Error("Please provide a group ID");
275 return false;
276 }
277
278 uid_ = String2Int64(uid);
279 gid_ = String2Int64(gid);
280 return true;
281 }
282
283
284
285 bool CommandMigrate::ReadPersonaMaps(const std::string &uid_map_path,
286 const std::string &gid_map_path,
287 UidMap *uid_map,
288 GidMap *gid_map) const {
289 if (!uid_map->Read(uid_map_path) || !uid_map->IsValid()) {
290 Error("Failed to read UID map");
291 return false;
292 }
293
294 if (!gid_map->Read(gid_map_path) || !gid_map->IsValid()) {
295 Error("Failed to read GID map");
296 return false;
297 }
298
299 if (uid_map->RuleCount() == 0 && !uid_map->HasDefault()) {
300 Error("UID map appears to be empty");
301 return false;
302 }
303
304 if (gid_map->RuleCount() == 0 && !gid_map->HasDefault()) {
305 Error("GID map appears to be empty");
306 return false;
307 }
308
309 return true;
310 }
311
312
313 void CommandMigrate::UploadHistoryClosure(
314 const upload::SpoolerResult &result,
315 Future<shash::Any> *hash)
316 {
317 assert(!result.IsChunked());
318 if (result.return_code != 0) {
319 LogCvmfs(kLogCvmfs, kLogStderr, "failed to upload history database (%d)",
320 result.return_code);
321 hash->Set(shash::Any());
322 } else {
323 hash->Set(result.content_hash);
324 }
325 }
326
327
328 bool CommandMigrate::UpdateUndoTags(
329 PendingCatalog *root_catalog,
330 uint64_t revision,
331 time_t timestamp,
332 shash::Any *history_hash)
333 {
334 string filename_old = history_upstream_->filename();
335 string filename_new = filename_old + ".new";
336 bool retval = CopyPath2Path(filename_old, filename_new);
337 if (!retval) return false;
338 UniquePtr<history::SqliteHistory> history(
339 history::SqliteHistory::OpenWritable(filename_new));
340 history->TakeDatabaseFileOwnership();
341
342 history::History::Tag tag_trunk;
343 bool exists = history->GetByName(CommandTag::kHeadTag, &tag_trunk);
344 if (exists) {
345 retval = history->Remove(CommandTag::kHeadTag);
346 if (!retval) return false;
347
348 history::History::Tag tag_trunk_previous = tag_trunk;
349 tag_trunk_previous.name = CommandTag::kPreviousHeadTag;
350 tag_trunk_previous.description = CommandTag::kPreviousHeadTagDescription;
351 history->Remove(CommandTag::kPreviousHeadTag);
352
353 tag_trunk.root_hash = root_catalog->new_catalog_hash;
354 tag_trunk.size = root_catalog->new_catalog_size;
355 tag_trunk.revision = revision;
356 tag_trunk.timestamp = timestamp;
357
358 retval = history->Insert(tag_trunk_previous);
359 if (!retval) return false;
360 retval = history->Insert(tag_trunk);
361 if (!retval) return false;
362 }
363
364 history->SetPreviousRevision(manifest_upstream_->history());
365 history->DropDatabaseFileOwnership();
366 history.Destroy();
367
368 Future<shash::Any> history_hash_new;
369 upload::Spooler::CallbackPtr callback = spooler_->RegisterListener(
370 &CommandMigrate::UploadHistoryClosure, this, &history_hash_new);
371 spooler_->ProcessHistory(filename_new);
372 spooler_->WaitForUpload();
373 spooler_->UnregisterListener(callback);
374 unlink(filename_new.c_str());
375 *history_hash = history_hash_new.Get();
376 if (history_hash->IsNull()) {
377 Error("failed to upload tag database");
378 return false;
379 }
380
381 return true;
382 }
383
384
385 template <class MigratorT>
386 bool CommandMigrate::DoMigrationAndCommit(
387 const std::string &manifest_path,
388 typename MigratorT::worker_context *context
389 ) {
390 // Create a concurrent migration context for catalog migration
391 const unsigned int cpus = GetNumberOfCpuCores();
392 ConcurrentWorkers<MigratorT> concurrent_migration(cpus, cpus * 10, context);
393
394 if (!concurrent_migration.Initialize()) {
395 Error("Failed to initialize worker migration system.");
396 return false;
397 }
398 concurrent_migration.RegisterListener(&CommandMigrate::MigrationCallback,
399 this);
400
401 // Migrate catalogs recursively (starting with the deepest nested catalogs)
402 LogCvmfs(kLogCatalog, kLogStdout, "\nMigrating catalogs...");
403 PendingCatalog *root_catalog = new PendingCatalog(root_catalog_);
404 migration_stopwatch_.Start();
405 ConvertCatalogsRecursively(root_catalog, &concurrent_migration);
406 concurrent_migration.WaitForEmptyQueue();
407 spooler_->WaitForUpload();
408 spooler_->UnregisterListeners();
409 migration_stopwatch_.Stop();
410
411 // check for possible errors during the migration process
412 const unsigned int errors = concurrent_migration.GetNumberOfFailedJobs() +
413 spooler_->GetNumberOfErrors();
414 LogCvmfs(kLogCatalog, kLogStdout,
415 "Catalog Migration finished with %d errors.", errors);
416 if (errors > 0) {
417 LogCvmfs(kLogCatalog, kLogStdout,
418 "\nCatalog Migration produced errors\nAborting...");
419 return false;
420 }
421
422 if (root_catalog->was_updated.Get()) {
423 LogCvmfs(kLogCatalog, kLogStdout,
424 "\nCommitting migrated repository revision...");
425 manifest::Manifest manifest = *manifest_upstream_;
426 manifest.set_catalog_hash(root_catalog->new_catalog_hash);
427 manifest.set_catalog_size(root_catalog->new_catalog_size);
428 manifest.set_root_path(root_catalog->root_path());
429 const catalog::Catalog* new_catalog = (root_catalog->HasNew())
430 ? root_catalog->new_catalog
431 : root_catalog->old_catalog;
432 manifest.set_ttl(new_catalog->GetTTL());
433 manifest.set_revision(new_catalog->GetRevision());
434
435 // Commit the new (migrated) repository revision...
436 if (history_upstream_.IsValid()) {
437 shash::Any history_hash(manifest_upstream_->history());
438 LogCvmfs(kLogCatalog, kLogStdout | kLogNoLinebreak,
439 "Updating repository tag database... ");
440 if (!UpdateUndoTags(root_catalog,
441 new_catalog->GetRevision(),
442 new_catalog->GetLastModified(),
443 &history_hash))
444 {
445 Error("Updating tag database failed.\nAborting...");
446 return false;
447 }
448 manifest.set_history(history_hash);
449 LogCvmfs(kLogCvmfs, kLogStdout, "%s", history_hash.ToString().c_str());
450 }
451
452 if (!manifest.Export(manifest_path)) {
453 Error("Manifest export failed.\nAborting...");
454 return false;
455 }
456 has_committed_new_revision_ = true;
457 } else {
458 LogCvmfs(kLogCatalog, kLogStdout,
459 "\nNo catalogs migrated, skipping the commit...");
460 }
461
462 // Get rid of the open root catalog
463 delete root_catalog;
464
465 return true;
466 }
467
468
469 void CommandMigrate::CatalogCallback(
470 const CatalogTraversalData<catalog::WritableCatalog> &data) {
471 std::string tree_indent;
472 std::string hash_string;
473 std::string path;
474
475 for (unsigned int i = 1; i < data.tree_level; ++i) {
476 tree_indent += "\u2502 ";
477 }
478
479 if (data.tree_level > 0) {
480 tree_indent += "\u251C\u2500 ";
481 }
482
483 hash_string = data.catalog_hash.ToString();
484
485 path = data.catalog->mountpoint().ToString();
486 if (path.empty()) {
487 path = "/";
488 root_catalog_ = data.catalog;
489 }
490
491 LogCvmfs(kLogCatalog, kLogStdout, "%s%s %s",
492 tree_indent.c_str(),
493 hash_string.c_str(),
494 path.c_str());
495
496 ++catalog_count_;
497 }
498
499
500 void CommandMigrate::MigrationCallback(PendingCatalog *const &data) {
501 // Check if the migration of the catalog was successful
502 if (!data->success) {
503 Error("Catalog migration failed! Aborting...");
504 exit(1);
505 return;
506 }
507
508 if (!data->HasChanges()) {
509 PrintStatusMessage(data, data->GetOldContentHash(), "preserved");
510 data->was_updated.Set(false);
511 return;
512 }
513
514 const string &path = (data->HasNew()) ? data->new_catalog->database_path()
515 : data->old_catalog->database_path();
516
517 // Save the processed catalog in the pending map
518 {
519 LockGuard<PendingCatalogMap> guard(&pending_catalogs_);
520 assert(pending_catalogs_.find(path) == pending_catalogs_.end());
521 pending_catalogs_[path] = data;
522 }
523 catalog_statistics_list_.Insert(data->statistics);
524
525 // check the size of the uncompressed catalog file
526 size_t new_catalog_size = GetFileSize(path);
527 if (new_catalog_size <= 0) {
528 Error("Failed to get uncompressed file size of catalog!", data);
529 exit(2);
530 return;
531 }
532 data->new_catalog_size = new_catalog_size;
533
534 // Schedule the compression and upload of the catalog
535 spooler_->ProcessCatalog(path);
536 }
537
538
539 void CommandMigrate::UploadCallback(const upload::SpoolerResult &result) {
540 const string &path = result.local_path;
541
542 // Check if the upload was successful
543 if (result.return_code != 0) {
544 Error("Failed to upload file " + path + "\nAborting...");
545 exit(2);
546 return;
547 }
548 assert(result.file_chunks.size() == 0);
549
550 // Remove the just uploaded file
551 unlink(path.c_str());
552
553 // Uploaded nested catalog marker... generate and cache DirectoryEntry for it
554 if (path == nested_catalog_marker_tmp_path_) {
555 CreateNestedCatalogMarkerDirent(result.content_hash);
556 return;
557 } else {
558 // Find the catalog path in the pending catalogs and remove it from the list
559 PendingCatalog *catalog;
560 {
561 LockGuard<PendingCatalogMap> guard(&pending_catalogs_);
562 PendingCatalogMap::iterator i = pending_catalogs_.find(path);
563 assert(i != pending_catalogs_.end());
564 catalog = const_cast<PendingCatalog*>(i->second);
565 pending_catalogs_.erase(i);
566 }
567
568 PrintStatusMessage(catalog, result.content_hash, "migrated and uploaded");
569
570 // The catalog is completely processed... fill the content_hash to allow the
571 // processing of parent catalogs (notified via the 'was_updated' future)
572 // NOTE: From now on, this PendingCatalog structure could be deleted and
573 // should not be used anymore!
574 catalog->new_catalog_hash = result.content_hash;
575 catalog->was_updated.Set(true);
576 }
577 }
578
579
580 void CommandMigrate::PrintStatusMessage(const PendingCatalog *catalog,
581 const shash::Any &content_hash,
582 const std::string &message) {
583 atomic_inc32(&catalogs_processed_);
584 const unsigned int processed = (atomic_read32(&catalogs_processed_) * 100) /
585 catalog_count_;
586 LogCvmfs(kLogCatalog, kLogStdout, "[%d%%] %s %sC %s",
587 processed,
588 message.c_str(),
589 content_hash.ToString().c_str(),
590 catalog->root_path().c_str());
591 }
592
593
594 template <class MigratorT>
595 void CommandMigrate::ConvertCatalogsRecursively(PendingCatalog *catalog,
596 MigratorT *migrator) {
597 // First migrate all nested catalogs (depth first traversal)
598 const catalog::CatalogList nested_catalogs =
599 catalog->old_catalog->GetChildren();
600 catalog::CatalogList::const_iterator i = nested_catalogs.begin();
601 catalog::CatalogList::const_iterator iend = nested_catalogs.end();
602 catalog->nested_catalogs.reserve(nested_catalogs.size());
603 for (; i != iend; ++i) {
604 PendingCatalog *new_nested = new PendingCatalog(*i);
605 catalog->nested_catalogs.push_back(new_nested);
606 ConvertCatalogsRecursively(new_nested, migrator);
607 }
608
609 // Migrate this catalog referencing all its (already migrated) children
610 migrator->Schedule(catalog);
611 }
612
613
614 bool CommandMigrate::RaiseFileDescriptorLimit() const {
615 struct rlimit rpl;
616 memset(&rpl, 0, sizeof(rpl));
617 getrlimit(RLIMIT_NOFILE, &rpl);
618 if (rpl.rlim_cur < file_descriptor_limit_) {
619 if (rpl.rlim_max < file_descriptor_limit_)
620 rpl.rlim_max = file_descriptor_limit_;
621 rpl.rlim_cur = file_descriptor_limit_;
622 const bool retval = setrlimit(RLIMIT_NOFILE, &rpl);
623 if (retval != 0) {
624 return false;
625 }
626 }
627 return true;
628 }
629
630
631 bool CommandMigrate::ConfigureSQLite() const {
632 int retval = sqlite3_config(SQLITE_CONFIG_MULTITHREAD);
633 return (retval == SQLITE_OK);
634 }
635
636
637 void CommandMigrate::AnalyzeCatalogStatistics() const {
638 const unsigned int number_of_catalogs = catalog_statistics_list_.size();
639 unsigned int aggregated_entry_count = 0;
640 unsigned int aggregated_max_row_id = 0;
641 unsigned int aggregated_hardlink_count = 0;
642 unsigned int aggregated_linkcounts = 0;
643 double aggregated_migration_time = 0.0;
644
645 CatalogStatisticsList::const_iterator i = catalog_statistics_list_.begin();
646 CatalogStatisticsList::const_iterator iend = catalog_statistics_list_.end();
647 for (; i != iend; ++i) {
648 aggregated_entry_count += i->entry_count;
649 aggregated_max_row_id += i->max_row_id;
650 aggregated_hardlink_count += i->hardlink_group_count;
651 aggregated_linkcounts += i->aggregated_linkcounts;
652 aggregated_migration_time += i->migration_time;
653 }
654
655 // Inode quantization
656 assert(aggregated_max_row_id > 0);
657 const unsigned int unused_inodes =
658 aggregated_max_row_id - aggregated_entry_count;
659 const float ratio =
660 (static_cast<float>(unused_inodes) /
661 static_cast<float>(aggregated_max_row_id)) * 100.0f;
662 LogCvmfs(kLogCatalog, kLogStdout, "Actual Entries: %d\n"
663 "Allocated Inodes: %d\n"
664 " Unused Inodes: %d\n"
665 " Percentage of wasted Inodes: %.1f%%\n",
666 aggregated_entry_count, aggregated_max_row_id, unused_inodes, ratio);
667
668 // Hardlink statistics
669 const float average_linkcount = (aggregated_hardlink_count > 0)
670 ? aggregated_linkcounts /
671 aggregated_hardlink_count
672 : 0.0f;
673 LogCvmfs(kLogCatalog, kLogStdout, "Generated Hardlink Groups: %d\n"
674 "Average Linkcount per Group: %.1f\n",
675 aggregated_hardlink_count, average_linkcount);
676
677 // Performance measures
678 const double average_migration_time =
679 aggregated_migration_time / static_cast<double>(number_of_catalogs);
680 LogCvmfs(kLogCatalog, kLogStdout, "Catalog Loading Time: %.2fs\n"
681 "Average Migration Time: %.2fs\n"
682 "Overall Migration Time: %.2fs\n"
683 "Aggregated Migration Time: %.2fs\n",
684 catalog_loading_stopwatch_.GetTime(),
685 average_migration_time,
686 migration_stopwatch_.GetTime(),
687 aggregated_migration_time);
688 }
689
690
691 CommandMigrate::PendingCatalog::~PendingCatalog() {
692 delete old_catalog;
693 old_catalog = NULL;
694
695 if (new_catalog != NULL) {
696 delete new_catalog;
697 new_catalog = NULL;
698 }
699 }
700
701
702 template<class DerivedT>
703 CommandMigrate::AbstractMigrationWorker<DerivedT>::AbstractMigrationWorker(
704 const worker_context *context)
705 : temporary_directory_(context->temporary_directory)
706 , collect_catalog_statistics_(context->collect_catalog_statistics)
707 { }
708
709
710 template<class DerivedT>
711 CommandMigrate::AbstractMigrationWorker<DerivedT>::~AbstractMigrationWorker() {}
712
713
714 template<class DerivedT>
715 void CommandMigrate::AbstractMigrationWorker<DerivedT>::operator()(
716 const expected_data &data) {
717 migration_stopwatch_.Start();
718 const bool success = static_cast<DerivedT*>(this)->RunMigration(data) &&
719 UpdateNestedCatalogReferences(data) &&
720 UpdateCatalogMetadata(data) &&
721 CollectAndAggregateStatistics(data) &&
722 CleanupNestedCatalogs(data);
723 data->success = success;
724 migration_stopwatch_.Stop();
725
726 data->statistics.migration_time = migration_stopwatch_.GetTime();
727 migration_stopwatch_.Reset();
728
729 // Note: MigrationCallback() will take care of the result...
730 if (success) {
731 ConcurrentWorker<DerivedT>::master()->JobSuccessful(data);
732 } else {
733 ConcurrentWorker<DerivedT>::master()->JobFailed(data);
734 }
735 }
736
737
738 template<class DerivedT>
739 bool CommandMigrate::AbstractMigrationWorker<DerivedT>::
740 UpdateNestedCatalogReferences(PendingCatalog *data) const
741 {
742 const catalog::Catalog *new_catalog =
743 (data->HasNew()) ? data->new_catalog : data->old_catalog;
744 const catalog::CatalogDatabase &writable = new_catalog->database();
745
746 catalog::SqlCatalog add_nested_catalog(writable,
747 "INSERT OR REPLACE INTO nested_catalogs (path, sha1, size) "
748 " VALUES (:path, :sha1, :size);");
749
750 // go through all nested catalogs and update their references (we are
751 // currently in their parent catalog)
752 // Note: we might need to wait for the nested catalog to be fully processed.
753 PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
754 PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
755 for (; i != iend; ++i) {
756 PendingCatalog *nested_catalog = *i;
757
758 if (!nested_catalog->was_updated.Get()) {
759 continue;
760 }
761
762 const std::string &root_path = nested_catalog->root_path();
763 const shash::Any catalog_hash = nested_catalog->new_catalog_hash;
764 const size_t catalog_size = nested_catalog->new_catalog_size;
765
766 // insert the updated nested catalog reference into the new catalog
767 const bool retval =
768 add_nested_catalog.BindText(1, root_path) &&
769 add_nested_catalog.BindText(2, catalog_hash.ToString()) &&
770 add_nested_catalog.BindInt64(3, catalog_size) &&
771 add_nested_catalog.Execute();
772 if (!retval) {
773 Error("Failed to add nested catalog link", add_nested_catalog, data);
774 return false;
775 }
776 add_nested_catalog.Reset();
777 }
778
779 return true;
780 }
781
782
783 template<class DerivedT>
784 bool CommandMigrate::AbstractMigrationWorker<DerivedT>::
785 UpdateCatalogMetadata(PendingCatalog *data) const
786 {
787 if (!data->HasChanges()) {
788 return true;
789 }
790
791 catalog::WritableCatalog *catalog =
792 (data->HasNew()) ? data->new_catalog : GetWritable(data->old_catalog);
793
794 // Set the previous revision hash in the new catalog to the old catalog's
795 // hash. We are doing the whole migration as a new snapshot that does not
796 // change any files, but just applies the necessary data schema migrations.
797 catalog->SetPreviousRevision(data->old_catalog->hash());
798 catalog->IncrementRevision();
799 catalog->UpdateLastModified();
800
801 return true;
802 }
803
804
805 template<class DerivedT>
806 bool CommandMigrate::AbstractMigrationWorker<DerivedT>::
807 CollectAndAggregateStatistics(PendingCatalog *data) const
808 {
809 if (!collect_catalog_statistics_) {
810 return true;
811 }
812
813 const catalog::Catalog *new_catalog =
814 (data->HasNew()) ? data->new_catalog : data->old_catalog;
815 const catalog::CatalogDatabase &writable = new_catalog->database();
816 bool retval;
817
818 // Find out the discrepancy between MAX(rowid) and COUNT(*)
819 catalog::SqlCatalog wasted_inodes(writable,
820 "SELECT COUNT(*), MAX(rowid) FROM catalog;");
821 retval = wasted_inodes.FetchRow();
822 if (!retval) {
823 Error("Failed to count entries in catalog", wasted_inodes, data);
824 return false;
825 }
826 const unsigned int entry_count = wasted_inodes.RetrieveInt64(0);
827 const unsigned int max_row_id = wasted_inodes.RetrieveInt64(1);
828
829 // Save collected information into the central statistics aggregator
830 data->statistics.root_path = data->root_path();
831 data->statistics.max_row_id = max_row_id;
832 data->statistics.entry_count = entry_count;
833
834 return true;
835 }
836
837
838 template<class DerivedT>
839 bool CommandMigrate::AbstractMigrationWorker<DerivedT>::CleanupNestedCatalogs(
840 PendingCatalog *data) const
841 {
842 // All nested catalogs of PendingCatalog 'data' are fully processed and
843 // accounted for. It is safe to get rid of their data structures here!
844 PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
845 PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
846 for (; i != iend; ++i) {
847 delete *i;
848 }
849
850 data->nested_catalogs.clear();
851 return true;
852 }
853
854
855 /**
856 * These values _must_ reflect the schema version in catalog_sql.h so that a
857 * legacy catalog migration always generates the latest catalog revision.
858 * This is a deliberately duplicated piece of information to ensure that both
859 * the catalog management and the migration classes always get updated.
860 */
861 const float CommandMigrate::MigrationWorker_20x::kSchema = 2.5;
862 const unsigned CommandMigrate::MigrationWorker_20x::kSchemaRevision = 6;
863
864
865 template<class DerivedT>
866 catalog::WritableCatalog*
867 CommandMigrate::AbstractMigrationWorker<DerivedT>::GetWritable(
868 const catalog::Catalog *catalog) const {
869 return dynamic_cast<catalog::WritableCatalog*>(const_cast<catalog::Catalog*>(
870 catalog));
871 }
872
873
874 //------------------------------------------------------------------------------
875
876
877 CommandMigrate::MigrationWorker_20x::MigrationWorker_20x(
878 const worker_context *context)
879 : AbstractMigrationWorker<MigrationWorker_20x>(context)
880 , fix_nested_catalog_transitions_(context->fix_nested_catalog_transitions)
881 , analyze_file_linkcounts_(context->analyze_file_linkcounts)
882 , uid_(context->uid)
883 , gid_(context->gid) { }
884
885
886 bool CommandMigrate::MigrationWorker_20x::RunMigration(PendingCatalog *data)
887 const
888 {
889 // double-check that we are generating catalogs compatible with the actual
890 // catalog management classes
891 assert(kSchema == catalog::CatalogDatabase::kLatestSupportedSchema);
892 assert(kSchemaRevision == catalog::CatalogDatabase::kLatestSchemaRevision);
893
894 return CreateNewEmptyCatalog(data) &&
895 CheckDatabaseSchemaCompatibility(data) &&
896 AttachOldCatalogDatabase(data) &&
897 StartDatabaseTransaction(data) &&
898 MigrateFileMetadata(data) &&
899 MigrateNestedCatalogMountPoints(data) &&
900 FixNestedCatalogTransitionPoints(data) &&
901 RemoveDanglingNestedMountpoints(data) &&
902 GenerateCatalogStatistics(data) &&
903 FindRootEntryInformation(data) &&
904 CommitDatabaseTransaction(data) &&
905 DetachOldCatalogDatabase(data);
906 }
907
908 bool CommandMigrate::MigrationWorker_20x::CreateNewEmptyCatalog(
909 PendingCatalog *data) const
910 {
911 const string root_path = data->root_path();
912
913 // create a new catalog database schema
914 const string clg_db_path =
915 CreateTempPath(temporary_directory_ + "/catalog", 0666);
916 if (clg_db_path.empty()) {
917 Error("Failed to create temporary file for the new catalog database.");
918 return false;
919 }
920 const bool volatile_content = false;
921
922 {
923 // TODO(rmeusel): Attach catalog should work with an open catalog database
924 // as well, to remove this inefficiency
925 UniquePtr<catalog::CatalogDatabase>
926 new_clg_db(catalog::CatalogDatabase::Create(clg_db_path));
927 if (!new_clg_db.IsValid() ||
928 !new_clg_db->InsertInitialValues(root_path, volatile_content, "")) {
929 Error("Failed to create database for new catalog");
930 unlink(clg_db_path.c_str());
931 return false;
932 }
933 }
934
935 // Attach the just created nested catalog database
936 catalog::WritableCatalog *writable_catalog =
937 catalog::WritableCatalog::AttachFreely(root_path, clg_db_path,
938 shash::Any(shash::kSha1));
939 if (writable_catalog == NULL) {
940 Error("Failed to open database for new catalog");
941 unlink(clg_db_path.c_str());
942 return false;
943 }
944
945 data->new_catalog = writable_catalog;
946 return true;
947 }
948
949
950 bool CommandMigrate::MigrationWorker_20x::CheckDatabaseSchemaCompatibility(
951 PendingCatalog *data) const
952 {
953 const catalog::CatalogDatabase &old_catalog = data->old_catalog->database();
954 const catalog::CatalogDatabase &new_catalog = data->new_catalog->database();
955
956 if ((new_catalog.schema_version() <
957 catalog::CatalogDatabase::kLatestSupportedSchema -
958 catalog::CatalogDatabase::kSchemaEpsilon
959 ||
960 new_catalog.schema_version() >
961 catalog::CatalogDatabase::kLatestSupportedSchema +
962 catalog::CatalogDatabase::kSchemaEpsilon)
963 ||
964 (old_catalog.schema_version() > 2.1 +
965 catalog::CatalogDatabase::kSchemaEpsilon))
966 {
967 Error("Failed to meet database requirements for migration.", data);
968 return false;
969 }
970 return true;
971 }
972
973
974 bool CommandMigrate::MigrationWorker_20x::AttachOldCatalogDatabase(
975 PendingCatalog *data) const
976 {
977 const catalog::CatalogDatabase &old_catalog = data->old_catalog->database();
978 const catalog::CatalogDatabase &new_catalog = data->new_catalog->database();
979
980 catalog::SqlCatalog sql_attach_new(new_catalog,
981 "ATTACH '" + old_catalog.filename() + "' AS old;");
982 bool retval = sql_attach_new.Execute();
983
984 // remove the hardlink to the old database file (temporary file); it will not
985 // be needed anymore... the data will be deleted when the database is closed
986 unlink(data->old_catalog->database().filename().c_str());
987
988 if (!retval) {
989 Error("Failed to attach database of old catalog", sql_attach_new, data);
990 return false;
991 }
992 return true;
993 }
994
995
996 bool CommandMigrate::MigrationWorker_20x::StartDatabaseTransaction(
997 PendingCatalog *data) const
998 {
999 assert(data->HasNew());
1000 data->new_catalog->Transaction();
1001 return true;
1002 }
1003
1004
1005 bool CommandMigrate::MigrationWorker_20x::MigrateFileMetadata(
1006 PendingCatalog *data) const
1007 {
1008 assert(!data->new_catalog->IsDirty());
1009 assert(data->HasNew());
1010 bool retval;
1011 const catalog::CatalogDatabase &writable = data->new_catalog->database();
1012
1013 // Hardlinks scratch space.
1014 // This temporary table is used for the hardlink analysis results.
1015 // The old catalog format did not have a direct notion of hardlinks and their
1016 // linkcounts, but this information can be partly retrieved from the under-
1017 // lying file system semantics.
1018 //
1019 // Hardlinks:
1020 // groupid : this group id can be used for the new catalog schema
1021 // inode : the inodes that were part of a hardlink group before
1022 // linkcount : the linkcount for hardlink group id members
1023 catalog::SqlCatalog sql_create_hardlinks_table(writable,
1024 "CREATE TEMPORARY TABLE hardlinks "
1025 " ( hardlink_group_id INTEGER PRIMARY KEY AUTOINCREMENT, "
1026 " inode INTEGER, "
1027 " linkcount INTEGER, "
1028 " CONSTRAINT unique_inode UNIQUE (inode) );");
1029 retval = sql_create_hardlinks_table.Execute();
1030 if (!retval) {
1031 Error("Failed to create temporary hardlink analysis table",
1032 sql_create_hardlinks_table, data);
1033 return false;
1034 }
1035
1036 // Directory Linkcount scratch space.
1037 // Directory linkcounts can be obtained from the directory hierarchy reflected
1038 // in the old style catalogs. The new catalog schema asks for this specific
1039 // linkcount. Directory linkcount analysis results will be put into this
1040 // temporary table
1041 catalog::SqlCatalog sql_create_linkcounts_table(writable,
1042 "CREATE TEMPORARY TABLE dir_linkcounts "
1043 " ( inode INTEGER PRIMARY KEY, "
1044 " linkcount INTEGER );");
1045 retval = sql_create_linkcounts_table.Execute();
1046 if (!retval) {
1047 Error("Failed to create tmeporary directory linkcount analysis table",
1048 sql_create_linkcounts_table, data);
1049 }
1050
1051 // It is possible to skip this step.
1052 // In that case all hardlink inodes with a (potential) linkcount > 1 will get
1053 // degraded to files containing the same content
1054 if (analyze_file_linkcounts_) {
1055 retval = AnalyzeFileLinkcounts(data);
1056 if (!retval) {
1057 return false;
1058 }
1059 }
1060
1061 // Analyze the linkcounts of directories
1062 // - each directory has a linkcount of at least 2 (empty directory)
1063 // (link in parent directory and self reference (cd .) )
1064 // - for each child directory, the parent's link count is incremented by 1
1065 // (parent reference in child (cd ..) )
1066 //
1067 // Note: nested catalog mountpoints will be miscalculated here, since we can't
1068 // check the number of contained directories. They are defined in the
1069 // linked nested catalog and need to be added later on.
1070 // (see: MigrateNestedCatalogMountPoints() for details)
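// Example (following the rule above): an otherwise empty directory with three
// subdirectories ends up with a linkcount of 2 + 3 = 5.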
1071 catalog::SqlCatalog sql_dir_linkcounts(writable,
1072 "INSERT INTO dir_linkcounts "
1073 " SELECT c1.inode as inode, "
1074 " SUM(IFNULL(MIN(c2.inode,1),0)) + 2 as linkcount "
1075 " FROM old.catalog as c1 "
1076 " LEFT JOIN old.catalog as c2 "
1077 " ON c2.parent_1 = c1.md5path_1 AND "
1078 " c2.parent_2 = c1.md5path_2 AND "
1079 " c2.flags & :flag_dir_1 "
1080 " WHERE c1.flags & :flag_dir_2 "
1081 " GROUP BY c1.inode;");
1082 retval =
1083 sql_dir_linkcounts.BindInt64(1, catalog::SqlDirent::kFlagDir) &&
1084 sql_dir_linkcounts.BindInt64(2, catalog::SqlDirent::kFlagDir) &&
1085 sql_dir_linkcounts.Execute();
1086 if (!retval) {
1087 Error("Failed to analyze directory specific linkcounts",
1088 sql_dir_linkcounts, data);
1089 if (sql_dir_linkcounts.GetLastError() == SQLITE_CONSTRAINT) {
1090 Error("Obviously your catalogs are corrupted, since we found a directory"
1091 "inode that is a file inode at the same time!");
1092 }
1093 return false;
1094 }
1095
1096 // Copy the old file meta information into the new catalog schema
1097 // here we also add the previously analyzed hardlink/linkcount information
1098 // from both temporary tables "hardlinks" and "dir_linkcounts".
1099 //
1100 // Note: nested catalog mountpoints still need to be treated separately
1101 // (see MigrateNestedCatalogMountPoints() for details)
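// As the expression below suggests, the new 'hardlinks' column packs the
// hardlink group id into the upper 32 bits and the linkcount into the lower
// 32 bits.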
1102 catalog::SqlCatalog migrate_file_meta_data(writable,
1103 "INSERT INTO catalog "
1104 " SELECT md5path_1, md5path_2, "
1105 " parent_1, parent_2, "
1106 " IFNULL(hardlink_group_id, 0) << 32 | "
1107 " COALESCE(hardlinks.linkcount, dir_linkcounts.linkcount, 1) "
1108 " AS hardlinks, "
1109 " hash, size, mode, mtime, "
1110 " flags, name, symlink, "
1111 " :uid, "
1112 " :gid, "
1113 " NULL " // set empty xattr BLOB (default)
1114 " FROM old.catalog "
1115 " LEFT JOIN hardlinks "
1116 " ON catalog.inode = hardlinks.inode "
1117 " LEFT JOIN dir_linkcounts "
1118 " ON catalog.inode = dir_linkcounts.inode;");
1119 retval = migrate_file_meta_data.BindInt64(1, uid_) &&
1120 migrate_file_meta_data.BindInt64(2, gid_) &&
1121 migrate_file_meta_data.Execute();
1122 if (!retval) {
1123 Error("Failed to migrate the file system meta data",
1124 migrate_file_meta_data, data);
1125 return false;
1126 }
1127
1128 // If we deal with a nested catalog, we need to add a .cvmfscatalog entry
1129 // since it was not present in the old repository specification but is needed
1130 // now!
1131 if (!data->IsRoot()) {
1132 const catalog::DirectoryEntry &nested_marker =
1133 CommandMigrate::GetNestedCatalogMarkerDirent();
1134 catalog::SqlDirentInsert insert_nested_marker(writable);
1135 const std::string root_path = data->root_path();
1136 const std::string file_path = root_path +
1137 "/" + nested_marker.name().ToString();
1138 const shash::Md5 &path_hash = shash::Md5(file_path.data(),
1139 file_path.size());
1140 const shash::Md5 &parent_hash = shash::Md5(root_path.data(),
1141 root_path.size());
1142 retval = insert_nested_marker.BindPathHash(path_hash) &&
1143 insert_nested_marker.BindParentPathHash(parent_hash) &&
1144 insert_nested_marker.BindDirent(nested_marker) &&
1145 insert_nested_marker.BindXattrEmpty() &&
1146 insert_nested_marker.Execute();
1147 if (!retval) {
1148 Error("Failed to insert nested catalog marker into new nested catalog.",
1149 insert_nested_marker, data);
1150 return false;
1151 }
1152 }
1153
1154 // Copy (and update) the properties fields
1155 //
1156 // Note: The 'schema' is explicitly not copied to the new catalog.
1157 // Each catalog contains a revision, which is also copied here and later
1158 // updated by calling catalog->IncrementRevision()
1159 catalog::SqlCatalog copy_properties(writable,
1160 "INSERT OR REPLACE INTO properties "
1161 " SELECT key, value "
1162 " FROM old.properties "
1163 " WHERE key != 'schema';");
1164 retval = copy_properties.Execute();
1165 if (!retval) {
1166 Error("Failed to migrate the properties table.", copy_properties, data);
1167 return false;
1168 }
1169
1170 return true;
1171 }
1172
1173
1174 bool CommandMigrate::MigrationWorker_20x::AnalyzeFileLinkcounts(
1175 PendingCatalog *data) const
1176 {
1177 assert(data->HasNew());
1178 const catalog::CatalogDatabase &writable = data->new_catalog->database();
1179 bool retval;
1180
1181 // Analyze the hardlink relationships in the old catalog
1182 // inodes used to be assigned at publishing time, implicitly constituting
1183 // those relationships. We now need them explicitly in the file catalogs
1184 // This looks for directory entries with matching inodes but differing path-
1185 // hashes and saves the results in a temporary table called 'hl_scratch'
1186 //
1187 // Note: We only support hardlink groups that reside in the same directory!
1188 // Therefore we first need to figure out hardlink candidates (which
1189 // might still contain hardlink groups spanning more than one directory)
1190 // In a second step these candidates will be analyzed to kick out un-
1191 // supported hardlink groups.
1192 // Unsupported hardlink groups will be treated as normal files with
1193 // the same content
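// Example: two entries /dir/a and /dir/b sharing one inode form a supported
// hardlink group; /dir1/x and /dir2/y sharing one inode do not, since their
// parent directories differ.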
1194 catalog::SqlCatalog sql_create_hardlinks_scratch_table(writable,
1195 "CREATE TEMPORARY TABLE hl_scratch AS "
1196 " SELECT c1.inode AS inode, c1.md5path_1, c1.md5path_2, "
1197 " c1.parent_1 as c1p1, c1.parent_2 as c1p2, "
1198 " c2.parent_1 as c2p1, c2.parent_2 as c2p2 "
1199 " FROM old.catalog AS c1 "
1200 " INNER JOIN old.catalog AS c2 "
1201 " ON c1.inode == c2.inode AND "
1202 " (c1.md5path_1 != c2.md5path_1 OR "
1203 " c1.md5path_2 != c2.md5path_2);");
1204 retval = sql_create_hardlinks_scratch_table.Execute();
1205 if (!retval) {
1206 Error("Failed to create temporary scratch table for hardlink analysis",
1207 sql_create_hardlinks_scratch_table, data);
1208 return false;
1209 }
1210
1211 // Figures out which hardlink candidates are supported by CVMFS and can be
1212 // transferred into the new catalog as so called hardlink groups. Unsupported
1213 // hardlinks need to be discarded and treated as normal files containing the
1214 // exact same data
1215 catalog::SqlCatalog fill_linkcount_table_for_files(writable,
1216 "INSERT INTO hardlinks (inode, linkcount)"
1217 " SELECT inode, count(*) as linkcount "
1218 " FROM ( "
1219 // recombine supported hardlink inodes with their actual manifested
1220 // hard-links in the catalog.
1221 // Note: for each directory entry pointing to the same supported
1222 // hardlink inode we have a distinct MD5 path hash
1223 " SELECT DISTINCT hl.inode, hl.md5path_1, hl.md5path_2 "
1224 " FROM ( "
1225 // sort out supported hardlink inodes from unsupported ones by
1226 // locality
1227 // Note: see the next comment for the nested SELECT
1228 " SELECT inode "
1229 " FROM ( "
1230 " SELECT inode, count(*) AS cnt "
1231 " FROM ( "
1232 // go through the potential hardlinks and collect location infor-
1233 // mation about them.
1234 // Note: we only support hardlinks that all reside in the same
1235 // directory, thus having the same parent (c1p* == c2p*)
1236 // --> For supported hardlink candidates the SELECT DISTINCT
1237 // will produce only a single row, whereas others produce more
1238 " SELECT DISTINCT inode,c1p1,c1p1,c2p1,c2p2 "
1239 " FROM hl_scratch AS hl "
1240 " ) "
1241 " GROUP BY inode "
1242 " ) "
1243 " WHERE cnt = 1 "
1244 " ) AS supported_hardlinks "
1245 " LEFT JOIN hl_scratch AS hl "
1246 " ON supported_hardlinks.inode = hl.inode "
1247 " ) "
1248 " GROUP BY inode;");
1249 retval = fill_linkcount_table_for_files.Execute();
1250 if (!retval) {
1251 Error("Failed to analyze hardlink relationships for files.",
1252 fill_linkcount_table_for_files, data);
1253 return false;
1254 }
1255
1256 // The file linkcount and hardlink analysis is finished and the scratch table
1257 // can be deleted...
1258 catalog::SqlCatalog drop_hardlink_scratch_space(writable,
1259 "DROP TABLE hl_scratch;");
1260 retval = drop_hardlink_scratch_space.Execute();
1261 if (!retval) {
1262 Error("Failed to remove file linkcount analysis scratch table",
1263 drop_hardlink_scratch_space, data);
1264 return false;
1265 }
1266
1267 // Do some statistics if asked for...
1268 if (collect_catalog_statistics_) {
1269 catalog::SqlCatalog count_hardlinks(writable,
1270 "SELECT count(*), sum(linkcount) FROM hardlinks;");
1271 retval = count_hardlinks.FetchRow();
1272 if (!retval) {
1273 Error("Failed to count the generated file hardlinks for statistics",
1274 count_hardlinks, data);
1275 return false;
1276 }
1277
1278 data->statistics.hardlink_group_count += count_hardlinks.RetrieveInt64(0);
1279 data->statistics.aggregated_linkcounts += count_hardlinks.RetrieveInt64(1);
1280 }
1281
1282 return true;
1283 }
1284
1285
1286 bool CommandMigrate::MigrationWorker_20x::MigrateNestedCatalogMountPoints(
1287 PendingCatalog *data) const
1288 {
1289 assert(data->HasNew());
1290 const catalog::CatalogDatabase &writable = data->new_catalog->database();
1291 bool retval;
1292
1293 // preparing the SQL statement for nested catalog mountpoint update
1294 catalog::SqlCatalog update_mntpnt_linkcount(writable,
1295 "UPDATE catalog "
1296 "SET hardlinks = :linkcount "
1297 "WHERE md5path_1 = :md5_1 AND md5path_2 = :md5_2;");
1298
1299 // update all nested catalog mountpoints
1300 // (Note: we might need to wait for the nested catalog to be processed)
1301 PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
1302 PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1303 for (; i != iend; ++i) {
1304 // collect information about the nested catalog
1305 PendingCatalog *nested_catalog = *i;
1306 const catalog::DirectoryEntry root_entry = nested_catalog->root_entry.Get();
1307 const string &root_path = nested_catalog->root_path();
1308
1309 // update the nested catalog mountpoint directory entry with the correct
1310 // linkcount that was determined while processing the nested catalog
1311 const shash::Md5 mountpoint_hash = shash::Md5(root_path.data(),
1312 root_path.size());
1313 retval =
1314 update_mntpnt_linkcount.BindInt64(1, root_entry.linkcount()) &&
1315 update_mntpnt_linkcount.BindMd5(2, 3, mountpoint_hash) &&
1316 update_mntpnt_linkcount.Execute();
1317 if (!retval) {
1318 Error("Failed to update linkcount of nested catalog mountpoint",
1319 update_mntpnt_linkcount, data);
1320 return false;
1321 }
1322 update_mntpnt_linkcount.Reset();
1323 }
1324
1325 return true;
1326 }
1327
1328
1329 bool CommandMigrate::MigrationWorker_20x::FixNestedCatalogTransitionPoints(
1330 PendingCatalog *data) const
1331 {
1332 assert(data->HasNew());
1333 if (!fix_nested_catalog_transitions_) {
1334 // Fixing transition point mismatches is not enabled...
1335 return true;
1336 }
1337
1338 typedef catalog::DirectoryEntry::Difference Difference;
1339
1340 const catalog::CatalogDatabase &writable = data->new_catalog->database();
1341 bool retval;
1342
1343 catalog::SqlLookupPathHash lookup_mountpoint(writable);
1344 catalog::SqlDirentUpdate update_directory_entry(writable);
1345
1346 // Unbox the nested catalogs (possibly waiting for migration of them first)
1347 PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
1348 PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1349 for (; i != iend; ++i) {
1350 // Collect information about the nested catalog
1351 PendingCatalog *nested_catalog = *i;
1352 const catalog::DirectoryEntry nested_root_entry =
1353 nested_catalog->root_entry.Get();
1354 const string &nested_root_path = nested_catalog->root_path();
1355 const shash::Md5 mountpoint_path_hash =
1356 shash::Md5(nested_root_path.data(), nested_root_path.size());
1357
1358 // Retrieve the nested catalog mountpoint from the current catalog
1359 retval = lookup_mountpoint.BindPathHash(mountpoint_path_hash) &&
1360 lookup_mountpoint.FetchRow();
1361 if (!retval) {
1362 Error("Failed to fetch nested catalog mountpoint to check for compatible"
1363 "transition points", lookup_mountpoint, data);
1364 return false;
1365 }
1366
1367 catalog::DirectoryEntry mountpoint_entry =
1368 lookup_mountpoint.GetDirent(data->new_catalog);
1369 lookup_mountpoint.Reset();
1370
1371 // Compare nested catalog mountpoint and nested catalog root entries
1372 catalog::DirectoryEntry::Differences diffs =
1373 mountpoint_entry.CompareTo(nested_root_entry);
1374
1375 // We MUST deal with two directory entries that are a pair of nested cata-
1376 // log mountpoint and root entry! Thus we expect their transition flags to
1377 // differ and their name to be the same.
1378 assert(diffs & Difference::kNestedCatalogTransitionFlags);
1379 assert((diffs & Difference::kName) == 0);
1380
1381 // Check if there are differences other than the nested catalog transition
1382 // flags and fix them...
1383 if ((diffs ^ Difference::kNestedCatalogTransitionFlags) != 0) {
1384 // If we found differences, we still assume a couple of directory entry
1385 // fields to be the same; otherwise something is severely wrong...
1386 if ((diffs & Difference::kChecksum) ||
1387 (diffs & Difference::kLinkcount) ||
1388 (diffs & Difference::kSymlink) ||
1389 (diffs & Difference::kChunkedFileFlag) )
1390 {
1391 Error("Found an irreparable mismatch in a nested catalog transition "
1392 "point at '" + nested_root_path + "'\nAborting...\n");
1393 }
1394
1395 // Copy the properties from the nested catalog root entry into the mount-
1396 // point entry to bring them in sync again
1397 CommandMigrate::FixNestedCatalogTransitionPoint(
1398 nested_root_entry, &mountpoint_entry);
1399
1400 // save the nested catalog mountpoint entry into the catalog
1401 retval = update_directory_entry.BindPathHash(mountpoint_path_hash) &&
1402 update_directory_entry.BindDirent(mountpoint_entry) &&
1403 update_directory_entry.Execute();
1404 if (!retval) {
1405 Error("Failed to save resynchronized nested catalog mountpoint into "
1406 "catalog database", update_directory_entry, data);
1407 return false;
1408 }
1409 update_directory_entry.Reset();
1410
1411 // Fixing of this mountpoint went well... inform the user that this minor
1412 // issue occurred
1413 LogCvmfs(kLogCatalog, kLogStdout,
1414 "NOTE: fixed incompatible nested catalog transition point at: "
1415 "'%s' ", nested_root_path.c_str());
1416 }
1417 }
1418
1419 return true;
1420 }
1421
1422
1423 void CommandMigrate::FixNestedCatalogTransitionPoint(
1424 const catalog::DirectoryEntry &nested_root,
1425 catalog::DirectoryEntry *mountpoint
1426 ) {
1427 // Replace some file system parameters in the mountpoint to resync it with
1428 // the nested root of the corresponding nested catalog
1429 //
1430 // Note: this method relies on CommandMigrate being a friend of DirectoryEntry
1431 mountpoint->mode_ = nested_root.mode_;
1432 mountpoint->uid_ = nested_root.uid_;
1433 mountpoint->gid_ = nested_root.gid_;
1434 mountpoint->size_ = nested_root.size_;
1435 mountpoint->mtime_ = nested_root.mtime_;
1436 }
1437
1438
1439 bool CommandMigrate::MigrationWorker_20x::RemoveDanglingNestedMountpoints(
1440 PendingCatalog *data) const
1441 {
1442 assert(data->HasNew());
1443 const catalog::CatalogDatabase &writable = data->new_catalog->database();
1444 bool retval = false;
1445
1446 // build a set of registered nested catalog path hashes
1447 typedef catalog::Catalog::NestedCatalogList NestedCatalogList;
1448 typedef std::map<shash::Md5, catalog::Catalog::NestedCatalog>
1449 NestedCatalogMap;
1450 const NestedCatalogList& nested_clgs =
1451 data->old_catalog->ListNestedCatalogs();
1452 NestedCatalogList::const_iterator i = nested_clgs.begin();
1453 const NestedCatalogList::const_iterator iend = nested_clgs.end();
1454 NestedCatalogMap nested_catalog_path_hashes;
1455 for (; i != iend; ++i) {
1456 const PathString &path = i->mountpoint;
1457 const shash::Md5 hash(path.GetChars(), path.GetLength());
1458 nested_catalog_path_hashes[hash] = *i;
1459 }
1460
1461 // Retrieve nested catalog mountpoints that have child entries directly inside
1462 // the current catalog (which is a malformed state)
1463 catalog::SqlLookupDanglingMountpoints sql_dangling_mountpoints(writable);
1464 catalog::SqlDirentUpdate save_updated_mountpoint(writable);
1465
1466 std::vector<catalog::DirectoryEntry> todo_dirent;
1467 std::vector<shash::Md5> todo_hash;
1468
1469 // go through the list of dangling nested catalog mountpoints and fix them
1470 // where needed (check if there is no nested catalog registered for them)
1471 while (sql_dangling_mountpoints.FetchRow()) {
1472 catalog::DirectoryEntry dangling_mountpoint =
1473 sql_dangling_mountpoints.GetDirent(data->new_catalog);
1474 const shash::Md5 path_hash = sql_dangling_mountpoints.GetPathHash();
1475 assert(dangling_mountpoint.IsNestedCatalogMountpoint());
1476
1477 // check if the nested catalog mountpoint is registered in the nested cata-
1478 // log list of the currently migrated catalog
1479 const NestedCatalogMap::const_iterator nested_catalog =
1480 nested_catalog_path_hashes.find(path_hash);
1481 if (nested_catalog != nested_catalog_path_hashes.end()) {
1482 LogCvmfs(kLogCatalog, kLogStderr,
1483 "WARNING: found a non-empty nested catalog mountpoint under "
1484 "'%s'", nested_catalog->second.mountpoint.c_str());
1485 continue;
1486 }
1487
1488 // the mountpoint was confirmed to be dangling and needs to be removed
1489 dangling_mountpoint.set_is_nested_catalog_mountpoint(false);
1490 todo_dirent.push_back(dangling_mountpoint);
1491 todo_hash.push_back(path_hash);
1492 }
1493
1494 for (unsigned i = 0; i < todo_dirent.size(); ++i) {
1495 retval = save_updated_mountpoint.BindPathHash(todo_hash[i]) &&
1496 save_updated_mountpoint.BindDirent(todo_dirent[i]) &&
1497 save_updated_mountpoint.Execute() &&
1498 save_updated_mountpoint.Reset();
1499 if (!retval) {
1500 Error("Failed to remove dangling nested catalog mountpoint entry in "
1501 "catalog", save_updated_mountpoint, data);
1502 return false;
1503 }
1504
1505 // tell the user that this intervention has taken place
1506 LogCvmfs(kLogCatalog, kLogStdout, "NOTE: fixed dangling nested catalog "
1507 "mountpoint entry called: '%s' ",
1508 todo_dirent[i].name().c_str());
1509 }
1510
1511 return true;
1512 }
1513
1514
1515 const catalog::DirectoryEntry& CommandMigrate::GetNestedCatalogMarkerDirent() {
1516 // This is a pre-initialized singleton... it MUST already be there...
1517 assert(nested_catalog_marker_.name_.ToString() == ".cvmfscatalog");
1518 return nested_catalog_marker_;
1519 }
1520
1521 bool CommandMigrate::GenerateNestedCatalogMarkerChunk() {
1522 // Create an empty nested catalog marker file
1523 nested_catalog_marker_tmp_path_ =
1524 CreateTempPath(temporary_directory_ + "/.cvmfscatalog", 0644);
1525 if (nested_catalog_marker_tmp_path_.empty()) {
1526 Error("Failed to create temp file for nested catalog marker dummy.");
1527 return false;
1528 }
1529
1530 // Process and upload it to the backend storage
1531 IngestionSource *source =
1532 new FileIngestionSource(nested_catalog_marker_tmp_path_);
1533 spooler_->Process(source);
1534 return true;
1535 }
1536
1537 void CommandMigrate::CreateNestedCatalogMarkerDirent(
1538 const shash::Any &content_hash)
1539 {
1540 // Generate it only once
1541 assert(nested_catalog_marker_.name_.ToString() != ".cvmfscatalog");
1542
1543 // Fill the DirectoryEntry structure with all needed information
1544 nested_catalog_marker_.name_.Assign(".cvmfscatalog", strlen(".cvmfscatalog"));
1545 nested_catalog_marker_.mode_ = 33188;  // regular file: S_IFREG | 0644
1546 nested_catalog_marker_.uid_ = uid_;
1547 nested_catalog_marker_.gid_ = gid_;
1548 nested_catalog_marker_.size_ = 0;
1549 nested_catalog_marker_.mtime_ = time(NULL);
1550 nested_catalog_marker_.linkcount_ = 1;
1551 nested_catalog_marker_.checksum_ = content_hash;
1552 }
1553
1554
1555 bool CommandMigrate::MigrationWorker_20x::GenerateCatalogStatistics(
1556 PendingCatalog *data) const
1557 {
1558 assert(data->HasNew());
1559 bool retval = false;
1560 const catalog::CatalogDatabase &writable = data->new_catalog->database();
1561
1562 // Aggregate the statistics counters of all nested catalogs
1563 // Note: we might need to wait until nested catalogs are successfully
1564 // processed
1565 catalog::DeltaCounters stats_counters;
1566 PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
1567 PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1568 for (; i != iend; ++i) {
1569 const PendingCatalog *nested_catalog = *i;
1570 const catalog::DeltaCounters &s = nested_catalog->nested_statistics.Get();
1571 s.PopulateToParent(&stats_counters);
1572 }
1573
1574 // Count various directory entry types in the catalog to fill up the catalog
1575 // statistics counters introduced in the current catalog schema
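  // Regular files are selected as entries that carry the file flag but not the
  // symlink flag: as noted for the hardlink removal worker further below,
  // symlinks carry the 'file' flag as well, hence the extra
  // "AND NOT flags & :flag_link" filter.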
1576 catalog::SqlCatalog count_regular_files(writable,
1577 "SELECT count(*) FROM catalog "
1578 " WHERE flags & :flag_file "
1579 " AND NOT flags & :flag_link;");
1580 catalog::SqlCatalog count_symlinks(writable,
1581 "SELECT count(*) FROM catalog WHERE flags & :flag_link;");
1582 catalog::SqlCatalog count_directories(writable,
1583 "SELECT count(*) FROM catalog WHERE flags & :flag_dir;");
1584 catalog::SqlCatalog aggregate_file_size(writable,
1585 "SELECT sum(size) FROM catalog WHERE flags & :flag_file "
1586 " AND NOT flags & :flag_link");
1587
1588 // Run the actual counting queries
1589 retval =
1590 count_regular_files.BindInt64(1, catalog::SqlDirent::kFlagFile) &&
1591 count_regular_files.BindInt64(2, catalog::SqlDirent::kFlagLink) &&
1592 count_regular_files.FetchRow();
1593 if (!retval) {
1594 Error("Failed to count regular files.", count_regular_files, data);
1595 return false;
1596 }
1597 retval =
1598 count_symlinks.BindInt64(1, catalog::SqlDirent::kFlagLink) &&
1599 count_symlinks.FetchRow();
1600 if (!retval) {
1601 Error("Failed to count symlinks.", count_symlinks, data);
1602 return false;
1603 }
1604 retval =
1605 count_directories.BindInt64(1, catalog::SqlDirent::kFlagDir) &&
1606 count_directories.FetchRow();
1607 if (!retval) {
1608 Error("Failed to count directories.", count_directories, data);
1609 return false;
1610 }
1611 retval =
1612 aggregate_file_size.BindInt64(1, catalog::SqlDirent::kFlagFile) &&
1613 aggregate_file_size.BindInt64(2, catalog::SqlDirent::kFlagLink) &&
1614 aggregate_file_size.FetchRow();
1615 if (!retval) {
1616 Error("Failed to aggregate the file sizes.", aggregate_file_size, data);
1617 return false;
1618 }
1619
1620 // Insert the counted statistics into the DeltaCounters data structure
1621 stats_counters.self.regular_files = count_regular_files.RetrieveInt64(0);
1622 stats_counters.self.symlinks = count_symlinks.RetrieveInt64(0);
1623 stats_counters.self.directories = count_directories.RetrieveInt64(0);
1624 stats_counters.self.nested_catalogs = data->nested_catalogs.size();
1625 stats_counters.self.file_size = aggregate_file_size.RetrieveInt64(0);
1626
1627 // Write back the generated statistics counters into the catalog database
1628 stats_counters.WriteToDatabase(writable);
1629
1630 // Push the generated statistics counters up to the parent catalog
1631 data->nested_statistics.Set(stats_counters);
1632
1633 return true;
1634 }
1635
1636
1637 bool CommandMigrate::MigrationWorker_20x::FindRootEntryInformation(
1638 PendingCatalog *data) const
1639 {
1640 const catalog::CatalogDatabase &writable = data->new_catalog->database();
1641 bool retval;
1642
1643 std::string root_path = data->root_path();
1644 shash::Md5 root_path_hash = shash::Md5(root_path.data(), root_path.size());
1645
1646 catalog::SqlLookupPathHash lookup_root_entry(writable);
1647 retval = lookup_root_entry.BindPathHash(root_path_hash) &&
1648 lookup_root_entry.FetchRow();
1649 if (!retval) {
1650 Error("Failed to retrieve root directory entry of migrated catalog",
1651 lookup_root_entry, data);
1652 return false;
1653 }
1654
1655 catalog::DirectoryEntry entry =
1656 lookup_root_entry.GetDirent(data->new_catalog);
1657 if (entry.linkcount() < 2 || entry.hardlink_group() > 0) {
1658 Error("Retrieved linkcount of catalog root entry is not sane.", data);
1659 return false;
1660 }
1661
1662 data->root_entry.Set(entry);
1663 return true;
1664 }
1665
1666
1667 bool CommandMigrate::MigrationWorker_20x::CommitDatabaseTransaction(
1668 PendingCatalog *data) const
1669 {
1670 assert(data->HasNew());
1671 data->new_catalog->Commit();
1672 return true;
1673 }
1674
1675
1676 bool CommandMigrate::MigrationWorker_20x::DetachOldCatalogDatabase(
1677 PendingCatalog *data) const
1678 {
1679 assert(data->HasNew());
1680 const catalog::CatalogDatabase &writable = data->new_catalog->database();
1681 catalog::SqlCatalog detach_old_catalog(writable, "DETACH old;");
1682 const bool retval = detach_old_catalog.Execute();
1683 if (!retval) {
1684 Error("Failed to detach old catalog database.", detach_old_catalog, data);
1685 return false;
1686 }
1687 return true;
1688 }
1689
1690
1691 //------------------------------------------------------------------------------
1692
1693
1694 CommandMigrate::MigrationWorker_217::MigrationWorker_217(
1695 const worker_context *context)
1696 : AbstractMigrationWorker<MigrationWorker_217>(context)
1697 { }
1698
1699
1700 bool CommandMigrate::MigrationWorker_217::RunMigration(PendingCatalog *data)
1701 const
1702 {
1703 return CheckDatabaseSchemaCompatibility(data) &&
1704 StartDatabaseTransaction(data) &&
1705 GenerateNewStatisticsCounters(data) &&
1706 UpdateCatalogSchema(data) &&
1707 CommitDatabaseTransaction(data);
1708 }
1709
1710
1711 bool CommandMigrate::MigrationWorker_217::CheckDatabaseSchemaCompatibility(
1712 PendingCatalog *data) const
1713 {
1714 assert(!data->HasNew());
1715 const catalog::CatalogDatabase &old_catalog = data->old_catalog->database();
1716
1717 if ((old_catalog.schema_version() < 2.4 -
1718 catalog::CatalogDatabase::kSchemaEpsilon)
1719 ||
1720 (old_catalog.schema_version() > 2.4 +
1721 catalog::CatalogDatabase::kSchemaEpsilon))
1722 {
1723 Error("Given Catalog is not Schema 2.4.", data);
1724 return false;
1725 }
1726
1727 return true;
1728 }
1729
1730
1731 bool CommandMigrate::MigrationWorker_217::StartDatabaseTransaction(
1732 PendingCatalog *data) const
1733 {
1734 assert(!data->HasNew());
1735 GetWritable(data->old_catalog)->Transaction();
1736 return true;
1737 }
1738
1739
1740 bool CommandMigrate::MigrationWorker_217::GenerateNewStatisticsCounters
1741 (PendingCatalog *data) const {
1742 assert(!data->HasNew());
1743 bool retval = false;
1744 const catalog::CatalogDatabase &writable =
1745 GetWritable(data->old_catalog)->database();
1746
1747 // Aggregate the statistics counters of all nested catalogs
1748 // Note: we might need to wait until nested catalogs are successfully
1749 // processed
1750 catalog::DeltaCounters stats_counters;
1751 PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
1752 PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1753 for (; i != iend; ++i) {
1754 const PendingCatalog *nested_catalog = *i;
1755 const catalog::DeltaCounters &s = nested_catalog->nested_statistics.Get();
1756 s.PopulateToParent(&stats_counters);
1757 }
1758
1759 // Count various directory entry types in the catalog to fill up the catalog
1760 // statistics counters introduced in the current catalog schema
1761 catalog::SqlCatalog count_chunked_files(writable,
1762 "SELECT count(*), sum(size) FROM catalog "
1763 " WHERE flags & :flag_chunked_file;");
1764 catalog::SqlCatalog count_file_chunks(writable,
1765 "SELECT count(*) FROM chunks;");
1766 catalog::SqlCatalog aggregate_file_size(writable,
1767 "SELECT sum(size) FROM catalog WHERE flags & :flag_file "
1768 " AND NOT flags & :flag_link;");
1769
1770 // Run the actual counting queries
1771 retval =
1772 count_chunked_files.BindInt64(1, catalog::SqlDirent::kFlagFileChunk) &&
1773 count_chunked_files.FetchRow();
1774 if (!retval) {
1775 Error("Failed to count chunked files.", count_chunked_files, data);
1776 return false;
1777 }
1778 retval = count_file_chunks.FetchRow();
1779 if (!retval) {
1780 Error("Failed to count file chunks", count_file_chunks, data);
1781 return false;
1782 }
1783 retval =
1784 aggregate_file_size.BindInt64(1, catalog::SqlDirent::kFlagFile) &&
1785 aggregate_file_size.BindInt64(2, catalog::SqlDirent::kFlagLink) &&
1786 aggregate_file_size.FetchRow();
1787 if (!retval) {
1788 Error("Failed to aggregate the file sizes.", aggregate_file_size, data);
1789 return false;
1790 }
1791
1792 // Insert the counted statistics into the DeltaCounters data structure
1793 stats_counters.self.chunked_files = count_chunked_files.RetrieveInt64(0);
1794 stats_counters.self.chunked_file_size = count_chunked_files.RetrieveInt64(1);
1795 stats_counters.self.file_chunks = count_file_chunks.RetrieveInt64(0);
1796 stats_counters.self.file_size = aggregate_file_size.RetrieveInt64(0);
1797
1798 // Write back the generated statistics counters into the catalog database
1799 catalog::Counters counters;
1800 retval = counters.ReadFromDatabase(writable, catalog::LegacyMode::kLegacy);
1801 if (!retval) {
1802 Error("Failed to read old catalog statistics counters", data);
1803 return false;
1804 }
1805 counters.ApplyDelta(stats_counters);
1806 retval = counters.InsertIntoDatabase(writable);
1807 if (!retval) {
1808 Error("Failed to write new statistics counters to database", data);
1809 return false;
1810 }
1811
1812 // Push the generated statistics counters up to the parent catalog
1813 data->nested_statistics.Set(stats_counters);
1814
1815 return true;
1816 }
1817
1818
1819 bool CommandMigrate::MigrationWorker_217::UpdateCatalogSchema
1820 (PendingCatalog *data) const {
1821 assert(!data->HasNew());
1822 const catalog::CatalogDatabase &writable =
1823 GetWritable(data->old_catalog)->database();
1824 catalog::SqlCatalog update_schema_version(writable,
1825 "UPDATE properties SET value = :schema_version WHERE key = 'schema';");
1826
1827 const bool retval =
1828 update_schema_version.BindDouble(1, 2.5) &&
1829 update_schema_version.Execute();
1830 if (!retval) {
1831 Error("Failed to update catalog schema version",
1832 update_schema_version,
1833 data);
1834 return false;
1835 }
1836
1837 return true;
1838 }
1839
1840
1841 bool CommandMigrate::MigrationWorker_217::CommitDatabaseTransaction
1842 (PendingCatalog *data) const {
1843 assert(!data->HasNew());
1844 GetWritable(data->old_catalog)->Commit();
1845 return true;
1846 }
1847
1848
1849 //------------------------------------------------------------------------------
1850
1851
1852 CommandMigrate::ChownMigrationWorker::ChownMigrationWorker(
1853 const worker_context *context)
1854 : AbstractMigrationWorker<ChownMigrationWorker>(context)
1855 , uid_map_statement_(GenerateMappingStatement(context->uid_map, "uid"))
1856 , gid_map_statement_(GenerateMappingStatement(context->gid_map, "gid"))
1857 {}
1858
1859 bool CommandMigrate::ChownMigrationWorker::RunMigration(
1860 PendingCatalog *data) const {
1861 return ApplyPersonaMappings(data);
1862 }
1863
1864
1865 bool CommandMigrate::ChownMigrationWorker::ApplyPersonaMappings(
1866 PendingCatalog *data) const {
1867 assert(data->old_catalog != NULL);
1868 assert(data->new_catalog == NULL);
1869
1870 if (data->old_catalog->mountpoint() ==
1871 PathString("/" + string(catalog::VirtualCatalog::kVirtualPath)))
1872 {
1873 // skip the virtual catalog
1874 return true;
1875 }
1876
1877 const catalog::CatalogDatabase &db =
1878 GetWritable(data->old_catalog)->database();
1879
1880 if (!db.BeginTransaction()) {
1881 return false;
1882 }
1883
1884 catalog::SqlCatalog uid_sql(db, uid_map_statement_);
1885 if (!uid_sql.Execute()) {
1886 Error("Failed to update UIDs", uid_sql, data);
1887 return false;
1888 }
1889
1890 catalog::SqlCatalog gid_sql(db, gid_map_statement_);
1891 if (!gid_sql.Execute()) {
1892 Error("Failed to update GIDs", gid_sql, data);
1893 return false;
1894 }
1895
1896 return db.CommitTransaction();
1897 }
1898
1899
1900 template <class MapT>
1901 std::string CommandMigrate::ChownMigrationWorker::GenerateMappingStatement(
1902 const MapT &map,
1903 const std::string &column) const {
1904 assert(map.RuleCount() > 0 || map.HasDefault());
1905
1906 std::string stmt = "UPDATE OR ABORT catalog SET " + column + " = ";
1907
1908 if (map.RuleCount() == 0) {
1909 // map everything to the same value (just a simple UPDATE clause)
1910 stmt += StringifyInt(map.GetDefault());
1911 } else {
1912 // apply multiple ID mappings (UPDATE clause with CASE statement)
1913 stmt += "CASE " + column + " ";
1914 typedef typename MapT::map_type::const_iterator map_iterator;
1915 map_iterator i = map.GetRuleMap().begin();
1916 const map_iterator iend = map.GetRuleMap().end();
1917 for (; i != iend; ++i) {
1918 stmt += "WHEN " + StringifyInt(i->first) +
1919 " THEN " + StringifyInt(i->second) + " ";
1920 }
1921
1922 // add a default (if provided) or leave unchanged if no mapping fits
1923 stmt += (map.HasDefault())
1924 ? "ELSE " + StringifyInt(map.GetDefault()) + " "
1925 : "ELSE " + column + " ";
1926 stmt += "END";
1927 }
1928
1929 stmt += ";";
1930 return stmt;
1931 }
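// For illustration (hypothetical mapping values, not taken from a real
// repository): a UID map {101 -> 1001, 102 -> 1002} with default 999 yields
//   UPDATE OR ABORT catalog SET uid = CASE uid
//     WHEN 101 THEN 1001 WHEN 102 THEN 1002 ELSE 999 END;
// while a map that only defines a default of 1000 collapses to
//   UPDATE OR ABORT catalog SET uid = 1000;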
1932
1933
1934 //------------------------------------------------------------------------------
1935
1936
1937 bool CommandMigrate::HardlinkRemovalMigrationWorker::RunMigration(
1938 PendingCatalog *data) const {
1939 return CheckDatabaseSchemaCompatibility(data) &&
1940 BreakUpHardlinks(data);
1941 }
1942
1943
1944 bool
1945 CommandMigrate::HardlinkRemovalMigrationWorker::CheckDatabaseSchemaCompatibility
1946 (PendingCatalog *data) const {
1947 assert(data->old_catalog != NULL);
1948 assert(data->new_catalog == NULL);
1949
1950 const catalog::CatalogDatabase &clg = data->old_catalog->database();
1951 return clg.schema_version() >= 2.4 - catalog::CatalogDatabase::kSchemaEpsilon;
1952 }
1953
1954
1955 bool CommandMigrate::HardlinkRemovalMigrationWorker::BreakUpHardlinks(
1956 PendingCatalog *data) const {
1957 assert(data->old_catalog != NULL);
1958 assert(data->new_catalog == NULL);
1959
1960 const catalog::CatalogDatabase &db =
1961 GetWritable(data->old_catalog)->database();
1962
1963 if (!db.BeginTransaction()) {
1964 return false;
1965 }
1966
1967 // CernVM-FS catalogs do not store inodes directly; inodes are assigned at
1968 // runtime. Hardlink relationships are expressed through so-called hardlink
1969 // group IDs, which must be respected at runtime by assigning identical
1970 // inodes accordingly.
1971 //
1972 // This updates all directory entries of a given catalog that have a linkcount
1973 // greater than 1 and are flagged as a 'file'. Note: Symlinks are flagged both
1974 // as 'file' and as 'symlink', hence they are updated implicitly as well.
1975 //
1976 // The 'hardlinks' field in the catalog contains two 32 bit integers:
1977 // * the linkcount in the lower 32 bits
1978 // * the (so called) hardlink group ID in the higher 32 bits
1979 //
1980 // Files that have a linkcount of exactly 1 do not have any hardlinks and have
1981 // the (implicit) hardlink group ID '0'. Hence, 'hardlinks == 1' means that a
1982 // file doesn't have any hardlinks (linkcount = 1) and doesn't need treatment
1983 // here.
1984 //
1985 // Files that have hardlinks (linkcount > 1) will have a very large integer in
1986 // their 'hardlinks' field (hardlink group ID > 0 in higher 32 bits). Those
1987 // files will be treated by setting their 'hardlinks' field to 1, effectively
1988 // clearing all hardlink information from the directory entry.
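  // For illustration (hypothetical values): a file in hardlink group 6 with a
  // linkcount of 3 stores hardlinks = (6 << 32) | 3 = 25769803779; after this
  // migration step its hardlinks field is simply 1.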
1989 const std::string stmt = "UPDATE OR ABORT catalog "
1990 "SET hardlinks = 1 "
1991 "WHERE flags & :file_flag "
1992 " AND hardlinks > 1;";
1993 catalog::SqlCatalog hardlink_removal_sql(db, stmt);
1994 hardlink_removal_sql.BindInt64(1, catalog::SqlDirent::kFlagFile);
1995 hardlink_removal_sql.Execute();
1996
1997 return db.CommitTransaction();
1998 }
1999
2000 //------------------------------------------------------------------------------
2001
2002
2003 bool CommandMigrate::BulkhashRemovalMigrationWorker::RunMigration(
2004 PendingCatalog *data) const {
2005 return CheckDatabaseSchemaCompatibility(data) &&
2006 RemoveRedundantBulkHashes(data);
2007 }
2008
2009
2010 bool
2011 CommandMigrate::BulkhashRemovalMigrationWorker::CheckDatabaseSchemaCompatibility
2012 (PendingCatalog *data) const {
2013 assert(data->old_catalog != NULL);
2014 assert(data->new_catalog == NULL);
2015
2016 const catalog::CatalogDatabase &clg = data->old_catalog->database();
2017 return clg.schema_version() >= 2.4 - catalog::CatalogDatabase::kSchemaEpsilon;
2018 }
2019
2020
2021 bool CommandMigrate::BulkhashRemovalMigrationWorker::RemoveRedundantBulkHashes(
2022 PendingCatalog *data) const {
2023 assert(data->old_catalog != NULL);
2024 assert(data->new_catalog == NULL);
2025
2026 const catalog::CatalogDatabase &db =
2027 GetWritable(data->old_catalog)->database();
2028
2029 if (!db.BeginTransaction()) {
2030 return false;
2031 }
2032
2033 // Regular files that carry both a bulk hash and chunk hashes can drop the
2034 // bulk hash, since modern clients (>= 2.1.7) do not require it
2035 const std::string stmt = "UPDATE OR ABORT catalog "
2036 "SET hash = NULL "
2037 "WHERE flags & :file_chunked_flag;";
2038 catalog::SqlCatalog bulkhash_removal_sql(db, stmt);
2039 bulkhash_removal_sql.BindInt64(1, catalog::SqlDirent::kFlagFileChunk);
2040 bulkhash_removal_sql.Execute();
2041
2042 return db.CommitTransaction();
2043 }
2044
2045
2046 //------------------------------------------------------------------------------
2047
2048
2049 CommandMigrate::StatsMigrationWorker::StatsMigrationWorker(
2050 const worker_context *context)
2051 : AbstractMigrationWorker<StatsMigrationWorker>(context)
2052 { }
2053
2054
2055 bool CommandMigrate::StatsMigrationWorker::RunMigration(PendingCatalog *data)
2056 const
2057 {
2058 return CheckDatabaseSchemaCompatibility(data) &&
2059 StartDatabaseTransaction(data) &&
2060 RepairStatisticsCounters(data) &&
2061 CommitDatabaseTransaction(data);
2062 }
2063
2064
2065 bool CommandMigrate::StatsMigrationWorker::CheckDatabaseSchemaCompatibility(
2066 PendingCatalog *data) const
2067 {
2068 assert(data->old_catalog != NULL);
2069 assert(data->new_catalog == NULL);
2070
2071 const catalog::CatalogDatabase &clg = data->old_catalog->database();
2072 if (clg.schema_version() < 2.5 - catalog::CatalogDatabase::kSchemaEpsilon) {
2073 Error("Given catalog schema is < 2.5.", data);
2074 return false;
2075 }
2076
2077 if (clg.schema_revision() < 5) {
2078 Error("Given catalog revision is < 5", data);
2079 return false;
2080 }
2081
2082 return true;
2083 }
2084
2085
2086 bool CommandMigrate::StatsMigrationWorker::StartDatabaseTransaction(
2087 PendingCatalog *data) const
2088 {
2089 assert(!data->HasNew());
2090 GetWritable(data->old_catalog)->Transaction();
2091 return true;
2092 }
2093
2094
2095 bool CommandMigrate::StatsMigrationWorker::RepairStatisticsCounters(
2096 PendingCatalog *data) const
2097 {
2098 assert(!data->HasNew());
2099 bool retval = false;
2100 const catalog::CatalogDatabase &writable =
2101 GetWritable(data->old_catalog)->database();
2102
2103 // Aggregate the statistics counters of all nested catalogs
2104 // Note: we might need to wait until nested catalogs are successfully
2105 // processed
2106 catalog::DeltaCounters stats_counters;
2107 PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
2108 PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
2109 for (; i != iend; ++i) {
2110 const PendingCatalog *nested_catalog = *i;
2111 const catalog::DeltaCounters &s = nested_catalog->nested_statistics.Get();
2112 s.PopulateToParent(&stats_counters);
2113 }
2114
2115 // Count various directory entry types in the catalog to fill up the catalog
2116 // statistics counters introduced in the current catalog schema
2117 catalog::SqlCatalog count_regular(writable,
2118 std::string("SELECT count(*), sum(size) FROM catalog ") +
2119 "WHERE flags & " + StringifyInt(catalog::SqlDirent::kFlagFile) +
2120 " AND NOT flags & " + StringifyInt(catalog::SqlDirent::kFlagLink) +
2121 " AND NOT flags & " + StringifyInt(catalog::SqlDirent::kFlagFileSpecial) +
2122 ";");
2123 catalog::SqlCatalog count_external(writable,
2124 std::string("SELECT count(*), sum(size) FROM catalog ") +
2125 "WHERE flags & " + StringifyInt(catalog::SqlDirent::kFlagFileExternal) +
2126 ";");
2127 catalog::SqlCatalog count_symlink(writable,
2128 std::string("SELECT count(*) FROM catalog ") +
2129 "WHERE flags & " + StringifyInt(catalog::SqlDirent::kFlagLink) + ";");
2130 catalog::SqlCatalog count_special(writable,
2131 std::string("SELECT count(*) FROM catalog ") +
2132 "WHERE flags & " + StringifyInt(catalog::SqlDirent::kFlagFileSpecial) +
2133 ";");
2134 catalog::SqlCatalog count_xattr(writable,
2135 std::string("SELECT count(*) FROM catalog ") +
2136 "WHERE xattr IS NOT NULL;");
2137 catalog::SqlCatalog count_chunk(writable,
2138 std::string("SELECT count(*), sum(size) FROM catalog ") +
2139 "WHERE flags & " + StringifyInt(catalog::SqlDirent::kFlagFileChunk) + ";");
2140 catalog::SqlCatalog count_dir(writable,
2141 std::string("SELECT count(*) FROM catalog ") +
2142 "WHERE flags & " + StringifyInt(catalog::SqlDirent::kFlagDir) + ";");
2143 catalog::SqlCatalog count_chunk_blobs(writable,
2144 "SELECT count(*) FROM chunks;");
2145
2146 retval = count_regular.FetchRow() &&
2147 count_external.FetchRow() &&
2148 count_symlink.FetchRow() &&
2149 count_special.FetchRow() &&
2150 count_xattr.FetchRow() &&
2151 count_chunk.FetchRow() &&
2152 count_dir.FetchRow() &&
2153 count_chunk_blobs.FetchRow();
2154 if (!retval) {
2155 Error("Failed to collect catalog statistics", data);
2156 return false;
2157 }
2158
2159 stats_counters.self.regular_files = count_regular.RetrieveInt64(0);
2160 stats_counters.self.symlinks = count_symlink.RetrieveInt64(0);
2161 stats_counters.self.specials = count_special.RetrieveInt64(0);
2162 stats_counters.self.directories = count_dir.RetrieveInt64(0);
2163 stats_counters.self.nested_catalogs = data->nested_catalogs.size();
2164 stats_counters.self.chunked_files = count_chunk.RetrieveInt64(0);
2165 stats_counters.self.file_chunks = count_chunk_blobs.RetrieveInt64(0);
2166 stats_counters.self.file_size = count_regular.RetrieveInt64(1);
2167 stats_counters.self.chunked_file_size = count_chunk.RetrieveInt64(1);
2168 stats_counters.self.xattrs = count_xattr.RetrieveInt64(0);
2169 stats_counters.self.externals = count_external.RetrieveInt64(0);
2170 stats_counters.self.external_file_size = count_external.RetrieveInt64(1);
2171
2172 // Write back the generated statistics counters into the catalog database
2173 catalog::Counters counters;
2174 counters.ApplyDelta(stats_counters);
2175 retval = counters.InsertIntoDatabase(writable);
2176 if (!retval) {
2177 Error("Failed to write new statistics counters to database", data);
2178 return false;
2179 }
2180
2181 // Push the generated statistics counters up to the parent catalog
2182 data->nested_statistics.Set(stats_counters);
2183
2184 return true;
2185 }
2186
2187
2188 bool CommandMigrate::StatsMigrationWorker::CommitDatabaseTransaction(
2189 PendingCatalog *data) const
2190 {
2191 assert(!data->HasNew());
2192 GetWritable(data->old_catalog)->Commit();
2193 return true;
2194 }
2195
2196 } // namespace swissknife
2197