GCC Code Coverage Report
Directory: cvmfs/
File:      cvmfs/swissknife_migrate.cc
Date:      2019-02-03 02:48:13
                Exec   Total   Coverage
Lines:             0     811      0.0 %
Branches:          0     810      0.0 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 *
4
 * Careful: any real schema migration as of now requires taking care of
5
 * the hash algorithm
6
 */
7
8
#include "swissknife_migrate.h"
9
10
#include <sys/resource.h>
11
#include <unistd.h>
12
13
#include "catalog_rw.h"
14
#include "catalog_sql.h"
15
#include "catalog_virtual.h"
16
#include "compression.h"
17
#include "hash.h"
18
#include "logging.h"
19
#include "swissknife_history.h"
20
#include "util_concurrency.h"
21
22
using namespace std;  // NOLINT
23
24
namespace swissknife {
25
26
catalog::DirectoryEntry  CommandMigrate::nested_catalog_marker_;
27
28
CommandMigrate::CommandMigrate() :
29
  file_descriptor_limit_(8192),
30
  catalog_count_(0),
31
  has_committed_new_revision_(false),
32
  uid_(0),
33
  gid_(0),
34
  root_catalog_(NULL)
35
{
36
  atomic_init32(&catalogs_processed_);
37
}
38
39
40
ParameterList CommandMigrate::GetParams() const {
41
  ParameterList r;
42
  r.push_back(Parameter::Mandatory('v',
43
    "migration base version ( 2.0.x | 2.1.7 | chown | hardlink )"));
44
  r.push_back(Parameter::Mandatory('r',
45
    "repository URL (absolute local path or remote URL)"));
46
  r.push_back(Parameter::Mandatory('u', "upstream definition string"));
47
  r.push_back(Parameter::Mandatory('o', "manifest output file"));
48
  r.push_back(Parameter::Mandatory('t',
49
    "temporary directory for catalog decompress"));
50
  r.push_back(Parameter::Optional('p',
51
    "user id to be used for this repository"));
52
  r.push_back(Parameter::Optional('g',
53
    "group id to be used for this repository"));
54
  r.push_back(Parameter::Optional('n', "fully qualified repository name"));
55
  r.push_back(Parameter::Optional('h', "root hash (other than trunk)"));
56
  r.push_back(Parameter::Optional('k', "repository master key(s)"));
57
  r.push_back(Parameter::Optional('i', "UID map for chown"));
58
  r.push_back(Parameter::Optional('j', "GID map for chown"));
59
  r.push_back(Parameter::Switch('f', "fix nested catalog transition points"));
60
  r.push_back(Parameter::Switch('l', "disable linkcount analysis of files"));
61
  r.push_back(Parameter::Switch('s',
62
    "enable collection of catalog statistics"));
63
  return r;
64
}
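// Illustrative invocation (a sketch only; the sub-command name "migrate" and
// all concrete values are assumptions, flag semantics as defined in
// GetParams() above):
//   cvmfs_swissknife migrate -v 2.1.7 -r /srv/cvmfs/example.repo \
//     -u <upstream definition> -o /tmp/manifest.out -t /tmp/migration -s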
65
66
67
static void Error(const std::string &message) {
68
  LogCvmfs(kLogCatalog, kLogStderr, "%s", message.c_str());
69
}
70
71
72
static void Error(const std::string                     &message,
73
                  const CommandMigrate::PendingCatalog  *catalog) {
74
  const std::string err_msg = message + "\n"
75
                              "Catalog: " + catalog->root_path();
76
  Error(err_msg);
77
}
78
79
80
static void Error(const std::string                     &message,
81
                  const catalog::SqlCatalog             &statement,
82
                  const CommandMigrate::PendingCatalog  *catalog) {
83
  const std::string err_msg =
84
    message + "\n"
85
    "SQLite: " + StringifyInt(statement.GetLastError()) +
86
    " - " + statement.GetLastErrorMsg();
87
  Error(err_msg, catalog);
88
}
89
90
91
int CommandMigrate::Main(const ArgumentList &args) {
92
  shash::Any manual_root_hash;
93
  const std::string &migration_base     = *args.find('v')->second;
94
  const std::string &repo_url           = *args.find('r')->second;
95
  const std::string &spooler            = *args.find('u')->second;
96
  const std::string &manifest_path      = *args.find('o')->second;
97
  const std::string &tmp_dir            = *args.find('t')->second;
98
  const std::string &uid                = (args.count('p') > 0)      ?
99
                                             *args.find('p')->second :
100
                                             "";
101
  const std::string &gid                = (args.count('g') > 0)      ?
102
                                             *args.find('g')->second :
103
                                             "";
104
  const std::string &repo_name          = (args.count('n') > 0)      ?
105
                                             *args.find('n')->second :
106
                                             "";
107
  const std::string &repo_keys          = (args.count('k') > 0)      ?
108
                                             *args.find('k')->second :
109
                                             "";
110
  const std::string &uid_map_path       = (args.count('i') > 0)      ?
111
                                             *args.find('i')->second :
112
                                             "";
113
  const std::string &gid_map_path       = (args.count('j') > 0)      ?
114
                                             *args.find('j')->second :
115
                                             "";
116
  const bool fix_transition_points      = (args.count('f') > 0);
117
  const bool analyze_file_linkcounts    = (args.count('l') == 0);
118
  const bool collect_catalog_statistics = (args.count('s') > 0);
119
  if (args.count('h') > 0) {
120
    manual_root_hash = shash::MkFromHexPtr(shash::HexPtr(
121
      *args.find('h')->second), shash::kSuffixCatalog);
122
  }
123
124
  // We might need a lot of file descriptors
125
  if (!RaiseFileDescriptorLimit()) {
126
    Error("Failed to raise file descriptor limits");
127
    return 2;
128
  }
129
130
  // Put SQLite into multithreaded mode
131
  if (!ConfigureSQLite()) {
132
    Error("Failed to preconfigure SQLite library");
133
    return 3;
134
  }
135
136
  // Create an upstream spooler
137
  temporary_directory_ = tmp_dir;
138
  const upload::SpoolerDefinition spooler_definition(spooler, shash::kSha1);
139
  spooler_ = upload::Spooler::Construct(spooler_definition);
140
  if (!spooler_) {
141
    Error("Failed to create upstream Spooler.");
142
    return 5;
143
  }
144
  spooler_->RegisterListener(&CommandMigrate::UploadCallback, this);
145
146
  // Load the full catalog hierarchy
147
  LogCvmfs(kLogCatalog, kLogStdout, "Loading current catalog tree...");
148
149
  catalog_loading_stopwatch_.Start();
150
  bool loading_successful = false;
151
  if (IsHttpUrl(repo_url)) {
152
    typedef HttpObjectFetcher<catalog::WritableCatalog> ObjectFetcher;
153
154
    const bool follow_redirects = false;
155
    if (!this->InitDownloadManager(follow_redirects) ||
156
        !this->InitVerifyingSignatureManager(repo_keys)) {
157
      LogCvmfs(kLogCatalog, kLogStderr, "Failed to init repo connection");
158
      return 1;
159
    }
160
161
    ObjectFetcher fetcher(repo_name,
162
                          repo_url,
163
                          tmp_dir,
164
                          download_manager(),
165
                          signature_manager());
166
167
    loading_successful = LoadCatalogs(manual_root_hash, &fetcher);
168
  } else {
169
    typedef LocalObjectFetcher<catalog::WritableCatalog> ObjectFetcher;
170
    ObjectFetcher fetcher(repo_url, tmp_dir);
171
    loading_successful = LoadCatalogs(manual_root_hash, &fetcher);
172
  }
173
  catalog_loading_stopwatch_.Stop();
174
175
  if (!loading_successful) {
176
    Error("Failed to load catalog tree");
177
    return 4;
178
  }
179
180
  LogCvmfs(kLogCatalog, kLogStdout, "Loaded %d catalogs", catalog_count_);
181
  assert(root_catalog_ != NULL);
182
183
  // Do the actual migration step
184
  bool migration_succeeded = false;
185
  if (migration_base == "2.0.x") {
186
    if (!ReadPersona(uid, gid)) {
187
      return 1;
188
    }
189
190
    // Generate and upload a nested catalog marker
191
    if (!GenerateNestedCatalogMarkerChunk()) {
192
      Error("Failed to create a nested catalog marker.");
193
      return 6;
194
    }
195
    spooler_->WaitForUpload();
196
197
    // Configure the concurrent catalog migration facility
198
    MigrationWorker_20x::worker_context context(temporary_directory_,
199
                                                collect_catalog_statistics,
200
                                                fix_transition_points,
201
                                                analyze_file_linkcounts,
202
                                                uid_,
203
                                                gid_);
204
    migration_succeeded =
205
      DoMigrationAndCommit<MigrationWorker_20x>(manifest_path, &context);
206
  } else if (migration_base == "2.1.7") {
207
    MigrationWorker_217::worker_context context(temporary_directory_,
208
                                                collect_catalog_statistics);
209
    migration_succeeded =
210
      DoMigrationAndCommit<MigrationWorker_217>(manifest_path, &context);
211
  } else if (migration_base == "chown") {
212
    UidMap uid_map;
213
    GidMap gid_map;
214
    if (!ReadPersonaMaps(uid_map_path, gid_map_path, &uid_map, &gid_map)) {
215
      Error("Failed to read UID and/or GID map");
216
      return 1;
217
    }
218
    ChownMigrationWorker::worker_context context(temporary_directory_,
219
                                                 collect_catalog_statistics,
220
                                                 uid_map,
221
                                                 gid_map);
222
    migration_succeeded =
223
      DoMigrationAndCommit<ChownMigrationWorker>(manifest_path, &context);
224
  } else if (migration_base == "hardlink") {
225
    HardlinkRemovalMigrationWorker::worker_context
226
      context(temporary_directory_, collect_catalog_statistics);
227
    migration_succeeded =
228
      DoMigrationAndCommit<HardlinkRemovalMigrationWorker>(manifest_path,
229
                                                           &context);
230
  } else {
231
    const std::string err_msg = "Unknown migration base: " + migration_base;
232
    Error(err_msg);
233
    return 1;
234
  }
235
236
  // Check if everything went well
237
  if (!migration_succeeded) {
238
    Error("Migration failed!");
239
    return 5;
240
  }
241
242
  // Analyze collected statistics
243
  if (collect_catalog_statistics && has_committed_new_revision_) {
244
    LogCvmfs(kLogCatalog, kLogStdout, "\nCollected statistics results:");
245
    AnalyzeCatalogStatistics();
246
  }
247
248
  LogCvmfs(kLogCatalog, kLogStdout, "\nCatalog Migration succeeded");
249
  return 0;
250
}
251
252
253
bool CommandMigrate::ReadPersona(const std::string &uid,
254
                                   const std::string &gid) {
255
  if (uid.empty()) {
256
    Error("Please provide a user ID");
257
    return false;
258
  }
259
  if (gid.empty()) {
260
    Error("Please provide a group ID");
261
    return false;
262
  }
263
264
  uid_ = String2Int64(uid);
265
  gid_ = String2Int64(gid);
266
  return true;
267
}
268
269
270
271
bool CommandMigrate::ReadPersonaMaps(const std::string &uid_map_path,
272
                                     const std::string &gid_map_path,
273
                                           UidMap      *uid_map,
274
                                           GidMap      *gid_map) const {
275
  if (!uid_map->Read(uid_map_path) || !uid_map->IsValid()) {
276
    Error("Failed to read UID map");
277
    return false;
278
  }
279
280
  if (!gid_map->Read(gid_map_path) || !gid_map->IsValid()) {
281
    Error("Failed to read GID map");
282
    return false;
283
  }
284
285
  if (uid_map->RuleCount() == 0 && !uid_map->HasDefault()) {
286
    Error("UID map appears to be empty");
287
    return false;
288
  }
289
290
  if (gid_map->RuleCount() == 0 && !gid_map->HasDefault()) {
291
    Error("GID map appears to be empty");
292
    return false;
293
  }
294
295
  return true;
296
}
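// A minimal sketch of a persona map file as consumed by ReadPersonaMaps()
// (an assumption: the usual two-column "<original id> <mapped id>" format,
// with '*' acting as the catch-all default rule detected via HasDefault()):
//   101 1001
//   102 1002
//   *   65534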
297
298
299
void CommandMigrate::UploadHistoryClosure(
300
  const upload::SpoolerResult &result,
301
  Future<shash::Any> *hash)
302
{
303
  assert(!result.IsChunked());
304
  if (result.return_code != 0) {
305
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to upload history database (%d)",
306
             result.return_code);
307
    hash->Set(shash::Any());
308
  } else {
309
    hash->Set(result.content_hash);
310
  }
311
}
312
313
314
bool CommandMigrate::UpdateUndoTags(
315
  PendingCatalog *root_catalog,
316
  unsigned revision,
317
  time_t timestamp,
318
  shash::Any *history_hash)
319
{
320
  string filename_old = history_upstream_->filename();
321
  string filename_new = filename_old + ".new";
322
  bool retval = CopyPath2Path(filename_old, filename_new);
323
  if (!retval) return false;
324
  UniquePtr<history::SqliteHistory> history(
325
    history::SqliteHistory::OpenWritable(filename_new));
326
  history->TakeDatabaseFileOwnership();
327
328
  history::History::Tag tag_trunk;
329
  bool exists = history->GetByName(CommandTag::kHeadTag, &tag_trunk);
330
  if (exists) {
331
    retval = history->Remove(CommandTag::kHeadTag);
332
    if (!retval) return false;
333
334
    history::History::Tag tag_trunk_previous = tag_trunk;
335
    tag_trunk_previous.name = CommandTag::kPreviousHeadTag;
336
    tag_trunk_previous.description = CommandTag::kPreviousHeadTagDescription;
337
    history->Remove(CommandTag::kPreviousHeadTag);
338
339
    tag_trunk.root_hash = root_catalog->new_catalog_hash;
340
    tag_trunk.size = root_catalog->new_catalog_size;
342
    tag_trunk.revision = revision;
343
    tag_trunk.timestamp = timestamp;
344
345
    retval = history->Insert(tag_trunk_previous);
346
    if (!retval) return false;
347
    retval = history->Insert(tag_trunk);
348
    if (!retval) return false;
349
  }
350
351
  history->SetPreviousRevision(manifest_upstream_->history());
352
  history->DropDatabaseFileOwnership();
353
  history.Destroy();
354
355
  Future<shash::Any> history_hash_new;
356
  upload::Spooler::CallbackPtr callback = spooler_->RegisterListener(
357
    &CommandMigrate::UploadHistoryClosure, this, &history_hash_new);
358
  spooler_->ProcessHistory(filename_new);
359
  spooler_->WaitForUpload();
360
  spooler_->UnregisterListener(callback);
361
  unlink(filename_new.c_str());
362
  *history_hash = history_hash_new.Get();
363
  if (history_hash->IsNull()) {
364
    Error("failed to upload tag database");
365
    return false;
366
  }
367
368
  return true;
369
}
370
371
372
template <class MigratorT>
373
bool CommandMigrate::DoMigrationAndCommit(
374
  const std::string                   &manifest_path,
375
  typename MigratorT::worker_context  *context
376
) {
377
  // Create a concurrent migration context for catalog migration
378
  const unsigned int cpus = GetNumberOfCpuCores();
379
  ConcurrentWorkers<MigratorT> concurrent_migration(cpus, cpus * 10, context);
380
381
  if (!concurrent_migration.Initialize()) {
382
    Error("Failed to initialize worker migration system.");
383
    return false;
384
  }
385
  concurrent_migration.RegisterListener(&CommandMigrate::MigrationCallback,
386
                                         this);
387
388
  // Migrate catalogs recursively (starting with the deepest nested catalogs)
389
  LogCvmfs(kLogCatalog, kLogStdout, "\nMigrating catalogs...");
390
  PendingCatalog *root_catalog = new PendingCatalog(root_catalog_);
391
  migration_stopwatch_.Start();
392
  ConvertCatalogsRecursively(root_catalog, &concurrent_migration);
393
  concurrent_migration.WaitForEmptyQueue();
394
  spooler_->WaitForUpload();
395
  spooler_->UnregisterListeners();
396
  migration_stopwatch_.Stop();
397
398
  // check for possible errors during the migration process
399
  const unsigned int errors = concurrent_migration.GetNumberOfFailedJobs() +
400
                              spooler_->GetNumberOfErrors();
401
  LogCvmfs(kLogCatalog, kLogStdout,
402
           "Catalog Migration finished with %d errors.", errors);
403
  if (errors > 0) {
404
    LogCvmfs(kLogCatalog, kLogStdout,
405
             "\nCatalog Migration produced errors\nAborting...");
406
    return false;
407
  }
408
409
  if (root_catalog->was_updated.Get()) {
410
    LogCvmfs(kLogCatalog, kLogStdout,
411
             "\nCommitting migrated repository revision...");
412
    manifest::Manifest manifest = *manifest_upstream_;
413
    manifest.set_catalog_hash(root_catalog->new_catalog_hash);
414
    manifest.set_catalog_size(root_catalog->new_catalog_size);
415
    manifest.set_root_path(root_catalog->root_path());
416
    const catalog::Catalog* new_catalog = (root_catalog->HasNew())
417
                                          ? root_catalog->new_catalog
418
                                          : root_catalog->old_catalog;
419
    manifest.set_ttl(new_catalog->GetTTL());
420
    manifest.set_revision(new_catalog->GetRevision());
421
422
    // Commit the new (migrated) repository revision...
423
    if (history_upstream_.IsValid()) {
424
      shash::Any history_hash(manifest_upstream_->history());
425
      LogCvmfs(kLogCatalog, kLogStdout | kLogNoLinebreak,
426
               "Updating repository tag database... ");
427
      if (!UpdateUndoTags(root_catalog,
428
                          new_catalog->GetRevision(),
429
                          new_catalog->GetLastModified(),
430
                          &history_hash))
431
      {
432
        Error("Updateing tag database failed.\nAborting...");
433
        return false;
434
      }
435
      manifest.set_history(history_hash);
436
      LogCvmfs(kLogCvmfs, kLogStdout, "%s", history_hash.ToString().c_str());
437
    }
438
439
    if (!manifest.Export(manifest_path)) {
440
      Error("Manifest export failed.\nAborting...");
441
      return false;
442
    }
443
    has_committed_new_revision_ = true;
444
  } else {
445
    LogCvmfs(kLogCatalog, kLogStdout,
446
             "\nNo catalogs migrated, skipping the commit...");
447
  }
448
449
  // Get rid of the open root catalog
450
  delete root_catalog;
451
452
  return true;
453
}
454
455
456
void CommandMigrate::CatalogCallback(
457
                   const CatalogTraversalData<catalog::WritableCatalog> &data) {
458
  std::string tree_indent;
459
  std::string hash_string;
460
  std::string path;
461
462
  for (unsigned int i = 1; i < data.tree_level; ++i) {
463
    tree_indent += "\u2502  ";
464
  }
465
466
  if (data.tree_level > 0) {
467
    tree_indent += "\u251C\u2500 ";
468
  }
469
470
  hash_string = data.catalog_hash.ToString();
471
472
  path = data.catalog->mountpoint().ToString();
473
  if (path.empty()) {
474
    path = "/";
475
    root_catalog_ = data.catalog;
476
  }
477
478
  LogCvmfs(kLogCatalog, kLogStdout, "%s%s %s",
479
    tree_indent.c_str(),
480
    hash_string.c_str(),
481
    path.c_str());
482
483
  ++catalog_count_;
484
}
485
486
487
void CommandMigrate::MigrationCallback(PendingCatalog *const &data) {
488
  // Check if the migration of the catalog was successful
489
  if (!data->success) {
490
    Error("Catalog migration failed! Aborting...");
491
    exit(1);
492
    return;
493
  }
494
495
  if (!data->HasChanges()) {
496
    PrintStatusMessage(data, data->GetOldContentHash(), "preserved");
497
    data->was_updated.Set(false);
498
    return;
499
  }
500
501
  const string &path = (data->HasNew()) ? data->new_catalog->database_path()
502
                                        : data->old_catalog->database_path();
503
504
  // Save the processed catalog in the pending map
505
  {
506
    LockGuard<PendingCatalogMap> guard(&pending_catalogs_);
507
    assert(pending_catalogs_.find(path) == pending_catalogs_.end());
508
    pending_catalogs_[path] = data;
509
  }
510
  catalog_statistics_list_.Insert(data->statistics);
511
512
  // check the size of the uncompressed catalog file
513
  const int64_t new_catalog_size = GetFileSize(path);
514
  if (new_catalog_size <= 0) {
515
    Error("Failed to get uncompressed file size of catalog!", data);
516
    exit(2);
517
    return;
518
  }
519
  data->new_catalog_size = new_catalog_size;
520
521
  // Schedule the compression and upload of the catalog
522
  spooler_->ProcessCatalog(path);
523
}
524
525
526
void CommandMigrate::UploadCallback(const upload::SpoolerResult &result) {
527
  const string &path = result.local_path;
528
529
  // Check if the upload was successful
530
  if (result.return_code != 0) {
531
    Error("Failed to upload file " + path + "\nAborting...");
532
    exit(2);
533
    return;
534
  }
535
  assert(result.file_chunks.size() == 0);
536
537
  // Remove the just uploaded file
538
  unlink(path.c_str());
539
540
  // Uploaded nested catalog marker... generate and cache DirectoryEntry for it
541
  if (path == nested_catalog_marker_tmp_path_) {
542
    CreateNestedCatalogMarkerDirent(result.content_hash);
543
    return;
544
  } else {
545
    // Find the catalog path in the pending catalogs and remove it from the list
546
    PendingCatalog *catalog;
547
    {
548
      LockGuard<PendingCatalogMap> guard(&pending_catalogs_);
549
      PendingCatalogMap::iterator i = pending_catalogs_.find(path);
550
      assert(i != pending_catalogs_.end());
551
      catalog = const_cast<PendingCatalog*>(i->second);
552
      pending_catalogs_.erase(i);
553
    }
554
555
    PrintStatusMessage(catalog, result.content_hash, "migrated and uploaded");
556
557
    // The catalog is completely processed... fill the content_hash to allow the
558
    // processing of parent catalogs (Notified by 'was_updated'-future)
559
    // NOTE: From now on, this PendingCatalog structure could be deleted and
560
    //       should not be used anymore!
561
    catalog->new_catalog_hash = result.content_hash;
562
    catalog->was_updated.Set(true);
563
  }
564
}
565
566
567
void CommandMigrate::PrintStatusMessage(const PendingCatalog *catalog,
568
                                        const shash::Any     &content_hash,
569
                                        const std::string    &message) {
570
  atomic_inc32(&catalogs_processed_);
571
  const unsigned int processed = (atomic_read32(&catalogs_processed_) * 100) /
572
                                  catalog_count_;
573
  LogCvmfs(kLogCatalog, kLogStdout, "[%d%%] %s %sC %s",
574
           processed,
575
           message.c_str(),
576
           content_hash.ToString().c_str(),
577
           catalog->root_path().c_str());
578
}
579
580
581
template <class MigratorT>
582
void CommandMigrate::ConvertCatalogsRecursively(PendingCatalog *catalog,
583
                                                MigratorT       *migrator) {
584
  // First migrate all nested catalogs (depth first traversal)
585
  const catalog::CatalogList nested_catalogs =
586
    catalog->old_catalog->GetChildren();
587
  catalog::CatalogList::const_iterator i    = nested_catalogs.begin();
588
  catalog::CatalogList::const_iterator iend = nested_catalogs.end();
589
  catalog->nested_catalogs.reserve(nested_catalogs.size());
590
  for (; i != iend; ++i) {
591
    PendingCatalog *new_nested = new PendingCatalog(*i);
592
    catalog->nested_catalogs.push_back(new_nested);
593
    ConvertCatalogsRecursively(new_nested, migrator);
594
  }
595
596
  // Migrate this catalog referencing all its (already migrated) children
597
  migrator->Schedule(catalog);
598
}
599
600
601
bool CommandMigrate::RaiseFileDescriptorLimit() const {
602
  struct rlimit rpl;
603
  memset(&rpl, 0, sizeof(rpl));
604
  getrlimit(RLIMIT_NOFILE, &rpl);
605
  if (rpl.rlim_cur < file_descriptor_limit_) {
606
    if (rpl.rlim_max < file_descriptor_limit_)
607
      rpl.rlim_max = file_descriptor_limit_;
608
    rpl.rlim_cur = file_descriptor_limit_;
609
    const int retval = setrlimit(RLIMIT_NOFILE, &rpl);
610
    if (retval != 0) {
611
      return false;
612
    }
613
  }
614
  return true;
615
}
616
617
618
bool CommandMigrate::ConfigureSQLite() const {
619
  int retval = sqlite3_config(SQLITE_CONFIG_MULTITHREAD);
620
  return (retval == SQLITE_OK);
621
}
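// Note: SQLITE_CONFIG_MULTITHREAD only takes effect if sqlite3_config() runs
// before the SQLite library is initialized (otherwise it returns
// SQLITE_MISUSE), which is why ConfigureSQLite() is called early in Main().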
622
623
624
void CommandMigrate::AnalyzeCatalogStatistics() const {
625
  const unsigned int number_of_catalogs = catalog_statistics_list_.size();
626
  unsigned int       aggregated_entry_count = 0;
627
  unsigned int       aggregated_max_row_id = 0;
628
  unsigned int       aggregated_hardlink_count = 0;
629
  unsigned int       aggregated_linkcounts = 0;
630
  double             aggregated_migration_time = 0.0;
631
632
  CatalogStatisticsList::const_iterator i    = catalog_statistics_list_.begin();
633
  CatalogStatisticsList::const_iterator iend = catalog_statistics_list_.end();
634
  for (; i != iend; ++i) {
635
    aggregated_entry_count    += i->entry_count;
636
    aggregated_max_row_id     += i->max_row_id;
637
    aggregated_hardlink_count += i->hardlink_group_count;
638
    aggregated_linkcounts     += i->aggregated_linkcounts;
639
    aggregated_migration_time += i->migration_time;
640
  }
641
642
  // Inode quantization
643
  assert(aggregated_max_row_id > 0);
644
  const unsigned int unused_inodes =
645
                                 aggregated_max_row_id - aggregated_entry_count;
646
  const float ratio =
647
    (static_cast<float>(unused_inodes) /
648
     static_cast<float>(aggregated_max_row_id)) * 100.0f;
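  // Worked example (illustrative numbers): 1000 allocated row ids holding only
  // 900 actual entries leave 100 unused inodes, i.e. a wasted-inode ratio of
  // (100 / 1000) * 100 = 10.0%.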
649
  LogCvmfs(kLogCatalog, kLogStdout, "Actual Entries:                %d\n"
650
                                    "Allocated Inodes:              %d\n"
651
                                    "  Unused Inodes:               %d\n"
652
                                    "  Percentage of wasted Inodes: %.1f%%\n",
653
           aggregated_entry_count, aggregated_max_row_id, unused_inodes, ratio);
654
655
  // Hardlink statistics
656
  const float average_linkcount = (aggregated_hardlink_count > 0)
657
                                  ? static_cast<float>(aggregated_linkcounts) /
658
                                    aggregated_hardlink_count
659
                                  : 0.0f;
660
  LogCvmfs(kLogCatalog, kLogStdout, "Generated Hardlink Groups:     %d\n"
661
                                    "Average Linkcount per Group:   %.1f\n",
662
           aggregated_hardlink_count, average_linkcount);
663
664
  // Performance measures
665
  const double average_migration_time =
666
    aggregated_migration_time / static_cast<double>(number_of_catalogs);
667
  LogCvmfs(kLogCatalog, kLogStdout, "Catalog Loading Time:          %.2fs\n"
668
                                    "Average Migration Time:        %.2fs\n"
669
                                    "Overall Migration Time:        %.2fs\n"
670
                                    "Aggregated Migration Time:     %.2fs\n",
671
           catalog_loading_stopwatch_.GetTime(),
672
           average_migration_time,
673
           migration_stopwatch_.GetTime(),
674
           aggregated_migration_time);
675
}
676
677
678
CommandMigrate::PendingCatalog::~PendingCatalog() {
679
  delete old_catalog;
680
  old_catalog = NULL;
681
682
  if (new_catalog != NULL) {
683
    delete new_catalog;
684
    new_catalog = NULL;
685
  }
686
}
687
688
689
template<class DerivedT>
690
CommandMigrate::AbstractMigrationWorker<DerivedT>::AbstractMigrationWorker(
691
  const worker_context *context)
692
  : temporary_directory_(context->temporary_directory)
693
  , collect_catalog_statistics_(context->collect_catalog_statistics)
694
{ }
695
696
697
template<class DerivedT>
698
CommandMigrate::AbstractMigrationWorker<DerivedT>::~AbstractMigrationWorker() {}
699
700
701
template<class DerivedT>
702
void CommandMigrate::AbstractMigrationWorker<DerivedT>::operator()(
703
                                                    const expected_data &data) {
704
  migration_stopwatch_.Start();
705
  const bool success = static_cast<DerivedT*>(this)->RunMigration(data) &&
706
                       UpdateNestedCatalogReferences(data) &&
707
                       UpdateCatalogMetadata(data)         &&
708
                       CollectAndAggregateStatistics(data) &&
709
                       CleanupNestedCatalogs(data);
710
  data->success = success;
711
  migration_stopwatch_.Stop();
712
713
  data->statistics.migration_time = migration_stopwatch_.GetTime();
714
  migration_stopwatch_.Reset();
715
716
  // Note: MigrationCallback() will take care of the result...
717
  if (success) {
718
    ConcurrentWorker<DerivedT>::master()->JobSuccessful(data);
719
  } else {
720
    ConcurrentWorker<DerivedT>::master()->JobFailed(data);
721
  }
722
}
723
724
725
template<class DerivedT>
726
bool CommandMigrate::AbstractMigrationWorker<DerivedT>::
727
     UpdateNestedCatalogReferences(PendingCatalog *data) const
728
{
729
  const catalog::Catalog *new_catalog =
730
    (data->HasNew()) ? data->new_catalog : data->old_catalog;
731
  const catalog::CatalogDatabase &writable = new_catalog->database();
732
733
  catalog::SqlCatalog add_nested_catalog(writable,
734
    "INSERT OR REPLACE INTO nested_catalogs (path,   sha1,  size) "
735
    "                VALUES                 (:path, :sha1, :size);");
736
737
  // go through all nested catalogs and update their references (we are currently
738
  // in their parent catalog)
739
  // Note: we might need to wait for the nested catalog to be fully processed.
740
  PendingCatalogList::const_iterator i    = data->nested_catalogs.begin();
741
  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
742
  for (; i != iend; ++i) {
743
    PendingCatalog    *nested_catalog  = *i;
744
745
    if (!nested_catalog->was_updated.Get()) {
746
      continue;
747
    }
748
749
    const std::string &root_path    = nested_catalog->root_path();
750
    const shash::Any   catalog_hash = nested_catalog->new_catalog_hash;
751
    const size_t       catalog_size = nested_catalog->new_catalog_size;
752
753
    // insert the updated nested catalog reference into the new catalog
754
    const bool retval =
755
      add_nested_catalog.BindText(1, root_path)               &&
756
      add_nested_catalog.BindText(2, catalog_hash.ToString()) &&
757
      add_nested_catalog.BindInt64(3, catalog_size)           &&
758
      add_nested_catalog.Execute();
759
    if (!retval) {
760
      Error("Failed to add nested catalog link", add_nested_catalog, data);
761
      return false;
762
    }
763
    add_nested_catalog.Reset();
764
  }
765
766
  return true;
767
}
768
769
770
template<class DerivedT>
771
bool CommandMigrate::AbstractMigrationWorker<DerivedT>::
772
     UpdateCatalogMetadata(PendingCatalog *data) const
773
{
774
  if (!data->HasChanges()) {
775
    return true;
776
  }
777
778
  catalog::WritableCatalog *catalog =
779
    (data->HasNew()) ? data->new_catalog : GetWritable(data->old_catalog);
780
781
  // Set the previous revision hash in the new catalog to the old catalog
782
  // we are doing the whole migration as a new snapshot that does not change
783
  // any files, but just applies the necessary data schema migrations
784
  catalog->SetPreviousRevision(data->old_catalog->hash());
785
  catalog->IncrementRevision();
786
  catalog->UpdateLastModified();
787
788
  return true;
789
}
790
791
792
template<class DerivedT>
793
bool CommandMigrate::AbstractMigrationWorker<DerivedT>::
794
     CollectAndAggregateStatistics(PendingCatalog *data) const
795
{
796
  if (!collect_catalog_statistics_) {
797
    return true;
798
  }
799
800
  const catalog::Catalog *new_catalog =
801
    (data->HasNew()) ? data->new_catalog : data->old_catalog;
802
  const catalog::CatalogDatabase &writable = new_catalog->database();
803
  bool retval;
804
805
  // Find out the discrepancy between MAX(rowid) and COUNT(*)
806
  catalog::SqlCatalog wasted_inodes(writable,
807
    "SELECT COUNT(*), MAX(rowid) FROM catalog;");
808
  retval = wasted_inodes.FetchRow();
809
  if (!retval) {
810
    Error("Failed to count entries in catalog", wasted_inodes, data);
811
    return false;
812
  }
813
  const unsigned int entry_count = wasted_inodes.RetrieveInt64(0);
814
  const unsigned int max_row_id  = wasted_inodes.RetrieveInt64(1);
815
816
  // Save collected information into the central statistics aggregator
817
  data->statistics.root_path   = data->root_path();
818
  data->statistics.max_row_id  = max_row_id;
819
  data->statistics.entry_count = entry_count;
820
821
  return true;
822
}
823
824
825
template<class DerivedT>
826
bool CommandMigrate::AbstractMigrationWorker<DerivedT>::CleanupNestedCatalogs(
827
  PendingCatalog *data) const
828
{
829
  // All nested catalogs of PendingCatalog 'data' are fully processed and
830
  // accounted. It is safe to get rid of their data structures here!
831
  PendingCatalogList::const_iterator i    = data->nested_catalogs.begin();
832
  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
833
  for (; i != iend; ++i) {
834
    delete *i;
835
  }
836
837
  data->nested_catalogs.clear();
838
  return true;
839
}
840
841
842
/**
843
 * Those values _must_ reflect the schema version in catalog_sql.h so that a
844
 * legacy catalog migration always generates the latest catalog revision.
845
 * This is a deliberately duplicated piece of information to ensure that always
846
 * both the catalog management and migration classes get updated.
847
 */
848
const float    CommandMigrate::MigrationWorker_20x::kSchema         = 2.5;
849
const unsigned CommandMigrate::MigrationWorker_20x::kSchemaRevision = 5;
850
851
852
template<class DerivedT>
853
catalog::WritableCatalog*
854
CommandMigrate::AbstractMigrationWorker<DerivedT>::GetWritable(
855
                                        const catalog::Catalog *catalog) const {
856
  return dynamic_cast<catalog::WritableCatalog*>(const_cast<catalog::Catalog*>(
857
    catalog));
858
}
859
860
861
CommandMigrate::MigrationWorker_20x::MigrationWorker_20x(
862
  const worker_context *context)
863
  : AbstractMigrationWorker<MigrationWorker_20x>(context)
864
  , fix_nested_catalog_transitions_(context->fix_nested_catalog_transitions)
865
  , analyze_file_linkcounts_(context->analyze_file_linkcounts)
866
  , uid_(context->uid)
867
  , gid_(context->gid) { }
868
869
870
bool CommandMigrate::MigrationWorker_20x::RunMigration(PendingCatalog *data)
871
  const
872
{
873
  // double-check that we are generating compatible catalogs to the actual
874
  // catalog management classes
875
  assert(kSchema         == catalog::CatalogDatabase::kLatestSupportedSchema);
876
  assert(kSchemaRevision == catalog::CatalogDatabase::kLatestSchemaRevision);
877
878
  return CreateNewEmptyCatalog(data) &&
879
         CheckDatabaseSchemaCompatibility(data) &&
880
         AttachOldCatalogDatabase(data) &&
881
         StartDatabaseTransaction(data) &&
882
         MigrateFileMetadata(data) &&
883
         MigrateNestedCatalogMountPoints(data) &&
884
         FixNestedCatalogTransitionPoints(data) &&
885
         RemoveDanglingNestedMountpoints(data) &&
886
         GenerateCatalogStatistics(data) &&
887
         FindRootEntryInformation(data) &&
888
         CommitDatabaseTransaction(data) &&
889
         DetachOldCatalogDatabase(data);
890
}
891
892
bool CommandMigrate::MigrationWorker_20x::CreateNewEmptyCatalog(
893
  PendingCatalog *data) const
894
{
895
  const string root_path = data->root_path();
896
897
  // create a new catalog database schema
898
  const string clg_db_path =
899
    CreateTempPath(temporary_directory_ + "/catalog", 0666);
900
  if (clg_db_path.empty()) {
901
    Error("Failed to create temporary file for the new catalog database.");
902
    return false;
903
  }
904
  const bool volatile_content = false;
905
906
  {
907
    // TODO(rmeusel): Attach catalog should work with an open catalog database
908
    // as well, to remove this inefficiency
909
    UniquePtr<catalog::CatalogDatabase>
910
      new_clg_db(catalog::CatalogDatabase::Create(clg_db_path));
911
    if (!new_clg_db.IsValid() ||
912
        !new_clg_db->InsertInitialValues(root_path, volatile_content, "")) {
913
      Error("Failed to create database for new catalog");
914
      unlink(clg_db_path.c_str());
915
      return false;
916
    }
917
  }
918
919
  // Attach the just created nested catalog database
920
  catalog::WritableCatalog *writable_catalog =
921
    catalog::WritableCatalog::AttachFreely(root_path, clg_db_path,
922
                                           shash::Any(shash::kSha1));
923
  if (writable_catalog == NULL) {
924
    Error("Failed to open database for new catalog");
925
    unlink(clg_db_path.c_str());
926
    return false;
927
  }
928
929
  data->new_catalog = writable_catalog;
930
  return true;
931
}
932
933
934
bool CommandMigrate::MigrationWorker_20x::CheckDatabaseSchemaCompatibility(
935
  PendingCatalog *data) const
936
{
937
  const catalog::CatalogDatabase &old_catalog = data->old_catalog->database();
938
  const catalog::CatalogDatabase &new_catalog = data->new_catalog->database();
939
940
  if ((new_catalog.schema_version() <
941
         catalog::CatalogDatabase::kLatestSupportedSchema -
942
         catalog::CatalogDatabase::kSchemaEpsilon
943
       ||
944
       new_catalog.schema_version() >
945
         catalog::CatalogDatabase::kLatestSupportedSchema +
946
         catalog::CatalogDatabase::kSchemaEpsilon)
947
       ||
948
       (old_catalog.schema_version() > 2.1 +
949
         catalog::CatalogDatabase::kSchemaEpsilon))
950
  {
951
    Error("Failed to meet database requirements for migration.", data);
952
    return false;
953
  }
954
  return true;
955
}
956
957
958
bool CommandMigrate::MigrationWorker_20x::AttachOldCatalogDatabase(
959
  PendingCatalog *data) const
960
{
961
  const catalog::CatalogDatabase &old_catalog = data->old_catalog->database();
962
  const catalog::CatalogDatabase &new_catalog = data->new_catalog->database();
963
964
  catalog::SqlCatalog sql_attach_new(new_catalog,
965
    "ATTACH '" + old_catalog.filename() + "' AS old;");
966
  bool retval = sql_attach_new.Execute();
967
968
  // remove the hardlink to the old database file (temporary file), it will not
969
  // be needed anymore... data will get deleted when the database is closed
970
  unlink(data->old_catalog->database().filename().c_str());
971
972
  if (!retval) {
973
    Error("Failed to attach database of old catalog", sql_attach_new, data);
974
    return false;
975
  }
976
  return true;
977
}
978
979
980
bool CommandMigrate::MigrationWorker_20x::StartDatabaseTransaction(
981
  PendingCatalog *data) const
982
{
983
  assert(data->HasNew());
984
  data->new_catalog->Transaction();
985
  return true;
986
}
987
988
989
bool CommandMigrate::MigrationWorker_20x::MigrateFileMetadata(
990
  PendingCatalog *data) const
991
{
992
  assert(!data->new_catalog->IsDirty());
993
  assert(data->HasNew());
994
  bool retval;
995
  const catalog::CatalogDatabase &writable = data->new_catalog->database();
996
997
  // Hardlinks scratch space.
998
  // This temporary table is used for the hardlink analysis results.
999
  // The old catalog format did not have a direct notion of hardlinks and their
1000
  // linkcounts,  but this information can be partly retrieved from the under-
1001
  // lying file system semantics.
1002
  //
1003
  //   Hardlinks:
1004
  //     groupid   : this group id can be used for the new catalog schema
1005
  //     inode     : the inodes that were part of a hardlink group before
1006
  //     linkcount : the linkcount for hardlink group id members
1007
  catalog::SqlCatalog sql_create_hardlinks_table(writable,
1008
    "CREATE TEMPORARY TABLE hardlinks "
1009
    "  (  hardlink_group_id  INTEGER PRIMARY KEY AUTOINCREMENT, "
1010
    "     inode              INTEGER, "
1011
    "     linkcount          INTEGER, "
1012
    "     CONSTRAINT unique_inode UNIQUE (inode)  );");
1013
  retval = sql_create_hardlinks_table.Execute();
1014
  if (!retval) {
1015
    Error("Failed to create temporary hardlink analysis table",
1016
          sql_create_hardlinks_table, data);
1017
    return false;
1018
  }
1019
1020
  // Directory Linkcount scratch space.
1021
  // Directory linkcounts can be obtained from the directory hierarchy reflected
1022
  // in the old style catalogs. The new catalog schema asks for this specific
1023
  // linkcount. Directory linkcount analysis results will be put into this
1024
  // temporary table
1025
  catalog::SqlCatalog sql_create_linkcounts_table(writable,
1026
    "CREATE TEMPORARY TABLE dir_linkcounts "
1027
    "  (  inode      INTEGER PRIMARY KEY, "
1028
    "     linkcount  INTEGER  );");
1029
  retval = sql_create_linkcounts_table.Execute();
1030
  if (!retval) {
1031
    Error("Failed to create tmeporary directory linkcount analysis table",
1032
          sql_create_linkcounts_table, data);
1033
  }
1034
1035
  // It is possible to skip this step.
1036
  // In that case all hardlink inodes with a (potential) linkcount > 1 will get
1037
  // degraded to files containing the same content
1038
  if (analyze_file_linkcounts_) {
1039
    retval = AnalyzeFileLinkcounts(data);
1040
    if (!retval) {
1041
      return false;
1042
    }
1043
  }
1044
1045
  // Analyze the linkcounts of directories
1046
  //   - each directory has a linkcount of at least 2 (empty directory)
1047
  //     (link in parent directory and self reference (cd .) )
1048
  //   - for each child directory, the parent's link count is incremented by 1
1049
  //     (parent reference in child (cd ..) )
1050
  //
1051
  // Note: nested catalog mountpoints will be miscalculated here, since we can't
1052
  //       check the number of directories they contain. They are defined in the
1053
  //       linked nested catalog and need to be added later on.
1054
  //       (see: MigrateNestedCatalogMountPoints() for details)
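  //       Worked example (illustrative): a directory with three
  //       sub-directories gets linkcount 2 + 3 = 5, independent of how many
  //       regular files it contains.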
1055
  catalog::SqlCatalog sql_dir_linkcounts(writable,
1056
    "INSERT INTO dir_linkcounts "
1057
    "  SELECT c1.inode as inode, "
1058
    "         SUM(IFNULL(MIN(c2.inode,1),0)) + 2 as linkcount "
1059
    "  FROM old.catalog as c1 "
1060
    "  LEFT JOIN old.catalog as c2 "
1061
    "    ON c2.parent_1 = c1.md5path_1 AND "
1062
    "       c2.parent_2 = c1.md5path_2 AND "
1063
    "       c2.flags & :flag_dir_1 "
1064
    "  WHERE c1.flags & :flag_dir_2 "
1065
    "  GROUP BY c1.inode;");
1066
  retval =
1067
    sql_dir_linkcounts.BindInt64(1, catalog::SqlDirent::kFlagDir) &&
1068
    sql_dir_linkcounts.BindInt64(2, catalog::SqlDirent::kFlagDir) &&
1069
    sql_dir_linkcounts.Execute();
1070
  if (!retval) {
1071
    Error("Failed to analyze directory specific linkcounts",
1072
          sql_dir_linkcounts, data);
1073
    if (sql_dir_linkcounts.GetLastError() == SQLITE_CONSTRAINT) {
1074
      Error("Obviously your catalogs are corrupted, since we found a directory"
1075
            "inode that is a file inode at the same time!");
1076
    }
1077
    return false;
1078
  }
1079
1080
  // Copy the old file meta information into the new catalog schema
1081
  //   here we also add the previously analyzed hardlink/linkcount information
1082
  //   from both temporary tables "hardlinks" and "dir_linkcounts".
1083
  //
1084
  // Note: nested catalog mountpoints still need to be treated separately
1085
  //       (see MigrateNestedCatalogMountPoints() for details)
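  // The 'hardlinks' column written below packs both values into one 64-bit
  // integer: the hardlink group id in the upper 32 bits and the linkcount in
  // the lower 32 bits. Illustrative example: group id 3 with linkcount 2 is
  // stored as (3 << 32) | 2.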
1086
  catalog::SqlCatalog migrate_file_meta_data(writable,
1087
    "INSERT INTO catalog "
1088
    "  SELECT md5path_1, md5path_2, "
1089
    "         parent_1, parent_2, "
1090
    "         IFNULL(hardlink_group_id, 0) << 32 | "
1091
    "         COALESCE(hardlinks.linkcount, dir_linkcounts.linkcount, 1) "
1092
    "           AS hardlinks, "
1093
    "         hash, size, mode, mtime, "
1094
    "         flags, name, symlink, "
1095
    "         :uid, "
1096
    "         :gid, "
1097
    "         NULL "  // set empty xattr BLOB (default)
1098
    "  FROM old.catalog "
1099
    "  LEFT JOIN hardlinks "
1100
    "    ON catalog.inode = hardlinks.inode "
1101
    "  LEFT JOIN dir_linkcounts "
1102
    "    ON catalog.inode = dir_linkcounts.inode;");
1103
  retval = migrate_file_meta_data.BindInt64(1, uid_) &&
1104
           migrate_file_meta_data.BindInt64(2, gid_) &&
1105
           migrate_file_meta_data.Execute();
1106
  if (!retval) {
1107
    Error("Failed to migrate the file system meta data",
1108
          migrate_file_meta_data, data);
1109
    return false;
1110
  }
1111
1112
  // If we deal with a nested catalog, we need to add a .cvmfscatalog entry
1113
  // since it was not present in the old repository specification but is needed
1114
  // now!
1115
  if (!data->IsRoot()) {
1116
    const catalog::DirectoryEntry &nested_marker =
1117
      CommandMigrate::GetNestedCatalogMarkerDirent();
1118
    catalog::SqlDirentInsert insert_nested_marker(writable);
1119
    const std::string   root_path   = data->root_path();
1120
    const std::string   file_path   = root_path +
1121
                                      "/" + nested_marker.name().ToString();
1122
    const shash::Md5    &path_hash   = shash::Md5(file_path.data(),
1123
                                                  file_path.size());
1124
    const shash::Md5    &parent_hash = shash::Md5(root_path.data(),
1125
                                                  root_path.size());
1126
    retval = insert_nested_marker.BindPathHash(path_hash)         &&
1127
             insert_nested_marker.BindParentPathHash(parent_hash) &&
1128
             insert_nested_marker.BindDirent(nested_marker)       &&
1129
             insert_nested_marker.BindXattrEmpty()                &&
1130
             insert_nested_marker.Execute();
1131
    if (!retval) {
1132
      Error("Failed to insert nested catalog marker into new nested catalog.",
1133
            insert_nested_marker, data);
1134
      return false;
1135
    }
1136
  }
1137
1138
  // Copy (and update) the properties fields
1139
  //
1140
  // Note: The 'schema' is explicitly not copied to the new catalog.
1141
  //       Each catalog contains a revision, which is also copied here and that
1142
  //       is later updated by calling catalog->IncrementRevision()
1143
  catalog::SqlCatalog copy_properties(writable,
1144
    "INSERT OR REPLACE INTO properties "
1145
    "  SELECT key, value "
1146
    "  FROM old.properties "
1147
    "  WHERE key != 'schema';");
1148
  retval = copy_properties.Execute();
1149
  if (!retval) {
1150
    Error("Failed to migrate the properties table.", copy_properties, data);
1151
    return false;
1152
  }
1153
1154
  return true;
1155
}
1156
1157
1158
bool CommandMigrate::MigrationWorker_20x::AnalyzeFileLinkcounts(
1159
  PendingCatalog *data) const
1160
{
1161
  assert(data->HasNew());
1162
  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1163
  bool retval;
1164
1165
  // Analyze the hardlink relationships in the old catalog
1166
  //   inodes used to be assigned at publishing time, implicitly constituting
1167
  //   those relationships. We now need them explicitly in the file catalogs
1168
  // This looks for directory entries with matching inodes but differing path-
1169
  // hashes and saves the results in a temporary table called 'hl_scratch'
1170
  //
1171
  // Note: We only support hardlink groups that reside in the same directory!
1172
  //       Therefore we first need to figure out hardlink candidates (which
1173
  //       might still contain hardlink groups spanning more than one directory)
1174
  //       In a second step these candidates will be analyzed to kick out un-
1175
  //       supported hardlink groups.
1176
  //       Unsupported hardlink groups will be treated as normal files with
1177
  //       the same content
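  //       Illustrative example: an inode listed under two paths in the same
  //       directory becomes one hardlink group with linkcount 2; the same
  //       inode appearing in two different directories is unsupported, so
  //       each entry is migrated as an independent file (default linkcount 1).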
1178
  catalog::SqlCatalog sql_create_hardlinks_scratch_table(writable,
1179
    "CREATE TEMPORARY TABLE hl_scratch AS "
1180
    "  SELECT c1.inode AS inode, c1.md5path_1, c1.md5path_2, "
1181
    "         c1.parent_1 as c1p1, c1.parent_2 as c1p2, "
1182
    "         c2.parent_1 as c2p1, c2.parent_2 as c2p2 "
1183
    "  FROM old.catalog AS c1 "
1184
    "  INNER JOIN old.catalog AS c2 "
1185
    "  ON c1.inode == c2.inode AND "
1186
    "     (c1.md5path_1 != c2.md5path_1 OR "
1187
    "      c1.md5path_2 != c2.md5path_2);");
1188
  retval = sql_create_hardlinks_scratch_table.Execute();
1189
  if (!retval) {
1190
    Error("Failed to create temporary scratch table for hardlink analysis",
1191
          sql_create_hardlinks_scratch_table, data);
1192
    return false;
1193
  }
1194
1195
  // Figures out which hardlink candidates are supported by CVMFS and can be
1196
  // transferred into the new catalog as so called hardlink groups. Unsupported
1197
  // hardlinks need to be discarded and treated as normal files containing the
1198
  // exact same data
1199
  catalog::SqlCatalog fill_linkcount_table_for_files(writable,
1200
    "INSERT INTO hardlinks (inode, linkcount)"
1201
    "  SELECT inode, count(*) as linkcount "
1202
    "  FROM ( "
1203
         // recombine supported hardlink inodes with their actual manifested
1204
         // hard-links in the catalog.
1205
         // Note: for each directory entry pointing to the same supported
1206
         // hardlink inode we have a distinct MD5 path hash
1207
    "    SELECT DISTINCT hl.inode, hl.md5path_1, hl.md5path_2 "
1208
    "    FROM ( "
1209
           // sort out supported hardlink inodes from unsupported ones by
1210
           // locality
1211
           // Note: see the next comment for the nested SELECT
1212
    "      SELECT inode "
1213
    "      FROM ( "
1214
    "        SELECT inode, count(*) AS cnt "
1215
    "        FROM ( "
1216
               // go through the potential hardlinks and collect location infor-
1217
               // mation about them.
1218
               // Note: we only support hardlinks that all reside in the same
1219
               //       directory, thus having the same parent (c1p* == c2p*)
1220
               //   --> For supported hardlink candidates the SELECT DISTINCT
1221
               // will produce only a single row, whereas others produce more
1222
    "          SELECT DISTINCT inode,c1p1,c1p1,c2p1,c2p2 "
1223
    "          FROM hl_scratch AS hl "
1224
    "        ) "
1225
    "        GROUP BY inode "
1226
    "      ) "
1227
    "      WHERE cnt = 1 "
1228
    "    ) AS supported_hardlinks "
1229
    "    LEFT JOIN hl_scratch AS hl "
1230
    "    ON supported_hardlinks.inode = hl.inode "
1231
    "  ) "
1232
    "  GROUP BY inode;");
1233
  retval = fill_linkcount_table_for_files.Execute();
1234
  if (!retval) {
1235
    Error("Failed to analyze hardlink relationships for files.",
1236
          fill_linkcount_table_for_files, data);
1237
    return false;
1238
  }
1239
1240
  // The file linkcount and hardlink analysis is finished and the scratch table
1241
  // can be deleted...
1242
  catalog::SqlCatalog drop_hardlink_scratch_space(writable,
1243
                                                  "DROP TABLE hl_scratch;");
1244
  retval = drop_hardlink_scratch_space.Execute();
1245
  if (!retval) {
1246
    Error("Failed to remove file linkcount analysis scratch table",
1247
          drop_hardlink_scratch_space, data);
1248
    return false;
1249
  }
1250
1251
  // Do some statistics if asked for...
1252
  if (collect_catalog_statistics_) {
1253
    catalog::SqlCatalog count_hardlinks(writable,
1254
      "SELECT count(*), sum(linkcount) FROM hardlinks;");
1255
    retval = count_hardlinks.FetchRow();
1256
    if (!retval) {
1257
      Error("Failed to count the generated file hardlinks for statistics",
1258
            count_hardlinks, data);
1259
      return false;
1260
    }
1261
1262
    data->statistics.hardlink_group_count  += count_hardlinks.RetrieveInt64(0);
1263
    data->statistics.aggregated_linkcounts += count_hardlinks.RetrieveInt64(1);
1264
  }
1265
1266
  return true;
1267
}
1268
1269
1270
bool CommandMigrate::MigrationWorker_20x::MigrateNestedCatalogMountPoints(
1271
  PendingCatalog *data) const
1272
{
1273
  assert(data->HasNew());
1274
  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1275
  bool retval;
1276
1277
  // preparing the SQL statement for nested catalog mountpoint update
1278
  catalog::SqlCatalog update_mntpnt_linkcount(writable,
1279
    "UPDATE catalog "
1280
    "SET hardlinks = :linkcount "
1281
    "WHERE md5path_1 = :md5_1 AND md5path_2 = :md5_2;");
1282
1283
  // update all nested catalog mountpoints
1284
  // (Note: we might need to wait for the nested catalog to be processed)
1285
  PendingCatalogList::const_iterator i    = data->nested_catalogs.begin();
1286
  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1287
  for (; i != iend; ++i) {
1288
    // collect information about the nested catalog
1289
    PendingCatalog *nested_catalog = *i;
1290
    const catalog::DirectoryEntry root_entry = nested_catalog->root_entry.Get();
1291
    const string &root_path = nested_catalog->root_path();
1292
1293
    // update the nested catalog mountpoint directory entry with the correct
1294
    // linkcount that was determined while processing the nested catalog
1295
    const shash::Md5 mountpoint_hash = shash::Md5(root_path.data(),
1296
                                                  root_path.size());
1297
    retval =
1298
      update_mntpnt_linkcount.BindInt64(1, root_entry.linkcount()) &&
1299
      update_mntpnt_linkcount.BindMd5(2, 3, mountpoint_hash)       &&
1300
      update_mntpnt_linkcount.Execute();
1301
    if (!retval) {
1302
      Error("Failed to update linkcount of nested catalog mountpoint",
1303
            update_mntpnt_linkcount, data);
1304
      return false;
1305
    }
1306
    update_mntpnt_linkcount.Reset();
1307
  }
1308
1309
  return true;
1310
}
1311
1312
1313
bool CommandMigrate::MigrationWorker_20x::FixNestedCatalogTransitionPoints(
1314
  PendingCatalog *data) const
1315
{
1316
  assert(data->HasNew());
1317
  if (!fix_nested_catalog_transitions_) {
1318
    // Fixing transition point mismatches is not enabled...
1319
    return true;
1320
  }
1321
1322
  typedef catalog::DirectoryEntry::Difference Difference;
1323
1324
  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1325
  bool retval;
1326
1327
  catalog::SqlLookupPathHash lookup_mountpoint(writable);
1328
  catalog::SqlDirentUpdate   update_directory_entry(writable);
1329
1330
  // Unbox the nested catalogs (possibly waiting for migration of them first)
1331
  PendingCatalogList::const_iterator i    = data->nested_catalogs.begin();
1332
  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1333
  for (; i != iend; ++i) {
1334
    // Collect information about the nested catalog
1335
    PendingCatalog *nested_catalog = *i;
1336
    const catalog::DirectoryEntry nested_root_entry =
1337
      nested_catalog->root_entry.Get();
1338
    const string &nested_root_path = nested_catalog->root_path();
1339
    const shash::Md5 mountpoint_path_hash =
1340
      shash::Md5(nested_root_path.data(), nested_root_path.size());
1341
1342
    // Retrieve the nested catalog mountpoint from the current catalog
1343
    retval = lookup_mountpoint.BindPathHash(mountpoint_path_hash) &&
1344
             lookup_mountpoint.FetchRow();
1345
    if (!retval) {
1346
      Error("Failed to fetch nested catalog mountpoint to check for compatible"
1347
            "transition points", lookup_mountpoint, data);
1348
      return false;
1349
    }
1350
1351
    catalog::DirectoryEntry mountpoint_entry =
1352
      lookup_mountpoint.GetDirent(data->new_catalog);
1353
    lookup_mountpoint.Reset();
1354
1355
    // Compare nested catalog mountpoint and nested catalog root entries
1356
    catalog::DirectoryEntry::Differences diffs =
1357
      mountpoint_entry.CompareTo(nested_root_entry);
1358
1359
    // We MUST deal with two directory entries that are a pair of nested cata-
1360
    // log mountpoint and root entry! Thus we expect their transition flags to
1361
    // differ and their name to be the same.
1362
    assert(diffs & Difference::kNestedCatalogTransitionFlags);
1363
    assert((diffs & Difference::kName) == 0);
1364
1365
    // Check if there are other differences except the nested catalog transition
1366
    // flags and fix them...
1367
    if ((diffs ^ Difference::kNestedCatalogTransitionFlags) != 0) {
1368
      // If we found differences, we still assume a couple of directory entry
1369
      // fields to be the same, otherwise some severe stuff would be wrong...
1370
      if ((diffs & Difference::kChecksum)        ||
1371
          (diffs & Difference::kLinkcount)       ||
1372
          (diffs & Difference::kSymlink)         ||
1373
          (diffs & Difference::kChunkedFileFlag)    )
1374
      {
1375
        Error("Found an irreparable mismatch in a nested catalog transition "
1376
              "point at '" + nested_root_path + "'\nAborting...\n");
1377
      }
1378
1379
      // Copy the properties from the nested catalog root entry into the mount-
1380
      // point entry to bring them in sync again
1381
      CommandMigrate::FixNestedCatalogTransitionPoint(
1382
        nested_root_entry, &mountpoint_entry);
1383
1384
      // save the nested catalog mountpoint entry into the catalog
1385
      retval = update_directory_entry.BindPathHash(mountpoint_path_hash) &&
1386
               update_directory_entry.BindDirent(mountpoint_entry)       &&
1387
               update_directory_entry.Execute();
1388
      if (!retval) {
1389
        Error("Failed to save resynchronized nested catalog mountpoint into "
1390
              "catalog database", update_directory_entry, data);
1391
        return false;
1392
      }
1393
      update_directory_entry.Reset();
1394
1395
      // Fixing this mountpoint went well... inform the user that this minor
1396
      // issue occurred
1397
      LogCvmfs(kLogCatalog, kLogStdout, "NOTE: fixed incompatible nested "
1398
                                        "catalog transition point at: '%s' ",
1399
               nested_root_path.c_str());
1400
    }
1401
  }
1402
1403
  return true;
1404
}
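// A minimal sketch of the bit arithmetic used above, assuming (as the masking
// code suggests) that DirectoryEntry::Differences is a plain bit field:
//
//   Differences diffs = mountpoint_entry.CompareTo(nested_root_entry);
//   assert(diffs & Difference::kNestedCatalogTransitionFlags);  // always set
//   // XOR-ing out the expected transition-flag bit leaves only the
//   // unexpected differences that still need to be repaired:
//   if ((diffs ^ Difference::kNestedCatalogTransitionFlags) != 0) {
//     // resynchronize the mountpoint entry from the nested root entry
//   }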
1405
1406
1407
void CommandMigrate::FixNestedCatalogTransitionPoint(
1408
  const catalog::DirectoryEntry &nested_root,
1409
  catalog::DirectoryEntry *mountpoint
1410
) {
1411
  // Replace some file system parameters in the mountpoint to resync it with
1412
  // the nested root of the corresponding nested catalog
1413
  //
1414
  // Note: this method relies on CommandMigrate being a friend of DirectoryEntry
1415
  mountpoint->mode_  = nested_root.mode_;
1416
  mountpoint->uid_   = nested_root.uid_;
1417
  mountpoint->gid_   = nested_root.gid_;
1418
  mountpoint->size_  = nested_root.size_;
1419
  mountpoint->mtime_ = nested_root.mtime_;
1420
}
1421
1422
1423
bool CommandMigrate::MigrationWorker_20x::RemoveDanglingNestedMountpoints(
1424
  PendingCatalog *data) const
1425
{
1426
  assert(data->HasNew());
1427
  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1428
  bool retval = false;
1429
1430
  // build a map of registered nested catalog path hashes
1431
  typedef catalog::Catalog::NestedCatalogList NestedCatalogList;
1432
  typedef std::map<shash::Md5, catalog::Catalog::NestedCatalog>
1433
    NestedCatalogMap;
1434
  const NestedCatalogList& nested_clgs =
1435
    data->old_catalog->ListNestedCatalogs();
1436
  NestedCatalogList::const_iterator i = nested_clgs.begin();
1437
  const NestedCatalogList::const_iterator iend = nested_clgs.end();
1438
  NestedCatalogMap nested_catalog_path_hashes;
1439
  for (; i != iend; ++i) {
1440
    const PathString &path = i->mountpoint;
1441
    const shash::Md5  hash(path.GetChars(), path.GetLength());
1442
    nested_catalog_path_hashes[hash] = *i;
1443
  }
1444
1445
  // Retrieve nested catalog mountpoints that have child entries directly inside
1446
  // the current catalog (which is a malformed state)
1447
  catalog::SqlLookupDanglingMountpoints sql_dangling_mountpoints(writable);
1448
  catalog::SqlDirentUpdate save_updated_mountpoint(writable);
1449
1450
  // go through the list of dangling nested catalog mountpoints and fix them
1451
  // where needed (i.e. where no nested catalog is registered for them)
1452
  while (sql_dangling_mountpoints.FetchRow()) {
1453
    catalog::DirectoryEntry dangling_mountpoint =
1454
      sql_dangling_mountpoints.GetDirent(data->new_catalog);
1455
    const shash::Md5 path_hash = sql_dangling_mountpoints.GetPathHash();
1456
    assert(dangling_mountpoint.IsNestedCatalogMountpoint());
1457
1458
    // check if the nested catalog mountpoint is registered in the nested cata-
1459
    // log list of the currently migrated catalog
1460
    const NestedCatalogMap::const_iterator nested_catalog =
1461
                                     nested_catalog_path_hashes.find(path_hash);
1462
    if (nested_catalog != nested_catalog_path_hashes.end()) {
1463
      LogCvmfs(kLogCatalog, kLogStderr,
1464
               "WARNING: found a non-empty nested catalog mountpoint under "
1465
               "'%s'", nested_catalog->second.mountpoint.c_str());
1466
      continue;
1467
    }
1468
1469
    // the mountpoint was confirmed to be dangling and needs to be removed
1470
    dangling_mountpoint.set_is_nested_catalog_mountpoint(false);
1471
1472
    // save the updated nested catalog mountpoint entry into the catalog
1473
1474
    retval = save_updated_mountpoint.BindPathHash(path_hash)         &&
1475
             save_updated_mountpoint.BindDirent(dangling_mountpoint) &&
1476
             save_updated_mountpoint.Execute()                       &&
1477
             save_updated_mountpoint.Reset();
1478
    if (!retval) {
1479
      Error("Failed to remove dangling nested catalog mountpoint entry in "
1480
            "catalog", save_updated_mountpoint, data);
1481
      return false;
1482
    }
1483
1484
    // tell the user that this intervention has taken place
1485
    LogCvmfs(kLogCatalog, kLogStdout, "NOTE: fixed dangling nested catalog "
1486
                                      "mountpoint entry called: '%s' ",
1487
                                      dangling_mountpoint.name().c_str());
1488
  }
1489
1490
  return true;
1491
}
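// Example of the malformed state repaired above (hypothetical path, sketch):
// a directory '/foo/bar' carries the nested catalog mountpoint flag in the
// 'catalog' table, yet no matching mountpoint is registered in the catalog's
// nested catalog list, and child entries of '/foo/bar' still live directly in
// this catalog.  For such entries the loop above simply clears the flag.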
1492
1493
1494
const catalog::DirectoryEntry& CommandMigrate::GetNestedCatalogMarkerDirent() {
1495
  // This is a pre-initialized singleton... it MUST already be there...
1496
  assert(nested_catalog_marker_.name_.ToString() == ".cvmfscatalog");
1497
  return nested_catalog_marker_;
1498
}
1499
1500
bool CommandMigrate::GenerateNestedCatalogMarkerChunk() {
1501
  // Create an empty nested catalog marker file
1502
  nested_catalog_marker_tmp_path_ =
1503
      CreateTempPath(temporary_directory_ + "/.cvmfscatalog", 0644);
1504
  if (nested_catalog_marker_tmp_path_.empty()) {
1505
    Error("Failed to create temp file for nested catalog marker dummy.");
1506
    return false;
1507
  }
1508
1509
  // Process and upload it to the backend storage
1510
  IngestionSource *source =
1511
      new FileIngestionSource(nested_catalog_marker_tmp_path_);
1512
  spooler_->Process(source);
1513
  return true;
1514
}
1515
1516
void CommandMigrate::CreateNestedCatalogMarkerDirent(
1517
  const shash::Any &content_hash)
1518
{
1519
  // Generate it only once
1520
  assert(nested_catalog_marker_.name_.ToString() != ".cvmfscatalog");
1521
1522
  // Fill the DirectoryEntry structure with all needed information
1523
  nested_catalog_marker_.name_.Assign(".cvmfscatalog", strlen(".cvmfscatalog"));
1524
  nested_catalog_marker_.mode_      = 33188;  // = S_IFREG | 0644
1525
  nested_catalog_marker_.uid_       = uid_;
1526
  nested_catalog_marker_.gid_       = gid_;
1527
  nested_catalog_marker_.size_      = 0;
1528
  nested_catalog_marker_.mtime_     = time(NULL);
1529
  nested_catalog_marker_.linkcount_ = 1;
1530
  nested_catalog_marker_.checksum_  = content_hash;
1531
}
1532
1533
1534
bool CommandMigrate::MigrationWorker_20x::GenerateCatalogStatistics(
1535
  PendingCatalog *data) const
1536
{
1537
  assert(data->HasNew());
1538
  bool retval = false;
1539
  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1540
1541
  // Aggregate the statistics counters of all nested catalogs
1542
  // Note: we might need to wait until nested catalogs are successfully processed
1543
  catalog::DeltaCounters stats_counters;
1544
  PendingCatalogList::const_iterator i    = data->nested_catalogs.begin();
1545
  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1546
  for (; i != iend; ++i) {
1547
    const PendingCatalog *nested_catalog = *i;
1548
    const catalog::DeltaCounters &s = nested_catalog->nested_statistics.Get();
1549
    s.PopulateToParent(&stats_counters);
1550
  }
1551
1552
  // Count various directory entry types in the catalog to fill up the catalog
1553
  // statistics counters introduced in the current catalog schema
1554
  catalog::SqlCatalog count_regular_files(writable,
1555
    "SELECT count(*) FROM catalog "
1556
    "                WHERE  flags & :flag_file "
1557
    "                       AND NOT flags & :flag_link;");
1558
  catalog::SqlCatalog count_symlinks(writable,
1559
    "SELECT count(*) FROM catalog WHERE flags & :flag_link;");
1560
  catalog::SqlCatalog count_directories(writable,
1561
    "SELECT count(*) FROM catalog WHERE flags & :flag_dir;");
1562
  catalog::SqlCatalog aggregate_file_size(writable,
1563
    "SELECT sum(size) FROM catalog WHERE  flags & :flag_file "
1564
    "                                     AND NOT flags & :flag_link");
1565
1566
  // Run the actual counting queries
1567
  retval =
1568
    count_regular_files.BindInt64(1, catalog::SqlDirent::kFlagFile) &&
1569
    count_regular_files.BindInt64(2, catalog::SqlDirent::kFlagLink) &&
1570
    count_regular_files.FetchRow();
1571
  if (!retval) {
1572
    Error("Failed to count regular files.", count_regular_files, data);
1573
    return false;
1574
  }
1575
  retval =
1576
    count_symlinks.BindInt64(1, catalog::SqlDirent::kFlagLink) &&
1577
    count_symlinks.FetchRow();
1578
  if (!retval) {
1579
    Error("Failed to count symlinks.", count_symlinks, data);
1580
    return false;
1581
  }
1582
  retval =
1583
    count_directories.BindInt64(1, catalog::SqlDirent::kFlagDir) &&
1584
    count_directories.FetchRow();
1585
  if (!retval) {
1586
    Error("Failed to count directories.", count_directories, data);
1587
    return false;
1588
  }
1589
  retval =
1590
    aggregate_file_size.BindInt64(1, catalog::SqlDirent::kFlagFile) &&
1591
    aggregate_file_size.BindInt64(2, catalog::SqlDirent::kFlagLink) &&
1592
    aggregate_file_size.FetchRow();
1593
  if (!retval) {
1594
    Error("Failed to aggregate the file sizes.", aggregate_file_size, data);
1595
    return false;
1596
  }
1597
1598
  // Insert the counted statistics into the DeltaCounters data structure
1599
  stats_counters.self.regular_files    = count_regular_files.RetrieveInt64(0);
1600
  stats_counters.self.symlinks         = count_symlinks.RetrieveInt64(0);
1601
  stats_counters.self.directories      = count_directories.RetrieveInt64(0);
1602
  stats_counters.self.nested_catalogs  = data->nested_catalogs.size();
1603
  stats_counters.self.file_size        = aggregate_file_size.RetrieveInt64(0);
1604
1605
  // Write back the generated statistics counters into the catalog database
1606
  stats_counters.WriteToDatabase(writable);
1607
1608
  // Push the generated statistics counters up to the parent catalog
1609
  data->nested_statistics.Set(stats_counters);
1610
1611
  return true;
1612
}
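// Note on the counting queries above: symlinks are flagged both as 'file' and
// as 'symlink' (see the comment in BreakUpHardlinks() further down), so
// regular files must be selected with "flags & kFlagFile AND NOT flags &
// kFlagLink", while "flags & kFlagLink" alone is sufficient to count symlinks.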
1613
1614
1615
bool CommandMigrate::MigrationWorker_20x::FindRootEntryInformation(
1616
  PendingCatalog *data) const
1617
{
1618
  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1619
  bool retval;
1620
1621
  std::string root_path = data->root_path();
1622
  shash::Md5 root_path_hash = shash::Md5(root_path.data(), root_path.size());
1623
1624
  catalog::SqlLookupPathHash lookup_root_entry(writable);
1625
  retval = lookup_root_entry.BindPathHash(root_path_hash) &&
1626
           lookup_root_entry.FetchRow();
1627
  if (!retval) {
1628
    Error("Failed to retrieve root directory entry of migrated catalog",
1629
          lookup_root_entry, data);
1630
    return false;
1631
  }
1632
1633
  catalog::DirectoryEntry entry =
1634
    lookup_root_entry.GetDirent(data->new_catalog);
1635
  if (entry.linkcount() < 2 || entry.hardlink_group() > 0) {
1636
    Error("Retrieved linkcount of catalog root entry is not sane.", data);
1637
    return false;
1638
  }
1639
1640
  data->root_entry.Set(entry);
1641
  return true;
1642
}
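// Rationale for the sanity check above: a directory always has a linkcount of
// at least 2 (the name in its parent plus its own '.' entry), and directories
// are never members of hardlink groups, hence a sane root entry must satisfy
// linkcount >= 2 and hardlink_group == 0.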
1643
1644
1645
bool CommandMigrate::MigrationWorker_20x::CommitDatabaseTransaction(
1646
  PendingCatalog *data) const
1647
{
1648
  assert(data->HasNew());
1649
  data->new_catalog->Commit();
1650
  return true;
1651
}
1652
1653
1654
bool CommandMigrate::MigrationWorker_20x::DetachOldCatalogDatabase(
1655
  PendingCatalog *data) const
1656
{
1657
  assert(data->HasNew());
1658
  const catalog::CatalogDatabase &writable = data->new_catalog->database();
1659
  catalog::SqlCatalog detach_old_catalog(writable, "DETACH old;");
1660
  const bool retval = detach_old_catalog.Execute();
1661
  if (!retval) {
1662
    Error("Failed to detach old catalog database.", detach_old_catalog, data);
1663
    return false;
1664
  }
1665
  return true;
1666
}
1667
1668
1669
//------------------------------------------------------------------------------
1670
1671
1672
CommandMigrate::MigrationWorker_217::MigrationWorker_217(
1673
  const worker_context *context)
1674
  : AbstractMigrationWorker<MigrationWorker_217>(context)
1675
{ }
1676
1677
1678
bool CommandMigrate::MigrationWorker_217::RunMigration(PendingCatalog *data)
1679
  const
1680
{
1681
  return CheckDatabaseSchemaCompatibility(data) &&
1682
         StartDatabaseTransaction(data) &&
1683
         GenerateNewStatisticsCounters(data) &&
1684
         UpdateCatalogSchema(data) &&
1685
         CommitDatabaseTransaction(data);
1686
}
1687
1688
1689
bool CommandMigrate::MigrationWorker_217::CheckDatabaseSchemaCompatibility(
1690
  PendingCatalog *data) const
1691
{
1692
  assert(!data->HasNew());
1693
  const catalog::CatalogDatabase &old_catalog = data->old_catalog->database();
1694
1695
  if ((old_catalog.schema_version() < 2.4 -
1696
       catalog::CatalogDatabase::kSchemaEpsilon)
1697
      ||
1698
      (old_catalog.schema_version() > 2.4 +
1699
       catalog::CatalogDatabase::kSchemaEpsilon))
1700
  {
1701
    Error("Given Catalog is not Schema 2.4.", data);
1702
    return false;
1703
  }
1704
1705
  return true;
1706
}
1707
1708
1709
bool CommandMigrate::MigrationWorker_217::StartDatabaseTransaction(
1710
  PendingCatalog *data) const
1711
{
1712
  assert(!data->HasNew());
1713
  GetWritable(data->old_catalog)->Transaction();
1714
  return true;
1715
}
1716
1717
1718
bool CommandMigrate::MigrationWorker_217::GenerateNewStatisticsCounters
1719
                                                  (PendingCatalog *data) const {
1720
  assert(!data->HasNew());
1721
  bool retval = false;
1722
  const catalog::CatalogDatabase &writable =
1723
    GetWritable(data->old_catalog)->database();
1724
1725
  // Aggregate the statistics counters of all nested catalogs
1726
  // Note: we might need to wait until nested catalogs are successfully processed
1727
  catalog::DeltaCounters stats_counters;
1728
  PendingCatalogList::const_iterator i = data->nested_catalogs.begin();
1729
  PendingCatalogList::const_iterator iend = data->nested_catalogs.end();
1730
  for (; i != iend; ++i) {
1731
    const PendingCatalog *nested_catalog = *i;
1732
    const catalog::DeltaCounters &s = nested_catalog->nested_statistics.Get();
1733
    s.PopulateToParent(&stats_counters);
1734
  }
1735
1736
  // Count various directory entry types in the catalog to fill up the catalog
1737
  // statistics counters introduced in the current catalog schema
1738
  catalog::SqlCatalog count_chunked_files(writable,
1739
    "SELECT count(*), sum(size) FROM catalog "
1740
    "                WHERE flags & :flag_chunked_file;");
1741
  catalog::SqlCatalog count_file_chunks(writable,
1742
    "SELECT count(*) FROM chunks;");
1743
  catalog::SqlCatalog aggregate_file_size(writable,
1744
    "SELECT sum(size) FROM catalog WHERE  flags & :flag_file "
1745
    "                                     AND NOT flags & :flag_link;");
1746
1747
  // Run the actual counting queries
1748
  retval =
1749
    count_chunked_files.BindInt64(1, catalog::SqlDirent::kFlagFileChunk) &&
1750
    count_chunked_files.FetchRow();
1751
  if (!retval) {
1752
    Error("Failed to count chunked files.", count_chunked_files, data);
1753
    return false;
1754
  }
1755
  retval = count_file_chunks.FetchRow();
1756
  if (!retval) {
1757
    Error("Failed to count file chunks", count_file_chunks, data);
1758
    return false;
1759
  }
1760
  retval =
1761
    aggregate_file_size.BindInt64(1, catalog::SqlDirent::kFlagFile) &&
1762
    aggregate_file_size.BindInt64(2, catalog::SqlDirent::kFlagLink) &&
1763
    aggregate_file_size.FetchRow();
1764
  if (!retval) {
1765
    Error("Failed to aggregate the file sizes.", aggregate_file_size, data);
1766
    return false;
1767
  }
1768
1769
  // Insert the counted statistics into the DeltaCounters data structure
1770
  stats_counters.self.chunked_files     = count_chunked_files.RetrieveInt64(0);
1771
  stats_counters.self.chunked_file_size = count_chunked_files.RetrieveInt64(1);
1772
  stats_counters.self.file_chunks       = count_file_chunks.RetrieveInt64(0);
1773
  stats_counters.self.file_size         = aggregate_file_size.RetrieveInt64(0);
1774
1775
  // Write back the generated statistics counters into the catalog database
1776
  catalog::Counters counters;
1777
  retval = counters.ReadFromDatabase(writable, catalog::LegacyMode::kLegacy);
1778
  if (!retval) {
1779
    Error("Failed to read old catalog statistics counters", data);
1780
    return false;
1781
  }
1782
  counters.ApplyDelta(stats_counters);
1783
  retval = counters.InsertIntoDatabase(writable);
1784
  if (!retval) {
1785
    Error("Failed to write new statistics counters to database", data);
1786
    return false;
1787
  }
1788
1789
  // Push the generated statistics counters up to the parent catalog
1790
  data->nested_statistics.Set(stats_counters);
1791
1792
  return true;
1793
}
1794
1795
1796
bool CommandMigrate::MigrationWorker_217::UpdateCatalogSchema
1797
                                                  (PendingCatalog *data) const {
1798
  assert(!data->HasNew());
1799
  const catalog::CatalogDatabase &writable =
1800
    GetWritable(data->old_catalog)->database();
1801
  catalog::SqlCatalog update_schema_version(writable,
1802
    "UPDATE properties SET value = :schema_version WHERE key = 'schema';");
1803
1804
  const bool retval =
1805
    update_schema_version.BindDouble(1, 2.5) &&
1806
    update_schema_version.Execute();
1807
  if (!retval) {
1808
    Error("Failed to update catalog schema version",
1809
          update_schema_version,
1810
          data);
1811
    return false;
1812
  }
1813
1814
  return true;
1815
}
1816
1817
1818
bool CommandMigrate::MigrationWorker_217::CommitDatabaseTransaction
1819
                                                  (PendingCatalog *data) const {
1820
  assert(!data->HasNew());
1821
  GetWritable(data->old_catalog)->Commit();
1822
  return true;
1823
}
1824
1825
1826
//------------------------------------------------------------------------------
1827
1828
1829
CommandMigrate::ChownMigrationWorker::ChownMigrationWorker(
1830
                                                const worker_context *context)
1831
  : AbstractMigrationWorker<ChownMigrationWorker>(context)
1832
  , uid_map_statement_(GenerateMappingStatement(context->uid_map, "uid"))
1833
  , gid_map_statement_(GenerateMappingStatement(context->gid_map, "gid"))
1834
{}
1835
1836
bool CommandMigrate::ChownMigrationWorker::RunMigration(
1837
                                                   PendingCatalog *data) const {
1838
  return ApplyPersonaMappings(data);
1839
}
1840
1841
1842
bool CommandMigrate::ChownMigrationWorker::ApplyPersonaMappings(
1843
                                                   PendingCatalog *data) const {
1844
  assert(data->old_catalog != NULL);
1845
  assert(data->new_catalog == NULL);
1846
1847
  if (data->old_catalog->mountpoint() ==
1848
      PathString("/" + string(catalog::VirtualCatalog::kVirtualPath)))
1849
  {
1850
    // skipping virtual catalog
1851
    return true;
1852
  }
1853
1854
  const catalog::CatalogDatabase &db =
1855
                                     GetWritable(data->old_catalog)->database();
1856
1857
  if (!db.BeginTransaction()) {
1858
    return false;
1859
  }
1860
1861
  catalog::SqlCatalog uid_sql(db, uid_map_statement_);
1862
  if (!uid_sql.Execute()) {
1863
    Error("Failed to update UIDs", uid_sql, data);
1864
    return false;
1865
  }
1866
1867
  catalog::SqlCatalog gid_sql(db, gid_map_statement_);
1868
  if (!gid_sql.Execute()) {
1869
    Error("Failed to update GIDs", gid_sql, data);
1870
    return false;
1871
  }
1872
1873
  return db.CommitTransaction();
1874
}
1875
1876
1877
template <class MapT>
1878
std::string CommandMigrate::ChownMigrationWorker::GenerateMappingStatement(
1879
                                             const MapT         &map,
1880
                                             const std::string  &column) const {
1881
  assert(map.RuleCount() > 0 || map.HasDefault());
1882
1883
  std::string stmt = "UPDATE OR ABORT catalog SET " + column + " = ";
1884
1885
  if (map.RuleCount() == 0) {
1886
    // map everything to the same value (just a simple UPDATE clause)
1887
    stmt += StringifyInt(map.GetDefault());
1888
  } else {
1889
    // apply multiple ID mappings (UPDATE clause with CASE statement)
1890
    stmt += "CASE " + column + " ";
1891
    typedef typename MapT::map_type::const_iterator map_iterator;
1892
          map_iterator i    = map.GetRuleMap().begin();
1893
    const map_iterator iend = map.GetRuleMap().end();
1894
    for (; i != iend; ++i) {
1895
      stmt += "WHEN " + StringifyInt(i->first) +
1896
             " THEN " + StringifyInt(i->second) + " ";
1897
    }
1898
1899
    // add a default (if provided) or leave unchanged if no mapping fits
1900
    stmt += (map.HasDefault())
1901
                ? "ELSE " + StringifyInt(map.GetDefault()) + " "
1902
                : "ELSE " + column + " ";
1903
    stmt += "END";
1904
  }
1905
1906
  stmt += ";";
1907
  return stmt;
1908
}
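// Example of the statement generated above, for a hypothetical UID map
// {1000 -> 2000, 1001 -> 2001} with a default of 99:
//
//   UPDATE OR ABORT catalog SET uid =
//     CASE uid WHEN 1000 THEN 2000 WHEN 1001 THEN 2001 ELSE 99 END;
//
// With no rules the statement degenerates to a plain
// "UPDATE OR ABORT catalog SET uid = 99;", and without a default the ELSE
// branch keeps the original column value ("ELSE uid").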
1909
1910
1911
//------------------------------------------------------------------------------
1912
1913
1914
bool CommandMigrate::HardlinkRemovalMigrationWorker::RunMigration(
1915
                                                   PendingCatalog *data) const {
1916
  return CheckDatabaseSchemaCompatibility(data) &&
1917
         BreakUpHardlinks(data);
1918
}
1919
1920
1921
bool
1922
CommandMigrate::HardlinkRemovalMigrationWorker::CheckDatabaseSchemaCompatibility
1923
                                                  (PendingCatalog *data) const {
1924
  assert(data->old_catalog != NULL);
1925
  assert(data->new_catalog == NULL);
1926
1927
  const catalog::CatalogDatabase &clg = data->old_catalog->database();
1928
  return clg.schema_version() >= 2.4 - catalog::CatalogDatabase::kSchemaEpsilon;
1929
}
1930
1931
1932
bool CommandMigrate::HardlinkRemovalMigrationWorker::BreakUpHardlinks(
1933
                                                   PendingCatalog *data) const {
1934
  assert(data->old_catalog != NULL);
1935
  assert(data->new_catalog == NULL);
1936
1937
  const catalog::CatalogDatabase &db =
1938
                                     GetWritable(data->old_catalog)->database();
1939
1940
  if (!db.BeginTransaction()) {
1941
    return false;
1942
  }
1943
1944
  // CernVM-FS catalogs do not contain inodes directly; inodes are assigned by
1945
  // the CernVM-FS client at runtime. Hardlink relationships are expressed by
1946
  // so-called hardlink group IDs, which the client honours at runtime by
1947
  // assigning identical inodes to all members of a group.
1948
  //
1949
  // This updates all directory entries of a given catalog that have a linkcount
1950
  // greater than 1 and are flagged as a 'file'. Note: Symlinks are flagged both
1951
  // as 'file' and as 'symlink', hence they are updated implicitly as well.
1952
  //
1953
  // The 'hardlinks' field in the catalog contains two 32 bit integers:
1954
  //   * the linkcount in the lower 32 bits
1955
  //   * the (so called) hardlink group ID in the higher 32 bits
1956
  //
1957
  // Files that have a linkcount of exactly 1 do not have any hardlinks and have
1958
  // the (implicit) hardlink group ID '0'. Hence, 'hardlinks == 1' means that a
1959
  // file doesn't have any hardlinks (linkcount = 1) and doesn't need treatment
1960
  // here.
1961
  //
1962
  // Files that have hardlinks (linkcount > 1) will have a very large integer in
1963
  // their 'hardlinks' field (hardlink group ID > 0 in higher 32 bits). Those
1964
  // files will be treated by setting their 'hardlinks' field to 1, effectively
1965
  // clearing all hardlink information from the directory entry.
1966
  const std::string stmt = "UPDATE OR ABORT catalog "
1967
                           "SET hardlinks = 1 "
1968
                           "WHERE flags & :file_flag "
1969
                           "  AND hardlinks > 1;";
1970
  catalog::SqlCatalog hardlink_removal_sql(db, stmt);
1971
  hardlink_removal_sql.BindInt64(1, catalog::SqlDirent::kFlagFile);
1972
  hardlink_removal_sql.Execute();
1973
1974
  return db.CommitTransaction();
1975
}
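// Sketch of the 'hardlinks' field layout described in the comment above,
// assuming the column is read into a 64 bit unsigned integer:
//
//   uint64_t hardlinks = ...;  // value of the 'hardlinks' column
//   uint32_t linkcount = static_cast<uint32_t>(hardlinks & 0xFFFFFFFFu);
//   uint32_t group_id  = static_cast<uint32_t>(hardlinks >> 32);
//
// Setting 'hardlinks = 1' therefore yields linkcount 1 and hardlink group ID
// 0, i.e. a file without any hardlink relationship.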
1976
1977
}  // namespace swissknife