GCC Code Coverage Report
Directory: cvmfs/
File:      cvmfs/catalog_mgr_rw.cc
Date:      2019-02-03 02:48:13

              Exec   Total   Coverage
Lines:         246     503     48.9 %
Branches:       71     266     26.7 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM file system.
3
 */
4
5
#define __STDC_FORMAT_MACROS
6
7
#include "catalog_mgr_rw.h"
8
9
#include <inttypes.h>
10
#include <unistd.h>
11
12
#include <cassert>
13
#include <cstdio>
14
#include <cstdlib>
15
#include <string>
16
17
#include "catalog_balancer.h"
18
#include "catalog_rw.h"
19
#include "logging.h"
20
#include "manifest.h"
21
#include "smalloc.h"
22
#include "statistics.h"
23
#include "upload.h"
24
#include "util/posix.h"
25
26
using namespace std;  // NOLINT
27
28
namespace catalog {
29
30
14
WritableCatalogManager::WritableCatalogManager(
31
  const shash::Any          &base_hash,
32
  const std::string         &stratum0,
33
  const string              &dir_temp,
34
  upload::Spooler           *spooler,
35
  download::DownloadManager *download_manager,
36
  bool                       enforce_limits,
37
  const unsigned             nested_kcatalog_limit,
38
  const unsigned             root_kcatalog_limit,
39
  const unsigned             file_mbyte_limit,
40
  perf::Statistics          *statistics,
41
  bool                       is_balanceable,
42
  unsigned                   max_weight,
43
  unsigned                   min_weight)
44
  : SimpleCatalogManager(base_hash, stratum0, dir_temp, download_manager,
45
      statistics)
46
  , spooler_(spooler)
47
  , enforce_limits_(enforce_limits)
48
  , nested_kcatalog_limit_(nested_kcatalog_limit)
49
  , root_kcatalog_limit_(root_kcatalog_limit)
50
  , file_mbyte_limit_(file_mbyte_limit)
51
  , is_balanceable_(is_balanceable)
52
  , max_weight_(max_weight)
53
  , min_weight_(min_weight)
54
14
  , balance_weight_(max_weight / 2)
55
{
56
  sync_lock_ =
57
14
    reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
58
14
  int retval = pthread_mutex_init(sync_lock_, NULL);
59
14
  assert(retval == 0);
60
  catalog_processing_lock_ =
61
14
    reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
62
14
  retval = pthread_mutex_init(catalog_processing_lock_, NULL);
63
14
  assert(retval == 0);
64
}
65
66
67
28
WritableCatalogManager::~WritableCatalogManager() {
68
14
  pthread_mutex_destroy(sync_lock_);
69
14
  free(sync_lock_);
70
14
  pthread_mutex_destroy(catalog_processing_lock_);
71
14
  free(catalog_processing_lock_);
72

28
}
73
74
75
/**
76
 * This method is virtual in AbstractCatalogManager.  It returns a new catalog
77
 * structure in the form that the different CatalogManagers require.
78
 * In this case it returns a stub for a WritableCatalog.
79
 * @param mountpoint     the mount point of the catalog stub to create
80
 * @param catalog_hash   the content hash of the catalog to create
81
 * @param parent_catalog the parent of the catalog stub to create
82
 * @return a pointer to the catalog stub structure created
83
 */
84
23
Catalog* WritableCatalogManager::CreateCatalog(
85
  const PathString &mountpoint,
86
  const shash::Any &catalog_hash,
87
  Catalog          *parent_catalog)
88
{
89
  return new WritableCatalog(mountpoint.ToString(),
90
                             catalog_hash,
91
23
                             parent_catalog);
92
}
93
94
95
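/**
 * Virtual hook, invoked whenever a catalog has been attached.  The freshly
 * attached catalog takes ownership of its (temporary) database file.
 */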
23
void WritableCatalogManager::ActivateCatalog(Catalog *catalog) {
96
23
  catalog->TakeDatabaseFileOwnership();
97
23
}
98
99
100
/**
101
 * This method is invoked if we create a completely new repository.
102
 * The new root catalog will already contain a root entry.
103
 * It is uploaded by a Forklift to the upstream storage.
104
 * @return the manifest of the new repository or NULL on error
105
 */
106
20
manifest::Manifest *WritableCatalogManager::CreateRepository(
107
  const string      &dir_temp,
108
  const bool         volatile_content,
109
  const std::string &voms_authz,
110
  upload::Spooler   *spooler)
111
{
112
  // Create a new root catalog at file_path
113
20
  string file_path = dir_temp + "/new_root_catalog";
114
115
20
  shash::Algorithms hash_algorithm = spooler->GetHashAlgorithm();
116
117
  // A newly created catalog always needs a root entry
118
  // which we create and configure here
119
20
  DirectoryEntry root_entry;
120
20
  root_entry.inode_             = DirectoryEntry::kInvalidInode;
121
20
  root_entry.mode_              = 16877;
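  // 16877 == 040755: directory with permission bits rwxr-xr-x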
122
20
  root_entry.size_              = 4096;
123
20
  root_entry.mtime_             = time(NULL);
124
20
  root_entry.uid_               = getuid();
125
20
  root_entry.gid_               = getgid();
126
20
  root_entry.checksum_          = shash::Any(hash_algorithm);
127
20
  root_entry.linkcount_         = 2;
128
20
  string root_path = "";
129
130
  // Create the database schema and the initial root entry
131
  {
132
20
    UniquePtr<CatalogDatabase> new_clg_db(CatalogDatabase::Create(file_path));
133

20
    if (!new_clg_db.IsValid() ||
134
        !new_clg_db->InsertInitialValues(root_path,
135
                                         volatile_content,
136
                                         voms_authz,
137
                                         root_entry))
138
    {
139
      LogCvmfs(kLogCatalog, kLogStderr, "creation of catalog '%s' failed",
140
               file_path.c_str());
141
      return NULL;
142
    }
143
  }
144
145
  // Compress the root catalog
146
20
  int64_t catalog_size = GetFileSize(file_path);
147
20
  if (catalog_size < 0) {
148
    unlink(file_path.c_str());
149
    return NULL;
150
  }
151
20
  string file_path_compressed = file_path + ".compressed";
152
20
  shash::Any hash_catalog(hash_algorithm, shash::kSuffixCatalog);
153
  bool retval = zlib::CompressPath2Path(file_path, file_path_compressed,
154
20
                                        &hash_catalog);
155
20
  if (!retval) {
156
    LogCvmfs(kLogCatalog, kLogStderr, "compression of catalog '%s' failed",
157
             file_path.c_str());
158
    unlink(file_path.c_str());
159
    return NULL;
160
  }
161
20
  unlink(file_path.c_str());
162
163
  // Create manifest
164
20
  const string manifest_path = dir_temp + "/manifest";
165
  manifest::Manifest *manifest =
166
20
    new manifest::Manifest(hash_catalog, catalog_size, "");
167
20
  if (!voms_authz.empty()) {
168
    manifest->set_has_alt_catalog_path(true);
169
  }
170
171
  // Upload catalog
172
20
  spooler->Upload(file_path_compressed, "data/" + hash_catalog.MakePath());
173
20
  spooler->WaitForUpload();
174
20
  unlink(file_path_compressed.c_str());
175
20
  if (spooler->GetNumberOfErrors() > 0) {
176
    LogCvmfs(kLogCatalog, kLogStderr, "failed to commit catalog %s",
177
             file_path_compressed.c_str());
178
    delete manifest;
179
    return NULL;
180
  }
181
182
20
  return manifest;
183
}
184
185
186
/**
187
 * Retrieve the catalog containing the given path.
188
 * Unlike AbstractCatalogManager::FindCatalog(), this method mounts nested
189
 * catalogs if necessary and returns WritableCatalog objects.
190
 * Furthermore it optionally returns the looked-up DirectoryEntry.
191
 *
192
 * @param path    the path to look for
193
 * @param result  the retrieved catalog (as a pointer)
194
 * @param dirent  is set to the looked-up DirectoryEntry for 'path' if non-NULL
195
 * @return        true if catalog was found
196
 */
197
84
bool WritableCatalogManager::FindCatalog(const string     &path,
198
                                         WritableCatalog **result,
199
                                         DirectoryEntry   *dirent) {
200
84
  const PathString ps_path(path);
201
202
  Catalog *best_fit =
203
84
    AbstractCatalogManager<Catalog>::FindCatalog(ps_path);
204
84
  assert(best_fit != NULL);
205
84
  Catalog *catalog = NULL;
206
84
  bool retval = MountSubtree(ps_path, best_fit, &catalog);
207
84
  if (!retval)
208
    return false;
209
210
84
  catalog::DirectoryEntry dummy;
211
84
  if (NULL == dirent) {
212
41
    dirent = &dummy;
213
  }
214
84
  bool found = catalog->LookupPath(ps_path, dirent);
215

84
  if (!found || !catalog->IsWritable())
216
    return false;
217
218
84
  *result = static_cast<WritableCatalog *>(catalog);
219
84
  return true;
220
}
221
222
223
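/**
 * Returns the writable catalog that currently hosts the given path or NULL
 * if no writable catalog containing the path can be found.
 */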
WritableCatalog *WritableCatalogManager::GetHostingCatalog(
224
  const std::string &path)
225
{
226
  WritableCatalog *result = NULL;
227
  bool retval = FindCatalog(MakeRelativePath(path), &result, NULL);
228
  if (!retval) return NULL;
229
  return result;
230
}
231
232
233
/**
234
 * Remove the given file from the catalogs.
235
 * @param path  the full path to the file to be removed
236
 * @note The method aborts (via assert) on failure.
237
 */
238
2
void WritableCatalogManager::RemoveFile(const std::string &path) {
239
2
  const string file_path = MakeRelativePath(path);
240
2
  const string parent_path = GetParentPath(file_path);
241
242
2
  SyncLock();
243
  WritableCatalog *catalog;
244
2
  if (!FindCatalog(parent_path, &catalog)) {
245
    LogCvmfs(kLogCatalog, kLogStderr, "catalog for file '%s' cannot be found",
246
             file_path.c_str());
247
    assert(false);
248
  }
249
250
2
  catalog->RemoveEntry(file_path);
251
2
  SyncUnlock();
252
2
}
253
254
255
/**
256
 * Remove the given directory from the catalogs.
257
 * @param path  the full path to the directory to be removed
258
 * @note The method aborts (via assert) on failure.
259
 */
260
1
void WritableCatalogManager::RemoveDirectory(const std::string &path) {
261
1
  const string directory_path = MakeRelativePath(path);
262
1
  const string parent_path = GetParentPath(directory_path);
263
264
1
  SyncLock();
265
  WritableCatalog *catalog;
266
1
  DirectoryEntry parent_entry;
267
1
  if (!FindCatalog(parent_path, &catalog, &parent_entry)) {
268
    LogCvmfs(kLogCatalog, kLogStderr,
269
             "catalog for directory '%s' cannot be found",
270
             directory_path.c_str());
271
    assert(false);
272
  }
273
274
1
  parent_entry.set_linkcount(parent_entry.linkcount() - 1);
275
276
1
  catalog->RemoveEntry(directory_path);
277
1
  catalog->UpdateEntry(parent_entry, parent_path);
278
1
  if (parent_entry.IsNestedCatalogRoot()) {
279
    LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating transition point %s",
280
             parent_path.c_str());
281
    WritableCatalog *parent_catalog =
282
      reinterpret_cast<WritableCatalog *>(catalog->parent());
283
    parent_entry.set_is_nested_catalog_mountpoint(true);
284
    parent_entry.set_is_nested_catalog_root(false);
285
    parent_catalog->UpdateEntry(parent_entry, parent_path);
286
  }
287
1
  SyncUnlock();
288
1
}
289
290
/**
291
 * Clone the file called `source` into a new file named `destination`; the
292
 * source file is kept intact.
293
 * @param destination  the full path of the new file
294
 * @param source  the full path of the file to clone, which must already be in the
295
 * repository
296
 * @return void
297
 */
298
void WritableCatalogManager::Clone(const std::string destination,
299
                                   const std::string source) {
300
  const std::string relative_source = MakeRelativePath(source);
301
302
  DirectoryEntry source_dirent;
303
  if (!LookupPath(relative_source, kLookupSole, &source_dirent)) {
304
    LogCvmfs(kLogCatalog, kLogStderr,
305
             "catalog for file '%s' cannot be found aborting", source.c_str());
306
    assert(false);
307
  }
308
  if (source_dirent.IsDirectory()) {
309
    LogCvmfs(kLogCatalog, kLogStderr,
310
             "Trying to clone a directory: '%s' aborting", source.c_str());
311
    assert(false);
312
  }
313
314
  // If the file is already present, we remove it and add it back
315
  DirectoryEntry check_dirent;
316
  bool destination_already_present =
317
      LookupPath(MakeRelativePath(destination), kLookupSole, &check_dirent);
318
  if (destination_already_present) {
319
    this->RemoveFile(destination);
320
  }
321
322
  DirectoryEntry destination_dirent(source_dirent);
323
  std::string destination_dirname;
324
  std::string destination_filename;
325
  SplitPath(destination, &destination_dirname, &destination_filename);
326
327
  destination_dirent.name_.Assign(
328
      NameString(destination_filename.c_str(), destination_filename.length()));
329
330
  this->AddFile(destination_dirent, empty_xattrs, destination_dirname);
331
}
332
333
/**
334
 * Add a new directory to the catalogs.
335
 * @param entry a DirectoryEntry structure describing the new directory
336
 * @param parent_directory the absolute path of the directory containing the
337
 *                         directory to be created
338
 * @note The method aborts (via assert) on failure.
339
 */
340
35
void WritableCatalogManager::AddDirectory(const DirectoryEntryBase &entry,
341
                                          const std::string &parent_directory)
342
{
343
35
  const string parent_path = MakeRelativePath(parent_directory);
344
35
  string directory_path = parent_path + "/";
345
35
  directory_path.append(entry.name().GetChars(), entry.name().GetLength());
346
347
35
  SyncLock();
348
  WritableCatalog *catalog;
349
35
  DirectoryEntry parent_entry;
350
35
  if (!FindCatalog(parent_path, &catalog, &parent_entry)) {
351
    LogCvmfs(kLogCatalog, kLogStderr,
352
             "catalog for directory '%s' cannot be found",
353
             directory_path.c_str());
354
    assert(false);
355
  }
356
357
35
  DirectoryEntry fixed_hardlink_count(entry);
358
35
  fixed_hardlink_count.set_linkcount(2);
359
  // No support for extended attributes on directories yet
360
  catalog->AddEntry(fixed_hardlink_count, empty_xattrs,
361
35
                    directory_path, parent_path);
362
363
35
  parent_entry.set_linkcount(parent_entry.linkcount() + 1);
364
35
  catalog->UpdateEntry(parent_entry, parent_path);
365
35
  if (parent_entry.IsNestedCatalogRoot()) {
366
    LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating transition point %s",
367
             parent_path.c_str());
368
    WritableCatalog *parent_catalog =
369
      reinterpret_cast<WritableCatalog *>(catalog->parent());
370
    parent_entry.set_is_nested_catalog_mountpoint(true);
371
    parent_entry.set_is_nested_catalog_root(false);
372
    parent_catalog->UpdateEntry(parent_entry, parent_path);
373
  }
374
35
  SyncUnlock();
375
35
}
376
377
/**
378
 * Add a new file to the catalogs.
379
 * @param entry a DirectoryEntry structure describing the new file
380
 * @param parent_directory the absolute path of the directory containing the
381
 *                         file to be created
382
 * @note The method aborts (via assert) on failure.
383
 */
384
39
void WritableCatalogManager::AddFile(
385
  const DirectoryEntry  &entry,
386
  const XattrList       &xattrs,
387
  const std::string     &parent_directory)
388
{
389
39
  const string parent_path = MakeRelativePath(parent_directory);
390
39
  const string file_path   = entry.GetFullPath(parent_path);
391
392
39
  SyncLock();
393
  WritableCatalog *catalog;
394
39
  if (!FindCatalog(parent_path, &catalog)) {
395
    LogCvmfs(kLogCatalog, kLogStderr, "catalog for file '%s' cannot be found",
396
             file_path.c_str());
397
    assert(false);
398
  }
399
400
  assert(!entry.IsRegular() || entry.IsChunkedFile() ||
401

39
         !entry.checksum().IsNull());
402

39
  assert(entry.IsRegular() || !entry.IsExternalFile());
403
404
  // check if file is too big
405
39
  unsigned mbytes = entry.size() / (1024 * 1024);
406

39
  if ((file_mbyte_limit_ > 0) && (mbytes > file_mbyte_limit_)) {
407
    LogCvmfs(kLogCatalog, kLogStderr,
408
             "%s: file at %s is larger than %u megabytes (%u). "
409
             "CernVM-FS works best with small files. "
410
             "Please remove the file or increase the limit.",
411
             enforce_limits_ ? "FATAL" : "WARNING",
412
             file_path.c_str(),
413
             file_mbyte_limit_,
414
             mbytes);
415
    assert(!enforce_limits_);
416
  }
417
418
39
  catalog->AddEntry(entry, xattrs, file_path, parent_path);
419
39
  SyncUnlock();
420
39
}
421
422
423
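/**
 * Add a chunked file to the catalogs.  The file is first added as a regular
 * entry flagged as chunked; afterwards every chunk from file_chunks is
 * registered in the hosting catalog.
 * @param entry             a DirectoryEntryBase describing the new file
 * @param xattrs            extended attributes of the new file
 * @param parent_directory  the absolute path of the containing directory
 * @param file_chunks       the list of chunks that make up the file
 */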
void WritableCatalogManager::AddChunkedFile(
424
  const DirectoryEntryBase  &entry,
425
  const XattrList           &xattrs,
426
  const std::string         &parent_directory,
427
  const FileChunkList       &file_chunks)
428
{
429
  assert(file_chunks.size() > 0);
430
431
  DirectoryEntry full_entry(entry);
432
  full_entry.set_is_chunked_file(true);
433
434
  AddFile(full_entry, xattrs, parent_directory);
435
436
  const string parent_path = MakeRelativePath(parent_directory);
437
  const string file_path   = entry.GetFullPath(parent_path);
438
439
  SyncLock();
440
  WritableCatalog *catalog;
441
  if (!FindCatalog(parent_path, &catalog)) {
442
    LogCvmfs(kLogCatalog, kLogStderr, "catalog for file '%s' cannot be found",
443
             file_path.c_str());
444
    assert(false);
445
  }
446
447
  for (unsigned i = 0; i < file_chunks.size(); ++i) {
448
    catalog->AddFileChunk(file_path, *file_chunks.AtPtr(i));
449
  }
450
  SyncUnlock();
451
}
452
453
454
/**
455
 * Add a hardlink group to the catalogs.
456
 * @param entries a list of DirectoryEntries describing the new files
457
 * @param parent_directory the absolute path of the directory containing the
458
 *                         files to be created
459
 * @note The method aborts (via assert) on failure.
460
 */
461
void WritableCatalogManager::AddHardlinkGroup(
462
  const DirectoryEntryBaseList &entries,
463
  const XattrList &xattrs,
464
  const std::string &parent_directory,
465
  const FileChunkList &file_chunks)
466
{
467
  assert(entries.size() >= 1);
468
  assert(file_chunks.IsEmpty() || entries[0].IsRegular());
469
  if (entries.size() == 1) {
470
    DirectoryEntry fix_linkcount(entries[0]);
471
    fix_linkcount.set_linkcount(1);
472
    if (file_chunks.IsEmpty())
473
      return AddFile(fix_linkcount, xattrs, parent_directory);
474
    return AddChunkedFile(fix_linkcount, xattrs, parent_directory, file_chunks);
475
  }
476
477
  LogCvmfs(kLogCatalog, kLogVerboseMsg, "adding hardlink group %s/%s",
478
           parent_directory.c_str(), entries[0].name().c_str());
479
480
  // Hardlink groups have to reside in the same directory.
481
  // Therefore we only have one parent directory here
482
  const string parent_path = MakeRelativePath(parent_directory);
483
484
  // check if hard link is too big
485
  unsigned mbytes = entries[0].size() / (1024 * 1024);
486
  if ((file_mbyte_limit_ > 0) && (mbytes > file_mbyte_limit_)) {
487
    LogCvmfs(kLogCatalog, kLogStderr,
488
             "%s: hard link at %s is larger than %u megabytes (%u). "
489
             "CernVM-FS works best with small files. "
490
             "Please remove the file or increase the limit.",
491
             enforce_limits_ ? "FATAL" : "WARNING",
492
             (parent_path + entries[0].name().ToString()).c_str(),
493
             file_mbyte_limit_,
494
             mbytes);
495
    assert(!enforce_limits_);
496
  }
497
498
  SyncLock();
499
  WritableCatalog *catalog;
500
  if (!FindCatalog(parent_path, &catalog)) {
501
    LogCvmfs(kLogCatalog, kLogStderr,
502
             "catalog for hardlink group containing '%s' cannot be found",
503
             parent_path.c_str());
504
    assert(false);
505
  }
506
507
  // Get a valid hardlink group id for the catalog the group will end up in
508
  // TODO(unknown): Compaction
509
  uint32_t new_group_id = catalog->GetMaxLinkId() + 1;
510
  LogCvmfs(kLogCatalog, kLogVerboseMsg, "hardlink group id %u issued",
511
           new_group_id);
512
  assert(new_group_id > 0);
513
514
  // Add the file entries to the catalog
515
  for (DirectoryEntryBaseList::const_iterator i = entries.begin(),
516
       iEnd = entries.end(); i != iEnd; ++i)
517
  {
518
    string file_path = parent_path + "/";
519
    file_path.append(i->name().GetChars(), i->name().GetLength());
520
521
    // Create a fully fledged DirectoryEntry to add the hardlink group to it,
522
    // which is CVMFS-specific metadata.
523
    DirectoryEntry hardlink(*i);
524
    hardlink.set_hardlink_group(new_group_id);
525
    hardlink.set_linkcount(entries.size());
526
    hardlink.set_is_chunked_file(!file_chunks.IsEmpty());
527
528
    catalog->AddEntry(hardlink, xattrs, file_path, parent_path);
529
    if (hardlink.IsChunkedFile()) {
530
      for (unsigned i = 0; i < file_chunks.size(); ++i) {
531
        catalog->AddFileChunk(file_path, *file_chunks.AtPtr(i));
532
      }
533
    }
534
  }
535
  SyncUnlock();
536
}
537
538
539
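/**
 * Decreases the link count of the hardlink group member at remove_path by
 * one, used when a member of an existing hardlink group is removed.
 */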
void WritableCatalogManager::ShrinkHardlinkGroup(const string &remove_path) {
540
  const string relative_path = MakeRelativePath(remove_path);
541
542
  SyncLock();
543
  WritableCatalog *catalog;
544
  if (!FindCatalog(relative_path, &catalog)) {
545
    LogCvmfs(kLogCatalog, kLogStderr,
546
             "catalog for hardlink group containing '%s' cannot be found",
547
             remove_path.c_str());
548
    assert(false);
549
  }
550
551
  catalog->IncLinkcount(relative_path, -1);
552
  SyncUnlock();
553
}
554
555
556
/**
557
 * Update entry meta data (mode, owner, ...).
558
 * CVMFS-specific metadata (i.e. nested catalog transition points) is NOT
559
 * changed by this method, although transition point intrinsics are taken into
560
 * account, to keep nested catalogs consistent.
561
 * @param entry      the directory entry to be touched
562
 * @param directory_path  the path of the directory entry to be touched
563
 */
564
void WritableCatalogManager::TouchDirectory(const DirectoryEntryBase &entry,
565
                                            const std::string &directory_path)
566
{
567
  assert(entry.IsDirectory());
568
569
  const string entry_path = MakeRelativePath(directory_path);
570
  const string parent_path = GetParentPath(entry_path);
571
572
  SyncLock();
573
  // find the catalog to be updated
574
  WritableCatalog *catalog;
575
  if (!FindCatalog(parent_path, &catalog)) {
576
    LogCvmfs(kLogCatalog, kLogStderr, "catalog for entry '%s' cannot be found",
577
             entry_path.c_str());
578
    assert(false);
579
  }
580
581
  catalog->TouchEntry(entry, entry_path);
582
583
  // since we deal with a directory here, we might just touch a
584
  // nested catalog transition point. If this is the case we would need to
585
  // update two catalog entries:
586
  //   * the nested catalog MOUNTPOINT in the parent catalog
587
  //   * the nested catalog ROOT in the nested catalog
588
589
  // first check if we really have a nested catalog transition point
590
  catalog::DirectoryEntry potential_transition_point;
591
  PathString transition_path(entry_path.data(), entry_path.length());
592
  bool retval = catalog->LookupPath(transition_path,
593
                                    &potential_transition_point);
594
  assert(retval);
595
  if (potential_transition_point.IsNestedCatalogMountpoint()) {
596
    LogCvmfs(kLogCatalog, kLogVerboseMsg,
597
             "updating transition point at %s", entry_path.c_str());
598
599
    // Find and mount the nested catalog associated with this transition point
600
    shash::Any nested_hash;
601
    uint64_t nested_size;
602
    retval = catalog->FindNested(transition_path, &nested_hash, &nested_size);
603
    assert(retval);
604
    Catalog *nested_catalog;
605
    nested_catalog = MountCatalog(transition_path, nested_hash, catalog);
606
    assert(nested_catalog != NULL);
607
608
    // update nested catalog root in the child catalog
609
    reinterpret_cast<WritableCatalog *>(nested_catalog)->
610
      TouchEntry(entry, entry_path);
611
  }
612
613
  SyncUnlock();
614
}
615
616
617
/**
618
 * Create a new nested catalog.  Includes moving all entries belonging there
619
 * from its parent catalog.
620
 * @param mountpoint the path of the directory to become a nested root
621
 * @note The method aborts (via assert) on failure.
622
 */
623
7
void WritableCatalogManager::CreateNestedCatalog(const std::string &mountpoint)
624
{
625
7
  const string nested_root_path = MakeRelativePath(mountpoint);
626
7
  const PathString ps_nested_root_path(nested_root_path);
627
628
7
  SyncLock();
629
  // Find the catalog currently containing the directory structure, which
630
  // will be represented as a new nested catalog and its root-entry/mountpoint
631
  // along the way
632
7
  WritableCatalog *old_catalog = NULL;
633
7
  DirectoryEntry new_root_entry;
634
7
  if (!FindCatalog(nested_root_path, &old_catalog, &new_root_entry)) {
635
    LogCvmfs(kLogCatalog, kLogStderr, "failed to create nested catalog '%s': "
636
             "mountpoint was not found in current catalog structure",
637
             nested_root_path.c_str());
638
    assert(false);
639
  }
640
641
  // Create the database schema and the initial root entry
642
  // for the new nested catalog
643
  const string database_file_path = CreateTempPath(dir_temp() + "/catalog",
644
7
                                                   0666);
645
7
  const bool volatile_content = false;
646
7
  CatalogDatabase *new_catalog_db = CatalogDatabase::Create(database_file_path);
647
7
  assert(NULL != new_catalog_db);
648
  // Note we do not set the external_data bit for nested catalogs
649
  bool retval =
650
           new_catalog_db->InsertInitialValues(nested_root_path,
651
                                               volatile_content,
652
                                               "",  // At this point, only root
653
                                                    // catalog gets VOMS authz
654
7
                                               new_root_entry);
655
7
  assert(retval);
656
  // TODO(rmeusel): we need a way to attach a catalog directly from an open
657
  // database to remove this indirection
658
7
  delete new_catalog_db;
659
7
  new_catalog_db = NULL;
660
661
  // Attach the just created nested catalog
662
  Catalog *new_catalog =
663
7
    CreateCatalog(ps_nested_root_path, shash::Any(), old_catalog);
664
7
  retval = AttachCatalog(database_file_path, new_catalog);
665
7
  assert(retval);
666
667
7
  assert(new_catalog->IsWritable());
668
7
  WritableCatalog *wr_new_catalog = static_cast<WritableCatalog *>(new_catalog);
669
670
  // From now on there are two catalogs spanning the same directory structure;
671
  // we have to split the overlapping directory entries from the old catalog
672
  // into the new catalog to regain a valid catalog structure
673
7
  old_catalog->Partition(wr_new_catalog);
674
675
  // Add the newly created nested catalog to the references of the containing
676
  // catalog
677
  old_catalog->InsertNestedCatalog(new_catalog->mountpoint().ToString(), NULL,
678
7
                                   shash::Any(spooler_->GetHashAlgorithm()), 0);
679
680
  // Fix subtree counters in new nested catalogs: subtree is the sum of all
681
  // entries of all "grand-nested" catalogs
682
  // Note: taking a copy of the nested catalog list here
683
  const Catalog::NestedCatalogList &grand_nested =
684
7
    wr_new_catalog->ListOwnNestedCatalogs();
685
7
  DeltaCounters fix_subtree_counters;
686
14
  for (Catalog::NestedCatalogList::const_iterator i = grand_nested.begin(),
687
7
       iEnd = grand_nested.end(); i != iEnd; ++i)
688
  {
689
    WritableCatalog *grand_catalog;
690
    retval = FindCatalog(i->mountpoint.ToString(), &grand_catalog);
691
    assert(retval);
692
    const Counters &grand_counters = grand_catalog->GetCounters();
693
    grand_counters.AddAsSubtree(&fix_subtree_counters);
694
  }
695
7
  DeltaCounters save_counters = wr_new_catalog->delta_counters_;
696
7
  wr_new_catalog->delta_counters_ = fix_subtree_counters;
697
7
  wr_new_catalog->UpdateCounters();
698
7
  wr_new_catalog->delta_counters_ = save_counters;
699
700
7
  SyncUnlock();
701
7
}
702
703
704
/**
705
 * Remove a nested catalog
706
 *
707
 * If the merge parameter is true, when the nested catalog is removed,
708
 * all entries currently held by it will be merged into its parent
709
 * catalog.
710
 * @param mountpoint - the path of the nested catalog to be removed
711
 * @param merge - merge the subtree associated with the nested catalog
712
 *                into its parent catalog
713
 * @note The method aborts (via assert) on failure.
714
 */
715
void WritableCatalogManager::RemoveNestedCatalog(const string &mountpoint,
716
                                                 const bool merge) {
717
  const string nested_root_path = MakeRelativePath(mountpoint);
718
719
  SyncLock();
720
  // Find the catalog which should be removed
721
  WritableCatalog *nested_catalog = NULL;
722
  if (!FindCatalog(nested_root_path, &nested_catalog)) {
723
    LogCvmfs(kLogCatalog, kLogStderr, "failed to remove nested catalog '%s': "
724
             "mountpoint was not found in current catalog structure",
725
             nested_root_path.c_str());
726
    assert(false);
727
  }
728
729
  // Check if the found catalog is really the nested catalog to be deleted
730
  assert(!nested_catalog->IsRoot() &&
731
         (nested_catalog->mountpoint().ToString() == nested_root_path));
732
733
  if (merge) {
734
    // Merge all data from the nested catalog into its parent
735
    nested_catalog->MergeIntoParent();
736
  } else {
737
    nested_catalog->RemoveFromParent();
738
  }
739
740
  // Delete the catalog database file from the working copy
741
  if (unlink(nested_catalog->database_path().c_str()) != 0) {
742
    LogCvmfs(kLogCatalog, kLogStderr,
743
             "unable to delete the removed nested catalog database file '%s'",
744
             nested_catalog->database_path().c_str());
745
    assert(false);
746
  }
747
748
  // Remove the catalog from internal data structures
749
  DetachCatalog(nested_catalog);
750
  SyncUnlock();
751
}
752
753
754
/**
755
 * Checks if a nested catalog starts at this path.  The path must be valid.
756
 */
757
bool WritableCatalogManager::IsTransitionPoint(const string &mountpoint) {
758
  const string path = MakeRelativePath(mountpoint);
759
760
  SyncLock();
761
  WritableCatalog *catalog;
762
  DirectoryEntry entry;
763
  if (!FindCatalog(path, &catalog, &entry)) {
764
    LogCvmfs(kLogCatalog, kLogStderr,
765
             "catalog for directory '%s' cannot be found", path.c_str());
766
    assert(false);
767
  }
768
  const bool result = entry.IsNestedCatalogRoot();
769
  SyncUnlock();
770
  return result;
771
}
772
773
774
void WritableCatalogManager::PrecalculateListings() {
775
  // TODO(jblomer): meant for micro catalogs
776
}
777
778
779
void WritableCatalogManager::SetTTL(const uint64_t new_ttl) {
780
  SyncLock();
781
  reinterpret_cast<WritableCatalog *>(GetRootCatalog())->SetTTL(new_ttl);
782
  SyncUnlock();
783
}
784
785
786
bool WritableCatalogManager::SetVOMSAuthz(const std::string &voms_authz) {
787
  bool result;
788
  SyncLock();
789
  result = reinterpret_cast<WritableCatalog *>(
790
    GetRootCatalog())->SetVOMSAuthz(voms_authz);
791
  SyncUnlock();
792
  return result;
793
}
794
795
796
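/**
 * Snapshots and uploads all dirty catalogs and fills the given manifest with
 * the resulting root catalog information (hash, size, TTL, revision).
 * @param stop_for_tweaks   pause before processing each catalog to allow for
 *                          manual modifications
 * @param manual_revision   if larger than 0, use it as the new root catalog
 *                          revision (must exceed the current revision)
 * @param manifest          the manifest to be updated for .cvmfspublished
 * @return true on success, false if uploading the catalogs failed
 */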
8
bool WritableCatalogManager::Commit(const bool           stop_for_tweaks,
797
                                    const uint64_t       manual_revision,
798
                                    manifest::Manifest  *manifest) {
799
  WritableCatalog *root_catalog =
800
8
    reinterpret_cast<WritableCatalog *>(GetRootCatalog());
801
8
  root_catalog->SetDirty();
802
803
  // set root catalog revision to manually provided number if available
804
8
  if (manual_revision > 0) {
805
    const uint64_t revision = root_catalog->GetRevision();
806
    if (revision >= manual_revision) {
807
      LogCvmfs(kLogCatalog, kLogStderr, "Manual revision (%d) must not be "
808
                                        "smaller than the current root "
809
                                        "catalog's (%d). Skipped!",
810
                                        manual_revision, revision);
811
    } else {
812
      // Gets incremented by FinalizeCatalog() afterwards!
813
      root_catalog->SetRevision(manual_revision - 1);
814
    }
815
  }
816
817
  // do the actual catalog snapshotting and upload
818
8
  CatalogInfo root_catalog_info;
819
8
  if (getenv("_CVMFS_SERIALIZED_CATALOG_PROCESSING_") == NULL)
820
8
    root_catalog_info = SnapshotCatalogs(stop_for_tweaks);
821
  else
822
    root_catalog_info = SnapshotCatalogsSerialized(stop_for_tweaks);
823
8
  if (spooler_->GetNumberOfErrors() > 0) {
824
    LogCvmfs(kLogCatalog, kLogStderr, "failed to commit catalogs");
825
    return false;
826
  }
827
828
  // .cvmfspublished export
829
8
  LogCvmfs(kLogCatalog, kLogVerboseMsg, "Committing repository manifest");
830
8
  set_base_hash(root_catalog_info.content_hash);
831
832
8
  manifest->set_catalog_hash(root_catalog_info.content_hash);
833
8
  manifest->set_catalog_size(root_catalog_info.size);
834
8
  manifest->set_root_path("");
835
8
  manifest->set_ttl(root_catalog_info.ttl);
836
8
  manifest->set_revision(root_catalog_info.revision);
837
838
8
  return true;
839
}
840
841
842
/**
843
 * Handles the snapshotting of dirty (i.e. modified) catalogs while trying to
844
 * parallelize the compression and upload as much as possible. We use a parallel
845
 * depth-first post-order tree traversal based on 'continuations'.
846
 *
847
 * The idea is as follows:
848
 *  1. find all leaf-catalogs (i.e. dirty catalogs with no dirty children)
849
 *     --> these can be processed and uploaded immediately and independently
850
 *         see WritableCatalogManager::GetModifiedCatalogLeafs()
851
 *  2. annotate non-leaf catalogs with their number of dirty children
852
 *     --> a finished child will notify its parent and decrement this number
853
 *         see WritableCatalogManager::CatalogUploadCallback()
854
 *  3. if a non-leaf catalog's dirty children number reaches 0, it is scheduled
855
 *     for processing as well (continuation)
856
 *     --> the parallel processing walks bottom-up through the catalog tree
857
 *         see WritableCatalogManager::CatalogUploadCallback()
858
 *  4. when the root catalog is reached, we notify the main thread and return
859
 *     --> done through a Future<> in WritableCatalogManager::SnapshotCatalogs
860
 *
861
 * Note: The catalog finalisation (see WritableCatalogManager::FinalizeCatalog)
862
 *       happens in a worker thread (i.e. the callback method) for non-leaf
863
 *       catalogs.
864
 *
865
 * TODO(rmeusel): since all leaf catalogs are finalized in the main thread, we
866
 *                sacrifice some potential concurrency for simplicity.
867
 */
868
8
WritableCatalogManager::CatalogInfo WritableCatalogManager::SnapshotCatalogs(
869
                                                   const bool stop_for_tweaks) {
870
  // prepare environment for parallel processing
871
8
  Future<CatalogInfo>  root_catalog_info_future;
872
  CatalogUploadContext upload_context;
873
8
  upload_context.root_catalog_info = &root_catalog_info_future;
874
8
  upload_context.stop_for_tweaks   = stop_for_tweaks;
875
876
  spooler_->RegisterListener(
877
8
    &WritableCatalogManager::CatalogUploadCallback, this, upload_context);
878
879
  // find dirty leaf catalogs and annotate non-leaf catalogs (dirty child count)
880
  // post-condition: the entire catalog tree is ready for concurrent processing
881
8
  WritableCatalogList leafs_to_snapshot;
882
8
  GetModifiedCatalogLeafs(&leafs_to_snapshot);
883
884
  // finalize and schedule the catalog processing
885
8
        WritableCatalogList::const_iterator i    = leafs_to_snapshot.begin();
886
8
  const WritableCatalogList::const_iterator iend = leafs_to_snapshot.end();
887
18
  for (; i != iend; ++i) {
888
10
    FinalizeCatalog(*i, stop_for_tweaks);
889
10
    ScheduleCatalogProcessing(*i);
890
  }
891
892
8
  LogCvmfs(kLogCatalog, kLogVerboseMsg, "waiting for upload of catalogs");
893
8
  CatalogInfo& root_catalog_info = root_catalog_info_future.Get();
894
8
  spooler_->WaitForUpload();
895
896
8
  spooler_->UnregisterListeners();
897
8
  return root_catalog_info;
898
}
899
900
901
15
void WritableCatalogManager::FinalizeCatalog(WritableCatalog *catalog,
902
                                             const bool stop_for_tweaks) {
903
  // update meta information of this catalog
904
  LogCvmfs(kLogCatalog, kLogVerboseMsg, "creating snapshot of catalog '%s'",
905
15
           catalog->mountpoint().c_str());
906
907
15
  catalog->UpdateCounters();
908
15
  catalog->UpdateLastModified();
909
15
  catalog->IncrementRevision();
910
911
  // update the previous catalog revision pointer
912
15
  if (catalog->IsRoot()) {
913
    LogCvmfs(kLogCatalog, kLogVerboseMsg, "setting '%s' as previous revision "
914
                                          "for root catalog",
915
8
             base_hash().ToStringWithSuffix().c_str());
916
8
    catalog->SetPreviousRevision(base_hash());
917
  } else {
918
    // Multiple catalogs might query the parent concurrently
919
7
    SyncLock();
920
7
    shash::Any hash_previous;
921
    uint64_t size_previous;
922
    const bool retval =
923
      catalog->parent()->FindNested(catalog->mountpoint(),
924
7
                                    &hash_previous, &size_previous);
925
7
    assert(retval);
926
7
    SyncUnlock();
927
928
    LogCvmfs(kLogCatalog, kLogVerboseMsg, "found '%s' as previous revision "
929
                                          "for nested catalog '%s'",
930
             hash_previous.ToStringWithSuffix().c_str(),
931
7
             catalog->mountpoint().c_str());
932
7
    catalog->SetPreviousRevision(hash_previous);
933
  }
934
15
  catalog->Commit();
935
936
  // check if catalog has too many entries
937
  uint64_t catalog_limit = uint64_t(1000) *
938
    uint64_t((catalog->IsRoot()
939
              ? root_kcatalog_limit_
940
15
              : nested_kcatalog_limit_));
941

15
  if ((catalog_limit > 0) &&
942
      (catalog->GetCounters().GetSelfEntries() > catalog_limit)) {
943
    LogCvmfs(kLogCatalog, kLogStderr,
944
             "%s: catalog at %s has more than %u entries (%u). "
945
             "Large catalogs stress the CernVM-FS transport infrastructure. "
946
             "Please split it into nested catalogs or increase the limit.",
947
             enforce_limits_ ? "FATAL" : "WARNING",
948
             (catalog->IsRoot() ? "/" : catalog->mountpoint().c_str()),
949
             catalog_limit,
950
             catalog->GetCounters().GetSelfEntries());
951
    assert(!enforce_limits_);
952
  }
953
954
  // allow for manual adjustments in the catalog
955
15
  if (stop_for_tweaks) {
956
    LogCvmfs(kLogCatalog, kLogStdout, "Allowing for tweaks in %s at %s "
957
                                      "(hit return to continue)",
958
             catalog->database_path().c_str(), catalog->mountpoint().c_str());
959
    int read_char = getchar();
960
    assert(read_char != EOF);
961
  }
962
963
  // compaction of bloated catalogs (usually after high database churn)
964
15
  catalog->VacuumDatabaseIfNecessary();
965
15
}
966
967
968
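/**
 * Hands a finalized catalog to the spooler for compression and upload.  The
 * catalog object is remembered in catalog_processing_map_ so that
 * CatalogUploadCallback() can look it up again by its database path.
 */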
15
void WritableCatalogManager::ScheduleCatalogProcessing(
969
                                                     WritableCatalog *catalog) {
970
  {
971
15
    MutexLockGuard guard(catalog_processing_lock_);
972
    // register catalog object for WritableCatalogManager::CatalogUploadCallback
973
15
    catalog_processing_map_[catalog->database_path()] = catalog;
974
  }
975
15
  spooler_->ProcessCatalog(catalog->database_path());
976
15
}
977
978
979
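/**
 * Spooler callback fired when a catalog has been uploaded.  For nested
 * catalogs the link in the parent catalog is updated and, once the parent
 * has no dirty children left, the parent is scheduled for processing
 * (continuation).  For the root catalog the result is reported back to
 * SnapshotCatalogs() through the Future<> in the upload context.
 */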
15
void WritableCatalogManager::CatalogUploadCallback(
980
                          const upload::SpoolerResult &result,
981
                          const CatalogUploadContext   catalog_upload_context) {
982
15
  if (result.return_code != 0) {
983
    LogCvmfs(kLogCatalog, kLogStderr, "failed to upload '%s' (retval: %d)",
984
             result.local_path.c_str(), result.return_code);
985
    assert(false);
986
  }
987
988
  // retrieve the catalog object based on the callback information
989
  // see WritableCatalogManager::ScheduleCatalogProcessing()
990
15
  WritableCatalog *catalog = NULL;
991
  {
992
15
    MutexLockGuard guard(catalog_processing_lock_);
993
    std::map<std::string, WritableCatalog*>::iterator c =
994
15
      catalog_processing_map_.find(result.local_path);
995
15
    assert(c != catalog_processing_map_.end());
996
15
    catalog = c->second;
997
  }
998
999
15
  uint64_t catalog_size = GetFileSize(result.local_path);
1000
15
  assert(catalog_size > 0);
1001
1002
15
  SyncLock();
1003
15
  if (catalog->HasParent()) {
1004
    // finalized nested catalogs will update their parent's pointer and schedule
1005
    // them for processing (continuation) if the 'dirty children count' == 0
1006
7
    LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating nested catalog link");
1007
7
    WritableCatalog *parent = catalog->GetWritableParent();
1008
1009
    parent->UpdateNestedCatalog(catalog->mountpoint().ToString(),
1010
                                result.content_hash,
1011
                                catalog_size,
1012
7
                                catalog->delta_counters_);
1013
7
    catalog->delta_counters_.SetZero();
1014
1015
    const int remaining_dirty_children =
1016
7
      catalog->GetWritableParent()->DecrementDirtyChildren();
1017
1018
7
    SyncUnlock();
1019
1020
    // continuation of the dirty catalog tree traversal
1021
    // see WritableCatalogManager::SnapshotCatalogs()
1022
7
    if (remaining_dirty_children == 0) {
1023
5
      FinalizeCatalog(parent, catalog_upload_context.stop_for_tweaks);
1024
5
      ScheduleCatalogProcessing(parent);
1025
    }
1026
1027
8
  } else if (catalog->IsRoot()) {
1028
    // once the root catalog is reached, we are done with processing and report
1029
    // back to the main thread via a Future<> and provide the necessary information
1030
8
    CatalogInfo root_catalog_info;
1031
8
    root_catalog_info.size         = catalog_size;
1032
8
    root_catalog_info.ttl          = catalog->GetTTL();
1033
8
    root_catalog_info.content_hash = result.content_hash;
1034
8
    root_catalog_info.revision     = catalog->GetRevision();
1035
8
    catalog_upload_context.root_catalog_info->Set(root_catalog_info);
1036
8
    SyncUnlock();
1037
  } else {
1038
    assert(false && "inconsistent state detected");
1039
  }
1040
15
}
1041
1042
1043
/**
1044
 * Finds dirty catalogs that can be snapshotted right away and annotates all the
1045
 * other catalogs with their number of dirty descendants.
1046
 * Note that there is a convenience wrapper to start the recursion:
1047
 *   WritableCatalogManager::GetModifiedCatalogLeafs()
1048
 *
1049
 * @param catalog  the catalog for this recursion step
1050
 * @param result   the result list to be appended to
1051
 * @return         true if 'catalog' is dirty
1052
 */
1053
15
bool WritableCatalogManager::GetModifiedCatalogLeafsRecursively(
1054
                                            Catalog             *catalog,
1055
                                            WritableCatalogList *result) const {
1056
15
  WritableCatalog *wr_catalog = static_cast<WritableCatalog *>(catalog);
1057
1058
  // Look for dirty catalogs in the descendants of *catalog
1059
15
  int dirty_children = 0;
1060
15
  CatalogList children = wr_catalog->GetChildren();
1061
15
        CatalogList::const_iterator i    = children.begin();
1062
15
  const CatalogList::const_iterator iend = children.end();
1063
22
  for (; i != iend; ++i) {
1064
7
    if (GetModifiedCatalogLeafsRecursively(*i, result)) {
1065
7
      ++dirty_children;
1066
    }
1067
  }
1068
1069
  // a catalog is dirty if itself or one of its children has changed
1070
  // a leaf catalog doesn't have any dirty children
1071
15
  wr_catalog->set_dirty_children(dirty_children);
1072

15
  const bool is_dirty = wr_catalog->IsDirty() || dirty_children > 0;
1073
15
  const bool is_leaf  = dirty_children == 0;
1074

15
  if (is_dirty && is_leaf) {
1075
10
    result->push_back(const_cast<WritableCatalog *>(wr_catalog));
1076
  }
1077
1078
15
  return is_dirty;
1079
}
1080
1081
1082
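/**
 * Walks through all attached catalogs in reverse order and adjusts their
 * weight via FixWeight(): too small autogenerated catalogs are removed,
 * too large ones are split by the catalog balancer.
 */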
void WritableCatalogManager::DoBalance() {
1083
  CatalogList catalog_list = GetCatalogs();
1084
  reverse(catalog_list.begin(), catalog_list.end());
1085
  for (unsigned i = 0; i < catalog_list.size(); ++i) {
1086
    FixWeight(static_cast<WritableCatalog*>(catalog_list[i]));
1087
  }
1088
}
1089
1090
void WritableCatalogManager::FixWeight(WritableCatalog* catalog) {
1091
  // First check for underflows because they can provoke overflows
1092
  if (catalog->GetNumEntries() < min_weight_ &&
1093
      !catalog->IsRoot() &&
1094
      catalog->IsAutogenerated()) {
1095
    LogCvmfs(kLogCatalog, kLogStdout,
1096
             "Deleting an autogenerated catalog in '%s'",
1097
             catalog->mountpoint().c_str());
1098
    // Remove the .cvmfscatalog and .cvmfsautocatalog files first
1099
    string path = catalog->mountpoint().ToString();
1100
    catalog->RemoveEntry(path + "/.cvmfscatalog");
1101
    catalog->RemoveEntry(path + "/.cvmfsautocatalog");
1102
    // Remove the actual catalog
1103
    string catalog_path = catalog->mountpoint().ToString().substr(1);
1104
    RemoveNestedCatalog(catalog_path);
1105
  } else if (catalog->GetNumEntries() > max_weight_) {
1106
    CatalogBalancer<WritableCatalogManager> catalog_balancer(this);
1107
    catalog_balancer.Balance(catalog);
1108
  }
1109
}
1110
1111
1112
//****************************************************************************
1113
// Workaround -- Serialized Catalog Committing
1114
1115
int WritableCatalogManager::GetModifiedCatalogsRecursively(
1116
  const Catalog *catalog,
1117
  WritableCatalogList *result) const
1118
{
1119
  // A catalog must be snapshotted if itself or one of its descendants is dirty.
1120
  // So we traverse the catalog tree recursively and look for dirty catalogs
1121
  // on the way.
1122
  const WritableCatalog *wr_catalog =
1123
    static_cast<const WritableCatalog *>(catalog);
1124
  // This variable will contain the number of dirty catalogs in the sub tree
1125
  // with *catalog as its root.
1126
  int dirty_catalogs = (wr_catalog->IsDirty()) ? 1 : 0;
1127
1128
  // Look for dirty catalogs in the descendants of *catalog
1129
  CatalogList children = wr_catalog->GetChildren();
1130
  for (CatalogList::const_iterator i = children.begin(), iEnd = children.end();
1131
       i != iEnd; ++i)
1132
  {
1133
    dirty_catalogs += GetModifiedCatalogsRecursively(*i, result);
1134
  }
1135
1136
  // If we found a dirty catalog in the checked sub tree, the root (*catalog)
1137
  // must be snapshotted and ends up in the result list
1138
  if (dirty_catalogs > 0)
1139
    result->push_back(const_cast<WritableCatalog *>(wr_catalog));
1140
1141
  // tell the upper layer about number of catalogs
1142
  return dirty_catalogs;
1143
}
1144
1145
1146
void WritableCatalogManager::CatalogUploadSerializedCallback(
1147
  const upload::SpoolerResult &result,
1148
  const CatalogUploadContext unused)
1149
{
1150
  if (result.return_code != 0) {
1151
    LogCvmfs(kLogCatalog, kLogStderr, "failed to upload '%s' (retval: %d)",
1152
             result.local_path.c_str(), result.return_code);
1153
    assert(false);
1154
  }
1155
  unlink(result.local_path.c_str());
1156
}
1157
1158
1159
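/**
 * Single-threaded fallback for SnapshotCatalogs(), selected through the
 * _CVMFS_SERIALIZED_CATALOG_PROCESSING_ environment variable in Commit().
 * Dirty catalogs are finalized and handed to the spooler one after another.
 */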
WritableCatalogManager::CatalogInfo
1160
WritableCatalogManager::SnapshotCatalogsSerialized(
1161
  const bool stop_for_tweaks)
1162
{
1163
  LogCvmfs(kLogCvmfs, kLogStdout, "Serialized committing of file catalogs...");
1164
  reinterpret_cast<WritableCatalog *>(GetRootCatalog())->SetDirty();
1165
  WritableCatalogList catalogs_to_snapshot;
1166
  GetModifiedCatalogs(&catalogs_to_snapshot);
1167
  CatalogUploadContext unused;
1168
  unused.root_catalog_info = NULL;
1169
  unused.stop_for_tweaks = false;
1170
  spooler_->RegisterListener(
1171
    &WritableCatalogManager::CatalogUploadSerializedCallback, this, unused);
1172
1173
  CatalogInfo root_catalog_info;
1174
  WritableCatalogList::const_iterator i = catalogs_to_snapshot.begin();
1175
  const WritableCatalogList::const_iterator iend = catalogs_to_snapshot.end();
1176
  for (; i != iend; ++i) {
1177
    FinalizeCatalog(*i, stop_for_tweaks);
1178
1179
    // Compress and upload catalog
1180
    shash::Any hash_catalog(spooler_->GetHashAlgorithm(),
1181
                            shash::kSuffixCatalog);
1182
    if (!zlib::CompressPath2Null((*i)->database_path(),
1183
                                 &hash_catalog))
1184
    {
1185
      PrintError("could not compress catalog " + (*i)->mountpoint().ToString());
1186
      assert(false);
1187
    }
1188
1189
    int64_t catalog_size = GetFileSize((*i)->database_path());
1190
    assert(catalog_size > 0);
1191
1192
    if ((*i)->HasParent()) {
1193
      LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating nested catalog link");
1194
      WritableCatalog *parent = (*i)->GetWritableParent();
1195
      parent->UpdateNestedCatalog((*i)->mountpoint().ToString(), hash_catalog,
1196
                                  catalog_size, (*i)->delta_counters_);
1197
      (*i)->delta_counters_.SetZero();
1198
    } else if ((*i)->IsRoot()) {
1199
      root_catalog_info.size = catalog_size;
1200
      root_catalog_info.ttl = (*i)->GetTTL();
1201
      root_catalog_info.content_hash = hash_catalog;
1202
      root_catalog_info.revision = (*i)->GetRevision();
1203
    } else {
1204
      assert(false && "inconsistent state detected");
1205
    }
1206
1207
    spooler_->ProcessCatalog((*i)->database_path());
1208
  }
1209
  spooler_->WaitForUpload();
1210
1211
  spooler_->UnregisterListeners();
1212
  return root_catalog_info;
1213
}
1214
1215

45
}  // namespace catalog